Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@devrev/ts-adaas",
"version": "1.19.8",
"version": "1.19.9-beta.1",
"description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
173 changes: 173 additions & 0 deletions src/common/control-protocol.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import { axiosClient } from '../http/axios-client-internal';
import { createAxiosResponse } from '../tests/test-helpers';
import { createMockEvent } from './test-utils';
import { emit } from './control-protocol';
import { EventType, ExtractorEventType } from '../types/extraction';
import { LoaderEventType } from '../types/loading';

jest.mock('../http/axios-client-internal');

const mockedAxiosClient = jest.mocked(axiosClient);

describe('control-protocol.emit', () => {
beforeEach(() => {
jest.clearAllMocks();
mockedAxiosClient.post.mockResolvedValue(createAxiosResponse());
});

it.each([
{
title: 'extractor events',
inputEventType: EventType.StartExtractingData,
outputEventType: ExtractorEventType.DataExtractionProgress,
},
{
title: 'loader events',
inputEventType: EventType.StartLoadingData,
outputEventType: LoaderEventType.DataLoadingProgress,
},
{
title: 'unknown events',
inputEventType: EventType.StartExtractingData,
outputEventType: 'SOME_UNKNOWN_EVENT' as ExtractorEventType,
},
])(
'sets state dates from event context for $title',
async ({ inputEventType, outputEventType }) => {
const event = createMockEvent(undefined, {
payload: {
event_type: inputEventType,
event_context: {
extract_from: '2024-01-01T00:00:00.000Z',
extract_to: '2024-06-01T00:00:00.000Z',
},
},
});

await emit({
event,
eventType: outputEventType,
});

expect(mockedAxiosClient.post).toHaveBeenCalledTimes(1);

const [, body, config] = mockedAxiosClient.post.mock.calls[0] as [
string,
{
event_type: string;
event_context: {
extract_from?: string;
extract_to?: string;
};
worker_metadata: Record<string, unknown>;
},
{ headers: Record<string, string> }
];

expect(body).toMatchObject({
event_type: outputEventType,
event_context: expect.objectContaining({
extract_from: '2024-01-01T00:00:00.000Z',
extract_to: '2024-06-01T00:00:00.000Z',
}),
worker_metadata: expect.objectContaining({
oldest_state_date: '2024-01-01T00:00:00.000Z',
newest_state_date: '2024-06-01T00:00:00.000Z',
}),
});

expect(config).toEqual(
expect.objectContaining({
headers: expect.objectContaining({
'X-DevRev-Client-Version': expect.any(String),
}),
})
);
}
);

it.each([
{
title: 'only extract_from is set',
extractFrom: '2024-01-01T00:00:00.000Z',
extractTo: undefined,
},
{
title: 'only extract_to is set',
extractFrom: undefined,
extractTo: '2024-06-01T00:00:00.000Z',
},
{
title: 'neither extract_from nor extract_to is set',
extractFrom: undefined,
extractTo: undefined,
},
])(
'handles state-date absence when $title',
async ({ extractFrom, extractTo }) => {
const event = createMockEvent(undefined, {
payload: {
event_type: EventType.StartExtractingData,
event_context: {
extract_from: extractFrom,
extract_to: extractTo,
},
},
});

await emit({
event,
eventType: ExtractorEventType.DataExtractionProgress,
});

const [, body] = mockedAxiosClient.post.mock.calls[0] as [
string,
{
worker_metadata: Record<string, unknown>;
},
unknown
];
const workerMetadata = body.worker_metadata;

expect(workerMetadata.oldest_state_date).toBe(extractFrom);
expect(workerMetadata.newest_state_date).toBe(extractTo);
}
);

it('overrides caller-provided worker_metadata state dates', async () => {
const event = createMockEvent(undefined, {
payload: {
event_type: EventType.StartExtractingData,
event_context: {
extract_from: '2024-01-01T00:00:00.000Z',
extract_to: '2024-06-01T00:00:00.000Z',
},
},
});

await emit({
event,
eventType: ExtractorEventType.DataExtractionProgress,
worker_metadata: {
item_type: 'tasks',
oldest_state_date: 'should-be-overwritten',
newest_state_date: 'should-be-overwritten',
},
});

const [, body] = mockedAxiosClient.post.mock.calls[0] as [
string,
{
worker_metadata: Record<string, unknown>;
},
unknown
];
expect(body.worker_metadata).toEqual(
expect.objectContaining({
item_type: 'tasks',
oldest_state_date: '2024-01-01T00:00:00.000Z',
newest_state_date: '2024-06-01T00:00:00.000Z',
})
);
});
});
6 changes: 6 additions & 0 deletions src/common/control-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
ExtractorEvent,
ExtractorEventType,
LoaderEvent,
WorkerMetadata,
} from '../types/extraction';
import { LoaderEventType } from '../types/loading';
import { LIBRARY_VERSION } from './constants';
Expand All @@ -15,12 +16,14 @@ export interface EmitInterface {
event: AirdropEvent;
eventType: ExtractorEventType | LoaderEventType;
data?: EventData;
worker_metadata?: WorkerMetadata;
}

