-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhistory_timeline.go
More file actions
105 lines (94 loc) · 2.69 KB
/
history_timeline.go
File metadata and controls
105 lines (94 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package queue
import (
"sync"
"time"
)
// Limits applied to the per-queue history that historyTimeline keeps in memory.
const (
// historyTimelineRetention is the maximum age of a stored point; older points
// are dropped from a queue whenever a new point is recorded for it.
historyTimelineRetention = 8 * 24 * time.Hour
// historyTimelineMaxPoints caps how many points are kept per queue; when the
// cap is exceeded only the most recent points are retained.
historyTimelineMaxPoints = 30000
// historyTimelineMinInterval is the minimum time between two stored points
// whose Processed and Failed counters are identical.
historyTimelineMinInterval = 30 * time.Second
)
// historyTimeline holds the recorded history points for each queue name.
type historyTimeline struct {
// mu is held by recordAndRead while it reads and writes byQueue.
mu sync.Mutex
// byQueue maps a queue name to its points, appended in recording order.
byQueue map[string][]QueueHistoryPoint
}
// defaultHistoryTimeline is the package-level timeline that
// TimelineHistoryFromSnapshot records into and reads from.
var defaultHistoryTimeline = &historyTimeline{byQueue: make(map[string][]QueueHistoryPoint)}
// timelineDuration maps a history window to the span of time it covers:
// seven days for QueueHistoryWeek, one day for QueueHistoryDay, and one hour
// for every other value.
func timelineDuration(window QueueHistoryWindow) time.Duration {
	const day = 24 * time.Hour

	if window == QueueHistoryWeek {
		return 7 * day
	}
	if window == QueueHistoryDay {
		return day
	}
	return time.Hour
}
// TimelineHistoryFromSnapshot stores the per-queue counters found in snapshot
// in the package-level timeline and returns the points recorded for queueName
// (after normalizeQueueName is applied) that fall inside window.
// It is meant for drivers that cannot report multi-point history natively.
// @group Admin
//
// Example: timeline history from snapshots
//
//	snapshot := queue.StatsSnapshot{
//		ByQueue: map[string]queue.QueueCounters{
//			"default": {Processed: 5, Failed: 1},
//		},
//	}
//	points := queue.TimelineHistoryFromSnapshot(snapshot, "default", queue.QueueHistoryHour)
//	fmt.Println(len(points) >= 1)
//	// Output: true
func TimelineHistoryFromSnapshot(snapshot StatsSnapshot, queueName string, window QueueHistoryWindow) []QueueHistoryPoint {
	name := normalizeQueueName(queueName)
	recordedAt := time.Now()
	return defaultHistoryTimeline.recordAndRead(snapshot, name, window, recordedAt)
}
// recordAndRead appends one point per queue found in snapshot.ByQueue, stamped
// with now, and returns a copy of the points stored for queueName that are not
// older than the requested window.
//
// Recording rules, applied to each queue in the snapshot:
//   - entries with an empty queue name are ignored;
//   - a point is skipped when both counters equal those of the previous point
//     and less than historyTimelineMinInterval has elapsed since it;
//   - points older than historyTimelineRetention are dropped, and at most the
//     newest historyTimelineMaxPoints points are kept.
//
// A nil receiver returns nil without recording anything. A zero now is
// replaced with time.Now(). The result is nil when queueName is empty or has
// no stored points; otherwise it is a newly allocated slice, so callers may
// modify it without affecting the stored history.
func (t *historyTimeline) recordAndRead(snapshot StatsSnapshot, queueName string, window QueueHistoryWindow, now time.Time) []QueueHistoryPoint {
	if t == nil {
		return nil
	}
	if now.IsZero() {
		now = time.Now()
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	// A zero-value historyTimeline has a nil map, and storing into a nil map
	// panics, so allocate it on first use.
	if t.byQueue == nil {
		t.byQueue = make(map[string][]QueueHistoryPoint)
	}

	cutoff := now.Add(-historyTimelineRetention)
	for name, counters := range snapshot.ByQueue {
		if name == "" {
			continue
		}

		points := t.byQueue[name]
		entry := QueueHistoryPoint{At: now, Processed: counters.Processed, Failed: counters.Failed}
		if n := len(points); n > 0 {
			last := points[n-1]
			unchanged := last.Processed == entry.Processed && last.Failed == entry.Failed
			if unchanged && now.Sub(last.At) < historyTimelineMinInterval {
				continue
			}
		}
		points = append(points, entry)

		// Work out the first index to keep: skip everything older than the
		// retention cutoff, then skip further if the point cap is still
		// exceeded.
		start := 0
		for start < len(points) && points[start].At.Before(cutoff) {
			start++
		}
		if overflow := len(points) - historyTimelineMaxPoints; overflow > start {
			start = overflow
		}
		if start > 0 {
			// Copy into a fresh slice instead of reslicing so the stored
			// slice does not keep the dropped prefix's backing array alive.
			points = append([]QueueHistoryPoint(nil), points[start:]...)
		}
		t.byQueue[name] = points
	}

	if queueName == "" {
		return nil
	}
	stored := t.byQueue[queueName]
	if len(stored) == 0 {
		return nil
	}

	windowCutoff := now.Add(-timelineDuration(window))
	out := make([]QueueHistoryPoint, 0, len(stored))
	for _, point := range stored {
		if !point.At.Before(windowCutoff) {
			out = append(out, point)
		}
	}
	return out
}