Skip to content

Commit 5094e8f

Browse files
add GRPC blocks bench
Signed-off-by: Anthony Petrov <anthony@swirldslabs.com>
1 parent 770c698 commit 5094e8f

8 files changed

Lines changed: 486 additions & 77 deletions

File tree

pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.helidon.http.http2.Http2StreamState;
2222
import io.helidon.webclient.http2.Http2ClientStream;
2323
import io.helidon.webclient.http2.StreamTimeoutException;
24+
import java.io.UncheckedIOException;
25+
import java.net.SocketException;
2426
import java.util.List;
2527
import java.util.concurrent.ExecutionException;
2628
import java.util.concurrent.TimeUnit;
@@ -194,13 +196,29 @@ private void receiveRepliesLoop() {
194196

195197
// read data from stream
196198
final PbjGrpcDatagramReader datagramReader = new PbjGrpcDatagramReader();
199+
boolean repliesReceived = false;
197200
while (isStreamOpen() && !clientStream.trailers().isDone() && clientStream.hasEntity()) {
198201
final Http2FrameData frameData;
199202
try {
200203
frameData = clientStream.readOne(grpcClient.getConfig().readTimeout());
201204
} catch (StreamTimeoutException e) {
202205
// Check if the connection is alive. See a comment above about the KeepAlive timeout.
203-
clientStream.sendPing();
206+
try {
207+
clientStream.sendPing();
208+
} catch (UncheckedIOException uioe) {
209+
// And the connection may in fact be closed.
210+
if (repliesReceived
211+
&& uioe.getCause() instanceof SocketException se
212+
&& se.getMessage() != null
213+
&& se.getMessage().contains("Socket closed")) {
214+
// We won't be able to read trailers anyway because the connection is closed.
215+
// Since at least one reply has been received and processed, complete the call:
216+
pipeline.onComplete();
217+
return;
218+
}
219+
// Either we've never received a single reply yet, or this isn't a "Socket closed".
220+
throw uioe;
221+
}
204222
// FUTURE WORK: implement an uber timeout to return
205223
continue;
206224
}
@@ -230,6 +248,7 @@ private void receiveRepliesLoop() {
230248
try {
231249
final ReplyT reply = replyCodec.parse(replyBytes);
232250
pipeline.onNext(reply);
251+
repliesReceived = true;
233252
} catch (ParseException e) {
234253
pipeline.onError(e);
235254
// We won't be able to proceed probably because parsing failed.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.pbj.integration.jmh.grpc;
3+
4+
import com.hedera.pbj.grpc.client.helidon.PbjGrpcCall;
5+
import com.hedera.pbj.grpc.client.helidon.PbjGrpcNetworkBytesInspector;
6+
import com.hedera.pbj.runtime.io.buffer.Bytes;
7+
8+
/**
9+
* Simulate network latency based on the actual amount of user data being sent/received over a network.
10+
* Note: with 1Gbps, we get 8ns per byte. So to test higher speeds, we should switch to floating point math.
11+
* A very fast network would make any compression look bad because we'll waste CPU time on the compression
12+
* instead of leveraging the fast network.
13+
* A slow network would show that compression may be useful sometimes. Specifically, for larger, compressible
14+
* payloads. Smaller payloads (<8K) never benefit from compression.
15+
*/
16+
public class NetworkLatencySimulator {
17+
// ms 1e-3, us 1e-6, ns 1e-9:
18+
private static final long NANOS_IN_MILLI = 1_000_000L;
19+
20+
/**
21+
* Install the NetworkLatencySimulator as a PbjGrpcNetworkBytesInspector in PbjGrpcCall.
22+
* @param networkSpeedMbitPerSecond the speed in Mbps, e.g. 1_000 for 1Gbps network
23+
* @param printSizes if true, print a few sent/received sizes for debugging/additional information
24+
*/
25+
public static void simulate(final long networkSpeedMbitPerSecond, final boolean printSizes) {
26+
// mbit->mbyte = /8:
27+
final long nanosPerByte = 1_000_000_000L * 8 / (networkSpeedMbitPerSecond * 1_000_000L);
28+
PbjGrpcCall.setNetworkBytesInspector(new PbjGrpcNetworkBytesInspector() {
29+
// max number of sizes to print if enabled:
30+
int counter = 4;
31+
32+
private void sleep(long bytes) {
33+
final long nanos = nanosPerByte * bytes;
34+
try {
35+
Thread.sleep(nanos / NANOS_IN_MILLI, (int) (nanos % NANOS_IN_MILLI));
36+
} catch (InterruptedException ignore) {
37+
}
38+
}
39+
40+
@Override
41+
public void sent(Bytes bytes) {
42+
if (printSizes && counter-- >= 0) {
43+
System.err.println("sent: " + bytes.length() + " bytes");
44+
}
45+
sleep(bytes.length());
46+
}
47+
48+
@Override
49+
public void received(Bytes bytes) {
50+
if (printSizes && counter-- >= 0) {
51+
System.err.println("received: " + bytes.length() + " bytes");
52+
}
53+
sleep(bytes.length());
54+
}
55+
});
56+
}
57+
}

pbj-integration-tests/src/jmh/java/com/hedera/pbj/integration/jmh/grpc/PbjGrpcBench.java

Lines changed: 5 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,15 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package com.hedera.pbj.integration.jmh.grpc;
33

4-
import com.github.luben.zstd.ZstdInputStream;
5-
import com.github.luben.zstd.ZstdOutputStream;
6-
import com.hedera.pbj.grpc.client.helidon.PbjGrpcCall;
7-
import com.hedera.pbj.grpc.client.helidon.PbjGrpcNetworkBytesInspector;
84
import com.hedera.pbj.grpc.helidon.PbjGrpcServiceConfig;
95
import com.hedera.pbj.grpc.helidon.PbjRouting;
106
import com.hedera.pbj.grpc.helidon.config.PbjConfig;
117
import com.hedera.pbj.integration.grpc.GrpcTestUtils;
128
import com.hedera.pbj.integration.grpc.PortsAllocator;
139
import com.hedera.pbj.runtime.grpc.GrpcClient;
14-
import com.hedera.pbj.runtime.grpc.GrpcCompression;
1510
import com.hedera.pbj.runtime.grpc.Pipeline;
1611
import com.hedera.pbj.runtime.grpc.ServiceInterface;
17-
import com.hedera.pbj.runtime.io.buffer.Bytes;
1812
import io.helidon.webserver.WebServer;
19-
import java.io.ByteArrayOutputStream;
20-
import java.io.IOException;
21-
import java.io.UncheckedIOException;
2213
import java.util.Set;
2314
import java.util.concurrent.CountDownLatch;
2415
import java.util.concurrent.Flow;
@@ -66,82 +57,20 @@
6657
public class PbjGrpcBench {
6758
private static final int INVOCATIONS = 2_000;
6859

69-
private static class ZstdGrpcTransformer implements GrpcCompression.GrpcTransformer {
70-
private static final String NAME = "zstd";
71-
private static final GrpcCompression.GrpcTransformer INSTANCE = new ZstdGrpcTransformer();
72-
73-
@Override
74-
public Bytes compress(Bytes bytes) {
75-
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
76-
ZstdOutputStream zos = new ZstdOutputStream(baos, -5)) {
77-
bytes.writeTo(zos);
78-
zos.flush();
79-
zos.close();
80-
return Bytes.wrap(baos.toByteArray());
81-
} catch (IOException e) {
82-
throw new UncheckedIOException(e);
83-
}
84-
}
85-
86-
@Override
87-
public Bytes decompress(Bytes bytes) {
88-
byte[] buffer = new byte[1024];
89-
try (ZstdInputStream zis = new ZstdInputStream(bytes.toInputStream());
90-
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
91-
int len;
92-
while ((len = zis.read(buffer, 0, buffer.length)) > 0) {
93-
baos.write(buffer, 0, len);
94-
}
95-
return Bytes.wrap(baos.toByteArray());
96-
} catch (IOException e) {
97-
throw new UncheckedIOException(e);
98-
}
99-
}
100-
}
101-
10260
static {
103-
GrpcCompression.registerCompressor(ZstdGrpcTransformer.NAME, ZstdGrpcTransformer.INSTANCE);
104-
GrpcCompression.registerDecompressor(ZstdGrpcTransformer.NAME, ZstdGrpcTransformer.INSTANCE);
105-
106-
// Simulate network latency based on the actual amount of user data being sent/received over a 1Gbps network.
107-
// Note: with 1Gbps, we get 8ns per byte. So to test higher speeds, we should switch to floating point math.
108-
// A very fast network would make any compression look bad because we'll waste CPU time on the compression
109-
// instead of leveraging the fast network.
110-
// A slow network would show that compression may be useful sometimes. Specifically, for larger, compressible
111-
// payloads. Smaller payloads (<8K) never benefit from compression.
112-
final long NETWORK_SPEED_MBIT_PER_SECOND = 1_000;
113-
// ms 1e-3, us 1e-6, ns 1e-9:
114-
final long NANOS_IN_MILLI = 1_000_000L;
115-
// also, mbit->mbyte = /8:
116-
final long NANOS_PER_BYTE = 1_000_000_000L * 8 / (NETWORK_SPEED_MBIT_PER_SECOND * 1_000_000L);
117-
PbjGrpcCall.setNetworkBytesInspector(new PbjGrpcNetworkBytesInspector() {
118-
private void sleep(long bytes) {
119-
final long nanos = NANOS_PER_BYTE * bytes;
120-
try {
121-
Thread.sleep(nanos / NANOS_IN_MILLI, (int) (nanos % NANOS_IN_MILLI));
122-
} catch (InterruptedException ignore) {
123-
}
124-
}
125-
126-
@Override
127-
public void sent(Bytes bytes) {
128-
sleep(bytes.length());
129-
}
61+
new ZstdGrpcTransformer().register("zstd");
13062

131-
@Override
132-
public void received(Bytes bytes) {
133-
sleep(bytes.length());
134-
}
135-
});
63+
// 1Gbps network:
64+
NetworkLatencySimulator.simulate(1_000, false);
13665
}
13766

138-
private record ServerHandle(WebServer server) implements AutoCloseable {
67+
public record ServerHandle(WebServer server) implements AutoCloseable {
13968
@Override
14069
public void close() {
14170
server.stop();
14271
}
14372

144-
static ServerHandle start(
73+
public static ServerHandle start(
14574
final int port, final ServiceInterface service, final PbjGrpcServiceConfig serviceConfig) {
14675
final int maxPayloadSize = 20 * 1024 * 1024;
14776
final PbjConfig pbjConfig = PbjConfig.builder()
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.pbj.integration.jmh.grpc;
3+
4+
import com.github.luben.zstd.ZstdInputStream;
5+
import com.github.luben.zstd.ZstdOutputStream;
6+
import com.hedera.pbj.runtime.grpc.GrpcCompression;
7+
import com.hedera.pbj.runtime.io.buffer.Bytes;
8+
import java.io.ByteArrayOutputStream;
9+
import java.io.IOException;
10+
import java.io.UncheckedIOException;
11+
12+
/**
13+
* A GRPC Compressor/Decompressor for zstd.
14+
* @param level zstd compression level, -5 to 22; 3 is the default.
15+
*/
16+
public record ZstdGrpcTransformer(int level) implements GrpcCompression.GrpcTransformer {
17+
public ZstdGrpcTransformer() {
18+
this(3);
19+
}
20+
21+
/**
22+
* Register the transformer with PBJ GrpcCompression
23+
* @param name the name of the encoding.
24+
*/
25+
public void register(String name) {
26+
GrpcCompression.registerCompressor(name, this);
27+
GrpcCompression.registerDecompressor(name, this);
28+
}
29+
30+
@Override
31+
public Bytes compress(Bytes bytes) {
32+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
33+
ZstdOutputStream zos = new ZstdOutputStream(baos, level)) {
34+
bytes.writeTo(zos);
35+
zos.flush();
36+
zos.close();
37+
return Bytes.wrap(baos.toByteArray());
38+
} catch (IOException e) {
39+
throw new UncheckedIOException(e);
40+
}
41+
}
42+
43+
@Override
44+
public Bytes decompress(Bytes bytes) {
45+
byte[] buffer = new byte[1024];
46+
try (ZstdInputStream zis = new ZstdInputStream(bytes.toInputStream());
47+
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
48+
int len;
49+
while ((len = zis.read(buffer, 0, buffer.length)) > 0) {
50+
baos.write(buffer, 0, len);
51+
}
52+
return Bytes.wrap(baos.toByteArray());
53+
} catch (IOException e) {
54+
throw new UncheckedIOException(e);
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)