Skip to content

Commit 39126f2

Browse files
committed
chore: conditionally process data address on resume flow
1 parent 7784da3 commit 39126f2

1 file changed

Lines changed: 11 additions & 4 deletions

File tree

src/main/java/org/eclipse/dataplane/Dataplane.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
6565
import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION;
6666
import static java.util.Collections.emptyMap;
67+
import static org.eclipse.dataplane.domain.dataflow.DataFlow.Type.CONSUMER;
68+
import static org.eclipse.dataplane.domain.dataflow.DataFlow.Type.PROVIDER;
6769

6870
public class Dataplane {
6971

@@ -132,7 +134,7 @@ public Result<DataFlowStatusMessage> prepare(String controlplaneId, DataFlowPrep
132134
.counterPartyId(message.counterPartyId())
133135
.dataspaceContext(message.dataspaceContext())
134136
.controlplaneId(controlplaneId)
135-
.type(DataFlow.Type.CONSUMER)
137+
.type(CONSUMER)
136138
.build();
137139

138140
return checkControlPlane(controlplaneId)
@@ -167,7 +169,7 @@ public Result<DataFlowStatusMessage> start(String controlplaneId, DataFlowStartM
167169
.counterPartyId(message.counterPartyId())
168170
.dataspaceContext(message.dataspaceContext())
169171
.controlplaneId(controlplaneId)
170-
.type(DataFlow.Type.PROVIDER)
172+
.type(PROVIDER)
171173
.build();
172174

173175
return checkControlPlane(controlplaneId)
@@ -201,7 +203,9 @@ public Result<Void> suspend(String flowId, DataFlowSuspendMessage message) {
201203
public Result<DataFlowStatusMessage> resume(String flowId, DataFlowResumeMessage message) {
202204
return dataFlowStore.findById(flowId)
203205
.map(dataFlow -> {
204-
if (message.dataAddress() != null) {
206+
var shouldReceiveDataAddress = (PROVIDER.equals(dataFlow.getType()) && dataFlow.isPush()) ||
207+
(CONSUMER.equals(dataFlow.getType()) && dataFlow.isPull());
208+
if (shouldReceiveDataAddress) {
205209
dataFlow.setDataAddress(message.dataAddress());
206210
}
207211
return dataFlow;
@@ -210,7 +214,10 @@ public Result<DataFlowStatusMessage> resume(String flowId, DataFlowResumeMessage
210214
.compose(dataFlow -> {
211215
dataFlow.transitionToStarted();
212216

213-
var response = new DataFlowStatusMessage(id, flowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);
217+
var shouldProvideDataAddress = (PROVIDER.equals(dataFlow.getType()) && dataFlow.isPull()) ||
218+
(CONSUMER.equals(dataFlow.getType()) && dataFlow.isPush());
219+
var dataAddress = shouldProvideDataAddress ? dataFlow.getDataAddress() : null;
220+
var response = new DataFlowStatusMessage(id, flowId, dataFlow.getState().name(), dataAddress, null);
214221

215222
return save(dataFlow).map(it -> response);
216223
});

0 commit comments

Comments
 (0)