"""Peewee models and persistence helpers for the republisher SQLite database."""

from __future__ import annotations

import os
from datetime import UTC, datetime
from enum import IntEnum
from importlib import resources
from importlib.resources.abc import Traversable
from pathlib import Path

from peewee import (
    BooleanField,
    Check,
    DateTimeField,
    ForeignKeyField,
    IntegerField,
    Model,
    SqliteDatabase,
    TextField,
)

# Used when neither an explicit path nor REPUBLISHER_DB_PATH is supplied.
DEFAULT_DB_PATH = Path("republisher.db")

# Pragmas handed to the peewee SqliteDatabase, both at construction and at init.
DATABASE_PRAGMAS = {
    "busy_timeout": 5000,
    "cache_size": 15625,
    "foreign_keys": 1,
    "journal_mode": "wal",
    "page_size": 4096,
    "synchronous": "normal",
    "temp_store": "memory",
}
SCHEMA_GLOB = "*.sql"

# Deferred database: the file path is only bound in initialize_database().
database = SqliteDatabase(None, pragmas=DATABASE_PRAGMAS)


class JobExecutionStatus(IntEnum):
    """Integer status codes used for ``JobExecution.running_status``."""

    PENDING = 0
    RUNNING = 1
    SUCCEEDED = 2
    FAILED = 3
    CANCELED = 4


def utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(UTC)


def resolve_database_path(db_path: str | Path | None = None) -> Path:
    """Return the absolute database path to use.

    An explicit ``db_path`` wins. Otherwise the ``REPUBLISHER_DB_PATH``
    environment variable is consulted, falling back to ``DEFAULT_DB_PATH``.
    ``~`` is expanded and the result is resolved to an absolute path.
    """
    if db_path is None:
        # An empty variable (e.g. ``REPUBLISHER_DB_PATH=``) would otherwise
        # become Path(""), which resolves to the current directory rather
        # than a database file, so treat it the same as "unset".
        raw_value = os.environ.get("REPUBLISHER_DB_PATH") or DEFAULT_DB_PATH
    else:
        raw_value = db_path
    return Path(raw_value).expanduser().resolve()


def schema_paths() -> tuple[Traversable, ...]:
    """Return the ``.sql`` resources in the ``repub`` package's ``sql`` directory.

    The entries are sorted by file name, which is the order
    ``initialize_database`` executes them in.
    """
    schema_dir = resources.files("repub").joinpath("sql")
    sql_files = (
        path for path in schema_dir.iterdir() if path.name.endswith(".sql")
    )
    return tuple(sorted(sql_files, key=lambda path: path.name))


def initialize_database(db_path: str | Path | None = None) -> Path:
    """Bind the module-level database to a file and run every schema script.

    The parent directory is created if needed, any open connection is closed
    before re-initialising, and the connection opened for the schema scripts
    is closed again before returning (even when a script fails).

    Returns:
        The resolved path of the database file.
    """
    resolved_path = resolve_database_path(db_path)
    resolved_path.parent.mkdir(parents=True, exist_ok=True)

    if not database.is_closed():
        database.close()

    database.init(str(resolved_path), pragmas=DATABASE_PRAGMAS)
    database.connect(reuse_if_open=True)
    try:
        connection = database.connection()
        for path in schema_paths():
            connection.executescript(path.read_text(encoding="utf-8"))
    finally:
        database.close()

    return resolved_path


def source_slug_exists(slug: str) -> bool:
    """Return True when a ``Source`` row with this slug exists."""
    with database.connection_context():
        return Source.select().where(Source.slug == slug).exists()


