Skip to content

Commit bd79c14

Browse files
authored
feat: add filtering for participant context in selectFor (#5669)
1 parent d453272 commit bd79c14

13 files changed

Lines changed: 121 additions & 49 deletions

File tree

core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/service/EmbeddedDataPlaneSelectorService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.function.Predicate;
3030

3131
import static org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstanceStates.UNREGISTERED;
32+
import static org.eclipse.edc.participantcontext.spi.types.ParticipantResource.queryByParticipantContextId;
3233

3334
public class EmbeddedDataPlaneSelectorService implements DataPlaneSelectorService {
3435

@@ -85,7 +86,7 @@ public ServiceResult<DataPlaneInstance> selectFor(TransferProcess transferProces
8586
return ServiceResult.badRequest("Strategy " + selectionStrategy + " was not found");
8687
}
8788
return transactionContext.execute(() -> {
88-
try (var stream = store.getAll()) {
89+
try (var stream = store.query(queryByParticipantContextId(transferProcess.getParticipantContextId()).build())) {
8990
var dataPlanes = stream
9091
.filter(it -> it.getState() != UNREGISTERED.code())
9192
.filter(it -> it.getAllowedTransferTypes().contains(transferProcess.getTransferType()))

core/data-plane-selector/data-plane-selector-core/src/main/java/org/eclipse/edc/connector/dataplane/selector/store/InMemoryDataPlaneInstanceStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstanceStates;
1919
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
2020
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
21+
import org.eclipse.edc.spi.query.QuerySpec;
2122
import org.eclipse.edc.spi.result.StoreResult;
2223
import org.eclipse.edc.store.InMemoryStatefulEntityStore;
2324

@@ -52,4 +53,9 @@ public StoreResult<DataPlaneInstance> deleteById(String instanceId) {
5253
public Stream<DataPlaneInstance> getAll() {
5354
return findAll();
5455
}
56+
57+
@Override
58+
public Stream<DataPlaneInstance> query(QuerySpec querySpec) {
59+
return findAll(querySpec);
60+
}
5561
}

core/data-plane-selector/data-plane-selector-core/src/test/java/org/eclipse/edc/connector/dataplane/selector/service/EmbeddedDataPlaneSelectorServiceTest.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,20 @@ public class EmbeddedDataPlaneSelectorServiceTest {
5454
private final String configuredSelectionStrategy = "strategy";
5555
private final DataPlaneSelectorService service = new EmbeddedDataPlaneSelectorService(store, selectionStrategyRegistry, new NoopTransactionContext(), configuredSelectionStrategy);
5656

57+
private DataPlaneInstance.Builder createInstanceBuilder(String id) {
58+
return DataPlaneInstance.Builder.newInstance()
59+
.id(id)
60+
.url("http://any");
61+
}
62+
63+
private DataAddress createAddress(String type) {
64+
return DataAddress.Builder.newInstance()
65+
.type(type)
66+
.keyName("key-name")
67+
.property("someprop", "someval")
68+
.build();
69+
}
70+
5771
@Nested
5872
class SelectFor {
5973

@@ -64,7 +78,7 @@ void shouldUseConfiguredSelector() {
6478
createInstanceBuilder("ignoredInstance").allowedTransferType("anotherTransferType").state(REGISTERED.code()).build(),
6579
createInstanceBuilder("expectedInstance").allowedTransferType("chosenTransferType").state(REGISTERED.code()).build()
6680
);
67-
when(store.getAll()).thenAnswer(i -> instances.stream());
81+
when(store.query(any())).thenAnswer(i -> instances.stream());
6882
var strategy = testRandomSelectionStrategy();
6983
when(selectionStrategyRegistry.find(any())).thenReturn(strategy);
7084

@@ -81,7 +95,7 @@ void shouldFilterOutUnregisteredInstances() {
8195
.allowedSourceType("srcTestType").allowedTransferType("chosenTransferType").build();
8296
var unregisteredInstance = createInstanceBuilder("unregistered").state(UNREGISTERED.code())
8397
.allowedSourceType("srcTestType").allowedTransferType("chosenTransferType").build();
84-
when(store.getAll()).thenReturn(Stream.of(unregisteredInstance, registeredInstance));
98+
when(store.query(any())).thenReturn(Stream.of(unregisteredInstance, registeredInstance));
8599
var strategy = testRandomSelectionStrategy();
86100
when(selectionStrategyRegistry.find(any())).thenReturn(strategy);
87101

@@ -101,7 +115,7 @@ void shouldFilterInstancesWithLabels_whenLabelsAreDefined() {
101115
.build();
102116
var expectedInstance = createInstanceBuilder("expectedInstance").state(REGISTERED.code()).allowedTransferType("chosenTransferType")
103117
.label("gold").label("blue").build();
104-
when(store.getAll()).thenReturn(Stream.of(ignoredInstance, expectedInstance));
118+
when(store.query(any())).thenReturn(Stream.of(ignoredInstance, expectedInstance));
105119
var strategy = testRandomSelectionStrategy();
106120
when(selectionStrategyRegistry.find(any())).thenReturn(strategy);
107121

@@ -320,18 +334,4 @@ void shouldFail_whenLeaseFails() {
320334
verify(store, never()).save(any());
321335
}
322336
}
323-
324-
private DataPlaneInstance.Builder createInstanceBuilder(String id) {
325-
return DataPlaneInstance.Builder.newInstance()
326-
.id(id)
327-
.url("http://any");
328-
}
329-
330-
private DataAddress createAddress(String type) {
331-
return DataAddress.Builder.newInstance()
332-
.type(type)
333-
.keyName("key-name")
334-
.property("someprop", "someval")
335-
.build();
336-
}
337337
}

data-protocols/data-plane-signaling/data-plane-signaling-core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ dependencies {
2323
api(project(":spi:common:json-ld-spi"))
2424
api(project(":spi:common:transform-spi"))
2525
api(project(":spi:common:web-spi"))
26+
api(project(":spi:common:participant-context-single-spi"))
2627
api(project(":spi:control-plane:contract-spi"))
2728
api(project(":spi:control-plane:control-plane-spi"))
2829
api(project(":spi:control-plane:transfer-spi"))

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingApiExtension.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessService;
1818
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
19+
import org.eclipse.edc.participantcontext.single.spi.SingleParticipantContextSupplier;
1920
import org.eclipse.edc.runtime.metamodel.annotation.Configuration;
2021
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
2122
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
@@ -26,6 +27,7 @@
2627
import org.eclipse.edc.signaling.port.transformer.DataFlowStatusMessageToDataFlowResponseTransformer;
2728
import org.eclipse.edc.signaling.port.transformer.DspDataAddressToDataAddressTransformer;
2829
import org.eclipse.edc.signaling.spi.authorization.SignalingAuthorizationRegistry;
30+
import org.eclipse.edc.spi.monitor.Monitor;
2931
import org.eclipse.edc.spi.system.ServiceExtension;
3032
import org.eclipse.edc.spi.system.ServiceExtensionContext;
3133
import org.eclipse.edc.spi.system.apiversion.ApiVersionService;
@@ -64,6 +66,12 @@ public class DataPlaneSignalingApiExtension implements ServiceExtension {
6466
@Inject
6567
private SignalingAuthorizationRegistry signalingAuthorizationRegistry;
6668

69+
@Inject(required = false)
70+
private SingleParticipantContextSupplier participantContextSupplier;
71+
72+
@Inject
73+
private Monitor monitor;
74+
6775
@Override
6876
public String name() {
6977
return NAME;
@@ -79,7 +87,12 @@ public void initialize(ServiceExtensionContext context) {
7987
typeTransformerRegistry.register(new DataFlowStatusMessageToDataFlowResponseTransformer());
8088
typeTransformerRegistry.register(new DspDataAddressToDataAddressTransformer());
8189

82-
webService.registerResource(ApiContext.MANAGEMENT, new DataPlaneRegistrationApiV4Controller(dataPlaneSelectorService));
90+
if (participantContextSupplier != null) {
91+
webService.registerResource(ApiContext.MANAGEMENT, new DataPlaneRegistrationApiV4Controller(dataPlaneSelectorService, participantContextSupplier));
92+
} else {
93+
monitor.debug("Running in virtual mode registration of DataPlaneRegistrationApiV4Controller will be skipped");
94+
}
95+
8396
webService.registerResource(ApiContext.SIGNALING, new DataPlaneTransferAuthorizationFilter(signalingAuthorizationRegistry, transferProcessService, dataPlaneSelectorService));
8497
webService.registerResource(ApiContext.SIGNALING, new DataPlaneTransferApiController(transferProcessService, typeTransformerRegistry));
8598

data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/DataPlaneRegistrationApiV4Controller.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
2525
import org.eclipse.edc.connector.dataplane.selector.spi.instance.AuthorizationProfile;
2626
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
27+
import org.eclipse.edc.participantcontext.single.spi.SingleParticipantContextSupplier;
2728
import org.eclipse.edc.signaling.domain.DataPlaneRegistrationMessage;
2829

2930
import java.util.Map;
@@ -37,23 +38,26 @@
3738
public class DataPlaneRegistrationApiV4Controller implements DataPlaneRegistrationApiV4 {
3839

3940
private final DataPlaneSelectorService dataPlaneSelectorService;
41+
private final SingleParticipantContextSupplier participantContextSupplier;
4042

41-
public DataPlaneRegistrationApiV4Controller(DataPlaneSelectorService dataPlaneSelectorService) {
43+
public DataPlaneRegistrationApiV4Controller(DataPlaneSelectorService dataPlaneSelectorService, SingleParticipantContextSupplier participantContextSupplier) {
4244
this.dataPlaneSelectorService = dataPlaneSelectorService;
45+
this.participantContextSupplier = participantContextSupplier;
4346
}
4447

4548
@PUT
4649
@Override
4750
public Response register(DataPlaneRegistrationMessage registration) {
4851
toAuthorizationProfile(registration.authorization());
49-
var dataPlaneInstance = DataPlaneInstance.Builder.newInstance()
50-
.id(registration.dataplaneId())
51-
.url(registration.endpoint())
52-
.allowedTransferType(registration.transferTypes())
53-
.authorizationProfile(toAuthorizationProfile(registration.authorization()))
54-
.build();
55-
56-
dataPlaneSelectorService.register(dataPlaneInstance)
52+
53+
participantContextSupplier.get().map(participantContext -> DataPlaneInstance.Builder.newInstance()
54+
.id(registration.dataplaneId())
55+
.url(registration.endpoint())
56+
.allowedTransferType(registration.transferTypes())
57+
.authorizationProfile(toAuthorizationProfile(registration.authorization()))
58+
.participantContextId(participantContext.getParticipantContextId())
59+
.build())
60+
.compose(dataPlaneSelectorService::register)
5761
.orElseThrow(it -> mapToException(it, DataPlaneInstance.class, registration.dataplaneId()));
5862

5963
return Response.ok().build();

data-protocols/data-plane-signaling/data-plane-signaling-core/src/test/java/org/eclipse/edc/signaling/DataPlaneRegistrationApiV4ControllerTest.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.restassured.specification.RequestSpecification;
1919
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
2020
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
21+
import org.eclipse.edc.participantcontext.single.spi.SingleParticipantContextSupplier;
22+
import org.eclipse.edc.participantcontext.spi.types.ParticipantContext;
2123
import org.eclipse.edc.signaling.port.api.DataPlaneRegistrationApiV4Controller;
2224
import org.eclipse.edc.spi.result.ServiceResult;
2325
import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase;
@@ -38,6 +40,20 @@
3840
public class DataPlaneRegistrationApiV4ControllerTest extends RestControllerTestBase {
3941

4042
private final DataPlaneSelectorService dataPlaneSelectorService = mock();
43+
private final SingleParticipantContextSupplier participantContextSupplier = () ->
44+
ServiceResult.success(ParticipantContext.Builder.newInstance().participantContextId("participant-context-id").identity("identity").build());
45+
46+
@Override
47+
protected Object controller() {
48+
return new DataPlaneRegistrationApiV4Controller(dataPlaneSelectorService, participantContextSupplier);
49+
}
50+
51+
private RequestSpecification baseRequest() {
52+
return given()
53+
.baseUri("http://localhost:" + port)
54+
.basePath("/v4beta")
55+
.when();
56+
}
4157

4258
@Nested
4359
class Register {
@@ -112,16 +128,4 @@ void shouldReturnError_whenDeletionFails() {
112128
.statusCode(409);
113129
}
114130
}
115-
116-
@Override
117-
protected Object controller() {
118-
return new DataPlaneRegistrationApiV4Controller(dataPlaneSelectorService);
119-
}
120-
121-
private RequestSpecification baseRequest() {
122-
return given()
123-
.baseUri("http://localhost:" + port)
124-
.basePath("/v4beta")
125-
.when();
126-
}
127131
}

data-protocols/data-plane-signaling/data-plane-signaling-core/src/test/java/org/eclipse/edc/signaling/port/api/DataPlaneRegistrationApiV4ControllerTest.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
import io.restassured.http.ContentType;
1818
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
19+
import org.eclipse.edc.participantcontext.single.spi.SingleParticipantContextSupplier;
20+
import org.eclipse.edc.participantcontext.spi.types.ParticipantContext;
1921
import org.eclipse.edc.signaling.domain.DataPlaneRegistrationMessage;
2022
import org.eclipse.edc.spi.result.ServiceResult;
2123
import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase;
@@ -36,6 +38,14 @@ class DataPlaneRegistrationApiV4ControllerTest extends RestControllerTestBase {
3638

3739
private final DataPlaneSelectorService dataPlaneSelectorService = mock();
3840

41+
private final SingleParticipantContextSupplier participantContextSupplier = () ->
42+
ServiceResult.success(ParticipantContext.Builder.newInstance().participantContextId("participant-context-id").identity("identity").build());
43+
44+
@Override
45+
protected Object controller() {
46+
return new DataPlaneRegistrationApiV4Controller(dataPlaneSelectorService, participantContextSupplier);
47+
}
48+
3949
@Nested
4050
class Register {
4151

@@ -55,9 +65,9 @@ void shouldRegisterDataPlane() {
5565

5666
verify(dataPlaneSelectorService).register(argThat(instance ->
5767
instance.getId().equals("dp-id") &&
58-
instance.getUrl().toString().equals("http://dataplane/endpoint") &&
59-
instance.getAllowedTransferTypes().contains("HttpData-PUSH") &&
60-
instance.getAuthorizationProfile() == null
68+
instance.getUrl().toString().equals("http://dataplane/endpoint") &&
69+
instance.getAllowedTransferTypes().contains("HttpData-PUSH") &&
70+
instance.getAuthorizationProfile() == null
6171
));
6272
}
6373

@@ -79,7 +89,7 @@ void shouldRegisterDataPlane_withAuthorizationProfile() {
7989

8090
verify(dataPlaneSelectorService).register(argThat(instance ->
8191
instance.getAuthorizationProfile() != null &&
82-
instance.getAuthorizationProfile().type().equals("oauth2")
92+
instance.getAuthorizationProfile().type().equals("oauth2")
8393
));
8494
}
8595

@@ -130,9 +140,4 @@ void shouldReturnNotFound_whenDataPlaneDoesNotExist() {
130140
.statusCode(404);
131141
}
132142
}
133-
134-
@Override
135-
protected Object controller() {
136-
return new DataPlaneRegistrationApiV4Controller(dataPlaneSelectorService);
137-
}
138143
}

extensions/data-plane-selector/store/sql/data-plane-instance-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/selector/store/sql/SqlDataPlaneInstanceStore.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,18 @@ public Stream<DataPlaneInstance> getAll() {
162162
});
163163
}
164164

165+
@Override
166+
public Stream<DataPlaneInstance> query(QuerySpec querySpec) {
167+
return transactionContext.execute(() -> {
168+
try {
169+
var statement = statements.createQuery(querySpec);
170+
return queryExecutor.query(getConnection(), true, this::mapResultSet, statement.getQueryAsString(), statement.getParameters());
171+
} catch (SQLException e) {
172+
throw new EdcPersistenceException(e);
173+
}
174+
});
175+
}
176+
165177
private DataPlaneInstance findByIdInternal(Connection connection, String id) {
166178
var sql = statements.getFindByIdTemplate();
167179
return queryExecutor.single(connection, false, this::mapResultSet, sql, id);

extensions/data-plane-selector/store/sql/data-plane-instance-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/selector/store/sql/schema/DataPlaneInstanceMapping.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public DataPlaneInstanceMapping(DataPlaneInstanceStatements statements) {
3131
add("stateCount", data);
3232
add("stateTimestamp", data);
3333
add("createdAt", data);
34+
add("participantContextId", data);
3435
add("traceContext", new JsonFieldTranslator(statements.getTraceContextColumn()));
3536
add("errorDetail", data);
3637
}

0 commit comments

Comments
 (0)