2626 */
2727package org .apache .hc .client5 .http .fluent ;
2828
29+ import java .util .concurrent .CompletableFuture ;
30+ import java .util .concurrent .ExecutorService ;
2931import java .util .concurrent .Future ;
32+ import java .util .concurrent .LinkedBlockingQueue ;
33+ import java .util .concurrent .RejectedExecutionException ;
34+ import java .util .concurrent .ThreadPoolExecutor ;
35+ import java .util .concurrent .TimeUnit ;
36+ import java .util .concurrent .atomic .AtomicInteger ;
3037
38+ import org .apache .hc .core5 .annotation .Contract ;
39+ import org .apache .hc .core5 .annotation .ThreadingBehavior ;
3140import org .apache .hc .core5 .concurrent .BasicFuture ;
41+ import org .apache .hc .core5 .concurrent .DefaultThreadFactory ;
3242import org .apache .hc .core5 .concurrent .FutureCallback ;
3343import org .apache .hc .core5 .http .io .HttpClientResponseHandler ;
44+ import org .apache .hc .core5 .util .Args ;
3445
3546/**
3647 * Asynchronous executor for {@link Request}s.
3748 *
3849 * @since 4.3
3950 */
51+ @ Contract (threading = ThreadingBehavior .SAFE_CONDITIONAL )
4052public class Async {
4153
54+ private static final int DEFAULT_MAX_THREADS =
55+ Math .max (2 , Math .min (32 , Runtime .getRuntime ().availableProcessors () * 2 ));
56+
57+ private static final int DEFAULT_QUEUE_CAPACITY = 1000 ;
58+
59+ private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger (0 );
60+
4261 private Executor executor ;
43- private java .util .concurrent .Executor concurrentExec ;
62+ private volatile java .util .concurrent .Executor concurrentExec ;
63+ private volatile ExecutorService ownedConcurrentExec ;
64+
65+ private int maxThreads = DEFAULT_MAX_THREADS ;
66+ private int queueCapacity = DEFAULT_QUEUE_CAPACITY ;
4467
4568 public static Async newInstance () {
4669 return new Async ();
4770 }
4871
4972 Async () {
5073 super ();
74+ // Keep legacy behavior by default.
75+ }
76+
77+ public Async maxThreads (final int maxThreads ) {
78+ Args .positive (maxThreads , "maxThreads" );
79+ this .maxThreads = maxThreads ;
80+ rebuildOwnedExecutorIfActive ();
81+ return this ;
82+ }
83+
84+ public Async queueCapacity (final int queueCapacity ) {
85+ Args .positive (queueCapacity , "queueCapacity" );
86+ this .queueCapacity = queueCapacity ;
87+ rebuildOwnedExecutorIfActive ();
88+ return this ;
89+ }
90+
91+ /**
92+ * Enables an owned bounded default executor for asynchronous request execution using the
93+ * current {@code maxThreads} and {@code queueCapacity} settings.
94+ *
95+ * @return this instance.
96+ * @since 5.7
97+ */
98+ public Async useDefaultExecutor () {
99+ return useDefaultExecutor (this .maxThreads , this .queueCapacity );
100+ }
101+
102+ /**
103+ * Enables an owned bounded default executor for asynchronous request execution.
104+ *
105+ * @param maxThreads maximum number of threads.
106+ * @param queueCapacity maximum number of queued tasks.
107+ * @return this instance.
108+ * @since 5.7
109+ */
110+ public Async useDefaultExecutor (final int maxThreads , final int queueCapacity ) {
111+ Args .positive (maxThreads , "maxThreads" );
112+ Args .positive (queueCapacity , "queueCapacity" );
113+ this .maxThreads = maxThreads ;
114+ this .queueCapacity = queueCapacity ;
115+
116+ shutdown ();
117+ this .ownedConcurrentExec = createDefaultExecutor (this .maxThreads , this .queueCapacity );
118+ this .concurrentExec = this .ownedConcurrentExec ;
119+ return this ;
120+ }
121+
122+ private void rebuildOwnedExecutorIfActive () {
123+ if (this .ownedConcurrentExec != null ) {
124+ shutdown ();
125+ this .ownedConcurrentExec = createDefaultExecutor (this .maxThreads , this .queueCapacity );
126+ this .concurrentExec = this .ownedConcurrentExec ;
127+ }
128+ }
129+
130+ private static ExecutorService createDefaultExecutor (final int maxThreads , final int queueCapacity ) {
131+ final int instanceId = INSTANCE_COUNT .incrementAndGet ();
132+ final DefaultThreadFactory threadFactory = new DefaultThreadFactory (
133+ "httpclient5-fluent-async-" + instanceId + "-" ,
134+ true );
135+
136+ final ThreadPoolExecutor exec = new ThreadPoolExecutor (
137+ maxThreads ,
138+ maxThreads ,
139+ 60L ,
140+ TimeUnit .SECONDS ,
141+ new LinkedBlockingQueue <>(queueCapacity ),
142+ threadFactory ,
143+ new ThreadPoolExecutor .CallerRunsPolicy ());
144+
145+ exec .allowCoreThreadTimeOut (true );
146+ return exec ;
51147 }
52148
53149 public Async use (final Executor executor ) {
@@ -57,9 +153,25 @@ public Async use(final Executor executor) {
57153
58154 public Async use (final java .util .concurrent .Executor concurrentExec ) {
59155 this .concurrentExec = concurrentExec ;
156+ shutdown ();
60157 return this ;
61158 }
62159
160+ /**
161+ * Shuts down resources owned by this instance, if any.
162+ * <p>
163+ * This method never attempts to shut down executors supplied via {@link #use(java.util.concurrent.Executor)}.
164+ *
165+ * @since 5.7
166+ */
167+ public void shutdown () {
168+ final ExecutorService exec = this .ownedConcurrentExec ;
169+ if (exec != null ) {
170+ this .ownedConcurrentExec = null ;
171+ exec .shutdown ();
172+ }
173+ }
174+
63175 static class ExecRunnable <T > implements Runnable {
64176
65177 private final BasicFuture <T > future ;
@@ -100,8 +212,14 @@ public <T> Future<T> execute(
100212 request ,
101213 this .executor != null ? this .executor : Executor .newInstance (),
102214 handler );
103- if (this .concurrentExec != null ) {
104- this .concurrentExec .execute (runnable );
215+
216+ final java .util .concurrent .Executor exec = this .concurrentExec ;
217+ if (exec != null ) {
218+ try {
219+ exec .execute (runnable );
220+ } catch (final RejectedExecutionException ex ) {
221+ future .failed (ex );
222+ }
105223 } else {
106224 final Thread t = new Thread (runnable );
107225 t .setDaemon (true );
@@ -122,4 +240,108 @@ public Future<Content> execute(final Request request) {
122240 return execute (request , new ContentResponseHandler (), null );
123241 }
124242
243+ /**
244+ * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
245+ * when the response has been fully received and converted by the given response handler.
246+ *
247+ * @param request the request to execute.
248+ * @param handler the response handler.
249+ * @param <T> the handler result type.
250+ * @return a {@code CompletableFuture} producing the handler result.
251+ * @since 5.7
252+ */
253+ public <T > CompletableFuture <T > executeAsync (final Request request , final HttpClientResponseHandler <T > handler ) {
254+ final CompletableFuture <T > cf = new CompletableFuture <>();
255+ execute (request , handler , new FutureCallback <T >() {
256+
257+ @ Override
258+ public void completed (final T result ) {
259+ cf .complete (result );
260+ }
261+
262+ @ Override
263+ public void failed (final Exception ex ) {
264+ cf .completeExceptionally (ex );
265+ }
266+
267+ @ Override
268+ public void cancelled () {
269+ cf .cancel (false );
270+ }
271+
272+ });
273+ return cf ;
274+ }
275+
276+ /**
277+ * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
278+ * when the response has been fully received and converted by the given response handler. The given
279+ * callback is invoked on completion, failure, or cancellation.
280+ *
281+ * @param request the request to execute.
282+ * @param handler the response handler.
283+ * @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}.
284+ * @param <T> the handler result type.
285+ * @return a {@code CompletableFuture} producing the handler result.
286+ * @since 5.7
287+ */
288+ public <T > CompletableFuture <T > executeAsync (
289+ final Request request , final HttpClientResponseHandler <T > handler , final FutureCallback <T > callback ) {
290+ final CompletableFuture <T > cf = new CompletableFuture <>();
291+ execute (request , handler , new FutureCallback <T >() {
292+
293+ @ Override
294+ public void completed (final T result ) {
295+ if (callback != null ) {
296+ callback .completed (result );
297+ }
298+ cf .complete (result );
299+ }
300+
301+ @ Override
302+ public void failed (final Exception ex ) {
303+ if (callback != null ) {
304+ callback .failed (ex );
305+ }
306+ cf .completeExceptionally (ex );
307+ }
308+
309+ @ Override
310+ public void cancelled () {
311+ if (callback != null ) {
312+ callback .cancelled ();
313+ }
314+ cf .cancel (false );
315+ }
316+
317+ });
318+ return cf ;
319+ }
320+
321+ /**
322+ * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
323+ * when the response has been fully received and converted to {@link Content}.
324+ *
325+ * @param request the request to execute.
326+ * @return a {@code CompletableFuture} producing the response {@code Content}.
327+ * @since 5.7
328+ */
329+ public CompletableFuture <Content > executeAsync (final Request request ) {
330+ return executeAsync (request , new ContentResponseHandler ());
331+ }
332+
333+ /**
334+ * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
335+ * when the response has been fully received and converted to {@link Content}. The given callback
336+ * is invoked on completion, failure, or cancellation.
337+ *
338+ * @param request the request to execute.
339+ * @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}.
340+ * @return a {@code CompletableFuture} producing the response {@code Content}.
341+ * @since 5.7
342+ */
343+ public CompletableFuture <Content > executeAsync (final Request request , final FutureCallback <Content > callback ) {
344+ return executeAsync (request , new ContentResponseHandler (), callback );
345+ }
346+
125347}
0 commit comments