Skip to content

Commit e32e815

Browse files
authored
feat: resume endpoint (#61)
1 parent e7c6810 commit e32e815

7 files changed

Lines changed: 107 additions & 2 deletions

File tree

src/main/java/org/eclipse/dataplane/Dataplane.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
2323
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2424
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
25+
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
2526
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2627
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2728
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
@@ -33,6 +34,7 @@
3334
import org.eclipse.dataplane.domain.registration.DataPlaneRegistrationMessage;
3435
import org.eclipse.dataplane.logic.OnCompleted;
3536
import org.eclipse.dataplane.logic.OnPrepare;
37+
import org.eclipse.dataplane.logic.OnResume;
3638
import org.eclipse.dataplane.logic.OnStart;
3739
import org.eclipse.dataplane.logic.OnStarted;
3840
import org.eclipse.dataplane.logic.OnSuspend;
@@ -77,6 +79,7 @@ public class Dataplane {
7779
private OnStart onStart = dataFlow -> Result.failure(new UnsupportedOperationException("onStart is not implemented"));
7880
private OnTerminate onTerminate = dataFlow -> Result.failure(new UnsupportedOperationException("onTerminate is not implemented"));
7981
private OnSuspend onSuspend = dataFlow -> Result.failure(new UnsupportedOperationException("onSuspend is not implemented"));
82+
private OnResume onResume = dataFlow -> Result.failure(new UnsupportedOperationException("onResume is not implemented"));
8083
private OnStarted onStarted = dataFlow -> Result.failure(new UnsupportedOperationException("onStarted is not implemented"));
8184
private OnCompleted onCompleted = dataFlow -> Result.failure(new UnsupportedOperationException("onCompleted is not implemented"));
8285

@@ -193,6 +196,24 @@ public Result<Void> suspend(String flowId, DataFlowSuspendMessage message) {
193196
.map(it -> null);
194197
}
195198

199+
public Result<DataFlowStatusMessage> resume(String flowId, DataFlowResumeMessage message) {
200+
return dataFlowStore.findById(flowId)
201+
.map(dataFlow -> {
202+
if (message.dataAddress() != null) {
203+
dataFlow.setDataAddress(message.dataAddress());
204+
}
205+
return dataFlow;
206+
})
207+
.compose(onResume::action)
208+
.compose(dataFlow -> {
209+
dataFlow.transitionToStarted();
210+
211+
var response = new DataFlowStatusMessage(id, flowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);
212+
213+
return save(dataFlow).map(it -> response);
214+
});
215+
}
216+
196217
public Result<Void> terminate(String dataFlowId, DataFlowTerminateMessage message) {
197218
return dataFlowStore.findById(dataFlowId)
198219
.map(dataFlow -> {
@@ -443,6 +464,11 @@ public Builder onSuspend(OnSuspend onSuspend) {
443464
return this;
444465
}
445466

467+
public Builder onResume(OnResume onResume) {
468+
dataplane.onResume = onResume;
469+
return this;
470+
}
471+
446472
public Builder onTerminate(OnTerminate onTerminate) {
447473
dataplane.onTerminate = onTerminate;
448474
return this;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.dataplane.domain.dataflow;
16+
17+
import org.eclipse.dataplane.domain.DataAddress;
18+
19+
public record DataFlowResumeMessage(
20+
String messageId,
21+
String processId,
22+
DataAddress dataAddress
23+
) {
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.dataplane.logic;
16+
17+
import org.eclipse.dataplane.domain.Result;
18+
import org.eclipse.dataplane.domain.dataflow.DataFlow;
19+
20+
public interface OnResume {
21+
22+
Result<DataFlow> action(DataFlow dataFlow);
23+
24+
}

src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
12+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - resume endpoint
1213
*
1314
*/
1415

@@ -28,6 +29,7 @@
2829
import org.eclipse.dataplane.domain.Result;
2930
import org.eclipse.dataplane.domain.dataflow.DataFlow;
3031
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
32+
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
3133
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
3234
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
3335
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
@@ -86,6 +88,14 @@ public Response suspend(@PathParam("flowId") String flowId, DataFlowSuspendMessa
8688
return Response.ok().build();
8789
}
8890

91+
@POST
92+
@Path("/{flowId}/resume")
93+
public Response resume(@PathParam("flowId") String flowId, DataFlowResumeMessage message) {
94+
var response = dataplane.resume(flowId, message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
95+
96+
return Response.ok(response).build();
97+
}
98+
8999
@POST
90100
@Path("/{flowId}/terminate")
91101
public Response terminate(@PathParam("flowId") String flowId, DataFlowTerminateMessage message) {

src/test/java/org/eclipse/dataplane/ControlPlane.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import jakarta.ws.rs.core.Context;
2525
import org.eclipse.dataplane.domain.Result;
2626
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
27+
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
2728
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2829
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2930
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
@@ -83,6 +84,10 @@ public ValidatableResponse providerSuspend(String flowId, DataFlowSuspendMessage
8384
return providerClient.suspend(flowId, suspendMessage);
8485
}
8586

87+
public ValidatableResponse providerResume(String flowId, DataFlowResumeMessage resumeMessage) {
88+
return providerClient.resume(flowId, resumeMessage);
89+
}
90+
8691
public ValidatableResponse providerStatus(String flowId) {
8792
return providerClient.status(flowId);
8893
}

src/test/java/org/eclipse/dataplane/DataplaneClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.restassured.specification.RequestSpecification;
2020
import org.eclipse.dataplane.domain.Result;
2121
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
22+
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
2223
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2324
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2425
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
@@ -93,6 +94,14 @@ public ValidatableResponse suspend(String flowId, DataFlowSuspendMessage suspend
9394
.log().ifValidationFails();
9495
}
9596

97+
public ValidatableResponse resume(String flowId, DataFlowResumeMessage resumeMessage) {
98+
return baseRequest()
99+
.body(resumeMessage)
100+
.post("/v1/dataflows/{id}/resume", flowId)
101+
.then()
102+
.log().ifValidationFails();
103+
}
104+
96105
private RequestSpecification baseRequest() {
97106
var requestSpecification = given()
98107
.contentType(ContentType.JSON)

src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*
1010
* Contributors:
1111
* Think-it GmbH - initial API and implementation
12+
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - resume endpoint
1213
*
1314
*/
1415

@@ -22,6 +23,7 @@
2223
import org.eclipse.dataplane.domain.Result;
2324
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2425
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
26+
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
2527
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
2628
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
2729
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
@@ -133,7 +135,8 @@ void shouldSuspendAndResumeOnProvider() {
133135

134136
consumerDataPlane.assertNoMoreDataIsTransferred();
135137

136-
var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
138+
var resumeMessage = resumeMessage(providerProcessId);
139+
var resumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
137140
controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200);
138141

139142
consumerDataPlane.assertDataIsFlowing();
@@ -151,6 +154,10 @@ private DataFlowStartMessage startMessage(String providerProcessId, String trans
151154
transferType, null, emptyList(), emptyMap());
152155
}
153156

157+
private DataFlowResumeMessage resumeMessage(String providerProcessId) {
158+
return new DataFlowResumeMessage("theMessageId", providerProcessId, null);
159+
}
160+
154161
private static class ConsumerDataPlane {
155162

156163
private final Path storage;
@@ -221,6 +228,7 @@ private static class ProviderDataPlane {
221228
.registerAuthorization(new TestAuthorization())
222229
.onStart(this::onStart)
223230
.onSuspend(this::stopDataFlow)
231+
.onResume(this::onStart)
224232
.onTerminate(this::stopDataFlow)
225233
.build();
226234
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
@@ -265,6 +273,5 @@ private Result<DataFlow> stopDataFlow(DataFlow dataFlow) {
265273
return Result.failure(e);
266274
}
267275
}
268-
269276
}
270277
}

0 commit comments

Comments
 (0)