Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit c4ce860

Browse files
authored
Merge branch 'main' into owl-bot-copy
2 parents fb80ea8 + de322f8 commit c4ce860

16 files changed

Lines changed: 1459 additions & 306 deletions

docs/transaction-usage.rst

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ A :class:`~google.cloud.spanner_v1.transaction.Transaction` represents a
55
transaction: when the transaction commits, it will send any accumulated
66
mutations to the server.
77

8-
To understand more about how transactions work, visit [Transaction](https://cloud.google.com/spanner/docs/reference/rest/v1/Transaction).
8+
To understand more about how transactions work, visit
9+
`Transaction <https://cloud.google.com/spanner/docs/reference/rest/v1/Transaction>`_.
910
To learn more about how to use them in the Python client, continue reading.
1011

1112

@@ -90,8 +91,8 @@ any of the records already exists.
9091
Update records using a Transaction
9192
----------------------------------
9293

93-
:meth:`Transaction.update` updates one or more existing records in a table. Fails
94-
if any of the records does not already exist.
94+
:meth:`Transaction.update` updates one or more existing records in a table.
95+
Fails if any of the records does not already exist.
9596

9697
.. code:: python
9798
@@ -178,35 +179,40 @@ Using :meth:`~Database.run_in_transaction`
178179

179180
Rather than calling :meth:`~Transaction.commit` or :meth:`~Transaction.rollback`
180181
manually, you should use :meth:`~Database.run_in_transaction` to run the
181-
function that you need. The transaction's :meth:`~Transaction.commit` method
182+
function that you need. The transaction's :meth:`~Transaction.commit` method
182183
will be called automatically if the ``with`` block exits without raising an
183-
exception. The function will automatically be retried for
184+
exception. The function will automatically be retried for
184185
:class:`~google.api_core.exceptions.Aborted` errors, but will raise on
185186
:class:`~google.api_core.exceptions.GoogleAPICallError` and
186187
:meth:`~Transaction.rollback` will be called on all others.
187188

188189
.. code:: python
189190
190191
def _unit_of_work(transaction):
191-
192192
transaction.insert(
193-
'citizens', columns=['email', 'first_name', 'last_name', 'age'],
193+
'citizens',
194+
columns=['email', 'first_name', 'last_name', 'age'],
194195
values=[
195196
['phred@exammple.com', 'Phred', 'Phlyntstone', 32],
196197
['bharney@example.com', 'Bharney', 'Rhubble', 31],
197-
])
198+
]
199+
)
198200
199201
transaction.update(
200-
'citizens', columns=['email', 'age'],
202+
'citizens',
203+
columns=['email', 'age'],
201204
values=[
202205
['phred@exammple.com', 33],
203206
['bharney@example.com', 32],
204-
])
207+
]
208+
)
205209
206210
...
207211
208-
transaction.delete('citizens',
209-
keyset['bharney@example.com', 'nonesuch@example.com'])
212+
transaction.delete(
213+
'citizens',
214+
keyset=['bharney@example.com', 'nonesuch@example.com']
215+
)
210216
211217
db.run_in_transaction(_unit_of_work)
212218
@@ -242,7 +248,7 @@ If an exception is raised inside the ``with`` block, the transaction's
242248
...
243249
244250
transaction.delete('citizens',
245-
keyset['bharney@example.com', 'nonesuch@example.com'])
251+
keyset=['bharney@example.com', 'nonesuch@example.com'])
246252
247253
248254
Begin a Transaction

google/cloud/spanner_v1/batch.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -249,17 +249,28 @@ def commit(
249249
observability_options=observability_options,
250250
metadata=metadata,
251251
), MetricsCapture():
252-
method = functools.partial(
253-
api.commit,
254-
request=request,
255-
metadata=metadata,
256-
)
252+
253+
def wrapped_method(*args, **kwargs):
254+
method = functools.partial(
255+
api.commit,
256+
request=request,
257+
metadata=database.metadata_with_request_id(
258+
# This code is retried due to ABORTED, hence nth_request
259+
# should be increased. attempt can only be increased if
260+
# we encounter UNAVAILABLE or INTERNAL.
261+
getattr(database, "_next_nth_request", 0),
262+
1,
263+
metadata,
264+
),
265+
)
266+
return method(*args, **kwargs)
267+
257268
deadline = time.time() + kwargs.get(
258269
"timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS
259270
)
260271
default_retry_delay = kwargs.get("default_retry_delay", None)
261272
response = _retry_on_aborted_exception(
262-
method,
273+
wrapped_method,
263274
deadline=deadline,
264275
default_retry_delay=default_retry_delay,
265276
)

