republisher/repub/pipelines.py

188 lines
6.8 KiB
Python
Raw Permalink Normal View History

2024-04-18 17:28:09 +02:00
import logging
import tempfile
from io import BytesIO
from os import PathLike
from typing import Dict, List, Optional, Union
2024-04-18 11:57:24 +02:00
from scrapy.crawler import Crawler
2024-04-18 17:28:09 +02:00
from scrapy.pipelines.files import FilesPipeline as BaseFilesPipeline
from scrapy.pipelines.images import ImagesPipeline as BaseImagesPipeline
2024-04-18 17:28:09 +02:00
from scrapy.utils.misc import md5sum
2026-03-29 12:59:08 +02:00
import repub.utils
from repub import media
2024-04-18 17:28:09 +02:00
# Module-level logger; TranscodePipeline reports skipped and stored variants through it.
logger = logging.getLogger(__name__)
2024-04-18 11:57:24 +02:00
class ImagePipeline(BaseImagesPipeline):
    """Images pipeline that stores each image at the local path that
    :func:`repub.utils.local_image_path` derives from the image URL.

    ``thumb_path`` always raises, so thumbnails cannot be produced.
    """

    def thumb_path(self, request, thumb_id, response=None, info=None, *, item=None):
        """Thumbnails are unsupported: always raise ``NotImplementedError``."""
        raise NotImplementedError()

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for *request*, computed from its URL only."""
        url = request.url
        return repub.utils.local_image_path(url)
2024-04-18 11:57:24 +02:00
class FilePipeline(BaseFilesPipeline):
    """Files pipeline that stores each file at the local path that
    :func:`repub.utils.local_file_path` derives from the file URL."""

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Keep the crawler's settings on the instance, then defer to the base.

        :param store_uri: URI of the store files are persisted to.
        :param crawler: the running crawler.
        """
        settings = crawler.settings
        self.settings = settings
        super().__init__(store_uri, crawler=crawler)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for *request*, computed from its URL only."""
        url = request.url
        return repub.utils.local_file_path(url)
2024-04-18 11:57:24 +02:00
def read_asset(file_path) -> BytesIO:
    """Load the whole file at *file_path* into a new in-memory buffer.

    The returned buffer is positioned at offset 0, ready to be read.
    """
    with open(file_path, "rb") as handle:
        data = handle.read()
    return BytesIO(data)
2024-04-18 11:57:24 +02:00
def media_final_path(base_path, name, ext):
    """Return the storage path of one media variant: ``<base_path>-<name>.<ext>``."""
    return "{}-{}.{}".format(base_path, name, ext)
class TranscodePipeline(BaseFilesPipeline):
    """Files pipeline that turns each downloaded media file into every
    configured variant and persists the results.

    Subclasses choose the store, the base output path (``file_path``) and
    implement the media-specific hooks: ``get_media_settings``,
    ``get_transcode_params``, ``transcode_media`` and ``get_media_meta``.
    """

    def __init__(
        self,
        media_type: repub.utils.FileType,
        store_uri: Union[str, PathLike],
        *,
        crawler: Crawler,
    ):
        """Create the pipeline.

        :param media_type: kind of media handled (in this class it is only
            used in log messages).
        :param store_uri: URI of the store the variants are persisted to.
        :param crawler: the running crawler; its settings are kept on
            ``self`` so subclasses can read their variant settings.
        """
        self.media_type = media_type
        self.settings = crawler.settings
        super().__init__(store_uri, crawler=crawler)

    def file_downloaded(self, response, request, info, *, item=None):
        """Persist all variants of the downloaded media.

        Delegates to :meth:`media_downloaded` and returns its checksum.
        """
        return self.media_downloaded(response, request, info, item=item)

    def media_downloaded(self, response, request, info, *, item=None):
        """Transcode the response body and persist every produced variant.

        NOTE(review): this overrides scrapy's ``FilesPipeline.media_downloaded``
        entirely. The base class's response-status and empty-body checks and
        its result dict are therefore not used. Confirm that this is intended.

        :returns: MD5 checksum of the first variant stored, or ``None`` when
            nothing was stored (for example, every variant already existed).
        """
        checksum = None
        for path, buf, meta, mime in self.get_media(
            response, request, info, item=item
        ):
            if checksum is None:
                buf.seek(0)
                checksum = md5sum(buf)
                # md5sum reads the buffer to its end; rewind so stores that
                # read from the current position receive the full payload.
                buf.seek(0)
            self.store.persist_file(
                path,
                buf,
                info,
                meta=meta,
                headers={"Content-Type": mime},
            )
        return checksum

    def transcode(
        self, input_file: str, settings: media.MediaSettings, tmp_dir: str
    ) -> Optional[str]:
        """Transcode *input_file* for one variant, if it needs it.

        :param input_file: path of the original media on disk.
        :param settings: the variant's settings.
        :param tmp_dir: scratch directory passed on to ``transcode_media``.
        :returns: the path returned by ``transcode_media``, or ``None`` when
            the input already meets the variant's requirements.
        """
        probe_result = media.probe_media(input_file)
        params = self.get_transcode_params(probe_result, settings)
        if params is None:
            # This class serves audio and video, so name the actual media type.
            logger.info(
                "Skipping %s compression for %s, it meets requirements",
                self.media_type,
                input_file,
            )
            return None
        return self.transcode_media(input_file, tmp_dir, params)

    def get_media_settings(self) -> List[media.MediaSettings]:
        """Return the variants to produce; subclasses must override.

        :meth:`get_media` reads the ``"extension"``, ``"name"`` and
        ``"mimetype"`` keys of each entry.
        """
        raise NotImplementedError()

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Return transcoder parameters for one variant, or ``None`` when the
        probed input already satisfies *settings*; subclasses must override."""
        raise NotImplementedError()

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Transcode *input_file* with *params* and return the output file
        path; subclasses must override."""
        raise NotImplementedError()

    def get_media_meta(self, probe_result) -> media.MediaMeta:
        """Return the metadata persisted alongside a variant; subclasses must
        override."""
        raise NotImplementedError()

    def get_media(self, response, request, info, *, item=None):
        """Yield ``(path, buffer, meta, mimetype)`` for each variant to store.

        The response body is written to a temporary file. For each configured
        variant, the variant is skipped if the store already has it.
        Otherwise the body is transcoded, or reused as-is when it already
        meets the requirements. The result is probed and then yielded.

        Every yielded buffer is fully in memory and starts at offset 0. The
        temporary directory is removed once the generator is exhausted or
        closed.
        """
        base_path = self.file_path(request, response=response, info=info, item=item)
        with tempfile.TemporaryDirectory() as tmp_dir:
            settings = self.get_media_settings()
            tmp_file = f"{tmp_dir}/original"
            with open(tmp_file, "wb") as f:
                f.write(response.body)
            for setting in settings:
                ext = setting["extension"]
                name = setting["name"]
                final_path = media_final_path(base_path, name, ext)
                # NOTE(review): scrapy's non-filesystem stores (S3/GCS/FTP)
                # return a Deferred from stat_file, which is always truthy.
                # This check assumes a synchronous store such as
                # FSFilesStore; verify before using other stores.
                stat = self.store.stat_file(final_path, info)
                if stat:
                    logger.info("Skipping, transcoded media exists at %s", final_path)
                    continue
                converted_file = self.transcode(tmp_file, setting, tmp_dir)
                if converted_file:
                    out_buf = read_asset(converted_file)
                    out_file = converted_file
                else:
                    # Reuse the original body in a fresh buffer at offset 0.
                    out_buf = BytesIO(response.body)
                    out_file = tmp_file
                probe_result = media.probe_media(out_file)
                meta = self.get_media_meta(probe_result)
                logger.info("%s final %s with %s", self.media_type, final_path, meta)
                yield final_path, out_buf, meta, setting["mimetype"]
class AudioPipeline(TranscodePipeline):
    """Transcode pipeline for audio.

    URLs are read from ``audio_urls`` and results are written to ``audios``.
    The store comes from the ``AUDIO_STORE`` setting and the variants from
    ``REPUBLISHER_AUDIO``.
    """

    DEFAULT_FILES_URLS_FIELD = "audio_urls"
    DEFAULT_FILES_RESULT_FIELD = "audios"

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from *crawler*, persisting to ``AUDIO_STORE``."""
        settings = crawler.settings
        cls._update_stores(settings)
        store_uri = settings["AUDIO_STORE"]
        return cls(store_uri, crawler=crawler)

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Create an audio transcode pipeline writing to *store_uri*."""
        kind = repub.utils.FileType.AUDIO
        super().__init__(kind, store_uri, crawler=crawler)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the base storage path for *request*, computed from its URL."""
        url = request.url
        return repub.utils.local_audio_path(url)

    def get_media_settings(self) -> List[media.AudioSettings]:
        """Return the audio variants configured in ``REPUBLISHER_AUDIO``."""
        key = "REPUBLISHER_AUDIO"
        return self.settings[key]

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to :func:`media.audio_transcode_params`."""
        params = media.audio_transcode_params(probe_result, settings)
        return params

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to :func:`media.transcode_audio`."""
        output = media.transcode_audio(input_file, tmp_dir, params)
        return output

    def get_media_meta(self, probe_result) -> Optional[media.AudioMeta]:
        """Delegate to :func:`media.audio_meta`."""
        meta = media.audio_meta(probe_result)
        return meta
