Skip to content

Commit b2a498f

Browse files
committed
handle mid-stream unlimiting
1 parent 32c7fe8 commit b2a498f

2 files changed

Lines changed: 58 additions & 19 deletions

File tree

operation.go

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,29 +109,35 @@ func (op *Operation) run(ch chan<- int64) {
109109
}
110110

111111
func (op *Operation) io(fn func([]byte) (int, error), b []byte) (n int, err error) {
112-
if op.Limit.Load() < 1 {
113-
n, err = fn(b)
114-
op.count.Add(int64(n))
115-
return
116-
}
112+
outer:
117113
for len(b) > 0 && err == nil {
118-
batch, ok := <-op.ch
119-
err = io.EOF
120-
if ok {
121-
var done int
122-
todo := min(int64(len(b)), batch)
123-
done, err = fn(b[:todo])
124-
op.avail.Add(batch - int64(done))
125-
if done > 0 {
126-
op.count.Add(int64(done))
127-
n += int(done)
128-
b = b[done:]
129-
}
130-
if op.reader && int64(done) < todo {
131-
break
114+
var done int
115+
if op.Limit.Load() < 1 {
116+
done, err = fn(b)
117+
n += done
118+
op.count.Add(int64(done))
119+
return
120+
}
121+
select {
122+
case batch, ok := <-op.ch:
123+
err = io.EOF
124+
if ok {
125+
todo := min(int64(len(b)), batch)
126+
done, err = fn(b[:todo])
127+
op.avail.Add(batch - int64(done))
128+
if done > 0 {
129+
op.count.Add(int64(done))
130+
n += int(done)
131+
b = b[done:]
132+
}
133+
if op.reader && int64(done) < todo {
134+
break outer
135+
}
132136
}
137+
case <-op.WaitCh():
133138
}
134139
}
140+
135141
if op.reader && n > 0 && err == io.EOF {
136142
err = nil
137143
}

operation_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,39 @@ func TestOperation_io_read_limit(t *testing.T) {
9696
}
9797
}
9898

99+
func TestOperation_io_read_limit_change_to_unlimited(t *testing.T) {
100+
l := NewLimiter(100)
101+
defer l.Stop()
102+
103+
r := bytes.NewReader(make([]byte, 1000))
104+
done := make(chan struct{})
105+
var n int
106+
var err error
107+
108+
go func() {
109+
n, err = l.Reads.io(r.Read, make([]byte, 1000))
110+
close(done)
111+
}()
112+
113+
// Let the read enter limited mode before switching to unlimited.
114+
<-l.WaitCh()
115+
<-l.WaitCh()
116+
l.Reads.Limit.Store(0)
117+
118+
select {
119+
case <-done:
120+
case <-time.After(time.Second):
121+
t.Fatal("read hung after limit changed to unlimited")
122+
}
123+
124+
if err != nil {
125+
t.Fatal(err)
126+
}
127+
if n != 1000 {
128+
t.Fatal(n)
129+
}
130+
}
131+
99132
func TestOperation_read_rate_low(t *testing.T) {
100133
l := NewLimiter(1000)
101134
defer l.Stop()

0 commit comments

Comments
 (0)