Skip to content

Commit b06aa95

Browse files
committed
Add basicPublish with ByteBuffer parameter for message body
Instead of body[]. This targets workloads that have message payloads that sits in off-heap memory. By using Netty it guarantees memory will stay off-heap.
1 parent 09d03be commit b06aa95

22 files changed

Lines changed: 563 additions & 81 deletions

pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,7 @@
784784
<includes>
785785
<include>src/main/java/com/rabbitmq/client/ConnectionFactory.java</include>
786786
<include>src/main/java/com/rabbitmq/client/PemReader.java</include>
787+
<include>src/main/java/com/rabbitmq/client/WriteListener.java</include>
787788
<include>src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java</include>
788789
<include>src/main/java/com/rabbitmq/client/impl/Environment.java</include>
789790
<include>src/main/java/com/rabbitmq/client/observation/**/*.java</include>
@@ -797,6 +798,8 @@
797798
<include>src/test/java/com/rabbitmq/client/test/ProtocolVersionMismatch.java</include>
798799
<include>src/test/java/com/rabbitmq/client/test/TestUtils.java</include>
799800
<include>src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java</include>
801+
<include>src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java</include>
802+
<include>src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java</include>
800803
</includes>
801804
<googleJavaFormat>
802805
<version>${google-java-format.version}</version>

src/main/java/com/rabbitmq/client/Channel.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.AMQP.*;
2020

2121
import java.io.IOException;
22+
import java.nio.ByteBuffer;
2223
import java.util.Map;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.TimeoutException;
@@ -299,6 +300,48 @@ void basicPublish(String exchange, String routingKey, boolean mandatory, BasicPr
299300
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
300301
throws IOException;
301302

303+
/**
304+
* Publish a message with a {@link ByteBuffer} body and a {@link WriteListener} for
305+
* write completion notification.
306+
*
307+
* <p>The buffer's content between its current position and its limit will be sent.
308+
*
309+
* <p>The listener is called when the network layer has finished writing the body to the
310+
* socket. This is useful for off-heap buffers that must be released after the write completes.
311+
*
312+
* <p>Using this method will only benefit workloads using off-heap {@link ByteBuffer}s and the Netty
313+
* IO layer. Other workloads can stick to the basicPublish variants that use an array of bytes
314+
* for message payloads.
315+
*
316+
* <p><strong>Threading:</strong> the listener may be called on the Netty event loop thread.
317+
* It must not perform blocking operations.
318+
*
319+
* <p>This API is experimental and is susceptible to change at any time.
320+
*
321+
* @param exchange the exchange to publish the message to
322+
* @param routingKey the routing key
323+
* @param mandatory true if the 'mandatory' flag is to be set
324+
* @param immediate true if the 'immediate' flag is to be
325+
* set. Note that the RabbitMQ server does not support this flag.
326+
* @param props other properties for the message - routing headers etc
327+
* @param body the message body
328+
* @param listener called when the write completes, may be {@code null}
329+
* @throws java.io.IOException if an error is encountered
330+
* @since 5.30.0
331+
*/
332+
default void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, ByteBuffer body, WriteListener listener)
333+
throws IOException {
334+
byte[] bytes = null;
335+
if (body != null) {
336+
bytes = new byte[body.remaining()];
337+
body.get(bytes);
338+
}
339+
basicPublish(exchange, routingKey, mandatory, immediate, props, bytes);
340+
if (listener != null) {
341+
listener.done(true, null);
342+
}
343+
}
344+
302345
/**
303346
* Actively declare a non-autodelete, non-durable exchange with no extra arguments
304347
* @see com.rabbitmq.client.AMQP.Exchange.Declare
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright (c) 2007-2026 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Java client library, is triple-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
6+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
7+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
8+
// please see LICENSE-APACHE2.
9+
//
10+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
11+
// either express or implied. See the LICENSE file for specific language governing
12+
// rights and limitations of this software.
13+
//
14+
// If you have any questions regarding licensing, please contact us at
15+
// info@rabbitmq.com.
16+
package com.rabbitmq.client;
17+
18+
/**
19+
* Listener notified when the network layer has finished writing a message body.
20+
*
21+
* <p>This is primarily useful when publishing with an off-heap {@link java.nio.ByteBuffer}: the
22+
* callback tells the caller when the buffer is no longer in use and can be safely released.
23+
*
24+
* <p>This only applies when using Netty as the IO layer and is unnecessary when using the blocking
25+
* IO layer.
26+
*
27+
* <p><strong>Threading:</strong> the callback may be invoked on the Netty event loop thread.
28+
* Implementations must not perform blocking operations.
29+
*
30+
* <p>This API is experimental and is susceptible to change at any time.
31+
*
32+
* @since 5.30.0
33+
*/
34+
@FunctionalInterface
35+
public interface WriteListener {
36+
37+
/**
38+
* Called when the write operation completes.
39+
*
40+
* @param success {@code true} if the data was written to the socket successfully
41+
* @param cause the exception if the write failed, {@code null} on success
42+
*/
43+
void done(boolean success, Throwable cause);
44+
}

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import java.io.ByteArrayOutputStream;
1919
import java.io.DataOutputStream;
2020
import java.io.IOException;
21+
import java.nio.ByteBuffer;
2122
import java.util.concurrent.locks.Lock;
2223
import java.util.concurrent.locks.ReentrantLock;
2324

