from __future__ import annotations

import sqlite3
from pathlib import Path

import pytest
from peewee import IntegrityError

from repub.model import (
    AppSetting,
    Job,
    Source,
    database,
    initialize_database,
    load_feed_url,
    load_max_concurrent_jobs,
    load_settings_form,
    resolve_database_path,
    save_setting,
    schema_paths,
)


def test_resolve_database_path_defaults_to_republisher_db(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    monkeypatch.chdir(tmp_path)
    monkeypatch.delenv("REPUBLISHER_DB_PATH", raising=False)

    assert resolve_database_path() == tmp_path / "republisher.db"


def test_resolve_database_path_prefers_environment_variable(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    db_path = tmp_path / "env-configured.db"
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path))

    assert resolve_database_path() == db_path


def test_initialize_database_bootstraps_schema_from_sql_files(tmp_path: Path) -> None:
    db_path = tmp_path / "bootstrap.db"

    initialize_database(db_path)

    connection = sqlite3.connect(db_path)
    try:
        table_names = {
            row[0]
            for row in connection.execute(
                """
                SELECT name
                FROM sqlite_master
                WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
                """
            )
        }
        assert table_names == {
            "app_setting",
            "job",
            "job_execution",
            "source",
            "source_feed",
            "source_pangea",
        }
        defaults = {
            row[1]: row[4] for row in connection.execute("PRAGMA table_info('job')")
        }
        assert defaults["convert_images"] == "1"
        assert defaults["convert_video"] == "1"
    finally:
        connection.close()


def test_initialize_database_applies_newer_sql_files_to_existing_databases(
    tmp_path: Path,
) -> None:
    db_path = tmp_path / "existing.db"
    connection = sqlite3.connect(db_path)
    try:
        connection.executescript(schema_paths()[0].read_text(encoding="utf-8"))
    finally:
        connection.close()

    initialize_database(db_path)

    connection = sqlite3.connect(db_path)
    try:
        table_names = {
            row[0]
            for row in connection.execute(
                """
                SELECT name
                FROM sqlite_master
                WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
                """
            )
        }
        assert "app_setting" in table_names
        job_columns = {
            row[1] for row in connection.execute("PRAGMA table_info('job')")
        }
        assert "convert_images" in job_columns
        assert "convert_video" in job_columns
    finally:
        connection.close()


def test_initialize_database_configures_sqlite_pragmas(tmp_path: Path) -> None:
    db_path = tmp_path / "pragmas.db"

    initialize_database(db_path)

    database.connect(reuse_if_open=True)
    try:
        pragma_values = {
            "cache_size": database.execute_sql("PRAGMA cache_size").fetchone()[0],
            "page_size": database.execute_sql("PRAGMA page_size").fetchone()[0],
            "journal_mode": database.execute_sql("PRAGMA journal_mode").fetchone()[0],
            "synchronous": database.execute_sql("PRAGMA synchronous").fetchone()[0],
            "temp_store": database.execute_sql("PRAGMA temp_store").fetchone()[0],
            "foreign_keys": database.execute_sql("PRAGMA foreign_keys").fetchone()[0],
            "busy_timeout": database.execute_sql("PRAGMA busy_timeout").fetchone()[0],
        }
        assert pragma_values == {
            "cache_size": 15625,
            "page_size": 4096,
            "journal_mode": "wal",
            "synchronous": 1,
            "temp_store": 2,
            "foreign_keys": 1,
            "busy_timeout": 5000,
        }
    finally:
        database.close()


def test_initialize_database_creates_scheduler_and_execution_indexes(
    tmp_path: Path,
) -> None:
    db_path = tmp_path / "indexes.db"

    initialize_database(db_path)

    connection = sqlite3.connect(db_path)
    try:
        index_names = {
            row[0]
            for row in connection.execute(
                """
                SELECT name
                FROM sqlite_master
                WHERE type = 'index'
                  AND name IN (
                    'job_enabled_idx',
                    'job_execution_job_created_at_idx',
                    'job_execution_status_started_at_idx',
                    'job_execution_status_ended_at_idx'
                  )
                """
            )
        }
        assert index_names == {
            "job_enabled_idx",
            "job_execution_job_created_at_idx",
            "job_execution_status_started_at_idx",
            "job_execution_status_ended_at_idx",
        }
    finally:
        connection.close()


def test_initialize_database_creates_run_queue_indexes(tmp_path: Path) -> None:
    db_path = tmp_path / "queue-indexes.db"

    initialize_database(db_path)

    connection = sqlite3.connect(db_path)
    try:
        indexes = {
            row[0]: row[1]
            for row in connection.execute(
                """
                SELECT name, sql
                FROM sqlite_master
                WHERE type = 'index'
                  AND name IN (
                    'job_execution_pending_created_at_idx',
                    'job_execution_pending_unique_job_idx'
                  )
                """
            )
        }
        assert set(indexes) == {
            "job_execution_pending_created_at_idx",
            "job_execution_pending_unique_job_idx",
        }
        assert indexes["job_execution_pending_unique_job_idx"] is not None
        assert (
            "WHERE running_status = 0"
            in indexes["job_execution_pending_unique_job_idx"]
        )
    finally:
        connection.close()


def test_job_table_allows_exactly_one_job_per_source(tmp_path: Path) -> None:
    initialize_database(tmp_path / "jobs.db")
    source = Source.create(
        name="Guardian feed mirror",
        slug="guardian-feed",
        source_type="feed",
    )
    Job.create(
        source=source,
        enabled=True,
        spider_arguments="",
        cron_minute="15",
        cron_hour="*",
        cron_day_of_month="*",
        cron_day_of_week="*",
        cron_month="*",
    )

    with pytest.raises(IntegrityError):
        Job.create(
            source=source,
            enabled=True,
            spider_arguments="language=en",
            cron_minute="30",
            cron_hour="*",
            cron_day_of_month="*",
            cron_day_of_week="*",
            cron_month="*",
        )


def test_load_max_concurrent_jobs_defaults_to_one(tmp_path: Path) -> None:
    initialize_database(tmp_path / "settings-defaults.db")

    assert load_max_concurrent_jobs() == 1


def test_save_setting_persists_json_value(tmp_path: Path) -> None:
    initialize_database(tmp_path / "settings-roundtrip.db")

    save_setting("max_concurrent_jobs", 4)

    row = AppSetting.get(AppSetting.key == "max_concurrent_jobs")
    assert row.value == "4"
    assert load_max_concurrent_jobs() == 4


def test_load_settings_form_includes_feed_url(tmp_path: Path) -> None:
    initialize_database(tmp_path / "settings-form.db")

    save_setting("feed_url", "https://mirror.example")

    assert load_feed_url() == "https://mirror.example"
    assert load_settings_form() == {
        "max_concurrent_jobs": 1,
        "feed_url": "https://mirror.example",
    }
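

# Sketch of a possible follow-up test, not part of the original suite. It
# assumes save_setting has upsert semantics, i.e. writing the same key twice
# keeps a single AppSetting row holding the latest value; if the model uses
# insert-only semantics this would raise or leave duplicate rows instead.
def test_save_setting_overwrites_existing_value(tmp_path: Path) -> None:
    initialize_database(tmp_path / "settings-overwrite.db")

    save_setting("max_concurrent_jobs", 2)
    save_setting("max_concurrent_jobs", 8)

    rows = list(AppSetting.select().where(AppSetting.key == "max_concurrent_jobs"))
    assert len(rows) == 1  # assumed upsert: one row per key
    assert load_max_concurrent_jobs() == 8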