Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ config*.yaml
# Editor/IDE
.idea/
.vscode/
.codegraph/
reasonix.toml
41 changes: 41 additions & 0 deletions proxy/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package proxy

import (
pkgmetrics "github.com/omalloc/tavern/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

var (
// upstreamRequestDuration tracks upstream round-trip latency per upstream address.
// Labels: addr
upstreamRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: pkgmetrics.Namespace,
Name: "upstream_request_duration_seconds",
Comment on lines +11 to +13
Help: "Upstream request round-trip latency histogram",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30},
}, []string{"addr"})

// upstreamErrorsTotal counts upstream errors by upstream address and error type.
// Labels: addr, error_type (network/timeout/http_status)
upstreamErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Comment on lines +18 to +20
Namespace: pkgmetrics.Namespace,
Name: "upstream_errors_total",
Help: "The total number of upstream request errors by upstream address and error type",
}, []string{"addr", "error_type"})

// collapseRequestsTotal tracks singleflight request coalescing outcomes.
// Labels: result (primary/shared)
collapseRequestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: pkgmetrics.Namespace,
Name: "collapse_requests_total",
Help: "The total number of singleflight-collapsed upstream requests",
}, []string{"result"})
)

func init() {
prometheus.MustRegister(
upstreamRequestDuration,
upstreamErrorsTotal,
collapseRequestsTotal,
)
}
42 changes: 35 additions & 7 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/omalloc/proxy/selector/random"

"github.com/omalloc/tavern/proxy/singleflight"

"github.com/prometheus/client_golang/prometheus"
)

type Tuple struct {
Expand Down Expand Up @@ -70,32 +72,43 @@ func (r *ReverseProxy) Do(req *http.Request, collapsed bool, waitTimeout time.Du
return nil, selector.ErrNoAvailable
}

upAddr := current.Address()

defer done(req.Context(), selector.DoneInfo{
Err: err,
BytesSent: true,
BytesReceived: true,
})

client := r.find(current.Address())
client := r.find(upAddr)

trackedDo := func() (*http.Response, error) {
start := time.Now()
resp, doErr := client.Do(req)
upstreamRequestDuration.With(prometheus.Labels{"addr": upAddr}).Observe(time.Since(start).Seconds())
if doErr != nil {
upstreamErrorsTotal.With(prometheus.Labels{"addr": upAddr, "error_type": classifyError(doErr)}).Inc()
}
return resp, doErr
}

if !collapsed {
return client.Do(req)
//return r.uncompress(client.Do(req))
return trackedDo()
}

ret := <-r.flight.DoChan(onceKey(req), waitTimeout, func() (*http.Response, error) {
//return r.uncompress(client.Do(req))
return client.Do(req)
return trackedDo()
Comment on lines 95 to +100
})

if ret.Err != nil {
return ret.Val, ret.Err
}

if ret.Shared {
// if shared, process the response copied.
collapseRequestsTotal.WithLabelValues("shared").Inc()
return ret.Val, ret.Err
}
// return directly
collapseRequestsTotal.WithLabelValues("primary").Inc()
return ret.Val, ret.Err
}

Expand Down Expand Up @@ -217,3 +230,18 @@ func WithActivateMock(fn func(client *http.Client)) Option {
r.activateMock = fn
}
}

// classifyError maps an upstream request error to a metric label value.
func classifyError(err error) string {
if err == nil {
return "none"
}
if oe, ok := err.(interface{ Timeout() bool }); ok && oe.Timeout() {
return "timeout"
}
// net.Error covers both temporary and permanent network errors.
if _, ok := err.(net.Error); ok {
return "network"
}
return "unknown"
}
17 changes: 17 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,29 @@ var (
Name: "requests_unexpected_closed_total",
Help: "The total number of unexpected closed requests",
}, []string{"protocol", "method"})

// requestDuration tracks end-to-end request latency by HTTP method.
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: pkgmetrics.Namespace,
Name: "request_duration_seconds",
Help: "End-to-end request latency histogram by HTTP method",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30},
}, []string{"method"})

// connectionsActive tracks the current number of active client connections.
connectionsActive = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: pkgmetrics.Namespace,
Name: "connections_active",
Help: "The current number of active client connections",
})
Comment on lines +29 to +34
)

