Skip to content

Commit dff89a2

Browse files
committed
refactor
1 parent 2cde868 commit dff89a2

5 files changed

Lines changed: 33 additions & 52 deletions

File tree

dialer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ func TestDialer_Dial(t *testing.T) {
3434
Dial: d2.Dial,
3535
},
3636
}
37-
<-DefaultTicker.Ch()
37+
<-l1.WaitCh()
3838
resp1, err1 := client1.Get(srv.URL)
3939
resp2, err2 := client2.Get(srv.URL)
40-
<-DefaultTicker.Ch()
40+
<-l1.WaitCh()
4141

4242
if err1 != nil {
4343
t.Fatal(err1)

limiter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestLimiter_Stop(t *testing.T) {
2121
t.Error(n)
2222
}
2323
l.Stop()
24-
<-DefaultTicker.Ch()
24+
<-l.WaitCh()
2525

2626
n, err = l.Reads.io(r.Read, buf)
2727
if err != io.EOF {

operation.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,22 @@ const interval = time.Second / secparts
1212
const batchsize = 4096
1313

1414
type Operation struct {
15-
Limit atomic.Int64 // bandwith limit in bytes/sec
16-
Rate atomic.Int64 // current rate in bytes/sec
17-
Count atomic.Int64 // number of bytes seen
18-
avail atomic.Int64
19-
count atomic.Int64
20-
ch <-chan int
21-
reader bool
22-
mu sync.Mutex // protects following
23-
stopCh chan struct{}
15+
*Ticker // Ticker we belong to
16+
Limit atomic.Int64 // bandwith limit in bytes/sec
17+
Rate atomic.Int64 // current rate in bytes/sec
18+
Count atomic.Int64 // number of bytes seen
19+
avail atomic.Int64
20+
count atomic.Int64
21+
ch <-chan int
22+
reader bool
23+
mu sync.Mutex // protects following
24+
stopCh chan struct{}
2425
}
2526

26-
func NewOperation(limits []int64, idx int) (op *Operation) {
27+
func NewOperation(t *Ticker, limits []int64, idx int) (op *Operation) {
2728
ch := make(chan int)
2829
op = &Operation{
30+
Ticker: t,
2931
ch: ch,
3032
stopCh: make(chan struct{}),
3133
reader: idx == 0,
@@ -71,7 +73,7 @@ func (op *Operation) run(ch chan<- int) {
7173
todo = max(1, int(limit/secparts))
7274
batch = min(batchsize, todo)
7375
}
74-
tickCh := DefaultTicker.Ch()
76+
waitCh := op.WaitCh()
7577

7678
partialsecond:
7779
for {
@@ -82,10 +84,10 @@ func (op *Operation) run(ch chan<- int) {
8284
todo -= batch
8385
todo += int(op.avail.Swap(0))
8486
if todo < batch {
85-
<-tickCh
87+
<-waitCh
8688
break partialsecond
8789
}
88-
case <-tickCh:
90+
case <-waitCh:
8991
break partialsecond
9092
}
9193
}

operation_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,20 @@ func TestOperation_read_rate_low(t *testing.T) {
103103
buf := make([]byte, 1001)
104104

105105
var tickCount atomic.Int32
106-
oldOnTick := DefaultTicker.GetOnTick()
107-
defer DefaultTicker.SetOnTick(oldOnTick)
108-
<-DefaultTicker.Ch()
109-
DefaultTicker.SetOnTick(func() { tickCount.Add(1) })
106+
go func() {
107+
for {
108+
<-l.WaitCh()
109+
tickCount.Add(1)
110+
}
111+
}()
110112

111-
<-DefaultTicker.Ch()
112113
// should read in batches of 1000/secparts (=100) bytes
114+
<-l.WaitCh()
113115
now := time.Now()
114116
n, err := l.Reads.io(r.Read, buf)
115117
elapsed := time.Since(now)
116118
rate := l.Reads.Rate.Load()
117-
<-DefaultTicker.Ch()
119+
<-l.WaitCh()
118120

119121
if n := tickCount.Load(); n < 11 || n > 13 {
120122
t.Error(n)

ticker.go

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@ import (
55
"time"
66
)
77

8-
// A Ticker synchronizes rate calculation among multiple Limiters and
9-
// provides the on-tick callback.
8+
// A Ticker synchronizes rate calculation among multiple Limiters.
109
type Ticker struct {
1110
mu sync.Mutex
1211
ch chan struct{}
13-
fn func()
1412
}
1513

1614
var DefaultTicker *Ticker
@@ -20,28 +18,17 @@ var DefaultTicker *Ticker
2018
// both read and write limits, the second will set the write limit.
2119
//
2220
// To stop the limiter and free it's resources, call Stop.
23-
func (ot *Ticker) NewLimiter(limits ...int64) *Limiter {
21+
func (ot *Ticker) NewLimiter(limits ...int64) (l *Limiter) {
2422
return &Limiter{
2523
Ticker: ot,
26-
Reads: NewOperation(limits, 0),
27-
Writes: NewOperation(limits, 1),
24+
Reads: NewOperation(ot, limits, 0),
25+
Writes: NewOperation(ot, limits, 1),
2826
}
2927
}
3028

31-
func (ot *Ticker) SetOnTick(fn func()) {
32-
ot.mu.Lock()
33-
ot.fn = fn
34-
ot.mu.Unlock()
35-
}
36-
37-
func (ot *Ticker) GetOnTick() (fn func()) {
38-
ot.mu.Lock()
39-
fn = ot.fn
40-
ot.mu.Unlock()
41-
return
42-
}
43-
44-
func (ot *Ticker) Ch() (ch <-chan struct{}) {
29+
// WaitCh returns a channel that will close when the current rate limit
30+
// time slice runs out.
31+
func (ot *Ticker) WaitCh() (ch <-chan struct{}) {
4532
ot.mu.Lock()
4633
ch = ot.ch
4734
ot.mu.Unlock()
@@ -63,19 +50,9 @@ func (ot *Ticker) run() {
6350
}
6451
}
6552

66-
func (ot *Ticker) runOnTick() {
67-
for {
68-
<-ot.Ch()
69-
if fn := ot.GetOnTick(); fn != nil {
70-
fn()
71-
}
72-
}
73-
}
74-
7553
func init() {
7654
DefaultTicker = &Ticker{
7755
ch: make(chan struct{}),
7856
}
7957
go DefaultTicker.run()
80-
go DefaultTicker.runOnTick()
8158
}

0 commit comments

Comments
 (0)