@@ -16,13 +16,12 @@ type Operation struct {
1616 Rate atomic.Int64 // current rate in bytes/sec
1717 avail atomic.Int64
1818 count atomic.Int64
19- batch atomic.Int64
20- ch <- chan struct {}
19+ ch <- chan int
2120 reader bool
2221}
2322
2423func NewOperation (ctx context.Context , limits []int64 , idx int ) (op * Operation ) {
25- ch := make (chan struct {}, secparts )
24+ ch := make (chan int )
2625 op = & Operation {ch : ch , reader : idx == 0 }
2726 var limit int64
2827 if len (limits ) > 0 {
@@ -36,36 +35,40 @@ func NewOperation(ctx context.Context, limits []int64, idx int) (op *Operation)
3635 return
3736}
3837
39- func (op * Operation ) run (ctx context.Context , ch chan <- struct {} ) {
38+ func (op * Operation ) run (ctx context.Context , ch chan <- int ) {
4039 defer close (ch )
4140 seccount := 0
4241 counts := make ([]int64 , secparts )
4342 tckr := time .NewTicker (interval )
4443 defer tckr .Stop ()
4544
4645 for {
47- limit := op .Limit .Load ()
48- todo := max (1 , limit / secparts )
49- batch := min (batchsize , todo )
50- op .batch .Store (batch )
51- drive:
46+ var limitCh chan <- int
47+ var todo int
48+ var batch int
49+ if limit := op .Limit .Load (); limit > 0 {
50+ limitCh = ch
51+ todo = max (1 , int (limit / secparts ))
52+ batch = min (batchsize , todo )
53+ }
54+
55+ partialsecond:
5256 for {
5357 select {
5458 case <- ctx .Done ():
5559 return
56- case ch <- struct {}{}:
57- if limit > 0 {
58- todo -= batch
59- todo += op .avail .Swap (0 )
60- if todo < batch {
61- <- tckr .C
62- break drive
63- }
60+ case limitCh <- batch :
61+ todo -= batch
62+ todo += int (op .avail .Swap (0 ))
63+ if todo < batch {
64+ <- tckr .C
65+ break partialsecond
6466 }
6567 case <- tckr .C :
66- break drive
68+ break partialsecond
6769 }
6870 }
71+
6972 counts [seccount ] = op .count .Swap (0 )
7073 seccount ++
7174 if seccount >= secparts {
@@ -86,11 +89,10 @@ func (op *Operation) io(fn func([]byte) (int, error), b []byte) (n int, err erro
8689 return
8790 }
8891 for len (b ) > 0 && err == nil {
89- _ , ok := <- op .ch
92+ batch , ok := <- op .ch
9093 err = io .EOF
9194 if ok {
9295 var done int
93- batch := int (op .batch .Load ())
9496 todo := min (len (b ), batch )
9597 done , err = fn (b [:todo ])
9698 op .avail .Add (int64 (batch - done ))
0 commit comments