Skip to content

Commit df5a90c

Browse files
authored
feat: add registration endpoint controller (#40)
1 parent 91d0c29 commit df5a90c

12 files changed

Lines changed: 470 additions & 37 deletions

src/main/java/org/eclipse/dataplane/Dataplane.java

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.fasterxml.jackson.core.JsonProcessingException;
1919
import com.fasterxml.jackson.databind.ObjectMapper;
2020
import org.eclipse.dataplane.domain.Result;
21+
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
2122
import org.eclipse.dataplane.domain.dataflow.DataFlow;
2223
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
2324
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
@@ -26,17 +27,21 @@
2627
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
2728
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
2829
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
30+
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
2931
import org.eclipse.dataplane.domain.registration.DataPlaneRegistrationMessage;
3032
import org.eclipse.dataplane.logic.OnCompleted;
3133
import org.eclipse.dataplane.logic.OnPrepare;
3234
import org.eclipse.dataplane.logic.OnStart;
3335
import org.eclipse.dataplane.logic.OnStarted;
3436
import org.eclipse.dataplane.logic.OnSuspend;
3537
import org.eclipse.dataplane.logic.OnTerminate;
38+
import org.eclipse.dataplane.port.DataPlaneRegistrationApiController;
3639
import org.eclipse.dataplane.port.DataPlaneSignalingApiController;
3740
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
3841
import org.eclipse.dataplane.port.exception.DataplaneNotRegistered;
42+
import org.eclipse.dataplane.port.store.ControlPlaneStore;
3943
import org.eclipse.dataplane.port.store.DataFlowStore;
44+
import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore;
4045
import org.eclipse.dataplane.port.store.InMemoryDataFlowStore;
4146

4247
import java.net.URI;
@@ -53,7 +58,8 @@
5358
public class Dataplane {
5459

5560
private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
56-
private final DataFlowStore store = new InMemoryDataFlowStore(objectMapper);
61+
private final DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper);
62+
private final ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper);
5763
private String id;
5864
private String endpoint;
5965
private final Set<String> transferTypes = new HashSet<>();
@@ -76,16 +82,20 @@ public DataPlaneSignalingApiController controller() {
7682
return new DataPlaneSignalingApiController(this);
7783
}
7884

85+
public DataPlaneRegistrationApiController registrationController() {
86+
return new DataPlaneRegistrationApiController(this);
87+
}
88+
7989
public Result<DataFlow> getById(String dataFlowId) {
80-
return store.findById(dataFlowId);
90+
return dataFlowStore.findById(dataFlowId);
8191
}
8292

8393
public Result<Void> save(DataFlow dataFlow) {
84-
return store.save(dataFlow);
94+
return dataFlowStore.save(dataFlow);
8595
}
8696

8797
public Result<DataFlowStatusResponseMessage> status(String dataFlowId) {
88-
return store.findById(dataFlowId)
98+
return dataFlowStore.findById(dataFlowId)
8999
.map(f -> new DataFlowStatusResponseMessage(f.getId(), f.getState().name()));
90100
}
91101

@@ -153,24 +163,24 @@ public Result<DataFlowResponseMessage> start(DataFlowStartMessage message) {
153163
}
154164

155165
public Result<Void> suspend(String flowId, DataFlowSuspendMessage message) {
156-
return store.findById(flowId)
166+
return dataFlowStore.findById(flowId)
157167
.map(dataFlow -> {
158168
dataFlow.transitionToSuspended(message.reason());
159169
return dataFlow;
160170
})
161171
.compose(onSuspend::action)
162-
.compose(store::save)
172+
.compose(dataFlowStore::save)
163173
.map(it -> null);
164174
}
165175

166176
public Result<Void> terminate(String dataFlowId, DataFlowTerminateMessage message) {
167-
return store.findById(dataFlowId)
177+
return dataFlowStore.findById(dataFlowId)
168178
.map(dataFlow -> {
169179
dataFlow.transitionToTerminated(message.reason());
170180
return dataFlow;
171181
})
172182
.compose(onTerminate::action)
173-
.compose(store::save)
183+
.compose(dataFlowStore::save)
174184
.map(it -> null);
175185
}
176186

