import logging
import tempfile
from io import BytesIO
from os import PathLike
from typing import Dict, List, Optional, Union

from scrapy.pipelines.files import FilesPipeline as BaseFilesPipeline
from scrapy.pipelines.images import ImagesPipeline as BaseImagesPipeline
from scrapy.settings import Settings
from scrapy.utils.misc import md5sum

import repub.utils
from repub import media

logger = logging.getLogger(__name__)


class ImagePipeline(BaseImagesPipeline):
    """Images pipeline that derives storage paths from the request URL."""

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for the image fetched by *request*."""
        return repub.utils.local_image_path(request.url)

    def thumb_path(self, request, thumb_id, response=None, info=None, *, item=None):
        """Thumbnail paths are not provided by this pipeline."""
        raise NotImplementedError()


class FilePipeline(BaseFilesPipeline):
    """Files pipeline that derives storage paths from the request URL."""

    def __init__(self, store_uri, **kwargs):
        """Normalise ``settings`` and delegate to the base pipeline.

        ``settings`` may be a ``Settings`` object, a plain ``dict`` or
        ``None``; the latter two are wrapped in ``Settings``.
        """
        # .get() so that an omitted ``settings`` keyword is treated like an
        # explicit ``None`` instead of raising KeyError.
        settings = kwargs.get("settings")
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)
        self.settings = settings
        super().__init__(store_uri, **kwargs)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for the file fetched by *request*."""
        return repub.utils.local_file_path(request.url)


def read_asset(file_path) -> BytesIO:
    """Read *file_path* fully into memory and return it as a rewound BytesIO."""
    with open(file_path, "rb") as f:
        # A BytesIO built from initial bytes starts at position 0.
        return BytesIO(f.read())


def media_final_path(base_path, name, ext):
    """Build the storage path of a variant: ``<base_path>-<name>.<ext>``."""
    return f"{base_path}-{name}.{ext}"


class TranscodePipeline(BaseFilesPipeline):
    """Base pipeline that stores one transcoded variant per media setting.

    Subclasses supply the list of settings and the probe/transcode hooks
    (``get_media_settings``, ``get_transcode_params``, ``transcode_media``,
    ``get_media_meta``).
    """

    def __init__(
        self,
        media_type: repub.utils.FileType,
        store_uri: Union[str, PathLike],
        **kwargs,
    ):
        """Remember the media type, normalise ``settings`` and delegate.

        ``settings`` may be a ``Settings`` object, a plain ``dict`` or
        ``None``; the latter two are wrapped in ``Settings``.
        """
        self.media_type = media_type
        # .get() so that an omitted ``settings`` keyword is treated like an
        # explicit ``None`` instead of raising KeyError.
        settings = kwargs.get("settings")
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)
        self.settings = settings
        super().__init__(store_uri, **kwargs)

    def file_downloaded(self, response, request, info, *, item=None):
        """Alias of :meth:`media_downloaded`."""
        return self.media_downloaded(response, request, info, item=item)

    def media_downloaded(self, response, request, info, *, item=None):
        """Persist every variant produced by :meth:`get_media`.

        Returns the MD5 checksum of the first variant that was persisted, or
        ``None`` when no variant was produced (e.g. all of them already exist
        in the store).

        NOTE(review): the stock FilesPipeline.media_downloaded appears to
        return a result dict rather than a bare checksum - confirm that the
        consumers of this pipeline expect the checksum only.
        """
        checksum = None
        for path, buf, meta, mime in self.get_media(response, request, info, item=item):
            if checksum is None:
                buf.seek(0)
                checksum = md5sum(buf)
            # Rewind before handing the buffer to the store: md5sum() leaves
            # the position at EOF, and a store that reads from the current
            # position would otherwise persist an empty file.
            buf.seek(0)
            self.store.persist_file(
                path,
                buf,
                info,
                meta=meta,
                headers={"Content-Type": mime},
            )
        return checksum

    def transcode(
        self, input_file: str, settings: media.MediaSettings, tmp_dir: str
    ) -> Optional[str]:
        """Transcode *input_file* for one setting if it needs it.

        Returns the path of the converted file, or ``None`` when
        :meth:`get_transcode_params` reports that no transcoding is required.
        """
        probe_result = media.probe_media(input_file)
        params = self.get_transcode_params(probe_result, settings)
        if params is None:
            # This base class is shared by several media types, so the
            # message names the actual type instead of hard-coding "audio".
            logger.info(
                f"Skipping {self.media_type} compression for {input_file}, "
                "it meets requirements"
            )
            return None
        return self.transcode_media(input_file, tmp_dir, params)

    def get_media_settings(self) -> List[media.MediaSettings]:
        """Return the list of variant settings; implemented by subclasses."""
        raise NotImplementedError()

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Return transcode parameters, or ``None`` to skip; see subclasses."""
        raise NotImplementedError()

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Transcode and return the output path; implemented by subclasses."""
        raise NotImplementedError()

    def get_media_meta(self, probe_result) -> media.MediaMeta:
        """Build metadata from a probe result; implemented by subclasses."""
        raise NotImplementedError()

    def get_media(self, response, request, info, *, item=None):
        """Yield ``(final_path, buffer, meta, mimetype)`` for each variant.

        The response body is written to a temporary directory and transcoded
        once per configured setting. Variants that the store already holds
        are skipped. When no transcoding is required, the original body is
        yielded unchanged. Yielded buffers are held in memory, so they do
        not depend on the temporary directory.
        """
        buf = BytesIO(response.body)
        base_path = self.file_path(request, response=response, info=info, item=item)
        with tempfile.TemporaryDirectory() as tmp_dir:
            settings = self.get_media_settings()
            tmp_file = f"{tmp_dir}/original"
            with open(tmp_file, "wb") as f:
                # Write the body directly so the shared buffer is not drained.
                f.write(response.body)
            for setting in settings:
                ext = setting["extension"]
                name = setting["name"]
                final_path = media_final_path(base_path, name, ext)
                if self.store.stat_file(final_path, info):
                    logger.info(f"Skipping, transcoded media exists at {final_path}")
                    continue
                converted_file = self.transcode(tmp_file, setting, tmp_dir)
                if converted_file:
                    out_buf = read_asset(converted_file)
                    out_file = converted_file
                else:
                    # Original already satisfies this setting: store it as is.
                    out_buf = buf
                    out_file = tmp_file
                probe_result = media.probe_media(out_file)
                meta = self.get_media_meta(probe_result)
                logger.info(f"{self.media_type} final {final_path} with {meta}")
                yield final_path, out_buf, meta, setting["mimetype"]


