Skip to content

Commit 86fc035

Browse files
bdchatham and claude
authored
feat: single-patch reconcile model (#90)
* WIP: single-patch reconcile model (source changes, tests pending) Refactors both controllers to accumulate all status mutations in-memory and flush with a single Status().Patch() at the end of Reconcile. Executor changes: - Removes Client field — executor is now a pure in-memory mutation engine - Task loop: synchronous tasks advance in sequence without intermediate patches - failTask is now a void function (in-memory mutations only) Node controller changes: - Single statusBase captured after ensureNodeFinalizer - reconcilePeers returns (bool, error) dirty flag, mutates in-memory - observeCurrentImage returns (bool, error), mutates in-memory - Single Status().Patch() at end, gated on statusDirty - Flushes before returning exec errors (preserves partial progress) NodeDeployment controller changes: - reconcilePlan no longer takes statusBase (in-memory mutations) - completePlan/failPlan are now void (in-memory, no re-read) - startPlan is now void (in-memory, caller flushes) Tests need updating to match the new in-memory executor behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: single-patch reconcile model Both controllers now accumulate all status mutations in-memory and flush with a single Status().Patch() at the end of Reconcile. This eliminates optimistic lock conflicts between tasks and the executor, and establishes a clean ownership model: tasks mutate owned resources, the executor mutates plan state in-memory, the reconciler flushes once. 
Executor changes: - Removes Client field — pure in-memory mutation engine - Task loop: synchronous tasks advance in sequence without patches - advanceTask returns ctrl.Result (no error — failures are in-memory) - failTask is void (in-memory mutations only) Node controller changes: - Single statusBase captured after ensureNodeFinalizer - reconcilePeers returns (bool, error) dirty flag, mutates in-memory - observeCurrentImage returns (bool, error), mutates in-memory - Single Status().Patch() at end, gated on statusDirty - Flushes before returning exec errors (preserves partial progress) NodeDeployment controller changes: - reconcilePlan no longer takes statusBase - completePlan/failPlan are void (in-memory, no re-read needed) - startPlan is void (caller flushes) Test updates: - Executor unit tests assert against in-memory object (no API re-fetch) - Integration tests use Reconcile (full pipeline with flush) - peers_test, reconciler_test assert in-memory mutations - nodedeployment plan_test asserts in-memory group status Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor: address PR comments - Trim verbose strategy comment in Reconcile, add concise method doc - Log exec error when status flush also fails (don't swallow silently) - Remove "Task completed synchronously" comment from executor loop - Use log.FromContext(ctx) instead of log.Log in failTask Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 358a216 commit 86fc035

12 files changed

Lines changed: 257 additions & 346 deletions

File tree

cmd/main.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -159,7 +159,6 @@ func main() {
159159
Recorder: nodeRecorder,
160160
Platform: platformCfg,
161161
PlanExecutor: &planner.Executor[*seiv1alpha1.SeiNode]{
162-
Client: kc,
163162
ConfigFor: func(_ context.Context, node *seiv1alpha1.SeiNode) task.ExecutionConfig {
164163
return task.ExecutionConfig{
165164
BuildSidecarClient: func() (task.SidecarClient, error) {
@@ -189,7 +188,6 @@ func main() {
189188
GatewayDomain: platformCfg.GatewayDomain,
190189
GatewayPublicDomain: platformCfg.GatewayPublicDomain,
191190
PlanExecutor: &planner.Executor[*seiv1alpha1.SeiNodeDeployment]{
192-
Client: kc,
193191
ConfigFor: func(ctx context.Context, group *seiv1alpha1.SeiNodeDeployment) task.ExecutionConfig {
194192
var assemblerNode *seiv1alpha1.SeiNode
195193
nodes := &seiv1alpha1.SeiNodeList{}

internal/controller/node/controller.go

Lines changed: 54 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ import (
1616
"sigs.k8s.io/controller-runtime/pkg/builder"
1717
"sigs.k8s.io/controller-runtime/pkg/client"
1818
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
19+
"sigs.k8s.io/controller-runtime/pkg/log"
1920
"sigs.k8s.io/controller-runtime/pkg/predicate"
2021

2122
seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1"
@@ -53,6 +54,8 @@ type SeiNodeReconciler struct {
5354
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
5455
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
5556

57+
// Reconcile drives the SeiNode lifecycle. All status mutations after the
58+
// finalizer are accumulated in-memory and flushed in a single status patch.
5659
func (r *SeiNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
5760
node := &seiv1alpha1.SeiNode{}
5861
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
@@ -70,6 +73,8 @@ func (r *SeiNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
7073
return r.handleNodeDeletion(ctx, node)
7174
}
7275

76+
// Finalizer is a metadata Update — must happen before we snapshot
77+
// the status patch base because Update changes resourceVersion.
7378
if err := r.ensureNodeFinalizer(ctx, node); err != nil {
7479
return ctrl.Result{}, err
7580
}
@@ -81,38 +86,60 @@ func (r *SeiNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
8186
return ctrl.Result{}, nil
8287
}
8388

89+
statusBase := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
90+
observedPhase := node.Status.Phase
91+
statusDirty := false
92+
8493
// Pre-plan: resolve label-based peers so plan params have fresh data.
85-
if err := r.reconcilePeers(ctx, node); err != nil {
94+
if dirty, err := r.reconcilePeers(ctx, node); err != nil {
8695
return ctrl.Result{}, fmt.Errorf("reconciling peers: %w", err)
96+
} else if dirty {
97+
statusDirty = true
8798
}
8899

89100
// Resolve or resume plan. ResolvePlan either resumes an active plan or
90101
// builds a new one based on the node's phase, stamping it onto
91102
// node.Status.Plan (and transitioning Pending → Initializing).
92-
observedPhase := node.Status.Phase
93103
planAlreadyActive := node.Status.Plan != nil && node.Status.Plan.Phase == seiv1alpha1.TaskPlanActive
94-
patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
95104
if err := planner.ResolvePlan(node); err != nil {
96105
return ctrl.Result{}, fmt.Errorf("resolving plan: %w", err)
97106
}
98107

99-
if node.Status.Plan == nil {
100-
return ctrl.Result{}, nil
108+
// Execute the plan. The executor advances tasks in-memory — synchronous
109+
// tasks complete in a loop, async tasks return Running with a poll interval.
110+
var result ctrl.Result
111+
var execErr error
112+
if node.Status.Plan != nil && node.Status.Plan.Phase == seiv1alpha1.TaskPlanActive {
113+
result, execErr = r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan)
114+
statusDirty = true
101115
}
102116

103-
// If ResolvePlan built a new plan, persist it before execution.
104-
if !planAlreadyActive {
105-
if err := r.Status().Patch(ctx, node, patch); err != nil {
106-
return ctrl.Result{}, fmt.Errorf("persisting new plan: %w", err)
117+
// If ResolvePlan built a new plan (but there's nothing to execute yet
118+
// because it was just created this reconcile), mark dirty so it gets persisted.
119+
if !planAlreadyActive && node.Status.Plan != nil {
120+
statusDirty = true
121+
}
122+
123+
// Running phase: observe image convergence in-memory.
124+
if node.Status.Phase == seiv1alpha1.PhaseRunning {
125+
if dirty, err := r.observeCurrentImage(ctx, node); err != nil {
126+
return ctrl.Result{}, fmt.Errorf("observing current image: %w", err)
127+
} else if dirty {
128+
statusDirty = true
107129
}
108-
return planner.ResultRequeueImmediate, nil
109130
}
110131

111-
// Execute the plan. The executor handles phase transitions via
112-
// TargetPhase/FailedPhase and sets Conditions on failure.
113-
result, err := r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan)
114-
if err != nil {
115-
return result, err
132+
if statusDirty {
133+
if err := r.Status().Patch(ctx, node, statusBase); err != nil {
134+
if execErr != nil {
135+
log.FromContext(ctx).Error(execErr, "plan execution error lost due to status flush failure")
136+
}
137+
return ctrl.Result{}, fmt.Errorf("flushing status: %w", err)
138+
}
139+
}
140+
141+
if execErr != nil {
142+
return result, execErr
116143
}
117144

118145
// Emit metrics/events if the phase changed.
@@ -130,40 +157,38 @@ func (r *SeiNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
130157
}
131158
}
132159

133-
// Running phase: observe image convergence after plan completes.
134-
if node.Status.Phase == seiv1alpha1.PhaseRunning {
135-
if err := r.observeCurrentImage(ctx, node); err != nil {
136-
return ctrl.Result{}, fmt.Errorf("observing current image: %w", err)
137-
}
160+
// Running nodes with no active plan requeue on a steady-state interval.
161+
// Spec changes trigger immediate reconciles via GenerationChangedPredicate.
162+
if node.Status.Phase == seiv1alpha1.PhaseRunning && (node.Status.Plan == nil || node.Status.Plan.Phase != seiv1alpha1.TaskPlanActive) {
138163
return ctrl.Result{RequeueAfter: statusPollInterval}, nil
139164
}
140165

141166
return result, nil
142167
}
143168

144-
func (r *SeiNodeReconciler) observeCurrentImage(ctx context.Context, node *seiv1alpha1.SeiNode) error {
169+
// observeCurrentImage checks whether the StatefulSet rollout has completed
170+
// and stamps status.currentImage in-memory. Returns true if the image changed.
171+
func (r *SeiNodeReconciler) observeCurrentImage(ctx context.Context, node *seiv1alpha1.SeiNode) (bool, error) {
145172
sts := &appsv1.StatefulSet{}
146173
if err := r.Get(ctx, types.NamespacedName{Name: node.Name, Namespace: node.Namespace}, sts); err != nil {
147174
if apierrors.IsNotFound(err) {
148-
return nil
175+
return false, nil
149176
}
150-
return err
177+
return false, err
151178
}
152179

153-
// Wait for the StatefulSet controller to process the latest spec change.
154180
if sts.Status.ObservedGeneration < sts.Generation {
155-
return nil
181+
return false, nil
156182
}
157183
if sts.Spec.Replicas == nil || sts.Status.UpdatedReplicas < *sts.Spec.Replicas {
158-
return nil
184+
return false, nil
159185
}
160186

161187
if node.Status.CurrentImage != node.Spec.Image {
162-
patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
163188
node.Status.CurrentImage = node.Spec.Image
164-
return r.Status().Patch(ctx, node, patch)
189+
return true, nil
165190
}
166-
return nil
191+
return false, nil
167192
}
168193

169194
// SetupWithManager sets up the controller with the Manager.

internal/controller/node/peers.go

Lines changed: 6 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -12,18 +12,17 @@ import (
1212

1313
// reconcilePeers resolves label-based peer sources by listing matching
1414
// SeiNode resources and writing their stable DNS hostnames to
15-
// status.resolvedPeers. This runs on every reconcile so the resolved
16-
// list stays current. EC2Tags and Static sources are handled by the
17-
// sidecar at task execution time and do not appear here.
18-
func (r *SeiNodeReconciler) reconcilePeers(ctx context.Context, node *seiv1alpha1.SeiNode) error {
15+
// status.resolvedPeers in-memory. The caller is responsible for persisting
16+
// the change. Returns true if the resolved peers changed.
17+
func (r *SeiNodeReconciler) reconcilePeers(ctx context.Context, node *seiv1alpha1.SeiNode) (bool, error) {
1918
var resolved []string
2019
for _, src := range node.Spec.Peers {
2120
if src.Label == nil {
2221
continue
2322
}
2423
endpoints, err := r.resolveLabelPeers(ctx, node, src.Label)
2524
if err != nil {
26-
return err
25+
return false, err
2726
}
2827
resolved = append(resolved, endpoints...)
2928
}
@@ -32,13 +31,10 @@ func (r *SeiNodeReconciler) reconcilePeers(ctx context.Context, node *seiv1alpha
3231
resolved = slices.Compact(resolved)
3332

3433
if !slices.Equal(node.Status.ResolvedPeers, resolved) {
35-
patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{})
3634
node.Status.ResolvedPeers = resolved
37-
if err := r.Status().Patch(ctx, node, patch); err != nil {
38-
return fmt.Errorf("patching resolved peers: %w", err)
39-
}
35+
return true, nil
4036
}
41-
return nil
37+
return false, nil
4238
}
4339

4440
// resolveLabelPeers lists SeiNode resources matching the label selector

internal/controller/node/peers_test.go

Lines changed: 22 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@ import (
55
"testing"
66

77
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8-
"k8s.io/apimachinery/pkg/types"
98

109
seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1"
1110
)
@@ -39,28 +38,23 @@ func TestReconcilePeers_ResolvesLabelSource(t *testing.T) {
3938
Spec: seiv1alpha1.SeiNodeSpec{ChainID: "test-1", Image: "sei:latest", FullNode: &seiv1alpha1.FullNodeSpec{}},
4039
}
4140

42-
r, c := newNodeReconciler(t, node, peer1, peer2)
41+
r, _ := newNodeReconciler(t, node, peer1, peer2)
4342
ctx := context.Background()
4443

45-
if err := r.reconcilePeers(ctx, node); err != nil {
44+
if _, err := r.reconcilePeers(ctx, node); err != nil {
4645
t.Fatalf("reconcilePeers: %v", err)
4746
}
4847

49-
got := &seiv1alpha1.SeiNode{}
50-
if err := c.Get(ctx, types.NamespacedName{Name: "my-node", Namespace: "default"}, got); err != nil {
51-
t.Fatalf("Get: %v", err)
52-
}
53-
54-
if len(got.Status.ResolvedPeers) != 2 {
55-
t.Fatalf("expected 2 resolved peers, got %d: %v", len(got.Status.ResolvedPeers), got.Status.ResolvedPeers)
48+
if len(node.Status.ResolvedPeers) != 2 {
49+
t.Fatalf("expected 2 resolved peers, got %d: %v", len(node.Status.ResolvedPeers), node.Status.ResolvedPeers)
5650
}
5751
want := []string{
5852
"peer-1-0.peer-1.default.svc.cluster.local",
5953
"peer-2-0.peer-2.default.svc.cluster.local",
6054
}
6155
for i, w := range want {
62-
if got.Status.ResolvedPeers[i] != w {
63-
t.Errorf("resolvedPeers[%d] = %q, want %q", i, got.Status.ResolvedPeers[i], w)
56+
if node.Status.ResolvedPeers[i] != w {
57+
t.Errorf("resolvedPeers[%d] = %q, want %q", i, node.Status.ResolvedPeers[i], w)
6458
}
6559
}
6660
}
@@ -90,23 +84,18 @@ func TestReconcilePeers_ExcludesSelf(t *testing.T) {
9084
Spec: seiv1alpha1.SeiNodeSpec{ChainID: "test-1", Image: "sei:latest", FullNode: &seiv1alpha1.FullNodeSpec{}},
9185
}
9286

93-
r, c := newNodeReconciler(t, node, peer)
87+
r, _ := newNodeReconciler(t, node, peer)
9488
ctx := context.Background()
9589

96-
if err := r.reconcilePeers(ctx, node); err != nil {
90+
if _, err := r.reconcilePeers(ctx, node); err != nil {
9791
t.Fatalf("reconcilePeers: %v", err)
9892
}
9993

100-
got := &seiv1alpha1.SeiNode{}
101-
if err := c.Get(ctx, types.NamespacedName{Name: "my-node", Namespace: "default"}, got); err != nil {
102-
t.Fatalf("Get: %v", err)
103-
}
104-
105-
if len(got.Status.ResolvedPeers) != 1 {
106-
t.Fatalf("expected 1 resolved peer (self excluded), got %d: %v", len(got.Status.ResolvedPeers), got.Status.ResolvedPeers)
94+
if len(node.Status.ResolvedPeers) != 1 {
95+
t.Fatalf("expected 1 resolved peer (self excluded), got %d: %v", len(node.Status.ResolvedPeers), node.Status.ResolvedPeers)
10796
}
108-
if got.Status.ResolvedPeers[0] != "other-node-0.other-node.default.svc.cluster.local" {
109-
t.Errorf("resolvedPeers[0] = %q", got.Status.ResolvedPeers[0])
97+
if node.Status.ResolvedPeers[0] != "other-node-0.other-node.default.svc.cluster.local" {
98+
t.Errorf("resolvedPeers[0] = %q", node.Status.ResolvedPeers[0])
11099
}
111100
}
112101

@@ -134,20 +123,15 @@ func TestReconcilePeers_CrossNamespace_DoesNotExcludeMatchingName(t *testing.T)
134123
Spec: seiv1alpha1.SeiNodeSpec{ChainID: "test-1", Image: "sei:latest", FullNode: &seiv1alpha1.FullNodeSpec{}},
135124
}
136125

137-
r, c := newNodeReconciler(t, node, peerSameName)
126+
r, _ := newNodeReconciler(t, node, peerSameName)
138127
ctx := context.Background()
139128

140-
if err := r.reconcilePeers(ctx, node); err != nil {
129+
if _, err := r.reconcilePeers(ctx, node); err != nil {
141130
t.Fatalf("reconcilePeers: %v", err)
142131
}
143132

144-
got := &seiv1alpha1.SeiNode{}
145-
if err := c.Get(ctx, types.NamespacedName{Name: "shared-name", Namespace: "ns-a"}, got); err != nil {
146-
t.Fatalf("Get: %v", err)
147-
}
148-
149-
if len(got.Status.ResolvedPeers) != 1 {
150-
t.Fatalf("expected 1 peer (same name, different ns), got %d: %v", len(got.Status.ResolvedPeers), got.Status.ResolvedPeers)
133+
if len(node.Status.ResolvedPeers) != 1 {
134+
t.Fatalf("expected 1 peer (same name, different ns), got %d: %v", len(node.Status.ResolvedPeers), node.Status.ResolvedPeers)
151135
}
152136
}
153137

@@ -179,7 +163,7 @@ func TestReconcilePeers_NoPatchWhenUnchanged(t *testing.T) {
179163
r, _ := newNodeReconciler(t, node, peer)
180164

181165
// Should not error — resolved peers match, no patch needed
182-
if err := r.reconcilePeers(context.Background(), node); err != nil {
166+
if _, err := r.reconcilePeers(context.Background(), node); err != nil {
183167
t.Fatalf("reconcilePeers: %v", err)
184168
}
185169
}
@@ -199,7 +183,7 @@ func TestReconcilePeers_NoLabelSources_NoPatch(t *testing.T) {
199183

200184
r, _ := newNodeReconciler(t, node)
201185

202-
if err := r.reconcilePeers(context.Background(), node); err != nil {
186+
if _, err := r.reconcilePeers(context.Background(), node); err != nil {
203187
t.Fatalf("reconcilePeers: %v", err)
204188
}
205189
// No label sources means no resolved peers, no patch — just verifying no error
@@ -234,19 +218,14 @@ func TestReconcilePeers_DeduplicatesOverlappingSources(t *testing.T) {
234218
Spec: seiv1alpha1.SeiNodeSpec{ChainID: "test-1", Image: "sei:latest", FullNode: &seiv1alpha1.FullNodeSpec{}},
235219
}
236220

237-
r, c := newNodeReconciler(t, node, peer)
221+
r, _ := newNodeReconciler(t, node, peer)
238222
ctx := context.Background()
239223

240-
if err := r.reconcilePeers(ctx, node); err != nil {
224+
if _, err := r.reconcilePeers(ctx, node); err != nil {
241225
t.Fatalf("reconcilePeers: %v", err)
242226
}
243227

244-
got := &seiv1alpha1.SeiNode{}
245-
if err := c.Get(ctx, types.NamespacedName{Name: "my-node", Namespace: "default"}, got); err != nil {
246-
t.Fatalf("Get: %v", err)
247-
}
248-
249-
if len(got.Status.ResolvedPeers) != 1 {
250-
t.Fatalf("expected 1 deduplicated peer, got %d: %v", len(got.Status.ResolvedPeers), got.Status.ResolvedPeers)
228+
if len(node.Status.ResolvedPeers) != 1 {
229+
t.Fatalf("expected 1 deduplicated peer, got %d: %v", len(node.Status.ResolvedPeers), node.Status.ResolvedPeers)
251230
}
252231
}

0 commit comments

Comments
 (0)