Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@
<includes>
<include>src/main/java/com/rabbitmq/client/ConnectionFactory.java</include>
<include>src/main/java/com/rabbitmq/client/PemReader.java</include>
<include>src/main/java/com/rabbitmq/client/WriteListener.java</include>
<include>src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java</include>
<include>src/main/java/com/rabbitmq/client/impl/Environment.java</include>
<include>src/main/java/com/rabbitmq/client/observation/**/*.java</include>
Expand All @@ -797,6 +798,8 @@
<include>src/test/java/com/rabbitmq/client/test/ProtocolVersionMismatch.java</include>
<include>src/test/java/com/rabbitmq/client/test/TestUtils.java</include>
<include>src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java</include>
<include>src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java</include>
<include>src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java</include>
</includes>
<googleJavaFormat>
<version>${google-java-format.version}</version>
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/com/rabbitmq/client/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.rabbitmq.client.AMQP.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -299,6 +300,48 @@ void basicPublish(String exchange, String routingKey, boolean mandatory, BasicPr
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;

/**
* Publish a message with a {@link ByteBuffer} body and a {@link WriteListener} for
* write completion notification.
*
* <p>The buffer's content between its current position and its limit will be sent.
*
* <p>The listener is called when the network layer has finished writing the body to the
* socket. This is useful for off-heap buffers that must be released after the write completes.
*
* <p>Using this method will only benefit workloads using off-heap {@link ByteBuffer}s and the Netty
* IO layer. Other workloads can stick to the basicPublish variants that use an array of bytes
* for message payloads.
*
* <p><strong>Threading:</strong> the listener may be called on the Netty event loop thread.
* It must not perform blocking operations.
*
* <p>This API is experimental and is susceptible to change at any time.
*
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param immediate true if the 'immediate' flag is to be
* set. Note that the RabbitMQ server does not support this flag.
* @param props other properties for the message - routing headers etc
* @param body the message body
* @param listener called when the write completes, may be {@code null}
* @throws java.io.IOException if an error is encountered
* @since 5.30.0
*/
default void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, ByteBuffer body, WriteListener listener)
throws IOException {
byte[] bytes = null;
if (body != null) {
bytes = new byte[body.remaining()];
body.get(bytes);
}
basicPublish(exchange, routingKey, mandatory, immediate, props, bytes);
if (listener != null) {
listener.done(true, null);
}
}

/**
* Actively declare a non-autodelete, non-durable exchange with no extra arguments
* @see com.rabbitmq.client.AMQP.Exchange.Declare
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/com/rabbitmq/client/WriteListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2007-2026 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.client;

/**
* Listener notified when the network layer has finished writing a message body.
*
* <p>This is primarily useful when publishing with an off-heap {@link java.nio.ByteBuffer}: the
* callback tells the caller when the buffer is no longer in use and can be safely released.
*
* <p>This only applies when using Netty as the IO layer and is unnecessary when using the blocking
* IO layer.
*
* <p><strong>Threading:</strong> the callback may be invoked on the Netty event loop thread.
* Implementations must not perform blocking operations.
*
* <p>This API is experimental and is susceptible to change at any time.
*
* @since 5.30.0
*/
@FunctionalInterface
public interface WriteListener {

/**
* Called when the write operation completes.
*
* @param success {@code true} if the data was written to the socket successfully
* @param cause the exception if the write failed, {@code null} on success
*/
void done(boolean success, Throwable cause);
}
63 changes: 35 additions & 28 deletions src/main/java/com/rabbitmq/client/impl/AMQCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.WriteListener;

/**
* AMQP 0-9-1-specific implementation of {@link Command} which accumulates
Expand All @@ -46,44 +48,49 @@ public class AMQCommand implements Command {
/** The assembler for this command - synchronised on - contains all the state */
private final CommandAssembler assembler;
private final Lock assemblerLock = new ReentrantLock();
private final WriteListener writeListener;

/** Construct a command for inbound frame assembly with a max body length. */
AMQCommand(int maxBodyLength) {
this(null, null, null, maxBodyLength);
this.assembler = new CommandAssembler(null, null, maxBodyLength);
this.writeListener = null;
}

/** Construct a command ready to fill in by reading frames */
/** Construct a command ready to fill in by reading frames. */
public AMQCommand() {
this(null, null, null, Integer.MAX_VALUE);
this(Integer.MAX_VALUE);
}

/**
* Construct a command with just a method, and without header or body.
* @param method the wrapped method
*/
public AMQCommand(com.rabbitmq.client.Method method) {
this(method, null, null, Integer.MAX_VALUE);
this.assembler = new CommandAssembler((Method) method, null, Integer.MAX_VALUE);
this.writeListener = null;
}

/**
* Construct a command with a specified method, header and body.
* Construct a command with a ByteBuffer body for transmission.
* @param method the wrapped method
* @param contentHeader the wrapped content header
* @param body the message body data
* @param body the message body as a ByteBuffer
*/
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE);
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, ByteBuffer body) {
this(method, contentHeader, body, null);
}

