import os import shutil import subprocess import base64 import json from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.hashes import SHA256 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend from datetime import datetime, timedelta, timezone def generate_onion_keys_with_mkp224o(folder_name: str, label: str): """ Generate Tor-compatible Onion service keys using mkp224o. The keys are saved in the specified folder, and the Onion address is returned. """ os.makedirs(folder_name, exist_ok=True) # Call mkp224o to generate a single Onion service key process = subprocess.run( ["mkp224o", "-n", "1", "-d", folder_name, label], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) try: process.check_returncode() except subprocess.CalledProcessError: print("STDOUT:", process.stdout.decode()) print("STDERR:", process.stderr.decode()) raise # Find the generated Onion address for filename in os.listdir(folder_name): if filename.endswith(".onion"): onion_address = filename onion_dir = os.path.join(folder_name, filename) # Move files to parent directory for key_file in ["hs_ed25519_secret_key", "hs_ed25519_public_key", "hostname"]: src = os.path.join(onion_dir, key_file) dst = os.path.join(folder_name, key_file) if os.path.exists(src): shutil.move(src, dst) # Remove the now-empty directory os.rmdir(onion_dir) return onion_address raise RuntimeError("Failed to generate Onion keys using mkp224o") def generate_self_signed_tls_certificate(folder_name: str, onion_address: str, valid_from: datetime, valid_to: datetime, dns_names=None): """ Generate a self-signed TLS certificate for the Onion address and save it in the specified folder. """ private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend(), ) subject = x509.Name( [ x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Test State"), x509.NameAttribute(NameOID.LOCALITY_NAME, "Test City"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test Org"), x509.NameAttribute(NameOID.COMMON_NAME, onion_address), ] ) if dns_names is None: dns_names = [onion_address, f"*.{onion_address}"] san_extension = x509.SubjectAlternativeName([x509.DNSName(name) for name in dns_names]) certificate = ( x509.CertificateBuilder() .subject_name(subject) .issuer_name(subject) .public_key(private_key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(valid_from) .not_valid_after(valid_to) .add_extension(san_extension, critical=False) .sign(private_key, SHA256(), default_backend()) ) private_key_path = os.path.join(folder_name, "tls_private_key.pem") certificate_path = os.path.join(folder_name, "tls_certificate.pem") with open(private_key_path, "wb") as f: f.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) ) with open(certificate_path, "wb") as f: f.write(certificate.public_bytes(serialization.Encoding.PEM)) return private_key_path, certificate_path def generate_rest_payload(parent_folder: str, folder_name: str, onion_address: str): """ Generate REST payload for a specific Onion service and append it to a shared .rest file. """ rest_file_path = os.path.join(parent_folder, "new_onion.rest") with open(os.path.join(folder_name, "hs_ed25519_secret_key"), "rb") as f: onion_private_key = base64.b64encode(f.read()).decode("utf-8") with open(os.path.join(folder_name, "hs_ed25519_public_key"), "rb") as f: onion_public_key = base64.b64encode(f.read()).decode("utf-8") with open(os.path.join(folder_name, "tls_private_key.pem"), "r") as f: tls_private_key = f.read() with open(os.path.join(folder_name, "tls_certificate.pem"), "r") as f: tls_public_key = f.read() payload = { "DomainName": onion_address, "Description": f"Generated Onion Service for {folder_name}", "OnionPrivateKey": onion_private_key, "OnionPublicKey": onion_public_key, "TlsPrivateKey": tls_private_key, "TlsCertificate": tls_public_key, "SkipChainVerification": True, "GroupId": 1, } with open(rest_file_path, "a") as f: f.write(f"### Create Onion Service ({folder_name})\n") f.write("POST http://localhost:5000/api/web/onion\n") f.write("Content-Type: application/json\n\n") json.dump(payload, f, indent=4) f.write("\n\n") if __name__ == "__main__": parent_folder = "." scenarios = [ ("self_signed_onion_service", datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365), None), ("expired_onion_service", datetime.now(timezone.utc) - timedelta(days=730), datetime.now(timezone.utc) - timedelta(days=365), None), ("future_onion_service", datetime.now(timezone.utc) + timedelta(days=365), datetime.now(timezone.utc) + timedelta(days=730), None), ("wrong_name_onion_service", datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365), ["wrong-name.example.com"]), ] if os.path.exists("new_onion.rest"): os.remove("new_onion.rest") for folder_name, valid_from, valid_to, dns_names in scenarios: print(f"Generating {folder_name}...") onion_address = generate_onion_keys_with_mkp224o(folder_name, "test") generate_self_signed_tls_certificate(folder_name, onion_address, valid_from, valid_to, dns_names) generate_rest_payload(parent_folder, folder_name, onion_address) print("All Onion services and REST requests generated successfully.")