Skip to content

Commit 49e5aba

Browse files
committed
feat: add transactional option for non-transactional worker execution
1 parent b04b18a commit 49e5aba

2 files changed

Lines changed: 172 additions & 46 deletions

File tree

src/PrismaQueue.ts

Lines changed: 158 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import type {
1010
JobPayload,
1111
JobResult,
1212
JobWorker,
13+
JobWorkerWithClient,
1314
PrismaLightClient,
1415
RetryStrategy,
1516
} from "./types";
@@ -28,6 +29,8 @@ export type PrismaQueueOptions = {
2829
transactionTimeout?: number;
2930
/** Custom retry strategy. Returns delay in ms, or null to stop retrying. */
3031
retryStrategy?: RetryStrategy;
32+
/** Whether to run the worker inside the dequeue transaction. Defaults to true. */
33+
transactional?: boolean;
3134
};
3235

3336
export type EnqueueOptions = {
@@ -87,12 +90,13 @@ export class PrismaQueue<
8790

8891
/**
8992
* Constructs a PrismaQueue object with specified options and a worker function.
93+
* Use the `createQueue` factory function for type-safe overloads based on the `transactional` option.
9094
* @param options - Configuration options for the queue.
9195
* @param worker - The worker function that processes jobs.
9296
*/
9397
public constructor(
9498
private options: PrismaQueueOptions = {},
95-
public worker: JobWorker<T, U>,
99+
public worker: JobWorker<T, U> | JobWorkerWithClient<T, U>,
96100
) {
97101
super();
98102

@@ -107,6 +111,7 @@ export class PrismaQueue<
107111
deleteOn = DEFAULT_DELETE_ON,
108112
transactionTimeout = 30 * 60 * 1000,
109113
retryStrategy = defaultRetryStrategy,
114+
transactional = true,
110115
} = this.options;
111116

112117
assert(name.length <= 255, "name must be less or equal to 255 chars");
@@ -129,6 +134,7 @@ export class PrismaQueue<
129134
deleteOn,
130135
transactionTimeout,
131136
retryStrategy,
137+
transactional,
132138
};
133139

134140
// Default error handlers
@@ -345,20 +351,57 @@ export class PrismaQueue<
345351
}
346352

347353
/**
348-
* Dequeues and processes the next job in the queue. Handles locking and error management internally.
354+
* Dequeues and processes the next job in the queue. Dispatches to transactional or
355+
* non-transactional path based on configuration, then emits events and handles cron scheduling.
349356
* @returns {Promise<PrismaJob<T, U> | null>} The job that was processed or null if no job was available.
350357
*/
351358
private async dequeue(): Promise<PrismaJob<T, U> | null> {
352359
if (this.stopped) {
353360
return null;
354361
}
355362
debug(`dequeuing from queue named="${this.name}"...`);
363+
364+
const { job, successResult, errorResult } = this.config.transactional
365+
? await this.dequeueTransactional()
366+
: await this.dequeueNonTransactional();
367+
368+
if (job) {
369+
// Emit events in logical order: dequeue first, then success/error
370+
this.emit("dequeue", job);
371+
if (successResult !== undefined) {
372+
this.emit("success", successResult, job);
373+
} else if (errorResult !== undefined) {
374+
this.emit("jobError", errorResult, job);
375+
}
376+
377+
const { key, cron, payload, finishedAt } = job;
378+
if (finishedAt && cron && key) {
379+
// Schedule next cron
380+
debug(
381+
`scheduling next cron job({key: ${key}, cron: ${cron}}) with payload=${JSON.stringify(payload)}`,
382+
);
383+
try {
384+
await this.schedule({ key, cron }, payload);
385+
} catch (scheduleError) {
386+
this.emit("error", scheduleError);
387+
}
388+
}
389+
}
390+
391+
return job;
392+
}
393+
394+
private async dequeueTransactional(): Promise<{
395+
job: PrismaJob<T, U> | null;
396+
successResult: U | undefined;
397+
errorResult: unknown;
398+
}> {
356399
const { name: queueName } = this;
357400
const { deleteOn, transactionTimeout } = this.config;
358401
const tableName = this.#escapedTableName;
359402
const now = new Date();
403+
const worker = this.worker as JobWorker<T, U>;
360404

361-
// Collect deferred events to emit after transaction
362405
let successResult: U | undefined;
363406
let errorResult: unknown;
364407

@@ -384,7 +427,6 @@ export class PrismaQueue<
384427
);
385428
if (!rows.length || !rows[0]) {
386429
debug(`no jobs found in queue named="${this.name}"`);
387-
// @NOTE Failed to acquire a lock
388430
return null;
389431
}
390432
const { id, payload, attempts, maxAttempts } = rows[0];
@@ -397,7 +439,7 @@ export class PrismaQueue<
397439
let result;
398440
try {
399441
debug(`starting worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`);
400-
result = await this.worker(job, client);
442+
result = await worker(job, client);
401443
debug(`finished worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`);
402444
const date = new Date();
403445
await job.update({ finishedAt: date, progress: 100, result, error: Prisma.DbNull });
@@ -441,30 +483,121 @@ export class PrismaQueue<
441483
{ timeout: transactionTimeout },
442484
);
443485

