Skip to content

Commit 458763e

Browse files
committed
chore: add some benchmark
1 parent 1da0daa commit 458763e

6 files changed

Lines changed: 575 additions & 1 deletion

File tree

benchmark/boringnode/harness.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import { Redis } from 'ioredis'
2+
import { Worker } from '#src/worker'
3+
import { Job } from '#src/job'
4+
import { Locator } from '#src/locator'
5+
import { redis } from '#drivers/redis_adapter'
6+
import { barrier, type BenchmarkOptions, type BenchmarkResult } from '../helpers.ts'
7+
import type { QueueManagerConfig } from '#types/main'
8+
9+
// Barrier callback - set before each benchmark run
10+
let onJobComplete: (() => boolean) | null = null
11+
12+
class BenchmarkJob extends Job<{ i: number }> {
13+
async execute() {
14+
// No-op - just measure queue overhead
15+
onJobComplete?.()
16+
}
17+
}
18+
19+
async function clearQueue(host: string, port: number) {
20+
const cleanupConnection = new Redis({ host, port })
21+
const keys = await cleanupConnection.keys('boringnode::queue::*')
22+
if (keys.length > 0) {
23+
await cleanupConnection.del(...keys)
24+
}
25+
await cleanupConnection.quit()
26+
}
27+
28+
export async function run(options: BenchmarkOptions): Promise<BenchmarkResult> {
29+
const host = process.env.REDIS_HOST || 'localhost'
30+
const port = Number.parseInt(process.env.REDIS_PORT || '6379', 10)
31+
32+
const connection = new Redis({
33+
host,
34+
port,
35+
keyPrefix: 'boringnode::queue::',
36+
})
37+
38+
await clearQueue(host, port)
39+
40+
// Setup barrier for completion tracking
41+
const { done, next } = barrier(options.numRuns)
42+
onJobComplete = next
43+
44+
const config: QueueManagerConfig = {
45+
default: 'redis',
46+
adapters: {
47+
redis: redis(connection),
48+
},
49+
locations: [''],
50+
worker: {
51+
concurrency: options.concurrency,
52+
pollingInterval: 1, // Very short polling for benchmarks
53+
},
54+
}
55+
56+
Locator.register('BenchmarkJob', BenchmarkJob)
57+
58+
const adapter = config.adapters.redis()
59+
const worker = new Worker(config)
60+
61+
// Enqueue all jobs first
62+
for (let i = 0; i < options.numRuns; ++i) {
63+
await adapter.pushOn('default', {
64+
id: `job-${i}`,
65+
name: 'BenchmarkJob',
66+
payload: { i },
67+
attempts: 0,
68+
})
69+
}
70+
71+
const startTime = Date.now()
72+
73+
// Start worker in background
74+
void worker.start(['default'])
75+
76+
// Wait for all jobs to complete
77+
await done
78+
79+
const elapsed = Date.now() - startTime
80+
81+
// Stop worker (also closes the connection) and cleanup
82+
await worker.stop()
83+
await clearQueue(host, port)
84+
85+
Locator.clear()
86+
87+
return {
88+
library: '@boringnode/queue',
89+
numRuns: options.numRuns,
90+
concurrency: options.concurrency,
91+
elapsed,
92+
jobsPerSecond: (options.numRuns / elapsed) * 1000,
93+
}
94+
}
95+
96+
if (import.meta.url === `file://${process.argv[1]}`) {
97+
const numRuns = Number.parseInt(process.env.NUM_RUNS || '1000', 10)
98+
const concurrency = Number.parseInt(process.env.CONCURRENCY || '1', 10)
99+
100+
run({ numRuns, concurrency })
101+
.then((result) => {
102+
if (process.stdout.isTTY) {
103+
console.log(
104+
`Ran ${result.numRuns} jobs through @boringnode/queue with concurrency ${result.concurrency} in ${result.elapsed}ms`
105+
)
106+
} else {
107+
console.log(result.elapsed)
108+
}
109+
})
110+
.catch((error) => {
111+
console.error('Benchmark failed:', error)
112+
process.exit(1)
113+
})
114+
}

