# republisher/repub/pipelines.py
# (Python source; the listing this was copied from reported 366 lines, 13 KiB.)

import hashlib
import logging
import mimetypes
import tempfile
import time
from io import BytesIO
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast
from scrapy.crawler import Crawler
from scrapy.pipelines.files import FileException
from scrapy.pipelines.files import FilesPipeline as BaseFilesPipeline
from scrapy.pipelines.images import ImagesPipeline as BaseImagesPipeline
import repub.utils
from repub import media
from repub.items import MediaVariant, TranscodedMediaFile
logger = logging.getLogger(__name__)
class ImagePipeline(BaseImagesPipeline):
    """Images pipeline whose storage paths come from ``repub.utils``."""

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for the image fetched by ``request``.

        Only ``request.url`` is consulted; the path itself is computed by
        ``repub.utils.local_image_path``.
        """
        image_url = request.url
        return repub.utils.local_image_path(image_url)

    def thumb_path(self, request, thumb_id, response=None, info=None, *, item=None):
        """Thumbnail paths are not provided by this pipeline.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()
class FilePipeline(BaseFilesPipeline):
    """Files pipeline whose storage paths come from ``repub.utils``."""

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Keep the crawler settings on the instance, then set up the base pipeline.

        Args:
            store_uri: Location handed through to the base pipeline.
            crawler: Crawler whose ``settings`` are stored as ``self.settings``.
        """
        # Assigned before the base initialiser runs; the ordering is preserved.
        self.settings = crawler.settings
        super().__init__(store_uri, crawler=crawler)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for the file fetched by ``request``.

        Only ``request.url`` is consulted; the path itself is computed by
        ``repub.utils.local_file_path``.
        """
        file_url = request.url
        return repub.utils.local_file_path(file_url)
def read_asset(file_path: str | Path) -> BytesIO:
buf_converted = BytesIO()
with open(file_path, "rb") as f:
buf_converted.write(f.read())
buf_converted.seek(0)
return buf_converted
def buffer_checksum(buf: BytesIO) -> str:
    """Return the hex MD5 digest of everything in ``buf``.

    The buffer is rewound before reading and again afterwards, so it is left
    positioned at offset 0 regardless of where it started.
    """
    # MD5 serves as a content checksum here, not as a security primitive.
    hasher = hashlib.md5(usedforsecurity=False)  # nosec
    buf.seek(0)
    hasher.update(buf.read())
    buf.seek(0)
    return hasher.hexdigest()