2425
import com.rabbitmq.client.AMQP;
2526
import com.rabbitmq.client.Command;
27+
import com.rabbitmq.client.WriteListener;
2628

2729
/**
2830
* AMQP 0-9-1-specific implementation of {@link Command} which accumulates
@@ -46,44 +48,49 @@ public class AMQCommand implements Command {
4648
/** The assembler for this command - synchronised on - contains all the state */
4749
private final CommandAssembler assembler;
4850
private final Lock assemblerLock = new ReentrantLock();
51+
private final WriteListener writeListener;
4952

53+
/** Construct a command for inbound frame assembly with a max body length. */
5054
AMQCommand(int maxBodyLength) {
51-
this(null, null, null, maxBodyLength);
55+
this.assembler = new CommandAssembler(null, null, maxBodyLength);
56+
this.writeListener = null;
5257
}
5358

54-
/** Construct a command ready to fill in by reading frames */
59+
/** Construct a command ready to fill in by reading frames. */
5560
public AMQCommand() {
56-
this(null, null, null, Integer.MAX_VALUE);
61+
this(Integer.MAX_VALUE);
5762
}
5863

5964
/**
6065
* Construct a command with just a method, and without header or body.
6166
* @param method the wrapped method
6267
*/
6368
public AMQCommand(com.rabbitmq.client.Method method) {
64-
this(method, null, null, Integer.MAX_VALUE);
69+
this.assembler = new CommandAssembler((Method) method, null, Integer.MAX_VALUE);
70+
this.writeListener = null;
6571
}
6672

6773
/**
68-
* Construct a command with a specified method, header and body.
74+
* Construct a command with a ByteBuffer body for transmission.
6975
* @param method the wrapped method
7076
* @param contentHeader the wrapped content header
71-
* @param body the message body data
77+
* @param body the message body as a ByteBuffer
7278
*/
73-
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
74-
this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE);
79+
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, ByteBuffer body) {
80+
this(method, contentHeader, body, null);
7581
}
7682

7783
/**
78-
* Construct a command with a specified method, header and body.
84+
* Construct a command with a ByteBuffer body and a write completion listener.
7985
* @param method the wrapped method
8086
* @param contentHeader the wrapped content header
81-
* @param body the message body data
82-
* @param maxBodyLength the maximum size for an inbound message body
87+
* @param body the message body as a ByteBuffer
88+
* @param writeListener called when the network layer finishes writing, may be {@code null}
8389
*/
84-
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body,
85-
int maxBodyLength) {
86-
this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength);
90+
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, ByteBuffer body,
91+
WriteListener writeListener) {
92+
this.assembler = new CommandAssembler((Method) method, contentHeader, body);
93+
this.writeListener = writeListener;
8794
}
8895

