Skip to content

Commit 9bcce99

Browse files
authored
refactor: remove dataplaneId from DataFlowResponseMessage (#5652)
* refactor: remove dataplaneId from DataFlowResponseMessage * rename message
1 parent 19f84cd commit 9bcce99

13 files changed

Lines changed: 72 additions & 75 deletions

File tree

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingApiExtension.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.eclipse.edc.signaling.port.api.DataPlaneTransferApiController;
2424
import org.eclipse.edc.signaling.port.api.DataPlaneTransferAuthorizationFilter;
2525
import org.eclipse.edc.signaling.port.transformer.DataAddressToDspDataAddressTransformer;
26-
import org.eclipse.edc.signaling.port.transformer.DataFlowResponseMessageToDataFlowResponseTransformer;
26+
import org.eclipse.edc.signaling.port.transformer.DataFlowStatusMessageToDataFlowResponseTransformer;
2727
import org.eclipse.edc.signaling.port.transformer.DspDataAddressToDataAddressTransformer;
2828
import org.eclipse.edc.signaling.spi.authorization.SignalingAuthorizationRegistry;
2929
import org.eclipse.edc.spi.system.ServiceExtension;
@@ -76,7 +76,7 @@ public void initialize(ServiceExtensionContext context) {
7676

7777
var typeTransformerRegistry = transformerRegistry.forContext("signaling-api");
7878
typeTransformerRegistry.register(new DataAddressToDspDataAddressTransformer());
79-
typeTransformerRegistry.register(new DataFlowResponseMessageToDataFlowResponseTransformer());
79+
typeTransformerRegistry.register(new DataFlowStatusMessageToDataFlowResponseTransformer());
8080
typeTransformerRegistry.register(new DspDataAddressToDataAddressTransformer());
8181

8282
webService.registerResource(ApiContext.MANAGEMENT, new DataPlaneRegistrationApiV4Controller(dataPlaneSelectorService));

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingFlowControllerExtension.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.eclipse.edc.signaling.logic.DataPlaneSignalingFlowController;
2525
import org.eclipse.edc.signaling.port.ClientFactory;
2626
import org.eclipse.edc.signaling.port.transformer.DataAddressToDspDataAddressTransformer;
27-
import org.eclipse.edc.signaling.port.transformer.DataFlowResponseMessageToDataFlowResponseTransformer;
27+
import org.eclipse.edc.signaling.port.transformer.DataFlowStatusMessageToDataFlowResponseTransformer;
2828
import org.eclipse.edc.signaling.port.transformer.DspDataAddressToDataAddressTransformer;
2929
import org.eclipse.edc.spi.system.ServiceExtension;
3030
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
@@ -58,7 +58,7 @@ public String name() {
5858
public DataFlowController dataFlowController() {
5959
var typeTransformerRegistry = transformerRegistry.forContext("signaling-api");
6060
typeTransformerRegistry.register(new DataAddressToDspDataAddressTransformer());
61-
typeTransformerRegistry.register(new DataFlowResponseMessageToDataFlowResponseTransformer());
61+
typeTransformerRegistry.register(new DataFlowStatusMessageToDataFlowResponseTransformer());
6262
typeTransformerRegistry.register(new DspDataAddressToDataAddressTransformer());
6363
return new DataPlaneSignalingFlowController(apiConfiguration.createPublicUri(), dataPlaneSelectorService,
6464
typeTransformerRegistry, clientFactory, dataAddressStore);

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/domain/DataFlowResponseMessage.java renamed to data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/domain/DataFlowStatusMessage.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,16 @@
1414

1515
package org.eclipse.edc.signaling.domain;
1616

17-
public final class DataFlowResponseMessage {
17+
public final class DataFlowStatusMessage {
1818

19-
private String dataplaneId;
2019
private DspDataAddress dataAddress;
2120
private String state;
2221
private String error;
2322

24-
private DataFlowResponseMessage() {
23+
private DataFlowStatusMessage() {
2524

2625
}
2726

28-
public String getDataplaneId() {
29-
return dataplaneId;
30-
}
31-
3227
public DspDataAddress getDataAddress() {
3328
return dataAddress;
3429
}
@@ -43,7 +38,7 @@ public String getError() {
4338

4439
public static class Builder {
4540

46-
private final DataFlowResponseMessage instance = new DataFlowResponseMessage();
41+
private final DataFlowStatusMessage instance = new DataFlowStatusMessage();
4742

4843
public static Builder newInstance() {
4944
return new Builder();
@@ -53,16 +48,11 @@ private Builder() {
5348

5449
}
5550

56-
public DataFlowResponseMessage build() {
51+
public DataFlowStatusMessage build() {
5752
return instance;
5853
}
5954

6055

61-
public Builder dataplaneId(String dataplaneId) {
62-
instance.dataplaneId = dataplaneId;
63-
return this;
64-
}
65-
6656
public Builder dataAddress(DspDataAddress dataAddress) {
6757
instance.dataAddress = dataAddress;
6858
return this;

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowController.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,11 @@ public StatusResult<DataFlowResponse> prepare(TransferProcess transferProcess, P
9595

9696
var message = builder.build();
9797

98-
return clientFactory.createClient(selection.getContent())
98+
var dataPlaneInstance = selection.getContent();
99+
return clientFactory.createClient(dataPlaneInstance)
99100
.prepare(message)
100101
.compose(response -> typeTransformerRegistry.transform(response, DataFlowResponse.class)
102+
.<DataFlowResponse, Result<DataFlowResponse>>map(r -> r.toBuilder().dataPlaneId(dataPlaneInstance.getId()).build())
101103
.flatMap(this::toStatusResult));
102104
}
103105

@@ -137,9 +139,11 @@ public StatusResult<DataFlowResponse> prepare(TransferProcess transferProcess, P
137139

138140
var message = builder.build();
139141

140-
return clientFactory.createClient(selection.getContent())
142+
var dataPlaneInstance = selection.getContent();
143+
return clientFactory.createClient(dataPlaneInstance)
141144
.start(message)
142145
.compose(response -> typeTransformerRegistry.transform(response, DataFlowResponse.class)
146+
.<DataFlowResponse, Result<DataFlowResponse>>map(r -> r.toBuilder().dataPlaneId(dataPlaneInstance.getId()).build())
143147
.flatMap(this::toStatusResult));
144148
}
145149

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/DataPlaneSignalingClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
2424
import org.eclipse.edc.http.spi.EdcHttpClient;
2525
import org.eclipse.edc.signaling.domain.DataFlowPrepareMessage;
26-
import org.eclipse.edc.signaling.domain.DataFlowResponseMessage;
2726
import org.eclipse.edc.signaling.domain.DataFlowStartMessage;
2827
import org.eclipse.edc.signaling.domain.DataFlowStartedNotificationMessage;
28+
import org.eclipse.edc.signaling.domain.DataFlowStatusMessage;
2929
import org.eclipse.edc.signaling.spi.authorization.SignalingAuthorizationRegistry;
3030
import org.eclipse.edc.spi.response.StatusResult;
3131
import org.eclipse.edc.spi.result.Result;
@@ -60,13 +60,13 @@ public DataPlaneSignalingClient(DataPlaneInstance dataPlane, EdcHttpClient httpC
6060
this.authorizationRegistry = authorizationRegistry;
6161
}
6262

63-
public StatusResult<DataFlowResponseMessage> prepare(DataFlowPrepareMessage request) {
63+
public StatusResult<DataFlowStatusMessage> prepare(DataFlowPrepareMessage request) {
6464
var url = "%s/prepare".formatted(dataPlane.getUrl());
6565
return createRequestBuilder(request, url)
6666
.compose(builder -> execute(builder, this::handleResponse));
6767
}
6868

69-
public StatusResult<DataFlowResponseMessage> start(DataFlowStartMessage request) {
69+
public StatusResult<DataFlowStatusMessage> start(DataFlowStartMessage request) {
7070
var url = "%s/start".formatted(dataPlane.getUrl());
7171
return createRequestBuilder(request, url)
7272
.compose(builder -> execute(builder, this::handleResponse));
@@ -103,14 +103,14 @@ private StatusResult<Void> sendMessage(String flowId, String name, Object messag
103103
}
104104
}
105105

106-
private Result<DataFlowResponseMessage> handleResponse(Response response) {
106+
private Result<DataFlowStatusMessage> handleResponse(Response response) {
107107
if (!response.isSuccessful()) {
108108
return Result.failure("Data-plane responded with %d - %s".formatted(response.code(), response.message()));
109109
}
110110

111111
try {
112112
var inputStream = response.body().byteStream();
113-
var message = objectMapperSupplier.get().readValue(inputStream, DataFlowResponseMessage.class);
113+
var message = objectMapperSupplier.get().readValue(inputStream, DataFlowStatusMessage.class);
114114
return Result.success(message);
115115
} catch (IOException e) {
116116
return Result.failure("Cannot parse response body: " + e.getMessage());

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApi.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.swagger.v3.oas.annotations.tags.Tag;
2424
import jakarta.ws.rs.core.Response;
2525
import org.eclipse.edc.api.model.ApiCoreSchema;
26-
import org.eclipse.edc.signaling.domain.DataFlowResponseMessage;
26+
import org.eclipse.edc.signaling.domain.DataFlowStatusMessage;
2727

2828
import static jakarta.ws.rs.HttpMethod.POST;
2929

@@ -40,7 +40,7 @@ public interface DataPlaneTransferApi {
4040
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
4141
}
4242
)
43-
Response prepared(String transferId, DataFlowResponseMessage message);
43+
Response prepared(String transferId, DataFlowStatusMessage message);
4444

4545
@Operation(
4646
method = POST,
@@ -51,7 +51,7 @@ public interface DataPlaneTransferApi {
5151
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
5252
}
5353
)
54-
Response started(String transferId, DataFlowResponseMessage message);
54+
Response started(String transferId, DataFlowStatusMessage message);
5555

5656
@Operation(
5757
method = POST,

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneTransferApiController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import org.eclipse.edc.connector.controlplane.transfer.spi.types.command.NotifyPreparedCommand;
2727
import org.eclipse.edc.connector.controlplane.transfer.spi.types.command.NotifyStartedCommand;
2828
import org.eclipse.edc.signaling.domain.DataFlowPrepareMessage;
29-
import org.eclipse.edc.signaling.domain.DataFlowResponseMessage;
3029
import org.eclipse.edc.signaling.domain.DataFlowStartMessage;
30+
import org.eclipse.edc.signaling.domain.DataFlowStatusMessage;
3131
import org.eclipse.edc.spi.result.ServiceResult;
3232
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
3333

@@ -51,7 +51,7 @@ public DataPlaneTransferApiController(TransferProcessService transferProcessServ
5151
@Path("/{transferId}/dataflow/prepared")
5252
@POST
5353
@Override
54-
public Response prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {
54+
public Response prepared(@PathParam("transferId") String transferId, DataFlowStatusMessage message) {
5555
typeTransformerRegistry.transform(message, DataFlowResponse.class)
5656
.map(ServiceResult::success)
5757
.orElse(failure -> ServiceResult.badRequest(failure.getMessages()))
@@ -65,7 +65,7 @@ public Response prepared(@PathParam("transferId") String transferId, DataFlowRes
6565
@Path("/{transferId}/dataflow/started")
6666
@POST
6767
@Override
68-
public Response started(@PathParam("transferId") String transferId, DataFlowResponseMessage message) {
68+
public Response started(@PathParam("transferId") String transferId, DataFlowStatusMessage message) {
6969
typeTransformerRegistry.transform(message, DataFlowResponse.class)
7070
.map(ServiceResult::success)
7171
.orElse(failure -> ServiceResult.badRequest(failure.getMessages()))

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowResponseMessageToDataFlowResponseTransformer.java renamed to data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/transformer/DataFlowStatusMessageToDataFlowResponseTransformer.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@
1515
package org.eclipse.edc.signaling.port.transformer;
1616

1717
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse;
18-
import org.eclipse.edc.signaling.domain.DataFlowResponseMessage;
18+
import org.eclipse.edc.signaling.domain.DataFlowStatusMessage;
1919
import org.eclipse.edc.spi.types.domain.DataAddress;
2020
import org.eclipse.edc.transform.spi.TransformerContext;
2121
import org.eclipse.edc.transform.spi.TypeTransformer;
2222
import org.jetbrains.annotations.NotNull;
2323
import org.jetbrains.annotations.Nullable;
2424

25-
public class DataFlowResponseMessageToDataFlowResponseTransformer implements TypeTransformer<DataFlowResponseMessage, DataFlowResponse> {
25+
public class DataFlowStatusMessageToDataFlowResponseTransformer implements TypeTransformer<DataFlowStatusMessage, DataFlowResponse> {
2626

2727
@Override
28-
public Class<DataFlowResponseMessage> getInputType() {
29-
return DataFlowResponseMessage.class;
28+
public Class<DataFlowStatusMessage> getInputType() {
29+
return DataFlowStatusMessage.class;
3030
}
3131

3232
@Override
@@ -35,11 +35,10 @@ public Class<DataFlowResponse> getOutputType() {
3535
}
3636

3737
@Override
38-
public @Nullable DataFlowResponse transform(@NotNull DataFlowResponseMessage dataFlowResponseMessage, @NotNull TransformerContext context) {
38+
public @Nullable DataFlowResponse transform(@NotNull DataFlowStatusMessage dataFlowStatusMessage, @NotNull TransformerContext context) {
3939
return DataFlowResponse.Builder.newInstance()
40-
.dataAddress(context.transform(dataFlowResponseMessage.getDataAddress(), DataAddress.class))
41-
.dataPlaneId(dataFlowResponseMessage.getDataplaneId())
42-
.async(dataFlowResponseMessage.getState().endsWith("ING"))
40+
.dataAddress(context.transform(dataFlowStatusMessage.getDataAddress(), DataAddress.class))
41+
.async(dataFlowStatusMessage.getState().endsWith("ING"))
4342
.build();
4443
}
4544

data-protocols/data-plane-signaling/data-plane-signaling-core/src/test/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowControllerTest.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
2323
import org.eclipse.edc.policy.model.Policy;
2424
import org.eclipse.edc.signaling.domain.DataFlowPrepareMessage;
25-
import org.eclipse.edc.signaling.domain.DataFlowResponseMessage;
2625
import org.eclipse.edc.signaling.domain.DataFlowStartMessage;
26+
import org.eclipse.edc.signaling.domain.DataFlowStatusMessage;
2727
import org.eclipse.edc.signaling.domain.DspDataAddress;
2828
import org.eclipse.edc.signaling.port.ClientFactory;
2929
import org.eclipse.edc.signaling.port.DataPlaneSignalingClient;
@@ -74,22 +74,24 @@ class Prepare {
7474

7575
@Test
7676
void shouldCallPrepareOnDataPlane() {
77-
var dataPlaneInstance = createDataPlaneInstance();
77+
var dataPlaneInstance = dataPlaneInstanceBuilder().id("data-plane-id").build();
7878
Map<String, Object> claims = Map.of("key", "value");
7979
var transferProcess = transferProcessBuilder().claims(claims).build();
8080
when(selectorService.selectFor(any())).thenReturn(ServiceResult.success(dataPlaneInstance));
8181
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
82-
var flowResponseMessage = DataFlowResponseMessage.Builder.newInstance()
82+
var flowResponseMessage = DataFlowStatusMessage.Builder.newInstance()
8383
.dataAddress(createDspDataAddress())
8484
.build();
8585
when(dataPlaneClient.prepare(any())).thenReturn(StatusResult.success(flowResponseMessage));
8686
when(typeTransformerRegistry.transform(isA(DataAddress.class), any())).thenReturn(Result.success(createDspDataAddress()));
87-
var response = DataFlowResponse.Builder.newInstance().dataPlaneId("dataPlaneId").dataAddress(testDataAddress()).build();
88-
when(typeTransformerRegistry.transform(isA(DataFlowResponseMessage.class), any())).thenReturn(Result.success(response));
87+
var response = DataFlowResponse.Builder.newInstance().dataAddress(testDataAddress()).build();
88+
when(typeTransformerRegistry.transform(isA(DataFlowStatusMessage.class), any())).thenReturn(Result.success(response));
8989

9090
var result = flowController.prepare(transferProcess, policyBuilder().build());
9191

92-
assertThat(result).isSucceeded().isSameAs(response);
92+
assertThat(result).isSucceeded().satisfies(actualResponse -> {
93+
assertThat(actualResponse.getDataPlaneId()).isEqualTo("data-plane-id");
94+
});
9395
var captor = ArgumentCaptor.forClass(DataFlowPrepareMessage.class);
9496
verify(dataPlaneClient).prepare(captor.capture());
9597
var message = captor.getValue();
@@ -120,20 +122,22 @@ void shouldSelectAndCallStartOnDataplane() {
120122
.transferType(HTTP_DATA_PULL)
121123
.contentDataAddress(testDataAddress())
122124
.build();
123-
var dataPlaneInstance = createDataPlaneInstance();
125+
var dataPlaneInstance = dataPlaneInstanceBuilder().id("data-plane-id").build();
124126
when(selectorService.selectFor(any())).thenReturn(ServiceResult.success(dataPlaneInstance));
125127
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
126128
when(typeTransformerRegistry.transform(isA(DataAddress.class), any())).thenReturn(Result.success(createDspDataAddress()));
127129
var response = DataFlowResponse.Builder.newInstance().dataPlaneId("dataPlaneId").dataAddress(testDataAddress()).build();
128-
when(typeTransformerRegistry.transform(isA(DataFlowResponseMessage.class), any())).thenReturn(Result.success(response));
129-
when(dataPlaneClient.start(any())).thenReturn(StatusResult.success(DataFlowResponseMessage.Builder.newInstance()
130+
when(typeTransformerRegistry.transform(isA(DataFlowStatusMessage.class), any())).thenReturn(Result.success(response));
131+
when(dataPlaneClient.start(any())).thenReturn(StatusResult.success(DataFlowStatusMessage.Builder.newInstance()
130132
.dataAddress(createDspDataAddress())
131133
.build()));
132134
when(dataAddressStore.resolve(any())).thenReturn(StoreResult.success(DataAddress.Builder.newInstance().type("test").build()));
133135

134136
var result = flowController.start(transferProcess, policy);
135137

136-
assertThat(result).isSucceeded().isSameAs(response);
138+
assertThat(result).isSucceeded().satisfies(actualResponse -> {
139+
assertThat(actualResponse.getDataPlaneId()).isEqualTo("data-plane-id");
140+
});
137141
var captor = ArgumentCaptor.forClass(DataFlowStartMessage.class);
138142
verify(dataPlaneClient).start(captor.capture());
139143
var message = captor.getValue();
@@ -163,7 +167,7 @@ void returnFailedResultIfTransferFails() {
163167
.build();
164168

165169
when(dataPlaneClient.start(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg));
166-
var dataPlaneInstance = createDataPlaneInstance();
170+
var dataPlaneInstance = dataPlaneInstanceBuilder().build();
167171
when(selectorService.selectFor(any())).thenReturn(ServiceResult.success(dataPlaneInstance));
168172
when(clientFactory.createClient(any())).thenReturn(dataPlaneClient);
169173
when(typeTransformerRegistry.transform(isA(DataAddress.class), any())).thenReturn(Result.success(createDspDataAddress()));
@@ -426,10 +430,6 @@ private DataPlaneInstance.Builder dataPlaneInstanceBuilder() {
426430
return DataPlaneInstance.Builder.newInstance().url("http://any");
427431
}
428432

429-
private DataPlaneInstance createDataPlaneInstance() {
430-
return dataPlaneInstanceBuilder().build();
431-
}
432-
433433
private DataAddress testDataAddress() {
434434
return DataAddress.Builder.newInstance().type("test-type").build();
435435
}

0 commit comments

Comments
 (0)