# republisher/repub/spiders/rss_spider.py
from scrapy.spiders import Spider
from scrapy.utils.spider import iterate_spider_output
from repub.items import (
ChannelElementItem,
ElementItem,
)
import feedparser
import logging
from repub.rss import E, ITUNES, CONTENT, MEDIA, CDATA, normalize_date
class BaseRssFeedSpider(Spider):
    """
    Base class for spiders that scrape from RSS feeds.

    Subclasses must override :meth:`parse_entry` and may override
    :meth:`process_results` and :meth:`adapt_response` to customize
    per-item post-processing and response pre-processing.
    """

    def parse_feed(self, feed_text):
        """Parse raw feed text with feedparser.

        :param feed_text: the feed document as ``str`` or ``bytes``
            (Scrapy response bodies are ``bytes``).
        :returns: the parsed feed object, or ``None`` when the feed is
            malformed ("bozo") — the error is logged, with line-level
            detail when the underlying parser exception provides it.
        """
        parsed = feedparser.parse(feed_text, sanitize_html=False)
        if parsed.bozo:
            logging.error(
                "Bozo feed data. %s: %r",
                parsed.bozo_exception.__class__.__name__,
                parsed.bozo_exception,
            )
            # Only SAX-style parse exceptions expose these accessors.
            if hasattr(parsed.bozo_exception, "getLineNumber") and hasattr(
                parsed.bozo_exception, "getMessage"
            ):
                line = parsed.bozo_exception.getLineNumber()
                logging.error("Line %d: %s", line, parsed.bozo_exception.getMessage())
                # splitlines() handles both str and bytes input; the previous
                # str-only split("\n") raised TypeError on bytes bodies.
                lines = feed_text.splitlines()
                # Guard against out-of-range line numbers from the parser.
                if 0 < line <= len(lines):
                    logging.info("Body segment with error: %r", lines[line - 1])
            return None
        return parsed

    def parse_channel_meta(self, response, feed):
        """Build the RSS <channel> metadata element from the parsed feed.

        :returns: a :class:`ChannelElementItem` wrapping the <channel>
            element (without its <item> children).
        """
        f = feed.feed
        channel = E.channel(
            E.title(f.get("title")),
            E.link(f.get("link")),
            E.description(f.get("description")),
            E.language(f.get("language")),
            E.copyright(f.get("copyright")),
            E.webMaster(f.get("publisher")),
            E.generator(f.get("generator")),
            E.pubDate(normalize_date(f.get("published_parsed"))),
            E.lastBuildDate(normalize_date(f.get("updated_parsed"))),
            ITUNES.explicit("yes" if f.get("itunes_explicit", False) else "no"),
        )
        for tag in f.get("tags", []):
            channel.append(E.category(tag.term))
        if "image" in f:
            # feedparser represents an itunes:image as {"href": ...}; a plain
            # RSS <image> carries title/link/url/description/width/height.
            if "href" in f.image:
                image = E.image(
                    E.title(f.get("title")),
                    E.link(f.get("link")),
                    E.url(f.image.get("href")),
                    E.description(f.get("description")),
                )
            else:
                image = E.image(
                    E.title(f.image.get("title")),
                    E.link(f.image.get("link")),
                    E.url(f.image.get("url")),
                    E.description(f.image.get("description")),
                    E.width(f.image.get("width")),
                    E.height(f.image.get("height")),
                )
            channel.append(image)
        return ChannelElementItem(el=channel)

    def _parse(self, response, **kwargs):
        """Scrapy entry point: adapt, parse, and dispatch to the entry loop."""
        response = self.adapt_response(response)
        feed = self.parse_feed(response.body)
        if feed and feed.feed:
            return self.parse_entries(response, feed)

    def parse_entry(self, response, feed, entry):
        """This method must be overridden with your custom spider functionality"""
        raise NotImplementedError

    def parse_entries(self, response, feed):
        """Yield the channel metadata item, then one result per feed entry."""
        channel = self.parse_channel_meta(response, feed)
        yield channel
        for entry in feed.entries:
            ret = iterate_spider_output(self.parse_entry(response, feed, entry))
            yield from self.process_results(response, feed, ret)

    def process_results(self, response, feed, results):
        """This overridable method is called for each result (item or request)
        returned by the spider, and it's intended to perform any last time
        processing required before returning the results to the framework core,
        for example setting the item GUIDs. It receives a list of results and
        the response which originated that results. It must return a list of
        results (items or requests).
        """
        return results

    def adapt_response(self, response):
        """You can override this function in order to make any changes you want
        to into the feed before parsing it. This function must return a
        response.
        """
        return response
class RssFeedSpider(BaseRssFeedSpider):
    """A generic RSS Feed spider that republishes entries verbatim."""

    name = "rss_spider"

    def __init__(self, urls, **kwargs):
        """
        :param urls: iterable of feed URLs, or a single comma-separated
            string (as passed on the scrapy CLI via ``-a urls=...``).
        """
        # A plain string would otherwise be iterated character by character
        # when Scrapy builds the start requests.
        if isinstance(urls, str):
            urls = [u.strip() for u in urls.split(",") if u.strip()]
        self.start_urls = urls
        super().__init__(**kwargs)

    def parse_entry(self, response, feed, entry):
        """Build an RSS <item> element from a feedparser entry.

        :returns: an :class:`ElementItem` wrapping the <item> element.
        """
        item = E.item(
            E.title(entry.get("title")),
            E.link(entry.get("link")),
            E.description(entry.get("description")),
            E.guid(
                entry.get("id"),
                # guidislink may be absent on some feeds; treat missing as
                # False instead of raising AttributeError.
                {"isPermaLink": "true" if entry.get("guidislink") else "false"},
            ),
            E.pubDate(normalize_date(entry.get("published_parsed"))),
            E.author(entry.get("author")),
            ITUNES.summary(entry.get("summary")),
            ITUNES.duration(entry.get("itunes_duration")),
        )
        # .get(..., []) keeps entries without enclosures from raising
        # AttributeError; the rest of this method already uses .get().
        for enc in entry.get("enclosures", []):
            item.append(
                E.enclosure(
                    E.url(enc.get("href")),
                    E.length(enc.get("length")),
                    E.type(enc.get("type")),
                )
            )
        if "content" in entry:
            # Only HTML content blocks are re-emitted, CDATA-wrapped.
            for c in entry.content:
                if c.type == "text/html":
                    item.append(CONTENT.encoded(CDATA(c.value)))
        if isinstance(entry.get("media_content"), list):
            for media in (m for m in entry["media_content"] if m.get("url")):
                item.append(
                    MEDIA.content(
                        E.url(media.get("url")),
                        E.type(media.get("type")),
                        E.medium(media.get("medium")),
                        E.isDefault(media.get("isDefault")),
                        E.expression(media.get("expression")),
                        E.bitrate(media.get("bitrate")),
                        E.framerate(media.get("framerate")),
                        E.samplingrate(media.get("samplingrate")),
                        E.channels(media.get("channels")),
                        E.duration(media.get("duration")),
                        E.height(media.get("height")),
                        E.width(media.get("width")),
                        E.lang(media.get("lang")),
                    )
                )
        return ElementItem(el=item)