republisher/repub/media.py

418 lines
13 KiB
Python

import copy
import logging
import math
import os
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union
import ffmpeg
# Logger shared by every helper in this module.
logger = logging.getLogger(__name__)

# Flat mapping of metadata keys to scalar values.
MediaMeta = Dict[str, Union[str, int, float]]


class MediaSettings(TypedDict):
    """Keys common to the audio and video settings mappings."""

    name: str
    # Used as the suffix of the transcoded output file name.
    extension: str
    mimetype: str
class AudioSettings(MediaSettings):
    """Target settings consulted when deciding whether audio must be transcoded."""

    # Compared against the container format name reported by ffprobe.
    format: str
    # Upper bound for the probed overall bit rate.
    max_bitrate: int
    # Extra keyword arguments merged into the ffmpeg output call.
    ffmpeg_audio_params: Dict[str, str]
class VideoSettings(MediaSettings):
    """Target settings consulted when deciding whether video must be transcoded."""

    # Must appear in the comma-separated format name reported by ffprobe.
    container: str
    # Compared against the codec name of the primary video stream.
    vcodec: str
    # Upper bound for the height of the primary video stream.
    max_height: int
    # Compared against the codec name of the primary audio stream.
    acodec: str
    # Upper bound for the bit rate of the primary audio stream.
    audio_max_bitrate: int
    # Extra ffmpeg keyword arguments applied when the audio must be re-encoded.
    ffmpeg_audio_params: Dict[str, str]
    # Extra ffmpeg keyword arguments applied when the video must be re-encoded.
    ffmpeg_video_params: Dict[str, str]
    # NOTE(review): video_transcode_params also reads an optional "passes" key
    # via settings.get("passes", []) that is not declared here.
class AudioMeta(TypedDict):
    """Summary of the `format` section of an ffprobe result for an audio file."""

    format_name: str
    format_long_name: str
    # Kept as the string supplied by ffprobe.
    duration: str
    bit_rate: float
    # Kept as the string supplied by ffprobe.
    size: str
class VideoMeta(TypedDict):
    """Summary of an ffprobe result for a video file.

    The first four keys come from the probe's `format` section, the remaining
    ones from the primary video stream.
    """

    # Container-level values.
    duration: str
    size: str
    format_name: str
    format_long_name: str
    # Values of the primary video stream.
    width: int
    height: int
    codec_name: str
    display_aspect_ratio: str
    duration_ts: float
    bit_rate: float
def probe_media(file_path) -> Dict[str, Any]:
    """Run ffprobe (through `ffmpeg.probe`) on `file_path` and return its data.

    Raises:
        RuntimeError: if the probe fails. The captured stderr is echoed to
            this process's stderr and the failure is logged first.
    """
    try:
        probe_result = ffmpeg.probe(file_path)
    except ffmpeg.Error as err:
        print(err.stderr, file=sys.stderr)
        message = f"Failed to probe io {file_path}"
        logger.error(message)
        logger.error(err)
        raise RuntimeError(message) from err
    return probe_result
def bitrate(info) -> float:
    """Return the overall bit rate stored in an ffprobe result.

    Args:
        info: probe result; the value is read from `info["format"]["bit_rate"]`.

    Returns:
        The bit rate as an int, or `math.inf` when it is missing or cannot be
        converted, so that "bit rate <= limit" comparisons fail for unknown
        values.
    """
    try:
        return int(info["format"]["bit_rate"])
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class, so the old clause raised TypeError instead of catching anything.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting bitrate from ffprobe failed")
        return math.inf
def format_name(info) -> Optional[str]:
    """Return `info["format"]["format_name"]` from an ffprobe result.

    Returns:
        The format name exactly as stored in the probe result, or None (after
        logging an error) when the key path is missing.
    """
    try:
        return info["format"]["format_name"]
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class, so the old clause raised TypeError instead of catching anything.
    except (KeyError, TypeError):
        logger.error("extracting format from ffprobe failed")
        return None
def primary_video_stream(probe):
    """Return the video stream of `probe` with the largest `duration_ts`.

    Args:
        probe: ffprobe result containing a `streams` list.

    Returns:
        The chosen stream dict, or None when there is no video stream. A
        warning is logged when more than one video stream exists.
    """
    video_streams = [
        stream for stream in probe["streams"] if stream.get("codec_type") == "video"
    ]
    if not video_streams:
        return None
    if len(video_streams) > 1:
        logger.warning(
            "Encountered video file with more than 1 video stream!, choosing the one with the longest duration"
        )
    # Streams without `duration_ts` rank as 0 instead of raising KeyError;
    # on ties max() keeps the first stream, like the previous stable sort.
    return max(video_streams, key=lambda stream: stream.get("duration_ts", 0))