class TranscodePipeline(BaseFilesPipeline):
    """Files pipeline that stores a download plus one variant per media setting.

    For each entry returned by ``get_media_settings`` a variant file is
    persisted to the store, alongside the original downloaded bytes.
    Subclasses bind the pipeline to a media type and implement the
    ``get_media_settings`` / ``get_transcode_params`` / ``transcode_media`` /
    ``get_media_meta`` hooks.
    """

    def __init__(
        self,
        media_type: repub.utils.FileType,
        store_uri: Union[str, PathLike],
        *,
        crawler: Crawler,
    ):
        """Bind the pipeline to ``media_type`` and keep the crawler settings.

        Args:
            media_type: Media type this pipeline instance handles.
            store_uri: Location handed through to the base pipeline.
            crawler: Crawler whose ``settings`` are stored as ``self.settings``.
        """
        self.media_type = media_type
        self.settings = crawler.settings
        super().__init__(store_uri, crawler=crawler)

    def transcode(
        self, input_file: str, settings: media.MediaSettings, tmp_dir: str
    ) -> Optional[str]:
        """Transcode ``input_file`` unless it already satisfies ``settings``.

        Args:
            input_file: Path of the file to probe and possibly transcode.
            settings: Target media settings for one variant.
            tmp_dir: Directory passed to ``transcode_media`` for its output.

        Returns:
            Whatever ``transcode_media`` returns (the output path), or ``None``
            when ``get_transcode_params`` yields ``None``.
        """
        probe_result = media.probe_media(input_file)
        params = self.get_transcode_params(probe_result, settings)
        if params is not None:
            return self.transcode_media(input_file, tmp_dir, params)
        # This base class serves video as well as audio, so the message names
        # the actual media type instead of always saying "audio".
        logger.info(
            f"Skipping {self.media_type.value} compression for {input_file}, "
            "it meets requirements"
        )
        return None

    def get_media_settings(self) -> List[media.MediaSettings]:
        """Return the list of target settings, one per variant (subclass hook)."""
        raise NotImplementedError()

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Return transcode parameters, or ``None`` if none are needed (subclass hook)."""
        raise NotImplementedError()

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Transcode ``input_file`` and return the output path (subclass hook)."""
        raise NotImplementedError()

    def get_media_meta(self, probe_result) -> Optional[media.MediaMeta]:
        """Return metadata derived from ``probe_result`` (subclass hook).

        Annotated as optional because the overrides in this file are, and
        ``media_variant`` already tolerates a falsy result.
        """
        raise NotImplementedError()

    def media_dir(self) -> str:
        """Return the configured directory name for this pipeline's media type.

        Raises:
            ValueError: if ``self.media_type`` is neither AUDIO nor VIDEO.
        """
        setting_name = {
            repub.utils.FileType.AUDIO: "REPUBLISHER_AUDIO_DIR",
            repub.utils.FileType.VIDEO: "REPUBLISHER_VIDEO_DIR",
        }.get(self.media_type)
        if setting_name is None:
            raise ValueError(f"Unsupported media type: {self.media_type}")
        return self.settings[setting_name]

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the canonical published path for the media at ``request.url``."""
        return repub.utils.canonical_published_media_path(
            self.media_type,
            request.url,
            self.get_media_settings(),
        )

    def variant_paths(
        self, source_url: str
    ) -> list[tuple[bool, media.MediaSettings, str]]:
        """Return ``(is_default, setting, path)`` for every configured setting.

        The first configured setting is the one flagged as default.
        """
        settings = self.get_media_settings()
        return [
            (
                index == 0,
                setting,
                repub.utils.published_media_path(self.media_type, source_url, setting),
            )
            for index, setting in enumerate(settings)
        ]

    def original_path(self, source_url: str) -> str:
        """Return the store path used for the original download.

        Raises:
            ValueError: if ``self.media_type`` is neither AUDIO nor VIDEO.
        """
        if self.media_type == repub.utils.FileType.AUDIO:
            return repub.utils.local_audio_path(source_url)
        if self.media_type == repub.utils.FileType.VIDEO:
            return repub.utils.local_video_path(source_url)
        raise ValueError(f"Unsupported media type: {self.media_type}")

    def original_mimetype(self, source_url: str, response=None) -> str:
        """Determine the MIME type of the original download.

        Tried in order: the response's ``Content-Type`` header (parameters
        stripped), a guess from ``source_url``, then a per-media-type default.

        Args:
            source_url: URL the media was fetched from.
            response: Optional response whose headers are consulted first.
        """
        if response is not None:
            content_type = response.headers.get(b"Content-Type")
            if content_type:
                header_type = content_type.decode("utf-8").split(";", 1)[0].strip()
                # A header holding only parameters (e.g. "; charset=utf-8")
                # leaves an empty type; fall through to guessing rather than
                # returning "".
                if header_type:
                    return header_type
        mimetype = mimetypes.guess_type(source_url)[0]
        if mimetype:
            return mimetype
        return {
            repub.utils.FileType.AUDIO: "audio/mpeg",
            repub.utils.FileType.VIDEO: "video/mp4",
        }[self.media_type]

    def published_url(self, path: str, item=None) -> str:
        """Return the URL under which ``path`` is published.

        The result is relative (``<media_dir>/<path>``) when no
        ``REPUBLISHER_FEED_URL`` is configured or no ``item`` is given;
        otherwise it is prefixed with the feed URL and the item's feed name.
        """
        relative_path = f"{self.media_dir()}/{path}"
        feed_url = str(self.settings.get("REPUBLISHER_FEED_URL", "")).rstrip("/")
        if feed_url == "" or item is None:
            return relative_path
        return f"{feed_url}/feeds/{item.feed_name}/{relative_path}"

    def local_store_path(self, path: str) -> Path:
        """Return ``path`` resolved against the store's ``basedir``.

        NOTE(review): relies on ``self.store`` exposing ``basedir``; presumably
        only a filesystem-backed store does -- confirm.
        """
        return Path(cast(Any, self.store).basedir) / path

    def media_variant(
        self,
        *,
        path: str,
        mimetype: str,
        probe_result: dict[str, Any],
        is_default: bool,
        item=None,
    ) -> MediaVariant:
        """Build the variant record for one stored file.

        Metadata from ``get_media_meta`` is merged in, skipping keys whose
        value is ``None`` or the empty string.
        """
        variant: MediaVariant = {
            "url": self.published_url(path, item),
            "path": path,
            "type": mimetype,
            "medium": self.media_type.value,
            "isDefault": "true" if is_default else "false",
        }
        meta = self.get_media_meta(probe_result) or {}
        for key, value in meta.items():
            if value not in (None, ""):
                variant[key] = value
        return variant

    def load_variants_from_disk(self, request, *, item=None) -> list[MediaVariant]:
        """Describe every variant of ``request.url`` that exists in the local store.

        Configured variants come first, in settings order; the original
        download, when present, is appended last and is never the default.
        Each file found is probed to obtain its metadata.
        """
        variants: list[MediaVariant] = []
        for is_default, setting, path in self.variant_paths(request.url):
            file_path = self.local_store_path(path)
            if not file_path.exists():
                continue
            probe_result = media.probe_media(str(file_path))
            variants.append(
                self.media_variant(
                    path=path,
                    mimetype=setting["mimetype"],
                    probe_result=probe_result,
                    is_default=is_default,
                    item=item,
                )
            )
        original_path = self.original_path(request.url)
        original_file = self.local_store_path(original_path)
        if original_file.exists():
            variants.append(
                self.media_variant(
                    path=original_path,
                    mimetype=self.original_mimetype(request.url),
                    probe_result=media.probe_media(str(original_file)),
                    is_default=False,
                    item=item,
                )
            )
        return variants

    def make_file_result(
        self,
        request,
        *,
        checksum: str | None,
        status: str,
        item=None,
    ) -> TranscodedMediaFile:
        """Assemble the result record for ``request``.

        The record carries the source URL, canonical path, published URL, the
        given ``checksum`` and ``status``, and the variants currently on disk.
        """
        path = self.file_path(request, item=item)
        return {
            "url": request.url,
            "path": path,
            "published_url": self.published_url(path, item),
            "checksum": checksum,
            "status": status,
            "variants": self.load_variants_from_disk(request, item=item),
        }

    def media_to_download(self, request, info, *, item=None):
        """Return an "uptodate" result if everything for ``request`` is stored and fresh.

        Returns ``None`` when the canonical file is missing, has no
        ``last_modified``, is older than ``self.expires`` days, or when any
        variant or the original is missing from the store.
        NOTE(review): ``None`` presumably tells the base pipeline to proceed
        with the download -- confirm against the Scrapy version in use.
        """
        canonical_path = self.file_path(request, info=info, item=item)
        canonical_stat = cast(
            dict[str, Any] | None,
            self.store.stat_file(canonical_path, info),
        )
        if not canonical_stat:
            return None
        last_modified = canonical_stat.get("last_modified")
        if not last_modified:
            return None
        # Seconds -> days, to compare against the expiry threshold.
        age_days = (time.time() - last_modified) / 60 / 60 / 24
        if age_days > self.expires:
            return None
        for _, _, path in self.variant_paths(request.url):
            if not cast(dict[str, Any] | None, self.store.stat_file(path, info)):
                return None
        if not cast(
            dict[str, Any] | None,
            self.store.stat_file(self.original_path(request.url), info),
        ):
            return None
        self.inc_stats("uptodate")
        return self.make_file_result(
            request,
            checksum=canonical_stat.get("checksum"),
            status="uptodate",
            item=item,
        )

    def persist_variants(self, response, request, info, *, item=None) -> str | None:
        """Store the original download and every missing variant.

        The response body is written to a temporary file, which is the input
        for probing and transcoding. Files already present in the store are
        left untouched.

        Returns:
            The checksum of the canonical variant: freshly computed when it
            was written now, taken from the store's stat when it already
            existed, or ``None`` if no variant path equals the canonical path.
        """
        canonical_path = self.file_path(
            request, response=response, info=info, item=item
        )
        canonical_checksum = None
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_file = f"{tmp_dir}/original"
            with open(tmp_file, "wb") as f:
                f.write(response.body)
            original_path = self.original_path(request.url)
            if not cast(
                dict[str, Any] | None,
                self.store.stat_file(original_path, info),
            ):
                # The body is already in memory; wrap it directly rather than
                # reading back the temp file that was just written.
                original_buf = BytesIO(response.body)
                self.store.persist_file(
                    original_path,
                    original_buf,
                    info,
                    meta=self.get_media_meta(media.probe_media(tmp_file)),
                    headers={
                        "Content-Type": self.original_mimetype(
                            request.url, response=response
                        )
                    },
                )
            for _, setting, final_path in self.variant_paths(request.url):
                stat = cast(
                    dict[str, Any] | None,
                    self.store.stat_file(final_path, info),
                )
                if stat:
                    logger.info(f"Skipping, transcoded media exists at {final_path}")
                    if final_path == canonical_path:
                        canonical_checksum = stat.get("checksum")
                    continue
                # When no transcoding is needed the original bytes are stored
                # under the variant's path as-is.
                out_file = self.transcode(tmp_file, setting, tmp_dir) or tmp_file
                out_buf = read_asset(out_file)
                probe_result = media.probe_media(out_file)
                meta = self.get_media_meta(probe_result)
                logger.info(f"{self.media_type} final {final_path} with {meta}")
                checksum = buffer_checksum(out_buf)
                self.store.persist_file(
                    final_path,
                    out_buf,
                    info,
                    meta=meta,
                    headers={"Content-Type": setting["mimetype"]},
                )
                if final_path == canonical_path:
                    canonical_checksum = checksum
        return canonical_checksum

    def media_downloaded(self, response, request, info, *, item=None):
        """Persist a successful download and return its result record.

        Raises:
            FileException: "download-error" for a non-200 status,
                "empty-content" for an empty body.
        """
        if response.status != 200:
            raise FileException("download-error")
        if not response.body:
            raise FileException("empty-content")
        status = "cached" if "cached" in response.flags else "downloaded"
        self.inc_stats(status)
        checksum = self.persist_variants(response, request, info, item=item)
        return self.make_file_result(
            request,
            checksum=checksum,
            status=status,
            item=item,
        )
class AudioPipeline(TranscodePipeline):
    """Transcoding pipeline bound to audio media.

    Reads item URLs from ``audio_urls`` and writes results to ``audios`` by
    default; each hook delegates to the audio helper in ``repub.media``.
    """

    DEFAULT_FILES_URLS_FIELD = "audio_urls"
    DEFAULT_FILES_RESULT_FIELD = "audios"

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Initialise the base pipeline with the AUDIO media type."""
        super().__init__(repub.utils.FileType.AUDIO, store_uri, crawler=crawler)

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from ``crawler``, storing under ``AUDIO_STORE``."""
        crawler_settings = crawler.settings
        cls._update_stores(crawler_settings)
        store_uri = crawler_settings["AUDIO_STORE"]
        return cls(store_uri, crawler=crawler)

    def get_media_settings(self) -> List[media.AudioSettings]:
        """Return the ``REPUBLISHER_AUDIO`` setting."""
        return self.settings["REPUBLISHER_AUDIO"]

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to ``media.audio_transcode_params``."""
        return media.audio_transcode_params(probe_result, settings)

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to ``media.transcode_audio``."""
        return media.transcode_audio(input_file, tmp_dir, params)

    def get_media_meta(self, probe_result) -> Optional[media.AudioMeta]:
        """Delegate to ``media.audio_meta``."""
        return media.audio_meta(probe_result)
class VideoPipeline(TranscodePipeline):
    """Transcoding pipeline bound to video media.

    Reads item URLs from ``video_urls`` and writes results to ``videos`` by
    default; each hook delegates to the video helper in ``repub.media``.
    """

    DEFAULT_FILES_URLS_FIELD = "video_urls"
    DEFAULT_FILES_RESULT_FIELD = "videos"

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Initialise the base pipeline with the VIDEO media type."""
        super().__init__(repub.utils.FileType.VIDEO, store_uri, crawler=crawler)

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from ``crawler``, storing under ``VIDEO_STORE``."""
        crawler_settings = crawler.settings
        cls._update_stores(crawler_settings)
        store_uri = crawler_settings["VIDEO_STORE"]
        return cls(store_uri, crawler=crawler)

    def get_media_settings(self) -> List[media.VideoSettings]:
        """Return the ``REPUBLISHER_VIDEO`` setting."""
        return self.settings["REPUBLISHER_VIDEO"]

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to ``media.video_transcode_params``."""
        return media.video_transcode_params(probe_result, settings)

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to ``media.transcode_video``."""
        return media.transcode_video(input_file, tmp_dir, params)

    def get_media_meta(self, probe_result) -> Optional[media.VideoMeta]:
        """Delegate to ``media.video_meta``."""
        return media.video_meta(probe_result)