Move all the work to the worker task and add json logging

This commit is contained in:
Abel Luck 2023-11-05 20:53:43 +01:00
parent 918cf4446c
commit 087087bf87
3 changed files with 60 additions and 29 deletions

14
poetry.lock generated
View file

@ -635,6 +635,18 @@ pipfile-deprecated-finder = ["pip-shims (>=0.5.2)", "pipreqs", "requirementslib"
plugins = ["setuptools"] plugins = ["setuptools"]
requirements-deprecated-finder = ["pip-api", "pipreqs"] requirements-deprecated-finder = ["pip-api", "pipreqs"]
[[package]]
name = "json-logging"
version = "1.3.0"
description = "JSON Python Logging"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "json-logging-1.3.0.tar.gz", hash = "sha256:60a02a1daa168a08aa0a41eeeda63e92500ab08170491bdd326cf00d17f656f8"},
{file = "json_logging-1.3.0-py2.py3-none-any.whl", hash = "sha256:5def627a18b9e61690d58016ee5312681b407e3106c80a4be7c787abb8a21355"},
]
[[package]] [[package]]
name = "markdown-it-py" name = "markdown-it-py"
version = "3.0.0" version = "3.0.0"
@ -1500,4 +1512,4 @@ multidict = ">=4.0"
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.11" python-versions = "^3.11"
content-hash = "c44028be6f047970f91425a5c04f65b66116c9a5ad2f8409e2d19d19612f1752" content-hash = "c48151b734ebe301d09fb06d15888e2b92fd11b80e710e36aa2552082c1637ad"

View file

@ -22,6 +22,7 @@ fastapi = "^0.104.1"
uvicorn = "^0.24.0" uvicorn = "^0.24.0"
httpx = "^0.25.1" httpx = "^0.25.1"
pydantic-settings = "^2.0.3" pydantic-settings = "^2.0.3"
json-logging = "^1.3.0"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
pytest = "*" pytest = "*"

View file

@ -1,28 +1,23 @@
import asyncio import asyncio
import logging import logging
import os import os
import sys
from ipaddress import ip_address from ipaddress import ip_address
from typing import Dict, List from typing import Dict, List
import httpx import httpx
import uvicorn import uvicorn
import json_logging
from fastapi import FastAPI from fastapi import FastAPI
from pydantic import SecretStr, Field from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
env_path = os.getenv("TAILSCALESD_ENV_FILE") env_path = os.getenv("TAILSCALESD_ENV_FILE")
debug = os.getenv("TAILSCALESD_DEBUG", False) debug = os.getenv("TAILSCALESD_DEBUG", False)
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter(style="{", fmt="[{name}:{filename}] {levelname} - {message}")
)
log = logging.getLogger("tailscalesd") log = logging.getLogger("tailscalesd")
log.setLevel(logging.DEBUG if debug else logging.INFO) log.setLevel(logging.DEBUG if debug else logging.INFO)
log.addHandler(handler) log.addHandler(logging.StreamHandler(sys.stdout))
CACHE_DEVICES = []
def filter_ipv6(addresses): def filter_ipv6(addresses):
@ -43,25 +38,27 @@ class Settings(BaseSettings):
settings = Settings() # type: ignore[call-arg] settings = Settings() # type: ignore[call-arg]
app = FastAPI() app = FastAPI()
json_logging.init_fastapi(enable_json=True)
json_logging.init_request_instrument(app)
async def tailscale_poll(): CACHE_SD = []
global CACHE_DEVICES
log.debug("Starting polling")
while True: async def tailscale_devices() -> List:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
try:
# https://github.com/tailscale/tailscale/blob/main/api.md#tailnet-devices-get # https://github.com/tailscale/tailscale/blob/main/api.md#tailnet-devices-get
r = await client.get( r = await client.get(
f"https://api.tailscale.com/api/v2/tailnet/{settings.tailnet}/devices", f"https://api.tailscale.com/api/v2/tailnet/{settings.tailnet}/devices",
auth=(settings.api_key.get_secret_value(), ""), auth=(settings.api_key.get_secret_value(), ""),
) )
CACHE_DEVICES = r.json()["devices"] return r.json()["devices"]
except Exception as e:
await asyncio.sleep(settings.interval) log.error(
f"Polling tailscale devices failed!",
exc_info=e,
@app.on_event("startup") )
async def start_tailscale_poll(): return []
asyncio.create_task(tailscale_poll())
def group_by_type(input_list): def group_by_type(input_list):
@ -130,8 +127,10 @@ async def matrix_sd(devices) -> List:
try: try:
workers = await matrix_node_sd(device) workers = await matrix_node_sd(device)
except Exception as e: except Exception as e:
log.error(f"Failed parsing matrix node sd for device={device['hostname']}") log.error(
log.error(e) f"Failed parsing matrix node sd for device={device['hostname']}",
exc_info=e,
)
workers = [] workers = []
targets = matrix_workers_to_sd(device, workers) targets = matrix_workers_to_sd(device, workers)
if targets: if targets:
@ -151,11 +150,30 @@ def plain_devices_sd(devices) -> List:
return sd return sd
async def poll_sd():
global CACHE_SD
while True:
try:
devices = await tailscale_devices()
device_targets = plain_devices_sd(devices)
matrix_targets = await matrix_sd(devices)
CACHE_SD = matrix_targets + device_targets
await asyncio.sleep(settings.interval)
except Exception as e:
log.error(
f"service discovery poller failed",
exc_info=e,
)
@app.on_event("startup")
async def start_sd():
asyncio.create_task(poll_sd())
@app.get("/") @app.get("/")
async def sd(): async def sd():
device_targets = plain_devices_sd(CACHE_DEVICES) return CACHE_SD
matrix_targets = await matrix_sd(CACHE_DEVICES)
return matrix_targets + device_targets
def main(): def main():