Skip to content

Commit d112ddb

Browse files
authored
feat: permit async preparation (#24)
1 parent 4708bf2 commit d112ddb

7 files changed

Lines changed: 125 additions & 81 deletions

File tree

src/main/java/org/eclipse/dataplane/Dataplane.java

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@
3434
import org.eclipse.dataplane.logic.OnSuspend;
3535
import org.eclipse.dataplane.logic.OnTerminate;
3636
import org.eclipse.dataplane.port.DataPlaneSignalingApiController;
37-
import org.eclipse.dataplane.port.exception.DataFlowNotifyCompletedFailed;
38-
import org.eclipse.dataplane.port.exception.DataFlowNotifyErroredFailed;
37+
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
3938
import org.eclipse.dataplane.port.exception.DataplaneNotRegistered;
4039
import org.eclipse.dataplane.port.store.DataFlowStore;
4140
import org.eclipse.dataplane.port.store.InMemoryDataFlowStore;
@@ -48,6 +47,8 @@
4847
import java.util.Set;
4948
import java.util.UUID;
5049

50+
import static java.util.Collections.emptyMap;
51+
5152
public class Dataplane {
5253

5354
private final DataFlowStore store = new InMemoryDataFlowStore();
@@ -175,33 +176,50 @@ public Result<Void> terminate(String dataFlowId, DataFlowTerminateMessage messag
175176
}
176177

177178
/**
178-
* Notify the control plane that the data flow has been completed.
179+
* Notify the control plane that the data flow has been prepared.
179180
*
180-
* @param dataFlowId id of the data flow
181+
* @param dataFlowId the data flow id.
181182
*/
182-
public Result<Void> notifyCompleted(String dataFlowId) {
183+
public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
183184
return store.findById(dataFlowId)
185+
.compose(onPrepare::action)
184186
.compose(dataFlow -> {
185-
var endpoint = dataFlow.getCallbackAddress() + "/transfers/" + dataFlow.getId() + "/dataflow/completed";
187+
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
186188

187-
var request = HttpRequest.newBuilder()
188-
.uri(URI.create(endpoint))
189-
.header("content-type", "application/json")
190-
.POST(HttpRequest.BodyPublishers.ofString("{}")) // TODO DataFlowCompletedMessage not defined
191-
.build();
189+
return notifyControlPlane("prepared", dataFlow, message)
190+
.compose(response -> {
191+
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
192+
if (successful) {
193+
dataFlow.transitionToPrepared();
194+
return save(dataFlow);
195+
}
192196

193-
var response = httpClient.send(request, HttpResponse.BodyHandlers.discarding());
197+
return Result.failure(new DataFlowNotifyControlPlaneFailed("prepared", response));
198+
});
194199

195-
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
196-
if (successful) {
197-
dataFlow.transitionToCompleted();
198-
return save(dataFlow);
199-
}
200-
201-
return Result.failure(new DataFlowNotifyCompletedFailed(response));
202200
});
203201
}
204202

203+
204+
/**
205+
* Notify the control plane that the data flow has been completed.
206+
*
207+
* @param dataFlowId id of the data flow
208+
*/
209+
public Result<Void> notifyCompleted(String dataFlowId) {
210+
return store.findById(dataFlowId)
211+
.compose(dataFlow -> notifyControlPlane("completed", dataFlow, emptyMap()) // TODO DataFlowCompletedMessage not defined
212+
.compose(response -> {
213+
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
214+
if (successful) {
215+
dataFlow.transitionToCompleted();
216+
return save(dataFlow);
217+
}
218+
219+
return Result.failure(new DataFlowNotifyControlPlaneFailed("completed", response));
220+
}));
221+
}
222+
205223
/**
206224
* Notify the control plane that the data flow failed for some reason
207225
*
@@ -210,25 +228,16 @@ public Result<Void> notifyCompleted(String dataFlowId) {
210228
*/
211229
public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
212230
return store.findById(dataFlowId)
213-
.compose(dataFlow -> {
214-
var endpoint = dataFlow.getCallbackAddress() + "/transfers/" + dataFlow.getId() + "/dataflow/errored";
215-
216-
var request = HttpRequest.newBuilder()
217-
.uri(URI.create(endpoint))
218-
.header("content-type", "application/json")
219-
.POST(HttpRequest.BodyPublishers.ofString("{}")) // TODO DataFlowErroredMessage not defined
220-
.build();
221-
222-
var response = httpClient.send(request, HttpResponse.BodyHandlers.discarding());
223-
224-
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
225-
if (successful) {
226-
dataFlow.transitionToTerminated(throwable.getMessage());
227-
return save(dataFlow);
228-
}
229-
230-
return Result.failure(new DataFlowNotifyErroredFailed(response));
231-
});
231+
.compose(dataFlow -> notifyControlPlane("errored", dataFlow, emptyMap()) // TODO DataFlowErroredMessage not defined
232+
.compose(response -> {
233+
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
234+
if (successful) {
235+
dataFlow.transitionToTerminated(throwable.getMessage());
236+
return save(dataFlow);
237+
}
238+
239+
return Result.failure(new DataFlowNotifyControlPlaneFailed("errored", response));
240+
}));
232241
}
233242

234243
public Result<Void> started(String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) {
@@ -279,7 +288,20 @@ public Result<Void> registerOn(String controlPlaneEndpoint) {
279288
});
280289
}
281290

