Callback for workflow update support #9614
Conversation
bergundy
left a comment
I think we need just one more round here. For when updates are already completed, let's make sure to generate the new link type we discussed server-side.
func (l *Library) Components() []*chasm.RegistrableComponent {
	return []*chasm.RegistrableComponent{
		chasm.NewRegistrableComponent[*Workflow](chasm.WorkflowComponentName),
		chasm.NewRegistrableComponent[*WorkflowUpdate](chasm.WorkflowUpdateComponentName),
Given that workflow update is tightly coupled to workflows, it makes total sense to put them in the same library.
*workflowpb.UpdateState

// MSPointer is a special in-memory field for accessing the underlying mutable state.
chasm.MSPointer
This was only supposed to be embedded in the top level Workflow component but I can see why you'd want to access it here. No strong opinion because either way this would be a workaround. I wonder though if you need to embed this or if it'd be better to make it a named field.
It was embedded in the workflow component, so I made it embedded here too.
If it's not embedded, then it would also need to be an exported field; otherwise CHASM tree deserialization will not work. To keep the convention consistent, embedding is probably fine here.
)
MaxCallbacksPerUpdateID = NewNamespaceIntSetting(
	"system.maxCallbacksPerUpdateID",
	32,
I think limiting all of the workflow callbacks, regardless of what component they're attached to makes more sense than a per component limit due to the fact that the entire tree needs to be loaded into memory when mutable state is accessed today.
I limited all workflow callbacks as well. I added this per-update limit on top of that, to keep one update from using up the entire workflow-wide callback budget.
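To make the two-level limit concrete, here's a minimal Go sketch of a workflow-wide cap plus a per-update cap; the function name, constants, and limit values are illustrative assumptions, not the actual server config:

```go
package main

import "fmt"

// Hypothetical sketch of the two-level limit discussed above: a workflow-wide
// cap on all callbacks plus a per-update cap, so a single update cannot
// exhaust the workflow's entire callback budget. Values are illustrative.
const (
	maxCallbacksPerWorkflow = 64
	maxCallbacksPerUpdateID = 32
)

func checkCallbackLimits(workflowTotal, updateTotal, adding int) error {
	if workflowTotal+adding > maxCallbacksPerWorkflow {
		return fmt.Errorf("workflow callback limit exceeded: %d + %d > %d",
			workflowTotal, adding, maxCallbacksPerWorkflow)
	}
	if updateTotal+adding > maxCallbacksPerUpdateID {
		return fmt.Errorf("per-update callback limit exceeded: %d + %d > %d",
			updateTotal, adding, maxCallbacksPerUpdateID)
	}
	return nil
}

func main() {
	fmt.Println(checkCallbackLimits(10, 2, 4))  // nil: both limits satisfied
	fmt.Println(checkCallbackLimits(62, 0, 4))  // workflow-wide limit trips
	fmt.Println(checkCallbackLimits(40, 30, 4)) // per-update limit trips
}
```

Both checks run against the same increment, so either cap can reject the attach independently.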
stephanos
left a comment
Only made it half-way through so far; but figured I can send my first review comments now.
links []*commonpb.Link,
identity string,
priority *commonpb.Priority,
workflowUpdateOptions map[string]*historypb.WorkflowExecutionOptionsUpdatedEventAttributes_WorkflowUpdateOptionsUpdate,
I know it's not wrong, but ... WorkflowUpdateOptionsUpdate 😬
(non-blocking; just noticing)
Yeah I agree
Force-pushed a453230 to 09ac27a
// - The event will be written atomically with acceptance
// If the Update struct is lost (registry cleared), the abort mechanism fires
// registryClearedErr on the caller's future, prompting an immediate retry.
if u.state == stateAdmitted || u.state == stateSent {
Added handling for stateAdmitted; it should be the same as stateSent but return false, nil since, IIUC, the caller still needs to create the speculative WFT at this stage.
Force-pushed 09ac27a to 9de5339
Made some updates to bring this up to latest. The only logical changes are in the top commit -- handling
Force-pushed 9de5339 to 4b0915d
Force-pushed 8551a4f to 3ae1202
Force-pushed 3ae1202 to 2ce7339
## What changed?
When we set the Nexus callback URL in test_env.go, the dynamic config override is still tied to the test's lifetime, not the cluster's lifetime, so a subsequent test that reuses this cluster will not have that override. Moving the override to onebox.go (a similar pattern to #9918) so this default lives for the lifetime of the cluster.

## Why?
Ran into an issue with the task token not being set in #9614; this solves it. Breaking the fix out into a separate PR for ease of review + checking this in first.

## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [x] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)
## What changed?
Added a `createExternalNexusServer(...)` which sets up an external Nexus endpoint with a user-provided handler and listens on a provided address. This is used in nexus_workflow_test.go and will be used more in #9614.

Opportunistically did a couple more drive-by refactors/consistency fixes, specifically:
* Force the user to provide `ctx` to the endpoint creation functions instead of making a new `ctx`
* Use `env.Context()` instead of `testcore.NewContext()` in all suites that I touched here

## Why?
Pulling changes out of #9614 into targeted PRs to reduce load on reviewers.

## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [x] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)
// TODO (alex-update): This method is noop because we don't currently write rejections to the history.
return nil
func (ms *MutableStateImpl) RejectWorkflowExecutionUpdate(updateID string, wfFailure *failurepb.Failure) error {
	if !ms.chasmCallbacksEnabled() {
elsewhere we use chasmEnabled; why use chasmCallbacksEnabled here? if it's intentional, a comment would be helpful.
// but update callbacks must fire now because the update was aborted on the old run.
func (w *Workflow) ProcessAllUpdateCloseCallbacks(ctx chasm.MutableContext) error {
	for _, updateField := range w.Updates {
		if err := callback.ScheduleStandbyCallbacks(ctx, updateField.Get(ctx).Callbacks); err != nil {
Is my understanding correct that once this returns an error; the entire attempt to schedule callbacks is aborted and retried from the top? We wouldn't want partial results.
Right, I think it's better to abort here and retry -- it seems like it'd be harder to reason about if we only partially succeed for some updates.
if len(wf.Updates) == 0 {
	return nil
}
Is this a meaningful perf optimization?
Seems like for now it still has some perf implications, based on offline discussion -- added TODOs to clean up this read-to-write upgrade once CHASM natively detects mutation.
// flushPendingCallbacks writes one WorkflowExecutionOptionsUpdatedEvent per
// buffered AttachCallbacks callback, skipping any whose requestID is already persisted.
// Called from onAcceptanceMsg after the acceptance event has been written.
This line makes it sound like it's only called from onAcceptanceMsg but that's not true.
replying for posterity from offline chat -- it should only be called from onAcceptanceMsg (not on rejection)
"go.temporal.io/server/common/nexus/nexusrpc"
)

type WorkflowUpdate struct {
I think it's not yet clearly documented anywhere that the semantics of a rejected Update with vs without callbacks are different now. If an update has callbacks it will incur a write on rejection now where it didn't before.
I'd update docs/architecture/workflow-update.md and add comments across the update package to clarify that (e.g. pendingCallbacks, onRejectionMsg).
If an update has callbacks it will incur a write on rejection now where it didn't before.
That shouldn't be true; the callback is not used for rejection normally (the speculative update case). It should only be used if the update was durably admitted.
I see onRejectionMsg has eventStore EventStore and if the update is in "sent" or "admitted" (ie before "acceptance") it invokes flushPendingCallbacks which - if there are callbacks - invokes AddWorkflowExecutionOptionsUpdatedEvent which AFAICT is a write? And then RejectWorkflowExecutionUpdate also adds another one, no?
Modified workflow-update.md to capture the expected buffering -> persist behavior for updates (tl;dr: callbacks are buffered when updates get admitted/sent, persisted on acceptance, but not on rejection). Essentially the same as before, with the CHASM persistence added so we can attach completion callbacks.
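The buffer-on-admit, flush-on-accept, drop-on-reject lifecycle described here can be sketched roughly as follows; the struct, state names, and methods are illustrative stand-ins, not the actual update package:

```go
package main

import "fmt"

// Hedged sketch of the callback lifecycle: buffered while the update is
// admitted/sent, flushed (persisted) on acceptance, dropped without a
// history write on rejection. Not the real server types.
type updateState int

const (
	stateAdmitted updateState = iota
	stateSent
	stateAccepted
	stateRejected
)

type update struct {
	state     updateState
	buffered  []string // callback request IDs, not yet persisted
	persisted []string
}

// onAccepted models flushPendingCallbacks: buffered callbacks become durable.
func (u *update) onAccepted() {
	u.state = stateAccepted
	u.persisted = append(u.persisted, u.buffered...)
	u.buffered = nil
}

// onRejected drops the buffer: no write for rejected speculative updates.
func (u *update) onRejected() {
	u.state = stateRejected
	u.buffered = nil
}

func main() {
	u := &update{state: stateSent, buffered: []string{"req-1"}}
	u.onAccepted()
	fmt.Println(len(u.persisted), len(u.buffered)) // 1 0
}
```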
Force-pushed 239cb1f to 72b65be
type CallbackParent interface {
	RemoveCallback(ctx chasm.MutableContext, c *Callback)
}
@stephanos LMK if this is an idiomatic pattern (self-cleanup of callbacks via optional interface)
AFAIK there's no established pattern for this yet. cc @yycptt
Squashed these commits, left for posterity:
- Add Nexus Workflow Update
- Update from rebase
- Fix sent state
- Cleanup
- Fix lint
- Fix more CI
- fix
- Review clean up
- Try suggestions from the review skill
- Fix some tests
- Add TODO for rejected event
- Remove .omc from gitignore
- Respond to PR comments
- Add NS Capability for this feature
- Respond to PR comments
- Update API
Force-pushed 72b65be to 4b7757c
Cursor Bugbot has reviewed your changes and found 4 potential issues.
Reviewed by Cursor Bugbot for commit 579442b.
maxCallbacksPerUpdateID,
updateID,
currentCallbackCount,
)
Duplicate callbacks can exceed configured limits
Medium Severity
AddUpdateCompletionCallbacks enforces limits before deduplicating existing callback IDs. When the same callbacks are re-applied for an update (for example admission plus acceptance replay paths), valid idempotent re-registration can fail with FailedPrecondition even though no new callbacks would be added.
Additional Locations (2)
Right, good spot. It seems a bit clunky to assert this (see countNewCallbacks(...)), but probably better than not.
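A minimal sketch of the dedupe-before-limit idea Bugbot suggests: count only callbacks whose IDs are not already registered, so idempotent re-registration (e.g. admission plus acceptance replay) never trips the limit. The countNewCallbacks signature here is a hypothetical stand-in, not the exact server helper:

```go
package main

import "fmt"

// countNewCallbacks returns how many of the incoming callback IDs are not
// already registered. The limit check should use this count rather than
// len(incomingIDs), so re-applying the same callbacks is a no-op.
func countNewCallbacks(existing map[string]struct{}, incomingIDs []string) int {
	n := 0
	for _, id := range incomingIDs {
		if _, ok := existing[id]; !ok {
			n++
		}
	}
	return n
}

func main() {
	existing := map[string]struct{}{"cb-1": {}, "cb-2": {}}
	// Re-applying cb-1/cb-2 plus one genuinely new callback counts as 1, not 3,
	// so a replayed attach cannot spuriously exceed the configured limit.
	fmt.Println(countNewCallbacks(existing, []string{"cb-1", "cb-2", "cb-3"}))
}
```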
Reference: &commonpb.Link_WorkflowEvent_RequestIdRef{
	RequestIdRef: &commonpb.Link_WorkflowEvent_RequestIdReference{
		RequestId: requestID,
		EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
Update backlink may reference wrong request id
Low Severity
The response link uses u.req...RequestId instead of the accepted update’s persisted identifier. For duplicate update_id calls or requests without request_id, Link_WorkflowEvent.RequestIdRef can be empty or mismatched, producing a backlink that does not correspond to the actual accepted event.
func (u *Update) AcceptedEventID() int64 {
	return u.acceptedEventID
}
Unused exported update accessor added
Low Severity
AcceptedEventID is newly exported but has no callers in the repository. This adds dead public surface to update.Update, which increases maintenance burden and can mislead future code into relying on an accessor that currently has no supported use.
This connects to my comment about Link_WorkflowEvent_EventRef
eq(msg.GetMeta().GetUpdateId(), prefix+"meta.update_id", updateID, updateID, msg),
notZero(msg.GetInput(), prefix+"input", msg),
notZero(msg.GetInput().GetName(), prefix+"input.name", msg),
callbacksRequireRequestID(msg),
Rejection failure is not validated
Medium Severity
validateRejectionMsg only checks that the rejection message exists, not that rejection.failure is present. A nil failure is then forwarded through RejectWorkflowExecutionUpdate, so update callbacks lose the rejection payload and can resolve via the wrong completion path.
Additional Locations (1)
if len(request.GetRequest().GetRequestId()) > wh.config.MaxIDLengthLimit() {
	return errRequestIDTooLong
}

	return nil, err
}
resp := u.CreateResponse(u.wfKey, status.Outcome, status.Stage)
// Attach a link to the response. For accepted/completed updates, use a WorkflowEvent link
nit: let's give this code block a newline above and below to breathe
Namespace: u.req.Request.Namespace,
WorkflowId: u.wfKey.WorkflowID,
RunId: u.wfKey.RunID,
Reference: &commonpb.Link_WorkflowEvent_RequestIdRef{
I'm new to these link events, but why wouldn't we use Link_WorkflowEvent_EventRef since we have an EventId (i.e. AcceptedEventID)?
}
if got := workflowEvent.GetRequestIdRef().GetEventType(); got != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED {
	return nil, nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "expected event type UPDATE_ACCEPTED, got %v", got)
}
Should this verify that it "points to the accepted event"?
s.NoError(err)

// hasUpdateChasmNode reports whether any CHASM node path references the given update ID.
hasUpdateChasmNode := func(desc *adminservice.DescribeMutableStateResponse) bool {
I'm not sure about the functional/acceptance tests reaching that deep into the server internals. It couples the test too much to the impl details for me. Maybe this should be a unit/integration test inside the CHASM package instead?
// from saveResult on terminal transition.
func (u *WorkflowUpdate) RemoveCallback(ctx chasm.MutableContext, c *callback.Callback) {
	for id, field := range u.Callbacks {
		if field.Get(ctx) == c {
side note; I was surprised that this is safe. But it appears to be safe. I didn't think that comparing the pointers here would work in CHASM, but apparently that's how it works. I'm not sure we can rely on that, though?
func (u *WorkflowUpdate) RemoveCallback(ctx chasm.MutableContext, c *callback.Callback) {
	for id, field := range u.Callbacks {
		if field.Get(ctx) == c {
			delete(u.Callbacks, id)
I'm torn between paying the runtime cost of iterating here -- given that the map key is essentially "unrecoverable", as it's based on information that's thrown away again -- versus storing the key (or part of it) so this becomes an O(1) lookup. I acknowledge there likely won't be many callbacks here, though, so I suppose the runtime loop is fine.
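For reference, a minimal sketch of the O(1) alternative: store the map key on the callback at registration time so removal doesn't need to scan the map comparing pointers. The types here are simplified stand-ins for the CHASM ones:

```go
package main

import "fmt"

// Callback keeps its own map key so the parent can delete it in O(1)
// without iterating and comparing pointers. Simplified stand-in types.
type Callback struct {
	ID string // key under which this callback lives in the parent map
}

type WorkflowUpdate struct {
	Callbacks map[string]*Callback
}

// AddCallback records the key on the callback as it is registered.
func (u *WorkflowUpdate) AddCallback(id string, c *Callback) {
	c.ID = id
	u.Callbacks[id] = c
}

// RemoveCallback deletes by the stored key instead of scanning the map.
func (u *WorkflowUpdate) RemoveCallback(c *Callback) {
	delete(u.Callbacks, c.ID)
}

func main() {
	u := &WorkflowUpdate{Callbacks: map[string]*Callback{}}
	cb := &Callback{}
	u.AddCallback("req-1_0", cb)
	u.RemoveCallback(cb)
	fmt.Println(len(u.Callbacks)) // 0
}
```

The trade-off is one extra field per callback versus a loop whose cost is bounded by the (small) per-update callback limit.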
// update-level scheduling are independent: failure of one does not stop the
// other; the errors are joined.
func (w *Workflow) ScheduleCloseCallbacks(ctx chasm.MutableContext) error {
	wfErr := callback.ScheduleStandbyCallbacks(ctx, w.Callbacks)
nit: maybe a softassert invariant error if the workflow isn't closed?
Needed to expose a function from ms_pointer.go to check state, but maybe that's ok.


What changed?
Added support for Nexus workflow update completion callbacks via CHASM. This allows a Nexus caller to be notified when a workflow update completes by attaching completion callbacks to the update request.
Why?
Nexus operations that target workflow updates need a way to receive completion notifications. Without this, a Nexus caller that sends an update has no async mechanism to learn when the update finishes. Completion callbacks enable the same async notification pattern that already exists for workflow-level Nexus operations.
How did you test it?
Potential risks
Touches speculative workflow updates, which are always hard to reason about. Tried to compensate with lots of test coverage.
Note: Needs this API PR https://github.com/temporalio/api/pull/742/changes
Note

High Risk
Touches workflow update state transitions, mutable state/history event application, and callback scheduling paths (including retry/continue-as-new), which can affect correctness of update outcomes and callback delivery. Also adds a `go.mod` `replace` for `go.temporal.io/api`, increasing dependency and compatibility risk.

Overview
Adds CHASM-backed completion callbacks for workflow updates: update requests can register Nexus `completion_callbacks`, persist them via `WorkflowExecutionOptionsUpdated`/update events, and schedule them when the update completes, the workflow closes, or the run continues.

Introduces a new per-update CHASM component (`WorkflowUpdate` + `UpdateState` proto) with opt-in self-cleanup of terminal callbacks, plus new mutable-state support to fetch update completion data (`GetNexusUpdateCompletion`) and to fire callbacks on update completion/rejection and on close paths.

Extends update handling to validate `request_id` when callbacks are present, buffer callbacks pre-acceptance, persist/dedup them on acceptance using `WorkflowExecutionOptionsUpdatedEventAttributes.WorkflowUpdateOptions`, and return clearer response backlinks (workflow-event link for accepted updates, workflow link for validator rejections).

Reviewed by Cursor Bugbot for commit 579442b.