Skip to content

DamirDenis-Tudor/ktor-server-rabbitmq

Repository files navigation

Deployment Status Pull Request Checks codecov

Overview

  • This plugin provides access to the core functionality of the com.rabbitmq:amqp-client and dev.kourier:amqp-client libraries.

Features

  • Integrated with coroutines and has a separate dispatcher.
  • Includes a built-in connection/channel management system.
  • Allows direct interaction with the underlying Java client library.
  • Seamlessly integrates with the Kotlin DSL, making it readable, maintainable, and easy to use.

Table of Contents

  1. Choose a distribution
  2. Installation
  3. Queue Binding Example
  4. Producer Example
  5. Consumer Example
  6. Advanced Consumer Example
  7. Library Calls Example
  8. Multiple Instances Example
  9. Custom Coroutine Scope Example
  10. Serialization Fallback Example
  11. Dead Letter Queue Example
  12. Logging

Choose a distribution

This library is available in multiple distributions. Choose the one that best fits your needs:

Default Distribution

This distribution is an alias to the Java Client distribution for the JVM platform, and to the Kourier Client distribution for Kotlin Native platforms.

dependencies {
    // Default distribution artifact; replace <version> with the release you want to use.
    implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq:<version>")
}

Java Client Distribution

This distribution uses the official RabbitMQ Java client library (com.rabbitmq:amqp-client) under the hood, and is available only for the JVM platform.

dependencies {
    // Java client distribution artifact; replace <version> with the release you want to use.
    implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-java:<version>")
}

Kourier (Pure Kotlin & Kotlin Multiplatform) Client Distribution

This distribution uses the pure Kotlin Kourier client library (dev.kourier:amqp-client) under the hood, and is available for both JVM and Kotlin Native platforms.

dependencies {
    // Kourier client distribution artifact; replace <version> with the release you want to use.
    implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-kourier:<version>")
}

Usage

Installation

// Installs the RabbitMQ plugin. Every <...> value below is a placeholder to replace.
install(RabbitMQ) {
    uri = "amqp://<user>:<password>@<address>:<port>"
    defaultConnectionName = "<default_connection>"
    connectionAttempts = 20
    attemptDelay = 10 // NOTE(review): the time unit is not shown here — confirm against the plugin configuration
    dispatcherThreadPollSize = 4
    tlsEnabled = false
}

Queue binding example

// Binds "demo-queue" to "demo-exchange" under "demo-routing-key".
rabbitmq {
    queueBind {
        queue = "demo-queue"
        exchange = "demo-exchange"
        routingKey = "demo-routing-key"
        // Nested declarations for the queue and exchange named in the binding above.
        // NOTE(review): presumably these are declared before the binding is applied — confirm.
        queueDeclare {
            queue = "demo-queue"
            durable = true
        }
        exchangeDeclare {
            exchange = "demo-exchange"
            type = "direct"
        }
    }
}

Producer example

// Publishes "Hello World!" ten times to "demo-exchange" with routing key "demo-routing-key".
rabbitmq {
    repeat(10) {
        basicPublish {
            exchange = "demo-exchange"
            routingKey = "demo-routing-key"
            message { "Hello World!" }
        }
    }
}

Consumer Example

// Consumes "demo-queue" with autoAck enabled and logs every delivery.
rabbitmq {
    basicConsume {
        autoAck = true
        queue = "demo-queue"
        // NOTE(review): the type argument is presumably the type the payload is decoded into — confirm.
        deliverCallback<String> { message ->
            // Logs the whole message object; other examples in this document read message.body.
            logger.info("Received message: $message")
        }
    }
}

Consumer Example with coroutinePollSize

// Consumer running on a dedicated connection ("consume") with a suspending callback.
// Requires `import kotlinx.coroutines.Dispatchers` and `import kotlinx.coroutines.delay`.
rabbitmq {
    connection(id = "consume") {
        basicConsume {
            autoAck = true
            queue = "demo-queue"
            dispatcher = Dispatchers.IO
            // NOTE(review): presumably the number of coroutines handling deliveries — confirm.
            coroutinePollSize = 1_000
            deliverCallback<String> { message ->
                logger.info("Received message: $message")
                delay(30)
            }
        }
    }
}

