Skip to content

Commit a49d066

Browse files
aandreassacy-yunyoshi-automation
authored
feat(pubsub): Support adhoc debug logging (#32404)
Co-authored-by: Charlotte Yun <yejiyun@google.com> Co-authored-by: Yoshi Automation Bot <yoshi-automation@google.com>
1 parent 17a2ecf commit a49d066

19 files changed

Lines changed: 362 additions & 57 deletions

File tree

google-cloud-pubsub/README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,43 @@ module GRPC
9292
end
9393
```
9494

95+
### Enabling library level logging
96+
97+
This library includes an opt-in logging mechanism that provides detailed information about high-level operations. These logs are useful for troubleshooting and monitoring the client's behavior. When enabled, logs are tagged with subtags to indicate the operation type.
98+
99+
The following subtags are used:
100+
101+
* `callback-delivery`: Logs when a message is delivered to the user-provided callback.
102+
* `callback-exceptions`: Logs any exceptions raised from the user callback.
103+
* `ack-nack`: Logs when a message is acknowledged (`ack`) or negatively acknowledged (`nack`).
104+
* `ack-batch`: Logs the reason and size of acknowledgement batches sent to the server.
105+
* `publish-batch`: Logs the reason and size of message batches sent to the server for publishing.
106+
* `expiry`: Logs when a message's lease expires and it is dropped from client-side lease management.
107+
* `subscriber-streams`: Logs key events in the subscriber's streaming connection, such as opening, closing, and errors.
108+
* `subscriber-flow-control`: Logs when the subscriber's client-side flow control is paused or resumed.
109+
110+
**WARNING:** These logs may contain message data in plaintext, which could include sensitive information. Ensure you are practicing good data hygiene with your application logs. It is recommended to enable this logging only for debugging purposes and not permanently in production.
111+
112+
To enable these debug logs, you must provide a logger with the `progname` set to `"pubsub"`.
113+
114+
```ruby
115+
require "google/cloud/pubsub"
116+
require "logger"
117+
118+
# Create a logger and set the progname to "pubsub" to enable library-level logging
119+
logger = Logger.new($stdout)
120+
logger.progname = "pubsub"
121+
122+
# Configure the logger globally
123+
Google::Cloud.configure.pubsub.logger = logger
124+
125+
# Or provide it directly to the client
126+
pubsub = Google::Cloud::PubSub.new logger: logger
127+
```
128+
129+
If the logger's `progname` is not set to `"pubsub"`, these debug logs will be suppressed, even if the logger is provided.
130+
131+
95132
## Supported Ruby Versions
96133

97134
This library is supported on Ruby 3.1+.

google-cloud-pubsub/lib/google-cloud-pubsub.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
require "google/cloud" unless defined? Google::Cloud.new
2424
require "google/cloud/config"
2525
require "googleauth"
26+
require "logger"
2627

2728
module Google
2829
module Cloud
@@ -142,6 +143,8 @@ def self.pubsub project_id = nil,
142143
"https://www.googleapis.com/auth/pubsub"
143144
]
144145

146+
default_logger = Logger.new $stdout
147+
145148
config.add_field! :project_id, default_project, match: String, allow_nil: true
146149
config.add_alias! :project, :project_id
147150
config.add_field! :credentials, default_creds, match: [String, Hash, Google::Auth::Credentials], allow_nil: true
@@ -153,4 +156,5 @@ def self.pubsub project_id = nil,
153156
config.add_field! :on_error, nil, match: Proc
154157
config.add_field! :endpoint, nil, match: String
155158
config.add_field! :universe_domain, nil, match: String
159+
config.add_field! :logger, default_logger, match: Logger, allow_nil: true
156160
end

google-cloud-pubsub/lib/google/cloud/pubsub.rb

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515

1616
require "google-cloud-pubsub"
17+
require "google/cloud/pubsub/internal_logger"
1718
require "google/cloud/pubsub/project"
1819
require "google/cloud/config"
1920
require "google/cloud/env"
@@ -70,11 +71,16 @@ module PubSub
7071
#
7172
# * `https://www.googleapis.com/auth/pubsub`
7273
# @param [Numeric] timeout Default timeout to use in requests. Optional.
74+
# @param [String] universe_domain A custom universe domain. Optional.
7375
# @param [String] endpoint Override of the endpoint host name. Optional.
7476
# If the param is nil, uses the default endpoint.
7577
# @param [String] emulator_host Pub/Sub emulator host. Optional.
7678
# If the param is nil, uses the value of the `emulator_host` config.
77-
# @param universe_domain [String] A custom universe domain. Optional.
79+
# @param [Logger] logger Optional Logger instance for emitting
80+
# library-level debug logs. If not provided, it will default to
81+
# configure.logger, which defaults to Logger.new STDOUT if not set.
82+
# Debug logs are emitted only when the logger's progname is set to
83+
# "pubsub"; with any other progname they are suppressed.
7884
#
7985
# @return [Google::Cloud::PubSub::Project]
8086
#
@@ -92,13 +98,15 @@ def self.new project_id: nil,
9298
timeout: nil,
9399
universe_domain: nil,
94100
endpoint: nil,
95-
emulator_host: nil
101+
emulator_host: nil,
102+
logger: nil
96103
project_id ||= default_project_id
97104
scope ||= configure.scope
98105
timeout ||= configure.timeout
99106
endpoint ||= configure.endpoint
100107
universe_domain ||= configure.universe_domain
101108
emulator_host ||= configure.emulator_host
109+
logger ||= configure.logger
102110

103111
if emulator_host
104112
credentials = :this_channel_is_insecure
@@ -114,10 +122,12 @@ def self.new project_id: nil,
114122
project_id = project_id.to_s # Always cast to a string
115123
raise ArgumentError, "project_id is missing" if project_id.empty?
116124

125+
logger = Google::Cloud::PubSub::InternalLogger.new logger
117126
service = PubSub::Service.new project_id, credentials,
118127
host: endpoint,
119128
timeout: timeout,
120-
universe_domain: universe_domain
129+
universe_domain: universe_domain,
130+
logger: logger
121131
PubSub::Project.new service
122132
end
123133

google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &cal
157157
end
158158
batch_action = batch.add msg, callback
159159
if batch_action == :full
160-
publish_batches!
160+
publish_batches! reason: "batch full"
161161
elsif @published_at.nil?
162162
# Set initial time to now to start the background counter
163163
@published_at = Time.now
@@ -180,7 +180,7 @@ def stop
180180
break if @stopped
181181

182182
@stopped = true
183-
publish_batches! stop: true
183+
publish_batches! stop: true, reason: "shutdown"
184184
@cond.signal
185185
@publish_thread_pool.shutdown
186186
end
@@ -234,7 +234,7 @@ def stop! timeout = nil
234234
# @return [AsyncPublisher] returns self so calls can be chained.
235235
def flush
236236
synchronize do
237-
publish_batches!
237+
publish_batches! reason: "manual flush"
238238
@cond.signal
239239
end
240240

@@ -313,7 +313,7 @@ def run_background
313313
time_since_first_publish = Time.now - @published_at
314314
if time_since_first_publish > @interval
315315
# interval met, flush the batches...
316-
publish_batches!
316+
publish_batches! reason: "interval timeout"
317317
@cond.wait
318318
else
319319
# still waiting for the interval to publish the batch...
@@ -347,28 +347,28 @@ def stop_publish ordering_key, err
347347
end
348348
end
349349

350-
def publish_batches! stop: nil
350+
def publish_batches! stop: nil, reason: "unknown"
351351
@batches.reject! { |_ordering_key, batch| batch.empty? }
352352
@batches.each_value do |batch|
353353
ready = batch.publish! stop: stop
354-
publish_batch_async @topic_name, batch if ready
354+
publish_batch_async @topic_name, batch, reason: reason if ready
355355
end
356356
# Set published_at to nil to wait indefinitely
357357
@published_at = nil
358358
end
359359

360-
def publish_batch_async topic_name, batch
360+
def publish_batch_async topic_name, batch, reason: "unknown"
361361
# TODO: raise unless @publish_thread_pool.running?
362362
return unless @publish_thread_pool.running?
363363

364364
Concurrent::Promises.future_on(
365-
@publish_thread_pool, topic_name, batch
366-
) { |t, b| publish_batch_sync t, b }
365+
@publish_thread_pool, topic_name, batch, reason
366+
) { |t, b, r| publish_batch_sync t, b, reason: r }
367367
end
368368