class AudioPipeline(TranscodePipeline):
    """Transcoding pipeline for audio, configured by ``REPUBLISHER_AUDIO``."""

    DEFAULT_FILES_URLS_FIELD = "audio_urls"
    DEFAULT_FILES_RESULT_FIELD = "audios"

    def __init__(self, store_uri: Union[str, PathLike], **kwargs):
        """Create the pipeline; the store location comes from ``AUDIO_STORE``.

        The *store_uri* argument is ignored in favour of the ``AUDIO_STORE``
        setting, so a ``settings`` keyword argument is required.
        """
        store_uri = kwargs["settings"]["AUDIO_STORE"]
        super().__init__(repub.utils.FileType.AUDIO, store_uri, **kwargs)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the base storage path for the audio fetched by *request*."""
        return repub.utils.local_audio_path(request.url)

    def get_media_settings(self) -> List[media.AudioSettings]:
        """Return the audio variant settings from ``REPUBLISHER_AUDIO``."""
        return self.settings["REPUBLISHER_AUDIO"]

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to ``media.audio_transcode_params``."""
        return media.audio_transcode_params(probe_result, settings)

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to ``media.transcode_audio``."""
        return media.transcode_audio(input_file, tmp_dir, params)

    def get_media_meta(self, probe_result) -> Optional[media.AudioMeta]:
        """Delegate to ``media.audio_meta``."""
        return media.audio_meta(probe_result)


class VideoPipeline(TranscodePipeline):
    """Transcoding pipeline for video, configured by ``REPUBLISHER_VIDEO``."""

    DEFAULT_FILES_URLS_FIELD = "video_urls"
    DEFAULT_FILES_RESULT_FIELD = "videos"

    def __init__(self, store_uri: Union[str, PathLike], **kwargs):
        """Create the pipeline; the store location comes from ``VIDEO_STORE``.

        The *store_uri* argument is ignored in favour of the ``VIDEO_STORE``
        setting, so a ``settings`` keyword argument is required.
        """
        store_uri = kwargs["settings"]["VIDEO_STORE"]
        super().__init__(repub.utils.FileType.VIDEO, store_uri, **kwargs)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the base storage path for the video fetched by *request*."""
        return repub.utils.local_video_path(request.url)

    def get_media_settings(self) -> List[media.VideoSettings]:
        """Return the video variant settings from ``REPUBLISHER_VIDEO``."""
        return self.settings["REPUBLISHER_VIDEO"]

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to ``media.video_transcode_params``."""
        return media.video_transcode_params(probe_result, settings)

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to ``media.transcode_video``."""
        return media.transcode_video(input_file, tmp_dir, params)

    def get_media_meta(self, probe_result) -> Optional[media.VideoMeta]:
        """Delegate to ``media.video_meta``."""
        return media.video_meta(probe_result)