Skip to content

Commit 8cd12a0

Browse files
RSam25Release Workflow
andauthored
Move compute-heavy endpoints off event loop on to worker threads (#2310)
* Added ComputePoolService and moved the identity map endpoints onto the worker pool * [CI Pipeline] Released Snapshot version: 5.63.31-alpha-290-SNAPSHOT * Added percentiles and pool name to default micrometer metrics * [CI Pipeline] Released Snapshot version: 5.63.32-alpha-291-SNAPSHOT * Added libpng CVEs to trivyignore for testing * [CI Pipeline] Released Snapshot version: 5.63.33-alpha-292-SNAPSHOT * Removed ComputePoolService, added feature flags and fixed pool time metric name * Added log message for when feature is enabled * [CI Pipeline] Released Snapshot version: 5.63.34-alpha-293-SNAPSHOT * Changed GC to ZGC * [CI Pipeline] Released Snapshot version: 5.63.35-alpha-294-SNAPSHOT * Reverted ZGC and set feature flag default false * Reverted trivyignore * Changed default config for compute pool thread count * Made key/bidstream and key/sharing async * Flipped feature flag to test in validator * [CI Pipeline] Released Snapshot version: 5.63.36-alpha-296-SNAPSHOT * [CI Pipeline] Released Snapshot version: 5.64.3-alpha-297-SNAPSHOT * Updated base alpine image to get rid of CVE * Renamed compute pool to compute-heavy-request pool * Changed to using blockingHandlers * [CI Pipeline] Released Snapshot version: 5.64.12-alpha-300-SNAPSHOT * Added env config for worker pool threads * [CI Pipeline] Released Snapshot version: 5.64.13-alpha-301-SNAPSHOT * Cleaned up redundant code * [CI Pipeline] Released Snapshot version: 5.64.14-alpha-302-SNAPSHOT * Added unit tests to check async batch request feature flag --------- Co-authored-by: Release Workflow <unifiedid-admin+release@thetradedesk.com>
1 parent 1bd04f0 commit 8cd12a0

5 files changed

Lines changed: 96 additions & 14 deletions

File tree

conf/default-config.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,6 @@
4040
"sharing_token_expiry_seconds": 2592000,
4141
"operator_type": "public",
4242
"enable_remote_config": true,
43-
"uid_instance_id_prefix": "local-operator"
43+
"uid_instance_id_prefix": "local-operator",
44+
"enable_async_batch_request": false
4445
}

src/main/java/com/uid2/operator/Const.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,9 @@ public class Config extends com.uid2.shared.Const.Config {
3939
public static final String RuntimeConfigMetadataPathProp = "runtime_config_metadata_path";
4040

4141
public static final String IdentityEnvironmentProp = "identity_environment";
42+
43+
public static final String EnableAsyncBatchRequestProp = "enable_async_batch_request";
44+
45+
public static final String DefaultWorkerPoolThreadCountProp = "default_worker_pool_thread_count";
4246
}
4347
}

src/main/java/com/uid2/operator/Main.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import java.time.Duration;
6262
import java.time.Instant;
6363
import java.util.*;
64-
import java.util.function.Consumer;
6564
import java.util.function.Supplier;
6665

