# republisher/tests/test_header_auth.py
#
# Source listing metadata: 345 lines, 10 KiB, Python
# (repository viewer links: Raw, Normal View, History).

from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, cast
from repub.auth_headers import load_trusted_identity
from repub.web import create_app
def _trusted_headers(*, role: str, provider: str | None = None) -> dict[str, str]:
    """Build the ``X-Republisher-Auth-*`` request headers for a test identity.

    Args:
        role: Value sent as ``X-Republisher-Auth-Role``. It also seeds the
            user name (``"<role>-user"``), the e-mail address
            (``"<role>@example.org"``) and the group list.
        provider: Value sent as ``X-Republisher-Auth-Provider``. When it is
            ``None`` or empty, ``"gp"`` is used for the ``"admin"`` role and
            ``"ocb"`` for any other role.

    Returns:
        A mapping of header name to header value, ready to pass as the
        ``headers=`` argument of a test-client request.
    """
    resolved_provider = provider or ("gp" if role == "admin" else "ocb")
    return {
        "X-Republisher-Auth-Role": role,
        "X-Republisher-Auth-Provider": resolved_provider,
        "X-Republisher-Auth-User": f"{role}-user",
        "X-Republisher-Auth-Email": f"{role}@example.org",
        "X-Republisher-Auth-Preferred-Username": f"{role}-user",
        # Only "admin" gets the admin groups; every other role gets the
        # publisher groups.
        "X-Republisher-Auth-Groups": (
            "/ocb-republisher-admins, /staff"
            if role == "admin"
            else "/ocb-republisher-publishers, /publishers"
        ),
    }
def _configure_trusted_auth(monkeypatch, tmp_path: Path, name: str) -> None:
    """Point the environment at trusted-header auth and a per-test database.

    Args:
        monkeypatch: Object exposing ``setenv(name, value)``; in the tests
            this is the pytest ``monkeypatch`` fixture.
        tmp_path: Directory in which the database path is placed.
        name: Base name of the database file; ``".db"`` is appended.
    """
    monkeypatch.setenv("REPUBLISHER_AUTH_MODE", "trusted-headers")
    # A distinct file name per test keeps each test on its own database path.
    monkeypatch.setenv("REPUBLISHER_DB_PATH", str(tmp_path / f"{name}.db"))
def _assert_datastar_shell(body: str) -> None:
    """Assert that ``body`` is the HTML shell page the tests expect.

    The checks are plain substring tests: the document starts with the
    doctype, carries the ``id="js"`` element and the Datastar script URL,
    has a ``data-init`` attribute that posts back to the current path,
    contains the ``<main id="morph"`` element, and shows the "Connecting"
    placeholder text.

    Args:
        body: Response body decoded as text.

    Raises:
        AssertionError: If any expected fragment is missing.
    """
    assert body.startswith("<!doctype html>")
    assert 'id="js"' in body
    assert 'src="/static/datastar@1.0.0-RC.8.js"' in body
    assert 'data-init="@post(window.location.pathname +' in body
    assert '<main id="morph"' in body
    assert "Connecting" in body
def test_load_trusted_identity_parses_groups_and_defaults_preferred_username() -> None:
    """Check group parsing and the preferred-username fallback.

    The headers deliberately omit ``X-Republisher-Auth-Preferred-Username``
    and pass a groups value with stray whitespace and an empty entry. The
    test expects the preferred username to equal the user header and the
    groups to come back as a clean tuple in their original order.
    """
    identity = load_trusted_identity(
        {
            "X-Republisher-Auth-Role": "admin",
            "X-Republisher-Auth-Provider": "gp",
            "X-Republisher-Auth-User": "abel",
            "X-Republisher-Auth-Email": "abel@example.org",
            "X-Republisher-Auth-Groups": " /staff, ,/ocb-republisher-admins ",
        }
    )

    assert identity is not None
    assert identity.preferred_username == "abel"
    assert identity.groups == ("/staff", "/ocb-republisher-admins")
def test_trusted_header_mode_rejects_admin_route_without_identity(
    monkeypatch, tmp_path: Path
) -> None:
    """A GET on ``/`` with no identity headers must answer 401."""
    _configure_trusted_auth(monkeypatch, tmp_path, "missing-identity")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get("/")
        assert response.status_code == 401

    # The client API is async; drive it to completion from this sync test.
    asyncio.run(run())
