import base64 import json import os import shutil import subprocess from datetime import datetime, timedelta, timezone import pytest import requests from cryptography import x509 from cryptography.hazmat._oid import NameOID from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.hashes import SHA256 from tests.api.conftest import temporary_test_directory mkp224o_available = shutil.which("mkp224o") is not None def generate_self_signed_tls_certificate(folder_name: str, onion_address: str, valid_from: datetime, valid_to: datetime, dns_names=None): """ Generate a self-signed TLS certificate for the Onion address and save it in the specified folder. """ private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend(), ) subject = x509.Name( [ x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Test State"), x509.NameAttribute(NameOID.LOCALITY_NAME, "Test City"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test Org"), x509.NameAttribute(NameOID.COMMON_NAME, onion_address), ] ) if dns_names is None: dns_names = [onion_address, f"*.{onion_address}"] san_extension = x509.SubjectAlternativeName([x509.DNSName(name) for name in dns_names]) certificate = ( x509.CertificateBuilder() .subject_name(subject) .issuer_name(subject) .public_key(private_key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(valid_from) .not_valid_after(valid_to) .add_extension(san_extension, critical=False) .sign(private_key, SHA256(), default_backend()) ) private_key_path = os.path.join(folder_name, "tls_private_key.pem") certificate_path = os.path.join(folder_name, "tls_certificate.pem") with open(private_key_path, "wb") as f: f.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) ) with open(certificate_path, "wb") as f: f.write(certificate.public_bytes(serialization.Encoding.PEM)) return private_key_path, certificate_path def generate_create_request_payload(folder_name: str): with open(os.path.join(folder_name, "hs_ed25519_secret_key"), "rb") as f: onion_private_key = base64.b64encode(f.read()).decode("utf-8") with open(os.path.join(folder_name, "hs_ed25519_public_key"), "rb") as f: onion_public_key = base64.b64encode(f.read()).decode("utf-8") with open(os.path.join(folder_name, "tls_private_key.pem"), "r") as f: tls_private_key = f.read() with open(os.path.join(folder_name, "tls_certificate.pem"), "r") as f: tls_public_key = f.read() payload = { "DomainName": "example.com", "Description": f"Generated Onion Service for {folder_name}", "OnionPrivateKey": onion_private_key, "OnionPublicKey": onion_public_key, "TlsPrivateKey": tls_private_key, "TlsCertificate": tls_public_key, "SkipChainVerification": True, "GroupId": 1, } return payload def generate_onion_keys_with_mkp224o(folder_name: str, label: str): """ Generate Tor-compatible Onion service keys using mkp224o. The keys are saved in the specified folder, and the Onion address is returned. """ os.makedirs(folder_name, exist_ok=True) # Call mkp224o to generate a single Onion service key process = subprocess.run( ["mkp224o", "-n", "1", "-d", folder_name, label], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) try: process.check_returncode() except subprocess.CalledProcessError: print("STDOUT:", process.stdout.decode()) print("STDERR:", process.stderr.decode()) raise # Find the generated Onion address for filename in os.listdir(folder_name): if filename.endswith(".onion"): onion_address = filename onion_dir = os.path.join(folder_name, filename) # Move files to parent directory for key_file in ["hs_ed25519_secret_key", "hs_ed25519_public_key", "hostname"]: src = os.path.join(onion_dir, key_file) dst = os.path.join(folder_name, key_file) if os.path.exists(src): shutil.move(src, dst) # Remove the now-empty directory os.rmdir(onion_dir) return onion_address raise RuntimeError("Failed to generate Onion keys using mkp224o") @pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH") @pytest.mark.parametrize("scenario", [ ("self_signed_onion_service", datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365), None, 201, []), ("expired_onion_service", datetime.now(timezone.utc) - timedelta(days=730), datetime.now(timezone.utc) - timedelta(days=365), None, 400, ["tls_public_key_expired"]), ("future_onion_service", datetime.now(timezone.utc) + timedelta(days=365), datetime.now(timezone.utc) + timedelta(days=730), None, 400, ["tls_public_key_future"]), ("wrong_name_onion_service", datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365), ["wrong-name.example.com"], 400, ["hostname_not_in_san"]), ]) def test_onion_service_creation(temporary_test_directory, scenario): folder_name, valid_from, valid_to, dns_names, expected_status, expected_errors = scenario folder_path = os.path.join(temporary_test_directory, folder_name) os.makedirs(folder_path, exist_ok=True) onion_address = generate_onion_keys_with_mkp224o(folder_path, "test") generate_self_signed_tls_certificate(folder_path, onion_address, valid_from, valid_to, dns_names) payload = generate_create_request_payload(folder_path) response = requests.post( "http://localhost:5001/api/onion/onion", headers={"Content-Type": "application/json"}, data=json.dumps(payload), ) assert response.status_code == expected_status, f"Unexpected response: {response.text}" response_data = response.json() if expected_errors: assert "Errors" in response_data assert set(expected_errors) == set([e["Error"] for e in response_data["Errors"]]) else: assert "Errors" not in response_data if os.path.exists(folder_path): shutil.rmtree(folder_path) @pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH") @pytest.mark.parametrize("new_description, expected_status", [ ("Updated description", 200), (None, 400), ]) def test_update_onion_description(temporary_test_directory, new_description, expected_status): update_payload = {"Description": new_description} response = requests.put("http://localhost:5001/api/onion/onion/1", headers={"Content-Type": "application/json"}, data=json.dumps(update_payload)) assert response.status_code == expected_status @pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH") def test_update_tls_certificate(temporary_test_directory): folder_path = os.path.join(temporary_test_directory, "update_certificate") os.makedirs(folder_path) onion_address = "test.onion" private_key_path, certificate_path = generate_self_signed_tls_certificate( folder_path, onion_address, datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365) ) with open(private_key_path, "r") as f: new_private_key = f.read() with open(certificate_path, "r") as f: new_certificate = f.read() update_payload = { "TlsPrivateKey": new_private_key, "TlsCertificate": new_certificate, "SkipChainVerification": True, "SkipNameVerification": True, } response = requests.put("http://localhost:5001/api/onion/onion/1", headers={"Content-Type": "application/json"}, data=json.dumps(update_payload)) assert response.status_code == 200