[Async Batch Pipeline] Add AsyncAction support to actions-core#3768
[Async Batch Pipeline] Add AsyncAction support to actions-core#3768sayan-das-in wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Pull request overview
Adds first-class async action support to the Actions Core (packages/core) destination-kit so cloud destinations can define and execute batched async workflows (with a follow-up poll step), and updates CLI tooling + tests accordingly.
Changes:
- Introduces
AsyncActionDefinitionplus async batch + poll execution paths (executeAsyncBatch,executeAsyncPoll) indestination-kit. - Exposes new async types from core entrypoints (
AsyncActionDefinition,AsyncPerformBatchResponse,PollPayload,PollResponse). - Updates CLI validation/type-generation to account for
asyncActions, and adds unit tests covering async batching + polling.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/core/src/index.ts | Re-exports new async action / polling types from core. |
| packages/core/src/destination-kit/types.ts | Minor formatting fix to Result interface comment/layout. |
| packages/core/src/destination-kit/index.ts | Adds asyncActions to destination definitions and adds executeAsyncBatch / executeAsyncPoll on Destination. |
| packages/core/src/destination-kit/action.ts | Defines async action types (AsyncActionDefinition, PollPayload, PollResponse) and implements AsyncAction runtime. |
| packages/core/src/tests/destination-kit.test.ts | Adds unit tests for destination initialization + async batch/poll behavior. |
| packages/core/src/tests/batching.test.ts | Adds unit tests focused on async batching/polling flows. |
| packages/cli/src/commands/validate.ts | Validates destinations by considering both actions and asyncActions. |
| packages/cli/src/commands/generate/types.ts | Generates field payload types for both sync and async actions. |
Codecov Report❌ Patch coverage is ❌ Your patch check has failed because the patch coverage (53.36%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #3768 +/- ##
=========================================
Coverage 80.93% 80.93%
=========================================
Files 1348 1405 +57
Lines 25089 28265 +3176
Branches 5212 6120 +908
=========================================
+ Hits 20305 22876 +2571
- Misses 3837 4436 +599
- Partials 947 953 +6 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
7b7872f to
ce920cb
Compare
ce920cb to
6c30649
Compare
| // Total number of events in the batch being processed. | ||
| totalEventsCount: number | ||
|
|
||
| // Number of events that was processed | ||
| validEventsCount: number |
| const syncModeVal = this.definition.syncMode ? bundle.mapping?.['__segment_internal_sync_mode'] : undefined | ||
| const syncMode = isSyncMode(syncModeVal) ? syncModeVal : undefined | ||
| const matchingKey = bundle.mapping?.['__segment_internal_matching_key'] | ||
| const audienceMembership = bundle.data.map((d) => resolveAudienceMembership(d, syncMode)) | ||
|
|
| let audienceSettings = {} as AudienceSettings | ||
| // All events should be batched on the same audience | ||
| if (events[0].context?.personas) { | ||
| audienceSettings = events[0].context?.personas?.audience_settings as AudienceSettings | ||
| } |
| const pollPayload: PollPayload = { | ||
| jobId: 'test-job-123', | ||
| attempt: 1 | ||
| } |
| stats: { | ||
| totalEventsCount: 100, | ||
| successfulEventsCount: 50, | ||
| failedEventsCount: 0 | ||
| } |
| const destination = new Destination(asyncPollDestination) | ||
| const pollPayload: PollPayload = { jobId: 'poll-job-123', attempt: 1 } | ||
|
|
6c30649 to
84ddbac
Compare
84ddbac to
9bb16ce
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 10 comments.
Comments suppressed due to low confidence (1)
packages/core/src/destination-kit/index.ts:862
executeDynamicFieldonly looks upactionSluginthis.actions. If async actions are expected to supportdynamicFields(theAsyncActionDefinitiontype includes them), dynamic field resolution for async actions will currently return[]. Consider checkingthis.asyncActions[actionSlug]as well and delegating toexecuteDynamicFieldonAsyncActionwhen present.
public async executeDynamicField(
actionSlug: string,
fieldKey: string,
data: ExecuteDynamicFieldInput<Settings, object>,
/**
* The dynamicFn argument is optional since it is only used by dynamic hook input fields. (For now)
*/
dynamicFn?: RequestFn<Settings, any, DynamicFieldResponse, AudienceSettings>
) {
const action = this.actions[actionSlug]
if (!action) {
return []
}
return action.executeDynamicField(fieldKey, data, dynamicFn)
}
| // The payload passed into a poll operation of an AsyncActionDefinition. | ||
| export type PollPayload = { | ||
| // Job ID returned from the performBatch response that identifies the batch request being polled | ||
| jobId: string |
| granularResults?: MultiStatusResponse | ||
|
|
||
| // Batch results for destinations with all or nothing statuses | ||
| batchResults?: ActionDestinationSuccessResponse | ActionDestinationErrorResponse |
| const { jobId, multiStatusResponse: batchMultiStatus } = performBatchResponse | ||
|
|
| let audienceSettings = {} as AudienceSettings | ||
| // All events should be batched on the same audience | ||
| if (events[0].context?.personas) { | ||
| audienceSettings = events[0].context?.personas?.audience_settings as AudienceSettings | ||
| } |
| throw new IntegrationError( | ||
| `Async action ${actionSlug} not found or does not support polling.`, | ||
| 'NotImplemented', | ||
| 501 | ||
| ) |
| test('should throw error when async action is not found', async () => { | ||
| const destinationTest = new Destination(destinationWithAsyncAction) | ||
| const pollPayload: PollPayload = { | ||
| jobId: 'test-job-123', | ||
| attempt: 1 | ||
| } | ||
|
|
| const pollResponse: PollResponse = { | ||
| jobId: 'test-job-123', | ||
| status: 'IN_PROGRESS', | ||
| stats: { | ||
| totalEventsCount: 100, | ||
| successfulEventsCount: 50, | ||
| failedEventsCount: 0 | ||
| } | ||
| } |
| batchResults: { | ||
| status: 500, | ||
| errortype: 'UNKNOWN_ERROR', | ||
| errormessage: 'Destination API returned an error' | ||
| } | ||
| } |
| const destination = new Destination(asyncPollDestination) | ||
| const pollPayload: PollPayload = { jobId: 'poll-job-123', attempt: 1 } | ||
|
|
||
| const result = await destination.executeAsyncPoll('asyncPollAction', { |
| }, | ||
| batchResults: { | ||
| status: 500, | ||
| errortype: 'UNKNOWN_ERROR', | ||
| errormessage: 'External API failure' | ||
| } | ||
| } |
9bb16ce to
54174d2
Compare
54174d2 to
ef07ffd
Compare
| // The payload passed into a poll operation of an AsyncActionDefinition. | ||
| export type PollPayload = { | ||
| // Job ID returned from the performBatch response that identifies the batch request being polled | ||
| jobId: string |
| granularResults?: MultiStatusResponse | ||
|
|
||
| // Batch results for destinations with all or nothing statuses | ||
| batchResults?: ActionDestinationSuccessResponse | ActionDestinationErrorResponse |
| const pollResponse: PollResponse = { | ||
| jobId: 'test-job-123', | ||
| status: 'IN_PROGRESS', | ||
| stats: { | ||
| totalEventsCount: 100, | ||
| successfulEventsCount: 50, | ||
| failedEventsCount: 0 | ||
| } |
| const pollResponse: PollResponse = { | ||
| jobId: 'test-job-456', | ||
| status: 'COMPLETED', | ||
| stats: { | ||
| totalEventsCount: 2, | ||
| successfulEventsCount: 2, | ||
| failedEventsCount: 0 | ||
| }, |
| const pollResponse: PollResponse = { | ||
| jobId: 'test-job-789', | ||
| status: 'FAILED', | ||
| stats: { | ||
| totalEventsCount: 10, | ||
| successfulEventsCount: 0, | ||
| failedEventsCount: 10 | ||
| }, |
| const pollResponse: PollResponse = { | ||
| jobId: 'test-job-auth', | ||
| status: 'COMPLETED', | ||
| stats: { | ||
| totalEventsCount: 1, | ||
| successfulEventsCount: 1, | ||
| failedEventsCount: 0 | ||
| } |
ef07ffd to
02f0e6e
Compare
02f0e6e to
a3ff38d
Compare
| // The payload passed into a poll operation of an AsyncActionDefinition. | ||
| export type PollPayload = { | ||
| // Job ID returned from the performBatch response that identifies the batch request being polled | ||
| jobId: string |
| // The response from a poll operation of an AsyncActionDefinition. | ||
| export type PollResponse = { | ||
| // Echo back the job ID being polled | ||
| jobId: string | ||
|
|
||
| // HTTP status code returned by the partner API during the poll operation | ||
| status: number | ||
|
|
||
| // The status of the poll operation | ||
| pollStatus: 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'ERROR' | ||
|
|
||
| // Return a multi-status response | ||
| // If not available, the status code will be used to determine the outcome | ||
| multiStatusResponse?: MultiStatusResponse |
| const pollPayload: PollPayload = { | ||
| jobId: 'test-job-123', | ||
| attempt: 1 | ||
| } |
| const pollResponse: PollResponse = { | ||
| jobId: 'test-job-123', | ||
| status: 'IN_PROGRESS', | ||
| stats: { | ||
| totalEventsCount: 100, | ||
| successfulEventsCount: 50, | ||
| failedEventsCount: 0 | ||
| } | ||
| } |
| multiStatusResponse.pushSuccessResponse({ status: 200, body: {}, sent: {} }) | ||
| multiStatusResponse.pushSuccessResponse({ status: 200, body: {}, sent: {} }) | ||
|
|
||
| mockPerformBatch.mockResolvedValue({ |
| const destination = new Destination(asyncPollDestination) | ||
| const pollPayload: PollPayload = { jobId: 'poll-job-123', attempt: 1 } | ||
|
|
| const pollResponse: PollResponse = { | ||
| jobId: 'poll-job-123', | ||
| status: 'IN_PROGRESS', | ||
| stats: { | ||
| successCount: 50, | ||
| failureCount: 0 | ||
| } | ||
| } |
a3ff38d to
e3187d6
Compare
This PR adds async actions support to Actions Core package.
These actions will be called from a different endpoint and doesn't any existing flow
Testing
Include any additional information about the testing you have completed to
ensure your changes behave as expected. For a speedy review, please check
any of the tasks you completed below during your testing.
Security Review
Please ensure sensitive data is properly protected in your integration.
type: 'password'New Destination Checklist
verioning-info.tsfile. example