Skip to content

Commit 2f364cf

Browse files
authored
Use dedicated executor service for class analysis (#340)
The common fork join pool might be used by extern libs, so spinning up a dynamically resizing dedicated executor service capped by num CPUs.
1 parent 7ce9d7a commit 2f364cf

3 files changed

Lines changed: 38 additions & 24 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
v5.1.17
2+
------
3+
* Use dedicated executor service for lambda analysis
4+
15
v5.1.16
26
------
37
* Add the support of offloading sendRequest call to an executor

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version=5.1.16
1+
version=5.1.17
22
group=com.linkedin.parseq
33
org.gradle.parallel=true

subprojects/parseq-lambda-names/src/main/java/com/linkedin/parseq/lambda/ASMBasedTaskDescriptor.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
import java.util.concurrent.ConcurrentHashMap;
1414
import java.util.concurrent.ConcurrentMap;
1515
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
1618
import java.util.concurrent.ForkJoinPool;
19+
import java.util.concurrent.LinkedBlockingQueue;
20+
import java.util.concurrent.ThreadPoolExecutor;
1721
import java.util.concurrent.TimeUnit;
1822
import java.util.concurrent.atomic.AtomicInteger;
1923
import java.util.concurrent.atomic.AtomicReference;
@@ -41,9 +45,15 @@
4145
*/
4246
public class ASMBasedTaskDescriptor implements TaskDescriptor {
4347

44-
private static final ConcurrentMap<String, String> _names = new ConcurrentHashMap<>();
45-
private static final AtomicReference<CountDownLatch> _latchRef = new AtomicReference<CountDownLatch>();
46-
private static final AtomicInteger _count = new AtomicInteger();
48+
private static final ConcurrentMap<String, String> NAMES = new ConcurrentHashMap<>();
49+
private static final AtomicReference<CountDownLatch> LATCH_REF = new AtomicReference<>();
50+
private static final AtomicInteger COUNT = new AtomicInteger();
51+
// Dynamically allow downsizing of threads, never increase more than CPU due to analysis being CPU intensive
52+
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0,
53+
Runtime.getRuntime().availableProcessors(),
54+
5,
55+
TimeUnit.SECONDS,
56+
new LinkedBlockingQueue<>());
4757

4858
public static class AnalyzerAdvice {
4959

@@ -151,34 +161,34 @@ Optional<String> getLambdaClassDescription(String className) {
151161
return Optional.empty();
152162
}
153163
String name = className.substring(0, slashIndex);
154-
String description = _names.get(name);
164+
String description = NAMES.get(name);
155165

156166
// If we have already analyzed the class, we don't need to await
157167
// analysis on other lambdas.
158168
if (description != null) {
159169
return Optional.of(description).filter(s -> !s.isEmpty());
160170
}
161171

162-
CountDownLatch latch = _latchRef.get();
172+
CountDownLatch latch = LATCH_REF.get();
163173
if (latch != null) {
164174
try {
165175
// We wait up to one minute - an arbitrary, sufficiently large amount of time.
166176
// The wait period must be bounded to avoid locking out JVM.
167177
latch.await(1, TimeUnit.MINUTES);
168178
} catch (InterruptedException e) {
169179
System.err.println("ERROR: ParSeq Latch timed out suggesting serious issue in ASMBasedTaskDescriptor. "
170-
+ "Current number of class being analyzed: " + _count.get());
180+
+ "Current number of class being analyzed: " + COUNT.get());
171181
e.printStackTrace();
172182
Thread.currentThread().interrupt();
173183
}
174184
}
175185

176186
// Try again
177-
return Optional.ofNullable(_names.get(name)).filter(s -> !s.isEmpty());
187+
return Optional.ofNullable(NAMES.get(name)).filter(s -> !s.isEmpty());
178188
}
179189

180190
private static void add(String lambdaClassName, String description) {
181-
_names.put(lambdaClassName, description);
191+
NAMES.put(lambdaClassName, description);
182192
}
183193

184194
public static class Analyzer implements ClassFileTransformer {
@@ -218,12 +228,22 @@ public void run() {
218228
System.out.println("WARNING: Parseq cannot doAnalyze");
219229
t.printStackTrace();
220230
}
221-
if (_count.decrementAndGet() == 0) {
222-
CountDownLatch latch = _latchRef.getAndSet(null);
231+
if (COUNT.decrementAndGet() == 0) {
232+
CountDownLatch latch = LATCH_REF.getAndSet(null);
223233
latch.countDown();
224234
}
225235

226236
}
237+
238+
public static void doAnalyze(byte[] byteCode, ClassLoader loader, Exception exception) {
239+
ClassReader reader = new ClassReader(byteCode);
240+
LambdaClassLocator cv = new LambdaClassLocator(Opcodes.ASM7, loader, exception);
241+
reader.accept(cv, 0);
242+
if (cv.isLambdaClass()) {
243+
LambdaClassDescription lambdaClassDescription = cv.getLambdaClassDescription();
244+
add(lambdaClassDescription.getClassName(), lambdaClassDescription.getDescription());
245+
}
246+
}
227247
}
228248

229249
@Override
@@ -236,9 +256,9 @@ public byte[] transform(ClassLoader loader, String className, Class<?> classBein
236256
}
237257

238258
public static void analyze(byte[] byteCode, ClassLoader loader) {
239-
if (_count.getAndIncrement() == 0) {
259+
if (COUNT.getAndIncrement() == 0) {
240260
CountDownLatch latch = new CountDownLatch(1);
241-
while (!_latchRef.compareAndSet(null, latch)) {
261+
while (!LATCH_REF.compareAndSet(null, latch)) {
242262
/*
243263
* Busy spin. If we got here it means that other thread just
244264
* decremented _count to 0 and is about to null out _latchRef.
@@ -248,17 +268,7 @@ public static void analyze(byte[] byteCode, ClassLoader loader) {
248268
}
249269
}
250270
final Exception e = new Exception();
251-
ForkJoinPool.commonPool().execute(AnalyzerRunnable.of(byteCode, loader, e));
252-
}
253-
254-
public static void doAnalyze(byte[] byteCode, ClassLoader loader, Exception exception) {
255-
ClassReader reader = new ClassReader(byteCode);
256-
LambdaClassLocator cv = new LambdaClassLocator(Opcodes.ASM7, loader, exception);
257-
reader.accept(cv, 0);
258-
if (cv.isLambdaClass()) {
259-
LambdaClassDescription lambdaClassDescription = cv.getLambdaClassDescription();
260-
add(lambdaClassDescription.getClassName(), lambdaClassDescription.getDescription());
261-
}
271+
EXECUTOR_SERVICE.submit(AnalyzerRunnable.of(byteCode, loader, e));
262272
}
263273
}
264274
}

0 commit comments

Comments
 (0)