Skip to content

Commit c351fda

Browse files
committed
feat: introduce DataFlowStatusMessage
1 parent af029e7 commit c351fda

9 files changed

Lines changed: 46 additions & 41 deletions

File tree

src/main/java/org/eclipse/dataplane/Dataplane.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
1212
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - data flow properties
13+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
1314
*
1415
*/
1516

@@ -21,7 +22,7 @@
2122
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
2223
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2324
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
24-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
25+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2526
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2627
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2728
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
@@ -114,7 +115,7 @@ private Result<Void> checkControlPlane(String controlplaneId) {
114115
return Result.failure(new ControlPlaneNotRegistered(controlplaneId));
115116
}
116117

117-
public Result<DataFlowResponseMessage> prepare(String controlplaneId, DataFlowPrepareMessage message) {
118+
public Result<DataFlowStatusMessage> prepare(String controlplaneId, DataFlowPrepareMessage message) {
118119
var initialDataFlow = DataFlow.newInstance()
119120
.id(message.processId())
120121
.state(DataFlow.State.INITIATING)
@@ -137,19 +138,19 @@ public Result<DataFlowResponseMessage> prepare(String controlplaneId, DataFlowPr
137138
dataFlow.transitionToPrepared();
138139
}
139140

140-
DataFlowResponseMessage response;
141+
DataFlowStatusMessage response;
141142
if (dataFlow.isPrepared() && dataFlow.isPush()) {
142-
response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), initialDataFlow.getState().name(), null);
143+
response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), dataFlow.getDataAddress(), null);
143144
} else {
144-
response = new DataFlowResponseMessage(id, null, initialDataFlow.getState().name(), null);
145+
response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), null, null);
145146
}
146147

147148
return save(dataFlow).map(it -> response);
148149
});
149150
}
150151

151152

152-
public Result<DataFlowResponseMessage> start(String controlplaneId, DataFlowStartMessage message) {
153+
public Result<DataFlowStatusMessage> start(String controlplaneId, DataFlowStartMessage message) {
153154
var initialDataFlow = DataFlow.newInstance()
154155
.id(message.processId())
155156
.state(DataFlow.State.INITIATING)
@@ -171,11 +172,11 @@ public Result<DataFlowResponseMessage> start(String controlplaneId, DataFlowStar
171172
dataFlow.transitionToStarted();
172173
}
173174

174-
DataFlowResponseMessage response;
175+
DataFlowStatusMessage response;
175176
if (dataFlow.isStarted() && dataFlow.isPull()) {
176-
response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
177+
response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), dataFlow.getDataAddress(), null);
177178
} else {
178-
response = new DataFlowResponseMessage(id, null, dataFlow.getState().name(), null);
179+
response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), null, null);
179180
}
180181
return save(dataFlow).map(it -> response);
181182
});
@@ -213,7 +214,7 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
213214
.compose(onPrepare::action)
214215
.compose(dataFlow -> {
215216
dataFlow.transitionToPrepared();
216-
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
217+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(),null);
217218

218219
return notifyControlPlane("prepared", dataFlow, message);
219220

@@ -231,7 +232,7 @@ public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
231232
.compose(dataFlow -> {
232233
dataFlow.transitionToStarted();
233234

234-
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
235+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);
235236

236237
return notifyControlPlane("started", dataFlow, message);
237238

@@ -263,7 +264,9 @@ public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
263264
.compose(dataFlow -> {
264265
dataFlow.transitionToTerminated(throwable.getMessage());
265266

266-
return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined
267+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), null, throwable.getMessage());
268+
269+
return notifyControlPlane("errored", dataFlow, message);
267270
});
268271
}
269272

src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java renamed to src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,19 @@
99
*
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
12+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
1213
*
1314
*/
1415

1516
package org.eclipse.dataplane.domain.dataflow;
1617

1718
import org.eclipse.dataplane.domain.DataAddress;
1819

