import logging from typing import Dict, List, Tuple import feedparser from scrapy.crawler import Crawler from scrapy.spiders import Spider from scrapy.utils.spider import iterate_spider_output from repub.items import ChannelElementItem, ElementItem from repub.rss import CDATA, CONTENT, ITUNES, MEDIA, E, munge_cdata_html, normalize_date from repub.utils import FileType, determine_file_type, local_file_path class BaseRssFeedSpider(Spider): """ This class intends to be the base class for spiders that scrape from RSS feeds. """ def __init__(self, feed_name, **kwargs): super().__init__(**kwargs) self.feed_name = feed_name def _set_crawler(self, crawler: Crawler) -> None: super()._set_crawler(crawler) for s in [ "REPUBLISHER_IMAGE_DIR", "REPUBLISHER_FILE_DIR", "REPUBLISHER_AUDIO_DIR", "REPUBLISHER_VIDEO_DIR", ]: if self.settings.get(s) is None: raise RuntimeError(f"Missing setting: {s}") def rewrite_file_url(self, file_type: FileType, url): file_dir = self.settings["REPUBLISHER_FILE_DIR"] if file_type == FileType.IMAGE: file_dir = self.settings["REPUBLISHER_IMAGE_DIR"] elif file_type == FileType.VIDEO: file_dir = self.settings["REPUBLISHER_VIDEO_DIR"] elif file_type == FileType.AUDIO: file_dir = self.settings["REPUBLISHER_AUDIO_DIR"] return f"/{file_dir}/{local_file_path(url)}" def rewrite_image_url(self, url): return self.rewrite_file_url(FileType.IMAGE, url) def munge_cdata_html(self, html) -> Tuple[str, Dict[FileType, List[str]]]: urls = {FileType.IMAGE: [], FileType.VIDEO: [], FileType.AUDIO: []} def replace_link(el, attr, old_link): if len(old_link) == 0 or el.tag in ["a", "iframe"]: return old_link file_type = None if el.tag in ["img"]: file_type = FileType.IMAGE elif el.tag in ["source"] and el.getparent() is not None: if el.getparent().tag == "video": file_type = FileType.VIDEO elif el.getparent().tag == "audio": file_type = FileType.AUDIO elif el.getparent().tag == "picture": file_type = FileType.IMAGE if not file_type: self.logger.warn( f"Could not identify file type of link, skipping. tag={el.tag} attr={attr} link={old_link}" ) return old_link urls[file_type].append(old_link) new_link = self.rewrite_file_url(file_type, old_link) if file_type != FileType.IMAGE: print(f"{old_link} -> {new_link}") return new_link return munge_cdata_html(html, replace_link), urls def parse_feed(self, feed_text): parsed = feedparser.parse(feed_text, sanitize_html=False) if parsed.bozo: logging.error( "Bozo feed data. %s: %r", parsed.bozo_exception.__class__.__name__, parsed.bozo_exception, ) if hasattr(parsed.bozo_exception, "getLineNumber") and hasattr( parsed.bozo_exception, "getMessage" ): line = parsed.bozo_exception.getLineNumber() logging.error("Line %d: %s", line, parsed.bozo_exception.getMessage()) segment = feed_text.split("\n")[line - 1] logging.info("Body segment with error: %r", segment) return None return parsed def parse_channel_meta(self, response, feed): f = feed.feed channel = E.channel( E.title(f.get("title")), E.link(f.get("link")), E.description(f.get("description")), E.language(f.get("language")), E.copyright(f.get("copyright")), E.webMaster(f.get("publisher")), E.generator(f.get("generator")), E.pubDate(normalize_date(f.get("published_parsed"))), E.lastBuildDate(normalize_date(f.get("updated_parsed"))), ITUNES.explicit("yes" if f.get("itunes_explicit", False) else "no"), ) for tag in f.get("tags", []): channel.append(E.category(tag.term)) image_urls = [] if "image" in f: if "href" in f.image: image = E.image( E.title(f.get("title")), E.link(f.get("link")), E.url(self.rewrite_image_url(f.image.get("href"))), E.description(f.get("description")), ) image_urls.append(f.image.get("href")) else: image = E.image( E.title(f.image.get("title")), E.link(f.image.get("link")), E.url(self.rewrite_image_url(f.image.get("url"))), E.description(f.image.get("description")), E.width(f.image.get("width")), E.height(f.image.get("height")), ) image_urls.append(f.image.get("url")) channel.append(image) return ChannelElementItem( feed_name=self.feed_name, el=channel, image_urls=image_urls, images=[] ) def _parse(self, response, **kwargs): response = self.adapt_response(response) feed = self.parse_feed(response.body) if feed and feed.feed: return self.parse_entries(response, feed) def parse_entry(self, response, feed, entry): """This method must be overridden with your custom spider functionality""" raise NotImplementedError def parse_entries(self, response, feed): channel = self.parse_channel_meta(response, feed) yield channel for entry in feed.entries: ret = iterate_spider_output(self.parse_entry(response, feed, entry)) yield from self.process_results(response, feed, ret) def process_results(self, response, feed, results): """This overridable method is called for each result (item or request) returned by the spider, and it's intended to perform any last time processing required before returning the results to the framework core, for example setting the item GUIDs. It receives a list of results and the response which originated that results. It must return a list of results (items or requests). """ return results def adapt_response(self, response): """You can override this function in order to make any changes you want to into the feed before parsing it. This function must return a response. """ return response class RssFeedSpider(BaseRssFeedSpider): """A generic RSS Feed spider""" name = "rss_spider" def __init__(self, urls, **kwargs): self.start_urls = urls super().__init__(**kwargs) def parse_entry(self, response, feed, entry): image_urls = [] file_urls = [] audio_urls = [] video_urls = [] def add_url(file_type, url): if file_type == FileType.IMAGE: image_urls.append(url) elif file_type == FileType.AUDIO: audio_urls.append(url) elif file_type == FileType.VIDEO: video_urls.append(url) elif file_type == FileType.FILE: file_urls.append(url) item = E.item( E.title(entry.get("title")), E.link(entry.get("link")), E.description(entry.get("description")), E.guid( entry.get("id"), {"isPermaLink": "true" if entry.guidislink else "false"}, ), E.pubDate(normalize_date(entry.get("published_parsed"))), E.author(entry.get("author")), ITUNES.summary(entry.get("summary")), ITUNES.duration(entry.get("itunes_duration")), ITUNES.image( None, ( {"href": self.rewrite_image_url(entry.get("image").href)} if "image" in entry else None ), ), ) if entry.get("image"): image_urls.append(entry.get("image").href) for enc in entry.enclosures: url = enc.get("href") file_type = determine_file_type(url=url, mimetype=enc.get("type")) item.append( E.enclosure( E.url(self.rewrite_file_url(file_type, url)), E.length(enc.get("length")), E.type(enc.get("type")), ) ) self.logger.debug( f"feed {self.feed_name} encountered enclsoure {url} {file_type}" ) add_url(file_type, url) if "content" in entry: for c in entry.content: if c.type == "text/html": html, urls = self.munge_cdata_html(c.value) item.append(CONTENT.encoded(CDATA(html))) image_urls.extend(urls[FileType.IMAGE]) video_urls.extend(urls[FileType.VIDEO]) audio_urls.extend(urls[FileType.AUDIO]) if isinstance(entry.get("media_content"), list): for media in ( media for media in entry["media_content"] if media.get("url") ): file_type = determine_file_type( url=media.get("url"), medium=media.get("medium"), mimetype=media.get("type"), ) item.append( MEDIA.content( E.url(self.rewrite_file_url(file_type, media.get("url"))), E.type(media.get("type")), E.medium(media.get("medium")), E.isDefault(media.get("isDefault")), E.expression(media.get("expression")), E.bitrate(media.get("bitrate")), E.framerate(media.get("framerate")), E.samplingrate(media.get("samplingrate")), E.channels(media.get("channels")), E.duration(media.get("duration")), E.height(media.get("height")), E.width(media.get("width")), E.lang(media.get("lang")), ) ) add_url(file_type, media.get("url")) return ElementItem( feed_name=self.feed_name, el=item, images=[], image_urls=image_urls, files=[], file_urls=file_urls, audio_urls=audio_urls, audios=[], video_urls=video_urls, videos=[], )