468 lines
14 KiB
Python
468 lines
14 KiB
Python
import copy
|
|
import logging
|
|
import math
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union
|
|
|
|
import ffmpeg
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
MediaMeta = Dict[str, Union[str, int, float]]
|
|
|
|
MediaSettings = TypedDict(
|
|
"MediaSettings", {"name": str, "extension": str, "mimetype": str}
|
|
)
|
|
|
|
|
|
class AudioSettings(MediaSettings):
|
|
format: str
|
|
max_bitrate: int
|
|
ffmpeg_audio_params: Dict[str, str]
|
|
|
|
|
|
class VideoSettings(MediaSettings):
|
|
container: str
|
|
vcodec: str
|
|
max_height: int
|
|
acodec: str
|
|
audio_max_bitrate: int
|
|
ffmpeg_audio_params: Dict[str, str]
|
|
ffmpeg_video_params: Dict[str, str]
|
|
|
|
|
|
class AudioMeta(TypedDict, total=False):
|
|
duration: str
|
|
fileSize: str
|
|
bitrate: int
|
|
samplingrate: int
|
|
channels: int
|
|
|
|
|
|
class VideoMeta(TypedDict, total=False):
|
|
duration: str
|
|
fileSize: str
|
|
width: int
|
|
height: int
|
|
bitrate: int
|
|
framerate: str
|
|
|
|
|
|
def _decode_ffmpeg_output(output: Any) -> str:
|
|
if isinstance(output, bytes):
|
|
return output.decode("utf-8", errors="replace")
|
|
return str(output)
|
|
|
|
|
|
def _print_ffmpeg_error_output(error: ffmpeg.Error) -> None:
|
|
if error.stderr:
|
|
print(_decode_ffmpeg_output(error.stderr), file=sys.stderr)
|
|
if error.stdout:
|
|
print(_decode_ffmpeg_output(error.stdout))
|
|
|
|
|
|
def probe_media(file_path) -> Dict[str, Any]:
|
|
"""Probes `file_path` using ffmpeg's ffprobe and returns the data."""
|
|
try:
|
|
return ffmpeg.probe(file_path)
|
|
except ffmpeg.Error as e:
|
|
_print_ffmpeg_error_output(e)
|
|
logger.error(f"Failed to probe io {file_path}")
|
|
logger.error(e)
|
|
raise RuntimeError(f"Failed to probe io {file_path}") from e
|
|
|
|
|
|
def bitrate(info) -> float:
|
|
try:
|
|
return int(info["format"]["bit_rate"])
|
|
except (KeyError, ValueError):
|
|
logger.error("extracting bitrate from ffprobe failed")
|
|
return math.inf
|
|
|
|
|
|
def format_name(info) -> Optional[str]:
|
|
try:
|
|
return info["format"]["format_name"]
|
|
except (KeyError, ValueError):
|
|
logger.error("extracting format from ffprobe failed")
|
|
return None
|
|
|
|
|
|
def _stream_duration_sort_key(stream: Dict[str, Any]) -> tuple[int, float]:
|
|
duration_ts = _int_value(stream.get("duration_ts"))
|
|
if duration_ts is not None:
|
|
return 1, float(duration_ts)
|
|
try:
|
|
duration = float(str(stream.get("duration", "")))
|
|
except (TypeError, ValueError):
|
|
duration = 0.0
|
|
return 0, duration
|
|
|
|
|
|
def _matches_format(probe: Dict[str, Any], expected: str) -> bool:
|
|
current = format_name(probe)
|
|
if current is None:
|
|
return False
|
|
return expected in current.split(",")
|
|
|
|
|
|
def primary_video_stream(probe):
|
|
video_streams = [
|
|
stream for stream in probe["streams"] if stream["codec_type"] == "video"
|
|
]
|
|
video_streams = sorted(video_streams, key=_stream_duration_sort_key, reverse=True)
|
|
if not video_streams:
|
|
return None
|
|
if len(video_streams) > 1:
|
|
logger.warn(
|
|
"Encountered video file with more than 1 video stream!, choosing the one with the longest duration"
|
|
)
|
|
return video_streams[0]
|
|
|
|
|
|
def primary_audio_stream(probe):
|
|
audio_streams = [
|
|
stream for stream in probe["streams"] if stream["codec_type"] == "audio"
|
|
]
|
|
audio_streams = sorted(audio_streams, key=_stream_duration_sort_key, reverse=True)
|
|
if not audio_streams:
|
|
return None
|
|
if len(audio_streams) > 1:
|
|
logger.warn(
|
|
"Encountered video file with more than 1 audio stream!, choosing the one with the longest duration"
|
|
)
|
|
return audio_streams[0]
|
|
|
|
|
|
def get_resolution(probe) -> Tuple[Optional[float], Optional[float]]:
|
|
try:
|
|
video_stream = primary_video_stream(probe)
|
|
if not video_stream:
|
|
return None, None
|
|
width = int(video_stream["width"])
|
|
height = int(video_stream["height"])
|
|
return width, height
|
|
except (KeyError, ValueError):
|
|
logger.error("extracting resolution from ffprobe failed")
|
|
return None, None
|
|
|
|
|
|
def get_vcodec_name(probe) -> Optional[str]:
|
|
try:
|
|
video_stream = primary_video_stream(probe)
|
|
if not video_stream:
|
|
return None
|
|
return video_stream["codec_name"]
|
|
except (KeyError, ValueError):
|
|
logger.error("extracting video codec_name from ffprobe failed")
|
|
return None
|
|
|
|
|
|
def get_acodec_info(probe) -> Tuple[Optional[str], Optional[int]]:
|
|
try:
|
|
audio_stream = primary_audio_stream(probe)
|
|
if not audio_stream:
|
|
return None, None
|
|
audio_bitrate = _int_value(
|
|
audio_stream.get("bit_rate") or probe["format"].get("bit_rate")
|
|
)
|
|
return audio_stream["codec_name"], audio_bitrate
|
|
except (KeyError, ValueError):
|
|
logger.error("extracting audio codec_name from ffprobe failed")
|
|
return None, None
|
|
|
|
|
|
def _int_value(value: Any) -> Optional[int]:
|
|
try:
|
|
if value in (None, ""):
|
|
return None
|
|
return int(str(value))
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _frame_rate(stream: Dict[str, Any]) -> Optional[str]:
|
|
for key in ("avg_frame_rate", "r_frame_rate"):
|
|
value = stream.get(key)
|
|
if value not in (None, "", "0/0"):
|
|
return str(value)
|
|
return None
|
|
|
|
|
|
def _scale_to_max_height(max_height: int) -> str:
|
|
return f"scale=-2:{max_height}"
|
|
|
|
|
|
def audio_meta(probe: Dict[str, Any]) -> Optional[AudioMeta]:
|
|
stream = primary_audio_stream(probe)
|
|
if not stream:
|
|
return None
|
|
meta = AudioMeta(
|
|
duration=str(probe["format"].get("duration", "")),
|
|
fileSize=str(probe["format"].get("size", "")),
|
|
bitrate=_int_value(probe["format"].get("bit_rate")) or 0,
|
|
samplingrate=_int_value(stream.get("sample_rate")) or 0,
|
|
channels=_int_value(stream.get("channels")) or 0,
|
|
)
|
|
return {key: value for key, value in meta.items() if value not in ("", 0)}
|
|
|
|
|
|
def video_meta(probe: Dict[str, Any]) -> Optional[VideoMeta]:
|
|
stream = primary_video_stream(probe)
|
|
if not stream:
|
|
return None
|
|
meta = VideoMeta(
|
|
duration=str(probe["format"].get("duration", "")),
|
|
fileSize=str(probe["format"].get("size", "")),
|
|
width=_int_value(stream.get("width")) or 0,
|
|
height=_int_value(stream.get("height")) or 0,
|
|
bitrate=_int_value(stream.get("bit_rate") or probe["format"].get("bit_rate"))
|
|
or 0,
|
|
framerate=_frame_rate(stream) or "",
|
|
)
|
|
return {key: value for key, value in meta.items() if value not in ("", 0)}
|
|
|
|
|
|
def audio_transcode_params(
|
|
probe_result, settings: AudioSettings
|
|
) -> Optional[Dict[str, str]]:
|
|
"""
|
|
Given a probe result and some system settings,
|
|
this function returns a dict containing opaque data that could be passsed to compress_audio.
|
|
If this function returns None, then the audio does not need to be compressed
|
|
"""
|
|
br = settings["max_bitrate"]
|
|
fmt = settings["format"]
|
|
if bitrate(probe_result) <= br:
|
|
is_br = True
|
|
else:
|
|
is_br = False
|
|
if _matches_format(probe_result, fmt):
|
|
is_fmt = True
|
|
else:
|
|
is_fmt = False
|
|
|
|
if is_br and is_fmt:
|
|
return None
|
|
|
|
params = {"extension": settings["extension"]}
|
|
params.update(settings["ffmpeg_audio_params"])
|
|
return params
|
|
|
|
|
|
def transcode_audio(input_file: str, output_dir: str, params: Dict[str, str]) -> str:
|
|
"""
|
|
Uses ffmpeg, applying `settings` to `input_file`, storing output in `output_dir`, and returning the path to the compressed file
|
|
"""
|
|
params = copy.deepcopy(params)
|
|
ext = params.pop("extension")
|
|
output_file = f"{output_dir}/converted.{ext}"
|
|
try:
|
|
logger.info(
|
|
f"Transcoding audio {input_file} to {output_file} with params={params}"
|
|
)
|
|
out, _ = (
|
|
ffmpeg.input(input_file)
|
|
.output(
|
|
output_file,
|
|
**params,
|
|
loglevel="quiet",
|
|
)
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
before = os.path.getsize(input_file) / 1024
|
|
after = os.path.getsize(output_file) / 1024
|
|
percent_difference = 0
|
|
if before != 0:
|
|
percent_difference = ((before - after) / before) * 100
|
|
logger.info(
|
|
f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%"
|
|
)
|
|
return output_file
|
|
except ffmpeg.Error as e:
|
|
_print_ffmpeg_error_output(e)
|
|
logger.error(e)
|
|
raise RuntimeError(f"Failed to compress audio {input_file}") from e
|
|
|
|
|
|
def video_transcode_params(
|
|
probe_result, settings: VideoSettings
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Given a probe result and some system settings,
|
|
this function returns a dict containing opaque data that could be passsed to compress_video.
|
|
If this function returns None, then the video does not need to be compressed
|
|
"""
|
|
max_height = settings["max_height"]
|
|
target_container = settings["container"]
|
|
target_vcodec = settings["vcodec"]
|
|
target_acodec = settings["acodec"]
|
|
audio_max_bitrate = settings["audio_max_bitrate"]
|
|
|
|
width, height = get_resolution(probe_result)
|
|
vcodec = get_vcodec_name(probe_result)
|
|
acodec, audio_bit_rate = get_acodec_info(probe_result)
|
|
|
|
if not width or not height or not acodec or not audio_bit_rate:
|
|
logger.error("Failed to extract data from ffprobe")
|
|
# TODO: turn this into an exception and catch it for reporting
|
|
return None
|
|
|
|
is_container = _matches_format(probe_result, target_container)
|
|
|
|
is_vcodec = vcodec == target_vcodec
|
|
is_acodec = acodec == target_acodec
|
|
is_audio_bitrate = audio_bit_rate <= audio_max_bitrate
|
|
is_good_height = height <= max_height
|
|
|
|
if is_good_height and is_container and is_vcodec and is_acodec and is_audio_bitrate:
|
|
return None
|
|
|
|
passes = settings.get("passes", [])
|
|
params = {"extension": settings["extension"]}
|
|
if len(passes) == 0 or is_vcodec:
|
|
if not is_good_height:
|
|
params["vf"] = _scale_to_max_height(max_height)
|
|
if not is_vcodec:
|
|
params.update(settings["ffmpeg_video_params"])
|
|
if not is_acodec or not is_audio_bitrate:
|
|
params.update(settings["ffmpeg_audio_params"])
|
|
return params
|
|
params["passes"] = []
|
|
for p in passes:
|
|
p = copy.deepcopy(p)
|
|
if not is_good_height:
|
|
p["vf"] = _scale_to_max_height(max_height)
|
|
params["passes"].append(p)
|
|
return params
|
|
|
|
|
|
def transcode_video(input_file: str, output_dir: str, params: Dict[str, Any]) -> str:
|
|
"""
|
|
Uses ffmpeg, applying `settings` to `input_file`, storing output in `output_dir`, and returning the path to the compressed file
|
|
"""
|
|
params = copy.deepcopy(params)
|
|
ext = params.pop("extension")
|
|
output_file = f"{output_dir}/converted.{ext}"
|
|
try:
|
|
logger.info(
|
|
f"Transcoding video {input_file} to {output_file} with params={params}"
|
|
)
|
|
if "passes" not in params:
|
|
out, _ = (
|
|
ffmpeg.input(input_file)
|
|
.output(
|
|
output_file,
|
|
**params,
|
|
# loglevel="quiet",
|
|
)
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
else:
|
|
passes = params["passes"]
|
|
ffinput = ffmpeg.input(input_file)
|
|
video = ffinput.video
|
|
audio = ffinput.audio
|
|
ffoutput = ffinput.output(video, "pipe:", **passes[0])
|
|
ffoutput = ffoutput.global_args(
|
|
# "-loglevel", "quiet",
|
|
"-stats"
|
|
)
|
|
logger.info("Running pass #1")
|
|
ffoutput.run(capture_stdout=True, capture_stderr=True)
|
|
logger.info("Running pass #2")
|
|
ffoutput = ffinput.output(video, audio, output_file, **passes[1])
|
|
ffoutput = ffoutput.global_args(
|
|
# "-loglevel", "quiet",
|
|
"-stats"
|
|
)
|
|
ffoutput.run(
|
|
capture_stdout=True,
|
|
capture_stderr=True,
|
|
overwrite_output=True,
|
|
)
|
|
|
|
before = os.path.getsize(input_file) / 1024
|
|
after = os.path.getsize(output_file) / 1024
|
|
percent_difference = 0
|
|
if before != 0:
|
|
percent_difference = ((before - after) / before) * 100
|
|
logger.info(
|
|
f"Compressed from {before:.2f} KiB to {after:.2f} KiB, reduction: {percent_difference:.2f}%"
|
|
)
|
|
return output_file
|
|
except ffmpeg.Error as e:
|
|
_print_ffmpeg_error_output(e)
|
|
logger.error("Failed to transcode")
|
|
logger.error(e)
|
|
raise RuntimeError(f"Failed to transcode video: {e.stderr.decode()}") from e
|
|
except Exception as e:
|
|
logger.critical(e, exc_info=True)
|
|
raise e
|
|
|
|
|
|
def check_codecs(codecs: List[str]) -> List[str]:
|
|
result = subprocess.run(
|
|
["ffmpeg", "-v", "quiet", "-codecs"], capture_output=True, text=True
|
|
)
|
|
output = result.stdout
|
|
|
|
available_codecs = set(
|
|
line.split()[1]
|
|
for line in output.splitlines()
|
|
if len(line.split()) > 2 and "E" in line.split()[0]
|
|
)
|
|
missing_codecs = [codec for codec in codecs if codec not in available_codecs]
|
|
|
|
return missing_codecs
|
|
|
|
|
|
def check_encoders(encoders: List[str]) -> List:
|
|
result = subprocess.run(
|
|
["ffmpeg", "-v", "quiet", "-encoders"], capture_output=True, text=True
|
|
)
|
|
output = result.stdout
|
|
lines = output.split("\n")
|
|
encoder_lines = [
|
|
line.strip()
|
|
for line in lines
|
|
if line.startswith(" V") or line.startswith(" A") or line.startswith(" S")
|
|
]
|
|
available_encoders = set(
|
|
line.split()[1] for line in encoder_lines if len(line.split()) > 1
|
|
)
|
|
missing_encoders = [
|
|
encoder for encoder in encoders if encoder not in available_encoders
|
|
]
|
|
return missing_encoders
|
|
|
|
|
|
def is_ffmpeg_available() -> bool:
|
|
try:
|
|
subprocess.run(
|
|
["ffmpeg", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
|
|
)
|
|
return True
|
|
except OSError:
|
|
return False
|
|
|
|
|
|
def check_runtime(encoders: List[str], codecs: List[str]) -> bool:
|
|
if not is_ffmpeg_available():
|
|
logger.error("FFMPEG is not available on the PATH")
|
|
return False
|
|
missing_encoders = check_encoders(encoders)
|
|
missing_codecs = check_codecs(codecs)
|
|
if missing_encoders:
|
|
m = ", ".join(missing_encoders)
|
|
logger.error(f"Missing ffmpeg encoders: {m}")
|
|
|
|
if missing_codecs:
|
|
m = ", ".join(missing_codecs)
|
|
logger.error(f"Missing ffmpeg codecs: {m}")
|
|
if missing_codecs or missing_encoders:
|
|
return False
|
|
|
|
return True
|