Skip to content

Commit 1e7875c

Browse files
committed
Instrument watchstream send loop
This PR adds a few more timing metrics to the watch send loop. We're currently trying to track down some sort of contention affecting all gRPC endpoints; we see a hard QPS ceiling when the apiserver restarts with watch cache initializations. There's currently no metric to help us pinpoint where the time goes that creates such a ceiling, so this adds some more metrics. Somewhat related to etcd-io#15402. Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
1 parent 779ae51 commit 1e7875c

2 files changed

Lines changed: 61 additions & 1 deletion

File tree

server/etcdserver/api/v3rpc/metrics.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,60 @@ var (
5252
},
5353
[]string{"type", "client_api_version"},
5454
)
55+
56+
watchSendLoopWatchStreamTime = prometheus.NewHistogram(
57+
prometheus.HistogramOpts{
58+
Namespace: "etcd_debugging",
59+
Subsystem: "server",
60+
Name: "watch_send_loop_watch_stream_time_seconds",
61+
Help: "The total duration in seconds of running through the send loop watch stream response all events.",
62+
},
63+
)
64+
65+
watchSendLoopWatchStreamTimePerEvent = prometheus.NewHistogram(
66+
prometheus.HistogramOpts{
67+
Namespace: "etcd_debugging",
68+
Subsystem: "server",
69+
Name: "watch_send_loop_watch_stream_time_per_event_seconds",
70+
Help: "The average duration in seconds of running through the send loop watch stream response, per event.",
71+
// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
72+
// highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec
73+
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
74+
},
75+
)
76+
77+
watchSendLoopControlStreamTime = prometheus.NewHistogram(
78+
prometheus.HistogramOpts{
79+
Namespace: "etcd_debugging",
80+
Subsystem: "server",
81+
Name: "watch_send_loop_control_stream_time_seconds",
82+
Help: "The total duration in seconds of running through the send loop control stream response.",
83+
// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
84+
// highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec
85+
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
86+
},
87+
)
88+
89+
watchSendLoopProgressTime = prometheus.NewHistogram(
90+
prometheus.HistogramOpts{
91+
Namespace: "etcd_debugging",
92+
Subsystem: "server",
93+
Name: "watch_send_loop_progress_time_seconds",
94+
Help: "The total duration in seconds of running through the progress loop control stream response.",
95+
// lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
96+
// highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec
97+
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
98+
},
99+
)
55100
)
56101

57102
func init() {
58103
prometheus.MustRegister(sentBytes)
59104
prometheus.MustRegister(receivedBytes)
60105
prometheus.MustRegister(streamFailures)
61106
prometheus.MustRegister(clientRequests)
107+
prometheus.MustRegister(watchSendLoopWatchStreamTime)
108+
prometheus.MustRegister(watchSendLoopWatchStreamTimePerEvent)
109+
prometheus.MustRegister(watchSendLoopControlStreamTime)
110+
prometheus.MustRegister(watchSendLoopProgressTime)
62111
}

server/etcdserver/api/v3rpc/watch.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ func (sws *serverWatchStream) sendLoop() {
405405
return
406406
}
407407

408+
start := time.Now()
408409
// TODO: evs is []mvccpb.Event type
409410
// either return []*mvccpb.Event from the mvcc package
410411
// or define protocol buffer with []mvccpb.Event.
@@ -475,11 +476,16 @@ func (sws *serverWatchStream) sendLoop() {
475476
}
476477
sws.mu.Unlock()
477478

479+
totalDur := time.Since(start)
480+
sws.lg.Info("watch response to gRPC stream", zap.Duration("duration", totalDur))
481+
watchSendLoopWatchStreamTime.Observe(totalDur.Seconds())
482+
watchSendLoopWatchStreamTimePerEvent.Observe(totalDur.Seconds() / float64(len(evs)))
483+
478484
case c, ok := <-sws.ctrlStream:
479485
if !ok {
480486
return
481487
}
482-
488+
start := time.Now()
483489
if err := sws.gRPCStream.Send(c); err != nil {
484490
if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
485491
sws.lg.Debug("failed to send watch control response to gRPC stream", zap.Error(err))
@@ -517,7 +523,11 @@ func (sws *serverWatchStream) sendLoop() {
517523
delete(pending, wid)
518524
}
519525

526+
watchSendLoopControlStreamTime.Observe(time.Since(start).Seconds())
527+
520528
case <-progressTicker.C:
529+
start := time.Now()
530+
521531
sws.mu.Lock()
522532
for id, ok := range sws.progress {
523533
if ok {
@@ -526,6 +536,7 @@ func (sws *serverWatchStream) sendLoop() {
526536
sws.progress[id] = true
527537
}
528538
sws.mu.Unlock()
539+
watchSendLoopProgressTime.Observe(time.Since(start).Seconds())
529540

530541
case <-sws.closec:
531542
return

0 commit comments

Comments
 (0)