55
66import com .google .protobuf .InvalidProtocolBufferException ;
77import com .google .transit .realtime .GtfsRealtime ;
8+ import com .hivemq .client .mqtt .datatypes .MqttQos ;
9+ import com .hivemq .client .mqtt .lifecycle .MqttClientDisconnectedContext ;
10+ import com .hivemq .client .mqtt .mqtt5 .Mqtt5AsyncClient ;
11+ import com .hivemq .client .mqtt .mqtt5 .Mqtt5Client ;
12+ import com .hivemq .client .mqtt .mqtt5 .message .auth .Mqtt5SimpleAuth ;
13+ import com .hivemq .client .mqtt .mqtt5 .message .publish .Mqtt5Publish ;
814import java .net .URI ;
15+ import java .net .URISyntaxException ;
16+ import java .nio .charset .StandardCharsets ;
917import java .util .ArrayList ;
1018import java .util .List ;
19+ import java .util .Optional ;
20+ import java .util .UUID ;
1121import java .util .function .Consumer ;
12- import org .eclipse .paho .client .mqttv3 .IMqttDeliveryToken ;
13- import org .eclipse .paho .client .mqttv3 .MqttCallbackExtended ;
14- import org .eclipse .paho .client .mqttv3 .MqttClient ;
15- import org .eclipse .paho .client .mqttv3 .MqttConnectOptions ;
16- import org .eclipse .paho .client .mqttv3 .MqttException ;
17- import org .eclipse .paho .client .mqttv3 .MqttMessage ;
18- import org .eclipse .paho .client .mqttv3 .persist .MemoryPersistence ;
1922import org .opentripplanner .updater .spi .GraphUpdater ;
2023import org .opentripplanner .updater .spi .UpdateResult ;
2124import org .opentripplanner .updater .spi .WriteToGraphCallback ;
3033import org .slf4j .LoggerFactory ;
3134
3235/**
33- * This class starts an Paho MQTT client which opens a connection to a GTFS-RT data source. A
36+ * This class starts a hive MQTT client which opens a connection to a GTFS-RT data source. A
3437 * callback is registered which handles incoming GTFS-RT messages as they stream in by placing a
3538 * GTFS-RT decoder Runnable task in the single-threaded executor for handling.
3639 * <p>
@@ -56,16 +59,14 @@ public class MqttGtfsRealtimeUpdater implements GraphUpdater {
5659 private final int qos ;
5760 private final ForwardsDelayPropagationType forwardsDelayPropagationType ;
5861 private final BackwardsDelayPropagationType backwardsDelayPropagationType ;
59- private final String clientId = "OpenTripPlanner-" + MqttClient .generateClientId ();
6062 private final String configRef ;
61- private final MemoryPersistence persistence = new MemoryPersistence ();
6263 private final GtfsRealTimeTripUpdateAdapter adapter ;
6364 private final Consumer <UpdateResult > recordMetrics ;
6465 private WriteToGraphCallback saveResultOnGraph ;
6566
6667 private final boolean fuzzyTripMatching ;
6768
68- private MqttClient client ;
69+ private Mqtt5AsyncClient client ;
6970
7071 public MqttGtfsRealtimeUpdater (
7172 MqttGtfsRealtimeUpdaterParameters parameters ,
@@ -92,106 +93,113 @@ public void setup(WriteToGraphCallback writeToGraphCallback) {
9293
9394 @ Override
9495 public void run () throws Exception {
95- client = new MqttClient ( url , clientId , persistence );
96- MqttConnectOptions connOpts = new MqttConnectOptions ();
97- connOpts . setCleanSession ( true );
98- connOpts . setAutomaticReconnect ( true );
96+ client = connectAndSubscribeToClient ( );
97+ }
98+
99+ private Mqtt5AsyncClient connectAndSubscribeToClient () throws URISyntaxException {
99100 URI parsedUrl = new URI (url );
101+ Mqtt5SimpleAuth auth = createAuthFromUrl (parsedUrl );
102+
103+ Mqtt5AsyncClient asyncClient = Mqtt5Client .builder ()
104+ .identifier ("OpenTripPlanner-" + UUID .randomUUID ())
105+ .serverHost (parsedUrl .getHost ())
106+ .serverPort (parsedUrl .getPort ())
107+ .simpleAuth (auth )
108+ .automaticReconnectWithDefaultConfig ()
109+ .addConnectedListener (ctx -> onConnect ())
110+ .addDisconnectedListener (this ::onDisconnect )
111+ .buildAsync ();
112+
113+ asyncClient .connectWith ().keepAlive (30 ).cleanStart (true ).send ().join ();
114+
115+ asyncClient
116+ .subscribeWith ()
117+ .topicFilter (topic )
118+ .qos (Optional .ofNullable (MqttQos .fromCode (qos )).orElse (MqttQos .AT_MOST_ONCE ))
119+ .callback (this ::onMessage )
120+ .send ()
121+ .join ();
122+
123+ return asyncClient ;
124+ }
125+
126+ private Mqtt5SimpleAuth createAuthFromUrl (URI parsedUrl ) {
100127 if (parsedUrl .getUserInfo () != null ) {
101128 String [] userinfo = parsedUrl .getUserInfo ().split (":" );
102- connOpts .setUserName (userinfo [0 ]);
103- connOpts .setPassword (userinfo [1 ].toCharArray ());
129+ return Mqtt5SimpleAuth .builder ()
130+ .username (userinfo [0 ])
131+ .password (userinfo [1 ].getBytes (StandardCharsets .UTF_8 ))
132+ .build ();
104133 }
105- client .setCallback (new Callback ());
134+ return null ;
135+ }
106136
107- LOG .debug ("Connecting to broker: {}" , url );
108- client .connect (connOpts );
137+ private void onDisconnect (MqttClientDisconnectedContext ctx ) {
138+ LOG .info ("Disconnected client from MQTT broker: {}" , url , ctx .getCause ());
139+ }
140+
141+ private void onConnect () {
142+ LOG .info ("Connected client to MQTT broker: {} with qos: {}" , url , qos );
109143 }
110144
111145 @ Override
112146 public void teardown () {
113- try {
114- client .disconnect ();
115- } catch (MqttException e ) {
116- LOG .error ("Error disconnecting" , e );
117- }
147+ client .disconnect ();
118148 }
119149
120150 @ Override
121151 public String getConfigRef () {
122152 return configRef ;
123153 }
124154
125- private class Callback implements MqttCallbackExtended {
126-
127- @ Override
128- public void connectComplete (boolean reconnect , String serverURI ) {
129- try {
130- LOG .debug ("Connected" );
131- client .subscribe (topic , qos );
132- } catch (MqttException e ) {
133- LOG .warn ("Could not subscribe to: {}" , topic );
155+ private void onMessage (Mqtt5Publish message ) {
156+ List <GtfsRealtime .TripUpdate > updates = null ;
157+ UpdateIncrementality updateIncrementality = FULL_DATASET ;
158+ try {
159+ // Decode message
160+ GtfsRealtime .FeedMessage feedMessage = GtfsRealtime .FeedMessage .parseFrom (
161+ message .getPayloadAsBytes ()
162+ );
163+ List <GtfsRealtime .FeedEntity > feedEntityList = feedMessage .getEntityList ();
164+
165+ // Change fullDataset value if this is an incremental update
166+ if (
167+ feedMessage .hasHeader () &&
168+ feedMessage .getHeader ().hasIncrementality () &&
169+ feedMessage
170+ .getHeader ()
171+ .getIncrementality ()
172+ .equals (GtfsRealtime .FeedHeader .Incrementality .DIFFERENTIAL )
173+ ) {
174+ updateIncrementality = DIFFERENTIAL ;
134175 }
135- }
136176
137- @ Override
138- public void connectionLost (Throwable cause ) {
139- LOG .debug ("Disconnected" );
140- }
141-
142- @ Override
143- public void messageArrived (String topic , MqttMessage message ) {
144- List <GtfsRealtime .TripUpdate > updates = null ;
145- UpdateIncrementality updateIncrementality = FULL_DATASET ;
146- try {
147- // Decode message
148- GtfsRealtime .FeedMessage feedMessage = GtfsRealtime .FeedMessage .parseFrom (
149- message .getPayload ()
150- );
151- List <GtfsRealtime .FeedEntity > feedEntityList = feedMessage .getEntityList ();
152-
153- // Change fullDataset value if this is an incremental update
154- if (
155- feedMessage .hasHeader () &&
156- feedMessage .getHeader ().hasIncrementality () &&
157- feedMessage
158- .getHeader ()
159- .getIncrementality ()
160- .equals (GtfsRealtime .FeedHeader .Incrementality .DIFFERENTIAL )
161- ) {
162- updateIncrementality = DIFFERENTIAL ;
177+ // Create List of TripUpdates
178+ updates = new ArrayList <>(feedEntityList .size ());
179+ for (GtfsRealtime .FeedEntity feedEntity : feedEntityList ) {
180+ if (feedEntity .hasTripUpdate ()) {
181+ updates .add (feedEntity .getTripUpdate ());
163182 }
164-
165- // Create List of TripUpdates
166- updates = new ArrayList <>(feedEntityList .size ());
167- for (GtfsRealtime .FeedEntity feedEntity : feedEntityList ) {
168- if (feedEntity .hasTripUpdate ()) {
169- updates .add (feedEntity .getTripUpdate ());
170- }
171- }
172- } catch (InvalidProtocolBufferException e ) {
173- LOG .error ("Could not decode gtfs-rt message:" , e );
174- }
175-
176- if (updates != null ) {
177- // Handle trip updates via graph writer runnable
178- saveResultOnGraph .execute (
179- new TripUpdateGraphWriterRunnable (
180- adapter ,
181- fuzzyTripMatching ,
182- forwardsDelayPropagationType ,
183- backwardsDelayPropagationType ,
184- updateIncrementality ,
185- updates ,
186- feedId ,
187- recordMetrics
188- )
189- );
190183 }
184+ } catch (InvalidProtocolBufferException e ) {
185+ LOG .error ("Could not decode gtfs-rt message:" , e );
191186 }
192187
193- @ Override
194- public void deliveryComplete (IMqttDeliveryToken token ) {}
188+ if (updates != null ) {
189+ // Handle trip updates via graph writer runnable
190+ saveResultOnGraph .execute (
191+ new TripUpdateGraphWriterRunnable (
192+ adapter ,
193+ fuzzyTripMatching ,
194+ forwardsDelayPropagationType ,
195+ backwardsDelayPropagationType ,
196+ updateIncrementality ,
197+ updates ,
198+ feedId ,
199+ recordMetrics
200+ )
201+ );
202+ }
195203 }
196204
197205 @ Override
0 commit comments