From bff04afbf612612108d9651f355d066c4b5f6a64 Mon Sep 17 00:00:00 2001
From: Abel Luck
Date: Tue, 31 Mar 2026 15:53:29 +0200
Subject: [PATCH] Resolve Pangea tweet snippet placeholders

---
 pygea/pangeaservice.py               | 147 ++++++++++++++++++++++++-
 tests/test_pangeaservice_snippets.py | 156 +++++++++++++++++++++++++++
 2 files changed, 302 insertions(+), 1 deletion(-)
 create mode 100644 tests/test_pangeaservice_snippets.py

diff --git a/pygea/pangeaservice.py b/pygea/pangeaservice.py
index 99adf50..5c4316d 100644
--- a/pygea/pangeaservice.py
+++ b/pygea/pangeaservice.py
@@ -15,17 +15,137 @@ Pangea Documentation:
 """
 
 import hashlib
+import html
 import json
 import re
 import urllib.parse
 from datetime import datetime, timedelta, timezone
 
 import requests
+from bs4 import BeautifulSoup
 from dateutil.parser import *
 
 from pygea import pexception, plogger, utilities
 from pygea.config import PygeaConfig
 
+# Matches the JSON-escaped "embed_html" parameter value on a snippet page.
+# Backslashes are excluded from the character class so an escape sequence can
+# only be consumed by the ``\\.`` branch; the match stays unambiguous and long,
+# heavily escaped values cannot trigger exponential backtracking.
+_EMBED_HTML_RE = re.compile(
+    r'"Name":"embed_html","Value":"((?:\\.|[^"\\])*)","DefaultValue"',
+    re.DOTALL,
+)
+
+
+def _supported_snippet_elements(fragment: BeautifulSoup) -> list:
+    """Return the ``.tag_snippet`` elements that can be resolved.
+
+    An element is supported when its ``mode`` attribute has at least three
+    ``|``-separated parts, the first part is ``infographics``, the third part
+    is an all-digit object id, and its ``querystring`` attribute is non-empty.
+    """
+    supported = []
+    for element in fragment.select(".tag_snippet"):
+        mode_parts = element.get("mode", "").split("|")
+        if len(mode_parts) < 3 or mode_parts[0] != "infographics":
+            continue
+        if not mode_parts[2].strip().isdigit():
+            continue
+        if not element.get("querystring", ""):
+            continue
+        supported.append(element)
+    return supported
+
+
+def _snippet_url(element, domain: str) -> str:
+    """Build the snippet page URL for a supported ``.tag_snippet`` element."""
+    object_id = element.get("mode", "").split("|")[2].strip()
+    querystring = element.get("querystring", "")
+    return f"https://{domain}/a/{object_id}.html{querystring}"
+
+
+def snippet_urls_from_article_content(raw_html: str, domain: str) -> list[str]:
+    """Return the snippet page URLs referenced by supported placeholders.
+
+    URLs appear in the order the placeholders are found, without deduplication.
+    """
+    fragment = BeautifulSoup(raw_html, "html.parser")
+    return [
+        _snippet_url(element, domain)
+        for element in _supported_snippet_elements(fragment)
+    ]
+
+
+def extract_embed_html_from_snippet_page(page_html: str) -> str | None:
+    """Extract the tweet ``blockquote`` markup embedded in a snippet page.
+
+    The captured ``embed_html`` value is JSON-decoded and HTML-unescaped, any
+    ``script`` elements are removed, and the first ``blockquote`` whose class
+    contains ``twitter-tweet`` is returned as a string.
+
+    Returns ``None`` when the parameter is missing, its value is not a valid
+    JSON string, or no such blockquote is present.
+    """
+    match = _EMBED_HTML_RE.search(page_html)
+    if match is None:
+        return None
+
+    try:
+        decoded = html.unescape(json.loads(f'"{match.group(1)}"'))
+    except json.JSONDecodeError:
+        # Invalid escapes or raw control characters: treat the snippet as
+        # unresolvable rather than aborting the whole article conversion.
+        return None
+
+    fragment = BeautifulSoup(decoded, "html.parser")
+    for script in fragment.find_all("script"):
+        script.decompose()
+
+    blockquote = fragment.find(
+        "blockquote", class_=lambda value: value and "twitter-tweet" in value
+    )
+    if blockquote is None:
+        return None
+    return str(blockquote)
+
+
+def resolve_content_snippets(raw_html: str, domain: str, fetch_page_html) -> str:
+    """Replace supported snippet placeholders with their tweet markup.
+
+    ``fetch_page_html`` is called with a snippet page URL and must return the
+    page HTML or ``None``. Each distinct URL is fetched at most once per call.
+    Placeholders that cannot be resolved are left in place.
+
+    Returns the article HTML as re-serialized by BeautifulSoup.
+    """
+    fragment = BeautifulSoup(raw_html, "html.parser")
+    cache: dict[str, str | None] = {}
+
+    for element in _supported_snippet_elements(fragment):
+        url = _snippet_url(element, domain)
+
+        if url not in cache:
+            page_html = fetch_page_html(url)
+            cache[url] = (
+                extract_embed_html_from_snippet_page(page_html)
+                if page_html is not None
+                else None
+            )
+
+        embed_html = cache[url]
+        if embed_html is None:
+            continue
+
+        replacement = BeautifulSoup(embed_html, "html.parser")
+        for child in list(replacement.contents):
+            # Iterate over a copy: insert_before() moves each node out of
+            # ``replacement.contents`` as it is re-parented.
+            element.insert_before(child)
+        element.decompose()
+
+    return str(fragment)
+
 
 class PangeaService:
     """Interface to the Pangea API"""
@@ -273,7 +393,9 @@ class PangeaService:
 
         # 'audioclips' and 'videoclips' occasionally have no text content
         if article.get("content"):
-            rss["content"] = article["content"]
+            rss["content"] = resolve_content_snippets(
+                article["content"], self._domain, self._fetch_snippet_page_html
+            )
         else:
             rss["content"] = ""
 
@@ -515,6 +637,29 @@ class PangeaService:
 
         return json.loads(response.text)
 
+    def _fetch_snippet_page_html(self, url):
+        """Fetch a snippet page and return its HTML, or None on failure.
+
+        Connection errors, timeouts and non-200 responses yield None (and are
+        reported when verbose output is enabled), so an unreachable snippet
+        page leaves its placeholder untouched instead of aborting the article.
+        """
+        try:
+            response = requests.get(url, timeout=20)
+        except requests.RequestException as err:
+            if self._verbose_p:
+                print("snippet fetch failed for {} with error [{}]".format(url, err))
+            return None
+        if response.status_code != 200:
+            if self._verbose_p:
+                print(
+                    "snippet fetch failed for {} with status code [{}]".format(
+                        url, str(response.status_code)
+                    )
+                )
+            return None
+        return response.text
+
def _is_implemented(self, cmd): """Test if a provided string references an actual command""" if cmd in self._commands_list: diff --git a/tests/test_pangeaservice_snippets.py b/tests/test_pangeaservice_snippets.py new file mode 100644 index 0000000..b6ef7bc --- /dev/null +++ b/tests/test_pangeaservice_snippets.py @@ -0,0 +1,156 @@ +from pygea import pangeaservice + +ARTICLE_WITH_TWEET_SNIPPETS = """ +
+