def primary_audio_stream(probe):
    """Return the audio stream of `probe` with the largest `duration_ts`.

    Args:
        probe: ffprobe result containing a `streams` list.

    Returns:
        The chosen stream dict, or None when there is no audio stream. A
        warning is logged when more than one audio stream exists.
    """
    audio_streams = [
        stream for stream in probe["streams"] if stream.get("codec_type") == "audio"
    ]
    if not audio_streams:
        return None
    if len(audio_streams) > 1:
        logger.warning(
            "Encountered video file with more than 1 audio stream!, choosing the one with the longest duration"
        )
    # Streams without `duration_ts` rank as 0 instead of raising KeyError;
    # on ties max() keeps the first stream, like the previous stable sort.
    return max(audio_streams, key=lambda stream: stream.get("duration_ts", 0))
def get_resolution(probe) -> Tuple[Optional[float], Optional[float]]:
    """Return `(width, height)` of the primary video stream of `probe`.

    Returns:
        Both values as ints, or `(None, None)` when there is no video stream
        or the dimensions are missing or not convertible (an error is logged
        in the latter case).
    """
    try:
        video_stream = primary_video_stream(probe)
        if not video_stream:
            return None, None
        return int(video_stream["width"]), int(video_stream["height"])
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class, so the old clause raised TypeError instead of catching anything.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting resolution from ffprobe failed")
        return None, None
def get_vcodec_name(probe) -> Optional[str]:
    """Return the `codec_name` of the primary video stream of `probe`.

    Returns:
        The codec name, or None when there is no video stream or the field is
        missing (an error is logged in the latter case).
    """
    try:
        video_stream = primary_video_stream(probe)
        if not video_stream:
            return None
        return video_stream["codec_name"]
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class, so the old clause raised TypeError instead of catching anything.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting video codec_name from ffprobe failed")
        return None
def get_acodec_info(probe) -> Tuple[Optional[str], Optional[int]]:
    """Return `(codec_name, bit_rate)` of the primary audio stream of `probe`.

    Returns:
        The codec name and the bit rate as an int, or `(None, None)` when
        there is no audio stream or either field is missing or not
        convertible (an error is logged in the latter case).
    """
    try:
        audio_stream = primary_audio_stream(probe)
        if not audio_stream:
            return None, None
        return audio_stream["codec_name"], int(audio_stream["bit_rate"])
    # A tuple is required here: `KeyError | ValueError` is not an exception
    # class, so the old clause raised TypeError instead of catching anything.
    except (KeyError, TypeError, ValueError):
        logger.error("extracting audio codec_name from ffprobe failed")
        return None, None
def audio_meta(probe: Dict[str, Any]) -> Optional[AudioMeta]:
    """Build an `AudioMeta` from the `format` section of an ffprobe result.

    Missing string fields default to "" and a missing bit rate to 0.0.
    """
    container = probe["format"]
    return AudioMeta(
        duration=container.get("duration", ""),
        size=container.get("size", ""),
        format_name=container.get("format_name", ""),
        format_long_name=container.get("format_long_name", ""),
        bit_rate=float(container.get("bit_rate", 0.0)),
    )
def video_meta(probe: Dict[str, Any]) -> Optional[VideoMeta]:
    """Build a `VideoMeta` from an ffprobe result.

    Container-level fields are read from `probe["format"]`, the rest from the
    primary video stream. Returns None when the probe has no video stream.
    Missing string fields default to "" and missing numbers to 0.
    """
    video = primary_video_stream(probe)
    if not video:
        return None
    container = probe["format"]
    return VideoMeta(
        duration=container.get("duration", ""),
        size=container.get("size", ""),
        format_name=container.get("format_name", ""),
        format_long_name=container.get("format_long_name", ""),
        width=int(video.get("width", 0)),
        height=int(video.get("height", 0)),
        codec_name=video.get("codec_name", ""),
        display_aspect_ratio=video.get("display_aspect_ratio", ""),
        duration_ts=float(video.get("duration_ts", 0.0)),
        bit_rate=float(video.get("bit_rate", 0.0)),
    )
def audio_transcode_params(
    probe_result, settings: AudioSettings
) -> Optional[Dict[str, str]]:
    """
    Decide whether an audio file needs transcoding and, if so, with which parameters.

    Returns None when the probed bit rate is within `settings["max_bitrate"]`
    and the probed format name equals `settings["format"]`. Otherwise returns
    a dict holding the target `extension` plus the configured
    `ffmpeg_audio_params`, suitable for passing to transcode_audio.
    """
    max_allowed_bitrate = settings["max_bitrate"]
    wanted_format = settings["format"]

    bitrate_ok = bitrate(probe_result) <= max_allowed_bitrate
    # NOTE(review): this compares the whole format_name string, whereas
    # video_transcode_params splits it on "," - confirm this is intended.
    format_ok = format_name(probe_result) == wanted_format
    if bitrate_ok and format_ok:
        return None

    return {"extension": settings["extension"], **settings["ffmpeg_audio_params"]}
