Skip to content

Commit 861a96a

Browse files
authored
chore: split how we execute github nango sync workflow so it's one per integration (#3657)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 63c9bd8 commit 861a96a

6 files changed

Lines changed: 121 additions & 88 deletions

File tree

backend/src/services/integrationService.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import {
2626
startNangoSync,
2727
} from '@crowd/nango'
2828
import { RedisCache } from '@crowd/redis'
29-
import { WorkflowIdReusePolicy } from '@crowd/temporal'
29+
import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@crowd/temporal'
3030
import { CodePlatform, Edition, PlatformType } from '@crowd/types'
3131

3232
import { IRepositoryOptions } from '@/database/repositories/IRepositoryOptions'
@@ -919,11 +919,12 @@ export default class IntegrationService {
919919
await this.options.temporal.workflow.start('syncGithubIntegration', {
920920
taskQueue: 'nango',
921921
workflowId: `github-nango-sync/${integration.id}`,
922-
workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
922+
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
923+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
923924
retry: {
924925
maximumAttempts: 10,
925926
},
926-
args: [{ integrationIds: [integration.id] }],
927+
args: [{ integrationId: integration.id }],
927928
})
928929

929930
return await this.findById(integration.id)

services/apps/cron_service/src/jobs/nangoGithubSync.job.ts

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
import CronTime from 'cron-time-generator'
22

3-
import { IS_DEV_ENV } from '@crowd/common'
3+
import { ConcurrencyLimiter, IS_DEV_ENV } from '@crowd/common'
44
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
55
import { fetchNangoIntegrationData } from '@crowd/data-access-layer/src/integrations'
66
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
77
import { NangoIntegration, nangoIntegrationToPlatform } from '@crowd/nango'
8-
import { TEMPORAL_CONFIG, WorkflowIdReusePolicy, getTemporalClient } from '@crowd/temporal'
8+
import {
9+
TEMPORAL_CONFIG,
10+
WorkflowIdConflictPolicy,
11+
WorkflowIdReusePolicy,
12+
getTemporalClient,
13+
} from '@crowd/temporal'
914
import { PlatformType } from '@crowd/types'
1015

1116
import { IJobDefinition } from '../types'
@@ -15,7 +20,7 @@ const job: IJobDefinition = {
1520
cronTime: CronTime.every(
1621
Number(process.env.CROWD_GH_NANGO_SYNC_INTERVAL_MINUTES || IS_DEV_ENV ? 5 : 60),
1722
).minutes(),
18-
timeout: 10 * 60,
23+
timeout: 4 * 60 * 60, // 4 hours
1924
process: async (ctx) => {
2025
ctx.log.info('Triggering nango API check as if a webhook was received!')
2126

@@ -27,9 +32,13 @@ const job: IJobDefinition = {
2732
nangoIntegrationToPlatform(NangoIntegration.GITHUB),
2833
])
2934

30-
const ids: string[] = []
35+
const limiter = new ConcurrencyLimiter(5)
3136

32-
for (const int of integrations) {
37+
// Collect all workflow start operations
38+
const workflowStarts: Array<() => Promise<void>> = []
39+
40+
for (let i = 0; i < integrations.length; i++) {
41+
const int = integrations[i]
3342
const { id, platform } = int
3443

3544
if (platform !== PlatformType.GITHUB_NANGO) {
@@ -40,22 +49,51 @@ const job: IJobDefinition = {
4049
{
4150
integrationId: id,
4251
},
43-
'Triggering nango github integration sync!',
52+
`${i + 1}/${integrations.length} Triggering nango github integration sync!`,
4453
)
4554

46-
ids.push(id)
55+
workflowStarts.push(async () => {
56+
await temporal.workflow
57+
.start('syncGithubIntegration', {
58+
taskQueue: 'nango',
59+
workflowId: `github-nango-sync/cron/${id}`,
60+
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
61+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
62+
retry: {
63+
maximumAttempts: 10,
64+
},
65+
args: [{ integrationId: id }],
66+
})
67+
.catch((err) =>
68+
ctx.log.error(err, 'Error while triggering nango github integration sync!'),
69+
)
70+
})
4771
}
4872

49-
await temporal.workflow.start('syncGithubIntegration', {
50-
taskQueue: 'nango',
51-
workflowId: `github-nango-sync/cron/${new Date().toISOString()}`,
52-
workflowIdReusePolicy:
53-
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
54-
retry: {
55-
maximumAttempts: 10,
56-
},
57-
args: [{ integrationIds: ids }],
73+
ctx.log.info(
74+
`Triggering nango github integration syncs with ${workflowStarts.length} workflows!`,
75+
)
76+
77+
// Track completed workflows
78+
let completedWorkflows = 0
79+
80+
// Register callback to track completed workflows
81+
limiter.setOnJobComplete(() => {
82+
completedWorkflows++
83+
if (completedWorkflows % 100 === 0) {
84+
ctx.log.info(`Triggered ${completedWorkflows} nango github integration syncs so far...`)
85+
}
5886
})
87+
88+
// Process all workflow starts with concurrency limit
89+
for (const workflowStart of workflowStarts) {
90+
await limiter.schedule(workflowStart)
91+
}
92+
93+
// Wait for all remaining jobs to complete
94+
await limiter.waitForFinish()
95+
96+
ctx.log.info(`Triggered ${completedWorkflows} nango github integration syncs in total`)
5997
},
6098
}
6199

services/apps/nango_worker/src/activities.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import {
22
analyzeGithubIntegration,
3+
canCreateGithubConnection,
34
createGithubConnection,
45
deleteConnection,
56
logInfo,
67
mapGithubRepo,
7-
numberOfGithubConnectionsToCreate,
88
processNangoWebhook,
99
removeGithubConnection,
1010
setGithubConnection,
@@ -24,7 +24,7 @@ export {
2424
startNangoSync,
2525
mapGithubRepo,
2626
unmapGithubRepo,
27-
numberOfGithubConnectionsToCreate,
27+
canCreateGithubConnection,
2828
updateGitIntegrationWithRepo,
2929
syncGithubReposToInsights,
3030
logInfo,

services/apps/nango_worker/src/activities/nangoActivities.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,23 +50,23 @@ async function getLastConnectTs(): Promise<Date | undefined> {
5050
return new Date(lastConnect)
5151
}
5252

53-
export async function numberOfGithubConnectionsToCreate(): Promise<number> {
54-
const max = Number(process.env.CROWD_MAX_GH_NANGO_CONNECTIONS_PER_HOUR || 1)
53+
export async function canCreateGithubConnection(): Promise<boolean> {
54+
const minutes = Number(process.env.CROWD_MINUTES_BETWEEN_GH_NANGO_CONNECTION || 6)
5555

56-
svc.log.info(`[GITHUB] Max number of github connections to create: ${max}`)
56+
svc.log.info(`[GITHUB] Min minutes between connection creation: ${minutes}`)
5757

5858
if (IS_DEV_ENV || IS_STAGING_ENV) {
59-
svc.log.info('[GITHUB] Number of github connections to create: 10')
60-
return 10
59+
svc.log.info('[GITHUB] DEV MODE - we can create a connection!')
60+
return true
6161
}
6262

6363
const lastConnectDate = await getLastConnectTs()
6464

6565
svc.log.info(`[GITHUB] Last connect date: ${lastConnectDate.toISOString()}`)
6666

6767
if (!lastConnectDate) {
68-
svc.log.info(`[GITHUB] Number of github connections to create: ${max}`)
69-
return max
68+
svc.log.info('[GITHUB] no last connect date found - we can create a connection!')
69+
return true
7070
}
7171

7272
const now = new Date()
@@ -75,17 +75,21 @@ export async function numberOfGithubConnectionsToCreate(): Promise<number> {
7575
// elapsed time since the last connection, in milliseconds
7676
const diff = now.getTime() - lastConnectDate.getTime()
7777

78-
// how many hours
79-
const hours = diff / (1000 * 60 * 60) // ms to seconds to minutes
80-
svc.log.info(`[GITHUB] Diff: ${diff}, hours: ${hours}`)
78+
// how many minutes from diff
79+
const minutesSinceLastConnection = diff / (1000 * 60)
80+
svc.log.info(`[GITHUB] Diff: ${diff}, minutes: ${minutesSinceLastConnection}`)
8181

82-
if (hours >= 1.0) {
83-
svc.log.info(`[GITHUB] Number of github connections to create: ${max}`)
84-
return max
82+
if (minutesSinceLastConnection >= minutes) {
83+
svc.log.info(
84+
'[GITHUB] more time has passed since last connection - we can create a connection!',
85+
)
86+
return true
8587
}
8688

87-
svc.log.info('[GITHUB] Number of github connections to create: 0')
88-
return 0
89+
svc.log.info(
90+
'[GITHUB] not enough time has passed since last connection - we cannot create a connection!',
91+
)
92+
return false
8993
}
9094

9195
export async function processNangoWebhook(

services/apps/nango_worker/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export interface IProcessNangoWebhookArguments {
77
}
88

99
export interface ISyncGithubIntegrationArguments {
10-
integrationIds: string[]
10+
integrationId: string
1111
}
1212

1313
export interface IGithubIntegrationSyncInstructions {
Lines changed: 41 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,68 @@
1-
import { proxyActivities, sleep } from '@temporalio/workflow'
1+
import { proxyActivities } from '@temporalio/workflow'
22

33
import * as activities from '../activities/nangoActivities'
44
import { ISyncGithubIntegrationArguments } from '../types'
55

6-
const REPO_ONBOARDING_INTERVAL_MINUTES = 6
7-
86
const activity = proxyActivities<typeof activities>({
97
startToCloseTimeout: '2 hour',
108
retry: { maximumAttempts: 20, backoffCoefficient: 2 },
119
})
1210

1311
export async function syncGithubIntegration(args: ISyncGithubIntegrationArguments): Promise<void> {
14-
const limit = await activity.numberOfGithubConnectionsToCreate()
15-
let created = 0
16-
17-
for (const integrationId of args.integrationIds) {
18-
const result = await activity.analyzeGithubIntegration(integrationId)
12+
const integrationId = args.integrationId
1913

20-
// delete connections that are no longer needed
21-
for (const repo of result.reposToDelete) {
22-
// delete nango connection
23-
await activity.deleteConnection(integrationId, result.providerConfigKey, repo.connectionId)
14+
const result = await activity.analyzeGithubIntegration(integrationId)
2415

25-
// delete connection from integrations.settings.nangoMapping object
26-
await activity.removeGithubConnection(integrationId, repo.connectionId)
16+
// delete connections that are no longer needed
17+
for (const repo of result.reposToDelete) {
18+
// delete nango connection
19+
await activity.deleteConnection(integrationId, result.providerConfigKey, repo.connectionId)
2720

28-
// delete githubRepos mapping
29-
await activity.unmapGithubRepo(integrationId, repo.repo)
30-
}
21+
// delete connection from integrations.settings.nangoMapping object
22+
await activity.removeGithubConnection(integrationId, repo.connectionId)
3123

32-
// delete duplicate connections
33-
for (const repo of result.duplicatesToDelete) {
34-
// delete nango connection
35-
await activity.deleteConnection(integrationId, result.providerConfigKey, repo.connectionId)
24+
// delete githubRepos mapping
25+
await activity.unmapGithubRepo(integrationId, repo.repo)
26+
}
3627

37-
// delete connection from integrations.settings.nangoMapping object
38-
await activity.removeGithubConnection(integrationId, repo.connectionId)
28+
// delete duplicate connections
29+
for (const repo of result.duplicatesToDelete) {
30+
// delete nango connection
31+
await activity.deleteConnection(integrationId, result.providerConfigKey, repo.connectionId)
3932

40-
// we don't unmap because this one was duplicated
41-
}
33+
// delete connection from integrations.settings.nangoMapping object
34+
await activity.removeGithubConnection(integrationId, repo.connectionId)
4235

43-
// create connections for repos that are not already connected
44-
for (const repo of result.reposToSync) {
45-
if (created >= limit) {
46-
await activity.logInfo(
47-
`Max number of github connections reached! Skipping repo ${repo.owner}/${repo.repoName} from integration ${integrationId}!`,
48-
)
49-
continue
50-
}
36+
// we don't unmap because this one was duplicated
37+
}
5138

52-
// create nango connection
53-
const connectionId = await activity.createGithubConnection(integrationId, repo)
39+
// create connections for repos that are not already connected
40+
for (const repo of result.reposToSync) {
41+
const canCreate = await activity.canCreateGithubConnection()
5442

55-
// add connection to integrations.settings.nangoMapping object
56-
await activity.setGithubConnection(integrationId, repo, connectionId)
43+
if (!canCreate) {
44+
await activity.logInfo(
45+
`Not enough time has passed since last connection! Skipping repo ${repo.owner}/${repo.repoName} from integration ${integrationId}!`,
46+
)
47+
continue
48+
}
5749

58-
// add repo to githubRepos mapping if it's not already mapped
59-
await activity.mapGithubRepo(integrationId, repo)
50+
// create nango connection
51+
const connectionId = await activity.createGithubConnection(integrationId, repo)
6052

61-
// add repo to git integration
62-
await activity.updateGitIntegrationWithRepo(integrationId, repo)
53+
// add connection to integrations.settings.nangoMapping object
54+
await activity.setGithubConnection(integrationId, repo, connectionId)
6355

64-
// start nango sync
65-
await activity.startNangoSync(integrationId, result.providerConfigKey, connectionId)
56+
// add repo to githubRepos mapping if it's not already mapped
57+
await activity.mapGithubRepo(integrationId, repo)
6658

67-
// sync repositories to segmentRepositories and insightsProjects after processing all repos
68-
await activity.syncGithubReposToInsights(integrationId)
59+
// add repo to git integration
60+
await activity.updateGitIntegrationWithRepo(integrationId, repo)
6961

70-
created++
62+
// start nango sync
63+
await activity.startNangoSync(integrationId, result.providerConfigKey, connectionId)
7164

72-
if (created < limit) {
73-
// fixed delay to spread onboarding evenly throughout the day
74-
await sleep(REPO_ONBOARDING_INTERVAL_MINUTES * 60 * 1000)
75-
}
76-
}
65+
// sync repositories to segmentRepositories and insightsProjects after each newly connected repo
66+
await activity.syncGithubReposToInsights(integrationId)
7767
}
7868
}

0 commit comments

Comments
 (0)