class VideoPipeline(TranscodePipeline):
    """Transcode pipeline for video.

    URLs are read from ``video_urls`` and results are written to ``videos``.
    The store comes from the ``VIDEO_STORE`` setting and the variants from
    ``REPUBLISHER_VIDEO``.
    """

    DEFAULT_FILES_URLS_FIELD = "video_urls"
    DEFAULT_FILES_RESULT_FIELD = "videos"

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from *crawler*, persisting to ``VIDEO_STORE``."""
        settings = crawler.settings
        cls._update_stores(settings)
        store_uri = settings["VIDEO_STORE"]
        return cls(store_uri, crawler=crawler)

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Create a video transcode pipeline writing to *store_uri*."""
        kind = repub.utils.FileType.VIDEO
        super().__init__(kind, store_uri, crawler=crawler)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the base storage path for *request*, computed from its URL."""
        url = request.url
        return repub.utils.local_video_path(url)

    def get_media_settings(self) -> List[media.VideoSettings]:
        """Return the video variants configured in ``REPUBLISHER_VIDEO``."""
        key = "REPUBLISHER_VIDEO"
        return self.settings[key]

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to :func:`media.video_transcode_params`."""
        params = media.video_transcode_params(probe_result, settings)
        return params

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to :func:`media.transcode_video`."""
        output = media.transcode_video(input_file, tmp_dir, params)
        return output

    def get_media_meta(self, probe_result) -> Optional[media.VideoMeta]:
        """Delegate to :func:`media.video_meta`."""
        meta = media.video_meta(probe_result)
        return meta