This commit is contained in:
Abel Luck 2026-03-29 13:48:30 +02:00
parent 81bb8afc41
commit 98dcea4d7e
10 changed files with 811 additions and 478 deletions

46
flake.lock generated Normal file
View file

@ -0,0 +1,46 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1774386573,
"narHash": "sha256-4hAV26quOxdC6iyG7kYaZcM3VOskcPUrdCQd/nx8obc=",
"rev": "46db2e09e1d3f113a13c0d7b81e2f221c63b8ce9",
"revCount": 969196,
"type": "tarball",
"url": "https://api.flakehub.com/f/pinned/NixOS/nixpkgs/0.1.969196%2Brev-46db2e09e1d3f113a13c0d7b81e2f221c63b8ce9/019d279e-af65-79ce-92be-5dee7b1e36d4/source.tar.gz"
},
"original": {
"type": "tarball",
"url": "https://flakehub.com/f/NixOS/nixpkgs/0.1"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"treefmt-nix": "treefmt-nix"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1773297127,
"narHash": "sha256-6E/yhXP7Oy/NbXtf1ktzmU8SdVqJQ09HC/48ebEGBpk=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "71b125cd05fbfd78cab3e070b73544abe24c5016",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "treefmt-nix",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

138
flake.nix Normal file
View file

@ -0,0 +1,138 @@
# Flake for pygea: exposes the package, a runnable app, CI checks, a formatter
# and a development shell for each system listed in `systems`.
{
  description = "pygea - Pangea RSS feed generator";

  inputs = {
    nixpkgs.url = "https://flakehub.com/f/NixOS/nixpkgs/0.1";
    treefmt-nix = {
      url = "github:numtide/treefmt-nix";
      inputs.nixpkgs.follows = "nixpkgs";
    };
  };

  outputs =
    {
      self,
      nixpkgs,
      treefmt-nix,
      ...
    }:
    let
      systems = [ "x86_64-linux" ];

      # Build an attribute set keyed by system; `fn` receives the nixpkgs
      # instance imported for that system.
      forAllSystems =
        fn:
        nixpkgs.lib.genAttrs systems (
          system:
          fn (
            import nixpkgs {
              inherit system;
              config.allowUnfree = true;
            }
          )
        );

      # Evaluate the shared treefmt module (./treefmt.nix) against a package set.
      mkTreefmtConfig = pkgs: (treefmt-nix.lib.evalModule pkgs ./treefmt.nix).config;

      # Single place that selects the Python interpreter. The package, the
      # smoke check and the dev shell all derive from it, so the site-packages
      # path used in the smoke check always matches the interpreter the
      # package was actually built with.
      pythonFor = pkgs: pkgs.python313;
    in
    {
      formatter = forAllSystems (pkgs: (mkTreefmtConfig pkgs).build.wrapper);

      packages = forAllSystems (
        pkgs:
        let
          # Pass the pinned interpreter's package set explicitly instead of
          # relying on whatever `python3Packages` defaults to in nixpkgs.
          pkg = pkgs.callPackage ./nix/packages/pygea/package.nix {
            python3Packages = (pythonFor pkgs).pkgs;
          };
        in
        {
          pygea = pkg;
          default = pkg;
        }
      );

      apps = forAllSystems (
        pkgs:
        let
          package = self.packages.${pkgs.stdenv.hostPlatform.system}.default;
          # One definition shared by the named and the default app.
          pygeaApp = {
            type = "app";
            program = "${package}/bin/pygea";
            meta.description = "pygea runtime";
          };
        in
        {
          pygea = pygeaApp;
          default = pygeaApp;
        }
      );

      checks = forAllSystems (
        pkgs:
        let
          inherit (pkgs.stdenv.hostPlatform) system;
          python = pythonFor pkgs;
          exportedPackage = self.packages.${system}.default;
          treefmtConfig = mkTreefmtConfig pkgs;

          # Interpreter carrying the runtime dependencies needed to import pygea.
          smokePython = python.withPackages (ps: [
            ps.requests
            ps.beautifulsoup4
            ps.feedgen
            ps."python-dateutil"
          ]);

          # Byte-compiles every module under ./pygea, then imports the
          # installed package from the built derivation.
          smokeCheck = pkgs.runCommand "pygea-smoke" { nativeBuildInputs = [ smokePython ]; } ''
            export PYTHONPATH="${exportedPackage}/${python.sitePackages}:$PYTHONPATH"
            python - <<'PY'
            from pathlib import Path
            for source_file in Path("${./.}/pygea").glob("*.py"):
                compile(source_file.read_text(encoding="utf-8"), str(source_file), "exec")
            PY
            python -c "import pygea; import pygea.utilities; import pygea.pexception"
            mkdir -p "$out"
            touch "$out/passed"
          '';

          # Fails when the repository contains unused Nix bindings.
          deadnixCheck = pkgs.runCommand "pygea-deadnix" { nativeBuildInputs = [ pkgs.deadnix ]; } ''
            cd ${./.}
            deadnix --fail .
            mkdir -p "$out"
            touch "$out/passed"
          '';

          # Runs the statix linter over the repository.
          statixCheck = pkgs.runCommand "pygea-statix" { nativeBuildInputs = [ pkgs.statix ]; } ''
            cd ${./.}
            statix check
            mkdir -p "$out"
            touch "$out/passed"
          '';
        in
        {
          devshell-default = self.devShells.${system}.default;
          formatter = treefmtConfig.build.wrapper;
          package-default = exportedPackage;
          treefmt = treefmtConfig.build.check ./.;
          smoke = smokeCheck;
          deadnix = deadnixCheck;
          statix = statixCheck;
        }
      );

      devShells = forAllSystems (
        pkgs:
        let
          treefmtConfig = mkTreefmtConfig pkgs;
        in
        {
          default = pkgs.mkShell {
            packages = [
              (pythonFor pkgs)
              pkgs.uv
              self.packages.${pkgs.stdenv.hostPlatform.system}.default
              treefmtConfig.build.wrapper
              pkgs.deadnix
              pkgs.statix
            ];
          };
        }
      );
    };
}

View file

@ -0,0 +1,36 @@
# Package definition for the pygea command-line application.
#
# Arguments (normally supplied by `callPackage`):
#   lib             - nixpkgs library; used for source cleaning and license metadata
#   python3Packages - Python package set providing the builder, the build
#                     backend and the runtime dependencies
{
  lib,
  python3Packages,
}:
let
  inherit (python3Packages)
    buildPythonApplication
    hatchling
    requests
    beautifulsoup4
    feedgen
    python-dateutil
    ;

  # Modules that must be importable from the built package.
  importableModules = [
    "pygea"
    "pygea.utilities"
    "pygea.pexception"
  ];
in
buildPythonApplication {
  pname = "pygea";
  version = "0.1.0";
  pyproject = true;

  # Repository root, three directories above this file.
  src = lib.cleanSource ../../..;

  build-system = [ hatchling ];

  dependencies = [
    requests
    beautifulsoup4
    feedgen
    python-dateutil
  ];

  pythonImportsCheck = importableModules;

  meta = {
    description = "Pangea RSS feed generator";
    homepage = "https://gitlab.com/guardianproject-ops/pygea";
    license = lib.licenses.lgpl3Plus;
    mainProgram = "pygea";
  };
}

View file

@ -1,16 +1,16 @@
"""Pygea main entry point"""
import hashlib
import json
import os
from pygea import utilities
from pygea.pangeafeed import PangeaFeed
from pygea.pexception import PangeaServiceException
from pygea import utilities
OUTPUT_TO_FILE = utilities.get_configuration_variable('results', 'output_to_file_p')
OUTPUT_FILE_NAME = utilities.get_configuration_variable('results', 'output_file_name')
OUTPUT_DIRECTORY = utilities.get_configuration_variable('results', 'output_directory')
OUTPUT_TO_FILE = utilities.get_configuration_variable("results", "output_to_file_p")
OUTPUT_FILE_NAME = utilities.get_configuration_variable("results", "output_file_name")
OUTPUT_DIRECTORY = utilities.get_configuration_variable("results", "output_directory")
def write_manifest(categories):
@ -22,28 +22,33 @@ def write_manifest(categories):
if not os.path.exists(output_directory):
os.makedirs(output_directory)
manifest_path = os.path.join(output_directory, 'manifest.json')
with open(manifest_path, 'w', encoding='utf-8') as mfile:
json.dump({'categories': categories}, mfile, indent=2, ensure_ascii=False)
mfile.write('\n')
manifest_path = os.path.join(output_directory, "manifest.json")
with open(manifest_path, "w", encoding="utf-8") as mfile:
json.dump({"categories": categories}, mfile, indent=2, ensure_ascii=False)
mfile.write("\n")
def main():
# Feeds are generated for a single, specified, domain
domain = 'www.martinoticias.com'
domain = "www.martinoticias.com"
args = {
# tuple values:
# [0] category name or a string representing a content query
# [1] only the newest content desired (as configured in pygea.ini)?
# [2] special content_type for this category only (from the approved list of types)
'categories': [
('Titulares',True, None),
('Cuba', True, None),
('América Latina', True, None),
('Info Martí ', False, None), # YES! this category name has a space character at the end!
('Noticiero Martí Noticias', True, None)
"categories": [
("Titulares", True, None),
("Cuba", True, None),
("América Latina", True, None),
(
"Info Martí ",
False,
None,
), # YES! this category name has a space character at the end!
("Noticiero Martí Noticias", True, None),
],
'default_content_type': "articles"
"default_content_type": "articles",
}
# TWO OPTIONS from the args defined above:
@ -59,24 +64,29 @@ def main():
# 2. Generate different feeds for each defined category
try:
manifest_categories = []
for cat_tuple in args['categories']:
for cat_tuple in args["categories"]:
# form new args for each category/query
newargs = {
'categories': [cat_tuple],
'default_content_type': "articles"
}
newargs = {"categories": [cat_tuple], "default_content_type": "articles"}
pf = PangeaFeed(domain, newargs)
pf.acquire_content()
pf.generate_feed()
# put each feed into a different sub-directory
feed_subdir = hashlib.md5(cat_tuple[0].encode('utf-8')).hexdigest()[:7]
feed_subdir = hashlib.md5(cat_tuple[0].encode("utf-8")).hexdigest()[:7]
pf.disgorge(feed_subdir)
manifest_categories.append({
'name': cat_tuple[0],
'short-hash': feed_subdir,
'local-path': os.path.join(feed_subdir, OUTPUT_FILE_NAME).replace(os.sep, '/')
})
print("feed for {} output to sub-directory {}".format(cat_tuple[0], feed_subdir))
manifest_categories.append(
{
"name": cat_tuple[0],
"short-hash": feed_subdir,
"local-path": os.path.join(feed_subdir, OUTPUT_FILE_NAME).replace(
os.sep, "/"
),
}
)
print(
"feed for {} output to sub-directory {}".format(
cat_tuple[0], feed_subdir
)
)
write_manifest(manifest_categories)
except PangeaServiceException as error:
print(error)

View file

@ -10,21 +10,22 @@
import os
import sys
from datetime import datetime
from feedgen.feed import FeedGenerator
from pygea import pangeaservice
from pygea import pexception
from pygea import utilities
VERBOSE = utilities.get_configuration_variable('runtime', 'verbose_p')
OUTPUT_TO_FILE = utilities.get_configuration_variable('results', 'output_to_file_p')
OUTPUT_FILE_NAME = utilities.get_configuration_variable('results', 'output_file_name')
OUTPUT_DIRECTORY = utilities.get_configuration_variable('results', 'output_directory')
from pygea import pangeaservice, pexception, utilities
class PangeaFeed():
VERBOSE = utilities.get_configuration_variable("runtime", "verbose_p")
OUTPUT_TO_FILE = utilities.get_configuration_variable("results", "output_to_file_p")
OUTPUT_FILE_NAME = utilities.get_configuration_variable("results", "output_file_name")
OUTPUT_DIRECTORY = utilities.get_configuration_variable("results", "output_directory")
class PangeaFeed:
_domain = None
_categories = None
_content_type = 'articles' # default
_content_type = "articles" # default
def __init__(self, domain, kw_args):
try:
@ -33,45 +34,56 @@ class PangeaFeed():
raise error
self._domain = domain
if kw_args.get('categories'):
self._categories = kw_args['categories']
if kw_args.get("categories"):
self._categories = kw_args["categories"]
else:
raise pexception.PangeaServiceException("ERROR: At least one category or content-query is required")
if kw_args.get('default_content_type'):
if kw_args['default_content_type'] not in self._ps.content_types():
raise pexception.PangeaServiceException("{} is not a valid content type".format(kw_args['content_type']))
self._content_type = kw_args['default_content_type']
raise pexception.PangeaServiceException(
"ERROR: At least one category or content-query is required"
)
if kw_args.get("default_content_type"):
if kw_args["default_content_type"] not in self._ps.content_types():
raise pexception.PangeaServiceException(
"{} is not a valid content type".format(kw_args["content_type"])
)
self._content_type = kw_args["default_content_type"]
def acquire_content(self):
self._full_article_list = []
for (cat, old, type) in self._categories:
for cat, old, type in self._categories:
opt_args = {}
# special type for this category?
if type is None:
type = self._content_type
# wants old stuff (not configured date limit)?
if old is not None:
opt_args['daycount'] = 365 # oldest date = one year
opt_args['filter_date'] = False
opt_args["daycount"] = 365 # oldest date = one year
opt_args["filter_date"] = False
ci = self._ps.category_info(cat)
if ci is not None:
# cat is pre-defined category
opt_args['zoneid'] = ci['id']
opt_args["zoneid"] = ci["id"]
jbody = self._ps.get_content(type, opt_args)
else:
# cat is actually a free-form query string to be used on article content
jbody = self._ps.query_content(cat, opt_args)
if len(jbody) == 0:
if VERBOSE:
print("no articles available for {} [command: {}] [category/query: '{}'])".format(self._domain, self._content_type, cat))
print(
"no articles available for {} [command: {}] [category/query: '{}'])".format(
self._domain, self._content_type, cat
)
)
continue
if VERBOSE:
print ("{} articles added from category/query '{}'".format(str(len(jbody)), cat))
print(
"{} articles added from category/query '{}'".format(
str(len(jbody)), cat
)
)
for art in jbody:
self._full_article_list.append(art)
@ -81,7 +93,7 @@ class PangeaFeed():
# Get preparatory information from the domain's homepage. Most characteristics
# of the RSS Channel information are acquired from the homepage metadata.
#
md = utilities.get_webpage_metadata('https://' + self._domain)
md = utilities.get_webpage_metadata("https://" + self._domain)
fg = FeedGenerator()
self._fg = fg
@ -90,31 +102,38 @@ class PangeaFeed():
# build the RSS <channel> element
#
fg.id(utilities.hash_site_metadata(md))
fg.title(self._content_type + ' from ' + md['og:site_name'])
fg.link(href=md['og:url'], rel='alternate')
fg.description(self._content_type + ' from ' + self._domain + " (" + md['description'] + ")")
fg.title(self._content_type + " from " + md["og:site_name"])
fg.link(href=md["og:url"], rel="alternate")
fg.description(
self._content_type
+ " from "
+ self._domain
+ " ("
+ md["description"]
+ ")"
)
#
# NOTE: the parameters required for <image> in the <channel> are different
# from <image> in an <item>
fg.image(url=md['og:image'], title=md['og:site_name'], link=md['og:url'])
fg.image(url=md["og:image"], title=md["og:site_name"], link=md["og:url"])
#
# Multiple categories/keywords are allowed in the RSS Channel
keywords = md['keywords']
categories = keywords.split(',')
sch = 'https://' + self._domain + '/'
keywords = md["keywords"]
categories = keywords.split(",")
sch = "https://" + self._domain + "/"
for name in categories:
fg.category(term=name, scheme=sch, label=name)
fg.language(md['language'])
fg.generator('Guardian Project Pangea CMS Crawler 1.0')
fg.webMaster('support@guardianproject.info')
fg.language(md["language"])
fg.generator("Guardian Project Pangea CMS Crawler 1.0")
fg.webMaster("support@guardianproject.info")
fg.ttl(60)
datetime_obj = datetime.now()
formatted_time = datetime_obj.strftime('%a, %d %b %Y %H:%M:%S %Z')
fg.lastBuildDate(formatted_time + '+0000')
formatted_time = datetime_obj.strftime("%a, %d %b %Y %H:%M:%S %Z")
fg.lastBuildDate(formatted_time + "+0000")
#
# Build the <item> elements for each <item> and add each item to the RSS Channel
@ -122,52 +141,56 @@ class PangeaFeed():
media_extension_loaded = False
for article in self._full_article_list:
try:
article_deets = self._ps.get_article_detail(article['id'])
article_deets = self._ps.get_article_detail(article["id"])
rss_article = self._ps.rss_article_from_pangea_article(article_deets)
except pexception.PangeaServiceException as error:
if VERBOSE:
print(error)
print("article with id [{}] may no longer exist in Pangea".format(str(article['id'])))
print(
"article with id [{}] may no longer exist in Pangea".format(
str(article["id"])
)
)
continue
fe = fg.add_entry()
fe.title(rss_article['title'])
fe.link({'href': rss_article['link']})
fe.guid(rss_article['guid'])
fe.pubDate(rss_article['pubDate'])
fe.content(rss_article['content'])
if rss_article.get('summary'):
fe.description(rss_article['summary'])
fe.title(rss_article["title"])
fe.link({"href": rss_article["link"]})
fe.guid(rss_article["guid"])
fe.pubDate(rss_article["pubDate"])
fe.content(rss_article["content"])
if rss_article.get("summary"):
fe.description(rss_article["summary"])
if rss_article.get('enclosure'):
enc_md = rss_article['enclosure']
if enc_md.get('type'):
if rss_article.get("enclosure"):
enc_md = rss_article["enclosure"]
if enc_md.get("type"):
fe.enclosure(
url=enc_md['url'],
type=enc_md['type'],
length=enc_md['length'])
url=enc_md["url"], type=enc_md["type"], length=enc_md["length"]
)
else:
fe.enclosure(url=enc_md['url'])
fe.enclosure(url=enc_md["url"])
if rss_article.get('media_content'):
if rss_article.get("media_content"):
#
# special handling for the RSS media extension
#
if not media_extension_loaded:
fg.load_extension('media')
fg.load_extension("media")
media_extension_loaded = True
if VERBOSE: print("media extension loaded")
if VERBOSE:
print("media extension loaded")
mc_md = rss_article['media_content']
if mc_md.get('medium'):
mc_md = rss_article["media_content"]
if mc_md.get("medium"):
fe.media.content(
url=mc_md['url'],
type=mc_md['type'],
fileSize=mc_md['fileSize'],
medium=mc_md['medium'])
url=mc_md["url"],
type=mc_md["type"],
fileSize=mc_md["fileSize"],
medium=mc_md["medium"],
)
else:
fe.media.content(url=mc_md['url'])
fe.media.content(url=mc_md["url"])
def disgorge(self, subdirectory=None):
#
@ -176,17 +199,20 @@ class PangeaFeed():
if OUTPUT_TO_FILE is True:
try:
if subdirectory is not None:
if not os.path.exists(OUTPUT_DIRECTORY + '/' + subdirectory):
os.makedirs(OUTPUT_DIRECTORY + '/' + subdirectory)
ofile = OUTPUT_DIRECTORY + '/' + subdirectory + '/' + OUTPUT_FILE_NAME
if not os.path.exists(OUTPUT_DIRECTORY + "/" + subdirectory):
os.makedirs(OUTPUT_DIRECTORY + "/" + subdirectory)
ofile = (
OUTPUT_DIRECTORY + "/" + subdirectory + "/" + OUTPUT_FILE_NAME
)
else:
if not os.path.exists(OUTPUT_DIRECTORY):
os.makedirs(OUTPUT_DIRECTORY)
ofile = OUTPUT_DIRECTORY + '/' + OUTPUT_FILE_NAME
ofile = OUTPUT_DIRECTORY + "/" + OUTPUT_FILE_NAME
self._fg.rss_file(ofile, extensions=True, pretty=True)
except OSError as fe:
print("for {} file error: ".format(ofile, str(fe)))
sys.exit(1)
if VERBOSE: print("output written to {}".format(ofile))
if VERBOSE:
print("output written to {}".format(ofile))
else:
print(self._fg.rss_str(extensions=True, pretty=True))