def test_trusted_header_mode_ignores_generic_forwarded_identity_headers(
    monkeypatch, tmp_path: Path
) -> None:
    """Generic ``X-Forwarded-*`` identity headers must not authenticate.

    Only ``X-Forwarded-User``/``-Email``/``-Groups`` are sent, with no
    ``X-Republisher-Auth-*`` header, and the request is expected to be
    answered with 401.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "generic-forwarded")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/",
            headers={
                "X-Forwarded-User": "mallory",
                "X-Forwarded-Email": "mallory@example.org",
                "X-Forwarded-Groups": "/ocb-republisher-admins",
            },
        )
        assert response.status_code == 401

    asyncio.run(run())
def test_trusted_header_mode_rejects_malformed_trusted_identity_headers(
    monkeypatch, tmp_path: Path
) -> None:
    """An incomplete trusted header set must be answered with 401.

    Only the role and provider headers are sent; the user, e-mail and
    groups headers are left out.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "malformed-identity")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/",
            headers={
                "X-Republisher-Auth-Role": "admin",
                "X-Republisher-Auth-Provider": "gp",
            },
        )
        assert response.status_code == 401

    asyncio.run(run())
def test_trusted_header_mode_allows_admin_identity_on_admin_route(
    monkeypatch, tmp_path: Path
) -> None:
    """A GET on ``/`` with a full admin identity must answer 200."""
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-allowed")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get("/", headers=_trusted_headers(role="admin"))
        assert response.status_code == 200

    asyncio.run(run())
def test_trusted_header_mode_rejects_publisher_identity_on_admin_route(
    monkeypatch, tmp_path: Path
) -> None:
    """A GET on ``/`` with a publisher identity must answer 403.

    The identity is complete, so the expected status is 403 rather than
    the 401 used when the identity is missing.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-admin-rejected")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get("/", headers=_trusted_headers(role="publisher"))
        assert response.status_code == 403

    asyncio.run(run())
def test_trusted_header_mode_rejects_admin_action_without_identity(
    monkeypatch, tmp_path: Path
) -> None:
    """A POST to the clear-completed-executions action needs an identity.

    No identity headers are sent and the expected status is 401.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "action-missing-identity")

    async def run() -> None:
        app = create_app()
        # Keep the app's log directory setting inside the test's temp dir.
        app.config["REPUB_LOG_DIR"] = tmp_path / "logs"
        client = app.test_client()
        response = await client.post("/actions/completed-executions/clear")
        assert response.status_code == 401

    asyncio.run(run())
def test_trusted_header_mode_rejects_publisher_identity_on_admin_action(
    monkeypatch, tmp_path: Path
) -> None:
    """A publisher identity must not run the clear-completed action.

    The POST carries a complete publisher identity and the expected
    status is 403.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "action-publisher-rejected")

    async def run() -> None:
        app = create_app()
        # Keep the app's log directory setting inside the test's temp dir.
        app.config["REPUB_LOG_DIR"] = tmp_path / "logs"
        client = app.test_client()
        response = await client.post(
            "/actions/completed-executions/clear",
            headers=_trusted_headers(role="publisher"),
        )
        assert response.status_code == 403

    asyncio.run(run())
def test_trusted_header_mode_allows_publisher_identity_on_publisher_route(
    monkeypatch, tmp_path: Path
) -> None:
    """A GET on ``/publisher`` as a publisher returns the shell page.

    The response must be 200 and its body must pass
    ``_assert_datastar_shell``.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-allowed")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/publisher",
            headers=_trusted_headers(role="publisher"),
        )
        body = await response.get_data(as_text=True)

        assert response.status_code == 200
        _assert_datastar_shell(body)

    asyncio.run(run())
