Skip to content

Commit d209a36

Browse files
authored
Feature/add hivemq client for mqtt services (#224)
* Add HiveMQ client dependency to Maven project Introduced the HiveMQ MQTT client (version 1.3.3) to the Maven project dependencies in pom.xml. This addition enables enhanced MQTT communication capabilities alongside the existing Paho client. * Refactor MQTT client handling Refactor MQTT client handling by introducing MqttClientWrapper and PahoMqttClientWrapper interfaces. This change encapsulates MQTT client operations, providing better abstraction and sanitizes the exception handling within the PahoMqttClientWrapper. Additionally, updated pom.xml to include the necessary MQTT dependency. * Refactor MQTT client handling Introduced MqttClientWrapper and PahoMqttClientWrapper to encapsulate MQTT client operations, improving abstraction and sanitarizing exception handling. This also simplifies the CloudOnboardingServiceImpl class and removes direct dependencies on the Paho MQTT library. * Update DeleteMessageServiceImpl to use PahoMqttClientWrapper Replaced direct usage of IMqttClient with PahoMqttClientWrapper for message publishing in DeleteMessageServiceImpl. Also removed unnecessary exception handling and simplified the payload publishing process. * Refactor MQTT client handling in ListEndpointsServiceImpl. Replace direct usage of IMqttClient with PahoMqttClientWrapper to improve MQTT client management. Remove the exception handling for MqttException, as it is now managed within the wrapper. * Refactor MQTT message confirmation handling Replaced direct use of `IMqttClient` with `PahoMqttClientWrapper` to streamline the client interface and removed unnecessary exception handling. This refactoring simplifies the codebase and improves maintainability by reducing direct dependencies on the Paho client within `MessageConfirmationServiceImpl.java`. * Refactor MQTT client handling for improved reliability Replace direct usage of `IMqttClient` with `PahoMqttClientWrapper` across multiple service implementations to streamline error handling and reduce boilerplate code. This includes updating message sending methods to remove redundant try-catch blocks and simplifying message publication. * Refactor MQTT client integration Renamed MQTT client classes to a new package and added HiveMQ MQTT client wrapper for improved modularity. Eliminated direct Paho dependencies in MessageQueryHelperService in favor of a unified client wrapper interface. Updated `.gitignore` to include local settings configuration. * Support HiveMQ MQTT Client in messaging services Added constructors to various messaging service implementations to support the HiveMQ MQTT client. This increases compatibility and allows for more flexible client configurations. Existing functionality with the Paho MQTT client remains unchanged. * Rename paho package to client Renamed MqttOptionService and MqttClientService classes to reflect a more appropriate package name of `client` instead of `paho`. This enhances clarity and consistency throughout the project. * Add HiveMqttClientService for creating MQTT clients Introduced HiveMqttClientService to facilitate the creation of MQTT clients using onboarding responses or router devices. This service ensures robust client creation, utilizing provided host, port, and clientId parameters and enforcing necessary validations. * Add Javadoc comments to MqttClientWrapper interface Added detailed Javadoc comments to the MqttClientWrapper interface. These comments describe the interface and its publish method, providing clarity on how to use them. * Add class-level Javadoc comments for MQTT client wrappers Added detailed Javadoc comments to both PahoMqttClientWrapper and HiveMqttClientWrapper classes. These comments provide an overview of the class functionality and its role in publishing MQTT messages to specified topics, which enhances code readability and maintainability. * Refactor PingServiceImpl to improve client handling Refactor the PingServiceImpl to use wrapper classes for different MQTT clients, ensuring better abstraction and flexibility. This change replaces the exception handling mechanism and removes unnecessary imports, thus streamlining the message publishing process. * Update SDK version to 3.3.0 Upgrade the agrirouter SDK version from 3.2.2 to 3.3.0 across all modules. This includes changes in the parent POM and specific module POMs for tests, convenience, implementation, and API. The update ensures consistency and leverages the new features and fixes in version 3.3.0. * Add SuppressWarnings annotation to constructor Added a @SuppressWarnings("unused") annotation to the PingServiceImpl constructor that accepts an Mqtt3AsyncClient parameter. This change addresses potential warnings about unused parameters, enhancing code cleanliness. * Update GitHub Actions to use latest versions Upgraded actions/checkout to v4.2.2 and actions/setup-java to v4.5.0 across all workflow steps. This ensures compatibility with the latest features and security enhancements.
1 parent 4bc62e8 commit d209a36

21 files changed

Lines changed: 419 additions & 235 deletions

File tree

.github/workflows/continuous_integration.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ jobs:
1212
build_jdk17:
1313
runs-on: ubuntu-latest
1414
steps:
15-
- uses: actions/checkout@v3
15+
- uses: actions/checkout@v4.2.2
1616
- name: Set up JDK 17
17-
uses: actions/setup-java@v3
17+
uses: actions/setup-java@v4.5.0
1818
with:
1919
java-version: 17
2020
distribution: zulu
@@ -25,9 +25,9 @@ jobs:
2525
build_jdk21:
2626
runs-on: ubuntu-latest
2727
steps:
28-
- uses: actions/checkout@v3
28+
- uses: actions/checkout@v4.2.2
2929
- name: Set up JDK 21
30-
uses: actions/setup-java@v3
30+
uses: actions/setup-java@v4.5.0
3131
with:
3232
java-version: 21
3333
distribution: zulu
@@ -38,9 +38,9 @@ jobs:
3838
integration_test:
3939
runs-on: ubuntu-latest
4040
steps:
41-
- uses: actions/checkout@v3
41+
- uses: actions/checkout@v4.2.2
4242
- name: Set up JDK 17
43-
uses: actions/setup-java@v3
43+
uses: actions/setup-java@v4.5.0
4444
with:
4545
java-version: 17
4646
distribution: zulu

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,8 @@ buildNumber.properties
2727
# GENERATED #
2828
src/main/generated
2929

30+
# LOCAL SETTINGS #
31+
ci/local-settings.xml
32+
3033
# LCK #
3134
*.lck

agrirouter-sdk-java-api/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@
4848
<groupId>org.apache.commons</groupId>
4949
<artifactId>commons-lang3</artifactId>
5050
</dependency>
51+
<dependency>
52+
<groupId>org.eclipse.paho</groupId>
53+
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
54+
</dependency>
55+
<dependency>
56+
<groupId>com.hivemq</groupId>
57+
<artifactId>hivemq-mqtt-client</artifactId>
58+
</dependency>
5159
</dependencies>
5260

5361
<build>
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.dke.data.agrirouter.api.mqtt;
2+
3+
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
4+
5+
/**
6+
* A wrapper around HiveMQ's Mqtt3AsyncClient that implements the MqttClientWrapper interface.
7+
* This class is responsible for publishing messages to a specified MQTT topic using the
8+
* provided Mqtt3AsyncClient.
9+
*/
10+
public class HiveMqttClientWrapper implements MqttClientWrapper {
11+
12+
private final Mqtt3AsyncClient mqttClient;
13+
14+
public HiveMqttClientWrapper(Mqtt3AsyncClient mqttClient) {
15+
this.mqttClient = mqttClient;
16+
}
17+
18+
@Override
19+
public void publish(String measures, byte[] payload) {
20+
this.mqttClient.publishWith().topic(measures).payload(payload).send();
21+
}
22+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.dke.data.agrirouter.api.mqtt;
2+
3+
/**
4+
* Interface representing an MQTT client wrapper for publishing messages.
5+
* Implementations of this interface should define how to publish messages
6+
* to a specific topic with a given payload.
7+
*/
8+
public interface MqttClientWrapper {
9+
10+
/**
11+
* Publishes a message to a specific topic with a given payload.
12+
*
13+
* @param measures the topic to which the message should be published
14+
* @param payload the byte array representing the content of the message
15+
*/
16+
void publish(String measures, byte[] payload);
17+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.dke.data.agrirouter.api.mqtt;
2+
3+
import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException;
4+
import org.eclipse.paho.client.mqttv3.IMqttClient;
5+
import org.eclipse.paho.client.mqttv3.MqttException;
6+
import org.eclipse.paho.client.mqttv3.MqttMessage;
7+
8+
/**
9+
* PahoMqttClientWrapper is an implementation of the MqttClientWrapper interface,
10+
* providing a wrapper around an instance of IMqttClient from the Paho MQTT library.
11+
* This class handles the publishing of MQTT messages to specified topics with a given payload.
12+
*/
13+
public class PahoMqttClientWrapper implements MqttClientWrapper {
14+
15+
private final IMqttClient mqttClient;
16+
17+
public PahoMqttClientWrapper(IMqttClient mqttClient) {
18+
this.mqttClient = mqttClient;
19+
}
20+
21+
@Override
22+
public void publish(String measures, byte[] payload) {
23+
try {
24+
this.mqttClient.publish(measures, new MqttMessage(payload));
25+
} catch (MqttException e) {
26+
throw new CouldNotSendMqttMessageException(e);
27+
}
28+
}
29+
30+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.dke.data.agrirouter.convenience.mqtt.hive;
2+
3+
import com.dke.data.agrirouter.api.dto.onboard.OnboardingResponse;
4+
import com.dke.data.agrirouter.api.dto.onboard.RouterDevice;
5+
import com.dke.data.agrirouter.api.env.Environment;
6+
import com.dke.data.agrirouter.api.exception.CouldNotCreateMqttClientException;
7+
import com.dke.data.agrirouter.impl.EnvironmentalService;
8+
import com.hivemq.client.mqtt.MqttClient;
9+
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
10+
import org.apache.commons.lang3.StringUtils;
11+
12+
import java.util.Objects;
13+
14+
/**
15+
* Service to create a MQTT client using the given onboarding response.
16+
*/
17+
@SuppressWarnings("unused")
18+
public class HiveMqttClientService extends EnvironmentalService {
19+
20+
/**
21+
* Constructor for an environmental service.
22+
*
23+
* @param environment -
24+
*/
25+
public HiveMqttClientService(Environment environment) {
26+
super(environment);
27+
}
28+
29+
/**
30+
* Creates a MQTT client using the given onboarding response. Communication relies on given root
31+
* certificates in an external keystore. The keystore with the root certificates is not created
32+
* locally.
33+
*
34+
* @param onboardingResponse -
35+
* @return -
36+
*/
37+
public Mqtt3AsyncClient create(OnboardingResponse onboardingResponse) {
38+
return this.createMqttClient(
39+
onboardingResponse.getConnectionCriteria().getHost(),
40+
onboardingResponse.getConnectionCriteria().getPort(),
41+
onboardingResponse.getConnectionCriteria().getClientId());
42+
}
43+
44+
/**
45+
* Creates a MQTT client using the given router Device. Communication relies on given root
46+
* certificates in an external keystore. The keystore with the root certificates is not created
47+
* locally.
48+
*
49+
* @param routerDevice -
50+
* @return -
51+
*/
52+
public Mqtt3AsyncClient create(RouterDevice routerDevice) {
53+
return this.createMqttClient(
54+
routerDevice.getConnectionCriteria().getHost(),
55+
String.valueOf(routerDevice.getConnectionCriteria().getPort()),
56+
routerDevice.getConnectionCriteria().getClientId());
57+
}
58+
59+
private Mqtt3AsyncClient createMqttClient(String host, String port, String clientId) {
60+
if (StringUtils.isAnyBlank(host, port, clientId)) {
61+
throw new CouldNotCreateMqttClientException(
62+
"Currently there are parameters missing. Did you onboard correctly - host, port or client id are missing.");
63+
}
64+
return MqttClient.builder()
65+
.useMqttVersion3()
66+
.identifier(Objects.requireNonNull(clientId))
67+
.serverHost(host)
68+
.serverPort(Integer.parseInt(port))
69+
.buildAsync();
70+
}
71+
}
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package com.dke.data.agrirouter.impl.messaging;
22

3-
import org.eclipse.paho.client.mqttv3.IMqttClient;
3+
import com.dke.data.agrirouter.api.mqtt.MqttClientWrapper;
44

55
/**
66
* Base class which holds the MQTT client with the connection provided by the provider.
77
*/
88
public class MqttService {
99

10-
private final IMqttClient mqttClient;
10+
private final MqttClientWrapper mqttClientWrapper;
1111

12-
public MqttService(IMqttClient mqttClient) {
13-
this.mqttClient = mqttClient;
12+
public MqttService(MqttClientWrapper mqttClientWrapper) {
13+
this.mqttClientWrapper = mqttClientWrapper;
1414
}
1515

16-
protected IMqttClient getMqttClient() {
17-
return mqttClient;
16+
protected MqttClientWrapper getMqttClient() {
17+
return mqttClientWrapper;
1818
}
1919
}

agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/helper/mqtt/MessageQueryHelperService.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
package com.dke.data.agrirouter.impl.messaging.helper.mqtt;
22

33
import com.dke.data.agrirouter.api.enums.TechnicalMessageType;
4-
import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException;
4+
import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper;
5+
import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper;
56
import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService;
67
import com.dke.data.agrirouter.api.service.parameters.MessageQueryParameters;
78
import com.dke.data.agrirouter.api.service.parameters.SendMessageParameters;
89
import com.dke.data.agrirouter.impl.messaging.MessageEncoder;
910
import com.dke.data.agrirouter.impl.messaging.MqttService;
1011
import com.dke.data.agrirouter.impl.messaging.helper.QueryAllMessagesParameterCreator;
1112
import com.dke.data.agrirouter.impl.messaging.rest.MessageSender;
13+
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
1214
import org.eclipse.paho.client.mqttv3.IMqttClient;
13-
import org.eclipse.paho.client.mqttv3.MqttException;
14-
import org.eclipse.paho.client.mqttv3.MqttMessage;
1515

1616
import java.util.Collections;
1717
import java.util.Objects;
@@ -26,41 +26,44 @@ public MessageQueryHelperService(
2626
IMqttClient mqttClient,
2727
EncodeMessageService encodeMessageService,
2828
TechnicalMessageType technicalMessageType) {
29-
super(mqttClient);
30-
this.logMethodBegin();
29+
super(new PahoMqttClientWrapper(mqttClient));
30+
this.encodeMessageService = encodeMessageService;
31+
this.technicalMessageType = technicalMessageType;
32+
}
33+
34+
public MessageQueryHelperService(
35+
Mqtt3AsyncClient mqttClient,
36+
EncodeMessageService encodeMessageService,
37+
TechnicalMessageType technicalMessageType) {
38+
super(new HiveMqttClientWrapper(mqttClient));
3139
this.encodeMessageService = encodeMessageService;
3240
this.technicalMessageType = technicalMessageType;
33-
this.logMethodEnd();
3441
}
3542

3643
public String send(MessageQueryParameters parameters) {
3744
this.logMethodBegin(parameters);
3845

3946
this.getNativeLogger().trace("Validate parameters.");
4047
parameters.validate();
41-
try {
42-
this.getNativeLogger().trace("Encode message.");
43-
var encodedMessage = this.encode(this.technicalMessageType, parameters);
48+
this.getNativeLogger().trace("Encode message.");
49+
var encodedMessage = this.encode(this.technicalMessageType, parameters);
4450

45-
this.getNativeLogger().trace("Build message parameters.");
46-
var sendMessageParameters = new SendMessageParameters();
47-
sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse());
48-
sendMessageParameters.setEncodedMessages(
49-
Collections.singletonList(encodedMessage.getEncodedMessage()));
51+
this.getNativeLogger().trace("Build message parameters.");
52+
var sendMessageParameters = new SendMessageParameters();
53+
sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse());
54+
sendMessageParameters.setEncodedMessages(
55+
Collections.singletonList(encodedMessage.getEncodedMessage()));
5056

51-
this.getNativeLogger().trace("Send and fetch message response.");
52-
var messageAsJson = this.createMessageBody(sendMessageParameters);
53-
var payload = messageAsJson.getBytes();
54-
this.getMqttClient()
55-
.publish(
56-
Objects.requireNonNull(parameters.getOnboardingResponse())
57-
.getConnectionCriteria()
58-
.getMeasures(),
59-
new MqttMessage(payload));
60-
return encodedMessage.getApplicationMessageID();
61-
} catch (MqttException e) {
62-
throw new CouldNotSendMqttMessageException(e);
63-
}
57+
this.getNativeLogger().trace("Send and fetch message response.");
58+
var messageAsJson = this.createMessageBody(sendMessageParameters);
59+
var payload = messageAsJson.getBytes();
60+
this.getMqttClient()
61+
.publish(
62+
Objects.requireNonNull(parameters.getOnboardingResponse())
63+
.getConnectionCriteria()
64+
.getMeasures(),
65+
payload);
66+
return encodedMessage.getApplicationMessageID();
6467
}
6568

6669
@Override

agrirouter-sdk-java-impl/src/main/java/com/dke/data/agrirouter/impl/messaging/mqtt/CloudOffboardingServiceImpl.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package com.dke.data.agrirouter.impl.messaging.mqtt;
22

3-
import com.dke.data.agrirouter.api.exception.CouldNotSendMqttMessageException;
43
import com.dke.data.agrirouter.api.messaging.MqttAsyncMessageSendingResult;
4+
import com.dke.data.agrirouter.api.mqtt.HiveMqttClientWrapper;
5+
import com.dke.data.agrirouter.api.mqtt.PahoMqttClientWrapper;
56
import com.dke.data.agrirouter.api.service.messaging.encoding.EncodeMessageService;
67
import com.dke.data.agrirouter.api.service.messaging.mqtt.CloudOffboardingService;
78
import com.dke.data.agrirouter.api.service.parameters.CloudOffboardingParameters;
@@ -10,9 +11,8 @@
1011
import com.dke.data.agrirouter.impl.messaging.MessageEncoder;
1112
import com.dke.data.agrirouter.impl.messaging.MqttService;
1213
import com.dke.data.agrirouter.impl.messaging.encoding.EncodeMessageServiceImpl;
14+
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
1315
import org.eclipse.paho.client.mqttv3.IMqttClient;
14-
import org.eclipse.paho.client.mqttv3.MqttException;
15-
import org.eclipse.paho.client.mqttv3.MqttMessage;
1616

1717
import java.util.Collections;
1818
import java.util.Objects;
@@ -28,36 +28,35 @@ public class CloudOffboardingServiceImpl extends MqttService
2828
private final EncodeMessageService encodeMessageService = new EncodeMessageServiceImpl();
2929

3030
public CloudOffboardingServiceImpl(IMqttClient mqttClient) {
31-
super(mqttClient);
31+
super(new PahoMqttClientWrapper(mqttClient));
3232
}
3333

34-
/**
34+
public CloudOffboardingServiceImpl(Mqtt3AsyncClient mqttClient) {
35+
super(new HiveMqttClientWrapper(mqttClient));
36+
}
37+
38+
/**
3539
* Offboarding a virtual CU. Will deliver no result if the action was successful, if there's any
3640
* error an exception will be thrown.
3741
*
3842
* @param parameters Parameters for offboarding.
3943
*/
40-
@Override
44+
@Override
4145
public String send(CloudOffboardingParameters parameters) {
4246
parameters.validate();
43-
try {
44-
var encodedMessage = this.encode(parameters);
45-
var sendMessageParameters = new SendMessageParameters();
46-
sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse());
47-
sendMessageParameters.setEncodedMessages(
48-
Collections.singletonList(encodedMessage.getEncodedMessage()));
49-
var messageAsJson = this.createMessageBody(sendMessageParameters);
50-
var payload = messageAsJson.getBytes();
51-
this.getMqttClient()
52-
.publish(
53-
Objects.requireNonNull(parameters.getOnboardingResponse())
54-
.getConnectionCriteria()
55-
.getMeasures(),
56-
new MqttMessage(payload));
57-
return encodedMessage.getApplicationMessageID();
58-
} catch (MqttException e) {
59-
throw new CouldNotSendMqttMessageException(e);
60-
}
47+
var encodedMessage = this.encode(parameters);
48+
var sendMessageParameters = new SendMessageParameters();
49+
sendMessageParameters.setOnboardingResponse(parameters.getOnboardingResponse());
50+
sendMessageParameters.setEncodedMessages(
51+
Collections.singletonList(encodedMessage.getEncodedMessage()));
52+
var messageAsJson = this.createMessageBody(sendMessageParameters);
53+
var payload = messageAsJson.getBytes();
54+
this.getMqttClient()
55+
.publish(
56+
Objects.requireNonNull(parameters.getOnboardingResponse())
57+
.getConnectionCriteria()
58+
.getMeasures(), payload);
59+
return encodedMessage.getApplicationMessageID();
6160
}
6261

6362
@Override

0 commit comments

Comments
 (0)