@@ -661,3 +661,53 @@ func TestStreamProducer(t *testing.T) {
661661 }
662662 }
663663}
664+
665+ func TestBulkRetrieve (t * testing.T ) {
666+ s , client , cleanup := setupTest (t )
667+ defer cleanup ()
668+
669+ // Create a topic
670+ _ , err := client .CreateTopic (context .Background (), & pb.CreateTopicRequest {Topic : "test_topic" })
671+ if err != nil {
672+ t .Fatalf ("Failed to create topic: %v" , err )
673+ }
674+
675+ // Preload messages
676+ mockDriver := s .driver .(* MockDBDriver )
677+ messages := []string {"msg1" , "msg2" , "msg3" , "msg4" }
678+ for _ , msg := range messages {
679+ if err := mockDriver .AddMessageToTopic ("test_topic" , []byte (msg )); err != nil {
680+ t .Fatalf ("Failed to preload message: %v" , err )
681+ }
682+ }
683+
684+ // Retrieve first three messages
685+ resp , err := client .BulkRetrieve (context .Background (), & pb.BulkRetrieveRequest {
686+ Topic : "test_topic" ,
687+ StartOffset : 0 ,
688+ Limit : 3 ,
689+ })
690+ if err != nil {
691+ t .Fatalf ("BulkRetrieve failed: %v" , err )
692+ }
693+
694+ if resp .Count != 3 {
695+ t .Errorf ("Expected count 3, got %d" , resp .Count )
696+ }
697+ if len (resp .Messages ) != 3 {
698+ t .Fatalf ("Expected 3 messages, got %d" , len (resp .Messages ))
699+ }
700+ for i , m := range resp .Messages {
701+ expectedMsg := messages [i ]
702+ expectedOffset := int64 (i + 1 )
703+ if string (m .Message ) != expectedMsg {
704+ t .Errorf ("Expected message %q at index %d, got %q" , expectedMsg , i , string (m .Message ))
705+ }
706+ if m .Offset != expectedOffset {
707+ t .Errorf ("Expected offset %d, got %d" , expectedOffset , m .Offset )
708+ }
709+ }
710+ if resp .NextOffset != 4 {
711+ t .Errorf ("Expected next_offset 4, got %d" , resp .NextOffset )
712+ }
713+ }
0 commit comments