Skip to content

Commit a7da1f8

Browse files
authored
Ensure AMQP protocol marshals messages before passing to broker (#1859) (#1861)
This change brings the behaior inline with other protocols and will prevent a future race condition during copy/dispatch as the data will already be marshaled. This closes #1858 (cherry picked from commit e8f10dd)
1 parent 9bb9ee6 commit a7da1f8

2 files changed

Lines changed: 32 additions & 0 deletions

File tree

activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) th
205205
}
206206

207207
message.onSend();
208+
// GH-1851 - Ensure we marshal the message before passing to the broker
209+
// This prevents a race condition later if the message is copyied/marshaled
210+
// at this same time. This behavior is in line with how messages are received
211+
// using OpenWire (when not using VM) and Stomp.
212+
message.beforeMarshall(null);
208213

209214
sendsInFlight++;
210215

activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Enumeration;
2828
import java.util.HashSet;
29+
import java.util.List;
2930
import java.util.Set;
3031
import java.util.concurrent.CountDownLatch;
3132
import java.util.concurrent.TimeUnit;
@@ -52,6 +53,10 @@
5253
import javax.jms.TopicSession;
5354
import javax.jms.TopicSubscriber;
5455

56+
import org.apache.activemq.broker.Broker;
57+
import org.apache.activemq.broker.BrokerFilter;
58+
import org.apache.activemq.broker.BrokerPlugin;
59+
import org.apache.activemq.broker.ProducerBrokerExchange;
5560
import org.apache.activemq.broker.jmx.BrokerView;
5661
import org.apache.activemq.broker.jmx.BrokerViewMBean;
5762
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
@@ -1358,4 +1363,26 @@ public void onMessage(Message message) {
13581363

13591364
assertTrue("Did not receive all messages: " + MSG_COUNT, done.await(15, TimeUnit.SECONDS));
13601365
}
1366+
1367+
@Override
1368+
protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws Exception {
1369+
super.addAdditionalPlugins(plugins);
1370+
plugins.add(new BrokerPlugin() {
1371+
1372+
@Override
1373+
public Broker installPlugin(Broker broker) {
1374+
return new BrokerFilter(broker) {
1375+
1376+
@Override
1377+
public void send(ProducerBrokerExchange producerExchange,
1378+
org.apache.activemq.command.Message messageSend) throws Exception {
1379+
super.send(producerExchange, messageSend);
1380+
1381+
// The message should always be passed to the broker in a marshaled state
1382+
assertTrue(messageSend.isMarshalled());
1383+
}
1384+
};
1385+
}
1386+
});
1387+
}
13611388
}

0 commit comments

Comments
 (0)