Consumer Example with coroutinePollSize

// Consumer running on a dedicated connection ("consume") with a suspending callback.
// Requires `import kotlinx.coroutines.Dispatchers` and `import kotlinx.coroutines.delay`.
rabbitmq {
    connection(id = "consume") {
        basicConsume {
            autoAck = true
            queue = "demo-queue"
            dispatcher = Dispatchers.IO
            // NOTE(review): presumably the number of coroutines handling deliveries — confirm.
            coroutinePollSize = 1_000
            deliverCallback<String> { message ->
                logger.info("Received message: $message")
                delay(30)
            }
        }
    }
}

Library Calls Example

// Calls the underlying client library's channel API directly from inside the plugin DSL.
rabbitmq {
    libChannel(id = 2) {
        // The first argument of basicPublish is the exchange name, not the queue:
        // publish to the exchange that "demo-queue" is bound to.
        basicPublish("demo-exchange", "demo-routing-key", null, "Hello!".toByteArray())

        val consumer = object : DefaultConsumer(channel) {
            override fun handleDelivery(
                consumerTag: String?,
                envelope: Envelope?,
                properties: AMQP.BasicProperties?,
                body: ByteArray?
            ) {
                // Handle the delivered message here.
            }
        }

        // Second argument `true` enables automatic acknowledgement.
        basicConsume("demo-queue", true, consumer)
    }
}
// Calls the underlying client library's connection API directly and creates a channel by hand.
rabbitmq {
    libConnection(id = "lib-connection") {
        val channel = createChannel()

        // The first argument of basicPublish is the exchange name, not the queue:
        // publish to the exchange that "demo-queue" is bound to.
        channel.basicPublish("demo-exchange", "demo-routing-key", null, "Hello!".toByteArray())

        val consumer = object : DefaultConsumer(channel) {
            override fun handleDelivery(
                consumerTag: String?,
                envelope: Envelope?,
                properties: AMQP.BasicProperties?,
                body: ByteArray?
            ) {
                // Handle the delivered message here.
            }
        }

        // Second argument `true` enables automatic acknowledgement.
        channel.basicConsume("demo-queue", true, consumer)
    }
}

Multiple Instances Example

/**
 * Installs two separately named RabbitMQ plugin instances ("production" and
 * "analytics"), sets up one queue binding on each, and consumes "orders" on the
 * production instance. Each consumed order is processed, acknowledged, and then
 * followed by an event published through the analytics instance.
 *
 * `processOrder` is not defined in this snippet; it must be supplied by the application.
 */
fun Application.module() {
    // Production RabbitMQ cluster
    install(RabbitMQ(instanceName = "production")) {
        uri = "amqp://prod-user:prod-pass@prod-rabbitmq:5672"
        dispatcherThreadPollSize = 8
    }

    // Analytics RabbitMQ cluster
    install(RabbitMQ(instanceName = "analytics")) {
        uri = "amqp://analytics-user:analytics-pass@analytics-rabbitmq:5672"
        dispatcherThreadPollSize = 4
    }

    // Setup production queues
    rabbitmq(instanceName = "production") {
        queueBind {
            queue = "orders"
            exchange = "orders-exchange"
            routingKey = "order.created"
            queueDeclare {
                queue = "orders"
                durable = true
            }
            exchangeDeclare {
                exchange = "orders-exchange"
                type = "direct"
            }
        }
    }

    // Setup analytics queues
    rabbitmq(instanceName = "analytics") {
        queueBind {
            queue = "events"
            exchange = "analytics-exchange"
            routingKey = "user.action"
            queueDeclare {
                queue = "events"
                durable = false
            }
            exchangeDeclare {
                exchange = "analytics-exchange"
                type = "topic"
            }
        }
    }

    // Process critical orders
    rabbitmq(instanceName = "production") {
        basicConsume {
            queue = "orders"
            // Manual acknowledgement: the callback acks explicitly after processing.
            autoAck = false
            deliverCallback<String> { message ->
                // Process order
                processOrder(message.body)
                basicAck {
                    deliveryTag = message.envelope.deliveryTag
                }
                
                // Send analytics event
                rabbitmq(instanceName = "analytics") {
                    basicPublish {
                        exchange = "analytics-exchange"
                        routingKey = "user.action"
                        message { "Order processed: ${message.body}" }
                    }
                }
            }
        }
    }
}

