Skip to content

Commit e8f10dd

Browse files
authored
Ensure AMQP protocol marshals messages before passing to broker (#1859)
This change brings the behaior inline with other protocols and will prevent a future race condition during copy/dispatch as the data will already be marshaled. This closes #1858
1 parent 4bfcd13 commit e8f10dd

2 files changed

Lines changed: 32 additions & 0 deletions

File tree

activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) th
205205
}
206206

207207
message.onSend();
208+
// GH-1851 - Ensure we marshal the message before passing to the broker
209+
// This prevents a race condition later if the message is copyied/marshaled
210+
// at this same time. This behavior is in line with how messages are received
211+
// using OpenWire (when not using VM) and Stomp.
212+
message.beforeMarshall(null);
208213

209214
sendsInFlight++;
210215

activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Enumeration;
2828
import java.util.HashSet;
29+
import java.util.List;
2930
import java.util.Set;
3031
import java.util.concurrent.CountDownLatch;
3132
import java.util.concurrent.TimeUnit;
@@ -52,6 +53,10 @@
5253
import jakarta.jms.TopicSession;
5354
import jakarta.jms.TopicSubscriber;
5455

56+
import org.apache.activemq.broker.Broker;
57+
import org.apache.activemq.broker.BrokerFilter;
58+
import org.apache.activemq.broker.BrokerPlugin;
59+
import org.apache.activemq.broker.ProducerBrokerExchange;
5560
import org.apache.activemq.broker.jmx.BrokerView;
5661
import org.apache.activemq.broker.jmx.BrokerViewMBean;
5762
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
@@ -1360,4 +1365,26 @@ public void onMessage(Message message) {
13601365

13611366
assertTrue("Did not receive all messages: " + MSG_COUNT, done.await(15, TimeUnit.SECONDS));
13621367
}
1368+
1369+
@Override
1370+
protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
1371+
super.addAdditionalPlugins(plugins);
1372+
plugins.add(new BrokerPlugin() {
1373+
1374+
@Override
1375+
public Broker installPlugin(Broker broker) {
1376+
return new BrokerFilter(broker) {
1377+
1378+
@Override
1379+
public void send(ProducerBrokerExchange producerExchange,
1380+
org.apache.activemq.command.Message messageSend) throws Exception {
1381+
super.send(producerExchange, messageSend);
1382+
1383+
// The message should always be passed to the broker in a marshaled state
1384+
assertTrue(messageSend.isMarshalled());
1385+
}
1386+
};
1387+
}
1388+
});
1389+
}
13631390
}

0 commit comments

Comments
 (0)