426 lines
12 KiB
Python
426 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import Any, cast
|
|
|
|
from repub.auth_headers import load_trusted_identity
|
|
from repub.web import create_app
|
|
|
|
|
|
def _trusted_headers(*, role: str, provider: str | None = None) -> dict[str, str]:
|
|
resolved_provider = provider or ("gp" if role == "admin" else "ocb")
|
|
return {
|
|
"X-Republisher-Auth-Role": role,
|
|
"X-Republisher-Auth-Provider": resolved_provider,
|
|
"X-Republisher-Auth-User": f"{role}-user",
|
|
"X-Republisher-Auth-Email": f"{role}@example.org",
|
|
"X-Republisher-Auth-Preferred-Username": f"{role}-user",
|
|
"X-Republisher-Auth-Groups": (
|
|
"/ocb-republisher-admins, /staff"
|
|
if role == "admin"
|
|
else "/ocb-republisher-publishers, /publishers"
|
|
),
|
|
}
|
|
|
|
|
|
def _configure_trusted_auth(monkeypatch, tmp_path: Path, name: str) -> None:
|
|
monkeypatch.setenv("REPUBLISHER_AUTH_MODE", "trusted-headers")
|
|
monkeypatch.setenv("REPUBLISHER_DB_PATH", str(tmp_path / f"{name}.db"))
|
|
|
|
|
|
def _assert_datastar_shell(body: str, *, static_prefix: str) -> None:
|
|
assert body.startswith("<!doctype html>")
|
|
assert 'id="js"' in body
|
|
assert f'src="{static_prefix}/static/datastar@1.0.0-RC.8.js"' in body
|
|
assert 'data-init="@post(window.location.pathname +' in body
|
|
assert '<main id="morph"' in body
|
|
assert "Connecting" in body
|
|
|
|
|
|
def test_load_trusted_identity_parses_groups_and_defaults_preferred_username() -> None:
|
|
identity = load_trusted_identity(
|
|
{
|
|
"X-Republisher-Auth-Role": "admin",
|
|
"X-Republisher-Auth-Provider": "gp",
|
|
"X-Republisher-Auth-User": "abel",
|
|
"X-Republisher-Auth-Email": "abel@example.org",
|
|
"X-Republisher-Auth-Groups": " /staff, ,/ocb-republisher-admins ",
|
|
}
|
|
)
|
|
|
|
assert identity is not None
|
|
assert identity.preferred_username == "abel"
|
|
assert identity.groups == ("/staff", "/ocb-republisher-admins")
|
|
|
|
|
|
def test_trusted_header_mode_rejects_admin_route_without_identity(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "missing-identity")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get("/admin")
|
|
|
|
assert response.status_code == 401
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_ignores_generic_forwarded_identity_headers(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "generic-forwarded")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get(
|
|
"/admin",
|
|
headers={
|
|
"X-Forwarded-User": "mallory",
|
|
"X-Forwarded-Email": "mallory@example.org",
|
|
"X-Forwarded-Groups": "/ocb-republisher-admins",
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_rejects_malformed_trusted_identity_headers(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "malformed-identity")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get(
|
|
"/admin",
|
|
headers={
|
|
"X-Republisher-Auth-Role": "admin",
|
|
"X-Republisher-Auth-Provider": "gp",
|
|
},
|
|
)
|
|
|
|
assert response.status_code == 401
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_allows_admin_identity_on_admin_route(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "admin-allowed")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get("/admin", headers=_trusted_headers(role="admin"))
|
|
|
|
assert response.status_code == 200
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_rejects_publisher_identity_on_admin_route(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "publisher-admin-rejected")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get(
|
|
"/admin", headers=_trusted_headers(role="publisher")
|
|
)
|
|
|
|
assert response.status_code == 403
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_rejects_admin_action_without_identity(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "action-missing-identity")
|
|
|
|
async def run() -> None:
|
|
app = create_app()
|
|
app.config["REPUB_LOG_DIR"] = tmp_path / "logs"
|
|
client = app.test_client()
|
|
|
|
response = await client.post("/admin/actions/completed-executions/clear")
|
|
|
|
assert response.status_code == 401
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_rejects_publisher_identity_on_admin_action(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "action-publisher-rejected")
|
|
|
|
async def run() -> None:
|
|
app = create_app()
|
|
app.config["REPUB_LOG_DIR"] = tmp_path / "logs"
|
|
client = app.test_client()
|
|
|
|
response = await client.post(
|
|
"/admin/actions/completed-executions/clear",
|
|
headers=_trusted_headers(role="publisher"),
|
|
)
|
|
|
|
assert response.status_code == 403
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_allows_publisher_identity_on_publisher_route(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "publisher-allowed")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get(
|
|
"/publisher",
|
|
headers=_trusted_headers(role="publisher"),
|
|
)
|
|
body = await response.get_data(as_text=True)
|
|
|
|
assert response.status_code == 200
|
|
_assert_datastar_shell(body, static_prefix="/publisher")
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_publisher_post_serves_publisher_dashboard_morph(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "publisher-post")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
async with client.request(
|
|
"/publisher?u=shim",
|
|
method="POST",
|
|
headers=_trusted_headers(role="publisher"),
|
|
) as connection:
|
|
await connection.send_complete()
|
|
chunk = await asyncio.wait_for(connection.receive(), timeout=1)
|
|
raw_connection = cast(Any, connection)
|
|
|
|
assert raw_connection.status_code == 200
|
|
assert raw_connection.headers["Content-Type"] == "text/event-stream"
|
|
assert b"event: datastar-patch-elements" in chunk
|
|
assert b'<main id="morph"' in chunk
|
|
assert b"Published feeds" in chunk
|
|
await connection.disconnect()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_rejects_admin_identity_on_publisher_route(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "admin-publisher-rejected")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get(
|
|
"/publisher", headers=_trusted_headers(role="admin")
|
|
)
|
|
|
|
assert response.status_code == 403
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_allows_admin_identity_on_admin_publisher_alias(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "admin-alias-allowed")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get(
|
|
"/admin/publisher",
|
|
headers=_trusted_headers(role="admin"),
|
|
)
|
|
body = await response.get_data(as_text=True)
|
|
|
|
assert response.status_code == 200
|
|
_assert_datastar_shell(body, static_prefix="/admin")
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_admin_publisher_post_serves_publisher_dashboard_morph(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "admin-alias-post")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
async with client.request(
|
|
"/admin/publisher?u=shim",
|
|
method="POST",
|
|
headers=_trusted_headers(role="admin"),
|
|
) as connection:
|
|
await connection.send_complete()
|
|
chunk = await asyncio.wait_for(connection.receive(), timeout=1)
|
|
raw_connection = cast(Any, connection)
|
|
|
|
assert raw_connection.status_code == 200
|
|
assert raw_connection.headers["Content-Type"] == "text/event-stream"
|
|
assert b"event: datastar-patch-elements" in chunk
|
|
assert b'<main id="morph"' in chunk
|
|
assert b"Published feeds" in chunk
|
|
await connection.disconnect()
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_rejects_publisher_identity_on_admin_publisher_alias(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "publisher-alias-rejected")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.get(
|
|
"/admin/publisher",
|
|
headers=_trusted_headers(role="publisher"),
|
|
)
|
|
|
|
assert response.status_code == 403
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_allows_publisher_identity_on_publisher_run_action(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "publisher-run-action")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.post(
|
|
"/publisher/actions/jobs/999/run-now",
|
|
headers=_trusted_headers(role="publisher"),
|
|
)
|
|
|
|
assert response.status_code == 204
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_rejects_admin_identity_on_publisher_run_action(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "publisher-run-action-admin")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.post(
|
|
"/publisher/actions/jobs/999/run-now",
|
|
headers=_trusted_headers(role="admin"),
|
|
)
|
|
|
|
assert response.status_code == 403
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_allows_admin_identity_on_admin_publisher_run_action(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "admin-publisher-run-action")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.post(
|
|
"/admin/publisher/actions/jobs/999/run-now",
|
|
headers=_trusted_headers(role="admin"),
|
|
)
|
|
|
|
assert response.status_code == 204
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_rejects_publisher_identity_on_admin_publisher_run_action(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "admin-publisher-run-publisher")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
response = await client.post(
|
|
"/admin/publisher/actions/jobs/999/run-now",
|
|
headers=_trusted_headers(role="publisher"),
|
|
)
|
|
|
|
assert response.status_code == 403
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_keeps_section_static_assets_public(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "static-public")
|
|
|
|
async def run() -> None:
|
|
client = create_app().test_client()
|
|
|
|
for path in (
|
|
"/admin/static/datastar@1.0.0-RC.8.js",
|
|
"/publisher/static/datastar@1.0.0-RC.8.js",
|
|
):
|
|
response = await client.get(path)
|
|
|
|
assert response.status_code == 200
|
|
|
|
root_response = await client.get("/static/datastar@1.0.0-RC.8.js")
|
|
|
|
assert root_response.status_code == 404
|
|
|
|
asyncio.run(run())
|
|
|
|
|
|
def test_trusted_header_mode_keeps_dev_feeds_public(
|
|
monkeypatch, tmp_path: Path
|
|
) -> None:
|
|
_configure_trusted_auth(monkeypatch, tmp_path, "feeds-public")
|
|
|
|
async def run() -> None:
|
|
feeds_dir = tmp_path / "out" / "feeds"
|
|
feed_path = feeds_dir / "demo-source" / "feed.rss"
|
|
feed_path.parent.mkdir(parents=True)
|
|
feed_path.write_text("<rss/>\n", encoding="utf-8")
|
|
app = create_app(dev_mode=True)
|
|
app.config["REPUB_FEEDS_DIR"] = feeds_dir
|
|
client = app.test_client()
|
|
|
|
response = await client.get("/feeds/demo-source/feed.rss")
|
|
|
|
assert response.status_code == 200
|
|
|
|
asyncio.run(run())
|