@@ -180,7 +190,7 @@ public Result<Void> terminate(String dataFlowId, DataFlowTerminateMessage messag
180190
* @param dataFlowId the data flow id.
181191
*/
182192
public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
183-
return store.findById(dataFlowId)
193+
return dataFlowStore.findById(dataFlowId)
184194
.compose(onPrepare::action)
185195
.compose(dataFlow -> {
186196
dataFlow.transitionToPrepared();
@@ -197,7 +207,7 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
197207
* @param dataFlowId the data flow id.
198208
*/
199209
public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
200-
return store.findById(dataFlowId)
210+
return dataFlowStore.findById(dataFlowId)
201211
.compose(onStart::action)
202212
.compose(dataFlow -> {
203213
dataFlow.transitionToStarted();
@@ -215,7 +225,7 @@ public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
215225
* @param dataFlowId id of the data flow
216226
*/
217227
public Result<Void> notifyCompleted(String dataFlowId) {
218-
return store.findById(dataFlowId)
228+
return dataFlowStore.findById(dataFlowId)
219229
.compose(dataFlow -> {
220230
dataFlow.transitionToCompleted();
221231

@@ -230,7 +240,7 @@ public Result<Void> notifyCompleted(String dataFlowId) {
230240
* @param throwable the error
231241
*/
232242
public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
233-
return store.findById(dataFlowId)
243+
return dataFlowStore.findById(dataFlowId)
234244
.compose(dataFlow -> {
235245
dataFlow.transitionToTerminated(throwable.getMessage());
236246

@@ -239,7 +249,7 @@ public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
239249
}
240250

241251
public Result<Void> started(String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) {
242-
return store.findById(flowId)
252+
return dataFlowStore.findById(flowId)
243253
.map(dataFlow -> {
244254
dataFlow.setDataAddress(startedNotificationMessage.dataAddress());
245255
return dataFlow;
@@ -258,7 +268,7 @@ public Result<Void> started(String flowId, DataFlowStartedNotificationMessage st
258268
* @return result indicating whether data flow was completed successfully
259269
*/
260270
public Result<Void> completed(String flowId) {
261-
return store.findById(flowId).compose(onCompleted::action)
271+
return dataFlowStore.findById(flowId).compose(onCompleted::action)
262272
.compose(dataFlow -> {
263273
dataFlow.transitionToCompleted();
264274
return save(dataFlow);
@@ -316,6 +326,23 @@ private Result<String> toJson(Object message) {
316326
}
317327
}
318328

329+
public ControlPlaneStore controlPlaneStore() {
330+
return controlPlaneStore;
331+
}
332+
333+
public Result<Void> registerControlPlane(ControlPlaneRegistrationMessage message) {
334+
var controlPlane = ControlPlane.newInstance()
335+
.id(message.controlplaneId())
336+
.endpoint(message.endpoint())
337+
.build();
338+
339+
return controlPlaneStore.save(controlPlane);
340+
}
341+
342+
public Result<Void> deleteControlPlane(String id) {
343+
return controlPlaneStore.delete(id);
344+
}
345+
319346
public static class Builder {
320347

321348
private final Dataplane dataplane = new Dataplane();
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2026 Think-it GmbH
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Think-it GmbH - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.dataplane.domain.controlplane;
16+
17+
import java.util.Objects;
18+
19+
public class ControlPlane {
20+
21+
private String id;
22+
private String endpoint;
23+
24+
public String getId() {
25+
return id;
26+
}
27+
28+
public String getEndpoint() {
29+
return endpoint;
30+
}
31+
32+
public static ControlPlane.Builder newInstance() {
33+
return new ControlPlane.Builder();
34+
}
35+
36+
public static class Builder {
37+
private final ControlPlane controlPlane = new ControlPlane();
38+
39+
private Builder() {
40+
41+
}
42+
43+
public ControlPlane build() {
44+
Objects.requireNonNull(controlPlane.id);
45+
46+
return controlPlane;
47+
}
48+
49+
public Builder id(String id) {
50+
controlPlane.id = id;
51+
return this;
52+
}
53+
54+
public Builder endpoint(String endpoint) {
55+
controlPlane.endpoint = endpoint;
56+
return this;
57+
}
58+
}
59+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright (c) 2026 Think-it GmbH
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Think-it GmbH - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.dataplane.domain.registration;
16+
17+
public record ControlPlaneRegistrationMessage(
18+
String controlplaneId,
19+
String endpoint
20+
// TODO: authorization
21+
) {
22+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (c) 2025 Think-it GmbH
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Think-it GmbH - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.dataplane.port;
16+
17+
import jakarta.ws.rs.Consumes;
18+
import jakarta.ws.rs.DELETE;
19+
import jakarta.ws.rs.PUT;
20+
import jakarta.ws.rs.Path;
21+
import jakarta.ws.rs.PathParam;
22+
import jakarta.ws.rs.Produces;
23+
import jakarta.ws.rs.core.Response;
24+
import org.eclipse.dataplane.Dataplane;
25+
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
26+
27+
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
28+
29+
@Path("/v1/controlplanes")
30+
@Consumes(APPLICATION_JSON)
31+
@Produces(APPLICATION_JSON)
32+
public class DataPlaneRegistrationApiController {
33+
34+
private final Dataplane dataplane;
35+
36+
public DataPlaneRegistrationApiController(Dataplane dataplane) {
37+
this.dataplane = dataplane;
38+
}
39+
40+
@PUT
41+
@Path("/")
42+
public Response register(ControlPlaneRegistrationMessage message) {
43+
dataplane.registerControlPlane(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
44+
return Response.ok().build();
45+
}
46+
47+
@DELETE
48+
@Path("/{id}")
49+
public Response delete(@PathParam("id") String id) {
50+
dataplane.deleteControlPlane(id).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
51+
return Response.noContent().build();
52+
}
53+
54+
}

src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616

1717
import jakarta.ws.rs.Consumes;
1818
import jakarta.ws.rs.GET;
19-
import jakarta.ws.rs.NotFoundException;
2019
import jakarta.ws.rs.POST;
2120
import jakarta.ws.rs.Path;
2221
import jakarta.ws.rs.PathParam;
2322
import jakarta.ws.rs.Produces;
24-
import jakarta.ws.rs.WebApplicationException;
2523
import jakarta.ws.rs.core.Response;
2624
import org.eclipse.dataplane.Dataplane;
2725
import org.eclipse.dataplane.domain.dataflow.DataFlow;
@@ -31,7 +29,6 @@
3129
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
3230
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
3331
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
34-
import org.eclipse.dataplane.port.exception.DataFlowNotFoundException;
3532

3633
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
3734
import static jakarta.ws.rs.core.MediaType.WILDCARD;
@@ -50,7 +47,7 @@ public DataPlaneSignalingApiController(Dataplane dataplane) {
5047
@POST
5148
@Path("/prepare")
5249
public Response prepare(DataFlowPrepareMessage message) {
53-
var response = dataplane.prepare(message).orElseThrow(this::mapToWsRsException);
50+
var response = dataplane.prepare(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
5451
if (response.state().equals(DataFlow.State.PREPARING.name())) {
5552
return Response.accepted(response).build();
5653
}
@@ -60,7 +57,7 @@ public Response prepare(DataFlowPrepareMessage message) {
6057
@POST
6158
@Path("/start")
6259
public Response start(DataFlowStartMessage message) {
63-
var response = dataplane.start(message).orElseThrow(this::mapToWsRsException);
60+
var response = dataplane.start(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
6461
if (response.state().equals(DataFlow.State.STARTING.name())) {
6562
return Response.accepted(response).build();
6663
}
@@ -70,43 +67,36 @@ public Response start(DataFlowStartMessage message) {
7067
@POST
7168
@Path("/{flowId}/suspend")
7269
public Response suspend(@PathParam("flowId") String flowId, DataFlowSuspendMessage message) {
73-
dataplane.suspend(flowId, message).orElseThrow(this::mapToWsRsException);
70+
dataplane.suspend(flowId, message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
7471
return Response.ok().build();
7572
}
7673

7774
@POST
7875
@Path("/{flowId}/terminate")
7976
public Response terminate(@PathParam("flowId") String flowId, DataFlowTerminateMessage message) {
80-
dataplane.terminate(flowId, message).orElseThrow(this::mapToWsRsException);
77+
dataplane.terminate(flowId, message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
8178
return Response.ok().build();
8279
}
8380

8481
@POST
8582
@Path("/{flowId}/started")
8683
public Response started(@PathParam("flowId") String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) {
87-
dataplane.started(flowId, startedNotificationMessage).orElseThrow(this::mapToWsRsException);
84+
dataplane.started(flowId, startedNotificationMessage).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
8885
return Response.ok().build();
8986
}
9087

9188
@POST
9289
@Path("/{flowId}/completed")
9390
@Consumes(WILDCARD)
9491
public Response completed(@PathParam("flowId") String flowId) {
95-
dataplane.completed(flowId).orElseThrow(this::mapToWsRsException);
92+
dataplane.completed(flowId).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
9693
return Response.ok().build();
9794
}
9895

9996
@GET
10097
@Path("/{flowId}/status")
10198
public DataFlowStatusResponseMessage status(@PathParam("flowId") String flowId) {
102-
return dataplane.status(flowId).orElseThrow(this::mapToWsRsException);
103-
}
104-
105-
private WebApplicationException mapToWsRsException(Exception exception) {
106-
if (exception instanceof DataFlowNotFoundException notFound) {
107-
return new NotFoundException(notFound);
108-
}
109-
return new WebApplicationException("unexpected internal server error");
99+
return dataplane.status(flowId).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
110100
}
111101

112102
}

0 commit comments

Comments
 (0)