import copy
import logging
import math
import os
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union

import ffmpeg

logger = logging.getLogger(__name__)

MediaMeta = Dict[str, Union[str, int, float]]

# Keys shared by the audio and video settings dictionaries.
MediaSettings = TypedDict(
    "MediaSettings", {"name": str, "extension": str, "mimetype": str}
)


class AudioSettings(MediaSettings):
    """Target settings consulted by `audio_transcode_params`."""

    format: str  # compared against ffprobe's format_name
    max_bitrate: int  # compared against ffprobe's format bit_rate
    ffmpeg_audio_params: Dict[str, str]  # keyword args forwarded to ffmpeg output


class VideoSettings(MediaSettings):
    """Target settings consulted by `video_transcode_params`.

    NOTE(review): `video_transcode_params` also reads an optional "passes"
    key (a list of per-pass ffmpeg keyword dicts) that is not declared here.
    """

    container: str
    vcodec: str
    max_height: int
    acodec: str
    audio_max_bitrate: int
    ffmpeg_audio_params: Dict[str, str]
    ffmpeg_video_params: Dict[str, str]


class AudioMeta(TypedDict):
    """Subset of ffprobe's container-level ("format") data for audio."""

    format_name: str
    format_long_name: str
    duration: str
    bit_rate: float
    size: str


class VideoMeta(TypedDict):
    """Container-level and primary-video-stream data taken from ffprobe."""

    duration: str
    size: str
    format_name: str
    format_long_name: str
    width: int
    height: int
    codec_name: str
    display_aspect_ratio: str
    duration_ts: float
    bit_rate: float


def probe_media(file_path) -> Dict[str, Any]:
    """Probes `file_path` using ffmpeg's ffprobe and returns the data.

    Raises:
        RuntimeError: if ffprobe fails; the original ffmpeg.Error is chained.
    """
    try:
        return ffmpeg.probe(file_path)
    except ffmpeg.Error as e:
        print(e.stderr, file=sys.stderr)
        logger.error("Failed to probe io %s", file_path)
        logger.error(e)
        raise RuntimeError(f"Failed to probe io {file_path}") from e


def bitrate(info) -> float:
    """Returns the container bit rate from a probe result.

    Returns `math.inf` when the value is missing or not an integer, so that
    any "bitrate <= limit" comparison made by a caller fails.
    """
    try:
        return int(info["format"]["bit_rate"])
    # A tuple is required here: `KeyError | ValueError` is not a valid
    # except target and raises TypeError as soon as an exception occurs.
    except (KeyError, ValueError):
        logger.error("extracting bitrate from ffprobe failed")
        return math.inf


def format_name(info) -> Optional[str]:
    """Returns ffprobe's `format_name` for a probe result, or None if absent."""
    try:
        return info["format"]["format_name"]
    except (KeyError, ValueError):
        logger.error("extracting format from ffprobe failed")
        return None


def _duration_ts(stream: Dict[str, Any]) -> float:
    """Returns a stream's `duration_ts` as a float, or 0.0 if missing/invalid."""
    try:
        return float(stream.get("duration_ts", 0))
    except (TypeError, ValueError):
        return 0.0


def _primary_stream(probe, codec_type: str):
    """Returns the stream of `codec_type` with the largest `duration_ts`.

    Returns None when the probe result has no stream of that type. Streams
    without a usable `duration_ts` are treated as having duration 0, so they
    are only chosen when no other candidate has a positive value. On a tie
    the stream listed first in the probe result wins.
    """
    candidates = [
        stream
        for stream in probe["streams"]
        if stream.get("codec_type") == codec_type
    ]
    if not candidates:
        return None
    if len(candidates) > 1:
        logger.warning(
            "Encountered video file with more than 1 %s stream!, "
            "choosing the one with the longest duration",
            codec_type,
        )
    return max(candidates, key=_duration_ts)


def primary_video_stream(probe):
    """Returns the longest video stream of a probe result, or None."""
    return _primary_stream(probe, "video")


def primary_audio_stream(probe):
    """Returns the longest audio stream of a probe result, or None."""
    return _primary_stream(probe, "audio")


