"""Tests for the web application's ``trusted-headers`` authentication mode."""

from __future__ import annotations

import asyncio
from pathlib import Path
from typing import Any, cast

from repub.auth_headers import load_trusted_identity
from repub.web import create_app


def _trusted_headers(*, role: str, provider: str | None = None) -> dict[str, str]:
    """Build a full set of ``X-Republisher-Auth-*`` request headers for *role*.

    Args:
        role: Value of the role header. It also seeds the user name, e-mail
            address and group list placed in the other headers.
        provider: Value of the provider header. When omitted (or empty),
            ``"gp"`` is used for the ``"admin"`` role and ``"ocb"`` for any
            other role.

    Returns:
        A header mapping that can be passed as ``headers=`` to the test client.
    """
    is_admin = role == "admin"
    resolved_provider = provider or ("gp" if is_admin else "ocb")
    groups = (
        "/ocb-republisher-admins, /staff"
        if is_admin
        else "/ocb-republisher-publishers, /publishers"
    )
    return {
        "X-Republisher-Auth-Role": role,
        "X-Republisher-Auth-Provider": resolved_provider,
        "X-Republisher-Auth-User": f"{role}-user",
        "X-Republisher-Auth-Email": f"{role}@example.org",
        "X-Republisher-Auth-Preferred-Username": f"{role}-user",
        "X-Republisher-Auth-Groups": groups,
    }


def _configure_trusted_auth(monkeypatch, tmp_path: Path, name: str) -> None:
    """Point the environment at trusted-header auth and a per-test database.

    Args:
        monkeypatch: pytest's ``monkeypatch`` fixture; used so the environment
            changes are undone after the test.
        tmp_path: pytest's per-test temporary directory.
        name: Stem of the database file created under *tmp_path*
            (``<name>.db``).
    """
    monkeypatch.setenv("REPUBLISHER_AUTH_MODE", "trusted-headers")
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(tmp_path / f"{name}.db"))


def _assert_datastar_shell(body: str, *, static_prefix: str) -> None:
    """Assert that *body* carries the markers expected of the Datastar shell page.

    Args:
        body: Response body decoded as text.
        static_prefix: URL prefix (for example ``"/admin"``) under which the
            page is expected to reference its ``/static`` Datastar script.
    """
    # NOTE(review): an empty prefix matches every string, so this check is
    # vacuous as written. The expected leading markup appears to have been
    # lost from this copy of the file -- restore it from version control.
    assert body.startswith("")
    assert 'id="js"' in body
    assert f'src="{static_prefix}/static/datastar@1.0.0-RC.8.js"' in body
    assert 'data-init="@post(window.location.pathname +' in body
    # NOTE(review): the source text breaks off here in the middle of a further
    # assertion (it begins with ``assert '`` and the string literal is never
    # closed). The remaining check(s) could not be recovered and must be
    # restored from version control.
# NOTE(review): the ``def`` line of this test is missing from the source text
# (only the trailing ``None:`` of its signature survived). The name and the
# empty parameter list below are reconstructed from the body, which uses no
# fixtures -- confirm against version control.
def test_load_trusted_identity_normalizes_groups_and_defaults_preferred_username() -> (
    None
):
    """Parse trusted headers that lack a preferred username and have messy groups.

    The headers omit ``X-Republisher-Auth-Preferred-Username`` and supply a
    group list with surrounding whitespace and an empty entry. The parsed
    identity is expected to report ``"abel"`` (the user header's value) as the
    preferred username and only the two non-empty, stripped group names.
    """
    identity = load_trusted_identity(
        {
            "X-Republisher-Auth-Role": "admin",
            "X-Republisher-Auth-Provider": "gp",
            "X-Republisher-Auth-User": "abel",
            "X-Republisher-Auth-Email": "abel@example.org",
            "X-Republisher-Auth-Groups": " /staff, ,/ocb-republisher-admins ",
        }
    )

    assert identity is not None
    assert identity.preferred_username == "abel"
    assert identity.groups == ("/staff", "/ocb-republisher-admins")