func init() {
prometheus.MustRegister(
_metricRequestCodeCounterTotal,
_metricRequestUnexpectedClosedTotal,
requestDuration,
connectionsActive,
)

_metricRequestUnexpectedClosedTotal.WithLabelValues("HTTP/1.1", "GET")
Expand Down
12 changes: 12 additions & 0 deletions server/middleware/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/omalloc/tavern/proxy"
"github.com/omalloc/tavern/server/middleware"
storagev1 "github.com/omalloc/tavern/storage"

"github.com/prometheus/client_golang/prometheus"
)

const BYPASS = "BYPASS"
Expand Down Expand Up @@ -142,6 +144,7 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) {
// set cache-staus header BYPASS
resp.Header.Set(protocol.ProtocolCacheStatusKey, BYPASS)
}
cacheRequestTotal.WithLabelValues(storage.BYPASS.String(), caching.bucket.StoreType()).Inc()
return
}

Expand All @@ -155,6 +158,7 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) {
headers := make(http.Header)
xhttp.CopyHeader(caching.md.Headers, headers)
headers.Set("Content-Range", fmt.Sprintf("bytes */%d", caching.md.Size))
cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc()
return nil, xhttp.NewBizError(http.StatusRequestedRangeNotSatisfiable, headers)
}

Expand All @@ -166,21 +170,25 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) {
if err != nil {
// fd leak
closeBody(resp)
cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc()
return nil, err
}

// response now
resp, err = caching.processor.postCacheProcessor(caching, req, resp)
cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc()
return
}

// full MISS
resp, err = caching.doProxy(req, false)
if err != nil {
cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc()
return nil, err
}

resp, err = processor.postCacheProcessor(caching, req, resp)
cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc()
return
})

Expand Down Expand Up @@ -456,6 +464,7 @@ func (c *Caching) flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc
writerBuffer := func(buf []byte, index uint32, current uint64, eof bool) error {
f, wpath, err := c.bucket.WriteChunkFile(c.req.Context(), c.id, index)
if err != nil {
cacheChunkWriteTotal.With(prometheus.Labels{"result": "failed", "store_type": c.bucket.StoreType()}).Inc()
return err
}
defer func() {
Expand Down Expand Up @@ -497,6 +506,7 @@ func (c *Caching) flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc
c.log.Debugf("flushBuffer wpath=%s isChunked=%t fileChunk=%d/%d", wpath, chunked, index+1, endPart)

if nn, err1 := f.Write(buf); err1 != nil || nn != len(buf) {
cacheChunkWriteTotal.With(prometheus.Labels{"result": "failed", "store_type": c.bucket.StoreType()}).Inc()
return fmt.Errorf("writeBuffer wpath[%s] chunk[%d] failed nn[%d] want[%d] err %v", wpath, index+1, nn, len(buf), err1)
}

Expand All @@ -508,6 +518,7 @@ func (c *Caching) flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc
_ = c.bucket.Store(c.req.Context(), c.md)
}

cacheChunkWriteTotal.With(prometheus.Labels{"result": "success", "store_type": c.bucket.StoreType()}).Inc()
return nil
}