def load_source_form(slug: str) -> dict[str, object] | None:
    """Return the edit-form values for the source with ``slug``.

    The result always contains every form key: the fields of the source type
    that is not in use keep the placeholder values set below.

    Returns:
        The form dictionary, or ``None`` when no source has that slug.

    Raises:
        peewee.DoesNotExist: if the source has no ``Job`` row or no
            type-specific configuration row.
    """
    with database.connection_context():
        source = Source.get_or_none(Source.slug == slug)
        if source is None:
            return None

        job = Job.get(Job.source == source)
        form_data: dict[str, object] = {
            "name": source.name,
            "slug": source.slug,
            "source_type": source.source_type,
            "notes": source.notes,
            "spider_arguments": job.spider_arguments,
            "enabled": job.enabled,
            "cron_minute": job.cron_minute,
            "cron_hour": job.cron_hour,
            "cron_day_of_month": job.cron_day_of_month,
            "cron_day_of_week": job.cron_day_of_week,
            "cron_month": job.cron_month,
            "feed_url": "",
            "pangea_domain": "",
            "pangea_category": "",
            "content_format": "MOBILE_3",
            "content_type": "articles",
            "max_articles": "10",
            "oldest_article": "3",
            "only_newest": True,
            "include_authors": True,
            "exclude_media": False,
            "include_content": True,
        }

        if source.source_type == "feed":
            feed = SourceFeed.get(SourceFeed.source == source)
            form_data["feed_url"] = feed.feed_url
        else:
            pangea = SourcePangea.get(SourcePangea.source == source)
            form_data.update(
                {
                    "pangea_domain": pangea.domain,
                    "pangea_category": pangea.category_name,
                    "content_format": pangea.content_format,
                    "content_type": pangea.content_type,
                    # Numeric columns are exposed to the form as strings.
                    "max_articles": str(pangea.max_articles),
                    "oldest_article": str(pangea.oldest_article),
                    "only_newest": pangea.only_newest,
                    "include_authors": pangea.include_authors,
                    "exclude_media": pangea.exclude_media,
                    "include_content": pangea.include_content,
                }
            )

        return form_data


def create_source(
    *,
    name: str,
    slug: str,
    source_type: str,
    notes: str,
    spider_arguments: str,
    enabled: bool,
    cron_minute: str,
    cron_hour: str,
    cron_day_of_month: str,
    cron_day_of_week: str,
    cron_month: str,
    feed_url: str = "",
    pangea_domain: str = "",
    pangea_category: str = "",
    content_type: str = "",
    only_newest: bool = True,
    max_articles: int | None = None,
    oldest_article: int | None = None,
    include_authors: bool = True,
    exclude_media: bool = False,
    include_content: bool = True,
    content_format: str = "",
) -> Source:
    """Create a source, its type-specific configuration and its job.

    All three rows are written inside one transaction. A ``source_type`` of
    ``"feed"`` creates a ``SourceFeed`` row from ``feed_url``; any other value
    creates a ``SourcePangea`` row from the Pangea arguments.

    Returns:
        The newly created ``Source``.
    """
    with database.connection_context():
        with database.atomic():
            source = Source.create(
                name=name,
                slug=slug,
                source_type=source_type,
                notes=notes,
            )

            if source_type == "feed":
                SourceFeed.create(
                    source=source,
                    feed_url=feed_url,
                )
            else:
                SourcePangea.create(
                    source=source,
                    domain=pangea_domain,
                    category_name=pangea_category,
                    content_type=content_type,
                    only_newest=only_newest,
                    max_articles=max_articles,
                    oldest_article=oldest_article,
                    include_authors=include_authors,
                    exclude_media=exclude_media,
                    include_content=include_content,
                    content_format=content_format,
                )

            Job.create(
                source=source,
                enabled=enabled,
                spider_arguments=spider_arguments,
                cron_minute=cron_minute,
                cron_hour=cron_hour,
                cron_day_of_month=cron_day_of_month,
                cron_day_of_week=cron_day_of_week,
                cron_month=cron_month,
            )

            return source


