-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathval.go
More file actions
329 lines (292 loc) · 7.73 KB
/
val.go
File metadata and controls
329 lines (292 loc) · 7.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
// Copyright (C) 2015 Space Monkey, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package monkit
import (
"sync"
"sync/atomic"
"time"
)
// IntVal wraps an IntDist and guards it with a mutex. Values are created with
// NewIntVal, although the usual way to reach one is through a package scope:
//
//	var mon = monkit.Package()
//
//	func MyFunc() {
//		...
//		mon.IntVal("size").Observe(val)
//		...
//	}
type IntVal struct {
	// mtx is held around every access to dist.
	mtx sync.Mutex
	// dist accumulates the observed integer values.
	dist IntDist
}
// NewIntVal allocates an IntVal and initializes its distribution for the given
// series key.
func NewIntVal(key SeriesKey) (v *IntVal) {
	v = new(IntVal)
	initIntDist(&v.dist, key)
	return v
}
// Observe records a single integer value in the underlying distribution.
func (v *IntVal) Observe(val int64) {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	v.dist.Insert(val)
}
// Stats implements the StatSource interface. The distribution is copied while
// the mutex is held; cb is then driven from that copy after the mutex has been
// released.
func (v *IntVal) Stats(cb func(key SeriesKey, field string, val float64)) {
	v.mtx.Lock()
	snapshot := v.dist.Copy()
	v.mtx.Unlock()

	snapshot.Stats(cb)
}
// Quantile returns an estimate of the requested quantile of the values
// observed so far. The argument must satisfy 0 <= quantile <= 1.
func (v *IntVal) Quantile(quantile float64) (rv int64) {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	return v.dist.Query(quantile)
}
// FloatVal wraps a FloatDist and guards it with a mutex. Values are created
// with NewFloatVal, although the usual way to reach one is through a package
// scope:
//
//	var mon = monkit.Package()
//
//	func MyFunc() {
//		...
//		mon.FloatVal("size").Observe(val)
//		...
//	}
type FloatVal struct {
	// mtx is held around every access to dist.
	mtx sync.Mutex
	// dist accumulates the observed floating point values.
	dist FloatDist
}
// NewFloatVal allocates a FloatVal and initializes its distribution for the
// given series key.
func NewFloatVal(key SeriesKey) (v *FloatVal) {
	v = new(FloatVal)
	initFloatDist(&v.dist, key)
	return v
}
// Observe records a single floating point value in the underlying
// distribution.
func (v *FloatVal) Observe(val float64) {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	v.dist.Insert(val)
}
// Stats implements the StatSource interface. The distribution is copied while
// the mutex is held; cb is then driven from that copy after the mutex has been
// released.
func (v *FloatVal) Stats(cb func(key SeriesKey, field string, val float64)) {
	v.mtx.Lock()
	snapshot := v.dist.Copy()
	v.mtx.Unlock()

	snapshot.Stats(cb)
}
// Quantile returns an estimate of the requested quantile of the values
// observed so far. The argument must satisfy 0 <= quantile <= 1.
func (v *FloatVal) Quantile(quantile float64) (rv float64) {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	return v.dist.Query(quantile)
}
// BoolVal keeps statistics about observed boolean values: how many were true,
// how many were false, and which value was seen most recently. When reported,
// it also emits the disposition (number of trues minus number of falses).
// Values are created with NewBoolVal, although the usual way to reach one is
// through a package scope:
//
//	var mon = monkit.Package()
//
//	func MyFunc() {
//		...
//		mon.BoolVal("flipped").Observe(bool)
//		...
//	}
type BoolVal struct {
	// trues counts true observations; it is read and written atomically.
	trues int64
	// falses counts false observations; it is read and written atomically.
	falses int64
	// recent is 1 when the latest observation was true and 0 otherwise; it
	// is read and written atomically.
	recent int32
	// key is the series key handed to the Stats callback.
	key SeriesKey
}
// NewBoolVal allocates a BoolVal that reports under the given series key. All
// counters start at zero.
func NewBoolVal(key SeriesKey) *BoolVal {
	v := new(BoolVal)
	v.key = key
	return v
}
// Observe records a boolean value: it increments the matching counter and
// then stores the value (1 for true, 0 for false) as the most recent one.
func (v *BoolVal) Observe(val bool) {
	// Default to the "false" case and switch over when val is true.
	counter, latest := &v.falses, int32(0)
	if val {
		counter, latest = &v.trues, 1
	}

	atomic.AddInt64(counter, 1)
	atomic.StoreInt32(&v.recent, latest)
}
// Stats implements the StatSource interface. It reports four fields, in this
// order: "disposition" (trues minus falses), "false", "recent" and "true".
// Each counter is loaded atomically on its own; the three loads are not taken
// as one consistent snapshot.
func (v *BoolVal) Stats(cb func(key SeriesKey, field string, val float64)) {
	nTrue := atomic.LoadInt64(&v.trues)
	nFalse := atomic.LoadInt64(&v.falses)
	latest := atomic.LoadInt32(&v.recent)

	fields := [...]struct {
		name  string
		value float64
	}{
		{"disposition", float64(nTrue - nFalse)},
		{"false", float64(nFalse)},
		{"recent", float64(latest)},
		{"true", float64(nTrue)},
	}
	for _, f := range fields {
		cb(v.key, f.name, f.value)
	}
}
// StructVal keeps track of a structure of data by remembering the most
// recently observed value. Values are created with NewStructVal, although the
// usual way to reach one is through a package scope:
//
//	var mon = monkit.Package()
//
//	func MyFunc() {
//		...
//		mon.StructVal("stats").Observe(stats)
//		...
//	}
type StructVal struct {
	// mtx is held around every access to recent.
	mtx sync.Mutex
	// recent is the value handed to the latest Observe call, or nil if
	// Observe has not been called yet.
	recent interface{}
	// key is the series key used when reporting stats.
	key SeriesKey
}
// NewStructVal allocates a StructVal that reports under the given series key.
// No value is recorded until Observe is called.
func NewStructVal(key SeriesKey) *StructVal {
	v := new(StructVal)
	v.key = key
	return v
}
// Observe observes a struct value. Only the fields convertible to float64 will
// be monitored. The value is not copied: a reference to the argument of the
// most recent Observe call is retained and read when Stats is called.
func (v *StructVal) Observe(val interface{}) {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	v.recent = val
}
// Stats implements the StatSource interface. The most recently observed value
// is read under the mutex and, once the mutex is released, reported through
// StatSourceFromStruct. Nothing is reported while that value is nil.
func (v *StructVal) Stats(cb func(key SeriesKey, field string, val float64)) {
	v.mtx.Lock()
	latest := v.recent
	v.mtx.Unlock()

	if latest == nil {
		return
	}
	StatSourceFromStruct(v.key, latest).Stats(cb)
}
// DurationVal wraps a DurationDist and guards it with a mutex. Values are
// created with NewDurationVal, although the usual way to reach one is through
// a package scope:
//
//	var mon = monkit.Package()
//
//	func MyFunc() {
//		...
//		mon.DurationVal("time").Observe(val)
//		...
//	}
type DurationVal struct {
	// mtx is held around every access to dist.
	mtx sync.Mutex
	// dist accumulates the observed durations.
	dist DurationDist
}
// NewDurationVal allocates a DurationVal and initializes its distribution for
// the given series key.
func NewDurationVal(key SeriesKey) (v *DurationVal) {
	v = new(DurationVal)
	initDurationDist(&v.dist, key)
	return v
}
// Observe records a single duration in the underlying distribution.
func (v *DurationVal) Observe(val time.Duration) {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	v.dist.Insert(val)
}
// Stats implements the StatSource interface. The distribution is copied while
// the mutex is held; cb is then driven from that copy after the mutex has been
// released.
func (v *DurationVal) Stats(cb func(key SeriesKey, field string, val float64)) {
	v.mtx.Lock()
	snapshot := v.dist.Copy()
	v.mtx.Unlock()

	snapshot.Stats(cb)
}
// Quantile returns an estimate of the requested quantile of the durations
// observed so far. The argument must satisfy 0 <= quantile <= 1.
func (v *DurationVal) Quantile(quantile float64) (rv time.Duration) {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	return v.dist.Query(quantile)
}
// Aggregate is a factory for one additional aggregation over the values a
// RawVal collects. Calling it yields a pair of functions: observe, which
// RawVal.Observe invokes with every observed value, and stat, which
// RawVal.Stats invokes to obtain the field name and current value to report.
// NewRawVal calls each Aggregate exactly once, so any state shared by the two
// returned functions belongs to a single RawVal.
type Aggregate func() (observe func(val float64), stat func() (field string, val float64))
// RawVal is a simple wrapper around a float64 value. It applies no built-in
// aggregation (histogram, sum, etc); only the aggregations supplied at
// construction time are computed. Values are created with NewRawVal, although
// the usual way to reach one is through a package scope:
//
//	var mon = monkit.Package()
//
//	func MyFunc() {
//		...
//		mon.RawVal("value").Observe(val)
//		...
//	}
type RawVal struct {
	// mtx is held while value is accessed and while observers run.
	mtx sync.Mutex
	// value is the most recently observed value.
	value float64
	// key is the series key handed to the Stats callback.
	key SeriesKey
	// stats holds the stat function of every registered aggregation.
	stats []func() (field string, val float64)
	// observers holds the observe function of every registered aggregation.
	observers []func(val float64)
}
// NewRawVal allocates a RawVal that reports under the given series key. Every
// aggregation is invoked once, in argument order, and the observe/stat pair it
// returns is registered on the new value.
func NewRawVal(key SeriesKey, aggregations ...Aggregate) *RawVal {
	raw := &RawVal{key: key}
	for i := range aggregations {
		observe, stat := aggregations[i]()
		raw.observers = append(raw.observers, observe)
		raw.stats = append(raw.stats, stat)
	}
	return raw
}
// Observe stores val as the current value and then feeds it to every
// registered aggregation observer, in registration order. Both steps happen
// while the mutex is held.
func (v *RawVal) Observe(val float64) {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	v.value = val
	for i := range v.observers {
		v.observers[i](val)
	}
}
// Stats implements the StatSource interface. It reports the most recently
// observed value under the field "recent", followed by one field per
// registered aggregation, in registration order.
//
// The current value and every aggregation result are read while the mutex is
// held, because Observe updates the value and runs the aggregation observers
// under that same mutex; reading them unlocked would race with a concurrent
// Observe. cb is invoked only after the mutex has been released, so a callback
// that calls back into this RawVal does not deadlock.
func (v *RawVal) Stats(cb func(key SeriesKey, field string, val float64)) {
	type result struct {
		field string
		val   float64
	}

	v.mtx.Lock()
	recent := v.value
	results := make([]result, 0, len(v.stats))
	for _, stat := range v.stats {
		field, val := stat()
		results = append(results, result{field: field, val: val})
	}
	v.mtx.Unlock()

	cb(v.key, "recent", recent)
	for _, r := range results {
		cb(v.key, r.field, r.val)
	}
}
// Get returns the value passed to the most recent Observe call, or zero if
// Observe has not been called yet.
func (v *RawVal) Get() float64 {
	v.mtx.Lock()
	defer v.mtx.Unlock()

	return v.value
}
// Count is a value aggregator that counts the number of times a value is
// measured; the measured value itself is ignored. The result is reported under
// the field "count". The two returned functions share one counter and perform
// no locking of their own.
func Count() (observe func(val float64), stat func() (field string, val float64)) {
	var seen int

	observe = func(float64) {
		seen++
	}
	stat = func() (string, float64) {
		return "count", float64(seen)
	}
	return observe, stat
}
// Sum is a value aggregator that adds up the values measured. The result is
// reported under the field "sum".
//
// The total is accumulated as a float64 so that fractional observations
// contribute their full value; converting each observation to an integer first
// would drop the fractional part of every measurement. The two returned
// functions share one accumulator and perform no locking of their own.
func Sum() (observe func(val float64), stat func() (field string, val float64)) {
	var total float64

	observe = func(val float64) {
		total += val
	}
	stat = func() (string, float64) {
		return "sum", total
	}
	return observe, stat
}