Skip to content

Commit f3b1981

Browse files
addressing comments
Signed-off-by: Anthony Petrov <anthony@swirldslabs.com>
1 parent 021956d commit f3b1981

4 files changed

Lines changed: 36 additions & 31 deletions

File tree

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import io.helidon.http.http2.Http2StreamState;
2222
import io.helidon.webclient.http2.Http2ClientStream;
2323
import io.helidon.webclient.http2.StreamTimeoutException;
24-
import java.io.UncheckedIOException;
25-
import java.net.SocketException;
2624
import java.util.List;
2725
import java.util.concurrent.ExecutionException;
2826
import java.util.concurrent.TimeUnit;
@@ -197,29 +195,13 @@ private void receiveRepliesLoop() {
197195
// read data from stream
198196
final PbjGrpcDatagramReader datagramReader =
199197
new PbjGrpcDatagramReader(grpcClient.getConfig().maxIncomingBufferSize());
200-
boolean repliesReceived = false;
201198
while (isStreamOpen() && !clientStream.trailers().isDone() && clientStream.hasEntity()) {
202199
final Http2FrameData frameData;
203200
try {
204201
frameData = clientStream.readOne(grpcClient.getConfig().readTimeout());
205202
} catch (StreamTimeoutException e) {
206203
// Check if the connection is alive. See a comment above about the KeepAlive timeout.
207-
try {
208-
clientStream.sendPing();
209-
} catch (UncheckedIOException uioe) {
210-
// And the connection may in fact be closed.
211-
if (repliesReceived
212-
&& uioe.getCause() instanceof SocketException se
213-
&& se.getMessage() != null
214-
&& se.getMessage().contains("Socket closed")) {
215-
// We won't be able to read trailers anyway because the connection is closed.
216-
// Since at least one reply has been received and processed, complete the call:
217-
pipeline.onComplete();
218-
return;
219-
}
220-
// Either we've never received a single reply yet, or this isn't a "Socket closed".
221-
throw uioe;
222-
}
204+
clientStream.sendPing();
223205
// FUTURE WORK: implement an uber timeout to return
224206
continue;
225207
}
@@ -254,7 +236,6 @@ private void receiveRepliesLoop() {
254236
Codec.DEFAULT_MAX_DEPTH,
255237
grpcClient.getConfig().maxSize());
256238
pipeline.onNext(reply);
257-
repliesReceived = true;
258239
} catch (ParseException e) {
259240
pipeline.onError(e);
260241
// We won't be able to proceed probably because parsing failed.

pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/grpc/GrpcCompression.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ public Bytes decompress(Bytes bytes) {
9090
private static final Map<String, Decompressor> DECOMPRESSOR_MAP = new HashMap<>();
9191

9292
/** Register a Compressor, potentially overwriting an existing registration for `name`. */
93-
public static void registerCompressor(String name, Compressor compressor) {
93+
public static synchronized void registerCompressor(String name, Compressor compressor) {
9494
COMPRESSOR_MAP.put(name, compressor);
9595
}
9696

9797
/** Register a Decompressor, potentially overwriting an existing registration for `name`. */
98-
public static void registerDecompressor(String name, Decompressor decompressor) {
98+
public static synchronized void registerDecompressor(String name, Decompressor decompressor) {
9999
DECOMPRESSOR_MAP.put(name, decompressor);
100100
}
101101

pbj-integration-tests/src/jmh/java/com/hedera/pbj/integration/jmh/grpc/NetworkLatencySimulator.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
* instead of leveraging the fast network.
1313
* A slow network would show that compression may be useful sometimes. Specifically, for larger, compressible
1414
* payloads. Smaller payloads (<8K) never benefit from compression.
15+
* <p>
16+
* The network latency simulator uses the PbjGrpcNetworkBytesInspector feature of the PbjGrpcCall, which is
17+
* a static entity, meaning that it affects any and all PbjGrpcCall objects in the same JVM. This is by design
18+
* for performance reasons. However, users of this Simulator should be aware of this and not try to use it
19+
* from multiple threads with different latency parameters at once.
1520
*/
1621
public class NetworkLatencySimulator {
1722
// ms 1e-3, us 1e-6, ns 1e-9:
@@ -31,9 +36,9 @@ public static void simulate(final long networkSpeedMbitPerSecond, final boolean
3136

3237
private void sleep(long bytes) {
3338
final long nanos = nanosPerByte * bytes;
34-
try {
35-
Thread.sleep(nanos / NANOS_IN_MILLI, (int) (nanos % NANOS_IN_MILLI));
36-
} catch (InterruptedException ignore) {
39+
final long deadline = System.nanoTime() + nanos;
40+
while (System.nanoTime() < deadline) {
41+
Thread.onSpinWait();
3742
}
3843
}
3944

pbj-integration-tests/src/jmh/java/com/hedera/pbj/integration/jmh/grpc/block/TestBlockGrpcBench.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77
import com.hedera.pbj.integration.jmh.grpc.NetworkLatencySimulator;
88
import com.hedera.pbj.integration.jmh.grpc.PbjGrpcBench;
99
import com.hedera.pbj.integration.jmh.grpc.ZstdGrpcTransformer;
10+
import com.hedera.pbj.runtime.Codec;
1011
import com.hedera.pbj.runtime.grpc.GrpcClient;
1112
import com.hedera.pbj.runtime.grpc.Pipeline;
13+
import java.io.UncheckedIOException;
14+
import java.net.SocketException;
1215
import java.util.Set;
1316
import java.util.concurrent.CountDownLatch;
1417
import java.util.concurrent.Flow;
@@ -54,12 +57,12 @@
5457
@OutputTimeUnit(TimeUnit.SECONDS)
5558
@BenchmarkMode(Mode.Throughput)
5659
public class TestBlockGrpcBench {
57-
private static final int INVOCATIONS = 2_000;
60+
private static final int INVOCATIONS = 400;
5861

5962
static {
6063
new ZstdGrpcTransformer(-5).register("zstd-5");
61-
new ZstdGrpcTransformer(0).register("zstd0");
6264
new ZstdGrpcTransformer(3).register("zstd"); // the default level
65+
new ZstdGrpcTransformer(10).register("zstd10");
6366

6467
// 1Gbps network:
6568
NetworkLatencySimulator.simulate(1_000, true);
@@ -70,8 +73,13 @@ static TestBlockStreamerInterface.TestBlockStreamerClient createClient(final int
7073
if (encodings == null || encodings.length == 0) {
7174
grpcClient = GrpcTestUtils.createGrpcClient(port, GrpcTestUtils.PROTO_OPTIONS);
7275
} else {
73-
grpcClient =
74-
GrpcTestUtils.createGrpcClient(port, GrpcTestUtils.PROTO_OPTIONS, encodings[0], Set.of(encodings));
76+
grpcClient = GrpcTestUtils.createGrpcClient(
77+
port,
78+
GrpcTestUtils.PROTO_OPTIONS,
79+
encodings[0],
80+
Set.of(encodings),
81+
Codec.DEFAULT_MAX_SIZE,
82+
Codec.DEFAULT_MAX_SIZE * 5);
7583
}
7684

7785
return new TestBlockStreamerInterface.TestBlockStreamerClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);
@@ -82,7 +90,7 @@ public static class BenchState {
8290
@Param({"102400", "524288", "2048000"})
8391
int maxBlockSize;
8492

85-
@Param({"identity", "gzip", "zstd", "zstd0", "zstd-5"})
93+
@Param({"identity", "gzip", "zstd", "zstd10", "zstd-5"})
8694
String encodings;
8795

8896
PortsAllocator.Port port;
@@ -156,8 +164,19 @@ public void onSubscribe(Flow.Subscription subscription) {
156164

157165
@Override
158166
public void onError(Throwable throwable) {
159-
new RuntimeException(throwable).printStackTrace();
160167
latch.countDown();
168+
if (throwable instanceof UncheckedIOException uioe
169+
&& uioe.getCause() instanceof SocketException se
170+
&& se.getMessage() != null
171+
&& se.getMessage().contains("Socket closed")) {
172+
// A streaming server may close its connection sometimes before
173+
// the client has received and processed all the replies. However, the client's
174+
// PbjGrpcCall may try and ping the server during the processing. This results in
175+
// calling this method. And this seems to happen often enough to print this error
176+
// more often than we'd like. So we just ignore this particular error altogether here.
177+
return;
178+
}
179+
new RuntimeException(throwable).printStackTrace();
161180
}
162181

163182
@Override

0 commit comments

Comments
 (0)