Replace image pipeline with profile-driven variants

- add image normalization profiles and thumbnail profiles
- generate source, full-size variant, and thumbnail image artifacts
- rewrite canonical image URLs through the first configured profile
- emit explicit image Media RSS groups with named thumbnails
- preserve legacy image paths when image conversion is disabled
- cover cache-hit source paths, inline image handling, and thumbnail export
This commit is contained in:
Abel Luck 2026-05-27 09:24:22 +02:00
parent 7316d4723f
commit 525393272e
13 changed files with 1299 additions and 124 deletions

View file

@ -9,12 +9,17 @@ from repub.items import (
ChannelElementItem,
ElementItem,
MediaVariant,
ThumbnailVariant,
TranscodedImageFile,
TranscodedMediaFile,
)
from repub.utils import FileType, determine_file_type
MEDIA_CONTENT_TAG = QName(rss.nsmap["media"], "content").text
MEDIA_GROUP_TAG = QName(rss.nsmap["media"], "group").text
MEDIA_THUMBNAIL_TAG = QName(rss.nsmap["media"], "thumbnail").text
ANYNEWS_SLOT_ATTR = QName(rss.nsmap["anynews"], "slot").text
ANYNEWS_TYPE_ATTR = QName(rss.nsmap["anynews"], "type").text
class RssExporter(BaseItemExporter):
@ -52,7 +57,9 @@ class RssExporter(BaseItemExporter):
key: str(value) for key, value in attrib.items() if value not in (None, "")
}
def canonical_variant(self, media_file: TranscodedMediaFile) -> MediaVariant | None:
def canonical_variant(
self, media_file: TranscodedMediaFile | TranscodedImageFile
) -> MediaVariant | None:
for variant in media_file["variants"]:
if variant.get("isDefault") == "true":
return variant
@ -92,6 +99,8 @@ class RssExporter(BaseItemExporter):
def strip_managed_media_nodes(self, item: ElementItem) -> dict[str, dict[str, str]]:
fallbacks: dict[str, dict[str, str]] = {}
managed_types: set[FileType] = set()
if self.managed_image_files(item):
managed_types.add(FileType.IMAGE)
if item.audios:
managed_types.add(FileType.AUDIO)
if item.videos:
@ -100,6 +109,9 @@ class RssExporter(BaseItemExporter):
return fallbacks
for child in list(item.el):
if child.tag == MEDIA_THUMBNAIL_TAG and FileType.IMAGE in managed_types:
item.el.remove(child)
continue
if child.tag == MEDIA_CONTENT_TAG:
if self.owned_media_type(child, managed_types) is None:
continue
@ -113,25 +125,43 @@ class RssExporter(BaseItemExporter):
if child.tag != MEDIA_GROUP_TAG:
continue
managed_image_group = False
for media_content in list(child):
if media_content.tag != MEDIA_CONTENT_TAG:
continue
if self.owned_media_type(media_content, managed_types) is None:
owned_type = self.owned_media_type(media_content, managed_types)
if owned_type is None:
continue
if owned_type == FileType.IMAGE:
managed_image_group = True
fallbacks[media_content.get("url", "")] = {
key: value
for key, value in media_content.attrib.items()
if key in {"expression", "lang"}
}
child.remove(media_content)
if managed_image_group:
for media_thumbnail in list(child):
if media_thumbnail.tag == MEDIA_THUMBNAIL_TAG:
child.remove(media_thumbnail)
if len(child) == 0:
item.el.remove(child)
return fallbacks
def managed_image_files(self, item: ElementItem) -> list[TranscodedImageFile]:
media_image_urls = set(item.media_image_urls)
if not media_image_urls:
return []
return [image for image in item.images if image["url"] in media_image_urls]
def append_media_groups(
self, item: ElementItem, fallbacks: dict[str, dict[str, str]]
):
for media_file in [*item.audios, *item.videos]:
for media_file in [
*self.managed_image_files(item),
*item.audios,
*item.videos,
]:
if not media_file["variants"]:
continue
fallback_attrib = fallbacks.get(media_file["published_url"], {})
@ -141,7 +171,11 @@ class RssExporter(BaseItemExporter):
**self.media_content_attrib(variant, fallback_attrib)
)
for variant in media_file["variants"]
]
],
*[
rss.MEDIA.thumbnail(**self.media_thumbnail_attrib(thumbnail))
for thumbnail in media_file.get("thumbnails", [])
],
)
if group is not None:
item.el.append(group)
@ -170,10 +204,22 @@ class RssExporter(BaseItemExporter):
)
return attrib
def media_thumbnail_attrib(self, thumbnail: ThumbnailVariant) -> dict[str, str]:
attrib = self.compact_attrib(
url=thumbnail.get("url"),
width=thumbnail.get("width"),
height=thumbnail.get("height"),
)
if thumbnail.get("slot"):
attrib[ANYNEWS_SLOT_ATTR] = str(thumbnail["slot"])
if thumbnail.get("type"):
attrib[ANYNEWS_TYPE_ATTR] = str(thumbnail["type"])
return attrib
def apply_transcoded_media(self, item: Any) -> None:
if not isinstance(item, ElementItem):
return
if not item.audios and not item.videos:
if not self.managed_image_files(item) and not item.audios and not item.videos:
return
self.rebuild_enclosures(item)
fallbacks = self.strip_managed_media_nodes(item)