Skip to content

Commit 7ce9d7a

Browse files
authored
run sendRequest in an executor (#341)
* run sendRequest in an executor
1 parent e37177b commit 7ce9d7a

7 files changed

Lines changed: 57 additions & 11 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ target
1515
/.gradle
1616
.settings
1717
subprojects/parseq-lambda-names/bin/
18+
subprojects/parseq-tracevis/npm-debug.log

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
v5.1.16
2+
------
3+
* Add the support of offloading sendRequest call to an executor
4+
15
v5.1.15
26
------
37
* Fix stack overflow error in TaskDescriptor when lambda is used

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version=5.1.15
1+
version=5.1.16
22
group=com.linkedin.parseq
33
org.gradle.parallel=true
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.linkedin.restli.client;
2+
3+
import java.util.concurrent.Executor;
4+
5+
6+
/**
7+
* A simple executor implementation that executes the task immediately on the calling thread
8+
*/
9+
class DirectExecutor implements Executor {
10+
11+
private static final DirectExecutor INSTANCE = new DirectExecutor();
12+
13+
static DirectExecutor getInstance() {
14+
return INSTANCE;
15+
}
16+
17+
@Override
18+
public void execute(Runnable command) {
19+
command.run();
20+
}
21+
}

subprojects/parseq-restli-client/src/main/java/com/linkedin/restli/client/ParSeqRestClient.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.linkedin.restli.client.metrics.Metrics;
3636
import com.linkedin.restli.common.OperationNameGenerator;
3737
import java.util.Optional;
38+
import java.util.concurrent.Executor;
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.function.Function;
4041
import org.slf4j.Logger;
@@ -69,16 +70,24 @@ public class ParSeqRestClient extends BatchingStrategy<RequestGroup, RestRequest
6970
private final RequestConfigProvider _requestConfigProvider;
7071
private final boolean _d2RequestTimeoutEnabled;
7172
private final Function<Request<?>, RequestContext> _requestContextProvider;
73+
private final Executor _executor;
7274

7375
ParSeqRestClient(final Client client, final RequestConfigProvider requestConfigProvider,
74-
Function<Request<?>, RequestContext> requestContextProvider, final boolean d2RequestTimeoutEnabled) {
76+
Function<Request<?>, RequestContext> requestContextProvider, final boolean d2RequestTimeoutEnabled,
77+
Executor executor) {
7578
ArgumentUtil.requireNotNull(client, "client");
7679
ArgumentUtil.requireNotNull(requestConfigProvider, "requestConfigProvider");
7780
ArgumentUtil.requireNotNull(requestContextProvider, "requestContextProvider");
7881
_client = client;
7982
_requestConfigProvider = requestConfigProvider;
8083
_requestContextProvider = requestContextProvider;
8184
_d2RequestTimeoutEnabled = d2RequestTimeoutEnabled;
85+
_executor = executor;
86+
}
87+
88+
ParSeqRestClient(final Client client, final RequestConfigProvider requestConfigProvider,
89+
Function<Request<?>, RequestContext> requestContextProvider, final boolean d2RequestTimeoutEnabled) {
90+
this(client, requestConfigProvider, requestContextProvider, d2RequestTimeoutEnabled, DirectExecutor.getInstance());
8291
}
8392

8493
/**
@@ -93,6 +102,7 @@ public ParSeqRestClient(final Client client) {
93102
_requestConfigProvider = RequestConfigProvider.build(new ParSeqRestliClientConfigBuilder().build(), () -> Optional.empty());
94103
_requestContextProvider = request -> new RequestContext();
95104
_d2RequestTimeoutEnabled = false;
105+
_executor = DirectExecutor.getInstance();
96106
}
97107

98108
/**
@@ -107,6 +117,7 @@ public ParSeqRestClient(final RestClient client) {
107117
_requestConfigProvider = RequestConfigProvider.build(new ParSeqRestliClientConfigBuilder().build(), () -> Optional.empty());
108118
_requestContextProvider = request -> new RequestContext();
109119
_d2RequestTimeoutEnabled = false;
120+
_executor = DirectExecutor.getInstance();
110121
}
111122

112123
@Override
@@ -119,11 +130,13 @@ public <T> Promise<Response<T>> sendRequest(final Request<T> request) {
119130
@Deprecated
120131
public <T> Promise<Response<T>> sendRequest(final Request<T> request, final RequestContext requestContext) {
121132
final SettablePromise<Response<T>> promise = Promises.settable();
122-
try {
123-
_client.sendRequest(request, requestContext, new PromiseCallbackAdapter<T>(promise));
124-
} catch (Throwable e) {
125-
promise.fail(e);
126-
}
133+
_executor.execute(() -> {
134+
try {
135+
_client.sendRequest(request, requestContext, new PromiseCallbackAdapter<T>(promise));
136+
} catch (Throwable e) {
137+
promise.fail(e);
138+
}
139+
});
127140
return promise;
128141
}
129142

subprojects/parseq-restli-client/src/main/java/com/linkedin/restli/client/ParSeqRestliClientBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.Collections;
44
import java.util.Map;
55
import java.util.Optional;
6+
import java.util.concurrent.Executor;
67
import java.util.function.Function;
78

89
import org.slf4j.Logger;
@@ -35,6 +36,7 @@ public class ParSeqRestliClientBuilder {
3536
private BatchingSupport _batchingSupport;
3637
private InboundRequestContextFinder _inboundRequestContextFinder;
3738
private Function<Request<?>, RequestContext> _requestContextProvider;
39+
private Executor _executor = DirectExecutor.getInstance();
3840

3941
/**
4042
* This method may throw RuntimeException e.g. when there is a problem with configuration.
@@ -65,7 +67,7 @@ public ParSeqRestClient build() {
6567
request -> new RequestContext() :
6668
_requestContextProvider;
6769

68-
ParSeqRestClient parseqClient = new ParSeqRestClient(_client, configProvider, requestContextProvider, _d2RequestTimeoutEnabled);
70+
ParSeqRestClient parseqClient = new ParSeqRestClient(_client, configProvider, requestContextProvider, _d2RequestTimeoutEnabled, _executor);
6971
if (_batchingSupport != null) {
7072
LOGGER.debug("Found batching support");
7173
_batchingSupport.registerStrategy(parseqClient);
@@ -160,4 +162,9 @@ public ParSeqRestliClientBuilder setD2RequestTimeoutEnabled(boolean enabled) {
160162
_d2RequestTimeoutEnabled = enabled;
161163
return this;
162164
}
165+
166+
public ParSeqRestliClientBuilder setExecutor(Executor executor) {
167+
_executor = executor;
168+
return this;
169+
}
163170
}

subprojects/parseq/src/main/java/com/linkedin/parseq/FusionTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,11 @@ private ShallowTraceBuilder getEffectiveShallowTraceBuilder(final FusionTraceCon
9999
}
100100

101101
private void addRelationships(final FusionTraceContext traceContext) {
102-
final ShallowTraceBuilder effectoveShallowTraceBuilder = getEffectiveShallowTraceBuilder(traceContext);
102+
final ShallowTraceBuilder effectiveShallowTraceBuilder = getEffectiveShallowTraceBuilder(traceContext);
103103
TraceBuilder builder = getTraceBuilder();
104-
builder.addRelationship(Relationship.PARENT_OF, traceContext.getParent().getShallowTraceBuilder(), effectoveShallowTraceBuilder);
104+
builder.addRelationship(Relationship.PARENT_OF, traceContext.getParent().getShallowTraceBuilder(), effectiveShallowTraceBuilder);
105105
if (_predecessorShallowTraceBuilder != null) {
106-
builder.addRelationship(Relationship.SUCCESSOR_OF, effectoveShallowTraceBuilder, _predecessorShallowTraceBuilder);
106+
builder.addRelationship(Relationship.SUCCESSOR_OF, effectiveShallowTraceBuilder, _predecessorShallowTraceBuilder);
107107
}
108108
}
109109

0 commit comments

Comments
 (0)