diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..f64c047 --- /dev/null +++ b/flake.lock @@ -0,0 +1,46 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1774386573, + "narHash": "sha256-4hAV26quOxdC6iyG7kYaZcM3VOskcPUrdCQd/nx8obc=", + "rev": "46db2e09e1d3f113a13c0d7b81e2f221c63b8ce9", + "revCount": 969196, + "type": "tarball", + "url": "https://api.flakehub.com/f/pinned/NixOS/nixpkgs/0.1.969196%2Brev-46db2e09e1d3f113a13c0d7b81e2f221c63b8ce9/019d279e-af65-79ce-92be-5dee7b1e36d4/source.tar.gz" + }, + "original": { + "type": "tarball", + "url": "https://flakehub.com/f/NixOS/nixpkgs/0.1" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs", + "treefmt-nix": "treefmt-nix" + } + }, + "treefmt-nix": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1773297127, + "narHash": "sha256-6E/yhXP7Oy/NbXtf1ktzmU8SdVqJQ09HC/48ebEGBpk=", + "owner": "numtide", + "repo": "treefmt-nix", + "rev": "71b125cd05fbfd78cab3e070b73544abe24c5016", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "treefmt-nix", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..8c3d972 --- /dev/null +++ b/flake.nix @@ -0,0 +1,138 @@ +{ + description = "pygea - Pangea RSS feed generator"; + + inputs = { + nixpkgs.url = "https://flakehub.com/f/NixOS/nixpkgs/0.1"; + treefmt-nix = { + url = "github:numtide/treefmt-nix"; + inputs.nixpkgs.follows = "nixpkgs"; + }; + }; + + outputs = + { + self, + nixpkgs, + treefmt-nix, + ... + }: + let + systems = [ "x86_64-linux" ]; + forAllSystems = + fn: + nixpkgs.lib.genAttrs systems ( + system: + fn ( + import nixpkgs { + inherit system; + config.allowUnfree = true; + } + ) + ); + + mkTreefmtConfig = pkgs: (treefmt-nix.lib.evalModule pkgs ./treefmt.nix).config; + in + { + formatter = forAllSystems (pkgs: (mkTreefmtConfig pkgs).build.wrapper); + + packages = forAllSystems ( + pkgs: + let + pkg = pkgs.callPackage ./nix/packages/pygea/package.nix { }; + in + { + pygea = pkg; + default = pkg; + } + ); + + apps = forAllSystems ( + pkgs: + let + package = self.packages.${pkgs.stdenv.hostPlatform.system}.default; + in + { + pygea = { + type = "app"; + program = "${package}/bin/pygea"; + meta.description = "pygea runtime"; + }; + default = { + type = "app"; + program = "${package}/bin/pygea"; + meta.description = "pygea runtime"; + }; + } + ); + + checks = forAllSystems ( + pkgs: + let + inherit (pkgs.stdenv.hostPlatform) system; + exportedPackage = self.packages.${system}.default; + treefmtConfig = mkTreefmtConfig pkgs; + smokePython = pkgs.python313.withPackages (ps: [ + ps.requests + ps.beautifulsoup4 + ps.feedgen + ps."python-dateutil" + ]); + + smokeCheck = pkgs.runCommand "pygea-smoke" { nativeBuildInputs = [ smokePython ]; } '' + export PYTHONPATH="${exportedPackage}/${pkgs.python313.sitePackages}:$PYTHONPATH" + python - <<'PY' + from pathlib import Path + + for source_file in Path("${./.}/pygea").glob("*.py"): + compile(source_file.read_text(encoding="utf-8"), str(source_file), "exec") + PY + python -c "import pygea; import pygea.utilities; import pygea.pexception" + mkdir -p "$out" + touch "$out/passed" + ''; + + deadnixCheck = pkgs.runCommand "pygea-deadnix" { nativeBuildInputs = [ pkgs.deadnix ]; } '' + cd ${./.} + deadnix --fail . + mkdir -p "$out" + touch "$out/passed" + ''; + + statixCheck = pkgs.runCommand "pygea-statix" { nativeBuildInputs = [ pkgs.statix ]; } '' + cd ${./.} + statix check + mkdir -p "$out" + touch "$out/passed" + ''; + in + { + devshell-default = self.devShells.${system}.default; + formatter = treefmtConfig.build.wrapper; + package-default = exportedPackage; + treefmt = treefmtConfig.build.check ./.; + smoke = smokeCheck; + deadnix = deadnixCheck; + statix = statixCheck; + } + ); + + devShells = forAllSystems ( + pkgs: + let + treefmtConfig = mkTreefmtConfig pkgs; + in + { + default = pkgs.mkShell { + packages = [ + pkgs.python313 + pkgs.uv + self.packages.${pkgs.stdenv.hostPlatform.system}.default + treefmtConfig.build.wrapper + pkgs.deadnix + pkgs.statix + ]; + }; + } + ); + }; +} diff --git a/nix/packages/pygea/package.nix b/nix/packages/pygea/package.nix new file mode 100644 index 0000000..b1d7e61 --- /dev/null +++ b/nix/packages/pygea/package.nix @@ -0,0 +1,36 @@ +{ + lib, + python3Packages, +}: + +python3Packages.buildPythonApplication { + pname = "pygea"; + version = "0.1.0"; + pyproject = true; + + src = lib.cleanSource ../../..; + + build-system = [ + python3Packages.hatchling + ]; + + dependencies = [ + python3Packages.requests + python3Packages.beautifulsoup4 + python3Packages.feedgen + python3Packages.python-dateutil + ]; + + pythonImportsCheck = [ + "pygea" + "pygea.utilities" + "pygea.pexception" + ]; + + meta = { + description = "Pangea RSS feed generator"; + homepage = "https://gitlab.com/guardianproject-ops/pygea"; + license = lib.licenses.lgpl3Plus; + mainProgram = "pygea"; + }; +} diff --git a/pygea/main.py b/pygea/main.py index c3be366..0a4a3ff 100644 --- a/pygea/main.py +++ b/pygea/main.py @@ -1,16 +1,16 @@ """Pygea main entry point""" + import hashlib import json import os +from pygea import utilities from pygea.pangeafeed import PangeaFeed from pygea.pexception import PangeaServiceException -from pygea import utilities - -OUTPUT_TO_FILE = utilities.get_configuration_variable('results', 'output_to_file_p') -OUTPUT_FILE_NAME = utilities.get_configuration_variable('results', 'output_file_name') -OUTPUT_DIRECTORY = utilities.get_configuration_variable('results', 'output_directory') +OUTPUT_TO_FILE = utilities.get_configuration_variable("results", "output_to_file_p") +OUTPUT_FILE_NAME = utilities.get_configuration_variable("results", "output_file_name") +OUTPUT_DIRECTORY = utilities.get_configuration_variable("results", "output_directory") def write_manifest(categories): @@ -22,61 +22,71 @@ def write_manifest(categories): if not os.path.exists(output_directory): os.makedirs(output_directory) - manifest_path = os.path.join(output_directory, 'manifest.json') - with open(manifest_path, 'w', encoding='utf-8') as mfile: - json.dump({'categories': categories}, mfile, indent=2, ensure_ascii=False) - mfile.write('\n') + manifest_path = os.path.join(output_directory, "manifest.json") + with open(manifest_path, "w", encoding="utf-8") as mfile: + json.dump({"categories": categories}, mfile, indent=2, ensure_ascii=False) + mfile.write("\n") + def main(): # Feeds are generated for a single, specified, domain - domain = 'www.martinoticias.com' + domain = "www.martinoticias.com" args = { # tuple values: # [0] category name or a string representing a content query # [1] only the newest content desired (as configured in pygea.ini)? # [2] special content_type for this category only (from the approved list of types) - 'categories': [ - ('Titulares',True, None), - ('Cuba', True, None), - ('América Latina', True, None), - ('Info Martí ', False, None), # YES! this category name has a space character at the end! - ('Noticiero Martí Noticias', True, None) + "categories": [ + ("Titulares", True, None), + ("Cuba", True, None), + ("América Latina", True, None), + ( + "Info Martí ", + False, + None, + ), # YES! this category name has a space character at the end! + ("Noticiero Martí Noticias", True, None), ], - 'default_content_type': "articles" + "default_content_type": "articles", } # TWO OPTIONS from the args defined above: # 1. Generate a single feed from the defined categories - #try: + # try: # pf = PangeaFeed(domain, args) # pf.acquire_content() # pf.generate_feed() # pf.disgorge() - #except PangeaServiceException as error: + # except PangeaServiceException as error: # print(error) # 2. Generate different feeds for each defined category try: manifest_categories = [] - for cat_tuple in args['categories']: + for cat_tuple in args["categories"]: # form new args for each category/query - newargs = { - 'categories': [cat_tuple], - 'default_content_type': "articles" - } + newargs = {"categories": [cat_tuple], "default_content_type": "articles"} pf = PangeaFeed(domain, newargs) pf.acquire_content() pf.generate_feed() # put each feed into a different sub-directory - feed_subdir = hashlib.md5(cat_tuple[0].encode('utf-8')).hexdigest()[:7] + feed_subdir = hashlib.md5(cat_tuple[0].encode("utf-8")).hexdigest()[:7] pf.disgorge(feed_subdir) - manifest_categories.append({ - 'name': cat_tuple[0], - 'short-hash': feed_subdir, - 'local-path': os.path.join(feed_subdir, OUTPUT_FILE_NAME).replace(os.sep, '/') - }) - print("feed for {} output to sub-directory {}".format(cat_tuple[0], feed_subdir)) + manifest_categories.append( + { + "name": cat_tuple[0], + "short-hash": feed_subdir, + "local-path": os.path.join(feed_subdir, OUTPUT_FILE_NAME).replace( + os.sep, "/" + ), + } + ) + print( + "feed for {} output to sub-directory {}".format( + cat_tuple[0], feed_subdir + ) + ) write_manifest(manifest_categories) except PangeaServiceException as error: print(error) diff --git a/pygea/pangeafeed.py b/pygea/pangeafeed.py index c09385e..5e4ca1e 100644 --- a/pygea/pangeafeed.py +++ b/pygea/pangeafeed.py @@ -1,30 +1,31 @@ # pylint: disable-msg=C0103 # pylint: disable-msg=C0201 """ - - * - - Generate a custom RSS feed from Pangea, for a specific domain, with one or more - categories or content filters and an optional supplied content-type. - - * - +- * - +Generate a custom RSS feed from Pangea, for a specific domain, with one or more +categories or content filters and an optional supplied content-type. +- * - """ import os import sys from datetime import datetime + from feedgen.feed import FeedGenerator -from pygea import pangeaservice -from pygea import pexception -from pygea import utilities -VERBOSE = utilities.get_configuration_variable('runtime', 'verbose_p') -OUTPUT_TO_FILE = utilities.get_configuration_variable('results', 'output_to_file_p') -OUTPUT_FILE_NAME = utilities.get_configuration_variable('results', 'output_file_name') -OUTPUT_DIRECTORY = utilities.get_configuration_variable('results', 'output_directory') +from pygea import pangeaservice, pexception, utilities -class PangeaFeed(): +VERBOSE = utilities.get_configuration_variable("runtime", "verbose_p") +OUTPUT_TO_FILE = utilities.get_configuration_variable("results", "output_to_file_p") +OUTPUT_FILE_NAME = utilities.get_configuration_variable("results", "output_file_name") +OUTPUT_DIRECTORY = utilities.get_configuration_variable("results", "output_directory") + + +class PangeaFeed: _domain = None _categories = None - _content_type = 'articles' # default + _content_type = "articles" # default def __init__(self, domain, kw_args): try: @@ -33,45 +34,56 @@ class PangeaFeed(): raise error self._domain = domain - if kw_args.get('categories'): - self._categories = kw_args['categories'] + if kw_args.get("categories"): + self._categories = kw_args["categories"] else: - raise pexception.PangeaServiceException("ERROR: At least one category or content-query is required") - - if kw_args.get('default_content_type'): - if kw_args['default_content_type'] not in self._ps.content_types(): - raise pexception.PangeaServiceException("{} is not a valid content type".format(kw_args['content_type'])) - self._content_type = kw_args['default_content_type'] + raise pexception.PangeaServiceException( + "ERROR: At least one category or content-query is required" + ) + if kw_args.get("default_content_type"): + if kw_args["default_content_type"] not in self._ps.content_types(): + raise pexception.PangeaServiceException( + "{} is not a valid content type".format(kw_args["content_type"]) + ) + self._content_type = kw_args["default_content_type"] def acquire_content(self): self._full_article_list = [] - for (cat, old, type) in self._categories: + for cat, old, type in self._categories: opt_args = {} # special type for this category? if type is None: type = self._content_type # wants old stuff (not configured date limit)? if old is not None: - opt_args['daycount'] = 365 # oldest date = one year - opt_args['filter_date'] = False + opt_args["daycount"] = 365 # oldest date = one year + opt_args["filter_date"] = False ci = self._ps.category_info(cat) if ci is not None: # cat is pre-defined category - opt_args['zoneid'] = ci['id'] + opt_args["zoneid"] = ci["id"] jbody = self._ps.get_content(type, opt_args) else: # cat as actually a free-form query string to be used no article content jbody = self._ps.query_content(cat, opt_args) if len(jbody) == 0: if VERBOSE: - print("no articles available for {} [command: {}] [category/query: '{}'])".format(self._domain, self._content_type, cat)) + print( + "no articles available for {} [command: {}] [category/query: '{}'])".format( + self._domain, self._content_type, cat + ) + ) continue if VERBOSE: - print ("{} articles added from category/query '{}'".format(str(len(jbody)), cat)) + print( + "{} articles added from category/query '{}'".format( + str(len(jbody)), cat + ) + ) for art in jbody: self._full_article_list.append(art) @@ -81,7 +93,7 @@ class PangeaFeed(): # Get preparatory information from the domain's homepage. Most characteristics # of the RSS Channel information are acquired from the homepage metadata. # - md = utilities.get_webpage_metadata('https://' + self._domain) + md = utilities.get_webpage_metadata("https://" + self._domain) fg = FeedGenerator() self._fg = fg @@ -90,31 +102,38 @@ class PangeaFeed(): # build the RSS element # fg.id(utilities.hash_site_metadata(md)) - fg.title(self._content_type + ' from ' + md['og:site_name']) - fg.link(href=md['og:url'], rel='alternate') - fg.description(self._content_type + ' from ' + self._domain + " (" + md['description'] + ")") + fg.title(self._content_type + " from " + md["og:site_name"]) + fg.link(href=md["og:url"], rel="alternate") + fg.description( + self._content_type + + " from " + + self._domain + + " (" + + md["description"] + + ")" + ) # # NOTE: the parameters required for in the are different # from in an - fg.image(url=md['og:image'], title=md['og:site_name'], link=md['og:url']) + fg.image(url=md["og:image"], title=md["og:site_name"], link=md["og:url"]) # # Multiple categories/keywords are allowed in the RSS Channel - keywords = md['keywords'] - categories = keywords.split(',') - sch = 'https://' + self._domain + '/' + keywords = md["keywords"] + categories = keywords.split(",") + sch = "https://" + self._domain + "/" for name in categories: fg.category(term=name, scheme=sch, label=name) - fg.language(md['language']) - fg.generator('Guardian Project Pangea CMS Crawler 1.0') - fg.webMaster('support@guardianproject.info') + fg.language(md["language"]) + fg.generator("Guardian Project Pangea CMS Crawler 1.0") + fg.webMaster("support@guardianproject.info") fg.ttl(60) datetime_obj = datetime.now() - formatted_time = datetime_obj.strftime('%a, %d %b %Y %H:%M:%S %Z') - fg.lastBuildDate(formatted_time + '+0000') + formatted_time = datetime_obj.strftime("%a, %d %b %Y %H:%M:%S %Z") + fg.lastBuildDate(formatted_time + "+0000") # # Build the elements for each and add each item to the RSS Channel @@ -122,71 +141,78 @@ class PangeaFeed(): media_extension_loaded = False for article in self._full_article_list: try: - article_deets = self._ps.get_article_detail(article['id']) + article_deets = self._ps.get_article_detail(article["id"]) rss_article = self._ps.rss_article_from_pangea_article(article_deets) except pexception.PangeaServiceException as error: if VERBOSE: print(error) - print("article with id [{}] may no longer exist in Pangea".format(str(article['id']))) + print( + "article with id [{}] may no longer exist in Pangea".format( + str(article["id"]) + ) + ) continue fe = fg.add_entry() - fe.title(rss_article['title']) - fe.link({'href': rss_article['link']}) - fe.guid(rss_article['guid']) - fe.pubDate(rss_article['pubDate']) - fe.content(rss_article['content']) - if rss_article.get('summary'): - fe.description(rss_article['summary']) + fe.title(rss_article["title"]) + fe.link({"href": rss_article["link"]}) + fe.guid(rss_article["guid"]) + fe.pubDate(rss_article["pubDate"]) + fe.content(rss_article["content"]) + if rss_article.get("summary"): + fe.description(rss_article["summary"]) - if rss_article.get('enclosure'): - enc_md = rss_article['enclosure'] - if enc_md.get('type'): + if rss_article.get("enclosure"): + enc_md = rss_article["enclosure"] + if enc_md.get("type"): fe.enclosure( - url=enc_md['url'], - type=enc_md['type'], - length=enc_md['length']) + url=enc_md["url"], type=enc_md["type"], length=enc_md["length"] + ) else: - fe.enclosure(url=enc_md['url']) + fe.enclosure(url=enc_md["url"]) - if rss_article.get('media_content'): + if rss_article.get("media_content"): # # special handling for the RSS media extension # if not media_extension_loaded: - fg.load_extension('media') + fg.load_extension("media") media_extension_loaded = True - if VERBOSE: print("media extension loaded") + if VERBOSE: + print("media extension loaded") - mc_md = rss_article['media_content'] - if mc_md.get('medium'): + mc_md = rss_article["media_content"] + if mc_md.get("medium"): fe.media.content( - url=mc_md['url'], - type=mc_md['type'], - fileSize=mc_md['fileSize'], - medium=mc_md['medium']) + url=mc_md["url"], + type=mc_md["type"], + fileSize=mc_md["fileSize"], + medium=mc_md["medium"], + ) else: - fe.media.content(url=mc_md['url']) + fe.media.content(url=mc_md["url"]) - - def disgorge(self, subdirectory = None): + def disgorge(self, subdirectory=None): # # Output the RSS feed as appropriate # if OUTPUT_TO_FILE is True: try: if subdirectory is not None: - if not os.path.exists(OUTPUT_DIRECTORY + '/' + subdirectory): - os.makedirs(OUTPUT_DIRECTORY + '/' + subdirectory) - ofile = OUTPUT_DIRECTORY + '/' + subdirectory + '/' + OUTPUT_FILE_NAME + if not os.path.exists(OUTPUT_DIRECTORY + "/" + subdirectory): + os.makedirs(OUTPUT_DIRECTORY + "/" + subdirectory) + ofile = ( + OUTPUT_DIRECTORY + "/" + subdirectory + "/" + OUTPUT_FILE_NAME + ) else: if not os.path.exists(OUTPUT_DIRECTORY): os.makedirs(OUTPUT_DIRECTORY) - ofile = OUTPUT_DIRECTORY + '/' + OUTPUT_FILE_NAME + ofile = OUTPUT_DIRECTORY + "/" + OUTPUT_FILE_NAME self._fg.rss_file(ofile, extensions=True, pretty=True) except OSError as fe: print("for {} file error: ".format(ofile, str(fe))) sys.exit(1) - if VERBOSE: print("output written to {}".format(ofile)) + if VERBOSE: + print("output written to {}".format(ofile)) else: print(self._fg.rss_str(extensions=True, pretty=True)) diff --git a/pygea/pangeaservice.py b/pygea/pangeaservice.py index 44265e5..395f330 100644 --- a/pygea/pangeaservice.py +++ b/pygea/pangeaservice.py @@ -1,40 +1,41 @@ """ - - * - - Interface to USAGM Pangea Content Management System API +- * - +Interface to USAGM Pangea Content Management System API - This implementation is a subset of API functions, focusing on the eventual - creation of RSS (or other) data streams from article selections +This implementation is a subset of API functions, focusing on the eventual +creation of RSS (or other) data streams from article selections - Pangea Documentation: - https://showcase.pangea-cms.com/a/pangea-api-methods-and-models/29663096.html +Pangea Documentation: + https://showcase.pangea-cms.com/a/pangea-api-methods-and-models/29663096.html - :copyright: 2024, David Oliver - :license: http://www.gnu.org/copyleft/lesser.html GNU Lesser General Public License - - * - +:copyright: 2024, David Oliver +:license: http://www.gnu.org/copyleft/lesser.html GNU Lesser General Public License +- * - """ +import hashlib import json import re -import hashlib import urllib.parse -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta, timezone + import requests from dateutil.parser import * -from pygea import utilities -from pygea import pexception -from pygea import plogger + +from pygea import pexception, plogger, utilities + class PangeaService: - """ Interface to the Pangea API """ + """Interface to the Pangea API""" - _configuration_file_name = 'pygea.ini' - _api_path = '/api2/' + _configuration_file_name = "pygea.ini" + _api_path = "/api2/" _api_key = None # Pangea and RSS time format - TIME_FMT = "%Y-%m-%dT%H:%M:%S.%f" # ex. 2024-08-02T11:46:28.673 - TIME_FMT_I = "%Y-%m-%dT%H:%M:%S" # ex. 2024-08-02T11:46:28 + TIME_FMT = "%Y-%m-%dT%H:%M:%S.%f" # ex. 2024-08-02T11:46:28.673 + TIME_FMT_I = "%Y-%m-%dT%H:%M:%S" # ex. 2024-08-02T11:46:28 RFC822_FMT = "%a, %d %B %Y %H:%M:%S %z" # API commands - commands commented out are valid in the API but NOT SUPPORTED HERE @@ -42,72 +43,71 @@ class PangeaService: "articledetail", "articles", "audioclips", - #"audioscheduler", + # "audioscheduler", "author", - #"blogitem", + # "blogitem", "breakingnews", - #"comment", + # "comment", "config", - #"documentdetail", + # "documentdetail", "empty", - #"factcheckdetail", - #"htmlwidget", - #"infographicdetail", - #"liveblogs", - #"livestream", + # "factcheckdetail", + # "htmlwidget", + # "infographicdetail", + # "liveblogs", + # "livestream", "mostpopular", - #"polldetail", - #"quizdetail", + # "polldetail", + # "quizdetail", "search", "test", "topstories", "videoclips", - #"videoscheduler", - #"widget", - "zone" + # "videoscheduler", + # "widget", + "zone", ] # Position-indexed content category names _category_types_list = [ - 'none', # 0 internally - 'content', # 1 internally - 'audio', # 2 internally - 'content+audio', # 3 internally; compound type 1+2 - 'media', # 4 internally - 'content+media', # 5 internally; compound type 1+4 - 'audio+media' # 6 internally; compound type 2+4 + "none", # 0 internally + "content", # 1 internally + "audio", # 2 internally + "content+audio", # 3 internally; compound type 1+2 + "media", # 4 internally + "content+media", # 5 internally; compound type 1+4 + "audio+media", # 6 internally; compound type 2+4 ] # Content types (in the editorial sense) # Note these also map to commands in _commands_list _content_types_list = [ - 'articles', - 'audioclips', - 'videoclips', - 'breakingnews', - 'mostpopular', - 'topstories' + "articles", + "audioclips", + "videoclips", + "breakingnews", + "mostpopular", + "topstories", ] # How to format content # (we WILL NOT use these in combination, as defined in the API) _content_options = { - 'WTF_0': 0, # Returns basically what is in database - 'TEXT_ONLY': 1, # Removes all html keeping text only - 'WTF_1': 2, # Returns tags as they would be displayed on the page - 'MOBILE_1': 4, # Returns html as for mobile/rss feeds without - # additional stripping - 'MOBILE_2': 8, # Returns html as for mobile/rss feeds with stripping - # some html that is not supported - 'MOBILE_3': 16, # Returns html as for mobile/rss feeds with some extra - # html tags stripped - 'WTF_2': 32, # Same as for Feeds + replaces recognized links with - # internal links and wraps recognized images inside tags - 'XML_TX': 64, # Used with Feeds to apply xsl transformation - 'JSON': 128 # Generates json structured content + "WTF_0": 0, # Returns basically what is in database + "TEXT_ONLY": 1, # Removes all html keeping text only + "WTF_1": 2, # Returns tags as they would be displayed on the page + "MOBILE_1": 4, # Returns html as for mobile/rss feeds without + # additional stripping + "MOBILE_2": 8, # Returns html as for mobile/rss feeds with stripping + # some html that is not supported + "MOBILE_3": 16, # Returns html as for mobile/rss feeds with some extra + # html tags stripped + "WTF_2": 32, # Same as for Feeds + replaces recognized links with + # internal links and wraps recognized images inside tags + "XML_TX": 64, # Used with Feeds to apply xsl transformation + "JSON": 128, # Generates json structured content } - def __init__(self, domain, key=None, verbose=False): self._logger = plogger.PangeaServiceLogger() @@ -123,13 +123,21 @@ class PangeaService: # # preset from configuration file # - self._max_articles = int(utilities.get_configuration_variable('runtime', 'max_articles')) - self._oldest_article = int(utilities.get_configuration_variable('runtime', 'oldest_article')) - self._content_format = utilities.get_configuration_variable('runtime', 'content_format') - self._authors_p = utilities.get_configuration_variable('runtime', 'authors_p') - self._no_media_p = utilities.get_configuration_variable('runtime', 'no_media_p') - self._content_inc_p = utilities.get_configuration_variable('runtime', 'content_inc_p') - self._verbose_p = utilities.get_configuration_variable('runtime', 'verbose_p') + self._max_articles = int( + utilities.get_configuration_variable("runtime", "max_articles") + ) + self._oldest_article = int( + utilities.get_configuration_variable("runtime", "oldest_article") + ) + self._content_format = utilities.get_configuration_variable( + "runtime", "content_format" + ) + self._authors_p = utilities.get_configuration_variable("runtime", "authors_p") + self._no_media_p = utilities.get_configuration_variable("runtime", "no_media_p") + self._content_inc_p = utilities.get_configuration_variable( + "runtime", "content_inc_p" + ) + self._verbose_p = utilities.get_configuration_variable("runtime", "verbose_p") self._domain = domain @@ -140,13 +148,13 @@ class PangeaService: if verbose: self._verbose_p = verbose if self._verbose_p: - print('verbose output') + print("verbose output") # # These two dictionaries index the category information # _all_categories is indexed by category name; _rev_categories is indexed by id # - self._all_categories = { } - self._rev_categories = {'0': 'none'} + self._all_categories = {} + self._rev_categories = {"0": "none"} # Acquire the categories registered for the supplied domain # Invokes an API call! @@ -157,19 +165,18 @@ class PangeaService: # def set_domain(self, value): - """ Sets the USAGM Internet domain name from which content is acquired """ + """Sets the USAGM Internet domain name from which content is acquired""" self._domain = value # Reset the category dictionaries - self._all_categories = { } - self._rev_categories = {'0': 'none'} + self._all_categories = {} + self._rev_categories = {"0": "none"} # Acquire the categories registered for the supplied domain (API call) self.get_categories() - def set_api_key(self, key): - """ Sets the API key that allows access to the API """ + """Sets the API key that allows access to the API""" self._api_key = key # @@ -177,50 +184,44 @@ class PangeaService: # def content_types(self): - """ Return full list of content types. """ + """Return full list of content types.""" return self._content_types_list - def content_type_name(self, type_index): - """ Returns name of a content type given its index. """ + """Returns name of a content type given its index.""" if type_index > len(self._content_types_list): return False return self._content_types_list[type_index] - def commands(self): - """ Return the list of possible commands. """ + """Return the list of possible commands.""" return self._commands_list - def category_types(self): - """Return of list of possible category types. """ + """Return of list of possible category types.""" return self._category_types_list - def category_info(self, category_name): - """ Return rich information about a category. """ + """Return rich information about a category.""" if self._all_categories.get(category_name): return self._all_categories[category_name] return None def content_options(self): - """ Return the dictionary of content format options. """ + """Return the dictionary of content format options.""" return self._content_options - def is_valid_command(self, cmd): - """ Test if the provided command is valid and implemented. """ + """Test if the provided command is valid and implemented.""" return self._is_implemented(cmd) - def is_valid_category(self, category_name): - """ Test if a provided category name is valid. + """Test if a provided category name is valid. - NOTE: Categories are unique on a per-domain basis, so they are retrieve - via the API when this class is instantiated. There are no "generic" - categories that apply to all domains. - .""" + NOTE: Categories are unique on a per-domain basis, so they are retrieve + via the API when this class is instantiated. There are no "generic" + categories that apply to all domains. + .""" keys = self._all_categories.keys() if category_name in keys: return True @@ -232,115 +233,122 @@ class PangeaService: def rss_article_from_pangea_article(self, article): """ - Use this method to convert an API-returned articledetail definition - to an RSS-appropriate definition. + Use this method to convert an API-returned articledetail definition + to an RSS-appropriate definition. - This method succeeds using the bare article definition, but will be - absent the content field and other descriptors. Enclosures are - returned, however. + This method succeeds using the bare article definition, but will be + absent the content field and other descriptors. Enclosures are + returned, however. """ - rss = { } + rss = {} sh = hashlib.sha256() - sh.update(article['url'].encode('utf8')) - rss['guid'] = sh.hexdigest() - rss['title'] = article['title'] - rss['link'] = article['url'] + sh.update(article["url"].encode("utf8")) + rss["guid"] = sh.hexdigest() + rss["title"] = article["title"] + rss["link"] = article["url"] - if article.get('introduction'): - rss['summary'] = article['introduction'] + if article.get("introduction"): + rss["summary"] = article["introduction"] - if article.get('authors'): - as_str = '' - for auth in article['authors']: - as_str += auth['lastname'] + ", " + auth['firstname'] + ";" - if len(article['authors']) > 1: - as_str = as_str[0: (len(as_str) - 2)] - rss['authors'] = as_str + if article.get("authors"): + as_str = "" + for auth in article["authors"]: + as_str += auth["lastname"] + ", " + auth["firstname"] + ";" + if len(article["authors"]) > 1: + as_str = as_str[0 : (len(as_str) - 2)] + rss["authors"] = as_str - if article.get('image'): + if article.get("image"): # Seek the enclosure details from the image's server - metadata = utilities.get_media_metadata(article['image']) + metadata = utilities.get_media_metadata(article["image"]) if metadata: - rss['enclosure'] = { - 'url': article['image'], - 'type': metadata['content_type'], - 'length': metadata['content_length'] + rss["enclosure"] = { + "url": article["image"], + "type": metadata["content_type"], + "length": metadata["content_length"], } else: - rss['enclosure'] = {'url': article['image']} + rss["enclosure"] = {"url": article["image"]} - if rss.get('enclosure'): + if rss.get("enclosure"): if self._verbose_p: print( "article contains an enclosure:\n" - + json.dumps(rss['enclosure'], indent=4)) + + json.dumps(rss["enclosure"], indent=4) + ) # 'audioclips' and 'videoclips' occasionally have no text content - if article.get('content'): - rss['content'] = article['content'] + if article.get("content"): + rss["content"] = article["content"] else: - rss['content'] = '' + rss["content"] = "" # all articles are required to have one category (their 'zone') - if bool(article.get('zone')): - zone_id = article['zone'] + if bool(article.get("zone")): + zone_id = article["zone"] if isinstance(zone_id, int): zone_id = str(zone_id) - if self._rev_categories.get('zone_id'): - rss['categories'] = self._rev_categories[zone_id] - elif article.get('zoneTitle'): - rss['categories'] = article['zoneTitle'] + if self._rev_categories.get("zone_id"): + rss["categories"] = self._rev_categories[zone_id] + elif article.get("zoneTitle"): + rss["categories"] = article["zoneTitle"] # Pangea time is always in GMT # Pangea time is formatted as: 2024-07-31T11:46:28.673 # (though occasionally: 2024-07-31T11:46:28) # Convert to RSS time (RFC822) - if not article.get('pubDate'): + if not article.get("pubDate"): datetime_obj = datetime.now(timezone.utc) else: - if re.match('.*?([.][0-9]+)$', article['pubDate']): - datetime_obj = datetime.strptime(article['pubDate'], self.TIME_FMT) + if re.match(".*?([.][0-9]+)$", article["pubDate"]): + datetime_obj = datetime.strptime(article["pubDate"], self.TIME_FMT) else: - datetime_obj = datetime.strptime(article['pubDate'], self.TIME_FMT_I) + datetime_obj = datetime.strptime(article["pubDate"], self.TIME_FMT_I) formatted_time = datetime_obj.strftime(self.RFC822_FMT) - rss['pubDate'] = formatted_time + '+0000' + rss["pubDate"] = formatted_time + "+0000" # Media types - if bool(article.get('videos')): - if len(article['videos']) > 0: - url = article['videos'][0]['url'] + if bool(article.get("videos")): + if len(article["videos"]) > 0: + url = article["videos"][0]["url"] metadata = utilities.get_media_metadata(url) if metadata: - rss['media_content'] = { - 'url': url, - 'type': metadata['content_type'], - 'fileSize': metadata['content_length'], - 'medium': 'video' + rss["media_content"] = { + "url": url, + "type": metadata["content_type"], + "fileSize": metadata["content_length"], + "medium": "video", } - if self._verbose_p: print("article contains video media:\n" - + json.dumps(rss['media_content'], indent=4)) + if self._verbose_p: + print( + "article contains video media:\n" + + json.dumps(rss["media_content"], indent=4) + ) else: - rss['media_content'] = {'url': url} + rss["media_content"] = {"url": url} - if bool(article.get('audios')): - if len(article['audios']) > 0: - url = article['audios'][0]['url'] + if bool(article.get("audios")): + if len(article["audios"]) > 0: + url = article["audios"][0]["url"] metadata = utilities.get_media_metadata(url) if metadata: - rss['media_content'] = { - 'url': url, - 'type': metadata['content_type'], - 'fileSize': metadata['content_length'], - 'medium': 'audio' + rss["media_content"] = { + "url": url, + "type": metadata["content_type"], + "fileSize": metadata["content_length"], + "medium": "audio", } - if self._verbose_p: print("article contains audio media:\n" - + json.dumps(rss['media_content'], indent=4)) + if self._verbose_p: + print( + "article contains audio media:\n" + + json.dumps(rss["media_content"], indent=4) + ) else: - rss['media_content'] = {'url': url} + rss["media_content"] = {"url": url} return rss @@ -349,29 +357,26 @@ class PangeaService: # def test_pangea_interface(self): - """ TESTING Basic connectivity test """ - return self._retrieve_content('test') - + """TESTING Basic connectivity test""" + return self._retrieve_content("test") def empty(self): """ - TESTING Returns nothing but, if command formatted properly, with proper API - key, HTTP status will be 200 + TESTING Returns nothing but, if command formatted properly, with proper API + key, HTTP status will be 200 """ - res = self._retrieve_content('empty') + res = self._retrieve_content("empty") return res - def config(self): - """ TESTING Returns configuration information about the API """ - return self._retrieve_content('config') + """TESTING Returns configuration information about the API""" + return self._retrieve_content("config") - - def get_content(self, content_type, optional_args_kw = None): + def get_content(self, content_type, optional_args_kw=None): """ - Use this method to get articles by content type, subset by a specific category - as supplied. See API docs for additional API parameters that can be specified - to reduce the volume of articles returned. + Use this method to get articles by content type, subset by a specific category + as supplied. See API docs for additional API parameters that can be specified + to reduce the volume of articles returned. """ try: res = self._retrieve_content(content_type, optional_args_kw) @@ -381,99 +386,99 @@ class PangeaService: # because Pangea does not uniformly apply 'count' and 'daycount' parameters # to all content generation, we'll do that here (unless we're told to ignore). if optional_args_kw is not None: - if optional_args_kw.get('filter_date') is not None: - if optional_args_kw.get('filter_date') is False: + if optional_args_kw.get("filter_date") is not None: + if optional_args_kw.get("filter_date") is False: return res return self._threshold(res) - - def query_content(self, query, optional_args_kw = None): + def query_content(self, query, optional_args_kw=None): """ - Use this method to get articles based on textual search. - See API docs for additional API parameters that can be specified - to reduce the volume of articles returned. Alternatively, see - docs for the 'pageNumber' parameter to handling a search returning - many articles (only query/search supports this parameter). + Use this method to get articles based on textual search. + See API docs for additional API parameters that can be specified + to reduce the volume of articles returned. Alternatively, see + docs for the 'pageNumber' parameter to handling a search returning + many articles (only query/search supports this parameter). """ # make the topic/category URL-safe if optional_args_kw is None: optional_args_kw = {} - optional_args_kw['q'] = urllib.parse.quote_plus(query) + optional_args_kw["q"] = urllib.parse.quote_plus(query) try: - res = self._retrieve_content('search', optional_args_kw) + res = self._retrieve_content("search", optional_args_kw) except pexception.PangeaServiceException as e: raise pexception.PangeaServiceException(str(e)) from e # # because Pangea does not uniformly apply 'count' and 'daycount' parameters # to all content generation, we'll do that here (unless we're told to ignore). - if optional_args_kw.get('filter_date') is not None: - if optional_args_kw.get('filter_date') is False: + if optional_args_kw.get("filter_date") is not None: + if optional_args_kw.get("filter_date") is False: return res return self._threshold(res) - def get_article(self, article_id, optional_args_kw = None): + def get_article(self, article_id, optional_args_kw=None): """ - Use this method to get all the detail for a given article (typically - required to do anything useful). + Use this method to get all the detail for a given article (typically + required to do anything useful). """ if optional_args_kw is None: optional_args_kw = {} - if 'MediaData' not in optional_args_kw.keys(): - optional_args_kw['MediaData'] = 'true' + if "MediaData" not in optional_args_kw.keys(): + optional_args_kw["MediaData"] = "true" - optional_args_kw['itemid'] = article_id + optional_args_kw["itemid"] = article_id try: - res = self._retrieve_content('articles', optional_args_kw) + res = self._retrieve_content("articles", optional_args_kw) except pexception.PangeaServiceException as e: raise pexception.PangeaServiceException(str(e)) from e - #print(json.dumps(res, indent=4)) + # print(json.dumps(res, indent=4)) return res - def get_article_detail(self, article_id, optional_args_kw = None): + def get_article_detail(self, article_id, optional_args_kw=None): """ - Use this method to get all the detail for a given article (typically - required to do anything useful). + Use this method to get all the detail for a given article (typically + required to do anything useful). """ if optional_args_kw is None: optional_args_kw = {} - if 'Content' not in optional_args_kw.keys(): - optional_args_kw['Content'] = 'true' - if 'MediaData' not in optional_args_kw.keys(): - optional_args_kw['MediaData'] = 'true' + if "Content" not in optional_args_kw.keys(): + optional_args_kw["Content"] = "true" + if "MediaData" not in optional_args_kw.keys(): + optional_args_kw["MediaData"] = "true" - optional_args_kw['itemid'] = article_id + optional_args_kw["itemid"] = article_id try: - res = self._retrieve_content('articledetail', optional_args_kw) + res = self._retrieve_content("articledetail", optional_args_kw) except pexception.PangeaServiceException as e: raise pexception.PangeaServiceException(str(e)) from e - #print(json.dumps(res, indent=4)) + # print(json.dumps(res, indent=4)) return res - - def get_categories(self, types = None): + def get_categories(self, types=None): """ - Categories are defined on a PER DOMAIN basis, so to assure the user - provides a proper category name we need to acquire the full set of - categories before we proceed with any queries. + Categories are defined on a PER DOMAIN basis, so to assure the user + provides a proper category name we need to acquire the full set of + categories before we proceed with any queries. """ if len(self._all_categories.keys()) > 0: return self._all_categories if types is None: - types = 'acm' # get all content types 'a', 'c', 'm' at once + types = "acm" # get all content types 'a', 'c', 'm' at once - args = {'type': types} + args = {"type": types} try: - url = self._build_url('zone', args) + url = self._build_url("zone", args) response = requests.get(url, timeout=20) if response.status_code != 200: - msg = "HTP request to {} failed with status code [{}]".format(self._domain, str(response.status_code)) + msg = "HTP request to {} failed with status code [{}]".format( + self._domain, str(response.status_code) + ) self._logger.error(msg) raise pexception.PangeaServiceException(msg) a_cat = json.loads(response.text) @@ -482,12 +487,14 @@ class PangeaService: all_keys = self._all_categories.keys() for c in a_cat: - if not c['name'] in all_keys: - self._all_categories[c['name']] = c - self._rev_categories[str(c['id'])] = c['name'] + if not c["name"] in all_keys: + self._all_categories[c["name"]] = c + self._rev_categories[str(c["id"])] = c["name"] - if c['type'] >= len(self._category_types_list): - msg = "ERROR: unknown type: {} on id [{}], name: {}".format(c['type'], str(c['id']), c['name']) + if c["type"] >= len(self._category_types_list): + msg = "ERROR: unknown type: {} on id [{}], name: {}".format( + c["type"], str(c["id"]), c["name"] + ) self._logger.warning(msg) raise pexception.PangeaServiceException(msg) @@ -498,38 +505,37 @@ class PangeaService: # def _boolean_string(self, boolean_value): - """ Convert a boolean to a string for the API """ + """Convert a boolean to a string for the API""" if boolean_value is True: - return 'true' - return 'false' + return "true" + return "false" - - def _retrieve_content(self, command, args_kw = None): - """ Minimalist content retriever """ + def _retrieve_content(self, command, args_kw=None): + """Minimalist content retriever""" url = self._build_url(command, args_kw) - #print('request URL: ' + url) + # print('request URL: ' + url) response = requests.get(url, timeout=20) if response.status_code != 200: - msg = "received status code {} from {}".format(str(response.status_code), url) + msg = "received status code {} from {}".format( + str(response.status_code), url + ) self._logger.error(msg) raise pexception.PangeaServiceException(msg) - if command == 'empty': - return json.loads('[]') + if command == "empty": + return json.loads("[]") return json.loads(response.text) - def _is_implemented(self, cmd): - """ Test if a provided string references an actual command """ + """Test if a provided string references an actual command""" if cmd in self._commands_list: return True return False - def _threshold(self, articles): - """ Assure article-count and oldest-article settings are obeyed. Turns out, - only a few API commands accept these arguments, though in general our - usage of the API requires it to be consistent. + """Assure article-count and oldest-article settings are obeyed. Turns out, + only a few API commands accept these arguments, though in general our + usage of the API requires it to be consistent. """ output = [] article_count = 0 @@ -539,35 +545,44 @@ class PangeaService: # # pubDate may contain milliseconds, or not # - if re.match('.*?([.][0-9]+)$', blob['pubDate']): - dt = datetime.strptime(blob['pubDate'], self.TIME_FMT) + if re.match(".*?([.][0-9]+)$", blob["pubDate"]): + dt = datetime.strptime(blob["pubDate"], self.TIME_FMT) dt.replace(microsecond=0) else: - dt = datetime.strptime(blob['pubDate'], self.TIME_FMT_I) + dt = datetime.strptime(blob["pubDate"], self.TIME_FMT_I) old_dt = datetime.now() - delta if dt < old_dt: if self._verbose_p: - print("article with ID {} is too old [{}]".format(str(blob['id']), dt.strftime(self.TIME_FMT_I))) + print( + "article with ID {} is too old [{}]".format( + str(blob["id"]), dt.strftime(self.TIME_FMT_I) + ) + ) else: article_count += 1 output.append(blob) if self._verbose_p & (len(output) < len(articles)): - print("request returned {} articles; newest {} processed".format(str(len(articles)), str(len(output)))) + print( + "request returned {} articles; newest {} processed".format( + str(len(articles)), str(len(output)) + ) + ) reordered = output[::-1] return reordered - - def _build_url(self, cmd, args_kw = None): - """ Construct a properly-formatted Pangea API URL """ + def _build_url(self, cmd, args_kw=None): + """Construct a properly-formatted Pangea API URL""" if not self._is_implemented(cmd): msg = "ERROR: command [{}] NOT IMPLEMENTED".format(cmd) self._logger.error(msg) raise pexception.PangeaServiceException(msg) if not self._api_key: - msg = "ERROR: no API key supplied (check config file {})".format(self._configuration_file_name) + msg = "ERROR: no API key supplied (check config file {})".format( + self._configuration_file_name + ) self._logger.error(msg) raise pexception.PangeaServiceException(msg) @@ -576,72 +591,99 @@ class PangeaService: # this switch verifies (and/or completes) the argument array match cmd: - #simple commands - case 'empty' | 'test': + # simple commands + case "empty" | "test": pass # search - case 'search': - if 'q' not in args_kw.keys(): + case "search": + if "q" not in args_kw.keys(): msg = "ERROR: [{}] requires parameter 'q'".format(cmd) self._logger.error(msg) raise pexception.PangeaServiceException(msg) - if 'Authors' not in args_kw.keys(): - args_kw['Authors'] = self._boolean_string(self._authors_p) + if "Authors" not in args_kw.keys(): + args_kw["Authors"] = self._boolean_string(self._authors_p) - if 'count' not in args_kw.keys(): - args_kw['count'] = self._max_articles + if "count" not in args_kw.keys(): + args_kw["count"] = self._max_articles - if 'daycount' not in args_kw.keys(): - args_kw['daycount'] = self._oldest_article + if "daycount" not in args_kw.keys(): + args_kw["daycount"] = self._oldest_article # single-item/detail commands - case 'articledetail' | 'blogitem' | 'comment' | 'author' | 'documentdetail' | 'factcheckdetail' | 'infographicdetail' | 'polldetail' | 'quizdetail': - if 'itemid' not in args_kw.keys(): + case ( + "articledetail" + | "blogitem" + | "comment" + | "author" + | "documentdetail" + | "factcheckdetail" + | "infographicdetail" + | "polldetail" + | "quizdetail" + ): + if "itemid" not in args_kw.keys(): msg = "ERROR: [{}] command requires arg 'itemid'".format(cmd) self._logger.error(msg) raise pexception.PangeaServiceException(msg) - if 'Content' not in args_kw.keys(): - args_kw['Content'] = self._boolean_string(self._content_inc_p) + if "Content" not in args_kw.keys(): + args_kw["Content"] = self._boolean_string(self._content_inc_p) - if 'Authors' not in args_kw.keys(): - args_kw['Authors'] = self._boolean_string(self._authors_p) + if "Authors" not in args_kw.keys(): + args_kw["Authors"] = self._boolean_string(self._authors_p) - if 'html' not in args_kw.keys(): - args_kw['html'] = self._content_options[self._content_format] + if "html" not in args_kw.keys(): + args_kw["html"] = self._content_options[self._content_format] - case 'authorid': - if 'authorid' not in args_kw.keys(): + case "authorid": + if "authorid" not in args_kw.keys(): msg = "ERROR: [{}] command requires arg 'authorid'".format(cmd) self._logger.error(msg) raise pexception.PangeaServiceException(msg) - case 'zone': - if ('zoneid' not in args_kw.keys()) & ('type' not in args_kw.keys()): - msg = "ERROR: [{}] command requires args 'zoneid' or 'type'".format(cmd) + case "zone": + if ("zoneid" not in args_kw.keys()) & ("type" not in args_kw.keys()): + msg = "ERROR: [{}] command requires args 'zoneid' or 'type'".format( + cmd + ) self._logger.error(msg) raise pexception.PangeaServiceException(msg) # content commands - case 'articles' | 'audioclips' | 'videoclips' | 'breakingnews' | 'mostpopular' | 'topstories' | 'blogitem': - if 'Authors' not in args_kw.keys(): - args_kw['Authors'] = self._boolean_string(self._authors_p) + case ( + "articles" + | "audioclips" + | "videoclips" + | "breakingnews" + | "mostpopular" + | "topstories" + | "blogitem" + ): + if "Authors" not in args_kw.keys(): + args_kw["Authors"] = self._boolean_string(self._authors_p) - if 'count' not in args_kw.keys(): - args_kw['count'] = self._max_articles + if "count" not in args_kw.keys(): + args_kw["count"] = self._max_articles - if 'daycount' not in args_kw.keys(): - args_kw['daycount'] = self._oldest_article + if "daycount" not in args_kw.keys(): + args_kw["daycount"] = self._oldest_article # base for all types of command (apikey needs to be first arg) - url = "https://" + self._domain + self._api_path + cmd + '?apikey=' + self._api_key + url = ( + "https://" + + self._domain + + self._api_path + + cmd + + "?apikey=" + + self._api_key + ) # process the arg array to finish construction of the URL for key, value in args_kw.items(): # remove this one - if key == 'filter_date': + if key == "filter_date": continue if not isinstance(key, str): @@ -649,7 +691,7 @@ class PangeaService: if not isinstance(value, str): value = str(value) - url += '&' + key + '=' + value + url += "&" + key + "=" + value if self._verbose_p: print("URL for request: " + url) diff --git a/pygea/pexception.py b/pygea/pexception.py index deb2ad0..c327817 100644 --- a/pygea/pexception.py +++ b/pygea/pexception.py @@ -1,6 +1,7 @@ """ - A less-generic Exception for the Pangea API Service +A less-generic Exception for the Pangea API Service """ + class PangeaServiceException(Exception): - """ An Exception specific to this API """ + """An Exception specific to this API""" diff --git a/pygea/plogger.py b/pygea/plogger.py index 73845a4..cf09bf0 100644 --- a/pygea/plogger.py +++ b/pygea/plogger.py @@ -1,56 +1,60 @@ """ - Logger for the Pangea API Service +Logger for the Pangea API Service """ + import logging + from pygea import utilities + class PangeaServiceLogger: """ - Mostly, so that someone can replace this with a production logger later. + Mostly, so that someone can replace this with a production logger later. """ - _configuration_file_name = 'pygea.ini' + _configuration_file_name = "pygea.ini" _levels = { "NOTSET": 0, "DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, - "CRITICAL": 50 + "CRITICAL": 50, } def __init__(self): # # preset from configuration file # - lf = utilities.get_configuration_variable('logging', 'log_file') - dl = utilities.get_configuration_variable('logging', 'default_log_level') + lf = utilities.get_configuration_variable("logging", "log_file") + dl = utilities.get_configuration_variable("logging", "default_log_level") if (dl is None) | (dl not in self._levels): - dl = 'DEBUG' + dl = "DEBUG" - self._logger = logging.getLogger('PangeaLogger') + self._logger = logging.getLogger("PangeaLogger") self._logger.propagate = False logging.basicConfig( filename=lf, level=self._levels[dl], - format='[%(asctime)s] %(levelname)s: %(message)s') + format="[%(asctime)s] %(levelname)s: %(message)s", + ) def debug(self, message): - """ Debug message """ + """Debug message""" self._logger.debug(message) def info(self, message): - """ Info message """ + """Info message""" self._logger.info(message) def warning(self, message): - """ Warning message """ + """Warning message""" self._logger.warning(message) def error(self, message): - """ Error message """ + """Error message""" self._logger.error(message) def critical(self, message): - """ Critical message """ + """Critical message""" self._logger.critical(message) diff --git a/pygea/utilities.py b/pygea/utilities.py index 2832b9f..1b375ec 100644 --- a/pygea/utilities.py +++ b/pygea/utilities.py @@ -1,47 +1,53 @@ # pylint: disable-msg=C0201 """ - - * - - Utilities for the Pangea CMS Service API +- * - +Utilities for the Pangea CMS Service API - - * - +- * - """ import hashlib import os +from configparser import ConfigParser, NoOptionError, NoSectionError from urllib.parse import urlparse -from configparser import ConfigParser, NoSectionError, NoOptionError + import requests from bs4 import BeautifulSoup + def acquire(url): - """ Simple wrapper over the request object. """ + """Simple wrapper over the request object.""" response = requests.get(url, timeout=20) # Check if the request was successful if response.status_code == 200: content = response.text else: - print("Failed to retrieve the web page. Status code: " + str(response.status_code)) + print( + "Failed to retrieve the web page. Status code: " + str(response.status_code) + ) return None return content + def parse_url_elements(url): - """ URL hackery - returns domain and Pangea article ID from a provided URL """ + """URL hackery - returns domain and Pangea article ID from a provided URL""" out = {} parts = urlparse(url) - out['domain'] = parts.hostname + out["domain"] = parts.hostname # article ID is the file name at the end of the path ('324534.html') - more_parts = parts.path.split('/') - file = more_parts[len(more_parts)-1] - file_parts = file.split('.') - out['article_id'] = file_parts[0] + more_parts = parts.path.split("/") + file = more_parts[len(more_parts) - 1] + file_parts = file.split(".") + out["article_id"] = file_parts[0] return out + def get_webpage_metadata(page_url): - """ Get HTML metadata elements from a webpage. """ + """Get HTML metadata elements from a webpage.""" parsed = urlparse(page_url) domain = parsed.netloc # @@ -50,146 +56,154 @@ def get_webpage_metadata(page_url): # html_content = acquire(page_url) if html_content == None: - return None + return None - soup = BeautifulSoup(html_content, 'html.parser') - meta_tags = soup.find_all('meta') + soup = BeautifulSoup(html_content, "html.parser") + meta_tags = soup.find_all("meta") metadata = {} for tag in meta_tags: - if 'name' in tag.attrs: - name = tag.attrs['name'] - content = tag.attrs.get('content', '') + if "name" in tag.attrs: + name = tag.attrs["name"] + content = tag.attrs.get("content", "") metadata[name] = content - elif 'property' in tag.attrs: # For OpenGraph metadata - prop = tag.attrs['property'] - content = tag.attrs.get('content', '') + elif "property" in tag.attrs: # For OpenGraph metadata + prop = tag.attrs["property"] + content = tag.attrs.get("content", "") metadata[prop] = content # add useful language property - html = soup.find_all('html') - metadata['language'] = html[0]['lang'] + html = soup.find_all("html") + metadata["language"] = html[0]["lang"] # add links - link_tags = soup.find_all('link') + link_tags = soup.find_all("link") for tag in link_tags: - if 'rel' in tag.attrs: - #print(json.dumps(tag.attrs, indent=4)) - if 'alternate' in tag.attrs['rel']: - if 'icon' in tag.attrs['rel']: - metadata['favicon'] = 'https://' + domain + tag.attrs.get('href') - if tag.attrs['rel'][0] == 'canonical': - metadata['canonical'] = tag.attrs.get('href') + if "rel" in tag.attrs: + # print(json.dumps(tag.attrs, indent=4)) + if "alternate" in tag.attrs["rel"]: + if "icon" in tag.attrs["rel"]: + metadata["favicon"] = "https://" + domain + tag.attrs.get("href") + if tag.attrs["rel"][0] == "canonical": + metadata["canonical"] = tag.attrs.get("href") return metadata + def get_media_metadata(image_url): - """ Get metadata for media content from website (via response headers). """ + """Get metadata for media content from website (via response headers).""" response = requests.head(image_url, timeout=20) meta = None if response.status_code == 200: meta = { - "content_type": response.headers['Content-Type'], - "content_length": response.headers['Content-Length'] + "content_type": response.headers["Content-Type"], + "content_length": response.headers["Content-Length"], } return meta -def make_boolean(bool_str): - """ Convert a boolean string to an actual Boolean. """ - in_str = bool_str.lower() - if (in_str != 'true') & (in_str != 'false'): - return True # following Python conventions - if in_str == 'true': +def make_boolean(bool_str): + """Convert a boolean string to an actual Boolean.""" + in_str = bool_str.lower() + if (in_str != "true") & (in_str != "false"): + return True # following Python conventions + + if in_str == "true": return True return False + def get_api_key(): - """ Return the API key. PYGEA_API_KEY env var takes precedence over pygea.ini. - Returns None if neither source provides a value. """ - env_key = os.environ.get('PYGEA_API_KEY') + """Return the API key. PYGEA_API_KEY env var takes precedence over pygea.ini. + Returns None if neither source provides a value.""" + env_key = os.environ.get("PYGEA_API_KEY") if env_key: return env_key config = ConfigParser() - config.read('pygea.ini') + config.read("pygea.ini") try: - return config.get('runtime', 'api_key') + return config.get("runtime", "api_key") except (NoSectionError, NoOptionError): return None + def get_configuration_variable(section, vname): - """ Retrieve values from the configuration file. """ + """Retrieve values from the configuration file.""" config = ConfigParser() - config.read('pygea.ini') + config.read("pygea.ini") value = config.get(section, vname) - if (value == 'True') | (value == 'False'): + if (value == "True") | (value == "False"): value = make_boolean(value) return value + def is_domain_name(domain): - """ Does the provided string resemble a domain name? """ + """Does the provided string resemble a domain name?""" if any(char in domain for char in "."): return True return False + def hash_site_metadata(metadata): - """ Create a secure hash of website HTTP meta headers to use as an RSS/ATOM ID. """ + """Create a secure hash of website HTTP meta headers to use as an RSS/ATOM ID.""" sh = hashlib.sha256() for key in metadata.keys(): - sh.update(key.encode('utf8') + metadata[key].encode('utf8')) + sh.update(key.encode("utf8") + metadata[key].encode("utf8")) digest = sh.hexdigest() return digest + def rss_namespace_supported(prop): - """ Determine if a provided RSS/XML namespace is valid in the FeedGen RSS package. """ + """Determine if a provided RSS/XML namespace is valid in the FeedGen RSS package.""" supported_namespaces = [ - 'dc', - 'geo', - 'gen_entry', - 'media', - 'podcast', - 'podcast_entry', - 'syndication', - 'torrent' + "dc", + "geo", + "gen_entry", + "media", + "podcast", + "podcast_entry", + "syndication", + "torrent", ] if prop in supported_namespaces: return True return False + def rss_namespace_for_property(prop): - """ Returns the XML namespace for a specified or - property from among a list of the most popular namespace schemes - according to: - https://www.rssboard.org/news/168/rss-channel-element-usage-stats - For an exhaustive list of namespace schemes see: - https://validator.w3.org/feed/docs/howto/declare_namespaces.html + """Returns the XML namespace for a specified or + property from among a list of the most popular namespace schemes + according to: + https://www.rssboard.org/news/168/rss-channel-element-usage-stats + For an exhaustive list of namespace schemes see: + https://validator.w3.org/feed/docs/howto/declare_namespaces.html """ known_namespaces = { - 'content': 'http://purl.org/rss/1.0/modules/content/', # content - 'dc': 'http://purl.org/dc/elements/1.1/', # Dublin Core - 'atom': 'http://www.w3.org/2005/Atom', # ATOM - 'sy': 'http://purl.org/rss/1.0/modules/syndication/', # Syndication - 'admin': 'http://webns.net/mvcb/', - 'feedburner': 'http://rssnamespace.org/feedburner/ext/1.0', # Feedburner - 'cc': 'http://web.resource.org/cc/', # copyrights - 'geo': 'http://www.w3.org/2003/01/geo/wgs84_pos#', - 'opensearch': 'http://a9.com/-/spec/opensearch/1.1/', # OpenSearch - 'itunes': 'http://www.itunes.com/dtds/podcast-1.0.dtd', # Apple iTunes - 'blogChannel': 'http://backend.userland.com/blogChannelModule', # BlogChannel - 'media': 'http://search.yahoo.com/mrss/', # media RSS - 'icbm': 'http://postneo.com/icbm', # ICBM - 'cf': 'http://www.microsoft.com/schemas/rss/core/2005', # a Microsoft thing - 'podcast': 'https://podcastindex.org/namespace/1.0', # Podcast RSS - 'xhtml': 'http://www.w3.org/1999/xhtml' # XHTML + "content": "http://purl.org/rss/1.0/modules/content/", # content + "dc": "http://purl.org/dc/elements/1.1/", # Dublin Core + "atom": "http://www.w3.org/2005/Atom", # ATOM + "sy": "http://purl.org/rss/1.0/modules/syndication/", # Syndication + "admin": "http://webns.net/mvcb/", + "feedburner": "http://rssnamespace.org/feedburner/ext/1.0", # Feedburner + "cc": "http://web.resource.org/cc/", # copyrights + "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", + "opensearch": "http://a9.com/-/spec/opensearch/1.1/", # OpenSearch + "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd", # Apple iTunes + "blogChannel": "http://backend.userland.com/blogChannelModule", # BlogChannel + "media": "http://search.yahoo.com/mrss/", # media RSS + "icbm": "http://postneo.com/icbm", # ICBM + "cf": "http://www.microsoft.com/schemas/rss/core/2005", # a Microsoft thing + "podcast": "https://podcastindex.org/namespace/1.0", # Podcast RSS + "xhtml": "http://www.w3.org/1999/xhtml", # XHTML } - components = prop.split(':') + components = prop.split(":") if known_namespaces.get(components[0]): return known_namespaces[components[0]] diff --git a/treefmt.nix b/treefmt.nix new file mode 100644 index 0000000..38bc574 --- /dev/null +++ b/treefmt.nix @@ -0,0 +1,16 @@ +_: { + projectRootFile = "flake.nix"; + + programs = { + nixfmt.enable = true; + + black.enable = true; + + isort = { + enable = true; + profile = "black"; + }; + + shfmt.enable = true; + }; +}