Implement clean audio and video transcoding pipeline

This commit is contained in:
Abel Luck 2024-04-19 13:22:49 +02:00
parent ca17e44687
commit ac92eef8db
8 changed files with 540 additions and 74 deletions

1
.gitignore vendored
View file

@ -10,3 +10,4 @@ tmp/
/test*py
data
logs
archive

View file

@ -12,9 +12,9 @@ poetry run repub
- [x] Downloads media and enclosures
- [x] Rewrites media urls
- [x] Image normalization (JPG, RGB)
- [x] Audio compression
- [x] Audio transcoding
- [x] Video transcoding
- [ ] Image compression
- [ ] Video compression
- [ ] Download and rewrite media embedded in content/CDATA fields
- [ ] Config file to drive the program
- [ ] Daemonize the program

View file

@ -25,12 +25,12 @@ class FeedNameFilter:
def execute_spider(queue, name, url):
from repub.media import check_runtime
from repub.spiders.rss_spider import RssFeedSpider
from scrapy.crawler import CrawlerProcess
from scrapy.settings import Settings
from scrapy.utils.project import get_project_settings
from .spiders.rss_spider import RssFeedSpider
try:
settings: Settings = {
**get_project_settings(),
@ -59,6 +59,13 @@ def execute_spider(queue, name, url):
"VIDEO_STORE": f"out/{name}/images",
"FILES_STORE": f"out/{name}/files",
}
if not check_runtime(
settings.get("REPUBLISHER_FFMPEG_ENCODERS"),
settings.get("REPUBLISHER_FFMPEG_CODECS"),
):
logger.error("Runtime depenencies not met")
queue.put("missing dependencies")
return
process = CrawlerProcess(settings)
# colorlog.load_colorlog()
process.crawl(RssFeedSpider, feed_name=name, urls=[url])

View file

@ -1,38 +1,192 @@
import copy
import logging
import math
import os
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union
import ffmpeg
logger = logging.getLogger(__name__)
MediaMeta = Dict[str, Union[str, int, float]]
def media_info(file_path):
    """Return whatever `ffmpeg.probe` reports for `file_path`."""
    probe_result = ffmpeg.probe(file_path)
    return probe_result
class MediaSettings(TypedDict):
    """Keys shared by every transcoding profile."""

    # Profile label; the pipeline embeds it in the stored file name.
    name: str
    # File extension of the transcoded output.
    extension: str
    # Value sent as the Content-Type header when the file is persisted.
    mimetype: str
class AudioSettings(MediaSettings):
    """One audio transcoding profile, as consumed by `audio_transcode_params`."""

    # Format name the source must already have to be left untouched;
    # compared against ffprobe's `format_name`.
    format: str
    # Highest source bitrate accepted without re-encoding.
    max_bitrate: int
    # Extra output options handed to ffmpeg for the audio encode.
    ffmpeg_audio_params: Dict[str, str]
class VideoSettings(MediaSettings):
    """One video transcoding profile, as consumed by `video_transcode_params`."""

    # Container name looked up in ffprobe's comma-separated `format_name`.
    container: str
    # Codec name expected on the primary video stream.
    vcodec: str
    # Tallest frame height accepted without rescaling.
    max_height: int
    # Codec name expected on the primary audio stream.
    acodec: str
    # Highest audio bitrate accepted without re-encoding the audio.
    audio_max_bitrate: int
    # Output options merged in when the audio codec or bitrate does not match.
    ffmpeg_audio_params: Dict[str, str]
    # Output options merged in when the video codec does not match.
    ffmpeg_video_params: Dict[str, str]
class AudioMeta(TypedDict):
    """Summary of an audio file, taken from the `format` section of a probe."""

    format_name: str
    format_long_name: str
    # Kept as the string ffprobe reported.
    duration: str
    bit_rate: float
    # Kept as the string ffprobe reported.
    size: str