def get_resolution(probe) -> Tuple[Optional[float], Optional[float]]:
    """Returns `(width, height)` of the primary video stream.

    Returns `(None, None)` when there is no video stream or the dimensions
    are missing or not integers.
    """
    try:
        video_stream = primary_video_stream(probe)
        if not video_stream:
            return None, None
        width = int(video_stream["width"])
        height = int(video_stream["height"])
        return width, height
    except (KeyError, ValueError):
        logger.error("extracting resolution from ffprobe failed")
        return None, None


def get_vcodec_name(probe) -> Optional[str]:
    """Returns the codec name of the primary video stream, or None."""
    try:
        video_stream = primary_video_stream(probe)
        if not video_stream:
            return None
        return video_stream["codec_name"]
    except (KeyError, ValueError):
        logger.error("extracting video codec_name from ffprobe failed")
        return None


def get_acodec_info(probe) -> Tuple[Optional[str], Optional[int]]:
    """Returns `(codec_name, bit_rate)` of the primary audio stream.

    Returns `(None, None)` when there is no audio stream or either value is
    missing or the bit rate is not an integer.
    """
    try:
        audio_stream = primary_audio_stream(probe)
        if not audio_stream:
            return None, None
        return audio_stream["codec_name"], int(audio_stream["bit_rate"])
    except (KeyError, ValueError):
        logger.error("extracting audio codec_name from ffprobe failed")
        return None, None


def audio_meta(probe: Dict[str, Any]) -> Optional[AudioMeta]:
    """Builds an `AudioMeta` from the "format" section of a probe result.

    Missing string fields default to "" and a missing bit rate to 0.0.
    """
    fmt = probe["format"]
    return AudioMeta(
        duration=fmt.get("duration", ""),
        size=fmt.get("size", ""),
        format_name=fmt.get("format_name", ""),
        format_long_name=fmt.get("format_long_name", ""),
        bit_rate=float(fmt.get("bit_rate", 0.0)),
    )


def video_meta(probe: Dict[str, Any]) -> Optional[VideoMeta]:
    """Builds a `VideoMeta` from a probe result.

    Container-level fields come from the "format" section, the rest from the
    primary video stream. Returns None when there is no video stream.
    """
    stream = primary_video_stream(probe)
    if not stream:
        return None
    fmt = probe["format"]
    return VideoMeta(
        duration=fmt.get("duration", ""),
        size=fmt.get("size", ""),
        format_name=fmt.get("format_name", ""),
        format_long_name=fmt.get("format_long_name", ""),
        width=int(stream.get("width", 0)),
        height=int(stream.get("height", 0)),
        codec_name=stream.get("codec_name", ""),
        display_aspect_ratio=stream.get("display_aspect_ratio", ""),
        duration_ts=float(stream.get("duration_ts", 0.0)),
        bit_rate=float(stream.get("bit_rate", 0.0)),
    )


def audio_transcode_params(
    probe_result, settings: AudioSettings
) -> Optional[Dict[str, str]]:
    """
    Given a probe result and some system settings, this function returns a dict
    containing opaque data that can be passed to transcode_audio.

    If this function returns None, then the audio does not need to be
    transcoded: its bit rate is within `max_bitrate` and its format already
    matches `format`.
    """
    target_format = settings["format"]
    is_br = bitrate(probe_result) <= settings["max_bitrate"]

    # ffprobe may report several comma-separated names for one container, so
    # accept either an exact match or membership in that list (the same rule
    # video_transcode_params applies to containers).
    current_format = format_name(probe_result)
    is_fmt = current_format is not None and (
        current_format == target_format
        or target_format in current_format.split(",")
    )

    if is_br and is_fmt:
        return None

    params = {"extension": settings["extension"]}
    params.update(settings["ffmpeg_audio_params"])
    return params


def _log_size_reduction(input_file: str, output_file: str) -> None:
    """Logs the size of both files in KiB and the percentage saved."""
    before = os.path.getsize(input_file) / 1024
    after = os.path.getsize(output_file) / 1024
    percent_difference = 0.0
    if before != 0:
        percent_difference = ((before - after) / before) * 100
    logger.info(
        "Compressed from %.2f KiB to %.2f KiB, reduction: %.2f%%",
        before,
        after,
        percent_difference,
    )


