Skip to content

Commit 14e28a5

Browse files
simonovic86claude
andauthored
fix(runtime): harden Phase 3 — validate bounds, fix leaks, add tests (#7)
Hardening pass before Phase 4 (Economics): - Validate hostcall bounds: cap rand_bytes at 64MB and log_emit at 1MB to prevent OOM panics (EI-6), check malloc null pointer return - Reject negative budget/price in checkpoint parsing (RE-3), guard int64 overflow in cost calculation - Fix migration: return error on stale checkpoint deletion failure (EI-1), hold lock during instance Close to prevent use-after-close - Plug memory leaks: copy-to-new-slice on eventlog and replay window eviction instead of slice truncation - Clean stale .tmp files on FSProvider startup (RE-1 crash recovery) - Add config validation (PricePerSecond, ReplayWindowSize, ReplayMode) - Remove unused protocol types, consolidate logging in cmd/igord - Add 13 new tests covering all hardening fixes Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f0326bb commit 14e28a5

14 files changed

Lines changed: 361 additions & 64 deletions

File tree

cmd/igord/main.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -86,28 +86,28 @@ func main() {
8686
logger := logging.NewLogger()
8787

8888
// Print startup banner
89-
logging.Info(logger, "Igor Node starting...")
90-
logging.Info(logger, fmt.Sprintf("NodeID: %s", cfg.NodeID))
89+
logger.Info("Igor Node starting...")
90+
logger.Info("NodeID: " + cfg.NodeID)
9191

9292
// Initialize P2P node
9393
node, err := p2p.NewNode(ctx, cfg, logger)
9494
if err != nil {
95-
logging.Error(logger, "Failed to create P2P node", "error", err)
95+
logger.Error("Failed to create P2P node", "error", err)
9696
os.Exit(1)
9797
}
9898
defer node.Close()
9999

100100
// Create storage provider
101101
storageProvider, err := storage.NewFSProvider(cfg.CheckpointDir, logger)
102102
if err != nil {
103-
logging.Error(logger, "Failed to create storage provider", "error", err)
103+
logger.Error("Failed to create storage provider", "error", err)
104104
os.Exit(1)
105105
}
106106

107107
// Create WASM runtime engine for migration service
108108
engine, err := runtime.NewEngine(ctx, logger)
109109
if err != nil {
110-
logging.Error(logger, "Failed to create runtime engine", "error", err)
110+
logger.Error("Failed to create runtime engine", "error", err)
111111
os.Exit(1)
112112
}
113113
defer engine.Close(ctx)
@@ -118,46 +118,46 @@ func main() {
118118
// If --migrate-agent flag is provided, perform migration
119119
if *migrateAgent != "" {
120120
if *targetPeer == "" {
121-
logging.Error(logger, "Migration requires --to flag with target peer address")
121+
logger.Error("Migration requires --to flag with target peer address")
122122
os.Exit(1)
123123
}
124124
if *wasmPath == "" {
125-
logging.Error(logger, "Migration requires --wasm flag with WASM binary path")
125+
logger.Error("Migration requires --wasm flag with WASM binary path")
126126
os.Exit(1)
127127
}
128128

129-
logging.Info(logger, "Initiating agent migration",
129+
logger.Info("Initiating agent migration",
130130
"agent_id", *migrateAgent,
131131
"target", *targetPeer,
132132
)
133133

134134
if err := migrationSvc.MigrateAgent(ctx, *migrateAgent, *wasmPath, *targetPeer); err != nil {
135-
logging.Error(logger, "Migration failed", "error", err)
135+
logger.Error("Migration failed", "error", err)
136136
os.Exit(1)
137137
}
138138

139-
logging.Info(logger, "Migration completed successfully")
139+
logger.Info("Migration completed successfully")
140140
return
141141
}
142142

143143
// If --run-agent flag is provided, run agent locally
144144
if *runAgent != "" {
145145
budgetMicrocents := budget.FromFloat(*budgetFlag)
146146
if err := runLocalAgent(ctx, cfg, engine, storageProvider, *runAgent, budgetMicrocents, *manifestPath, migrationSvc, logger); err != nil {
147-
logging.Error(logger, "Failed to run agent", "error", err)
147+
logger.Error("Failed to run agent", "error", err)
148148
os.Exit(1)
149149
}
150150
return
151151
}
152152

153-
logging.Info(logger, "Igor Node ready")
153+
logger.Info("Igor Node ready")
154154

155155
// Block until interrupted
156156
sigChan := make(chan os.Signal, 1)
157157
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
158158
<-sigChan
159159

160-
logging.Info(logger, "Igor Node shutting down...")
160+
logger.Info("Igor Node shutting down...")
161161
}
162162

163163
// runLocalAgent loads and executes an agent locally with tick loop and checkpointing.
@@ -408,7 +408,7 @@ func runSimulator(wasmPath, manifestPath string, budgetVal float64, ticks int, v
408408
}
409409
result, err := simulator.Run(ctx, cfg, logger)
410410
if err != nil {
411-
logging.Error(logger, "Simulation failed", "error", err)
411+
logger.Error("Simulation failed", "error", err)
412412
os.Exit(1)
413413
}
414414
simulator.PrintSummary(result, logger)

internal/agent/instance.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/binary"
77
"fmt"
88
"log/slog"
9+
"math"
910
"os"
1011
"time"
1112

@@ -263,11 +264,21 @@ func (i *Instance) Tick(ctx context.Context) error {
263264
maxSnaps = DefaultReplayWindowSize
264265
}
265266
if len(i.ReplayWindow) > maxSnaps {
266-
i.ReplayWindow = i.ReplayWindow[len(i.ReplayWindow)-maxSnaps:]
267+
kept := i.ReplayWindow[len(i.ReplayWindow)-maxSnaps:]
268+
fresh := make([]TickSnapshot, len(kept))
269+
copy(fresh, kept)
270+
i.ReplayWindow = fresh
267271
}
268272

269-
// Calculate and deduct execution cost (nanosecond precision, no float, no truncation)
270-
costMicrocents := elapsed.Nanoseconds() * i.PricePerSecond / 1_000_000_000
273+
// Calculate and deduct execution cost (nanosecond precision, no float, no truncation).
274+
// Guard against int64 overflow: if nanos * price would overflow, cap cost at remaining budget.
275+
var costMicrocents int64
276+
nanos := elapsed.Nanoseconds()
277+
if i.PricePerSecond > 0 && nanos > math.MaxInt64/i.PricePerSecond {
278+
costMicrocents = i.Budget
279+
} else {
280+
costMicrocents = nanos * i.PricePerSecond / 1_000_000_000
281+
}
271282
i.Budget -= costMicrocents
272283

273284
observationCount := 0
@@ -398,6 +409,9 @@ func (i *Instance) Resume(ctx context.Context, state []byte) error {
398409
}
399410

400411
ptr := uint32(results[0])
412+
if ptr == 0 {
413+
return fmt.Errorf("malloc returned null pointer (out of memory)")
414+
}
401415

402416
// Write state to WASM memory
403417
ok := i.Module.Memory().Write(ptr, state)
@@ -493,10 +507,18 @@ func ParseCheckpointHeader(checkpoint []byte) (budgetVal int64, price int64, tic
493507
if checkpoint[0] != checkpointVersion {
494508
return 0, 0, 0, [32]byte{}, nil, fmt.Errorf("unsupported checkpoint version: %d", checkpoint[0])
495509
}
510+
budgetParsed := int64(binary.LittleEndian.Uint64(checkpoint[1:9]))
511+
priceParsed := int64(binary.LittleEndian.Uint64(checkpoint[9:17]))
512+
if budgetParsed < 0 {
513+
return 0, 0, 0, [32]byte{}, nil, fmt.Errorf("checkpoint contains negative budget: %d", budgetParsed)
514+
}
515+
if priceParsed < 0 {
516+
return 0, 0, 0, [32]byte{}, nil, fmt.Errorf("checkpoint contains negative price: %d", priceParsed)
517+
}
496518
var hash [32]byte
497519
copy(hash[:], checkpoint[25:57])
498-
return int64(binary.LittleEndian.Uint64(checkpoint[1:9])),
499-
int64(binary.LittleEndian.Uint64(checkpoint[9:17])),
520+
return budgetParsed,
521+
priceParsed,
500522
binary.LittleEndian.Uint64(checkpoint[17:25]),
501523
hash,
502524
checkpoint[checkpointHeaderLen:],

internal/agent/instance_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,33 @@ func TestParseCheckpointHeader_EmptyState(t *testing.T) {
501501
}
502502
}
503503

504+
func TestParseCheckpointHeader_NegativeBudget(t *testing.T) {
505+
checkpoint := make([]byte, 57)
506+
checkpoint[0] = 0x02
507+
// Write -1 as budget (all bits set via two's complement)
508+
negBudget := int64(-1)
509+
binary.LittleEndian.PutUint64(checkpoint[1:9], uint64(negBudget))
510+
binary.LittleEndian.PutUint64(checkpoint[9:17], 1000)
511+
512+
_, _, _, _, _, err := ParseCheckpointHeader(checkpoint)
513+
if err == nil {
514+
t.Error("expected error for negative budget in checkpoint")
515+
}
516+
}
517+
518+
func TestParseCheckpointHeader_NegativePrice(t *testing.T) {
519+
checkpoint := make([]byte, 57)
520+
checkpoint[0] = 0x02
521+
binary.LittleEndian.PutUint64(checkpoint[1:9], 1000000)
522+
negPrice := int64(-500)
523+
binary.LittleEndian.PutUint64(checkpoint[9:17], uint64(negPrice))
524+
525+
_, _, _, _, _, err := ParseCheckpointHeader(checkpoint)
526+
if err == nil {
527+
t.Error("expected error for negative price in checkpoint")
528+
}
529+
}
530+
504531
func TestParseCheckpointHeader_Corruption(t *testing.T) {
505532
tests := []struct {
506533
name string

internal/config/config.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package config
22

33
import (
4+
"fmt"
5+
46
"github.com/google/uuid"
57
)
68

@@ -53,9 +55,30 @@ func Load() (*Config, error) {
5355
ReplayMode: "full",
5456
ReplayCostLog: false,
5557
}
58+
if err := cfg.Validate(); err != nil {
59+
return nil, fmt.Errorf("invalid config: %w", err)
60+
}
5661
return cfg, nil
5762
}
5863

64+
// Validate checks config invariants.
65+
func (c *Config) Validate() error {
66+
if c.PricePerSecond <= 0 {
67+
return fmt.Errorf("PricePerSecond must be positive, got %d", c.PricePerSecond)
68+
}
69+
if c.ReplayWindowSize < 0 {
70+
return fmt.Errorf("ReplayWindowSize must be non-negative, got %d", c.ReplayWindowSize)
71+
}
72+
if c.VerifyInterval < 0 {
73+
return fmt.Errorf("VerifyInterval must be non-negative, got %d", c.VerifyInterval)
74+
}
75+
validModes := map[string]bool{"off": true, "periodic": true, "on-migrate": true, "full": true}
76+
if !validModes[c.ReplayMode] {
77+
return fmt.Errorf("ReplayMode must be one of off/periodic/on-migrate/full, got %q", c.ReplayMode)
78+
}
79+
return nil
80+
}
81+
5982
// generateNodeID creates a random UUID for the node if one is not provided.
6083
func generateNodeID() string {
6184
return uuid.New().String()

internal/config/config_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,48 @@ func TestLoad_Defaults(t *testing.T) {
2727
t.Errorf("VerifyInterval: got %d, want 5", cfg.VerifyInterval)
2828
}
2929
}
30+
31+
func TestConfig_Validate_DefaultsAreValid(t *testing.T) {
32+
cfg, err := Load()
33+
if err != nil {
34+
t.Fatalf("Load: %v", err)
35+
}
36+
if err := cfg.Validate(); err != nil {
37+
t.Errorf("default config should be valid: %v", err)
38+
}
39+
}
40+
41+
func TestConfig_Validate_ZeroPrice(t *testing.T) {
42+
cfg := &Config{PricePerSecond: 0, ReplayMode: "full"}
43+
if err := cfg.Validate(); err == nil {
44+
t.Error("expected error for zero PricePerSecond")
45+
}
46+
}
47+
48+
func TestConfig_Validate_NegativePrice(t *testing.T) {
49+
cfg := &Config{PricePerSecond: -1, ReplayMode: "full"}
50+
if err := cfg.Validate(); err == nil {
51+
t.Error("expected error for negative PricePerSecond")
52+
}
53+
}
54+
55+
func TestConfig_Validate_InvalidReplayMode(t *testing.T) {
56+
cfg := &Config{PricePerSecond: 1000, ReplayMode: "invalid"}
57+
if err := cfg.Validate(); err == nil {
58+
t.Error("expected error for invalid ReplayMode")
59+
}
60+
}
61+
62+
func TestConfig_Validate_NegativeReplayWindow(t *testing.T) {
63+
cfg := &Config{PricePerSecond: 1000, ReplayWindowSize: -1, ReplayMode: "full"}
64+
if err := cfg.Validate(); err == nil {
65+
t.Error("expected error for negative ReplayWindowSize")
66+
}
67+
}
68+
69+
func TestConfig_Validate_NegativeVerifyInterval(t *testing.T) {
70+
cfg := &Config{PricePerSecond: 1000, VerifyInterval: -1, ReplayMode: "full"}
71+
if err := cfg.Validate(); err == nil {
72+
t.Error("expected error for negative VerifyInterval")
73+
}
74+
}

internal/eventlog/eventlog.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,14 @@ func (l *EventLog) SealTick() *TickLog {
8787
l.history = append(l.history, sealed)
8888
l.current = nil
8989

90-
// Evict oldest ticks when history exceeds maxTicks bound
90+
// Evict oldest ticks when history exceeds maxTicks bound.
91+
// Copy to a new slice to release references to evicted TickLogs,
92+
// preventing memory leaks from the retained underlying array.
9193
if l.maxTicks > 0 && len(l.history) > l.maxTicks {
92-
l.history = l.history[len(l.history)-l.maxTicks:]
94+
kept := l.history[len(l.history)-l.maxTicks:]
95+
fresh := make([]*TickLog, len(kept))
96+
copy(fresh, kept)
97+
l.history = fresh
9398
}
9499

95100
return sealed

internal/eventlog/eventlog_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,23 @@ func TestEventLog_Eviction(t *testing.T) {
136136
}
137137
}
138138

139+
func TestEventLog_EvictionReleasesMemory(t *testing.T) {
140+
el := NewEventLog(3)
141+
for i := uint64(1); i <= 10; i++ {
142+
el.BeginTick(i)
143+
el.Record(ClockNow, make([]byte, 1024))
144+
el.SealTick()
145+
}
146+
history := el.History()
147+
if len(history) != 3 {
148+
t.Fatalf("expected 3, got %d", len(history))
149+
}
150+
// Verify capacity is not leaking (should be exactly 3, not 10)
151+
if cap(history) > 3 {
152+
t.Errorf("history capacity should be 3, got %d (memory leak)", cap(history))
153+
}
154+
}
155+
139156
func TestEventLog_UnboundedWhenZero(t *testing.T) {
140157
el := NewEventLog(0)
141158

internal/hostcall/log.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,23 @@ import (
88
"github.com/tetratelabs/wazero/api"
99
)
1010

11+
// maxLogBytes caps the buffer size for log_emit to prevent excessive memory allocation.
12+
const maxLogBytes = 1 * 1024 * 1024 // 1 MB
13+
1114
// registerLog adds log_emit to the host module builder.
1215
// log_emit(ptr i32, len i32): emits a structured log entry.
1316
// Not a side effect — logging is an observation that produces no
1417
// externally visible state change.
1518
func (r *Registry) registerLog(builder wazero.HostModuleBuilder) {
1619
builder.NewFunctionBuilder().
1720
WithFunc(func(_ context.Context, m api.Module, ptr, length uint32) {
21+
if length == 0 || length > maxLogBytes {
22+
return
23+
}
1824
data, ok := m.Memory().Read(ptr, length)
1925
if !ok {
26+
r.logger.Warn("log_emit: failed to read from WASM memory",
27+
"ptr", ptr, "length", length)
2028
return
2129
}
2230

internal/hostcall/rand.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,28 @@ import (
99
"github.com/tetratelabs/wazero/api"
1010
)
1111

12+
// maxRandBytes caps the buffer size for rand_bytes to prevent OOM panics.
13+
// Matches the WASM linear memory limit (64 MB = 1024 pages × 64 KiB).
14+
const maxRandBytes = 64 * 1024 * 1024
15+
1216
// registerRand adds rand_bytes to the host module builder.
1317
// rand_bytes(ptr i32, len i32) -> i32: fills buffer with random bytes.
14-
// Returns 0 on success, -1 on internal error, -4 if buffer write fails.
18+
// Returns 0 on success, -1 on internal error, -2 if length exceeds maximum,
19+
// -4 if buffer write fails.
1520
// Observation hostcall — recorded in event log for replay (CE-3).
1621
func (r *Registry) registerRand(builder wazero.HostModuleBuilder) {
1722
builder.NewFunctionBuilder().
1823
WithFunc(func(_ context.Context, m api.Module, ptr, length uint32) int32 {
24+
if length == 0 {
25+
return 0
26+
}
27+
if length > maxRandBytes {
28+
r.logger.Warn("rand_bytes: length exceeds maximum",
29+
"requested", length,
30+
"max", maxRandBytes,
31+
)
32+
return -2
33+
}
1934
buf := make([]byte, length)
2035
if _, err := crypto_rand.Read(buf); err != nil {
2136
r.logger.Error("rand_bytes: crypto/rand.Read failed", "error", err)

0 commit comments

Comments
 (0)