republisher/repub/pipelines.py

464 lines
16 KiB
Python
Raw Normal View History

import functools
import hashlib
2024-04-18 17:28:09 +02:00
import logging
2026-03-31 14:33:49 +02:00
import mimetypes
2024-04-18 17:28:09 +02:00
import tempfile
import time
2024-04-18 17:28:09 +02:00
from io import BytesIO
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast
2024-04-18 11:57:24 +02:00
import pyvips
from scrapy.crawler import Crawler
from scrapy.pipelines.files import FileException
2024-04-18 17:28:09 +02:00
from scrapy.pipelines.files import FilesPipeline as BaseFilesPipeline
2026-03-29 12:59:08 +02:00
import repub.utils
from repub import media
from repub.items import MediaVariant, TranscodedMediaFile
2026-03-29 12:59:08 +02:00
2024-04-18 17:28:09 +02:00
logger = logging.getLogger(__name__)
2024-04-18 11:57:24 +02:00
class ImageException(FileException):
    """Error raised when an image cannot be processed by the image pipeline."""
def image_mimetype(response=None, *, url: str | None = None) -> str | None:
del url
if response is not None:
content_type = response.headers.get(b"Content-Type")
if content_type:
return content_type.decode("utf-8").split(";", 1)[0].strip()
return None
def convert_image_body_to_jpeg(
    body: bytes,
    *,
    source_mimetype: str | None = None,
) -> tuple[BytesIO, int, int]:
    """Return *body* as JPEG bytes together with the image dimensions.

    The image is opened with pyvips and auto-rotated; the reported width and
    height are those of the auto-rotated image.  If the source already is a
    JPEG the original bytes are returned untouched.  Otherwise any alpha
    channel is flattened onto a white background, the image is converted to
    sRGB and re-encoded as JPEG.

    Args:
        body: Raw image bytes in any format pyvips can load.
        source_mimetype: Media type announced by the server.  It is only used
            to recognise JPEG input when pyvips does not report which loader
            it used; a detected loader always takes precedence because
            servers can mislabel content.

    Returns:
        ``(buffer, width, height)`` where ``buffer`` holds the JPEG bytes.

    Raises:
        ImageException: If pyvips fails at any stage (loading, transforming
            or saving).
    """
    # The whole pipeline lives inside the ``try``: every pyvips call below can
    # raise ``pyvips.Error``, and with lazily evaluated pipelines decode
    # problems may only surface when the image is finally written out.
    try:
        image = cast(
            Any,
            pyvips.Image.new_from_buffer(body, "", access="sequential"),
        ).autorot()
        width = image.width
        height = image.height

        loader = ""
        if image.get_typeof("vips-loader"):
            loader = str(image.get("vips-loader"))

        if loader:
            # Trust what was actually decoded over the Content-Type header so
            # that mislabeled PNG/WebP/... bytes are not stored as ".jpeg".
            already_jpeg = loader.startswith("jpegload")
        else:
            already_jpeg = source_mimetype == "image/jpeg"
        if already_jpeg:
            return BytesIO(body), width, height

        if image.hasalpha():
            # JPEG has no alpha channel; composite onto white.
            image = image.flatten(background=[255, 255, 255])
        image = image.colourspace("srgb")
        return BytesIO(image.jpegsave_buffer()), width, height
    except pyvips.Error as exc:
        raise ImageException(str(exc)) from exc
