Skip to content

Commit bce0980

Browse files
authored
feat: permit async startup (#29)
1 parent 67f908a commit bce0980

4 files changed

Lines changed: 136 additions & 56 deletions

File tree

src/main/java/org/eclipse/dataplane/Dataplane.java

Lines changed: 54 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,13 @@
4747
import java.util.Set;
4848
import java.util.UUID;
4949

50+
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
5051
import static java.util.Collections.emptyMap;
5152

5253
public class Dataplane {
5354

54-
private final DataFlowStore store = new InMemoryDataFlowStore();
55+
private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
56+
private final DataFlowStore store = new InMemoryDataFlowStore(objectMapper);
5557
private String id;
5658
private String name;
5759
private String description;
@@ -67,7 +69,6 @@ public class Dataplane {
6769
private OnCompleted onCompleted = dataFlow -> Result.failure(new UnsupportedOperationException("onCompleted is not implemented"));
6870

6971
private final HttpClient httpClient = HttpClient.newHttpClient();
70-
private final ObjectMapper objectMapper = new ObjectMapper();
7172

7273
public static Builder newInstance() {
7374
return new Builder();
@@ -184,22 +185,31 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
184185
return store.findById(dataFlowId)
185186
.compose(onPrepare::action)
186187
.compose(dataFlow -> {
188+
dataFlow.transitionToPrepared();
187189
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
188190

189-
return notifyControlPlane("prepared", dataFlow, message)
190-
.compose(response -> {
191-
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
192-
if (successful) {
193-
dataFlow.transitionToPrepared();
194-
return save(dataFlow);
195-
}
196-
197-
return Result.failure(new DataFlowNotifyControlPlaneFailed("prepared", response));
198-
});
191+
return notifyControlPlane("prepared", dataFlow, message);
199192

200193
});
201194
}
202195

196+
/**
197+
* Notify the control plane that the data flow has been started.
198+
*
199+
* @param dataFlowId the data flow id.
200+
*/
201+
public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
202+
return store.findById(dataFlowId)
203+
.compose(onStart::action)
204+
.compose(dataFlow -> {
205+
dataFlow.transitionToStarted();
206+
207+
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
208+
209+
return notifyControlPlane("started", dataFlow, message);
210+
211+
});
212+
}
203213

204214
/**
205215
* Notify the control plane that the data flow has been completed.
@@ -208,36 +218,26 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
208218
*/
209219
public Result<Void> notifyCompleted(String dataFlowId) {
210220
return store.findById(dataFlowId)
211-
.compose(dataFlow -> notifyControlPlane("completed", dataFlow, emptyMap()) // TODO DataFlowCompletedMessage not defined
212-
.compose(response -> {
213-
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
214-
if (successful) {
215-
dataFlow.transitionToCompleted();
216-
return save(dataFlow);
217-
}
218-
219-
return Result.failure(new DataFlowNotifyControlPlaneFailed("completed", response));
220-
}));
221+
.compose(dataFlow -> {
222+
dataFlow.transitionToCompleted();
223+
224+
return notifyControlPlane("completed", dataFlow, emptyMap()); // TODO DataFlowCompletedMessage not defined
225+
});
221226
}
222227

223228
/**
224229
* Notify the control plane that the data flow failed for some reason
225230
*
226231
* @param dataFlowId id of the data flow
227-
* @param throwable the error
232+
* @param throwable the error
228233
*/
229234
public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
230235
return store.findById(dataFlowId)
231-
.compose(dataFlow -> notifyControlPlane("errored", dataFlow, emptyMap()) // TODO DataFlowErroredMessage not defined
232-
.compose(response -> {
233-
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
234-
if (successful) {
235-
dataFlow.transitionToTerminated(throwable.getMessage());
236-
return save(dataFlow);
237-
}
238-
239-
return Result.failure(new DataFlowNotifyControlPlaneFailed("errored", response));
240-
}));
236+
.compose(dataFlow -> {
237+
dataFlow.transitionToTerminated(throwable.getMessage());
238+
239+
return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined
240+
});
241241
}
242242

243243
public Result<Void> started(String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) {
@@ -256,7 +256,7 @@ public Result<Void> started(String flowId, DataFlowStartedNotificationMessage st
256256
/**
257257
* Received notification that the flow has been completed
258258
*
259-
* @param flowId id of the data flow
259+
* @param flowId id of the data flow
260260
* @return result indicating whether data flow was completed successfully
261261
*/
262262
public Result<Void> completed(String flowId) {
@@ -288,17 +288,26 @@ public Result<Void> registerOn(String controlPlaneEndpoint) {
288288
});
289289
}
290290

291-
private Result<HttpResponse<Void>> notifyControlPlane(String action, DataFlow dataFlow, Object message) {
292-
return toJson(message).map(body -> {
293-
var endpoint = dataFlow.callbackEndpointFor(action);
294-
var request = HttpRequest.newBuilder()
295-
.uri(URI.create(endpoint))
296-
.header("content-type", "application/json")
297-
.POST(HttpRequest.BodyPublishers.ofString(body))
298-
.build();
299-
300-
return httpClient.send(request, HttpResponse.BodyHandlers.discarding());
301-
});
291+
private Result<Void> notifyControlPlane(String action, DataFlow dataFlow, Object message) {
292+
return toJson(message)
293+
.map(body -> {
294+
var endpoint = dataFlow.callbackEndpointFor(action);
295+
var request = HttpRequest.newBuilder()
296+
.uri(URI.create(endpoint))
297+
.header("content-type", "application/json")
298+
.POST(HttpRequest.BodyPublishers.ofString(body))
299+
.build();
300+
301+
return httpClient.send(request, HttpResponse.BodyHandlers.discarding());
302+
})
303+
.compose(response -> {
304+
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
305+
if (successful) {
306+
return save(dataFlow);
307+
}
308+
309+
return Result.failure(new DataFlowNotifyControlPlaneFailed(action, response));
310+
});
302311
}
303312

304313
private Result<String> toJson(Object message) {

src/main/java/org/eclipse/dataplane/port/store/InMemoryDataFlowStore.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package org.eclipse.dataplane.port.store;
1616

17+
import com.fasterxml.jackson.core.JsonProcessingException;
18+
import com.fasterxml.jackson.databind.ObjectMapper;
1719
import org.eclipse.dataplane.domain.Result;
1820
import org.eclipse.dataplane.domain.dataflow.DataFlow;
1921
import org.eclipse.dataplane.port.exception.DataFlowNotFoundException;
@@ -23,12 +25,21 @@
2325

2426
public class InMemoryDataFlowStore implements DataFlowStore {
2527

26-
private final Map<String, DataFlow> store = new HashMap<>();
28+
private final Map<String, String> store = new HashMap<>();
29+
private final ObjectMapper objectMapper;
30+
31+
public InMemoryDataFlowStore(ObjectMapper objectMapper) {
32+
this.objectMapper = objectMapper;
33+
}
2734

2835
@Override
2936
public Result<Void> save(DataFlow dataFlow) {
30-
store.put(dataFlow.getId(), dataFlow);
31-
return Result.success();
37+
try {
38+
store.put(dataFlow.getId(), objectMapper.writeValueAsString(dataFlow));
39+
return Result.success();
40+
} catch (JsonProcessingException e) {
41+
return Result.failure(e);
42+
}
3243
}
3344

3445
@Override
@@ -37,6 +48,12 @@ public Result<DataFlow> findById(String flowId) {
3748
if (dataFlow == null) {
3849
return Result.failure(new DataFlowNotFoundException("DataFlow %s not found".formatted(flowId)));
3950
}
40-
return Result.success(dataFlow);
51+
52+
try {
53+
var deserialized = objectMapper.readValue(dataFlow, DataFlow.class);
54+
return Result.success(deserialized);
55+
} catch (JsonProcessingException e) {
56+
return Result.failure(e);
57+
}
4158
}
4259
}

src/test/java/org/eclipse/dataplane/ControlPlane.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.Executors;
3131

3232
import static jakarta.ws.rs.core.MediaType.WILDCARD;
33+
import static org.assertj.core.api.Assertions.assertThat;
3334

3435
/**
3536
* This simulates control plane for both consumer and provider.
@@ -102,7 +103,14 @@ public ControlPlaneController(DataplaneClient counterPart) {
102103
@Path("/{transferId}/dataflow/prepared")
103104
@Consumes(WILDCARD)
104105
public void prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {
106+
assertThat(message.state()).isEqualTo("PREPARED");
107+
}
105108

109+
@POST
110+
@Path("/{transferId}/dataflow/started")
111+
@Consumes(WILDCARD)
112+
public void started(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {
113+
assertThat(message.state()).isEqualTo("STARTED");
106114
}
107115

108116
@POST

src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
2525
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
27+
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
28+
import org.jspecify.annotations.NonNull;
2729
import org.junit.jupiter.api.AfterEach;
2830
import org.junit.jupiter.api.BeforeEach;
2931
import org.junit.jupiter.api.Test;
@@ -41,6 +43,7 @@
4143
import static org.awaitility.Awaitility.await;
4244
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED;
4345
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED;
46+
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTING;
4447

4548
class ConsumerPullTest {
4649

@@ -69,18 +72,14 @@ void shouldPullDataFromProvider() {
6972
var transferType = "FileSystem-PULL";
7073
var processId = UUID.randomUUID().toString();
7174
var consumerProcessId = "consumer_" + processId;
72-
var prepareMessage = new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId",
73-
"theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", controlPlane.consumerCallbackAddress(),
74-
transferType, emptyList(), emptyMap());
75+
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
7576

7677
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
7778
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
7879
assertThat(prepareResponse.dataAddress()).isNull();
7980

8081
var providerProcessId = "provider_" + processId;
81-
var startMessage = new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId",
82-
"theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", controlPlane.providerCallbackAddress(),
83-
transferType, null, emptyList(), emptyMap());
82+
var startMessage = createStartMessage(providerProcessId, transferType);
8483
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
8584
assertThat(startResponse.state()).isEqualTo(STARTED.name());
8685
assertThat(startResponse.dataAddress()).isNotNull();
@@ -92,6 +91,39 @@ void shouldPullDataFromProvider() {
9291
});
9392
}
9493

94+
@Test
95+
void shouldPermitAsyncStartup() {
96+
var transferType = "FileSystemAsync-PULL";
97+
var processId = UUID.randomUUID().toString();
98+
var consumerProcessId = "consumer_" + processId;
99+
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
100+
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
101+
102+
var providerProcessId = "provider_" + processId;
103+
var startMessage = createStartMessage(providerProcessId, transferType);
104+
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
105+
assertThat(startResponse.state()).isEqualTo(STARTING.name());
106+
assertThat(startResponse.dataAddress()).isNull();
107+
108+
providerDataPlane.completeStartup(providerProcessId);
109+
110+
assertThat(controlPlane.providerStatus(providerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class).state())
111+
.isEqualTo(STARTED.name());
112+
}
113+
114+
private @NonNull DataFlowStartMessage createStartMessage(String providerProcessId, String transferType) {
115+
return new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId",
116+
"theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", controlPlane.providerCallbackAddress(),
117+
transferType, null, emptyList(), emptyMap());
118+
}
119+
120+
private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String transferType) {
121+
return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId",
122+
"theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", controlPlane.consumerCallbackAddress(),
123+
transferType, emptyList(), emptyMap());
124+
}
125+
126+
95127
private class ConsumerDataPlane {
96128

97129
private final Path storage;
@@ -144,7 +176,22 @@ public Object controller() {
144176
return sdk.controller();
145177
}
146178

179+
public void completeStartup(String dataFlowId) {
180+
sdk.getById(dataFlowId)
181+
.compose(dataFlow -> sdk.notifyStarted(dataFlowId, this::prepareSourceDataAddress))
182+
.orElseThrow(f -> new RuntimeException(f.getCause()));
183+
}
184+
147185
private Result<DataFlow> onStart(DataFlow dataFlow) {
186+
if (dataFlow.getTransferType().equals("FileSystemAsync-PULL")) {
187+
dataFlow.transitionToStarting();
188+
return Result.success(dataFlow);
189+
}
190+
191+
return prepareSourceDataAddress(dataFlow);
192+
}
193+
194+
private Result<DataFlow> prepareSourceDataAddress(DataFlow dataFlow) {
148195
try {
149196
var destinationDirectory = Files.createTempDirectory(dataFlow.getId());
150197
for (var i = 0; i < filesToBeCreated; i++) {
@@ -154,7 +201,6 @@ private Result<DataFlow> onStart(DataFlow dataFlow) {
154201

155202
var dataAddress = new DataAddress("FileSystem", "directory", destinationDirectory.toString(), emptyList());
156203
dataFlow.setDataAddress(dataAddress);
157-
158204
return Result.success(dataFlow);
159205
} catch (IOException e) {
160206
return Result.failure(e);

0 commit comments

Comments
 (0)