class VideoMeta(TypedDict):
    """Summary of a video file: container facts plus primary video stream facts."""

    # Container-level values (from the probe's `format` section).
    duration: str
    size: str
    format_name: str
    format_long_name: str
    # Values read from the primary video stream.
    width: int
    height: int
    codec_name: str
    display_aspect_ratio: str
    duration_ts: float
    bit_rate: float
def probe_media(file_path) -> Dict[str, Any]:
    """Probe `file_path` with ffprobe (via `ffmpeg.probe`) and return the result.

    Raises:
        RuntimeError: when `ffmpeg.probe` raises `ffmpeg.Error`; the original
            error is chained as the cause.
    """
    try:
        probe_result = ffmpeg.probe(file_path)
    except ffmpeg.Error as err:
        # Show ffprobe's own stderr before our summary lines.
        print(err.stderr, file=sys.stderr)
        failure = f"Failed to probe io {file_path}"
        logger.error(failure)
        logger.error(err)
        raise RuntimeError(failure) from err
    return probe_result
def bitrate(info) -> float:
    """Return the container bitrate recorded in a probe result.

    Args:
        info: probe result; the value is read from `info["format"]["bit_rate"]`.

    Returns:
        The bitrate as an integer, or `math.inf` when it is missing or not
        numeric, so that "is it below the limit?" checks fail for such files.
    """
    try:
        return int(info["format"]["bit_rate"])
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class and makes the handler itself raise TypeError.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting bitrate from ffprobe failed")
        return math.inf
def format(info):
def format_name(info) -> Optional[str]:
    """Return `info["format"]["format_name"]` from a probe result.

    Returns:
        The format name string, or None when the probe result does not
        contain it.
    """
    try:
        return info["format"]["format_name"]
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class and makes the handler itself raise TypeError.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting format from ffprobe failed")
        return None
def compression_settings(input_file, settings):
info = media_info(input_file)
br = settings.get("REPUBLISHER_AUDIO_BITRATE", 96000)
fmt = settings.get("REPUBLISHER_AUDIO_FORMAT", "mp3")
if bitrate(info) <= br:
def primary_video_stream(probe):
    """Return the video stream of a probe result that has the largest `duration_ts`.

    Args:
        probe: probe result with a `streams` list of stream dicts.

    Returns:
        The chosen stream dict, or None when there is no video stream.
        Streams without a `duration_ts` are ranked as 0 instead of raising.
    """
    video_streams = [
        stream
        for stream in probe.get("streams", [])
        if stream.get("codec_type") == "video"
    ]
    if not video_streams:
        return None
    if len(video_streams) > 1:
        logger.warning(
            "Encountered video file with more than 1 video stream!, choosing the one with the longest duration"
        )
    # max() keeps the first stream on ties, matching a stable descending sort.
    return max(video_streams, key=lambda stream: stream.get("duration_ts") or 0)
def primary_audio_stream(probe):
    """Return the audio stream of a probe result that has the largest `duration_ts`.

    Args:
        probe: probe result with a `streams` list of stream dicts.

    Returns:
        The chosen stream dict, or None when there is no audio stream.
        Streams without a `duration_ts` are ranked as 0 instead of raising.
    """
    audio_streams = [
        stream
        for stream in probe.get("streams", [])
        if stream.get("codec_type") == "audio"
    ]
    if not audio_streams:
        return None
    if len(audio_streams) > 1:
        logger.warning(
            "Encountered video file with more than 1 audio stream!, choosing the one with the longest duration"
        )
    # max() keeps the first stream on ties, matching a stable descending sort.
    return max(audio_streams, key=lambda stream: stream.get("duration_ts") or 0)
def get_resolution(probe) -> Tuple[Optional[float], Optional[float]]:
    """Return `(width, height)` of the primary video stream.

    Returns:
        Both values as integers, or `(None, None)` when there is no video
        stream or the dimensions are missing or not numeric.
    """
    try:
        video_stream = primary_video_stream(probe)
        if not video_stream:
            return None, None
        return int(video_stream["width"]), int(video_stream["height"])
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class and makes the handler itself raise TypeError.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting resolution from ffprobe failed")
        return None, None