google/cloud/spanner_v1/database.py

Lines changed: 86 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from google.cloud.spanner_v1._helpers import (
5454
_metadata_with_prefix,
5555
_metadata_with_leader_aware_routing,
56+
_metadata_with_request_id,
5657
)
5758
from google.cloud.spanner_v1.batch import Batch
5859
from google.cloud.spanner_v1.batch import MutationGroups
@@ -151,6 +152,9 @@ class Database(object):
151152

152153
_spanner_api: SpannerClient = None
153154

155+
__transport_lock = threading.Lock()
156+
__transports_to_channel_id = dict()
157+
154158
def __init__(
155159
self,
156160
database_id,
@@ -188,6 +192,7 @@ def __init__(
188192
self._instance._client.default_transaction_options
189193
)
190194
self._proto_descriptors = proto_descriptors
195+
self._channel_id = 0 # It'll be created when _spanner_api is created.
191196

192197
if pool is None:
193198
pool = BurstyPool(database_role=database_role)
@@ -446,8 +451,26 @@ def spanner_api(self):
446451
client_info=client_info,
447452
client_options=client_options,
448453
)
454+
455+
with self.__transport_lock:
456+
transport = self._spanner_api._transport
457+
channel_id = self.__transports_to_channel_id.get(transport, None)
458+
if channel_id is None:
459+
channel_id = len(self.__transports_to_channel_id) + 1
460+
self.__transports_to_channel_id[transport] = channel_id
461+
self._channel_id = channel_id
462+
449463
return self._spanner_api
450464

465+
def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]):
466+
return _metadata_with_request_id(
467+
self._nth_client_id,
468+
self._channel_id,
469+
nth_request,
470+
nth_attempt,
471+
prior_metadata,
472+
)
473+
451474
def __eq__(self, other):
452475
if not isinstance(other, self.__class__):
453476
return NotImplemented
@@ -490,7 +513,10 @@ def create(self):
490513
database_dialect=self._database_dialect,
491514
proto_descriptors=self._proto_descriptors,
492515
)
493-
future = api.create_database(request=request, metadata=metadata)
516+
future = api.create_database(
517+
request=request,
518+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
519+
)
494520
return future
495521

496522
def exists(self):
@@ -506,7 +532,12 @@ def exists(self):
506532
metadata = _metadata_with_prefix(self.name)
507533

508534
try:
509-
api.get_database_ddl(database=self.name, metadata=metadata)
535+
api.get_database_ddl(
536+
database=self.name,
537+
metadata=self.metadata_with_request_id(
538+
self._next_nth_request, 1, metadata
539+
),
540+
)
510541
except NotFound:
511542
return False
512543
return True
@@ -523,10 +554,16 @@ def reload(self):
523554
"""
524555
api = self._instance._client.database_admin_api
525556
metadata = _metadata_with_prefix(self.name)
526-
response = api.get_database_ddl(database=self.name, metadata=metadata)
557+
response = api.get_database_ddl(
558+
database=self.name,
559+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
560+
)
527561
self._ddl_statements = tuple(response.statements)
528562
self._proto_descriptors = response.proto_descriptors
529-
response = api.get_database(name=self.name, metadata=metadata)
563+
response = api.get_database(
564+
name=self.name,
565+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
566+
)
530567
self._state = DatabasePB.State(response.state)
531568
self._create_time = response.create_time
532569
self._restore_info = response.restore_info
@@ -571,7 +608,10 @@ def update_ddl(self, ddl_statements, operation_id="", proto_descriptors=None):
571608
proto_descriptors=proto_descriptors,
572609
)
573610

574-
future = api.update_database_ddl(request=request, metadata=metadata)
611+
future = api.update_database_ddl(
612+
request=request,
613+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
614+
)
575615
return future
576616

577617
def update(self, fields):
@@ -609,7 +649,9 @@ def update(self, fields):
609649
metadata = _metadata_with_prefix(self.name)
610650

611651
future = api.update_database(
612-
database=database_pb, update_mask=field_mask, metadata=metadata
652+
database=database_pb,
653+
update_mask=field_mask,
654+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
613655
)
614656

615657
return future
@@ -622,7 +664,10 @@ def drop(self):
622664
"""
623665
api = self._instance._client.database_admin_api
624666
metadata = _metadata_with_prefix(self.name)
625-
api.drop_database(database=self.name, metadata=metadata)
667+
api.drop_database(
668+
database=self.name,
669+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
670+
)
626671