369369
# rubocop:disable Metrics/AbcSize
370370

371-
def publish_batch_sync topic_name, batch
371+
def publish_batch_sync topic_name, batch, reason: "unknown"
372372
# The only batch methods that are safe to call from the loop are
373373
# rebalance! and reset! because they are the only methods that are
374374
# synchronized.
@@ -379,6 +379,7 @@ def publish_batch_sync topic_name, batch
379379
grpc = @service.publish topic_name,
380380
items.map(&:msg),
381381
compress: compress && batch.total_message_bytes >= compression_bytes_threshold
382+
service.logger.log_batch "publish-batch", reason, "publish", items.count, items.sum(&:bytesize)
382383
items.zip Array(grpc.message_ids) do |item, id|
383384
@flow_controller.release item.bytesize
384385
next unless item.callback

google-cloud-pubsub/lib/google/cloud/pubsub/batch_publisher.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ module PubSub
3535
# end
3636
#
3737
class BatchPublisher
38+
3839
##
3940
# @private The messages to publish
4041
attr_reader :messages
@@ -117,10 +118,11 @@ def to_gcloud_messages message_ids
117118

118119
##
119120
# @private Call the publish API with arrays of data and attrs.
120-
def publish_batch_messages topic_name, service
121+
def publish_batch_messages topic_name, service, reason: "unknown"
121122
grpc = service.publish topic_name,
122123
messages,
123124
compress: compress && total_message_bytes >= compression_bytes_threshold
125+
service.logger.log_batch "publish-batch", reason, "publish", messages.count, @total_message_bytes
124126
to_gcloud_messages Array(grpc.message_ids)
125127
end
126128
end
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
require "logger"
15+
16+
require "google/cloud/config"
17+
18+
module Google
19+
module Cloud
20+
module PubSub
21+
##
22+
# @private
23+
class InternalLogger
  # Thin wrapper around a standard Logger that emits the library-level
  # debug logs. Nothing is written unless the wrapped logger's progname is
  # exactly "pubsub"; every entry is tagged "pubsub:<subtag>".

  LOG_NAME = "pubsub".freeze
  VALID_LOG_LEVELS = [:debug, :info, :warn, :error, :fatal].freeze
  private_constant :VALID_LOG_LEVELS, :LOG_NAME

  ##
  # @private
  # @param [Logger, nil] logger The logger to wrap. When nil, a
  #   `Logger.new(nil)` is used, so nothing is ever emitted.
  def initialize logger
    @logger = logger || Logger.new(nil)
  end

  ##
  # @private
  # Emits one entry at the given level, tagged "pubsub:<subtag>". The
  # block produces the message and is only evaluated by the wrapped logger
  # when the entry is actually written.
  #
  # @param [Symbol] level One of :debug, :info, :warn, :error, :fatal.
  #   Any other value makes the call a no-op.
  # @param [String] subtag The operation type, e.g. "ack-nack".
  # @yieldreturn [String] The message text.
  # rubocop:disable Naming/BlockForwarding
  def log level, subtag, &message_block
    return unless message_block && VALID_LOG_LEVELS.include?(level)
    # Only log if the logger is explicitly tagged for 'pubsub'.
    return unless enabled?

    @logger.public_send(level, "#{LOG_NAME}:#{subtag}", &message_block)
  end
  # rubocop:enable Naming/BlockForwarding

  ##
  # @private
  # Logs the reason and size of a batch sent to the server.
  #
  # @param [String] logger_name The subtag to log under.
  # @param [String] reason What triggered the batch, e.g. "batch full".
  # @param [String] type The batch type, e.g. "publish".
  # @param [Integer] num_messages Number of messages in the batch.
  # @param [Integer] total_bytes Total size of the batch in bytes.
  def log_batch logger_name, reason, type, num_messages, total_bytes
    log :info, logger_name do
      "#{reason} triggered #{type} batch of #{num_messages} messages, a total of #{total_bytes} bytes"
    end
  end

  ##
  # @private
  # Logs one "ack-nack" entry per ack ID.
  #
  # @param [Array<String>, nil] ack_ids The ack IDs; nil is treated as empty.
  # @param [String] type Text appended to each entry, e.g. "ack".
  def log_ack_nack ack_ids, type
    # The enabled check does not depend on the element, so do it once
    # instead of walking the whole collection when logging is off.
    return unless enabled?

    Array(ack_ids).each do |ack_id|
      log :info, "ack-nack" do
        "message (ackID #{ack_id}) #{type}"
      end
    end
  end

  ##
  # @private
  # Logs one "expiry" entry per message dropped from lease management.
  #
  # @param [Enumerable, nil] expired Pairs of (ack_id, item), given either
  #   as a Hash or as an Array of two-element Arrays; each item must
  #   respond to `message_id`. nil is treated as empty.
  def log_expiry expired
    return unless enabled?

    Array(expired).each do |ack_id, item|
      log :info, "expiry" do
        "message (ID #{item.message_id}, ackID #{ack_id}) has been dropped from leasing due to a timeout"
      end
    end
  end

  private

  # True when the wrapped logger is explicitly tagged for "pubsub".
  def enabled?
    @logger&.progname == LOG_NAME
  end
