2727import org .eclipse .dataplane .domain .dataflow .DataFlowStatusResponseMessage ;
2828import org .eclipse .dataplane .domain .dataflow .DataFlowSuspendMessage ;
2929import org .eclipse .dataplane .domain .dataflow .DataFlowTerminateMessage ;
30+ import org .eclipse .dataplane .domain .registration .Authorization ;
31+ import org .eclipse .dataplane .domain .registration .AuthorizationType ;
3032import org .eclipse .dataplane .domain .registration .ControlPlaneRegistrationMessage ;
3133import org .eclipse .dataplane .domain .registration .DataPlaneRegistrationMessage ;
3234import org .eclipse .dataplane .logic .OnCompleted ;
3739import org .eclipse .dataplane .logic .OnTerminate ;
3840import org .eclipse .dataplane .port .DataPlaneRegistrationApiController ;
3941import org .eclipse .dataplane .port .DataPlaneSignalingApiController ;
42+ import org .eclipse .dataplane .port .exception .AuthorizationNotSupported ;
4043import org .eclipse .dataplane .port .exception .DataFlowNotifyControlPlaneFailed ;
4144import org .eclipse .dataplane .port .exception .DataplaneNotRegistered ;
4245import org .eclipse .dataplane .port .store .ControlPlaneStore ;
4851import java .net .http .HttpClient ;
4952import java .net .http .HttpRequest ;
5053import java .net .http .HttpResponse ;
54+ import java .util .HashMap ;
5155import java .util .HashSet ;
56+ import java .util .Map ;
5257import java .util .Set ;
5358import java .util .UUID ;
59+ import java .util .function .BiConsumer ;
5460
5561import static com .fasterxml .jackson .databind .DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES ;
5662import static java .util .Collections .emptyMap ;
@@ -73,6 +79,7 @@ public class Dataplane {
7379 private OnCompleted onCompleted = dataFlow -> Result .failure (new UnsupportedOperationException ("onCompleted is not implemented" ));
7480
7581 private final HttpClient httpClient = HttpClient .newHttpClient ();
82+ private final Map <String , AuthorizationType > authorizationTypes = new HashMap <>();
7683
7784 public static Builder newInstance () {
7885 return new Builder ();
@@ -229,7 +236,7 @@ public Result<Void> notifyCompleted(String dataFlowId) {
229236 .compose (dataFlow -> {
230237 dataFlow .transitionToCompleted ();
231238
232- return notifyControlPlane ("completed" , dataFlow , emptyMap ()); // TODO DataFlowCompletedMessage not defined
239+ return notifyControlPlane ("completed" , dataFlow , emptyMap ());
233240 });
234241 }
235242
@@ -300,13 +307,22 @@ private Result<Void> notifyControlPlane(String action, DataFlow dataFlow, Object
300307 return toJson (message )
301308 .map (body -> {
302309 var endpoint = dataFlow .callbackEndpointFor (action );
303- var request = HttpRequest .newBuilder ()
310+ var requestBuilder = HttpRequest .newBuilder ()
304311 .uri (URI .create (endpoint ))
305312 .header ("content-type" , "application/json" )
306- .POST (HttpRequest .BodyPublishers .ofString (body ))
307- .build ();
313+ .POST (HttpRequest .BodyPublishers .ofString (body ));
314+
315+ var controlPlane = controlPlaneStore .findByEndpoint (dataFlow .getCallbackAddress ());
316+ if (controlPlane .succeeded ()) {
317+ var authorization = controlPlane .getContent ().authorization ();
318+ if (authorization != null ) {
319+ var authorizationType = authorizationTypes .get (authorization .getType ());
320+ var castAuthorization = objectMapper .convertValue (authorization , authorizationType .authorizationClass ());
321+ authorizationType .authorizationFunction ().accept (requestBuilder , castAuthorization );
322+ }
323+ }
308324
309- return httpClient .send (request , HttpResponse .BodyHandlers .discarding ());
325+ return httpClient .send (requestBuilder . build () , HttpResponse .BodyHandlers .discarding ());
310326 })
311327 .compose (response -> {
312328 var successful = response .statusCode () >= 200 && response .statusCode () < 300 ;
@@ -331,9 +347,16 @@ public ControlPlaneStore controlPlaneStore() {
331347 }
332348
333349 public Result <Void > registerControlPlane (ControlPlaneRegistrationMessage message ) {
350+ for (var auth : message .authorization ()) {
351+ if (!authorizationTypes .containsKey (auth .getType ())) {
352+ return Result .failure (new AuthorizationNotSupported (auth ));
353+ }
354+ }
355+
334356 var controlPlane = ControlPlane .newInstance ()
335357 .id (message .controlplaneId ())
336358 .endpoint (message .endpoint ())
359+ .authorization (message .authorization ())
337360 .build ();
338361
339362 return controlPlaneStore .save (controlPlane );
@@ -406,5 +429,10 @@ public Builder onTerminate(OnTerminate onTerminate) {
406429 dataplane .onTerminate = onTerminate ;
407430 return this ;
408431 }
432+
433+ public <T extends Authorization > Builder registerAuthorization (String type , Class <T > authorizationClass , BiConsumer <HttpRequest .Builder , T > authorizationFunction ) {
434+ dataplane .authorizationTypes .put (type , new AuthorizationType <>(type , authorizationClass , authorizationFunction ));
435+ return this ;
436+ }
409437 }
410438}
0 commit comments