Multiple Connections Example

/**
 * Installs a single RabbitMQ plugin instance and uses separately identified
 * connections for publishing ("producer") and consuming ("consumer").
 *
 * The first `rabbitmq` block does not name a connection.
 * NOTE(review): it presumably runs on the connection named by `defaultConnectionName` — confirm.
 */
fun Application.module() {
    install(RabbitMQ) {
        uri = "amqp://guest:guest@localhost:5672"
        defaultConnectionName = "default"
    }

    rabbitmq {
        // Setup queues and exchanges
        queueBind {
            queue = "orders-queue"
            exchange = "orders-exchange"
            routingKey = "order.created"
            queueDeclare {
                queue = "orders-queue"
                durable = true
            }
            exchangeDeclare {
                exchange = "orders-exchange"
                type = "direct"
            }
        }
    }

    // Producer connection
    rabbitmq {
        connection(id = "producer") {
            // Publishes 100 messages; `it` is the zero-based index supplied by repeat.
            repeat(100) {
                basicPublish {
                    exchange = "orders-exchange"
                    routingKey = "order.created"
                    message { "Order #$it created" }
                }
            }
        }
    }

    // Consumer connection with high throughput
    rabbitmq {
        connection(id = "consumer") {
            basicConsume {
                queue = "orders-queue"
                // Manual acknowledgement: the callback acks after the simulated work.
                autoAck = false
                dispatcher = Dispatchers.IO
                coroutinePollSize = 10
                deliverCallback<String> { message ->
                    // Process order
                    println("Processing: ${message.body}")
                    delay(100) // Simulate processing time
                    
                    basicAck {
                        deliveryTag = message.envelope.deliveryTag
                    }
                }
            }
        }
    }
}

Custom Coroutine Scope Example

// Prints any throwable that is handed to this handler.
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
    println("ExceptionHandler got $throwable")
}

// Scope whose context combines a SupervisorJob with the exception handler above.
val rabbitMQScope = CoroutineScope(SupervisorJob() + exceptionHandler)

// ...

// Installs the plugin and hands it the custom coroutine scope through `scope`.
install(RabbitMQ) {
    connectionAttempts = 3
    attemptDelay = 10
    // NOTE(review): rabbitMQContainer is defined outside this snippet (presumably a test container) — confirm.
    uri = rabbitMQContainer.amqpUrl
    scope = rabbitMQScope
}

// ...

// Consumer whose callback always throws, to demonstrate the custom scope's exception handling.
// Requires `import kotlinx.coroutines.Dispatchers`.
rabbitmq {
    connection(id = "consume") {
        basicConsume {
            autoAck = true
            queue = "demo-queue"
            dispatcher = Dispatchers.IO
            coroutinePollSize = 1_000
            deliverCallback<String> { message ->
                // NOTE(review): presumably surfaces in the CoroutineExceptionHandler of the configured scope — confirm.
                throw Exception("business logic exception")
            }
        }
    }
}

Serialization Fallback Example

/** Serializable payload type used by this example; holds a single mutable [content] string. */
@Serializable
data class Message(
    var content: String
)

/**
 * Publishes ten [Message] payloads and ten plain strings to "test-exchange", then
 * consumes "test-queue" with a typed callback and a failure callback.
 *
 * NOTE(review): presumably the failure callback receives deliveries that cannot be
 * decoded as [Message] (here, the plain strings) — confirm.
 */
