Skip to content

Commit 9979d4e

Browse files
committed
feat: introduce DataFlowStatusMessage
1 parent 87a005e commit 9979d4e

9 files changed

Lines changed: 46 additions & 41 deletions

File tree

src/main/java/org/eclipse/dataplane/Dataplane.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
1212
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - data flow properties
13+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
1314
*
1415
*/
1516

@@ -21,7 +22,7 @@
2122
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
2223
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2324
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
24-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
25+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2526
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2627
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2728
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
@@ -104,7 +105,7 @@ public Result<DataFlowStatusResponseMessage> status(String dataFlowId) {
104105
.map(f -> new DataFlowStatusResponseMessage(f.getId(), f.getState().name()));
105106
}
106107

107-
public Result<DataFlowResponseMessage> prepare(DataFlowPrepareMessage message) {
108+
public Result<DataFlowStatusMessage> prepare(DataFlowPrepareMessage message) {
108109
var initialDataFlow = DataFlow.newInstance()
109110
.id(message.processId())
110111
.state(DataFlow.State.INITIATING)
@@ -125,19 +126,19 @@ public Result<DataFlowResponseMessage> prepare(DataFlowPrepareMessage message) {
125126
dataFlow.transitionToPrepared();
126127
}
127128

128-
DataFlowResponseMessage response;
129+
DataFlowStatusMessage response;
129130
if (dataFlow.isPrepared() && dataFlow.isPush()) {
130-
response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), initialDataFlow.getState().name(), null);
131+
response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), dataFlow.getDataAddress(), null);
131132
} else {
132-
response = new DataFlowResponseMessage(id, null, initialDataFlow.getState().name(), null);
133+
response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), null, null);
133134
}
134135

135136
return save(dataFlow).map(it -> response);
136137
});
137138
}
138139

139140

140-
public Result<DataFlowResponseMessage> start(DataFlowStartMessage message) {
141+
public Result<DataFlowStatusMessage> start(DataFlowStartMessage message) {
141142
var initialDataFlow = DataFlow.newInstance()
142143
.id(message.processId())
143144
.state(DataFlow.State.INITIATING)
@@ -157,11 +158,11 @@ public Result<DataFlowResponseMessage> start(DataFlowStartMessage message) {
157158
dataFlow.transitionToStarted();
158159
}
159160

160-
DataFlowResponseMessage response;
161+
DataFlowStatusMessage response;
161162
if (dataFlow.isStarted() && dataFlow.isPull()) {
162-
response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
163+
response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), dataFlow.getDataAddress(), null);
163164
} else {
164-
response = new DataFlowResponseMessage(id, null, dataFlow.getState().name(), null);
165+
response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), null, null);
165166
}
166167
return save(dataFlow).map(it -> response);
167168
});
@@ -199,7 +200,7 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
199200
.compose(onPrepare::action)
200201
.compose(dataFlow -> {
201202
dataFlow.transitionToPrepared();
202-
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
203+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(),null);
203204

204205
return notifyControlPlane("prepared", dataFlow, message);
205206

@@ -217,7 +218,7 @@ public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
217218
.compose(dataFlow -> {
218219
dataFlow.transitionToStarted();
219220

220-
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
221+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);
221222

222223
return notifyControlPlane("started", dataFlow, message);
223224

@@ -249,7 +250,9 @@ public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
249250
.compose(dataFlow -> {
250251
dataFlow.transitionToTerminated(throwable.getMessage());
251252

252-
return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined
253+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), null, throwable.getMessage());
254+
255+
return notifyControlPlane("errored", dataFlow, message);
253256
});
254257
}
255258

src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java renamed to src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,19 @@
99
*
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
12+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
1213
*
1314
*/
1415

1516
package org.eclipse.dataplane.domain.dataflow;
1617

1718
import org.eclipse.dataplane.domain.DataAddress;
1819

