diff --git a/package-lock.json b/package-lock.json index 00537e9..b9c30e4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@devrev/ts-adaas", - "version": "1.19.8", + "version": "1.19.9-beta.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@devrev/ts-adaas", - "version": "1.19.8", + "version": "1.19.9-beta.1", "license": "ISC", "dependencies": { "@devrev/typescript-sdk": "^1.1.74", diff --git a/package.json b/package.json index d9b2223..c65a42d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@devrev/ts-adaas", - "version": "1.19.8", + "version": "1.19.9-beta.1", "description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/common/control-protocol.test.ts b/src/common/control-protocol.test.ts new file mode 100644 index 0000000..d975355 --- /dev/null +++ b/src/common/control-protocol.test.ts @@ -0,0 +1,173 @@ +import { axiosClient } from '../http/axios-client-internal'; +import { createAxiosResponse } from '../tests/test-helpers'; +import { createMockEvent } from './test-utils'; +import { emit } from './control-protocol'; +import { EventType, ExtractorEventType } from '../types/extraction'; +import { LoaderEventType } from '../types/loading'; + +jest.mock('../http/axios-client-internal'); + +const mockedAxiosClient = jest.mocked(axiosClient); + +describe('control-protocol.emit', () => { + beforeEach(() => { + jest.clearAllMocks(); + mockedAxiosClient.post.mockResolvedValue(createAxiosResponse()); + }); + + it.each([ + { + title: 'extractor events', + inputEventType: EventType.StartExtractingData, + outputEventType: ExtractorEventType.DataExtractionProgress, + }, + { + title: 'loader events', + inputEventType: EventType.StartLoadingData, + outputEventType: LoaderEventType.DataLoadingProgress, + }, + { + title: 'unknown events', + inputEventType: EventType.StartExtractingData, + outputEventType: 'SOME_UNKNOWN_EVENT' as ExtractorEventType, + }, + ])( + 'sets state dates from event context for $title', + async ({ inputEventType, outputEventType }) => { + const event = createMockEvent(undefined, { + payload: { + event_type: inputEventType, + event_context: { + extract_from: '2024-01-01T00:00:00.000Z', + extract_to: '2024-06-01T00:00:00.000Z', + }, + }, + }); + + await emit({ + event, + eventType: outputEventType, + }); + + expect(mockedAxiosClient.post).toHaveBeenCalledTimes(1); + + const [, body, config] = mockedAxiosClient.post.mock.calls[0] as [ + string, + { + event_type: string; + event_context: { + extract_from?: string; + extract_to?: string; + }; + worker_metadata: Record; + }, + { headers: Record } + ]; + + expect(body).toMatchObject({ + event_type: outputEventType, + event_context: expect.objectContaining({ + extract_from: '2024-01-01T00:00:00.000Z', + extract_to: '2024-06-01T00:00:00.000Z', + }), + worker_metadata: expect.objectContaining({ + oldest_state_date: '2024-01-01T00:00:00.000Z', + newest_state_date: '2024-06-01T00:00:00.000Z', + }), + }); + + expect(config).toEqual( + expect.objectContaining({ + headers: expect.objectContaining({ + 'X-DevRev-Client-Version': expect.any(String), + }), + }) + ); + } + ); + + it.each([ + { + title: 'only extract_from is set', + extractFrom: '2024-01-01T00:00:00.000Z', + extractTo: undefined, + }, + { + title: 'only extract_to is set', + extractFrom: undefined, + extractTo: '2024-06-01T00:00:00.000Z', + }, + { + title: 'neither extract_from nor extract_to is set', + extractFrom: undefined, + extractTo: undefined, + }, + ])( + 'handles state-date absence when $title', + async ({ extractFrom, extractTo }) => { + const event = createMockEvent(undefined, { + payload: { + event_type: EventType.StartExtractingData, + event_context: { + extract_from: extractFrom, + extract_to: extractTo, + }, + }, + }); + + await emit({ + event, + eventType: ExtractorEventType.DataExtractionProgress, + }); + + const [, body] = mockedAxiosClient.post.mock.calls[0] as [ + string, + { + worker_metadata: Record; + }, + unknown + ]; + const workerMetadata = body.worker_metadata; + + expect(workerMetadata.oldest_state_date).toBe(extractFrom); + expect(workerMetadata.newest_state_date).toBe(extractTo); + } + ); + + it('overrides caller-provided worker_metadata state dates', async () => { + const event = createMockEvent(undefined, { + payload: { + event_type: EventType.StartExtractingData, + event_context: { + extract_from: '2024-01-01T00:00:00.000Z', + extract_to: '2024-06-01T00:00:00.000Z', + }, + }, + }); + + await emit({ + event, + eventType: ExtractorEventType.DataExtractionProgress, + worker_metadata: { + item_type: 'tasks', + oldest_state_date: 'should-be-overwritten', + newest_state_date: 'should-be-overwritten', + }, + }); + + const [, body] = mockedAxiosClient.post.mock.calls[0] as [ + string, + { + worker_metadata: Record; + }, + unknown + ]; + expect(body.worker_metadata).toEqual( + expect.objectContaining({ + item_type: 'tasks', + oldest_state_date: '2024-01-01T00:00:00.000Z', + newest_state_date: '2024-06-01T00:00:00.000Z', + }) + ); + }); +}); diff --git a/src/common/control-protocol.ts b/src/common/control-protocol.ts index c372c91..0649b4b 100644 --- a/src/common/control-protocol.ts +++ b/src/common/control-protocol.ts @@ -6,6 +6,7 @@ import { ExtractorEvent, ExtractorEventType, LoaderEvent, + WorkerMetadata, } from '../types/extraction'; import { LoaderEventType } from '../types/loading'; import { LIBRARY_VERSION } from './constants'; @@ -15,12 +16,14 @@ export interface EmitInterface { event: AirdropEvent; eventType: ExtractorEventType | LoaderEventType; data?: EventData; + worker_metadata?: WorkerMetadata; } export const emit = async ({ event, eventType, data, + worker_metadata, }: EmitInterface): Promise => { // Translate outgoing event type to ensure we always send new event types // TODO: Remove when the old types are completely phased out @@ -33,6 +36,9 @@ export const emit = async ({ ...data, }, worker_metadata: { + ...worker_metadata, + newest_state_date: event.payload.event_context.extract_to, + oldest_state_date: event.payload.event_context.extract_from, adaas_library_version: LIBRARY_VERSION, }, }; diff --git a/src/multithreading/worker-adapter/worker-adapter.emit.test.ts b/src/multithreading/worker-adapter/worker-adapter.emit.test.ts index 8afef41..cd453dd 100644 --- a/src/multithreading/worker-adapter/worker-adapter.emit.test.ts +++ b/src/multithreading/worker-adapter/worker-adapter.emit.test.ts @@ -61,6 +61,8 @@ function makeAdapter(eventType: EventType = EventType.StartExtractingData): { return { adapter, event, adapterState }; } +const iso = (ms: number) => new Date(ms).toISOString(); + describe(`${WorkerAdapter.name}.emit`, () => { let adapter: WorkerAdapter; let mockPostMessage: jest.Mock; @@ -317,6 +319,100 @@ describe(`${WorkerAdapter.name}.emit`, () => { }); }); +describe(`${WorkerAdapter.name}.emit — worker_metadata`, () => { + let adapter: WorkerAdapter; + + beforeEach(() => { + jest.clearAllMocks(); + ({ adapter } = makeAdapter()); + adapter['adapterState'].postState = jest.fn().mockResolvedValue(undefined); + adapter.uploadAllRepos = jest.fn().mockResolvedValue(undefined); + }); + + function setupRepos( + adapterInstance: WorkerAdapter, + lastExtractedItemType: string + ) { + adapterInstance['repos'] = [ + { + itemType: 'issues', + dateRanges: { + creationDate: { oldest: 100, newest: 200 }, + modifiedDate: { oldest: 150, newest: 250 }, + }, + }, + { + itemType: 'tasks', + dateRanges: { + creationDate: { oldest: 300, newest: 400 }, + modifiedDate: { oldest: 350, newest: 450 }, + }, + }, + ] as never; + adapterInstance['lastExtractedItemType'] = lastExtractedItemType; + } + + it('should emit flat worker_metadata for the latest extracted item type only', async () => { + setupRepos(adapter, 'tasks'); + + await adapter.emit(ExtractorEventType.DataExtractionProgress); + + const { emit: mockEmit } = require('../../common/control-protocol'); + expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({ + item_type: 'tasks', + oldest_created_date: iso(300), + newest_created_date: iso(400), + oldest_modified_date: iso(350), + newest_modified_date: iso(450), + }); + }); + + it('should omit unset RFC3339 bounds from worker_metadata', async () => { + adapter['repos'] = [ + { + itemType: 'tasks', + dateRanges: { + creationDate: { oldest: 0, newest: 0 }, + modifiedDate: { oldest: 0, newest: 0 }, + }, + }, + ] as never; + adapter['lastExtractedItemType'] = 'tasks'; + + await adapter.emit(ExtractorEventType.DataExtractionProgress); + + const { emit: mockEmit } = require('../../common/control-protocol'); + expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({ + item_type: 'tasks', + }); + }); + + it('should send empty worker_metadata when no item type has been extracted', async () => { + await adapter.emit(ExtractorEventType.DataExtractionProgress); + + const { emit: mockEmit } = require('../../common/control-protocol'); + expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({}); + }); + + it('should send empty worker_metadata for loader events', async () => { + setupRepos(adapter, 'tasks'); + + await adapter.emit(LoaderEventType.DataLoadingProgress); + + const { emit: mockEmit } = require('../../common/control-protocol'); + expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({}); + }); + + it('should send empty worker_metadata for non-progress extraction events', async () => { + setupRepos(adapter, 'tasks'); + + await adapter.emit(ExtractorEventType.DataExtractionError); + + const { emit: mockEmit } = require('../../common/control-protocol'); + expect(mockEmit.mock.calls[0][0].worker_metadata).toEqual({}); + }); +}); + describe(`${WorkerAdapter.name}.emit — ExternalSyncUnitExtractionDone legacy path`, () => { it('should upload ESUs via a repo and strip external_sync_units from the emitted payload', async () => { // Arrange diff --git a/src/multithreading/worker-adapter/worker-adapter.helpers.ts b/src/multithreading/worker-adapter/worker-adapter.helpers.ts index ecab2de..465150a 100644 --- a/src/multithreading/worker-adapter/worker-adapter.helpers.ts +++ b/src/multithreading/worker-adapter/worker-adapter.helpers.ts @@ -90,3 +90,12 @@ export function addReportToLoaderReport({ return loaderReports; } + +export function toRfc3339Timestamp(ms: number): string | undefined { + if (!Number.isFinite(ms) || ms === 0) { + return undefined; + } + + return new Date(ms).toISOString(); +} + diff --git a/src/multithreading/worker-adapter/worker-adapter.ts b/src/multithreading/worker-adapter/worker-adapter.ts index 66948de..97311b6 100644 --- a/src/multithreading/worker-adapter/worker-adapter.ts +++ b/src/multithreading/worker-adapter/worker-adapter.ts @@ -11,6 +11,7 @@ import { emit } from '../../common/control-protocol'; import { addReportToLoaderReport, getFilesToLoad, + toRfc3339Timestamp, } from './worker-adapter.helpers'; import { serializeError } from '../../logger/logger'; import { @@ -102,6 +103,7 @@ export class WorkerAdapter { private adapterState: State; private _artifacts: Artifact[]; private repos: Repo[] = []; + private lastExtractedItemType?: string; private currentEventDataLength: number = 0; // Loader @@ -191,6 +193,8 @@ export class WorkerAdapter { itemType: repo.itemType, ...(shouldNormalize && { normalize: repo.normalize }), onUpload: (artifact: Artifact) => { + this.lastExtractedItemType = repo.itemType; + // We need to store artifacts ids in state for later use when streaming attachments if (repo.itemType === AirSyncDefaultItemTypes.ATTACHMENTS) { this.state.toDevRev?.attachmentsMetadata.artifactIds.push( @@ -381,6 +385,46 @@ export class WorkerAdapter { newEventType as LoaderEventType ); + const progressData: { + // Last extracted item type statistics + item_type?: string; + oldest_created_date?: string; + newest_created_date?: string; + oldest_modified_date?: string; + newest_modified_date?: string; + + // Calculated time ranges in absolute times + oldest_state_date?: string; + newest_state_date?: string; + } = {}; + + if ( + isExtractionEvent && + (newEventType == ExtractorEventType.DataExtractionDone || + newEventType == ExtractorEventType.DataExtractionProgress || + newEventType == ExtractorEventType.AttachmentExtractionDone || + newEventType == ExtractorEventType.AttachmentExtractionProgress) + ) { + const repo = this.lastExtractedItemType + ? this.repos.find((r) => r.itemType === this.lastExtractedItemType) + : undefined; + if (repo) { + progressData.item_type = repo.itemType; + progressData.newest_created_date = toRfc3339Timestamp( + repo.dateRanges.creationDate.newest + ); + progressData.oldest_created_date = toRfc3339Timestamp( + repo.dateRanges.creationDate.oldest + ); + progressData.newest_modified_date = toRfc3339Timestamp( + repo.dateRanges.modifiedDate.newest + ); + progressData.oldest_modified_date = toRfc3339Timestamp( + repo.dateRanges.modifiedDate.oldest + ); + } + } + await emit({ eventType: newEventType, event: this.event, @@ -391,6 +435,7 @@ export class WorkerAdapter { ? { reports: this.reports, processed_files: this.processedFiles } : {}), }, + worker_metadata: { ...progressData }, }); const message: WorkerMessageEmitted = { diff --git a/src/repo/repo.helpers.ts b/src/repo/repo.helpers.ts new file mode 100644 index 0000000..159fea9 --- /dev/null +++ b/src/repo/repo.helpers.ts @@ -0,0 +1,17 @@ +export function updateRange( + range: { oldest: number; newest: number }, + ms: number +): void { + if (range.oldest === 0 || ms < range.oldest) { + range.oldest = ms; + } + if (range.newest === 0 || ms > range.newest) { + range.newest = ms; + } +} + +export function toValidTimestamp(value: string): number | undefined { + const ms = new Date(value).getTime(); + return Number.isFinite(ms) ? ms : undefined; +} + diff --git a/src/repo/repo.interfaces.ts b/src/repo/repo.interfaces.ts index 4573a3a..a374781 100644 --- a/src/repo/repo.interfaces.ts +++ b/src/repo/repo.interfaces.ts @@ -44,6 +44,8 @@ export interface NormalizedAttachment { author_id?: string; inline?: boolean; content_type?: string; + created_date?: string; + modified_date?: string; // This should be a string, but it was a number in the past. Due to backwards // compatibility we are keeping it also as a number. diff --git a/src/repo/repo.test.ts b/src/repo/repo.test.ts index b1d93c7..4375a98 100644 --- a/src/repo/repo.test.ts +++ b/src/repo/repo.test.ts @@ -3,6 +3,7 @@ import { createItems, normalizeItem } from '../tests/test-helpers'; import { mockServer } from '../tests/jest.setup'; import { createMockEvent } from '../common/test-utils'; import { EventType } from '../types'; +import { NormalizedAttachment, NormalizedItem } from './repo.interfaces'; import { Repo } from './repo'; jest.mock('../tests/test-helpers', () => ({ @@ -10,6 +11,31 @@ jest.mock('../tests/test-helpers', () => ({ normalizeItem: jest.fn(), })); +const mockUploadFn = jest.fn().mockResolvedValue({ + error: null, + artifact: { id: 'art-1', item_type: 'test', item_count: 0 }, +}); + +jest.mock('../uploader/uploader', () => ({ + Uploader: jest.fn().mockImplementation(() => ({ + upload: mockUploadFn, + })), +})); + +function itemWithDate(id: string, created_date: string): NormalizedItem { + return { id, created_date, modified_date: created_date, data: {} }; +} + +function itemWithDates( + id: string, + created_date: string, + modified_date: string +): NormalizedItem { + return { id, created_date, modified_date, data: {} }; +} + +const ts = (iso: string) => new Date(iso).getTime(); + describe(Repo.name, () => { let repo: Repo; let normalize: jest.Mock; @@ -228,4 +254,230 @@ describe(Repo.name, () => { expect(repo.getItems().length).toBe(5); }); }); + + describe('dateRanges', () => { + beforeEach(() => { + mockUploadFn.mockResolvedValue({ + error: null, + artifact: { id: 'art-1', item_type: 'test', item_count: 0 }, + }); + }); + + it('should track min and max created_date from a single upload batch', async () => { + await repo.upload([ + itemWithDate('1', '2023-06-15T12:00:00.000Z'), + itemWithDate('2', '2020-01-01T00:00:00.000Z'), + itemWithDate('3', '2021-03-01T00:00:00.000Z'), + ]); + + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2020-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2023-06-15T12:00:00.000Z') + ); + expect(repo.dateRanges.modifiedDate.oldest).toBe( + ts('2020-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.modifiedDate.newest).toBe( + ts('2023-06-15T12:00:00.000Z') + ); + }); + + it('should skip items without created_date', async () => { + const attachmentWithoutDate: NormalizedAttachment = { + id: 'att-1', + url: 'https://example.com/file', + file_name: 'file.txt', + parent_id: 'parent-1', + }; + + await repo.upload([ + itemWithDate('1', '2022-06-01T00:00:00.000Z'), + { id: '2', created_date: null, modified_date: '', data: {} }, + { id: '3', modified_date: '', data: {} }, + attachmentWithoutDate, + ]); + + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2022-06-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2022-06-01T00:00:00.000Z') + ); + }); + + it('should leave timestamps at zero when no items have created_date', async () => { + await repo.upload([ + { id: '1', modified_date: '', data: {} }, + { + id: 'att-1', + url: 'https://example.com/file', + file_name: 'file.txt', + parent_id: 'parent-1', + }, + ]); + + expect(repo.dateRanges).toEqual({ + creationDate: { oldest: 0, newest: 0 }, + modifiedDate: { oldest: 0, newest: 0 }, + }); + }); + + it('should not update timestamps or call uploader on empty upload', async () => { + await repo.upload([]); + + expect(repo.dateRanges).toEqual({ + creationDate: { oldest: 0, newest: 0 }, + modifiedDate: { oldest: 0, newest: 0 }, + }); + expect(mockUploadFn).not.toHaveBeenCalled(); + }); + + it('should accumulate min and max across multiple upload batches via push', async () => { + repo = new Repo({ + event: createMockEvent(mockServer.baseUrl, { + payload: { event_type: EventType.ExtractionDataStart }, + }), + itemType: 'test_item_type', + onUpload: jest.fn(), + options: { batchSize: 3 }, + }); + + const dates = [ + '2020-01-01T00:00:00.000Z', + '2021-01-01T00:00:00.000Z', + '2022-01-01T00:00:00.000Z', + '2023-01-01T00:00:00.000Z', + '2024-06-01T00:00:00.000Z', + '2024-12-01T00:00:00.000Z', + '2024-12-31T00:00:00.000Z', + ]; + const items = dates.map((created_date, index) => + itemWithDate(String(index), created_date) + ); + + await repo.push(items); + + expect(repo.getItems()).toHaveLength(1); + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2020-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2024-12-01T00:00:00.000Z') + ); + + await repo.push([ + itemWithDate('7', '2019-01-01T00:00:00.000Z'), + itemWithDate('8', '2025-01-01T00:00:00.000Z'), + ]); + + expect(repo.getItems()).toHaveLength(0); + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2019-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2025-01-01T00:00:00.000Z') + ); + }); + + it('should extend min and max when subsequent batches have wider date range', async () => { + await repo.upload([ + itemWithDate('1', '2022-06-01T00:00:00.000Z'), + itemWithDate('2', '2023-06-01T00:00:00.000Z'), + ]); + + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2022-06-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2023-06-01T00:00:00.000Z') + ); + + await repo.upload([ + itemWithDate('3', '2020-01-01T00:00:00.000Z'), + itemWithDate('4', '2024-01-01T00:00:00.000Z'), + ]); + + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2020-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2024-01-01T00:00:00.000Z') + ); + }); + + it('should update timestamps even when upload fails', async () => { + mockUploadFn.mockResolvedValueOnce({ + error: new Error('fail'), + artifact: null, + }); + + await repo.upload([itemWithDate('1', '2022-01-01T00:00:00.000Z')]); + + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2022-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2022-01-01T00:00:00.000Z') + ); + }); + + it('should ignore invalid created_date and modified_date values', async () => { + await repo.upload([ + { + id: '1', + created_date: 'not-a-date', + modified_date: 'still-not-a-date', + data: {}, + }, + itemWithDates( + '2', + '2022-01-01T00:00:00.000Z', + '2023-01-01T00:00:00.000Z' + ), + ]); + + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2022-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2022-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.modifiedDate.oldest).toBe( + ts('2023-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.modifiedDate.newest).toBe( + ts('2023-01-01T00:00:00.000Z') + ); + }); + + it('should track modified_date independently from created_date', async () => { + await repo.upload([ + itemWithDates( + '1', + '2020-01-01T00:00:00.000Z', + '2023-01-01T00:00:00.000Z' + ), + itemWithDates( + '2', + '2024-01-01T00:00:00.000Z', + '2021-06-01T00:00:00.000Z' + ), + ]); + + expect(repo.dateRanges.creationDate.oldest).toBe( + ts('2020-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.creationDate.newest).toBe( + ts('2024-01-01T00:00:00.000Z') + ); + expect(repo.dateRanges.modifiedDate.oldest).toBe( + ts('2021-06-01T00:00:00.000Z') + ); + expect(repo.dateRanges.modifiedDate.newest).toBe( + ts('2023-01-01T00:00:00.000Z') + ); + }); + }); }); diff --git a/src/repo/repo.ts b/src/repo/repo.ts index d119eea..aace2d3 100644 --- a/src/repo/repo.ts +++ b/src/repo/repo.ts @@ -15,6 +15,10 @@ import { NormalizedItem, RepoFactoryInterface, } from './repo.interfaces'; +import { + updateRange, + toValidTimestamp +} from './repo.helpers'; export class Repo { readonly itemType: string; @@ -24,6 +28,10 @@ export class Repo { private onUpload: (artifact: Artifact) => void; private options?: WorkerAdapterOptions; public uploadedArtifacts: Artifact[]; + public dateRanges = { + creationDate: { oldest: 0, newest: 0 }, + modifiedDate: { oldest: 0, newest: 0 }, + }; constructor({ event, @@ -51,6 +59,23 @@ export class Repo { const itemsToUpload = batch || this.items; if (itemsToUpload.length > 0) { + for (const item of itemsToUpload) { + const createdDate = item?.created_date; + if (createdDate != null) { + const createdMs = toValidTimestamp(createdDate); + if (createdMs !== undefined) { + updateRange(this.dateRanges.creationDate, createdMs); + } + } + const modifiedDate = item?.modified_date; + if (modifiedDate != null && modifiedDate !== '') { + const modifiedMs = toValidTimestamp(modifiedDate); + if (modifiedMs !== undefined) { + updateRange(this.dateRanges.modifiedDate, modifiedMs); + } + } + } + console.log( `Uploading ${itemsToUpload.length} items of type ${this.itemType}. ` ); diff --git a/src/types/extraction.ts b/src/types/extraction.ts index 0c328de..859dff8 100644 --- a/src/types/extraction.ts +++ b/src/types/extraction.ts @@ -3,7 +3,6 @@ import { InputData } from '@devrev/typescript-sdk/dist/snap-ins'; import { Artifact } from '../uploader/uploader.interfaces'; import { ErrorRecord } from './common'; - import { AxiosResponse } from 'axios'; import { NormalizedAttachment } from '../repo/repo.interfaces'; import { WorkerAdapter } from '../multithreading/worker-adapter/worker-adapter'; @@ -419,7 +418,19 @@ export interface EventData { * WorkerMetadata is an interface that defines the structure of the worker metadata that is sent from the external extractor to ADaaS. */ export interface WorkerMetadata { - adaas_library_version: string; + adaas_library_version?: string; + + // Last extracted item type statistics + item_type?: string; + oldest_created_date?: string; + newest_created_date?: string; + oldest_modified_date?: string; + newest_modified_date?: string; + + // Calculated time ranges in absolute times + // Times present in `extract_from` and `extract_to` given to the connector. + oldest_state_date?: string; + newest_state_date?: string; } /** diff --git a/src/uploader/uploader.helpers.test.ts b/src/uploader/uploader.helpers.test.ts index 7fffbec..44fecb7 100644 --- a/src/uploader/uploader.helpers.test.ts +++ b/src/uploader/uploader.helpers.test.ts @@ -1,5 +1,4 @@ -import fs from 'fs'; -import { promises as fsPromises } from 'fs'; +import fs, { promises as fsPromises } from 'fs'; import type { FileHandle } from 'fs/promises'; import { jsonl } from 'js-jsonl'; import zlib from 'zlib'; @@ -10,6 +9,7 @@ import { } from '../common/constants'; import { compressGzip, + computeArtifactDateRanges, decompressGzip, downloadToLocal, parseJsonl, @@ -229,6 +229,153 @@ describe('uploader.helpers', () => { }); }); + describe(computeArtifactDateRanges.name, () => { + it('should compute min and max across multiple items', () => { + // Arrange + const items = [ + { + id: '1', + created_date: '2020-01-01T00:00:00.000Z', + modified_date: '2021-06-01T00:00:00.000Z', + data: {}, + }, + { + id: '2', + created_date: '2022-03-15T12:00:00.000Z', + modified_date: '2020-12-31T23:59:59.000Z', + data: {}, + }, + ]; + + // Act + const result = computeArtifactDateRanges(items); + + // Assert + expect(result.oldest_created_date).toBe( + '2020-01-01T00:00:00.000Z' + ); + expect(result.newest_created_date).toBe( + '2022-03-15T12:00:00.000Z' + ); + expect(result.oldest_modified_date).toBe( + '2020-12-31T23:59:59.000Z' + ); + expect(result.newest_modified_date).toBe( + '2021-06-01T00:00:00.000Z' + ); + }); + + it('should return zeros when no items have date fields', () => { + // Arrange + const items = [ + { id: 1, name: 'a' }, + { id: 2, name: 'b' }, + ]; + + // Act + const result = computeArtifactDateRanges(items); + + // Assert + expect(result).toEqual({}); + }); + + it('should aggregate only fields that are present on items', () => { + // Arrange + const items = [ + { + id: '1', + created_date: '2021-01-01T00:00:00.000Z', + data: {}, + }, + { + id: '2', + modified_date: '2023-01-01T00:00:00.000Z', + data: {}, + }, + ]; + + // Act + const result = computeArtifactDateRanges(items); + + // Assert + expect(result.oldest_created_date).toBe( + '2021-01-01T00:00:00.000Z' + ); + expect(result.newest_created_date).toBe( + '2021-01-01T00:00:00.000Z' + ); + expect(result.oldest_modified_date).toBe( + '2023-01-01T00:00:00.000Z' + ); + expect(result.newest_modified_date).toBe( + '2023-01-01T00:00:00.000Z' + ); + }); + + it('should handle single object input', () => { + // Arrange + const item = { + id: '1', + created_date: '2019-05-10T08:30:00.000Z', + modified_date: '2019-05-10T08:30:00.000Z', + data: {}, + }; + + // Act + const result = computeArtifactDateRanges(item); + + // Assert + const ts = '2019-05-10T08:30:00.000Z'; + expect(result.oldest_created_date).toBe(ts); + expect(result.newest_created_date).toBe(ts); + expect(result.oldest_modified_date).toBe(ts); + expect(result.newest_modified_date).toBe(ts); + }); + + it('should skip invalid date values', () => { + const items = [ + { + id: '1', + created_date: 'not-a-date', + modified_date: '2024-01-01T00:00:00.000Z', + data: {}, + }, + ]; + + const result = computeArtifactDateRanges(items); + + expect(result).toEqual({ + oldest_modified_date: '2024-01-01T00:00:00.000Z', + newest_modified_date: '2024-01-01T00:00:00.000Z', + }); + }); + + it('[edge] should skip non-object entries in an array', () => { + // Arrange + const items = [ + null, + { + id: '1', + created_date: '2024-01-01T00:00:00.000Z', + modified_date: '2024-06-01T00:00:00.000Z', + data: {}, + }, + 'not-an-object', + ] as unknown as object[]; + + // Act + const result = computeArtifactDateRanges(items); + + // Assert + expect(result.oldest_created_date).toBe( + '2024-01-01T00:00:00.000Z' + ); + expect(result.newest_created_date).toBe( + '2024-01-01T00:00:00.000Z' + ); + }); + }); + describe(truncateFilename.name, () => { it('should return filename unchanged when within the limit', () => { // Arrange diff --git a/src/uploader/uploader.helpers.ts b/src/uploader/uploader.helpers.ts index ad389a5..60e3975 100644 --- a/src/uploader/uploader.helpers.ts +++ b/src/uploader/uploader.helpers.ts @@ -6,7 +6,92 @@ import { MAX_DEVREV_FILENAME_EXTENSION_LENGTH, MAX_DEVREV_FILENAME_LENGTH, } from '../common/constants'; -import { UploaderResult } from './uploader.interfaces'; +import { NormalizedItem } from '../repo/repo.interfaces'; +import { + ArtifactDateField, + ArtifactDateRanges, + UploaderResult, +} from './uploader.interfaces'; + +/** + * Computes oldest/newest created and modified timestamps (RFC3339) across uploaded items. + * @param fetchedObjects - Single object or array of objects (e.g. NormalizedItem[]) + */ +export function computeArtifactDateRanges( + fetchedObjects: object[] | object +): ArtifactDateRanges { + const items = Array.isArray(fetchedObjects) + ? fetchedObjects + : [fetchedObjects]; + + const created = { + min: Number.POSITIVE_INFINITY, + max: Number.NEGATIVE_INFINITY, + }; + const modified = { + min: Number.POSITIVE_INFINITY, + max: Number.NEGATIVE_INFINITY, + }; + let hasCreated = false; + let hasModified = false; + + const parseTimestamp = (value: string): number | undefined => { + const ts = new Date(value).getTime(); + return Number.isNaN(ts) ? undefined : ts; + }; + + for (const obj of items) { + if (!obj || typeof obj !== 'object') { + continue; + } + const item = obj as NormalizedItem; + if (item.created_date != undefined) { + const ts = parseTimestamp(item.created_date); + if (ts != undefined) { + if (ts < created.min) { + created.min = ts; + } + if (ts > created.max) { + created.max = ts; + } + hasCreated = true; + } + } + if (item.modified_date != undefined) { + const ts = parseTimestamp(item.modified_date); + if (ts != undefined) { + if (ts < modified.min) { + modified.min = ts; + } + if (ts > modified.max) { + modified.max = ts; + } + hasModified = true; + } + } + } + + const result: ArtifactDateRanges = {}; + + if (hasCreated) { + result[ArtifactDateField.OldestCreatedDate] = new Date( + created.min + ).toISOString(); + result[ArtifactDateField.NewestCreatedDate] = new Date( + created.max + ).toISOString(); + } + if (hasModified) { + result[ArtifactDateField.OldestModifiedDate] = new Date( + modified.min + ).toISOString(); + result[ArtifactDateField.NewestModifiedDate] = new Date( + modified.max + ).toISOString(); + } + + return result; +} /** * Compresses a JSONL string using gzip compression. diff --git a/src/uploader/uploader.interfaces.ts b/src/uploader/uploader.interfaces.ts index ce995a7..6650b91 100644 --- a/src/uploader/uploader.interfaces.ts +++ b/src/uploader/uploader.interfaces.ts @@ -19,12 +19,31 @@ export type UploaderResult = /** * Artifact is an interface that defines the structure of an artifact. Artifact is a file that is generated by the extractor and uploaded to ADaaS. */ +export enum ArtifactDateField { + OldestCreatedDate = 'oldest_created_date', + NewestCreatedDate = 'newest_created_date', + OldestModifiedDate = 'oldest_modified_date', + NewestModifiedDate = 'newest_modified_date', +} + export interface Artifact { id: string; item_type: string; item_count: number; + [ArtifactDateField.OldestCreatedDate]?: string; + [ArtifactDateField.NewestCreatedDate]?: string; + [ArtifactDateField.OldestModifiedDate]?: string; + [ArtifactDateField.NewestModifiedDate]?: string; } +export type ArtifactDateRanges = Pick< + Artifact, + | ArtifactDateField.OldestCreatedDate + | ArtifactDateField.NewestCreatedDate + | ArtifactDateField.OldestModifiedDate + | ArtifactDateField.NewestModifiedDate +>; + /** * ArtifactsPrepareResponse is an interface that defines the structure of the response from the prepare artifacts endpoint. */ diff --git a/src/uploader/uploader.test.ts b/src/uploader/uploader.test.ts index 41e9777..39d1036 100644 --- a/src/uploader/uploader.test.ts +++ b/src/uploader/uploader.test.ts @@ -3,7 +3,9 @@ import FormData from 'form-data'; import { jsonl } from 'js-jsonl'; import zlib from 'zlib'; +import { createMockEvent } from '../common/test-utils'; import { axiosClient } from '../http/axios-client-internal'; +import { mockServer } from '../tests/jest.setup'; import { callPrivateMethod, createArtifact, @@ -13,12 +15,10 @@ import { createFileStream, spyOnPrivateMethod, } from '../tests/test-helpers'; -import { mockServer } from '../tests/jest.setup'; -import { createMockEvent } from '../common/test-utils'; +import { Uploader } from './uploader'; import { compressGzip, downloadToLocal } from './uploader.helpers'; import { ArtifactToUpload, UploaderResult } from './uploader.interfaces'; -import { Uploader } from './uploader'; jest.mock('../http/axios-client-internal'); jest.mock('./uploader.helpers', () => ({ @@ -86,6 +86,75 @@ describe(Uploader.name, () => { expect(result.error).toBeUndefined(); }); + it('should compute oldest/newest created and modified dates from normalized items', async () => { + // Arrange + const itemType = 'tasks'; + const fetchedObjects = [ + { + id: '1', + created_date: '2020-06-15T10:00:00.000Z', + modified_date: '2021-01-20T10:00:00.000Z', + data: { name: 'Task 1' }, + }, + { + id: '2', + created_date: '2019-03-01T08:00:00.000Z', + modified_date: '2022-11-30T18:00:00.000Z', + data: { name: 'Task 2' }, + }, + ]; + + mockedAxiosClient.get.mockResolvedValueOnce( + mockArtifactUploadUrlResponse + ); + mockedAxiosClient.post.mockResolvedValue(createAxiosResponse()); + + // Act + const result = await uploader.upload(itemType, fetchedObjects); + + // Assert + expect(result.artifact?.oldest_created_date).toBe( + '2019-03-01T08:00:00.000Z' + ); + expect(result.artifact?.newest_created_date).toBe( + '2020-06-15T10:00:00.000Z' + ); + expect(result.artifact?.oldest_modified_date).toBe( + '2021-01-20T10:00:00.000Z' + ); + expect(result.artifact?.newest_modified_date).toBe( + '2022-11-30T18:00:00.000Z' + ); + expect(result.error).toBeUndefined(); + }); + + it('should compute date ranges for single object upload', async () => { + // Arrange + const itemType = 'metadata'; + const fetchedObject = { + id: '1', + created_date: '2018-12-25T00:00:00.000Z', + modified_date: '2018-12-26T00:00:00.000Z', + data: { key: 'value' }, + }; + + mockedAxiosClient.get.mockResolvedValueOnce( + mockArtifactUploadUrlResponse + ); + mockedAxiosClient.post.mockResolvedValue(createAxiosResponse()); + + // Act + const result = await uploader.upload(itemType, fetchedObject); + + // Assert + const createdTs = '2018-12-25T00:00:00.000Z'; + const modifiedTs = '2018-12-26T00:00:00.000Z'; + expect(result.artifact?.oldest_created_date).toBe(createdTs); + expect(result.artifact?.newest_created_date).toBe(createdTs); + expect(result.artifact?.oldest_modified_date).toBe(modifiedTs); + expect(result.artifact?.newest_modified_date).toBe(modifiedTs); + }); + it('should report item_count as 1 when uploading single object', async () => { // Arrange const itemType = 'metadata'; diff --git a/src/uploader/uploader.ts b/src/uploader/uploader.ts index fa9833a..7ca52ad 100644 --- a/src/uploader/uploader.ts +++ b/src/uploader/uploader.ts @@ -9,6 +9,7 @@ import { serializeError } from '../logger/logger'; import { compressGzip, + computeArtifactDateRanges, decompressGzip, downloadToLocal, parseJsonl, @@ -116,11 +117,14 @@ export class Uploader { } } + const artifactDateRanges = computeArtifactDateRanges(fetchedObjects); + // Return the artifact information to the platform const artifact: Artifact = { id: preparedArtifact!.artifact_id, item_type: itemType, item_count: Array.isArray(fetchedObjects) ? fetchedObjects.length : 1, + ...artifactDateRanges, }; return { artifact };