gonicus/rabbitmq-demo

RabbitMQ Demo

This demo showcases a practical implementation of resilience and load balancing strategies using asynchronous messaging with RabbitMQ. It provides a RabbitMQ cluster with a scalable pool of worker instances. The worker processes are subscribed to a classic queue and a quorum queue following the Competing Consumers pattern. Furthermore, monitoring is provided via the RabbitMQ management interface and a Grafana instance.

System Description

There are three RabbitMQ instances in the deployment in total, which are configured as a cluster. An HAProxy instance in front of the cluster provides load balancing to resemble a production-like setup. On the messaging client side there is a client service (producer) and, by default, 5 worker (consumer) processes. The demo simulates software that operates within the fictional domains user and payment. The workers process commands for user account creation and invoice creation, which are published by the client service. If a task produces a result, the workers store it in the result backend; in this project a Valkey instance is used for that purpose. In reality, such work packages may take long enough to make synchronous processing unfeasible. Hence, the amount of time needed to process a job is configurable and optionally randomly distributed. The figure below shows the containers involved:
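The Competing Consumers pattern described above can be sketched without a broker using only Python's standard library. This is an illustrative stand-in for the demo's actual Celery/RabbitMQ setup, not code from the project:

```python
import queue
import threading

jobs = queue.Queue()
results = {}  # job id -> name of the worker that processed it

def worker(name: str) -> None:
    # Each worker competes for jobs from the same shared queue;
    # every job is delivered to exactly one consumer.
    while True:
        try:
            job = jobs.get(timeout=0.5)
        except queue.Empty:
            return  # queue drained, worker exits
        results[job] = name
        jobs.task_done()

for i in range(20):
    jobs.put(i)

threads = [threading.Thread(target=worker, args=(f"worker-{n}",)) for n in range(5)]
for t in threads:
    t.start()
jobs.join()  # block until every job has been processed exactly once
```

Adding more workers to the pool increases throughput without any change to the producer side, which is the same property the demo exploits with docker compose --scale.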

To demonstrate different RabbitMQ queue types, user creation is associated with the queue user.account.create.demo-service, which is a RabbitMQ classic queue. The queue payment.invoice.create.demo-service is a quorum queue, which has stronger guarantees regarding resilience and availability. Besides the command queues, the system also contains dead letter queues (DLQ) that collect messages that have expired or failed for other reasons. For demonstration purposes, the time to live (TTL) on the classic queue is intentionally set very short, to one minute. Hence, if the workers cannot process all messages in time, the DLQ fills up with messages. The following figure shows the message topology:

In the figure above, the client publishes messages to an exchange. From there each message is routed to one of the queues. Note that this is only a simple example, where a single queue is bound to each exchange. Commonly there would be multiple queues bound to an exchange, and depending on the binding pattern a message could then be routed to multiple queues. If a topic exchange is used, the wildcards # and * may be used in the binding pattern to match multiple routing keys. In the present case, if a message with the routing key user.account.create is published to the exchange user.commands, it is routed to user.account.create.demo-service. If the message expires due to its TTL, it is dead lettered to user.dlx and routed to user.dlq.
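To make the wildcard semantics concrete, the following self-contained sketch implements topic-pattern matching as RabbitMQ defines it (* matches exactly one dot-separated word, # matches zero or more). This is an illustration of the matching rules, not code from the demo:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    '*' matches exactly one dot-separated word, '#' matches zero or more.
    """
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' may swallow any number of words, including none.
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if k and p[0] in ("*", k[0]):
            return match(p[1:], k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))

# The demo's routing key against some possible bindings:
print(topic_matches("user.#", "user.account.create"))         # True
print(topic_matches("user.*.create", "user.account.create"))  # True
print(topic_matches("user.*", "user.account.create"))         # False
```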

The dead letter queues, dead letter exchanges and the associated policies are predefined in the RabbitMQ configuration config/rabbitmq/definitions.json, because it is assumed that the dead letter queue is shared per domain (user or payment). Retention policies such as TTL are governed by the broker in the present design. However, the service-specific exchanges and queues are still declared by the service, see demo-service/src/demo_service/task.py.
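As an illustration of what such broker-side definitions can look like, here is a hypothetical fragment in the RabbitMQ definitions export format. The key names (dead-letter-exchange, message-ttl) are real RabbitMQ policy keys, but the concrete values are assumptions and not copied from config/rabbitmq/definitions.json:

```json
{
  "policies": [
    {
      "vhost": "/",
      "name": "user-dead-lettering",
      "pattern": "^user\\.",
      "apply-to": "queues",
      "definition": {
        "dead-letter-exchange": "user.dlx",
        "message-ttl": 60000
      }
    }
  ],
  "queues": [
    { "name": "user.dlq", "vhost": "/", "durable": true }
  ]
}
```

A policy like this attaches dead lettering and the one-minute TTL to every matching queue in the user domain, so individual services do not have to set these arguments themselves.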

Setup

Before starting the demo it is necessary to download the RabbitMQ Grafana dashboard and place it under config/grafana/dashboards/rabbitmq-dashboard.json so that Docker Compose can mount it into the Grafana container. The following one-liner downloads the dashboard and places it correctly:

curl -L https://grafana.com/api/dashboards/10991/revisions/15/download -o config/grafana/dashboards/rabbitmq-dashboard.json

After performing this setup step the cluster can be started with a simple

docker compose up

To stop the demo run

docker compose down

Usage

As described before, the demo is designed as a client that publishes jobs to be processed by workers. The amount of time the workers take to process a single job and the number of jobs are configurable. Messages can be published to the classic queue with the following syntax

docker compose run --rm --build client <Number of Messages> <Processing time>

plus some optional flags that are listed when passing --help after client. Replace <Number of Messages> and <Processing time> with the desired numbers. The processing time is fixed by default and takes <Processing time> seconds. Optionally, the processing time can be randomly distributed around <Processing time> by using --random. This results in a more even distribution of message consumption across the workers. To publish 400 messages that take 0.1 seconds each to be processed, run

docker compose run --rm --build client 400 0.1

The processing of the messages can be seen as a short burst when opening one of the monitoring tools in the browser:

Service               Address
Grafana               http://127.0.0.1:8000/grafana
RabbitMQ Management   http://127.0.0.1:8000/rabbitmq
HAProxy Statistics    http://127.0.0.1:8000/haproxy

The user name for all services is admin and the password is admin.

To publish messages to the quorum queue use the --quorum flag, e.g.

docker compose run --rm --build client --quorum 400 0.1

Optionally, messages can also be published repeatedly with the --repeat-after flag. For example, to check the dead letter queues one could use

docker compose run --rm --build client --quorum --repeat-after 1 400 0.1

This exceeds the workers' capacity and one can watch the dead letter queues slowly fill up. Visit http://127.0.0.1:8000/rabbitmq/#/queues and watch the queues payment.invoice.create.demo-service and payment.dlq.

Evaluation

The focus of this demo is to showcase load balancing and fault tolerance through asynchronous messaging. Load balancing is mainly enabled by scaling the number of worker processes. In the following section this is demonstrated by scaling the number of worker processes and measuring the total time taken to process all work packages.

Load balancing

When running 5 workers and sending 1000 messages that take 0.1 seconds each to process, a total time of 1000 * 0.1 s / 5 = 20 s plus some overhead for publishing and receiving the messages is expected. This can be verified with
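The expected processing time is a simple back-of-the-envelope calculation, sketched here for clarity:

```python
messages = 1000
seconds_per_task = 0.1
workers = 5

# Ideal total time with perfect load balancing,
# ignoring the overhead of publishing and receiving messages.
ideal_seconds = messages * seconds_per_task / workers
print(ideal_seconds)  # roughly 20 seconds
```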

docker compose run --rm --build client --time --quorum 1000 0.1

This should give a result like

Finished sending tasks after 5.095s
All tasks finished in 21.580s

Processing of the tasks starts as soon as they are published, hence tasks are already being processed by workers before all tasks have been sent. When doubling the number of workers with

docker compose up --scale worker=10

and running the previous command, the time taken to process the tasks is roughly halved:

Finished sending tasks after 5.174s
All tasks finished in 10.890s

Sending tasks with a duration of 0s

docker compose run --rm --build client --time --quorum 1000 0

gives the result

Finished sending tasks after 5.763s
All tasks finished in 7.110s

This shows that the message throughput is currently limited mainly by the publishing of the messages, at about 1000 messages / 7.11 s ≈ 141 messages/s. This is partly because Celery waits for the broker to acknowledge every single task before sending the next one. The broker itself could handle a much higher throughput.

After scaling the number of workers up and down one might notice that scaling is possible without halting other services or terminating the deployment. Moreover, workers can join the worker pool with a simple docker compose up --scale without changing the configuration of any other service. This ease of scaling is a major advantage of asynchronous messaging. One caveat should be noted: scaling workers down can be problematic if proper precautions are not taken. Workers may be interrupted mid-task, hence the system must be configured to account for this. When using SIGTERM to shut down a worker, Celery lets the worker finish its current tasks. In the present example, when scaling down workers with --scale, Docker stops the workers by sending a SIGTERM. If the worker process does not shut down within the grace period, a SIGKILL is sent, which means tasks might still be interrupted. The correct way to handle such failures, of course, depends on the specific system. A very relevant setting in this regard is whether messages are acknowledged on delivery (automatic ack) or once a worker has finished processing its task (manual ack). The relevant Celery settings are task_acks_late and task_reject_on_worker_lost.
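The two settings mentioned above could be combined roughly as follows. task_acks_late and task_reject_on_worker_lost are real Celery setting names; the app name and broker URL are illustrative assumptions and not taken from the demo's source:

```python
from celery import Celery

app = Celery("demo_service", broker="amqp://admin:admin@localhost:5672//")

app.conf.update(
    # Acknowledge a message only after the task has finished (late ack),
    # instead of acknowledging it on delivery (the default).
    task_acks_late=True,
    # If the worker dies mid-task (e.g. SIGKILL after the grace period),
    # requeue the message so another worker can pick it up.
    task_reject_on_worker_lost=True,
)
```

With both settings enabled a task that is interrupted by a worker shutdown is redelivered rather than lost, at the cost of possibly executing a task more than once; tasks should therefore be idempotent.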

Fault tolerance

Regarding fault tolerance two primary aspects are important: loss of broker cluster nodes and loss of worker instances. As seen in the previous section, adding or removing worker instances from the worker pool is possible at any time. Hence, other workers typically compensate for the loss of worker instances. Another aspect of asynchronous messaging that is of interest here is the decoupling of workers from the producers. Messages are only dispatched to the workers when they can process them, which makes the workers more resilient to a flood of messages. If the number of messages exceeds the capacity of the workers, messages may be processed with a delay or dead lettered. However, at least some of the messages are processed and the system can continue to operate.

The present RabbitMQ cluster consists of three nodes. Cluster metadata is replicated equally across all nodes, with the exception of queues. Whether a queue is replicated depends on the queue type. Messages in the demo system are published either to the classic queue user.account.create.demo-service or to the quorum queue payment.invoice.create.demo-service. Classic queues do not support replication and are stored on one node only, but can be accessed from any node. Since any node can dispatch messages to workers, it is not necessary, and typically not the case, that a worker is connected to the exact node a queue is stored on. Of course, this requires some extra network traffic to fetch the data. Normally queue data is persisted on disk. For demonstration purposes this Docker Compose deployment does not define any persistent volumes. Hence, when a node is removed by removing its container, its queue data is also lost.

To find the node that hosts the classic queue user.account.create.demo-service, check the node column at http://127.0.0.1:8000/rabbitmq/#/queues. If the RabbitMQ management interface shows e.g. rabbit@rabbitmq-3, the queue is provided by the node rabbit@rabbitmq-3 on the host rabbitmq-3. That container can then be stopped using

docker compose down rabbitmq-3

This will typically trigger a series of restarts of the worker processes with a NOT_FOUND error message. The queue is bound to the node rabbit@rabbitmq-3, and RabbitMQ will not let the queue be redeclared while the node is down. In this case manual intervention would probably be needed; the cluster would not heal itself automatically. If high availability is a requirement, quorum queues should be chosen instead of classic queues. To start the deployment with user.account.create.demo-service as a quorum queue, run

docker compose down
docker compose --env-file quorum.env up

When removing a node again with

docker compose down rabbitmq-3

the worker services will continue to operate, although a cluster node has been lost. This can be checked by running

docker compose run --rm --build client --time --quorum 400 0.1

If cluster nodes are lost, the cluster can continue to operate as long as a majority of nodes remains. In the present case that means one node can be lost. Depending on the node that is removed, a Raft leader election might be triggered in case the node was the queue leader for the specific queue. This is because Raft is the consensus algorithm used to replicate quorum queues in RabbitMQ.