Skip to content

Commit b4a2d55

Browse files
committed
Added ComputePoolService and moved the identity map endpoints onto the worker pool
1 parent f68d176 commit b4a2d55

5 files changed

Lines changed: 162 additions & 9 deletions

File tree

src/main/java/com/uid2/operator/Main.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,11 @@ private void run() throws Exception {
341341
this.createVertxInstancesMetric();
342342
this.createVertxEventLoopsMetric();
343343

344+
// Create shared compute pool for CPU-intensive operations
345+
final ComputePoolService computePoolService = new ComputePoolService(vertx);
346+
344347
Supplier<Verticle> operatorVerticleSupplier = () -> {
345-
UIDOperatorVerticle verticle = new UIDOperatorVerticle(configStore, config, this.clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, getKeyManager(), saltProvider, optOutStore, Clock.systemUTC(), _statsCollectorQueue, new SecureLinkValidatorService(this.serviceLinkProvider, this.serviceProvider), this.shutdownHandler::handleSaltRetrievalResponse, this.uidInstanceIdProvider);
348+
UIDOperatorVerticle verticle = new UIDOperatorVerticle(configStore, config, this.clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, getKeyManager(), saltProvider, optOutStore, Clock.systemUTC(), _statsCollectorQueue, new SecureLinkValidatorService(this.serviceLinkProvider, this.serviceProvider), this.shutdownHandler::handleSaltRetrievalResponse, this.uidInstanceIdProvider, computePoolService);
346349
return verticle;
347350
};
348351

@@ -371,6 +374,7 @@ private void run() throws Exception {
371374
})
372375
.onFailure(t -> {
373376
LOGGER.error("Failed to bootstrap operator: " + t.getMessage(), new Exception(t));
377+
computePoolService.close();
374378
vertx.close();
375379
System.exit(1);
376380
});
@@ -499,7 +503,8 @@ private static Vertx createVertx() {
499503

500504
VertxOptions vertxOptions = new VertxOptions()
501505
.setMetricsOptions(metricOptions)
502-
.setBlockedThreadCheckInterval(threadBlockedCheckInterval);
506+
.setBlockedThreadCheckInterval(threadBlockedCheckInterval)
507+
.setWorkerPoolSize(8);
503508

504509
return Vertx.vertx(vertxOptions);
505510
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.uid2.operator.service;
2+
3+
import io.micrometer.core.instrument.Gauge;
4+
import io.micrometer.core.instrument.Metrics;
5+
import io.micrometer.core.instrument.Timer;
6+
import io.vertx.core.Future;
7+
import io.vertx.core.Vertx;
8+
import io.vertx.core.WorkerExecutor;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.time.Duration;
13+
import java.util.concurrent.Callable;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicLong;
16+
17+
public class ComputePoolService {
18+
private static final Logger LOGGER = LoggerFactory.getLogger(ComputePoolService.class);
19+
20+
private static final String POOL_NAME = "compute";
21+
private static final String METRIC_PREFIX = "uid2_compute_pool_";
22+
23+
private final WorkerExecutor workerExecutor;
24+
25+
// Queue length: incremented on queue, decremented on completion
26+
// since Vert.x worker executor doesn't expose queue length
27+
private final AtomicLong queueLength = new AtomicLong(0);
28+
29+
// Prometheus histogram for queue wait time (time between queued and dispatched)
30+
private final Timer queueWaitTimer;
31+
32+
// Prometheus gauge for queue length
33+
private final Gauge queueLengthGauge;
34+
35+
/**
36+
* Creates a ComputePoolService with default pool size (available processors - 2, minimum 1).
37+
*
38+
* @param vertx the Vert.x instance
39+
*/
40+
public ComputePoolService(Vertx vertx) {
41+
this(vertx, Math.max(1, Runtime.getRuntime().availableProcessors() - 2));
42+
}
43+
44+
/**
45+
* Creates a ComputePoolService with a specified pool size.
46+
*
47+
* @param vertx the Vert.x instance
48+
* @param poolSize the number of worker threads in the pool
49+
*/
50+
public ComputePoolService(Vertx vertx, int poolSize) {
51+
this.workerExecutor = vertx.createSharedWorkerExecutor(POOL_NAME, poolSize);
52+
53+
// Histogram buckets are logarithmically distributed between 0.1ms and 500ms
54+
this.queueWaitTimer = Timer.builder(METRIC_PREFIX + "queue_wait_seconds")
55+
.description("Time tasks spend waiting in queue before being dispatched to a worker")
56+
.publishPercentileHistogram()
57+
.minimumExpectedValue(Duration.ofNanos(100_000)) // 0.1ms
58+
.maximumExpectedValue(Duration.ofMillis(500)) // 500ms
59+
.register(Metrics.globalRegistry);
60+
61+
this.queueLengthGauge = Gauge.builder(METRIC_PREFIX + "queue_length", queueLength::get)
62+
.description("Number of tasks queued but not yet completed")
63+
.register(Metrics.globalRegistry);
64+
65+
LOGGER.info("ComputePoolService initialized with pool size: {}", poolSize);
66+
}
67+
68+
/**
69+
* Queues a blocking task for execution on the compute worker pool.
70+
* <p>
71+
* Thread-safety: This method can be safely called from multiple threads concurrently.
72+
*
73+
* @param <T> the result type
74+
* @param callable the blocking task to execute
75+
* @return a Future that completes with the task result
76+
*/
77+
public <T> Future<T> executeBlocking(Callable<T> callable) {
78+
final long queuedAt = System.nanoTime();
79+
queueLength.incrementAndGet();
80+
81+
return workerExecutor.<T>executeBlocking(() -> {
82+
try {
83+
final long dispatchedAt = System.nanoTime();
84+
queueWaitTimer.record(dispatchedAt - queuedAt, TimeUnit.NANOSECONDS);
85+
86+
return callable.call();
87+
} finally {
88+
queueLength.decrementAndGet();
89+
}
90+
});
91+
}
92+
93+
/**
94+
* Queues a blocking task that doesn't return a value.
95+
*
96+
* @param runnable the blocking task to execute
97+
* @return a Future that completes when the task finishes
98+
*/
99+
public Future<Void> executeBlocking(Runnable runnable) {
100+
return executeBlocking(() -> {
101+
runnable.run();
102+
return null;
103+
});
104+
}
105+
106+
107+
/**
108+
* Returns the current queue length (tasks queued but not yet completed).
109+
*/
110+
public long getQueueLength() {
111+
return queueLength.get();
112+
}
113+
114+
public void close() {
115+
if (workerExecutor != null) {
116+
workerExecutor.close();
117+
LOGGER.info("ComputePoolService closed");
118+
}
119+
}
120+
}

src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public class UIDOperatorVerticle extends AbstractVerticle {
133133
public static final long OPT_OUT_CHECK_CUTOFF_DATE = Instant.parse("2023-09-01T00:00:00.00Z").getEpochSecond();
134134
private final Handler<Boolean> saltRetrievalResponseHandler;
135135
private final int allowClockSkewSeconds;
136+
private final ComputePoolService computePoolService;
136137
protected Map<Integer, Set<String>> siteIdToInvalidOriginsAndAppNames = new HashMap<>();
137138
protected boolean keySharingEndpointProvideAppNames;
138139
protected Instant lastInvalidOriginProcessTime = Instant.now();
@@ -164,7 +165,8 @@ public UIDOperatorVerticle(IConfigStore configStore,
164165
IStatsCollectorQueue statsCollectorQueue,
165166
SecureLinkValidatorService secureLinkValidatorService,
166167
Handler<Boolean> saltRetrievalResponseHandler,
167-
UidInstanceIdProvider uidInstanceIdProvider) {
168+
UidInstanceIdProvider uidInstanceIdProvider,
169+
ComputePoolService computePoolService) {
168170
this.keyManager = keyManager;
169171
this.secureLinkValidatorService = secureLinkValidatorService;
170172
try {
@@ -198,6 +200,7 @@ public UIDOperatorVerticle(IConfigStore configStore,
198200
this.identityV3Enabled = config.getBoolean(IdentityV3Prop, false);
199201
this.disableOptoutToken = config.getBoolean(DisableOptoutTokenProp, false);
200202
this.uidInstanceIdProvider = uidInstanceIdProvider;
203+
this.computePoolService = computePoolService;
201204
}
202205

203206
@Override
@@ -283,9 +286,9 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) {
283286
mainRouter.post(V2_TOKEN_VALIDATE.toString()).handler(bodyHandler).handler(auth.handleV1(
284287
rc -> encryptedPayloadHandler.handle(rc, this::handleTokenValidateV2), Role.GENERATOR));
285288
mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
286-
rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER));
289+
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleBucketsV2Async), Role.MAPPER));
287290
mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
288-
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER));
291+
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleIdentityMapV2Async), Role.MAPPER));
289292
mainRouter.post(V2_KEY_LATEST.toString()).handler(bodyHandler).handler(auth.handleV1(
290293
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysRequestV2), Role.ID_READER));
291294
mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).handler(auth.handleV1(
@@ -304,7 +307,7 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) {
304307
mainRouter.post(V2_TOKEN_CLIENTGENERATE.toString()).handler(bodyHandler).handler(this::handleClientSideTokenGenerate);
305308

306309
mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
307-
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER));
310+
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleIdentityMapV3Async), Role.MAPPER));
308311
}
309312

