Skip to content

Commit 6781391

Browse files
authored
chore: optimize how we trigger nango related workflows (CM-795) (#3642)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 5b32a7f commit 6781391

8 files changed

Lines changed: 366 additions & 59 deletions

File tree

services/apps/cron_service/src/jobs/nangoMonitoring.job.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const job: IJobDefinition = {
4747

4848
const ghMissingNangoConnections: Map<string, string[]> = new Map()
4949
const ghNotConnectedToNangoYet: Map<string, number> = new Map()
50+
const ghNoCursorsYet: Map<string, number> = new Map()
5051

5152
let totalRepos = 0
5253

@@ -82,6 +83,15 @@ const job: IJobDefinition = {
8283
// then get nango connection statuses for each connection
8384
if (int.settings.nangoMapping) {
8485
for (const connectionId of Object.keys(int.settings.nangoMapping)) {
86+
// check if we have cursors already for this connection
87+
if (!int.settings.cursors || !int.settings.cursors[connectionId]) {
88+
if (ghNoCursorsYet.has(int.id)) {
89+
ghNoCursorsYet.set(int.id, ghNoCursorsYet.get(int.id) + 1)
90+
} else {
91+
ghNoCursorsYet.set(int.id, 1)
92+
}
93+
}
94+
8595
const nangoConnection = singleOrDefault(
8696
nangoConnections,
8797
(c) => c.connection_id === connectionId,
@@ -188,13 +198,20 @@ const job: IJobDefinition = {
188198

189199
if (ghNotConnectedToNangoYet.size > 0) {
190200
let totalNotConnected = 0
191-
for (const [integrationId, count] of ghNotConnectedToNangoYet.entries()) {
192-
slackMessage += `- *${integrationId}* has ${count} repos not connected to nango yet\n`
201+
for (const count of Array.from(ghNotConnectedToNangoYet.values())) {
193202
totalNotConnected += count
194203
}
195204

196205
slackMessage += `We have in total ${totalRepos} repos and ${totalNotConnected} of them are not connected to nango yet!\n`
197206
}
207+
208+
if (ghNoCursorsYet.size > 0) {
209+
let totalNoCursors = 0
210+
for (const count of Array.from(ghNoCursorsYet.values())) {
211+
totalNoCursors += count
212+
}
213+
slackMessage += `And ${totalNoCursors} of them have no cursors yet!\n`
214+
}
198215
}
199216

200217
if (failedConnections.length > 0) {

services/apps/cron_service/src/jobs/nangoTrigger.job.ts

Lines changed: 75 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import CronTime from 'cron-time-generator'
22

3-
import { IS_DEV_ENV } from '@crowd/common'
3+
import { ConcurrencyLimiter, IS_DEV_ENV } from '@crowd/common'
44
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
5-
import { fetchNangoIntegrationData } from '@crowd/data-access-layer/src/integrations'
5+
import { fetchNangoIntegrationDataForCheck } from '@crowd/data-access-layer/src/integrations'
66
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
77
import {
88
ALL_NANGO_INTEGRATIONS,
@@ -28,10 +28,15 @@ const job: IJobDefinition = {
2828

2929
const dbConnection = await getDbConnection(READ_DB_CONFIG(), 3, 0)
3030

31-
const integrationsToTrigger = await fetchNangoIntegrationData(pgpQx(dbConnection), [
31+
const integrationsToTrigger = await fetchNangoIntegrationDataForCheck(pgpQx(dbConnection), [
3232
...new Set(ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform)),
3333
])
3434

35+
const limiter = new ConcurrencyLimiter(5)
36+
37+
// Collect all workflow start operations
38+
const workflowStarts: Array<() => Promise<void>> = []
39+
3540
for (let i = 0; i < integrationsToTrigger.length; i++) {
3641
const int = integrationsToTrigger[i]
3742

@@ -71,10 +76,51 @@ const job: IJobDefinition = {
7176
modifiedAfter: new Date().toISOString(),
7277
}
7378

79+
workflowStarts.push(async () => {
80+
try {
81+
await temporal.workflow.start('processNangoWebhook', {
82+
taskQueue: 'nango',
83+
workflowId: `nango-webhook/${platform}/${id}/${connectionId}/${model}/cron-triggered`,
84+
workflowIdReusePolicy:
85+
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
86+
retry: {
87+
maximumAttempts: 10,
88+
},
89+
args: [payload],
90+
})
91+
} catch (error) {
92+
if (error.name === 'WorkflowExecutionAlreadyStartedError') {
93+
ctx.log.debug(
94+
{
95+
integrationId: id,
96+
platform,
97+
model,
98+
connectionId,
99+
},
100+
'Workflow already running, skipping...',
101+
)
102+
return
103+
}
104+
throw error
105+
}
106+
})
107+
}
108+
} else {
109+
const payload: INangoWebhookPayload = {
110+
connectionId: id,
111+
providerConfigKey: platform,
112+
syncName: 'not important',
113+
model,
114+
responseResults: { added: 1, updated: 1, deleted: 1 },
115+
syncType: 'INCREMENTAL',
116+
modifiedAfter: new Date().toISOString(),
117+
}
118+
119+
workflowStarts.push(async () => {
74120
try {
75121
await temporal.workflow.start('processNangoWebhook', {
76122
taskQueue: 'nango',
77-
workflowId: `nango-webhook/${platform}/${id}/${connectionId}/${model}/cron-triggered`,
123+
workflowId: `nango-webhook/${platform}/${id}/${model}/cron-triggered`,
78124
workflowIdReusePolicy:
79125
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
80126
retry: {
@@ -89,53 +135,40 @@ const job: IJobDefinition = {
89135
integrationId: id,
90136
platform,
91137
model,
92-
connectionId,
93138
},
94139
'Workflow already running, skipping...',
95140
)
96-
continue
141+
return
97142
}
98143
throw error
99144
}
100-
}
101-
} else {
102-
const payload: INangoWebhookPayload = {
103-
connectionId: id,
104-
providerConfigKey: platform,
105-
syncName: 'not important',
106-
model,
107-
responseResults: { added: 1, updated: 1, deleted: 1 },
108-
syncType: 'INCREMENTAL',
109-
modifiedAfter: new Date().toISOString(),
110-
}
111-
112-
try {
113-
await temporal.workflow.start('processNangoWebhook', {
114-
taskQueue: 'nango',
115-
workflowId: `nango-webhook/${platform}/${id}/${model}/cron-triggered`,
116-
workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
117-
retry: {
118-
maximumAttempts: 10,
119-
},
120-
args: [payload],
121-
})
122-
} catch (error) {
123-
if (error.name === 'WorkflowExecutionAlreadyStartedError') {
124-
ctx.log.debug(
125-
{
126-
integrationId: id,
127-
platform,
128-
model,
129-
},
130-
'Workflow already running, skipping...',
131-
)
132-
continue
133-
}
134-
throw error
135-
}
145+
})
136146
}
137147
}
138148
}
149+
150+
ctx.log.info(`Triggering nango integration checks with ${workflowStarts.length} workflows!`)
151+
152+
// Track completed workflows
153+
let completedWorkflows = 0
154+
155+
// Register callback to track completed workflows
156+
limiter.setOnJobComplete(() => {
157+
completedWorkflows++
158+
if (completedWorkflows % 100 === 0) {
159+
ctx.log.info(`Triggered ${completedWorkflows} nango integrations checks so far...`)
160+
}
161+
})
162+
163+
// Process all workflow starts with concurrency limit
164+
for (const workflowStart of workflowStarts) {
165+
await limiter.schedule(workflowStart)
166+
}
167+
168+
// Wait for all remaining jobs to complete
169+
await limiter.waitForFinish()
170+
171+
ctx.log.info(`Triggered ${completedWorkflows} nango integrations checks in total`)
139172
},
140173
}
141174

services/apps/nango_worker/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
"lint": "npx eslint --ext .ts src --max-warnings=0",
1010
"format": "npx prettier --write \"src/**/*.ts\"",
1111
"format-check": "npx prettier --check .",
12-
"tsc-check": "tsc --noEmit"
12+
"tsc-check": "tsc --noEmit",
13+
"check-nango-mapping": "tsx src/bin/check-nango-mapping.ts"
1314
},
1415
"dependencies": {
1516
"@crowd/archetype-standard": "workspace:*",

services/apps/nango_worker/src/activities/nangoActivities.ts

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,16 @@ export async function processNangoWebhook(
121121

122122
const settings = integration.settings
123123
let cursor = args.nextPageCursor
124+
let existingCursor = false
124125
if (
125126
!cursor &&
126127
settings.cursors &&
127128
settings.cursors[args.connectionId] &&
128-
settings.cursors[args.connectionId][args.model]
129+
settings.cursors[args.connectionId][args.model] &&
130+
!['<no-cursor>', '<no-records>'].includes(settings.cursors[args.connectionId][args.model])
129131
) {
130132
cursor = settings.cursors[args.connectionId][args.model]
133+
existingCursor = true
131134
}
132135

133136
await initNangoCloudClient()
@@ -170,14 +173,33 @@ export async function processNangoWebhook(
170173

171174
return records.nextCursor
172175
} else {
173-
await setNangoIntegrationCursor(
174-
dbStoreQx(svc.postgres.writer),
175-
integration.id,
176-
args.connectionId,
177-
args.model,
178-
records.records[records.records.length - 1].metadata.cursor,
179-
)
176+
let cursor = '<no-cursor>'
177+
const lastRecord = records.records[records.records.length - 1]
178+
if (lastRecord.metadata?.cursor) {
179+
cursor = lastRecord.metadata.cursor
180+
}
181+
182+
// if we dont have a cursor but we have an existing one we keep existing one
183+
// if we have a cursor from the last record we also set it
184+
if ((cursor === '<no-cursor>' && !existingCursor) || (cursor && cursor !== '<no-cursor>')) {
185+
await setNangoIntegrationCursor(
186+
dbStoreQx(svc.postgres.writer),
187+
integration.id,
188+
args.connectionId,
189+
args.model,
190+
cursor,
191+
)
192+
}
180193
}
194+
} else if (!existingCursor) {
195+
// only update if we don't have an existing cursor
196+
await setNangoIntegrationCursor(
197+
dbStoreQx(svc.postgres.writer),
198+
integration.id,
199+
args.connectionId,
200+
args.model,
201+
'<no-records>',
202+
)
181203
}
182204
}
183205

0 commit comments

Comments
 (0)