Resolve Pangea tweet snippet placeholders
This commit is contained in:
parent
897af2872c
commit
bff04afbf6
2 changed files with 254 additions and 1 deletions
|
|
@ -15,17 +15,100 @@ Pangea Documentation:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import html
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
from dateutil.parser import *
|
from dateutil.parser import *
|
||||||
|
|
||||||
from pygea import pexception, plogger, utilities
|
from pygea import pexception, plogger, utilities
|
||||||
from pygea.config import PygeaConfig
|
from pygea.config import PygeaConfig
|
||||||
|
|
||||||
|
_EMBED_HTML_RE = re.compile(
|
||||||
|
r'"Name":"embed_html","Value":"((?:\\.|[^"])*)","DefaultValue"',
|
||||||
|
re.DOTALL,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _supported_snippet_elements(fragment: BeautifulSoup) -> list:
    """Collect the ``.tag_snippet`` placeholders that can be resolved.

    A placeholder qualifies when its ``mode`` attribute, split on ``|``, has
    at least three fields, the first field is ``infographics``, the third
    field (whitespace-stripped) consists only of digits, and its
    ``querystring`` attribute is non-empty.

    Args:
        fragment: Parsed article markup to search.

    Returns:
        The qualifying elements, in the order ``select`` yields them.
    """

    def _is_supported(node) -> bool:
        fields = node.get("mode", "").split("|")
        if len(fields) < 3 or fields[0] != "infographics":
            return False
        has_numeric_id = fields[2].strip().isdigit()
        return has_numeric_id and bool(node.get("querystring", ""))

    return [node for node in fragment.select(".tag_snippet") if _is_supported(node)]
|
||||||
|
|
||||||
|
|
||||||
|
def snippet_urls_from_article_content(raw_html: str, domain: str) -> list[str]:
    """List the snippet page URLs referenced by an article body.

    Args:
        raw_html: Article markup that may contain snippet placeholders.
        domain: Host name used to build each snippet URL.

    Returns:
        One ``https://<domain>/a/<object id>.html<querystring>`` URL per
        supported placeholder, in document order (duplicates are kept).
    """
    soup = BeautifulSoup(raw_html, "html.parser")

    def _url_for(node) -> str:
        # The third "|"-separated field of ``mode`` is the snippet object id.
        object_id = node.get("mode", "").split("|")[2].strip()
        querystring = node.get("querystring", "")
        return f"https://{domain}/a/{object_id}.html{querystring}"

    return [_url_for(node) for node in _supported_snippet_elements(soup)]
|
||||||
|
|
||||||
|
|
||||||
|
def extract_embed_html_from_snippet_page(page_html: str) -> str | None:
    """Return the tweet ``<blockquote>`` markup embedded in a snippet page.

    The page carries the embed markup as a JSON-escaped string in its
    ``embed_html`` parameter.  That string is located with
    ``_EMBED_HTML_RE``, JSON-decoded, HTML-unescaped and parsed; every
    ``<script>`` element is removed and the first ``<blockquote>`` with a
    class value containing ``twitter-tweet`` is serialized and returned.

    Args:
        page_html: Full HTML text of the snippet page.

    Returns:
        The blockquote markup, or ``None`` when the page has no
        ``embed_html`` parameter, when the parameter value is not a decodable
        JSON string, or when it holds no ``twitter-tweet`` blockquote.
    """
    match = _EMBED_HTML_RE.search(page_html)
    if match is None:
        return None

    try:
        # Re-wrap the captured text in quotes so the JSON parser undoes its
        # escapes (\" \/ \uXXXX ...).
        embed_markup = json.loads(f'"{match.group(1)}"')
    except json.JSONDecodeError:
        # A malformed escape or raw control character makes the value
        # undecodable; report it as unresolvable instead of raising.
        return None

    decoded = html.unescape(embed_markup)
    fragment = BeautifulSoup(decoded, "html.parser")

    # Keep only the static tweet markup; drop any script the embed carries.
    for script in fragment.find_all("script"):
        script.decompose()

    blockquote = fragment.find(
        "blockquote", class_=lambda value: value and "twitter-tweet" in value
    )
    if blockquote is None:
        return None
    return str(blockquote)
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_content_snippets(raw_html: str, domain: str, fetch_page_html) -> str:
    """Replace supported snippet placeholders with their tweet markup.

    For each supported placeholder the snippet page URL is built, the page is
    obtained through ``fetch_page_html`` and the tweet blockquote extracted
    from it is inserted in place of the placeholder.  Placeholders that
    cannot be resolved are left untouched.

    Args:
        raw_html: Article markup that may contain snippet placeholders.
        domain: Host name used to build each snippet URL.
        fetch_page_html: Callable taking a URL and returning the page HTML as
            a string, or ``None`` when the page could not be fetched.

    Returns:
        The article markup with resolved placeholders replaced.  When no
        placeholder was replaced, ``raw_html`` is returned unchanged so the
        parse/serialize round trip cannot alter articles that needed no work.
    """
    fragment = BeautifulSoup(raw_html, "html.parser")
    # One lookup per distinct snippet URL; failed lookups are cached as None
    # so the same URL is never fetched twice.
    cache: dict[str, str | None] = {}
    replaced_any = False

    for element in _supported_snippet_elements(fragment):
        # The third "|"-separated field of ``mode`` is the snippet object id.
        object_id = element.get("mode", "").split("|")[2].strip()
        querystring = element.get("querystring", "")
        url = f"https://{domain}/a/{object_id}.html{querystring}"

        if url not in cache:
            page_html = fetch_page_html(url)
            cache[url] = (
                extract_embed_html_from_snippet_page(page_html)
                if page_html is not None
                else None
            )

        embed_html = cache[url]
        if embed_html is None:
            continue

        replacement = BeautifulSoup(embed_html, "html.parser")
        # Iterate over a copy: inserting a node elsewhere removes it from
        # ``replacement.contents`` while the loop is running.
        for child in list(replacement.contents):
            element.insert_before(child)
        element.decompose()
        replaced_any = True

    if not replaced_any:
        return raw_html
    return str(fragment)
|
||||||
|
|
||||||
|
|
||||||
class PangeaService:
|
class PangeaService:
|
||||||
"""Interface to the Pangea API"""
|
"""Interface to the Pangea API"""
|
||||||
|
|
@ -273,7 +356,9 @@ class PangeaService:
|
||||||
|
|
||||||
# 'audioclips' and 'videoclips' occasionally have no text content
|
# 'audioclips' and 'videoclips' occasionally have no text content
|
||||||
if article.get("content"):
|
if article.get("content"):
|
||||||
rss["content"] = article["content"]
|
rss["content"] = resolve_content_snippets(
|
||||||
|
article["content"], self._domain, self._fetch_snippet_page_html
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
rss["content"] = ""
|
rss["content"] = ""
|
||||||
|
|
||||||
|
|
@ -515,6 +600,18 @@ class PangeaService:
|
||||||
|
|
||||||
return json.loads(response.text)
|
return json.loads(response.text)
|
||||||
|
|
||||||
|
def _fetch_snippet_page_html(self, url):
    """Download a snippet page and return its HTML text.

    Fetching is best effort: any failure is reported (when ``_verbose_p`` is
    set) and signalled with ``None`` rather than an exception, so one
    unreachable snippet does not abort the conversion of the whole article.

    Args:
        url: Address of the snippet page.

    Returns:
        The response body as text, or ``None`` when the request raises a
        ``requests`` error (timeout, connection failure, ...) or the server
        answers with a status code other than 200.
    """
    try:
        response = requests.get(url, timeout=20)
    except requests.RequestException as err:
        if self._verbose_p:
            print("snippet fetch failed for {} with error [{}]".format(url, err))
        return None

    if response.status_code != 200:
        if self._verbose_p:
            print(
                "snippet fetch failed for {} with status code [{}]".format(
                    url, str(response.status_code)
                )
            )
        return None
    return response.text
|
||||||
|
|
||||||
def _is_implemented(self, cmd):
|
def _is_implemented(self, cmd):
|
||||||
"""Test if a provided string references an actual command"""
|
"""Test if a provided string references an actual command"""
|
||||||
if cmd in self._commands_list:
|
if cmd in self._commands_list:
|
||||||
|
|
|
||||||
156
tests/test_pangeaservice_snippets.py
Normal file
156
tests/test_pangeaservice_snippets.py
Normal file
|
|
@ -0,0 +1,156 @@
|
||||||
|
from pygea import pangeaservice
|
||||||
|
|
||||||
|
# Article body with three snippet placeholders.  The first has an empty
# ``querystring``; the other two reference object 29036 with parameter ids
# 58108 and 58109.
ARTICLE_WITH_TWEET_SNIPPETS = """
<div contenteditable="false" class="tag_image tag_snippet" mode="infographics|plain|389510|large||Trinity Audio Embed" querystring=""></div>
<p>Lead paragraph.</p>
<div contenteditable="false" class="tag_image tag_snippet" mode="infographics|plain|29036|large||" querystring="?parameterid=58108"></div>
<p>Middle paragraph.</p>
<div contenteditable="false" class="tag_image tag_snippet" mode="infographics|plain|29036|large||" querystring="?parameterid=58109"></div>
<p>Tail paragraph.</p>
""".strip()
|
||||||
|
|
||||||
|
|
||||||
|
# Snippet page for parameter id 58108: an inline script whose ``embed_html``
# parameter holds the JSON-escaped tweet blockquote mentioning @HenryAlviarez.
# NOTE(review): the extractor HTML-unescapes this value, so the live page
# presumably entity-escapes it — confirm the fixture mirrors the real payload.
SNIPPET_PAGE_58108 = """
<html>
<body>
<script>
var snippet = {
params:[{"Name":"embed_html","Value":"<blockquote class=\\"twitter-tweet\\"><p lang=\\"es\\" dir=\\"ltr\\">VENTE VENEZUELA COMPITE Y GANA <a href=\\"https://twitter.com/HenryAlviarez?ref_src=twsrc%5Etfw\\">@HenryAlviarez</a></p>— Vente Venezuela (@VenteVenezuela) <a href=\\"https://twitter.com/VenteVenezuela/status/2037926275017294113?ref_src=twsrc%5Etfw\\">March 28, 2026</a></blockquote>","DefaultValue":"","HtmlEncode":false,"Type":"HTML"}]
};
</script>
<div class="snippetLoading twitterSnippet"></div>
</body>
</html>
""".strip()
|
||||||
|
|
||||||
|
|
||||||
|
# Snippet page for parameter id 58109: an inline script whose ``embed_html``
# parameter holds the JSON-escaped tweet blockquote by @MariaCorinaYA.
# NOTE(review): the extractor HTML-unescapes this value, so the live page
# presumably entity-escapes it — confirm the fixture mirrors the real payload.
SNIPPET_PAGE_58109 = """
<html>
<body>
<script>
var snippet = {
params:[{"Name":"embed_html","Value":"<blockquote class=\\"twitter-tweet\\"><p lang=\\"es\\" dir=\\"ltr\\">VENTE VUELVE A CASA!! Estoy ahí, con cada uno de ustedes.</p>— María Corina Machado (@MariaCorinaYA) <a href=\\"https://twitter.com/MariaCorinaYA/status/2037922462881423594?ref_src=twsrc%5Etfw\\">March 28, 2026</a></blockquote>","DefaultValue":"","HtmlEncode":false,"Type":"HTML"}]
};
</script>
<div class="snippetLoading twitterSnippet"></div>
</body>
</html>
""".strip()
|
||||||
|
|
||||||
|
|
||||||
|
def test_snippet_urls_from_article_content_finds_supported_infographics_snippets() -> None:
    """Only the placeholders that carry a query string yield snippet URLs."""
    expected = [
        "https://www.martinoticias.com/a/29036.html?parameterid=58108",
        "https://www.martinoticias.com/a/29036.html?parameterid=58109",
    ]

    found = pangeaservice.snippet_urls_from_article_content(
        ARTICLE_WITH_TWEET_SNIPPETS, "www.martinoticias.com"
    )

    assert found == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_snippet_urls_from_article_content_ignores_malformed_or_unsupported_snippets() -> None:
    """A non-infographics mode, a blank object id and a missing mode yield no URL."""
    unsupported_markup = """
<div class="tag_image tag_snippet" mode="video|plain|453931|large" querystring=""></div>
<div class="tag_image tag_snippet" mode="infographics|plain||large||" querystring="?parameterid=58108"></div>
<div class="tag_image tag_snippet" querystring="?parameterid=58109"></div>
<p>No supported snippet target is present here.</p>
""".strip()

    found = pangeaservice.snippet_urls_from_article_content(
        unsupported_markup, "www.martinoticias.com"
    )

    assert found == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_embed_html_from_snippet_page_returns_decoded_blockquote() -> None:
    """The extractor returns real, unescaped tweet blockquote markup."""
    embed_html = pangeaservice.extract_embed_html_from_snippet_page(SNIPPET_PAGE_58108)

    assert embed_html is not None
    assert "<blockquote" in embed_html
    assert 'class="twitter-tweet"' in embed_html
    assert "@HenryAlviarez" in embed_html
    # The result must hold markup, not its entity-escaped form.  Asserting
    # that "<blockquote" is absent would contradict the check above and could
    # never pass.
    assert "&lt;blockquote" not in embed_html
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_embed_html_from_snippet_page_returns_none_when_missing() -> None:
    """A page without an ``embed_html`` parameter yields ``None``."""
    page_without_embed = "<html><body><script>var x = 1;</script></body></html>"

    result = pangeaservice.extract_embed_html_from_snippet_page(page_without_embed)

    assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_resolve_content_snippets_replaces_supported_tweet_placeholders() -> None:
    """Both tweet placeholders are swapped for blockquotes; the paragraphs survive."""
    pages = {
        "https://www.martinoticias.com/a/29036.html?parameterid=58108": SNIPPET_PAGE_58108,
        "https://www.martinoticias.com/a/29036.html?parameterid=58109": SNIPPET_PAGE_58109,
    }

    # ``dict.get`` stands in for the fetcher: unknown URLs resolve to None.
    resolved = pangeaservice.resolve_content_snippets(
        ARTICLE_WITH_TWEET_SNIPPETS, "www.martinoticias.com", pages.get
    )

    for expected_text in (
        'class="twitter-tweet"',
        "@HenryAlviarez",
        "@MariaCorinaYA",
    ):
        assert expected_text in resolved

    for removed_text in ("?parameterid=58108", "?parameterid=58109"):
        assert removed_text not in resolved

    for kept_text in ("Lead paragraph.", "Tail paragraph."):
        assert kept_text in resolved
|
||||||
|
|
||||||
|
|
||||||
|
def test_resolve_content_snippets_leaves_placeholder_when_resolution_fails() -> None:
    """When every fetch returns ``None`` the placeholders stay in the markup."""

    def fetch_nothing(_url):
        return None

    resolved = pangeaservice.resolve_content_snippets(
        ARTICLE_WITH_TWEET_SNIPPETS,
        "www.martinoticias.com",
        fetch_nothing,
    )

    for kept_text in (
        'mode="infographics|plain|29036|large||"',
        "?parameterid=58108",
        "?parameterid=58109",
    ):
        assert kept_text in resolved

    assert 'class="twitter-tweet"' not in resolved
|
||||||
|
|
||||||
|
|
||||||
|
def test_rss_article_from_pangea_article_resolves_supported_snippets(
    monkeypatch,
) -> None:
    """Converting an article to RSS resolves its tweet placeholders."""
    pages = {
        "https://www.martinoticias.com/a/29036.html?parameterid=58108": SNIPPET_PAGE_58108,
        "https://www.martinoticias.com/a/29036.html?parameterid=58109": SNIPPET_PAGE_58109,
    }
    article = {
        "url": "https://www.martinoticias.com/a/reabre-sede-del-partido-de-maria-corina-machado-en-caracas/453944.html",
        "title": "Reabre sede del partido de María Corina Machado en Caracas (VIDEO)",
        "content": ARTICLE_WITH_TWEET_SNIPPETS,
        "pubDate": "2026-03-29T16:29:38",
    }

    # Build the service without running __init__, then set only the
    # attributes this test assigns explicitly.
    service = object.__new__(pangeaservice.PangeaService)
    service._verbose_p = False
    service._domain = "www.martinoticias.com"
    service._rev_categories = {}

    # Stub the media-metadata helper and the snippet fetcher so the test is
    # served entirely from the in-memory pages above.
    monkeypatch.setattr(
        pangeaservice.utilities, "get_media_metadata", lambda _url: None
    )
    monkeypatch.setattr(
        service, "_fetch_snippet_page_html", lambda url: pages.get(url)
    )

    rss_article = service.rss_article_from_pangea_article(article)
    content = rss_article["content"]

    assert 'class="twitter-tweet"' in content
    assert "@HenryAlviarez" in content
    assert "@MariaCorinaYA" in content
|
||||||
Loading…
Add table
Add a link
Reference in a new issue