-
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathjob.ts
More file actions
330 lines (308 loc) · 9.21 KB
/
job.ts
File metadata and controls
330 lines (308 loc) · 9.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import { JobDispatcher } from './job_dispatcher.js'
import { JobBatchDispatcher } from './job_batch_dispatcher.js'
import { ScheduleBuilder } from './schedule_builder.js'
import type { JobContext, JobOptions } from './types/main.js'
/**
 * Base class every queue job derives from.
 *
 * Subclasses put their business logic in `execute()`. The constructor is
 * kept free for dependency injection: payload and context never travel
 * through it, they are supplied afterwards via `$hydrate()` (called by
 * the worker).
 *
 * @typeParam Payload - Shape of the data handed to the job
 * @typeParam Output - Type the promise returned by `execute()` resolves to
 *
 * @example
 * ```typescript
 * import { Job } from '@boringnode/queue'
 *
 * interface SendEmailPayload {
 *   to: string
 *   subject: string
 *   body: string
 * }
 *
 * export default class SendEmailJob extends Job<SendEmailPayload> {
 *   static options = {
 *     queue: 'emails',
 *     maxRetries: 3,
 *   }
 *
 *   // The constructor only receives injected dependencies
 *   constructor(private mailer: MailerService) {
 *     super()
 *   }
 *
 *   async execute() {
 *     console.log(`Attempt ${this.context.attempt} for job ${this.context.jobId}`)
 *     await this.mailer.send(this.payload.to, this.payload.subject, this.payload.body)
 *   }
 *
 *   async failed(error: Error) {
 *     console.error(`Failed to send email to ${this.payload.to}:`, error)
 *   }
 * }
 * ```
 */
export abstract class Job<Payload = any, Output = any> {
  // Runtime state; stays unset until `$hydrate()` has been called.
  #payload!: Payload
  #context!: JobContext
  #signal?: AbortSignal

  /**
   * Class-level configuration of the job.
   *
   * Subclasses redefine this static property to tune things such as the
   * queue name, retry policy or timeout. The static helpers below read
   * `name`, `queue`, `adapter` and `priority` from it.
   *
   * @example
   * ```typescript
   * class SendEmailJob extends Job<SendEmailPayload> {
   *   static options = {
   *     queue: 'emails',
   *     maxRetries: 3,
   *     timeout: '30s',
   *   }
   * }
   * ```
   */
  static options: JobOptions = {}

  /**
   * Data this job instance has to process.
   *
   * It is the value given at dispatch time and becomes readable once the
   * worker has hydrated the instance.
   *
   * @example
   * ```typescript
   * async execute() {
   *   const { to, subject, body } = this.payload
   *   await sendEmail(to, subject, body)
   * }
   * ```
   */
  get payload(): Payload {
    return this.#payload
  }

  /**
   * Metadata about the execution in progress.
   *
   * Carries information such as the job ID, the current attempt number,
   * the queue name, the priority and timing details. The object is frozen
   * during hydration.
   *
   * @example
   * ```typescript
   * async execute() {
   *   if (this.context.attempt > 1) {
   *     console.log(`Retry attempt ${this.context.attempt}`)
   *   }
   *   console.log(`Processing job ${this.context.jobId} on queue ${this.context.queue}`)
   * }
   * ```
   */
  get context(): JobContext {
    return this.#context
  }

  /**
   * Abort signal used for timeout handling, when one was provided.
   *
   * Long-running work should look at `signal.aborted` so that it can stop
   * gracefully once the job has timed out.
   *
   * @example
   * ```typescript
   * async execute() {
   *   for (const item of this.payload.items) {
   *     if (this.signal?.aborted) {
   *       throw new Error('Job timed out')
   *     }
   *     await processItem(item)
   *   }
   * }
   * ```
   */
  get signal(): AbortSignal | undefined {
    return this.#signal
  }

  /**
   * Attach the runtime data (payload, context, optional abort signal).
   *
   * The worker invokes this right after instantiating the job; user code
   * is not expected to call it.
   *
   * @param payload - Data the job will process
   * @param context - Execution context; the very object passed in is frozen (shallowly)
   * @param signal - Optional abort signal for timeout handling
   *
   * @internal
   */
  $hydrate(payload: Payload, context: JobContext, signal?: AbortSignal): void {
    this.#signal = signal
    this.#context = Object.freeze(context)
    this.#payload = payload
  }

  /**
   * Prepare the dispatch of one job of this class.
   *
   * A JobDispatcher is handed back so that the dispatch can be configured
   * fluently; nothing is sent to the queue until `.run()` is called or the
   * dispatcher is awaited. The `queue`, `adapter` and `priority` found in
   * `static options` are applied up front, and `options.name` replaces the
   * class name as job name when it is set.
   *
   * @param payload - The data to pass to the job
   * @returns A JobDispatcher for fluent configuration
   *
   * @example
   * ```typescript
   * // Simple dispatch
   * await SendEmailJob.dispatch({ to: 'user@example.com', subject: 'Hello' })
   *
   * // With options
   * await SendEmailJob.dispatch({ to: 'user@example.com' })
   *   .toQueue('high-priority')
   *   .priority(1)
   *   .in('5m')
   *   .run()
   * ```
   */
  static dispatch<T extends Job>(
    this: abstract new (...args: any[]) => T,
    payload: T extends Job<infer P> ? P : never
  ): JobDispatcher<T extends Job<infer P> ? P : never, T extends Job<any, infer O> ? O : never> {
    const { jobName, options } = readJobDefaults(this)

    const pending = new JobDispatcher<
      T extends Job<infer P> ? P : never,
      T extends Job<any, infer O> ? O : never
    >(jobName, payload)

    // Class-level defaults; callers may still override them on the dispatcher.
    const { queue, adapter, priority } = options
    if (queue) pending.toQueue(queue)
    if (adapter) pending.with(adapter)
    // Compared against undefined so that a priority of 0 is still applied.
    if (priority !== undefined) pending.priority(priority)

    return pending
  }

