@@ -14,6 +14,11 @@ import (
1414// and process Messages.
1515type ConsumerFunc func (* Message ) error
1616
17+ type registeredConsumer struct {
18+ fn ConsumerFunc
19+ id string
20+ }
21+
1722// ConsumerOptions provide options to configure the Consumer.
1823type ConsumerOptions struct {
1924 // Name sets the name of this consumer. This will be used when fetching from
@@ -62,12 +67,12 @@ type Consumer struct {
6267 // channels.
6368 Errors chan error
6469
65- options * ConsumerOptions
66- redis * redis.Client
67- funcs map [string ]ConsumerFunc
68- streams []string
69- queue chan * Message
70- wg * sync.WaitGroup
70+ options * ConsumerOptions
71+ redis * redis.Client
72+ consumers map [string ]registeredConsumer
73+ streams []string
74+ queue chan * Message
75+ wg * sync.WaitGroup
7176
7277 stopReclaim chan struct {}
7378 stopPoll chan struct {}
@@ -118,25 +123,43 @@ func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
118123 return & Consumer {
119124 Errors : make (chan error ),
120125
121- options : options ,
122- redis : r ,
123- funcs : map [string ]ConsumerFunc {} ,
124- streams : make ([]string , 0 ),
125- queue : make (chan * Message , options .BufferSize ),
126- wg : & sync.WaitGroup {},
126+ options : options ,
127+ redis : r ,
128+ consumers : make ( map [string ]registeredConsumer ) ,
129+ streams : make ([]string , 0 ),
130+ queue : make (chan * Message , options .BufferSize ),
131+ wg : & sync.WaitGroup {},
127132
128133 stopReclaim : make (chan struct {}, 1 ),
129134 stopPoll : make (chan struct {}, 1 ),
130135 stopWorkers : make (chan struct {}, options .Concurrency ),
131136 }, nil
132137}
133138
139+ // RegisterWithLastID is the same as Register, except that it also lets you
140+ // specify the oldest message to receive when first creating the consumer group.
141+ // This can be any valid message ID, "0" for all messages in the stream, or "$"
142+ // for only new messages.
143+ //
144+ // If the consumer group already exists the id field is ignored, meaning you'll
145+ // receive unprocessed messages.
146+ func (c * Consumer ) RegisterWithLastID (stream string , id string , fn ConsumerFunc ) {
147+ if len (id ) == 0 {
148+ id = "0"
149+ }
150+
151+ c .consumers [stream ] = registeredConsumer {
152+ fn : fn ,
153+ id : id ,
154+ }
155+ }
156+
134157// Register takes in a stream name and a ConsumerFunc that will be called when a
135158// message comes in from that stream. Register must be called at least once
136159// before Run is called. If the same stream name is passed in twice, the first
137160// ConsumerFunc is overwritten by the second.
138161func (c * Consumer ) Register (stream string , fn ConsumerFunc ) {
139- c .funcs [ stream ] = fn
162+ c .RegisterWithLastID ( stream , "0" , fn )
140163}
141164
142165// Run starts all of the worker goroutines and starts processing from the
@@ -146,22 +169,22 @@ func (c *Consumer) Register(stream string, fn ConsumerFunc) {
146169// creating the consumer group in Redis. Run will block until Shutdown is called
147170// and all of the in-flight messages have been processed.
148171func (c * Consumer ) Run () {
149- if len (c .funcs ) == 0 {
172+ if len (c .consumers ) == 0 {
150173 c .Errors <- errors .New ("at least one consumer function needs to be registered" )
151174 return
152175 }
153176
154- for stream := range c .funcs {
177+ for stream , consumer := range c .consumers {
155178 c .streams = append (c .streams , stream )
156- err := c .redis .XGroupCreateMkStream (stream , c .options .GroupName , "0" ).Err ()
179+ err := c .redis .XGroupCreateMkStream (stream , c .options .GroupName , consumer . id ).Err ()
157180 // ignoring the BUSYGROUP error makes this a noop
158181 if err != nil && err .Error () != "BUSYGROUP Consumer Group name already exists" {
159182 c .Errors <- errors .Wrap (err , "error creating consumer group" )
160183 return
161184 }
162185 }
163186
164- for i := 0 ; i < len (c .funcs ); i ++ {
187+ for i := 0 ; i < len (c .consumers ); i ++ {
165188 c .streams = append (c .streams , ">" )
166189 }
167190
@@ -214,7 +237,7 @@ func (c *Consumer) reclaim() {
214237 c .stopPoll <- struct {}{}
215238 return
216239 case <- ticker .C :
217- for stream := range c .funcs {
240+ for stream := range c .consumers {
218241 start := "-"
219242 end := "+"
220243
@@ -373,7 +396,6 @@ func (c *Consumer) process(msg *Message) (err error) {
373396 err = errors .Errorf ("ConsumerFunc panic: %v" , r )
374397 }
375398 }()
376- fn := c .funcs [msg .Stream ]
377- err = fn (msg )
399+ err = c .consumers [msg .Stream ].fn (msg )
378400 return
379401}
0 commit comments