Skip to content

Commit 9bd5c37

Browse files
committed
RATIS-2486. Add detailed zero-copy metrics for gRPC message types and fallback tracking.
RATIS-2486. Add zero-copy metrics unit tests.
1 parent d5bd9c4 commit 9bd5c37

5 files changed

Lines changed: 433 additions & 14 deletions

File tree

ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717
*/
1818
package org.apache.ratis.grpc.metrics;
1919

20+
import java.util.function.Supplier;
21+
22+
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller.Metrics;
2023
import org.apache.ratis.metrics.LongCounter;
2124
import org.apache.ratis.metrics.MetricRegistryInfo;
2225
import org.apache.ratis.metrics.RatisMetricRegistry;
2326
import org.apache.ratis.metrics.RatisMetrics;
2427
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
2528
import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
2629

27-
import java.util.function.Supplier;
28-
2930
public class ZeroCopyMetrics extends RatisMetrics {
3031
private static final String RATIS_GRPC_METRICS_APP_NAME = "ratis_grpc";
3132
private static final String RATIS_GRPC_METRICS_COMP_NAME = "zero_copy";
@@ -35,6 +36,20 @@ public class ZeroCopyMetrics extends RatisMetrics {
3536
private final LongCounter nonZeroCopyMessages = getRegistry().counter("num_non_zero_copy_messages");
3637
private final LongCounter releasedMessages = getRegistry().counter("num_released_messages");
3738

39+
// Per-message-type zero-copy counters.
40+
private final LongCounter zeroCopyAppendEntries = getRegistry().counter("num_zero_copy_append_entries");
41+
private final LongCounter zeroCopyInstallSnapshot = getRegistry().counter("num_zero_copy_install_snapshot");
42+
private final LongCounter zeroCopyClientRequest = getRegistry().counter("num_zero_copy_client_request");
43+
44+
// Aggregated savings and parse time (nanos) for zero-copy path.
45+
private final LongCounter bytesSavedByZeroCopy = getRegistry().counter("bytes_saved_by_zero_copy");
46+
private final LongCounter zeroCopyParseTimeNanos = getRegistry().counter("zero_copy_parse_time_nanos");
47+
48+
// Reason counters for zero-copy fallback.
49+
private final LongCounter fallbackNotKnownLength = getRegistry().counter("zero_copy_fallback_not_known_length");
50+
private final LongCounter fallbackNotDetachable = getRegistry().counter("zero_copy_fallback_not_detachable");
51+
private final LongCounter fallbackNotByteBuffer = getRegistry().counter("zero_copy_fallback_not_byte_buffer");
52+
3853
public ZeroCopyMetrics() {
3954
super(createRegistry());
4055
}
@@ -54,6 +69,21 @@ public void onZeroCopyMessage(AbstractMessage ignored) {
5469
zeroCopyMessages.inc();
5570
}
5671

72+
public void onZeroCopyAppendEntries(AbstractMessage ignored) {
73+
onZeroCopyMessage(ignored);
74+
zeroCopyAppendEntries.inc();
75+
}
76+
77+
public void onZeroCopyInstallSnapshot(AbstractMessage ignored) {
78+
onZeroCopyMessage(ignored);
79+
zeroCopyInstallSnapshot.inc();
80+
}
81+
82+
public void onZeroCopyClientRequest(AbstractMessage ignored) {
83+
onZeroCopyMessage(ignored);
84+
zeroCopyClientRequest.inc();
85+
}
86+
5787
public void onNonZeroCopyMessage(AbstractMessage ignored) {
5888
nonZeroCopyMessages.inc();
5989
}
@@ -62,6 +92,34 @@ public void onReleasedMessage(AbstractMessage ignored) {
6292
releasedMessages.inc();
6393
}
6494

95+
public ZeroCopyMessageMarshallerMetrics newMarshallerMetrics() {
96+
return new ZeroCopyMessageMarshallerMetrics();
97+
}
98+
99+
// Adapter used by ZeroCopyMessageMarshaller to report parse stats and fallback reasons.
100+
public class ZeroCopyMessageMarshallerMetrics implements Metrics {
101+
@Override
102+
public void onZeroCopyParse(long bytesSaved, long parseTimeNanos) {
103+
bytesSavedByZeroCopy.inc(bytesSaved);
104+
zeroCopyParseTimeNanos.inc(parseTimeNanos);
105+
}
106+
107+
@Override
108+
public void onFallbackNotKnownLength() {
109+
fallbackNotKnownLength.inc();
110+
}
111+
112+
@Override
113+
public void onFallbackNotDetachable() {
114+
fallbackNotDetachable.inc();
115+
}
116+
117+
@Override
118+
public void onFallbackNotByteBuffer() {
119+
fallbackNotByteBuffer.inc();
120+
}
121+
}
122+
65123
@VisibleForTesting
66124
public long zeroCopyMessages() {
67125
return zeroCopyMessages.getCount();
@@ -76,4 +134,4 @@ public long nonZeroCopyMessages() {
76134
public long releasedMessages() {
77135
return releasedMessages.getCount();
78136
}
79-
}
137+
}

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ void closeAllExisting(RaftGroupId groupId) {
161161
this.executor = executor;
162162
this.zeroCopyEnabled = zeroCopyEnabled;
163163
this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(RaftClientRequestProto.getDefaultInstance(),
164-
zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
164+
zeroCopyMetrics::onZeroCopyClientRequest, zeroCopyMetrics::onNonZeroCopyMessage,
165+
zeroCopyMetrics::onReleasedMessage, zeroCopyMetrics.newMarshallerMetrics());
165166
zeroCopyMetrics.addUnreleased("client_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
166167
}
167168

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
import static org.apache.ratis.grpc.GrpcUtil.addMethodWithCustomMarshaller;
4949
import static org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.getAppendEntriesMethod;
50+
import static org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.getInstallSnapshotMethod;
5051

5152
class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
5253
public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);
@@ -59,6 +60,7 @@ private enum BatchLogKey implements BatchLogger.Key {
5960
static class PendingServerRequest<REQUEST> {
6061
private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
6162
private final CompletableFuture<Void> future = new CompletableFuture<>();
63+
private volatile String requestString;
6264

6365
PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef) {
6466
requestRef.retain();
@@ -71,6 +73,14 @@ REQUEST getRequest() {
7173
.orElse(null);
7274
}
7375

76+
void setRequestString(String requestString) {
77+
this.requestString = requestString;
78+
}
79+
80+
String getRequestString() {
81+
return requestString;
82+
}
83+
7484
CompletableFuture<Void> getFuture() {
7585
return future;
7686
}
@@ -104,8 +114,7 @@ String getName() {
104114

105115
private String getPreviousRequestString() {
106116
return Optional.ofNullable(previousOnNext.get())
107-
.map(PendingServerRequest::getRequest)
108-
.map(this::requestToString)
117+
.map(PendingServerRequest::getRequestString)
109118
.orElse(null);
110119
}
111120

@@ -178,12 +187,17 @@ public void onNext(REQUEST request) {
178187
}
179188

180189
final PendingServerRequest<REQUEST> current = new PendingServerRequest<>(requestRef);
181-
final long callId = getCallId(current.getRequest());
182-
final boolean isHeartbeat = isHeartbeat(current.getRequest());
183-
final Optional<PendingServerRequest<REQUEST>> previous = Optional.ofNullable(previousOnNext.getAndSet(current));
184-
final CompletableFuture<Void> previousFuture = previous.map(PendingServerRequest::getFuture)
185-
.orElse(CompletableFuture.completedFuture(null));
190+
current.getFuture().whenComplete((r, e) -> current.release());
191+
final REQUEST currentRequest = current.getRequest();
192+
final long callId = getCallId(currentRequest);
193+
final boolean isHeartbeat = isHeartbeat(currentRequest);
194+
Optional<PendingServerRequest<REQUEST>> previous = Optional.empty();
195+
CompletableFuture<Void> previousFuture = CompletableFuture.completedFuture(null);
186196
try {
197+
current.setRequestString(requestToString(currentRequest));
198+
previous = Optional.ofNullable(previousOnNext.getAndSet(current));
199+
previousFuture = previous.map(PendingServerRequest::getFuture)
200+
.orElse(CompletableFuture.completedFuture(null));
187201
final CompletableFuture<REPLY> f = process(requestRef).exceptionally(e -> {
188202
// Handle cases, such as RaftServer is paused
189203
handleError(e, callId, isHeartbeat);
@@ -243,15 +257,23 @@ private void releaseLast() {
243257
private final RaftServer server;
244258
private final boolean zeroCopyEnabled;
245259
private final ZeroCopyMessageMarshaller<AppendEntriesRequestProto> zeroCopyRequestMarshaller;
260+
private final ZeroCopyMessageMarshaller<InstallSnapshotRequestProto> zeroCopyInstallSnapshotMarshaller;
246261

247262
GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server, boolean zeroCopyEnabled,
248263
ZeroCopyMetrics zeroCopyMetrics) {
249264
this.idSupplier = idSupplier;
250265
this.server = server;
251266
this.zeroCopyEnabled = zeroCopyEnabled;
252267
this.zeroCopyRequestMarshaller = new ZeroCopyMessageMarshaller<>(AppendEntriesRequestProto.getDefaultInstance(),
253-
zeroCopyMetrics::onZeroCopyMessage, zeroCopyMetrics::onNonZeroCopyMessage, zeroCopyMetrics::onReleasedMessage);
268+
zeroCopyMetrics::onZeroCopyAppendEntries, zeroCopyMetrics::onNonZeroCopyMessage,
269+
zeroCopyMetrics::onReleasedMessage, zeroCopyMetrics.newMarshallerMetrics());
270+
this.zeroCopyInstallSnapshotMarshaller = new ZeroCopyMessageMarshaller<>(
271+
InstallSnapshotRequestProto.getDefaultInstance(),
272+
zeroCopyMetrics::onZeroCopyInstallSnapshot, zeroCopyMetrics::onNonZeroCopyMessage,
273+
zeroCopyMetrics::onReleasedMessage, zeroCopyMetrics.newMarshallerMetrics());
254274
zeroCopyMetrics.addUnreleased("server_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
275+
zeroCopyMetrics.addUnreleased("server_protocol_install_snapshot",
276+
zeroCopyInstallSnapshotMarshaller::getUnclosedCount);
255277
}
256278

257279
RaftPeerId getId() {
@@ -268,9 +290,16 @@ ServerServiceDefinition bindServiceWithZeroCopy() {
268290

269291
// Add appendEntries with zero copy marshaller.
270292
addMethodWithCustomMarshaller(orig, builder, getAppendEntriesMethod(), zeroCopyRequestMarshaller);
293+
// Add installSnapshot with zero copy marshaller for zero-copy counters/metrics.
294+
addMethodWithCustomMarshaller(orig, builder, getInstallSnapshotMethod(), zeroCopyInstallSnapshotMarshaller);
271295
// Add remaining methods as is.
296+
final String appendEntriesMethod = getAppendEntriesMethod().getFullMethodName();
297+
final String installSnapshotMethod = getInstallSnapshotMethod().getFullMethodName();
272298
orig.getMethods().stream().filter(
273-
x -> !x.getMethodDescriptor().getFullMethodName().equals(getAppendEntriesMethod().getFullMethodName())
299+
x -> {
300+
final String methodName = x.getMethodDescriptor().getFullMethodName();
301+
return !methodName.equals(appendEntriesMethod) && !methodName.equals(installSnapshotMethod);
302+
}
274303
).forEach(
275304
builder::addMethod
276305
);
@@ -365,6 +394,11 @@ CompletableFuture<InstallSnapshotReplyProto> process(InstallSnapshotRequestProto
365394
return CompletableFuture.completedFuture(server.installSnapshot(request));
366395
}
367396

397+
@Override
398+
void release(InstallSnapshotRequestProto request) {
399+
zeroCopyInstallSnapshotMarshaller.release(request);
400+
}
401+
368402
@Override
369403
long getCallId(InstallSnapshotRequestProto request) {
370404
return request.getServerRequest().getCallId();

ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,23 @@
5555
public class ZeroCopyMessageMarshaller<T extends MessageLite> implements PrototypeMarshaller<T> {
5656
static final Logger LOG = LoggerFactory.getLogger(ZeroCopyMessageMarshaller.class);
5757

58+
public interface Metrics {
59+
default void onZeroCopyParse(long bytesSaved, long parseTimeNanos) {
60+
}
61+
62+
default void onFallbackNotKnownLength() {
63+
}
64+
65+
default void onFallbackNotDetachable() {
66+
}
67+
68+
default void onFallbackNotByteBuffer() {
69+
}
70+
}
71+
72+
private static final Metrics NOOP_METRICS = new Metrics() {
73+
};
74+
5875
private final String name;
5976
private final Map<T, InputStream> unclosedStreams = Collections.synchronizedMap(new IdentityHashMap<>());
6077
private final Parser<T> parser;
@@ -63,13 +80,19 @@ public class ZeroCopyMessageMarshaller<T extends MessageLite> implements Prototy
6380
private final Consumer<T> zeroCopyCount;
6481
private final Consumer<T> nonZeroCopyCount;
6582
private final Consumer<T> releasedCount;
83+
private final Metrics metrics;
6684

6785
public ZeroCopyMessageMarshaller(T defaultInstance) {
68-
this(defaultInstance, m -> {}, m -> {}, m -> {});
86+
this(defaultInstance, m -> {}, m -> {}, m -> {}, NOOP_METRICS);
6987
}
7088

7189
public ZeroCopyMessageMarshaller(T defaultInstance, Consumer<T> zeroCopyCount, Consumer<T> nonZeroCopyCount,
7290
Consumer<T> releasedCount) {
91+
this(defaultInstance, zeroCopyCount, nonZeroCopyCount, releasedCount, NOOP_METRICS);
92+
}
93+
94+
public ZeroCopyMessageMarshaller(T defaultInstance, Consumer<T> zeroCopyCount, Consumer<T> nonZeroCopyCount,
95+
Consumer<T> releasedCount, Metrics metrics) {
7396
this.name = JavaUtils.getClassSimpleName(defaultInstance.getClass()) + "-Marshaller";
7497
@SuppressWarnings("unchecked")
7598
final Parser<T> p = (Parser<T>) defaultInstance.getParserForType();
@@ -79,6 +102,7 @@ public ZeroCopyMessageMarshaller(T defaultInstance, Consumer<T> zeroCopyCount, C
79102
this.zeroCopyCount = zeroCopyCount;
80103
this.nonZeroCopyCount = nonZeroCopyCount;
81104
this.releasedCount = releasedCount;
105+
this.metrics = metrics == null ? NOOP_METRICS : metrics;
82106
}
83107

84108
@Override
@@ -158,28 +182,36 @@ private List<ByteString> getByteStrings(InputStream detached, int exactSize) thr
158182
*/
159183
private T parseZeroCopy(InputStream stream) throws IOException {
160184
if (!(stream instanceof KnownLength)) {
185+
metrics.onFallbackNotKnownLength();
161186
LOG.debug("stream is not KnownLength: {}", stream.getClass());
162187
return null;
163188
}
164189
if (!(stream instanceof Detachable)) {
190+
metrics.onFallbackNotDetachable();
165191
LOG.debug("stream is not Detachable: {}", stream.getClass());
166192
return null;
167193
}
168194
if (!(stream instanceof HasByteBuffer)) {
195+
metrics.onFallbackNotByteBuffer();
169196
LOG.debug("stream is not HasByteBuffer: {}", stream.getClass());
170197
return null;
171198
}
172199
if (!((HasByteBuffer) stream).byteBufferSupported()) {
200+
metrics.onFallbackNotByteBuffer();
173201
LOG.debug("stream is HasByteBuffer but not byteBufferSupported: {}", stream.getClass());
174202
return null;
175203
}
176204

177205
final int exactSize = stream.available();
178206
InputStream detached = ((Detachable) stream).detach();
207+
// Measure only the zero-copy parse path (detach + parse).
208+
final long startNanos = System.nanoTime();
179209
try {
180210
final List<ByteString> byteStrings = getByteStrings(detached, exactSize);
181211
final T message = parseFrom(byteStrings, exactSize);
182212

213+
metrics.onZeroCopyParse(exactSize, System.nanoTime() - startNanos);
214+
183215
final InputStream previous = unclosedStreams.put(message, detached);
184216
Preconditions.assertNull(previous, "previous");
185217

0 commit comments

Comments
 (0)