44 "context"
55 "errors"
66 "net"
7+ "strings"
8+ "sync/atomic"
79 "testing"
810 "time"
911
@@ -136,50 +138,12 @@ func TestSubscribe(t *testing.T) {
136138 pb .RegisterRelayerServer (r .server , r )
137139
138140 lis := bufconn .Listen (1024 * 1024 )
139- go func () {
140- if err := r .server .Serve (lis ); err != nil {
141- t .Errorf ("Server exited with error: %v" , err )
142- }
143- }()
144141
145- defer r .server .GracefulStop ()
142+ serverDone := make (chan struct {})
143+ var serverErr atomic.Value
146144
147- ctx , cancel := context .WithCancel (context .Background ())
148- defer cancel ()
149-
150- conn , err := grpc .NewClient (
151- "passthrough:///" ,
152- grpc .WithTransportCredentials (insecure .NewCredentials ()),
153- grpc .WithContextDialer (func (ctx context.Context , s string ) (net.Conn , error ) {
154- return lis .Dial ()
155- }))
156- if err != nil {
157- t .Fatalf ("failed to dial bufconn: %v" , err )
158- }
159- defer conn .Close ()
160-
161- client := pb .NewRelayerClient (conn )
162-
163- // Expect a consumer group to be created
164145 mock .ExpectXGroupCreateMkStream (blockStreamName , "member_group:testClient" , "0" ).SetVal ("OK" )
165146
166- stream , err := client .Subscribe (ctx )
167- if err != nil {
168- t .Fatalf ("failed to call Subscribe: %v" , err )
169- }
170-
171- // Send initial subscribe request
172- err = stream .Send (& pb.ClientMessage {
173- Message : & pb.ClientMessage_SubscribeRequest {
174- SubscribeRequest : & pb.SubscribeRequest {
175- ClientId : "testClient" ,
176- },
177- },
178- })
179- if err != nil {
180- t .Fatalf ("failed to send subscribe request: %v" , err )
181- }
182-
183147 mock .ExpectXReadGroup (& redis.XReadGroupArgs {
184148 Group : "member_group:testClient" ,
185149 Consumer : "member_consumer:testClient" ,
@@ -210,6 +174,76 @@ func TestSubscribe(t *testing.T) {
210174 },
211175 })
212176
177+ ackCalled := make (chan struct {})
178+
179+ customMatch := func (expected , actual []interface {}) error {
180+ if len (actual ) >= 1 {
181+ cmdName , ok := actual [0 ].(string )
182+ if ok && strings .ToUpper (cmdName ) == "XACK" {
183+ select {
184+ case <- ackCalled :
185+ default :
186+ close (ackCalled )
187+ }
188+ }
189+ }
190+ return nil
191+ }
192+
193+ mock .CustomMatch (customMatch ).ExpectXAck (blockStreamName , "member_group:testClient" , "123-1" ).SetVal (int64 (1 ))
194+
195+ go func () {
196+ err := r .server .Serve (lis )
197+ if err != nil && err != grpc .ErrServerStopped {
198+ serverErr .Store (err )
199+ }
200+ close (serverDone )
201+ }()
202+
203+ defer func () {
204+ r .server .GracefulStop ()
205+ <- serverDone
206+ if err , ok := serverErr .Load ().(error ); ok {
207+ t .Errorf ("Server error: %v" , err )
208+ }
209+ }()
210+
211+ // Create a gRPC client
212+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
213+ defer cancel ()
214+
215+ conn , err := grpc .DialContext (
216+ ctx ,
217+ "passthrough:///" ,
218+ grpc .WithTransportCredentials (insecure .NewCredentials ()),
219+ grpc .WithContextDialer (func (ctx context.Context , s string ) (net.Conn , error ) {
220+ return lis .Dial ()
221+ }),
222+ )
223+ if err != nil {
224+ t .Fatalf ("failed to dial bufconn: %v" , err )
225+ }
226+ defer conn .Close ()
227+
228+ client := pb .NewRelayerClient (conn )
229+
230+ // Call Subscribe
231+ stream , err := client .Subscribe (ctx )
232+ if err != nil {
233+ t .Fatalf ("failed to call Subscribe: %v" , err )
234+ }
235+
236+ err = stream .Send (& pb.ClientMessage {
237+ Message : & pb.ClientMessage_SubscribeRequest {
238+ SubscribeRequest : & pb.SubscribeRequest {
239+ ClientId : "testClient" ,
240+ },
241+ },
242+ })
243+ if err != nil {
244+ t .Fatalf ("failed to send subscribe request: %v" , err )
245+ }
246+
213247 recvMsg , err := stream .Recv ()
214248 if err != nil {
215249 t .Fatalf ("failed to receive message from server: %v" , err )
@@ -218,8 +252,6 @@ func TestSubscribe(t *testing.T) {
218252 t .Errorf ("expected payload_123, got %s" , recvMsg .GetPayloadId ())
219253 }
220254
221- mock .ExpectXAck (blockStreamName , "member_group:testClient" , "123-1" ).SetVal (1 )
222-
223255 err = stream .Send (& pb.ClientMessage {
224256 Message : & pb.ClientMessage_AckPayload {
225257 AckPayload : & pb.AckPayloadRequest {
@@ -233,28 +265,13 @@ func TestSubscribe(t *testing.T) {
233265 t .Fatalf ("failed to send ack: %v" , err )
234266 }
235267
236- mock .ExpectXReadGroup (& redis.XReadGroupArgs {
237- Group : "member_group:testClient" ,
238- Consumer : "member_consumer:testClient" ,
239- Streams : []string {blockStreamName , "0" },
240- Count : 1 ,
241- Block : time .Second ,
242- }).SetErr (redis .Nil )
243-
244- mock .ExpectXReadGroup (& redis.XReadGroupArgs {
245- Group : "member_group:testClient" ,
246- Consumer : "member_consumer:testClient" ,
247- Streams : []string {blockStreamName , ">" },
248- Count : 1 ,
249- Block : time .Second ,
250- }).SetErr (redis .Nil )
251-
252- // Give the server some time to process these reads
253- time .Sleep (100 * time .Millisecond )
268+ select {
269+ case <- ackCalled :
270+ case <- time .After (2 * time .Second ):
271+ t .Fatalf ("timeout waiting for XAck to be called" )
272+ }
254273
255274 if err := mock .ExpectationsWereMet (); err != nil {
256275 t .Errorf ("unmet redis expectations: %v" , err )
257276 }
258-
259- cancel ()
260277}
0 commit comments