"""Scrapy item exporter that assembles scraped elements into one RSS document."""

from io import BytesIO
from typing import Any

from lxml.etree import QName
from scrapy.exporters import BaseItemExporter

from repub import rss
from repub.items import (
    ChannelElementItem,
    ElementItem,
    MediaVariant,
    ThumbnailVariant,
    TranscodedImageFile,
    TranscodedMediaFile,
)
from repub.utils import FileType, determine_file_type

# Fully-qualified ("{namespace}local") names used to match and emit elements
# and attributes in the namespaces declared by ``rss.nsmap``.
MEDIA_CONTENT_TAG = QName(rss.nsmap["media"], "content").text
MEDIA_GROUP_TAG = QName(rss.nsmap["media"], "group").text
MEDIA_THUMBNAIL_TAG = QName(rss.nsmap["media"], "thumbnail").text
ANYNEWS_SLOT_ATTR = QName(rss.nsmap["anynews"], "slot").text
ANYNEWS_TYPE_ATTR = QName(rss.nsmap["anynews"], "type").text

# Attributes of a stripped media:content node that are carried over onto the
# media:content nodes generated for the same URL.
_FALLBACK_ATTRIB_KEYS = frozenset({"expression", "lang"})


class RssExporter(BaseItemExporter):
    """Collect channel and item elements and write them out as one RSS feed.

    Items that arrive before the channel element are buffered and exported as
    soon as the channel is known. The document is serialized and written to
    ``file`` only in :meth:`finish_exporting`.
    """

    def __init__(self, file: BytesIO, **kwargs: Any):
        """Create the exporter.

        Args:
            file: Binary stream that receives the serialized feed.
            **kwargs: Forwarded unchanged to ``BaseItemExporter``.
        """
        super().__init__(**kwargs)
        if not self.encoding:
            self.encoding = "utf-8"
        self.file: BytesIO = file
        self.rss = rss.rss()
        # Set once a ChannelElementItem has been exported.
        self.channel = None
        # Items received while no channel is available yet.
        self.item_buffer = []

    def start_exporting(self) -> None:
        """Do nothing; the document is written in :meth:`finish_exporting`."""
        pass

    def export_item(self, item: Any):
        """Route *item* into the feed.

        A ``ChannelElementItem`` becomes the current channel, is appended to
        the RSS root, and triggers a flush of any buffered items. Every other
        item is appended to the current channel, or buffered when no channel
        has been seen yet.
        """
        if isinstance(item, ChannelElementItem):
            self.channel = item.el
            self.rss.append(item.el)
            self.flush_buffer()
            return
        if self.channel is None:
            self.item_buffer.append(item)
        else:
            self.export_rss_item(item)

    def flush_buffer(self):
        """Export every buffered item into the current channel, then empty the buffer."""
        for item in self.item_buffer:
            self.export_rss_item(item)
        self.item_buffer = []

    def compact_attrib(self, **attrib):
        """Return *attrib* with values stringified and empty entries dropped.

        Entries whose value equals ``None`` or ``""`` are omitted; every
        remaining value is converted with ``str``.
        """
        return {
            key: str(value)
            for key, value in attrib.items()
            if value not in (None, "")
        }

    def canonical_variant(
        self, media_file: TranscodedMediaFile | TranscodedImageFile
    ) -> MediaVariant | None:
        """Pick the variant that represents *media_file*.

        Returns the first variant whose ``isDefault`` is the string
        ``"true"``, otherwise the first variant, otherwise ``None`` when
        there are no variants.
        """
        for variant in media_file["variants"]:
            if variant.get("isDefault") == "true":
                return variant
        if media_file["variants"]:
            return media_file["variants"][0]
        return None

    def rebuild_enclosures(self, item: ElementItem) -> None:
        """Rewrite ``<enclosure>`` attributes from the item's audio files.

        Each direct ``enclosure`` child whose ``url`` matches the
        ``published_url`` of one of ``item.audios`` has its attributes
        replaced by ``url``/``length``/``type`` taken from that audio's
        canonical variant. When the variant lacks ``fileSize`` or ``type``,
        the enclosure's existing ``length`` / ``type`` is kept instead.
        Enclosures with no matching audio, or whose audio has no variants,
        are left untouched.
        """
        audio_lookup = {audio["published_url"]: audio for audio in item.audios}
        for enclosure in item.el.findall("enclosure"):
            media_file = audio_lookup.get(enclosure.get("url", ""))
            if media_file is None:
                continue
            canonical = self.canonical_variant(media_file)
            if canonical is None:
                continue
            # Build the replacement BEFORE clearing: the fallbacks read the
            # enclosure's current attributes, which no longer exist once
            # ``attrib.clear()`` has run.
            replacement = self.compact_attrib(
                url=canonical.get("url"),
                length=canonical.get("fileSize") or enclosure.get("length"),
                type=canonical.get("type") or enclosure.get("type"),
            )
            enclosure.attrib.clear()
            enclosure.attrib.update(replacement)

    def owned_media_type(self, el, managed_types: set[FileType]) -> FileType | None:
        """Return the file type of *el* if it is one of *managed_types*.

        The type is resolved by ``determine_file_type`` from the element's
        ``url``, ``medium`` and ``type`` attributes. Returns ``None`` when
        the resolved type is not in *managed_types*.
        """
        url = el.get("url", "")
        file_type = determine_file_type(
            url=url,
            medium=el.get("medium"),
            mimetype=el.get("type"),
        )
        if file_type in managed_types:
            return file_type
        return None

    def _fallback_attrib(self, el) -> dict[str, str]:
        """Return the attributes of *el* that should survive regeneration."""
        return {
            key: value
            for key, value in el.attrib.items()
            if key in _FALLBACK_ATTRIB_KEYS
        }

    def strip_managed_media_nodes(self, item: ElementItem) -> dict[str, dict[str, str]]:
        """Remove media nodes that will be regenerated from transcoded files.

        A file type is "managed" when the item has transcoded files of that
        type (images from :meth:`managed_image_files`, ``item.audios``,
        ``item.videos``). Among the direct children of ``item.el``:

        * ``media:thumbnail`` nodes are removed when images are managed;
        * ``media:content`` nodes of a managed type are removed;
        * inside each ``media:group``, ``media:content`` nodes of a managed
          type are removed; if any of them was an image, the group's
          ``media:thumbnail`` nodes are removed too; a group left with no
          children is removed.

        Returns:
            A mapping from the ``url`` of each removed ``media:content`` node
            to its ``expression`` / ``lang`` attributes. Empty when nothing
            is managed.
        """
        fallbacks: dict[str, dict[str, str]] = {}
        managed_types: set[FileType] = set()
        if self.managed_image_files(item):
            managed_types.add(FileType.IMAGE)
        if item.audios:
            managed_types.add(FileType.AUDIO)
        if item.videos:
            managed_types.add(FileType.VIDEO)
        if not managed_types:
            return fallbacks

        # Iterate over snapshots (``list(...)``) because nodes are removed
        # from their parent while walking.
        for child in list(item.el):
            if child.tag == MEDIA_THUMBNAIL_TAG and FileType.IMAGE in managed_types:
                item.el.remove(child)
                continue

            if child.tag == MEDIA_CONTENT_TAG:
                if self.owned_media_type(child, managed_types) is None:
                    continue
                fallbacks[child.get("url", "")] = self._fallback_attrib(child)
                item.el.remove(child)
                continue

            if child.tag != MEDIA_GROUP_TAG:
                continue

            managed_image_group = False
            for media_content in list(child):
                if media_content.tag != MEDIA_CONTENT_TAG:
                    continue
                owned_type = self.owned_media_type(media_content, managed_types)
                if owned_type is None:
                    continue
                if owned_type == FileType.IMAGE:
                    managed_image_group = True
                fallbacks[media_content.get("url", "")] = self._fallback_attrib(
                    media_content
                )
                child.remove(media_content)

            if managed_image_group:
                for media_thumbnail in list(child):
                    if media_thumbnail.tag == MEDIA_THUMBNAIL_TAG:
                        child.remove(media_thumbnail)

            if len(child) == 0:
                item.el.remove(child)
        return fallbacks

    def managed_image_files(self, item: ElementItem) -> list[TranscodedImageFile]:
        """Return the images of *item* whose ``url`` is in ``item.media_image_urls``."""
        media_image_urls = set(item.media_image_urls)
        if not media_image_urls:
            return []
        return [image for image in item.images if image["url"] in media_image_urls]

    def append_media_groups(
        self, item: ElementItem, fallbacks: dict[str, dict[str, str]]
    ):
        """Append one ``media:group`` per transcoded file that has variants.

        Managed images, audios and videos are processed in that order. Each
        group holds one ``media:content`` per variant followed by one
        ``media:thumbnail`` per entry in the file's ``thumbnails`` (if any).

        Args:
            item: Item whose element receives the groups.
            fallbacks: Attributes recovered by
                :meth:`strip_managed_media_nodes`, looked up by the file's
                ``published_url``.
        """
        for media_file in [
            *self.managed_image_files(item),
            *item.audios,
            *item.videos,
        ]:
            if not media_file["variants"]:
                continue
            fallback_attrib = fallbacks.get(media_file["published_url"], {})
            group = rss.MEDIA.group(
                *[
                    rss.MEDIA.content(
                        **self.media_content_attrib(variant, fallback_attrib)
                    )
                    for variant in media_file["variants"]
                ],
                *[
                    rss.MEDIA.thumbnail(**self.media_thumbnail_attrib(thumbnail))
                    for thumbnail in media_file.get("thumbnails", [])
                ],
            )
            if group is not None:
                item.el.append(group)

    def media_content_attrib(
        self, variant: MediaVariant, fallback_attrib: dict[str, str]
    ) -> dict[str, str]:
        """Build the attribute dict for one ``media:content`` node.

        Starts from a copy of *fallback_attrib* and overlays the non-empty
        fields of *variant*, so variant values win over fallbacks.
        """
        attrib = dict(fallback_attrib)
        attrib.update(
            self.compact_attrib(
                url=variant.get("url"),
                type=variant.get("type"),
                medium=variant.get("medium"),
                isDefault=variant.get("isDefault"),
                expression=variant.get("expression"),
                bitrate=variant.get("bitrate"),
                framerate=variant.get("framerate"),
                samplingrate=variant.get("samplingrate"),
                channels=variant.get("channels"),
                duration=variant.get("duration"),
                height=variant.get("height"),
                width=variant.get("width"),
                lang=variant.get("lang"),
                fileSize=variant.get("fileSize"),
            )
        )
        return attrib

    def media_thumbnail_attrib(self, thumbnail: ThumbnailVariant) -> dict[str, str]:
        """Build the attribute dict for one ``media:thumbnail`` node.

        ``url``/``width``/``height`` are included when non-empty; truthy
        ``slot`` and ``type`` values are emitted as namespaced ``anynews``
        attributes.
        """
        attrib = self.compact_attrib(
            url=thumbnail.get("url"),
            width=thumbnail.get("width"),
            height=thumbnail.get("height"),
        )
        if thumbnail.get("slot"):
            attrib[ANYNEWS_SLOT_ATTR] = str(thumbnail["slot"])
        if thumbnail.get("type"):
            attrib[ANYNEWS_TYPE_ATTR] = str(thumbnail["type"])
        return attrib

    def apply_transcoded_media(self, item: Any) -> None:
        """Rewrite the media markup of *item* from its transcoded files.

        Does nothing unless *item* is an ``ElementItem`` with at least one
        managed image, audio or video. Otherwise rebuilds enclosures, strips
        the media nodes being replaced, and appends regenerated groups.
        """
        if not isinstance(item, ElementItem):
            return
        if not self.managed_image_files(item) and not item.audios and not item.videos:
            return
        self.rebuild_enclosures(item)
        fallbacks = self.strip_managed_media_nodes(item)
        self.append_media_groups(item, fallbacks)

    def export_rss_item(self, item: Any):
        """Apply transcoded media to *item* and append its element to the channel.

        Must only be called once a channel has been set.
        """
        assert self.channel is not None
        self.apply_transcoded_media(item)
        self.channel.append(item.el)

    def finish_exporting(self) -> None:
        """Serialize the RSS document and write it to the output file.

        NOTE: items still held in ``item_buffer`` (no channel was ever
        exported) are not part of the document and are not written.
        """
        xml_bytes = rss.serialize(self.rss)
        self.file.write(xml_bytes)