# File listing header: 187 lines, 6.8 KiB, Python
import logging
|
|
import tempfile
|
|
from io import BytesIO
|
|
from os import PathLike
|
|
from typing import Dict, List, Optional, Union
|
|
|
|
from scrapy.crawler import Crawler
|
|
from scrapy.pipelines.files import FilesPipeline as BaseFilesPipeline
|
|
from scrapy.pipelines.images import ImagesPipeline as BaseImagesPipeline
|
|
from scrapy.utils.misc import md5sum
|
|
|
|
import repub.utils
|
|
from repub import media
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ImagePipeline(BaseImagesPipeline):
    """Images pipeline whose storage paths are derived from the request URL."""

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for the image fetched by ``request``.

        Only ``request.url`` is consulted; the path itself is computed by
        ``repub.utils.local_image_path``.
        """
        source_url = request.url
        return repub.utils.local_image_path(source_url)

    def thumb_path(self, request, thumb_id, response=None, info=None, *, item=None):
        """Thumbnail paths are not provided by this pipeline.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
|
|
|
|
|
|
class FilePipeline(BaseFilesPipeline):
    """Files pipeline that keeps the crawler settings and maps URLs to paths."""

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Store ``crawler.settings`` on the instance, then initialise the base.

        Args:
            store_uri: Storage location forwarded to the base pipeline.
            crawler: Crawler whose ``settings`` are kept on ``self.settings``.
        """
        # The settings are assigned before the base initialiser runs.
        crawler_settings = crawler.settings
        self.settings = crawler_settings
        super().__init__(store_uri, crawler=crawler)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path for the file fetched by ``request``.

        Only ``request.url`` is consulted; the path itself is computed by
        ``repub.utils.local_file_path``.
        """
        source_url = request.url
        return repub.utils.local_file_path(source_url)
|
|
|
|
|
|
def read_asset(file_path) -> BytesIO:
    """Load the file at ``file_path`` completely into memory.

    Returns:
        A ``BytesIO`` holding the file's bytes, positioned at offset 0.
    """
    with open(file_path, "rb") as source:
        payload = source.read()
    # A BytesIO built from initial bytes starts at position 0.
    return BytesIO(payload)
|
|
|
|
|
|
def media_final_path(base_path, name, ext):
    """Build the path of a rendition as ``<base_path>-<name>.<ext>``."""
    return "{}-{}.{}".format(base_path, name, ext)
|
|
|
|
|
|
class TranscodePipeline(BaseFilesPipeline):
    """Base files pipeline that stores one rendition per configured media setting.

    Subclasses supply the media-specific pieces by overriding
    ``get_media_settings``, ``get_transcode_params``, ``transcode_media`` and
    ``get_media_meta``; each of those raises ``NotImplementedError`` here.
    """

    def __init__(
        self,
        media_type: repub.utils.FileType,
        store_uri: Union[str, PathLike],
        *,
        crawler: Crawler,
    ):
        """Remember the media type and crawler settings, then initialise the base.

        Args:
            media_type: Kind of media handled; used in log messages.
            store_uri: Storage location forwarded to the base pipeline.
            crawler: Crawler whose ``settings`` are kept on ``self.settings``.
        """
        self.media_type = media_type
        self.settings = crawler.settings
        super().__init__(store_uri, crawler=crawler)

    def file_downloaded(self, response, request, info, *, item=None):
        """Delegate to ``media_downloaded`` and return its checksum."""
        return self.media_downloaded(response, request, info, item=item)

    def media_downloaded(self, response, request, info, *, item=None):
        """Persist every rendition produced by ``get_media``.

        Each rendition is written through ``self.store.persist_file`` with its
        metadata and a ``Content-Type`` header taken from the setting's
        mimetype.

        Returns:
            The ``md5sum`` checksum of the first rendition yielded, or ``None``
            when ``get_media`` yields nothing.
        """
        # NOTE(review): Scrapy's FilesPipeline defines its own media_downloaded
        # (response checks, result dict); this override returns only a
        # checksum. Confirm that bypassing the base behaviour is intended.
        checksum = None
        for path, buf, meta, mime in self.get_media(response, request, info, item=item):
            if checksum is None:
                buf.seek(0)
                checksum = md5sum(buf)
            # md5sum reads the buffer; rewind so a store that reads from the
            # current position still receives the complete payload.
            buf.seek(0)
            self.store.persist_file(
                path,
                buf,
                info,
                meta=meta,
                headers={"Content-Type": mime},
            )
        return checksum

    def transcode(
        self, input_file: str, settings: media.MediaSettings, tmp_dir: str
    ) -> Optional[str]:
        """Transcode ``input_file`` when its probe result calls for it.

        Args:
            input_file: Path of the media file to inspect.
            settings: The media setting handed to ``get_transcode_params``.
            tmp_dir: Directory handed to ``transcode_media`` for its output.

        Returns:
            The value returned by ``transcode_media``, or ``None`` when
            ``get_transcode_params`` returns ``None`` (nothing to do).
        """
        probe_result = media.probe_media(input_file)
        params = self.get_transcode_params(probe_result, settings)
        if params is None:
            # This method is shared by every subclass, so the message names
            # the pipeline's own media type rather than always saying "audio".
            logger.info(
                f"Skipping {self.media_type} compression for {input_file}, "
                "it meets requirements"
            )
            return None
        return self.transcode_media(input_file, tmp_dir, params)

    def get_media_settings(self) -> List[media.MediaSettings]:
        """Return the settings to render; must be overridden by subclasses."""
        raise NotImplementedError()

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Return transcode parameters, or ``None`` to skip; must be overridden."""
        raise NotImplementedError()

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Transcode ``input_file`` and return the output path; must be overridden."""
        raise NotImplementedError()

    def get_media_meta(self, probe_result) -> media.MediaMeta:
        """Return metadata for a probe result; must be overridden."""
        raise NotImplementedError()

    def get_media(self, response, request, info, *, item=None):
        """Yield ``(path, buffer, meta, mimetype)`` for each rendition to store.

        The response body is written to a temporary file. For every setting
        returned by ``get_media_settings`` the final path is built with
        ``media_final_path``; settings whose path already exists in the store
        are skipped. Otherwise the file is transcoded if needed, probed, and
        yielded. When no transcode is needed the original body is yielded.

        The temporary directory stays open while the generator is suspended,
        so it is removed only once the generator finishes or is closed.
        """
        body = response.body
        buf = BytesIO(body)
        base_path = self.file_path(request, response=response, info=info, item=item)
        with tempfile.TemporaryDirectory() as tmp_dir:
            settings = self.get_media_settings()
            tmp_file = f"{tmp_dir}/original"
            with open(tmp_file, "wb") as f:
                # Write the body directly so ``buf`` is not drained here.
                f.write(body)
            for setting in settings:
                ext = setting["extension"]
                name = setting["name"]
                final_path = media_final_path(base_path, name, ext)
                # NOTE(review): the truthiness check assumes stat_file returns
                # its result synchronously - confirm for every store in use.
                stat = self.store.stat_file(final_path, info)
                if stat:
                    logger.info(f"Skipping, transcoded media exists at {final_path}")
                    continue
                converted_file = self.transcode(tmp_file, setting, tmp_dir)
                if converted_file:
                    out_buf = read_asset(converted_file)
                    out_file = converted_file
                else:
                    # The same buffer may be yielded for several settings and
                    # the consumer may have read it; hand it out rewound.
                    buf.seek(0)
                    out_buf = buf
                    out_file = tmp_file
                probe_result = media.probe_media(out_file)
                meta = self.get_media_meta(probe_result)
                logger.info(f"{self.media_type} final {final_path} with {meta}")
                yield final_path, out_buf, meta, setting["mimetype"]
