from __future__ import annotations import argparse import json import random import signal import sys import time from datetime import UTC, datetime from pathlib import Path def parse_args(argv: list[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Simulated republisher worker") parser.add_argument("--job-id", type=int, required=True) parser.add_argument("--execution-id", type=int, required=True) parser.add_argument("--stats-path", required=True) parser.add_argument("--duration-seconds", type=float, required=True) parser.add_argument("--interval-seconds", type=float, required=True) parser.add_argument("--failure-probability", type=float, required=True) return parser.parse_args(argv) def main(argv: list[str] | None = None) -> int: args = parse_args(argv) rng = random.Random(f"{args.job_id}:{args.execution_id}") stats_path = Path(args.stats_path) stats_path.parent.mkdir(parents=True, exist_ok=True) stop_requested = False def request_stop(signum: int, frame: object | None) -> None: del signum, frame nonlocal stop_requested if stop_requested: return stop_requested = True print( f"worker[{args.job_id}:{args.execution_id}]: graceful stop requested", flush=True, ) signal.signal(signal.SIGTERM, request_stop) signal.signal(signal.SIGINT, request_stop) counters = { "requests_count": 0, "items_count": 0, "warnings_count": 0, "errors_count": 0, "bytes_count": 0, "retries_count": 0, "exceptions_count": 0, "cache_size_count": 0, "cache_object_count": 0, } print( f"worker[{args.job_id}:{args.execution_id}]: starting simulated crawl", flush=True, ) started = time.monotonic() iteration = 0 with stats_path.open("a", encoding="utf-8") as stats_file: while time.monotonic() - started < args.duration_seconds: time.sleep(args.interval_seconds) iteration += 1 counters["requests_count"] += rng.randint(1, 5) counters["items_count"] += rng.randint(0, 2) counters["bytes_count"] += rng.randint(500, 3000) counters["cache_size_count"] += rng.randint(0, 1) counters["cache_object_count"] += rng.randint(0, 2) if rng.random() < 0.1: counters["warnings_count"] += 1 if rng.random() < 0.05: counters["retries_count"] += 1 snapshot = { "timestamp": datetime.now(UTC).isoformat(), "iteration": iteration, **counters, } stats_file.write(json.dumps(snapshot, sort_keys=True) + "\n") stats_file.flush() print( "stats: " f"requests={counters['requests_count']} " f"items={counters['items_count']} " f"bytes={counters['bytes_count']}", flush=True, ) if stop_requested: print( f"worker[{args.job_id}:{args.execution_id}]: stopping after graceful request", flush=True, ) return 130 if rng.random() < args.failure_probability: counters["errors_count"] += 1 counters["exceptions_count"] += 1 stats_file.write( json.dumps( {"timestamp": datetime.now(UTC).isoformat(), **counters}, sort_keys=True, ) + "\n" ) stats_file.flush() print( f"worker[{args.job_id}:{args.execution_id}]: simulated failure", flush=True, ) return 1 print( f"worker[{args.job_id}:{args.execution_id}]: completed successfully", flush=True, ) return 0 if __name__ == "__main__": sys.exit(main())