6766
import static com.uid2.operator.Const.Config.EnableRemoteConfigProp;
@@ -488,7 +487,7 @@ private static Vertx createVertx() {
488487

489488
MicrometerMetricsOptions metricOptions = new MicrometerMetricsOptions()
490489
.setPrometheusOptions(prometheusOptions)
491-
.setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH))
490+
.setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH, Label.POOL_NAME))
492491
.setJvmMetricsEnabled(true)
493492
.setEnabled(true);
494493
setupMetrics(metricOptions);
@@ -497,9 +496,14 @@ private static Vertx createVertx() {
497496
? 60 * 1000
498497
: 3600 * 1000;
499498

499+
final int defaultWorkerPoolSize = Math.max(2, (Runtime.getRuntime().availableProcessors() - 2) / 2 + 1);
500+
final int workerPoolSize = getEnvInt(Const.Config.DefaultWorkerPoolThreadCountProp, defaultWorkerPoolSize);
501+
LOGGER.info("Creating Vertx with default worker pool size: {}", workerPoolSize);
502+
500503
VertxOptions vertxOptions = new VertxOptions()
501504
.setMetricsOptions(metricOptions)
502-
.setBlockedThreadCheckInterval(threadBlockedCheckInterval);
505+
.setBlockedThreadCheckInterval(threadBlockedCheckInterval)
506+
.setWorkerPoolSize(workerPoolSize);
503507

504508
return Vertx.vertx(vertxOptions);
505509
}
@@ -524,6 +528,7 @@ private static void setupMetrics(MicrometerMetricsOptions metricOptions) {
524528
Objects.equals(id.getTag(Label.HTTP_CODE.toString()), "404")))
525529
.meterFilter(new MeterFilter() {
526530
private final String httpServerResponseTime = MetricsDomain.HTTP_SERVER.getPrefix() + MetricsNaming.v4Names().getHttpResponseTime();
531+
private final String poolQueueTime = MetricsDomain.NAMED_POOLS.getPrefix() + MetricsNaming.v4Names().getPoolQueueTime();
527532

528533
@Override
529534
public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {
@@ -533,6 +538,12 @@ public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticC
533538
.build()
534539
.merge(config);
535540
}
541+
if (id.getName().equals(poolQueueTime)) {
542+
return DistributionStatisticConfig.builder()
543+
.percentiles(0.50, 0.90, 0.95, 0.99)
544+
.build()
545+
.merge(config);
546+
}
536547
return config;
537548
}
538549
})
@@ -626,4 +637,17 @@ private IOperatorKeyRetriever createOperatorKeyRetriever() throws Exception {
626637
}
627638
}
628639
}
640+
641+
private static int getEnvInt(String name, int defaultValue) {
642+
String value = System.getenv(name);
643+
if (value == null || value.isEmpty()) {
644+
return defaultValue;
645+
}
646+
try {
647+
return Integer.parseInt(value);
648+
} catch (NumberFormatException e) {
649+
LOGGER.warn("Invalid integer value for environment variable {}: '{}', using default: {}", name, value, defaultValue);
650+
return defaultValue;
651+
}
652+
}
629653
}

src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ public class UIDOperatorVerticle extends AbstractVerticle {
139139
private final int optOutStatusMaxRequestSize;
140140
private final boolean optOutStatusApiEnabled;
141141

142+
private final boolean isAsyncBatchRequestsEnabled;
143+
142144
//"Android" is from https://github.com/IABTechLab/uid2-android-sdk/blob/ff93ebf597f5de7d440a84f7015a334ba4138ede/sdk/src/main/java/com/uid2/UID2Client.kt#L46
143145
//"ios"/"tvos" is from https://github.com/IABTechLab/uid2-ios-sdk/blob/91c290d29a7093cfc209eca493d1fee80c17e16a/Sources/UID2/UID2Client.swift#L36-L38
144146
private static final List<String> SUPPORTED_IN_APP = Arrays.asList("Android", "ios", "tvos");
@@ -197,6 +199,7 @@ public UIDOperatorVerticle(IConfigStore configStore,
197199
this.identityV3Enabled = config.getBoolean(IdentityV3Prop, false);
198200
this.disableOptoutToken = config.getBoolean(DisableOptoutTokenProp, false);
199201
this.uidInstanceIdProvider = uidInstanceIdProvider;
202+
this.isAsyncBatchRequestsEnabled = config.getBoolean(EnableAsyncBatchRequestProp, false);
200203
}
201204

202205
@Override
@@ -281,16 +284,8 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) {
281284
rc -> encryptedPayloadHandler.handleTokenRefresh(rc, this::handleTokenRefreshV2)));
282285
mainRouter.post(V2_TOKEN_VALIDATE.toString()).handler(bodyHandler).handler(auth.handleV1(
283286
rc -> encryptedPayloadHandler.handle(rc, this::handleTokenValidateV2), Role.GENERATOR));
284-
mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
285-
rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER));
286-
mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
287-
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER));
288287
mainRouter.post(V2_KEY_LATEST.toString()).handler(bodyHandler).handler(auth.handleV1(
289288
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysRequestV2), Role.ID_READER));
290-
mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).handler(auth.handleV1(
291-
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER));
292-
mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).handler(auth.handleV1(
293-
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER));
294289
mainRouter.post(V2_TOKEN_LOGOUT.toString()).handler(bodyHandler).handler(auth.handleV1(
295290
rc -> encryptedPayloadHandler.handleAsync(rc, this::handleLogoutAsyncV2), Role.OPTOUT));
296291
if (this.optOutStatusApiEnabled) {
@@ -302,8 +297,31 @@ private void setUpEncryptedRoutes(Router mainRouter, BodyHandler bodyHandler) {
302297
if (this.clientSideTokenGenerate)
303298
mainRouter.post(V2_TOKEN_CLIENTGENERATE.toString()).handler(bodyHandler).handler(this::handleClientSideTokenGenerate);
304299

305-
mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
306-
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER));
300+
if (isAsyncBatchRequestsEnabled) {
301+
LOGGER.info("Async batch requests enabled");
302+
mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
303+
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER), false);
304+
mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
305+
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER), false);
306+
mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
307+
rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER), false);
308+
mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
309+
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER), false);
310+
mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).blockingHandler(auth.handleV1(
311+
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER), false);
312+
} else {
313+
LOGGER.info("Async batch requests disabled");
314+
mainRouter.post(V2_KEY_SHARING.toString()).handler(bodyHandler).handler(auth.handleV1(
315+
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysSharing), Role.SHARER, Role.ID_READER));
316+
mainRouter.post(V2_KEY_BIDSTREAM.toString()).handler(bodyHandler).handler(auth.handleV1(
317+
rc -> encryptedPayloadHandler.handle(rc, this::handleKeysBidstream), Role.ID_READER));
318+
mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
319+
rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER));
320+
mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
321+
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV2), Role.MAPPER));
322+
mainRouter.post(V3_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
323+
rc -> encryptedPayloadHandler.handle(rc, this::handleIdentityMapV3), Role.MAPPER));
324+
}
307325
}
308326

