republisher/repub/rss.py

98 lines
2.6 KiB
Python

import lxml.etree as ET
from lxml import etree
from lxml.builder import ElementMaker
from lxml.etree import Element
class SafeElementMaker:
    """
    Wraps ElementMaker to silently drop None values.

    Attribute access (``maker.title``) yields a factory for an element named
    after the attribute.  The factory discards children that are ``None`` or
    blank strings.  When no children survive and no attributes were given it
    returns ``None`` instead of an empty element, so the result can be passed
    as a child to another factory and be dropped there in turn.
    """

    def __init__(self, **kwargs):
        """Create the wrapped ``ElementMaker``; ``kwargs`` are passed through as-is."""
        self._maker = ElementMaker(**kwargs)

    def __getattr__(self, tag):
        """
        Return a None-tolerant element factory for ``tag``.

        Raises:
            AttributeError: for dunder names and for ``_maker`` itself, so
                that protocol probes (``copy``, ``pickle``, ``hasattr``) and a
                half-initialised instance fail cleanly instead of receiving an
                element factory.
        """
        if tag == "_maker" or (tag.startswith("__") and tag.endswith("__")):
            raise AttributeError(tag)

        def safe_element(*children, **attrib):
            valid_children = [
                child
                for child in children
                if child is not None and (not isinstance(child, str) or child.strip())
            ]
            # Nothing to emit: signal "no element" so a parent factory drops it.
            if not valid_children and not attrib:
                return None
            # ``tag`` is always a str here (it is an attribute name), so the
            # documented ``maker(tag, ...)`` call form is all that is needed.
            return self._maker(tag, *valid_children, **attrib)

        return safe_element
# Prefix -> namespace URI for the feed extension vocabularies used by this
# module (RSS content module, Media RSS, iTunes podcast, Dublin Core, Atom).
nsmap = {
    "content": "http://purl.org/rss/1.0/modules/content/",
    "media": "http://search.yahoo.com/mrss/",
    "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    "dc": "http://purl.org/dc/elements/1.1/",
    "atom": "http://www.w3.org/2005/Atom",
}

# One maker per extension namespace.  Each is created with that namespace both
# as the element namespace and as the default (``None``-keyed) nsmap entry.
CONTENT, MEDIA, ITUNES, DC, ATOM = (
    SafeElementMaker(nsmap={None: nsmap[prefix]}, namespace=nsmap[prefix])
    for prefix in ("content", "media", "itunes", "dc", "atom")
)

# Maker for un-namespaced elements; it carries the full prefix map.
# The annotation says ElementMaker although the value is the SafeElementMaker wrapper.
E: ElementMaker = SafeElementMaker(nsmap=nsmap)

# Re-export of lxml's CDATA wrapper.
CDATA = ET.CDATA
from datetime import datetime, timezone
from time import mktime
def rss():
    """Build a bare ``rss`` root element carrying ``version="2.0"``."""
    root_attributes = {"version": "2.0"}
    return E.rss(root_attributes)
def parse_pubdate(date_str):
    """
    Parse a ``pubDate`` string such as ``"Mon, 01 Jan 2024 12:00:00 +0000"``.

    Args:
        date_str: Text in ``"%a, %d %b %Y %H:%M:%S %z"`` form.  ``None`` and
            malformed text are tolerated.

    Returns:
        A timezone-aware ``datetime``.  When the text cannot be parsed the
        earliest representable datetime, tagged as UTC, is returned.
    """
    try:
        return datetime.strptime(date_str, "%a, %d %b %Y %H:%M:%S %z")
    except (TypeError, ValueError):
        # The fallback must be aware as well: Python refuses to order naive
        # against aware datetimes, and successful parses are always aware
        # because the format requires %z.
        return datetime.min.replace(tzinfo=timezone.utc)
def sort_rss(root):
    """
    Reorder the ``item`` children of the feed's ``channel`` newest-first.

    Items are detached, sorted by their ``pubDate`` (descending) and appended
    back to the end of ``channel``.  Items whose date is missing, empty or
    unparseable sort last.  The tree is modified in place.

    Args:
        root: Element whose direct child ``channel`` holds the items.

    Returns:
        The same ``root`` element.  It is returned unchanged when there is no
        ``channel`` child.
    """
    channel = root.find("channel")
    if channel is None:
        return root

    def published(item):
        # findtext covers both a missing <pubDate> and an empty one.
        stamp = parse_pubdate(item.findtext("pubDate", default="") or "")
        # Normalise to aware datetimes so naive and aware values never get
        # compared (that comparison raises TypeError).
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return stamp

    items = channel.findall("item")
    for item in items:
        channel.remove(item)
    items.sort(key=published, reverse=True)
    channel.extend(items)
    return root
def serialize(root):
    """
    Render ``root`` as pretty-printed UTF-8 XML with an XML declaration.

    Items are written in their current order; the ``sort_rss(root)`` call
    that used to precede serialization is disabled.
    """
    # root = sort_rss(root)
    output_options = {
        "encoding": "utf-8",
        "xml_declaration": True,
        "pretty_print": True,
    }
    return etree.tostring(root, **output_options)
def date_format(d):
    """
    Format a datetime as ``"%a, %d %b %Y %H:%M:%S %z"`` (e.g.
    ``"Mon, 01 Jan 2024 12:00:00 +0000"``).

    Args:
        d: The value to format.  A falsy value (e.g. ``None``) yields ``None``.
            A naive ``datetime`` is taken to be local time and stamped with
            the local UTC offset; without that, ``%z`` expands to nothing and
            the result ends in a stray space with no zone.

    Returns:
        The formatted string, or ``None`` when ``d`` is falsy.
    """
    if not d:
        return None
    if isinstance(d, datetime) and d.utcoffset() is None:
        try:
            # astimezone() on a naive value assumes system-local time, which
            # keeps the wall-clock fields and only attaches the offset.
            d = d.astimezone()
        except (OverflowError, OSError, ValueError):
            # Out-of-range dates cannot be localised; fall back to formatting
            # the naive value rather than failing.
            pass
    return d.strftime("%a, %d %b %Y %H:%M:%S %z")
def to_datetime(struct_time):
    """
    Turn a time tuple into a naive ``datetime``.

    The tuple is read as local time (``time.mktime``) and the result is a
    naive local ``datetime``.  A falsy argument yields ``None``.

    NOTE(review): if callers pass UTC time tuples, this conversion shifts
    them by the local offset - confirm against the callers.
    """
    if not struct_time:
        return None
    epoch_seconds = mktime(struct_time)
    return datetime.fromtimestamp(epoch_seconds)
def normalize_date(struct_time):
    """
    Convert a time tuple straight to its formatted date string.

    Chains ``to_datetime`` and ``date_format``; the result is whatever
    ``date_format`` returns for the converted value.
    """
    as_datetime = to_datetime(struct_time)
    return date_format(as_datetime)