Skip to content

Commit ec11efc

Browse files
committed
Removed ComputePoolService, added feature flags and fixed pool time metric name
1 parent 3cdf3d3 commit ec11efc

7 files changed

Lines changed: 52 additions & 159 deletions

File tree

conf/default-config.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,7 @@
4040
"sharing_token_expiry_seconds": 2592000,
4141
"operator_type": "public",
4242
"enable_remote_config": true,
43-
"uid_instance_id_prefix": "local-operator"
43+
"uid_instance_id_prefix": "local-operator",
44+
"enable_async_batch_request": true,
45+
"compute_pool_thread_count": 12
4446
}

src/main/java/com/uid2/operator/Const.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,8 @@ public class Config extends com.uid2.shared.Const.Config {
3939
public static final String RuntimeConfigMetadataPathProp = "runtime_config_metadata_path";
4040

4141
public static final String IdentityEnvironmentProp = "identity_environment";
42+
43+
public static final String ComputePoolThreadCountProp = "compute_pool_thread_count";
44+
public static final String EnableAsyncBatchRequestProp = "enable_async_batch_request";
4245
}
4346
}

src/main/java/com/uid2/operator/Main.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,12 @@ private void run() throws Exception {
342342
this.createVertxEventLoopsMetric();
343343

344344
// Create shared compute pool for CPU-intensive operations
345-
final ComputePoolService computePoolService = new ComputePoolService(vertx);
345+
final int computePoolSize = config.getInteger(Const.Config.ComputePoolThreadCountProp, Math.max(1, Runtime.getRuntime().availableProcessors() - 2));
346+
final WorkerExecutor computeWorkerPool = vertx.createSharedWorkerExecutor("compute", computePoolSize);
347+
LOGGER.info("Created compute worker pool with size: {}", computePoolSize);
346348

347349
Supplier<Verticle> operatorVerticleSupplier = () -> {
348-
UIDOperatorVerticle verticle = new UIDOperatorVerticle(configStore, config, this.clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, getKeyManager(), saltProvider, optOutStore, Clock.systemUTC(), _statsCollectorQueue, new SecureLinkValidatorService(this.serviceLinkProvider, this.serviceProvider), this.shutdownHandler::handleSaltRetrievalResponse, this.uidInstanceIdProvider, computePoolService);
350+
UIDOperatorVerticle verticle = new UIDOperatorVerticle(configStore, config, this.clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, getKeyManager(), saltProvider, optOutStore, Clock.systemUTC(), _statsCollectorQueue, new SecureLinkValidatorService(this.serviceLinkProvider, this.serviceProvider), this.shutdownHandler::handleSaltRetrievalResponse, this.uidInstanceIdProvider, computeWorkerPool);
349351
return verticle;
350352
};
351353

@@ -374,7 +376,9 @@ private void run() throws Exception {
374376
})
375377
.onFailure(t -> {
376378
LOGGER.error("Failed to bootstrap operator: " + t.getMessage(), new Exception(t));
377-
computePoolService.close();
379+
if (computeWorkerPool != null) {
380+
computeWorkerPool.close();
381+
}
378382
vertx.close();
379383
System.exit(1);
380384
});
@@ -504,7 +508,7 @@ private static Vertx createVertx() {
504508
VertxOptions vertxOptions = new VertxOptions()
505509
.setMetricsOptions(metricOptions)
506510
.setBlockedThreadCheckInterval(threadBlockedCheckInterval)
507-
.setWorkerPoolSize(8);
511+
.setWorkerPoolSize(12);
508512

509513
return Vertx.vertx(vertxOptions);
510514
}
@@ -529,7 +533,7 @@ private static void setupMetrics(MicrometerMetricsOptions metricOptions) {
529533
Objects.equals(id.getTag(Label.HTTP_CODE.toString()), "404")))
530534
.meterFilter(new MeterFilter() {
531535
private final String httpServerResponseTime = MetricsDomain.HTTP_SERVER.getPrefix() + MetricsNaming.v4Names().getHttpResponseTime();
532-
private final String poolQueueTime = MetricsDomain.HTTP_SERVER.getPrefix() + MetricsNaming.v4Names().getPoolQueueTime();
536+
private final String poolQueueTime = MetricsDomain.NAMED_POOLS.getPrefix() + MetricsNaming.v4Names().getPoolQueueTime();
533537

534538
@Override
535539
public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {

src/main/java/com/uid2/operator/service/ComputePoolService.java

Lines changed: 0 additions & 133 deletions
This file was deleted.

src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.vertx.core.Future;
4444
import io.vertx.core.Handler;
4545
import io.vertx.core.Promise;
46+
import io.vertx.core.WorkerExecutor;
4647
import io.vertx.core.buffer.Buffer;
4748
import io.vertx.core.http.HttpServerOptions;
4849
import io.vertx.core.http.HttpServerResponse;
@@ -133,14 +134,16 @@ public class UIDOperatorVerticle extends AbstractVerticle {
133134
public static final long OPT_OUT_CHECK_CUTOFF_DATE = Instant.parse("2023-09-01T00:00:00.00Z").getEpochSecond();
134135
private final Handler<Boolean> saltRetrievalResponseHandler;
135136
private final int allowClockSkewSeconds;
136-
private final ComputePoolService computePoolService;
137+
private final WorkerExecutor computeWorkerPool;
137138
protected Map<Integer, Set<String>> siteIdToInvalidOriginsAndAppNames = new HashMap<>();
138139
protected boolean keySharingEndpointProvideAppNames;
139140
protected Instant lastInvalidOriginProcessTime = Instant.now();
140141

141142
private final int optOutStatusMaxRequestSize;
142143
private final boolean optOutStatusApiEnabled;
143144

145+
private final boolean isAsyncBatchRequestsEnabled;
146+
144147
//"Android" is from https://github.com/IABTechLab/uid2-android-sdk/blob/ff93ebf597f5de7d440a84f7015a334ba4138ede/sdk/src/main/java/com/uid2/UID2Client.kt#L46
145148
//"ios"/"tvos" is from https://github.com/IABTechLab/uid2-ios-sdk/blob/91c290d29a7093cfc209eca493d1fee80c17e16a/Sources/UID2/UID2Client.swift#L36-L38
146149
private static final List<String> SUPPORTED_IN_APP = Arrays.asList("Android", "ios", "tvos");
@@ -166,7 +169,7 @@ public UIDOperatorVerticle(IConfigStore configStore,
166169
SecureLinkValidatorService secureLinkValidatorService,
167170
Handler<Boolean> saltRetrievalResponseHandler,
168171
UidInstanceIdProvider uidInstanceIdProvider,
169-
ComputePoolService computePoolService) {
172+
WorkerExecutor computeWorkerPool) {
170173
this.keyManager = keyManager;
171174
this.secureLinkValidatorService = secureLinkValidatorService;
172175
try {
@@ -200,7 +203,8 @@ public UIDOperatorVerticle(IConfigStore configStore,
200203
this.identityV3Enabled = config.getBoolean(IdentityV3Prop, false);
201204
this.disableOptoutToken = config.getBoolean(DisableOptoutTokenProp, false);
202205
this.uidInstanceIdProvider = uidInstanceIdProvider;
203-
this.computePoolService = computePoolService;
206+
this.computeWorkerPool = computeWorkerPool;
207+
this.isAsyncBatchRequestsEnabled = config.getBoolean(EnableAsyncBatchRequestProp, false);
204208
}
205209

206210
@Override
@@ -285,10 +289,6 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) {
285289
rc -> encryptedPayloadHandler.handleTokenRefresh(rc, this::handleTokenRefreshV2)));
286290
mainRouter.post(V2_TOKEN_VALIDATE.toString()).handler(bodyHandler).handler(auth.handleV1(
287291
rc -> encryptedPayloadHandler.handle(rc, this::handleTokenValidateV2), Role.GENERATOR));
288-
mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
289-
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleBucketsV2Async), Role.MAPPER));
290-
mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
291-
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleIdentityMapV2Async), Role.MAPPER));
292292
mainRouter.post(V2_KEY_LATEST.toString()).handler(bodyHandler).handler(auth.handleV1(
293293
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysRequestV2), Role.ID_READER));
294294
mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).handler(auth.handleV1(
@@ -306,8 +306,21 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) {
306306
if (this.clientSideTokenGenerate)
307307
mainRouter.post(V2_TOKEN_CLIENTGENERATE.toString()).handler(bodyHandler).handler(this::handleClientSideTokenGenerate);
308308

309-
mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
310-
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleIdentityMapV3Async), Role.MAPPER));
309+
if (isAsyncBatchRequestsEnabled) {
310+
mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
311+
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleBucketsV2Async), Role.MAPPER));
312+
mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
313+
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleIdentityMapV2Async), Role.MAPPER));
314+
mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
315+
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleIdentityMapV3Async), Role.MAPPER));
316+
} else {
317+
mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
318+
rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER));
319+
mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
320+
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER));
321+
mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
322+
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER));
323+
}
311324
}
312325