/**
* Construct a command with a specified method, header and body.
* Construct a command with a ByteBuffer body and a write completion listener.
* @param method the wrapped method
* @param contentHeader the wrapped content header
* @param body the message body data
* @param maxBodyLength the maximum size for an inbound message body
* @param body the message body as a ByteBuffer
* @param writeListener called when the network layer finishes writing, may be {@code null}
*/
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body,
int maxBodyLength) {
this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength);
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, ByteBuffer body,
WriteListener writeListener) {
this.assembler = new CommandAssembler((Method) method, contentHeader, body);
this.writeListener = writeListener;
}

/** Public API - {@inheritDoc} */
Expand Down Expand Up @@ -122,13 +129,13 @@ public void transmit(AMQChannel channel) throws IOException {
try {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();

Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
ByteBuffer body = this.assembler.getByteBufferBody();
int bodySize = body == null ? 0 : body.remaining();
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, bodySize);

int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : bodySize;

if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
Expand All @@ -137,14 +144,14 @@ public void transmit(AMQChannel channel) throws IOException {
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);

for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;

int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
if (body != null) {
int bodyPosition = body.position();
for (int offset = 0; offset < bodySize; offset += bodyPayloadMax) {
int remaining = bodySize - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body, bodyPosition + offset, fragmentLength);
connection.writeFrame(frame);
}
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
Expand All @@ -153,7 +160,7 @@ public void transmit(AMQChannel channel) throws IOException {
assemblerLock.unlock();
}

connection.flush();
connection.flush(this.writeListener);
}

@Override public String toString() {
Expand Down Expand Up @@ -200,7 +207,7 @@ public static void checkPreconditions() {
* code in Frame.
*/
private static void checkEmptyFrameSize() {
Frame f = new Frame(AMQP.FRAME_BODY, 0, new byte[0]);
Frame f = new Frame(AMQP.FRAME_BODY, 0, ByteBuffer.wrap(new byte[0]));
ByteArrayOutputStream s = new ByteArrayOutputStream();
try {
f.writeTo(new DataOutputStream(s));
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,12 @@ void writeFrame(Frame f) throws IOException {
* Public API - flush the output buffers
*/
public void flush() throws IOException {
flush(null);
}

void flush(WriteListener listener) throws IOException {
try {
_frameHandler.flush();
_frameHandler.flush(listener);
} catch (IOException ioe) {
this.errorOnWriteListener.handle(this, ioe);
}
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -714,6 +715,18 @@ public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, byte[] body)
throws IOException
{
basicPublish(exchange, routingKey, mandatory, immediate, props,
body == null ? null : ByteBuffer.wrap(body), null);
}

/** Public API - {@inheritDoc} */
@Override
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, ByteBuffer body,
WriteListener listener)
throws IOException
{
final long deliveryTag;
if (nextPublishSeqNo > 0) {
Expand All @@ -734,7 +747,7 @@ public void basicPublish(String exchange, String routingKey,
.build();
try {
ObservationCollector.PublishCall publishCall = properties -> {
AMQCommand command = new AMQCommand(publish, properties, body);
AMQCommand command = new AMQCommand(publish, properties, body, listener);
transmit(command);
};
observationCollector.publish(publishCall, publish, props, body, this.connectionInfo());
Expand Down
36 changes: 34 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/CommandAssembler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.rabbitmq.client.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -50,20 +51,41 @@ private enum CAState {
/** sum of the lengths of all fragments */
private int bodyLength;

/** Zero-copy body buffer, mutually exclusive with bodyN usage on the outbound path */
private final ByteBuffer byteBufferBody;

/** No bytes of content body not yet accumulated */
private long remainingBodyBytes;

private final int maxBodyLength;

public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body,
public CommandAssembler(Method method, AMQContentHeader contentHeader,
int maxBodyLength) {
this.method = method;
this.contentHeader = contentHeader;
this.bodyN = new ArrayList<>(2);
this.bodyLength = 0;
this.byteBufferBody = null;
this.remainingBodyBytes = 0;
this.maxBodyLength = maxBodyLength;
appendBodyFragment(body);
if (method == null) {
this.state = CAState.EXPECTING_METHOD;
} else if (contentHeader == null) {
this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
} else {
this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength;
updateContentBodyState();
}
}

public CommandAssembler(Method method, AMQContentHeader contentHeader, ByteBuffer body) {
this.method = method;
this.contentHeader = contentHeader;
this.bodyN = new ArrayList<>(0);
this.bodyLength = body == null ? 0 : body.remaining();
this.byteBufferBody = body;
this.remainingBodyBytes = 0;
this.maxBodyLength = Integer.MAX_VALUE;
if (method == null) {
this.state = CAState.EXPECTING_METHOD;
} else if (contentHeader == null) {
Expand Down Expand Up @@ -151,9 +173,19 @@ private byte[] coalesceContentBody() {
}

public synchronized byte[] getContentBody() {
if (byteBufferBody != null) {
ByteBuffer dup = byteBufferBody.duplicate();
byte[] result = new byte[dup.remaining()];
dup.get(result);
return result;
}
return coalesceContentBody();
}

public ByteBuffer getByteBufferBody() {
return this.byteBufferBody;
}

private void appendBodyFragment(byte[] fragment) {
if (fragment == null || fragment.length == 0) return;
bodyN.add(fragment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void run() {

if (now > (lastActivityTime + this.heartbeatNanos)) {
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
frameHandler.flush();
frameHandler.flush(null);
}
} catch (IOException e) {
// ignore
Expand Down
Loading
Loading