export const emit = async ({
event,
eventType,
data,
worker_metadata,
}: EmitInterface): Promise<AxiosResponse> => {
// Translate outgoing event type to ensure we always send new event types
// TODO: Remove when the old types are completely phased out
Expand All @@ -33,6 +36,9 @@ export const emit = async ({
...data,
},
worker_metadata: {
...worker_metadata,
newest_state_date: event.payload.event_context.extract_to,
oldest_state_date: event.payload.event_context.extract_from,
adaas_library_version: LIBRARY_VERSION,
},
};
Expand Down
96 changes: 96 additions & 0 deletions src/multithreading/worker-adapter/worker-adapter.emit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ function makeAdapter(eventType: EventType = EventType.StartExtractingData): {
return { adapter, event, adapterState };
}

const iso = (ms: number) => new Date(ms).toISOString();

describe(`${WorkerAdapter.name}.emit`, () => {
let adapter: WorkerAdapter<TestState>;
let mockPostMessage: jest.Mock;
Expand Down Expand Up @@ -317,6 +319,100 @@ describe(`${WorkerAdapter.name}.emit`, () => {
});
});

describe(`${WorkerAdapter.name}.emit — worker_metadata`, () => {
let adapter: WorkerAdapter<TestState>;

beforeEach(() => {
jest.clearAllMocks();
({ adapter } = makeAdapter());
adapter['adapterState'].postState = jest.fn().mockResolvedValue(undefined);
adapter.uploadAllRepos = jest.fn().mockResolvedValue(undefined);
});

function setupRepos(
adapterInstance: WorkerAdapter<TestState>,
lastExtractedItemType: string
) {
adapterInstance['repos'] = [
{
itemType: 'issues',
dateRanges: {
creationDate: { oldest: 100, newest: 200 },
modifiedDate: { oldest: 150, newest: 250 },
},
},
{
itemType: 'tasks',
dateRanges: {
creationDate: { oldest: 300, newest: 400 },
modifiedDate: { oldest: 350, newest: 450 },
},
},
] as never;
adapterInstance['lastExtractedItemType'] = lastExtractedItemType;
}

it('should emit flat worker_metadata for the latest extracted item type only', async () => {
setupRepos(adapter, 'tasks');

await adapter.emit(ExtractorEventType.DataExtractionProgress);

const { emit: mockEmit } = require('../../common/control-protocol');
expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({
item_type: 'tasks',
oldest_created_date: iso(300),
newest_created_date: iso(400),
oldest_modified_date: iso(350),
newest_modified_date: iso(450),
});
});

it('should omit unset RFC3339 bounds from worker_metadata', async () => {
adapter['repos'] = [
{
itemType: 'tasks',
dateRanges: {
creationDate: { oldest: 0, newest: 0 },
modifiedDate: { oldest: 0, newest: 0 },
},
},
] as never;
adapter['lastExtractedItemType'] = 'tasks';

await adapter.emit(ExtractorEventType.DataExtractionProgress);

const { emit: mockEmit } = require('../../common/control-protocol');
expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({
item_type: 'tasks',
});
});

it('should send empty worker_metadata when no item type has been extracted', async () => {
await adapter.emit(ExtractorEventType.DataExtractionProgress);

const { emit: mockEmit } = require('../../common/control-protocol');
expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({});
});

it('should send empty worker_metadata for loader events', async () => {
setupRepos(adapter, 'tasks');

await adapter.emit(LoaderEventType.DataLoadingProgress);

const { emit: mockEmit } = require('../../common/control-protocol');
expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({});
});

it('should send empty worker_metadata for non-progress extraction events', async () => {
setupRepos(adapter, 'tasks');

await adapter.emit(ExtractorEventType.DataExtractionError);

const { emit: mockEmit } = require('../../common/control-protocol');
expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({});
});
});

describe(`${WorkerAdapter.name}.emit — ExternalSyncUnitExtractionDone legacy path`, () => {
it('should upload ESUs via a repo and strip external_sync_units from the emitted payload', async () => {
// Arrange
Expand Down
9 changes: 9 additions & 0 deletions src/multithreading/worker-adapter/worker-adapter.helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,12 @@ export function addReportToLoaderReport({

return loaderReports;
}

export function toRfc3339Timestamp(ms: number): string | undefined {
if (!Number.isFinite(ms) || ms === 0) {
return undefined;
}

return new Date(ms).toISOString();
}

Loading
Loading