"""Load republisher TOML configuration and build per-feed Scrapy settings."""

from __future__ import annotations

import re
import tomllib
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from scrapy.settings import Settings

IMAGE_DIR = "images"
VIDEO_DIR = "video"
AUDIO_DIR = "audio"
FILE_DIR = "files"

# Feed slugs become directory and file names, so restrict them to a safe,
# filesystem-friendly alphabet.
SLUG_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")


@dataclass(frozen=True)
class FeedConfig:
    name: str
    slug: str
    url: str


@dataclass(frozen=True)
class RepublisherConfig:
    config_path: Path
    out_dir: Path
    feeds: tuple[FeedConfig, ...]
    scrapy_settings: dict[str, Any]


def _resolve_path(base_path: Path, value: str) -> Path:
    """Resolve ``value`` relative to the directory containing ``base_path``."""
    path = Path(value).expanduser()
    if not path.is_absolute():
        path = (base_path.parent / path).resolve()
    return path


def _load_toml(path: Path) -> dict[str, Any]:
    with path.open("rb") as config_file:
        raw_config = tomllib.load(config_file)
    if not isinstance(raw_config, dict):
        raise ValueError(f"Config file {path} must contain a TOML table")
    return raw_config


def _parse_feed_config_paths(
    raw_config: dict[str, Any], *, config_path: Path
) -> tuple[Path, ...]:
    # 'feed_config_files' may be a single path string or a list of paths;
    # each entry is resolved relative to the importing config file.
    raw_paths = raw_config.get("feed_config_files", [])
    if raw_paths is None:
        return ()
    if isinstance(raw_paths, str):
        raw_paths = [raw_paths]
    if not isinstance(raw_paths, list):
        raise ValueError("Config field 'feed_config_files' must be a string or list")
    paths: list[Path] = []
    for index, raw_path in enumerate(raw_paths, start=1):
        if not isinstance(raw_path, str) or not raw_path:
            raise ValueError(
                f"Config field 'feed_config_files[{index}]' must be a non-empty string"
            )
        paths.append(_resolve_path(config_path, raw_path))
    return tuple(paths)


def _parse_feed_tables(raw_feeds: Any, *, source_path: Path) -> tuple[FeedConfig, ...]:
    if raw_feeds is None:
        return ()
    if not isinstance(raw_feeds, list):
        raise ValueError(f"Config file {source_path} field 'feeds' must be an array")
    feeds: list[FeedConfig] = []
    for raw_feed in raw_feeds:
        if not isinstance(raw_feed, dict):
            raise ValueError(
                f"Config file {source_path} has a non-table [[feeds]] entry"
            )
        name = raw_feed.get("name")
        slug = raw_feed.get("slug")
        url = raw_feed.get("url")
        if not isinstance(name, str) or not name:
            raise ValueError(
                f"Config file {source_path} has a [[feeds]] entry without a valid 'name'"
            )
        if not isinstance(slug, str) or not slug:
            raise ValueError(f"Feed {name!r} in {source_path} needs a non-empty 'slug'")
        if SLUG_PATTERN.fullmatch(slug) is None:
            raise ValueError(
                f"Feed slug {slug!r} in {source_path} must match {SLUG_PATTERN.pattern!r}"
            )
        if not isinstance(url, str) or not url:
            raise ValueError(f"Feed {name!r} in {source_path} needs a non-empty 'url'")
        feeds.append(FeedConfig(name=name, slug=slug, url=url))
    return tuple(feeds)


def _merge_feeds(feed_groups: list[tuple[FeedConfig, ...]]) -> tuple[FeedConfig, ...]:
    # Feed names and slugs must be unique across the main config and all
    # imported feed config files.
    feeds: list[FeedConfig] = []
    feed_names: set[str] = set()
    feed_slugs: set[str] = set()
    for group in feed_groups:
        for feed in group:
            if feed.name in feed_names:
                raise ValueError(f"Feed name {feed.name!r} is duplicated")
            if feed.slug in feed_slugs:
                raise ValueError(f"Feed slug {feed.slug!r} is duplicated")
            feed_names.add(feed.name)
            feed_slugs.add(feed.slug)
            feeds.append(feed)
    return tuple(feeds)


def load_config(path: str | Path) -> RepublisherConfig:
    config_path = Path(path).expanduser().resolve()
    raw_config = _load_toml(config_path)

    out_dir_value = raw_config.get("out_dir", "out")
    if not isinstance(out_dir_value, str) or not out_dir_value:
        raise ValueError("Config field 'out_dir' must be a non-empty string")
    out_dir = _resolve_path(config_path, out_dir_value)

    # Collect [[feeds]] tables from the main config plus any imported files.
    feed_config_paths = _parse_feed_config_paths(raw_config, config_path=config_path)
    feed_groups = [_parse_feed_tables(raw_config.get("feeds"), source_path=config_path)]
    for feed_config_path in feed_config_paths:
        imported_config = _load_toml(feed_config_path)
        feed_groups.append(
            _parse_feed_tables(
                imported_config.get("feeds"),
                source_path=feed_config_path,
            )
        )
    feeds = _merge_feeds(feed_groups)
    if not feeds:
        raise ValueError(
            "Config must include at least one [[feeds]] entry or feed_config_files import"
        )

    raw_scrapy = raw_config.get("scrapy", {})
    if raw_scrapy is None:
        raw_scrapy = {}
    if not isinstance(raw_scrapy, dict):
        raise ValueError("Config field 'scrapy' must be a table")
    scrapy_settings = raw_scrapy.get("settings", {})
    if scrapy_settings is None:
        scrapy_settings = {}
    if not isinstance(scrapy_settings, dict):
        raise ValueError("Config field 'scrapy.settings' must be a table")

    return RepublisherConfig(
        config_path=config_path,
        out_dir=out_dir,
        feeds=feeds,
        scrapy_settings=scrapy_settings,
    )


def build_base_settings(config: RepublisherConfig) -> Settings:
    settings = Settings()
    settings.setmodule("repub.settings", priority="project")
    if config.scrapy_settings:
        # 'cmdline' is Scrapy's highest named settings priority, so values
        # from the config file override the project defaults loaded above.
        settings.setdict(config.scrapy_settings, priority="cmdline")
    return settings


def build_feed_settings(
    base_settings: Settings,
    *,
    out_dir: Path,
    feed_slug: str,
) -> Settings:
    feed_dir = out_dir / feed_slug
    image_dir = base_settings.get("REPUBLISHER_IMAGE_DIR", IMAGE_DIR)
    video_dir = base_settings.get("REPUBLISHER_VIDEO_DIR", VIDEO_DIR)
    audio_dir = base_settings.get("REPUBLISHER_AUDIO_DIR", AUDIO_DIR)
    file_dir = base_settings.get("REPUBLISHER_FILE_DIR", FILE_DIR)

    # Add the media pipelines on top of whatever pipelines the base settings
    # already define.
    item_pipelines = dict(base_settings.getdict("ITEM_PIPELINES"))
    item_pipelines.update(
        {
            "repub.pipelines.ImagePipeline": 1,
            "repub.pipelines.AudioPipeline": 2,
            "repub.pipelines.VideoPipeline": 3,
            "repub.pipelines.FilePipeline": 4,
        }
    )

    settings = base_settings.copy()
    settings.setdict(
        {
            "REPUBLISHER_OUT_DIR": str(out_dir),
            "FEEDS": {
                str(out_dir / f"{feed_slug}.rss"): {
                    "format": "rss",
                    "postprocessing": [],
                    "feed_name": feed_slug,
                }
            },
            "ITEM_PIPELINES": item_pipelines,
            "LOG_FILE": str(out_dir / "logs" / f"{feed_slug}.log"),
            "HTTPCACHE_DIR": str(out_dir / "httpcache"),
            "REPUBLISHER_IMAGE_DIR": image_dir,
            "REPUBLISHER_VIDEO_DIR": video_dir,
            "REPUBLISHER_AUDIO_DIR": audio_dir,
            "REPUBLISHER_FILE_DIR": file_dir,
            "IMAGES_STORE": str(feed_dir / image_dir),
            "AUDIO_STORE": str(feed_dir / audio_dir),
            "VIDEO_STORE": str(feed_dir / video_dir),
            "FILES_STORE": str(feed_dir / file_dir),
        },
        priority="cmdline",
    )
    return settings
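

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library API. It assumes a config
    # file path is passed as the first CLI argument and that the repub package
    # (repub.settings, repub.pipelines) is importable. An illustrative config
    # file, exercising the fields parsed above, might look like:
    #
    #     out_dir = "out"
    #     feed_config_files = ["feeds/extra.toml"]
    #
    #     [[feeds]]
    #     name = "Example Feed"
    #     slug = "example"
    #     url = "https://example.com/rss.xml"
    #
    #     [scrapy.settings]
    #     HTTPCACHE_ENABLED = true
    import sys

    config = load_config(sys.argv[1])
    base_settings = build_base_settings(config)
    for feed in config.feeds:
        feed_settings = build_feed_settings(
            base_settings, out_dir=config.out_dir, feed_slug=feed.slug
        )
        # Show one derived per-feed value to confirm the settings were built.
        print(feed.slug, "->", feed_settings.get("LOG_FILE"))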