Skip to content

Commit 9702c8c

Browse files
committed
Updated readme
1 parent 023e558 commit 9702c8c

1 file changed

Lines changed: 252 additions & 1 deletion

File tree

README.md

Lines changed: 252 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,252 @@
1-
# queue
1+
# Queue
2+
3+
![npm version](https://img.shields.io/npm/v/@plgworks/queue.svg?style=flat)
4+
5+
PLG Works Queue helps publish critical events using EventEmitter and RabbitMQ. All events get published using node EventEmitter and, if configured, events are also published through RabbitMQ, using topic-based exchange.
6+
7+
# Install
8+
9+
```bash
10+
npm install @plgworks/queue --save
11+
```
12+
13+
# Examples:
14+
15+
#### Subscribe to events published through RabbitMQ:
16+
17+
- Basic example on how to listen a specific event. Arguments passed are:
18+
- <b>Events</b> [Array] (mandatory) - List of events to subscribe to.
19+
- <b>Options</b> [object] (mandatory) -
20+
- <b>queue</b> [string] (optional) - Name of the queue on which you want to receive all your subscribed events. These queues and events, published in them, have TTL of 6 days. If a queue name is not passed, a queue with a unique name is created and is deleted when the subscriber gets disconnected.
21+
- <b>ackRequired</b> [number] - (optional) - The delivered message needs ack if passed 1 ( default 0 ). if 1 passed and ack not done, message will redeliver.
22+
- <b>prefetch</b> [number] (optional) - The number of messages released from queue in parallel. In case of ackRequired=1, queue will pause unless delivered messages are acknowledged.
23+
- <b>Callback</b> [function] (mandatory) - Callback method will be invoked whenever there is a new notification.
24+
25+
```js
26+
// Config Strategy for PLG Works Queue.
27+
configStrategy = {
28+
"rabbitmq": {
29+
"username": "guest",
30+
"password": "guest",
31+
"host": "127.0.0.1",
32+
"port": "5672",
33+
"heartbeats": "30",
34+
"enableRabbitmq": 1
35+
}
36+
};
37+
// Import the queue module.
38+
const QueueManager = require('@plgworks/queue');
39+
let unAckCount = 0; // Number of unacknowledged messages.
40+
41+
const subscribe = async function() {
42+
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
43+
queueManagerInstance.subscribeEvent.rabbit(
44+
["event.ProposedBrandedToken"], // List of events
45+
{
46+
queue: 'myQueue',
47+
ackRequired: 1, // When set to 1, all delivered messages MUST get acknowledge.
48+
broadcastSubscription: 1, // When set to 1, it will subscribe to broadcast channel and receive all broadcasted messages.
49+
prefetch:10
50+
},
51+
function(msgContent){
52+
// Please make sure to return promise in callback function.
53+
// On resolving the promise, the message will get acknowledged.
54+
// On rejecting the promise, the message will be re-queued (noAck)
55+
return new Promise(async function(onResolve, onReject) {
56+
// Incrementing unacknowledged message count.
57+
unAckCount++;
58+
console.log('Consumed message -> ', msgContent);
59+
response = await processMessage(msgContent);
60+
61+
// Complete the task and in the end of all tasks done
62+
if(response == success){
63+
// The message MUST be acknowledged here.
64+
// To acknowledge the message, call onResolve
65+
// Decrementing unacknowledged message count.
66+
unAckCount--;
67+
onResolve();
68+
} else {
69+
//in case of failure to requeue same message.
70+
onReject();
71+
}
72+
73+
})
74+
75+
});
76+
};
77+
// Gracefully handle SIGINT, SIGTERM signals.
78+
// Once SIGINT/SIGTERM signal is received, programme will stop consuming new messages.
79+
// But, the current process MUST handle unacknowledged queued messages.
80+
process.on('SIGINT', function () {
81+
console.log('Received SIGINT, checking unAckCount.');
82+
const f = function(){
83+
if (unAckCount === 0) {
84+
process.exit(1);
85+
} else {
86+
console.log('waiting for open tasks to be done.');
87+
setTimeout(f, 1000);
88+
}
89+
};
90+
setTimeout(f, 1000);
91+
});
92+
93+
function rmqError(err) {
94+
console.log('rmqError occured.', err);
95+
process.emit('SIGINT');
96+
}
97+
// Event published from package in case of internal error.
98+
process.on('rmq_error', rmqError);
99+
subscribe();
100+
```
101+
102+
- Example on how to listen to multiple events with one subscriber.
103+
104+
```js
105+
// Config Strategy for PLG Works Queue.
106+
configStrategy = {
107+
"rabbitmq": {
108+
"username": "guest",
109+
"password": "guest",
110+
"host": "127.0.0.1",
111+
"port": "5672",
112+
"heartbeats": "30",
113+
"enableRabbitmq": 1
114+
}
115+
};
116+
// Import the queue module.
117+
const QueueManager = require('@plgworks/queue');
118+
const subscribeMultiple = async function() {
119+
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
120+
queueManagerInstance.subscribeEvent.rabbit(
121+
["event.ProposedBrandedToken", "obBoarding.registerBrandedToken"],
122+
{},
123+
function(msgContent){
124+
console.log('Consumed message -> ', msgContent)
125+
});
126+
};
127+
subscribeMultiple();
128+
```
129+
130+
#### Subscribe to local events published through EventEmitter:
131+
132+
- Basic example on how to listen a specific event. Arguments passed are:
133+
- <b>Events</b> (mandatory) - List of events to subscribe to.
134+
- <b>Callback</b> (mandatory) - Callback method will be invoked whenever there is a new notification.
135+
136+
```js
137+
// Config Strategy for PLG Works Queue.
138+
configStrategy = {
139+
"rabbitmq": {
140+
"username": "guest",
141+
"password": "guest",
142+
"host": "127.0.0.1",
143+
"port": "5672",
144+
"heartbeats": "30",
145+
"enableRabbitmq": 1
146+
}
147+
};
148+
// Import the queue module.
149+
const QueueManager = require('@plgworks/queue');
150+
const subscribeLocal = async function() {
151+
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
152+
queueManagerInstance.subscribeEvent.local(["event.ProposedBrandedToken"],
153+
function(msgContent){
154+
console.log('Consumed message -> ', msgContent)
155+
});
156+
};
157+
subscribeLocal();
158+
```
159+
160+
#### Publish Notifications:
161+
162+
- All events are by default published using EventEmitter and if configured, through RabbitMQ as well.
163+
164+
```js
165+
// Config Strategy for PLG Works Queue.
166+
configStrategy = {
167+
"rabbitmq": {
168+
"username": "guest",
169+
"password": "guest",
170+
"host": "127.0.0.1",
171+
"port": "5672",
172+
"heartbeats": "30",
173+
"connectionTimeoutSec": "60",
174+
"enableRabbitmq": 1
175+
}
176+
};
177+
// Import the Queue module.
178+
const QueueManager = require('@plgworks/queue');
179+
const publish = async function() {
180+
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
181+
queueManagerInstance.publishEvent.perform(
182+
{
183+
topics:["event.ProposedBrandedToken"],
184+
broadcast: 1, // When set to 1 message will be broadcasted to all channels. 'topics' parameter should not be sent.
185+
publishAfter: 1000, // message to be sent after milliseconds.
186+
publisher: 'MyPublisher',
187+
message: {
188+
kind: "event_received",
189+
payload: {
190+
event_name: 'ProposedBrandedToken',
191+
params: {
192+
// params of the event
193+
},
194+
contract_address: 'contract address',
195+
chain_id: 'Chain id',
196+
chain_kind: 'kind of the chain'
197+
}
198+
}
199+
});
200+
};
201+
publish();
202+
```
203+
204+
#### Pause and Restart queue consumption:
205+
206+
- We also support pause and start queue consumption. According to your logical condition, you can fire below events from your process to cancel or restart consumption respectively.
207+
208+
```js
209+
210+
// Config Strategy for PLG Works Queue.
211+
let configStrategy = {
212+
"rabbitmq": {
213+
"username": "guest",
214+
"password": "guest",
215+
"host": "127.0.0.1",
216+
"port": "5672",
217+
"heartbeats": "30",
218+
"enableRabbitmq": 1
219+
}
220+
};
221+
let queueConsumerTag = null;
222+
// Import the queue module.
223+
const QueueManager = require('@plgworks/queue');
224+
const subscribePauseRestartConsume = async function() {
225+
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
226+
queueManagerInstance.subscribeEvent.rabbit(
227+
["event.ProposedBrandedToken", "obBoarding.registerBrandedToken"],
228+
{},
229+
function(msgContent){
230+
console.log('Consumed message -> ', msgContent);
231+
232+
if(some_failure_condition){
233+
process.emit('CANCEL_CONSUME', queueConsumerTag);
234+
}
235+
236+
if(failure_resolve_detected){
237+
process.emit('RESUME_CONSUME', queueConsumerTag);
238+
}
239+
},
240+
function(consumerTag) {
241+
queueConsumerTag = consumerTag;
242+
}
243+
);
244+
};
245+
subscribePauseRestartConsume();
246+
```
247+
248+
249+
# Running test cases
250+
```shell script
251+
./node_modules/.bin/mocha --recursive "./test/**/*.js"
252+
```

0 commit comments

Comments
 (0)