136 lines
3.9 KiB
Python
136 lines
3.9 KiB
Python
from datetime import datetime
|
|
from time import mktime
|
|
|
|
import lxml.etree as ET
|
|
import lxml.html
|
|
from lxml import etree
|
|
from lxml.builder import ElementMaker
|
|
from lxml.etree import Element
|
|
|
|
from .srcset import SRCSet
|
|
|
|
# Monkeypatch lxml.html.defs so "srcset" is treated as a link attribute
# (presumably so iterlinks() in munge_cdata_html visits it — confirm).
link_attrs_orig = lxml.html.defs.link_attrs
lxml.html.defs.link_attrs = frozenset(link_attrs_orig | {"srcset"})
|
|
|
|
|
|
class SafeElementMaker:
    """
    Wraps ElementMaker to silently drop None values

    Children that are None, or strings that are empty/whitespace-only,
    are filtered out before element construction.  When nothing useful
    remains (no children and no attributes), no element is built and
    None is returned — which in turn gets dropped by any enclosing
    SafeElementMaker call.
    """

    def __init__(self, **kwargs):
        # All keyword args (nsmap, namespace, ...) pass straight through.
        self._maker = ElementMaker(**kwargs)

    def __getattr__(self, tag):
        def build(*children, **attrib):
            kept = []
            for child in children:
                if child is None:
                    continue
                if isinstance(child, str) and not child.strip():
                    continue
                kept.append(child)
            # Nothing to emit: implicitly yields None (see class docstring).
            if not kept and not attrib:
                return None
            if isinstance(tag, str):
                return self._maker.__getattr__(tag)(*kept, **attrib)
            # Defensive branch for direct calls with an Element subclass;
            # normal attribute access always passes a str tag.
            if issubclass(tag, Element):
                return tag(*kept, **attrib)

        return build
|
|
|
|
|
|
# XML namespace prefixes used throughout the generated RSS documents.
nsmap = {
    "content": "http://purl.org/rss/1.0/modules/content/",
    "media": "http://search.yahoo.com/mrss/",
    "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    "dc": "http://purl.org/dc/elements/1.1/",
    "atom": "http://www.w3.org/2005/Atom",
}

# One element factory per namespace; each builds elements qualified in that
# namespace and declares it as the default (None-prefix) namespace.
CONTENT = SafeElementMaker(nsmap={None: nsmap["content"]}, namespace=nsmap["content"])
MEDIA = SafeElementMaker(nsmap={None: nsmap["media"]}, namespace=nsmap["media"])
ITUNES = SafeElementMaker(nsmap={None: nsmap["itunes"]}, namespace=nsmap["itunes"])
DC = SafeElementMaker(nsmap={None: nsmap["dc"]}, namespace=nsmap["dc"])
ATOM = SafeElementMaker(nsmap={None: nsmap["atom"]}, namespace=nsmap["atom"])
# Un-namespaced factory with all prefixes registered.  Annotated with its
# actual type: SafeElementMaker wraps ElementMaker but is not a subclass of
# it, so the previous `E: ElementMaker` annotation was incorrect.
E: SafeElementMaker = SafeElementMaker(nsmap=nsmap)
CDATA = ET.CDATA
|
|
|
|
|
|
def rss():
    """Build and return the root <rss version="2.0"> element."""
    attrs = {"version": "2.0"}
    return E.rss(attrs)
|
|
|
|
|
|
def parse_pubdate(date_str):
    """Parse an RFC 822 pubDate string into a timezone-aware datetime.

    On any parse failure (malformed string, or None text from an empty
    <pubDate/> element) returns an aware UTC ``datetime.min`` sentinel
    so failed parses sort before everything else.

    Bug fixed: the old fallback returned the *naive* ``datetime.min``,
    while successful ``%z`` parses are aware — mixing the two in a sort
    key (see sort_rss) made sorted()/list.sort() raise TypeError.  The
    old code also let TypeError escape when date_str was None.
    """
    from datetime import timezone  # local import: file only imports datetime

    try:
        return datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S %z")
    except (ValueError, TypeError):
        return datetime.min.replace(tzinfo=timezone.utc)
