Skip to content
This repository was archived by the owner on Feb 23, 2026. It is now read-only.

Commit 2065df0

Browse files
committed
fix: allow exceptions to be surfaced to the caller of BackgroundConsumer
1 parent 8a9a7d8 commit 2065df0

2 files changed

Lines changed: 28 additions & 16 deletions

File tree

google/api_core/bidi.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -624,12 +624,17 @@ def on_response(response):
624624
``open()``ed yet.
625625
on_response (Callable[[protobuf.Message], None]): The callback to
626626
be called for every response on the stream.
627+
reraise_exceptions (bool): Whether to reraise exceptions during
628+
the lifetime of the consumer, generally those that are not
629+
handled by `BidiRpc`'s `should_recover` or `should_terminate`.
630+
Default `False`.
627631
"""
628632

629-
def __init__(self, bidi_rpc, on_response):
633+
def __init__(self, bidi_rpc, on_response, reraise_exceptions=False):
630634
self._bidi_rpc = bidi_rpc
631635
self._on_response = on_response
632636
self._paused = False
637+
self._reraise_exceptions = reraise_exceptions
633638
self._wake = threading.Condition()
634639
self._thread = None
635640
self._operational_lock = threading.Lock()
@@ -676,15 +681,17 @@ def _thread_main(self, ready):
676681
exc,
677682
exc_info=True,
678683
)
679-
self.stop()
684+
if self._reraise_exceptions:
685+
raise
680686

681687
except Exception as exc:
682688
_LOGGER.exception(
683689
"%s caught unexpected exception %s and will exit.",
684690
_BIDIRECTIONAL_CONSUMER_NAME,
685691
exc,
686692
)
687-
self.stop()
693+
if self._reraise_exceptions:
694+
raise
688695

689696
_LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)
690697

@@ -696,8 +703,8 @@ def start(self):
696703
name=_BIDIRECTIONAL_CONSUMER_NAME,
697704
target=self._thread_main,
698705
args=(ready,),
706+
daemon=True,
699707
)
700-
thread.daemon = True
701708
thread.start()
702709
# Other parts of the code rely on `thread.is_alive` which
703710
# isn't sufficient to know if a thread is active, just that it may

tests/unit/test_bidi.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -922,22 +922,27 @@ def test_stop_error_logs(self, caplog):
922922
def test_fatal_exceptions_will_shutdown_consumer(self, caplog):
923923
"""
924924
https://github.com/googleapis/python-api-core/issues/820
925-
Exceptions thrown in the BackgroundConsumer that
926-
lead to the consumer halting should also stop the thread and rpc.
925+
Exceptions thrown in the BackgroundConsumer not caught by `should_recover` / `should_terminate`
926+
on the RPC should be bubbled back to the caller if `reraise_exceptions` is `True`.
927927
"""
928928
caplog.set_level(logging.DEBUG)
929-
bidi_rpc = mock.create_autospec(bidi.ResumableBidiRpc, instance=True)
930-
bidi_rpc.is_active = True
931-
on_response = mock.Mock(spec=["__call__"])
932929

933-
bidi_rpc.open.side_effect = ValueError()
930+
for fatal_exception in (
931+
ValueError("some non-api error"),
932+
exceptions.PermissionDenied("some api error"),
933+
):
934+
bidi_rpc = mock.create_autospec(bidi.ResumableBidiRpc, instance=True)
935+
bidi_rpc.is_active = True
936+
on_response = mock.Mock(spec=["__call__"])
934937

935-
consumer = bidi.BackgroundConsumer(bidi_rpc, on_response)
938+
bidi_rpc.open.side_effect = fatal_exception
936939

937-
consumer.start()
940+
consumer = bidi.BackgroundConsumer(
941+
bidi_rpc, on_response, reraise_exceptions=True
942+
)
938943

939-
# let the background thread run for a while before exiting
940-
time.sleep(0.1)
944+
with pytest.raises(type(fatal_exception)):
945+
consumer.start()
941946

942-
# We want to make sure that close is called, which will surface the error to the caller.
943-
bidi_rpc.close.assert_called_once()
947+
# let the background thread run for a while before exiting
948+
time.sleep(0.1)

0 commit comments

Comments
 (0)