Skip to content

Commit 3acf656

Browse files
committed
feat: Adds support for a context exposed in job processing
This adds suport for a context object, created at the start of an individual job's processing lifecycle and then exposed in emitter events such as ack, fail, ping, and dead. Resolves #13
1 parent 5b41ba4 commit 3acf656

5 files changed

Lines changed: 203 additions & 59 deletions

File tree

README.md

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -93,32 +93,53 @@ const queue = new Queue<SimpleJob>(new MemoryDriver("default"), "docmq");
9393

9494
#### `new Queue()` options
9595

96-
`new Queue<T>(driver: Driver, name: string, options?: QueueOptions)`
96+
```ts
97+
new Queue<
98+
TData,
99+
TAck = unknown,
100+
TFail extends Error = Error,
101+
TContext = DefaultContext,
102+
>(driver: Driver, name: string, options?: QueueOptions)
103+
```
97104

98105
- `driver` a Driver implementation to use such as the `MemoryDriver`
99106
- `name` a string for the queue's name
100107
- `options?` additional options
101108
- `retention.jobs?` number of seconds to retain jobs with no further work. Default `3600` (1 hour)
102109
- `statInterval?` number of seconds between emitting a `stat` event with queue statistics, defaults to `5`
103110

111+
### A Note on TypeScript
112+
113+
This library uses TypeScript to provide a better developer experience regarding the objects passed into your queue and the responses your job processor provides back to DocMQ. There are four main types used throughout this documentation, and all are set during the creation of the `Queue` class.
114+
115+
`TData` refers specifically to the typing of your **job payload**. It's the payload you're expecting to pass when calling `enqueue()`, and it's the payload you're expecting to receive inside of your `process()` callback.
116+
117+
`TAck = unknown` refers to the typing of your **ack response** when calling `api.ack()` inside of your job processor and is by default an unknown type. Setting `TAck` also sets the typings for the `ack` event.
118+
119+
`TFail extends Error = Error` refers to the typing of your **error object** created and passed to `api.fail()` inside of your job processor and defaults to the base `Error` class. Setting `TFail` also sets the typings for your `fail` event.
120+
121+
`TContext = Record<string, unknown>` refers to the **context object** available during processing, and is by default an empty object. The context is available inside of `process()` as well as inside of event callbacks after the processing context is available (`ack`, `fail`, `ping`, `dead`, etc). A `DefaultContext` is made available as a convienence for the Record definition.
122+
104123
### Adding a Job to the Queue
105124

106125
```ts
107126
await queue.enqueue({
108127
ref: "sample-id",
109-
/* SimpleJob */ payload: {
128+
/* TData */ payload: {
110129
success: true,
111130
},
112131
});
113132
```
114133

115-
#### `enqueue()` Options
134+
#### `enqueue()` API
116135

117-
`queue.enqueue(job: JobDefinition<T> | JobDefinition<T>[])`
136+
```ts
137+
queue.enqueue(job: JobDefinition<TData> | JobDefinition<TData>[])
138+
```
118139

