from __future__ import annotations import threading import time from pathlib import Path import pytest from peewee import InterfaceError from repub.db import get_database_connection from repub.model import AppSetting, database, initialize_database def test_queries_require_managed_database_context(tmp_path: Path) -> None: initialize_database(tmp_path / "managed-context.db") with pytest.raises( RuntimeError, match="database.reader\\(\\)|database.writer\\(\\)" ): AppSetting.select().count() def test_writer_and_reader_contexts_allow_persisted_queries(tmp_path: Path) -> None: initialize_database(tmp_path / "reader-writer.db") with database.writer(): AppSetting.create(key="feed_url", value='"https://mirror.example"') with database.reader(): setting = AppSetting.get(AppSetting.key == "feed_url") assert setting.value == '"https://mirror.example"' def test_managed_connections_disable_peewee_autoconnect(tmp_path: Path) -> None: initialize_database(tmp_path / "autoconnect-disabled.db") connection = get_database_connection() assert connection is not None assert connection.writer_db.autoconnect is False assert all(reader_db.autoconnect is False for reader_db in connection.reader_dbs) reader_db = connection.reader_dbs[0] reader_db.close() with pytest.raises(InterfaceError, match="database connection not opened"): reader_db.execute_sql("SELECT 1") def test_database_connection_initializes_four_readers_and_one_writer( tmp_path: Path, ) -> None: initialize_database(tmp_path / "pool-shape.db") connection = get_database_connection() assert connection is not None assert connection.pool_size == 4 assert len(connection.reader_dbs) == 4 assert connection._reader_pool.qsize() == 4 assert connection.writer_db is not None def test_reader_lease_is_returned_to_the_pool_after_use(tmp_path: Path) -> None: initialize_database(tmp_path / "reader-lease.db") connection = get_database_connection() assert connection is not None initial_size = connection._reader_pool.qsize() with database.reader(): assert connection._reader_pool.qsize() == initial_size - 1 assert connection._reader_pool.qsize() == initial_size def test_writer_contexts_serialize_through_the_single_writer(tmp_path: Path) -> None: initialize_database(tmp_path / "single-writer.db") events: list[str] = [] entered_first_writer = threading.Event() allow_first_writer_to_exit = threading.Event() def first_writer() -> None: with database.writer(): events.append("first-entered") entered_first_writer.set() allow_first_writer_to_exit.wait(timeout=1) events.append("first-exiting") def second_writer() -> None: entered_first_writer.wait(timeout=1) with database.writer(): events.append("second-entered") first_thread = threading.Thread(target=first_writer) second_thread = threading.Thread(target=second_writer) first_thread.start() second_thread.start() assert entered_first_writer.wait(timeout=1) is True time.sleep(0.05) assert events == ["first-entered"] allow_first_writer_to_exit.set() first_thread.join(timeout=1) second_thread.join(timeout=1) assert events == ["first-entered", "first-exiting", "second-entered"]