8996
/** Public API - {@inheritDoc} */
@@ -122,13 +129,13 @@ public void transmit(AMQChannel channel) throws IOException {
122129
try {
123130
Method m = this.assembler.getMethod();
124131
if (m.hasContent()) {
125-
byte[] body = this.assembler.getContentBody();
126-
127-
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
132+
ByteBuffer body = this.assembler.getByteBufferBody();
133+
int bodySize = body == null ? 0 : body.remaining();
134+
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, bodySize);
128135

129136
int frameMax = connection.getFrameMax();
130137
boolean cappedFrameMax = frameMax > 0;
131-
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
138+
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : bodySize;
132139

133140
if (cappedFrameMax && headerFrame.size() > frameMax) {
134141
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
@@ -137,14 +144,14 @@ public void transmit(AMQChannel channel) throws IOException {
137144
connection.writeFrame(m.toFrame(channelNumber));
138145
connection.writeFrame(headerFrame);
139146

140-
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
141-
int remaining = body.length - offset;
142-
143-
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
144-
: bodyPayloadMax;
145-
Frame frame = Frame.fromBodyFragment(channelNumber, body,
146-
offset, fragmentLength);
147-
connection.writeFrame(frame);
147+
if (body != null) {
148+
int bodyPosition = body.position();
149+
for (int offset = 0; offset < bodySize; offset += bodyPayloadMax) {
150+
int remaining = bodySize - offset;
151+
int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
152+
Frame frame = Frame.fromBodyFragment(channelNumber, body, bodyPosition + offset, fragmentLength);
153+
connection.writeFrame(frame);
154+
}
148155
}
149156
} else {
150157
connection.writeFrame(m.toFrame(channelNumber));
@@ -153,7 +160,7 @@ public void transmit(AMQChannel channel) throws IOException {
153160
assemblerLock.unlock();
154161
}
155162

156-
connection.flush();
163+
connection.flush(this.writeListener);
157164
}
158165

159166
@Override public String toString() {
@@ -200,7 +207,7 @@ public static void checkPreconditions() {
200207
* code in Frame.
201208
*/
202209
private static void checkEmptyFrameSize() {
203-
Frame f = new Frame(AMQP.FRAME_BODY, 0, new byte[0]);
210+
Frame f = new Frame(AMQP.FRAME_BODY, 0, ByteBuffer.wrap(new byte[0]));
204211
ByteArrayOutputStream s = new ByteArrayOutputStream();
205212
try {
206213
f.writeTo(new DataOutputStream(s));

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,8 +655,12 @@ void writeFrame(Frame f) throws IOException {
655655
* Public API - flush the output buffers
656656
*/
657657
public void flush() throws IOException {
658+
flush(null);
659+
}
660+
661+
void flush(WriteListener listener) throws IOException {
658662
try {
659-
_frameHandler.flush();
663+
_frameHandler.flush(listener);
660664
} catch (IOException ioe) {
661665
this.errorOnWriteListener.handle(this, ioe);
662666
}

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.slf4j.LoggerFactory;
2929

3030
import java.io.IOException;
31+
import java.nio.ByteBuffer;
3132
import java.util.*;
3233
import java.util.concurrent.CompletableFuture;
3334
import java.util.concurrent.CopyOnWriteArrayList;
@@ -714,6 +715,18 @@ public void basicPublish(String exchange, String routingKey,
714715
boolean mandatory, boolean immediate,
715716
BasicProperties props, byte[] body)
716717
throws IOException
718+
{
719+
basicPublish(exchange, routingKey, mandatory, immediate, props,
720+
body == null ? null : ByteBuffer.wrap(body), null);
721+
}
722+
723+
/** Public API - {@inheritDoc} */
724+
@Override
725+
public void basicPublish(String exchange, String routingKey,
726+
boolean mandatory, boolean immediate,
727+
BasicProperties props, ByteBuffer body,
728+
WriteListener listener)
729+
throws IOException
717730
{
718731
final long deliveryTag;
719732
if (nextPublishSeqNo > 0) {
@@ -734,7 +747,7 @@ public void basicPublish(String exchange, String routingKey,
734747
.build();
735748
try {
736749
ObservationCollector.PublishCall publishCall = properties -> {
737-
AMQCommand command = new AMQCommand(publish, properties, body);
750+
AMQCommand command = new AMQCommand(publish, properties, body, listener);
738751
transmit(command);
739752
};
740753
observationCollector.publish(publishCall, publish, props, body, this.connectionInfo());

src/main/java/com/rabbitmq/client/impl/CommandAssembler.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.rabbitmq.client.impl;
1717

1818
import java.io.IOException;
19+
import java.nio.ByteBuffer;
1920
import java.util.ArrayList;
2021
import java.util.List;
2122

@@ -50,20 +51,41 @@ private enum CAState {
5051
/** sum of the lengths of all fragments */
5152
private int bodyLength;
5253

54+
/** Zero-copy body buffer, mutually exclusive with bodyN usage on the outbound path */
55+
private final ByteBuffer byteBufferBody;
56+
5357
/** No bytes of content body not yet accumulated */
5458
private long remainingBodyBytes;
5559

5660
private final int maxBodyLength;
5761

58-
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body,
62+
public CommandAssembler(Method method, AMQContentHeader contentHeader,
5963
int maxBodyLength) {
6064
this.method = method;
6165
this.contentHeader = contentHeader;
6266
this.bodyN = new ArrayList<>(2);
6367
this.bodyLength = 0;
68+
this.byteBufferBody = null;
6469
this.remainingBodyBytes = 0;
6570
this.maxBodyLength = maxBodyLength;
66-
appendBodyFragment(body);
71+
if (method == null) {
72+
this.state = CAState.EXPECTING_METHOD;
73+
} else if (contentHeader == null) {
74+
this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
75+
} else {
76+
this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength;
77+
updateContentBodyState();
78+
}
79+
}
80+
81+
public CommandAssembler(Method method, AMQContentHeader contentHeader, ByteBuffer body) {
82+
this.method = method;
83+
this.contentHeader = contentHeader;
84+
this.bodyN = new ArrayList<>(0);
85+
this.bodyLength = body == null ? 0 : body.remaining();
86+
this.byteBufferBody = body;
87+
this.remainingBodyBytes = 0;
88+
this.maxBodyLength = Integer.MAX_VALUE;
6789
if (method == null) {
6890
this.state = CAState.EXPECTING_METHOD;
6991
} else if (contentHeader == null) {
@@ -151,9 +173,19 @@ private byte[] coalesceContentBody() {
151173
}
152174

153175
public synchronized byte[] getContentBody() {
176+
if (byteBufferBody != null) {
177+
ByteBuffer dup = byteBufferBody.duplicate();
178+
byte[] result = new byte[dup.remaining()];
179+
dup.get(result);
180+
return result;
181+
}
154182
return coalesceContentBody();
155183
}
156184

185+
public ByteBuffer getByteBufferBody() {
186+
return this.byteBufferBody;
187+
}
188+
157189
private void appendBodyFragment(byte[] fragment) {
158190
if (fragment == null || fragment.length == 0) return;
159191
bodyN.add(fragment);

src/main/java/com/rabbitmq/client/impl/DefaultHeartbeatSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void run() {
137137

138138
if (now > (lastActivityTime + this.heartbeatNanos)) {
139139
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
140-
frameHandler.flush();
140+
frameHandler.flush(null);
141141
}
142142
} catch (IOException e) {
143143
// ignore

0 commit comments

Comments
 (0)