diff --git a/.gitignore b/.gitignore index 6a9e93b..bf0de74 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ data logs archive *egg-info +*.db diff --git a/pyproject.toml b/pyproject.toml index 425ad43..3361a67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,9 @@ include-package-data = true where = ["."] include = ["repub*"] +[tool.setuptools.package-data] +repub = ["sql/*.sql"] + [tool.pytest.ini_options] testpaths = ["tests"] diff --git a/repub/model.py b/repub/model.py new file mode 100644 index 0000000..8b26934 --- /dev/null +++ b/repub/model.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import os +from datetime import UTC, datetime +from enum import IntEnum +from importlib import resources +from importlib.resources.abc import Traversable +from pathlib import Path + +from peewee import ( + BooleanField, + Check, + DateTimeField, + ForeignKeyField, + IntegerField, + Model, + SqliteDatabase, + TextField, +) + +DEFAULT_DB_PATH = Path("republisher.db") +DATABASE_PRAGMAS = { + "busy_timeout": 5000, + "cache_size": 15625, + "foreign_keys": 1, + "journal_mode": "wal", + "page_size": 4096, + "synchronous": "normal", + "temp_store": "memory", +} +SCHEMA_GLOB = "*.sql" + +database = SqliteDatabase(None, pragmas=DATABASE_PRAGMAS) + + +class JobExecutionStatus(IntEnum): + PENDING = 0 + RUNNING = 1 + SUCCEEDED = 2 + FAILED = 3 + CANCELED = 4 + + +def utc_now() -> datetime: + return datetime.now(UTC) + + +def resolve_database_path(db_path: str | Path | None = None) -> Path: + raw_value = ( + os.environ.get("REPUBLISHER_DB_PATH", DEFAULT_DB_PATH) + if db_path is None + else db_path + ) + raw_path = Path(raw_value) + return raw_path.expanduser().resolve() + + +def schema_paths() -> tuple[Traversable, ...]: + schema_dir = resources.files("repub").joinpath("sql") + return tuple( + sorted( + (path for path in schema_dir.iterdir() if path.name.endswith(".sql")), + key=lambda path: path.name, + ) + ) + + +def initialize_database(db_path: str | 
Path | None = None) -> Path: + resolved_path = resolve_database_path(db_path) + resolved_path.parent.mkdir(parents=True, exist_ok=True) + + if not database.is_closed(): + database.close() + + database.init(str(resolved_path), pragmas=DATABASE_PRAGMAS) + database.connect(reuse_if_open=True) + try: + connection = database.connection() + for path in schema_paths(): + connection.executescript(path.read_text(encoding="utf-8")) + finally: + database.close() + + return resolved_path + + +class BaseModel(Model): + class Meta: + database = database + + +class Source(BaseModel): + created_at = DateTimeField(default=utc_now) + updated_at = DateTimeField(default=utc_now) + name = TextField() + slug = TextField(unique=True) + source_type = TextField(constraints=[Check("source_type IN ('feed', 'pangea')")]) + notes = TextField(default="") + + class Meta: + table_name = "source" + + +class SourceFeed(BaseModel): + source = ForeignKeyField(Source, primary_key=True, backref="feed_config") + feed_url = TextField() + etag = TextField(null=True) + last_modified = TextField(null=True) + + class Meta: + table_name = "source_feed" + + +class SourcePangea(BaseModel): + source = ForeignKeyField(Source, primary_key=True, backref="pangea_config") + domain = TextField() + category_name = TextField() + content_type = TextField() + only_newest = BooleanField() + max_articles = IntegerField() + oldest_article = IntegerField() + include_authors = BooleanField() + exclude_media = BooleanField() + include_content = BooleanField() + content_format = TextField() + + class Meta: + table_name = "source_pangea" + + +class Job(BaseModel): + source = ForeignKeyField(Source, unique=True, backref="job") + created_at = DateTimeField(default=utc_now) + updated_at = DateTimeField(default=utc_now) + enabled = BooleanField() + spider_arguments = TextField(default="") + cron_minute = TextField() + cron_hour = TextField() + cron_day_of_month = TextField() + cron_day_of_week = TextField() + cron_month = TextField() 
+ + class Meta: + table_name = "job" + + +class JobExecution(BaseModel): + job = ForeignKeyField(Job, backref="executions") + created_at = DateTimeField(default=utc_now) + started_at = DateTimeField(null=True) + ended_at = DateTimeField(null=True) + running_status = IntegerField( + default=JobExecutionStatus.PENDING, + constraints=[Check("running_status BETWEEN 0 AND 4")], + ) + requests_count = IntegerField(default=0) + items_count = IntegerField(default=0) + warnings_count = IntegerField(default=0) + errors_count = IntegerField(default=0) + bytes_count = IntegerField(default=0) + retries_count = IntegerField(default=0) + exceptions_count = IntegerField(default=0) + cache_size_count = IntegerField(default=0) + cache_object_count = IntegerField(default=0) + raw_stats = TextField(default="{}") + + class Meta: + table_name = "job_execution" diff --git a/repub/sql/001_initial.sql b/repub/sql/001_initial.sql new file mode 100644 index 0000000..12f3d41 --- /dev/null +++ b/repub/sql/001_initial.sql @@ -0,0 +1,97 @@ +CREATE TABLE IF NOT EXISTS source ( + id INTEGER PRIMARY KEY, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + name TEXT NOT NULL, + slug TEXT NOT NULL UNIQUE, + source_type TEXT NOT NULL CHECK (source_type IN ('feed', 'pangea')), + notes TEXT NOT NULL DEFAULT '' +); + +CREATE TABLE IF NOT EXISTS source_feed ( + source_id INTEGER PRIMARY KEY, + feed_url TEXT NOT NULL, + etag TEXT, + last_modified TEXT, + FOREIGN KEY (source_id) REFERENCES source(id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS source_pangea ( + source_id INTEGER PRIMARY KEY, + domain TEXT NOT NULL, + category_name TEXT NOT NULL, + content_type TEXT NOT NULL, + only_newest INTEGER NOT NULL CHECK (only_newest IN (0, 1)), + max_articles INTEGER NOT NULL, + oldest_article INTEGER NOT NULL, + include_authors INTEGER NOT NULL CHECK (include_authors IN (0, 1)), + exclude_media INTEGER NOT NULL CHECK (exclude_media IN (0, 1)), + 
include_content INTEGER NOT NULL CHECK (include_content IN (0, 1)), + content_format TEXT NOT NULL, + FOREIGN KEY (source_id) REFERENCES source(id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS job ( + id INTEGER PRIMARY KEY, + source_id INTEGER NOT NULL UNIQUE, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + enabled INTEGER NOT NULL CHECK (enabled IN (0, 1)), + spider_arguments TEXT NOT NULL DEFAULT '', + cron_minute TEXT NOT NULL, + cron_hour TEXT NOT NULL, + cron_day_of_month TEXT NOT NULL, + cron_day_of_week TEXT NOT NULL, + cron_month TEXT NOT NULL, + FOREIGN KEY (source_id) REFERENCES source(id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS job_execution ( + id INTEGER PRIMARY KEY, + job_id INTEGER NOT NULL, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + started_at TEXT, + ended_at TEXT, + running_status INTEGER NOT NULL DEFAULT 0 CHECK (running_status BETWEEN 0 AND 4), + requests_count INTEGER NOT NULL DEFAULT 0, + items_count INTEGER NOT NULL DEFAULT 0, + warnings_count INTEGER NOT NULL DEFAULT 0, + errors_count INTEGER NOT NULL DEFAULT 0, + bytes_count INTEGER NOT NULL DEFAULT 0, + retries_count INTEGER NOT NULL DEFAULT 0, + exceptions_count INTEGER NOT NULL DEFAULT 0, + cache_size_count INTEGER NOT NULL DEFAULT 0, + cache_object_count INTEGER NOT NULL DEFAULT 0, + raw_stats TEXT NOT NULL DEFAULT '{}', + FOREIGN KEY (job_id) REFERENCES job(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS job_enabled_idx +ON job (enabled); + +CREATE INDEX IF NOT EXISTS job_execution_job_created_at_idx +ON job_execution (job_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS job_execution_status_started_at_idx +ON job_execution (running_status, started_at DESC); + +CREATE INDEX IF NOT EXISTS job_execution_status_ended_at_idx +ON job_execution (running_status, ended_at DESC); + +CREATE TRIGGER IF NOT EXISTS source_set_updated_at +AFTER UPDATE ON source +FOR EACH ROW +BEGIN + UPDATE 
source + SET updated_at = CURRENT_TIMESTAMP + WHERE id = NEW.id; +END; + +CREATE TRIGGER IF NOT EXISTS job_set_updated_at +AFTER UPDATE ON job +FOR EACH ROW +BEGIN + UPDATE job + SET updated_at = CURRENT_TIMESTAMP + WHERE id = NEW.id; +END; diff --git a/tests/test_model.py b/tests/test_model.py new file mode 100644 index 0000000..b27bf8d --- /dev/null +++ b/tests/test_model.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest +from peewee import IntegrityError + +from repub.model import ( + Job, + Source, + database, + initialize_database, + resolve_database_path, +) + + +def test_resolve_database_path_defaults_to_republisher_db( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.chdir(tmp_path) + monkeypatch.delenv("REPUBLISHER_DB_PATH", raising=False) + + assert resolve_database_path() == tmp_path / "republisher.db" + + +def test_resolve_database_path_prefers_environment_variable( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + db_path = tmp_path / "env-configured.db" + monkeypatch.setenv("REPUBLISHER_DB_PATH", str(db_path)) + + assert resolve_database_path() == db_path + + +def test_initialize_database_bootstraps_schema_from_sql_files(tmp_path: Path) -> None: + db_path = tmp_path / "bootstrap.db" + + initialize_database(db_path) + + connection = sqlite3.connect(db_path) + try: + table_names = { + row[0] + for row in connection.execute( + """ + SELECT name + FROM sqlite_master + WHERE type = 'table' AND name NOT LIKE 'sqlite_%' + """ + ) + } + assert table_names == { + "job", + "job_execution", + "source", + "source_feed", + "source_pangea", + } + + defaults = { + row[1]: row[4] + for row in connection.execute("PRAGMA table_info('source_pangea')") + } + assert defaults["content_type"] is None + assert defaults["only_newest"] is None + assert defaults["max_articles"] is None + assert defaults["oldest_article"] is None + assert 
defaults["include_authors"] is None + assert defaults["exclude_media"] is None + assert defaults["include_content"] is None + assert defaults["content_format"] is None + finally: + connection.close() + + +def test_initialize_database_configures_sqlite_pragmas(tmp_path: Path) -> None: + db_path = tmp_path / "pragmas.db" + + initialize_database(db_path) + + database.connect(reuse_if_open=True) + try: + pragma_values = { + "cache_size": database.execute_sql("PRAGMA cache_size").fetchone()[0], + "page_size": database.execute_sql("PRAGMA page_size").fetchone()[0], + "journal_mode": database.execute_sql("PRAGMA journal_mode").fetchone()[0], + "synchronous": database.execute_sql("PRAGMA synchronous").fetchone()[0], + "temp_store": database.execute_sql("PRAGMA temp_store").fetchone()[0], + "foreign_keys": database.execute_sql("PRAGMA foreign_keys").fetchone()[0], + "busy_timeout": database.execute_sql("PRAGMA busy_timeout").fetchone()[0], + } + assert pragma_values == { + "cache_size": 15625, + "page_size": 4096, + "journal_mode": "wal", + "synchronous": 1, + "temp_store": 2, + "foreign_keys": 1, + "busy_timeout": 5000, + } + finally: + database.close() + + +def test_initialize_database_creates_scheduler_and_execution_indexes( + tmp_path: Path, +) -> None: + db_path = tmp_path / "indexes.db" + + initialize_database(db_path) + + connection = sqlite3.connect(db_path) + try: + index_names = { + row[0] + for row in connection.execute( + """ + SELECT name + FROM sqlite_master + WHERE type = 'index' + AND name IN ( + 'job_enabled_idx', + 'job_execution_job_created_at_idx', + 'job_execution_status_started_at_idx', + 'job_execution_status_ended_at_idx' + ) + """ + ) + } + assert index_names == { + "job_enabled_idx", + "job_execution_job_created_at_idx", + "job_execution_status_started_at_idx", + "job_execution_status_ended_at_idx", + } + finally: + connection.close() + + +def test_job_table_allows_exactly_one_job_per_source(tmp_path: Path) -> None: + initialize_database(tmp_path / 
"jobs.db") + + source = Source.create( + name="Guardian feed mirror", + slug="guardian-feed", + source_type="feed", + ) + Job.create( + source=source, + enabled=True, + spider_arguments="", + cron_minute="15", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + ) + + with pytest.raises(IntegrityError): + Job.create( + source=source, + enabled=True, + spider_arguments="language=en", + cron_minute="30", + cron_hour="*", + cron_day_of_month="*", + cron_day_of_week="*", + cron_month="*", + )