Move app settings schema into SQL migration

This commit is contained in:
Abel Luck 2026-03-30 18:47:36 +02:00
parent 2092f66dcd
commit 1d5126c2f8
4 changed files with 74 additions and 28 deletions

View file

@ -103,6 +103,8 @@ uv run repub crawl -c repub.toml
- Keep `treefmt.nix`, `flake.nix`, and `pyproject.toml` aligned. - Keep `treefmt.nix`, `flake.nix`, and `pyproject.toml` aligned.
- Prefer updating the flake-exported package and checks rather than adding ad hoc scripts. - Prefer updating the flake-exported package and checks rather than adding ad hoc scripts.
- Put new SQLite schema objects in numbered files under `repub/sql/` such as `002_*.sql`.
- For backward-compatible column additions on existing SQLite databases, use Peewee's `playhouse.migrate` helpers instead of raw ad hoc `ALTER TABLE` logic.
- Do not commit, amend, or stage unrelated files unless explicitly asked. - Do not commit, amend, or stage unrelated files unless explicitly asked.
- Final verication `nix flake check` must be greenbefore claiming task completeness - Final verication `nix flake check` must be greenbefore claiming task completeness

View file

@ -19,6 +19,7 @@ from peewee import (
SqliteDatabase, SqliteDatabase,
TextField, TextField,
) )
from playhouse.migrate import SchemaMigrator, migrate
DEFAULT_DB_PATH = Path("republisher.db") DEFAULT_DB_PATH = Path("republisher.db")
DATABASE_PRAGMAS = { DATABASE_PRAGMAS = {
@ -79,45 +80,50 @@ def initialize_database(db_path: str | Path | None = None) -> Path:
database.init(str(resolved_path), pragmas=DATABASE_PRAGMAS) database.init(str(resolved_path), pragmas=DATABASE_PRAGMAS)
database.connect(reuse_if_open=True) database.connect(reuse_if_open=True)
try: try:
connection = database.connection()
for path in schema_paths(): for path in schema_paths():
connection.executescript(path.read_text(encoding="utf-8")) database.connection().executescript(path.read_text(encoding="utf-8"))
_ensure_schema(connection) _run_legacy_migrations()
finally: finally:
database.close() database.close()
return resolved_path return resolved_path
def _ensure_schema(connection: Any) -> None: def _run_legacy_migrations() -> None:
connection.execute( job_columns = {column.name for column in database.get_columns("job")}
""" operations = []
CREATE TABLE IF NOT EXISTS app_setting ( migrator = SchemaMigrator.from_database(database)
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
"""
)
job_columns = {
row[1] for row in connection.execute("PRAGMA table_info('job')").fetchall()
}
if "convert_images" not in job_columns: if "convert_images" not in job_columns:
connection.execute( operations.extend(
""" (
ALTER TABLE job migrator.add_column(
ADD COLUMN convert_images INTEGER NOT NULL DEFAULT 1 "job",
CHECK (convert_images IN (0, 1)) "convert_images",
""" BooleanField(
default=True,
constraints=[Check("convert_images IN (0, 1)")],
),
),
migrator.add_column_default("job", "convert_images", 1),
)
) )
if "convert_video" not in job_columns: if "convert_video" not in job_columns:
connection.execute( operations.extend(
""" (
ALTER TABLE job migrator.add_column(
ADD COLUMN convert_video INTEGER NOT NULL DEFAULT 1 "job",
CHECK (convert_video IN (0, 1)) "convert_video",
""" BooleanField(
default=True,
constraints=[Check("convert_video IN (0, 1)")],
),
),
migrator.add_column_default("job", "convert_video", 1),
) )
)
if operations:
with database.atomic():
migrate(*operations)
def source_slug_exists(slug: str) -> bool: def source_slug_exists(slug: str) -> bool:

View file

@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS app_setting (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);

View file

@ -15,6 +15,7 @@ from repub.model import (
load_max_concurrent_jobs, load_max_concurrent_jobs,
resolve_database_path, resolve_database_path,
save_setting, save_setting,
schema_paths,
) )
@ -71,6 +72,39 @@ def test_initialize_database_bootstraps_schema_from_sql_files(tmp_path: Path) ->
connection.close() connection.close()
def test_initialize_database_applies_newer_sql_files_to_existing_databases(
tmp_path: Path,
) -> None:
db_path = tmp_path / "existing.db"
connection = sqlite3.connect(db_path)
try:
connection.executescript(schema_paths()[0].read_text(encoding="utf-8"))
finally:
connection.close()
initialize_database(db_path)
connection = sqlite3.connect(db_path)
try:
table_names = {
row[0]
for row in connection.execute(
"""
SELECT name
FROM sqlite_master
WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
"""
)
}
assert "app_setting" in table_names
job_columns = {row[1] for row in connection.execute("PRAGMA table_info('job')")}
assert "convert_images" in job_columns
assert "convert_video" in job_columns
finally:
connection.close()
def test_initialize_database_configures_sqlite_pragmas(tmp_path: Path) -> None: def test_initialize_database_configures_sqlite_pragmas(tmp_path: Path) -> None:
db_path = tmp_path / "pragmas.db" db_path = tmp_path / "pragmas.db"