diff --git a/packages/das/src/queue/fetch.processor.ts b/packages/das/src/queue/fetch.processor.ts index 27860d2..1ee3db5 100644 --- a/packages/das/src/queue/fetch.processor.ts +++ b/packages/das/src/queue/fetch.processor.ts @@ -79,6 +79,11 @@ export class FetchProcessor extends WorkerHost { ): Promise { this.logger.log(`Fetching PR metadata for ${repoFullName}#${prNumber}`); + const prBefore = await this.prRepo.findOneBy({ repoFullName, prNumber }); + const previousClosingIssueNumbers = new Set( + prBefore?.closingIssueNumbers ?? [], + ); + const { closingIssueNumbers, body, lastEditedAt } = await this.fetcher.fetchPrMetadata(repoFullName, prNumber); @@ -91,13 +96,54 @@ export class FetchProcessor extends WorkerHost { }, ); - // If this PR is merged, mark each linked issue as solved_by_pr const pr = await this.prRepo.findOneBy({ repoFullName, prNumber }); - if (pr?.state === "MERGED" && closingIssueNumbers.length > 0) { - for (const issueNumber of closingIssueNumbers) { - await this.issueRepo.update( - { repoFullName, issueNumber }, - { solvedByPr: prNumber }, + const currentClosingIssueNumbers = new Set(closingIssueNumbers); + + const removedIssueNumbers = [...previousClosingIssueNumbers].filter( + (issueNumber) => !currentClosingIssueNumbers.has(issueNumber), + ); + + // Always clear stale mappings that were previously linked to this PR but + // are no longer linked after metadata refresh. + if (removedIssueNumbers.length > 0) { + await this.issueRepo.query( + `UPDATE issues + SET solved_by_pr = NULL + WHERE repo_full_name = $1 + AND solved_by_pr = $2 + AND issue_number = ANY($3::int[])`, + [repoFullName, prNumber, removedIssueNumbers], + ); + } + + if (pr?.state === "MERGED") { + // Current links on merged PRs should point to this PR. + if (closingIssueNumbers.length > 0) { + await this.issueRepo.query( + `UPDATE issues + SET solved_by_pr = $3 + WHERE repo_full_name = $1 + AND issue_number = ANY($2::int[])`, + [repoFullName, closingIssueNumbers, prNumber], + ); + } + } else { + // If PR is not merged (e.g. reopened), it should not be solver for any + // issue currently or previously linked by closing references. + const clearCandidates = [ + ...new Set([ + ...previousClosingIssueNumbers, + ...currentClosingIssueNumbers, + ]), + ]; + if (clearCandidates.length > 0) { + await this.issueRepo.query( + `UPDATE issues + SET solved_by_pr = NULL + WHERE repo_full_name = $1 + AND solved_by_pr = $2 + AND issue_number = ANY($3::int[])`, + [repoFullName, prNumber, clearCandidates], ); } } diff --git a/packages/das/src/webhook/webhook.controller.ts b/packages/das/src/webhook/webhook.controller.ts index 0915fd8..9fb436e 100644 --- a/packages/das/src/webhook/webhook.controller.ts +++ b/packages/das/src/webhook/webhook.controller.ts @@ -48,16 +48,22 @@ export class WebhookController { return { accepted: false }; } - await this.webhookService.handleEvent( - event, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - req.body as Record, - deliveryId, - ); + try { + await this.webhookService.handleEvent( + event, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + req.body as Record, + deliveryId, + ); - // Only mark processed on handler success — a thrown error leaves - // processed_at NULL so GitHub's retry will re-claim the row. - await this.webhookService.markProcessed(deliveryId); + // Only mark processed on handler success. + await this.webhookService.markProcessed(deliveryId); + } catch (err) { + // Release the in-flight lease on failure so retries can proceed + // immediately without waiting for lease expiry. + await this.webhookService.releaseDelivery(deliveryId); + throw err; + } return { accepted: true }; } diff --git a/packages/das/src/webhook/webhook.service.ts b/packages/das/src/webhook/webhook.service.ts index ab4d707..80a48c7 100644 --- a/packages/das/src/webhook/webhook.service.ts +++ b/packages/das/src/webhook/webhook.service.ts @@ -14,6 +14,7 @@ import { InstallationHandler } from "./handlers/installation.handler"; @Injectable() export class WebhookService { private readonly logger = new Logger(WebhookService.name); + private static readonly DELIVERY_LEASE = "10 minutes"; constructor( @InjectRepository(Repo) @@ -32,32 +33,45 @@ export class WebhookService { * Claim a delivery for processing. * Returns true if this caller should process the event, false to skip. * - * Uses INSERT ... ON CONFLICT DO NOTHING as an atomic claim. If the row - * already exists with processed_at set, it's a confirmed duplicate and we - * skip. If it exists with processed_at NULL, the prior attempt crashed - * mid-handler — we reprocess (all handlers are upserts, so that's safe). + * Uses an atomic insert-or-reclaim lease: + * - first sight inserts a row and claims it + * - processed rows are never re-claimed + * - unprocessed rows are only re-claimed when the prior lease is stale + * + * This prevents concurrent processing of the same delivery while still + * allowing retries after crashes. */ async claimDelivery(deliveryId: string): Promise { - const inserted: unknown[] = await this.dataSource.query( - `INSERT INTO webhook_deliveries (delivery_id) - VALUES ($1) - ON CONFLICT (delivery_id) DO NOTHING + const claimed: unknown[] = await this.dataSource.query( + `INSERT INTO webhook_deliveries (delivery_id, processing_started_at) + VALUES ($1, NOW()) + ON CONFLICT (delivery_id) DO UPDATE + SET processing_started_at = NOW() + WHERE webhook_deliveries.processed_at IS NULL + AND ( + webhook_deliveries.processing_started_at IS NULL + OR webhook_deliveries.processing_started_at < NOW() - ($2)::interval + ) RETURNING delivery_id`, - [deliveryId], + [deliveryId, WebhookService.DELIVERY_LEASE], ); - if (inserted.length > 0) return true; - - const existing: { processed_at: string | null }[] = - await this.dataSource.query( - `SELECT processed_at FROM webhook_deliveries WHERE delivery_id = $1`, - [deliveryId], - ); - return existing[0]?.processed_at == null; + return claimed.length > 0; } async markProcessed(deliveryId: string): Promise { await this.dataSource.query( - `UPDATE webhook_deliveries SET processed_at = NOW() WHERE delivery_id = $1`, + `UPDATE webhook_deliveries + SET processed_at = NOW(), processing_started_at = NULL + WHERE delivery_id = $1`, + [deliveryId], + ); + } + + async releaseDelivery(deliveryId: string): Promise { + await this.dataSource.query( + `UPDATE webhook_deliveries + SET processing_started_at = NULL + WHERE delivery_id = $1 AND processed_at IS NULL`, [deliveryId], ); } diff --git a/packages/db/10_webhook_deliveries.sql b/packages/db/10_webhook_deliveries.sql index d48c47f..e63c9df 100644 --- a/packages/db/10_webhook_deliveries.sql +++ b/packages/db/10_webhook_deliveries.sql @@ -1,19 +1,24 @@ -- Webhook delivery dedup (X-GitHub-Delivery header). --- received_at is set on first sight; processed_at is set only after the --- handler succeeds. A retry whose row exists with processed_at IS NULL --- means the previous attempt crashed mid-handler and must be reprocessed. +-- received_at is set on first sight; processing_started_at tracks an in-flight +-- claim; processed_at is set only after the handler succeeds. +-- This prevents concurrent workers from processing the same delivery while +-- still allowing retries when a claim becomes stale. -- Pruned daily at 7-day age. CREATE TABLE IF NOT EXISTS webhook_deliveries ( delivery_id VARCHAR(255) PRIMARY KEY, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + processing_started_at TIMESTAMPTZ, processed_at TIMESTAMPTZ ); --- Backfill for existing deployments: any row that predates this column is --- treated as fully processed so GitHub retries for historic deliveries --- aren't re-run. +-- Backfill for existing deployments: +-- - Ensure processed_at exists. +-- - Ensure processing_started_at exists. +-- - Any historic unprocessed row is treated as already processed so very old +-- retries are not replayed. ALTER TABLE webhook_deliveries ADD COLUMN IF NOT EXISTS processed_at TIMESTAMPTZ; +ALTER TABLE webhook_deliveries ADD COLUMN IF NOT EXISTS processing_started_at TIMESTAMPTZ; UPDATE webhook_deliveries SET processed_at = received_at WHERE processed_at IS NULL; CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_age ON webhook_deliveries(received_at);