Expand All @@ -530,5 +541,6 @@ func (c *Caching) flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc
// flushFailed flush cache file to bucket failed callback
func (c *Caching) flushFailed(err error) {
c.log.Errorf("flush body to disk failed: %v", err)
cacheFlushFailedTotal.WithLabelValues(c.bucket.StoreType()).Inc()
_ = c.bucket.DiscardWithMetadata(c.req.Context(), c.md)
}
2 changes: 2 additions & 0 deletions server/middleware/caching/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func getContents(c *Caching, reqChunks []uint32, from uint32) (io.ReadCloser, in
toByte := min(c.md.Size-1, uint64(availableChunks[index]*uint32(partSize))-1)

// Request is automatically cloned by getUpstreamReader
cacheFillrangeTotal.WithLabelValues(c.bucket.StoreType()).Inc()
reader, err := c.getUpstreamReader(fromByte, toByte, true)
if err != nil {
_ = chunkFile.Close()
Expand All @@ -223,6 +224,7 @@ func getContents(c *Caching, reqChunks []uint32, from uint32) (io.ReadCloser, in
toByte := min(c.md.Size-1, tailChunkSize)

// Request is automatically cloned by getUpstreamReader
cacheFillrangeTotal.WithLabelValues(c.bucket.StoreType()).Inc()
reader, err := c.getUpstreamReader(fromByte, toByte, true)
if err != nil {
return nil, 0, err
Expand Down
50 changes: 50 additions & 0 deletions server/middleware/caching/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package caching

import (
pkgmetrics "github.com/omalloc/tavern/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

var (
// cacheRequestTotal tracks cache request outcomes by cache status and store type.
// Labels: cache_status (HIT/MISS/PART_HIT/PART_MISS/BYPASS/REVALIDATE_HIT/REVALIDATE_MISS/HOT_HIT),
// store_type (disk/memory/hot/warm)
cacheRequestTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: pkgmetrics.Namespace,
Name: "cache_requests_total",
Help: "The total number of cache requests by status and store type",
}, []string{"cache_status", "store_type"})

// cacheChunkWriteTotal tracks chunk write outcomes during cache fill.
// Labels: result (success/failed), store_type
cacheChunkWriteTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: pkgmetrics.Namespace,
Name: "cache_chunk_write_total",
Help: "The total number of chunk write operations by result",
}, []string{"result", "store_type"})

// cacheFlushFailedTotal counts flush-to-disk failures that cause object discard.
// Labels: store_type
cacheFlushFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: pkgmetrics.Namespace,
Name: "cache_flush_failed_total",
Help: "The total number of cache flush failures that triggered object discard",
}, []string{"store_type"})

// cacheFillrangeTotal counts how many times the fillrange path was entered
// (upstream sub-requests to fill missing chunks). Labels: store_type
cacheFillrangeTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: pkgmetrics.Namespace,
Name: "cache_fillrange_total",
Help: "The total number of fillrange upstream sub-requests triggered by partial cache hits",
}, []string{"store_type"})
)

func init() {
prometheus.MustRegister(
cacheRequestTotal,
cacheChunkWriteTotal,
cacheFlushFailedTotal,
cacheFillrangeTotal,
)
}
19 changes: 19 additions & 0 deletions server/middleware/recovery/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package recovery

import (
pkgmetrics "github.com/omalloc/tavern/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
)

var (
// panicTotal counts the number of panics caught by the recovery middleware.
panicTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: pkgmetrics.Namespace,
Name: "panics_total",
Help: "The total number of panics caught by the recovery middleware",
})
)

func init() {
prometheus.MustRegister(panicTotal)
}
1 change: 1 addition & 0 deletions server/middleware/recovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) {
// Here you can log the panic or handle it as needed
log.Context(req.Context()).Errorf("middleware recovery: %s \n%s", r, runtime.PrintStackTrace(4))

panicTotal.Inc()
failCount.Add(1)
if failCount.Load() >= int32(opts.FailCountThreshold) {
log.Context(req.Context()).Errorf("middleware recovery: reached fail count threshold (%d), healthy now fail.", opts.FailCountThreshold)
Expand Down
14 changes: 14 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"dario.cat/mergo"
"github.com/cloudflare/tableflip"
Expand Down Expand Up @@ -68,6 +69,14 @@ func NewServer(flip *tableflip.Upgrader, config *conf.Bootstrap, plugins []plugi
IdleTimeout: servConfig.IdleTimeout,
ReadHeaderTimeout: servConfig.ReadHeaderTimeout,
MaxHeaderBytes: servConfig.MaxHeaderBytes,
ConnState: func(_ net.Conn, state http.ConnState) {
switch state {
case http.StateNew:
connectionsActive.Inc()
case http.StateClosed, http.StateHijacked:
connectionsActive.Dec()
}
},
},
plugins: plugins,
flip: flip,
Expand Down Expand Up @@ -212,6 +221,11 @@ func (s *HTTPServer) newServeMux() *http.ServeMux {
// buildHandler ... Cache 主流程入口
func (s *HTTPServer) buildHandler(tripper http.RoundTripper) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
start := time.Now()
defer func() {
requestDuration.WithLabelValues(req.Method).Observe(time.Since(start).Seconds())
}()

var clog = log.Context(req.Context())
var resp *http.Response
var err error
Expand Down
Loading
Loading