republisher/tests/test_db.py

112 lines
3.4 KiB
Python
Raw Normal View History

from __future__ import annotations
import threading
import time
from pathlib import Path
import pytest
from peewee import InterfaceError
from repub.db import get_database_connection
from repub.model import AppSetting, database, initialize_database
def test_queries_require_managed_database_context(tmp_path: Path) -> None:
initialize_database(tmp_path / "managed-context.db")
with pytest.raises(
RuntimeError, match="database.reader\\(\\)|database.writer\\(\\)"
):
AppSetting.select().count()
def test_writer_and_reader_contexts_allow_persisted_queries(tmp_path: Path) -> None:
initialize_database(tmp_path / "reader-writer.db")
with database.writer():
AppSetting.create(key="feed_url", value='"https://mirror.example"')
with database.reader():
setting = AppSetting.get(AppSetting.key == "feed_url")
assert setting.value == '"https://mirror.example"'
def test_managed_connections_disable_peewee_autoconnect(tmp_path: Path) -> None:
initialize_database(tmp_path / "autoconnect-disabled.db")
connection = get_database_connection()
assert connection is not None
assert connection.writer_db.autoconnect is False
assert all(reader_db.autoconnect is False for reader_db in connection.reader_dbs)
reader_db = connection.reader_dbs[0]
reader_db.close()
with pytest.raises(InterfaceError, match="database connection not opened"):
reader_db.execute_sql("SELECT 1")
def test_database_connection_initializes_four_readers_and_one_writer(
tmp_path: Path,
) -> None:
initialize_database(tmp_path / "pool-shape.db")
connection = get_database_connection()
assert connection is not None
assert connection.pool_size == 4
assert len(connection.reader_dbs) == 4
assert connection._reader_pool.qsize() == 4
assert connection.writer_db is not None
def test_reader_lease_is_returned_to_the_pool_after_use(tmp_path: Path) -> None:
initialize_database(tmp_path / "reader-lease.db")
connection = get_database_connection()
assert connection is not None
initial_size = connection._reader_pool.qsize()
with database.reader():
assert connection._reader_pool.qsize() == initial_size - 1
assert connection._reader_pool.qsize() == initial_size
def test_writer_contexts_serialize_through_the_single_writer(tmp_path: Path) -> None:
initialize_database(tmp_path / "single-writer.db")
events: list[str] = []
entered_first_writer = threading.Event()
allow_first_writer_to_exit = threading.Event()
def first_writer() -> None:
with database.writer():
events.append("first-entered")
entered_first_writer.set()
allow_first_writer_to_exit.wait(timeout=1)
events.append("first-exiting")
def second_writer() -> None:
entered_first_writer.wait(timeout=1)
with database.writer():
events.append("second-entered")
first_thread = threading.Thread(target=first_writer)
second_thread = threading.Thread(target=second_writer)
first_thread.start()
second_thread.start()
assert entered_first_writer.wait(timeout=1) is True
time.sleep(0.05)
assert events == ["first-entered"]
allow_first_writer_to_exit.set()
first_thread.join(timeout=1)
second_thread.join(timeout=1)
assert events == ["first-entered", "first-exiting", "second-entered"]