2222import org .eclipse .dataplane .domain .DataAddress ;
2323import org .eclipse .dataplane .domain .Result ;
2424import org .eclipse .dataplane .domain .dataflow .DataFlow ;
25- import org .eclipse .dataplane .domain .dataflow .DataFlowPrepareMessage ;
2625import org .eclipse .dataplane .domain .dataflow .DataFlowResumeMessage ;
27- import org .eclipse .dataplane .domain .dataflow .DataFlowStartMessage ;
2826import org .eclipse .dataplane .domain .dataflow .DataFlowStartedNotificationMessage ;
2927import org .eclipse .dataplane .domain .dataflow .DataFlowStatusMessage ;
3028import org .eclipse .dataplane .domain .dataflow .DataFlowSuspendMessage ;
4947import java .util .concurrent .TimeUnit ;
5048
5149import static java .util .Collections .emptyList ;
52- import static java .util .Collections .emptyMap ;
5350import static org .assertj .core .api .Assertions .assertThat ;
5451import static org .awaitility .Awaitility .await ;
52+ import static org .eclipse .dataplane .MessageFactory .createPrepareMessage ;
53+ import static org .eclipse .dataplane .MessageFactory .createStartMessage ;
5554import static org .eclipse .dataplane .authorization .TestAuthorization .TOKEN_GENERATOR ;
5655import static org .eclipse .dataplane .authorization .TestAuthorization .createAuthorizationProfile ;
5756import static org .eclipse .dataplane .domain .dataflow .DataFlow .State .PREPARED ;
@@ -86,14 +85,14 @@ void shouldPullDataFromProvider_thenProviderTerminatesIt() {
8685 var transferType = "FileSystemStreaming-PULL" ;
8786 var processId = UUID .randomUUID ().toString ();
8887 var consumerProcessId = "consumer_" + processId ;
89- var prepareMessage = prepareMessage (consumerProcessId , transferType );
88+ var prepareMessage = createPrepareMessage (consumerProcessId , controlPlane . consumerCallbackAddress () , transferType );
9089
9190 var prepareResponse = controlPlane .consumerPrepare (prepareMessage ).statusCode (200 ).extract ().as (DataFlowStatusMessage .class );
9291 assertThat (prepareResponse .state ()).isEqualTo (PREPARED .name ());
9392 assertThat (prepareResponse .dataAddress ()).isNull ();
9493
9594 var providerProcessId = "provider_" + processId ;
96- var startMessage = startMessage (providerProcessId , transferType );
95+ var startMessage = createStartMessage (providerProcessId , controlPlane . providerCallbackAddress () , transferType );
9796 var startResponse = controlPlane .providerStart (startMessage ).statusCode (200 ).extract ().as (DataFlowStatusMessage .class );
9897 assertThat (startResponse .state ()).isEqualTo (STARTED .name ());
9998 assertThat (startResponse .dataAddress ()).isNotNull ();
@@ -114,14 +113,14 @@ void shouldSuspendAndResumeOnProvider() {
114113 var transferType = "FileSystemStreaming-PULL" ;
115114 var processId = UUID .randomUUID ().toString ();
116115 var consumerProcessId = "consumer_" + processId ;
117- var prepareMessage = prepareMessage (consumerProcessId , transferType );
116+ var prepareMessage = createPrepareMessage (consumerProcessId , controlPlane . consumerCallbackAddress () , transferType );
118117
119118 var prepareResponse = controlPlane .consumerPrepare (prepareMessage ).statusCode (200 ).extract ().as (DataFlowStatusMessage .class );
120119 assertThat (prepareResponse .state ()).isEqualTo (PREPARED .name ());
121120 assertThat (prepareResponse .dataAddress ()).isNull ();
122121
123122 var providerProcessId = "provider_" + processId ;
124- var startMessage = startMessage (providerProcessId , transferType );
123+ var startMessage = createStartMessage (providerProcessId , controlPlane . providerCallbackAddress () , transferType );
125124 var startResponse = controlPlane .providerStart (startMessage ).statusCode (200 ).extract ().as (DataFlowStatusMessage .class );
126125 assertThat (startResponse .state ()).isEqualTo (STARTED .name ());
127126 assertThat (startResponse .dataAddress ()).isNotNull ();
@@ -141,18 +140,6 @@ void shouldSuspendAndResumeOnProvider() {
141140 consumerDataPlane .assertDataIsFlowing ();
142141 }
143142
144- private DataFlowPrepareMessage prepareMessage (String consumerProcessId , String transferType ) {
145- return new DataFlowPrepareMessage ("theMessageId" , "theParticipantId" , "theCounterPartyId" ,
146- "theDataspaceContext" , consumerProcessId , "theAgreementId" , "theDatasetId" , controlPlane .consumerCallbackAddress (),
147- transferType , emptyList (), emptyMap ());
148- }
149-
150- private DataFlowStartMessage startMessage (String providerProcessId , String transferType ) {
151- return new DataFlowStartMessage ("theMessageId" , "theParticipantId" , "theCounterPartyId" ,
152- "theDataspaceContext" , providerProcessId , "theAgreementId" , "theDatasetId" , controlPlane .providerCallbackAddress (),
153- transferType , null , emptyList (), emptyMap ());
154- }
155-
156143 private DataFlowResumeMessage resumeMessage (String providerProcessId ) {
157144 return new DataFlowResumeMessage ("theMessageId" , providerProcessId , null );
158145 }
0 commit comments