444-
if (job) {
445-
// Emit events in logical order: dequeue first, then success/error
446-
this.emit("dequeue", job);
447-
if (successResult !== undefined) {
448-
this.emit("success", successResult, job);
449-
} else if (errorResult !== undefined) {
450-
this.emit("jobError", errorResult, job);
451-
}
486+
return { job, successResult, errorResult };
487+
}
452488

453-
const { key, cron, payload, finishedAt } = job;
454-
if (finishedAt && cron && key) {
455-
// Schedule next cron
456-
debug(
457-
`scheduling next cron job({key: ${key}, cron: ${cron}}) with payload=${JSON.stringify(payload)}`,
458-
);
459-
try {
460-
await this.schedule({ key, cron }, payload);
461-
} catch (scheduleError) {
462-
this.emit("error", scheduleError);
463-
}
489+
private async dequeueNonTransactional(): Promise<{
490+
job: PrismaJob<T, U> | null;
491+
successResult: U | undefined;
492+
errorResult: unknown;
493+
}> {
494+
const { name: queueName } = this;
495+
const { deleteOn } = this.config;
496+
const tableName = this.#escapedTableName;
497+
const now = new Date();
498+
const worker = this.worker as JobWorkerWithClient<T, U>;
499+
500+
let successResult: U | undefined;
501+
let errorResult: unknown;
502+
503+
// Phase 1: Claim the job atomically (single-statement implicit transaction)
504+
const rows = await this.#prisma.$queryRawUnsafe<DatabaseJob<T, U>[]>(
505+
`UPDATE ${tableName} SET "processedAt" = $2, "attempts" = "attempts" + 1
506+
WHERE id = (
507+
SELECT id
508+
FROM ${tableName}
509+
WHERE (${tableName}."queue" = $1)
510+
AND (${tableName}."finishedAt" IS NULL)
511+
AND (${tableName}."processedAt" IS NULL)
512+
AND (${tableName}."runAt" <= $2)
513+
AND (${tableName}."notBefore" IS NULL OR ${tableName}."notBefore" <= $2)
514+
ORDER BY ${tableName}."priority" ASC, ${tableName}."runAt" ASC
515+
FOR UPDATE SKIP LOCKED
516+
LIMIT 1
517+
)
518+
RETURNING *;`,
519+
queueName,
520+
now,
521+
);
522+
523+
if (!rows.length || !rows[0]) {
524+
debug(`no jobs found in queue named="${this.name}"`);
525+
return { job: null, successResult: undefined, errorResult: undefined };
526+
}
527+
528+
const { id, payload, attempts, maxAttempts } = rows[0];
529+
const job = new PrismaJob<T, U>(rows[0], {
530+
model: this.model,
531+
client: this.#prisma,
532+
tableName,
533+
signal: this.abortController.signal,
534+
});
535+
536+
// Phase 2: Run worker outside any transaction
537+
try {
538+
debug(`starting worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`);
539+
const result = await worker(job, this.#prisma);
540+
debug(`finished worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`);
541+
542+
// Phase 3a: Update success
543+
const date = new Date();
544+
await job.update({ finishedAt: date, progress: 100, result, error: Prisma.DbNull });
545+
successResult = result;
546+
if (deleteOn === "success" || deleteOn === "always") {
547+
await job.delete();
548+
}
549+
} catch (error) {
550+
// Phase 3b: Update error/retry
551+
const date = new Date();
552+
debug(
553+
`failed finishing job({id: ${id}, payload: ${JSON.stringify(payload)}}) with error="${String(error)}"`,
554+
);
555+
const delay = this.config.retryStrategy({ attempts, maxAttempts, error });
556+
const isFinished = delay === null;
557+
if (!isFinished) {
558+
const notBefore = new Date(date.getTime() + delay);
559+
debug(`will retry at notBefore=${notBefore.toISOString()} (attempts=${attempts})`);
560+
await job.update({
561+
processedAt: null,
562+
finishedAt: null,
563+
failedAt: date,
564+
error: serializeError(error),
565+
notBefore,
566+
});
567+
} else {
568+
await job.update({
569+
finishedAt: date,
570+
failedAt: date,
571+
error: serializeError(error),
572+
notBefore: null,
573+
});
574+
}
575+
errorResult = error;
576+
if (deleteOn === "failure" || deleteOn === "always") {
577+
await job.delete();
464578
}
465579
}
466580

467-
return job;
581+
return { job, successResult, errorResult };
582+
}
583+
584+
/**
585+
* Requeues stale jobs that were claimed but never completed (e.g., due to a process crash
586+
* in non-transactional mode). Resets `processedAt` to null for jobs older than the cutoff.
587+
* @param options.olderThanMs - Only requeue jobs claimed more than this many milliseconds ago.
588+
* @returns The number of jobs requeued.
589+
*/
590+
public async requeueStale(options: { olderThanMs: number }): Promise<number> {
591+
const cutoff = new Date(Date.now() - options.olderThanMs);
592+
const { count } = await this.model.updateMany({
593+
where: {
594+
queue: this.name,
595+
processedAt: { lte: cutoff },
596+
finishedAt: null,
597+
},
598+
data: { processedAt: null },
599+
});
600+
return count;
468601
}
469602

470603
/**

src/index.ts

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { PrismaQueue, type PrismaQueueOptions } from "./PrismaQueue";
2-
import type { JobPayload, JobResult, JobWorker } from "./types";
2+
import type { JobPayload, JobResult, JobWorker, JobWorkerWithClient } from "./types";
33

44
export * from "./PrismaJob";
55
export * from "./PrismaQueue";
@@ -9,29 +9,22 @@ export { prepareForJson, restoreFromJson } from "./utils";
99

1010
/**
1111
* Factory function to create a new PrismaQueue instance.
12-
* This function simplifies the instantiation of a PrismaQueue by wrapping it into a function call.
13-
*
14-
* @param options - The configuration options for the PrismaQueue. These options configure how the queue interacts with the database and controls job processing behavior.
15-
* @param worker - The worker function that will process each job. The worker function is called with each dequeued job and is responsible for executing the job's logic.
1612
*
13+
* @param options - Configuration options for the queue.
14+
* @param worker - The worker function that processes each job.
1715
* @returns An instance of PrismaQueue configured with the provided options and worker.
18-
*
19-
* @template T - The type of the job payload. It extends JobPayload which can be further extended to include more specific data types as needed.
20-
* @template U - The type of the result expected from the worker function after processing a job. It extends JobResult which can be specialized based on the application's needs.
21-
*
22-
* @example
23-
* // Create a new queue for email sending jobs
24-
* const emailQueue = createQueue<EmailPayload, void>({
25-
* name: 'emails',
26-
* prisma: new PrismaClient(),
27-
* pollInterval: 5000,
28-
* }, async (job) => {
29-
* await sendEmail(job.payload);
30-
* });
3116
*/
32-
export const createQueue = <T extends JobPayload = JobPayload, U extends JobResult = JobResult>(
17+
export function createQueue<T extends JobPayload = JobPayload, U extends JobResult = JobResult>(
18+
options: PrismaQueueOptions & { transactional: false },
19+
worker: JobWorkerWithClient<T, U>,
20+
): PrismaQueue<T, U>;
21+
export function createQueue<T extends JobPayload = JobPayload, U extends JobResult = JobResult>(
3322
options: PrismaQueueOptions,
3423
worker: JobWorker<T, U>,
35-
) => {
24+
): PrismaQueue<T, U>;
25+
export function createQueue<T extends JobPayload = JobPayload, U extends JobResult = JobResult>(
26+
options: PrismaQueueOptions,
27+
worker: JobWorker<T, U> | JobWorkerWithClient<T, U>,
28+
) {
3629
return new PrismaQueue<T, U>(options, worker);
37-
};
30+
}

0 commit comments

Comments
 (0)