benchmark/bullmq/harness.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { Queue, Worker } from 'bullmq'
2+
import { Redis } from 'ioredis'
3+
import { barrier, type BenchmarkOptions, type BenchmarkResult } from '../helpers.ts'
4+
5+
async function clearQueue(connection: Redis) {
6+
const keys = await connection.keys('bull:benchmark:*')
7+
if (keys.length > 0) {
8+
await connection.del(...keys)
9+
}
10+
}
11+
12+
export async function run(options: BenchmarkOptions): Promise<BenchmarkResult> {
13+
const connection = new Redis({
14+
host: process.env.REDIS_HOST || 'localhost',
15+
port: Number.parseInt(process.env.REDIS_PORT || '6379', 10),
16+
maxRetriesPerRequest: null,
17+
})
18+
19+
await clearQueue(connection)
20+
21+
const queue = new Queue('benchmark', {
22+
connection,
23+
defaultJobOptions: {
24+
removeOnComplete: true,
25+
removeOnFail: true,
26+
},
27+
})
28+
29+
// Enqueue all jobs first (before worker starts)
30+
for (let i = 0; i < options.numRuns; ++i) {
31+
await queue.add('job', { i })
32+
}
33+
34+
const { done, next } = barrier(options.numRuns)
35+
36+
const startTime = Date.now()
37+
38+
// Create worker AFTER all jobs are enqueued (pure dequeue test)
39+
const worker = new Worker(
40+
'benchmark',
41+
async () => {
42+
// No-op - just measure queue overhead
43+
next()
44+
},
45+
{
46+
connection,
47+
concurrency: options.concurrency,
48+
}
49+
)
50+
51+
// Wait for all jobs to complete
52+
await done
53+
54+
const elapsed = Date.now() - startTime
55+
56+
// Cleanup
57+
await worker.close()
58+
await queue.close()
59+
await clearQueue(connection)
60+
await connection.quit()
61+
62+
return {
63+
library: 'BullMQ',
64+
numRuns: options.numRuns,
65+
concurrency: options.concurrency,
66+
elapsed,
67+
jobsPerSecond: (options.numRuns / elapsed) * 1000,
68+
}
69+
}
70+
71+
if (import.meta.url === `file://${process.argv[1]}`) {
72+
const numRuns = Number.parseInt(process.env.NUM_RUNS || '1000', 10)
73+
const concurrency = Number.parseInt(process.env.CONCURRENCY || '1', 10)
74+
75+
run({ numRuns, concurrency })
76+
.then((result) => {
77+
if (process.stdout.isTTY) {
78+
console.log(
79+
`Ran ${result.numRuns} jobs through BullMQ with concurrency ${result.concurrency} in ${result.elapsed}ms`
80+
)
81+
} else {
82+
console.log(result.elapsed)
83+
}
84+
})
85+
.catch((error) => {
86+
console.error('Benchmark failed:', error)
87+
process.exit(1)
88+
})
89+
}

benchmark/helpers.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Creates a deferred promise that can be resolved externally
3+
*/
4+
export function deferred<T = void>(): {
5+
promise: Promise<T>
6+
resolve: (value: T) => void
7+
reject: (error: Error) => void
8+
} {
9+
let resolve!: (value: T) => void
10+
let reject!: (error: Error) => void
11+
12+
const promise = new Promise<T>((res, rej) => {
13+
resolve = res
14+
reject = rej
15+
})
16+
17+
return { promise, resolve, reject }
18+
}
19+
20+
/**
21+
* A promise-based barrier that resolves when `n` calls to `next()` are made
22+
*/
23+
export function barrier(n: number = 1) {
24+
const { promise, resolve } = deferred<void>()
25+
26+
return {
27+
done: promise,
28+
next() {
29+
--n
30+
if (n < 0) return false
31+
if (n === 0) resolve()
32+
return true
33+
},
34+
}
35+
}
36+
37+
export interface BenchmarkOptions {
38+
numRuns: number
39+
concurrency: number
40+
}
41+
42+
export interface BenchmarkResult {
43+
library: string
44+
numRuns: number
45+
concurrency: number
46+
elapsed: number
47+
jobsPerSecond: number
48+
}
49+
50+
export function formatResult(result: BenchmarkResult): string {
51+
return `${result.library}: ${result.numRuns} jobs with concurrency ${result.concurrency} in ${result.elapsed}ms (${result.jobsPerSecond.toFixed(2)} jobs/sec)`
52+
}

