3535import org .eclipse .edc .spi .monitor .Monitor ;
3636import org .eclipse .edc .spi .persistence .EdcPersistenceException ;
3737import org .eclipse .edc .spi .query .Criterion ;
38+ import org .eclipse .edc .spi .response .ResponseStatus ;
39+ import org .eclipse .edc .spi .response .StatusResult ;
3840import org .eclipse .edc .spi .result .Result ;
3941import org .eclipse .edc .spi .result .ServiceResult ;
4042import org .eclipse .edc .statemachine .AbstractStateEntityManager ;
5153import java .time .temporal .ChronoUnit ;
5254import java .util .List ;
5355import java .util .Map ;
56+ import java .util .concurrent .CompletableFuture ;
5457import java .util .function .Function ;
5558
5659import static java .util .Objects .requireNonNull ;
@@ -145,13 +148,13 @@ private void updateRequest(HolderCredentialRequest request) {
145148 transactionContext .execute (() -> update (request ));
146149 }
147150
148- private Processor processRequestsInState (HolderRequestState state , Function <HolderCredentialRequest , Boolean > function ) {
149- var filter = new Criterion []{hasState (state .code ()), isNotPending ()};
151+ private Processor processRequestsInState (HolderRequestState state , Function <HolderCredentialRequest , CompletableFuture < StatusResult < Void >> > function ) {
152+ var filter = new Criterion []{ hasState (state .code ()), isNotPending () };
150153 return createProcessor (function , filter );
151154 }
152155
153- private ProcessorImpl <HolderCredentialRequest > createProcessor (Function <HolderCredentialRequest , Boolean > function , Criterion [] filter ) {
154- return ProcessorImpl .Builder .newInstance (() -> store .nextNotLeased (batchSize , filter ))
156+ private ProcessorImpl <HolderCredentialRequest > createProcessor (Function <HolderCredentialRequest , CompletableFuture < StatusResult < Void >> > function , Criterion [] filter ) {
157+ return ProcessorImpl .Builder .newInstance (() -> store .nextNotLeased (batchSize , filter ), entityRetryProcessConfiguration , clock , monitor )
155158 .process (telemetry .contextPropagationMiddleware (function ))
156159 //.guard(pendingGuard, this::setPending) //todo: needed?
157160 .onNotProcessed (this ::breakLease )
@@ -162,16 +165,18 @@ private ProcessorImpl<HolderCredentialRequest> createProcessor(Function<HolderCr
162165 * processes all requests that are in {@link HolderRequestState#CREATED} or {@link HolderRequestState#REQUESTING} state. Credential requests that were
163166 * interrupted before receiving the Issuer's response are in this state.
164167 *
165- * @return true if the request was processed, false otherwise .
168+ * @return a CompletableFuture containing the result of processing the request .
166169 */
167- private Boolean processInitial (HolderCredentialRequest holderCredentialRequest ) {
170+ private CompletableFuture < StatusResult < Void >> processInitial (HolderCredentialRequest holderCredentialRequest ) {
168171 monitor .debug ("Processing '%s' request '%s'" .formatted (holderCredentialRequest .stateAsString (), holderCredentialRequest .getHolderPid ()));
172+
169173 var result = getCredentialRequestEndpoint (holderCredentialRequest )
170174 .compose (endpoint -> sendCredentialRequest (holderCredentialRequest , endpoint ))
171175 .compose (issuerPid -> handleCredentialResponse (issuerPid , holderCredentialRequest ))
172176 .onFailure (failure -> transactionContext .execute (() -> transitionError (holderCredentialRequest , failure .getFailureDetail ())));
173177
174- return result .succeeded ();
178+ StatusResult <Void > statusResult = result .succeeded () ? StatusResult .success () : StatusResult .failure (ResponseStatus .FATAL_ERROR , result .getFailureDetail ());
179+ return CompletableFuture .completedFuture (statusResult );
175180 }
176181
177182 /**
0 commit comments