feat: expand onion service api

This commit is contained in:
Iain Learmonth 2024-12-06 13:34:44 +00:00
parent c1b385ed99
commit e5976c4739
11 changed files with 646 additions and 348 deletions

64
tests/api/conftest.py Normal file
View file

@ -0,0 +1,64 @@
import os
import shutil
import tempfile
from datetime import datetime, timezone
from multiprocessing import Process
from time import sleep
import pytest
from sqlalchemy.exc import IntegrityError
from app import app, db
from app.models.base import Group
@pytest.fixture(scope="session", autouse=True)
def test_server(test_database):
process = Process(target=run_app)
process.start()
sleep(2)
yield
process.terminate()
process.join()
@pytest.fixture(scope="session", autouse=True)
def test_database():
temp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
db_path = temp_db.name
# app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_path}"
app.config["TESTING"] = True
with app.app_context():
db.create_all()
group = Group(group_name="test-group",
description="Test group",
eotk=True, added=datetime.now(timezone.utc),
updated=datetime.now(timezone.utc))
try:
db.session.add(group)
db.session.commit()
except IntegrityError:
db.session.rollback()
yield db_path
with app.app_context():
db.drop_all()
os.unlink(db_path)
def run_app():
app.run(host="localhost", port=5001, debug=False, use_reloader=False)
@pytest.fixture(scope="function")
def temporary_test_directory():
temp_dir = tempfile.mkdtemp()
yield temp_dir
shutil.rmtree(temp_dir) # Cleanup after the test

215
tests/api/test_onion.py Normal file
View file