def get_vcodec_name(probe) -> Optional[str]:
    """Return the `codec_name` of the primary video stream.

    Returns:
        The codec name, or None when there is no video stream or the stream
        carries no `codec_name`.
    """
    try:
        video_stream = primary_video_stream(probe)
        if not video_stream:
            return None
        return video_stream["codec_name"]
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class and makes the handler itself raise TypeError.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting video codec_name from ffprobe failed")
        return None
def get_acodec_info(probe) -> Tuple[Optional[str], Optional[int]]:
    """Return `(codec_name, bit_rate)` of the primary audio stream.

    Returns:
        The codec name and the bitrate as an integer, or `(None, None)` when
        there is no audio stream or either value is missing or not numeric.
    """
    try:
        audio_stream = primary_audio_stream(probe)
        if not audio_stream:
            return None, None
        return audio_stream["codec_name"], int(audio_stream["bit_rate"])
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class and makes the handler itself raise TypeError.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting audio codec_name from ffprobe failed")
        return None, None
def audio_meta(probe: Dict[str, Any]) -> Optional[AudioMeta]:
    """Build an `AudioMeta` from the `format` section of a probe result.

    Missing text fields default to "" and a missing bitrate to 0.0.
    """
    container = probe["format"]
    raw_bit_rate = container.get("bit_rate", 0.0)
    return AudioMeta(
        duration=container.get("duration", ""),
        size=container.get("size", ""),
        format_name=container.get("format_name", ""),
        format_long_name=container.get("format_long_name", ""),
        bit_rate=float(raw_bit_rate),
    )
def video_meta(probe: Dict[str, Any]) -> Optional[VideoMeta]:
    """Build a `VideoMeta` from a probe result.

    Container facts come from `probe["format"]`, picture facts from the
    primary video stream. Returns None when there is no video stream.
    """
    stream = primary_video_stream(probe)
    if not stream:
        return None

    container = probe["format"]
    return VideoMeta(
        # Container-level values.
        duration=container.get("duration", ""),
        size=container.get("size", ""),
        format_name=container.get("format_name", ""),
        format_long_name=container.get("format_long_name", ""),
        # Stream-level values.
        width=int(stream.get("width", 0)),
        height=int(stream.get("height", 0)),
        codec_name=stream.get("codec_name", ""),
        display_aspect_ratio=stream.get("display_aspect_ratio", ""),
        duration_ts=float(stream.get("duration_ts", 0.0)),
        bit_rate=float(stream.get("bit_rate", 0.0)),
    )
