now with configuration
parent 65b1520697
commit 34d26f7def
10 changed files with 497 additions and 83 deletions
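The entry point below is rewired to a new repub.config module (one of the 10 changed files, not shown in this diff). A minimal sketch of what that module might look like, assuming plain dataclasses plus tomllib; only the names and call signatures used in the file below (FeedConfig, load_config, build_base_settings, build_feed_settings, config.out_dir, config.feeds, feed.name, feed.url) come from the diff, everything else is guessed:

# Hypothetical repub/config.py -- shape inferred from the call sites in the diff below.
from __future__ import annotations

import tomllib
from dataclasses import dataclass
from pathlib import Path

from scrapy.settings import Settings


@dataclass(frozen=True)
class FeedConfig:
    name: str
    url: str


@dataclass(frozen=True)
class RepubConfig:
    out_dir: Path
    feeds: tuple[FeedConfig, ...]


def load_config(path: str | Path) -> RepubConfig:
    # Raises FileNotFoundError when the TOML file is missing; entrypoint() catches it.
    with open(Path(path).expanduser(), "rb") as fh:
        data = tomllib.load(fh)
    feeds = tuple(
        FeedConfig(name=name, url=entry["url"])
        for name, entry in data.get("feeds", {}).items()
    )
    return RepubConfig(out_dir=Path(data.get("out_dir", "out")), feeds=feeds)


def build_base_settings(config: RepubConfig) -> Settings:
    # Scrapy settings shared by every feed crawler.
    settings = Settings()
    settings.set("REPUBLISHER_OUT_DIR", str(config.out_dir))
    return settings


def build_feed_settings(base: Settings, *, out_dir: Path, feed_name: str) -> Settings:
    # Per-feed overrides layered on top of the shared settings.
    settings = base.copy()
    settings.set("FEEDS", {str(out_dir / f"{feed_name}.rss"): {"format": "rss"}})
    settings.set("LOG_FILE", str(out_dir / "logs" / f"{feed_name}.log"))
    return settings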
@@ -1,19 +1,33 @@
import logging
import multiprocessing as mp
import multiprocessing.connection as mpc
from __future__ import annotations

feeds = {
    "gp-pod": {"url": "https://guardianproject.info/podcast/podcast.xml"},
    "nasa": {"url": "https://www.nasa.gov/rss/dyn/breaking_news.rss"},
}
import argparse
import logging
import sys
from pathlib import Path

from scrapy.crawler import Crawler, CrawlerProcess
from scrapy.settings import Settings
from twisted.python.failure import Failure

from repub.config import (
    FeedConfig,
    build_base_settings,
    build_feed_settings,
    load_config,
)
from repub.media import check_runtime
from repub.spiders.rss_spider import RssFeedSpider

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.propagate = False
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    logger.addHandler(handler)


class FeedNameFilter:
@@ -24,73 +38,106 @@ class FeedNameFilter:
        return item.feed_name == self.feed_options["feed_name"]


def execute_spider(queue, name, url):
    from scrapy.crawler import CrawlerProcess
    from scrapy.settings import Settings
    from scrapy.utils.project import get_project_settings
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Mirror RSS and Atom feeds")
    parser.add_argument(
        "-c",
        "--config",
        default="repub.toml",
        help="Path to runtime config TOML file",
    )
    return parser.parse_args(argv)

    from repub.media import check_runtime
    from repub.spiders.rss_spider import RssFeedSpider

    try:
        settings: Settings = {
            **get_project_settings(),
            "REPUBLISHER_OUT_DIR": "out",
            "FEEDS": {
                f"out/{name}.rss": {
                    "format": "rss",
                    "postprocessing": [],
                    # "item_filter": FeedNameFilter,
                    "feed_name": name,
                }
            },
            "ITEM_PIPELINES": {
                "repub.pipelines.ImagePipeline": 1,
                "repub.pipelines.AudioPipeline": 2,
                "repub.pipelines.VideoPipeline": 3,
                "repub.pipelines.FilePipeline": 4,
            },
            "LOG_FILE": f"logs/{name}.log",
            "REPUBLISHER_IMAGE_DIR": "images",
            "REPUBLISHER_VIDEO_DIR": "video",
            "REPUBLISHER_AUDIO_DIR": "audio",
            "REPUBLISHER_FILE_DIR": "files",
            "IMAGES_STORE": f"out/{name}/images",
            "AUDIO_STORE": f"out/{name}/audio",
            "VIDEO_STORE": f"out/{name}/videos",
            "FILES_STORE": f"out/{name}/files",
        }
        if not check_runtime(
            settings.get("REPUBLISHER_FFMPEG_ENCODERS"),
            settings.get("REPUBLISHER_FFMPEG_CODECS"),
        ):
            logger.error("Runtime depenencies not met")
            queue.put("missing dependencies")
def prepare_output_dirs(out_dir: Path, feed_name: str) -> None:
    (out_dir / "logs").mkdir(parents=True, exist_ok=True)
    (out_dir / "httpcache").mkdir(parents=True, exist_ok=True)
    (out_dir / feed_name).mkdir(parents=True, exist_ok=True)


def create_feed_crawler(
    *,
    base_settings: Settings,
    out_dir: Path,
    feed: FeedConfig,
    init_reactor: bool,
) -> Crawler:
    prepare_output_dirs(out_dir, feed.name)
    settings = build_feed_settings(base_settings, out_dir=out_dir, feed_name=feed.name)
    return Crawler(RssFeedSpider, settings, init_reactor=init_reactor)


def run_feeds(
    base_settings: Settings,
    out_dir: Path,
    feeds: tuple[FeedConfig, ...],
) -> int:
    process = CrawlerProcess(base_settings)
    results: list[tuple[str, Failure | None]] = []
    feed_iter = iter(feeds)
    needs_reactor_init = True

    def crawl_next(_: object | None = None) -> None:
        nonlocal needs_reactor_init

        try:
            feed = next(feed_iter)
        except StopIteration:
            from twisted.internet import reactor

            reactor.stop()
            return
        process = CrawlerProcess(settings)
        # colorlog.load_colorlog()
        process.crawl(RssFeedSpider, feed_name=name, urls=[url])
        process.start()
        queue.put(None)
    except Exception as e:
        queue.put(e)

        logger.info("Starting feed %s", feed.name)
        crawler = create_feed_crawler(
            base_settings=base_settings,
            out_dir=out_dir,
            feed=feed,
            init_reactor=needs_reactor_init,
        )
        needs_reactor_init = False

        deferred = process.crawl(crawler, feed_name=feed.name, url=feed.url)

        def handle_success(_: object) -> None:
            logger.info("Feed %s completed successfully", feed.name)
            results.append((feed.name, None))
            return None

        def handle_error(failure: Failure) -> None:
            logger.error("Feed %s encountered an error", feed.name)
            logger.critical("%s", failure.getTraceback())
            results.append((feed.name, failure))
            return None

        deferred.addCallbacks(handle_success, handle_error)
        deferred.addBoth(crawl_next)

    crawl_next()
    process.start(stop_after_crawl=False)

    return 1 if any(failure is not None for _, failure in results) else 0


def entrypoint():
    pool = []
    for name, data in feeds.items():
        logger.info(f"Starting feed {name}")
        queue = mp.Queue()
        process = mp.Process(target=execute_spider, args=(queue, name, data["url"]))
        pool.append((name, process, queue))
    for n, proc, q in pool:
        proc.start()
    mpc.wait(p.sentinel for n, p, q in pool)
    for name, p, q in pool:
        result = q.get()
        if result is not None:
            print()
            logger.error(f"Feed {name} encountered error")
            logger.critical(result, exc_info=True)
        else:
            logger.info(f"Feed {name} completed successfully")
def entrypoint(argv: list[str] | None = None) -> int:
    args = parse_args(argv)
    try:
        config = load_config(args.config)
    except FileNotFoundError:
        logger.error("Config file not found: %s", Path(args.config).expanduser())
        logger.error("Use --config PATH or create repub.toml in the project root")
        return 2
    base_settings = build_base_settings(config)

    if not check_runtime(
        base_settings.get("REPUBLISHER_FFMPEG_ENCODERS"),
        base_settings.get("REPUBLISHER_FFMPEG_CODECS"),
    ):
        logger.error("Runtime dependencies not met")
        return 1

    return run_feeds(base_settings, config.out_dir, config.feeds)


if __name__ == "__main__":
    sys.exit(entrypoint())
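check_runtime, imported from repub.media and used to gate both the old and new code paths, is not shown in this diff. Only its signature is visible above (two optional ffmpeg encoder/codec lists, returning a boolean); one rough, entirely assumed way it could probe the local ffmpeg build:

# Hypothetical sketch of repub.media.check_runtime; the probing strategy is an assumption.
from __future__ import annotations

import shutil
import subprocess


def check_runtime(encoders: list[str] | None, codecs: list[str] | None) -> bool:
    """Return True if ffmpeg is on PATH and lists the required encoders/codecs."""
    if shutil.which("ffmpeg") is None:
        return False
    for flag, names in (("-encoders", encoders), ("-codecs", codecs)):
        if not names:
            continue
        # ffmpeg prints its supported encoders/codecs to stdout; check each required name.
        out = subprocess.run(
            ["ffmpeg", "-hide_banner", flag],
            capture_output=True,
            text=True,
            check=False,
        ).stdout
        if any(name not in out for name in names):
            return False
    return True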