309327
private void handleClientSideTokenGenerate(RoutingContext rc) {

src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public class UIDOperatorVerticleTest {
142142
private ExtendedUIDOperatorVerticle uidOperatorVerticle;
143143
private RuntimeConfig runtimeConfig;
144144
private EncryptedTokenEncoder encoder;
145+
private ListAppender<ILoggingEvent> asyncBatchRequestLogWatcher;
145146

146147
@BeforeEach
147148
void deployVerticle(Vertx vertx, VertxTestContext testContext, TestInfo testInfo) {
@@ -161,6 +162,20 @@ void deployVerticle(Vertx vertx, VertxTestContext testContext, TestInfo testInfo
161162
if (testInfo.getDisplayName().equals("cstgNoPhoneSupport(Vertx, VertxTestContext)")) {
162163
config.put("enable_phone_support", false);
163164
}
165+
if (testInfo.getTestMethod().isPresent() &&
166+
testInfo.getTestMethod().get().getName().equals("asyncBatchRequestEnabledLogsCorrectMessage")) {
167+
config.put(Const.Config.EnableAsyncBatchRequestProp, true);
168+
asyncBatchRequestLogWatcher = new ListAppender<>();
169+
asyncBatchRequestLogWatcher.start();
170+
((Logger) LoggerFactory.getLogger(UIDOperatorVerticle.class)).addAppender(asyncBatchRequestLogWatcher);
171+
}
172+
if (testInfo.getTestMethod().isPresent() &&
173+
testInfo.getTestMethod().get().getName().equals("asyncBatchRequestDisabledLogsCorrectMessage")) {
174+
config.put(Const.Config.EnableAsyncBatchRequestProp, false);
175+
asyncBatchRequestLogWatcher = new ListAppender<>();
176+
asyncBatchRequestLogWatcher.start();
177+
((Logger) LoggerFactory.getLogger(UIDOperatorVerticle.class)).addAppender(asyncBatchRequestLogWatcher);
178+
}
164179
when(configStore.getConfig()).thenAnswer(x -> runtimeConfig);
165180

166181
this.uidInstanceIdProvider = new UidInstanceIdProvider("test-instance", "id");
@@ -5602,4 +5617,24 @@ void identityBucketsAlwaysReturnMilliseconds(Vertx vertx, VertxTestContext testC
56025617
testContext.completeNow();
56035618
});
56045619
}
5620+
5621+
@Test
5622+
void asyncBatchRequestEnabledLogsCorrectMessage(Vertx vertx, VertxTestContext testContext) {
5623+
// Verify that when enable_async_batch_request is true, the correct log message is emitted
5624+
assertThat(asyncBatchRequestLogWatcher.list.stream()
5625+
.map(ILoggingEvent::getFormattedMessage)
5626+
.collect(Collectors.toList()))
5627+
.contains("Async batch requests enabled");
5628+
testContext.completeNow();
5629+
}
5630+
5631+
@Test
5632+
void asyncBatchRequestDisabledLogsCorrectMessage(Vertx vertx, VertxTestContext testContext) {
5633+
// Verify that when enable_async_batch_request is false, the correct log message is emitted
5634+
assertThat(asyncBatchRequestLogWatcher.list.stream()
5635+
.map(ILoggingEvent::getFormattedMessage)
5636+
.collect(Collectors.toList()))
5637+
.contains("Async batch requests disabled");
5638+
testContext.completeNow();
5639+
}
56055640
}

0 commit comments

Comments
 (0)