Skip to content

Commit 0ec97ac

Browse files
committed
Updated readme and related refactoring
1 parent 9702c8c commit 0ec97ac

5 files changed

Lines changed: 110 additions & 154 deletions

File tree

README.md

Lines changed: 106 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,83 @@
22

33
![npm version](https://img.shields.io/npm/v/@plgworks/queue.svg?style=flat)
44

5-
PLG Works Queue helps publish critical events using EventEmitter and RabbitMQ. All events get published using node EventEmitter and, if configured, events are also published through RabbitMQ, using topic-based exchange.
5+
PLG Works Queue helps with managing subscription and publish critical events using EventEmitter and RabbitMQ. All events get published using node EventEmitter and, if configured, events are also published through RabbitMQ, using topic-based exchange.
66

7-
# Install
7+
## Install
88

99
```bash
1010
npm install @plgworks/queue --save
1111
```
1212

13-
# Examples:
13+
## Initialize
14+
While using this package initialize an instance of queue manager to use to publish an event or subscribe to an event(s). To initialize an instance RabbitMQ configuration is required. Using this instance various functional methods can be invoked. The configuration should include following parameters:
15+
- <b>username</b> [string] (mandatory)
16+
- <b>password</b> [string] (mandatory)
17+
- <b>host</b> [string] (mandatory)
18+
- <b>port</b> [string] (mandatory)
19+
- <b>heartbeats</b> [string] (mandatory) heartbeats defines after what period of time the peer TCP connection should be considered unreachable.
20+
- <b>clusterNodes</b> [Array] (mandatory) - List of RMQ hosts.
21+
- <b>enableRabbitmq</b> [integer] (optional) 0 if local usage.
22+
- <b>switchHostAfterSec</b> [integer] (optional) Wait time before switching RMQ host.
23+
- <b>connectionTimeoutSec</b> [integer] (optional) Wait time for connection to establish.
1424

15-
#### Subscribe to events published through RabbitMQ:
25+
Example snippet to initialize PLG Works queue manager.
1626

17-
- Basic example on how to listen a specific event. Arguments passed are:
18-
- <b>Events</b> [Array] (mandatory) - List of events to subscribe to.
19-
- <b>Options</b> [object] (mandatory) -
27+
```js
28+
// Config Strategy for PLG Works Queue.
29+
configStrategy = {
30+
"rabbitmq": {
31+
"username": "guest",
32+
"password": "guest",
33+
"host": "127.0.0.1",
34+
"port": "5672",
35+
"heartbeats": "30",
36+
"enableRabbitmq": 1
37+
}
38+
};
39+
// Import the queue module.
40+
const QueueManager = require('@plgworks/queue');
41+
const queueManagerInstance = await QueueManager.getInstance(configStrategy);
42+
```
43+
44+
45+
## Methods
46+
PLG Works Queue exposes following 3 methods:
47+
48+
- `queueManagerInstance.subscribeEvent.rabbit(topics, options, readCallback, subscribeCallback)`
49+
- <b>topics</b> [Array] (mandatory) - List of events to subscribe to.
50+
- <b>options</b> [object] (mandatory) -
2051
- <b>queue</b> [string] (optional) - Name of the queue on which you want to receive all your subscribed events. These queues and events, published in them, have TTL of 6 days. If a queue name is not passed, a queue with a unique name is created and is deleted when the subscriber gets disconnected.
21-
- <b>ackRequired</b> [number] - (optional) - The delivered message needs ack if passed 1 ( default 0 ). if 1 passed and ack not done, message will redeliver.
22-
- <b>prefetch</b> [number] (optional) - The number of messages released from queue in parallel. In case of ackRequired=1, queue will pause unless delivered messages are acknowledged.
23-
- <b>Callback</b> [function] (mandatory) - Callback method will be invoked whenever there is a new notification.
52+
- <b>ackRequired</b> [integer] (optional) - The delivered message needs ack if passed 1 ( default 0 ). if 1 passed and ack not done, message will redeliver.
53+
- <b>broadcastSubscription</b> [integer] (optional) - Set to 1, when queue needs to be subscribed to broadcasting events.
54+
- <b>prefetch</b> [integer] (optional) - The number of messages released from queue in parallel. In case of ackRequired=1, queue will pause unless delivered messages are acknowledged.
55+
- <b>readCallback</b> [function] (mandatory) - Callback method will be invoked whenever there is a new notification.
56+
- <b>subscribeCallback</b> [function] (optional) - Callback method to return consumerTag.
57+
58+
<br>
59+
60+
- `queueManagerInstance.subscribeEvent.local( topics, readCallback)`
61+
- <b>topics</b> [Array] (mandatory) - List of events to subscribe to.
62+
- <b>readCallback</b> [function] (mandatory) - Callback method will be invoked whenever there is a new notification.
63+
64+
<br>
65+
66+
- `queueManagerInstance.publishEvent.perform(params)`
67+
- <b>params</b> [object] (mandatory)
68+
- <b>topics</b> [Array] (optional) List of topic messages to publish.
69+
- <b>broadcast</b> [integer] (optional) When set to 1 message will be broadcasted to all channels. Default value is 0.
70+
- <b>publishAfter</b> [integer] (optional) Message to be sent after milliseconds.
71+
- <b>publisher</b> [string] (mandatory) Name of publisher
72+
- <b>message</b> [object] (mandatory)
73+
- <b>kind</b> [string] (madatory) Kind of the message.
74+
- <b>payload</b> [object] (optional) Payload to identify message and extra info.
75+
76+
<br>
77+
78+
79+
## Examples:
80+
81+
#### 1. Subscribe to events published through RabbitMQ:
2482

2583
```js
2684
// Config Strategy for PLG Works Queue.
@@ -34,16 +92,17 @@ configStrategy = {
3492
"enableRabbitmq": 1
3593
}
3694
};
95+
3796
// Import the queue module.
3897
const QueueManager = require('@plgworks/queue');
3998
let unAckCount = 0; // Number of unacknowledged messages.
4099

41100
const subscribe = async function() {
42101
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
43102
queueManagerInstance.subscribeEvent.rabbit(
44-
["event.ProposedBrandedToken"], // List of events
103+
["event.PublicTestEvent"], // List of events
45104
{
46-
queue: 'myQueue',
105+
queue: 'testQueue',
47106
ackRequired: 1, // When set to 1, all delivered messages MUST get acknowledge.
48107
broadcastSubscription: 1, // When set to 1, it will subscribe to broadcast channel and receive all broadcasted messages.
49108
prefetch:10
@@ -99,7 +158,7 @@ process.on('rmq_error', rmqError);
99158
subscribe();
100159
```
101160

102-
- Example on how to listen to multiple events with one subscriber.
161+
#### 2. Listen to multiple events with one subscriber.
103162

104163
```js
105164
// Config Strategy for PLG Works Queue.
@@ -113,12 +172,13 @@ configStrategy = {
113172
"enableRabbitmq": 1
114173
}
115174
};
175+
116176
// Import the queue module.
117177
const QueueManager = require('@plgworks/queue');
118178
const subscribeMultiple = async function() {
119179
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
120180
queueManagerInstance.subscribeEvent.rabbit(
121-
["event.ProposedBrandedToken", "obBoarding.registerBrandedToken"],
181+
["event.PublicTestEvent1", "event.PublicTestEvent2"],
122182
{},
123183
function(msgContent){
124184
console.log('Consumed message -> ', msgContent)
@@ -127,11 +187,7 @@ const subscribeMultiple = async function() {
127187
subscribeMultiple();
128188
```
129189

130-
#### Subscribe to local events published through EventEmitter:
131-
132-
- Basic example on how to listen a specific event. Arguments passed are:
133-
- <b>Events</b> (mandatory) - List of events to subscribe to.
134-
- <b>Callback</b> (mandatory) - Callback method will be invoked whenever there is a new notification.
190+
#### 3. Subscribe to local events published through EventEmitter:
135191

136192
```js
137193
// Config Strategy for PLG Works Queue.
@@ -142,22 +198,23 @@ configStrategy = {
142198
"host": "127.0.0.1",
143199
"port": "5672",
144200
"heartbeats": "30",
145-
"enableRabbitmq": 1
201+
"enableRabbitmq": 0
146202
}
147203
};
204+
148205
// Import the queue module.
149206
const QueueManager = require('@plgworks/queue');
150207
const subscribeLocal = async function() {
151208
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
152-
queueManagerInstance.subscribeEvent.local(["event.ProposedBrandedToken"],
209+
queueManagerInstance.subscribeEvent.local(["event.PublicTestLocalEvent"],
153210
function(msgContent){
154211
console.log('Consumed message -> ', msgContent)
155212
});
156213
};
157214
subscribeLocal();
158215
```
159216

160-
#### Publish Notifications:
217+
#### 4. Publish Notifications:
161218

162219
- All events are by default published using EventEmitter and if configured, through RabbitMQ as well.
163220

@@ -174,34 +231,29 @@ configStrategy = {
174231
"enableRabbitmq": 1
175232
}
176233
};
234+
177235
// Import the Queue module.
178236
const QueueManager = require('@plgworks/queue');
179237
const publish = async function() {
180238
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
181239
queueManagerInstance.publishEvent.perform(
182240
{
183-
topics:["event.ProposedBrandedToken"],
184-
broadcast: 1, // When set to 1 message will be broadcasted to all channels. 'topics' parameter should not be sent.
241+
topics:["event.PublishTestEvent"],
242+
broadcast: 1, // When set to 1 message will be broadcasted to all channels.
185243
publishAfter: 1000, // message to be sent after milliseconds.
186244
publisher: 'MyPublisher',
187245
message: {
188-
kind: "event_received",
189-
payload: {
190-
event_name: 'ProposedBrandedToken',
191-
params: {
192-
// params of the event
193-
},
194-
contract_address: 'contract address',
195-
chain_id: 'Chain id',
196-
chain_kind: 'kind of the chain'
246+
kind: "event_received",
247+
payload: {
248+
// Custom payload for message
249+
}
197250
}
198-
}
199251
});
200252
};
201253
publish();
202254
```
203255

204-
#### Pause and Restart queue consumption:
256+
#### 5. Pause and Restart queue consumption:
205257

206258
- We also support pause and start queue consumption. According to your logical condition, you can fire below events from your process to cancel or restart consumption respectively.
207259

@@ -218,35 +270,37 @@ let configStrategy = {
218270
"enableRabbitmq": 1
219271
}
220272
};
273+
221274
let queueConsumerTag = null;
222275
// Import the queue module.
223276
const QueueManager = require('@plgworks/queue');
224277
const subscribePauseRestartConsume = async function() {
225278
let queueManagerInstance = await QueueManager.getInstance(configStrategy);
226279
queueManagerInstance.subscribeEvent.rabbit(
227-
["event.ProposedBrandedToken", "obBoarding.registerBrandedToken"],
228-
{},
229-
function(msgContent){
230-
console.log('Consumed message -> ', msgContent);
231-
232-
if(some_failure_condition){
233-
process.emit('CANCEL_CONSUME', queueConsumerTag);
234-
}
235-
236-
if(failure_resolve_detected){
237-
process.emit('RESUME_CONSUME', queueConsumerTag);
280+
["event.PublicTestEvent1", "event.PublicTestEvent2"],
281+
{},
282+
function(msgContent){
283+
console.log('Consumed message -> ', msgContent);
284+
285+
if(some_failure_condition){
286+
process.emit('CANCEL_CONSUME', queueConsumerTag);
287+
}
288+
289+
if(failure_resolve_detected){
290+
process.emit('RESUME_CONSUME', queueConsumerTag);
291+
}
292+
},
293+
function(consumerTag) {
294+
queueConsumerTag = consumerTag;
238295
}
239-
},
240-
function(consumerTag) {
241-
queueConsumerTag = consumerTag;
242-
}
243296
);
244297
};
245298
subscribePauseRestartConsume();
246299
```
247300

