republisher/repub/rss.py

136 lines
3.9 KiB
Python

from datetime import datetime, timezone
from time import mktime

import lxml.etree as ET
import lxml.html
from lxml import etree
from lxml.builder import ElementMaker
from lxml.etree import Element

from .srcset import SRCSet
# Monkeypatch lxml.html.defs so that "srcset" is treated as a link attribute.
# The original set is kept in ``link_attrs_orig`` so it stays available.
link_attrs_orig = lxml.html.defs.link_attrs
lxml.html.defs.link_attrs = frozenset(link_attrs_orig | {"srcset"})
class SafeElementMaker:
    """
    Wraps ElementMaker to silently drop None values

    Attribute access (``maker.title``) returns a factory for the element
    named after the attribute.  The factory discards children that are
    ``None`` or blank/whitespace-only strings before delegating to the
    wrapped ``ElementMaker``.  When nothing is left -- no usable children
    and no keyword attributes -- the factory returns ``None`` instead of an
    element, so an enclosing factory call drops the empty element as well.
    """

    def __init__(self, **kwargs):
        """Create the wrapped ``ElementMaker``; ``kwargs`` are passed through."""
        self._maker = ElementMaker(**kwargs)

    def __getattr__(self, tag):
        """Return a child-filtering element factory for ``tag``.

        Raises:
            AttributeError: for dunder names, so that protocol probes such as
                ``hasattr(maker, "__deepcopy__")`` are not answered with an
                element factory.
        """
        # __getattr__ only runs for names that normal lookup did not find.
        # Special-method lookups must fail like on any other object.
        if isinstance(tag, str) and tag.startswith("__") and tag.endswith("__"):
            raise AttributeError(tag)

        def safe_element(*children, **attrib):
            valid_children = [
                child
                for child in children
                if child is not None
                and (not isinstance(child, str) or child.strip())
            ]
            if not valid_children and not attrib:
                # Nothing to put in the element: drop it altogether.
                return None
            if isinstance(tag, str):
                return self._maker.__getattr__(tag)(*valid_children, **attrib)
            if callable(tag):
                # Only reachable through a direct ``__getattr__(obj)`` call.
                # The previous ``issubclass(tag, Element)`` test could not
                # succeed because ``lxml.etree.Element`` is a factory
                # function, not a class, so it raised TypeError.
                return tag(*valid_children, **attrib)
            return None

        return safe_element
# Namespace URIs used in generated feeds, keyed by their prefix.
nsmap = {
    "content": "http://purl.org/rss/1.0/modules/content/",
    "media": "http://search.yahoo.com/mrss/",
    "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    "dc": "http://purl.org/dc/elements/1.1/",
    "atom": "http://www.w3.org/2005/Atom",
}

# One element factory per namespace.  Each is configured with its namespace
# URI and an nsmap that maps the default (``None``) prefix to that same URI.
CONTENT = SafeElementMaker(namespace=nsmap["content"], nsmap={None: nsmap["content"]})
MEDIA = SafeElementMaker(namespace=nsmap["media"], nsmap={None: nsmap["media"]})
ITUNES = SafeElementMaker(namespace=nsmap["itunes"], nsmap={None: nsmap["itunes"]})
DC = SafeElementMaker(namespace=nsmap["dc"], nsmap={None: nsmap["dc"]})
ATOM = SafeElementMaker(namespace=nsmap["atom"], nsmap={None: nsmap["atom"]})

# Factory without a namespace of its own; it is given the full prefix map.
E: ElementMaker = SafeElementMaker(nsmap=nsmap)

# Short alias for wrapping text in a CDATA section.
CDATA = ET.CDATA
def rss():
    """Build the root ``<rss>`` element carrying ``version="2.0"``."""
    attributes = {"version": "2.0"}
    return E.rss(attributes)
def parse_pubdate(date_str):
    """Parse a ``pubDate`` string into a timezone-aware datetime.

    Args:
        date_str: Text in the form ``"Mon, 15 Jan 2024 12:30:45 +0000"``.

    Returns:
        The parsed aware datetime.  When ``date_str`` is missing (``None``)
        or does not match the format, the earliest representable datetime
        in UTC is returned so that such entries sort as the oldest.
    """
    try:
        return datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S %z")
    except (TypeError, ValueError):
        # The fallback must be aware as well: successful parses carry a
        # tzinfo (from %z), and Python refuses to order naive against aware
        # datetimes, which would make sorting a mixed list raise TypeError.
        return datetime.min.replace(tzinfo=timezone.utc)