119-
- `job` the JSON Job object, consisting of
140+
- `job` the JSON Job object (or an array of job objects), consisting of
120141
- `ref?: string` an identifier for the job, allowing future `enqueue()` calls to replace the job with new data. Defaults to a v4 UUID
121-
- `payload: T` the job's payload which will be saved and sent to the handler
142+
- `payload: TData` the job's payload which will be saved and sent to the handler
122143
- `runAt?: Date` a date object describing when the job should run. Defaults to `now()`
123144
- `runEvery?: string | null` Either a cron interval or an ISO-8601 duration, or `null` to remove recurrence
124145
- `timezone?: string | null` When using `runEvery`, you can specify a timezone to make DocMQ aware of durations that cross date-modifying thresholds such as Daylight Savings Time; recommended when using cron and duration values outside of UTC.
@@ -155,7 +176,7 @@ export interface LinearRetryStrategy {
155176

156177
```ts
157178
queue.process(
158-
async (job /* SampleJob */, api) => {
179+
async (job: TData, api: HandlerAPI<TAck, TFail, TContext>) => {
159180
await api.ack();
160181
},
161182
{
@@ -166,27 +187,29 @@ queue.process(
166187

167188
#### `process()` Options
168189

169-
`queue.process(handler: JobHandler<T, A, F>, config?: ProcessorConfig)`
190+
`queue.process(handler: JobHandler<T, A, F>, config?: ProcessorConfig<C>)`
170191

171192
- `handler` the job handler function, taking the job `T` and the api as arguments, returns a promise
172193
- `config?: ProcessorConfig` an optional configuration for the processor including
173194
- `pause?: boolean` should the processor wait to be started, default `false`
174195
- `concurrency?: number` the number of concurrent processor loops to run, default `1`
175196
- `visibility?: number` specify the visibility window (how long a job is held for by default) in seconds, default `30`
176197
- `pollInterval?: number` as a fallback, define how often to check for new jobs in the event that driver does not support evented notifications. Defaults to `5`
198+
- `createContext?: () => Promise<TContext> | TContext` generates a unique context of type `TContext` for this run. It will be available in the handler API.
177199

178200
#### `api` Methods and Members
179201

180-
- `api.ref` (string) the ref value of the job
181-
- `api.attempt` (number) the attempt number for this job
182-
- `api.visible` (number) the number of seconds this job was originally reserved for
183-
- `api.ack(result: A)` acknowlegde the job, marking it complete, and scheduling future work
184-
- `api.fail(reason: string | F)` fail the job and emit the `reason`, scheduling a retry if required
202+
- `api.ref` (`string`) the ref value of the job
203+
- `api.attempt` (`number`) the attempt number for this job
204+
- `api.visible` (`number`) the number of seconds this job was originally reserved for
205+
- `api.context` (`TContext`) the context object, generated for this run
206+
- `api.ack(result: TAck)` acknowlegde the job, marking it complete, and scheduling future work
207+
- `api.fail(reason: string | TFail)` fail the job and emit the `reason`, scheduling a retry if required
185208
- `api.ping(extendBy: number)` on a long running job, extend the runtime by `extendBy` seconds
186209

187210
### Events
188211

189-
The `Queue` object has a large number of emitted events available through `queue.events`. It extends `EventEmitter`, and the most common events are below:
212+
The `Queue` object has a large number of emitted events available through `queue.events`. It extends `EventEmitter`, and the most common events are below. Events related to the processing of a job (`ack`, `fail`, `dead`, and `ping`) will all receive `context: TContext` as a second argument to the event callback
190213

191214
- `ack` when a job was acked successfully
192215
- `fail` when a job was failed

src/queue.ts

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import {
1515
type JobDefinition,
1616
type Driver,
1717
type EmitterJob,
18-
ProcessAPI,
18+
type ProcessAPI,
19+
type DefaultContext,
1920
} from "./types.js";
2021
import { Worker } from "./worker.js";
2122
import {
@@ -70,17 +71,22 @@ const resetStats = (): QueueStats => ({
7071
* })
7172
* ```
7273
*/
73-
export class Queue<T, A = unknown, F extends Error = Error> {
74+
export class Queue<
75+
TData,
76+
TAck = unknown,
77+
TFail extends Error = Error,
78+
TContext = DefaultContext
79+
> {
7480
/**
7581
* An emitter associated with all interesting events that a queue can create
7682
* See: {@link Emitter}
7783
*/
78-
events: Emitter<T, A, F>;
84+
events: Emitter<TData, TAck, TFail, TContext>;
7985

8086
protected name: string;
8187
protected driver: Driver;
8288
protected opts: Required<QueueOptions>;
83-
protected workers: Worker<T, A, F>[];
89+
protected workers: Worker<TData, TAck, TFail, TContext>[];
8490
protected destroyed: boolean;
8591
protected statInterval?: ReturnType<typeof setInterval>;
8692
protected stats: QueueStats;
@@ -104,7 +110,7 @@ export class Queue<T, A = unknown, F extends Error = Error> {
104110
this.destroyed = false;
105111
this.workers = [];
106112
this.driver = driver;
107-
this.events = new EventEmitter() as Emitter<T, A, F>;
113+
this.events = new EventEmitter() as Emitter<TData, TAck, TFail, TContext>;
108114
this.opts = {
109115
retention: {
110116
jobs: options?.retention?.jobs ?? 3600,
@@ -162,7 +168,7 @@ export class Queue<T, A = unknown, F extends Error = Error> {
162168
* Add a job to DocMQ
163169
* @param job A job, specified by {@link JobDefinition}
164170
*/
165-
async enqueue(job: JobDefinition<T> | JobDefinition<T>[]) {
171+
async enqueue(job: JobDefinition<TData> | JobDefinition<TData>[]) {
166172
const bulkJobs = Array.isArray(job) ? job : [job];
167173

168174
if (this.destroyed) {
@@ -252,14 +258,12 @@ export class Queue<T, A = unknown, F extends Error = Error> {
252258
);
253259

254260
// split into success/failure
255-
const success: JobDefinition<T>[] = [];
256-
const failure: JobDefinition<T>[] = [];
257-
const errors: unknown[] = [];
261+
const success: JobDefinition<TData>[] = [];
262+
const failure: JobDefinition<TData>[] = [];
258263
results.forEach((r, idx) => {
259264
const j = bulkJobs[idx];
260265
if (r.status === "rejected") {
261266
failure.push(j);
262-
errors.push(r.reason);
263267
} else {
264268
success.push(j);
265269
}
@@ -274,7 +278,6 @@ export class Queue<T, A = unknown, F extends Error = Error> {
274278
"Unable to add the included jobs to the queue"
275279
);
276280
err.jobs = failure;
277-
err.errors = errors;
278281
this.events.emit("error", err);
279282
}
280283
}
@@ -340,7 +343,10 @@ export class Queue<T, A = unknown, F extends Error = Error> {
340343
* visibility window for processing, and change how often DocMQ polls for new
341344
* events when in an idle state.
342345
*/
343-
process(handler: JobHandler<T, A, F>, config?: ProcessorConfig): ProcessAPI {
346+
process(
347+
handler: JobHandler<TData, TAck, TFail, TContext>,
348+
config?: ProcessorConfig<TContext>
349+
): ProcessAPI {
344350
if (this.destroyed) {
345351
throw new Error("Cannot process a destroyed queue");
346352
}
@@ -402,14 +408,15 @@ export class Queue<T, A = unknown, F extends Error = Error> {
402408
// map into a collection of async functions and then run them
403409
next
404410
.map((doc) => async () => {
405-
const w = new Worker<T, A, F>({
411+
const w = new Worker<TData, TAck, TFail, TContext>({
406412
driver: this.driver,
407413
name: this.name,
408414
doc,
409-
payload: Queue.decodePayload<T>(doc.payload),
415+
payload: Queue.decodePayload<TData>(doc.payload),
410416
handler,
411417
emitter: this.events,
412418
visibility,
419+
createContext: config?.createContext,
413420
});
414421
this.workers.push(w);
415422
this.events.emit("process", {
@@ -567,7 +574,7 @@ export class Queue<T, A = unknown, F extends Error = Error> {
567574

568575
/** Add the stat listeners, using our own event system to capture outcomes */
569576
protected enableStats() {
570-
const onFail = (info: EmitterJob<T, A, F>) => {
577+
const onFail = (info: EmitterJob<TData, TAck, TFail>) => {
571578
this.stats.outcomes.failure += 1;
572579

573580
let errorType = "Error";

src/types.ts

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ type Returnable = void | null | undefined;
88
/** Makes all keys in T required */
99
type RequireKeyed<T, K extends keyof T> = T & { [P in K]-?: T[P] };
1010

11+
/** The default context for jobs when processed */
12+
export type DefaultContext = Record<string, unknown>;
13+
1114
export interface QueueOptions {
1215
/** Specify alternate retentions for message types */
1316
retention?: {
@@ -21,7 +24,7 @@ export interface QueueOptions {
2124
statInterval?: number;
2225
}
2326

24-
export interface ProcessorConfig {
27+
export interface ProcessorConfig<C> {
2528
/** Should the processor be paused on creation? If so, no events will be called until you emit a "start" event. */
2629
pause?: boolean;
2730
/** The number of concurrent handlers to run, defaults to `5`. Jobs tend to be IO bound, increasing this number allows for more jobs to run in parallel, but at a higher RPU load in serverless environments such as Mongo Atlas */
@@ -35,6 +38,7 @@ export interface ProcessorConfig {
3538
* seconds. Defaults to `5`
3639
*/
3740
pollInterval?: number;
41+
createContext?: () => MaybePromise<C>;
3842
}
3943

4044
export interface FixedRetryStrategy {
@@ -153,13 +157,18 @@ export interface EmitterJob<T = unknown, A = unknown, F = unknown> {
153157
next?: Date;
154158
}
155159

156-
export type EmitterJobWithPayload<T, A, F> = RequireKeyed<
157-
EmitterJob<T, A, F>,
160+
export type EmitterJobWithPayload<TData, TAck, TFail> = RequireKeyed<
161+
EmitterJob<TData, TAck, TFail>,
158162
"payload"
159163
>;
160164

161165
/** DocMQ's EventEmitter makes it easy to attach logging or additional behavior to your workflow */
162-
export type Emitter<T, A, F extends Error = Error> = EventEmitter<{
166+
export type Emitter<
167+
TData,
168+
TAck,
169+
TFail extends Error = Error,
170+
TContext = DefaultContext
171+
> = EventEmitter<{
163172
/** Triggered when the Processor loop goes idle, meaning 0 jobs are currently in-process */
164173
idle: () => void;
165174
/** A debug message with additional logging details */
@@ -177,17 +186,24 @@ export type Emitter<T, A, F extends Error = Error> = EventEmitter<{
177186
/** The processor is stopping */
178187
stop: () => void;
179188
/** A set of jobs was added to the queue */
180-
add: (jobs: JobDefinition<T>[]) => void;
189+
add: (jobs: JobDefinition<TData>[]) => void;
181190
/** A job was pulled for processing */
182-
process: (info: EmitterJob<T, A, F>) => void;
191+
process: (info: EmitterJob<TData, TAck, TFail>) => void;
183192
/** A job was completed successfully */
184-
ack: (info: EmitterJobWithPayload<T, A, F>) => void;
193+
ack: (
194+
info: EmitterJobWithPayload<TData, TAck, TFail>,
195+
context: TContext
196+
) => void;
185197
/** A job has failed one of its execution attempts */
186-
fail: (info: EmitterJob<T, A, F>) => void;
198+
fail: (info: EmitterJob<TData, TAck, TFail>, context: TContext) => void;
187199
/** A job has failed all of its execution attempts */
188-
dead: (info: EmitterJob<T, A, F>) => void;
200+
dead: (info: EmitterJob<TData, TAck, TFail>, context: TContext) => void;
189201
/** A job asked to extend its visibility window */
190-
ping: (info: EmitterJob<T, A, F>, extendBy: number) => void;
202+
ping: (
203+
info: EmitterJob<TData, TAck, TFail>,
204+
extendBy: number,
205+
context: TContext
206+
) => void;
191207
/** A report of statistics for this queue */
192208
stats: (stats: QueueStats & { queue: string }) => void;
193209
}>;
@@ -208,27 +224,38 @@ export interface FailureRetryOptions {
208224
attempt?: number;
209225
}
210226

211-
export interface HandlerApi<A = unknown, F extends Error = Error> {
227+
export interface HandlerApi<
228+
TAck = unknown,
229+
TFail extends Error = Error,
230+
TContext = DefaultContext
231+
> {
212232
/** The reference value for the job */
213233
ref: string;
214234
/** The number of attempts made for this job */
215235
attempt: number;
216236
/** How long (seconds) the Job was initially reserved for */
217237
visible: number;
238+
/** The current context for this execution */
239+
context: TContext;
218240
/** Acknowledge "ack" the job, marking it as successfully handled */
219-
ack: (result?: A) => Promise<void>;
241+
ack: (result?: TAck) => Promise<void>;
220242
/** Fail the job, triggering any requeue/rescheduling logic */
221243
fail: (
222-
error: DocMQError | F | string,
244+
error: DocMQError | TFail | string,
223245
retryOptions?: FailureRetryOptions
224246
) => Promise<void>;
225247
/** Request to extend the running time for the current job */
226248
ping: (extendBy: number) => Promise<void>;
227249
}
228250

229-
export type JobHandler<T = unknown, A = unknown, F extends Error = Error> = (
230-
payload: T,
231-
api: HandlerApi<A, F>
251+
export type JobHandler<
252+
TData,
253+
TAck = unknown,
254+
TFail extends Error = Error,
255+
TContext = DefaultContext
256+
> = (
257+
payload: TData,
258+
api: HandlerApi<TAck, TFail, TContext>
232259
) => Promise<unknown>;
233260

234261
/** The DriverEmitter controls events related to the handling of the DB driver */
@@ -306,14 +333,20 @@ export interface Driver<Schema = unknown, Table = unknown, TxInfo = unknown> {
306333
destroy(): Returnable;
307334
}
308335

309-
export interface WorkerOptions<T, A, F extends Error = Error> {
336+
export interface WorkerOptions<
337+
TData,
338+
TAck,
339+
TFail extends Error = Error,
340+
TContext = DefaultContext
341+
> {
310342
driver: Driver;
311343
name: string;
312344
doc: QueueDoc;
313-
payload: T;
314-
handler: JobHandler<T, A, F>;
315-
emitter: Emitter<T, A, F>;
345+
payload: TData;
346+
handler: JobHandler<TData, TAck, TFail, TContext>;
347+
emitter: Emitter<TData, TAck, TFail, TContext>;
316348
visibility: number;
349+
createContext?: () => MaybePromise<TContext>;
317350
}
318351

319352
export interface QueueStats {

0 commit comments

Comments
 (0)