def _stderr_text(error) -> str:
    """Returns an ffmpeg error's stderr as text, or "" if none was captured."""
    stderr = getattr(error, "stderr", None)
    if not stderr:
        return ""
    if isinstance(stderr, bytes):
        return stderr.decode(errors="replace")
    return str(stderr)


def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) -> str:
    """
    Uses ffmpeg, applying `params` to `input_file`, storing output in
    `output_dir`, and returning the path to the transcoded file.

    `params` must contain an "extension" key; every other key is forwarded to
    ffmpeg as an output option. The caller's dict is not modified.

    Raises:
        RuntimeError: if ffmpeg fails; the original ffmpeg.Error is chained.
    """
    params = copy.deepcopy(params)
    ext = params.pop("extension")
    output_file = f"{output_dir}/converted.{ext}"
    try:
        logger.info(
            "Transcoding audio %s to %s with params=%s",
            input_file,
            output_file,
            params,
        )
        ffmpeg.input(input_file).output(
            output_file,
            **params,
            loglevel="quiet",
        ).run()
        _log_size_reduction(input_file, output_file)
        return output_file
    except ffmpeg.Error as e:
        print(e.stderr, file=sys.stderr)
        print(e.stdout)
        logger.error(e)
        raise RuntimeError(f"Failed to compress audio {input_file}") from e


def video_transcode_params(
    probe_result, settings: VideoSettings
) -> Optional[Dict[str, Any]]:
    """
    Given a probe result and some system settings, this function returns a dict
    containing opaque data that can be passed to transcode_video.

    If this function returns None, then the video does not need to be
    transcoded. None is also returned (after logging an error) when the
    resolution or the audio codec/bit rate cannot be read from the probe
    result.

    The returned dict always has an "extension" key. It then holds either
    flat ffmpeg output options (single pass) or a "passes" list with one
    options dict per pass, used when `settings` defines "passes" and the
    video codec has to change.
    """
    max_height = settings["max_height"]
    target_container = settings["container"]
    target_vcodec = settings["vcodec"]
    target_acodec = settings["acodec"]
    audio_max_bitrate = settings["audio_max_bitrate"]

    width, height = get_resolution(probe_result)
    vcodec = get_vcodec_name(probe_result)
    acodec, audio_bit_rate = get_acodec_info(probe_result)

    if not width or not height or not acodec or not audio_bit_rate:
        logger.error("Failed to extract data from ffprobe")
        # TODO: turn this into an exception and catch it for reporting
        return None

    # ffprobe reports a comma-separated list of names for some containers.
    current_container_many = format_name(probe_result)
    is_container = (
        current_container_many is not None
        and target_container in current_container_many.split(",")
    )
    is_vcodec = vcodec == target_vcodec
    is_acodec = acodec == target_acodec
    is_audio_bitrate = audio_bit_rate <= audio_max_bitrate
    is_good_height = height <= max_height

    if is_good_height and is_container and is_vcodec and is_acodec and is_audio_bitrate:
        return None

    # Scale down to the allowed height. A width of -2 lets ffmpeg keep the
    # aspect ratio while rounding the width to an even number. Scaling to the
    # source's own width:height would leave the video untouched.
    scale_filter = f"scale=-2:{max_height}"

    passes = settings.get("passes", [])
    params: Dict[str, Any] = {"extension": settings["extension"]}

    if not passes or is_vcodec:
        if not is_good_height:
            params["vf"] = scale_filter
        if not is_vcodec:
            params.update(settings["ffmpeg_video_params"])
        if not is_acodec or not is_audio_bitrate:
            params.update(settings["ffmpeg_audio_params"])
        return params

    params["passes"] = []
    for pass_settings in passes:
        # Copy so the scale filter never leaks into the shared settings.
        pass_params = copy.deepcopy(pass_settings)
        if not is_good_height:
            pass_params["vf"] = scale_filter
        params["passes"].append(pass_params)
    return params