def update_source(
    source_slug: str,
    *,
    name: str,
    slug: str,
    source_type: str,
    notes: str,
    spider_arguments: str,
    enabled: bool,
    cron_minute: str,
    cron_hour: str,
    cron_day_of_month: str,
    cron_day_of_week: str,
    cron_month: str,
    feed_url: str = "",
    pangea_domain: str = "",
    pangea_category: str = "",
    content_type: str = "",
    only_newest: bool = True,
    max_articles: int | None = None,
    oldest_article: int | None = None,
    include_authors: bool = True,
    exclude_media: bool = False,
    include_content: bool = True,
    content_format: str = "",
) -> Source | None:
    """Update the source identified by ``source_slug`` and its related rows.

    The source, its job and its type-specific configuration are updated in a
    single transaction. The configuration row of the other source type is
    deleted, and the matching one is created when it does not exist yet.

    NOTE(review): ``slug`` is accepted but never written back, so the stored
    slug stays ``source_slug``; confirm slugs are meant to be immutable.

    Returns:
        The updated ``Source``, or ``None`` when ``source_slug`` is unknown.

    Raises:
        peewee.DoesNotExist: if the source has no ``Job`` row.
    """
    with database.connection_context():
        with database.atomic():
            source = Source.get_or_none(Source.slug == source_slug)
            if source is None:
                return None

            # peewee applies field defaults only when an instance is created,
            # so ``updated_at`` must be refreshed explicitly on every update.
            now = utc_now()

            source.name = name
            source.notes = notes
            source.source_type = source_type
            source.updated_at = now
            source.save()

            job = Job.get(Job.source == source)
            job.enabled = enabled
            job.spider_arguments = spider_arguments
            job.cron_minute = cron_minute
            job.cron_hour = cron_hour
            job.cron_day_of_month = cron_day_of_month
            job.cron_day_of_week = cron_day_of_week
            job.cron_month = cron_month
            job.updated_at = now
            job.save()

            if source_type == "feed":
                # Drop a stale Pangea config left from a previous type.
                SourcePangea.delete().where(
                    SourcePangea.source == source
                ).execute()
                feed = SourceFeed.get_or_none(SourceFeed.source == source)
                if feed is None:
                    SourceFeed.create(source=source, feed_url=feed_url)
                else:
                    feed.feed_url = feed_url
                    feed.save()
            else:
                # Drop a stale feed config left from a previous type.
                SourceFeed.delete().where(SourceFeed.source == source).execute()

                # Single column mapping shared by the create and update paths.
                pangea_values = {
                    "domain": pangea_domain,
                    "category_name": pangea_category,
                    "content_type": content_type,
                    "only_newest": only_newest,
                    "max_articles": max_articles,
                    "oldest_article": oldest_article,
                    "include_authors": include_authors,
                    "exclude_media": exclude_media,
                    "include_content": include_content,
                    "content_format": content_format,
                }
                pangea = SourcePangea.get_or_none(SourcePangea.source == source)
                if pangea is None:
                    SourcePangea.create(source=source, **pangea_values)
                else:
                    for column, value in pangea_values.items():
                        setattr(pangea, column, value)
                    pangea.save()

            return source


def load_sources() -> tuple[dict[str, object], ...]:
    """Return one display dictionary per source, newest ``created_at`` first.

    Jobs and type-specific configurations are fetched with one query per
    table and joined in memory by source id.

    Raises:
        KeyError: if a source has no job or no configuration row matching
            its ``source_type``.
    """
    with database.connection_context():
        sources = tuple(Source.select().order_by(Source.created_at.desc()))
        source_ids = tuple(int(source.get_id()) for source in sources)
        if not source_ids:
            return ()

        jobs = {
            job.source_id: job
            for job in Job.select().where(Job.source.in_(source_ids))
        }
        feed_configs = {
            config.source_id: config
            for config in SourceFeed.select().where(
                SourceFeed.source.in_(source_ids)
            )
        }
        pangea_configs = {
            config.source_id: config
            for config in SourcePangea.select().where(
                SourcePangea.source.in_(source_ids)
            )
        }

        return tuple(
            _project_source(source, jobs, feed_configs, pangea_configs)
            for source in sources
        )


