File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -165,6 +165,7 @@ type PulsarConsumerSource struct {
165165 PulsarSubscription string
166166 PulsarReplicateState bool
167167 PulsarMaxReconnect * uint
168+ ConsumerName string
168169
169170 client pulsar.Client
170171 consumer pulsar.Consumer
@@ -173,19 +174,25 @@ type PulsarConsumerSource struct {
173174 ackTrackers * ackTrackers
174175}
175176
176- func (p * PulsarConsumerSource ) Capture (cp cursor.Checkpoint ) (changes chan Change , err error ) {
177- host , err := os .Hostname ()
177+ func (p * PulsarConsumerSource ) getConsumerName () (string , error ) {
178+ if p .ConsumerName != "" {
179+ return p .ConsumerName , nil
180+ }
181+ hostname , err := os .Hostname ()
178182 if err != nil {
179- return nil , err
183+ return "" , err
180184 }
185+ return hostname , nil
186+ }
181187
182- p .client , err = pulsar .NewClient (p .PulsarOption )
188+ func (p * PulsarConsumerSource ) Capture (cp cursor.Checkpoint ) (changes chan Change , err error ) {
189+ consumerName , err := p .getConsumerName ()
183190 if err != nil {
184191 return nil , err
185192 }
186193
187194 p .consumer , err = p .client .Subscribe (pulsar.ConsumerOptions {
188- Name : host ,
195+ Name : consumerName ,
189196 Topic : p .PulsarTopic ,
190197 SubscriptionName : p .PulsarSubscription ,
191198 ReplicateSubscriptionState : p .PulsarReplicateState ,
Original file line number Diff line number Diff line change @@ -2,6 +2,7 @@ package source
22
33import (
44 "context"
5+ "os"
56 "strconv"
67 "testing"
78 "time"
@@ -250,3 +251,35 @@ func TestPulsarConsumerSource(t *testing.T) {
250251 }
251252 src .Stop ()
252253}
254+
255+ func TestPulsarConsumerSourceGetConsumerName (t * testing.T ) {
256+ // test with custom consumer name set
257+ src := & PulsarConsumerSource {
258+ ConsumerName : "custom-test-consumer" ,
259+ }
260+
261+ name , err := src .getConsumerName ()
262+ if err != nil {
263+ t .Fatalf ("unexpected error: %v" , err )
264+ }
265+ if name != "custom-test-consumer" {
266+ t .Fatalf ("expected consumer name 'custom-test-consumer', got '%s'" , name )
267+ }
268+
269+ // test with empty consumer name (should fall back to hostname)
270+ src2 := & PulsarConsumerSource {}
271+
272+ name2 , err := src2 .getConsumerName ()
273+ if err != nil {
274+ t .Fatalf ("unexpected error: %v" , err )
275+ }
276+ if name2 == "" {
277+ t .Fatal ("expected non-empty hostname, got empty string" )
278+ }
279+
280+ // verify that hostname fallback returns a non-empty string
281+ hostname , _ := os .Hostname ()
282+ if name2 != hostname {
283+ t .Fatalf ("expected hostname '%s', got '%s'" , hostname , name2 )
284+ }
285+ }
You can’t perform that action at this time.
0 commit comments