benchmark/run.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import { formatResult, type BenchmarkOptions, type BenchmarkResult } from './helpers.ts'
2+
import { run as runBoringnode } from './boringnode/harness.ts'
3+
import { run as runBullMQ } from './bullmq/harness.ts'
4+
5+
interface BenchmarkConfig {
6+
numRuns: number[]
7+
concurrency: number[]
8+
}
9+
10+
const defaultConfig: BenchmarkConfig = {
11+
numRuns: [100, 1000, 5000],
12+
concurrency: [1, 5, 10],
13+
}
14+
15+
async function runBenchmarks(config: BenchmarkConfig = defaultConfig) {
16+
const results: BenchmarkResult[] = []
17+
18+
console.log('='.repeat(60))
19+
console.log('Queue Benchmark Suite')
20+
console.log('='.repeat(60))
21+
console.log()
22+
23+
for (const numRuns of config.numRuns) {
24+
for (const concurrency of config.concurrency) {
25+
const options: BenchmarkOptions = { numRuns, concurrency }
26+
27+
console.log(`\nBenchmark: ${numRuns} jobs, concurrency ${concurrency}`)
28+
console.log('-'.repeat(50))
29+
30+
// Run @boringnode/queue benchmark
31+
try {
32+
console.log('Running @boringnode/queue...')
33+
const boringnodeResult = await runBoringnode(options)
34+
results.push(boringnodeResult)
35+
console.log(` ${formatResult(boringnodeResult)}`)
36+
} catch (error) {
37+
console.error(` @boringnode/queue failed:`, error)
38+
}
39+
40+
// Run BullMQ benchmark
41+
try {
42+
console.log('Running BullMQ...')
43+
const bullmqResult = await runBullMQ(options)
44+
results.push(bullmqResult)
45+
console.log(` ${formatResult(bullmqResult)}`)
46+
} catch (error) {
47+
console.error(` BullMQ failed:`, error)
48+
}
49+
50+
// Small delay between runs to let Redis settle
51+
await new Promise((resolve) => setTimeout(resolve, 100))
52+
}
53+
}
54+
55+
// Print summary
56+
console.log()
57+
console.log('='.repeat(60))
58+
console.log('Summary')
59+
console.log('='.repeat(60))
60+
console.log()
61+
62+
// Group by numRuns and concurrency
63+
const grouped = new Map<string, BenchmarkResult[]>()
64+
for (const result of results) {
65+
const key = `${result.numRuns}-${result.concurrency}`
66+
if (!grouped.has(key)) {
67+
grouped.set(key, [])
68+
}
69+
grouped.get(key)!.push(result)
70+
}
71+
72+
console.log(
73+
'Jobs\tConc.\t@boringnode/queue\t\tBullMQ\t\t\tDiff'
74+
)
75+
console.log('-'.repeat(90))
76+
77+
for (const [key, group] of grouped) {
78+
const [numRuns, concurrency] = key.split('-')
79+
const boringnode = group.find((r) => r.library === '@boringnode/queue')
80+
const bullmq = group.find((r) => r.library === 'BullMQ')
81+
82+
const boringnodeStr = boringnode
83+
? `${boringnode.elapsed}ms (${boringnode.jobsPerSecond.toFixed(0)} j/s)`
84+
: '-'
85+
const bullmqStr = bullmq
86+
? `${bullmq.elapsed}ms (${bullmq.jobsPerSecond.toFixed(0)} j/s)`
87+
: '-'
88+
89+
let diff = ''
90+
if (boringnode && bullmq) {
91+
const percentage = ((bullmq.elapsed - boringnode.elapsed) / bullmq.elapsed) * 100
92+
if (percentage > 0) {
93+
diff = `${percentage.toFixed(1)}% faster`
94+
} else {
95+
diff = `${Math.abs(percentage).toFixed(1)}% slower`
96+
}
97+
}
98+
99+
console.log(
100+
`${numRuns}\t${concurrency}\t${boringnodeStr.padEnd(24)}\t${bullmqStr.padEnd(24)}\t${diff}`
101+
)
102+
}
103+
}
104+
105+
// Parse CLI arguments
106+
const args = process.argv.slice(2)
107+
let config = defaultConfig
108+
109+
if (args.includes('--quick')) {
110+
config = {
111+
numRuns: [100],
112+
concurrency: [1],
113+
}
114+
} else if (args.includes('--full')) {
115+
config = {
116+
numRuns: [100, 500, 1000, 2500, 5000, 10000],
117+
concurrency: [1, 5, 10, 25, 50],
118+
}
119+
}
120+
121+
runBenchmarks(config)
122+
.then(() => {
123+
console.log('\nBenchmark completed!')
124+
process.exit(0)
125+
})
126+
.catch((error) => {
127+
console.error('Benchmark suite failed:', error)
128+
process.exit(1)
129+
})

0 commit comments

Comments
 (0)