import copy import logging import math import os import subprocess import sys from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union import ffmpeg logger = logging.getLogger(__name__) MediaMeta = Dict[str, Union[str, int, float]] MediaSettings = TypedDict( "MediaSettings", {"name": str, "extension": str, "mimetype": str} ) class AudioSettings(MediaSettings): format: str max_bitrate: int ffmpeg_audio_params: Dict[str, str] class VideoSettings(MediaSettings): container: str vcodec: str max_height: int acodec: str audio_max_bitrate: int ffmpeg_audio_params: Dict[str, str] ffmpeg_video_params: Dict[str, str] class AudioMeta(TypedDict, total=False): duration: str fileSize: str bitrate: int samplingrate: int channels: int class VideoMeta(TypedDict, total=False): duration: str fileSize: str width: int height: int bitrate: int framerate: str def _decode_ffmpeg_output(output: Any) -> str: if isinstance(output, bytes): return output.decode("utf-8", errors="replace") return str(output) def _print_ffmpeg_error_output(error: ffmpeg.Error) -> None: if error.stderr: print(_decode_ffmpeg_output(error.stderr), file=sys.stderr) if error.stdout: print(_decode_ffmpeg_output(error.stdout)) def probe_media(file_path) -> Dict[str, Any]: """Probes `file_path` using ffmpeg's ffprobe and returns the data.""" try: return ffmpeg.probe(file_path) except ffmpeg.Error as e: _print_ffmpeg_error_output(e) logger.error(f"Failed to probe io {file_path}") logger.error(e) raise RuntimeError(f"Failed to probe io {file_path}") from e def bitrate(info) -> float: try: return int(info["format"]["bit_rate"]) except (KeyError, ValueError): logger.error("extracting bitrate from ffprobe failed") return math.inf def format_name(info) -> Optional[str]: try: return info["format"]["format_name"] except (KeyError, ValueError): logger.error("extracting format from ffprobe failed") return None def _stream_duration_sort_key(stream: Dict[str, Any]) -> tuple[int, float]: duration_ts = _int_value(stream.get("duration_ts")) if duration_ts is not None: return 1, float(duration_ts) try: duration = float(str(stream.get("duration", ""))) except (TypeError, ValueError): duration = 0.0 return 0, duration def _matches_format(probe: Dict[str, Any], expected: str) -> bool: current = format_name(probe) if current is None: return False return expected in current.split(",") def primary_video_stream(probe): video_streams = [ stream for stream in probe["streams"] if stream["codec_type"] == "video" ] video_streams = sorted(video_streams, key=_stream_duration_sort_key, reverse=True) if not video_streams: return None if len(video_streams) > 1: logger.warn( "Encountered video file with more than 1 video stream!, choosing the one with the longest duration" ) return video_streams[0] def primary_audio_stream(probe): audio_streams = [ stream for stream in probe["streams"] if stream["codec_type"] == "audio" ] audio_streams = sorted(audio_streams, key=_stream_duration_sort_key, reverse=True) if not audio_streams: return None if len(audio_streams) > 1: logger.warn( "Encountered video file with more than 1 audio stream!, choosing the one with the longest duration" ) return audio_streams[0] def get_resolution(probe) -> Tuple[Optional[float], Optional[float]]: try: video_stream = primary_video_stream(probe) if not video_stream: return None, None width = int(video_stream["width"]) height = int(video_stream["height"]) return width, height except (KeyError, ValueError): logger.error("extracting resolution from ffprobe failed") return None, None def get_vcodec_name(probe) -> Optional[str]: try: video_stream = primary_video_stream(probe) if not video_stream: return None return video_stream["codec_name"] except (KeyError, ValueError): logger.error("extracting video codec_name from ffprobe failed") return None def get_acodec_info(probe) -> Tuple[Optional[str], Optional[int]]: try: audio_stream = primary_audio_stream(probe) if not audio_stream: return None, None audio_bitrate = _int_value( audio_stream.get("bit_rate") or probe["format"].get("bit_rate") ) return audio_stream["codec_name"], audio_bitrate except (KeyError, ValueError): logger.error("extracting audio codec_name from ffprobe failed") return None, None def _int_value(value: Any) -> Optional[int]: try: if value in (None, ""): return None return int(str(value)) except (TypeError, ValueError): return None def _frame_rate(stream: Dict[str, Any]) -> Optional[str]: for key in ("avg_frame_rate", "r_frame_rate"): value = stream.get(key) if value not in (None, "", "0/0"): return str(value) return None def _scale_to_max_height(max_height: int) -> str: return f"scale=-2:{max_height}" def audio_meta(probe: Dict[str, Any]) -> Optional[AudioMeta]: stream = primary_audio_stream(probe) if not stream: return None meta = AudioMeta( duration=str(probe["format"].get("duration", "")), fileSize=str(probe["format"].get("size", "")), bitrate=_int_value(probe["format"].get("bit_rate")) or 0, samplingrate=_int_value(stream.get("sample_rate")) or 0, channels=_int_value(stream.get("channels")) or 0, ) return {key: value for key, value in meta.items() if value not in ("", 0)} def video_meta(probe: Dict[str, Any]) -> Optional[VideoMeta]: stream = primary_video_stream(probe) if not stream: return None meta = VideoMeta( duration=str(probe["format"].get("duration", "")), fileSize=str(probe["format"].get("size", "")), width=_int_value(stream.get("width")) or 0, height=_int_value(stream.get("height")) or 0, bitrate=_int_value(stream.get("bit_rate") or probe["format"].get("bit_rate")) or 0, framerate=_frame_rate(stream) or "", ) return {key: value for key, value in meta.items() if value not in ("", 0)} def audio_transcode_params( probe_result, settings: AudioSettings ) -> Optional[Dict[str, str]]: """ Given a probe result and some system settings, this function returns a dict containing opaque data that could be passsed to compress_audio. If this function returns None, then the audio does not need to be compressed """ br = settings["max_bitrate"] fmt = settings["format"] if bitrate(probe_result) <= br: is_br = True else: is_br = False if _matches_format(probe_result, fmt): is_fmt = True else: is_fmt = False if is_br and is_fmt: return None params = {"extension": settings["extension"]} params.update(settings["ffmpeg_audio_params"]) return params def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) -> str: """ Uses ffmpeg, applying `settings` to `input_file`, storing output in `output_dir`, and returning the path to the compressed file """ params = copy.deepcopy(params) ext = params.pop("extension") output_file = f"{output_dir}/converted.{ext}" try: logger.info( f"Transcoding audio {input_file} to {output_file} with params={params}" ) out, _ = ( ffmpeg.input(input_file) .output( output_file, **params, loglevel="quiet", ) .run(capture_stdout=True, capture_stderr=True) ) before = os.path.getsize(input_file) / 1024 after = os.path.getsize(output_file) / 1024 percent_difference = 0 if before != 0: percent_difference = ((before - after) / before) * 100 logger.info( f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%" ) return output_file except ffmpeg.Error as e: _print_ffmpeg_error_output(e) logger.error(e) raise RuntimeError(f"Failed to compress audio {input_file}") from e def video_transcode_params( probe_result, settings: VideoSettings ) -> Optional[Dict[str, Any]]: """ Given a probe result and some system settings, this function returns a dict containing opaque data that could be passsed to compress_video. If this function returns None, then the video does not need to be compressed """ max_height = settings["max_height"] target_container = settings["container"] target_vcodec = settings["vcodec"] target_acodec = settings["acodec"] audio_max_bitrate = settings["audio_max_bitrate"] width, height = get_resolution(probe_result) vcodec = get_vcodec_name(probe_result) acodec, audio_bit_rate = get_acodec_info(probe_result) if not width or not height or not acodec or not audio_bit_rate: logger.error("Failed to extract data from ffprobe") # TODO: turn this into an exception and catch it for reporting return None is_container = _matches_format(probe_result, target_container) is_vcodec = vcodec == target_vcodec is_acodec = acodec == target_acodec is_audio_bitrate = audio_bit_rate <= audio_max_bitrate is_good_height = height <= max_height if is_good_height and is_container and is_vcodec and is_acodec and is_audio_bitrate: return None passes = settings.get("passes", []) params = {"extension": settings["extension"]} if len(passes) == 0 or is_vcodec: if not is_good_height: params["vf"] = _scale_to_max_height(max_height) if not is_vcodec: params.update(settings["ffmpeg_video_params"]) if not is_acodec or not is_audio_bitrate: params.update(settings["ffmpeg_audio_params"]) return params params["passes"] = [] for p in passes: p = copy.deepcopy(p) if not is_good_height: p["vf"] = _scale_to_max_height(max_height) params["passes"].append(p) return params def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) -> str: """ Uses ffmpeg, applying `settings` to `input_file`, storing output in `output_dir`, and returning the path to the compressed file """ params = copy.deepcopy(params) ext = params.pop("extension") output_file = f"{output_dir}/converted.{ext}" try: logger.info( f"Transcoding video {input_file} to {output_file} with params={params}" ) if "passes" not in params: out, _ = ( ffmpeg.input(input_file) .output( output_file, **params, # loglevel="quiet", ) .run(capture_stdout=True, capture_stderr=True) ) else: passes = params["passes"] ffinput = ffmpeg.input(input_file) video = ffinput.video audio = ffinput.audio ffoutput = ffinput.output(video, "pipe:", **passes[0]) ffoutput = ffoutput.global_args( # "-loglevel", "quiet", "-stats" ) logger.info("Running pass #1") ffoutput.run(capture_stdout=True, capture_stderr=True) logger.info("Running pass #2") ffoutput = ffinput.output(video, audio, output_file, **passes[1]) ffoutput = ffoutput.global_args( # "-loglevel", "quiet", "-stats" ) ffoutput.run( capture_stdout=True, capture_stderr=True, overwrite_output=True, ) before = os.path.getsize(input_file) / 1024 after = os.path.getsize(output_file) / 1024 percent_difference = 0 if before != 0: percent_difference = ((before - after) / before) * 100 logger.info( f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%" ) return output_file except ffmpeg.Error as e: _print_ffmpeg_error_output(e) logger.error("Failed to transcode") logger.error(e) raise RuntimeError(f"Failed to transcode video: {e.stderr.decode()}") from e except Exception as e: logger.critical(e, exc_info=True) raise e def check_codecs(codecs: List[str]) -> List[str]: result = subprocess.run( ["ffmpeg", "-v", "quiet", "-codecs"], capture_output=True, text=True ) output = result.stdout available_codecs = set( line.split()[1] for line in output.splitlines() if len(line.split()) > 2 and "E" in line.split()[0] ) missing_codecs = [codec for codec in codecs if codec not in available_codecs] return missing_codecs def check_encoders(encoders: List[str]) -> List: result = subprocess.run( ["ffmpeg", "-v", "quiet", "-encoders"], capture_output=True, text=True ) output = result.stdout lines = output.split("\n") encoder_lines = [ line.strip() for line in lines if line.startswith(" V") or line.startswith(" A") or line.startswith(" S") ] available_encoders = set( line.split()[1] for line in encoder_lines if len(line.split()) > 1 ) missing_encoders = [ encoder for encoder in encoders if encoder not in available_encoders ] return missing_encoders def is_ffmpeg_available() -> bool: try: subprocess.run( ["ffmpeg", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) return True except OSError: return False def check_runtime(encoders: List[str], codecs: List[str]) -> bool: if not is_ffmpeg_available(): logger.error("FFMPEG is not available on the PATH") return False missing_encoders = check_encoders(encoders) missing_codecs = check_codecs(codecs) if missing_encoders: m = ", ".join(missing_encoders) logger.error(f"Missing ffmpeg encoders: {m}") if missing_codecs: m = ", ".join(missing_codecs) logger.error(f"Missing ffmpeg codecs: {m}") if missing_codecs or missing_encoders: return False return True