19-
public record DataFlowResponseMessage(
20+
public record DataFlowStatusMessage(
2021
String dataplaneId,
21-
DataAddress dataAddress,
22+
String dataFlowId,
2223
String state,
24+
DataAddress dataAddress,
2325
String error
2426
) {
2527
}

src/test/java/org/eclipse/dataplane/ControlPlane.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import jakarta.ws.rs.core.Context;
2525
import org.eclipse.dataplane.domain.Result;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
27-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
27+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2828
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2929
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
3030
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
@@ -122,7 +122,7 @@ public ControlPlaneController(DataplaneClient counterPart, Predicate<ContainerRe
122122
@POST
123123
@Path("/{transferId}/dataflow/prepared")
124124
@Consumes(WILDCARD)
125-
public void prepared(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowResponseMessage message) {
125+
public void prepared(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowStatusMessage message) {
126126
if (!authorizationValidation.test(context)) {
127127
throw new NotAuthorizedException("Not authorized");
128128
}
@@ -132,7 +132,7 @@ public void prepared(@PathParam("transferId") String transferId, @Context Contai
132132
@POST
133133
@Path("/{transferId}/dataflow/started")
134134
@Consumes(WILDCARD)
135-
public void started(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowResponseMessage message) {
135+
public void started(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowStatusMessage message) {
136136
if (!authorizationValidation.test(context)) {
137137
throw new NotAuthorizedException("Not authorized");
138138
}

src/test/java/org/eclipse/dataplane/scenario/AuthorizationOauth2Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.eclipse.dataplane.HttpServer;
3333
import org.eclipse.dataplane.domain.Result;
3434
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
35-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
35+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
3636
import org.eclipse.dataplane.domain.registration.AuthorizationProfile;
3737
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
3838
import org.eclipse.dataplane.domain.registration.Oauth2ClientCredentialsAuthorization;
@@ -112,7 +112,7 @@ void shouldCommunicateWithControlPlaneUsingOauth2Authorization() {
112112
var consumerProcessId = "consumer_" + processId;
113113
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
114114

115-
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
115+
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
116116

117117
var notifyPreparedResult = dataPlane.getById(consumerProcessId)
118118
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));

src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.eclipse.dataplane.authorization.TestAuthorization;
2121
import org.eclipse.dataplane.domain.Result;
2222
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
23-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
23+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2424
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
2525
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
2626
import org.jspecify.annotations.NonNull;
@@ -81,7 +81,7 @@ void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() {
8181
var consumerProcessId = "consumer_" + UUID.randomUUID();
8282
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH");
8383

84-
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
84+
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
8585

8686
var notifyPreparedResult = dataPlane.getById(consumerProcessId)
8787
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));

src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.eclipse.dataplane.domain.Result;
2323
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2424
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
25-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
25+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2727
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2828
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
@@ -83,13 +83,13 @@ void shouldPullDataFromProvider() {
8383
var consumerProcessId = "consumer_" + processId;
8484
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
8585

86-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
86+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
8787
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
8888
assertThat(prepareResponse.dataAddress()).isNull();
8989

9090
var providerProcessId = "provider_" + processId;
9191
var startMessage = createStartMessage(providerProcessId, transferType);
92-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
92+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
9393
assertThat(startResponse.state()).isEqualTo(STARTED.name());
9494
assertThat(startResponse.dataAddress()).isNotNull();
9595

@@ -106,11 +106,11 @@ void shouldPermitAsyncStartup() {
106106
var processId = UUID.randomUUID().toString();
107107
var consumerProcessId = "consumer_" + processId;
108108
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
109-
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
109+
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
110110

111111
var providerProcessId = "provider_" + processId;
112112
var startMessage = createStartMessage(providerProcessId, transferType);
113-
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
113+
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
114114
assertThat(startResponse.state()).isEqualTo(STARTING.name());
115115
assertThat(startResponse.dataAddress()).isNull();
116116

src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.eclipse.dataplane.domain.Result;
2323
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2424
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
25-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
25+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2727
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
2828
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
@@ -85,14 +85,14 @@ void shouldPushDataToEndpointPreparedByConsumer() {
8585
var consumerProcessId = "consumer_" + processId;
8686
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
8787

88-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
88+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
8989
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
9090
assertThat(prepareResponse.dataAddress()).isNotNull();
9191
var destinationDataAddress = prepareResponse.dataAddress();
9292

9393
var providerProcessId = "provider_" + processId;
9494
var startMessage = createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, destinationDataAddress);
95-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
95+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
9696

9797
assertThat(startResponse.state()).isEqualTo(STARTED.name());
9898
assertThat(startResponse.dataAddress()).isNull();
@@ -116,12 +116,12 @@ void shouldSendError_whenFlowFails() {
116116
var consumerProcessId = "consumer_" + processId;
117117
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
118118

119-
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
119+
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
120120
var invalidDataAddress = new DataAddress("FileSystem", "", emptyList());
121121

122122
var providerProcessId = "provider_" + processId;
123123
var startMessage = createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, invalidDataAddress);
124-
controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
124+
controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
125125

126126
await().untilAsserted(() -> {
127127
var providerStatus = controlPlane.providerStatus(providerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class);
@@ -139,7 +139,7 @@ void shouldPermitAsyncPreparation() {
139139
var consumerProcessId = "consumer_" + processId;
140140
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
141141

142-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
142+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
143143
assertThat(prepareResponse.state()).isEqualTo(PREPARING.name());
144144
assertThat(prepareResponse.dataAddress()).isNull();
145145

src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.eclipse.dataplane.domain.Result;
2323
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2424
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
25-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
25+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2727
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2828
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
@@ -87,13 +87,13 @@ void shouldPullDataFromProvider_thenProviderTerminatesIt() {
8787
var consumerProcessId = "consumer_" + processId;
8888
var prepareMessage = prepareMessage(consumerProcessId, transferType);
8989

90-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
90+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
9191
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
9292
assertThat(prepareResponse.dataAddress()).isNull();
9393

9494
var providerProcessId = "provider_" + processId;
9595
var startMessage = startMessage(providerProcessId, transferType);
96-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
96+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
9797
assertThat(startResponse.state()).isEqualTo(STARTED.name());
9898
assertThat(startResponse.dataAddress()).isNotNull();
9999

@@ -115,13 +115,13 @@ void shouldSuspendAndResumeOnProvider() {
115115
var consumerProcessId = "consumer_" + processId;
116116
var prepareMessage = prepareMessage(consumerProcessId, transferType);
117117

118-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
118+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
119119
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
120120
assertThat(prepareResponse.dataAddress()).isNull();
121121

122122
var providerProcessId = "provider_" + processId;
123123
var startMessage = startMessage(providerProcessId, transferType);
124-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
124+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
125125
assertThat(startResponse.state()).isEqualTo(STARTED.name());
126126
assertThat(startResponse.dataAddress()).isNotNull();
127127

@@ -133,7 +133,7 @@ void shouldSuspendAndResumeOnProvider() {
133133

134134
consumerDataPlane.assertNoMoreDataIsTransferred();
135135

136-
var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
136+
var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
137137
controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200);
138138

139139
consumerDataPlane.assertDataIsFlowing();

0 commit comments

Comments
 (0)