Skip to content

Commit 1803558

Browse files
committed
update to go-redis/v9
1 parent 195b427 commit 1803558

9 files changed

Lines changed: 221 additions & 229 deletions

File tree

consumer.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package redisqueue
22

33
import (
4+
"context"
45
"net"
56
"os"
67
"sync"
78
"time"
89

9-
"github.com/go-redis/redis/v7"
1010
"github.com/pkg/errors"
11+
"github.com/redis/go-redis/v9"
1112
)
1213

1314
// ConsumerFunc is a type alias for the functions that will be used to handle
@@ -58,7 +59,7 @@ type ConsumerOptions struct {
5859
RedisClient redis.UniversalClient
5960
// RedisOptions allows you to configure the underlying Redis connection.
6061
// More info here:
61-
// https://pkg.go.dev/github.com/go-redis/redis/v7?tab=doc#Options.
62+
// https://pkg.go.dev/github.com/redis/go-redis/v9#Options.
6263
//
6364
// This field is used if RedisClient field is nil.
6465
RedisOptions *RedisOptions
@@ -99,14 +100,14 @@ var defaultConsumerOptions = &ConsumerOptions{
99100
// BufferSize to 100, and Concurrency to 10. In most production environments,
100101
// you'll want to use NewConsumerWithOptions.
101102
func NewConsumer() (*Consumer, error) {
102-
return NewConsumerWithOptions(defaultConsumerOptions)
103+
return NewConsumerWithOptions(context.Background(), defaultConsumerOptions)
103104
}
104105

105106
// NewConsumerWithOptions creates a Consumer with custom ConsumerOptions. If
106107
// Name is left empty, it defaults to the hostname; if GroupName is left empty,
107108
// it defaults to "redisqueue"; if BlockingTimeout is 0, it defaults to 5
108109
// seconds; if ReclaimInterval is 0, it defaults to 1 second.
109-
func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
110+
func NewConsumerWithOptions(ctx context.Context, options *ConsumerOptions) (*Consumer, error) {
110111
hostname, _ := os.Hostname()
111112

112113
if options.Name == "" {
@@ -130,7 +131,7 @@ func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
130131
r = newRedisClient(options.RedisOptions)
131132
}
132133

133-
if err := redisPreflightChecks(r); err != nil {
134+
if err := redisPreflightChecks(ctx, r); err != nil {
134135
return nil, err
135136
}
136137

@@ -182,15 +183,15 @@ func (c *Consumer) Register(stream string, fn ConsumerFunc) {
182183
// Run will terminate early. The same will happen if an error occurs when
183184
// creating the consumer group in Redis. Run will block until Shutdown is called
184185
// and all of the in-flight messages have been processed.
185-
func (c *Consumer) Run() {
186+
func (c *Consumer) Run(ctx context.Context) {
186187
if len(c.consumers) == 0 {
187188
c.Errors <- errors.New("at least one consumer function needs to be registered")
188189
return
189190
}
190191

191192
for stream, consumer := range c.consumers {
192193
c.streams = append(c.streams, stream)
193-
err := c.redis.XGroupCreateMkStream(stream, c.options.GroupName, consumer.id).Err()
194+
err := c.redis.XGroupCreateMkStream(ctx, stream, c.options.GroupName, consumer.id).Err()
194195
// ignoring the BUSYGROUP error makes this a noop
195196
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
196197
c.Errors <- errors.Wrap(err, "error creating consumer group")
@@ -202,8 +203,8 @@ func (c *Consumer) Run() {
202203
c.streams = append(c.streams, ">")
203204
}
204205

205-
go c.reclaim()
206-
go c.poll()
206+
go c.reclaim(ctx)
207+
go c.poll(ctx)
207208

208209
stop := newSignalHandler()
209210
go func() {
@@ -214,7 +215,7 @@ func (c *Consumer) Run() {
214215
c.wg.Add(c.options.Concurrency)
215216

216217
for i := 0; i < c.options.Concurrency; i++ {
217-
go c.work()
218+
go c.work(ctx)
218219
}
219220

220221
c.wg.Wait()
@@ -237,7 +238,7 @@ func (c *Consumer) Shutdown() {
237238
// If VisibilityTimeout is 0, this function returns early and no messages are
238239
// reclaimed. It checks the list of pending messages according to
239240
// ReclaimInterval.
240-
func (c *Consumer) reclaim() {
241+
func (c *Consumer) reclaim(ctx context.Context) {
241242
if c.options.VisibilityTimeout == 0 {
242243
return
243244
}
@@ -256,7 +257,7 @@ func (c *Consumer) reclaim() {
256257
end := "+"
257258

258259
for {
259-
res, err := c.redis.XPendingExt(&redis.XPendingExtArgs{
260+
res, err := c.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
260261
Stream: stream,
261262
Group: c.options.GroupName,
262263
Start: start,
@@ -276,7 +277,7 @@ func (c *Consumer) reclaim() {
276277

277278
for _, r := range res {
278279
if r.Idle >= c.options.VisibilityTimeout {
279-
claimres, err := c.redis.XClaim(&redis.XClaimArgs{
280+
claimres, err := c.redis.XClaim(ctx, &redis.XClaimArgs{
280281
Stream: stream,
281282
Group: c.options.GroupName,
282283
Consumer: c.options.Name,
@@ -297,7 +298,7 @@ func (c *Consumer) reclaim() {
297298
// exists, the only way we can get it out of the
298299
// pending state is to acknowledge it.
299300
if err == redis.Nil {
300-
err = c.redis.XAck(stream, c.options.GroupName, r.ID).Err()
301+
err = c.redis.XAck(ctx, stream, c.options.GroupName, r.ID).Err()
301302
if err != nil {
302303
c.Errors <- errors.Wrapf(err, "error acknowledging after failed claim for %q stream and %q message", stream, r.ID)
303304
continue
@@ -324,7 +325,7 @@ func (c *Consumer) reclaim() {
324325
// messages for this consumer to process. It blocks for up to 5 seconds instead
325326
// of blocking indefinitely so that it can periodically check to see if Shutdown
326327
// was called.
327-
func (c *Consumer) poll() {
328+
func (c *Consumer) poll(ctx context.Context) {
328329
for {
329330
select {
330331
case <-c.stopPoll:
@@ -335,7 +336,7 @@ func (c *Consumer) poll() {
335336
}
336337
return
337338
default:
338-
res, err := c.redis.XReadGroup(&redis.XReadGroupArgs{
339+
res, err := c.redis.XReadGroup(ctx, &redis.XReadGroupArgs{
339340
Group: c.options.GroupName,
340341
Consumer: c.options.Name,
341342
Streams: c.streams,
@@ -378,7 +379,7 @@ func (c *Consumer) enqueue(stream string, msgs []redis.XMessage) {
378379
// channel, it calls the corrensponding ConsumerFunc depending on the stream it
379380
// came from. If no error is returned from the ConsumerFunc, the message is
380381
// acknowledged in Redis.
381-
func (c *Consumer) work() {
382+
func (c *Consumer) work(ctx context.Context) {
382383
defer c.wg.Done()
383384

384385
for {
@@ -389,7 +390,7 @@ func (c *Consumer) work() {
389390
c.Errors <- errors.Wrapf(err, "error calling ConsumerFunc for %q stream and %q message", msg.Stream, msg.ID)
390391
continue
391392
}
392-
err = c.redis.XAck(msg.Stream, c.options.GroupName, msg.ID).Err()
393+
err = c.redis.XAck(ctx, msg.Stream, c.options.GroupName, msg.ID).Err()
393394
if err != nil {
394395
c.Errors <- errors.Wrapf(err, "error acknowledging after success for %q stream and %q message", msg.Stream, msg.ID)
395396
continue

0 commit comments

Comments
 (0)