282-
private Result<String> toJson(DataPlaneRegistrationMessage message) {
291+
private Result<HttpResponse<Void>> notifyControlPlane(String action, DataFlow dataFlow, Object message) {
292+
return toJson(message).map(body -> {
293+
var endpoint = dataFlow.callbackEndpointFor(action);
294+
var request = HttpRequest.newBuilder()
295+
.uri(URI.create(endpoint))
296+
.header("content-type", "application/json")
297+
.POST(HttpRequest.BodyPublishers.ofString(body))
298+
.build();
299+
300+
return httpClient.send(request, HttpResponse.BodyHandlers.discarding());
301+
});
302+
}
303+
304+
private Result<String> toJson(Object message) {
283305
try {
284306
return Result.success(objectMapper.writeValueAsString(message));
285307
} catch (JsonProcessingException e) {

src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ public void setDataAddress(DataAddress dataAddress) {
153153
this.dataAddress = dataAddress;
154154
}
155155

156+
public String callbackEndpointFor(String action) {
157+
return getCallbackAddress() + "/transfers/" + getId() + "/dataflow/" + action;
158+
}
159+
156160
public static class Builder {
157161
private final DataFlow dataFlow = new DataFlow();
158162

src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyCompletedFailed.java renamed to src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyControlPlaneFailed.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,21 @@
1616

1717
import java.net.http.HttpResponse;
1818

19-
public class DataFlowNotifyCompletedFailed extends Exception {
19+
public class DataFlowNotifyControlPlaneFailed extends Exception {
20+
private final String action;
2021
private final HttpResponse<Void> response;
2122

22-
public DataFlowNotifyCompletedFailed(HttpResponse<Void> response) {
23+
public DataFlowNotifyControlPlaneFailed(String action, HttpResponse<Void> response) {
2324
super("control-plane responded with %s".formatted(response.statusCode()));
25+
this.action = action;
2426
this.response = response;
2527
}
2628

2729
public HttpResponse<Void> getResponse() {
2830
return response;
2931
}
32+
33+
public String getAction() {
34+
return action;
35+
}
3036
}

src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyErroredFailed.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/test/java/org/eclipse/dataplane/ControlPlane.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import jakarta.ws.rs.Path;
2121
import jakarta.ws.rs.PathParam;
2222
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
23+
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
2324
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2425
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2526
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
@@ -97,6 +98,13 @@ public ControlPlaneController(DataplaneClient counterPart) {
9798
this.counterPart = counterPart;
9899
}
99100

101+
@POST
102+
@Path("/{transferId}/dataflow/prepared")
103+
@Consumes(WILDCARD)
104+
public void prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {
105+
106+
}
107+
100108
@POST
101109
@Path("/{transferId}/dataflow/completed")
102110
@Consumes(WILDCARD)

src/test/java/org/eclipse/dataplane/DataplaneTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.eclipse.dataplane.domain.Result;
1919
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
2020
import org.eclipse.dataplane.port.exception.DataFlowNotFoundException;
21-
import org.eclipse.dataplane.port.exception.DataFlowNotifyCompletedFailed;
21+
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
2222
import org.eclipse.dataplane.port.exception.DataplaneNotRegistered;
2323
import org.junit.jupiter.api.AfterEach;
2424
import org.junit.jupiter.api.BeforeEach;
@@ -92,7 +92,7 @@ void shouldReturnFailedFuture_whenControlPlaneRespondWithError() {
9292
var result = dataplane.notifyCompleted("dataFlowId");
9393

9494
assertThat(result.failed()).isTrue();
95-
assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(DataFlowNotifyCompletedFailed.class);
95+
assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(DataFlowNotifyControlPlaneFailed.class);
9696
assertThat(dataplane.status("dataFlowId").getContent().state()).isNotEqualTo(COMPLETED.name());
9797
}
9898

src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static org.awaitility.Awaitility.await;
4444
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.COMPLETED;
4545
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED;
46+
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARING;
4647
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED;
4748
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.TERMINATED;
4849

@@ -72,7 +73,7 @@ void shouldPushDataToEndpointPreparedByConsumer() {
7273
var transferType = "FileSystem-PUSH";
7374
var processId = UUID.randomUUID().toString();
7475
var consumerProcessId = "consumer_" + processId;
75-
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
76+
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
7677

7778
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
7879
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
@@ -103,7 +104,7 @@ void shouldSendError_whenFlowFails() {
103104
var transferType = "FileSystem-PUSH";
104105
var processId = UUID.randomUUID().toString();
105106
var consumerProcessId = "consumer_" + processId;
106-
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
107+
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
107108

108109
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
109110
var invalidDataAddress = new DataAddress("FileSystem", "", emptyList());
@@ -121,15 +122,32 @@ void shouldSendError_whenFlowFails() {
121122
});
122123
}
123124

125+
@Test
126+
void shouldPermitAsyncPreparation() {
127+
var transferType = "FileSystemAsync-PUSH";
128+
var processId = UUID.randomUUID().toString();
129+
var consumerProcessId = "consumer_" + processId;
130+
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);
131+
132+
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
133+
assertThat(prepareResponse.state()).isEqualTo(PREPARING.name());
134+
assertThat(prepareResponse.dataAddress()).isNull();
135+
136+
consumerDataPlane.completePreparation(consumerProcessId);
137+
138+
assertThat(controlPlane.consumerStatus(consumerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class).state())
139+
.isEqualTo(PREPARED.name());
140+
}
141+
124142
private @NonNull DataFlowStartMessage createStartMessage(String providerProcessId, String callbackAddress, String transferType, DataAddress destinationDataAddress) {
125143
return new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId",
126144
"theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", callbackAddress,
127145
transferType, destinationDataAddress, emptyList(), emptyMap());
128146
}
129147

130-
private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String transferType) {
148+
private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String callbackAddress, String transferType) {
131149
return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId",
132-
"theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", "theCallbackAddress",
150+
"theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", callbackAddress,
133151
transferType, emptyList(), emptyMap());
134152
}
135153

@@ -178,13 +196,24 @@ private static class ConsumerDataPlane {
178196
.onTerminate(Result::success)
179197
.build();
180198

199+
public void completePreparation(String dataFlowId) {
200+
sdk.getById(dataFlowId)
201+
.compose(dataFlow -> sdk.notifyPrepared(dataFlowId, this::prepareDestinationDataAddress))
202+
.orElseThrow(f -> new RuntimeException(f.getCause()));
203+
}
204+
181205
private Result<DataFlow> onPrepare(DataFlow dataFlow) {
182-
try {
183-
var destinationFile = Files.createTempDirectory("consumer-dest").resolve(dataFlow.getId() + "-content");
184-
var dataAddress = new DataAddress("FileSystem", "file", destinationFile.toString(), emptyList());
206+
if (dataFlow.getTransferType().equals("FileSystemAsync-PUSH")) {
207+
dataFlow.transitionToPreparing();
208+
return Result.success(dataFlow);
209+
}
185210

186-
dataFlow.setDataAddress(dataAddress);
211+
return prepareDestinationDataAddress(dataFlow);
212+
}
187213

214+
private @NonNull Result<DataFlow> prepareDestinationDataAddress(DataFlow dataFlow) {
215+
try {
216+
dataFlow.setDataAddress(destinationDataAddress(dataFlow));
188217
return Result.success(dataFlow);
189218
} catch (IOException e) {
190219
return Result.failure(e);
@@ -205,6 +234,11 @@ private Result<DataFlow> onCompleted(DataFlow dataFlow) {
205234
}
206235
}
207236

237+
private @NonNull DataAddress destinationDataAddress(DataFlow dataFlow) throws IOException {
238+
var destinationFile = Files.createTempDirectory("consumer-dest").resolve(dataFlow.getId() + "-content");
239+
return new DataAddress("file", destinationFile.toString(), emptyList());
240+
}
241+
208242
public Object controller() {
209243
return sdk.controller();
210244
}

0 commit comments

Comments
 (0)