"""Worker process that runs a single republisher job.

The worker loads the job's source configuration from the database, generates
an intermediate pygea feed when the source is a pangea source, and then runs
the Scrapy crawl while streaming stats snapshots to a JSON-lines file.
"""

from __future__ import annotations

import argparse
import json
import signal
import sys
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from pygea.config import LoggingConfig, PygeaConfig, ResultsConfig, RuntimeConfig
from scrapy.crawler import CrawlerProcess
from scrapy.statscollectors import StatsCollector
from twisted.python.failure import Failure

from repub.config import (
    FeedConfig,
    RepublisherConfig,
    build_base_settings,
    build_feed_settings,
)
from repub.crawl import prepare_output_dirs
from repub.model import (
    Job,
    Source,
    SourceFeed,
    SourcePangea,
    database,
    initialize_database,
)
from repub.spiders.rss_spider import RssFeedSpider


def _json_default(value: Any) -> Any:
    """Serialize datetimes as UTC ISO-8601 strings; fall back to ``str``."""
    if isinstance(value, datetime):
        if value.tzinfo is None:
            return value.replace(tzinfo=UTC).isoformat()
        return value.astimezone(UTC).isoformat()
    return str(value)


def _normalized_stats(stats: dict[str, Any]) -> dict[str, Any]:
    """Augment raw Scrapy stats with flat, zero-defaulted counter fields."""
    cache_store = int(stats.get("httpcache/store", 0))
    cache_hits = int(stats.get("httpcache/hit", 0))
    cache_misses = int(stats.get("httpcache/miss", 0))
    return {
        **stats,
        "requests_count": int(stats.get("downloader/request_count", 0)),
        "items_count": int(stats.get("item_scraped_count", 0)),
        "warnings_count": int(stats.get("log_count/WARNING", 0)),
        "errors_count": int(stats.get("log_count/ERROR", 0)),
        "bytes_count": int(stats.get("downloader/response_bytes", 0)),
        "retries_count": int(stats.get("retry/count", 0)),
        "exceptions_count": int(stats.get("spider_exceptions/count", 0)),
        "cache_size_count": cache_store,
        "cache_object_count": cache_store + cache_hits + cache_misses,
    }


class ExecutionStatsCollector(StatsCollector):
    """Stats collector that appends a JSON snapshot on every stats update."""

    def __init__(self, crawler: Any):
        super().__init__(crawler)
        self._stats_path = Path(crawler.settings["REPUB_JOB_STATS_PATH"])
        self._stats_path.parent.mkdir(parents=True, exist_ok=True)

    def set_value(self, key: str, value: Any, spider: Any | None = None) -> None:
        super().set_value(key, value, spider)
        self._write_snapshot()

    def set_stats(self, stats: dict[str, Any], spider: Any | None = None) -> None:
        super().set_stats(stats, spider)
        self._write_snapshot()

    def inc_value(
        self,
        key: str,
        count: int = 1,
        start: int = 0,
        spider: Any | None = None,
    ) -> None:
        super().inc_value(key, count, start, spider)
        self._write_snapshot()

    def max_value(self, key: str, value: Any, spider: Any | None = None) -> None:
        super().max_value(key, value, spider)
        self._write_snapshot()

    def min_value(self, key: str, value: Any, spider: Any | None = None) -> None:
        super().min_value(key, value, spider)
        self._write_snapshot()

    def clear_stats(self, spider: Any | None = None) -> None:
        super().clear_stats(spider)
        self._write_snapshot()

    def open_spider(self, spider: Any | None = None) -> None:
        super().open_spider(spider)
        self._write_snapshot()

    def _persist_stats(
        self, stats: dict[str, Any], spider: Any | None = None
    ) -> None:
        # The base class calls this hook as ``_persist_stats(stats, spider)``
        # when the spider closes, so the spider argument must be accepted.
        del spider
        self._write_snapshot(stats)

    def _write_snapshot(self, stats: dict[str, Any] | None = None) -> None:
        payload = {
            "timestamp": datetime.now(UTC).isoformat(),
            **_normalized_stats(self._stats if stats is None else stats),
        }
        with self._stats_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(payload, sort_keys=True, default=_json_default))
            handle.write("\n")
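
# Each snapshot appended to the REPUB_JOB_STATS_PATH file is one JSON object
# per line.  An illustrative (not real) record looks roughly like:
#   {"timestamp": "2024-01-01T12:00:00+00:00", "requests_count": 12,
#    "items_count": 10, "warnings_count": 1, "errors_count": 0, ...}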


def pangea_feed_class():
    """Return the PangeaFeed class via a deferred import."""
    from pygea.pangeafeed import PangeaFeed

    return PangeaFeed


def generate_pangea_feed(
    *,
    name: str,
    slug: str,
    domain: str,
    category_name: str,
    content_type: str,
    only_newest: bool,
    max_articles: int,
    oldest_article: int,
    include_authors: bool,
    exclude_media: bool,
    include_content: bool,
    content_format: str,
    out_dir: str | Path,
    log_path: str | Path,
) -> Path:
    """Run pygea for a single category and return the generated feed path."""
    resolved_out_dir = Path(out_dir).resolve()
    resolved_log_path = Path(log_path).resolve()
    config = PygeaConfig(
        config_path=resolved_out_dir / "pygea-runtime.toml",
        domain=domain,
        default_content_type=content_type,
        feeds=(
            {
                "name": category_name,
                "slug": slug,
                "only_newest": only_newest,
                "content_type": content_type,
            },
        ),
        runtime=RuntimeConfig(
            api_key=None,
            max_articles=max_articles,
            oldest_article=oldest_article,
            authors_p=include_authors,
            no_media_p=exclude_media,
            content_inc_p=include_content,
            content_format=content_format,
            verbose_p=True,
        ),
        results=ResultsConfig(
            output_to_file_p=True,
            output_file_name="pangea.rss",
            output_directory=resolved_out_dir,
        ),
        logging=LoggingConfig(
            log_file=resolved_log_path,
            default_log_level="INFO",
        ),
    )
    feed_class = pangea_feed_class()
    feed = feed_class(config, list(config.feeds))
    feed.acquire_content()
    feed.generate_feed()
    output_path = feed.disgorge(slug)
    if output_path is None:
        raise RuntimeError(f"pygea did not write an output file for {name!r}")
    return output_path.resolve()


@dataclass(frozen=True)
class JobSourceConfig:
    """Flattened job/source configuration loaded from the database."""

    source_name: str
    source_slug: str
    source_type: str
    spider_arguments: dict[str, str]
    feed_url: str | None = None
    pangea_domain: str | None = None
    pangea_category: str | None = None
    content_type: str | None = None
    only_newest: bool = True
    max_articles: int = 10
    oldest_article: int = 3
    include_authors: bool = True
    exclude_media: bool = False
    include_content: bool = True
    content_format: str = "MOBILE_3"


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Run a republisher job worker")
    parser.add_argument("--job-id", type=int, required=True)
    parser.add_argument("--execution-id", type=int, required=True)
    parser.add_argument("--db-path", required=True)
    parser.add_argument("--out-dir", required=True)
    parser.add_argument("--stats-path", required=True)
    return parser.parse_args(argv)
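
# Example invocation of this worker (all values are illustrative only):
#   python -m repub.job_runner --job-id 42 --execution-id 7 \
#       --db-path ./repub.db --out-dir ./runs/42 \
#       --stats-path ./runs/42/stats.jsonl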


def main(argv: list[str] | None = None) -> int:
    """Entry point for the worker process; returns a process exit code."""
    args = parse_args(argv)
    stop_requested = False
    process: CrawlerProcess | None = None

    def request_stop(signum: int, frame: object | None) -> None:
        del signum, frame
        nonlocal stop_requested
        stop_requested = True
        print(
            f"worker[{args.job_id}:{args.execution_id}]: graceful stop requested",
            flush=True,
        )
        if process is None:
            return
        try:
            from twisted.internet import reactor

            call_from_thread = getattr(reactor, "callFromThread", None)
            if callable(call_from_thread):
                call_from_thread(process.stop)
            else:
                process.stop()
        except Exception as error:
            print(
                f"worker[{args.job_id}:{args.execution_id}]: failed to stop reactor gracefully: {error}",
                flush=True,
            )

    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)

    try:
        source_config = _load_job_source_config(
            db_path=args.db_path, job_id=args.job_id
        )
    except Exception as error:
        print(
            f"worker[{args.job_id}:{args.execution_id}]: failed to load job config: {error}",
            flush=True,
        )
        return 1

    out_dir = Path(args.out_dir).resolve()
    stats_path = Path(args.stats_path).resolve()
    log_path = stats_path.with_suffix(".log")

    try:
        feed = _resolve_feed(
            source_config=source_config,
            out_dir=out_dir,
            log_path=log_path,
        )
        process = CrawlerProcess(
            _build_crawl_settings(
                out_dir=out_dir,
                feed=feed,
                stats_path=stats_path,
            )
        )
        print(
            f"worker[{args.job_id}:{args.execution_id}]: starting crawl for {source_config.source_slug}",
            flush=True,
        )
        exit_code = _run_crawl(
            process=process,
            feed=feed,
            spider_arguments=source_config.spider_arguments,
        )
    except Exception as error:
        print(
            f"worker[{args.job_id}:{args.execution_id}]: crawl failed: {error}",
            flush=True,
        )
        return 1

    if stop_requested:
        print(
            f"worker[{args.job_id}:{args.execution_id}]: stopping after graceful request",
            flush=True,
        )
        return 130

    if exit_code == 0:
        print(
            f"worker[{args.job_id}:{args.execution_id}]: completed successfully",
            flush=True,
        )
    return exit_code


def _load_job_source_config(*, db_path: str, job_id: int) -> JobSourceConfig:
    """Load a job and its feed or pangea source config from the database."""
    initialize_database(db_path)
    primary_key = getattr(Job, "_meta").primary_key
    with database.connection_context():
        job = (
            Job.select(Job, Source)
            .join(Source)
            .where(primary_key == job_id)
            .get_or_none()
        )
        if job is None:
            raise ValueError(f"job {job_id} does not exist")
        source = job.source
        spider_arguments = _parse_spider_arguments(job.spider_arguments)
        if source.source_type == "feed":
            feed = SourceFeed.get_or_none(SourceFeed.source == source)
            if feed is None:
                raise ValueError(
                    f"feed source {source.slug!r} is missing its feed config"
                )
            return JobSourceConfig(
                source_name=source.name,
                source_slug=source.slug,
                source_type=source.source_type,
                spider_arguments=spider_arguments,
                feed_url=feed.feed_url,
            )
        pangea = SourcePangea.get_or_none(SourcePangea.source == source)
        if pangea is None:
            raise ValueError(
                f"pangea source {source.slug!r} is missing its pangea config"
            )
        return JobSourceConfig(
            source_name=source.name,
            source_slug=source.slug,
            source_type=source.source_type,
            spider_arguments=spider_arguments,
            pangea_domain=pangea.domain,
            pangea_category=pangea.category_name,
            content_type=pangea.content_type,
            only_newest=bool(pangea.only_newest),
            max_articles=int(pangea.max_articles),
            oldest_article=int(pangea.oldest_article),
            include_authors=bool(pangea.include_authors),
            exclude_media=bool(pangea.exclude_media),
            include_content=bool(pangea.include_content),
            content_format=pangea.content_format,
        )


def _parse_spider_arguments(raw_value: str) -> dict[str, str]:
    """Parse newline-delimited ``key=value`` pairs into a dict."""
    arguments: dict[str, str] = {}
    for raw_line in raw_value.splitlines():
        line = raw_line.strip()
        if line == "":
            continue
        key, separator, value = line.partition("=")
        key = key.strip()
        if separator == "" or key == "":
            raise ValueError(
                f"invalid spider argument {raw_line!r}; expected key=value"
            )
        arguments[key] = value
    return arguments
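
# ``Job.spider_arguments`` is stored as newline-delimited ``key=value`` text.
# A hypothetical example (these keys are illustrative, not defined here):
#   section=world
#   max_items=25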


def _resolve_feed(
    *,
    source_config: JobSourceConfig,
    out_dir: Path,
    log_path: Path,
) -> FeedConfig:
    """Return the feed to crawl, generating a pygea feed for pangea sources."""
    if source_config.source_type == "feed":
        assert source_config.feed_url is not None
        return FeedConfig(
            name=source_config.source_name,
            slug=source_config.source_slug,
            url=source_config.feed_url,
        )
    generated_feed_path = generate_pangea_feed(
        name=source_config.source_name,
        slug=source_config.source_slug,
        domain=_require_value(source_config.pangea_domain, "pangea_domain"),
        category_name=_require_value(source_config.pangea_category, "pangea_category"),
        content_type=_require_value(source_config.content_type, "content_type"),
        only_newest=source_config.only_newest,
        max_articles=source_config.max_articles,
        oldest_article=source_config.oldest_article,
        include_authors=source_config.include_authors,
        exclude_media=source_config.exclude_media,
        include_content=source_config.include_content,
        content_format=source_config.content_format,
        out_dir=out_dir,
        log_path=log_path.with_suffix(".pygea.log"),
    )
    print(
        f"pygea: generated intermediate feed at {generated_feed_path}",
        flush=True,
    )
    return FeedConfig(
        name=source_config.source_name,
        slug=source_config.source_slug,
        url=generated_feed_path.as_uri(),
    )


def _build_crawl_settings(*, out_dir: Path, feed: FeedConfig, stats_path: Path):
    """Build per-feed Scrapy settings that route stats to the stats file."""
    base_settings = build_base_settings(
        RepublisherConfig(
            config_path=out_dir / "job-runner.toml",
            out_dir=out_dir,
            feeds=(feed,),
            scrapy_settings={},
        )
    )
    prepare_output_dirs(out_dir, feed.slug)
    settings = build_feed_settings(base_settings, out_dir=out_dir, feed_slug=feed.slug)
    settings.set("LOG_FILE", None, priority="cmdline")
    settings.set(
        "STATS_CLASS",
        "repub.job_runner.ExecutionStatsCollector",
        priority="cmdline",
    )
    settings.set("REPUB_JOB_STATS_PATH", str(stats_path), priority="cmdline")
    return settings


def _run_crawl(
    *,
    process: CrawlerProcess,
    feed: FeedConfig,
    spider_arguments: dict[str, str],
) -> int:
    """Run the spider to completion; return 0 on success, 1 on failure."""
    results: list[Failure | None] = []
    deferred = process.crawl(
        RssFeedSpider,
        feed_name=feed.slug,
        url=feed.url,
        **spider_arguments,
    )

    def handle_success(_: object) -> None:
        results.append(None)
        return None

    def handle_error(failure: Failure) -> None:
        print(failure.getTraceback(), flush=True)
        results.append(failure)
        return None

    deferred.addCallbacks(handle_success, handle_error)
    process.start()
    return 1 if any(result is not None for result in results) else 0


def _require_value(value: str | None, field_name: str) -> str:
    """Return ``value`` or raise if the required field is missing."""
    if value is None or value == "":
        raise ValueError(f"missing {field_name}")
    return value


if __name__ == "__main__":
    sys.exit(main())