def _project_source(
    source: "Source",
    jobs: dict[int, "Job"],
    feed_configs: dict[int, "SourceFeed"],
    pangea_configs: dict[int, "SourcePangea"],
) -> dict[str, object]:
    """Build the display dictionary for one source.

    ``jobs``, ``feed_configs`` and ``pangea_configs`` are keyed by source id.
    ``last_run`` is currently always the literal ``"Never run"``.
    """
    source_id = int(source.get_id())
    job = jobs[source_id]

    if source.source_type == "feed":
        upstream = feed_configs[source_id].feed_url
        source_type = "Feed"
    else:
        pangea = pangea_configs[source_id]
        upstream = f"{pangea.domain} / {pangea.category_name}"
        source_type = "Pangea"

    return {
        "name": source.name,
        "slug": source.slug,
        "source_type": source_type,
        "upstream": upstream,
        # Rendered as: minute hour day-of-month month day-of-week.
        "schedule": (
            f"cron: {job.cron_minute} {job.cron_hour} {job.cron_day_of_month} "
            f"{job.cron_month} {job.cron_day_of_week}"
        ),
        "last_run": "Never run",
        "state": "Enabled" if job.enabled else "Disabled",
        "state_tone": "scheduled" if job.enabled else "idle",
    }


class BaseModel(Model):
    """Base class binding every model to the module-level ``database``."""

    class Meta:
        database = database


class Source(BaseModel):
    """A content source; ``source_type`` is constrained to feed or pangea."""

    created_at = DateTimeField(default=utc_now)
    updated_at = DateTimeField(default=utc_now)
    name = TextField()
    slug = TextField(unique=True)
    source_type = TextField(constraints=[Check("source_type IN ('feed', 'pangea')")])
    notes = TextField(default="")

    class Meta:
        table_name = "source"


class SourceFeed(BaseModel):
    """Feed configuration for a source; the source is also the primary key."""

    source = ForeignKeyField(Source, primary_key=True, backref="feed_config")
    feed_url = TextField()
    etag = TextField(null=True)
    last_modified = TextField(null=True)

    class Meta:
        table_name = "source_feed"


class SourcePangea(BaseModel):
    """Pangea configuration for a source; the source is also the primary key."""

    source = ForeignKeyField(Source, primary_key=True, backref="pangea_config")
    domain = TextField()
    category_name = TextField()
    content_type = TextField()
    only_newest = BooleanField()
    max_articles = IntegerField()
    oldest_article = IntegerField()
    include_authors = BooleanField()
    exclude_media = BooleanField()
    include_content = BooleanField()
    content_format = TextField()

    class Meta:
        table_name = "source_pangea"


class Job(BaseModel):
    """Cron-style schedule for a source; at most one job per source."""

    source = ForeignKeyField(Source, unique=True, backref="job")
    created_at = DateTimeField(default=utc_now)
    updated_at = DateTimeField(default=utc_now)
    enabled = BooleanField()
    spider_arguments = TextField(default="")
    cron_minute = TextField()
    cron_hour = TextField()
    cron_day_of_month = TextField()
    cron_day_of_week = TextField()
    cron_month = TextField()

    class Meta:
        table_name = "job"


class JobExecution(BaseModel):
    """One run of a job, with its status, timestamps and counters."""

    job = ForeignKeyField(Job, backref="executions")
    created_at = DateTimeField(default=utc_now)
    started_at = DateTimeField(null=True)
    ended_at = DateTimeField(null=True)
    # The CHECK range matches the values of JobExecutionStatus (0..4).
    running_status = IntegerField(
        default=JobExecutionStatus.PENDING,
        constraints=[Check("running_status BETWEEN 0 AND 4")],
    )
    requests_count = IntegerField(default=0)
    items_count = IntegerField(default=0)
    warnings_count = IntegerField(default=0)
    errors_count = IntegerField(default=0)
    bytes_count = IntegerField(default=0)
    retries_count = IntegerField(default=0)
    exceptions_count = IntegerField(default=0)
    cache_size_count = IntegerField(default=0)
    cache_object_count = IntegerField(default=0)
    raw_stats = TextField(default="{}")

    class Meta:
        table_name = "job_execution"