def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) -> str:
    """
    Transcode `input_file` with ffmpeg and return the path of the new file.

    `params` must contain an `extension` entry; every other entry is passed to
    ffmpeg as an output option. The result is written to
    `<output_dir>/converted.<extension>`. The caller's dict is not modified.

    Raises RuntimeError when ffmpeg reports an error.
    """
    # Work on a copy so popping "extension" does not touch the caller's dict.
    params = copy.deepcopy(params)
    extension = params.pop("extension")
    output_file = f"{output_dir}/converted.{extension}"
    try:
        logger.info(
            f"Transcoding audio {input_file} to {output_file} with params={params}"
        )
        stream = ffmpeg.input(input_file)
        stream = stream.output(output_file, **params, loglevel="quiet")
        _stdout, _stderr = stream.run()

        # Report how much the file shrank.
        size_before = os.path.getsize(input_file) / 1024
        size_after = os.path.getsize(output_file) / 1024
        if size_before != 0:
            reduction = ((size_before - size_after) / size_before) * 100
        else:
            reduction = 0
        logger.info(
            f"Compressed from {size_before:.2f} KiB to {size_after:.2f} KiB, reduction: {reduction:.2f}%"
        )
        return output_file
    except ffmpeg.Error as err:
        print(err.stderr, file=sys.stderr)
        print(err.stdout)
        logger.error(err)
        raise RuntimeError(f"Failed to compress audio {input_file}") from err
def video_transcode_params(
    probe_result, settings: VideoSettings
) -> Optional[Dict[str, Any]]:
    """
    Decide whether a video needs transcoding and, if so, with which parameters.

    Returns None when the video already satisfies `settings` (container, video
    codec, audio codec, audio bit rate and height), and also when the required
    data could not be extracted from the probe result (an error is logged).

    Otherwise returns a dict for transcode_video. It always contains
    `extension`. In the single-pass case it also holds the ffmpeg options that
    are actually needed. When `settings` defines `passes` and the video codec
    must change, it instead holds a `passes` list with a copy of each
    configured pass.
    """
    max_height = settings["max_height"]
    target_container = settings["container"]
    target_vcodec = settings["vcodec"]
    target_acodec = settings["acodec"]
    audio_max_bitrate = settings["audio_max_bitrate"]

    width, height = get_resolution(probe_result)
    vcodec = get_vcodec_name(probe_result)
    acodec, audio_bit_rate = get_acodec_info(probe_result)
    if not width or not height or not acodec or not audio_bit_rate:
        logger.error("Failed to extract data from ffprobe")
        # TODO: turn this into an exception and catch it for reporting
        return None

    # ffprobe may report several comma-separated names for one container.
    current_container_many = format_name(probe_result)
    is_container = (
        current_container_many is not None
        and target_container in current_container_many.split(",")
    )
    is_vcodec = vcodec == target_vcodec
    is_acodec = acodec == target_acodec
    is_audio_bitrate = audio_bit_rate <= audio_max_bitrate
    is_good_height = height <= max_height
    if is_good_height and is_container and is_vcodec and is_acodec and is_audio_bitrate:
        return None

    # Downscale to the configured maximum height. The previous filter used the
    # source's own width and height, which left the resolution unchanged.
    # A width of -2 makes ffmpeg keep the aspect ratio and pick an even width.
    scale_filter = f"scale=-2:{max_height}"

    passes = settings.get("passes", [])
    params: Dict[str, Any] = {"extension": settings["extension"]}
    if len(passes) == 0 or is_vcodec:
        # Single pass: add only the options for the properties that differ.
        if not is_good_height:
            params["vf"] = scale_filter
        if not is_vcodec:
            params.update(settings["ffmpeg_video_params"])
        if not is_acodec or not is_audio_bitrate:
            params.update(settings["ffmpeg_audio_params"])
        return params

    # Multi-pass: copy each configured pass so the settings stay untouched.
    params["passes"] = []
    for configured_pass in passes:
        pass_params = copy.deepcopy(configured_pass)
        if not is_good_height:
            pass_params["vf"] = scale_filter
        params["passes"].append(pass_params)
    return params
