Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions packages/das/src/queue/fetch.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ export class FetchProcessor extends WorkerHost {
): Promise<void> {
this.logger.log(`Fetching PR metadata for ${repoFullName}#${prNumber}`);

const prBefore = await this.prRepo.findOneBy({ repoFullName, prNumber });
const previousClosingIssueNumbers = new Set(
prBefore?.closingIssueNumbers ?? [],
);

const { closingIssueNumbers, body, lastEditedAt } =
await this.fetcher.fetchPrMetadata(repoFullName, prNumber);

Expand All @@ -91,13 +96,54 @@ export class FetchProcessor extends WorkerHost {
},
);

// If this PR is merged, mark each linked issue as solved_by_pr
const pr = await this.prRepo.findOneBy({ repoFullName, prNumber });
if (pr?.state === "MERGED" && closingIssueNumbers.length > 0) {
for (const issueNumber of closingIssueNumbers) {
await this.issueRepo.update(
{ repoFullName, issueNumber },
{ solvedByPr: prNumber },
const currentClosingIssueNumbers = new Set(closingIssueNumbers);

const removedIssueNumbers = [...previousClosingIssueNumbers].filter(
(issueNumber) => !currentClosingIssueNumbers.has(issueNumber),
);

// Always clear stale mappings that were previously linked to this PR but
// are no longer linked after metadata refresh.
if (removedIssueNumbers.length > 0) {
await this.issueRepo.query(
`UPDATE issues
SET solved_by_pr = NULL
WHERE repo_full_name = $1
AND solved_by_pr = $2
AND issue_number = ANY($3::int[])`,
[repoFullName, prNumber, removedIssueNumbers],
);
}

if (pr?.state === "MERGED") {
// Current links on merged PRs should point to this PR.
if (closingIssueNumbers.length > 0) {
await this.issueRepo.query(
`UPDATE issues
SET solved_by_pr = $3
WHERE repo_full_name = $1
AND issue_number = ANY($2::int[])`,
[repoFullName, closingIssueNumbers, prNumber],
);
}
} else {
// If PR is not merged (e.g. reopened), it should not be solver for any
// issue currently or previously linked by closing references.
const clearCandidates = [
...new Set([
...previousClosingIssueNumbers,
...currentClosingIssueNumbers,
]),
];
if (clearCandidates.length > 0) {
await this.issueRepo.query(
`UPDATE issues
SET solved_by_pr = NULL
WHERE repo_full_name = $1
AND solved_by_pr = $2
AND issue_number = ANY($3::int[])`,
[repoFullName, prNumber, clearCandidates],
);
}
}
Expand Down
24 changes: 15 additions & 9 deletions packages/das/src/webhook/webhook.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,22 @@ export class WebhookController {
return { accepted: false };
}

await this.webhookService.handleEvent(
event,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
req.body as Record<string, any>,
deliveryId,
);
try {
await this.webhookService.handleEvent(
event,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
req.body as Record<string, any>,
deliveryId,
);

// Only mark processed on handler success — a thrown error leaves
// processed_at NULL so GitHub's retry will re-claim the row.
await this.webhookService.markProcessed(deliveryId);
// Only mark processed on handler success.
await this.webhookService.markProcessed(deliveryId);
} catch (err) {
// Release the in-flight lease on failure so retries can proceed
// immediately without waiting for lease expiry.
await this.webhookService.releaseDelivery(deliveryId);
throw err;
}

return { accepted: true };
}
Expand Down
50 changes: 32 additions & 18 deletions packages/das/src/webhook/webhook.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { InstallationHandler } from "./handlers/installation.handler";
@Injectable()
export class WebhookService {
private readonly logger = new Logger(WebhookService.name);
private static readonly DELIVERY_LEASE = "10 minutes";

constructor(
@InjectRepository(Repo)
Expand All @@ -32,32 +33,45 @@ export class WebhookService {
* Claim a delivery for processing.
* Returns true if this caller should process the event, false to skip.
*
* Uses INSERT ... ON CONFLICT DO NOTHING as an atomic claim. If the row
* already exists with processed_at set, it's a confirmed duplicate and we
* skip. If it exists with processed_at NULL, the prior attempt crashed
* mid-handler — we reprocess (all handlers are upserts, so that's safe).
* Uses an atomic insert-or-reclaim lease:
* - first sight inserts a row and claims it
* - processed rows are never re-claimed
* - unprocessed rows are only re-claimed when the prior lease is stale
*
* This prevents concurrent processing of the same delivery while still
* allowing retries after crashes.
*/
async claimDelivery(deliveryId: string): Promise<boolean> {
const inserted: unknown[] = await this.dataSource.query(
`INSERT INTO webhook_deliveries (delivery_id)
VALUES ($1)
ON CONFLICT (delivery_id) DO NOTHING
const claimed: unknown[] = await this.dataSource.query(
`INSERT INTO webhook_deliveries (delivery_id, processing_started_at)
VALUES ($1, NOW())
ON CONFLICT (delivery_id) DO UPDATE
SET processing_started_at = NOW()
WHERE webhook_deliveries.processed_at IS NULL
AND (
webhook_deliveries.processing_started_at IS NULL
OR webhook_deliveries.processing_started_at < NOW() - ($2)::interval
)
RETURNING delivery_id`,
[deliveryId],
[deliveryId, WebhookService.DELIVERY_LEASE],
);
if (inserted.length > 0) return true;

const existing: { processed_at: string | null }[] =
await this.dataSource.query(
`SELECT processed_at FROM webhook_deliveries WHERE delivery_id = $1`,
[deliveryId],
);
return existing[0]?.processed_at == null;
return claimed.length > 0;
}

// NOTE(review): this pasted diff span interleaves the pre-change single-line SQL
// with the post-change multi-line SQL (the +/- markers were stripped). As written,
// TWO string arguments are passed to query(); only the second literal — which also
// clears processing_started_at — is the intended post-change statement. The first
// string argument should be removed when reconstructing the real file.
async markProcessed(deliveryId: string): Promise<void> {
await this.dataSource.query(
// stale pre-change literal (superseded by the multi-line query below):
`UPDATE webhook_deliveries SET processed_at = NOW() WHERE delivery_id = $1`,
// post-change query: record success and drop the in-flight lease in one statement.
`UPDATE webhook_deliveries
SET processed_at = NOW(), processing_started_at = NULL
WHERE delivery_id = $1`,
[deliveryId],
);
}

/**
 * Release the in-flight claim (lease) on a webhook delivery after a handler
 * failure, so a subsequent GitHub retry can re-claim the row immediately
 * instead of waiting for the lease to expire (see the controller's catch path).
 *
 * The `processed_at IS NULL` guard ensures a successfully processed delivery
 * is never reopened, even if release races with markProcessed.
 *
 * @param deliveryId value of the X-GitHub-Delivery header identifying the row
 */
async releaseDelivery(deliveryId: string): Promise<void> {
await this.dataSource.query(
`UPDATE webhook_deliveries
SET processing_started_at = NULL
WHERE delivery_id = $1 AND processed_at IS NULL`,
[deliveryId],
);
}
Expand Down
17 changes: 11 additions & 6 deletions packages/db/10_webhook_deliveries.sql
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
-- Webhook delivery dedup (X-GitHub-Delivery header).
-- received_at is set on first sight; processing_started_at tracks an in-flight
-- claim; processed_at is set only after the handler succeeds.
-- This prevents concurrent workers from processing the same delivery while
-- still allowing retries when a claim becomes stale.
-- Pruned daily at 7-day age.

CREATE TABLE IF NOT EXISTS webhook_deliveries (
delivery_id VARCHAR(255) PRIMARY KEY,   -- X-GitHub-Delivery header; the PK is the atomic dedup key
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),   -- set on first sight of the delivery
processing_started_at TIMESTAMPTZ,   -- in-flight claim/lease timestamp; NULL when not claimed
processed_at TIMESTAMPTZ   -- set only after the handler succeeds; NULL = not yet processed
);

-- Backfill for existing deployments:
-- - Ensure processed_at exists.
-- - Ensure processing_started_at exists.
-- - Any historic unprocessed row is treated as already processed so very old
--   retries are not replayed.
-- Idempotent column additions for deployments created before these columns existed.
ALTER TABLE webhook_deliveries ADD COLUMN IF NOT EXISTS processed_at TIMESTAMPTZ;
ALTER TABLE webhook_deliveries ADD COLUMN IF NOT EXISTS processing_started_at TIMESTAMPTZ;
-- NOTE(review): this UPDATE runs on EVERY apply of this script, not just the first.
-- A row that is genuinely in-flight (claimed but not yet processed) at apply time
-- would be marked processed and its retry suppressed — confirm the migration runner
-- applies this file only once, or guard the UPDATE with an age/staleness window.
UPDATE webhook_deliveries SET processed_at = received_at WHERE processed_at IS NULL;

-- Supports the daily age-based prune of rows older than 7 days.
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_age ON webhook_deliveries(received_at);