fun Application.module() {
    install(RabbitMQ) {
        uri = "amqp://guest:guest@localhost:5672"
        dispatcherThreadPollSize = 3
    }

    rabbitmq {
        queueBind {
            queue = "test-queue"
            exchange = "test-exchange"
            queueDeclare {
                queue = "test-queue"
                // NOTE(review): these arguments reference a "dlx" exchange that this snippet never declares — confirm it exists.
                arguments = mapOf(
                    "x-dead-letter-exchange" to "dlx",
                    "x-dead-letter-routing-key" to "dlq-dlx"
                )
            }
            exchangeDeclare {
                exchange = "test-exchange"
                type = "fanout"
            }
        }
    }

    rabbitmq {
        // Ten payloads published as Message objects.
        repeat(10) {
            basicPublish {
                exchange = "test-exchange"
                message {
                    Message(content = "Hello world!")
                }
            }
        }
        // Ten payloads published as plain strings.
        repeat(10) {
            basicPublish {
                exchange = "test-exchange"
                message { "Hello world!" }
            }
        }
    }

    rabbitmq {
        basicConsume {
            queue = "test-queue"
            // NOTE(review): autoAck is false, yet neither callback below acknowledges or rejects the delivery — confirm this is intended.
            autoAck = false
            deliverCallback<Message> { message ->
                println("Received as Message: ${message.body}")
            }
            deliverFailureCallback { message ->
                println("Could not serialize, received as ByteArray: ${message.body}")
            }
        }
    }
}

Dead Letter Queue Example

/** Serializable payload type used by this example; holds a single mutable [content] string. */
@Serializable
data class Message(
    var content: String
)

/**
 * Sets up a dead-letter exchange and queue ("dlx" / "dlq") and a "test-queue" whose
 * arguments point at them, publishes 100 [Message] payloads to "test-exchange",
 * rejects every delivery from "test-queue" without requeueing, and prints whatever
 * arrives on "dlq".
 */
fun Application.module() {
    install(RabbitMQ) {
        uri = "amqp://guest:guest@localhost:5672"
        dispatcherThreadPollSize = 3
    }

    rabbitmq {
        // Dead-letter exchange and queue, bound with routing key "dlq-dlx".
        queueBind {
            queue = "dlq"
            exchange = "dlx"
            routingKey = "dlq-dlx"
            queueDeclare {
                queue = "dlq"
                durable = true
            }
            exchangeDeclare {
                exchange = "dlx"
                type = "direct"
            }
        }

        // Working queue; its arguments name the exchange and routing key declared above.
        queueBind {
            queue = "test-queue"
            exchange = "test-exchange"
            queueDeclare {
                queue = "test-queue"
                arguments = mapOf(
                    "x-dead-letter-exchange" to "dlx",
                    "x-dead-letter-routing-key" to "dlq-dlx"
                )
            }
            exchangeDeclare {
                exchange = "test-exchange"
                type = "fanout"
            }
        }
    }

    rabbitmq {
        repeat(100) {
            basicPublish {
                exchange = "test-exchange"
                message {
                    Message(content = "Hello world!")
                }
            }
        }
    }

    rabbitmq {
        basicConsume {
            queue = "test-queue"
            autoAck = false
            deliverCallback<Message> { message ->
                // Reject without requeue; with the queue arguments above, RabbitMQ
                // dead-letters the message to "dlx" using routing key "dlq-dlx".
                basicReject {
                    deliveryTag = message.envelope.deliveryTag
                    requeue = false
                }
            }
        }

        basicConsume {
            queue = "dlq"
            autoAck = true
            deliverCallback<Message> { message ->
                println("Received message in dead letter queue: ${message.body}")
            }
        }
    }
}

Logging

  • To set the logging level for this library, add the following line to your logback.xml file:
<logger name="io.github.damir.denis.tudor.ktor.server.rabbitmq" level="DEBUG"/>