  /**
   * Prepare the dispatch of several jobs of this class as one batch.
   *
   * A JobBatchDispatcher is handed back for fluent configuration; the jobs
   * are only sent once `.run()` is called or the dispatcher is awaited.
   * Class-level `queue`, `adapter` and `priority` are applied exactly as
   * in `dispatch()`.
   *
   * Prefer this over repeated `dispatch()` calls: it relies on batched
   * operations (e.g., Redis pipeline, SQL batch insert).
   *
   * @param payloads - Array of data to pass to each job
   * @returns A JobBatchDispatcher for fluent configuration
   *
   * @example
   * ```typescript
   * // Batch dispatch for newsletter
   * const { jobIds } = await SendEmailJob.dispatchMany([
   *   { to: 'user1@example.com', subject: 'Newsletter' },
   *   { to: 'user2@example.com', subject: 'Newsletter' },
   * ])
   *   .group('newsletter-jan-2025')
   *   .toQueue('emails')
   *   .run()
   *
   * console.log(`Dispatched ${jobIds.length} jobs`)
   * ```
   */
  static dispatchMany<T extends Job>(
    this: abstract new (...args: any[]) => T,
    payloads: (T extends Job<infer P> ? P : never)[]
  ): JobBatchDispatcher<T extends Job<infer P> ? P : never> {
    const { jobName, options } = readJobDefaults(this)

    const pending = new JobBatchDispatcher<T extends Job<infer P> ? P : never>(jobName, payloads)

    const { queue, adapter, priority } = options
    if (queue) pending.toQueue(queue)
    if (adapter) pending.with(adapter)
    // Compared against undefined so that a priority of 0 is still applied.
    if (priority !== undefined) pending.priority(priority)

    return pending
  }

  /**
   * Prepare a schedule for this job.
   *
   * A ScheduleBuilder is handed back for fluent configuration; the
   * schedule only comes into existence once `.run()` is called or the
   * builder is awaited. Of the class-level options, only the job name is
   * used here.
   *
   * @param payload - The data to pass to the job on each run
   * @returns A ScheduleBuilder for fluent configuration
   *
   * @example
   * ```typescript
   * // Cron schedule
   * await CleanupJob.schedule({ days: 30 })
   *   .id('cleanup-daily')
   *   .cron('0 0 * * *')
   *   .timezone('Europe/Paris')
   *   .run()
   *
   * // Interval schedule
   * await SyncJob.schedule({ source: 'api' })
   *   .every('5m')
   *   .run()
   * ```
   */
  static schedule<T extends Job>(
    this: abstract new (...args: any[]) => T,
    payload: T extends Job<infer P> ? P : never
  ): ScheduleBuilder<T extends Job<infer P> ? P : never> {
    const { jobName } = readJobDefaults(this)
    return new ScheduleBuilder<T extends Job<infer P> ? P : never>(jobName, payload)
  }

  /**
   * The job's business logic.
   *
   * The worker calls this method when it processes the job. Timeout-aware
   * code can rely on `this.signal`, which is available after hydration.
   *
   * @throws Any error thrown will trigger retry logic (if configured)
   *
   * @example
   * ```typescript
   * async execute() {
   *   for (const item of this.payload.items) {
   *     if (this.signal?.aborted) {
   *       throw new Error('Job timed out')
   *     }
   *     await processItem(item)
   *   }
   * }
   * ```
   */
  abstract execute(): Promise<Output>

  /**
   * Optional hook invoked once the job has failed for good, i.e. after
   * every retry has been used up.
   *
   * Implement it only when cleanup, logging or notifications are needed.
   *
   * @param error - The error that caused the final failure
   *
   * @example
   * ```typescript
   * async failed(error: Error) {
   *   await notifyAdmin(`Job failed: ${error.message}`)
   *   await cleanup(this.payload)
   * }
   * ```
   */
  failed?(error: Error): Promise<void>
}

/**
 * Read the static configuration of a job class.
 *
 * Shared by the static factories of `Job`. Declared as a function so it is
 * hoisted and can live below the class.
 *
 * @param jobClass - The job constructor the static method was invoked on
 * @returns The class `options` (an empty object when falsy) and the job
 * name, which is `options.name` when truthy and the class name otherwise
 */
function readJobDefaults(jobClass: unknown): { jobName: string; options: JobOptions } {
  const ctor = jobClass as { options?: JobOptions; name: string }
  const options = ctor.options || {}
  return { jobName: options.name || ctor.name, options }
}