Skip to content

Commit 44cdfc5

Browse files
authored
feat: data flow status message (#60)
1 parent db40972 commit 44cdfc5

11 files changed

Lines changed: 87 additions & 46 deletions

File tree

src/main/java/org/eclipse/dataplane/Dataplane.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
1212
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - data flow properties
13+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
1314
*
1415
*/
1516

@@ -21,9 +22,9 @@
2122
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
2223
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2324
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
24-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
2525
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
27+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2728
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
2829
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
2930
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
@@ -114,7 +115,7 @@ private Result<Void> checkControlPlane(String controlplaneId) {
114115
return Result.failure(new ControlPlaneNotRegistered(controlplaneId));
115116
}
116117

117-
public Result<DataFlowResponseMessage> prepare(String controlplaneId, DataFlowPrepareMessage message) {
118+
public Result<DataFlowStatusMessage> prepare(String controlplaneId, DataFlowPrepareMessage message) {
118119
var initialDataFlow = DataFlow.newInstance()
119120
.id(message.processId())
120121
.state(DataFlow.State.INITIATING)
@@ -137,19 +138,19 @@ public Result<DataFlowResponseMessage> prepare(String controlplaneId, DataFlowPr
137138
dataFlow.transitionToPrepared();
138139
}
139140

140-
DataFlowResponseMessage response;
141+
DataFlowStatusMessage response;
141142
if (dataFlow.isPrepared() && dataFlow.isPush()) {
142-
response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), initialDataFlow.getState().name(), null);
143+
response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), dataFlow.getDataAddress(), null);
143144
} else {
144-
response = new DataFlowResponseMessage(id, null, initialDataFlow.getState().name(), null);
145+
response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), null, null);
145146
}
146147

147148
return save(dataFlow).map(it -> response);
148149
});
149150
}
150151

151152

152-
public Result<DataFlowResponseMessage> start(String controlplaneId, DataFlowStartMessage message) {
153+
public Result<DataFlowStatusMessage> start(String controlplaneId, DataFlowStartMessage message) {
153154
var initialDataFlow = DataFlow.newInstance()
154155
.id(message.processId())
155156
.state(DataFlow.State.INITIATING)
@@ -171,11 +172,11 @@ public Result<DataFlowResponseMessage> start(String controlplaneId, DataFlowStar
171172
dataFlow.transitionToStarted();
172173
}
173174

174-
DataFlowResponseMessage response;
175+
DataFlowStatusMessage response;
175176
if (dataFlow.isStarted() && dataFlow.isPull()) {
176-
response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
177+
response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), dataFlow.getDataAddress(), null);
177178
} else {
178-
response = new DataFlowResponseMessage(id, null, dataFlow.getState().name(), null);
179+
response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), null, null);
179180
}
180181
return save(dataFlow).map(it -> response);
181182
});
@@ -213,7 +214,7 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
213214
.compose(onPrepare::action)
214215
.compose(dataFlow -> {
215216
dataFlow.transitionToPrepared();
216-
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
217+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);
217218

218219
return notifyControlPlane("prepared", dataFlow, message);
219220

@@ -231,7 +232,7 @@ public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
231232
.compose(dataFlow -> {
232233
dataFlow.transitionToStarted();
233234

234-
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
235+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);
235236

236237
return notifyControlPlane("started", dataFlow, message);
237238

@@ -263,7 +264,9 @@ public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
263264
.compose(dataFlow -> {
264265
dataFlow.transitionToTerminated(throwable.getMessage());
265266

266-
return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined
267+
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), null, throwable.getMessage());
268+
269+
return notifyControlPlane("errored", dataFlow, message);
267270
});
268271
}
269272

src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Map;
2323
import java.util.Objects;
2424