|
|
|
|
|
|
class AudioPipeline(TranscodePipeline):
    """Transcode pipeline specialised for audio files.

    Every media-specific step is delegated to the audio helpers in
    ``repub.media`` and ``repub.utils``.
    """

    DEFAULT_FILES_URLS_FIELD = "audio_urls"
    DEFAULT_FILES_RESULT_FIELD = "audios"

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from ``crawler``, storing under ``AUDIO_STORE``."""
        crawler_settings = crawler.settings
        cls._update_stores(crawler_settings)
        store_uri = crawler_settings["AUDIO_STORE"]
        return cls(store_uri, crawler=crawler)

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Initialise the base pipeline with ``FileType.AUDIO`` as media type."""
        audio_type = repub.utils.FileType.AUDIO
        super().__init__(audio_type, store_uri, crawler=crawler)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the base storage path computed from ``request.url``."""
        source_url = request.url
        return repub.utils.local_audio_path(source_url)

    def get_media_settings(self) -> List[media.AudioSettings]:
        """Return the value of the ``REPUBLISHER_AUDIO`` setting."""
        audio_settings = self.settings["REPUBLISHER_AUDIO"]
        return audio_settings

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to ``media.audio_transcode_params``."""
        params = media.audio_transcode_params(probe_result, settings)
        return params

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to ``media.transcode_audio``."""
        output_file = media.transcode_audio(input_file, tmp_dir, params)
        return output_file

    def get_media_meta(self, probe_result) -> Optional[media.AudioMeta]:
        """Delegate to ``media.audio_meta``."""
        meta = media.audio_meta(probe_result)
        return meta
|
|
|
|
|
|
class VideoPipeline(TranscodePipeline):
    """Transcode pipeline specialised for video files.

    Every media-specific step is delegated to the video helpers in
    ``repub.media`` and ``repub.utils``.
    """

    DEFAULT_FILES_URLS_FIELD = "video_urls"
    DEFAULT_FILES_RESULT_FIELD = "videos"

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """Build the pipeline from ``crawler``, storing under ``VIDEO_STORE``."""
        crawler_settings = crawler.settings
        cls._update_stores(crawler_settings)
        store_uri = crawler_settings["VIDEO_STORE"]
        return cls(store_uri, crawler=crawler)

    def __init__(self, store_uri: Union[str, PathLike], *, crawler: Crawler):
        """Initialise the base pipeline with ``FileType.VIDEO`` as media type."""
        video_type = repub.utils.FileType.VIDEO
        super().__init__(video_type, store_uri, crawler=crawler)

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the base storage path computed from ``request.url``."""
        source_url = request.url
        return repub.utils.local_video_path(source_url)

    def get_media_settings(self) -> List[media.VideoSettings]:
        """Return the value of the ``REPUBLISHER_VIDEO`` setting."""
        video_settings = self.settings["REPUBLISHER_VIDEO"]
        return video_settings

    def get_transcode_params(self, probe_result, settings) -> Optional[Dict[str, str]]:
        """Delegate to ``media.video_transcode_params``."""
        params = media.video_transcode_params(probe_result, settings)
        return params

    def transcode_media(self, input_file: str, tmp_dir: str, params) -> str:
        """Delegate to ``media.transcode_video``."""
        output_file = media.transcode_video(input_file, tmp_dir, params)
        return output_file

    def get_media_meta(self, probe_result) -> Optional[media.VideoMeta]:
        """Delegate to ``media.video_meta``."""
        meta = media.video_meta(probe_result)
        return meta
|