class ImagePipeline(BaseFilesPipeline):
    """Files pipeline that persists every downloaded image as a JPEG.

    Configuration is read from the ``IMAGES_*`` settings (resolved through
    ``_key_for_pipe`` with ``ImagesPipeline`` as the base class name), with
    the class constants below as fallbacks.
    """

    MEDIA_NAME = "image"
    EXPIRES = 90  # presumably days, as in Scrapy's own pipelines - confirm
    MIN_WIDTH = 0
    MIN_HEIGHT = 0
    DEFAULT_FILES_URLS_FIELD = "image_urls"
    DEFAULT_FILES_RESULT_FIELD = "images"

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from the crawler's ``IMAGES_STORE`` setting."""
        crawler_settings = crawler.settings
        cls._update_stores(crawler_settings)
        return cls(crawler_settings["IMAGES_STORE"], crawler=crawler)

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Initialise the store and load the image-specific settings."""
        self.settings = crawler.settings
        super().__init__(store_uri, crawler=crawler)

        def setting_key(name: str) -> str:
            # Same lookup for every option; only the setting name varies.
            return self._key_for_pipe(
                name,
                base_class_name="ImagesPipeline",
                settings=self.settings,
            )

        self.expires = self.settings.getint(
            setting_key("IMAGES_EXPIRES"), self.EXPIRES
        )
        self.files_urls_field = self.settings.get(
            setting_key("IMAGES_URLS_FIELD"), self.DEFAULT_FILES_URLS_FIELD
        )
        self.files_result_field = self.settings.get(
            setting_key("IMAGES_RESULT_FIELD"), self.DEFAULT_FILES_RESULT_FIELD
        )
        self.min_width = self.settings.getint(
            setting_key("IMAGES_MIN_WIDTH"), self.MIN_WIDTH
        )
        self.min_height = self.settings.getint(
            setting_key("IMAGES_MIN_HEIGHT"), self.MIN_HEIGHT
        )

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the store-relative path derived from the request URL."""
        return repub.utils.local_image_path(request.url)

    def file_downloaded(self, response, request, info, *, item=None):
        """Convert the response body to JPEG, persist it, return its checksum.

        Raises:
            ImageException: If the image is smaller than the configured
                minimum width or height (conversion errors raised by
                ``convert_image_body_to_jpeg`` propagate as well).
        """
        target_path = self.file_path(
            request, response=response, info=info, item=item
        )
        announced_type = image_mimetype(response, url=request.url)
        jpeg_buf, width, height = convert_image_body_to_jpeg(
            response.body,
            source_mimetype=announced_type,
        )

        too_narrow = width < self.min_width
        too_short = height < self.min_height
        if too_narrow or too_short:
            raise ImageException(
                f"Image too small ({width}x{height} < "
                f"{self.min_width}x{self.min_height})"
            )

        # Hash before persisting; buffer_checksum rewinds the buffer for us.
        digest = buffer_checksum(jpeg_buf)
        self.store.persist_file(
            target_path,
            jpeg_buf,
            info,
            meta={"width": width, "height": height},
            headers={"Content-Type": "image/jpeg"},
        )
        return digest
2024-04-18 11:57:24 +02:00
class FilePipeline(BaseFilesPipeline):
    """Files pipeline whose storage paths come from ``repub.utils``."""

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Remember the crawler settings, then defer to the base pipeline."""
        self.settings = crawler.settings
        super().__init__(store_uri, crawler=crawler)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the store-relative path derived from the request URL."""
        source_url = request.url
        return repub.utils.local_file_path(source_url)
2024-04-18 11:57:24 +02:00
def read_asset(file_path: str | Path) -> BytesIO:
buf_converted = BytesIO()
with open(file_path, "rb") as f:
buf_converted.write(f.read())
buf_converted.seek(0)
return buf_converted
2024-04-18 11:57:24 +02:00
def buffer_checksum(buf: BytesIO) -> str:
    """Return the hex MD5 digest of the full contents of *buf*.

    The buffer is read from the start regardless of its current position and
    is rewound to offset 0 afterwards.  MD5 is used as a content fingerprint
    only, not for security.
    """
    digest = hashlib.md5(usedforsecurity=False)  # nosec
    buf.seek(0)
    digest.update(buf.read())
    buf.seek(0)
    return digest.hexdigest()