View file

@ -14,22 +14,23 @@
"""
import hashlib
import json
import re
import hashlib
import urllib.parse
from datetime import datetime, timezone, timedelta
from datetime import datetime, timedelta, timezone
import requests
from dateutil.parser import *
from pygea import utilities
from pygea import pexception
from pygea import plogger
from pygea import pexception, plogger, utilities
class PangeaService:
"""Interface to the Pangea API"""
_configuration_file_name = 'pygea.ini'
_api_path = '/api2/'
_configuration_file_name = "pygea.ini"
_api_path = "/api2/"
_api_key = None
# Pangea and RSS time format
@ -64,50 +65,49 @@ class PangeaService:
"videoclips",
# "videoscheduler",
# "widget",
"zone"
"zone",
]
# Position-indexed content category names
_category_types_list = [
'none', # 0 internally
'content', # 1 internally
'audio', # 2 internally
'content+audio', # 3 internally; compound type 1+2
'media', # 4 internally
'content+media', # 5 internally; compound type 1+4
'audio+media' # 6 internally; compound type 2+4
"none", # 0 internally
"content", # 1 internally
"audio", # 2 internally
"content+audio", # 3 internally; compound type 1+2
"media", # 4 internally
"content+media", # 5 internally; compound type 1+4
"audio+media", # 6 internally; compound type 2+4
]
# Content types (in the editorial sense)
# Note these also map to commands in _commands_list
_content_types_list = [
'articles',
'audioclips',
'videoclips',
'breakingnews',
'mostpopular',
'topstories'
"articles",
"audioclips",
"videoclips",
"breakingnews",
"mostpopular",
"topstories",
]
# How to format content
# (we WILL NOT use these in combination, as defined in the API)
_content_options = {
'WTF_0': 0, # Returns basically what is in database
'TEXT_ONLY': 1, # Removes all html keeping text only
'WTF_1': 2, # Returns tags as they would be displayed on the page
'MOBILE_1': 4, # Returns html as for mobile/rss feeds without
"WTF_0": 0, # Returns basically what is in database
"TEXT_ONLY": 1, # Removes all html keeping text only
"WTF_1": 2, # Returns tags as they would be displayed on the page
"MOBILE_1": 4, # Returns html as for mobile/rss feeds without
# additional stripping
'MOBILE_2': 8, # Returns html as for mobile/rss feeds with stripping
"MOBILE_2": 8, # Returns html as for mobile/rss feeds with stripping
# some html that is not supported
'MOBILE_3': 16, # Returns html as for mobile/rss feeds with some extra
"MOBILE_3": 16, # Returns html as for mobile/rss feeds with some extra
# html tags stripped
'WTF_2': 32, # Same as for Feeds + replaces recognized links with
"WTF_2": 32, # Same as for Feeds + replaces recognized links with
# internal links and wraps recognized images inside tags
'XML_TX': 64, # Used with Feeds to apply xsl transformation
'JSON': 128 # Generates json structured content
"XML_TX": 64, # Used with Feeds to apply xsl transformation
"JSON": 128, # Generates json structured content
}
def __init__(self, domain, key=None, verbose=False):
self._logger = plogger.PangeaServiceLogger()
@ -123,13 +123,21 @@ class PangeaService:
#
# preset from configuration file
#
self._max_articles = int(utilities.get_configuration_variable('runtime', 'max_articles'))
self._oldest_article = int(utilities.get_configuration_variable('runtime', 'oldest_article'))
self._content_format = utilities.get_configuration_variable('runtime', 'content_format')
self._authors_p = utilities.get_configuration_variable('runtime', 'authors_p')
self._no_media_p = utilities.get_configuration_variable('runtime', 'no_media_p')
self._content_inc_p = utilities.get_configuration_variable('runtime', 'content_inc_p')
self._verbose_p = utilities.get_configuration_variable('runtime', 'verbose_p')
self._max_articles = int(
utilities.get_configuration_variable("runtime", "max_articles")
)
self._oldest_article = int(
utilities.get_configuration_variable("runtime", "oldest_article")
)
self._content_format = utilities.get_configuration_variable(
"runtime", "content_format"
)
self._authors_p = utilities.get_configuration_variable("runtime", "authors_p")
self._no_media_p = utilities.get_configuration_variable("runtime", "no_media_p")
self._content_inc_p = utilities.get_configuration_variable(
"runtime", "content_inc_p"
)
self._verbose_p = utilities.get_configuration_variable("runtime", "verbose_p")
self._domain = domain
@ -140,13 +148,13 @@ class PangeaService:
if verbose:
self._verbose_p = verbose
if self._verbose_p:
print('verbose output')
print("verbose output")
#
# These two dictionaries index the category information
# _all_categories is indexed by category name; _rev_categories is indexed by id
#
self._all_categories = {}
self._rev_categories = {'0': 'none'}
self._rev_categories = {"0": "none"}
# Acquire the categories registered for the supplied domain
# Invokes an API call!
@ -162,12 +170,11 @@ class PangeaService:
# Reset the category dictionaries
self._all_categories = {}
self._rev_categories = {'0': 'none'}
self._rev_categories = {"0": "none"}
# Acquire the categories registered for the supplied domain (API call)
self.get_categories()
def set_api_key(self, key):
"""Sets the API key that allows access to the API"""
self._api_key = key
@ -180,24 +187,20 @@ class PangeaService:
"""Return full list of content types."""
return self._content_types_list
def content_type_name(self, type_index):
    """Return the name of a content type given its index.

    Args:
        type_index: Position in ``self._content_types_list``.

    Returns:
        The content type name stored at ``type_index``, or ``False`` when
        the index is at or beyond the end of the list.
    """
    # The last valid position is len - 1. The earlier ``>`` comparison let
    # ``type_index == len(...)`` through, which raised IndexError instead of
    # returning False.
    if type_index >= len(self._content_types_list):
        return False
    return self._content_types_list[type_index]
def commands(self):
    """Return the list of possible commands.

    Returns:
        The ``_commands_list`` attribute itself (not a copy).
    """
    return self._commands_list
def category_types(self):
    """Return the list of possible category types.

    Returns:
        The ``_category_types_list`` attribute itself (not a copy).
    """
    return self._category_types_list
def category_info(self, category_name):
"""Return rich information about a category."""
if self._all_categories.get(category_name):
@ -208,12 +211,10 @@ class PangeaService:
"""Return the dictionary of content format options."""
return self._content_options
def is_valid_command(self, cmd):
    """Test if the provided command is valid and implemented.

    Public wrapper: the answer is whatever ``self._is_implemented(cmd)``
    returns.
    """
    return self._is_implemented(cmd)
def is_valid_category(self, category_name):
"""Test if a provided category name is valid.
@ -243,104 +244,111 @@ class PangeaService:
rss = {}
sh = hashlib.sha256()
sh.update(article['url'].encode('utf8'))
rss['guid'] = sh.hexdigest()
rss['title'] = article['title']
rss['link'] = article['url']
sh.update(article["url"].encode("utf8"))
rss["guid"] = sh.hexdigest()
rss["title"] = article["title"]
rss["link"] = article["url"]
if article.get('introduction'):
rss['summary'] = article['introduction']
if article.get("introduction"):
rss["summary"] = article["introduction"]
if article.get('authors'):
as_str = ''
for auth in article['authors']:
as_str += auth['lastname'] + ", " + auth['firstname'] + ";"
if len(article['authors']) > 1:
if article.get("authors"):
as_str = ""
for auth in article["authors"]:
as_str += auth["lastname"] + ", " + auth["firstname"] + ";"
if len(article["authors"]) > 1:
as_str = as_str[0 : (len(as_str) - 2)]
rss['authors'] = as_str
rss["authors"] = as_str
if article.get('image'):
if article.get("image"):
# Seek the enclosure details from the image's server
metadata = utilities.get_media_metadata(article['image'])
metadata = utilities.get_media_metadata(article["image"])
if metadata:
rss['enclosure'] = {
'url': article['image'],
'type': metadata['content_type'],
'length': metadata['content_length']
rss["enclosure"] = {
"url": article["image"],
"type": metadata["content_type"],
"length": metadata["content_length"],
}
else:
rss['enclosure'] = {'url': article['image']}
rss["enclosure"] = {"url": article["image"]}
if rss.get('enclosure'):
if rss.get("enclosure"):
if self._verbose_p:
print(
"article contains an enclosure:\n"
+ json.dumps(rss['enclosure'], indent=4))
+ json.dumps(rss["enclosure"], indent=4)
)
# 'audioclips' and 'videoclips' occasionally have no text content
if article.get('content'):
rss['content'] = article['content']
if article.get("content"):
rss["content"] = article["content"]
else:
rss['content'] = ''
rss["content"] = ""
# all articles are required to have one category (their 'zone')
if bool(article.get('zone')):
zone_id = article['zone']
if bool(article.get("zone")):
zone_id = article["zone"]
if isinstance(zone_id, int):
zone_id = str(zone_id)
if self._rev_categories.get('zone_id'):
rss['categories'] = self._rev_categories[zone_id]
elif article.get('zoneTitle'):
rss['categories'] = article['zoneTitle']
if self._rev_categories.get("zone_id"):
rss["categories"] = self._rev_categories[zone_id]
elif article.get("zoneTitle"):
rss["categories"] = article["zoneTitle"]
# Pangea time is always in GMT
# Pangea time is formatted as: 2024-07-31T11:46:28.673
# (though occasionally: 2024-07-31T11:46:28)
# Convert to RSS time (RFC822)
if not article.get('pubDate'):
if not article.get("pubDate"):
datetime_obj = datetime.now(timezone.utc)
else:
if re.match('.*?([.][0-9]+)$', article['pubDate']):
datetime_obj = datetime.strptime(article['pubDate'], self.TIME_FMT)
if re.match(".*?([.][0-9]+)$", article["pubDate"]):
datetime_obj = datetime.strptime(article["pubDate"], self.TIME_FMT)
else:
datetime_obj = datetime.strptime(article['pubDate'], self.TIME_FMT_I)
datetime_obj = datetime.strptime(article["pubDate"], self.TIME_FMT_I)
formatted_time = datetime_obj.strftime(self.RFC822_FMT)
rss['pubDate'] = formatted_time + '+0000'
rss["pubDate"] = formatted_time + "+0000"
# Media types
if bool(article.get('videos')):
if len(article['videos']) > 0:
url = article['videos'][0]['url']
if bool(article.get("videos")):
if len(article["videos"]) > 0:
url = article["videos"][0]["url"]
metadata = utilities.get_media_metadata(url)
if metadata:
rss['media_content'] = {
'url': url,
'type': metadata['content_type'],
'fileSize': metadata['content_length'],
'medium': 'video'
rss["media_content"] = {
"url": url,
"type": metadata["content_type"],
"fileSize": metadata["content_length"],
"medium": "video",
}
if self._verbose_p: print("article contains video media:\n"
+ json.dumps(rss['media_content'], indent=4))
if self._verbose_p:
print(
"article contains video media:\n"
+ json.dumps(rss["media_content"], indent=4)
)
else:
rss['media_content'] = {'url': url}
rss["media_content"] = {"url": url}
if bool(article.get('audios')):
if len(article['audios']) > 0:
url = article['audios'][0]['url']
if bool(article.get("audios")):
if len(article["audios"]) > 0:
url = article["audios"][0]["url"]
metadata = utilities.get_media_metadata(url)
if metadata:
rss['media_content'] = {
'url': url,
'type': metadata['content_type'],
'fileSize': metadata['content_length'],
'medium': 'audio'
rss["media_content"] = {
"url": url,
"type": metadata["content_type"],
"fileSize": metadata["content_length"],
"medium": "audio",
}
if self._verbose_p: print("article contains audio media:\n"
+ json.dumps(rss['media_content'], indent=4))
if self._verbose_p:
print(
"article contains audio media:\n"
+ json.dumps(rss["media_content"], indent=4)
)
else:
rss['media_content'] = {'url': url}
rss["media_content"] = {"url": url}
return rss
@ -350,22 +358,19 @@ class PangeaService:
def test_pangea_interface(self):
"""TESTING Basic connectivity test"""
return self._retrieve_content('test')
return self._retrieve_content("test")
def empty(self):
"""
TESTING Returns nothing but, if command formatted properly, with proper API
key, HTTP status will be 200
"""
res = self._retrieve_content('empty')
res = self._retrieve_content("empty")
return res
def config(self):
"""TESTING Returns configuration information about the API"""
return self._retrieve_content('config')
return self._retrieve_content("config")
def get_content(self, content_type, optional_args_kw=None):
"""
@ -381,13 +386,12 @@ class PangeaService:
# because Pangea does not uniformly apply 'count' and 'daycount' parameters
# to all content generation, we'll do that here (unless we're told to ignore).
if optional_args_kw is not None:
if optional_args_kw.get('filter_date') is not None:
if optional_args_kw.get('filter_date') is False:
if optional_args_kw.get("filter_date") is not None:
if optional_args_kw.get("filter_date") is False:
return res
return self._threshold(res)
def query_content(self, query, optional_args_kw=None):
"""
Use this method to get articles based on textual search.
@ -399,18 +403,18 @@ class PangeaService:
# make the topic/category URL-safe
if optional_args_kw is None:
optional_args_kw = {}
optional_args_kw['q'] = urllib.parse.quote_plus(query)
optional_args_kw["q"] = urllib.parse.quote_plus(query)
try:
res = self._retrieve_content('search', optional_args_kw)
res = self._retrieve_content("search", optional_args_kw)
except pexception.PangeaServiceException as e:
raise pexception.PangeaServiceException(str(e)) from e
#
# because Pangea does not uniformly apply 'count' and 'daycount' parameters
# to all content generation, we'll do that here (unless we're told to ignore).
if optional_args_kw.get('filter_date') is not None:
if optional_args_kw.get('filter_date') is False:
if optional_args_kw.get("filter_date") is not None:
if optional_args_kw.get("filter_date") is False:
return res
return self._threshold(res)
@ -422,12 +426,12 @@ class PangeaService:
"""
if optional_args_kw is None:
optional_args_kw = {}
if 'MediaData' not in optional_args_kw.keys():
optional_args_kw['MediaData'] = 'true'
if "MediaData" not in optional_args_kw.keys():
optional_args_kw["MediaData"] = "true"
optional_args_kw['itemid'] = article_id
optional_args_kw["itemid"] = article_id
try:
res = self._retrieve_content('articles', optional_args_kw)
res = self._retrieve_content("articles", optional_args_kw)
except pexception.PangeaServiceException as e:
raise pexception.PangeaServiceException(str(e)) from e
@ -441,21 +445,20 @@ class PangeaService:
"""
if optional_args_kw is None:
optional_args_kw = {}
if 'Content' not in optional_args_kw.keys():
optional_args_kw['Content'] = 'true'
if 'MediaData' not in optional_args_kw.keys():
optional_args_kw['MediaData'] = 'true'
if "Content" not in optional_args_kw.keys():
optional_args_kw["Content"] = "true"
if "MediaData" not in optional_args_kw.keys():
optional_args_kw["MediaData"] = "true"
optional_args_kw['itemid'] = article_id
optional_args_kw["itemid"] = article_id
try:
res = self._retrieve_content('articledetail', optional_args_kw)
res = self._retrieve_content("articledetail", optional_args_kw)
except pexception.PangeaServiceException as e:
raise pexception.PangeaServiceException(str(e)) from e
# print(json.dumps(res, indent=4))
return res
def get_categories(self, types=None):
"""
Categories are defined on a PER DOMAIN basis, so to assure the user
@ -466,14 +469,16 @@ class PangeaService:
return self._all_categories
if types is None:
types = 'acm' # get all content types 'a', 'c', 'm' at once
types = "acm" # get all content types 'a', 'c', 'm' at once
args = {'type': types}
args = {"type": types}
try:
url = self._build_url('zone', args)
url = self._build_url("zone", args)
response = requests.get(url, timeout=20)
if response.status_code != 200:
msg = "HTP request to {} failed with status code [{}]".format(self._domain, str(response.status_code))
msg = "HTP request to {} failed with status code [{}]".format(
self._domain, str(response.status_code)
)
self._logger.error(msg)
raise pexception.PangeaServiceException(msg)
a_cat = json.loads(response.text)
@ -482,12 +487,14 @@ class PangeaService:
all_keys = self._all_categories.keys()
for c in a_cat:
if not c['name'] in all_keys:
self._all_categories[c['name']] = c
self._rev_categories[str(c['id'])] = c['name']
if not c["name"] in all_keys:
self._all_categories[c["name"]] = c
self._rev_categories[str(c["id"])] = c["name"]
if c['type'] >= len(self._category_types_list):
msg = "ERROR: unknown type: {} on id [{}], name: {}".format(c['type'], str(c['id']), c['name'])
if c["type"] >= len(self._category_types_list):
msg = "ERROR: unknown type: {} on id [{}], name: {}".format(
c["type"], str(c["id"]), c["name"]
)
self._logger.warning(msg)
raise pexception.PangeaServiceException(msg)
@ -500,9 +507,8 @@ class PangeaService:
def _boolean_string(self, boolean_value):
"""Convert a boolean to a string for the API"""
if boolean_value is True:
return 'true'
return 'false'
return "true"
return "false"
def _retrieve_content(self, command, args_kw=None):
"""Minimalist content retriever"""
@ -510,22 +516,22 @@ class PangeaService:
# print('request URL: ' + url)
response = requests.get(url, timeout=20)
if response.status_code != 200:
msg = "received status code {} from {}".format(str(response.status_code), url)
msg = "received status code {} from {}".format(
str(response.status_code), url
)
self._logger.error(msg)
raise pexception.PangeaServiceException(msg)
if command == 'empty':
return json.loads('[]')
if command == "empty":
return json.loads("[]")
return json.loads(response.text)
def _is_implemented(self, cmd):
"""Test if a provided string references an actual command"""
if cmd in self._commands_list:
return True
return False
def _threshold(self, articles):
"""Assure article-count and oldest-article settings are obeyed. Turns out,
only a few API commands accept these arguments, though in general our
@ -539,26 +545,33 @@ class PangeaService:
#
# pubDate may contain milliseconds, or not
#
if re.match('.*?([.][0-9]+)$', blob['pubDate']):
dt = datetime.strptime(blob['pubDate'], self.TIME_FMT)
if re.match(".*?([.][0-9]+)$", blob["pubDate"]):
dt = datetime.strptime(blob["pubDate"], self.TIME_FMT)
dt.replace(microsecond=0)
else:
dt = datetime.strptime(blob['pubDate'], self.TIME_FMT_I)
dt = datetime.strptime(blob["pubDate"], self.TIME_FMT_I)
old_dt = datetime.now() - delta
if dt < old_dt:
if self._verbose_p:
print("article with ID {} is too old [{}]".format(str(blob['id']), dt.strftime(self.TIME_FMT_I)))
print(
"article with ID {} is too old [{}]".format(
str(blob["id"]), dt.strftime(self.TIME_FMT_I)
)
)
else:
article_count += 1
output.append(blob)
if self._verbose_p & (len(output) < len(articles)):
print("request returned {} articles; newest {} processed".format(str(len(articles)), str(len(output))))
print(
"request returned {} articles; newest {} processed".format(
str(len(articles)), str(len(output))
)
)
reordered = output[::-1]
return reordered
def _build_url(self, cmd, args_kw=None):
"""Construct a properly-formatted Pangea API URL"""
if not self._is_implemented(cmd):
@ -567,7 +580,9 @@ class PangeaService:
raise pexception.PangeaServiceException(msg)
if not self._api_key:
msg = "ERROR: no API key supplied (check config file {})".format(self._configuration_file_name)
msg = "ERROR: no API key supplied (check config file {})".format(
self._configuration_file_name
)
self._logger.error(msg)
raise pexception.PangeaServiceException(msg)
@ -577,71 +592,98 @@ class PangeaService:
# this switch verifies (and/or completes) the argument array
match cmd:
# simple commands
case 'empty' | 'test':
case "empty" | "test":
pass
# search
case 'search':
if 'q' not in args_kw.keys():
case "search":
if "q" not in args_kw.keys():
msg = "ERROR: [{}] requires parameter 'q'".format(cmd)
self._logger.error(msg)
raise pexception.PangeaServiceException(msg)
if 'Authors' not in args_kw.keys():
args_kw['Authors'] = self._boolean_string(self._authors_p)
if "Authors" not in args_kw.keys():
args_kw["Authors"] = self._boolean_string(self._authors_p)
if 'count' not in args_kw.keys():
args_kw['count'] = self._max_articles
if "count" not in args_kw.keys():
args_kw["count"] = self._max_articles
if 'daycount' not in args_kw.keys():
args_kw['daycount'] = self._oldest_article
if "daycount" not in args_kw.keys():
args_kw["daycount"] = self._oldest_article
# single-item/detail commands
case 'articledetail' | 'blogitem' | 'comment' | 'author' | 'documentdetail' | 'factcheckdetail' | 'infographicdetail' | 'polldetail' | 'quizdetail':
if 'itemid' not in args_kw.keys():
case (
"articledetail"
| "blogitem"
| "comment"
| "author"
| "documentdetail"
| "factcheckdetail"
| "infographicdetail"
| "polldetail"
| "quizdetail"
):
if "itemid" not in args_kw.keys():
msg = "ERROR: [{}] command requires arg 'itemid'".format(cmd)
self._logger.error(msg)
raise pexception.PangeaServiceException(msg)
if 'Content' not in args_kw.keys():
args_kw['Content'] = self._boolean_string(self._content_inc_p)
if "Content" not in args_kw.keys():
args_kw["Content"] = self._boolean_string(self._content_inc_p)
if 'Authors' not in args_kw.keys():
args_kw['Authors'] = self._boolean_string(self._authors_p)
if "Authors" not in args_kw.keys():
args_kw["Authors"] = self._boolean_string(self._authors_p)
if 'html' not in args_kw.keys():
args_kw['html'] = self._content_options[self._content_format]
if "html" not in args_kw.keys():
args_kw["html"] = self._content_options[self._content_format]
case 'authorid':
if 'authorid' not in args_kw.keys():
case "authorid":
if "authorid" not in args_kw.keys():
msg = "ERROR: [{}] command requires arg 'authorid'".format(cmd)
self._logger.error(msg)
raise pexception.PangeaServiceException(msg)
case 'zone':
if ('zoneid' not in args_kw.keys()) & ('type' not in args_kw.keys()):
msg = "ERROR: [{}] command requires args 'zoneid' or 'type'".format(cmd)
case "zone":
if ("zoneid" not in args_kw.keys()) & ("type" not in args_kw.keys()):
msg = "ERROR: [{}] command requires args 'zoneid' or 'type'".format(
cmd
)
self._logger.error(msg)
raise pexception.PangeaServiceException(msg)
# content commands
case 'articles' | 'audioclips' | 'videoclips' | 'breakingnews' | 'mostpopular' | 'topstories' | 'blogitem':
if 'Authors' not in args_kw.keys():
args_kw['Authors'] = self._boolean_string(self._authors_p)
case (
"articles"
| "audioclips"
| "videoclips"
| "breakingnews"
| "mostpopular"
| "topstories"
| "blogitem"
):
if "Authors" not in args_kw.keys():
args_kw["Authors"] = self._boolean_string(self._authors_p)
if 'count' not in args_kw.keys():
args_kw['count'] = self._max_articles
if "count" not in args_kw.keys():
args_kw["count"] = self._max_articles
if 'daycount' not in args_kw.keys():
args_kw['daycount'] = self._oldest_article
if "daycount" not in args_kw.keys():
args_kw["daycount"] = self._oldest_article
# base for all types of command (apikey needs to be first arg)
url = "https://" + self._domain + self._api_path + cmd + '?apikey=' + self._api_key
url = (
"https://"
+ self._domain
+ self._api_path
+ cmd
+ "?apikey="
+ self._api_key
)
# process the arg array to finish construction of the URL
for key, value in args_kw.items():
# remove this one
if key == 'filter_date':
if key == "filter_date":
continue
if not isinstance(key, str):
@ -649,7 +691,7 @@ class PangeaService:
if not isinstance(value, str):
value = str(value)
url += '&' + key + '=' + value
url += "&" + key + "=" + value
if self._verbose_p:
print("URL for request: " + url)

View file

@ -2,5 +2,6 @@
A less-generic Exception for the Pangea API Service
"""
class PangeaServiceException(Exception):
    """Error type used by the Pangea API service code.

    It adds nothing to ``Exception``; it exists so callers can catch
    failures from this package separately from unrelated errors.
    """

View file

@ -1,39 +1,43 @@
"""
Logger for the Pangea API Service
"""
import logging
from pygea import utilities
class PangeaServiceLogger:
"""
Mostly, so that someone can replace this with a production logger later.
"""
_configuration_file_name = 'pygea.ini'
_configuration_file_name = "pygea.ini"
_levels = {
"NOTSET": 0,
"DEBUG": 10,
"INFO": 20,
"WARNING": 30,
"ERROR": 40,
"CRITICAL": 50
"CRITICAL": 50,
}
def __init__(self):
    """Set up logging from the "logging" section of the configuration file.

    Reads the "log_file" and "default_log_level" options. A level name
    that is not a key of ``_levels`` falls back to "DEBUG".
    """
    log_path = utilities.get_configuration_variable("logging", "log_file")
    level_name = utilities.get_configuration_variable("logging", "default_log_level")

    # None is not a key of _levels either, so one membership test covers
    # both the "missing" and the "unknown name" cases.
    if level_name not in self._levels:
        level_name = "DEBUG"

    pangea_logger = logging.getLogger("PangeaLogger")
    pangea_logger.propagate = False
    self._logger = pangea_logger

    # NOTE(review): basicConfig configures the root logger, while
    # "PangeaLogger" has propagation switched off -- confirm that records
    # sent through self._logger actually reach log_file.
    logging.basicConfig(
        filename=log_path,
        level=self._levels[level_name],
        format="[%(asctime)s] %(levelname)s: %(message)s",
    )
def debug(self, message):
"""Debug message"""

View file

@ -7,11 +7,13 @@
"""
import hashlib
import os
from configparser import ConfigParser, NoOptionError, NoSectionError
from urllib.parse import urlparse
from configparser import ConfigParser, NoSectionError, NoOptionError
import requests
from bs4 import BeautifulSoup
def acquire(url):
"""Simple wrapper over the request object."""
response = requests.get(url, timeout=20)
@ -20,26 +22,30 @@ def acquire(url):
if response.status_code == 200:
content = response.text
else:
print("Failed to retrieve the web page. Status code: " + str(response.status_code))
print(
"Failed to retrieve the web page. Status code: " + str(response.status_code)
)
return None
return content
def parse_url_elements(url):
    """Split a Pangea article URL into its host name and article ID.

    The article ID is the final path segment with everything from the
    first "." onwards removed, e.g. ".../324534.html" gives "324534".

    Returns a dict with the keys "domain" and "article_id".
    """
    parsed = urlparse(url)
    filename = parsed.path.split("/")[-1]
    return {
        "domain": parsed.hostname,
        "article_id": filename.split(".")[0],
    }
def get_webpage_metadata(page_url):
"""Get HTML metadata elements from a webpage."""
parsed = urlparse(page_url)
@ -52,116 +58,124 @@ def get_webpage_metadata(page_url):
if html_content == None:
return None
soup = BeautifulSoup(html_content, 'html.parser')
meta_tags = soup.find_all('meta')
soup = BeautifulSoup(html_content, "html.parser")
meta_tags = soup.find_all("meta")
metadata = {}
for tag in meta_tags:
if 'name' in tag.attrs:
name = tag.attrs['name']
content = tag.attrs.get('content', '')
if "name" in tag.attrs:
name = tag.attrs["name"]
content = tag.attrs.get("content", "")
metadata[name] = content
elif 'property' in tag.attrs: # For OpenGraph metadata
prop = tag.attrs['property']
content = tag.attrs.get('content', '')
elif "property" in tag.attrs: # For OpenGraph metadata
prop = tag.attrs["property"]
content = tag.attrs.get("content", "")
metadata[prop] = content
# add useful language property
html = soup.find_all('html')
metadata['language'] = html[0]['lang']
html = soup.find_all("html")
metadata["language"] = html[0]["lang"]
# add links
link_tags = soup.find_all('link')
link_tags = soup.find_all("link")
for tag in link_tags:
if 'rel' in tag.attrs:
if "rel" in tag.attrs:
# print(json.dumps(tag.attrs, indent=4))
if 'alternate' in tag.attrs['rel']:
if 'icon' in tag.attrs['rel']:
metadata['favicon'] = 'https://' + domain + tag.attrs.get('href')
if tag.attrs['rel'][0] == 'canonical':
metadata['canonical'] = tag.attrs.get('href')
if "alternate" in tag.attrs["rel"]:
if "icon" in tag.attrs["rel"]:
metadata["favicon"] = "https://" + domain + tag.attrs.get("href")
if tag.attrs["rel"][0] == "canonical":
metadata["canonical"] = tag.attrs.get("href")
return metadata
def get_media_metadata(image_url):
    """Fetch type and size information for a media URL with a HEAD request.

    Returns a dict with "content_type" and "content_length" copied from
    the response headers when the status code is 200, otherwise None.
    """
    response = requests.head(image_url, timeout=20)
    if response.status_code != 200:
        return None

    headers = response.headers
    # NOTE(review): both headers are read by plain indexing; presumably a
    # response lacking either one raises KeyError here -- confirm that
    # callers expect that.
    return {
        "content_type": headers["Content-Type"],
        "content_length": headers["Content-Length"],
    }
def make_boolean(bool_str):
    """Interpret a textual flag as a bool, ignoring letter case.

    Only "false" yields False. "true" and any other text, including the
    empty string, yield True.
    """
    # Anything that is not recognisably "false" counts as True.
    return bool_str.lower() != "false"
def get_api_key():
    """Look up the Pangea API key.

    The PYGEA_API_KEY environment variable wins when it holds a non-empty
    value. Otherwise the "api_key" option in the "runtime" section of
    pygea.ini is used. Returns None when neither source supplies a key.
    """
    if from_env := os.environ.get("PYGEA_API_KEY"):
        return from_env

    parser = ConfigParser()
    parser.read("pygea.ini")
    # fallback=None covers both a missing section and a missing option.
    return parser.get("runtime", "api_key", fallback=None)
def get_configuration_variable(section, vname):
    """Read one option from pygea.ini.

    The exact strings "True" and "False" are returned as the matching
    bool; any other value is returned as the string stored in the file.
    A missing section or option is not handled here, so the error raised
    by ConfigParser.get reaches the caller.
    """
    parser = ConfigParser()
    parser.read("pygea.ini")
    raw = parser.get(section, vname)
    if raw in ("True", "False"):
        return raw == "True"
    return raw
def is_domain_name(domain):
    """Loose check for a domain name: true when ``domain`` contains a "."."""
    return "." in domain
def hash_site_metadata(metadata):
    """Derive a SHA-256 hex digest from a mapping of metadata strings.

    Each key is fed to the hash followed by its value, both UTF-8 encoded,
    in the mapping's iteration order, so the same entries in a different
    order give a different digest.
    """
    hasher = hashlib.sha256()
    for name, text in metadata.items():
        hasher.update(name.encode("utf8") + text.encode("utf8"))
    return hasher.hexdigest()
def rss_namespace_supported(prop):
    """Report whether ``prop`` is one of the namespace names this module lists as supported by the FeedGen RSS package."""
    return prop in (
        "dc",
        "geo",
        "gen_entry",
        "media",
        "podcast",
        "podcast_entry",
        "syndication",
        "torrent",
    )
def rss_namespace_for_property(prop):
"""Returns the XML namespace for a specified <channel> or <item>
property from among a list of the most popular namespace schemes
@ -171,25 +185,25 @@ def rss_namespace_for_property(prop):
https://validator.w3.org/feed/docs/howto/declare_namespaces.html
"""
known_namespaces = {
'content': 'http://purl.org/rss/1.0/modules/content/', # content
'dc': 'http://purl.org/dc/elements/1.1/', # Dublin Core
'atom': 'http://www.w3.org/2005/Atom', # ATOM
'sy': 'http://purl.org/rss/1.0/modules/syndication/', # Syndication
'admin': 'http://webns.net/mvcb/',
'feedburner': 'http://rssnamespace.org/feedburner/ext/1.0', # Feedburner
'cc': 'http://web.resource.org/cc/', # copyrights
'geo': 'http://www.w3.org/2003/01/geo/wgs84_pos#',
'opensearch': 'http://a9.com/-/spec/opensearch/1.1/', # OpenSearch
'itunes': 'http://www.itunes.com/dtds/podcast-1.0.dtd', # Apple iTunes
'blogChannel': 'http://backend.userland.com/blogChannelModule', # BlogChannel
'media': 'http://search.yahoo.com/mrss/', # media RSS
'icbm': 'http://postneo.com/icbm', # ICBM
'cf': 'http://www.microsoft.com/schemas/rss/core/2005', # a Microsoft thing
'podcast': 'https://podcastindex.org/namespace/1.0', # Podcast RSS
'xhtml': 'http://www.w3.org/1999/xhtml' # XHTML
"content": "http://purl.org/rss/1.0/modules/content/", # content
"dc": "http://purl.org/dc/elements/1.1/", # Dublin Core
"atom": "http://www.w3.org/2005/Atom", # ATOM
"sy": "http://purl.org/rss/1.0/modules/syndication/", # Syndication
"admin": "http://webns.net/mvcb/",
"feedburner": "http://rssnamespace.org/feedburner/ext/1.0", # Feedburner
"cc": "http://web.resource.org/cc/", # copyrights
"geo": "http://www.w3.org/2003/01/geo/wgs84_pos#",
"opensearch": "http://a9.com/-/spec/opensearch/1.1/", # OpenSearch
"itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd", # Apple iTunes
"blogChannel": "http://backend.userland.com/blogChannelModule", # BlogChannel
"media": "http://search.yahoo.com/mrss/", # media RSS
"icbm": "http://postneo.com/icbm", # ICBM
"cf": "http://www.microsoft.com/schemas/rss/core/2005", # a Microsoft thing
"podcast": "https://podcastindex.org/namespace/1.0", # Podcast RSS
"xhtml": "http://www.w3.org/1999/xhtml", # XHTML
}
components = prop.split(':')
components = prop.split(":")
if known_namespaces.get(components[0]):
return known_namespaces[components[0]]

16
treefmt.nix Normal file
View file

@ -0,0 +1,16 @@
# treefmt-nix module evaluated by flake.nix to build the flake's `formatter` output.
# The module argument set is unused, hence `_`.
_: {
  # File that marks the root of the project tree.
  projectRootFile = "flake.nix";
  # Formatters to enable.
  programs = {
    nixfmt.enable = true;
    black.enable = true;
    isort = {
      enable = true;
      # isort's "black" profile, so its import layout agrees with black's output.
      profile = "black";
    };
    shfmt.enable = true;
  };
}