From 62ceca932784eb530d20e69362c91fc0eab4dab6 Mon Sep 17 00:00:00 2001 From: Tarun Telang Date: Sun, 5 Apr 2026 18:23:16 +0530 Subject: [PATCH] Adding content for chapter 13 about MP Reactive Messaging Adding chapter 13 about MP Reactive Messaging. It covers the following topics. * Event-driven architecture (EDA) overview * Reactive Streams fundamentals * Adding dependencies (Maven, Gradle, and connector dependencies) * Defining messaging logic with CDI beans * Leveraging CDI features in messaging beans * Channels and wiring * Declarative messaging with `@Incoming` and `@Outgoing` * Supported method signatures and message processing patterns * Programmatic messaging with `@Channel` and `@Emitter` * Managing backpressure in reactive messaging * Integration with other MicroProfile specifications * Reactive stream operations * Graph Validation and Deployment Checks --- modules/ROOT/pages/chapter13/chapter13.adoc | 1105 +++++++++++++++++++ 1 file changed, 1105 insertions(+) create mode 100644 modules/ROOT/pages/chapter13/chapter13.adoc diff --git a/modules/ROOT/pages/chapter13/chapter13.adoc b/modules/ROOT/pages/chapter13/chapter13.adoc new file mode 100644 index 0000000..5e00691 --- /dev/null +++ b/modules/ROOT/pages/chapter13/chapter13.adoc @@ -0,0 +1,1105 @@ += MicroProfile Reactive Messaging +:reactive-messaging-spec-name: MicroProfile Reactive Messaging +:reactive-messaging-spec-version: 3.0.1 +:mp-version: 7.1 + +== Introduction + +Modern cloud-native applications often need to process high-volume, real-time data and react to events as they occur. Traditional request-response communication is not well-suited for long-running, asynchronous interactions between services. + +{reactive-messaging-spec-name} provides an annotation-driven programming model for building asynchronous, message-driven applications with support for backpressure. In practice, applications connect to brokers such as Kafka or AMQP-based systems by using runtime-specific connectors. 
+ +Using this model, developers can build scalable and resilient microservices that communicate through events rather than tightly coupled synchronous calls. + +== Topics Covered + +* Event-driven architecture (EDA) overview +* Reactive Streams fundamentals +* Adding dependencies (Maven, Gradle, and connector dependencies) +* Defining messaging logic with CDI beans +* Leveraging CDI features in messaging beans +* Channels and wiring +* Declarative messaging with `@Incoming` and `@Outgoing` +* Supported method signatures and message processing patterns +* Programmatic messaging with `@Channel` and `@Emitter` +* Managing backpressure in reactive messaging +* Integration with other MicroProfile specifications +* Reactive stream operations +* Best practices and production patterns + +== Event-Driven Architecture (EDA) Overview + +Event-Driven Architecture (EDA) is a software design paradigm in which system behavior is driven by events. + +An event represents a meaningful change in state that other components can observe and process. For example, when an order is placed, one service can emit an event that payment, inventory, and shipping services react to independently. + +Components communicate asynchronously by producing and consuming events. This loose coupling makes the system more flexible, scalable, and resilient than a purely request-response approach. EDA is particularly well-suited to distributed systems, microservices, and real-time applications. + +=== Comparison with Request-Response Architecture + +Traditional request-response architecture follows a synchronous, call-and-wait pattern where a client sends a request and blocks until it receives a response. 
+This model has significant limitations in distributed systems: + +[cols="2,3,3", options="header"] +|=== +|Aspect |Request-Response |Event-Driven + +|Communication style +|Synchronous, blocking +|Asynchronous, non-blocking + +|Coupling +|Tight coupling between client and server +|Loose coupling between producers and consumers + +|Scalability +|Limited by synchronous calls and blocking threads +|Highly scalable with asynchronous processing and backpressure + +|Failure handling +|Failures directly impact the caller +|Failures are more isolated; messages can often be retried or reprocessed depending on the broker and configuration + +|Data flow +|Point-to-point request/response +|Publish-subscribe, queues, or streams + +|=== + +== Reactive Streams + +Reactive Streams defines a standard for asynchronous stream processing with backpressure. It enables producers and consumers to coordinate data flow safely and efficiently. + +Backpressure prevents producers from overwhelming consumers by allowing consumers to request only the amount of data they are ready to process. + +MicroProfile Reactive Messaging builds on this model and provides an annotation-driven programming style using concepts such as channels, `@Incoming`, and `@Outgoing`. + +In many real-world applications, reactive messaging complements rather than replaces REST. A common hybrid approach is to initiate user-facing operations such as checkout synchronously, and then use events for downstream steps such as payment authorization, inventory reservation, and shipment updates. + +For learning, local development, and testing, many runtimes provide an internal in-memory connector. This connector allows messages to flow between channels within the same application, without requiring an external broker. For production systems, use a messaging provider such as Apache Kafka or RabbitMQ. 
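The demand-based flow control at the heart of Reactive Streams can be sketched with the JDK's built-in `java.util.concurrent.Flow` API, which defines the same `Publisher`/`Subscriber`/`Subscription` contract. The sketch below is illustrative and independent of MicroProfile; the class and method names are invented for this example. The subscriber requests exactly one item at a time, so the publisher can never run ahead of the consumer's stated demand:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackpressureSketch {

    /** Consumes the input with a rolling demand of one item at a time. */
    public static List<Integer> consumeOneByOne(List<Integer> input) throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch completed = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // initial demand: exactly one item
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // signal readiness for the next item
                }

                @Override
                public void onError(Throwable t) { completed.countDown(); }

                @Override
                public void onComplete() { completed.countDown(); }
            });
            input.forEach(publisher::submit); // submit() honors the subscriber's buffer
        } // closing the publisher completes the stream
        completed.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeOneByOne(List.of(1, 2, 3, 4, 5)));
    }
}
```

In MicroProfile Reactive Messaging the runtime issues these `request(n)` calls on your behalf, but the underlying contract between producer and consumer is the same.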
+ +=== Maven Dependency + +Add the following dependency to your `pom.xml`: + +[source,xml] +---- +<dependency> + <groupId>org.eclipse.microprofile.reactive.messaging</groupId> + <artifactId>microprofile-reactive-messaging-api</artifactId> + <version>${reactive-messaging-spec-version}</version> + <scope>provided</scope> +</dependency> +---- + +This dependency provides the MicroProfile Reactive Messaging API. The `provided` scope indicates that the implementation is supplied by your MicroProfile-compatible runtime or application server rather than bundled inside your application. + +=== Gradle Dependency + +Add the following dependency to your `build.gradle`: + +[source,groovy] +---- +dependencies { + compileOnly "org.eclipse.microprofile.reactive.messaging:microprofile-reactive-messaging-api:3.0.1" +} +---- + +This dependency provides the MicroProfile Reactive Messaging API. The `compileOnly` configuration indicates that the implementation is supplied by your MicroProfile-compatible runtime or application server rather than bundled inside your application. + +=== Connector Dependencies + +MicroProfile Reactive Messaging requires connectors to integrate with messaging systems such as Apache Kafka, AMQP, or JMS. Connectors are runtime-specific implementations that handle communication with the underlying messaging infrastructure. They allow your application to produce and consume messages without dealing directly with low-level protocol details. + +The exact connector dependency depends on the runtime you are using. The connector name and required client libraries vary by runtime. For Open Liberty, the connector name is `liberty-kafka`. For Quarkus or SmallRye-based runtimes, use `smallrye-kafka`. + +Depending on your runtime and packaging model, you may also need to make the Kafka client libraries available to the server or application classloader. + +Refer to your runtime's documentation for the correct connector artifact coordinates, supported brokers, and configuration details.
For a complete channel configuration example, see the <<_named_channels_for_message_flow>> section. + +== Defining Messaging Logic with CDI Beans + +MicroProfile Reactive Messaging leverages Contexts and Dependency Injection (CDI) to define message processing logic through declarative annotations on CDI beans. This approach keeps messaging code clean and maintainable without requiring developers to work directly with low-level messaging APIs or manually implement Reactive Streams interfaces. + +=== Basic Structure of Messaging Beans + +A messaging bean in MicroProfile Reactive Messaging is a standard CDI bean with methods annotated to indicate how they participate in message processing. These beans are discovered by CDI, and the reactive messaging runtime wires the annotated methods into the message flow. + +*Key Characteristics*: + +* *CDI Managed*: Beans are managed by the CDI container, enabling dependency injection and lifecycle management. +* *Annotation-Driven*: Methods are annotated with `@Incoming`, `@Outgoing`, or both to define message flows. +* *Flexible Scopes*: Beans can use various CDI scopes such as `@ApplicationScoped`, `@RequestScoped`, or `@Dependent`, depending on the use case. + +*Basic Bean Structure*: + +[source,java] +---- +import io.microprofile.tutorial.store.payment.entity.Order; +import io.microprofile.tutorial.store.payment.entity.OrderStatus; +import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +@ApplicationScoped +public class PaymentRequestHandler { + + //... + + @Incoming("order-created") + @Outgoing("payment-authorized") + public Order processPayment(Order order) { + LOGGER.info(() -> "Processing payment for order " + order.getOrderId()); + order.setStatus(OrderStatus.PAID); + return order; + } +} +---- + +In this example: + +* The bean is application-scoped, so one instance serves the application. 
+* The order is created through a synchronous checkout request. +* The `processPayment` method then receives `Order` messages from the `order-created` channel. +* It performs a simple transformation by updating the order status to `PAID`. +* It sends the updated order to the `payment-authorized` channel for downstream processing by services such as Inventory or Order status handlers. + +== Leveraging CDI Features in Messaging Beans + +Since messaging beans are CDI-managed, you can leverage the full power of CDI features such as dependency injection, scopes, interceptors, and events. This allows you to build complex message-processing logic while keeping your code clean, maintainable, and easy to test. + +=== Dependency Injection in Messaging Beans + +You can inject other CDI beans, services, or resources into your messaging beans to perform tasks such as persistence, external API calls, notifications, or business-rule validation. For example, an `OrderStatusEventHandler` bean can inject an `OrderService` to update the order in the database and a `NotificationService` to send payment confirmations: + +[source,java] +---- +// ... + +@ApplicationScoped +public class OrderStatusEventHandler { + + @Inject + OrderService orderService; + + @Inject + NotificationService notificationService; + + @Incoming("payment-authorized") + public void onPaymentAuthorized(Order order) { + Order updatedOrder = orderService.updateOrderStatus(order.getOrderId(), OrderStatus.PAID); + notificationService.notifyOrderStatusChanged(updatedOrder); + } +} +---- + +In this example: + +* The `OrderStatusEventHandler` bean injects an `OrderService` for updating the persisted order and a `NotificationService` for sending payment confirmations. +* The `onPaymentAuthorized` method reacts to the `payment-authorized` channel, showing how the fulfillment workflow continues asynchronously after the initial REST checkout has completed.
+* This separation keeps the user-facing request/response path simple while moving cross-service processing into dedicated messaging handlers. + +=== Scope Management in Messaging Beans + +Different CDI scopes affect bean lifecycle, sharing, and concurrency behavior. The Reactive Messaging specification mandates support for the following CDI scopes: + +* `@ApplicationScoped`: A single instance exists for the entire application. This scope is ideal for stateless processors and is the most common choice for messaging beans. +* `@Dependent`: A new instance is created for each injection point. Use this scope with care, because it can result in multiple message-processing pipelines being created. + +Implementations can support other scopes, such as `@RequestScoped`, but behavior outside the two mandated scopes is not defined by the specification. In practice, choose the scope that best matches the state-management and threading needs of your messaging logic. + +=== Benefits of CDI Integration + +Using CDI with messaging beans provides several important benefits: + +* *Simplified Code*: You do not need to manually manage bean lifecycles or wire dependencies yourself. CDI handles this for you, allowing you to focus on business logic rather than infrastructure concerns. +* *Modularity*: Messaging logic can be organized across multiple beans with a clear separation of concerns. +* *Integration*: CDI works seamlessly with other Jakarta EE and MicroProfile specifications such as Config, Fault Tolerance, Metrics, and Health. +* *Lifecycle Management*: CDI ensures beans are initialized and destroyed consistently, which helps reduce resource-management issues. +* *Interceptors and Decorators*: You can add cross-cutting concerns such as logging, security, or auditing without cluttering the core processing logic. + +== Channels and Wiring + +Channels are the fundamental building blocks of message flow in MicroProfile Reactive Messaging. 
They serve as logical conduits connecting message producers, processors, and consumers. Channels can be internal, where beans in the same application are wired together by name, or external, where the channel is backed by a broker such as Kafka. + +=== Named Channels for Message Flow + +In the MicroProfile e-commerce flow, the user-facing checkout starts through a synchronous REST request, and then the fulfillment workflow continues through messaging channels: + +* the `shoppingcart` component submits an order to the existing `order` service through REST +* the `order` service persists the order and publishes an `OrderCreated` event to the `order-created` channel +* the `payment` service consumes that event and can publish a payment result such as `payment-authorized` +* the `inventory` and `shipment` services can also then react to these events independently +* additional processors can filter, enrich, audit, or route the same event stream without changing the public REST API + +*Channel Types*: + +* *Internal Channels*: Connect components within the same application when methods are wired together by channel name. +* *External Channels*: Connect services across processes or instances through an external message broker, configured using MicroProfile Config. + +*Channel Configuration*: + +Channels are configured using MicroProfile Config, typically in `microprofile-config.properties` or as environment variables. 
The example below uses the `liberty-kafka` connector; replace it with the appropriate connector for your runtime, such as `smallrye-kafka` for Quarkus or SmallRye-based runtimes: + +[source,properties] +---- +# Shared broker address for all channels +mp.messaging.connector.liberty-kafka.bootstrap.servers=localhost:9092 + +# Incoming channel from Kafka +mp.messaging.incoming.order-created.connector=liberty-kafka +mp.messaging.incoming.order-created.topic=order-created-topic +mp.messaging.incoming.order-created.group.id=payment-service + +# Outgoing channel to Kafka +mp.messaging.outgoing.payment-authorized.connector=liberty-kafka +mp.messaging.outgoing.payment-authorized.topic=payment-authorized-topic +---- + +*Channel Names*: + +Channel names referenced in `@Incoming`, `@Outgoing`, and `@Channel` annotations must match the configuration property names: + +[source,java] +---- +@Incoming("order-created") // Matches mp.messaging.incoming.order-created.* +@Outgoing("payment-authorized") +public Order processPayment(Order order) { + // Processing logic + return order; +} +---- + +Internal channels do not require connector configuration. For communication between separate services or multiple server instances, configure an external broker such as Kafka. + +== Declarative Messaging with @Incoming and @Outgoing + +Declarative messaging is at the heart of MicroProfile Reactive Messaging. By using simple annotations, developers can define complex message-processing pipelines without writing boilerplate code for message handling, threading, or backpressure management. The framework handles these concerns automatically based on the method signatures. + +In the MicroProfile e-commerce store application, the checkout begins with a synchronous REST request, and then downstream fulfillment steps are expressed declaratively through channels such as `order-created`, `payment-authorized`, and `inventory-reserved`.
+ +=== Consuming Messages with @Incoming + +The `@Incoming` annotation marks a method as a consumer of messages from a specified channel. When messages arrive on that channel, the runtime automatically invokes the method. + +*Basic Message Consumption*: + +[source,java] +---- +@ApplicationScoped +public class PaymentRequestHandler { + + @Incoming("order-created") + @Outgoing("payment-authorized") + public Order processPayment(Order order) { + order.setStatus(OrderStatus.PAID); + return order; + } +} +---- + +In this flow, the `Order` is created first through REST. Once the Order Service emits an `order-created` event, `processPayment` consumes it, updates the order status to `PAID`, and returns the updated order to the `payment-authorized` channel. + +The runtime automatically acknowledges the message after the method returns successfully. If an exception is thrown, the message is negatively acknowledged and can be retried according to the connector's policy. + +The following signatures show the range of options available in MicroProfile Reactive Messaging. + +*Supported Method Signatures*: + +*1. Simple Payload (Automatic Acknowledgment)*: + +The runtime extracts the payload from the incoming message and passes it to the method. Acknowledgment is handled automatically: the message is acknowledged after the method returns normally, or negatively acknowledged if the method throws an exception. + +[source,java] +---- +@Incoming("items") +public void process(String item) { + // Process item + // Message is automatically acknowledged after method returns +} +---- + +Use this form when your processing is synchronous and you do not need access to message metadata or manual acknowledgment control. + +*2. Asynchronous Processing with CompletionStage*: + +`java.util.concurrent.CompletionStage` is used for asynchronous computation which may complete with a value or an exception. 
It can be chained using methods such as `thenApply`, `thenCompose`, and `exceptionally` to build non-blocking processing pipelines. + +[source,java] +---- +@Incoming("items") +public CompletionStage<Void> processAsync(String item) { + return processItemAsync(item); + // Message acknowledged when CompletionStage completes +} +---- + +When used as a method return type in MicroProfile Reactive Messaging, the message is acknowledged only after the `CompletionStage` completes successfully, giving you control over when acknowledgment happens relative to your async work. + +If the `CompletionStage` completes exceptionally, the message is negatively acknowledged, allowing for retries or error handling based on your connector's configuration. + +*3. Message Wrapper for Manual Control*: + +`Message<T>` is the core envelope type in MicroProfile Reactive Messaging, defined in the `org.eclipse.microprofile.reactive.messaging` package. It wraps the payload of type `T` alongside metadata and acknowledgment callbacks. Key methods include: + +* `getPayload()` — returns the typed payload. +* `ack()` — returns a `CompletionStage<Void>` that positively acknowledges the message, signalling to the broker that processing succeeded. +* `nack(Throwable)` — negatively acknowledges the message, allowing the connector to retry or dead-letter it depending on its configuration. +* `getMetadata(Class)` — retrieves optional connector-specific metadata such as topic, partition, or headers. + +[source,java] +---- +@Incoming("items") +public CompletionStage<Void> process(Message<String> message) { + String payload = message.getPayload(); + + return processItem(payload) + .thenCompose(v -> message.ack()) + .exceptionally(throwable -> { + message.nack(throwable); + return null; + }); +} +---- + +Use `Message` when you need explicit acknowledgment control, access to metadata, or custom error handling beyond what automatic acknowledgment provides. + +*4.
Reactive Streams Processing*: + +`Multi` is a reactive type from the Mutiny library (`io.smallrye.mutiny.Multi`) that represents an asynchronous stream of zero or more items. It implements the Reactive Streams `Publisher` contract and provides a rich set of operators for transforming, filtering, merging, and grouping streams in a composable, non-blocking way. Common operators include: + +* `map(Function)` — transforms each item to a new value. +* `filter(Predicate)` — passes only items matching the condition downstream. +* `flatMap(Function)` — maps each item to a `Multi` and merges the resulting streams. +* `group().intoLists().of(n)` — aggregates items into fixed-size lists. + +When used as both a parameter and return type on an `@Incoming` method, the runtime hands the entire incoming stream to the method, and the returned `Multi` becomes the processed output stream. This gives full reactive backpressure control across the pipeline. + +[source,java] +---- +@Incoming("items") +public Multi<String> process(Multi<String> items) { + return items + .map(String::toUpperCase) + .filter(item -> item.length() > 3); +} +---- + +The choice of signature depends on your processing requirements: + +* Use a simple payload for straightforward synchronous logic. +* Use `CompletionStage` for asynchronous operations such as REST calls or database work. +* Use `Message` wrapper when you need manual acknowledgment control or access to metadata. +* Use reactive types (`Multi`, `Publisher`) for stream transformations. + +=== Producing Messages with @Outgoing + +The `@Outgoing` annotation marks a method as a producer of messages to a specified channel. Messages produced to the channel can be consumed by other methods annotated with `@Incoming` or sent to external brokers. + +*Simple Value Production*: + +The `@Outgoing` method returns a single value directly.
The runtime wraps the returned value in a `Message`, sends it to the channel, and then calls the method again to produce the next item. This creates a continuous, demand-driven loop where the runtime controls the pace of production. + +[source,java] +---- +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +@Outgoing("prices") +public double generatePrice() { + return computeCurrentPrice(); +} +---- + +This method is invoked repeatedly, and each returned value is sent as a message to the `prices` channel. Use this form for sources such as price feeds, counters, or periodic computations that do not require explicit stream control. + +*Stream Production with Multi*: + +When an `@Outgoing` method returns a `Multi`, the runtime subscribes to the returned stream once and uses it as the message source for the channel's lifetime. The `Multi` itself drives the pace of emission and naturally supports backpressure. + +[source,java] +---- +import io.smallrye.mutiny.Multi; +import java.time.Duration; + +@Outgoing("heartbeat") +public Multi<String> generateHeartbeat() { + return Multi.createFrom().ticks().every(Duration.ofSeconds(5)) + .map(tick -> "alive"); +} +---- + +This creates an infinite stream that emits `alive` every 5 seconds. + +This approach is preferred for event streams, periodic emitters, or any source where you want declarative control over the rate and volume of messages produced. + +*Periodic Production with Timestamps*: + +Returning a `Multi<Instant>` is a common pattern when downstream consumers need to know exactly when each event was produced, for example, to calculate latency, order events, or drive time-based processing.
+ +[source,java] +---- +import java.time.Instant; + +@Outgoing("ticks") +public Multi<Instant> generateTicks() { + return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) + .map(tick -> Instant.now()); +} +---- + +`Multi.createFrom().ticks().every(Duration)` generates a monotonically incrementing counter at a fixed interval; mapping each tick to `Instant.now()` captures the wall-clock time at the moment of emission and sends it as the message payload. + +=== Message Processing Patterns + +MicroProfile Reactive Messaging supports flexible transformation patterns by combining `@Incoming` and `@Outgoing` annotations. + +*1. One-to-One Transformation (Mapping)*: + +Each incoming message is transformed and forwarded as exactly one outgoing message. + +The MicroProfile e-commerce store application uses this one-to-one `order-created` -> `payment-authorized` -> `inventory-reserved` workflow. This is the most common pattern for enriching, updating, or converting events between channels. + +[source,java] +---- +@Incoming("order-created") +@Outgoing("payment-authorized") +public Order processPayment(Order order) { + LOGGER.info(() -> "Processing payment for order " + order.getOrderId()); + order.setStatus(OrderStatus.PAID); + return order; +} +---- + +Each incoming `order-created` event produces exactly one outgoing message with the updated payment status. + +*2. One-to-Many Transformation (Splitting)*: + +Each incoming message produces multiple outgoing messages by returning a `Multi`, allowing a single event to fan out into independent downstream items. + +[source,java] +---- +@Incoming("purchase-orders") +@Outgoing("line-items") +public Multi<OrderItem> splitOrder(PurchaseOrder order) { + return Multi.createFrom().iterable(order.getItems()); +} +---- + +The code snippet above splits one incoming purchase order into multiple line-item messages, with each item sent downstream separately. + +*3.
Many-to-One Transformation (Aggregation)*: + +Multiple incoming messages are collected and combined into a single outgoing message, useful for batching, summarising, or reducing a stream of items into a consolidated result. + +[source,java] +---- +@Incoming("line-items") +@Outgoing("order-summaries") +public Multi<OrderSummary> aggregateOrderItems(Multi<OrderItem> items) { + return items + .group().intoLists().of(5) + .map(batch -> { + BigDecimal total = batch.stream() + .map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity()))) + .reduce(BigDecimal.ZERO, BigDecimal::add); + + return new OrderSummary(batch.get(0).getOrderId(), total, batch.size()); + }); +} +---- + +The code snippet above groups incoming line items into batches of 5, calculates the total price for each batch, and produces an `OrderSummary` for each group. + +*4. Filtering Messages*: + +Selectively passes messages downstream based on a predicate, automatically acknowledging and discarding messages that do not meet the filter condition. + +[source,java] +---- +@Incoming("all-events") +@Outgoing("important-events") +public Multi<Event> filterEvents(Multi<Event> events) { + return events.filter(event -> event.getPriority() > 5); +} +---- + +In the above code snippet, only events meeting the filter criteria are passed downstream. Low-priority events (priority 5 or lower) are discarded and automatically acknowledged. + +*5. Asynchronous Enrichment*: + +Each incoming payload triggers a non-blocking asynchronous operation, such as a database lookup, and the result is forwarded downstream only after the `CompletionStage` completes successfully. + +[source,java] +---- +@Incoming("user-ids") +@Outgoing("users") +public CompletionStage<User> enrichUser(String userId) { + return userRepository.findById(userId); +} +---- + +In the above code snippet, each user ID triggers an asynchronous database lookup, and the enriched `User` object is sent downstream when the lookup completes. + +*6.
Conditional Routing (Dropping Messages)*: + +Inspects each message's payload and either forwards it downstream or explicitly acknowledges and drops it by returning `null`, preventing unwanted messages from reaching the output channel. + +[source,java] +---- +@Incoming("transactions") +@Outgoing("high-value-transactions") +public Message<Transaction> routeHighValue(Message<Transaction> message) { + Transaction tx = message.getPayload(); + + if (tx.getAmount() > 10000) { + return message; + } else { + message.ack(); + return null; + } +} +---- + +This example forwards only high-value transactions and drops lower-value ones after acknowledgment. + +*Stream Transformation Pipeline*: + +Chains multiple reactive operators, such as normalize, filter, async enrich, and aggregate, into a single declarative pipeline that the framework executes with automatic backpressure and error propagation. + +[source,java] +---- +@Incoming("raw-data") +@Outgoing("processed-data") +public Multi<Data> processStream(Multi<Data> stream) { + return stream + .map(this::normalize) + .filter(this::isValid) + .onItem().transformToUniAndConcatenate(this::enrich) + .map(this::aggregate); +} +---- + +These patterns can be combined to build sophisticated message-processing pipelines declaratively, with the framework handling threading, backpressure, and error propagation automatically. + +*Guidelines*: + +* Use simple payload signatures for straightforward transformations. +* Use `Multi` when working with streams or requiring backpressure control. +* Use `CompletionStage` for async operations like database lookups or REST calls. +* Use `Message` when you need manual acknowledgment control or metadata access. +* Return `null` from `@Outgoing` methods to drop messages without forwarding. + +== Programmatic Messaging with @Channel and @Emitter + +In the MicroProfile e-commerce store, checkout still begins with a synchronous REST request.
After the `order` service validates and persists the order, it can publish an `order-created` event so that payment, inventory, and shipment processing continue asynchronously. + +Use `@Channel` for direct access to a named stream, and use `Emitter` when imperative code such as a REST endpoint needs to publish into that stream. + +=== Using @Channel to Access Channels + +The `@Channel` annotation allows developers to inject a channel directly into a CDI bean for programmatic access to the message stream. This is useful when you want to observe a stream, adapt it to another reactive API, or integrate it with existing imperative code. + +[source,java] +---- +//.. + +@ApplicationScoped +public class PaymentEventMonitor { + + @Inject + @Channel("payment-authorized") + Multi<Order> paymentEvents; + + public void displayAuthorizedPayments() { + paymentEvents.subscribe().with( + order -> System.out.println( + "Payment authorized for order " + order.getOrderId()) + ); + } +} +---- + +In this example, the bean observes the `payment-authorized` stream and prints a line each time the payment service emits a fulfillment event. + +The injected type depends on the stream's cardinality: + +* `Multi<T>` for streams of multiple elements +* `Publisher<T>` for a Reactive Streams `Publisher` +* `PublisherBuilder<T>` for composing Reactive Streams operators + +=== Using @Emitter for Programmatic Message Emission + +An `Emitter` provides a way to programmatically send messages to a channel from imperative code, such as REST endpoints, scheduled tasks, or event handlers. In the store application, this is the natural bridge between the synchronous checkout request and the asynchronous fulfillment pipeline.
+ +[source,java] +---- +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; + +import java.util.concurrent.CompletionStage; + +@ApplicationScoped +public class OrderEventPublisher { + + @Inject + @Channel("order-created") + Emitter<Order> orderCreatedEmitter; + + public CompletionStage<Void> publishOrderCreated(Order order) { + return orderCreatedEmitter.send(order); + } +} +---- + +*Emitter Features*: + +* *Payload Sending*: `send(T payload)` sends a payload and returns `CompletionStage<Void>`. The `CompletionStage` completes when the emitted message is acknowledged. If processing fails, the `CompletionStage` completes exceptionally. +* *Message Sending*: `send(Message<T> message)` sends a `Message` object and returns `void`. Use this form when you need custom acknowledgment or negative-acknowledgment functions. +* *Backpressure Aware*: Emitters respect backpressure from downstream consumers, preventing imperative producers from overwhelming slow processors. See the "Managing Backpressure" section for detailed backpressure control with `@OnOverflow`. + +*Emitter with Payload Acknowledgment*: + +Sends a raw payload to the channel and returns a `CompletionStage` that completes when the message is acknowledged, allowing you to chain success or error handling logic.
[source,java]
----
@Inject
@Channel("order-created")
Emitter<Order> orderCreatedEmitter;

public CompletionStage<Void> publishOrderCreated(Order order) {
    return orderCreatedEmitter.send(order)
        .thenAccept(v -> log.info("Order-created event sent successfully"))
        .exceptionally(throwable -> {
            log.error("Failed to publish order-created event", throwable);
            return null;
        });
}
----

*Emitter with Message and Custom Acknowledgment*:

Wraps the payload in a `Message` with explicit acknowledgment and negative-acknowledgment callbacks, giving you full control over success and failure handling after the broker processes the message.

[source,java]
----
@Inject
@Channel("order-created")
Emitter<Order> orderCreatedEmitter;

public void publishOrderCreatedWithAck(Order order) {
    orderCreatedEmitter.send(Message.of(order,
        () -> {
            log.info("Order acknowledged: " + order.getOrderId());
            return CompletableFuture.completedFuture(null);
        },
        throwable -> {
            log.error("Order nacked: " + order.getOrderId(), throwable);
            return CompletableFuture.completedFuture(null);
        }
    ));
}
----

*Use Cases for @Emitter*:

* *REST Endpoints*: Turning a synchronous request into an event after validation and persistence succeed.
* *Scheduled Tasks*: Periodically generating messages based on `@Scheduled` timers or cron expressions.
* *External Integrations*: Forwarding events from external systems such as webhooks or file watchers to message channels.
* *Event Sourcing*: Publishing domain events to event stores or Kafka topics after state changes are committed.
* *Testing*: Injecting test messages into channels programmatically during integration tests.
*Example: REST to Messaging Bridge*:

[source,java]
----
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/orders")
@ApplicationScoped
public class OrderResource {

    @Inject
    OrderService orderService;

    @Inject
    @Channel("order-created")
    Emitter<Order> orderCreatedEmitter;

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createOrder(Order order) {
        Order persisted = orderService.create(order);
        orderCreatedEmitter.send(persisted);
        return Response.accepted(persisted).build();
    }
}
----

This pattern decouples the REST layer from the downstream processing layer. The request is still validated and persisted synchronously, and the emitted `order-created` event then starts the payment, inventory, and shipment workflow in a separate messaging pipeline.

== Managing Backpressure in Reactive Messaging

Backpressure is one of the most critical challenges in reactive systems. It occurs when message producers generate data faster than consumers can process it. Without proper handling, this mismatch can lead to memory exhaustion, dropped messages, or system failures.

=== Understanding Backpressure

In a messaging system, backpressure scenarios arise from:

* *Fast Producer, Slow Consumer*: A Kafka topic receives thousands of messages per second, but the consumer can only process 100 per second due to database write latency.
* *Bursty Traffic*: Periodic spikes in message volume (for example, end-of-month batch jobs, flash sales) overwhelm downstream processors.
* *Resource Constraints*: Limited database connections, network bandwidth, or CPU capacity become bottlenecks during message processing.

*Traditional Approaches and Their Problems*:

1. *Unbounded Buffers*: Queue all messages in memory without limits, leading to `OutOfMemoryError` when the queue grows faster than it drains.
2. *Dropping Messages*: Silently discard excess messages to prevent buffer overflow, causing data loss and inconsistent state.
3. *Blocking Producers*: Block producers until consumers catch up, reducing overall throughput and potentially causing cascading failures across distributed systems.

Reactive Streams' request-based flow control provides a superior solution: consumers explicitly signal how many elements they can handle through the `Subscription.request(n)` method, and producers respect these demand signals by never sending more than requested.

=== Automatic Backpressure Management

MicroProfile Reactive Messaging automatically manages backpressure through the Reactive Streams protocol. The framework handles demand signaling and flow control behind the scenes based on your method signatures.

*Sequential Processing Pattern*:

[source,java]
----
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("data")
@Outgoing("processed-data")
public String process(String data) {
    return transform(data);
}
----

The framework automatically:

1. Requests one element from the upstream channel
2. Waits for the `process` method to complete
3. Sends the result downstream
4. Requests the next element only after the current one is fully processed

This sequential processing naturally provides backpressure: the upstream producer will not send more data until the current item is processed. If `transform(data)` takes 100ms, the consumer processes at most 10 messages per second, and the producer adapts to this rate.
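The request-one-process-one loop described above can be sketched with the JDK's own Reactive Streams types in `java.util.concurrent.Flow`, independently of any MicroProfile runtime. The class and method names below are illustrative, not part of the MicroProfile API; the point is that the subscriber controls the pace purely through `Subscription.request(1)`:

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class OneAtATimeDemo {

    // Consumes items one at a time: each request(1) is issued only after
    // the previous item has been fully processed.
    static List<String> consume(List<String> items) throws InterruptedException {
        List<String> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);              // initial demand: exactly one element
                }

                public void onNext(String item) {
                    received.add(item);        // "process" the item
                    subscription.request(1);   // only then signal demand for the next
                }

                public void onError(Throwable t) { done.countDown(); }

                public void onComplete() { done.countDown(); }
            });
            items.forEach(publisher::submit);
        } // closing the publisher completes the stream after delivery
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consume(List.of("a", "b", "c")));
    }
}
----

MicroProfile Reactive Messaging implementations drive exactly this kind of demand loop on your behalf when you write a plain sequential processing method.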
*Asynchronous Processing with Backpressure*:

[source,java]
----
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;

@Incoming("tasks")
@Outgoing("results")
public CompletionStage<Result> processAsync(Task task) {
    return CompletableFuture.supplyAsync(() ->
        expensiveOperation(task)
    );
}
----

The framework waits for each `CompletionStage` to complete before requesting the next message. This ensures backpressure is maintained even when processing happens asynchronously on different threads or involves I/O operations.

*Stream Processing with Reactive Types*:

[source,java]
----
import io.smallrye.mutiny.Multi;

@Incoming("events")
@Outgoing("filtered-events")
public Multi<Event> filterStream(Multi<Event> events) {
    return events
        .filter(event -> event.isImportant())
        .map(this::enrich);
}
----

When using reactive types like `Multi`, backpressure propagates automatically through the entire chain of operators. If a downstream subscriber can only handle 10 items, the `filter` and `map` operators request only what's needed from upstream.

=== Configuring Backpressure Behavior

While automatic backpressure works well for many scenarios, you can configure specific behaviors for advanced requirements:

*Buffering Strategy*:

Configure buffer sizes for incoming channels to smooth out temporary speed mismatches:

[source,properties]
----
# Allow producer to send up to 256 messages ahead
mp.messaging.incoming.data.buffer-size=256
----

This creates a buffer of 256 messages between the connector and your consumer method. The producer can send messages at its natural rate while the consumer processes at its own pace, as long as the buffer doesn't fill completely.
*Choosing Buffer Size*:

* *Small buffers (10-50)*: Tight backpressure control, minimal memory usage, but frequent demand signals
* *Medium buffers (100-500)*: Good balance for most scenarios with occasional bursts
* *Large buffers (1000+)*: Tolerate significant bursts, but consume more memory and delay backpressure signals

*Concurrent Processing*:

Process multiple messages concurrently while maintaining overall backpressure control:

[source,java]
----
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.concurrent.CompletableFuture;

@Incoming("tasks")
@Outgoing("results")
public Multi<Result> processConcurrent(Multi<Task> tasks) {
    return tasks
        .onItem().transformToUni(task ->
            Uni.createFrom().completionStage(() ->
                CompletableFuture.supplyAsync(() -> process(task))
            ))
        .merge(3); // Process up to 3 messages concurrently
}
----

The concurrency limit (3 in this example) ensures that no more than 3 messages are processed simultaneously. This provides controlled parallelism for I/O-bound operations while preventing resource exhaustion.

*Batching for Throughput*:

Process messages in batches to improve throughput for operations that benefit from bulk processing:

[source,java]
----
import io.smallrye.mutiny.Multi;
import java.time.Duration;
import java.util.List;

@Incoming("events")
@Outgoing("batched-events")
public Multi<List<Event>> batch(Multi<Event> events) {
    return events
        .group().intoLists().of(100, Duration.ofSeconds(5));
}
----

This batches up to 100 events or waits up to 5 seconds, whichever comes first.
Batching is particularly effective for:

* Bulk database inserts
* Batch API calls to external services
* Aggregated analytics computations

*Rate Limiting*:

Control the maximum processing rate to prevent overwhelming downstream systems:

[source,java]
----
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;

@Incoming("high-volume")
@Outgoing("rate-limited")
public Multi<String> rateLimit(Multi<String> stream) {
    return stream
        .group().intoLists().of(10)          // Batch 10 items
        .onItem().call(batch ->              // Pause between batches
            Uni.createFrom().nullItem()
                .onItem().delayIt().by(Duration.ofMillis(100))
        )
        .onItem().transformToMultiAndConcatenate(batch ->
            Multi.createFrom().iterable(batch));
}
----

This processes at most 100 messages per second (10 messages every 100ms), protecting downstream services from overload.

=== Controlling Backpressure with @OnOverflow

When using `@Emitter` to programmatically send messages to channels, the `@OnOverflow` annotation controls what happens when messages are emitted faster than downstream consumers can process them. Without this annotation, the default strategy is `OnOverflow.Strategy.BUFFER` with a buffer size of 128 elements (or the value of the `mp.messaging.emitter.default-buffer-size` configuration property).

[source,java]
----
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import jakarta.inject.Inject;

@Inject
@Channel("orders")
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 300)
Emitter<Order> orderEmitter;
----

*Available Overflow Strategies*:

[cols="1,3"]
|===
|Strategy |Behavior

|`BUFFER`
|Buffer messages up to `bufferSize` elements. If `bufferSize` is not set, the `mp.messaging.emitter.default-buffer-size` property is used (default: 128).
Throws `java.lang.IllegalStateException` from `send()` when the buffer is full.

|`UNBOUNDED_BUFFER`
|Use an unbounded buffer. Messages accumulate in memory without limit. The application may run out of memory if messages accumulate faster than they are consumed. Use only when you can guarantee consumption keeps pace.

|`THROW_EXCEPTION`
|Throw `IllegalStateException` from `send()` immediately if the downstream cannot keep up. No buffering occurs.

|`DROP`
|Drop the most recently emitted message when the downstream cannot keep up. Previously buffered messages are retained. This strategy prioritizes older messages.

|`FAIL`
|Propagate a failure signal downstream when backpressure is detected. The emitter stops accepting new messages and the stream terminates.

|`LATEST`
|Keep only the most recent message, discarding all previously buffered messages when the downstream cannot keep up. This strategy prioritizes the newest data.

|`NONE`
|Ignore backpressure signals completely. The downstream consumer must implement its own buffering strategy. Use with caution as this can lead to unbounded memory growth.
|===

*Choosing the Right Strategy*:

* *`BUFFER`*: Best for most scenarios where occasional slowdowns are acceptable and you want to preserve all messages. Good for business transactions, orders, or events that must not be lost.
* *`DROP` or `LATEST`*: Appropriate for high-frequency sensor data, metrics, or real-time dashboards where missing some values is acceptable and recent data is more valuable than complete history.
* *`FAIL`*: Use when any backpressure indicates a system problem that should halt processing and trigger alerts. Appropriate for critical systems where degraded performance should stop operations.
* *`THROW_EXCEPTION`*: Provides immediate feedback to the caller that the system is overloaded. Useful in REST endpoints to return HTTP 503 Service Unavailable when the system cannot accept more requests.
* *`UNBOUNDED_BUFFER`*: Rarely recommended. Only use in controlled environments with guaranteed bounded input (for example, processing a fixed-size file).

*Examples*:

[source,java]
----
// High-frequency sensor data where the latest value matters most
@Inject
@Channel("temperature-readings")
@OnOverflow(OnOverflow.Strategy.LATEST)
Emitter<Double> temperatureEmitter;

// Critical transactions that must all be processed
@Inject
@Channel("payments")
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 1000)
Emitter<Payment> paymentEmitter;

// REST endpoint that should fail fast when overloaded
@Inject
@Channel("api-requests")
@OnOverflow(OnOverflow.Strategy.THROW_EXCEPTION)
Emitter<ApiRequest> requestEmitter;

@POST
@Path("/requests")
public Response handleRequest(ApiRequest request) {
    try {
        requestEmitter.send(request);
        return Response.accepted().build();
    } catch (IllegalStateException e) {
        return Response.status(503).entity("System overloaded").build();
    }
}
----

When the `@OnOverflow` annotation is absent, the `BUFFER` strategy with default buffer size applies automatically. This provides a sensible default for most applications while allowing fine-grained control when needed.

== Integration with Other MicroProfile Specifications

MicroProfile Reactive Messaging integrates seamlessly with other MicroProfile specifications, enabling comprehensive solutions for building resilient, observable, configurable, and secure microservices.

=== MicroProfile Fault Tolerance Integration

MicroProfile Fault Tolerance complements reactive messaging by adding `@Retry`, `@Timeout`, `@CircuitBreaker`, `@Bulkhead`, and `@Fallback` to message-processing methods. This is covered in the "Message Acknowledgment and Error Handling" section.
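As an illustrative sketch of how these annotations can combine (the channel name, `Order` payload, and `NotificationClient` dependency are assumptions for this example, not APIs defined by either specification), a retry and fallback policy can be declared directly on an `@Incoming` method:

[source,java]
----
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class ShippingNotifier {

    @Inject
    NotificationClient client; // assumed downstream dependency

    // Retry transient failures up to 3 times, bounding each attempt to 2 seconds;
    // once retries are exhausted, fall back instead of failing the message.
    @Incoming("order-shipped")
    @Retry(maxRetries = 3, delay = 200)
    @Timeout(2000)
    @Fallback(fallbackMethod = "storeForLater")
    public void notifyCustomer(Order order) {
        client.sendShippingNotification(order);
    }

    // Same signature as the guarded method, as @Fallback requires
    void storeForLater(Order order) {
        // persist the notification for a later delivery attempt
    }
}
----

How Fault Tolerance interceptors compose with reactive messaging methods, particularly around acknowledgment, can differ between runtimes, so verify the interaction in your implementation's documentation.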
=== MicroProfile Metrics Integration

When MicroProfile Reactive Messaging 3.0.1 is used in an environment where MicroProfile Metrics is available, implementations automatically produce the following metrics in the `base` scope:

[cols="2,1,1,2"]
|===
|Metric Name |Type |Unit |Description

|`mp.messaging.message.count`
|Counter
|None
|The number of messages processed. Includes a `channel` tag identifying the channel name.
|===

Example metric output:

----
mp_messaging_message_count_total{channel="orders"} 1523
mp_messaging_message_count_total{channel="notifications"} 847
----

These metrics require no additional code or configuration and are automatically exposed via the MicroProfile Metrics endpoint (typically `/metrics`).

=== MicroProfile Health Integration

MicroProfile Health enables startup, liveness, and readiness checks for messaging components, which are essential for container orchestration platforms like Kubernetes.

=== MicroProfile Config Integration

MicroProfile Config enables externalized configuration for messaging channels, connectors, and application properties, supporting multiple configuration sources with defined precedence.

== Reactive Stream Operations

Reactive stream operations provide powerful tools for transforming, filtering, and combining message streams. MicroProfile Reactive Messaging implementations support reactive operators through libraries like Mutiny, enabling functional-style stream processing with backpressure management.

Reactive stream operations enable building sophisticated message processing pipelines declaratively while maintaining backpressure and resource efficiency throughout the entire flow.

== Graph Validation and Deployment Checks

When the application starts, the MicroProfile Reactive Messaging implementation constructs a directed graph connecting all `@Incoming` and `@Outgoing` methods, `Emitter` instances, `@Channel` injections, and connectors.
The implementation validates this graph and throws a `DeploymentException` if any of the following invalid conditions exist:

*Invalid Channel Configurations*:

* A method annotated with `@Incoming` has no upstream channel
* A method annotated with `@Outgoing` has no downstream channel
* A method annotated with `@Incoming` has multiple upstream channels (fan-in not supported)
* A method annotated with `@Outgoing` has multiple downstream channels (fan-out requires multiple methods)
* An `Emitter` has no downstream channel
* An `Emitter` has multiple downstream channels
* An injected `@Channel` has no upstream channel
* An injected `@Channel` has multiple upstream channels

*Invalid Connector Configurations*:

* The application references a connector that is not available in the runtime
* An incoming connector has no downstream channel
* An incoming connector has multiple downstream channels
* An outgoing connector has no upstream channel
* An outgoing connector has multiple upstream channels

*Channel Uniqueness Constraint*:

Each channel name may appear in at most *one* `@Incoming` annotation across the entire application. If two methods are annotated with `@Incoming("same-channel")`, the implementation treats this as multiple downstream consumers for a single channel, which violates the specification and causes a `DeploymentException` at startup.

The same constraint applies to `@Outgoing`: a given channel name may appear in at most *one* `@Outgoing` annotation.

This design enforces a clear, unambiguous message flow graph.

== Summary

MicroProfile Reactive Messaging provides a powerful, standardized approach to building event-driven microservices. By leveraging reactive streams, CDI integration, and declarative annotations, developers can create scalable, resilient messaging applications with minimal boilerplate code.
This chapter explored the foundations of event-driven architecture, which enables loose coupling between microservices, horizontal scalability, and real-time event processing across distributed systems. At the core of reactive messaging is the Reactive Streams specification, which uses the Publisher/Subscriber model with request-based backpressure to prevent resource exhaustion and ensure efficient stream processing even under high load. Through seamless CDI integration, the specification mandates support for `@ApplicationScoped` and `@Dependent` scopes, enabling messaging beans to integrate naturally with other Jakarta EE components and benefit from dependency injection and lifecycle management.

The declarative messaging model with `@Incoming` and `@Outgoing` annotations eliminates boilerplate code for message routing and transformation, allowing developers to focus on business logic rather than infrastructure concerns. When declarative patterns are insufficient, `@Channel` and `@Emitter` provide imperative message sending capabilities, particularly useful for bridging REST endpoints to message streams. The framework handles backpressure automatically through reactive operators, configurable buffer sizes, and `@OnOverflow` strategies, with `BUFFER` defaulting to 128 elements to prevent overwhelming downstream consumers.

Reliable message processing is ensured through multiple acknowledgment strategies including `POST_PROCESSING`, `PRE_PROCESSING`, `MANUAL`, and `NONE`, along with comprehensive error handling patterns that support dead letter queues and retry policies. The runtime performs graph validation at startup, throwing `DeploymentException` for misconfigured channels so that wiring errors surface at deployment time rather than at runtime in production.
When MicroProfile Metrics is available, the implementation automatically produces metrics per channel such as `mp.messaging.message.count`, complemented by integration with MicroProfile Health for liveness and readiness probes.

The specification's seamless integration with other MicroProfile specifications enhances the overall capabilities of reactive messaging applications. MicroProfile Fault Tolerance provides retry, circuit breaker, and timeout patterns for resilience; Config enables externalized configuration for different environments; Metrics supports both automatic and custom observability; Health integrates with Kubernetes for readiness probes; and OpenAPI documents REST endpoints that bridge to messaging systems. The connector ecosystem, supporting Kafka, AMQP, JMS, and other brokers, handles protocol-specific details, serialization, and connection management, freeing applications to focus purely on message processing logic.

To continue your journey with MicroProfile Reactive Messaging, explore the complete https://download.eclipse.org/microprofile/microprofile-reactive-messaging-3.0.1/microprofile-reactive-messaging-spec-3.0.1.html[MicroProfile Reactive Messaging 3.0.1 Specification] for detailed API documentation and conformance requirements. Study connector-specific features for your chosen message broker in your runtime's documentation. Leverage the integration points with other MicroProfile specifications such as Fault Tolerance for resilience, Config for externalization, Metrics for observability, and Health for container orchestration.

By mastering these concepts and patterns, you can build modern, reactive microservices that handle high-volume data streams efficiently and reliably in production environments, taking full advantage of the reactive programming model to create responsive, resilient, and scalable distributed systems.

This concludes the MicroProfile tutorial.
You are now equipped with the foundational knowledge to build robust, cloud-native microservices using the MicroProfile specification. Thank you for following along, and happy coding!