import sys from datetime import datetime, timezone from typing import List, NotRequired, Optional, TypedDict from cryptography import x509 from flask import Blueprint, abort, jsonify, request from flask.typing import ResponseReturnValue from sqlalchemy import exc from app.api.util import ( DOMAIN_NAME_REGEX, MAX_ALLOWED_ITEMS, MAX_DOMAIN_NAME_LENGTH, ListFilter, get_single_resource, list_resources, validate_description, ) from app.extensions import db from app.models.base import Group from app.models.onions import Onion from app.util.onion import decode_onion_keys, onion_hostname from app.util.x509 import validate_tls_keys api_onion = Blueprint("api_onion", __name__) @api_onion.route("/onion", methods=["GET"]) def list_onions() -> ResponseReturnValue: domain_name_filter = request.args.get("DomainName") group_id_filter = request.args.get("GroupId") filters: List[ListFilter] = [(Onion.destroyed.is_(None))] if domain_name_filter: if len(domain_name_filter) > MAX_DOMAIN_NAME_LENGTH: abort( 400, description=f"DomainName cannot exceed {MAX_DOMAIN_NAME_LENGTH} characters.", ) if not DOMAIN_NAME_REGEX.match(domain_name_filter): abort(400, description="DomainName contains invalid characters.") filters.append(Onion.domain_name.ilike(f"%{domain_name_filter}%")) if group_id_filter: try: filters.append(Onion.group_id == int(group_id_filter)) except ValueError: abort(400, description="GroupId must be a valid integer.") return list_resources( Onion, lambda onion: onion.to_dict(), filters=filters, resource_name="OnionsList", max_allowed_items=MAX_ALLOWED_ITEMS, protective_marking="amber", ) class CreateOnionRequest(TypedDict): DomainName: str Description: str GroupId: int OnionPrivateKey: str OnionPublicKey: str TlsPrivateKey: str TlsCertificate: str SkipChainVerification: NotRequired[bool] SkipNameVerification: NotRequired[bool] @api_onion.route("/onion", methods=["POST"]) def create_onion() -> ResponseReturnValue: data: Optional[CreateOnionRequest] = request.json if not data: abort(400) errors = [] for field in [ "DomainName", "Description", "OnionPrivateKey", "OnionPublicKey", "GroupId", "TlsPrivateKey", "TlsCertificate", ]: if not data.get(field): errors.append( { "Error": f"{field}_missing", "Message": f"Missing required field: {field}", } ) onion_private_key, onion_public_key, onion_errors = decode_onion_keys( data["OnionPrivateKey"], data["OnionPublicKey"] ) if onion_errors: errors.extend(onion_errors) if onion_public_key is None: return jsonify({"Errors": errors}), 400 if onion_private_key: existing_onion = ( db.session.query(Onion) .where( Onion.onion_private_key == onion_private_key, Onion.destroyed.is_(None), ) .first() ) if existing_onion: errors.append( { "Error": "duplicate_onion_key", "Message": "An onion service with this private key already exists.", } ) if "GroupId" in data: group = Group.query.get(data["GroupId"]) if not group: errors.append( {"Error": "group_id_not_found", "Message": "Invalid group ID."} ) chain, san_list, tls_errors = validate_tls_keys( data["TlsPrivateKey"], data["TlsCertificate"], data.get("SkipChainVerification"), data.get("SkipNameVerification"), f"{onion_hostname(onion_public_key)}.onion", ) if tls_errors: errors.extend(tls_errors) if errors: return jsonify({"Errors": errors}), 400 cert_expiry_date = chain[0].not_valid_after if chain else None onion = Onion( domain_name=data["DomainName"], description=data["Description"], onion_private_key=onion_private_key, onion_public_key=onion_public_key, tls_private_key=data["TlsPrivateKey"].encode("utf-8"), tls_public_key=data["TlsCertificate"].encode("utf-8"), group_id=data["GroupId"], added=datetime.now(timezone.utc), updated=datetime.now(timezone.utc), cert_expiry=cert_expiry_date, cert_sans=",".join(san_list), ) try: db.session.add(onion) db.session.commit() return ( jsonify({"Message": "Onion service created successfully.", "Id": onion.id}), 201, ) except exc.SQLAlchemyError as e: return ( jsonify({"Errors": [{"Error": "database_error", "Message": str(e)}]}), 500, ) class UpdateOnionRequest(TypedDict): Description: NotRequired[str] TlsPrivateKey: NotRequired[str] TlsCertificate: NotRequired[str] SkipChainVerification: NotRequired[bool] SkipNameVerification: NotRequired[bool] @api_onion.route("/onion/", methods=["PUT"]) def update_onion(onion_id: int) -> ResponseReturnValue: data: Optional[UpdateOnionRequest] = request.json if not data: abort(400) errors = [] onion = Onion.query.get(onion_id) if not onion: return ( jsonify( { "Errors": [ { "Error": "onion_not_found", "Message": f"No Onion service found with ID {onion_id}", } ] } ), 404, ) if "Description" in data: description = data["Description"] print(f"Description {description}", file=sys.stderr) if validate_description(description): onion.description = description else: errors.append( { "Error": "description_error", "Message": "Description field is invalid", } ) tls_private_key_pem: Optional[str] = None tls_certificate_pem: Optional[str] = None chain: Optional[List[x509.Certificate]] = None san_list: Optional[List[str]] = None if "TlsCertificate" in data: tls_certificate_pem = data.get("TlsCertificate") if "TlsPrivateKey" in data: tls_private_key_pem = data.get("TlsPrivateKey") else: tls_private_key_pem = onion.tls_private_key.decode("utf-8") chain, san_list, tls_errors = validate_tls_keys( tls_private_key_pem, tls_certificate_pem, data.get("SkipChainVerification", False), data.get("SkipNameVerification", False), f"{onion_hostname(onion.onion_public_key)}.onion", ) if tls_errors: errors.extend(tls_errors) if errors: return jsonify({"Errors": errors}), 400 if tls_private_key_pem: onion.tls_private_key = tls_private_key_pem.encode("utf-8") if tls_certificate_pem and san_list: onion.tls_public_key = tls_certificate_pem.encode("utf-8") onion.cert_expiry_date = chain[0].not_valid_after_utc if chain else None onion.cert_sans = ",".join(san_list) onion.updated = datetime.now(timezone.utc) try: db.session.commit() return jsonify({"Message": "Onion service updated successfully."}), 200 except exc.SQLAlchemyError as e: return ( jsonify({"Errors": [{"Error": "database_error", "Message": str(e)}]}), 500, ) @api_onion.route("/onion/", methods=["GET"]) def get_onion(onion_id: int) -> ResponseReturnValue: return get_single_resource(Onion, onion_id, "Onion")