def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) -> str:
    """
    Transcode `input_file` with ffmpeg and return the path of the new file.

    `params` must contain an `extension` entry. Without a `passes` entry the
    remaining entries are passed to ffmpeg as output options of a single run.
    With a `passes` entry, its first two items are used as the output options
    of a first and a second ffmpeg run. The result is written to
    `<output_dir>/converted.<extension>`. The caller's dict is not modified.

    Raises:
        RuntimeError: when ffmpeg reports an error.
        Exception: any other failure is logged and re-raised unchanged.
    """
    # Work on a copy so popping "extension" does not touch the caller's dict.
    params = copy.deepcopy(params)
    ext = params.pop("extension")
    output_file = f"{output_dir}/converted.{ext}"
    try:
        logger.info(
            f"Transcoding video {input_file} to {output_file} with params={params}"
        )
        if "passes" not in params:
            ffmpeg.input(input_file).output(output_file, **params).run()
        else:
            # Exactly two entries are consumed: passes[0] and passes[1].
            passes = params["passes"]
            ffinput = ffmpeg.input(input_file)
            video = ffinput.video
            audio = ffinput.audio

            ffoutput = ffinput.output(video, "pipe:", **passes[0])
            ffoutput = ffoutput.global_args("-stats")
            logger.info("Running pass #1")
            std_out, std_err = ffoutput.run(capture_stdout=True)
            print(std_out)
            print(std_err)

            logger.info("Running pass #2")
            ffoutput = ffinput.output(video, audio, output_file, **passes[1])
            ffoutput = ffoutput.global_args("-stats")
            ffoutput.run(overwrite_output=True)

        # Report how much the file shrank.
        before = os.path.getsize(input_file) / 1024
        after = os.path.getsize(output_file) / 1024
        percent_difference = 0
        if before != 0:
            percent_difference = ((before - after) / before) * 100
        logger.info(
            f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%"
        )
        return output_file
    except ffmpeg.Error as e:
        print(e.stderr, file=sys.stderr)
        logger.error("Failed to transcode")
        logger.error(e)
        # No run() call above captures stderr, so e.stderr can be None;
        # calling .decode() on it would mask this RuntimeError with an
        # AttributeError.
        if isinstance(e.stderr, (bytes, bytearray)):
            detail = e.stderr.decode(errors="replace")
        elif e.stderr is None:
            detail = "<stderr not captured>"
        else:
            detail = str(e.stderr)
        raise RuntimeError(f"Failed to transcode video: {detail}") from e
    except Exception as e:
        logger.critical(e, exc_info=True)
        raise
def check_codecs(codecs: List[str]) -> List[str]:
    """Return the entries of `codecs` that `ffmpeg -codecs` does not list as usable.

    A line of the ffmpeg output counts when it has more than two
    whitespace-separated fields and its first field contains "E" (presumably
    the encoding-support flag); the second field is taken as the codec name.
    """
    completed = subprocess.run(
        ["ffmpeg", "-v", "quiet", "-codecs"], capture_output=True, text=True
    )
    available_codecs = set()
    for line in completed.stdout.splitlines():
        fields = line.split()
        if len(fields) > 2 and "E" in fields[0]:
            available_codecs.add(fields[1])
    return [name for name in codecs if name not in available_codecs]
def check_encoders(encoders: List[str]) -> List:
    """Return the entries of `encoders` that `ffmpeg -encoders` does not list.

    Only output lines starting with " V", " A" or " S" are considered; the
    second whitespace-separated field of such a line is taken as the encoder
    name.
    """
    completed = subprocess.run(
        ["ffmpeg", "-v", "quiet", "-encoders"], capture_output=True, text=True
    )
    available_encoders = set()
    for raw_line in completed.stdout.split("\n"):
        if not raw_line.startswith((" V", " A", " S")):
            continue
        fields = raw_line.split()
        if len(fields) > 1:
            available_encoders.add(fields[1])
    return [name for name in encoders if name not in available_encoders]
def is_ffmpeg_available() -> bool:
    """Return True when `ffmpeg -version` can be launched, False on OSError.

    The exit status of the command is not inspected; its output is discarded.
    """
    command = ["ffmpeg", "-version"]
    try:
        subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
    except OSError:
        return False
    else:
        return True
def check_runtime(encoders: List[str], codecs: List[str]) -> bool:
    """Check that ffmpeg can be launched and offers the given encoders and codecs.

    Every problem found is logged as an error. Returns True only when ffmpeg
    is available and nothing is missing.
    """
    if not is_ffmpeg_available():
        logger.error("FFMPEG is not available on the PATH")
        return False

    absent_encoders = check_encoders(encoders)
    absent_codecs = check_codecs(codecs)

    if absent_encoders:
        m = ", ".join(absent_encoders)
        logger.error(f"Missing ffmpeg encoders: {m}")
    if absent_codecs:
        m = ", ".join(absent_codecs)
        logger.error(f"Missing ffmpeg codecs: {m}")

    return not (absent_codecs or absent_encoders)