def test_trusted_header_mode_rejects_admin_route_without_identity(
    monkeypatch, tmp_path: Path
) -> None:
    """``GET /admin`` without any auth headers is answered with 401."""
    _configure_trusted_auth(monkeypatch, tmp_path, "missing-identity")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get("/admin")

        assert response.status_code == 401

    asyncio.run(run())


def test_trusted_header_mode_ignores_generic_forwarded_identity_headers(
    monkeypatch, tmp_path: Path
) -> None:
    """``X-Forwarded-*`` identity headers alone do not authenticate: 401."""
    _configure_trusted_auth(monkeypatch, tmp_path, "generic-forwarded")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/admin",
            headers={
                "X-Forwarded-User": "mallory",
                "X-Forwarded-Email": "mallory@example.org",
                "X-Forwarded-Groups": "/ocb-republisher-admins",
            },
        )

        assert response.status_code == 401

    asyncio.run(run())


def test_trusted_header_mode_rejects_malformed_trusted_identity_headers(
    monkeypatch, tmp_path: Path
) -> None:
    """Only role and provider headers (no user/e-mail/groups) yields 401."""
    _configure_trusted_auth(monkeypatch, tmp_path, "malformed-identity")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/admin",
            headers={
                "X-Republisher-Auth-Role": "admin",
                "X-Republisher-Auth-Provider": "gp",
            },
        )

        assert response.status_code == 401

    asyncio.run(run())


def test_trusted_header_mode_allows_admin_identity_on_admin_route(
    monkeypatch, tmp_path: Path
) -> None:
    """``GET /admin`` with admin headers is answered with 200."""
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-allowed")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get("/admin", headers=_trusted_headers(role="admin"))

        assert response.status_code == 200

    asyncio.run(run())


def test_trusted_header_mode_rejects_publisher_identity_on_admin_route(
    monkeypatch, tmp_path: Path
) -> None:
    """``GET /admin`` with publisher headers is answered with 403."""
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-admin-rejected")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/admin", headers=_trusted_headers(role="publisher")
        )

        assert response.status_code == 403

    asyncio.run(run())


def test_trusted_header_mode_rejects_admin_action_without_identity(
    monkeypatch, tmp_path: Path
) -> None:
    """POSTing the clear-completed-executions action without headers gives 401."""
    _configure_trusted_auth(monkeypatch, tmp_path, "action-missing-identity")

    async def run() -> None:
        app = create_app()
        app.config["REPUB_LOG_DIR"] = tmp_path / "logs"
        client = app.test_client()
        response = await client.post("/admin/actions/completed-executions/clear")

        assert response.status_code == 401

    asyncio.run(run())


def test_trusted_header_mode_rejects_publisher_identity_on_admin_action(
    monkeypatch, tmp_path: Path
) -> None:
    """POSTing the clear-completed-executions action as publisher gives 403."""
    _configure_trusted_auth(monkeypatch, tmp_path, "action-publisher-rejected")

    async def run() -> None:
        app = create_app()
        app.config["REPUB_LOG_DIR"] = tmp_path / "logs"
        client = app.test_client()
        response = await client.post(
            "/admin/actions/completed-executions/clear",
            headers=_trusted_headers(role="publisher"),
        )

        assert response.status_code == 403

    asyncio.run(run())


def test_trusted_header_mode_allows_publisher_identity_on_publisher_route(
    monkeypatch, tmp_path: Path
) -> None:
    """``GET /publisher`` as publisher returns 200 and the Datastar shell."""
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-allowed")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/publisher",
            headers=_trusted_headers(role="publisher"),
        )
        body = await response.get_data(as_text=True)

        assert response.status_code == 200
        _assert_datastar_shell(body, static_prefix="/publisher")

    asyncio.run(run())


