Add Debouncer — coalesce rapid workflow calls into one execution#390
Add Debouncer — coalesce rapid workflow calls into one execution#390easmith wants to merge 19 commits into
Conversation
Implements a debounce mechanism for DBOS workflows analogous to dbos-transact-py _debouncer.py. Multiple calls with the same key within a period are collapsed into a single user-workflow execution that runs with the most recently supplied arguments. Architecture: - DebouncerServiceImpl: internal @workflow that runs a recv-loop, absorbing messages until the debounce period times out or the absolute debounceTimeout elapses, then starts the user workflow. - Debouncer<R>: public fluent API. Enqueues the service workflow on _dbos_internal_queue with a deduplicationId derived from (workflowName, debounceKey). On DBOSQueueDuplicatedException, forwards a message to the running debouncer and waits for an ack. - DBOSExecutor.captureInvocation(): extracted from startWorkflow so Debouncer can capture a lambda's workflow call without executing it. - Auto-registration of DebouncerService in DBOS constructor so users need no boilerplate setup. - Internal system workflows filtered from getRegisteredWorkflows / getRegisteredWorkflowInstances to keep public counts clean. Usage: var handle = dbos.<String>debouncer() .withDebounceTimeout(Duration.ofMinutes(5)) .debounce("key", Duration.ofSeconds(2), () -> svc.process(arg)); String result = handle.getResult(); Tests: 6 integration tests via Testcontainers Postgres covering single-call, multi-call coalescing, absolute timeout, independent keys, concurrent callers, and queue-based user workflow.
Bug 1: userWorkflowId and messageId were generated as UUID.randomUUID() outside any durable step. When debounce() is called from inside a workflow, these values differ on every replay — the returned handle points to a nonexistent workflow and the ack getEvent waits on the wrong key forever. Fix: wrap UUID generation in a runStep when called from a workflow context. Bug 2: Retrieving the existing debouncer's userWorkflowId via status.input()[1] instanceof DebouncerContextOptions always fails on replay. Java records are implicitly final; DBOSJavaSerializer uses NON_FINAL DefaultTyping, so no @Class type metadata is written for them. On deserialisation from Object.class the element comes back as LinkedHashMap, not as DebouncerContextOptions, causing an IllegalStateException. Fix: publish userWorkflowId as a named event (DEBOUNCER_CHILD_ID_KEY) at the start of the debouncer-workflow; callers read it via getEvent instead.
Object[] args round-trip through dbos.send/recv as generic JSON types: long 5L serialises to JSON 5 and back to Integer(5) when the target is Object.class, causing IllegalArgumentException when the method expects a primitive long. Adds JsonUtility.coerceArguments() call before startRegisteredWorkflow, mirroring the coercion already applied in executeWorkflowById (line 1344). Adds numericArgsRoundTripCorrectly test that exercises long/double parameters through the full debounce + coalesce path.
lookupExistingDebouncerId previously called listWorkflows and iterated all active debouncer entries in Java to find the one matching the deduplication id. When called from inside a workflow this result was also serialised as a step, making it a potential OOM bomb under load. Add WorkflowDAO.findWorkflowIdByDeduplicationId that issues a direct point-lookup on the UNIQUE (queue_name, deduplication_id) index: SELECT workflow_uuid FROM workflow_status WHERE queue_name = ? AND deduplication_id = ? Expose through SystemDatabase → DBOSExecutor → DBOSIntegration so Debouncer.lookupExistingDebouncerId becomes a single delegation call.
Two review findings investigated: Reviewed bug: "SQL without status filter -> livelock" Finding: NOT real. updateWorkflowOutcome clears deduplication_id to NULL on completion (WorkflowDAO line 329). PostgreSQL UNIQUE constraints treat NULL != NULL, so the unique slot is freed and a new enqueue succeeds without conflict. findWorkflowIdByDeduplicationId also returns null for completed debouncers since the WHERE deduplication_id = ? predicate never matches NULL. Added regression test reDebouncAfterWindowCloses that confirms two sequential debounce windows on the same key both execute correctly. Reviewed bug: "lookupExistingDebouncerId not a durable step" Finding: REAL when debounce() is called from inside a workflow. If the parent workflow crashes after DBOSQueueDuplicatedException but before the first step (send) is recorded, recovery would re-execute lookupExistingDebouncerId against the live DB rather than replaying a recorded result. This can produce a different debouncer id and break the determinism of the subsequent send and getEvent steps. Python wraps the equivalent call in call_function_as_step. Fix: when DBOS.inWorkflow() && !DBOS.inStep(), record the lookup result as a durable step "lookupDebouncer" so recovery replays it deterministically.
- Propagate caller workflow context (priority, appVersion, deduplicationId, timeout) to the user workflow via DebouncerContextOptions. Add these fields to DBOSContext, populate from ExecutionOptions. - Change debouncerWorkflow return type String → void: return value was unused, Python returns None.
- Guard send with messageSent flag: only one message per debounce() call - Replace unreachable childIdOpt.isEmpty() continue with IllegalStateException
…rkflows - Add debounceVoid, absoluteTimeoutUsesLatestArgs, priorityPropagatedFromCallerContext tests - executeWorkflowById now restores priority/appVersion from workflow_status so DBOSContext.currentPriority() is non-null inside dequeued workflows - Skip queue-option validation for dequeued/recovered workflows - DebouncerServiceImpl: skip priority/deduplicationId when no user queue
|
Hey @easmith, thanks for the contribution! I was traveling today so I haven't had a chance to review this, but I kicked off the GH actions run. I'll take a look tomorrow. (USA Pacific time zone) |
devhawk
left a comment
There was a problem hiding this comment.
Great work so far. I left a bunch of comments, but most are fairly minor (use Duration/Instant instead of longs, use registerWorkflow/startRegisteredWorkflow instead of proxies for internal workflow methods). Feel free to leave responses if you have further questions on these.
Big issues are:
- Needs a version of
Debouncerthat works withDBOSClient. SeeDebouncerClientin DBOS Python for reference Debouncershould have a reference toDBOSExecutorso it can invokefindWorkflowIdByDeduplicationIdandcaptureInvocationdirectly w/o having to go thru DBOSIntegration- I don't think passing debounce values via context is a good idea.
appVersionin particular is an issue since that is fixed for the running application.
- Use Duration/Instant instead of long/Long in DebouncerOptions, DebouncerMessage, and DebouncerContextOptions - Register debouncer workflow via registerWorkflow/startRegisteredWorkflow instead of a proxy; store RegisteredWorkflow instead of DebouncerService - Add null-but-not-empty guard on Debouncer.withQueue(String) - Use send idempotency key instead of messageSent tracking flag - Remove spurious null check on DBOSContextHolder.get() inside workflow - Replace "|"-join UUID hack with a private DebounceIds record - Make instanceName @nullable in getRegisteredWorkflow (DBOSExecutor, DBOSIntegration); drop manual null coercion at call sites
- Remove coerceArguments: DBOSJavaSerializer already writes type info for non-primitive types, so numeric mismatches don't occur; confirmed by test - Fix incorrect comment about record serialization in debouncerWorkflow - Rename debounceVoid → debounce to match the overload pattern of DBOS.runStep
DebouncerClient mirrors Debouncer but works via DBOSClient — no proxy capture, args passed directly, workflow identified by name + className. Implements the same deduplication/ack loop as Debouncer. Also adds DBOSClient.findWorkflowIdByDeduplicationId() and DBOSClient.debouncer(workflowName) factory, and Constants.DEBOUNCER_SERVICE_CLASS_NAME to avoid a silent coupling to the internal class name string.
These are enqueue-time options, not execution-time context. Passing them through DBOSContext creates ambient state that is inconsistent with Java's explicit builder style and does not match Python's actual behavior either: Python's DBOSContext.priority is only set via SetEnqueueOptions and is not auto-restored from the database when a workflow starts executing. - Move appVersion, priority, deduplicationId to DebouncerOptions so they are configured explicitly via Debouncer.withAppVersion/Priority/DeduplicationId - Simplify DebouncerContextOptions to userWorkflowId + workflowTimeout only - Revert DBOSExecutor context creation and validation to pre-PR form - Rename test to reflect that priority is now set explicitly on the debouncer
- Replace dbos.integration().captureInvocation/findWorkflowIdByDeduplicationId/ startRegisteredWorkflow with direct executor calls - Use executor.runStep() for assignDebounceIds and lookupDebouncer steps - Use DBOSContext.getNextWorkflowId() for userWorkflowId so callers can pre-specify it via WorkflowOptions - Remove captureInvocation and findWorkflowIdByDeduplicationId from DBOSIntegration — they were only added for Debouncer
|
Hey @devhawk |
|
One test failed. I guess it's good to rerun this test https://github.com/dbos-inc/dbos-transact-java/actions/runs/26237090589/job/77213213331?pr=390 |
|
|
||
| // Inside a workflow, ID generation is wrapped in a step so replay is deterministic. | ||
| DebounceIds ids; | ||
| if (DBOS.inWorkflow() && !DBOS.inStep()) { |
There was a problem hiding this comment.
I forgot executor.runDbosFunctionAsStep was private. We should make that public so it can be used here and for lookupDebouncer code below. Note, that also requires prefixing step names with "DBOS."
| // When called from inside a workflow, record the result as a durable step so that | ||
| // replay returns the same debouncer id and the subsequent send/getEvent steps stay | ||
| // deterministic. Mirrors Python's call_function_as_step("DBOS.get_deduplicated_workflow"). | ||
| String existingDebouncerId = |
There was a problem hiding this comment.
Use executor.runDbosFunctionAsStep as per earlier comment. Probably can inline lookupExistingDebouncerId code here since it won't need to be repeated
- Expose DBOSExecutor.runDbosFunctionAsStep so Debouncer can use it - Prefix internal step names with "DBOS." (assignDebounceIds, lookupDebouncer)
|
Fixed the latest comments. I'll be mostly offline for the next few days :/ |
Cool. I'm travelling and working for a bit so my schedule is out of wack right now anyways. |
|
|
||
| public static final String DEBOUNCER_WORKFLOW_NAME = "_dbos_debouncer_workflow"; | ||
| public static final String DEBOUNCER_SERVICE_CLASS_NAME = | ||
| "dev.dbos.transact.workflow.internal.InternalWorkflows"; |
There was a problem hiding this comment.
nitpick: Let's use DBOS.InternalWorkflows as the classname and debouncerWorkflow as the workflow name
| * @return list of all user-registered workflow methods | ||
| */ | ||
| public @NonNull Collection<RegisteredWorkflow> getRegisteredWorkflows() { | ||
| var executor = executorSupplier.get(); |
There was a problem hiding this comment.
Since you're offline for a bit, I want to think this thru a little more. I was already thinking we might want to track internal workflows separately from user registered. At one point, the implementation of send was an internal workflow, but other changes eliminated the need to make send a workflow. Now that debounce is decidedly an internal workflow, we might want to track it separately. If we do that, we won't need this change to getRegisteredWorkflows.
Implements a debounce mechanism for DBOS workflows analogous to dbos-transact-py _debouncer.py. Multiple calls with the same key within a period are collapsed into a single user-workflow execution that runs with the most recently supplied arguments.
Architecture:
Usage:
Tests: 6 integration tests via Testcontainers Postgres covering single-call, multi-call coalescing, absolute timeout, independent keys, concurrent callers, and queue-based user workflow.