Skip to content

Commit c4e29fb

Browse files
committed
device-health-oracle: add controller_success activation criterion
Add a criteria-based evaluation pattern to the device-health-oracle and implement the first criterion: devices must have called the controller at least once per minute over the burn-in period (verified via ClickHouse controller_grpc_getconfig_success table). - Introduce DeviceCriterion/LinkCriterion interfaces and stage-aware evaluators that enforce Pending → ReadyForLinks → ReadyForUsers progression for devices - Add ControllerSuccessCriterion querying ClickHouse for controller call coverage over the burn-in window - Optimize update logic to skip health writes when the value is already at the desired state - ClickHouse connection is optional: when CLICKHOUSE_ADDR is not set, the oracle falls back to no-criteria behavior (backward compatible)
1 parent a068f4a commit c4e29fb

11 files changed

Lines changed: 1104 additions & 6 deletions

File tree

controlplane/device-health-oracle/cmd/device-health-oracle/main.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ var (
4444
slackWebhookURL = flag.String("slack-webhook-url", "", "The Slack webhook URL to send alerts")
4545
provisioningSlotCount = flag.Uint64("provisioning-slot-count", defaultProvisioningSlotCount, "Burn-in slot count for new devices/links (~20 hours at 200000)")
4646
drainedSlotCount = flag.Uint64("drained-slot-count", defaultDrainedSlotCount, "Burn-in slot count for reactivated devices/links (~30 min at 5000)")
47+
slotDurationMs = flag.Uint64("slot-duration-ms", 400, "Duration of a ledger slot in milliseconds")
4748
version = "dev"
4849
commit = "none"
4950
date = "unknown"
@@ -129,6 +130,48 @@ func main() {
129130
serviceabilityExecutor := serviceability.NewExecutor(log, rpcClient, &signer, networkConfig.ServiceabilityProgramID)
130131
telemetryClient := telemetry.New(log, rpcClient, nil, networkConfig.TelemetryProgramID)
131132

133+
// Initialize ClickHouse-dependent criteria.
134+
var deviceCriteria []worker.DeviceCriterion
135+
if chAddr := os.Getenv("CLICKHOUSE_ADDR"); chAddr != "" {
136+
chDB := os.Getenv("CLICKHOUSE_DB")
137+
if chDB == "" {
138+
chDB = *env
139+
}
140+
chUser := os.Getenv("CLICKHOUSE_USER")
141+
if chUser == "" {
142+
chUser = "default"
143+
}
144+
chPass := os.Getenv("CLICKHOUSE_PASS")
145+
chTLSDisabled := os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true"
146+
147+
chClient, err := worker.NewClickHouseClient(chAddr, chDB, chUser, chPass, chTLSDisabled)
148+
if err != nil {
149+
log.Warn("ClickHouse connection failed, continuing without controller_success criterion", "addr", chAddr, "error", err)
150+
} else {
151+
log.Info("ClickHouse enabled", "addr", chAddr, "db", chDB, "user", chUser, "tls", !chTLSDisabled)
152+
controllerSuccess := worker.NewControllerSuccessCriterion(
153+
chClient,
154+
*provisioningSlotCount,
155+
*drainedSlotCount,
156+
*slotDurationMs,
157+
log,
158+
)
159+
deviceCriteria = append(deviceCriteria, controllerSuccess)
160+
}
161+
} else {
162+
log.Info("ClickHouse disabled (CLICKHOUSE_ADDR not set), no controller_success criterion")
163+
}
164+
165+
deviceEvaluator := &worker.DeviceHealthEvaluator{
166+
ReadyForLinksCriteria: deviceCriteria,
167+
ReadyForUsersCriteria: nil,
168+
Log: log,
169+
}
170+
linkEvaluator := &worker.LinkHealthEvaluator{
171+
ReadyForServiceCriteria: nil,
172+
Log: log,
173+
}
174+
132175
worker.MetricBuildInfo.WithLabelValues(version, commit, date).Set(1)
133176
go func() {
134177
listener, err := net.Listen("tcp", *metricsAddr)
@@ -155,6 +198,8 @@ func main() {
155198
Env: *env,
156199
ProvisioningSlotCount: *provisioningSlotCount,
157200
DrainedSlotCount: *drainedSlotCount,
201+
DeviceEvaluator: deviceEvaluator,
202+
LinkEvaluator: linkEvaluator,
158203
})
159204
if err != nil {
160205
log.Error("Failed to create worker", "error", err)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package worker
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"fmt"
7+
"strings"
8+
"time"
9+
10+
"github.com/ClickHouse/clickhouse-go/v2"
11+
)
12+
13+
// ControllerCallChecker queries ClickHouse for controller call records.
14+
type ControllerCallChecker interface {
15+
ControllerCallCoverage(ctx context.Context, devicePubkey string, start, end time.Time) (minutesWithCalls int64, err error)
16+
Close() error
17+
}
18+
19+
// ClickHouseClient wraps a ClickHouse connection for reading controller call data.
20+
type ClickHouseClient struct {
21+
conn clickhouse.Conn
22+
db string
23+
}
24+
25+
// NewClickHouseClient creates a new ClickHouse client connection.
26+
func NewClickHouseClient(addr, db, user, pass string, disableTLS bool) (*ClickHouseClient, error) {
27+
addr = strings.TrimPrefix(addr, "https://")
28+
addr = strings.TrimPrefix(addr, "http://")
29+
30+
opts := &clickhouse.Options{
31+
Protocol: clickhouse.HTTP,
32+
Addr: []string{addr},
33+
Auth: clickhouse.Auth{
34+
Database: db,
35+
Username: user,
36+
Password: pass,
37+
},
38+
MaxOpenConns: 5,
39+
DialTimeout: 30 * time.Second,
40+
}
41+
if !disableTLS {
42+
opts.TLS = &tls.Config{}
43+
}
44+
45+
conn, err := clickhouse.Open(opts)
46+
if err != nil {
47+
return nil, fmt.Errorf("clickhouse open: %w", err)
48+
}
49+
if err := conn.Ping(context.Background()); err != nil {
50+
return nil, fmt.Errorf("clickhouse ping: %w", err)
51+
}
52+
53+
return &ClickHouseClient{conn: conn, db: db}, nil
54+
}
55+
56+
// ControllerCallCoverage returns the number of distinct minutes in [start, end] that have
57+
// at least one controller_grpc_getconfig_success record for the given device.
58+
func (c *ClickHouseClient) ControllerCallCoverage(ctx context.Context, devicePubkey string, start, end time.Time) (int64, error) {
59+
query := fmt.Sprintf(
60+
`SELECT count(DISTINCT toStartOfMinute(timestamp)) AS minutes_with_calls
61+
FROM "%s".controller_grpc_getconfig_success
62+
WHERE device_pubkey = {pubkey:String}
63+
AND timestamp >= {start:DateTime64(3)}
64+
AND timestamp <= {end:DateTime64(3)}`,
65+
c.db,
66+
)
67+
68+
var minutesWithCalls int64
69+
err := c.conn.QueryRow(ctx, query,
70+
clickhouse.Named("pubkey", devicePubkey),
71+
clickhouse.Named("start", start),
72+
clickhouse.Named("end", end),
73+
).Scan(&minutesWithCalls)
74+
if err != nil {
75+
return 0, fmt.Errorf("clickhouse query: %w", err)
76+
}
77+
78+
return minutesWithCalls, nil
79+
}
80+
81+
// Close closes the ClickHouse connection.
82+
func (c *ClickHouseClient) Close() error {
83+
return c.conn.Close()
84+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package worker
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
chmodule "github.com/testcontainers/testcontainers-go/modules/clickhouse"
10+
11+
"github.com/ClickHouse/clickhouse-go/v2"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func setupClickHouseContainer(t *testing.T) (clickhouse.Conn, func()) {
17+
t.Helper()
18+
ctx := context.Background()
19+
20+
container, err := chmodule.Run(ctx,
21+
"clickhouse/clickhouse-server:24.3",
22+
)
23+
require.NoError(t, err)
24+
25+
host, err := container.Host(ctx)
26+
require.NoError(t, err)
27+
28+
port, err := container.MappedPort(ctx, "9000/tcp")
29+
require.NoError(t, err)
30+
31+
conn, err := clickhouse.Open(&clickhouse.Options{
32+
Addr: []string{fmt.Sprintf("%s:%s", host, port.Port())},
33+
Auth: clickhouse.Auth{
34+
Database: "default",
35+
Username: "default",
36+
},
37+
})
38+
require.NoError(t, err)
39+
require.NoError(t, conn.Ping(ctx))
40+
41+
// Create the table
42+
err = conn.Exec(ctx, `
43+
CREATE TABLE IF NOT EXISTS "default".controller_grpc_getconfig_success (
44+
timestamp DateTime64(3),
45+
device_pubkey LowCardinality(String)
46+
) ENGINE = MergeTree
47+
PARTITION BY toYYYYMM(timestamp)
48+
ORDER BY (timestamp, device_pubkey)
49+
`)
50+
require.NoError(t, err)
51+
52+
cleanup := func() {
53+
_ = conn.Close()
54+
_ = container.Terminate(ctx)
55+
}
56+
57+
return conn, cleanup
58+
}
59+
60+
func TestClickHouseClient_ControllerCallCoverage(t *testing.T) {
61+
if testing.Short() {
62+
t.Skip("skipping integration test in short mode")
63+
}
64+
65+
conn, cleanup := setupClickHouseContainer(t)
66+
defer cleanup()
67+
68+
ctx := context.Background()
69+
client := &ClickHouseClient{conn: conn, db: "default"}
70+
71+
devicePubkey := "TestDevice123"
72+
now := time.Now().Truncate(time.Second)
73+
74+
// Insert records: one per minute for 10 minutes
75+
for i := 0; i < 10; i++ {
76+
ts := now.Add(-time.Duration(10-i) * time.Minute)
77+
err := conn.Exec(ctx, fmt.Sprintf(
78+
`INSERT INTO "default".controller_grpc_getconfig_success (timestamp, device_pubkey) VALUES (?, ?)`,
79+
), ts, devicePubkey)
80+
require.NoError(t, err)
81+
}
82+
83+
t.Run("full coverage", func(t *testing.T) {
84+
start := now.Add(-11 * time.Minute)
85+
end := now
86+
minutes, err := client.ControllerCallCoverage(ctx, devicePubkey, start, end)
87+
require.NoError(t, err)
88+
assert.Equal(t, int64(10), minutes)
89+
})
90+
91+
t.Run("partial window", func(t *testing.T) {
92+
start := now.Add(-5 * time.Minute)
93+
end := now
94+
minutes, err := client.ControllerCallCoverage(ctx, devicePubkey, start, end)
95+
require.NoError(t, err)
96+
assert.GreaterOrEqual(t, minutes, int64(4))
97+
assert.LessOrEqual(t, minutes, int64(5))
98+
})
99+
100+
t.Run("no data for different device", func(t *testing.T) {
101+
start := now.Add(-11 * time.Minute)
102+
end := now
103+
minutes, err := client.ControllerCallCoverage(ctx, "OtherDevice456", start, end)
104+
require.NoError(t, err)
105+
assert.Equal(t, int64(0), minutes)
106+
})
107+
108+
t.Run("empty time range", func(t *testing.T) {
109+
start := now.Add(-1 * time.Hour)
110+
end := now.Add(-50 * time.Minute)
111+
minutes, err := client.ControllerCallCoverage(ctx, devicePubkey, start, end)
112+
require.NoError(t, err)
113+
assert.Equal(t, int64(0), minutes)
114+
})
115+
}
116+
117+
func TestClickHouseClient_ControllerCallCoverage_WithGaps(t *testing.T) {
118+
if testing.Short() {
119+
t.Skip("skipping integration test in short mode")
120+
}
121+
122+
conn, cleanup := setupClickHouseContainer(t)
123+
defer cleanup()
124+
125+
ctx := context.Background()
126+
client := &ClickHouseClient{conn: conn, db: "default"}
127+
128+
devicePubkey := "GappyDevice789"
129+
now := time.Now().Truncate(time.Second)
130+
131+
// Insert records with gaps: minutes 0,1,2 then skip 3,4, then 5,6,7
132+
gapMinutes := []int{10, 9, 8, 5, 4, 3}
133+
for _, m := range gapMinutes {
134+
ts := now.Add(-time.Duration(m) * time.Minute)
135+
err := conn.Exec(ctx, fmt.Sprintf(
136+
`INSERT INTO "default".controller_grpc_getconfig_success (timestamp, device_pubkey) VALUES (?, ?)`,
137+
), ts, devicePubkey)
138+
require.NoError(t, err)
139+
}
140+
141+
start := now.Add(-11 * time.Minute)
142+
end := now
143+
minutes, err := client.ControllerCallCoverage(ctx, devicePubkey, start, end)
144+
require.NoError(t, err)
145+
assert.Equal(t, int64(6), minutes, "should count 6 distinct minutes (with 2-minute gap)")
146+
}

controlplane/device-health-oracle/internal/worker/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ type Config struct {
4747
// DrainedSlotCount is used for reactivated devices/links (status = Drained, HardDrained, SoftDrained).
4848
ProvisioningSlotCount uint64
4949
DrainedSlotCount uint64
50+
51+
// Health evaluators determine target health based on criteria.
52+
DeviceEvaluator *DeviceHealthEvaluator
53+
LinkEvaluator *LinkHealthEvaluator
5054
}
5155

5256
func (c *Config) Validate() error {
@@ -71,5 +75,11 @@ func (c *Config) Validate() error {
7175
if c.Interval <= 0 {
7276
return errors.New("interval must be greater than 0")
7377
}
78+
if c.DeviceEvaluator == nil {
79+
return errors.New("device evaluator is required")
80+
}
81+
if c.LinkEvaluator == nil {
82+
return errors.New("link evaluator is required")
83+
}
7484
return nil
7585
}

0 commit comments

Comments
 (0)