def audio_transcode_params(
probe_result, settings: AudioSettings
) -> Optional[Dict[str, str]]:
"""
Given a probe result and some system settings,
this function returns a dict containing opaque data that can be passed to transcode_audio.
If this function returns None, then the audio does not need to be compressed
"""
br = settings["max_bitrate"]
fmt = settings["format"]
if bitrate(probe_result) <= br:
is_br = True
else:
is_br = False
if format(info) == fmt:
if format_name(probe_result) == fmt:
is_fmt = True
else:
is_fmt = False
@ -40,28 +194,188 @@ def compression_settings(input_file, settings):
if is_br and is_fmt:
return None
if is_br:
target_br = bitrate(info)
else:
target_br = br
return {"bitrate": target_br, "ext": "mp3"}
params = {"extension": settings["extension"]}
params.update(settings["ffmpeg_audio_params"])
return params
def compress_audio(input_file, output_file_base, settings):
ext = settings["ext"]
br = settings["bitrate"]
output_file = f"{output_file_base}.{ext}"
def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) -> str:
"""
Uses ffmpeg, applying `settings` to `input_file`, storing output in `output_dir`, and returning the path to the compressed file
"""
params = copy.deepcopy(params)
ext = params.pop("extension")
output_file = f"{output_dir}/converted.{ext}"
try:
logger.info(f"Compressing audio {input_file} to {output_file} target_br={br}")
logger.info(
f"Compressing audio {input_file} to {output_file} with params={params}"
)
out, _ = (
ffmpeg.input(input_file)
.output(
output_file,
**{"b:a": f"{br}", "map": "0:a:0"},
**params,
loglevel="quiet",
)
.run()
)
before = os.path.getsize(input_file) / 1024
after = os.path.getsize(output_file) / 1024
percent_difference = 0
if before != 0:
percent_difference = ((before - after) / before) * 100
logger.info(
f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%"
)
return output_file
except ffmpeg.Error as e:
raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
print(e.stderr, file=sys.stderr)
print(e.stdout)
logger.error(e)
raise RuntimeError(f"Failed to compress audio {input_file}") from e
def video_transcode_params(
    probe_result, settings: VideoSettings
) -> Optional[Dict[str, Any]]:
    """Decide whether a probed video needs transcoding and with which options.

    Args:
        probe_result: probe data for the source file.
        settings: the target video profile.

    Returns:
        A dict of opaque data that can be passed to `transcode_video`
        (the output `extension` plus ffmpeg output options), or None when
        the video already satisfies the profile. None is also returned,
        after logging an error, when resolution or audio stream data could
        not be read from the probe.
    """
    max_height = settings["max_height"]
    target_container = settings["container"]
    target_vcodec = settings["vcodec"]
    target_acodec = settings["acodec"]
    audio_max_bitrate = settings["audio_max_bitrate"]

    width, height = get_resolution(probe_result)
    vcodec = get_vcodec_name(probe_result)
    acodec, audio_bit_rate = get_acodec_info(probe_result)

    if not width or not height or not acodec or not audio_bit_rate:
        logger.error("Failed to extract data from ffprobe")
        # TODO: turn this into an exception and catch it for reporting
        return None

    # ffprobe may report several comma-separated names for one container.
    current_container_many = format_name(probe_result)
    is_container = (
        current_container_many is not None
        and target_container in current_container_many.split(",")
    )
    is_vcodec = vcodec == target_vcodec
    is_acodec = acodec == target_acodec
    is_audio_bitrate = audio_bit_rate <= audio_max_bitrate
    is_good_height = height <= max_height

    if is_good_height and is_container and is_vcodec and is_acodec and is_audio_bitrate:
        return None

    params: Dict[str, Any] = {"extension": settings["extension"], "strict": "-2"}
    if not is_good_height:
        # Scale down to the profile's height. Scaling to the source's own
        # width:height would leave the picture unchanged. A width of -2 lets
        # ffmpeg keep the aspect ratio while rounding to an even number.
        params["vf"] = f"scale=-2:{max_height}"
    if not is_vcodec:
        params.update(settings["ffmpeg_video_params"])
    if not is_acodec or not is_audio_bitrate:
        params.update(settings["ffmpeg_audio_params"])
    return params
def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) -> str:
    """Transcode `input_file` with ffmpeg and return the path of the result.

    Args:
        input_file: path of the source video.
        output_dir: directory that receives `converted.<extension>`.
        params: output of `video_transcode_params`; `extension` is removed
            from a copy and the rest is passed to ffmpeg as output options.

    Returns:
        Path of the transcoded file inside `output_dir`.

    Raises:
        RuntimeError: when ffmpeg fails; the `ffmpeg.Error` is chained.
    """
    params = copy.deepcopy(params)
    ext = params.pop("extension")
    output_file = f"{output_dir}/converted.{ext}"
    logger.info(
        f"Compressing video {input_file} to {output_file} with params={params}"
    )
    try:
        ffmpeg.input(input_file).output(
            output_file,
            **params,
            loglevel="quiet",
        ).run()
    except ffmpeg.Error as e:
        # run() is not asked to capture stderr here, so `e.stderr` is
        # expected to be None; decoding it unconditionally would raise
        # AttributeError and hide the real failure.
        detail = e.stderr.decode(errors="replace") if e.stderr else ""
        logger.error(e)
        raise RuntimeError(f"Failed to load video: {detail}") from e

    before = os.path.getsize(input_file) / 1024
    after = os.path.getsize(output_file) / 1024
    percent_difference = 0
    if before != 0:
        percent_difference = ((before - after) / before) * 100
    logger.info(
        f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%"
    )
    return output_file
