Skip to content

Commit 9d0f1b7

Browse files
committed
docs: document transactional option and migration steps
1 parent b11313c commit 9d0f1b7

1 file changed

Lines changed: 37 additions & 1 deletion

File tree

README.md

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ main();
123123
| `deleteOn` | `"success" \| "failure" \| "always" \| "never"` | `"never"` | When to delete completed jobs |
124124
| `transactionTimeout` | `number` | `1800000` | Transaction timeout in ms (30 min) |
125125
| `retryStrategy` | `RetryStrategy` | Exponential backoff | Custom retry logic |
126+
| `transactional` | `boolean` | `true` | Run worker inside the dequeue transaction |
126127

127128
### Events
128129

@@ -192,6 +193,35 @@ const queue = createQueue({ name: "email" }, async (job, client) => {
192193
});
193194
```
194195

196+
### Non-Transactional Mode
197+
198+
By default, the worker runs inside the dequeue transaction (exactly-once semantics). Set `transactional: false` to run the worker outside the transaction, giving it access to the full `PrismaClient` with `$transaction` support (at-least-once semantics):
199+
200+
```ts
201+
import { createQueue, type JobWorkerWithClient } from "@mgcrea/prisma-queue";
202+
203+
const queue = createQueue<JobPayload, JobResult>(
204+
{ name: "email", transactional: false },
205+
async (job, client) => {
206+
// client is the full PrismaClient — $transaction is available
207+
await client.$transaction(async (tx) => {
208+
await tx.user.update({ where: { id: 1 }, data: { email: job.payload.email } });
209+
await tx.auditLog.create({ data: { action: "email_updated" } });
210+
});
211+
return { status: 200 };
212+
},
213+
);
214+
```
215+
216+
**Trade-offs**: In non-transactional mode, a process crash between claiming and completing a job can leave it "stuck" (`processedAt` set, `finishedAt` null). Use `requeueStale()` to recover:
217+
218+
```ts
219+
// Requeue jobs claimed more than 5 minutes ago that never completed
220+
const count = await queue.requeueStale({ olderThanMs: 5 * 60 * 1000 });
221+
```
222+
223+
Note: `isLocked()` returns `false` during worker execution in non-transactional mode since the row lock is released after claiming.
224+
195225
### Edge Environments
196226

197227
When using this library in edge environments (Cloudflare Workers, Vercel Edge Functions, etc.) where Prisma's DMMF (Datamodel Meta Format) may not be available, you must provide the `tableName` option:
@@ -326,7 +356,13 @@ queue.on("error", (error) => { /* system/infrastructure errors */ });
326356

327357
**Edge environments require explicit `tableName`**: The library now throws if DMMF is unavailable and no `tableName` is provided (previously it guessed using `snake_case + "s"`).
328358

329-
**Database index reordered**: The index on `QueueJob` changed from `[queue, priority, runAt, finishedAt]` to `[queue, finishedAt, priority, runAt]`. Run a Prisma migration to update the index after upgrading.
359+
**Database index reordered**: The index on `QueueJob` changed from `[queue, priority, runAt, finishedAt]` to `[queue, finishedAt, processedAt, priority, runAt]`. Run a Prisma migration to update the index after upgrading.
360+
361+
**Dequeue now filters on `processedAt IS NULL`**: The dequeue query now requires `processedAt` to be null. Existing in-flight or crashed jobs with non-null `processedAt` and null `finishedAt` will no longer be picked up. Run this migration during a drained-queue window:
362+
363+
```sql
364+
UPDATE queue_jobs SET "processedAt" = NULL WHERE "finishedAt" IS NULL;
365+
```
330366

331367
## Authors
332368

0 commit comments

Comments
 (0)