class TranscodePipeline(BaseFilesPipeline):
    """Shared logic for pipelines that publish transcoded media variants.

    For every downloaded source file the pipeline persists the untouched
    original plus one file per entry returned by ``get_media_settings()``;
    the first entry is treated as the default variant.  Subclasses provide
    the media-specific hooks ``get_media_settings``, ``get_transcode_params``,
    ``transcode_media`` and ``get_media_meta``.
    """

    def __init__(
        self,
        media_type: repub.utils.FileType,
        store_uri: Union[str, PathLike],
        *,
        crawler: Crawler,
    ):
        """Remember the media type and settings, then set up the base store."""
        self.media_type = media_type
        self.settings = crawler.settings
        super().__init__(store_uri, crawler=crawler)

    def _stat_file(self, path: str, info) -> dict[str, Any] | None:
        """Return the store's stat entry for *path* (falsy when absent)."""
        return cast(dict[str, Any] | None, self.store.stat_file(path, info))

    def transcode(
        self, input_file: str, settings: media.MediaSettings, tmp_dir: str
    ) -> Optional[str]:
        """Transcode *input_file* for *settings* if it needs it.

        Returns:
            The path of the transcoded file, or ``None`` when
            ``get_transcode_params`` reports that no transcode is required.
        """
        probe_result = media.probe_media(input_file)
        params = self.get_transcode_params(probe_result, settings)
        if params is not None:
            return self.transcode_media(input_file, tmp_dir, params)
        # Name the actual medium: this base method also runs for video.
        logger.info(
            f"Skipping {self.media_type.value} compression for {input_file}, "
            "it meets requirements"
        )
        return None

    def get_media_settings(self) -> List[media.MediaSettings]:
        """Return the configured variant settings (subclass hook)."""
        raise NotImplementedError()

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Return transcode parameters, or ``None`` to skip (subclass hook)."""
        raise NotImplementedError()

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Transcode *input_file* and return the output path (subclass hook)."""
        raise NotImplementedError()

    def get_media_meta(self, probe_result) -> media.MediaMeta:
        """Derive metadata from a probe result (subclass hook)."""
        raise NotImplementedError()

    def media_dir(self) -> str:
        """Return the configured publish directory for this media type.

        Raises:
            ValueError: If the media type is neither audio nor video.
        """
        setting_name = {
            repub.utils.FileType.AUDIO: "REPUBLISHER_AUDIO_DIR",
            repub.utils.FileType.VIDEO: "REPUBLISHER_VIDEO_DIR",
        }.get(self.media_type)
        if setting_name is None:
            raise ValueError(f"Unsupported media type: {self.media_type}")
        return self.settings[setting_name]

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the canonical published path for the request URL."""
        return repub.utils.canonical_published_media_path(
            self.media_type,
            request.url,
            self.get_media_settings(),
        )

    def variant_paths(
        self, source_url: str
    ) -> list[tuple[bool, media.MediaSettings, str]]:
        """Return ``(is_default, setting, path)`` for each configured variant.

        The first configured setting is flagged as the default.
        """
        settings = self.get_media_settings()
        return [
            (
                index == 0,
                setting,
                repub.utils.published_media_path(self.media_type, source_url, setting),
            )
            for index, setting in enumerate(settings)
        ]

    def original_path(self, source_url: str) -> str:
        """Return the store path used for the untouched original file.

        Raises:
            ValueError: If the media type is neither audio nor video.
        """
        if self.media_type == repub.utils.FileType.AUDIO:
            return repub.utils.local_audio_path(source_url)
        if self.media_type == repub.utils.FileType.VIDEO:
            return repub.utils.local_video_path(source_url)
        raise ValueError(f"Unsupported media type: {self.media_type}")

    def original_mimetype(self, source_url: str, response=None) -> str:
        """Return the best-known media type of the original file.

        Preference order: the response's ``Content-Type`` header (parameters
        stripped), a guess from the URL, then a per-media-type default.
        """
        if response is not None:
            content_type = response.headers.get(b"Content-Type")
            if content_type:
                return content_type.decode("utf-8").split(";", 1)[0].strip()
        mimetype = mimetypes.guess_type(source_url)[0]
        if mimetype:
            return mimetype
        return {
            repub.utils.FileType.AUDIO: "audio/mpeg",
            repub.utils.FileType.VIDEO: "video/mp4",
        }[self.media_type]

    def published_url(self, path: str, item=None) -> str:
        """Return the public URL (or relative path) for a stored file.

        A relative ``<media_dir>/<path>`` is returned when no
        ``REPUBLISHER_FEED_URL`` is configured or no item is given; otherwise
        the path is placed under ``<feed_url>/feeds/<item.feed_name>/``.
        """
        relative_path = f"{self.media_dir()}/{path}"
        feed_url = str(self.settings.get("REPUBLISHER_FEED_URL", "")).rstrip("/")
        if feed_url == "" or item is None:
            return relative_path
        return f"{feed_url}/feeds/{item.feed_name}/{relative_path}"

    def local_store_path(self, path: str) -> Path:
        """Return the on-disk location of *path* inside the store.

        NOTE(review): relies on the store exposing ``basedir``, i.e. a
        filesystem-backed store - confirm no other store type is configured.
        """
        return Path(cast(Any, self.store).basedir) / path

    def media_variant(
        self,
        *,
        path: str,
        mimetype: str,
        probe_result: dict[str, Any],
        is_default: bool,
        item=None,
    ) -> MediaVariant:
        """Build the variant description for one stored file.

        Metadata from ``get_media_meta`` is merged in, skipping ``None`` and
        empty-string values.
        """
        variant: MediaVariant = {
            "url": self.published_url(path, item),
            "path": path,
            "type": mimetype,
            "medium": self.media_type.value,
            "isDefault": "true" if is_default else "false",
        }
        meta = self.get_media_meta(probe_result) or {}
        for key, value in meta.items():
            if value not in (None, ""):
                variant[key] = value
        return variant

    def load_variants_from_disk(self, request, *, item=None) -> list[MediaVariant]:
        """Describe every variant (and the original) that exists on disk.

        Files that are missing are silently skipped.  The original, when
        present, is appended last and is never marked as default.
        """
        variants: list[MediaVariant] = []
        for is_default, setting, path in self.variant_paths(request.url):
            file_path = self.local_store_path(path)
            if not file_path.exists():
                continue
            probe_result = media.probe_media(str(file_path))
            variants.append(
                self.media_variant(
                    path=path,
                    mimetype=setting["mimetype"],
                    probe_result=probe_result,
                    is_default=is_default,
                    item=item,
                )
            )

        original_path = self.original_path(request.url)
        original_file = self.local_store_path(original_path)
        if original_file.exists():
            variants.append(
                self.media_variant(
                    path=original_path,
                    mimetype=self.original_mimetype(request.url),
                    probe_result=media.probe_media(str(original_file)),
                    is_default=False,
                    item=item,
                )
            )
        return variants

    def make_file_result(
        self,
        request,
        *,
        checksum: str | None,
        status: str,
        item=None,
    ) -> TranscodedMediaFile:
        """Assemble the result record reported for a processed request."""
        path = self.file_path(request, item=item)
        return {
            "url": request.url,
            "path": path,
            "published_url": self.published_url(path, item),
            "checksum": checksum,
            "status": status,
            "variants": self.load_variants_from_disk(request, item=item),
        }

    def media_to_download(self, request, info, *, item=None):
        """Return a cached result when everything is stored and still fresh.

        ``None`` is returned (so that the download goes ahead) when the
        canonical file is missing, has no modification time, is older than
        ``self.expires`` days, or when any variant or the original is absent.
        """
        canonical_path = self.file_path(request, info=info, item=item)
        canonical_stat = self._stat_file(canonical_path, info)
        if not canonical_stat:
            return None

        last_modified = canonical_stat.get("last_modified")
        if not last_modified:
            return None

        age_days = (time.time() - last_modified) / 60 / 60 / 24
        if age_days > self.expires:
            return None

        for _, _, path in self.variant_paths(request.url):
            if not self._stat_file(path, info):
                return None

        if not self._stat_file(self.original_path(request.url), info):
            return None

        self.inc_stats("uptodate")
        return self.make_file_result(
            request,
            checksum=canonical_stat.get("checksum"),
            status="uptodate",
            item=item,
        )

    def persist_variants(self, response, request, info, *, item=None) -> str | None:
        """Store the original and every missing variant for a response.

        The response body is written to a temporary directory, persisted as
        the original (unless already stored) and transcoded once per
        configured setting.  Variants already present in the store are left
        untouched.

        Returns:
            The checksum of the canonical variant, or ``None`` if no variant
            path matched the canonical path.
        """
        canonical_path = self.file_path(
            request, response=response, info=info, item=item
        )
        canonical_checksum = None
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_file = f"{tmp_dir}/original"
            with open(tmp_file, "wb") as f:
                f.write(response.body)

            original_path = self.original_path(request.url)
            if not self._stat_file(original_path, info):
                original_buf = read_asset(tmp_file)
                self.store.persist_file(
                    original_path,
                    original_buf,
                    info,
                    meta=self.get_media_meta(media.probe_media(tmp_file)),
                    headers={
                        "Content-Type": self.original_mimetype(
                            request.url, response=response
                        )
                    },
                )

            for _, setting, final_path in self.variant_paths(request.url):
                stat = self._stat_file(final_path, info)
                if stat:
                    logger.info(f"Skipping, transcoded media exists at {final_path}")
                    if final_path == canonical_path:
                        canonical_checksum = stat.get("checksum")
                    continue

                # No transcode needed -> publish the source bytes as-is.
                out_file = self.transcode(tmp_file, setting, tmp_dir) or tmp_file
                out_buf = read_asset(out_file)
                probe_result = media.probe_media(out_file)
                meta = self.get_media_meta(probe_result)
                logger.info(f"{self.media_type} final {final_path} with {meta}")
                checksum = buffer_checksum(out_buf)
                self.store.persist_file(
                    final_path,
                    out_buf,
                    info,
                    meta=meta,
                    headers={"Content-Type": setting["mimetype"]},
                )
                if final_path == canonical_path:
                    canonical_checksum = checksum
        return canonical_checksum

    def media_downloaded(self, response, request, info, *, item=None):
        """Validate the response, persist all variants and report the result.

        Raises:
            FileException: ``"download-error"`` for a non-200 status,
                ``"empty-content"`` for an empty body.
        """
        if response.status != 200:
            raise FileException("download-error")
        if not response.body:
            raise FileException("empty-content")

        status = "cached" if "cached" in response.flags else "downloaded"
        self.inc_stats(status)
        checksum = self.persist_variants(response, request, info, item=item)
        return self.make_file_result(
            request,
            checksum=checksum,
            status=status,
            item=item,
        )
class AudioPipeline(TranscodePipeline):
    """Transcode pipeline specialised for audio.

    Reads its store location from ``AUDIO_STORE`` and its variant settings
    from ``REPUBLISHER_AUDIO``; the media work is delegated to the audio
    helpers in ``repub.media``.
    """

    DEFAULT_FILES_URLS_FIELD = "audio_urls"
    DEFAULT_FILES_RESULT_FIELD = "audios"

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from the crawler's ``AUDIO_STORE`` setting."""
        crawler_settings = crawler.settings
        cls._update_stores(crawler_settings)
        return cls(crawler_settings["AUDIO_STORE"], crawler=crawler)

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Initialise the base pipeline with the audio media type."""
        audio_type = repub.utils.FileType.AUDIO
        super().__init__(audio_type, store_uri, crawler=crawler)

    def get_media_settings(self) -> List[media.AudioSettings]:
        """Return the configured audio variant settings."""
        return self.settings["REPUBLISHER_AUDIO"]

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Return audio transcode parameters, or ``None`` to skip."""
        return media.audio_transcode_params(probe_result, settings)

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Transcode the audio file and return the output path."""
        return media.transcode_audio(input_file, tmp_dir, params)

    def get_media_meta(self, probe_result) -> Optional[media.AudioMeta]:
        """Derive audio metadata from a probe result."""
        return media.audio_meta(probe_result)
class VideoPipeline(TranscodePipeline):
    """Transcode pipeline specialised for video.

    Reads its store location from ``VIDEO_STORE`` and its variant settings
    from ``REPUBLISHER_VIDEO``; the media work is delegated to the video
    helpers in ``repub.media``.
    """

    DEFAULT_FILES_URLS_FIELD = "video_urls"
    DEFAULT_FILES_RESULT_FIELD = "videos"

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from the crawler's ``VIDEO_STORE`` setting."""
        crawler_settings = crawler.settings
        cls._update_stores(crawler_settings)
        return cls(crawler_settings["VIDEO_STORE"], crawler=crawler)

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Initialise the base pipeline with the video media type."""
        video_type = repub.utils.FileType.VIDEO
        super().__init__(video_type, store_uri, crawler=crawler)

    def get_media_settings(self) -> List[media.VideoSettings]:
        """Return the configured video variant settings."""
        return self.settings["REPUBLISHER_VIDEO"]

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Return video transcode parameters, or ``None`` to skip."""
        return media.video_transcode_params(probe_result, settings)

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Transcode the video file and return the output path."""
        return media.transcode_video(input_file, tmp_dir, params)

    def get_media_meta(self, probe_result) -> Optional[media.VideoMeta]:
        """Derive video metadata from a probe result."""
        return media.video_meta(probe_result)