end
74+
end
75+
end
76+
end

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515

1616
require "google/cloud/pubsub/service"
17+
require "google/cloud/pubsub/subscriber"
1718
require "google/cloud/pubsub/message_listener/stream"
1819
require "google/cloud/pubsub/message_listener/timed_unary_buffer"
1920
require "monitor"

google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
1615
require "monitor"
1716

1817
module Google
@@ -22,9 +21,9 @@ class MessageListener
2221
##
2322
# @private
2423
class Inventory
25-
InventoryItem = Struct.new :bytesize, :pulled_at do
24+
InventoryItem = Struct.new :message_id, :bytesize, :pulled_at do
2625
def self.from rec_msg
27-
new rec_msg.to_proto.bytesize, Time.now
26+
new rec_msg.message.message_id, rec_msg.to_proto.bytesize, Time.now
2827
end
2928
end
3029

@@ -70,18 +69,24 @@ def add *rec_msgs
7069
def remove *ack_ids
7170
ack_ids.flatten!
7271
ack_ids.compact!
73-
return if ack_ids.empty?
72+
return {} if ack_ids.empty?
7473

74+
removed_items = {}
7575
synchronize do
76-
@inventory.delete_if { |ack_id, _| ack_ids.include? ack_id }
76+
removed, keep = @inventory.partition { |ack_id, _| ack_ids.include? ack_id }
77+
@inventory = keep.to_h
78+
removed_items = removed.to_h
7779
@wait_cond.broadcast
7880
end
81+
removed_items
7982
end
8083

8184
def remove_expired!
8285
synchronize do
8386
extension_time = Time.new - extension
84-
@inventory.delete_if { |_ack_id, item| item.pulled_at < extension_time }
87+
expired, keep = @inventory.partition { |_ack_id, item| item.pulled_at < extension_time }
88+
@inventory = keep.to_h
89+
stream.subscriber.service.logger.log_expiry expired
8590
@wait_cond.broadcast
8691
end
8792
end

0 commit comments

Comments
 (0)