diff --git a/.gitignore b/.gitignore index 7e75396..36f6103 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ tmp/ /test*py data logs +archive diff --git a/README.md b/README.md index 9e053bb..b9ef25e 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,9 @@ poetry run repub - [x] Downloads media and enclosures - [x] Rewrites media urls - [x] Image normalization (JPG, RGB) -- [x] Audio compression +- [x] Audio transcoding +- [x] Video transcoding - [ ] Image compression -- [ ] Video compression - [ ] Download and rewrite media embedded in content/CDATA fields - [ ] Config file to drive the program - [ ] Daemonize the program diff --git a/repub/entrypoint.py b/repub/entrypoint.py index 7a6357d..d23d49d 100644 --- a/repub/entrypoint.py +++ b/repub/entrypoint.py @@ -25,12 +25,12 @@ class FeedNameFilter: def execute_spider(queue, name, url): + from repub.media import check_runtime + from repub.spiders.rss_spider import RssFeedSpider from scrapy.crawler import CrawlerProcess from scrapy.settings import Settings from scrapy.utils.project import get_project_settings - from .spiders.rss_spider import RssFeedSpider - try: settings: Settings = { **get_project_settings(), @@ -59,6 +59,13 @@ def execute_spider(queue, name, url): "VIDEO_STORE": f"out/{name}/images", "FILES_STORE": f"out/{name}/files", } + if not check_runtime( + settings.get("REPUBLISHER_FFMPEG_ENCODERS"), + settings.get("REPUBLISHER_FFMPEG_CODECS"), + ): + logger.error("Runtime depenencies not met") + queue.put("missing dependencies") + return process = CrawlerProcess(settings) # colorlog.load_colorlog() process.crawl(RssFeedSpider, feed_name=name, urls=[url]) diff --git a/repub/media.py b/repub/media.py index 9134804..604798a 100644 --- a/repub/media.py +++ b/repub/media.py @@ -1,38 +1,192 @@ +import copy import logging import math +import os +import subprocess +import sys +from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union import ffmpeg logger = logging.getLogger(__name__) +MediaMeta = Dict[str, Union[str, int, float]] -def media_info(file_path): - return ffmpeg.probe(file_path) +MediaSettings = TypedDict( + "MediaSettings", {"name": str, "extension": str, "mimetype": str} +) + + +class AudioSettings(MediaSettings): + format: str + max_bitrate: int + ffmpeg_audio_params: Dict[str, str] + + +class VideoSettings(MediaSettings): + container: str + vcodec: str + max_height: int + acodec: str + audio_max_bitrate: int + ffmpeg_audio_params: Dict[str, str] + ffmpeg_video_params: Dict[str, str] + + +class AudioMeta(TypedDict): + format_name: str + format_long_name: str + duration: str + bit_rate: float + size: str + + +class VideoMeta(TypedDict): + duration: str + size: str + format_name: str + format_long_name: str + width: int + height: int + codec_name: str + display_aspect_ratio: str + duration_ts: float + bit_rate: float + + +def probe_media(file_path) -> Dict[str, Any]: + """Probes `file_path` using ffmpeg's ffprobe and returns the data.""" + try: + return ffmpeg.probe(file_path) + except ffmpeg.Error as e: + print(e.stderr, file=sys.stderr) + logger.error(f"Failed to probe io {file_path}") + logger.error(e) + raise RuntimeError(f"Failed to probe io {file_path}") from e def bitrate(info) -> float: try: return int(info["format"]["bit_rate"]) except KeyError | ValueError: + logger.error("extracting bitrate from ffprobe failed") return math.inf -def format(info): +def format_name(info) -> Optional[str]: try: return info["format"]["format_name"] except KeyError | ValueError: + logger.error("extracting format from ffprobe failed") return None -def compression_settings(input_file, settings): - info = media_info(input_file) - br = settings.get("REPUBLISHER_AUDIO_BITRATE", 96000) - fmt = settings.get("REPUBLISHER_AUDIO_FORMAT", "mp3") - if bitrate(info) <= br: +def primary_video_stream(probe): + video_streams = [ + stream for stream in probe["streams"] if stream["codec_type"] == "video" + ] + video_streams = sorted(video_streams, key=lambda x: x["duration_ts"], reverse=True) + if not video_streams: + return None + if len(video_streams) > 1: + logger.warn( + "Encountered video file with more than 1 video stream!, choosing the one with the longest duration" + ) + return video_streams[0] + + +def primary_audio_stream(probe): + audio_streams = [ + stream for stream in probe["streams"] if stream["codec_type"] == "audio" + ] + audio_streams = sorted(audio_streams, key=lambda x: x["duration_ts"], reverse=True) + if not audio_streams: + return None + if len(audio_streams) > 1: + logger.warn( + "Encountered video file with more than 1 audio stream!, choosing the one with the longest duration" + ) + return audio_streams[0] + + +def get_resolution(probe) -> Tuple[Optional[float], Optional[float]]: + try: + video_stream = primary_video_stream(probe) + if not video_stream: + return None, None + width = int(video_stream["width"]) + height = int(video_stream["height"]) + return width, height + except KeyError | ValueError: + logger.error("extracting resolution from ffprobe failed") + return None, None + + +def get_vcodec_name(probe) -> Optional[str]: + try: + video_stream = primary_video_stream(probe) + if not video_stream: + return None + return video_stream["codec_name"] + except KeyError | ValueError: + logger.error("extracting video codec_name from ffprobe failed") + return None + + +def get_acodec_info(probe) -> Tuple[Optional[str], Optional[int]]: + try: + audio_stream = primary_audio_stream(probe) + if not audio_stream: + return None, None + return audio_stream["codec_name"], int(audio_stream["bit_rate"]) + except KeyError | ValueError: + logger.error("extracting audio codec_name from ffprobe failed") + return None, None + + +def audio_meta(probe: Dict[str, Any]) -> Optional[AudioMeta]: + return AudioMeta( + duration=probe["format"].get("duration", ""), + size=probe["format"].get("size", ""), + format_name=probe["format"].get("format_name", ""), + format_long_name=probe["format"].get("format_long_name", ""), + bit_rate=float(probe["format"].get("bit_rate", 0.0)), + ) + + +def video_meta(probe: Dict[str, Any]) -> Optional[VideoMeta]: + stream = primary_video_stream(probe) + if not stream: + return None + return VideoMeta( + duration=probe["format"].get("duration", ""), + size=probe["format"].get("size", ""), + format_name=probe["format"].get("format_name", ""), + format_long_name=probe["format"].get("format_long_name", ""), + width=int(stream.get("width", 0)), + height=int(stream.get("height", 0)), + codec_name=stream.get("codec_name", ""), + display_aspect_ratio=stream.get("display_aspect_ratio", ""), + duration_ts=float(stream.get("duration_ts", 0.0)), + bit_rate=float(stream.get("bit_rate", 0.0)), + ) + + +def audio_transcode_params( + probe_result, settings: AudioSettings +) -> Optional[Dict[str, str]]: + """ + Given a probe result and some system settings, + this function returns a dict containing opaque data that could be passsed to compress_audio. + If this function returns None, then the audio does not need to be compressed + """ + br = settings["max_bitrate"] + fmt = settings["format"] + if bitrate(probe_result) <= br: is_br = True else: is_br = False - if format(info) == fmt: + if format_name(probe_result) == fmt: is_fmt = True else: is_fmt = False @@ -40,28 +194,188 @@ def compression_settings(input_file, settings): if is_br and is_fmt: return None - if is_br: - target_br = bitrate(info) - else: - target_br = br - return {"bitrate": target_br, "ext": "mp3"} + params = {"extension": settings["extension"]} + params.update(settings["ffmpeg_audio_params"]) + return params -def compress_audio(input_file, output_file_base, settings): - ext = settings["ext"] - br = settings["bitrate"] - output_file = f"{output_file_base}.{ext}" +def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) -> str: + """ + Uses ffmpeg, applying `settings` to `input_file`, storing output in `output_dir`, and returning the path to the compressed file + """ + params = copy.deepcopy(params) + ext = params.pop("extension") + output_file = f"{output_dir}/converted.{ext}" try: - logger.info(f"Compressing audio {input_file} to {output_file} target_br={br}") + logger.info( + f"Compressing audio {input_file} to {output_file} with params={params}" + ) out, _ = ( ffmpeg.input(input_file) .output( output_file, - **{"b:a": f"{br}", "map": "0:a:0"}, + **params, loglevel="quiet", ) .run() ) + before = os.path.getsize(input_file) / 1024 + after = os.path.getsize(output_file) / 1024 + percent_difference = 0 + if before != 0: + percent_difference = ((before - after) / before) * 100 + logger.info( + f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%" + ) return output_file except ffmpeg.Error as e: - raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e + print(e.stderr, file=sys.stderr) + print(e.stdout) + logger.error(e) + raise RuntimeError(f"Failed to compress audio {input_file}") from e + + +def video_transcode_params( + probe_result, settings: VideoSettings +) -> Optional[Dict[str, Any]]: + """ + Given a probe result and some system settings, + this function returns a dict containing opaque data that could be passsed to compress_video. + If this function returns None, then the video does not need to be compressed + """ + max_height = settings["max_height"] + target_container = settings["container"] + target_vcodec = settings["vcodec"] + target_acodec = settings["acodec"] + audio_max_bitrate = settings["audio_max_bitrate"] + + width, height = get_resolution(probe_result) + vcodec = get_vcodec_name(probe_result) + acodec, audio_bit_rate = get_acodec_info(probe_result) + + if not width or not height or not acodec or not audio_bit_rate: + logger.error("Failed to extract data from ffprobe") + # TODO: turn this into an exception and catch it for reporting + return None + + current_container_many = format_name(probe_result) + is_container = False + if current_container_many is not None: + if target_container in current_container_many.split(","): + is_container = True + + is_vcodec = vcodec == target_vcodec + is_acodec = acodec == target_acodec + is_audio_bitrate = audio_bit_rate <= audio_max_bitrate + is_good_height = height <= max_height + + if is_good_height and is_container and is_vcodec and is_acodec and is_audio_bitrate: + return None + + params = {"extension": settings["extension"], "strict": "-2"} + if not is_good_height: + params["vf"] = f"scale={width}:{height}" + + if not is_vcodec: + params.update(settings["ffmpeg_video_params"]) + if not is_acodec or not is_audio_bitrate: + params.update(settings["ffmpeg_audio_params"]) + return params + + +def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) -> str: + """ + Uses ffmpeg, applying `settings` to `input_file`, storing output in `output_dir`, and returning the path to the compressed file + """ + params = copy.deepcopy(params) + ext = params.pop("extension") + output_file = f"{output_dir}/converted.{ext}" + try: + logger.info( + f"Compressing video {input_file} to {output_file} with params={params}" + ) + out, _ = ( + ffmpeg.input(input_file) + .output( + output_file, + **params, + loglevel="quiet", + ) + .run() + ) + before = os.path.getsize(input_file) / 1024 + after = os.path.getsize(output_file) / 1024 + percent_difference = 0 + if before != 0: + percent_difference = ((before - after) / before) * 100 + logger.info( + f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%" + ) + return output_file + except ffmpeg.Error as e: + raise RuntimeError(f"Failed to load video: {e.stderr.decode()}") from e + + +def check_codecs(codecs: List[str]) -> List[str]: + result = subprocess.run( + ["ffmpeg", "-v", "quiet", "-codecs"], capture_output=True, text=True + ) + output = result.stdout + + available_codecs = set( + line.split()[1] + for line in output.splitlines() + if len(line.split()) > 2 and "E" in line.split()[0] + ) + missing_codecs = [codec for codec in codecs if codec not in available_codecs] + + return missing_codecs + + +def check_encoders(encoders: List[str]) -> List: + result = subprocess.run( + ["ffmpeg", "-v", "quiet", "-encoders"], capture_output=True, text=True + ) + output = result.stdout + lines = output.split("\n") + encoder_lines = [ + line.strip() + for line in lines + if line.startswith(" V") or line.startswith(" A") or line.startswith(" S") + ] + available_encoders = set( + line.split()[1] for line in encoder_lines if len(line.split()) > 1 + ) + missing_encoders = [ + encoder for encoder in encoders if encoder not in available_encoders + ] + return missing_encoders + + +def is_ffmpeg_available() -> bool: + try: + subprocess.run( + ["ffmpeg", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) + return True + except OSError: + return False + + +def check_runtime(encoders: List[str], codecs: List[str]) -> bool: + if not is_ffmpeg_available(): + logger.error("FFMPEG is not available on the PATH") + return False + missing_encoders = check_encoders(encoders) + missing_codecs = check_codecs(codecs) + if missing_encoders: + m = ", ".join(missing_encoders) + logger.error(f"Missing ffmpeg encoders: {m}") + + if missing_codecs: + m = ", ".join(missing_codecs) + logger.error(f"Missing ffmpeg codecs: {m}") + if missing_codecs or missing_encoders: + return False + + return True diff --git a/repub/pipelines.py b/repub/pipelines.py index 985f9c8..e739d7c 100644 --- a/repub/pipelines.py +++ b/repub/pipelines.py @@ -2,16 +2,13 @@ import logging import tempfile from io import BytesIO from os import PathLike -from pathlib import PurePosixPath -from typing import IO, DefaultDict, Dict, Optional, Set, Tuple, Union -from urllib.parse import urlparse +from typing import Dict, List, Optional, Union import repub.utils from repub import media -from repub.exporters import RssExporter -from scrapy.pipelines.files import FileException from scrapy.pipelines.files import FilesPipeline as BaseFilesPipeline from scrapy.pipelines.images import ImagesPipeline as BaseImagesPipeline +from scrapy.settings import Settings from scrapy.utils.misc import md5sum logger = logging.getLogger(__name__) @@ -26,70 +23,161 @@ class ImagePipeline(BaseImagesPipeline): class FilePipeline(BaseFilesPipeline): + def __init__(self, store_uri, **kwargs): + settings = kwargs["settings"] + if isinstance(settings, dict) or settings is None: + settings = Settings(settings) + self.settings = settings + super().__init__(store_uri, **kwargs) + def file_path(self, request, response=None, info=None, *, item=None): return repub.utils.local_file_path(request.url) -class AudioPipeline(BaseFilesPipeline): - def __init__(self, store_uri: Union[str, PathLike], **kwargs): - self.FILES_URLS_FIELD = "audio_urls" - self.FILES_RESULT_FIELD = "audios" - store_uri = kwargs["settings"]["AUDIO_STORE"] +def read_asset(file_path) -> BytesIO: + buf_converted = BytesIO() + with open(file_path, "rb") as f: + buf_converted.write(f.read()) + buf_converted.seek(0) + return buf_converted + + +def media_final_path(base_path, name, ext): + return f"{base_path}-{name}.{ext}" + + +class TranscodePipeline(BaseFilesPipeline): + def __init__( + self, + media_type: repub.utils.FileType, + store_uri: Union[str, PathLike], + **kwargs, + ): + settings = kwargs["settings"] + self.media_type = media_type + if isinstance(settings, dict) or settings is None: + settings = Settings(settings) + self.settings = settings super().__init__(store_uri, **kwargs) - def file_path(self, request, response=None, info=None, *, item=None): - return repub.utils.local_audio_path(request.url) - def file_downloaded(self, response, request, info, *, item=None): - return self.audio_downloaded(response, request, info, item=item) + return self.media_downloaded(response, request, info, item=item) - def audio_downloaded(self, response, request, info, *, item=None): + def media_downloaded(self, response, request, info, *, item=None): checksum = None - for path, buf in self.get_audio(response, request, info, item=item): + for path, buf, meta, mime in self.get_media(response, request, info, item=item): if checksum is None: buf.seek(0) checksum = md5sum(buf) - # width, height = image.size self.store.persist_file( path, buf, info, - # meta={"width": width, "height": height}, - headers={"Content-Type": "audio/mp3"}, + meta=meta, + headers={"Content-Type": mime}, ) return checksum - def get_audio(self, response, request, info, *, item=None): - path = self.file_path(request, response=response, info=info, item=item) + def transcode( + self, input_file: str, settings: media.MediaSettings, tmp_dir: str + ) -> Optional[str]: + probe_result = media.probe_media(input_file) + params = self.get_transcode_params(probe_result, settings) + if params is not None: + converted_file = self.transcode_media(input_file, tmp_dir, params) + return converted_file + else: + logger.info( + f"Skipping audio compression for {input_file}, it meets requirements" + ) + return None + + def get_media_settings(self) -> List[media.MediaSettings]: + raise NotImplementedError() + + def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]: + raise NotImplementedError() + + def transcode_media(self, input_file: str, tmp_dir: str, params) -> str: + raise NotImplementedError() + + def get_media_meta(self, probe_result) -> media.MediaMeta: + raise NotImplementedError() + + def get_media(self, response, request, info, *, item=None): buf = BytesIO(response.body) - with tempfile.TemporaryDirectory() as tmpdir: - tmp_file = f"{tmpdir}/file" - converted_file_base = f"{tmpdir}/converted" + base_path = self.file_path(request, response=response, info=info, item=item) + with tempfile.TemporaryDirectory() as tmp_dir: + settings = self.get_media_settings() + tmp_file = f"{tmp_dir}/original" with open(tmp_file, "wb") as f: f.write(buf.read()) - - s = media.compression_settings(tmp_file, {}) - if s is not None: - converted_file = media.compress_audio(tmp_file, converted_file_base, s) - buf_converted = BytesIO() - with open(converted_file, "rb") as f: - buf_converted.write(f.read()) - buf_converted.seek(0) - yield path, buf_converted - else: - logger.info( - f"Skipping audio compression for {path}, it meets requirements" - ) - buf.seek(0) - yield path, buf + for setting in settings: + ext = setting["extension"] + name = setting["name"] + final_path = media_final_path(base_path, name, ext) + stat = self.store.stat_file(final_path, info) + if stat: + logger.info(f"Skipping, transcoded media exists at {final_path}") + continue + converted_file = self.transcode(tmp_file, setting, tmp_dir) + if converted_file: + out_buf = read_asset(converted_file) + out_file = converted_file + else: + out_buf = buf + out_file = tmp_file + probe_result = media.probe_media(out_file) + meta = self.get_media_meta(probe_result) + logger.info(f"{self.media_type} final {final_path} with {meta}") + yield final_path, out_buf, meta, setting["mimetype"] -class VideoPipeline(BaseFilesPipeline): +class AudioPipeline(TranscodePipeline): + + DEFAULT_FILES_URLS_FIELD = "audio_urls" + DEFAULT_FILES_RESULT_FIELD = "audios" + + def __init__(self, store_uri: Union[str, PathLike], **kwargs): + store_uri = kwargs["settings"]["AUDIO_STORE"] + super().__init__(repub.utils.FileType.AUDIO, store_uri, **kwargs) + + def file_path(self, request, response=None, info=None, *, item=None): + return repub.utils.local_audio_path(request.url) + + def get_media_settings(self) -> List[media.AudioSettings]: + return self.settings["REPUBLISHER_AUDIO"] + + def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]: + return media.audio_transcode_params(probe_result, settings) + + def transcode_media(self, input_file: str, tmp_dir: str, params) -> str: + return media.transcode_audio(input_file, tmp_dir, params) + + def get_media_meta(self, probe_result) -> Optional[media.AudioMeta]: + return media.audio_meta(probe_result) + + +class VideoPipeline(TranscodePipeline): + + DEFAULT_FILES_URLS_FIELD = "video_urls" + DEFAULT_FILES_RESULT_FIELD = "videos" + def __init__(self, store_uri: Union[str, PathLike], **kwargs): - self.FILES_URLS_FIELD = "video_urls" - self.FILES_RESULT_FIELD = "videos" store_uri = kwargs["settings"]["VIDEO_STORE"] - super().__init__(store_uri, **kwargs) + super().__init__(repub.utils.FileType.VIDEO, store_uri, **kwargs) def file_path(self, request, response=None, info=None, *, item=None): return repub.utils.local_video_path(request.url) + + def get_media_settings(self) -> List[media.VideoSettings]: + return self.settings["REPUBLISHER_VIDEO"] + + def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]: + return media.video_transcode_params(probe_result, settings) + + def transcode_media(self, input_file: str, tmp_dir: str, params) -> str: + return media.transcode_video(input_file, tmp_dir, params) + + def get_media_meta(self, probe_result) -> Optional[media.VideoMeta]: + return media.video_meta(probe_result) diff --git a/repub/settings.py b/repub/settings.py index 076ed7e..85b31c9 100644 --- a/repub/settings.py +++ b/repub/settings.py @@ -99,3 +99,54 @@ LOG_LEVEL = "INFO" # LOG_LEVEL = "ERROR" MEDIA_ALLOW_REDIRECTS = True + +REPUBLISHER_AUDIO = [ + { + "name": "vbr7", + "format": "mp3", + "max_bitrate": 96000, + "mimetype": "audio/mp3", + "extension": "mp3", + "ffmpeg_audio_params": { + "acodec": "libmp3lame", + # https://trac.ffmpeg.org/wiki/Encode/MP3#VBREncoding + "qscale:a": "7", + }, + }, + { + "name": "vbr3", + "format": "aac", + "max_bitrate": 96000, + "mimetype": "audio/aac", + "extension": "aac", + "ffmpeg_audio_params": { + "acodec": "libfdk_aac", + # https://trac.ffmpeg.org/wiki/Encode/MP3#VBREncoding + "vbr": "3", + }, + }, +] +REPUBLISHER_VIDEO = [ + { + "name": "720", + "container": "mp4", + "vcodec": "h264", + "acodec": "mp3", + "audio_max_bitrate": 96000, + "ffmpeg_audio_params": { + "acodec": "libmp3lame", + # https://trac.ffmpeg.org/wiki/Encode/MP3#VBREncoding + "qscale:a": "7", + }, + "ffmpeg_video_params": {"vcodec": "h264", "strict": "-2"}, + "max_height": 720, + "mimetype": "video/mp4", + "extension": "mp4", + } +] + +REPUBLISHER_FFMPEG_ENCODERS = ["libmp3lame", "libfdk_aac"] +REPUBLISHER_FFMPEG_CODECS = ["aac", "mp3", "mpeg4", "vp9", "vorbis"] + + +CLOSESPIDER_ERRORCOUNT = 1 diff --git a/repub/spiders/rss_spider.py b/repub/spiders/rss_spider.py index ab4b7b9..6efb5d1 100644 --- a/repub/spiders/rss_spider.py +++ b/repub/spiders/rss_spider.py @@ -187,17 +187,19 @@ class RssFeedSpider(BaseRssFeedSpider): if entry.get("image"): image_urls.append(entry.get("image").href) for enc in entry.enclosures: - file_type = determine_file_type( - url=enc.get("href"), mimetype=enc.get("type") - ) + url = enc.get("href") + file_type = determine_file_type(url=url, mimetype=enc.get("type")) item.append( E.enclosure( - E.url(self.rewrite_file_url(file_type, enc.get("href"))), + E.url(self.rewrite_file_url(file_type, url)), E.length(enc.get("length")), E.type(enc.get("type")), ) ) - add_url(file_type, enc.get("href")) + self.logger.debug( + f"feed {self.feed_name} encountered enclsoure {url} {file_type}" + ) + add_url(file_type, url) if "content" in entry: for c in entry.content: diff --git a/shell.nix b/shell.nix index 7e78f14..f53b453 100644 --- a/shell.nix +++ b/shell.nix @@ -17,7 +17,10 @@ let ] ); packages = [ - pkgs.ffmpeg_5-headless + (pkgs.ffmpeg_5-full.override { + withUnfree = true; + withFdkAac = true; + }) #(pyCurrent (ps: with ps; [ ffmpeg-python ])) pkgs.zsh (pkgs.poetry.withPlugins (ps: with ps; [ poetry-plugin-up ]))