-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathlogplexc.go
More file actions
324 lines (264 loc) · 7.27 KB
/
logplexc.go
File metadata and controls
324 lines (264 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
/*
Package logplexc provides Logplex clients that can be linked into
other Go programs.
Most users will want to use logplexc.Client, which implements a
concurrent client, including periodic flushing, dropping, statistics
accumulation, and request-level parallelism. logplexc.Client is built
with logplexc.MiniClient.
To use the client, call NewClient with a configuration structure:
cfg := logplexc.Config{
Logplex:            *logplexURL, // where logplexURL, _ = url.Parse("https://t:123@my.logplex.example.com")
HTTPClient:         client,
RequestSizeTrigger: 100 * KB,
Concurrency: 3,
Period: 3 * time.Second,
}
c, err := logplexc.NewClient(&cfg)
defer c.Close()
// Messages will be periodically flushed.
c.BufferMessage(...)
Those with advanced needs will want to use the low level
logplexc.MiniClient, which implements log formatting, buffering, and
HTTP POSTing.
*/
package logplexc
import (
"errors"
"net/http"
"net/url"
"runtime"
"sync"
"sync/atomic"
"time"
)
// Stats is a snapshot of a Client's accumulated counters, as copied
// out by Client.Statistics. Message-level counters count individual
// log messages; request-level counters count whole POST requests.
type Stats struct {
	// Number of concurrent requests at the time of retrieval.
	Concurrency int32

	// Message-level statistics

	// Total messages submitted
	Total uint64
	// Incremented when a message is ignored outright because of
	// too much work being done already.
	Dropped uint64
	// Incremented when a log post request is not known to have
	// succeeded and one has given up waiting.
	Cancelled uint64
	// Incremented when a log post request is responded to,
	// affirming that the messages have been rejected.
	Rejected uint64
	// Incremented only when a positive response is received from
	// logplex.
	Successful uint64

	// Request-level statistics; each mirrors the message-level
	// counter of the same flavor, incremented once per request.
	TotalRequests   uint64
	DroppedRequests uint64
	CancelRequests  uint64
	RejectRequests  uint64
	SuccessRequests uint64
}
// TimeTriggerBehavior selects how a Client schedules time-based
// flushing of buffered log messages.
type TimeTriggerBehavior byte

const (
	// Carefully choose the zero-value so it is a reasonable
	// default, so that a user requesting the other behaviors --
	// which do not need a time -- can write things like:
	// TimeTrigger{Behavior: TimeTriggerImmediate} without
	// specifying a Period.

	// TimeTriggerPeriodic flushes buffered messages on a ticker
	// firing every Config.Period.
	TimeTriggerPeriodic TimeTriggerBehavior = iota
	// TimeTriggerImmediate attempts a flush after every buffered
	// message.
	TimeTriggerImmediate
	// TimeTriggerNever disables time-based flushing entirely;
	// only RequestSizeTrigger forces a POST.
	TimeTriggerNever
)
// Client represents a Logplex embeddable client implementation that includes
// concurrency, dropping, and statistics gathering.
type Client struct {
	// Counters; guarded by statLock, except Concurrency which is
	// maintained with sync/atomic operations.
	s        Stats
	statLock sync.Mutex

	// Underlying client handling log formatting, buffering, and
	// HTTP POSTing.
	c *MiniClient

	// Concurrency control of POST workers: the current level of
	// concurrency, and a token bucket channel.
	bucket chan struct{}

	// Threshold of logplex request size to trigger POST.
	requestSizeTrigger int

	// For implementing timely flushing of log buffers.
	timeTrigger    TimeTriggerBehavior
	ticker         *time.Ticker
	tickerShutdown chan struct{}

	// Total number of tokens issued to bucket; Close reclaims
	// exactly this many to wait out in-flight workers.
	bucketDepth int
}
// Config represents a Client configuration
type Config struct {
	// Logplex is the endpoint URL, including any credentials
	// (e.g. "https://token:pass@host" per the package example).
	Logplex url.URL
	// HTTPClient performs the POST requests.
	HTTPClient http.Client
	// RequestSizeTrigger is the buffered-byte threshold that
	// forces a POST between time-based flushes.
	RequestSizeTrigger int
	// Concurrency bounds the number of simultaneous POST workers.
	Concurrency int
	// Period is the flush interval used by TimeTriggerPeriodic;
	// zero is rewritten to TimeTriggerImmediate, negative is an
	// error.
	Period time.Duration

	// Optional: Can be set for advanced behaviors like triggering
	// Never or Immediately.
	TimeTrigger TimeTriggerBehavior
}
// NewClient returns a new logplex client based on the given config.
//
// It validates the time-trigger settings, pre-fills the worker token
// bucket with cfg.Concurrency tokens, and, for periodic flushing,
// starts a ticker goroutine that is shut down by Close.
func NewClient(cfg *Config) (*Client, error) {
	c, err := NewMiniClient(
		&MiniConfig{
			Logplex:    cfg.Logplex,
			HTTPClient: cfg.HTTPClient,
		})
	if err != nil {
		return nil, err
	}

	m := Client{
		c: c,
		// A buffered channel filled synchronously below is the
		// idiomatic token bucket; it avoids the extra feeder
		// goroutine an unbuffered channel would require while
		// preserving the same take/return semantics.
		bucket:             make(chan struct{}, cfg.Concurrency),
		requestSizeTrigger: cfg.RequestSizeTrigger,
	}

	// Handle determining m.timeTrigger. This complexity seems
	// reasonable to allow the user to get some input checking
	// (negative Periods) and to get TimeTriggerImmediate by
	// passing a zero-duration period (TimeTriggerImmediate is
	// still useful for internal bookkeeping).
	switch cfg.TimeTrigger {
	case TimeTriggerPeriodic:
		switch {
		case cfg.Period < 0:
			return nil, errors.New(
				"logplexc.Client: negative target " +
					"latency not allowed")
		case cfg.Period == 0:
			// Rewrite a zero-duration period into an
			// immediate flush.
			m.timeTrigger = TimeTriggerImmediate
		default:
			m.timeTrigger = TimeTriggerPeriodic
		}
	default:
		m.timeTrigger = cfg.TimeTrigger
	}

	// Supply tokens to do work with bounded concurrency. The
	// sends cannot block: the buffer was sized to bucketDepth.
	m.bucketDepth = cfg.Concurrency
	for i := 0; i < m.bucketDepth; i++ {
		m.bucket <- struct{}{}
	}

	// Set up the time-based log flushing, if requested.
	if m.timeTrigger == TimeTriggerPeriodic {
		m.ticker = time.NewTicker(cfg.Period)
		m.tickerShutdown = make(chan struct{})
		go func() {
			for {
				// Wait for a while to do work, or to
				// exit when the ticker is stopped
				// when Close is called on the client.
				select {
				case <-m.ticker.C:
					m.maybeWork()
				case <-m.tickerShutdown:
					return
				}
			}
		}()
	}
	return &m, nil
}
// Close closes connection and flush logs.
//
// Order matters here: stop the ticker goroutine first so no new work
// is scheduled, attempt one final flush, then reclaim every worker
// token — which blocks until all in-flight POSTs have finished —
// before closing the bucket.
func (m *Client) Close() {
	if m.timeTrigger == TimeTriggerPeriodic {
		// Clean up otherwise immortal ticker goroutine.
		m.ticker.Stop()
		m.tickerShutdown <- struct{}{}
	}

	// Make an attempt to send the final buffer, if any.
	m.maybeWork()

	// Drain all work tokens. Each postBundle worker returns its
	// token when done, so receiving bucketDepth tokens proves all
	// outstanding requests have completed.
	for i := 0; i < m.bucketDepth; i++ {
		<-m.bucket
	}

	close(m.bucket)
}
// BufferMessage formats and buffers one log message via the underlying
// MiniClient. A POST is attempted right away when the buffered size
// reaches the configured threshold, or always when the client is in
// immediate-flush mode. The returned error is always nil.
func (m *Client) BufferMessage(
	priority int, when time.Time, host string, procID string,
	log []byte) error {
	stats := m.c.BufferMessage(priority, when, host, procID, log)

	sizeTriggered := stats.Buffered >= m.requestSizeTrigger
	if m.timeTrigger == TimeTriggerImmediate || sizeTriggered {
		m.maybeWork()
	}

	return nil
}
// Statistics returns a snapshot of the client's counters.
func (m *Client) Statistics() (s Stats) {
	m.statLock.Lock()
	defer m.statLock.Unlock()
	s = m.s
	// Concurrency is maintained via sync/atomic in maybeWork, not
	// under statLock, so the struct copy above races on that one
	// field; re-read it atomically for a well-defined value.
	s.Concurrency = atomic.LoadInt32(&m.s.Concurrency)
	return s
}
// maybeWork swaps out the current log bundle and, if it is non-empty,
// tries to POST it on a worker goroutine without ever blocking: when
// no concurrency token is available the bundle is dropped and counted
// in the drop statistics.
//
// NOTE(review): the Concurrency gauge is decremented when maybeWork
// returns, not when the spawned postBundle finishes, so it tracks
// in-flight dispatch attempts rather than in-flight POSTs exactly —
// confirm this is the intended reading of Stats.Concurrency.
func (m *Client) maybeWork() {
	atomic.AddInt32(&m.s.Concurrency, 1)
	defer atomic.AddInt32(&m.s.Concurrency, -1)

	b := m.c.SwapBundle()

	// Avoid sending empty requests
	if b.NumberFramed <= 0 {
		return
	}

	// Check if there are any worker tokens available. If not,
	// then just abort after recording drop statistics.
	select {
	case <-m.bucket:
		go m.postBundle(&b)
	default:
		m.statReqDrop(&b.MiniStats)

		// In GOMAXPROCS=1 cases, tight loops can starve out
		// any of the workers predictably and seemingly
		// forever.
		runtime.Gosched()
	}
}
// postBundle POSTs one bundle to logplex and accrues statistics for
// the outcome. It runs on its own goroutine and must hold a token
// taken from m.bucket, which it returns on exit — Close relies on
// that return to know the request has finished.
func (m *Client) postBundle(b *Bundle) {
	// When exiting, free up the token for use by another
	// worker.
	defer func() { m.bucket <- struct{}{} }()

	// Post to logplex.
	resp, err := m.c.Post(b)
	if err != nil {
		m.statReqErr(&b.MiniStats)
		return
	}
	defer resp.Body.Close()

	// Check HTTP return code and accrue statistics accordingly.
	// Only 204 No Content counts as success; any other status is
	// treated as a rejection of the whole bundle.
	if resp.StatusCode != http.StatusNoContent {
		m.statReqRej(&b.MiniStats)
	} else {
		m.statReqSuccess(&b.MiniStats)
	}
}
// statReqTotalUnsync bumps the aggregate request and message totals.
// The caller must already hold m.statLock.
func (m *Client) statReqTotalUnsync(ms *MiniStats) {
	m.s.TotalRequests++
	m.s.Total += ms.NumberFramed
}
// statReqSuccess records a bundle that logplex affirmatively accepted.
func (m *Client) statReqSuccess(ms *MiniStats) {
	m.statLock.Lock()
	defer m.statLock.Unlock()

	m.statReqTotalUnsync(ms)
	m.s.SuccessRequests++
	m.s.Successful += ms.NumberFramed
}
// statReqErr records a bundle whose POST failed outright, leaving its
// fate unknown.
func (m *Client) statReqErr(ms *MiniStats) {
	m.statLock.Lock()
	defer m.statLock.Unlock()

	m.statReqTotalUnsync(ms)
	m.s.CancelRequests++
	m.s.Cancelled += ms.NumberFramed
}
// statReqRej records a bundle that logplex responded to with a
// non-success status.
func (m *Client) statReqRej(ms *MiniStats) {
	m.statLock.Lock()
	defer m.statLock.Unlock()

	m.statReqTotalUnsync(ms)
	m.s.RejectRequests++
	m.s.Rejected += ms.NumberFramed
}
// statReqDrop records a bundle discarded because no worker token was
// available.
func (m *Client) statReqDrop(ms *MiniStats) {
	m.statLock.Lock()
	defer m.statLock.Unlock()

	m.statReqTotalUnsync(ms)
	m.s.DroppedRequests++
	m.s.Dropped += ms.NumberFramed
}