def check_codecs(codecs: List[str]) -> List[str]:
    """Return the entries of `codecs` that `ffmpeg -codecs` does not list with an "E" flag.

    The order of `codecs` is preserved in the result.
    """
    listing = subprocess.run(
        ["ffmpeg", "-v", "quiet", "-codecs"], capture_output=True, text=True
    ).stdout

    encodable = set()
    for row in listing.splitlines():
        columns = row.split()
        # Need at least flags, name and description; the first column holds
        # the capability flags and the second the codec name.
        if len(columns) > 2 and "E" in columns[0]:
            encodable.add(columns[1])

    return [wanted for wanted in codecs if wanted not in encodable]
def check_encoders(encoders: List[str]) -> List:
    """Return the entries of `encoders` that `ffmpeg -encoders` does not list.

    Only output rows starting with " V", " A" or " S" are considered; the
    encoder name is the second whitespace-separated column of such a row.
    """
    listing = subprocess.run(
        ["ffmpeg", "-v", "quiet", "-encoders"], capture_output=True, text=True
    ).stdout

    available = set()
    for row in listing.split("\n"):
        if not row.startswith((" V", " A", " S")):
            continue
        columns = row.split()
        if len(columns) > 1:
            available.add(columns[1])

    return [wanted for wanted in encoders if wanted not in available]
def is_ffmpeg_available() -> bool:
    """Report whether an `ffmpeg` executable can be started.

    Only a failure to launch the process (OSError) counts as unavailable;
    the exit status of `ffmpeg -version` is not inspected.
    """
    sink = subprocess.DEVNULL
    try:
        subprocess.run(["ffmpeg", "-version"], stdout=sink, stderr=sink)
    except OSError:
        return False
    return True
def check_runtime(encoders: List[str], codecs: List[str]) -> bool:
    """Verify that ffmpeg is installed and offers the required encoders and codecs.

    Every missing encoder and codec is logged. Returns True only when ffmpeg
    can be started and nothing is missing.
    """
    if not is_ffmpeg_available():
        logger.error("FFMPEG is not available on the PATH")
        return False

    missing_encoders = check_encoders(encoders)
    missing_codecs = check_codecs(codecs)

    if missing_encoders:
        joined = ", ".join(missing_encoders)
        logger.error(f"Missing ffmpeg encoders: {joined}")
    if missing_codecs:
        joined = ", ".join(missing_codecs)
        logger.error(f"Missing ffmpeg codecs: {joined}")

    return not (missing_codecs or missing_encoders)

View file

