Skip to content

Commit 2cde868

Browse files
committed
refactor
1 parent cbed290 commit 2cde868

7 files changed

Lines changed: 96 additions & 82 deletions

File tree

dialer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ func TestDialer_Dial(t *testing.T) {
3434
Dial: d2.Dial,
3535
},
3636
}
37-
<-Ticker.TickCh()
37+
<-DefaultTicker.Ch()
3838
resp1, err1 := client1.Get(srv.URL)
3939
resp2, err2 := client2.Get(srv.URL)
40-
<-Ticker.TickCh()
40+
<-DefaultTicker.Ch()
4141

4242
if err1 != nil {
4343
t.Fatal(err1)

limiter.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,18 @@ type DialContextFn func(ctx context.Context, network string, address string) (ne
1010
var DefaultNetDialer = &net.Dialer{}
1111

1212
type Limiter struct {
13+
*Ticker
1314
Reads *Operation
1415
Writes *Operation
1516
}
1617

17-
// NewLimiter returns a new limiter. If you provide limits, the first will set
18+
// NewLimiter returns a new limiter from DefaultTicker.
19+
// If you provide limits, the first will set
1820
// both read and write limits, the second will set the write limit.
1921
//
20-
// To stop the limiter and free it's resources, call Stop.
22+
// To stop the Limiter and free it's resources, call Stop.
2123
func NewLimiter(limits ...int64) *Limiter {
22-
return &Limiter{
23-
Reads: NewOperation(limits, 0),
24-
Writes: NewOperation(limits, 1),
25-
}
24+
return DefaultTicker.NewLimiter(limits...)
2625
}
2726

2827
// Stop stops the Limiter and frees any resources. Reads and writes on

limiter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestLimiter_Stop(t *testing.T) {
2121
t.Error(n)
2222
}
2323
l.Stop()
24-
<-Ticker.TickCh()
24+
<-DefaultTicker.Ch()
2525

2626
n, err = l.Reads.io(r.Read, buf)
2727
if err != io.EOF {

operation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (op *Operation) run(ch chan<- int) {
7171
todo = max(1, int(limit/secparts))
7272
batch = min(batchsize, todo)
7373
}
74-
tickCh := Ticker.TickCh()
74+
tickCh := DefaultTicker.Ch()
7575

7676
partialsecond:
7777
for {

operation_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,18 @@ func TestOperation_read_rate_low(t *testing.T) {
103103
buf := make([]byte, 1001)
104104

105105
var tickCount atomic.Int32
106-
oldOnTick := Ticker.GetOnTick()
107-
defer Ticker.SetOnTick(oldOnTick)
108-
<-Ticker.TickCh()
109-
Ticker.SetOnTick(func() { tickCount.Add(1) })
106+
oldOnTick := DefaultTicker.GetOnTick()
107+
defer DefaultTicker.SetOnTick(oldOnTick)
108+
<-DefaultTicker.Ch()
109+
DefaultTicker.SetOnTick(func() { tickCount.Add(1) })
110110

111-
<-Ticker.TickCh()
111+
<-DefaultTicker.Ch()
112112
// should read in batches of 1000/secparts (=100) bytes
113113
now := time.Now()
114114
n, err := l.Reads.io(r.Read, buf)
115115
elapsed := time.Since(now)
116116
rate := l.Reads.Rate.Load()
117-
<-Ticker.TickCh()
117+
<-DefaultTicker.Ch()
118118

119119
if n := tickCount.Load(); n < 11 || n > 13 {
120120
t.Error(n)

operationticker.go

Lines changed: 0 additions & 66 deletions
This file was deleted.

ticker.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package bwlimit
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
// A Ticker synchronizes rate calculation among multiple Limiters and
9+
// provides the on-tick callback.
10+
type Ticker struct {
11+
mu sync.Mutex
12+
ch chan struct{}
13+
fn func()
14+
}
15+
16+
var DefaultTicker *Ticker
17+
18+
// NewLimiter returns a new Limiter using this Ticker.
19+
// If you provide limits, the first will set
20+
// both read and write limits, the second will set the write limit.
21+
//
22+
// To stop the limiter and free it's resources, call Stop.
23+
func (ot *Ticker) NewLimiter(limits ...int64) *Limiter {
24+
return &Limiter{
25+
Ticker: ot,
26+
Reads: NewOperation(limits, 0),
27+
Writes: NewOperation(limits, 1),
28+
}
29+
}
30+
31+
func (ot *Ticker) SetOnTick(fn func()) {
32+
ot.mu.Lock()
33+
ot.fn = fn
34+
ot.mu.Unlock()
35+
}
36+
37+
func (ot *Ticker) GetOnTick() (fn func()) {
38+
ot.mu.Lock()
39+
fn = ot.fn
40+
ot.mu.Unlock()
41+
return
42+
}
43+
44+
func (ot *Ticker) Ch() (ch <-chan struct{}) {
45+
ot.mu.Lock()
46+
ch = ot.ch
47+
ot.mu.Unlock()
48+
return
49+
}
50+
51+
func (ot *Ticker) run() {
52+
defer close(ot.ch)
53+
54+
tckr := time.NewTicker(interval)
55+
defer tckr.Stop()
56+
57+
for range tckr.C {
58+
ot.mu.Lock()
59+
oldCh := ot.ch
60+
ot.ch = make(chan struct{})
61+
ot.mu.Unlock()
62+
close(oldCh)
63+
}
64+
}
65+
66+
func (ot *Ticker) runOnTick() {
67+
for {
68+
<-ot.Ch()
69+
if fn := ot.GetOnTick(); fn != nil {
70+
fn()
71+
}
72+
}
73+
}
74+
75+
func init() {
76+
DefaultTicker = &Ticker{
77+
ch: make(chan struct{}),
78+
}
79+
go DefaultTicker.run()
80+
go DefaultTicker.runOnTick()
81+
}

0 commit comments

Comments
 (0)