File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -47,20 +47,18 @@ func (p *Pool) Go(f func()) {
4747 default :
4848 // No goroutine was available to handle the task.
4949 // Spawn a new one and send it the task.
50- p .handle .Go (p .worker )
51- p .tasks <- f
50+ p .handle .Go (func () {
51+ p .worker (f )
52+ })
5253 }
5354 } else {
5455 select {
5556 case p .limiter <- struct {}{}:
5657 // If we are below our limit, spawn a new worker rather
5758 // than waiting for one to become available.
58- p .handle .Go (p .worker )
59-
60- // We know there is at least one worker running, so wait
61- // for it to become available. This ensures we never spawn
62- // more workers than the number of tasks.
63- p .tasks <- f
59+ p .handle .Go (func () {
60+ p .worker (f )
61+ })
6462 case p .tasks <- f :
6563 // A worker is available and has accepted the task.
6664 return
@@ -76,6 +74,10 @@ func (p *Pool) Wait() {
7674
7775 close (p .tasks )
7876
77+ // After Wait() returns, reset the struct so tasks will be reinitialized on
78+ // next use. This better matches the behavior of sync.WaitGroup
79+ defer func () { p .initOnce = sync.Once {} }()
80+
7981 p .handle .Wait ()
8082}
8183
@@ -145,11 +147,15 @@ func (p *Pool) WithContext(ctx context.Context) *ContextPool {
145147 }
146148}
147149
148- func (p * Pool ) worker () {
150+ func (p * Pool ) worker (initialFunc func () ) {
149151 // The only time this matters is if the task panics.
150152 // This makes it possible to spin up new workers in that case.
151153 defer p .limiter .release ()
152154
155+ if initialFunc != nil {
156+ initialFunc ()
157+ }
158+
153159 for f := range p .tasks {
154160 f ()
155161 }
Original file line number Diff line number Diff line change @@ -122,6 +122,26 @@ func TestPool(t *testing.T) {
122122 p := New ().WithMaxGoroutines (42 )
123123 require .Equal (t , 42 , p .MaxGoroutines ())
124124 })
125+
126+ t .Run ("is reusable" , func (t * testing.T ) {
127+ t .Parallel ()
128+ var count atomic.Int64
129+ p := New ()
130+ for i := 0 ; i < 10 ; i ++ {
131+ p .Go (func () {
132+ count .Add (1 )
133+ })
134+ }
135+ p .Wait ()
136+ require .Equal (t , int64 (10 ), count .Load ())
137+ for i := 0 ; i < 10 ; i ++ {
138+ p .Go (func () {
139+ count .Add (1 )
140+ })
141+ }
142+ p .Wait ()
143+ require .Equal (t , int64 (20 ), count .Load ())
144+ })
125145}
126146
127147func BenchmarkPool (b * testing.B ) {
You can’t perform that action at this time.
0 commit comments