310313
private void handleClientSideTokenGenerate(RoutingContext rc) {
@@ -1037,6 +1040,12 @@ private Future handleLogoutAsyncV2(RoutingContext rc) {
10371040
}
10381041
}
10391042

1043+
private Future<Void> handleBucketsV2Async(RoutingContext rc) {
1044+
return computePoolService.executeBlocking(() -> {
1045+
handleBucketsV2(rc);
1046+
});
1047+
}
1048+
10401049
private void handleBucketsV2(RoutingContext rc) {
10411050
final JsonObject req = (JsonObject) rc.data().get("request");
10421051
final String qp = req.getString("since_timestamp");
@@ -1222,6 +1231,12 @@ private boolean validateServiceLink(RoutingContext rc) {
12221231
return false;
12231232
}
12241233

1234+
private Future<Void> handleIdentityMapV2Async(RoutingContext rc) {
1235+
return computePoolService.executeBlocking(() -> {
1236+
handleIdentityMapV2(rc);
1237+
});
1238+
}
1239+
12251240
private void handleIdentityMapV2(RoutingContext rc) {
12261241
try {
12271242
final Integer siteId = RoutingContextUtil.getSiteId(rc);
@@ -1285,6 +1300,12 @@ private InputUtil.InputVal[] getIdentityMapV2Input(RoutingContext rc) {
12851300
getInputList.get();
12861301
}
12871302

1303+
private Future<Void> handleIdentityMapV3Async(RoutingContext rc) {
1304+
return computePoolService.executeBlocking(() -> {
1305+
handleIdentityMapV3(rc);
1306+
});
1307+
}
1308+
12881309
private void handleIdentityMapV3(RoutingContext rc) {
12891310
try {
12901311
JsonObject jsonInput = (JsonObject) rc.data().get("request");

src/test/java/com/uid2/operator/ExtendedUIDOperatorVerticle.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.uid2.operator.model.KeyManager;
44
import com.uid2.operator.monitoring.IStatsCollectorQueue;
5+
import com.uid2.operator.service.ComputePoolService;
56
import com.uid2.operator.service.IUIDOperatorService;
67
import com.uid2.operator.service.SecureLinkValidatorService;
78
import com.uid2.operator.store.IConfigStore;
@@ -33,8 +34,9 @@ public ExtendedUIDOperatorVerticle(IConfigStore configStore,
3334
IStatsCollectorQueue statsCollectorQueue,
3435
SecureLinkValidatorService secureLinkValidationService,
3536
Handler<Boolean> saltRetrievalResponseHandler,
36-
UidInstanceIdProvider uidInstanceIdProvider) {
37-
super(configStore, config, clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, keyManager, saltProvider, optOutStore, clock, statsCollectorQueue, secureLinkValidationService, saltRetrievalResponseHandler, uidInstanceIdProvider);
37+
UidInstanceIdProvider uidInstanceIdProvider,
38+
ComputePoolService computePoolService) {
39+
super(configStore, config, clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, keyManager, saltProvider, optOutStore, clock, statsCollectorQueue, secureLinkValidationService, saltRetrievalResponseHandler, uidInstanceIdProvider, computePoolService);
3840
}
3941

4042
public IUIDOperatorService getIdService() {

src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public class UIDOperatorVerticleTest {
142142
private ExtendedUIDOperatorVerticle uidOperatorVerticle;
143143
private RuntimeConfig runtimeConfig;
144144
private EncryptedTokenEncoder encoder;
145+
private ComputePoolService computePoolService;
145146

146147
@BeforeEach
147148
void deployVerticle(Vertx vertx, VertxTestContext testContext, TestInfo testInfo) {
@@ -165,7 +166,8 @@ void deployVerticle(Vertx vertx, VertxTestContext testContext, TestInfo testInfo
165166

166167
this.uidInstanceIdProvider = new UidInstanceIdProvider("test-instance", "id");
167168

168-
this.uidOperatorVerticle = new ExtendedUIDOperatorVerticle(configStore, config, config.getBoolean("client_side_token_generate"), siteProvider, clientKeyProvider, clientSideKeypairProvider, new KeyManager(keysetKeyStore, keysetProvider), saltProvider, optOutStore, clock, statsCollectorQueue, secureLinkValidatorService, shutdownHandler::handleSaltRetrievalResponse, uidInstanceIdProvider);
169+
this.computePoolService = new ComputePoolService(vertx);
170+
this.uidOperatorVerticle = new ExtendedUIDOperatorVerticle(configStore, config, config.getBoolean("client_side_token_generate"), siteProvider, clientKeyProvider, clientSideKeypairProvider, new KeyManager(keysetKeyStore, keysetProvider), saltProvider, optOutStore, clock, statsCollectorQueue, secureLinkValidatorService, shutdownHandler::handleSaltRetrievalResponse, uidInstanceIdProvider, this.computePoolService);
169171
vertx.deployVerticle(uidOperatorVerticle, testContext.succeeding(id -> testContext.completeNow()));
170172

171173
this.registry = new SimpleMeterRegistry();
@@ -177,6 +179,9 @@ void deployVerticle(Vertx vertx, VertxTestContext testContext, TestInfo testInfo
177179
@AfterEach
178180
void teardown() {
179181
Metrics.globalRegistry.remove(registry);
182+
if (computePoolService != null) {
183+
computePoolService.close();
184+
}
180185
}
181186

182187
private RuntimeConfig setupRuntimeConfig(JsonObject config) {

0 commit comments

Comments
 (0)