25-
// TODO: could it store the messages?
2625
public class DataFlow {
2726

2827
private String id;

src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java renamed to src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,19 @@
99
*
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
12+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
1213
*
1314
*/
1415

1516
package org.eclipse.dataplane.domain.dataflow;
1617

1718
import org.eclipse.dataplane.domain.DataAddress;
1819

19-
public record DataFlowResponseMessage(
20+
public record DataFlowStatusMessage(
2021
String dataplaneId,
21-
DataAddress dataAddress,
22+
String dataFlowId,
2223
String state,
24+
DataAddress dataAddress,
2325
String error
2426
) {
2527
}

src/test/java/org/eclipse/dataplane/ControlPlane.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
import jakarta.ws.rs.core.Context;
2525
import org.eclipse.dataplane.domain.Result;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
27-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
2827
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2928
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
29+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
3030
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
3131
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
3232

@@ -122,7 +122,7 @@ public ControlPlaneController(DataplaneClient counterPart, Predicate<ContainerRe
122122
@POST
123123
@Path("/{transferId}/dataflow/prepared")
124124
@Consumes(WILDCARD)
125-
public void prepared(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowResponseMessage message) {
125+
public void prepared(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowStatusMessage message) {
126126
if (!authorizationValidation.test(context)) {
127127
throw new NotAuthorizedException("Not authorized");
128128
}
@@ -132,7 +132,7 @@ public void prepared(@PathParam("transferId") String transferId, @Context Contai
132132
@POST
133133
@Path("/{transferId}/dataflow/started")
134134
@Consumes(WILDCARD)
135-
public void started(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowResponseMessage message) {
135+
public void started(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowStatusMessage message) {
136136
if (!authorizationValidation.test(context)) {
137137
throw new NotAuthorizedException("Not authorized");
138138
}

src/test/java/org/eclipse/dataplane/DataplaneTest.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
12+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
1213
*
1314
*/
1415

@@ -30,6 +31,7 @@
3031
import java.net.URI;
3132

3233
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
34+
import static com.github.tomakehurst.wiremock.client.WireMock.absent;
3335
import static com.github.tomakehurst.wiremock.client.WireMock.and;
3436
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
3537
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
@@ -43,6 +45,7 @@
4345
import static org.assertj.core.api.Assertions.assertThat;
4446
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4547
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.COMPLETED;
48+
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.TERMINATED;
4649

4750
class DataplaneTest {
4851

@@ -111,12 +114,42 @@ void shouldTransitionToCompleted_whenControlPlaneRespondCorrectly() {
111114
assertThat(result.succeeded()).isTrue();
112115
assertThat(dataplane.status("dataFlowId").getContent().state()).isEqualTo(COMPLETED.name());
113116
}
117+
}
118+
119+
@Nested
120+
class NotifyErrored {
121+
@Test
122+
void shouldFail_whenDataFlowDoesNotExist() {
123+
var dataplane = Dataplane.newInstance().build();
114124

115-
private DataFlowPrepareMessage createPrepareMessage() {
116-
return new DataFlowPrepareMessage("any", "any", "any", "any", "dataFlowId", "any", "any",
117-
URI.create(controlPlane.baseUrl()), "Something-PUSH", emptyList(), emptyMap());
125+
var result = dataplane.notifyErrored("dataFlowId", new RuntimeException("some-error"));
126+
127+
assertThat(result.failed()).isTrue();
128+
assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(ResourceNotFoundException.class);
118129
}
119130

131+
@Test
132+
void shouldSendDataFlowStatusMessage_whenDataFlowIsErrored() {
133+
controlPlane.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200)));
134+
var dataplane = Dataplane.newInstance().id("dataplane-id").onPrepare(Result::success).build();
135+
dataplane.registerControlPlane(new ControlPlaneRegistrationMessage("controlplaneId", URI.create("http://localhost/any")));
136+
dataplane.prepare("controlplaneId", createPrepareMessage());
137+
138+
var result = dataplane.notifyErrored("dataFlowId", new RuntimeException("some-error"));
139+
140+
assertThat(result.succeeded()).isTrue();
141+
assertThat(dataplane.status("dataFlowId").getContent().state()).isEqualTo(TERMINATED.name());
142+
143+
controlPlane.verify(postRequestedFor(urlPathEqualTo("/transfers/dataFlowId/dataflow/errored"))
144+
.withRequestBody(and(
145+
matchingJsonPath("dataplaneId", equalTo("dataplane-id")),
146+
matchingJsonPath("dataFlowId", equalTo("dataFlowId")),
147+
matchingJsonPath("state", equalTo("TERMINATED")),
148+
matchingJsonPath("dataAddress", absent()),
149+
matchingJsonPath("error", equalTo("some-error"))
150+
))
151+
);
152+
}
120153
}
121154

122155
@Nested
@@ -164,4 +197,8 @@ void shouldFail_whenStatusIsNot200() {
164197
}
165198
}
166199

200+
private DataFlowPrepareMessage createPrepareMessage() {
201+
return new DataFlowPrepareMessage("any", "any", "any", "any", "dataFlowId", "any", "any",
202+
URI.create(controlPlane.baseUrl()), "Something-PUSH", emptyList(), emptyMap());
203+
}
167204
}

src/test/java/org/eclipse/dataplane/scenario/AuthorizationOauth2Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.eclipse.dataplane.HttpServer;
3333
import org.eclipse.dataplane.domain.Result;
3434
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
35-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
35+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
3636
import org.eclipse.dataplane.domain.registration.AuthorizationProfile;
3737
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
3838
import org.eclipse.dataplane.domain.registration.Oauth2ClientCredentialsAuthorization;
@@ -112,7 +112,7 @@ void shouldCommunicateWithControlPlaneUsingOauth2Authorization() {
112112
var consumerProcessId = "consumer_" + processId;
113113
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
114114

115-
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
115+
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
116116

117117
var notifyPreparedResult = dataPlane.getById(consumerProcessId)
118118
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));

src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.eclipse.dataplane.authorization.TestAuthorization;
2121
import org.eclipse.dataplane.domain.Result;
2222
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
23-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
23+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2424
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
2525
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
2626
import org.jspecify.annotations.NonNull;
@@ -81,7 +81,7 @@ void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() {
8181
var consumerProcessId = "consumer_" + UUID.randomUUID();
8282
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH");
8383

84-
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
84+
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
8585

8686
var notifyPreparedResult = dataPlane.getById(consumerProcessId)
8787
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));
@@ -120,7 +120,7 @@ void shouldGetUnauthorized_withDataPlaneIsNotAuthenticated() {
120120
var consumerProcessId = "consumer_" + UUID.randomUUID();
121121
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH");
122122

123-
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
123+
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
124124

125125
var notifyPreparedResult = dataPlane.getById(consumerProcessId)
126126
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));

src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import org.eclipse.dataplane.domain.Result;
2323
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2424
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
25-
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
2625
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2726
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
27+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
2828
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
2929
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
3030
import org.jspecify.annotations.NonNull;
@@ -83,13 +83,13 @@ void shouldPullDataFromProvider() {
8383
var consumerProcessId = "consumer_" + processId;
8484
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
8585

86-
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
86+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
8787
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
8888
assertThat(prepareResponse.dataAddress()).isNull();
8989

9090
var providerProcessId = "provider_" + processId;
9191
var startMessage = createStartMessage(providerProcessId, transferType);
92-
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
92+
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
9393
assertThat(startResponse.state()).isEqualTo(STARTED.name());
9494
assertThat(startResponse.dataAddress()).isNotNull();
9595

@@ -106,11 +106,11 @@ void shouldPermitAsyncStartup() {
106106
var processId = UUID.randomUUID().toString();
107107
var consumerProcessId = "consumer_" + processId;
108108
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
109-
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
109+
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
110110

111111
var providerProcessId = "provider_" + processId;
112112
var startMessage = createStartMessage(providerProcessId, transferType);
113-
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
113+
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
114114
assertThat(startResponse.state()).isEqualTo(STARTING.name());
115115
assertThat(startResponse.dataAddress()).isNull();
116116

0 commit comments

Comments
 (0)