def test_trusted_header_mode_publisher_post_serves_publisher_dashboard_morph(
    monkeypatch, tmp_path: Path
) -> None:
    """``POST /publisher?u=shim`` as publisher opens a Datastar event stream."""
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-post")

    async def run() -> None:
        client = create_app().test_client()
        async with client.request(
            "/publisher?u=shim",
            method="POST",
            headers=_trusted_headers(role="publisher"),
        ) as connection:
            await connection.send_complete()
            # Only the first chunk is needed; the timeout keeps a stalled
            # stream from hanging the test.
            chunk = await asyncio.wait_for(connection.receive(), timeout=1)
            # Cast to Any so the attribute accesses below are not type-checked.
            raw_connection = cast(Any, connection)

            assert raw_connection.status_code == 200
            assert raw_connection.headers["Content-Type"] == "text/event-stream"
            assert b"event: datastar-patch-elements" in chunk
            # NOTE(review): the source text breaks off here inside a further
            # assertion that begins with ``assert b'``; it could not be
            # recovered. Whether these assertions originally sat inside the
            # ``async with`` block is also not recoverable from the collapsed
            # text -- confirm against version control.

    # NOTE(review): reconstructed from the pattern every other test in this
    # file follows; the original closing call was lost with the truncation.
    asyncio.run(run())
# NOTE(review): the ``def`` line of this test is missing from the source text
# (only the trailing ``None:`` of its signature survived). The name is
# reconstructed from the body and from the naming of the sibling tests, and the
# parameters from the fixtures the body uses -- confirm against version control.
def test_trusted_header_mode_rejects_admin_identity_on_publisher_route(
    monkeypatch, tmp_path: Path
) -> None:
    """``GET /publisher`` with admin headers is answered with 403."""
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-publisher-rejected")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/publisher", headers=_trusted_headers(role="admin")
        )

        assert response.status_code == 403

    asyncio.run(run())


def test_trusted_header_mode_allows_admin_identity_on_admin_publisher_alias(
    monkeypatch, tmp_path: Path
) -> None:
    """``GET /admin/publisher`` as admin returns 200 and the Datastar shell."""
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-alias-allowed")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/admin/publisher",
            headers=_trusted_headers(role="admin"),
        )
        body = await response.get_data(as_text=True)

        assert response.status_code == 200
        _assert_datastar_shell(body, static_prefix="/admin")

    asyncio.run(run())


def test_trusted_header_mode_admin_publisher_post_serves_publisher_dashboard_morph(
    monkeypatch, tmp_path: Path
) -> None:
    """``POST /admin/publisher?u=shim`` as admin opens a Datastar event stream."""
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-alias-post")

    async def run() -> None:
        client = create_app().test_client()
        async with client.request(
            "/admin/publisher?u=shim",
            method="POST",
            headers=_trusted_headers(role="admin"),
        ) as connection:
            await connection.send_complete()
            # Only the first chunk is needed; the timeout keeps a stalled
            # stream from hanging the test.
            chunk = await asyncio.wait_for(connection.receive(), timeout=1)
            # Cast to Any so the attribute accesses below are not type-checked.
            raw_connection = cast(Any, connection)

            assert raw_connection.status_code == 200
            assert raw_connection.headers["Content-Type"] == "text/event-stream"
            assert b"event: datastar-patch-elements" in chunk
            # NOTE(review): the source text breaks off here inside a further
            # assertion that begins with ``assert b'``; it could not be
            # recovered. Whether these assertions originally sat inside the
            # ``async with`` block is also not recoverable from the collapsed
            # text -- confirm against version control.

    # NOTE(review): reconstructed from the pattern every other test in this
    # file follows; the original closing call was lost with the truncation.
    asyncio.run(run())