|
|
|
|
|
|
def sort_rss(root):
    """Sort the channel's <item> elements newest-first by pubDate, in place.

    Items are detached, sorted, and re-appended, so non-item children of
    <channel> keep their position at the head.  Items without a pubDate
    sort as oldest.  Returns the mutated root for chaining.
    """
    channel = root.find("channel")
    items = list(channel.findall("item"))
    for entry in items:
        channel.remove(entry)

    def pubdate_key(item):
        node = item.find("pubDate")
        return parse_pubdate(node.text if node is not None else "")

    items.sort(key=pubdate_key, reverse=True)
    channel.extend(items)
    return root
|
|
|
|
|
|
def serialize(root):
    """Serialize an element tree to pretty-printed UTF-8 bytes with XML declaration."""
    # NOTE: item sorting is intentionally disabled here; call sort_rss(root)
    # first to re-enable newest-first ordering.
    options = {
        "encoding": "utf-8",
        "xml_declaration": True,
        "pretty_print": True,
    }
    return etree.tostring(root, **options)
|
|
|
|
|
|
def date_format(d):
    """Format a datetime as an RFC 822 date string; falsy input yields None."""
    if not d:
        return None
    return d.strftime("%a, %d %b %Y %H:%M:%S %z")
|
|
|
|
|
|
def to_datetime(struct_time):
    """Convert a local-time struct_time to a naive datetime; falsy input yields None."""
    if not struct_time:
        return None
    return datetime.fromtimestamp(mktime(struct_time))
|
|
|
|
|
|
def normalize_date(struct_time):
    """Convert a struct_time to an RFC 822 date string (None-safe passthrough)."""
    dt = to_datetime(struct_time)
    return date_format(dt)
|
|
|
|
|
|
def munge_cdata_html(raw_html, replace_link_fn) -> str:
    """Rewrite every link in an HTML fragment through *replace_link_fn*.

    Parses raw_html, walks all links lxml's iterlinks() reports (href, src,
    srcset thanks to the module-level monkeypatch, links inside element
    text, ...), and substitutes replace_link_fn(el, attr, stripped_url)
    for each.  Returns the modified document serialized back to a
    pretty-printed UTF-8 string.
    """
    html = lxml.html.fromstring(raw_html)
    # iterlinks() yields (element, attribute, url, position); attribute is
    # None when the link lives in the element's text rather than an attribute.
    for el, attr, link, pos in html.iterlinks():
        if attr == "srcset":
            # these are a messy special case: one attribute holds several
            # comma-separated URL candidates, so rewrite each candidate via
            # the SRCSet helper and re-serialize the whole attribute value.
            o = SRCSet(el.attrib["srcset"])
            o.parse()
            for c in o.candidates:
                link = c["url"]
                new_link = replace_link_fn(el, attr, link.strip())
                c["url"] = new_link

            el.set(attr, o.stringify())
            continue

        new_link = replace_link_fn(el, attr, link.strip())
        if new_link == link:
            # Unchanged link: leave the document untouched.
            continue
        if attr is None:
            # Link embedded in element text (e.g. a CSS url() in <style>):
            # splice the replacement in at the reported character offset.
            new = el.text[:pos] + new_link + el.text[pos + len(link) :]
            el.text = new
        else:
            cur = el.get(attr)
            if not pos and len(cur) == len(link):
                new = new_link  # most common case: attribute value IS the link
            else:
                # Link is a substring of the attribute value; splice at pos.
                new = cur[:pos] + new_link + cur[pos + len(link) :]
            el.set(attr, new)
    return lxml.html.tostring(html, encoding="utf-8", pretty_print=True).decode("utf-8")
|