majuna/app/util/x509.py

66 lines
2.5 KiB
Python
Raw Normal View History

import ssl
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
def load_certificates_from_pem(pem_data: bytes) -> list[x509.Certificate]:
    """Parse every certificate contained in a PEM bundle.

    The input is cut at each ``-----END CERTIFICATE-----`` marker. A segment
    is parsed only if it contains a ``-----BEGIN CERTIFICATE-----`` marker;
    anything else (whitespace, comments, private keys or other PEM objects
    that precede, separate or follow the certificates) is ignored.

    Args:
        pem_data: Raw bytes holding zero or more PEM-encoded certificates.

    Returns:
        The parsed certificates in the order they appear in ``pem_data``.
        The list is empty when no certificate block is present.

    Raises:
        ValueError: Propagated from ``x509.load_pem_x509_certificate`` when a
            certificate block cannot be parsed.
    """
    begin_marker = b"-----BEGIN CERTIFICATE-----"
    end_marker = b"-----END CERTIFICATE-----"

    certificates = []
    for segment in pem_data.split(end_marker):
        start = segment.find(begin_marker)
        if start == -1:
            # No certificate starts in this segment (e.g. trailing comments
            # or a private key after the last certificate); handing it to
            # the parser would only produce a spurious error.
            continue
        # Drop whatever precedes the BEGIN marker and restore the END marker
        # consumed by split(), on its own line as canonical PEM requires.
        pem_block = segment[start:].strip() + b"\n" + end_marker
        certificates.append(
            x509.load_pem_x509_certificate(pem_block, default_backend())
        )
    return certificates
def build_certificate_chain(certificates: list[x509.Certificate]) -> list[x509.Certificate]:
    """Order an unordered set of certificates from end-entity towards the root.

    The end-entity certificate is the first certificate whose subject does
    not appear as the issuer of any certificate in the input. From there the
    chain is extended by repeatedly looking up the certificate whose subject
    equals the current certificate's issuer, until no such certificate is
    available or the issuer has already been placed in the chain (which is
    what happens at a self-signed root).

    Matching is done on the RFC 4514 string form of the names only; no
    signatures are checked here.

    Args:
        certificates: Certificates in any order.

    Returns:
        The ordered chain, end-entity first. A single-element input is
        returned unchanged. Certificates that are not on the issuer path of
        the end-entity are omitted.

    Raises:
        ValueError: If no end-entity certificate can be identified (this
            includes an empty input).
    """
    if len(certificates) == 1:
        return certificates

    cert_map = {cert.subject.rfc4514_string(): cert for cert in certificates}
    # Names that act as an issuer somewhere in the input. The end-entity is
    # the certificate that issued nothing, i.e. whose subject is not in here.
    issuer_names = {cert.issuer.rfc4514_string() for cert in certificates}

    end_entity = next(
        (
            cert
            for cert in certificates
            if cert.subject.rfc4514_string() not in issuer_names
        ),
        None,
    )
    if end_entity is None:
        raise ValueError("Cannot identify the end-entity certificate.")

    chain = [end_entity]
    # Track names already in the chain so a self-signed root (issuer ==
    # subject) or a cross-signing loop ends the walk instead of looping
    # forever.
    visited = {end_entity.subject.rfc4514_string()}
    issuer_name = end_entity.issuer.rfc4514_string()
    while issuer_name in cert_map and issuer_name not in visited:
        issuer_cert = cert_map[issuer_name]
        chain.append(issuer_cert)
        visited.add(issuer_name)
        issuer_name = issuer_cert.issuer.rfc4514_string()
    return chain
def _verify_rsa_signature(cert: x509.Certificate, issuer_public_key: object) -> None:
    """Check that ``cert`` was signed by the holder of ``issuer_public_key``.

    Only RSA issuer keys with PKCS#1 v1.5 signatures are handled.

    Raises:
        ValueError: If the issuer key is not an RSA key, or the certificate
            does not declare a signature hash algorithm.
        cryptography.exceptions.InvalidSignature: If the signature does not
            verify under the given key.
    """
    if not isinstance(issuer_public_key, RSAPublicKey):
        raise ValueError(
            f"Certificate using unsupported algorithm: {type(issuer_public_key)}"
        )
    hash_algorithm = cert.signature_hash_algorithm
    if hash_algorithm is None:
        raise ValueError("Certificate missing hash algorithm")
    issuer_public_key.verify(
        cert.signature,
        cert.tbs_certificate_bytes,
        PKCS1v15(),
        hash_algorithm,
    )


def validate_certificate_chain(chain: list[x509.Certificate]) -> bool:
    """Validate a certificate chain against the system's root CA store.

    Two things are checked:

    1. Every certificate in ``chain`` is signed by the certificate that
       follows it.
    2. The last certificate is signed by a certificate from the default
       ``ssl`` trust store whose subject equals its issuer. Comparing names
       alone is not sufficient, because anyone can put a trusted root's name
       into the issuer field of a certificate they create.

    Validity periods, certificate extensions and revocation status are NOT
    examined. Only RSA / PKCS#1 v1.5 signatures are supported.

    Args:
        chain: Certificates ordered end-entity first, each followed by its
            issuer.

    Returns:
        ``True`` when the chain validates. Failures are reported through
        exceptions, never through a ``False`` return value.

    Raises:
        ValueError: If the chain is empty, uses a non-RSA issuer key, lacks a
            signature hash algorithm, or does not terminate at a trusted
            root CA.
        cryptography.exceptions.InvalidSignature: If a signature between two
            consecutive certificates of ``chain`` does not verify.
    """
    if not chain:
        raise ValueError("Certificate chain is empty.")

    # NOTE: per the Python ssl documentation, get_ca_certs() does not report
    # certificates from a capath directory until they have been used, so the
    # list may be incomplete on systems that rely on a capath layout.
    context = ssl.create_default_context()
    trusted_certificates = [
        x509.load_der_x509_certificate(der_bytes)
        for der_bytes in context.get_ca_certs(binary_form=True)
    ]

    # Each certificate must be signed by its successor in the chain.
    for cert, issuer_cert in zip(chain, chain[1:]):
        _verify_rsa_signature(cert, issuer_cert.public_key())

    # Imported here so this function does not depend on a change to the
    # module-level import block.
    from cryptography.exceptions import InvalidSignature

    end_cert = chain[-1]
    unsupported_error = None
    for trusted_cert in trusted_certificates:
        if trusted_cert.subject != end_cert.issuer:
            continue
        # Several trust anchors may share one subject name (re-issued or
        # cross-signed roots), so keep trying until one key verifies.
        try:
            _verify_rsa_signature(end_cert, trusted_cert.public_key())
        except InvalidSignature:
            continue
        except ValueError as error:
            unsupported_error = error
            continue
        return True

    if unsupported_error is not None:
        # A name-matching anchor exists but could not be checked (non-RSA
        # key or missing hash algorithm); report that instead of claiming
        # the root is untrusted.
        raise unsupported_error
    raise ValueError("Certificate chain does not terminate at a trusted root CA.")