Skip to content

Commit 648163e

Browse files
committed
refactor(redis): do not check for delayed jobs every pop
1 parent 458763e commit 648163e

1 file changed

Lines changed: 14 additions & 2 deletions

File tree

src/drivers/redis_adapter.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ export function redis(config?: RedisConfig) {
5454

5555
export class RedisAdapter implements Adapter {
5656
readonly #connection: Redis
57+
#lastDelayedCheck: Map<string, number> = new Map()
58+
#delayedCheckInterval = 100 // Check delayed jobs every 100ms max
5759

5860
constructor(connection: Redis) {
5961
this.#connection = connection
@@ -72,8 +74,8 @@ export class RedisAdapter implements Adapter {
7274
}
7375

7476
async popFrom(queue: string): Promise<JobData | null> {
75-
// First, move any ready delayed jobs to the regular queue
76-
await this.#processDelayedJobs(queue)
77+
// Check delayed jobs periodically, not on every pop
78+
await this.#maybeProcessDelayedJobs(queue)
7779

7880
// Pop from priority queue (sorted set) - highest priority (lowest score) first
7981
const queueContent = await this.#connection.zpopmin(`${redisKey}::${queue}`)
@@ -85,6 +87,16 @@ export class RedisAdapter implements Adapter {
8587
return null
8688
}
8789

90+
async #maybeProcessDelayedJobs(queue: string): Promise<void> {
91+
const now = Date.now()
92+
const lastCheck = this.#lastDelayedCheck.get(queue) || 0
93+
94+
if (now - lastCheck >= this.#delayedCheckInterval) {
95+
this.#lastDelayedCheck.set(queue, now)
96+
await this.#processDelayedJobs(queue)
97+
}
98+
}
99+
88100
push(jobData: JobData): Promise<void> {
89101
return this.pushOn('default', jobData)
90102
}

0 commit comments

Comments
 (0)