Skip to content

Commit eaddce5

Browse files
TylerMuraliclaude
andcommitted
Fix network bridge silent death when transport exception occurs during broker info handshake (#1864)
Remove early return in onException() handlers for local and remote transports in DemandForwardingBridgeSupport. When futureBrokerInfo was not done, the handler cancelled the future and returned without calling serviceLocalException()/serviceRemoteException(), preventing the reconnection chain from firing. The bridge would silently die and never re-establish. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c6f21cb commit eaddce5

2 files changed

Lines changed: 216 additions & 2 deletions

File tree

activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,6 @@ public void onException(IOException error) {
228228
LOG.info("Error with pending local brokerInfo on: {} ({})", localBroker, error.getMessage());
229229
LOG.debug("Peer error: ", error);
230230
futureLocalBrokerInfo.cancel(true);
231-
return;
232231
}
233232
serviceLocalException(error);
234233
}
@@ -248,7 +247,6 @@ public void onException(IOException error) {
248247
LOG.info("Error with pending remote brokerInfo on: {} ({})", remoteBroker, error.getMessage());
249248
LOG.debug("Peer error: ", error);
250249
futureRemoteBrokerInfo.cancel(true);
251-
return;
252250
}
253251
serviceRemoteException(error);
254252
}
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.network;
18+
19+
import static org.junit.Assert.assertNotNull;
20+
import static org.junit.Assert.assertTrue;
21+
22+
import java.net.URI;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import jakarta.jms.Connection;
26+
import jakarta.jms.MessageConsumer;
27+
import jakarta.jms.MessageProducer;
28+
import jakarta.jms.Session;
29+
import jakarta.jms.TextMessage;
30+
31+
import org.apache.activemq.ActiveMQConnectionFactory;
32+
import org.apache.activemq.broker.BrokerService;
33+
import org.apache.activemq.command.ActiveMQQueue;
34+
import org.apache.activemq.util.Wait;
35+
import org.junit.After;
36+
import org.junit.Test;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
/**
41+
* Test that verifies network bridges reconnect after the remote broker is
42+
* stopped abruptly during or before the broker info handshake completes.
43+
*
44+
* This covers the bug where {@code onException()} in
45+
* {@link DemandForwardingBridgeSupport} returned early when
46+
* {@code futureBrokerInfo} was not done, preventing
47+
* {@code serviceLocalException()}/{@code serviceRemoteException()} from
48+
* firing and thus blocking the reconnection chain.
49+
*/
50+
public class NetworkBridgeReconnectOnHandshakeFailureTest {
51+
52+
private static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeReconnectOnHandshakeFailureTest.class);
53+
54+
private BrokerService localBroker;
55+
private BrokerService remoteBroker;
56+
57+
@After
58+
public void tearDown() throws Exception {
59+
if (localBroker != null) {
60+
try { localBroker.stop(); } catch (Exception ignored) {}
61+
localBroker.waitUntilStopped();
62+
}
63+
if (remoteBroker != null) {
64+
try { remoteBroker.stop(); } catch (Exception ignored) {}
65+
remoteBroker.waitUntilStopped();
66+
}
67+
}
68+
69+
/**
70+
* Verify that when the remote broker is abruptly stopped (causing a
71+
* transport exception potentially during the broker info handshake),
72+
* the network bridge reconnects once the remote broker is restarted.
73+
*/
74+
@Test(timeout = 60_000)
75+
public void testBridgeReconnectsAfterRemoteBrokerRestart() throws Exception {
76+
remoteBroker = createRemoteBroker(0);
77+
remoteBroker.start();
78+
remoteBroker.waitUntilStarted();
79+
int remotePort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort();
80+
LOG.info("Remote broker started on port {}", remotePort);
81+
82+
localBroker = createLocalBroker(remotePort);
83+
localBroker.start();
84+
localBroker.waitUntilStarted();
85+
DiscoveryNetworkConnector nc = (DiscoveryNetworkConnector) localBroker.getNetworkConnectors().get(0);
86+
87+
// Wait for the bridge to fully establish
88+
assertTrue("Bridge should be established", Wait.waitFor(() ->
89+
!nc.activeBridges().isEmpty(), 15_000, 200));
90+
LOG.info("Bridge established");
91+
92+
// Abruptly stop the remote broker — this triggers onException on the
93+
// bridge transports, potentially while futureBrokerInfo is still pending
94+
remoteBroker.stop();
95+
remoteBroker.waitUntilStopped();
96+
LOG.info("Remote broker stopped abruptly");
97+
98+
// Wait for the bridge to go down
99+
assertTrue("Bridge should go down after remote stop", Wait.waitFor(() ->
100+
nc.activeBridges().isEmpty(), 10_000, 200));
101+
102+
// Restart the remote broker on the same port
103+
remoteBroker = createRemoteBroker(remotePort);
104+
remoteBroker.start();
105+
remoteBroker.waitUntilStarted();
106+
LOG.info("Remote broker restarted on port {}", remotePort);
107+
108+
// The bridge should reconnect — this is what failed before the fix,
109+
// because onException returned early without calling serviceRemoteException()
110+
assertTrue("Bridge should reconnect after remote broker restart", Wait.waitFor(() ->
111+
!nc.activeBridges().isEmpty(), 30_000, 500));
112+
LOG.info("Bridge reconnected successfully");
113+
114+
// Verify messages can flow across the re-established bridge
115+
verifyMessageFlow(localBroker, remoteBroker);
116+
}
117+
118+
/**
119+
* A more aggressive variant: stop the remote broker multiple times in
120+
* quick succession to increase the chance of hitting the onException
121+
* path during broker info exchange.
122+
*/
123+
@Test(timeout = 120_000)
124+
public void testBridgeReconnectsAfterMultipleRemoteBrokerRestarts() throws Exception {
125+
remoteBroker = createRemoteBroker(0);
126+
remoteBroker.start();
127+
remoteBroker.waitUntilStarted();
128+
int remotePort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort();
129+
130+
localBroker = createLocalBroker(remotePort);
131+
localBroker.start();
132+
localBroker.waitUntilStarted();
133+
DiscoveryNetworkConnector nc = (DiscoveryNetworkConnector) localBroker.getNetworkConnectors().get(0);
134+
135+
for (int i = 0; i < 3; i++) {
136+
LOG.info("=== Restart cycle {} ===", i + 1);
137+
138+
assertTrue("Bridge should be established (cycle " + (i + 1) + ")", Wait.waitFor(() ->
139+
!nc.activeBridges().isEmpty(), 30_000, 500));
140+
141+
remoteBroker.stop();
142+
remoteBroker.waitUntilStopped();
143+
144+
assertTrue("Bridge should go down (cycle " + (i + 1) + ")", Wait.waitFor(() ->
145+
nc.activeBridges().isEmpty(), 10_000, 200));
146+
147+
remoteBroker = createRemoteBroker(remotePort);
148+
remoteBroker.start();
149+
remoteBroker.waitUntilStarted();
150+
}
151+
152+
assertTrue("Bridge should be established after all restarts", Wait.waitFor(() ->
153+
!nc.activeBridges().isEmpty(), 30_000, 500));
154+
verifyMessageFlow(localBroker, remoteBroker);
155+
}
156+
157+
private BrokerService createRemoteBroker(int port) throws Exception {
158+
BrokerService broker = new BrokerService();
159+
broker.setBrokerName("remoteBroker");
160+
broker.setUseJmx(false);
161+
broker.setPersistent(false);
162+
broker.setUseShutdownHook(false);
163+
broker.addConnector("tcp://localhost:" + port);
164+
return broker;
165+
}
166+
167+
private BrokerService createLocalBroker(int remotePort) throws Exception {
168+
BrokerService broker = new BrokerService();
169+
broker.setBrokerName("localBroker");
170+
broker.setUseJmx(false);
171+
broker.setPersistent(false);
172+
broker.setUseShutdownHook(false);
173+
DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(
174+
new URI("static:(tcp://localhost:" + remotePort + ")?useExponentialBackOff=false&initialReconnectDelay=1000"));
175+
nc.setName("bridge-reconnect-test");
176+
broker.addNetworkConnector(nc);
177+
return broker;
178+
}
179+
180+
private void verifyMessageFlow(BrokerService local, BrokerService remote) throws Exception {
181+
ActiveMQQueue dest = new ActiveMQQueue("RECONNECT.HANDSHAKE.TEST");
182+
183+
// Create consumer on remote broker
184+
ActiveMQConnectionFactory remoteFac = new ActiveMQConnectionFactory(remote.getVmConnectorURI());
185+
Connection remoteConn = remoteFac.createConnection();
186+
remoteConn.start();
187+
Session remoteSession = remoteConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
188+
MessageConsumer consumer = remoteSession.createConsumer(dest);
189+
190+
// Wait for the demand subscription to propagate across the bridge
191+
assertTrue("Demand subscription should propagate", Wait.waitFor(() -> {
192+
try {
193+
return local.getDestination(dest) != null
194+
&& local.getDestination(dest).getConsumers().size() > 0;
195+
} catch (Exception e) {
196+
return false;
197+
}
198+
}, 15_000, 200));
199+
200+
// Send message from local broker
201+
ActiveMQConnectionFactory localFac = new ActiveMQConnectionFactory(local.getVmConnectorURI());
202+
Connection localConn = localFac.createConnection();
203+
localConn.start();
204+
Session localSession = localConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
205+
MessageProducer producer = localSession.createProducer(dest);
206+
producer.send(localSession.createTextMessage("test-after-reconnect"));
207+
producer.close();
208+
209+
// Receive on remote
210+
TextMessage received = (TextMessage) consumer.receive(TimeUnit.SECONDS.toMillis(10));
211+
assertNotNull("Message should flow across the re-established bridge", received);
212+
213+
localConn.close();
214+
remoteConn.close();
215+
}
216+
}

0 commit comments

Comments
 (0)