627672
def execute_partitioned_dml(
628673
self,
@@ -711,7 +756,13 @@ def execute_pdml():
711756
with SessionCheckout(self._pool) as session:
712757
add_span_event(span, "Starting BeginTransaction")
713758
txn = api.begin_transaction(
714-
session=session.name, options=txn_options, metadata=metadata
759+
session=session.name,
760+
options=txn_options,
761+
metadata=self.metadata_with_request_id(
762+
self._next_nth_request,
763+
1,
764+
metadata,
765+
),
715766
)
716767

717768
txn_selector = TransactionSelector(id=txn.id)
@@ -724,6 +775,7 @@ def execute_pdml():
724775
query_options=query_options,
725776
request_options=request_options,
726777
)
778+
727779
method = functools.partial(
728780
api.execute_streaming_sql,
729781
metadata=metadata,
@@ -736,6 +788,7 @@ def execute_pdml():
736788
metadata=metadata,
737789
transaction_selector=txn_selector,
738790
observability_options=self.observability_options,
791+
request_id_manager=self,
739792
)
740793

741794
result_set = StreamedResultSet(iterator)
@@ -745,6 +798,18 @@ def execute_pdml():
745798

746799
return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()
747800

801+
@property
802+
def _next_nth_request(self):
803+
if self._instance and self._instance._client:
804+
return self._instance._client._next_nth_request
805+
return 1
806+
807+
@property
808+
def _nth_client_id(self):
809+
if self._instance and self._instance._client:
810+
return self._instance._client._nth_client_id
811+
return 0
812+
748813
def session(self, labels=None, database_role=None):
749814
"""Factory to create a session for this database.
750815
@@ -965,7 +1030,7 @@ def restore(self, source):
9651030
)
9661031
future = api.restore_database(
9671032
request=request,
968-
metadata=metadata,
1033+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
9691034
)
9701035
return future
9711036

@@ -1034,7 +1099,10 @@ def list_database_roles(self, page_size=None):
10341099
parent=self.name,
10351100
page_size=page_size,
10361101
)
1037-
return api.list_database_roles(request=request, metadata=metadata)
1102+
return api.list_database_roles(
1103+
request=request,
1104+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
1105+
)
10381106

10391107
def table(self, table_id):
10401108
"""Factory to create a table object within this database.
@@ -1118,7 +1186,10 @@ def get_iam_policy(self, policy_version=None):
11181186
requested_policy_version=policy_version
11191187
),
11201188
)
1121-
response = api.get_iam_policy(request=request, metadata=metadata)
1189+
response = api.get_iam_policy(
1190+
request=request,
1191+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
1192+
)
11221193
return response
11231194

11241195
def set_iam_policy(self, policy):
@@ -1140,7 +1211,10 @@ def set_iam_policy(self, policy):
11401211
resource=self.name,
11411212
policy=policy,
11421213
)
1143-
response = api.set_iam_policy(request=request, metadata=metadata)
1214+
response = api.set_iam_policy(
1215+
request=request,
1216+
metadata=self.metadata_with_request_id(self._next_nth_request, 1, metadata),
1217+
)
11441218
return response
11451219

11461220
@property

google/cloud/spanner_v1/pool.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,9 @@ def bind(self, database):
256256
)
257257
resp = api.batch_create_sessions(
258258
request=request,
259-
metadata=metadata,
259+
metadata=database.metadata_with_request_id(
260+
database._next_nth_request, 1, metadata
261+
),
260262
)
261263

262264
add_span_event(
@@ -561,7 +563,9 @@ def bind(self, database):
561563
while returned_session_count < self.size:
562564
resp = api.batch_create_sessions(
563565
request=request,
564-
metadata=metadata,
566+
metadata=database.metadata_with_request_id(
567+
database._next_nth_request, 1, metadata
568+
),
565569
)
566570

567571
add_span_event(

0 commit comments

Comments
 (0)