313326
private void handleClientSideTokenGenerate(RoutingContext rc) {
@@ -1041,8 +1054,9 @@ private Future handleLogoutAsyncV2(RoutingContext rc) {
10411054
}
10421055

10431056
private Future<Void> handleBucketsV2Async(RoutingContext rc) {
1044-
return computePoolService.executeBlocking(() -> {
1057+
return computeWorkerPool.executeBlocking(() -> {
10451058
handleBucketsV2(rc);
1059+
return null;
10461060
});
10471061
}
10481062

@@ -1232,8 +1246,9 @@ private boolean validateServiceLink(RoutingContext rc) {
12321246
}
12331247

12341248
private Future<Void> handleIdentityMapV2Async(RoutingContext rc) {
1235-
return computePoolService.executeBlocking(() -> {
1249+
return computeWorkerPool.executeBlocking(() -> {
12361250
handleIdentityMapV2(rc);
1251+
return null;
12371252
});
12381253
}
12391254

@@ -1301,8 +1316,9 @@ private InputUtil.InputVal[] getIdentityMapV2Input(RoutingContext rc) {
13011316
}
13021317

13031318
private Future<Void> handleIdentityMapV3Async(RoutingContext rc) {
1304-
return computePoolService.executeBlocking(() -> {
1319+
return computeWorkerPool.executeBlocking(() -> {
13051320
handleIdentityMapV3(rc);
1321+
return null;
13061322
});
13071323
}
13081324

src/test/java/com/uid2/operator/ExtendedUIDOperatorVerticle.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.uid2.operator.model.KeyManager;
44
import com.uid2.operator.monitoring.IStatsCollectorQueue;
5-
import com.uid2.operator.service.ComputePoolService;
65
import com.uid2.operator.service.IUIDOperatorService;
76
import com.uid2.operator.service.SecureLinkValidatorService;
87
import com.uid2.operator.store.IConfigStore;
@@ -12,6 +11,7 @@
1211
import com.uid2.shared.store.*;
1312
import com.uid2.shared.store.salt.ISaltProvider;
1413
import io.vertx.core.Handler;
14+
import io.vertx.core.WorkerExecutor;
1515
import io.vertx.core.json.JsonObject;
1616

1717
import java.time.Clock;
@@ -35,8 +35,8 @@ public ExtendedUIDOperatorVerticle(IConfigStore configStore,
3535
SecureLinkValidatorService secureLinkValidationService,
3636
Handler<Boolean> saltRetrievalResponseHandler,
3737
UidInstanceIdProvider uidInstanceIdProvider,
38-
ComputePoolService computePoolService) {
39-
super(configStore, config, clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, keyManager, saltProvider, optOutStore, clock, statsCollectorQueue, secureLinkValidationService, saltRetrievalResponseHandler, uidInstanceIdProvider, computePoolService);
38+
WorkerExecutor computeWorkerPool) {
39+
super(configStore, config, clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, keyManager, saltProvider, optOutStore, clock, statsCollectorQueue, secureLinkValidationService, saltRetrievalResponseHandler, uidInstanceIdProvider, computeWorkerPool);
4040
}
4141

4242
public IUIDOperatorService getIdService() {

src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.vertx.core.Future;
3939
import io.vertx.core.Handler;
4040
import io.vertx.core.Vertx;
41+
import io.vertx.core.WorkerExecutor;
4142
import io.vertx.core.buffer.Buffer;
4243
import io.vertx.core.http.HttpHeaders;
4344
import io.vertx.core.json.JsonArray;
@@ -142,7 +143,7 @@ public class UIDOperatorVerticleTest {
142143
private ExtendedUIDOperatorVerticle uidOperatorVerticle;
143144
private RuntimeConfig runtimeConfig;
144145
private EncryptedTokenEncoder encoder;
145-
private ComputePoolService computePoolService;
146+
private WorkerExecutor computeWorkerPool;
146147

147148
@BeforeEach
148149
void deployVerticle(Vertx vertx, VertxTestContext testContext, TestInfo testInfo) {
@@ -166,8 +167,8 @@ void deployVerticle(Vertx vertx, VertxTestContext testContext, TestInfo testInfo
166167

167168
this.uidInstanceIdProvider = new UidInstanceIdProvider("test-instance", "id");
168169

169-
this.computePoolService = new ComputePoolService(vertx);
170-
this.uidOperatorVerticle = new ExtendedUIDOperatorVerticle(configStore, config, config.getBoolean("client_side_token_generate"), siteProvider, clientKeyProvider, clientSideKeypairProvider, new KeyManager(keysetKeyStore, keysetProvider), saltProvider, optOutStore, clock, statsCollectorQueue, secureLinkValidatorService, shutdownHandler::handleSaltRetrievalResponse, uidInstanceIdProvider, this.computePoolService);
170+
this.computeWorkerPool = vertx.createSharedWorkerExecutor("compute", 4);
171+
this.uidOperatorVerticle = new ExtendedUIDOperatorVerticle(configStore, config, config.getBoolean("client_side_token_generate"), siteProvider, clientKeyProvider, clientSideKeypairProvider, new KeyManager(keysetKeyStore, keysetProvider), saltProvider, optOutStore, clock, statsCollectorQueue, secureLinkValidatorService, shutdownHandler::handleSaltRetrievalResponse, uidInstanceIdProvider, this.computeWorkerPool);
171172
vertx.deployVerticle(uidOperatorVerticle, testContext.succeeding(id -> testContext.completeNow()));
172173

173174
this.registry = new SimpleMeterRegistry();
@@ -179,8 +180,8 @@ void deployVerticle(Vertx vertx, VertxTestContext testContext, TestInfo testInfo
179180
@AfterEach
180181
void teardown() {
181182
Metrics.globalRegistry.remove(registry);
182-
if (computePoolService != null) {
183-
computePoolService.close();
183+
if (computeWorkerPool != null) {
184+
computeWorkerPool.close();
184185
}
185186
}
186187

0 commit comments

Comments
 (0)