Lead paragraph.

+
+

Middle paragraph.

+
+

Tail paragraph.

+""".strip() + + +SNIPPET_PAGE_58108 = """ + + + +
+ + +""".strip() + + +SNIPPET_PAGE_58109 = """ + + + +
+ + +""".strip() + + +def test_snippet_urls_from_article_content_finds_supported_infographics_snippets() -> ( + None +): + urls = pangeaservice.snippet_urls_from_article_content( + ARTICLE_WITH_TWEET_SNIPPETS, "www.martinoticias.com" + ) + + assert urls == [ + "https://www.martinoticias.com/a/29036.html?parameterid=58108", + "https://www.martinoticias.com/a/29036.html?parameterid=58109", + ] + + +def test_snippet_urls_from_article_content_ignores_malformed_or_unsupported_snippets() -> ( + None +): + raw_html = """ +
+
+
+

No supported snippet target is present here.

+ """.strip() + + urls = pangeaservice.snippet_urls_from_article_content( + raw_html, "www.martinoticias.com" + ) + + assert urls == [] + + +def test_extract_embed_html_from_snippet_page_returns_decoded_blockquote() -> None: + embed_html = pangeaservice.extract_embed_html_from_snippet_page(SNIPPET_PAGE_58108) + + assert embed_html is not None + assert " None: + embed_html = pangeaservice.extract_embed_html_from_snippet_page( + "" + ) + + assert embed_html is None + + +def test_resolve_content_snippets_replaces_supported_tweet_placeholders() -> None: + def fetch_html(url: str) -> str | None: + pages = { + "https://www.martinoticias.com/a/29036.html?parameterid=58108": SNIPPET_PAGE_58108, + "https://www.martinoticias.com/a/29036.html?parameterid=58109": SNIPPET_PAGE_58109, + } + return pages.get(url) + + resolved = pangeaservice.resolve_content_snippets( + ARTICLE_WITH_TWEET_SNIPPETS, "www.martinoticias.com", fetch_html + ) + + assert 'class="twitter-tweet"' in resolved + assert "@HenryAlviarez" in resolved + assert "@MariaCorinaYA" in resolved + assert "?parameterid=58108" not in resolved + assert "?parameterid=58109" not in resolved + assert "Lead paragraph." in resolved + assert "Tail paragraph." 
in resolved + + +def test_resolve_content_snippets_leaves_placeholder_when_resolution_fails() -> None: + resolved = pangeaservice.resolve_content_snippets( + ARTICLE_WITH_TWEET_SNIPPETS, + "www.martinoticias.com", + lambda _url: None, + ) + + assert 'mode="infographics|plain|29036|large||"' in resolved + assert "?parameterid=58108" in resolved + assert "?parameterid=58109" in resolved + assert 'class="twitter-tweet"' not in resolved + + +def test_rss_article_from_pangea_article_resolves_supported_snippets( + monkeypatch, +) -> None: + service = object.__new__(pangeaservice.PangeaService) + service._verbose_p = False + service._domain = "www.martinoticias.com" + service._rev_categories = {} + + monkeypatch.setattr( + pangeaservice.utilities, + "get_media_metadata", + lambda _url: None, + ) + monkeypatch.setattr( + service, + "_fetch_snippet_page_html", + lambda url: { + "https://www.martinoticias.com/a/29036.html?parameterid=58108": SNIPPET_PAGE_58108, + "https://www.martinoticias.com/a/29036.html?parameterid=58109": SNIPPET_PAGE_58109, + }.get(url), + ) + + rss_article = service.rss_article_from_pangea_article( + { + "url": "https://www.martinoticias.com/a/reabre-sede-del-partido-de-maria-corina-machado-en-caracas/453944.html", + "title": "Reabre sede del partido de María Corina Machado en Caracas (VIDEO)", + "content": ARTICLE_WITH_TWEET_SNIPPETS, + "pubDate": "2026-03-29T16:29:38", + } + ) + + assert 'class="twitter-tweet"' in rss_article["content"] + assert "@HenryAlviarez" in rss_article["content"] + assert "@MariaCorinaYA" in rss_article["content"]