tailscalesd/tailscalesd/main.py

166 lines
4.6 KiB
Python

import asyncio
import logging
import os
from ipaddress import ip_address
from typing import Dict, List
import httpx
import uvicorn
from fastapi import FastAPI
from pydantic import SecretStr, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
env_path = os.getenv("TAILSCALESD_ENV_FILE")
debug = os.getenv("TAILSCALESD_DEBUG", False)
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter(style="{", fmt="[{name}:{filename}] {levelname} - {message}")
)
log = logging.getLogger("tailscalesd")
log.setLevel(logging.DEBUG if debug else logging.INFO)
log.addHandler(handler)
CACHE_DEVICES = []
def filter_ipv6(addresses):
return list(filter(lambda a: ip_address(a).version == 4, addresses))
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_prefix="TAILSCALESD_", env_file=env_path, env_file_encoding="utf-8"
)
host: str = "0.0.0.0" # nosec B104
port: int = 9242
interval: int = 5
tailnet: str = Field()
api_key: SecretStr = Field()
settings = Settings() # type: ignore[call-arg]
app = FastAPI()
async def tailscale_poll():
global CACHE_DEVICES
log.debug("Starting polling")
while True:
async with httpx.AsyncClient() as client:
# https://github.com/tailscale/tailscale/blob/main/api.md#tailnet-devices-get
r = await client.get(
f"https://api.tailscale.com/api/v2/tailnet/{settings.tailnet}/devices",
auth=(settings.api_key.get_secret_value(), ""),
)
CACHE_DEVICES = r.json()["devices"]
await asyncio.sleep(settings.interval)
@app.on_event("startup")
async def start_tailscale_poll():
asyncio.create_task(tailscale_poll())
def group_by_type(input_list):
result = {}
for item in input_list:
key = item.get("type")
if key:
result.setdefault(key, []).append(item)
return result
def tailscale_labels(device) -> Dict:
return {
"__meta_tailscale_device_client_version": device["clientVersion"],
"__meta_tailscale_device_hostname": device["hostname"],
"__meta_tailscale_device_authorized": str(device["authorized"]).lower(),
"__meta_tailscale_device_id": device["id"],
"__meta_tailscale_device_name": device["name"],
"__meta_tailscale_device_os": device["os"],
"__meta_tailscale_tailnet": settings.tailnet,
}
async def matrix_node_sd(device) -> Dict:
ipv4 = filter_ipv6(device["addresses"])[0]
async with httpx.AsyncClient() as client:
r = await client.get(f"http://{ipv4}:8081/")
data = r.json()
return group_by_type(data)
def matrix_workers_to_sd(device, workers) -> List:
if len(workers) == 0:
return []
ipv4 = filter_ipv6(device["addresses"])[0]
target_groups = []
for worker_type, workers in workers.items():
targets = []
for worker in workers:
port = worker["metrics_port"]
worker_name = worker.get("name", "WORKER_NO_NAME")
if not port:
log.error(
f"Error parsing worker {worker_name} on host={device['hostname']}. Port is invalid port={port}"
)
continue
targets.append(f"{ipv4}:{port}")
target_groups.append(
{
"targets": targets,
"labels": tailscale_labels(device)
| {
"__meta_matrix_worker_type": worker_type,
"__meta_matrix_worker_name": worker_name,
},
}
)
return target_groups
async def matrix_sd(devices) -> List:
sd = []
for device in devices:
if "tag:matrix" not in device["tags"]:
continue
try:
workers = await matrix_node_sd(device)
except Exception as e:
log.error(f"Failed parsing matrix node sd for device={device['hostname']}")
log.error(e)
workers = []
targets = matrix_workers_to_sd(device, workers)
if targets:
sd.append(targets)
return []
def plain_devices_sd(devices) -> List:
sd = []
for device in devices:
service = {}
service["targets"] = filter_ipv6(device["addresses"])
service["labels"] = tailscale_labels(device)
sd.append(service)
return sd
@app.get("/")
async def sd():
device_targets = plain_devices_sd(CACHE_DEVICES)
matrix_targets = await matrix_sd(CACHE_DEVICES)
return matrix_targets + device_targets
def main():
uvicorn.run(app, host=settings.host, port=settings.port)
if __name__ == "__main__":
main()