Skip to content

Commit 836b2f3

Browse files
authored
global-monitor: add clickhouse writer (#3449)
## Summary of Changes - Add ClickHouse as a second telemetry backend alongside the existing InfluxDB writer, with both independently toggleable via environment variables - Include goose-based migration setup with an initial migration creating 3 tables: `solana_validator_icmp_probe`, `solana_validator_tpuquic_probe`, `doublezero_user_icmp_probe` - Introduce a `ProbeWriter` interface so planners can be tested without a real ClickHouse connection ## Diff Breakdown | Category | Files | Lines (+/-) | Net | |--------------|-------|-------------|-------| | Core logic | 7 | +831 / -19 | +812 | | Scaffolding | 3 | +274 / -0 | +274 | | Tests | 5 | +510 / -12 | +498 | | Config/build | 2 | +92 / -72 | +20 | ~⅓ tests, ~½ core logic, remainder is migration scaffolding and dependency updates. <details> <summary>Key files (click to expand)</summary> - `telemetry/global-monitor/internal/clickhouse/writer.go` — new buffered ClickHouse writer with typed row structs per table, batch insert flush, and `ProbeWriter` interface - `telemetry/global-monitor/db/clickhouse/migrations/20260404000001_init.sql` — initial migration creating 3 MergeTree tables with monthly partitioning and 90-day TTL - `telemetry/global-monitor/internal/gm/planner_dz_icmp.go` — add `recordClickHouseRow()` for DZ user ICMP probes including Solana cross-reference fields - `telemetry/global-monitor/internal/gm/planner_sol_tpuquic.go` — add `recordClickHouseRow()` for Solana validator TPUQUIC probes - `telemetry/global-monitor/internal/gm/planner_sol_icmp.go` — add `recordClickHouseRow()` for Solana validator ICMP probes - `telemetry/global-monitor/internal/clickhouse/migrations.go` — goose migration runner adapted for slog - `telemetry/global-monitor/cmd/global-monitor/main.go` — wire ClickHouse config via env vars, run migrations on startup, pass writer to runner - `telemetry/global-monitor/internal/gm/runner.go` — add `ClickHouseWriter` to config, flush buffered rows at end of each tick </details> ## Testing 
Verification - Unit tests added for all 3 planners verifying ClickHouse row recording: success case (all fields populated), failure case (probe_ok=false with fail reason), not-ready case (no row written), and nil writer safety - Writer buffer tests: correct accumulation per table, concurrent append safety with 300 goroutines - All existing InfluxDB recording tests continue to pass unchanged
1 parent 124b2f2 commit 836b2f3

16 files changed

Lines changed: 1568 additions & 31 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ All notable changes to this project will be documented in this file.
1313
- Telemetry
1414
- Add optional TLS support to state-ingest server via `--tls-cert-file` and `--tls-key-file` flags; when set, the server listens on both HTTP (`:8080`) and HTTPS (`:8443`) simultaneously
1515
- Remove `--additional-child-probes` CLI flag from telemetry-agent; child geoprobe discovery now relies entirely on the onchain Geolocation program
16+
- Monitor
17+
- Add ClickHouse as a second telemetry backend for the global monitor, alongside the existing InfluxDB writer
1618
- E2E tests
1719
- Add `TestE2E_GeoprobeIcmpTargets` verifying end-to-end ICMP outbound offset delivery via onchain `outbound-icmp` targets
1820
- Refactor geoprobe E2E tests to use testcontainers entrypoints and onchain target discovery

telemetry/global-monitor/cmd/global-monitor/main.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/lmittmann/tint"
1919
"github.com/malbeclabs/doublezero/config"
2020
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
21+
chwriter "github.com/malbeclabs/doublezero/telemetry/global-monitor/internal/clickhouse"
2122
"github.com/malbeclabs/doublezero/telemetry/global-monitor/internal/dz"
2223
"github.com/malbeclabs/doublezero/telemetry/global-monitor/internal/gm"
2324
"github.com/malbeclabs/doublezero/telemetry/global-monitor/internal/metrics"
@@ -228,6 +229,37 @@ func run() error {
228229
defer influxAPI.Flush()
229230
}
230231

232+
// ClickHouse configuration.
233+
chAddr := os.Getenv("CLICKHOUSE_ADDR")
234+
chDatabase := os.Getenv("CLICKHOUSE_DATABASE")
235+
chUsername := os.Getenv("CLICKHOUSE_USER")
236+
chPassword := os.Getenv("CLICKHOUSE_PASS")
237+
chSecure := os.Getenv("CLICKHOUSE_SECURE") == "true"
238+
chRunMigrations := os.Getenv("CLICKHOUSE_RUN_MIGRATIONS") == "true"
239+
chEnabled := chAddr != "" && chDatabase != ""
240+
var clickHouseWriter *chwriter.Writer
241+
if chEnabled {
242+
log.Info("clickhouse enabled", "addr", chAddr, "database", chDatabase, "secure", chSecure, "run_migrations", chRunMigrations)
243+
if chRunMigrations {
244+
if err := chwriter.RunMigrations(chAddr, chDatabase, chUsername, chPassword, chSecure, log); err != nil {
245+
log.Error("failed to run clickhouse migrations", "error", err)
246+
return err
247+
}
248+
log.Info("clickhouse migrations applied")
249+
}
250+
251+
w, err := chwriter.NewWriter(chAddr, chDatabase, chUsername, chPassword, chSecure, log)
252+
if err != nil {
253+
log.Error("failed to create clickhouse writer", "error", err)
254+
return err
255+
}
256+
defer w.Close()
257+
clickHouseWriter = w
258+
log.Info("clickhouse writer created")
259+
} else {
260+
log.Info("clickhouse disabled (CLICKHOUSE_ADDR and CLICKHOUSE_DATABASE required)")
261+
}
262+
231263
nlr := netlink.NewNetlinker()
232264

233265
geoIP, err := geoip.NewResolver(log, cityDB, asnDB, metroDB)
@@ -271,6 +303,9 @@ func run() error {
271303
// InfluxDB configuration.
272304
InfluxAPI: influxAPI,
273305

306+
// ClickHouse configuration.
307+
ClickHouseWriter: clickHouseWriter,
308+
274309
// GeoIP configuration.
275310
GeoIP: geoIP,
276311

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
-- +goose Up
2+
3+
-- Results of ICMP probes against Solana validators.
-- Partitioned by calendar month of `timestamp`, and rows expire 90 days after it.
CREATE TABLE IF NOT EXISTS solana_validator_icmp_probe (
    timestamp                   DateTime64(3) CODEC(DoubleDelta, ZSTD(1)),

    -- What kind of probe ran and over which path.
    probe_type                  LowCardinality(String),
    probe_path                  LowCardinality(String),

    -- Identity of the probed validator.
    validator_pubkey            String,
    validator_vote_pubkey       String,

    -- Address that was probed.
    target_ip                   String,
    target_ip_block_24          String,
    target_endpoint             String,

    -- Where the probe originated.
    source_metro                LowCardinality(String),
    source_metro_name           LowCardinality(String),
    source_host                 LowCardinality(String),
    source_iface                LowCardinality(String),
    source_ip                   String,
    source_dzd_code             LowCardinality(String),
    source_dzd_metro_code       LowCardinality(String),
    source_dzd_metro_name       LowCardinality(String),

    -- Device associated with the target.
    target_dzd_code             LowCardinality(String),
    target_dzd_metro_code       LowCardinality(String),
    target_dzd_metro_name       LowCardinality(String),

    -- GeoIP lookup results for the target.
    target_geoip_country        LowCardinality(String),
    target_geoip_country_code   LowCardinality(String),
    target_geoip_region         LowCardinality(String),
    target_geoip_city           LowCardinality(String),
    target_geoip_city_id        Int32 DEFAULT 0,
    target_geoip_metro          LowCardinality(String),
    target_geoip_asn            UInt32 DEFAULT 0,
    target_geoip_asn_org        String DEFAULT '',
    target_geoip_latitude       Float64 DEFAULT 0,
    target_geoip_longitude      Float64 DEFAULT 0,

    -- Outcome and measurements of the probe.
    probe_ok                    Bool,
    probe_fail_reason           LowCardinality(String),
    probe_rtt_avg_ms            Float64 DEFAULT 0,
    probe_rtt_latest_ms         Float64 DEFAULT 0,
    probe_rtt_min_ms            Float64 DEFAULT 0,
    probe_rtt_dev_ms            Float64 DEFAULT 0,
    probe_packets_sent          Int64 DEFAULT 0,
    probe_packets_recv          Int64 DEFAULT 0,
    probe_packets_lost          Int64 DEFAULT 0,
    probe_loss_ratio            Float64 DEFAULT 0,

    -- Validator-level metrics recorded with the probe.
    validator_leader_ratio      Float64 DEFAULT 0,
    validator_stake_lamports    UInt64 DEFAULT 0
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (source_metro, probe_path, validator_pubkey, timestamp)
TTL toDateTime(timestamp) + INTERVAL 90 DAY;
66+
67+
-- Results of TPUQUIC probes against Solana validators.
-- Partitioned by calendar month of `timestamp`, and rows expire 90 days after it.
CREATE TABLE IF NOT EXISTS solana_validator_tpuquic_probe (
    timestamp                   DateTime64(3) CODEC(DoubleDelta, ZSTD(1)),

    -- What kind of probe ran and over which path.
    probe_type                  LowCardinality(String),
    probe_path                  LowCardinality(String),

    -- Identity of the probed validator.
    validator_pubkey            String,
    validator_vote_pubkey       String,

    -- Address and port that were probed.
    target_ip                   String,
    target_ip_block_24          String,
    target_port                 UInt16 DEFAULT 0,
    target_endpoint             String,

    -- Where the probe originated.
    source_metro                LowCardinality(String),
    source_metro_name           LowCardinality(String),
    source_host                 LowCardinality(String),
    source_iface                LowCardinality(String),
    source_ip                   String,
    source_dzd_code             LowCardinality(String),
    source_dzd_metro_code       LowCardinality(String),
    source_dzd_metro_name       LowCardinality(String),

    -- Device associated with the target.
    target_dzd_code             LowCardinality(String),
    target_dzd_metro_code       LowCardinality(String),
    target_dzd_metro_name       LowCardinality(String),

    -- GeoIP lookup results for the target.
    target_geoip_country        LowCardinality(String),
    target_geoip_country_code   LowCardinality(String),
    target_geoip_region         LowCardinality(String),
    target_geoip_city           LowCardinality(String),
    target_geoip_city_id        Int32 DEFAULT 0,
    target_geoip_metro          LowCardinality(String),
    target_geoip_asn            UInt32 DEFAULT 0,
    target_geoip_asn_org        String DEFAULT '',
    target_geoip_latitude       Float64 DEFAULT 0,
    target_geoip_longitude      Float64 DEFAULT 0,

    -- Outcome and measurements of the probe.
    probe_ok                    Bool,
    probe_fail_reason           LowCardinality(String),
    probe_rtt_avg_ms            Float64 DEFAULT 0,
    probe_rtt_latest_ms         Float64 DEFAULT 0,
    probe_rtt_min_ms            Float64 DEFAULT 0,
    probe_rtt_dev_ms            Float64 DEFAULT 0,
    probe_packets_sent          Int64 DEFAULT 0,
    probe_packets_recv          Int64 DEFAULT 0,
    probe_packets_lost          Int64 DEFAULT 0,
    probe_loss_ratio            Float64 DEFAULT 0,

    -- Validator-level metrics recorded with the probe.
    validator_leader_ratio      Float64 DEFAULT 0,
    validator_stake_lamports    UInt64 DEFAULT 0
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (source_metro, probe_path, validator_pubkey, timestamp)
TTL toDateTime(timestamp) + INTERVAL 90 DAY;
131+
132+
-- Results of ICMP probes against DoubleZero users, including Solana cross-reference flags.
-- Partitioned by calendar month of `timestamp`, and rows expire 90 days after it.
CREATE TABLE IF NOT EXISTS doublezero_user_icmp_probe (
    timestamp                   DateTime64(3) CODEC(DoubleDelta, ZSTD(1)),

    -- What kind of probe ran and over which path.
    probe_type                  LowCardinality(String),
    probe_path                  LowCardinality(String),

    -- Identity of the probed user.
    user_pubkey                 String,
    user_validator_pubkey       String,
    validator_vote_pubkey       String,

    -- Address that was probed.
    target_ip                   String,
    target_ip_block_24          String,

    -- Where the probe originated.
    source_metro                LowCardinality(String),
    source_metro_name           LowCardinality(String),
    source_host                 LowCardinality(String),
    source_iface                LowCardinality(String),
    source_ip                   String,
    source_user_pubkey          String,
    source_dzd_code             LowCardinality(String),
    source_dzd_metro_code       LowCardinality(String),
    source_dzd_metro_name       LowCardinality(String),

    -- Device associated with the target.
    target_dzd_code             LowCardinality(String),
    target_dzd_metro_code       LowCardinality(String),
    target_dzd_metro_name       LowCardinality(String),

    -- GeoIP lookup results for the target.
    target_geoip_country        LowCardinality(String),
    target_geoip_country_code   LowCardinality(String),
    target_geoip_region         LowCardinality(String),
    target_geoip_city           LowCardinality(String),
    target_geoip_city_id        Int32 DEFAULT 0,
    target_geoip_metro          LowCardinality(String),
    target_geoip_asn            UInt32 DEFAULT 0,
    target_geoip_asn_org        String DEFAULT '',
    target_geoip_latitude       Float64 DEFAULT 0,
    target_geoip_longitude      Float64 DEFAULT 0,

    -- Outcome and measurements of the probe.
    probe_ok                    Bool,
    probe_fail_reason           LowCardinality(String),
    probe_rtt_avg_ms            Float64 DEFAULT 0,
    probe_rtt_latest_ms         Float64 DEFAULT 0,
    probe_rtt_min_ms            Float64 DEFAULT 0,
    probe_rtt_dev_ms            Float64 DEFAULT 0,
    probe_packets_sent          Int64 DEFAULT 0,
    probe_packets_recv          Int64 DEFAULT 0,
    probe_packets_lost          Int64 DEFAULT 0,
    probe_loss_ratio            Float64 DEFAULT 0,

    -- Whether the user's validator key and target IP appear in Solana data.
    user_validator_pubkey_in_solana_vote_accounts   Bool DEFAULT false,
    user_validator_pubkey_in_solana_gossip          Bool DEFAULT false,
    target_ip_in_solana_gossip                      Bool DEFAULT false,
    target_ip_in_solana_gossip_as_tpuquic           Bool DEFAULT false
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (source_metro, probe_path, user_pubkey, timestamp)
TTL toDateTime(timestamp) + INTERVAL 90 DAY;
198+
199+
-- +goose Down
200+
201+
-- Rollback: remove each of the three probe tables created by the Up section.
DROP TABLE IF EXISTS solana_validator_icmp_probe;
DROP TABLE IF EXISTS solana_validator_tpuquic_probe;
DROP TABLE IF EXISTS doublezero_user_icmp_probe;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Package migrations embeds ClickHouse migration SQL files for use with goose.
2+
package migrations
3+
4+
import "embed"
5+
6+
// FS is the embedded filesystem containing every *.sql file that sits next to
// this source file; it is what gets handed to goose as the migration source.
//go:embed *.sql
7+
var FS embed.FS
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Manual bootstrap, run once before the application is started for the first time.
-- It creates the database and the application user, then grants that user the
-- table-level privileges listed below on the database.
-- NOTE(review): 'changeme' looks like a placeholder password - confirm it is
-- replaced with a real secret before this is run against a shared cluster.
CREATE DATABASE IF NOT EXISTS global_monitor;

CREATE USER IF NOT EXISTS global_monitor IDENTIFIED BY 'changeme';

GRANT SELECT, INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE
    ON global_monitor.*
    TO global_monitor;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package clickhouse
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"database/sql"
7+
"fmt"
8+
"log/slog"
9+
10+
"github.com/ClickHouse/clickhouse-go/v2"
11+
"github.com/malbeclabs/doublezero/telemetry/global-monitor/db/clickhouse/migrations"
12+
"github.com/pressly/goose/v3"
13+
)
14+
15+
func RunMigrations(addr, database, username, password string, secure bool, log *slog.Logger) error {
16+
opts := &clickhouse.Options{
17+
Addr: []string{addr},
18+
Auth: clickhouse.Auth{
19+
Database: database,
20+
Username: username,
21+
Password: password,
22+
},
23+
}
24+
if secure {
25+
opts.TLS = &tls.Config{}
26+
}
27+
db := clickhouse.OpenDB(opts)
28+
defer func() { _ = db.Close() }()
29+
30+
if err := db.Ping(); err != nil {
31+
return fmt.Errorf("migration ping: %w", err)
32+
}
33+
34+
return runGoose(db, log)
35+
}
36+
37+
// gooseLogger adapts a *slog.Logger to the Printf/Fatalf logger shape that is
// passed to goose through goose.WithLogger.
type gooseLogger struct {
	log *slog.Logger
}

// Fatalf renders the printf-style message and records it at error level.
// It only logs; it does not terminate the process.
func (gl *gooseLogger) Fatalf(format string, v ...any) {
	msg := fmt.Sprintf(format, v...)
	gl.log.Error(msg)
}

// Printf renders the printf-style message and records it at info level.
func (gl *gooseLogger) Printf(format string, v ...any) {
	msg := fmt.Sprintf(format, v...)
	gl.log.Info(msg)
}
48+
49+
func runGoose(db *sql.DB, log *slog.Logger) error {
50+
provider, err := goose.NewProvider(goose.DialectClickHouse, db, migrations.FS,
51+
goose.WithLogger(&gooseLogger{log: log}),
52+
)
53+
if err != nil {
54+
return fmt.Errorf("goose provider: %w", err)
55+
}
56+
if _, err := provider.Up(context.Background()); err != nil {
57+
return fmt.Errorf("goose up: %w", err)
58+
}
59+
return nil
60+
}

0 commit comments

Comments
 (0)