1212import java .time .Duration ;
1313import java .util .concurrent .Callable ;
1414import java .util .concurrent .TimeUnit ;
15+ import java .util .concurrent .atomic .AtomicBoolean ;
1516import java .util .concurrent .atomic .AtomicLong ;
1617
1718public class ComputePoolService {
@@ -22,15 +23,15 @@ public class ComputePoolService {
2223
2324 private final WorkerExecutor workerExecutor ;
2425
25- // Queue length : incremented on queue, decremented on completion
26- // since Vert.x worker executor doesn't expose queue length
27- private final AtomicLong queueLength = new AtomicLong (0 );
26+ // Queue pending : incremented on queue, decremented on dispatch
27+ // Tracks tasks waiting in queue but not yet started
28+ private final AtomicLong queuePending = new AtomicLong (0 );
2829
2930 // Prometheus histogram for queue wait time (time between queued and dispatched)
3031 private final Timer queueWaitTimer ;
3132
32- // Prometheus gauge for queue length
33- private final Gauge queueLengthGauge ;
33+ // Prometheus gauge for queue pending
34+ private final Gauge queuePendingGauge ;
3435
3536 /**
3637 * Creates a ComputePoolService with default pool size (available processors - 2, minimum 1).
@@ -58,8 +59,8 @@ public ComputePoolService(Vertx vertx, int poolSize) {
5859 .maximumExpectedValue (Duration .ofMillis (500 )) // 500ms
5960 .register (Metrics .globalRegistry );
6061
61- this .queueLengthGauge = Gauge .builder (METRIC_PREFIX + "queue_length " , queueLength ::get )
62- .description ("Number of tasks queued but not yet completed " )
62+ this .queuePendingGauge = Gauge .builder (METRIC_PREFIX + "queue_pending " , queuePending ::get )
63+ .description ("Number of tasks waiting in queue but not yet dispatched to a worker " )
6364 .register (Metrics .globalRegistry );
6465
6566 LOGGER .info ("ComputePoolService initialized with pool size: {}" , poolSize );
@@ -76,18 +77,30 @@ public ComputePoolService(Vertx vertx, int poolSize) {
7677 */
7778 public <T > Future <T > executeBlocking (Callable <T > callable ) {
7879 final long queuedAt = System .nanoTime ();
79- queueLength .incrementAndGet ();
80+ final AtomicBoolean dispatched = new AtomicBoolean (false );
81+
82+ queuePending .incrementAndGet ();
83+
84+ try {
85+ return workerExecutor .<T >executeBlocking (() -> {
86+ dispatched .set (true );
87+ queuePending .decrementAndGet ();
8088
81- return workerExecutor .<T >executeBlocking (() -> {
82- try {
8389 final long dispatchedAt = System .nanoTime ();
8490 queueWaitTimer .record (dispatchedAt - queuedAt , TimeUnit .NANOSECONDS );
8591
8692 return callable .call ();
87- } finally {
88- queueLength .decrementAndGet ();
89- }
90- });
93+ }).onComplete (ar -> {
94+ // If task was never dispatched, clean up
95+ if (!dispatched .get ()) {
96+ queuePending .decrementAndGet ();
97+ }
98+ });
99+ } catch (Exception e ) {
100+ // executeBlocking threw before returning a Future
101+ queuePending .decrementAndGet ();
102+ return Future .failedFuture (e );
103+ }
91104 }
92105
93106 /**
@@ -105,10 +118,10 @@ public Future<Void> executeBlocking(Runnable runnable) {
105118
106119
107120 /**
108- * Returns the current queue length ( tasks queued but not yet completed) .
121+ * Returns the number of tasks waiting in queue but not yet dispatched to a worker .
109122 */
110- public long getQueueLength () {
111- return queueLength .get ();
123+ public long getQueuePending () {
124+ return queuePending .get ();
112125 }
113126
114127 public void close () {
0 commit comments