248301

249-
# Running test cases
302+
## Running test cases
303+
Run following command to execute test cases.
250304
```shell script
251305
./node_modules/.bin/mocha --recursive "./test/**/*.js"
252-
```
306+
```

lib/validator/eventParam.js

Lines changed: 0 additions & 56 deletions
This file was deleted.

lib/validator/init.js

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
const rootPrefix = '../..',
88
responseHelper = require(rootPrefix + '/lib/formatter/response'),
9-
eventParams = require(rootPrefix + '/lib/validator/eventParam'),
109
util = require(rootPrefix + '/lib/util'),
1110
paramErrorConfig = require(rootPrefix + '/config/paramErrorConfig'),
1211
apiErrorConfig = require(rootPrefix + '/config/apiErrorConfig');
@@ -24,40 +23,6 @@ const errorConfig = {
2423
class Init {
2524
constructor() {}
2625

27-
/**
28-
* Perform detailed validation for specific event params
29-
*
30-
* @param {object} params - event parameters
31-
* @param {array} params.topics - on which topic messages
32-
* @param {object} params.message -
33-
* @param {string} params.message.kind - kind of the message
34-
* @param {object} params.message.payload - Payload to identify message and extra info.
35-
*
36-
* @return {Promise<result>}
37-
*/
38-
async detailed(params) {
39-
const oThis = this;
40-
41-
let r = oThis.light(params);
42-
43-
if (r.isFailure()) {
44-
return Promise.resolve(r);
45-
}
46-
47-
let message = params['message'];
48-
49-
if (message['kind'] === 'event_received') {
50-
if (message['payload']['event_name'] === 'StakingIntentConfirmed') {
51-
r = await eventParams.validateStakingIntent(message['payload']['params']);
52-
if (r.isFailure()) {
53-
return Promise.resolve(r);
54-
}
55-
}
56-
}
57-
58-
return Promise.resolve(responseHelper.successWithData({}));
59-
}
60-
6126
/**
6227
* Perform basic validation for specific event params
6328
*

services/publishEvent.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ class RmqPublishEvent {
2727
*
2828
* @param {object} params - event parameters
2929
* @param {array} params.topics - on which topic messages
30-
* @param {object} params.message -
30+
* @param {string} params.publisher - name of publisher
31+
* @param {object} params.message
3132
* @param {string} params.message.kind - kind of the message
3233
* @param {object} params.message.payload - Payload to identify message and extra info.
34+
* @param {number} [params.broadcast] - boolean to broadcast message to all channels
35+
* @param {number} [params.publishAfter] - message to be sent after miliseconds
3336
*
3437
* @return {Promise<result>}
3538
*/

0 commit comments

Comments
 (0)