def sort_rss(root):
    """Reorder the channel's ``<item>`` elements newest-first, in place.

    Items are detached from the ``<channel>`` element, sorted by their
    ``<pubDate>`` (as interpreted by ``parse_pubdate``) in descending order,
    and appended back, so they end up after the channel's other children.

    Args:
        root: The feed's root element; its ``<channel>`` child is modified.

    Returns:
        The same ``root`` object.  A tree without a ``<channel>`` child is
        returned unchanged.
    """
    channel = root.find("channel")
    if channel is None:
        # Nothing to sort; previously this crashed with AttributeError.
        return root

    items = channel.findall("item")
    for item in items:
        channel.remove(item)

    items.sort(
        # findtext with a default always yields a string, both when the
        # <pubDate> element is absent and when it is present but empty.
        key=lambda item: parse_pubdate(item.findtext("pubDate", default="")),
        reverse=True,
    )
    channel.extend(items)
    return root
def serialize(root):
    """Serialize ``root`` as pretty-printed UTF-8 XML with an XML declaration.

    Returns the result of ``etree.tostring`` for the given element.
    """
    # NOTE: the sort_rss pass is currently disabled, so children are written
    # in the order they already have in the tree.
    options = {
        "encoding": "utf-8",
        "xml_declaration": True,
        "pretty_print": True,
    }
    return etree.tostring(root, **options)
def date_format(d):
    """Format a datetime as a ``pubDate`` string.

    Args:
        d: The datetime to format, or a falsy value such as ``None``.

    Returns:
        Text such as ``"Mon, 15 Jan 2024 12:30:45 +0000"``, or ``None`` when
        ``d`` is falsy.  A datetime without timezone information is written
        with the zone ``-0000``.
    """
    if not d:
        return None
    # %z renders as an empty string for naive datetimes, which used to leave
    # a dangling trailing space and no zone at all -- text that the "%z" in
    # parse_pubdate's format cannot read back.  "-0000" is the RFC 5322
    # marker for "offset unknown" (the convention email.utils.format_datetime
    # uses for naive datetimes) and keeps the output parseable.
    zone = d.strftime("%z") or "-0000"
    return d.strftime("%a, %d %b %Y %H:%M:%S ") + zone
def to_datetime(struct_time):
    """Convert a ``time.struct_time`` into a naive ``datetime``.

    Returns ``None`` when ``struct_time`` is falsy.
    """
    if not struct_time:
        return None
    # NOTE(review): mktime interprets its argument as local time.  If callers
    # pass UTC struct_time values, calendar.timegm would be the matching
    # conversion -- confirm against the callers.
    timestamp = mktime(struct_time)
    return datetime.fromtimestamp(timestamp)
def normalize_date(struct_time):
    """Turn a ``time.struct_time`` into a formatted date string.

    Chains ``to_datetime`` and ``date_format``; a falsy ``struct_time``
    yields ``None``.
    """
    as_datetime = to_datetime(struct_time)
    return date_format(as_datetime)
def munge_cdata_html(raw_html, replace_link_fn) -> str:
    """Rewrite the links inside an HTML fragment and return the new markup.

    Args:
        raw_html: HTML markup; parsed with ``lxml.html.fromstring``.
        replace_link_fn: Callable invoked as ``replace_link_fn(el, attr, link)``
            with the element, the attribute name (``None`` when the link sits
            in the element's text) and the whitespace-stripped link.  It
            returns the replacement link.

    Returns:
        The modified document serialized back to a pretty-printed string.
    """
    document = lxml.html.fromstring(raw_html)

    for el, attr, link, pos in document.iterlinks():
        if attr == "srcset":
            # srcset holds several candidate URLs in one attribute, so every
            # candidate is rewritten and the attribute is rebuilt as a whole.
            srcset = SRCSet(el.attrib["srcset"])
            srcset.parse()
            for candidate in srcset.candidates:
                candidate_url = candidate["url"]
                candidate["url"] = replace_link_fn(el, attr, candidate_url.strip())
            el.set(attr, srcset.stringify())
            continue

        new_link = replace_link_fn(el, attr, link.strip())
        if new_link == link:
            # Unchanged link: leave the document alone.
            continue

        if attr is None:
            # The link is embedded in the element's text at offset ``pos``.
            text = el.text
            el.text = text[:pos] + new_link + text[pos + len(link) :]
            continue

        current = el.get(attr)
        # Most common case: the link is the entire attribute value.
        is_whole_value = not pos and len(current) == len(link)
        replacement = (
            new_link
            if is_whole_value
            else current[:pos] + new_link + current[pos + len(link) :]
        )
        el.set(attr, replacement)

    serialized = lxml.html.tostring(document, encoding="utf-8", pretty_print=True)
    return serialized.decode("utf-8")