@@ -53,6 +53,10 @@ type ConsumerOptions struct {
5353 BufferSize int
5454 // Concurrency dictates how many goroutines to spawn to handle the messages.
5555 Concurrency int
56+ // ErrorChannelBuffer sets the size of the Errors channel buffer. If this is
57+ // 0, it defaults to 100. Errors are sent in a non-blocking way to avoid
58+ // stalling processing when no listener is present.
59+ ErrorChannelBuffer int
5660
5761 // MaxDeliveryCount is the maximum number of times a message can be delivered
5862 // before it is considered failed. If this is set to 0, the message will be
@@ -75,10 +79,8 @@ type ConsumerOptions struct {
7579type Consumer struct {
7680 // Errors is a channel that you can receive from to centrally handle any
7781 // errors that may occur either by your ConsumerFuncs or by internal
78- // processing functions. Because this is an unbuffered channel, you must
79- // have a listener on it. If you don't parts of the consumer could stop
80- // functioning when errors occur due to the blocking nature of unbuffered
81- // channels.
82+ // processing functions. Errors are sent in a non-blocking manner, so if the
83+ // channel buffer is full or no listener is present, errors may be dropped.
8284 Errors chan error
8385
8486 options * ConsumerOptions
@@ -90,7 +92,9 @@ type Consumer struct {
9092
9193 stopReclaim chan struct {}
9294 stopPoll chan struct {}
93- stopWorkers chan struct {}
95+
96+ shutdownOnce sync.Once
97+ closeQueueOnce sync.Once
9498}
9599
96100var defaultConsumerOptions = & ConsumerOptions {
@@ -99,6 +103,7 @@ var defaultConsumerOptions = &ConsumerOptions{
99103 ReclaimInterval : 1 * time .Second ,
100104 BufferSize : 100 ,
101105 Concurrency : 10 ,
106+ ErrorChannelBuffer : 100 ,
102107}
103108
104109// NewConsumer uses a default set of options to create a Consumer. It sets Name
@@ -114,6 +119,9 @@ func NewConsumer() (*Consumer, error) {
114119// it defaults to "redisqueue"; if BlockingTimeout is 0, it defaults to 5
115120// seconds; if ReclaimInterval is 0, it defaults to 1 second.
116121func NewConsumerWithOptions (ctx context.Context , options * ConsumerOptions ) (* Consumer , error ) {
122+ if options == nil {
123+ options = & ConsumerOptions {}
124+ }
117125 hostname , _ := os .Hostname ()
118126
119127 if options .Name == "" {
@@ -122,12 +130,27 @@ func NewConsumerWithOptions(ctx context.Context, options *ConsumerOptions) (*Con
122130 if options .GroupName == "" {
123131 options .GroupName = "redisqueue"
124132 }
133+ if options .BufferSize < 0 {
134+ return nil , errors .New ("buffer size must be greater than 0" )
135+ }
136+ if options .BufferSize == 0 {
137+ options .BufferSize = defaultConsumerOptions .BufferSize
138+ }
139+ if options .Concurrency < 0 {
140+ return nil , errors .New ("concurrency must be greater than 0" )
141+ }
142+ if options .Concurrency == 0 {
143+ options .Concurrency = defaultConsumerOptions .Concurrency
144+ }
125145 if options .BlockingTimeout == 0 {
126146 options .BlockingTimeout = 5 * time .Second
127147 }
128148 if options .ReclaimInterval == 0 {
129149 options .ReclaimInterval = 1 * time .Second
130150 }
151+ if options .ErrorChannelBuffer == 0 {
152+ options .ErrorChannelBuffer = defaultConsumerOptions .ErrorChannelBuffer
153+ }
131154
132155 var r redis.UniversalClient
133156
@@ -142,7 +165,7 @@ func NewConsumerWithOptions(ctx context.Context, options *ConsumerOptions) (*Con
142165 }
143166
144167 return & Consumer {
145- Errors : make (chan error ),
168+ Errors : make (chan error , options . ErrorChannelBuffer ),
146169
147170 options : options ,
148171 redis : r ,
@@ -153,7 +176,6 @@ func NewConsumerWithOptions(ctx context.Context, options *ConsumerOptions) (*Con
153176
154177 stopReclaim : make (chan struct {}, 1 ),
155178 stopPoll : make (chan struct {}, 1 ),
156- stopWorkers : make (chan struct {}, options .Concurrency ),
157179 }, nil
158180}
159181
@@ -190,17 +212,25 @@ func (c *Consumer) Register(stream string, fn ConsumerFunc) {
190212// creating the consumer group in Redis. Run will block until Shutdown is called
191213// and all of the in-flight messages have been processed.
192214func (c * Consumer ) Run (ctx context.Context ) {
215+ runCtx , cancel := context .WithCancel (ctx )
216+ defer cancel ()
217+
218+ go func () {
219+ <- runCtx .Done ()
220+ c .Shutdown ()
221+ }()
222+
193223 if len (c .consumers ) == 0 {
194- c .Errors <- errors .New ("at least one consumer function needs to be registered" )
224+ c .sendError ( errors .New ("at least one consumer function needs to be registered" ) )
195225 return
196226 }
197227
198228 for stream , consumer := range c .consumers {
199229 c .streams = append (c .streams , stream )
200- err := c .redis .XGroupCreateMkStream (ctx , stream , c .options .GroupName , consumer .id ).Err ()
230+ err := c .redis .XGroupCreateMkStream (runCtx , stream , c .options .GroupName , consumer .id ).Err ()
201231 // ignoring the BUSYGROUP error makes this a noop
202232 if err != nil && err .Error () != "BUSYGROUP Consumer Group name already exists" {
203- c .Errors <- errors .Wrap (err , "error creating consumer group" )
233+ c .sendError ( errors .Wrap (err , "error creating consumer group" ) )
204234 return
205235 }
206236 }
@@ -209,8 +239,8 @@ func (c *Consumer) Run(ctx context.Context) {
209239 c .streams = append (c .streams , ">" )
210240 }
211241
212- go c .reclaim (ctx )
213- go c .poll (ctx )
242+ go c .reclaim (runCtx )
243+ go c .poll (runCtx )
214244
215245 stop := newSignalHandler ()
216246 go func () {
@@ -221,7 +251,7 @@ func (c *Consumer) Run(ctx context.Context) {
221251 c .wg .Add (c .options .Concurrency )
222252
223253 for i := 0 ; i < c .options .Concurrency ; i ++ {
224- go c .work (ctx )
254+ go c .work (runCtx )
225255 }
226256
227257 c .wg .Wait ()
@@ -232,10 +262,12 @@ func (c *Consumer) Run(ctx context.Context) {
232262// The order that things stop is 1) the reclaim process (if it's running), 2)
233263// the polling process, and 3) the worker processes.
234264func (c * Consumer ) Shutdown () {
235- c .stopReclaim <- struct {}{}
236- if c .options .VisibilityTimeout == 0 {
237- c .stopPoll <- struct {}{}
238- }
265+ c .shutdownOnce .Do (func () {
266+ c .stopReclaim <- struct {}{}
267+ if c .options .VisibilityTimeout == 0 {
268+ c .signalPollStop ()
269+ }
270+ })
239271}
240272
241273// reclaim runs in a separate goroutine and checks the list of pending messages
@@ -250,28 +282,41 @@ func (c *Consumer) reclaim(ctx context.Context) {
250282 }
251283
252284 ticker := time .NewTicker (c .options .ReclaimInterval )
285+ defer ticker .Stop ()
253286
254287 for {
255288 select {
256289 case <- c .stopReclaim :
257290 // once the reclaim process has stopped, stop the polling process
258- c .stopPoll <- struct {}{}
291+ c .signalPollStop ()
292+ return
293+ case <- ctx .Done ():
294+ c .signalPollStop ()
259295 return
260296 case <- ticker .C :
261297 for stream := range c .consumers {
262298 start := "-"
263299 end := "+"
264300
265- for {
301+ for {
302+ available := c .availableCapacity ()
303+ if available <= 0 {
304+ break
305+ }
306+
266307 res , err := c .redis .XPendingExt (ctx , & redis.XPendingExtArgs {
267308 Stream : stream ,
268309 Group : c .options .GroupName ,
269310 Start : start ,
270311 End : end ,
271- Count : int64 (c . options . BufferSize - len ( c . queue ) ),
312+ Count : int64 (available ),
272313 }).Result ()
273314 if err != nil && err != redis .Nil {
274- c .Errors <- errors .Wrap (err , "error listing pending messages" )
315+ if ctx .Err () != nil {
316+ c .signalPollStop ()
317+ return
318+ }
319+ c .sendError (errors .Wrap (err , "error listing pending messages" ))
275320 break
276321 }
277322
@@ -285,7 +330,7 @@ func (c *Consumer) reclaim(ctx context.Context) {
285330 if c .options .MaxDeliveryCount > 0 && r .RetryCount >= c .options .MaxDeliveryCount {
286331 err = c .redis .XAck (ctx , stream , c .options .GroupName , r .ID ).Err ()
287332 if err != nil {
288- c .Errors <- errors .Wrapf (err , "error acknowledging after retry count exceeded for %q stream and %q message, " , stream , r .ID )
333+ c .sendError ( errors .Wrapf (err , "error acknowledging after retry count exceeded for %q stream and %q message, " , stream , r .ID ) )
289334 continue
290335 }
291336 }
@@ -298,7 +343,11 @@ func (c *Consumer) reclaim(ctx context.Context) {
298343 Messages : []string {r .ID },
299344 }).Result ()
300345 if err != nil && err != redis .Nil {
301- c .Errors <- errors .Wrapf (err , "error claiming %d message(s)" , len (msgs ))
346+ if ctx .Err () != nil {
347+ c .signalPollStop ()
348+ return
349+ }
350+ c .sendError (errors .Wrapf (err , "error claiming %d message(s)" , len (msgs )))
302351 break
303352 }
304353 // If the Redis nil error is returned, it means that
@@ -311,19 +360,23 @@ func (c *Consumer) reclaim(ctx context.Context) {
311360 // exists, the only way we can get it out of the
312361 // pending state is to acknowledge it.
313362 if err == redis .Nil {
314- err = c .redis .XAck (ctx , stream , c .options .GroupName , r .ID ).Err ()
315- if err != nil {
316- c .Errors <- errors .Wrapf (err , "error acknowledging after failed claim for %q stream and %q message" , stream , r .ID )
317- continue
363+ err = c .redis .XAck (ctx , stream , c .options .GroupName , r .ID ).Err ()
364+ if err != nil {
365+ if ctx .Err () != nil {
366+ c .signalPollStop ()
367+ return
368+ }
369+ c .sendError (errors .Wrapf (err , "error acknowledging after failed claim for %q stream and %q message" , stream , r .ID ))
370+ continue
371+ }
318372 }
319- }
320373 c .enqueue (stream , claimres )
321374 }
322375 }
323376
324377 newID , err := incrementMessageID (res [len (res )- 1 ].ID )
325378 if err != nil {
326- c .Errors <- err
379+ c .sendError ( err )
327380 break
328381 }
329382
@@ -342,28 +395,42 @@ func (c *Consumer) poll(ctx context.Context) {
342395 for {
343396 select {
344397 case <- c .stopPoll :
345- // once the polling has stopped (i.e. there will be no more messages
346- // put onto c.queue), stop all of the workers
347- for range c .options .Concurrency {
348- c .stopWorkers <- struct {}{}
349- }
398+ // once polling has stopped (i.e. there will be no more messages
399+ // put onto c.queue), close the queue so workers can drain and exit
400+ c .closeQueue ()
401+ return
402+ case <- ctx .Done ():
403+ c .Shutdown ()
404+ <- c .stopPoll
405+ c .closeQueue ()
350406 return
351407 default :
408+ available := c .availableCapacity ()
409+ if available <= 0 {
410+ time .Sleep (10 * time .Millisecond )
411+ continue
412+ }
352413 res , err := c .redis .XReadGroup (ctx , & redis.XReadGroupArgs {
353414 Group : c .options .GroupName ,
354415 Consumer : c .options .Name ,
355416 Streams : c .streams ,
356- Count : int64 (c . options . BufferSize - len ( c . queue ) ),
417+ Count : int64 (available ),
357418 Block : c .options .BlockingTimeout ,
358419 }).Result ()
359420 if err != nil {
421+ if ctx .Err () != nil {
422+ c .Shutdown ()
423+ <- c .stopPoll
424+ c .closeQueue ()
425+ return
426+ }
360427 if err , ok := err .(net.Error ); ok && err .Timeout () {
361428 continue
362429 }
363430 if err == redis .Nil {
364431 continue
365432 }
366- c .Errors <- errors .Wrap (err , "error reading redis stream" )
433+ c .sendError ( errors .Wrap (err , "error reading redis stream" ) )
367434 continue
368435 }
369436
@@ -395,22 +462,41 @@ func (c *Consumer) enqueue(stream string, msgs []redis.XMessage) {
395462func (c * Consumer ) work (ctx context.Context ) {
396463 defer c .wg .Done ()
397464
398- for {
399- select {
400- case msg := <- c .queue :
401- err := c .process (msg )
402- if err != nil {
403- c .Errors <- errors .Wrapf (err , "error calling ConsumerFunc for %q stream and %q message" , msg .Stream , msg .ID )
404- continue
405- }
406- err = c .redis .XAck (ctx , msg .Stream , c .options .GroupName , msg .ID ).Err ()
407- if err != nil {
408- c .Errors <- errors .Wrapf (err , "error acknowledging after success for %q stream and %q message" , msg .Stream , msg .ID )
409- continue
410- }
411- case <- c .stopWorkers :
412- return
465+ for msg := range c .queue {
466+ err := c .process (msg )
467+ if err != nil {
468+ c .sendError (errors .Wrapf (err , "error calling ConsumerFunc for %q stream and %q message" , msg .Stream , msg .ID ))
469+ continue
413470 }
471+ err = c .redis .XAck (ctx , msg .Stream , c .options .GroupName , msg .ID ).Err ()
472+ if err != nil {
473+ c .sendError (errors .Wrapf (err , "error acknowledging after success for %q stream and %q message" , msg .Stream , msg .ID ))
474+ continue
475+ }
476+ }
477+ }
478+
479+ func (c * Consumer ) closeQueue () {
480+ c .closeQueueOnce .Do (func () {
481+ close (c .queue )
482+ })
483+ }
484+
485+ func (c * Consumer ) signalPollStop () {
486+ select {
487+ case c .stopPoll <- struct {}{}:
488+ default :
489+ }
490+ }
491+
492+ func (c * Consumer ) availableCapacity () int {
493+ return cap (c .queue ) - len (c .queue )
494+ }
495+
496+ func (c * Consumer ) sendError (err error ) {
497+ select {
498+ case c .Errors <- err :
499+ default :
414500 }
415501}
416502
0 commit comments