import sys from datetime import datetime, timezone from typing import List, TypedDict, NotRequired, Optional from cryptography import x509 from flask import request, abort, jsonify, Blueprint from flask.typing import ResponseReturnValue from sqlalchemy import exc from app.extensions import db from app.api.util import ListFilter, MAX_DOMAIN_NAME_LENGTH, DOMAIN_NAME_REGEX, list_resources, MAX_ALLOWED_ITEMS, \ validate_description, get_single_resource from app.models.base import Group from app.models.onions import Onion from app.util.onion import onion_hostname, decode_onion_keys from app.util.x509 import validate_tls_keys api_onion = Blueprint('api_onion', __name__) @api_onion.route('/onion', methods=['GET']) def list_onions() -> ResponseReturnValue: domain_name_filter = request.args.get('DomainName') group_id_filter = request.args.get('GroupId') filters: List[ListFilter] = [ (Onion.destroyed.is_(None)) ] if domain_name_filter: if len(domain_name_filter) > MAX_DOMAIN_NAME_LENGTH: abort(400, description=f"DomainName cannot exceed {MAX_DOMAIN_NAME_LENGTH} characters.") if not DOMAIN_NAME_REGEX.match(domain_name_filter): abort(400, description="DomainName contains invalid characters.") filters.append(Onion.domain_name.ilike(f"%{domain_name_filter}%")) if group_id_filter: try: filters.append(Onion.group_id == int(group_id_filter)) except ValueError: abort(400, description="GroupId must be a valid integer.") return list_resources( Onion, lambda onion: onion.to_dict(), filters=filters, resource_name='OnionsList', max_allowed_items=MAX_ALLOWED_ITEMS, protective_marking='amber', ) class CreateOnionRequest(TypedDict): DomainName: str Description: str GroupId: int OnionPrivateKey: str OnionPublicKey: str TlsPrivateKey: str TlsCertificate: str SkipChainVerification: NotRequired[bool] SkipNameVerification: NotRequired[bool] @api_onion.route("/onion", methods=["POST"]) def create_onion() -> ResponseReturnValue: data: Optional[CreateOnionRequest] = request.json if not data: abort(400) errors = [] for field in ["DomainName", "Description", "OnionPrivateKey", "OnionPublicKey", "GroupId", "TlsPrivateKey", "TlsCertificate"]: if not data.get(field): errors.append({"Error": f"{field}_missing", "Message": f"Missing required field: {field}"}) onion_private_key, onion_public_key, onion_errors = decode_onion_keys(data["OnionPrivateKey"], data["OnionPublicKey"]) if onion_errors: errors.extend(onion_errors) if onion_public_key is None: return jsonify({"Errors": errors}), 400 if onion_private_key: existing_onion = db.session.query(Onion).where( Onion.onion_private_key == onion_private_key, Onion.destroyed.is_(None), ).first() if existing_onion: errors.append( {"Error": "duplicate_onion_key", "Message": "An onion service with this private key already exists."}) if "GroupId" in data: group = Group.query.get(data["GroupId"]) if not group: errors.append({"Error": "group_id_not_found", "Message": "Invalid group ID."}) chain, san_list, tls_errors = validate_tls_keys( data["TlsPrivateKey"], data["TlsCertificate"], data.get("SkipChainVerification"), data.get("SkipNameVerification"), f"{onion_hostname(onion_public_key)}.onion" ) if tls_errors: errors.extend(tls_errors) if errors: return jsonify({"Errors": errors}), 400 cert_expiry_date = chain[0].not_valid_after if chain else None onion = Onion( domain_name=data["DomainName"], description=data["Description"], onion_private_key=onion_private_key, onion_public_key=onion_public_key, tls_private_key=data["TlsPrivateKey"].encode("utf-8"), tls_public_key=data["TlsCertificate"].encode("utf-8"), group_id=data["GroupId"], added=datetime.now(timezone.utc), updated=datetime.now(timezone.utc), cert_expiry=cert_expiry_date, cert_sans=",".join(san_list) ) try: db.session.add(onion) db.session.commit() return jsonify({"Message": "Onion service created successfully.", "Id": onion.id}), 201 except exc.SQLAlchemyError as e: return jsonify({"Errors": [{"Error": "database_error", "Message": str(e)}]}), 500 class UpdateOnionRequest(TypedDict): Description: NotRequired[str] TlsPrivateKey: NotRequired[str] TlsCertificate: NotRequired[str] SkipChainVerification: NotRequired[bool] SkipNameVerification: NotRequired[bool] @api_onion.route("/onion/", methods=["PUT"]) def update_onion(onion_id: int) -> ResponseReturnValue: data: Optional[UpdateOnionRequest] = request.json if not data: abort(400) errors = [] onion = Onion.query.get(onion_id) if not onion: return jsonify( {"Errors": [{"Error": "onion_not_found", "Message": f"No Onion service found with ID {onion_id}"}]}), 404 if "Description" in data: description = data["Description"] print(f"Description {description}", file=sys.stderr) if validate_description(description): onion.description = description else: errors.append({"Error": "description_error", "Message": "Description field is invalid"}) tls_private_key_pem: Optional[str] = None tls_certificate_pem: Optional[str] = None chain: Optional[List[x509.Certificate]] = None san_list: Optional[List[str]] = None if "TlsCertificate" in data: tls_certificate_pem = data.get("TlsCertificate") if "TlsPrivateKey" in data: tls_private_key_pem = data.get("TlsPrivateKey") else: tls_private_key_pem = onion.tls_private_key.decode("utf-8") chain, san_list, tls_errors = validate_tls_keys( tls_private_key_pem, tls_certificate_pem, data.get("SkipChainVerification", False), data.get("SkipNameVerification", False), f"{onion_hostname(onion.onion_public_key)}.onion", ) if tls_errors: errors.extend(tls_errors) if errors: return jsonify({"Errors": errors}), 400 if tls_private_key_pem: onion.tls_private_key = tls_private_key_pem.encode("utf-8") if tls_certificate_pem and san_list: onion.tls_public_key = tls_certificate_pem.encode("utf-8") onion.cert_expiry_date = chain[0].not_valid_after_utc if chain else None onion.cert_sans = ",".join(san_list) onion.updated = datetime.now(timezone.utc) try: db.session.commit() return jsonify({"Message": "Onion service updated successfully."}), 200 except exc.SQLAlchemyError as e: return jsonify({"Errors": [{"Error": "database_error", "Message": str(e)}]}), 500 @api_onion.route("/onion/", methods=["GET"]) def get_onion(onion_id: int) -> ResponseReturnValue: return get_single_resource(Onion, onion_id, "Onion")