From 087087bf87166bf16284e0db13e64f4a0595cd02 Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Sun, 5 Nov 2023 20:53:43 +0100 Subject: [PATCH] Move all the work to the worker task and add json logging --- poetry.lock | 14 ++++++++- pyproject.toml | 1 + tailscalesd/main.py | 74 ++++++++++++++++++++++++++++----------------- 3 files changed, 60 insertions(+), 29 deletions(-) diff --git a/poetry.lock b/poetry.lock index dfeca4f..9aac367 100644 --- a/poetry.lock +++ b/poetry.lock @@ -635,6 +635,18 @@ pipfile-deprecated-finder = ["pip-shims (>=0.5.2)", "pipreqs", "requirementslib" plugins = ["setuptools"] requirements-deprecated-finder = ["pip-api", "pipreqs"] +[[package]] +name = "json-logging" +version = "1.3.0" +description = "JSON Python Logging" +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "json-logging-1.3.0.tar.gz", hash = "sha256:60a02a1daa168a08aa0a41eeeda63e92500ab08170491bdd326cf00d17f656f8"}, + {file = "json_logging-1.3.0-py2.py3-none-any.whl", hash = "sha256:5def627a18b9e61690d58016ee5312681b407e3106c80a4be7c787abb8a21355"}, +] + [[package]] name = "markdown-it-py" version = "3.0.0" @@ -1500,4 +1512,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "c44028be6f047970f91425a5c04f65b66116c9a5ad2f8409e2d19d19612f1752" +content-hash = "c48151b734ebe301d09fb06d15888e2b92fd11b80e710e36aa2552082c1637ad" diff --git a/pyproject.toml b/pyproject.toml index 5a01549..0d9a0e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ fastapi = "^0.104.1" uvicorn = "^0.24.0" httpx = "^0.25.1" pydantic-settings = "^2.0.3" +json-logging = "^1.3.0" [tool.poetry.dev-dependencies] pytest = "*" diff --git a/tailscalesd/main.py b/tailscalesd/main.py index 4c38a4f..63a5ed5 100644 --- a/tailscalesd/main.py +++ b/tailscalesd/main.py @@ -1,28 +1,23 @@ import asyncio import logging import os +import sys from ipaddress import ip_address from typing import Dict, List import httpx import uvicorn + +import json_logging from fastapi import FastAPI -from pydantic import SecretStr, Field +from pydantic import Field, SecretStr from pydantic_settings import BaseSettings, SettingsConfigDict env_path = os.getenv("TAILSCALESD_ENV_FILE") debug = os.getenv("TAILSCALESD_DEBUG", False) - -handler = logging.StreamHandler() -handler.setFormatter( - logging.Formatter(style="{", fmt="[{name}:{filename}] {levelname} - {message}") -) - log = logging.getLogger("tailscalesd") log.setLevel(logging.DEBUG if debug else logging.INFO) -log.addHandler(handler) - -CACHE_DEVICES = [] +log.addHandler(logging.StreamHandler(sys.stdout)) def filter_ipv6(addresses): @@ -43,25 +38,27 @@ class Settings(BaseSettings): settings = Settings() # type: ignore[call-arg] app = FastAPI() +json_logging.init_fastapi(enable_json=True) +json_logging.init_request_instrument(app) -async def tailscale_poll(): - global CACHE_DEVICES - log.debug("Starting polling") - while True: - async with httpx.AsyncClient() as client: +CACHE_SD = [] + + +async def tailscale_devices() -> List: + async with httpx.AsyncClient() as client: + try: # https://github.com/tailscale/tailscale/blob/main/api.md#tailnet-devices-get r = await client.get( f"https://api.tailscale.com/api/v2/tailnet/{settings.tailnet}/devices", auth=(settings.api_key.get_secret_value(), ""), ) - CACHE_DEVICES = r.json()["devices"] - - await asyncio.sleep(settings.interval) - - -@app.on_event("startup") -async def start_tailscale_poll(): - asyncio.create_task(tailscale_poll()) + return r.json()["devices"] + except Exception as e: + log.error( + f"Polling tailscale devices failed!", + exc_info=e, + ) + return [] def group_by_type(input_list): @@ -130,8 +127,10 @@ async def matrix_sd(devices) -> List: try: workers = await matrix_node_sd(device) except Exception as e: - log.error(f"Failed parsing matrix node sd for device={device['hostname']}") - log.error(e) + log.error( + f"Failed parsing matrix node sd for device={device['hostname']}", + exc_info=e, + ) workers = [] targets = matrix_workers_to_sd(device, workers) if targets: @@ -151,11 +150,30 @@ def plain_devices_sd(devices) -> List: return sd +async def poll_sd(): + global CACHE_SD + while True: + try: + devices = await tailscale_devices() + device_targets = plain_devices_sd(devices) + matrix_targets = await matrix_sd(devices) + CACHE_SD = matrix_targets + device_targets + await asyncio.sleep(settings.interval) + except Exception as e: + log.error( + f"service discovery poller failed", + exc_info=e, + ) + + +@app.on_event("startup") +async def start_sd(): + asyncio.create_task(poll_sd()) + + @app.get("/") async def sd(): - device_targets = plain_devices_sd(CACHE_DEVICES) - matrix_targets = await matrix_sd(CACHE_DEVICES) - return matrix_targets + device_targets + return CACHE_SD def main():