@ -2,16 +2,13 @@ import logging
import tempfile
from io import BytesIO
from os import PathLike
from pathlib import PurePosixPath
from typing import IO, DefaultDict, Dict, Optional, Set, Tuple, Union
from urllib.parse import urlparse
from typing import Dict, List, Optional, Union
import repub.utils
from repub import media
from repub.exporters import RssExporter
from scrapy.pipelines.files import FileException
from scrapy.pipelines.files import FilesPipeline as BaseFilesPipeline
from scrapy.pipelines.images import ImagesPipeline as BaseImagesPipeline
from scrapy.settings import Settings
from scrapy.utils.misc import md5sum
logger = logging.getLogger(__name__)
@ -26,70 +23,161 @@ class ImagePipeline(BaseImagesPipeline):
class FilePipeline(BaseFilesPipeline):
    """Files pipeline that keeps its settings and stores files under repub's local path."""

    def __init__(self, store_uri, **kwargs):
        raw_settings = kwargs["settings"]
        # Accept a plain dict (or None) as well as a ready Settings object.
        if raw_settings is None or isinstance(raw_settings, dict):
            raw_settings = Settings(raw_settings)
        self.settings = raw_settings
        super().__init__(store_uri, **kwargs)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for the requested URL."""
        url = request.url
        return repub.utils.local_file_path(url)
class AudioPipeline(BaseFilesPipeline):
def __init__(self, store_uri: Union[str, PathLike], **kwargs):
self.FILES_URLS_FIELD = "audio_urls"
self.FILES_RESULT_FIELD = "audios"
store_uri = kwargs["settings"]["AUDIO_STORE"]
def read_asset(file_path) -> BytesIO:
    """Load the whole file at `file_path` into a BytesIO positioned at its start."""
    with open(file_path, "rb") as handle:
        return BytesIO(handle.read())
def media_final_path(base_path, name, ext):
    """Return `<base_path>-<name>.<ext>`, the storage path of one transcoded variant."""
    return "{}-{}.{}".format(base_path, name, ext)
class TranscodePipeline(BaseFilesPipeline):
def __init__(
    self,
    media_type: repub.utils.FileType,
    store_uri: Union[str, PathLike],
    **kwargs,
):
    """Remember the media type and settings, then initialise the files pipeline."""
    raw_settings = kwargs["settings"]
    self.media_type = media_type
    # Accept a plain dict (or None) as well as a ready Settings object.
    if raw_settings is None or isinstance(raw_settings, dict):
        raw_settings = Settings(raw_settings)
    self.settings = raw_settings
    super().__init__(store_uri, **kwargs)
def file_path(self, request, response=None, info=None, *, item=None):
    """Return the audio storage path for the requested URL."""
    url = request.url
    return repub.utils.local_audio_path(url)
def file_downloaded(self, response, request, info, *, item=None):
return self.audio_downloaded(response, request, info, item=item)
return self.media_downloaded(response, request, info, item=item)
def audio_downloaded(self, response, request, info, *, item=None):
def media_downloaded(self, response, request, info, *, item=None):
checksum = None
for path, buf in self.get_audio(response, request, info, item=item):
for path, buf, meta, mime in self.get_media(response, request, info, item=item):
if checksum is None:
buf.seek(0)
checksum = md5sum(buf)
# width, height = image.size
self.store.persist_file(
path,
buf,
info,
# meta={"width": width, "height": height},
headers={"Content-Type": "audio/mp3"},
meta=meta,
headers={"Content-Type": mime},
)
return checksum
def get_audio(self, response, request, info, *, item=None):
path = self.file_path(request, response=response, info=info, item=item)
def transcode(
    self, input_file: str, settings: media.MediaSettings, tmp_dir: str
) -> Optional[str]:
    """Transcode `input_file` for one profile when the profile requires it.

    Args:
        input_file: path of the downloaded source file.
        settings: the profile to satisfy.
        tmp_dir: directory that receives the transcoded file.

    Returns:
        Path of the transcoded file, or None when `get_transcode_params`
        reports that no transcoding is needed.
    """
    probe_result = media.probe_media(input_file)
    params = self.get_transcode_params(probe_result, settings)
    if params is None:
        # Name the actual media type: this method is shared by the audio and
        # video pipelines, so a fixed "audio" wording was wrong for video.
        logger.info(
            f"Skipping {self.media_type} transcoding for {input_file}, it meets requirements"
        )
        return None
    return self.transcode_media(input_file, tmp_dir, params)
def get_media_settings(self) -> List[media.MediaSettings]:
    """Return the transcoding profiles to produce; subclasses must override."""
    raise NotImplementedError
def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
    """Return ffmpeg parameters for one profile, or None to skip; subclasses must override."""
    raise NotImplementedError
def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
    """Transcode `input_file` into `tmp_dir` and return the new path; subclasses must override."""
    raise NotImplementedError
def get_media_meta(self, probe_result) -> media.MediaMeta:
    """Return the metadata to store alongside the file; subclasses must override."""
    raise NotImplementedError
def get_media(self, response, request, info, *, item=None):
buf = BytesIO(response.body)
with tempfile.TemporaryDirectory() as tmpdir:
tmp_file = f"{tmpdir}/file"
converted_file_base = f"{tmpdir}/converted"
base_path = self.file_path(request, response=response, info=info, item=item)
with tempfile.TemporaryDirectory() as tmp_dir:
settings = self.get_media_settings()
tmp_file = f"{tmp_dir}/original"
with open(tmp_file, "wb") as f:
f.write(buf.read())
s = media.compression_settings(tmp_file, {})
if s is not None:
converted_file = media.compress_audio(tmp_file, converted_file_base, s)
buf_converted = BytesIO()
with open(converted_file, "rb") as f:
buf_converted.write(f.read())
buf_converted.seek(0)
yield path, buf_converted
else:
logger.info(
f"Skipping audio compression for {path}, it meets requirements"
)
buf.seek(0)
yield path, buf
for setting in settings:
ext = setting["extension"]
name = setting["name"]
final_path = media_final_path(base_path, name, ext)
stat = self.store.stat_file(final_path, info)
if stat:
logger.info(f"Skipping, transcoded media exists at {final_path}")
continue
converted_file = self.transcode(tmp_file, setting, tmp_dir)
if converted_file:
out_buf = read_asset(converted_file)
out_file = converted_file
else:
out_buf = buf
out_file = tmp_file
probe_result = media.probe_media(out_file)
meta = self.get_media_meta(probe_result)
logger.info(f"{self.media_type} final {final_path} with {meta}")
yield final_path, out_buf, meta, setting["mimetype"]
class VideoPipeline(BaseFilesPipeline):
class AudioPipeline(TranscodePipeline):
    """Transcode pipeline wired to the audio profiles and audio helpers."""

    DEFAULT_FILES_URLS_FIELD = "audio_urls"
    DEFAULT_FILES_RESULT_FIELD = "audios"

    def __init__(self, store_uri: Union[str, PathLike], **kwargs):
        # The store_uri argument is replaced by the AUDIO_STORE setting.
        audio_store = kwargs["settings"]["AUDIO_STORE"]
        super().__init__(repub.utils.FileType.AUDIO, audio_store, **kwargs)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the audio storage path for the requested URL."""
        url = request.url
        return repub.utils.local_audio_path(url)

    def get_media_settings(self) -> List[media.AudioSettings]:
        """Return the audio profiles from the REPUBLISHER_AUDIO setting."""
        profiles = self.settings["REPUBLISHER_AUDIO"]
        return profiles

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to `media.audio_transcode_params`."""
        return media.audio_transcode_params(probe_result, settings)

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to `media.transcode_audio`."""
        return media.transcode_audio(input_file, tmp_dir, params)

    def get_media_meta(self, probe_result) -> Optional[media.AudioMeta]:
        """Delegate to `media.audio_meta`."""
        return media.audio_meta(probe_result)
