216 lines
8.3 KiB
Python
216 lines
8.3 KiB
Python
|
import base64
|
||
|
import json
|
||
|
import os
|
||
|
import shutil
|
||
|
import subprocess
|
||
|
from datetime import datetime, timedelta, timezone
|
||
|
|
||
|
import pytest
|
||
|
import requests
|
||
|
from cryptography import x509
|
||
|
from cryptography.hazmat._oid import NameOID
|
||
|
from cryptography.hazmat.backends import default_backend
|
||
|
from cryptography.hazmat.primitives import serialization
|
||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||
|
from cryptography.hazmat.primitives.hashes import SHA256
|
||
|
|
||
|
from tests.api.conftest import temporary_test_directory
|
||
|
|
||
|
mkp224o_available = shutil.which("mkp224o") is not None
|
||
|
|
||
|
|
||
|
def generate_self_signed_tls_certificate(folder_name: str, onion_address: str, valid_from: datetime, valid_to: datetime,
|
||
|
dns_names=None):
|
||
|
"""
|
||
|
Generate a self-signed TLS certificate for the Onion address and save it in the specified folder.
|
||
|
"""
|
||
|
private_key = rsa.generate_private_key(
|
||
|
public_exponent=65537,
|
||
|
key_size=2048,
|
||
|
backend=default_backend(),
|
||
|
)
|
||
|
subject = x509.Name(
|
||
|
[
|
||
|
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
|
||
|
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Test State"),
|
||
|
x509.NameAttribute(NameOID.LOCALITY_NAME, "Test City"),
|
||
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test Org"),
|
||
|
x509.NameAttribute(NameOID.COMMON_NAME, onion_address),
|
||
|
]
|
||
|
)
|
||
|
if dns_names is None:
|
||
|
dns_names = [onion_address, f"*.{onion_address}"]
|
||
|
|
||
|
san_extension = x509.SubjectAlternativeName([x509.DNSName(name) for name in dns_names])
|
||
|
|
||
|
certificate = (
|
||
|
x509.CertificateBuilder()
|
||
|
.subject_name(subject)
|
||
|
.issuer_name(subject)
|
||
|
.public_key(private_key.public_key())
|
||
|
.serial_number(x509.random_serial_number())
|
||
|
.not_valid_before(valid_from)
|
||
|
.not_valid_after(valid_to)
|
||
|
.add_extension(san_extension, critical=False)
|
||
|
.sign(private_key, SHA256(), default_backend())
|
||
|
)
|
||
|
|
||
|
private_key_path = os.path.join(folder_name, "tls_private_key.pem")
|
||
|
certificate_path = os.path.join(folder_name, "tls_certificate.pem")
|
||
|
|
||
|
with open(private_key_path, "wb") as f:
|
||
|
f.write(
|
||
|
private_key.private_bytes(
|
||
|
encoding=serialization.Encoding.PEM,
|
||
|
format=serialization.PrivateFormat.PKCS8,
|
||
|
encryption_algorithm=serialization.NoEncryption(),
|
||
|
)
|
||
|
)
|
||
|
with open(certificate_path, "wb") as f:
|
||
|
f.write(certificate.public_bytes(serialization.Encoding.PEM))
|
||
|
|
||
|
return private_key_path, certificate_path
|
||
|
|
||
|
|
||
|
def generate_create_request_payload(folder_name: str):
|
||
|
with open(os.path.join(folder_name, "hs_ed25519_secret_key"), "rb") as f:
|
||
|
onion_private_key = base64.b64encode(f.read()).decode("utf-8")
|
||
|
with open(os.path.join(folder_name, "hs_ed25519_public_key"), "rb") as f:
|
||
|
onion_public_key = base64.b64encode(f.read()).decode("utf-8")
|
||
|
with open(os.path.join(folder_name, "tls_private_key.pem"), "r") as f:
|
||
|
tls_private_key = f.read()
|
||
|
with open(os.path.join(folder_name, "tls_certificate.pem"), "r") as f:
|
||
|
tls_public_key = f.read()
|
||
|
|
||
|
payload = {
|
||
|
"DomainName": "example.com",
|
||
|
"Description": f"Generated Onion Service for {folder_name}",
|
||
|
"OnionPrivateKey": onion_private_key,
|
||
|
"OnionPublicKey": onion_public_key,
|
||
|
"TlsPrivateKey": tls_private_key,
|
||
|
"TlsCertificate": tls_public_key,
|
||
|
"SkipChainVerification": True,
|
||
|
"GroupId": 1,
|
||
|
}
|
||
|
|
||
|
return payload
|
||
|
|
||
|
|
||
|
def generate_onion_keys_with_mkp224o(folder_name: str, label: str):
|
||
|
"""
|
||
|
Generate Tor-compatible Onion service keys using mkp224o.
|
||
|
The keys are saved in the specified folder, and the Onion address is returned.
|
||
|
"""
|
||
|
os.makedirs(folder_name, exist_ok=True)
|
||
|
|
||
|
# Call mkp224o to generate a single Onion service key
|
||
|
process = subprocess.run(
|
||
|
["mkp224o", "-n", "1", "-d", folder_name, label],
|
||
|
stdout=subprocess.PIPE,
|
||
|
stderr=subprocess.PIPE
|
||
|
)
|
||
|
try:
|
||
|
process.check_returncode()
|
||
|
except subprocess.CalledProcessError:
|
||
|
print("STDOUT:", process.stdout.decode())
|
||
|
print("STDERR:", process.stderr.decode())
|
||
|
raise
|
||
|
|
||
|
# Find the generated Onion address
|
||
|
for filename in os.listdir(folder_name):
|
||
|
if filename.endswith(".onion"):
|
||
|
onion_address = filename
|
||
|
onion_dir = os.path.join(folder_name, filename)
|
||
|
|
||
|
# Move files to parent directory
|
||
|
for key_file in ["hs_ed25519_secret_key", "hs_ed25519_public_key", "hostname"]:
|
||
|
src = os.path.join(onion_dir, key_file)
|
||
|
dst = os.path.join(folder_name, key_file)
|
||
|
if os.path.exists(src):
|
||
|
shutil.move(src, dst)
|
||
|
|
||
|
# Remove the now-empty directory
|
||
|
os.rmdir(onion_dir)
|
||
|
|
||
|
return onion_address
|
||
|
|
||
|
raise RuntimeError("Failed to generate Onion keys using mkp224o")
|
||
|
|
||
|
@pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH")
|
||
|
@pytest.mark.parametrize("scenario", [
|
||
|
("self_signed_onion_service", datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365), None,
|
||
|
201, []),
|
||
|
("expired_onion_service", datetime.now(timezone.utc) - timedelta(days=730),
|
||
|
datetime.now(timezone.utc) - timedelta(days=365), None, 400, ["tls_public_key_expired"]),
|
||
|
("future_onion_service", datetime.now(timezone.utc) + timedelta(days=365),
|
||
|
datetime.now(timezone.utc) + timedelta(days=730), None, 400, ["tls_public_key_future"]),
|
||
|
("wrong_name_onion_service", datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365),
|
||
|
["wrong-name.example.com"], 400, ["hostname_not_in_san"]),
|
||
|
])
|
||
|
def test_onion_service_creation(temporary_test_directory, scenario):
|
||
|
folder_name, valid_from, valid_to, dns_names, expected_status, expected_errors = scenario
|
||
|
|
||
|
folder_path = os.path.join(temporary_test_directory, folder_name)
|
||
|
os.makedirs(folder_path, exist_ok=True)
|
||
|
|
||
|
onion_address = generate_onion_keys_with_mkp224o(folder_path, "test")
|
||
|
generate_self_signed_tls_certificate(folder_path, onion_address, valid_from, valid_to, dns_names)
|
||
|
|
||
|
payload = generate_create_request_payload(folder_path)
|
||
|
|
||
|
response = requests.post(
|
||
|
"http://localhost:5001/api/onion/onion",
|
||
|
headers={"Content-Type": "application/json"},
|
||
|
data=json.dumps(payload),
|
||
|
)
|
||
|
|
||
|
assert response.status_code == expected_status, f"Unexpected response: {response.text}"
|
||
|
|
||
|
response_data = response.json()
|
||
|
if expected_errors:
|
||
|
assert "Errors" in response_data
|
||
|
assert set(expected_errors) == set([e["Error"] for e in response_data["Errors"]])
|
||
|
else:
|
||
|
assert "Errors" not in response_data
|
||
|
|
||
|
if os.path.exists(folder_path):
|
||
|
shutil.rmtree(folder_path)
|
||
|
|
||
|
@pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH")
|
||
|
@pytest.mark.parametrize("new_description, expected_status", [
|
||
|
("Updated description", 200),
|
||
|
(None, 400),
|
||
|
])
|
||
|
def test_update_onion_description(temporary_test_directory, new_description, expected_status):
|
||
|
update_payload = {"Description": new_description}
|
||
|
response = requests.put("http://localhost:5001/api/onion/onion/1", headers={"Content-Type": "application/json"},
|
||
|
data=json.dumps(update_payload))
|
||
|
|
||
|
assert response.status_code == expected_status
|
||
|
|
||
|
@pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH")
|
||
|
def test_update_tls_certificate(temporary_test_directory):
|
||
|
folder_path = os.path.join(temporary_test_directory, "update_certificate")
|
||
|
os.makedirs(folder_path)
|
||
|
onion_address = "test.onion"
|
||
|
private_key_path, certificate_path = generate_self_signed_tls_certificate(
|
||
|
folder_path, onion_address, datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365)
|
||
|
)
|
||
|
|
||
|
with open(private_key_path, "r") as f:
|
||
|
new_private_key = f.read()
|
||
|
with open(certificate_path, "r") as f:
|
||
|
new_certificate = f.read()
|
||
|
|
||
|
update_payload = {
|
||
|
"TlsPrivateKey": new_private_key,
|
||
|
"TlsCertificate": new_certificate,
|
||
|
"SkipChainVerification": True,
|
||
|
"SkipNameVerification": True,
|
||
|
}
|
||
|
|
||
|
response = requests.put("http://localhost:5001/api/onion/onion/1", headers={"Content-Type": "application/json"},
|
||
|
data=json.dumps(update_payload))
|
||
|
|
||
|
assert response.status_code == 200
|