def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) -> str:
    """
    Uses ffmpeg, applying `params` to `input_file`, storing output in
    `output_dir`, and returning the path to the transcoded file.

    `params` must contain an "extension" key. If it contains a "passes" list,
    the first two entries are used as the options of a two-pass encode;
    otherwise all remaining keys are forwarded to ffmpeg as output options.
    The caller's dict is not modified.

    Raises:
        RuntimeError: if ffmpeg fails; the original ffmpeg.Error is chained.
        Exception: any other error is logged at critical level and re-raised.
    """
    params = copy.deepcopy(params)
    ext = params.pop("extension")
    output_file = f"{output_dir}/converted.{ext}"
    try:
        logger.info(
            "Transcoding video %s to %s with params=%s",
            input_file,
            output_file,
            params,
        )
        if "passes" not in params:
            ffmpeg.input(input_file).output(output_file, **params).run()
        else:
            passes = params["passes"]
            ffinput = ffmpeg.input(input_file)
            video = ffinput.video
            audio = ffinput.audio

            # The first pass writes its output to "pipe:" and is captured
            # below instead of being stored in a file.
            ffoutput = ffinput.output(video, "pipe:", **passes[0])
            # No global arguments are passed at the moment; the call is kept
            # as the hook for flags such as "-loglevel quiet -stats".
            ffoutput = ffoutput.global_args()
            logger.info("Running pass #1")
            std_out, std_err = ffoutput.run(capture_stdout=True)
            print(std_out)
            print(std_err)

            logger.info("Running pass #2")
            ffoutput = ffinput.output(video, audio, output_file, **passes[1])
            ffoutput = ffoutput.global_args()
            ffoutput.run(overwrite_output=True)

        _log_size_reduction(input_file, output_file)
        return output_file
    except ffmpeg.Error as e:
        print(e.stderr, file=sys.stderr)
        logger.error("Failed to transcode")
        logger.error(e)
        # stderr is None unless it was captured, so decode it defensively
        # rather than failing inside the error handler.
        raise RuntimeError(f"Failed to transcode video: {_stderr_text(e)}") from e
    except Exception as e:
        logger.critical(e, exc_info=True)
        raise


def check_codecs(codecs: List[str]) -> List[str]:
    """Returns the entries of `codecs` that `ffmpeg -codecs` cannot encode.

    A codec counts as available when the flags column of its line contains
    an "E".
    """
    result = subprocess.run(
        ["ffmpeg", "-v", "quiet", "-codecs"], capture_output=True, text=True
    )
    available_codecs = set()
    for line in result.stdout.splitlines():
        fields = line.split()
        if len(fields) > 2 and "E" in fields[0]:
            available_codecs.add(fields[1])
    return [codec for codec in codecs if codec not in available_codecs]


def check_encoders(encoders: List[str]) -> List[str]:
    """Returns the entries of `encoders` not listed by `ffmpeg -encoders`.

    Only lines starting with " V", " A" or " S" are considered; the second
    whitespace-separated field of each is taken as the encoder name.
    """
    result = subprocess.run(
        ["ffmpeg", "-v", "quiet", "-encoders"], capture_output=True, text=True
    )
    available_encoders = set()
    for line in result.stdout.split("\n"):
        if not line.startswith((" V", " A", " S")):
            continue
        fields = line.split()
        if len(fields) > 1:
            available_encoders.add(fields[1])
    return [encoder for encoder in encoders if encoder not in available_encoders]


def is_ffmpeg_available() -> bool:
    """Returns True if an `ffmpeg` executable can be started.

    Only the ability to launch the process is checked, not its exit status.
    """
    try:
        subprocess.run(
            ["ffmpeg", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        return True
    except OSError:
        return False


def check_runtime(encoders: List[str], codecs: List[str]) -> bool:
    """Checks that ffmpeg is installed and supports `encoders` and `codecs`.

    Every problem found is logged as an error. Returns True only when ffmpeg
    can be started and nothing is missing.
    """
    if not is_ffmpeg_available():
        logger.error("FFMPEG is not available on the PATH")
        return False

    missing_encoders = check_encoders(encoders)
    missing_codecs = check_codecs(codecs)

    if missing_encoders:
        logger.error("Missing ffmpeg encoders: %s", ", ".join(missing_encoders))
    if missing_codecs:
        logger.error("Missing ffmpeg codecs: %s", ", ".join(missing_codecs))

    return not (missing_codecs or missing_encoders)