class VideoPipeline(TranscodePipeline):
DEFAULT_FILES_URLS_FIELD = "video_urls"
DEFAULT_FILES_RESULT_FIELD = "videos"
def __init__(self, store_uri: Union[str, PathLike], **kwargs):
self.FILES_URLS_FIELD = "video_urls"
self.FILES_RESULT_FIELD = "videos"
store_uri = kwargs["settings"]["VIDEO_STORE"]
super().__init__(store_uri, **kwargs)
super().__init__(repub.utils.FileType.VIDEO, store_uri, **kwargs)
def file_path(self, request, response=None, info=None, *, item=None):
return repub.utils.local_video_path(request.url)
def get_media_settings(self) -> List[media.VideoSettings]:
return self.settings["REPUBLISHER_VIDEO"]
def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
return media.video_transcode_params(probe_result, settings)
def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
return media.transcode_video(input_file, tmp_dir, params)
def get_media_meta(self, probe_result) -> Optional[media.VideoMeta]:
return media.video_meta(probe_result)

View file

@ -99,3 +99,54 @@ LOG_LEVEL = "INFO"
# LOG_LEVEL = "ERROR"
MEDIA_ALLOW_REDIRECTS = True
# Audio transcoding profiles, read by AudioPipeline. One output file is
# produced per entry and stored as "<base path>-<name>.<extension>".
# A source that already has the given "format" and a bitrate at or below
# "max_bitrate" is stored without re-encoding.
REPUBLISHER_AUDIO = [
{
"name": "vbr7",
"format": "mp3",
"max_bitrate": 96000,
"mimetype": "audio/mp3",
"extension": "mp3",
# Passed to ffmpeg as output options when re-encoding.
"ffmpeg_audio_params": {
"acodec": "libmp3lame",
# https://trac.ffmpeg.org/wiki/Encode/MP3#VBREncoding
"qscale:a": "7",
},
},
{
"name": "vbr3",
"format": "aac",
"max_bitrate": 96000,
"mimetype": "audio/aac",
"extension": "aac",
# Passed to ffmpeg as output options when re-encoding.
"ffmpeg_audio_params": {
"acodec": "libfdk_aac",
# libfdk_aac VBR mode, see https://trac.ffmpeg.org/wiki/Encode/AAC
"vbr": "3",
},
},
]
# Video transcoding profiles, read by VideoPipeline. One output file is
# produced per entry and stored as "<base path>-<name>.<extension>".
# "container", "vcodec", "acodec", "audio_max_bitrate" and "max_height"
# describe what a source must already satisfy to be stored unchanged.
REPUBLISHER_VIDEO = [
{
"name": "720",
"container": "mp4",
"vcodec": "h264",
"acodec": "mp3",
"audio_max_bitrate": 96000,
# Applied when the audio codec or audio bitrate does not match.
"ffmpeg_audio_params": {
"acodec": "libmp3lame",
# https://trac.ffmpeg.org/wiki/Encode/MP3#VBREncoding
"qscale:a": "7",
},
# Applied when the video codec does not match.
"ffmpeg_video_params": {"vcodec": "h264", "strict": "-2"},
"max_height": 720,
"mimetype": "video/mp4",
"extension": "mp4",
}
]
# Encoders and codecs that check_runtime requires from the local ffmpeg
# before a crawl starts.
# NOTE(review): the video profile above uses h264, which is not listed in
# either requirement list — confirm whether it should be checked too.
REPUBLISHER_FFMPEG_ENCODERS = ["libmp3lame", "libfdk_aac"]
REPUBLISHER_FFMPEG_CODECS = ["aac", "mp3", "mpeg4", "vp9", "vorbis"]
CLOSESPIDER_ERRORCOUNT = 1

View file

@ -187,17 +187,19 @@ class RssFeedSpider(BaseRssFeedSpider):
if entry.get("image"):
image_urls.append(entry.get("image").href)
for enc in entry.enclosures:
file_type = determine_file_type(
url=enc.get("href"), mimetype=enc.get("type")
)
url = enc.get("href")
file_type = determine_file_type(url=url, mimetype=enc.get("type"))
item.append(
E.enclosure(
E.url(self.rewrite_file_url(file_type, enc.get("href"))),
E.url(self.rewrite_file_url(file_type, url)),
E.length(enc.get("length")),
E.type(enc.get("type")),
)
)
add_url(file_type, enc.get("href"))
self.logger.debug(
f"feed {self.feed_name} encountered enclsoure {url} {file_type}"
)
add_url(file_type, url)
if "content" in entry:
for c in entry.content:

View file

@ -17,7 +17,10 @@ let
]
);
packages = [
pkgs.ffmpeg_5-headless
(pkgs.ffmpeg_5-full.override {
withUnfree = true;
withFdkAac = true;
})
#(pyCurrent (ps: with ps; [ ffmpeg-python ]))
pkgs.zsh
(pkgs.poetry.withPlugins (ps: with ps; [ poetry-plugin-up ]))