@ -0,0 +1,215 @@
import base64
import json
import os
import shutil
import subprocess
from datetime import datetime, timedelta, timezone
import pytest
import requests
from cryptography import x509
from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.hashes import SHA256
from tests.api.conftest import temporary_test_directory
mkp224o_available = shutil.which("mkp224o") is not None
def generate_self_signed_tls_certificate(folder_name: str, onion_address: str, valid_from: datetime, valid_to: datetime,
dns_names=None):
"""
Generate a self-signed TLS certificate for the Onion address and save it in the specified folder.
"""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend(),
)
subject = x509.Name(
[
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Test State"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Test City"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test Org"),
x509.NameAttribute(NameOID.COMMON_NAME, onion_address),
]
)
if dns_names is None:
dns_names = [onion_address, f"*.{onion_address}"]
san_extension = x509.SubjectAlternativeName([x509.DNSName(name) for name in dns_names])
certificate = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(subject)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(valid_from)
.not_valid_after(valid_to)
.add_extension(san_extension, critical=False)
.sign(private_key, SHA256(), default_backend())
)
private_key_path = os.path.join(folder_name, "tls_private_key.pem")
certificate_path = os.path.join(folder_name, "tls_certificate.pem")
with open(private_key_path, "wb") as f:
f.write(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
)
with open(certificate_path, "wb") as f:
f.write(certificate.public_bytes(serialization.Encoding.PEM))
return private_key_path, certificate_path
def generate_create_request_payload(folder_name: str):
with open(os.path.join(folder_name, "hs_ed25519_secret_key"), "rb") as f:
onion_private_key = base64.b64encode(f.read()).decode("utf-8")
with open(os.path.join(folder_name, "hs_ed25519_public_key"), "rb") as f:
onion_public_key = base64.b64encode(f.read()).decode("utf-8")
with open(os.path.join(folder_name, "tls_private_key.pem"), "r") as f:
tls_private_key = f.read()
with open(os.path.join(folder_name, "tls_certificate.pem"), "r") as f:
tls_public_key = f.read()
payload = {
"DomainName": "example.com",
"Description": f"Generated Onion Service for {folder_name}",
"OnionPrivateKey": onion_private_key,
"OnionPublicKey": onion_public_key,
"TlsPrivateKey": tls_private_key,
"TlsCertificate": tls_public_key,
"SkipChainVerification": True,
"GroupId": 1,
}
return payload
def generate_onion_keys_with_mkp224o(folder_name: str, label: str):
"""
Generate Tor-compatible Onion service keys using mkp224o.
The keys are saved in the specified folder, and the Onion address is returned.
"""
os.makedirs(folder_name, exist_ok=True)
# Call mkp224o to generate a single Onion service key
process = subprocess.run(
["mkp224o", "-n", "1", "-d", folder_name, label],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
try:
process.check_returncode()
except subprocess.CalledProcessError:
print("STDOUT:", process.stdout.decode())
print("STDERR:", process.stderr.decode())
raise
# Find the generated Onion address
for filename in os.listdir(folder_name):
if filename.endswith(".onion"):
onion_address = filename
onion_dir = os.path.join(folder_name, filename)
# Move files to parent directory
for key_file in ["hs_ed25519_secret_key", "hs_ed25519_public_key", "hostname"]:
src = os.path.join(onion_dir, key_file)
dst = os.path.join(folder_name, key_file)
if os.path.exists(src):
shutil.move(src, dst)
# Remove the now-empty directory
os.rmdir(onion_dir)
return onion_address
raise RuntimeError("Failed to generate Onion keys using mkp224o")
@pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH")
@pytest.mark.parametrize("scenario", [
("self_signed_onion_service", datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365), None,
201, []),
("expired_onion_service", datetime.now(timezone.utc) - timedelta(days=730),
datetime.now(timezone.utc) - timedelta(days=365), None, 400, ["tls_public_key_expired"]),
("future_onion_service", datetime.now(timezone.utc) + timedelta(days=365),
datetime.now(timezone.utc) + timedelta(days=730), None, 400, ["tls_public_key_future"]),
("wrong_name_onion_service", datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365),
["wrong-name.example.com"], 400, ["hostname_not_in_san"]),
])
def test_onion_service_creation(temporary_test_directory, scenario):
folder_name, valid_from, valid_to, dns_names, expected_status, expected_errors = scenario
folder_path = os.path.join(temporary_test_directory, folder_name)
os.makedirs(folder_path, exist_ok=True)
onion_address = generate_onion_keys_with_mkp224o(folder_path, "test")
generate_self_signed_tls_certificate(folder_path, onion_address, valid_from, valid_to, dns_names)
payload = generate_create_request_payload(folder_path)
response = requests.post(
"http://localhost:5001/api/onion/onion",
headers={"Content-Type": "application/json"},
data=json.dumps(payload),
)
assert response.status_code == expected_status, f"Unexpected response: {response.text}"
response_data = response.json()
if expected_errors:
assert "Errors" in response_data
assert set(expected_errors) == set([e["Error"] for e in response_data["Errors"]])
else:
assert "Errors" not in response_data
if os.path.exists(folder_path):
shutil.rmtree(folder_path)
@pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH")
@pytest.mark.parametrize("new_description, expected_status", [
("Updated description", 200),
(None, 400),
])
def test_update_onion_description(temporary_test_directory, new_description, expected_status):
update_payload = {"Description": new_description}
response = requests.put("http://localhost:5001/api/onion/onion/1", headers={"Content-Type": "application/json"},
data=json.dumps(update_payload))
assert response.status_code == expected_status
@pytest.mark.skipif(not mkp224o_available, reason="mkp224o is not available in PATH")
def test_update_tls_certificate(temporary_test_directory):
folder_path = os.path.join(temporary_test_directory, "update_certificate")
os.makedirs(folder_path)
onion_address = "test.onion"
private_key_path, certificate_path = generate_self_signed_tls_certificate(
folder_path, onion_address, datetime.now(timezone.utc), datetime.now(timezone.utc) + timedelta(days=365)
)
with open(private_key_path, "r") as f:
new_private_key = f.read()
with open(certificate_path, "r") as f:
new_certificate = f.read()
update_payload = {
"TlsPrivateKey": new_private_key,
"TlsCertificate": new_certificate,
"SkipChainVerification": True,
"SkipNameVerification": True,
}
response = requests.put("http://localhost:5001/api/onion/onion/1", headers={"Content-Type": "application/json"},
data=json.dumps(update_payload))
assert response.status_code == 200