"""Add the country, deprecation and country_origin tables.

Besides the schema changes, the upgrade seeds the ``country`` table from a
JSON file and copies existing deprecation data from the ``proxy`` and
``bridge`` tables into the new ``deprecation`` table.
"""
import json
from datetime import datetime

from alembic import op
import sqlalchemy as sa
from sqlalchemy import column, func, table
from sqlalchemy.orm import Session

revision = 'bbec86de37c4'
down_revision = '278bcfb487d3'
branch_labels = None
depends_on = None

# Relative path: open() resolves it against the current working directory of
# the process running the migration.
_COUNTRIES_PATH = "migrations/countries.json"

# Tables whose 'deprecated' / 'deprecation_reason' columns are copied into the
# new 'deprecation' table.
_DEPRECATED_RESOURCE_TABLES = ('proxy', 'bridge')

# Accepted textual timestamp layouts, tried in order. The second layout covers
# values stored without fractional seconds.
_TIMESTAMP_FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")


def _parse_timestamp(value):
    """Return *value* as a ``datetime``.

    Non-string values are returned unchanged. Strings are parsed against each
    entry of ``_TIMESTAMP_FORMATS`` in turn.

    Raises:
        ValueError: if *value* is a string matching none of the formats.
    """
    if not isinstance(value, str):
        return value
    for fmt in _TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"unrecognised timestamp format: {value!r}")


def _seed_countries():
    """Insert one ``country`` row per entry of the countries JSON file."""
    # Explicit encoding so the result does not depend on the platform default;
    # the context manager guarantees the file handle is closed.
    with open(_COUNTRIES_PATH, encoding="utf-8") as countries_file:
        countries = json.load(countries_file)

    country_table = table(
        'country',
        column('id', sa.Integer),
        column('country_code', sa.String),
        column('description', sa.String),
        column('risk_level_override', sa.Integer),
        column('added', sa.DateTime),
        column('updated', sa.DateTime),
    )

    # Each entry is read through its 'Code' and 'Name' keys.
    for country in countries:
        op.execute(
            country_table.insert().values(
                country_code=country['Code'],
                description=country['Name'],
                risk_level_override=None,  # No override is set initially
                added=func.now(),
                updated=func.now(),
            )
        )


def _migrate_deprecations():
    """Copy deprecation data from the resource tables into ``deprecation``."""
    deprecation_table = table(
        'deprecation',
        column('id', sa.Integer),
        column('resource_type', sa.String),
        column('resource_id', sa.Integer),
        column('deprecated_at', sa.DateTime),
        column('reason', sa.String),
    )

    with Session(bind=op.get_bind()) as session:
        for table_name in _DEPRECATED_RESOURCE_TABLES:
            # Fetch every row up front so the SELECT result is fully consumed
            # before INSERTs are issued on the same connection.
            rows = session.execute(
                sa.select(
                    column('id'),
                    column('deprecated'),
                    column('deprecation_reason'),
                ).select_from(table(table_name))
            ).fetchall()

            for id_, deprecated, reason in rows:
                # Only rows carrying a deprecation date are migrated.
                if deprecated is None:
                    continue
                # NOTE(review): 'reason' is NOT NULL in the new table, so a row
                # with a NULL deprecation_reason would fail here - confirm that
                # no such rows exist.
                op.execute(
                    deprecation_table.insert().values(
                        # The class name is used, not the table name
                        resource_type=table_name.title(),
                        resource_id=id_,
                        deprecated_at=_parse_timestamp(deprecated),
                        reason=reason,
                    )
                )
        session.commit()


def upgrade():
    """Create the new tables and column, then seed and migrate data."""
    op.create_table(
        'country',
        sa.Column('country_code', sa.String(length=2), nullable=False),
        sa.Column('risk_level_override', sa.Integer(), nullable=True),
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('description', sa.String(length=255), nullable=False),
        sa.Column('added', sa.DateTime(), nullable=False),
        sa.Column('updated', sa.DateTime(), nullable=False),
        sa.Column('destroyed', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_country'))
    )
    op.create_table(
        'deprecation',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('resource_type', sa.String(length=50), nullable=True),
        sa.Column('resource_id', sa.Integer(), nullable=True),
        sa.Column('deprecated_at', sa.DateTime(), nullable=False),
        sa.Column('reason', sa.String(), nullable=False),
        sa.Column('meta', sa.JSON()),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_deprecation'))
    )
    op.create_table(
        'country_origin',
        sa.Column('country_id', sa.Integer(), nullable=False),
        sa.Column('origin_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['country_id'], ['country.id'], name=op.f('fk_country_origin_country_id_country')),
        sa.ForeignKeyConstraint(['origin_id'], ['origin.id'], name=op.f('fk_country_origin_origin_id_origin')),
        sa.PrimaryKeyConstraint('country_id', 'origin_id', name=op.f('pk_country_origin'))
    )
    with op.batch_alter_table('origin', schema=None) as batch_op:
        batch_op.add_column(sa.Column('risk_level_override', sa.Integer(), nullable=True))

    _seed_countries()
    _migrate_deprecations()


def downgrade():
    """Drop the column and tables added by ``upgrade``.

    The source ``deprecated`` / ``deprecation_reason`` columns are not touched
    by ``upgrade``, so only the new objects need to be removed.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('origin', schema=None) as batch_op:
        batch_op.drop_column('risk_level_override')

    # Drop the association table first: it has foreign keys to 'country'.
    op.drop_table('country_origin')
    op.drop_table('deprecation')
    op.drop_table('country')
    # ### end Alembic commands ###