def test_trusted_header_mode_publisher_post_serves_hello_publishers_morph(
    monkeypatch, tmp_path: Path
) -> None:
    """A POST to ``/publisher`` as a publisher streams the morph patch.

    The request is opened as a streaming connection. The first chunk
    received must arrive within one second and contain a
    ``datastar-patch-elements`` event carrying the ``<main id="morph"``
    element and the "Hello publishers" text. The response must be a 200
    with content type ``text/event-stream``.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-post")

    async def run() -> None:
        client = create_app().test_client()
        async with client.request(
            "/publisher?u=shim",
            method="POST",
            headers=_trusted_headers(role="publisher"),
        ) as connection:
            # Signal that the request body is finished, then wait (bounded)
            # for the first chunk of the response stream.
            await connection.send_complete()
            chunk = await asyncio.wait_for(connection.receive(), timeout=1)

            # Cast to Any so the type checker accepts the attribute reads.
            raw_connection = cast(Any, connection)
            assert raw_connection.status_code == 200
            assert raw_connection.headers["Content-Type"] == "text/event-stream"
            assert b"event: datastar-patch-elements" in chunk
            assert b'<main id="morph"' in chunk
            assert b"Hello publishers" in chunk

            # Close the stream from the client side before leaving the
            # connection context.
            await connection.disconnect()

    asyncio.run(run())
def test_trusted_header_mode_rejects_admin_identity_on_publisher_route(
    monkeypatch, tmp_path: Path
) -> None:
    """A GET on ``/publisher`` with an admin identity must answer 403."""
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-publisher-rejected")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/publisher", headers=_trusted_headers(role="admin")
        )
        assert response.status_code == 403

    asyncio.run(run())
def test_trusted_header_mode_allows_admin_identity_on_admin_publisher_alias(
    monkeypatch, tmp_path: Path
) -> None:
    """A GET on ``/admin/publisher`` as an admin returns the shell page.

    The response must be 200 and its body must pass
    ``_assert_datastar_shell``.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-alias-allowed")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/admin/publisher",
            headers=_trusted_headers(role="admin"),
        )
        body = await response.get_data(as_text=True)

        assert response.status_code == 200
        _assert_datastar_shell(body)

    asyncio.run(run())
def test_trusted_header_mode_admin_publisher_post_serves_hello_publishers_morph(
    monkeypatch, tmp_path: Path
) -> None:
    """A POST to ``/admin/publisher`` as an admin streams the morph patch.

    The request is opened as a streaming connection. The first chunk
    received must arrive within one second and contain a
    ``datastar-patch-elements`` event carrying the ``<main id="morph"``
    element and the "Hello publishers" text. The response must be a 200
    with content type ``text/event-stream``.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "admin-alias-post")

    async def run() -> None:
        client = create_app().test_client()
        async with client.request(
            "/admin/publisher?u=shim",
            method="POST",
            headers=_trusted_headers(role="admin"),
        ) as connection:
            # Signal that the request body is finished, then wait (bounded)
            # for the first chunk of the response stream.
            await connection.send_complete()
            chunk = await asyncio.wait_for(connection.receive(), timeout=1)

            # Cast to Any so the type checker accepts the attribute reads.
            raw_connection = cast(Any, connection)
            assert raw_connection.status_code == 200
            assert raw_connection.headers["Content-Type"] == "text/event-stream"
            assert b"event: datastar-patch-elements" in chunk
            assert b'<main id="morph"' in chunk
            assert b"Hello publishers" in chunk

            # Close the stream from the client side before leaving the
            # connection context.
            await connection.disconnect()

    asyncio.run(run())
def test_trusted_header_mode_rejects_publisher_identity_on_admin_publisher_alias(
    monkeypatch, tmp_path: Path
) -> None:
    """A GET on ``/admin/publisher`` as a publisher must answer 403."""
    _configure_trusted_auth(monkeypatch, tmp_path, "publisher-alias-rejected")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get(
            "/admin/publisher",
            headers=_trusted_headers(role="publisher"),
        )
        assert response.status_code == 403

    asyncio.run(run())
def test_trusted_header_mode_keeps_static_assets_public(
    monkeypatch, tmp_path: Path
) -> None:
    """The Datastar script under ``/static`` is served without identity.

    The request carries no identity headers and must still answer 200.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "static-public")

    async def run() -> None:
        client = create_app().test_client()
        response = await client.get("/static/datastar@1.0.0-RC.8.js")
        assert response.status_code == 200

    asyncio.run(run())
def test_trusted_header_mode_keeps_dev_feeds_public(
    monkeypatch, tmp_path: Path
) -> None:
    """A feed file is served from ``/feeds`` without identity in dev mode.

    A minimal ``feed.rss`` is written under the temp directory, the app is
    created with ``dev_mode=True`` and pointed at that directory, and the
    request (no identity headers) must answer 200.
    """
    _configure_trusted_auth(monkeypatch, tmp_path, "feeds-public")

    async def run() -> None:
        # Lay out <tmp>/out/feeds/demo-source/feed.rss for the app to serve.
        feeds_dir = tmp_path / "out" / "feeds"
        feed_path = feeds_dir / "demo-source" / "feed.rss"
        feed_path.parent.mkdir(parents=True)
        feed_path.write_text("<rss/>\n", encoding="utf-8")

        app = create_app(dev_mode=True)
        app.config["REPUB_FEEDS_DIR"] = feeds_dir
        client = app.test_client()

        response = await client.get("/feeds/demo-source/feed.rss")
        assert response.status_code == 200

    asyncio.run(run())