majuna/app/api/onion.py

267 lines
7.9 KiB
Python
Raw Permalink Normal View History

2024-12-06 13:34:44 +00:00
import sys
from datetime import datetime, timezone
from typing import List, NotRequired, Optional, TypedDict
2024-12-06 13:34:44 +00:00
from cryptography import x509
from flask import Blueprint, abort, jsonify, request
2024-12-06 13:34:44 +00:00
from flask.typing import ResponseReturnValue
from sqlalchemy import exc
2024-12-06 18:15:47 +00:00
from app.api.util import (
DOMAIN_NAME_REGEX,
MAX_ALLOWED_ITEMS,
MAX_DOMAIN_NAME_LENGTH,
ListFilter,
get_single_resource,
list_resources,
validate_description,
)
2024-12-06 13:34:44 +00:00
from app.extensions import db
from app.models.base import Group
from app.models.onions import Onion
from app.util.onion import decode_onion_keys, onion_hostname
2024-12-06 13:34:44 +00:00
from app.util.x509 import validate_tls_keys
2024-12-06 18:15:47 +00:00
api_onion = Blueprint("api_onion", __name__)
2024-12-06 13:34:44 +00:00
2024-12-06 18:15:47 +00:00
@api_onion.route("/onion", methods=["GET"])
2024-12-06 13:34:44 +00:00
def list_onions() -> ResponseReturnValue:
2024-12-06 18:15:47 +00:00
domain_name_filter = request.args.get("DomainName")
group_id_filter = request.args.get("GroupId")
2024-12-06 13:34:44 +00:00
2024-12-06 18:15:47 +00:00
filters: List[ListFilter] = [(Onion.destroyed.is_(None))]
2024-12-06 13:34:44 +00:00
if domain_name_filter:
if len(domain_name_filter) > MAX_DOMAIN_NAME_LENGTH:
2024-12-06 18:15:47 +00:00
abort(
400,
description=f"DomainName cannot exceed {MAX_DOMAIN_NAME_LENGTH} characters.",
)
2024-12-06 13:34:44 +00:00
if not DOMAIN_NAME_REGEX.match(domain_name_filter):
abort(400, description="DomainName contains invalid characters.")
filters.append(Onion.domain_name.ilike(f"%{domain_name_filter}%"))
if group_id_filter:
try:
filters.append(Onion.group_id == int(group_id_filter))
except ValueError:
abort(400, description="GroupId must be a valid integer.")
return list_resources(
Onion,
lambda onion: onion.to_dict(),
filters=filters,
2024-12-06 18:15:47 +00:00
resource_name="OnionsList",
2024-12-06 13:34:44 +00:00
max_allowed_items=MAX_ALLOWED_ITEMS,
2024-12-06 18:15:47 +00:00
protective_marking="amber",
2024-12-06 13:34:44 +00:00
)
class CreateOnionRequest(TypedDict):
DomainName: str
Description: str
GroupId: int
OnionPrivateKey: str
OnionPublicKey: str
TlsPrivateKey: str
TlsCertificate: str
SkipChainVerification: NotRequired[bool]
SkipNameVerification: NotRequired[bool]
@api_onion.route("/onion", methods=["POST"])
def create_onion() -> ResponseReturnValue:
data: Optional[CreateOnionRequest] = request.json
if not data:
abort(400)
errors = []
2024-12-06 18:15:47 +00:00
for field in [
"DomainName",
"Description",
"OnionPrivateKey",
"OnionPublicKey",
"GroupId",
"TlsPrivateKey",
"TlsCertificate",
]:
2024-12-06 13:34:44 +00:00
if not data.get(field):
2024-12-06 18:15:47 +00:00
errors.append(
{
"Error": f"{field}_missing",
"Message": f"Missing required field: {field}",
}
)
onion_private_key, onion_public_key, onion_errors = decode_onion_keys(
data["OnionPrivateKey"], data["OnionPublicKey"]
)
2024-12-06 13:34:44 +00:00
if onion_errors:
errors.extend(onion_errors)
if onion_public_key is None:
return jsonify({"Errors": errors}), 400
if onion_private_key:
2024-12-06 18:15:47 +00:00
existing_onion = (
db.session.query(Onion)
.where(
Onion.onion_private_key == onion_private_key,
Onion.destroyed.is_(None),
)
.first()
)
2024-12-06 13:34:44 +00:00
if existing_onion:
errors.append(
2024-12-06 18:15:47 +00:00
{
"Error": "duplicate_onion_key",
"Message": "An onion service with this private key already exists.",
}
)
2024-12-06 13:34:44 +00:00
if "GroupId" in data:
group = Group.query.get(data["GroupId"])
if not group:
2024-12-06 18:15:47 +00:00
errors.append(
{"Error": "group_id_not_found", "Message": "Invalid group ID."}
)
2024-12-06 13:34:44 +00:00
chain, san_list, tls_errors = validate_tls_keys(
2024-12-06 18:15:47 +00:00
data["TlsPrivateKey"],
data["TlsCertificate"],
data.get("SkipChainVerification"),
2024-12-06 13:34:44 +00:00
data.get("SkipNameVerification"),
2024-12-06 18:15:47 +00:00
f"{onion_hostname(onion_public_key)}.onion",
2024-12-06 13:34:44 +00:00
)
if tls_errors:
errors.extend(tls_errors)
if errors:
return jsonify({"Errors": errors}), 400
cert_expiry_date = chain[0].not_valid_after if chain else None
onion = Onion(
domain_name=data["DomainName"],
description=data["Description"],
onion_private_key=onion_private_key,
onion_public_key=onion_public_key,
tls_private_key=data["TlsPrivateKey"].encode("utf-8"),
tls_public_key=data["TlsCertificate"].encode("utf-8"),
group_id=data["GroupId"],
added=datetime.now(timezone.utc),
updated=datetime.now(timezone.utc),
cert_expiry=cert_expiry_date,
2024-12-06 18:15:47 +00:00
cert_sans=",".join(san_list),
2024-12-06 13:34:44 +00:00
)
try:
db.session.add(onion)
db.session.commit()
2024-12-06 18:15:47 +00:00
return (
jsonify({"Message": "Onion service created successfully.", "Id": onion.id}),
201,
)
2024-12-06 13:34:44 +00:00
except exc.SQLAlchemyError as e:
2024-12-06 18:15:47 +00:00
return (
jsonify({"Errors": [{"Error": "database_error", "Message": str(e)}]}),
500,
)
2024-12-06 13:34:44 +00:00
class UpdateOnionRequest(TypedDict):
Description: NotRequired[str]
TlsPrivateKey: NotRequired[str]
TlsCertificate: NotRequired[str]
SkipChainVerification: NotRequired[bool]
SkipNameVerification: NotRequired[bool]
@api_onion.route("/onion/<int:onion_id>", methods=["PUT"])
def update_onion(onion_id: int) -> ResponseReturnValue:
data: Optional[UpdateOnionRequest] = request.json
if not data:
abort(400)
errors = []
onion = Onion.query.get(onion_id)
if not onion:
2024-12-06 18:15:47 +00:00
return (
jsonify(
{
"Errors": [
{
"Error": "onion_not_found",
"Message": f"No Onion service found with ID {onion_id}",
}
]
}
),
404,
)
2024-12-06 13:34:44 +00:00
if "Description" in data:
description = data["Description"]
print(f"Description {description}", file=sys.stderr)
if validate_description(description):
onion.description = description
else:
2024-12-06 18:15:47 +00:00
errors.append(
{
"Error": "description_error",
"Message": "Description field is invalid",
}
)
2024-12-06 13:34:44 +00:00
tls_private_key_pem: Optional[str] = None
tls_certificate_pem: Optional[str] = None
chain: Optional[List[x509.Certificate]] = None
san_list: Optional[List[str]] = None
if "TlsCertificate" in data:
tls_certificate_pem = data.get("TlsCertificate")
if "TlsPrivateKey" in data:
tls_private_key_pem = data.get("TlsPrivateKey")
else:
tls_private_key_pem = onion.tls_private_key.decode("utf-8")
chain, san_list, tls_errors = validate_tls_keys(
2024-12-06 18:15:47 +00:00
tls_private_key_pem,
tls_certificate_pem,
data.get("SkipChainVerification", False),
2024-12-06 13:34:44 +00:00
data.get("SkipNameVerification", False),
f"{onion_hostname(onion.onion_public_key)}.onion",
)
if tls_errors:
errors.extend(tls_errors)
if errors:
return jsonify({"Errors": errors}), 400
if tls_private_key_pem:
onion.tls_private_key = tls_private_key_pem.encode("utf-8")
if tls_certificate_pem and san_list:
onion.tls_public_key = tls_certificate_pem.encode("utf-8")
onion.cert_expiry_date = chain[0].not_valid_after_utc if chain else None
onion.cert_sans = ",".join(san_list)
onion.updated = datetime.now(timezone.utc)
try:
db.session.commit()
return jsonify({"Message": "Onion service updated successfully."}), 200
except exc.SQLAlchemyError as e:
2024-12-06 18:15:47 +00:00
return (
jsonify({"Errors": [{"Error": "database_error", "Message": str(e)}]}),
500,
)
2024-12-06 13:34:44 +00:00
@api_onion.route("/onion/<int:onion_id>", methods=["GET"])
def get_onion(onion_id: int) -> ResponseReturnValue:
return get_single_resource(Onion, onion_id, "Onion")