diff --git a/app/terraform/block_roskomsvoboda.py b/app/terraform/block_roskomsvoboda.py index d4b60c0..0347ae8 100644 --- a/app/terraform/block_roskomsvoboda.py +++ b/app/terraform/block_roskomsvoboda.py @@ -4,15 +4,22 @@ from io import BytesIO from typing import Any, Optional from zipfile import ZipFile, BadZipFile -import lxml +import lxml # nosec: B410 import requests -from lxml.etree import XMLSyntaxError +from lxml.etree import XMLSyntaxError # nosec: B410 from app.extensions import db from app.models.activity import Activity from app.models.tfstate import TerraformState from app.terraform.block_mirror import BlockMirrorAutomation +# TODO: Security considerations for lxml +# +# This module makes use of lxml for parsing XML. There are some known issues relating to +# malicious XML being crafted to exploit XML parsers such that they will exhaust available +# CPU and RAM. Here we use the event-driven parser and disable entity resolution, so this +# should help to reduce the risks; however, a more in-depth review would be good in the future. 
+ class BlockRoskomsvobodaAutomation(BlockMirrorAutomation): """ @@ -31,11 +38,10 @@ class BlockRoskomsvobodaAutomation(BlockMirrorAutomation): _data: Any - def _fetch(self, latest_rev) -> None: + def _fetch(self, latest_rev: str) -> None: self._data = None try: - r = requests.get(f"https://dumps.rublacklist.net/fetch/{latest_rev}", - verify=False, timeout=180) + r = requests.get(f"https://dumps.rublacklist.net/fetch/{latest_rev}", timeout=180) r.raise_for_status() zip_file = ZipFile(BytesIO(r.content)) self._data = zip_file.read("dump.xml") @@ -70,8 +76,7 @@ class BlockRoskomsvobodaAutomation(BlockMirrorAutomation): latest_metadata = {"dump_rev": "0"} else: latest_metadata = json.loads(state.state) - latest_rev = requests.get("https://dumps.rublacklist.net/fetch/latest", - verify=False, timeout=30).text.strip() + latest_rev = requests.get("https://dumps.rublacklist.net/fetch/latest", timeout=30).text.strip() logging.debug("Latest revision is %s, already got %s", latest_rev, latest_metadata["dump_rev"]) if latest_rev != latest_metadata["dump_rev"]: state.state = json.dumps({"dump_rev": latest_rev}) @@ -85,7 +90,8 @@ class BlockRoskomsvobodaAutomation(BlockMirrorAutomation): logging.debug("No new data to parse") return try: - for _event, element in lxml.etree.iterparse(BytesIO(self._data)): + for _event, element in lxml.etree.iterparse(BytesIO(self._data), + resolve_entities=False): if element.tag == "domain": self.patterns.append("https://" + element.text.strip()) except XMLSyntaxError: