3939import org .eclipse .dataplane .port .DataPlaneRegistrationApiController ;
4040import org .eclipse .dataplane .port .DataPlaneSignalingApiController ;
4141import org .eclipse .dataplane .port .exception .AuthorizationNotSupported ;
42+ import org .eclipse .dataplane .port .exception .ControlPlaneNotRegistered ;
4243import org .eclipse .dataplane .port .exception .DataFlowNotifyControlPlaneFailed ;
4344import org .eclipse .dataplane .port .exception .DataplaneNotRegistered ;
45+ import org .eclipse .dataplane .port .exception .ResourceNotFoundException ;
4446import org .eclipse .dataplane .port .store .ControlPlaneStore ;
4547import org .eclipse .dataplane .port .store .DataFlowStore ;
4648import org .eclipse .dataplane .port .store .InMemoryControlPlaneStore ;
5759import java .util .UUID ;
5860
5961import static com .fasterxml .jackson .databind .DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES ;
62+ import static jakarta .ws .rs .core .HttpHeaders .AUTHORIZATION ;
6063import static java .util .Collections .emptyMap ;
6164
6265public class Dataplane {
@@ -84,7 +87,7 @@ public static Builder newInstance() {
8487 }
8588
8689 public DataPlaneSignalingApiController controller () {
87- return new DataPlaneSignalingApiController (this );
90+ return new DataPlaneSignalingApiController (this , authorizations );
8891 }
8992
9093 public DataPlaneRegistrationApiController registrationController () {
@@ -104,7 +107,14 @@ public Result<DataFlowStatusResponseMessage> status(String dataFlowId) {
104107 .map (f -> new DataFlowStatusResponseMessage (f .getId (), f .getState ().name ()));
105108 }
106109
107- public Result <DataFlowResponseMessage > prepare (DataFlowPrepareMessage message ) {
110+ private Result <Void > checkControlPlane (String controlplaneId ) {
111+ if (controlPlaneStore .exists (controlplaneId )) {
112+ return Result .success ();
113+ }
114+ return Result .failure (new ControlPlaneNotRegistered (controlplaneId ));
115+ }
116+
117+ public Result <DataFlowResponseMessage > prepare (String controlplaneId , DataFlowPrepareMessage message ) {
108118 var initialDataFlow = DataFlow .newInstance ()
109119 .id (message .processId ())
110120 .state (DataFlow .State .INITIATING )
@@ -117,9 +127,11 @@ public Result<DataFlowResponseMessage> prepare(DataFlowPrepareMessage message) {
117127 .participantId (message .participantId ())
118128 .counterPartyId (message .counterPartyId ())
119129 .dataspaceContext (message .dataspaceContext ())
130+ .controlplaneId (controlplaneId )
120131 .build ();
121132
122- return onPrepare .action (initialDataFlow )
133+ return checkControlPlane (controlplaneId )
134+ .compose (v -> onPrepare .action (initialDataFlow ))
123135 .compose (dataFlow -> {
124136 if (dataFlow .isInitiating ()) {
125137 dataFlow .transitionToPrepared ();
@@ -137,7 +149,7 @@ public Result<DataFlowResponseMessage> prepare(DataFlowPrepareMessage message) {
137149 }
138150
139151
140- public Result <DataFlowResponseMessage > start (DataFlowStartMessage message ) {
152+ public Result <DataFlowResponseMessage > start (String controlplaneId , DataFlowStartMessage message ) {
141153 var initialDataFlow = DataFlow .newInstance ()
142154 .id (message .processId ())
143155 .state (DataFlow .State .INITIATING )
@@ -149,9 +161,11 @@ public Result<DataFlowResponseMessage> start(DataFlowStartMessage message) {
149161 .participantId (message .participantId ())
150162 .counterPartyId (message .counterPartyId ())
151163 .dataspaceContext (message .dataspaceContext ())
164+ .controlplaneId (controlplaneId )
152165 .build ();
153166
154- return onStart .action (initialDataFlow )
167+ return checkControlPlane (controlplaneId )
168+ .compose (v -> onStart .action (initialDataFlow ))
155169 .compose (dataFlow -> {
156170 if (dataFlow .isInitiating ()) {
157171 dataFlow .transitionToStarted ();
@@ -310,14 +324,16 @@ private Result<Void> notifyControlPlane(String action, DataFlow dataFlow, Object
310324 .header ("content-type" , "application/json" )
311325 .POST (HttpRequest .BodyPublishers .ofString (body ));
312326
313- var controlPlane = controlPlaneStore .findByEndpoint (dataFlow .getCallbackAddress ());
314- if (controlPlane .succeeded ()) {
315- var authorizationProfile = controlPlane .getContent ().authorization ();
316- if (authorizationProfile != null ) {
317- var authorization = authorizations .get (authorizationProfile .getType ());
318- authorization .apply (requestBuilder , authorizationProfile );
319- }
320- }
327+ controlPlaneStore .findById (dataFlow .getControlplaneId ())
328+ .compose (controlPlane -> {
329+ var authorizationProfile = controlPlane .authorization ();
330+ if (authorizationProfile != null ) {
331+ var authorization = authorizations .get (authorizationProfile .getType ());
332+ return authorization .authorizationHeader (authorizationProfile );
333+ }
334+ return Result .failure (new ResourceNotFoundException ("ControlPlane has no authorization" ));
335+ })
336+ .onSuccess (authorizationHeader -> requestBuilder .header (AUTHORIZATION , authorizationHeader ));
321337
322338 return httpClient .send (requestBuilder .build (), HttpResponse .BodyHandlers .discarding ());
323339 })
0 commit comments