"""onion certificate metadata Revision ID: c14f25f364c5 Revises: 13b1d64f134a Create Date: 2024-12-06 14:14:45.796762 """ from alembic import op import sqlalchemy as sa from sqlalchemy import String, LargeBinary, DateTime, Integer, table, column from app.util.x509 import validate_tls_keys, extract_sans revision = 'c14f25f364c5' down_revision = '13b1d64f134a' branch_labels = None depends_on = None def upgrade(): with op.batch_alter_table('onion', schema=None) as batch_op: batch_op.add_column(sa.Column('cert_expiry', sa.DateTime(timezone=True), nullable=True)) batch_op.add_column(sa.Column('cert_sans', sa.String(), nullable=True)) connection = op.get_bind() onion_table = table( 'onion', column('id', Integer), # Primary key column('description', String(255)), column('added', DateTime), column('updated', DateTime), column('destroyed', DateTime), column('group_id', Integer), column('domain_name', String(255)), column('onion_public_key', LargeBinary), column('onion_private_key', LargeBinary), column('tls_public_key', LargeBinary), column('tls_private_key', LargeBinary), column('cert_expiry', DateTime(timezone=True)), # New column column('cert_sans', String), # New column ) rows = connection.execute(sa.select( onion_table.c.id, onion_table.c.tls_public_key, onion_table.c.tls_private_key )).fetchall() updates = [] for row in rows: # validate_tls_keys only returns the SANs if you validate the name chain, _, tls_errors = validate_tls_keys( row.tls_private_key.decode('utf-8'), row.tls_public_key.decode('utf-8'), True, True, "" ) if tls_errors: connection.close() with op.batch_alter_table('onion', schema=None) as batch_op: batch_op.drop_column('cert_expiry') batch_op.drop_column('cert_sans') raise RuntimeError(f"TLS key error for onion {row.id}: {tls_errors}") cert_expiry = chain[0].not_valid_after if chain else None cert_sans = extract_sans(chain[0]) updates.append({ 'id': row.id, 'cert_expiry': cert_expiry, 'cert_sans': ",".join(cert_sans) }) for update in updates: connection.execute( sa.text( "UPDATE onion SET cert_expiry = :cert_expiry, cert_sans = :cert_sans WHERE id = :id" ), { 'cert_expiry': update['cert_expiry'], 'cert_sans': update['cert_sans'], 'id': update['id'] } ) with op.batch_alter_table('onion', schema=None) as batch_op: batch_op.alter_column('cert_expiry', existing_type=sa.DateTime(timezone=True), nullable=False) batch_op.alter_column('cert_sans', existing_type=sa.String(), nullable=False) def downgrade(): with op.batch_alter_table('onion', schema=None) as batch_op: batch_op.drop_column('cert_sans') batch_op.drop_column('cert_expiry')