Skip to content

Commit ba69213

Browse files
committed
refactor(redis): remove verrou in favor of atomic lua script
1 parent 648163e commit ba69213

21 files changed

Lines changed: 588 additions & 638 deletions

benchmark/bullmq/harness.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Queue, Worker } from 'bullmq'
1+
import { Queue, Worker, type ConnectionOptions } from 'bullmq'
22
import { Redis } from 'ioredis'
33
import { barrier, type BenchmarkOptions, type BenchmarkResult } from '../helpers.ts'
44

@@ -19,7 +19,7 @@ export async function run(options: BenchmarkOptions): Promise<BenchmarkResult> {
1919
await clearQueue(connection)
2020

2121
const queue = new Queue('benchmark', {
22-
connection,
22+
connection: connection as ConnectionOptions,
2323
defaultJobOptions: {
2424
removeOnComplete: true,
2525
removeOnFail: true,
@@ -43,7 +43,7 @@ export async function run(options: BenchmarkOptions): Promise<BenchmarkResult> {
4343
next()
4444
},
4545
{
46-
connection,
46+
connection: connection as ConnectionOptions,
4747
concurrency: options.concurrency,
4848
}
4949
)

benchmark/run.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ async function runBenchmarks(config: BenchmarkConfig = defaultConfig) {
6969
grouped.get(key)!.push(result)
7070
}
7171

72-
console.log(
73-
'Jobs\tConc.\t@boringnode/queue\t\tBullMQ\t\t\tDiff'
74-
)
72+
console.log('Jobs\tConc.\t@boringnode/queue\t\tBullMQ\t\t\tDiff')
7573
console.log('-'.repeat(90))
7674

7775
for (const [key, group] of grouped) {
@@ -82,9 +80,7 @@ async function runBenchmarks(config: BenchmarkConfig = defaultConfig) {
8280
const boringnodeStr = boringnode
8381
? `${boringnode.elapsed}ms (${boringnode.jobsPerSecond.toFixed(0)} j/s)`
8482
: '-'
85-
const bullmqStr = bullmq
86-
? `${bullmq.elapsed}ms (${bullmq.jobsPerSecond.toFixed(0)} j/s)`
87-
: '-'
83+
const bullmqStr = bullmq ? `${bullmq.elapsed}ms (${bullmq.jobsPerSecond.toFixed(0)} j/s)` : '-'
8884

8985
let diff = ''
9086
if (boringnode && bullmq) {

package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@
3636
},
3737
"dependencies": {
3838
"@lukeed/ms": "^2.0.2",
39-
"@poppinss/utils": "^6.10.1",
40-
"@verrou/core": "^0.5.1"
39+
"@poppinss/utils": "^6.10.1"
4140
},
4241
"devDependencies": {
4342
"@adonisjs/eslint-config": "^2.1.2",

src/contracts/adapter.ts

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,55 @@
1-
import type { LeaseManager } from '#contracts/lease_manager'
2-
import type { JobData, LeaseConfig } from '#types/main'
1+
import type { JobData } from '#types/main'
2+
3+
export interface AcquiredJob extends JobData {
4+
acquiredAt: number
5+
}
36

47
export interface Adapter {
5-
createLeaseManager(config: LeaseConfig): LeaseManager
8+
/**
9+
* Set the worker ID for this adapter instance.
10+
* Required before calling pop methods when consuming jobs.
11+
*/
12+
setWorkerId(workerId: string): void
613

7-
size(): Promise<number>
8-
sizeOf(queue: string): Promise<number>
14+
/**
15+
* Pop the next available job from the default queue.
16+
* The driver handles locking internally.
17+
*/
18+
pop(): Promise<AcquiredJob | null>
19+
20+
/**
21+
* Pop the next available job from a specific queue.
22+
* The driver handles locking internally.
23+
*/
24+
popFrom(queue: string): Promise<AcquiredJob | null>
25+
26+
/**
27+
* Blocking pop that waits for a job to be available.
28+
* Supported by Redis adapter.
29+
*/
30+
popAndWait?(queue: string, timeout: number): Promise<AcquiredJob | null>
31+
32+
/**
33+
* Mark a job as completed and remove it from active set.
34+
*/
35+
completeJob(jobId: string, queue: string): Promise<void>
36+
37+
/**
38+
* Mark a job as failed permanently.
39+
*/
40+
failJob(jobId: string, queue: string, error?: Error): Promise<void>
41+
42+
/**
43+
* Retry a job - move back to pending queue with incremented attempts.
44+
*/
45+
retryJob(jobId: string, queue: string, retryAt?: Date): Promise<void>
946

1047
push(jobData: JobData): Promise<void>
1148
pushOn(queue: string, jobData: JobData): Promise<void>
12-
1349
pushLater(jobData: JobData, delay: number): Promise<void>
1450
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void>
1551

16-
pop(): Promise<JobData | null>
17-
popFrom(queue: string): Promise<JobData | null>
18-
52+
size(): Promise<number>
53+
sizeOf(queue: string): Promise<number>
1954
destroy(): Promise<void>
2055
}

src/contracts/lease_manager.ts

Lines changed: 0 additions & 6 deletions
This file was deleted.

0 commit comments

Comments
 (0)