19-
public record DataFlowResponseMessage(
20+
public record DataFlowStatusMessage(
2021
String dataplaneId,
21-
DataAddress dataAddress,
22+
String dataFlowId,
2223
String state,
24+
DataAddress dataAddress,
2325
String error
2426
) {
2527
}

src/test/java/org/eclipse/dataplane/ControlPlane.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import jakarta.ws.rs.container.ContainerRequestContext;
2424
import jakarta.ws.rs.core.Context;
2525
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
26-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
26+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2727
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2828
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2929
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
@@ -120,7 +120,7 @@ public ControlPlaneController(DataplaneClient counterPart, Predicate<ContainerRe
120120
@POST
121121
@Path("/{transferId}/dataflow/prepared")
122122
@Consumes(WILDCARD)
123-
public void prepared(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowResponseMessage message) {
123+
public void prepared(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowStatusMessage message) {
124124
if (!authorizationValidation.test(context)) {
125125
throw new NotAuthorizedException("Not authorized");
126126
}
@@ -130,7 +130,7 @@ public void prepared(@PathParam("transferId") String transferId, @Context Contai
130130
@POST
131131
@Path("/{transferId}/dataflow/started")
132132
@Consumes(WILDCARD)
133-
public void started(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowResponseMessage message) {
133+
public void started(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowStatusMessage message) {
134134
if (!authorizationValidation.test(context)) {
135135
throw new NotAuthorizedException("Not authorized");
136136
}

src/test/java/org/eclipse/dataplane/scenario/AuthorizationOauth2Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.eclipse.dataplane.HttpServer;
3333
import org.eclipse.dataplane.domain.Result;
3434
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
35-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
35+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
3636
import org.eclipse.dataplane.domain.registration.AuthorizationProfile;
3737
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
3838
import org.eclipse.dataplane.domain.registration.Oauth2ClientCredentialsAuthorization;
@@ -107,7 +107,7 @@ void shouldCommunicateWithControlPlaneUsingOauth2Authorization() {
107107
var consumerProcessId = "consumer_" + processId;
108108
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
109109

110-
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
110+
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
111111

112112
var notifyPreparedResult = dataPlane.getById(consumerProcessId)
113113
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));

src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.eclipse.dataplane.HttpServer;
2020
import org.eclipse.dataplane.domain.Result;
2121
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
22-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
22+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2323
import org.eclipse.dataplane.domain.registration.Authorization;
2424
import org.eclipse.dataplane.domain.registration.AuthorizationProfile;
2525
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
@@ -82,7 +82,7 @@ void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() {
8282
var consumerProcessId = "consumer_" + processId;
8383
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
8484

85-
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
85+
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
8686

8787
var notifyPreparedResult = dataPlane.getById(consumerProcessId)
8888
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));

src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.eclipse.dataplane.domain.Result;
2222
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2323
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
24-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
24+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2525
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2727
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
@@ -75,13 +75,13 @@ void shouldPullDataFromProvider() {
7575
var consumerProcessId = "consumer_" + processId;
7676
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
7777

78-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
78+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
7979
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
8080
assertThat(prepareResponse.dataAddress()).isNull();
8181

8282
var providerProcessId = "provider_" + processId;
8383
var startMessage = createStartMessage(providerProcessId, transferType);
84-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
84+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
8585
assertThat(startResponse.state()).isEqualTo(STARTED.name());
8686
assertThat(startResponse.dataAddress()).isNotNull();
8787

@@ -98,11 +98,11 @@ void shouldPermitAsyncStartup() {
9898
var processId = UUID.randomUUID().toString();
9999
var consumerProcessId = "consumer_" + processId;
100100
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
101-
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
101+
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
102102

103103
var providerProcessId = "provider_" + processId;
104104
var startMessage = createStartMessage(providerProcessId, transferType);
105-
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
105+
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
106106
assertThat(startResponse.state()).isEqualTo(STARTING.name());
107107
assertThat(startResponse.dataAddress()).isNull();
108108

src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.eclipse.dataplane.domain.Result;
2222
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2323
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
24-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
24+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2525
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
2727
import org.jspecify.annotations.NonNull;
@@ -77,14 +77,14 @@ void shouldPushDataToEndpointPreparedByConsumer() {
7777
var consumerProcessId = "consumer_" + processId;
7878
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
7979

80-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
80+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
8181
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
8282
assertThat(prepareResponse.dataAddress()).isNotNull();
8383
var destinationDataAddress = prepareResponse.dataAddress();
8484

8585
var providerProcessId = "provider_" + processId;
8686
var startMessage = createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, destinationDataAddress);
87-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
87+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
8888

8989
assertThat(startResponse.state()).isEqualTo(STARTED.name());
9090
assertThat(startResponse.dataAddress()).isNull();
@@ -108,12 +108,12 @@ void shouldSendError_whenFlowFails() {
108108
var consumerProcessId = "consumer_" + processId;
109109
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
110110

111-
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
111+
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
112112
var invalidDataAddress = new DataAddress("FileSystem", "", emptyList());
113113

114114
var providerProcessId = "provider_" + processId;
115115
var startMessage = createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, invalidDataAddress);
116-
controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
116+
controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
117117

118118
await().untilAsserted(() -> {
119119
var providerStatus = controlPlane.providerStatus(providerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class);
@@ -131,7 +131,7 @@ void shouldPermitAsyncPreparation() {
131131
var consumerProcessId = "consumer_" + processId;
132132
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
133133

134-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
134+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
135135
assertThat(prepareResponse.state()).isEqualTo(PREPARING.name());
136136
assertThat(prepareResponse.dataAddress()).isNull();
137137

src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.eclipse.dataplane.domain.Result;
2222
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2323
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
24-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
24+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2525
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2727
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
@@ -79,13 +79,13 @@ void shouldPullDataFromProvider_thenProviderTerminatesIt() {
7979
var consumerProcessId = "consumer_" + processId;
8080
var prepareMessage = prepareMessage(consumerProcessId, transferType);
8181

82-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
82+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
8383
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
8484
assertThat(prepareResponse.dataAddress()).isNull();
8585

8686
var providerProcessId = "provider_" + processId;
8787
var startMessage = startMessage(providerProcessId, transferType);
88-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
88+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
8989
assertThat(startResponse.state()).isEqualTo(STARTED.name());
9090
assertThat(startResponse.dataAddress()).isNotNull();
9191

@@ -107,13 +107,13 @@ void shouldSuspendAndResumeOnProvider() {
107107
var consumerProcessId = "consumer_" + processId;
108108
var prepareMessage = prepareMessage(consumerProcessId, transferType);
109109

110-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
110+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
111111
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
112112
assertThat(prepareResponse.dataAddress()).isNull();
113113

114114
var providerProcessId = "provider_" + processId;
115115
var startMessage = startMessage(providerProcessId, transferType);
116-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
116+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
117117
assertThat(startResponse.state()).isEqualTo(STARTED.name());
118118
assertThat(startResponse.dataAddress()).isNotNull();
119119

@@ -125,7 +125,7 @@ void shouldSuspendAndResumeOnProvider() {
125125

126126
consumerDataPlane.assertNoMoreDataIsTransferred();
127127

128-
var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
128+
var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
129129
controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200);
130130

131131
consumerDataPlane.assertDataIsFlowing();

0 commit comments

Comments
 (0)