Skip to content

Commit a595641

Browse files
bdchathamclaude
andauthored
feat: status-aware task Submit with Run counter and livez endpoint (Alpha P3) (#61)
## Summary Cloud-API model for the sidecar task engine: the controller submits stable keys, and the sidecar owns the execution lifecycle. - **Status-aware `Submit`**: failed tasks are transparently re-executed on re-submit; running/completed are idempotent no-ops - **`Run` counter** on `TaskResult`: tracks execution count under the same stable ID. Increments on failed→re-execute, NOT on crash-recovery rehydration - **Concurrency safety**: `sync.Mutex` + `inFlight` map prevents double-execution of the same failed task ID - **`/v0/livez` endpoint**: SQLite liveness check via `Ping()` — distinct from `/v0/healthz` (readiness). Use as Kubernetes liveness probe. - **SQLite migration v3**: adds `run` column ### Why Deterministic task IDs + PVC-persisted SQLite = permanently stuck failed tasks after pod restart. The engine's dedup check returned the cached failure without re-executing. This is the sidecar half of the Alpha P3 task reliability initiative. The controller half (plan IDs, simplified retry, failure diagnostics) follows in a separate PR on sei-node-controller-networking. ## Test plan - [x] `TestSubmitReExecutesFailedTask` — failed task re-executes, Run increments to 2 - [x] `TestSubmitReExecutesFailedTaskThatFailsAgain` — persistent failure increments Run - [x] `TestSubmitDoesNotIncrementRunOnRehydration` — crash recovery preserves Run=1 - [x] `TestSubmitConcurrentSameFailedID` — mutex prevents double-execution - [x] `TestSubmitRunFieldOnFirstSubmit` — new tasks start at Run=1 - [x] `TestLivezReturns200WhenStoreHealthy` / `TestLivezReturns200BeforeReady` - [x] All 40+ existing engine, server, and store tests pass - [x] `go vet` clean 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f8881c9 commit a595641

9 files changed

Lines changed: 332 additions & 18 deletions

File tree

sidecar/engine/engine.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package engine
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"sync/atomic"
78
"time"
89

@@ -35,6 +36,7 @@ type Engine struct {
3536
ctx context.Context
3637
ready atomic.Bool
3738
store ResultStore
39+
mu sync.Mutex
3840
}
3941

4042
// NewEngine creates a new Engine. The engine runs until ctx is cancelled.
@@ -52,6 +54,8 @@ func NewEngine(ctx context.Context, handlers map[TaskType]TaskHandler, store Res
5254

5355
// rehydrateStaleTasks re-executes tasks that were left in "running"
5456
// state by a previous process that exited before completing them.
57+
// Run count is NOT incremented — rehydration is crash recovery of an
58+
// incomplete run, not a new run.
5559
func (e *Engine) rehydrateStaleTasks() {
5660
stale, err := e.store.ListStaleTasks()
5761
if err != nil {
@@ -66,19 +70,24 @@ func (e *Engine) rehydrateStaleTasks() {
6670
tr.Status = TaskStatusFailed
6771
tr.Error = "no handler registered for task type"
6872
tr.CompletedAt = &t
69-
_ = e.store.Save(&tr)
73+
if err := e.store.Save(&tr); err != nil {
74+
log.Error("failed to persist stale task failure", "id", tr.ID, "err", err)
75+
}
7076
continue
7177
}
72-
log.Info("rehydrating stale task", "type", tr.Type, "id", tr.ID)
73-
e.runTask(tr.ID, TaskType(tr.Type), handler, tr.Params, tr.SubmittedAt)
78+
log.Info("rehydrating stale task", "type", tr.Type, "id", tr.ID, "run", tr.Run)
79+
e.runTask(tr.ID, TaskType(tr.Type), handler, tr.Params, tr.SubmittedAt, tr.Run)
7480
}
7581
}
7682

77-
// Submit starts a task in its own goroutine and returns its ID. When
78-
// task.ID is set, it becomes the canonical identifier (enabling
79-
// deterministic IDs from the controller). When empty, a random UUID is
80-
// generated. If a task with the same ID already exists, the existing ID
81-
// is returned without re-submitting.
83+
// Submit starts a task in its own goroutine and returns its ID.
84+
//
85+
// The engine follows a cloud-API model for task lifecycle:
86+
// - If no task with this ID exists, create and execute it (run 1).
87+
// - If the task is running or completed, return its ID (idempotent no-op).
88+
// - If the task failed, re-execute it with an incremented run counter.
89+
//
90+
// The caller submits a stable key and the engine owns the execution lifecycle.
8291
func (e *Engine) Submit(task Task) (string, error) {
8392
handler, ok := e.handlers[task.Type]
8493
if !ok {
@@ -94,17 +103,25 @@ func (e *Engine) Submit(task Task) (string, error) {
94103
id = uuid.New().String()
95104
}
96105

97-
// Dedup check against the store.
106+
e.mu.Lock()
107+
defer e.mu.Unlock()
108+
109+
run := 1
98110
if existing, _ := e.store.Get(id); existing != nil {
99-
return id, nil
111+
switch existing.Status {
112+
case TaskStatusRunning, TaskStatusCompleted:
113+
return id, nil
114+
case TaskStatusFailed:
115+
run = existing.Run + 1
116+
}
100117
}
101118

102119
now := time.Now().UTC()
103-
104120
tr := &TaskResult{
105121
ID: id,
106122
Type: string(task.Type),
107123
Status: TaskStatusRunning,
124+
Run: run,
108125
Params: task.Params,
109126
SubmittedAt: now,
110127
}
@@ -113,21 +130,22 @@ func (e *Engine) Submit(task Task) (string, error) {
113130
return "", fmt.Errorf("persist task: %w", err)
114131
}
115132

116-
log.Info("task submitted", "type", task.Type, "id", id)
117-
e.runTask(id, task.Type, handler, task.Params, now)
133+
log.Info("task submitted", "type", task.Type, "id", id, "run", run)
134+
e.runTask(id, task.Type, handler, task.Params, now, run)
118135

119136
return id, nil
120137
}
121138

122139
// runTask spawns a goroutine to run the handler and persist the result.
123-
func (e *Engine) runTask(id string, taskType TaskType, handler TaskHandler, params map[string]any, submittedAt time.Time) {
140+
func (e *Engine) runTask(id string, taskType TaskType, handler TaskHandler, params map[string]any, submittedAt time.Time, run int) {
124141
go func() {
125142
err := e.execute(e.ctx, taskType, handler, params)
126143

127144
t := time.Now().UTC()
128145
tr := &TaskResult{
129146
ID: id,
130147
Type: string(taskType),
148+
Run: run,
131149
Params: params,
132150
SubmittedAt: submittedAt,
133151
CompletedAt: &t,
@@ -160,10 +178,18 @@ func (e *Engine) execute(ctx context.Context, taskType TaskType, handler TaskHan
160178
}
161179

162180
// Healthz returns true after the engine has been marked ready.
181+
// Use as a readiness check.
163182
func (e *Engine) Healthz() bool {
164183
return e.ready.Load()
165184
}
166185

186+
// Livez returns nil when the engine's backing store is responsive.
187+
// Use as a liveness check — a non-nil error means the process is wedged
188+
// (e.g., SQLite WAL corruption, PVC read-only).
189+
func (e *Engine) Livez() error {
190+
return e.store.Ping()
191+
}
192+
167193
// Status returns the engine's current state.
168194
func (e *Engine) Status() StatusResponse {
169195
status := "Initializing"

sidecar/engine/engine_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,173 @@ func TestLongRunningTaskDoesNotBlockOthers(t *testing.T) {
537537
waitForResult(t, eng, id)
538538
}
539539

540+
// --- Status-aware Submit tests ---
541+
542+
func TestSubmitReExecutesFailedTask(t *testing.T) {
543+
calls := 0
544+
eng := newTestEngine(t, map[TaskType]TaskHandler{
545+
TaskConfigPatch: func(_ context.Context, _ map[string]any) error {
546+
calls++
547+
if calls == 1 {
548+
return errors.New("transient failure")
549+
}
550+
return nil
551+
},
552+
})
553+
554+
const taskID = "dddddddd-1111-2222-3333-444444444444"
555+
556+
// First submit: task fails.
557+
id1, err := eng.Submit(Task{ID: taskID, Type: TaskConfigPatch})
558+
if err != nil {
559+
t.Fatalf("first submit: %v", err)
560+
}
561+
r1 := waitForResult(t, eng, id1)
562+
if r1.Status != TaskStatusFailed {
563+
t.Fatalf("expected failed, got %s", r1.Status)
564+
}
565+
if r1.Run != 1 {
566+
t.Fatalf("expected run=1, got %d", r1.Run)
567+
}
568+
569+
// Second submit with same ID: should re-execute.
570+
id2, err := eng.Submit(Task{ID: taskID, Type: TaskConfigPatch})
571+
if err != nil {
572+
t.Fatalf("second submit: %v", err)
573+
}
574+
if id2 != id1 {
575+
t.Fatalf("expected same ID, got %q and %q", id1, id2)
576+
}
577+
578+
r2 := waitForResult(t, eng, id2)
579+
if r2.Status != TaskStatusCompleted {
580+
t.Fatalf("expected completed on retry, got %s", r2.Status)
581+
}
582+
if r2.Run != 2 {
583+
t.Fatalf("expected run=2, got %d", r2.Run)
584+
}
585+
if calls != 2 {
586+
t.Fatalf("expected handler called twice, got %d", calls)
587+
}
588+
}
589+
590+
func TestSubmitReExecutesFailedTaskThatFailsAgain(t *testing.T) {
591+
eng := newTestEngine(t, map[TaskType]TaskHandler{
592+
TaskConfigPatch: func(_ context.Context, _ map[string]any) error {
593+
return errors.New("persistent failure")
594+
},
595+
})
596+
597+
const taskID = "eeeeeeee-1111-2222-3333-444444444444"
598+
599+
id, _ := eng.Submit(Task{ID: taskID, Type: TaskConfigPatch})
600+
waitForResult(t, eng, id)
601+
602+
// Re-submit: still fails.
603+
eng.Submit(Task{ID: taskID, Type: TaskConfigPatch})
604+
r := waitForResult(t, eng, id)
605+
606+
if r.Status != TaskStatusFailed {
607+
t.Fatalf("expected failed, got %s", r.Status)
608+
}
609+
if r.Run != 2 {
610+
t.Fatalf("expected run=2, got %d", r.Run)
611+
}
612+
}
613+
614+
func TestSubmitDoesNotIncrementRunOnRehydration(t *testing.T) {
615+
// Create a store with a stale running task (simulates pod crash).
616+
store := newTestStore(t)
617+
now := time.Now().UTC()
618+
_ = store.Save(&TaskResult{
619+
ID: "ffffffff-1111-2222-3333-444444444444",
620+
Type: string(TaskConfigPatch),
621+
Status: TaskStatusRunning,
622+
Run: 1,
623+
SubmittedAt: now,
624+
})
625+
626+
ctx, cancel := context.WithCancel(context.Background())
627+
t.Cleanup(cancel)
628+
eng := NewEngine(ctx, map[TaskType]TaskHandler{
629+
TaskConfigPatch: func(_ context.Context, _ map[string]any) error { return nil },
630+
}, store)
631+
632+
r := waitForResult(t, eng, "ffffffff-1111-2222-3333-444444444444")
633+
if r.Run != 1 {
634+
t.Fatalf("expected run=1 after rehydration (not incremented), got %d", r.Run)
635+
}
636+
if r.Status != TaskStatusCompleted {
637+
t.Fatalf("expected completed after rehydration, got %s", r.Status)
638+
}
639+
}
640+
641+
func TestSubmitConcurrentSameFailedID(t *testing.T) {
642+
started := make(chan struct{}, 2)
643+
blocked := make(chan struct{})
644+
var callCount atomic.Int32
645+
646+
ctx, cancel := context.WithCancel(context.Background())
647+
t.Cleanup(cancel)
648+
store := newTestStore(t)
649+
eng := NewEngine(ctx, map[TaskType]TaskHandler{
650+
TaskConfigPatch: func(_ context.Context, _ map[string]any) error {
651+
callCount.Add(1)
652+
started <- struct{}{}
653+
<-blocked
654+
return nil
655+
},
656+
}, store)
657+
658+
const taskID = "11111111-2222-3333-4444-555555555555"
659+
660+
// Seed a failed task directly in the store.
661+
now := time.Now().UTC()
662+
_ = store.Save(&TaskResult{
663+
ID: taskID,
664+
Type: string(TaskConfigPatch),
665+
Status: TaskStatusFailed,
666+
Run: 1,
667+
Error: "failed",
668+
SubmittedAt: now,
669+
CompletedAt: &now,
670+
})
671+
672+
// Two concurrent submits of the same failed ID.
673+
var wg sync.WaitGroup
674+
wg.Add(2)
675+
for i := 0; i < 2; i++ {
676+
go func() {
677+
defer wg.Done()
678+
eng.Submit(Task{ID: taskID, Type: TaskConfigPatch})
679+
}()
680+
}
681+
wg.Wait()
682+
683+
// Unblock the handler and wait for completion.
684+
close(blocked)
685+
waitForResult(t, eng, taskID)
686+
687+
// The mutex serializes Submit: the first sees "failed" and re-executes,
688+
// the second sees "running" (from the first's Save) and no-ops.
689+
if c := callCount.Load(); c != 1 {
690+
t.Fatalf("expected exactly 1 re-execution, got %d", c)
691+
}
692+
}
693+
694+
func TestSubmitRunFieldOnFirstSubmit(t *testing.T) {
695+
eng := newTestEngine(t, map[TaskType]TaskHandler{
696+
TaskConfigPatch: func(_ context.Context, _ map[string]any) error { return nil },
697+
})
698+
699+
id, _ := eng.Submit(Task{Type: TaskConfigPatch})
700+
r := waitForResult(t, eng, id)
701+
702+
if r.Run != 1 {
703+
t.Fatalf("expected run=1 on first submit, got %d", r.Run)
704+
}
705+
}
706+
540707
func TestTaskErrorProducesRichErrorString(t *testing.T) {
541708
eng := newTestEngine(t, map[TaskType]TaskHandler{
542709
TaskConfigPatch: func(_ context.Context, _ map[string]any) error {

sidecar/engine/sqlite_migrations.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,27 @@ func migrate(db *sql.DB) error {
6868
}
6969
}
7070

71+
if version < 3 {
72+
tx, err := db.Begin()
73+
if err != nil {
74+
return err
75+
}
76+
defer tx.Rollback()
77+
78+
if _, err := tx.Exec(`
79+
ALTER TABLE task_results ADD COLUMN run INTEGER NOT NULL DEFAULT 1;
80+
`); err != nil {
81+
return err
82+
}
83+
84+
if _, err := tx.Exec("PRAGMA user_version = 3"); err != nil {
85+
return err
86+
}
87+
88+
if err := tx.Commit(); err != nil {
89+
return err
90+
}
91+
}
92+
7193
return nil
7294
}

sidecar/engine/sqlite_store.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,12 @@ func (s *SQLiteStore) Save(r *TaskResult) error {
7070

7171
_, err = s.db.Exec(`
7272
INSERT OR REPLACE INTO task_results
73-
(id, type, status, params, error, submitted_at, completed_at)
74-
VALUES (?, ?, ?, ?, ?, ?, ?)`,
73+
(id, type, status, run, params, error, submitted_at, completed_at)
74+
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
7575
r.ID,
7676
r.Type,
7777
string(r.Status),
78+
r.Run,
7879
string(params),
7980
r.Error,
8081
r.SubmittedAt.UTC().Format(time.RFC3339Nano),
@@ -112,14 +113,19 @@ func (s *SQLiteStore) Delete(id string) (bool, error) {
112113
return n > 0, nil
113114
}
114115

116+
func (s *SQLiteStore) Ping() error {
117+
var n int
118+
return s.db.QueryRow("SELECT 1").Scan(&n)
119+
}
120+
115121
func (s *SQLiteStore) Close() error {
116122
return s.db.Close()
117123
}
118124

119125
// --- query helpers ---
120126

121127
const selectColumns = `
122-
SELECT id, type, status, params, error, submitted_at, completed_at
128+
SELECT id, type, status, run, params, error, submitted_at, completed_at
123129
FROM task_results`
124130

125131
// queryMany executes a query and scans all rows into TaskResults.
@@ -156,7 +162,7 @@ func scanTaskResult(s rowScanner) (*TaskResult, error) {
156162
)
157163

158164
if err := s.Scan(
159-
&r.ID, &r.Type, &status, &paramsJSON,
165+
&r.ID, &r.Type, &status, &r.Run, &paramsJSON,
160166
&r.Error, &submittedAt, &completedAt,
161167
); err != nil {
162168
return nil, err

0 commit comments

Comments
 (0)