# NOTE(review): the ``def`` line of this test is missing from the source text
# (only the trailing ``None:`` of its signature survived). The name is
# reconstructed from the body and from the naming of the sibling tests, and the
# parameters from the fixtures the body uses -- confirm against version control.
def test_trusted_header_mode_rejects_publisher_identity_on_admin_publisher_alias(
    monkeypatch, tmp_path: Path
) -> None:
    """``GET /admin/publisher`` with publisher headers is answered with 403."""
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-alias-rejected")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/admin/publisher",
            headers=_trusted_headers(role="publisher"),
        )

        assert response.status_code == 403

    asyncio.run(run())


def test_trusted_header_mode_allows_publisher_identity_on_publisher_run_action(
    monkeypatch, tmp_path: Path
) -> None:
    """The publisher run-now action accepts a publisher identity (204)."""
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-run-action")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.post(
            "/publisher/actions/jobs/999/run-now",
            headers=_trusted_headers(role="publisher"),
        )

        assert response.status_code == 204

    asyncio.run(run())


def test_trusted_header_mode_rejects_admin_identity_on_publisher_run_action(
    monkeypatch, tmp_path: Path
) -> None:
    """The publisher run-now action rejects an admin identity (403)."""
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-run-action-admin")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.post(
            "/publisher/actions/jobs/999/run-now",
            headers=_trusted_headers(role="admin"),
        )

        assert response.status_code == 403

    asyncio.run(run())


def test_trusted_header_mode_allows_admin_identity_on_admin_publisher_run_action(
    monkeypatch, tmp_path: Path
) -> None:
    """The ``/admin/publisher`` run-now action accepts an admin identity (204)."""
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-publisher-run-action")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.post(
            "/admin/publisher/actions/jobs/999/run-now",
            headers=_trusted_headers(role="admin"),
        )

        assert response.status_code == 204

    asyncio.run(run())


def test_trusted_header_mode_rejects_publisher_identity_on_admin_publisher_run_action(
    monkeypatch, tmp_path: Path
) -> None:
    """The ``/admin/publisher`` run-now action rejects a publisher identity (403)."""
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-publisher-run-publisher")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.post(
            "/admin/publisher/actions/jobs/999/run-now",
            headers=_trusted_headers(role="publisher"),
        )

        assert response.status_code == 403

    asyncio.run(run())


def test_trusted_header_mode_keeps_section_static_assets_public(
    monkeypatch, tmp_path: Path
) -> None:
    """Section static assets are served without auth headers; the root path is not.

    Both ``/admin/static/...`` and ``/publisher/static/...`` are expected to
    return 200 for the Datastar script, while the un-prefixed ``/static/...``
    path is expected to return 404.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "static-public")

    async def run() -> None:
        client = create_app().test_client()

        for path in (
            "/admin/static/datastar@1.0.0-RC.8.js",
            "/publisher/static/datastar@1.0.0-RC.8.js",
        ):
            response = await client.get(path)
            assert response.status_code == 200

        root_response = await client.get("/static/datastar@1.0.0-RC.8.js")
        assert root_response.status_code == 404

    asyncio.run(run())


def test_trusted_header_mode_keeps_dev_feeds_public(
    monkeypatch, tmp_path: Path
) -> None:
    """In dev mode a feed file is served without auth headers (200).

    A feed file is written under a temporary feeds directory, the app is
    created with ``dev_mode=True`` and pointed at that directory through
    ``REPUB_FEEDS_DIR``, and the feed URL is then requested with no headers.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "feeds-public")

    async def run() -> None:
        feeds_dir = tmp_path / "out" / "feeds"
        feed_path = feeds_dir / "demo-source" / "feed.rss"
        feed_path.parent.mkdir(parents=True)
        feed_path.write_text("\n", encoding="utf-8")

        app = create_app(dev_mode=True)
        app.config["REPUB_FEEDS_DIR"] = feeds_dir
        client = app.test_client()
        response = await client.get("/feeds/demo-source/feed.rss")

        assert response.status_code == 200

    asyncio.run(run())