From 5e760d52dd254a26483237eff430cfd9586b630c Mon Sep 17 00:00:00 2001 From: Abel Luck Date: Wed, 27 May 2026 09:06:29 +0200 Subject: [PATCH] Reduce peak client recalculation overhead --- internal/dnstt/collector.go | 8 ++++++-- internal/dnstt/collector_test.go | 34 ++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/internal/dnstt/collector.go b/internal/dnstt/collector.go index 4928c16..175b712 100644 --- a/internal/dnstt/collector.go +++ b/internal/dnstt/collector.go @@ -167,9 +167,12 @@ func (c *Collector) RecordQueryFrom(domain string, clientID string, resolverIP n now := c.now() client, exists := tunnel.clients[clientID] + updatePeak := !exists if !exists { client = &clientState{firstSeen: now, firstKey: key} tunnel.clients[clientID] = client + } else if now.Sub(client.lastSeen) >= ClientTimeout || client.lastKey != key { + updatePeak = true } client.lastSeen = now client.lastKey = key @@ -178,7 +181,9 @@ func (c *Collector) RecordQueryFrom(domain string, clientID string, resolverIP n client.bytesIn += uint64(size) } - updatePeaks(tunnel, now) + if updatePeak { + updatePeaks(tunnel, now) + } } // RecordResponse records an observed DNSTT DNS response. @@ -211,7 +216,6 @@ func (c *Collector) Snapshot() Snapshot { snapshot := Snapshot{Tunnels: make(map[string]TunnelSnapshot, len(c.tunnels))} for _, domain := range c.domains { tunnel := c.tunnels[domain] - updatePeaks(tunnel, now) series := c.seriesSnapshotsLocked(domain, tunnel, now) tunnelSnapshot := TunnelSnapshot{Domain: domain, Series: series} diff --git a/internal/dnstt/collector_test.go b/internal/dnstt/collector_test.go index d46c923..8f31044 100644 --- a/internal/dnstt/collector_test.go +++ b/internal/dnstt/collector_test.go @@ -1,6 +1,7 @@ package dnstt import ( + "net/netip" "testing" "time" ) @@ -59,3 +60,36 @@ func TestCollectorMatchesSubdomainsToRegisteredTunnel(t *testing.T) { t.Fatalf("active clients = %d, want 1", tunnel.ActiveClients) } } + +func TestCollectorUpdatesPeakWhenActiveClientChangesGeoKey(t *testing.T) { + now := time.Unix(1000, 0) + firstResolver := netip.MustParseAddr("192.0.2.53") + secondResolver := netip.MustParseAddr("198.51.100.53") + c := NewCollector( + []string{"tunnel.example.com"}, + WithNow(func() time.Time { return now }), + WithGeoResolver(fakeGeoResolver{ + labelNames: []string{"asn"}, + labels: map[netip.Addr]GeoLabels{ + firstResolver: {ASN: "64500"}, + secondResolver: {ASN: "64501"}, + }, + }), + ) + + c.RecordQueryFrom("tunnel.example.com", "client-a", firstResolver, 120) + c.RecordQueryFrom("tunnel.example.com", "client-a", secondResolver, 120) + + foundChangedASN := false + for _, series := range c.Snapshot().Tunnels["tunnel.example.com"].Series { + if series.ASN == "64501" && series.PeakClients != 1 { + t.Fatalf("peak clients for changed ASN = %d, want 1", series.PeakClients) + } + if series.ASN == "64501" { + foundChangedASN = true + } + } + if !foundChangedASN { + t.Fatal("series for changed ASN not found") + } +}