diff --git a/migrations/versions/c14f25f364c5_onion_certificate_metadata.py b/migrations/versions/c14f25f364c5_onion_certificate_metadata.py new file mode 100644 index 0000000..5938697 --- /dev/null +++ b/migrations/versions/c14f25f364c5_onion_certificate_metadata.py @@ -0,0 +1,94 @@ +"""onion certificate metadata + +Revision ID: c14f25f364c5 +Revises: 13b1d64f134a +Create Date: 2024-12-06 14:14:45.796762 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy import String, LargeBinary, DateTime, Integer, table, column + +from app.util.x509 import validate_tls_keys, extract_sans + +revision = 'c14f25f364c5' +down_revision = '13b1d64f134a' +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table('onion', schema=None) as batch_op: + batch_op.add_column(sa.Column('cert_expiry', sa.DateTime(timezone=True), nullable=True)) + batch_op.add_column(sa.Column('cert_sans', sa.String(), nullable=True)) + + connection = op.get_bind() + onion_table = table( + 'onion', + column('id', Integer), # Primary key + column('description', String(255)), + column('added', DateTime), + column('updated', DateTime), + column('destroyed', DateTime), + column('group_id', Integer), + column('domain_name', String(255)), + column('onion_public_key', LargeBinary), + column('onion_private_key', LargeBinary), + column('tls_public_key', LargeBinary), + column('tls_private_key', LargeBinary), + column('cert_expiry', DateTime(timezone=True)), # New column + column('cert_sans', String), # New column + ) + + rows = connection.execute(sa.select( + onion_table.c.id, + onion_table.c.tls_public_key, + onion_table.c.tls_private_key + )).fetchall() + + updates = [] + for row in rows: + # validate_tls_keys only returns the SANs if you validate the name + chain, _, tls_errors = validate_tls_keys( + row.tls_private_key.decode('utf-8'), row.tls_public_key.decode('utf-8'), True, + True, "" + ) + + if tls_errors: + connection.close() + with op.batch_alter_table('onion', schema=None) as batch_op: + batch_op.drop_column('cert_expiry') + batch_op.drop_column('cert_sans') + + raise RuntimeError(f"TLS key error for onion {row.id}: {tls_errors}") + + cert_expiry = chain[0].not_valid_after if chain else None + cert_sans = extract_sans(chain[0]) + + updates.append({ + 'id': row.id, + 'cert_expiry': cert_expiry, + 'cert_sans': ",".join(cert_sans) + }) + + for update in updates: + connection.execute( + sa.text( + "UPDATE onion SET cert_expiry = :cert_expiry, cert_sans = :cert_sans WHERE id = :id" + ), + { + 'cert_expiry': update['cert_expiry'], + 'cert_sans': update['cert_sans'], + 'id': update['id'] + } + ) + + with op.batch_alter_table('onion', schema=None) as batch_op: + batch_op.alter_column('cert_expiry', existing_type=sa.DateTime(timezone=True), nullable=False) + batch_op.alter_column('cert_sans', existing_type=sa.String(), nullable=False) + + +def downgrade(): + with op.batch_alter_table('onion', schema=None) as batch_op: + batch_op.drop_column('cert_sans') + batch_op.drop_column('cert_expiry')