Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 51533b8

Browse files
zoercailarkee
andauthored
feat: add RPC priority support (#324)
* feat: add RPC priority support * Review changes * Review changes * Update google/cloud/spanner_v1/database.py Co-authored-by: larkee <31196561+larkee@users.noreply.github.com> * Update google/cloud/spanner_v1/database.py Co-authored-by: larkee <31196561+larkee@users.noreply.github.com> * Update session.py * update import Co-authored-by: larkee <31196561+larkee@users.noreply.github.com>
1 parent c1ee8c2 commit 51533b8

12 files changed

Lines changed: 209 additions & 20 deletions

File tree

google/cloud/spanner_v1/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from .types.query_plan import PlanNode
2929
from .types.query_plan import QueryPlan
3030
from .types.result_set import PartialResultSet
31+
from .types import RequestOptions
3132
from .types.result_set import ResultSet
3233
from .types.result_set import ResultSetMetadata
3334
from .types.result_set import ResultSetStats
@@ -119,6 +120,7 @@
119120
"PlanNode",
120121
"QueryPlan",
121122
"ReadRequest",
123+
"RequestOptions",
122124
"ResultSet",
123125
"ResultSetMetadata",
124126
"ResultSetStats",

google/cloud/spanner_v1/batch.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from google.cloud.spanner_v1._helpers import _make_list_value_pbs
2424
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
2525
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
26+
from google.cloud.spanner_v1 import RequestOptions
2627

2728
# pylint: enable=ungrouped-imports
2829

@@ -138,13 +139,20 @@ def _check_state(self):
138139
if self.committed is not None:
139140
raise ValueError("Batch already committed")
140141

141-
def commit(self, return_commit_stats=False):
142+
def commit(self, return_commit_stats=False, request_options=None):
142143
"""Commit mutations to the database.
143144
144145
:type return_commit_stats: bool
145146
:param return_commit_stats:
146147
If true, the response will return commit stats which can be accessed though commit_stats.
147148
149+
:type request_options:
150+
:class:`google.cloud.spanner_v1.types.RequestOptions`
151+
:param request_options:
152+
(Optional) Common options for this request.
153+
If a dict is provided, it must be of the same form as the protobuf
154+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
155+
148156
:rtype: datetime
149157
:returns: timestamp of the committed changes.
150158
"""
@@ -154,11 +162,16 @@ def commit(self, return_commit_stats=False):
154162
metadata = _metadata_with_prefix(database.name)
155163
txn_options = TransactionOptions(read_write=TransactionOptions.ReadWrite())
156164
trace_attributes = {"num_mutations": len(self._mutations)}
165+
166+
if type(request_options) == dict:
167+
request_options = RequestOptions(request_options)
168+
157169
request = CommitRequest(
158170
session=self._session.name,
159171
mutations=self._mutations,
160172
single_use_transaction=txn_options,
161173
return_commit_stats=return_commit_stats,
174+
request_options=request_options,
162175
)
163176
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
164177
response = api.commit(request=request, metadata=metadata,)

google/cloud/spanner_v1/database.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@
5858
TransactionOptions,
5959
)
6060
from google.cloud.spanner_v1.table import Table
61+
from google.cloud.spanner_v1 import RequestOptions
6162

6263
# pylint: enable=ungrouped-imports
6364

64-
6565
SPANNER_DATA_SCOPE = "https://www.googleapis.com/auth/spanner.data"
6666

6767

@@ -454,7 +454,12 @@ def drop(self):
454454
api.drop_database(database=self.name, metadata=metadata)
455455

456456
def execute_partitioned_dml(
457-
self, dml, params=None, param_types=None, query_options=None
457+
self,
458+
dml,
459+
params=None,
460+
param_types=None,
461+
query_options=None,
462+
request_options=None,
458463
):
459464
"""Execute a partitionable DML statement.
460465
@@ -478,12 +483,22 @@ def execute_partitioned_dml(
478483
If a dict is provided, it must be of the same form as the protobuf
479484
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
480485
486+
:type request_options:
487+
:class:`google.cloud.spanner_v1.types.RequestOptions`
488+
:param request_options:
489+
(Optional) Common options for this request.
490+
If a dict is provided, it must be of the same form as the protobuf
491+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
492+
481493
:rtype: int
482494
:returns: Count of rows affected by the DML statement.
483495
"""
484496
query_options = _merge_query_options(
485497
self._instance._client._query_options, query_options
486498
)
499+
if type(request_options) == dict:
500+
request_options = RequestOptions(request_options)
501+
487502
if params is not None:
488503
from google.cloud.spanner_v1.transaction import Transaction
489504

@@ -517,6 +532,7 @@ def execute_pdml():
517532
params=params_pb,
518533
param_types=param_types,
519534
query_options=query_options,
535+
request_options=request_options,
520536
)
521537
method = functools.partial(
522538
api.execute_streaming_sql, metadata=metadata,
@@ -561,16 +577,23 @@ def snapshot(self, **kw):
561577
"""
562578
return SnapshotCheckout(self, **kw)
563579

564-
def batch(self):
580+
def batch(self, request_options=None):
565581
"""Return an object which wraps a batch.
566582
567583
The wrapper *must* be used as a context manager, with the batch
568584
as the value returned by the wrapper.
569585
586+
:type request_options:
587+
:class:`google.cloud.spanner_v1.types.RequestOptions`
588+
:param request_options:
589+
(Optional) Common options for the commit request.
590+
If a dict is provided, it must be of the same form as the protobuf
591+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
592+
570593
:rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout`
571594
:returns: new wrapper
572595
"""
573-
return BatchCheckout(self)
596+
return BatchCheckout(self, request_options)
574597

575598
def batch_snapshot(self, read_timestamp=None, exact_staleness=None):
576599
"""Return an object which wraps a batch read / query.
@@ -756,11 +779,19 @@ class BatchCheckout(object):
756779
757780
:type database: :class:`~google.cloud.spanner_v1.database.Database`
758781
:param database: database to use
782+
783+
:type request_options:
784+
:class:`google.cloud.spanner_v1.types.RequestOptions`
785+
:param request_options:
786+
(Optional) Common options for the commit request.
787+
If a dict is provided, it must be of the same form as the protobuf
788+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
759789
"""
760790

761-
def __init__(self, database):
791+
def __init__(self, database, request_options=None):
762792
self._database = database
763793
self._session = self._batch = None
794+
self._request_options = request_options
764795

765796
def __enter__(self):
766797
"""Begin ``with`` block."""
@@ -772,7 +803,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
772803
"""End ``with`` block."""
773804
try:
774805
if exc_type is None:
775-
self._batch.commit(return_commit_stats=self._database.log_commit_stats)
806+
self._batch.commit(
807+
return_commit_stats=self._database.log_commit_stats,
808+
request_options=self._request_options,
809+
)
776810
finally:
777811
if self._database.log_commit_stats and self._batch.commit_stats:
778812
self._database.logger.info(

google/cloud/spanner_v1/session.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ def execute_sql(
230230
param_types=None,
231231
query_mode=None,
232232
query_options=None,
233+
request_options=None,
233234
retry=google.api_core.gapic_v1.method.DEFAULT,
234235
timeout=google.api_core.gapic_v1.method.DEFAULT,
235236
):
@@ -258,6 +259,13 @@ def execute_sql(
258259
or :class:`dict`
259260
:param query_options: (Optional) Options that are provided for query plan stability.
260261
262+
:type request_options:
263+
:class:`google.cloud.spanner_v1.types.RequestOptions`
264+
:param request_options:
265+
(Optional) Common options for this request.
266+
If a dict is provided, it must be of the same form as the protobuf
267+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
268+
261269
:type retry: :class:`~google.api_core.retry.Retry`
262270
:param retry: (Optional) The retry settings for this request.
263271
@@ -273,6 +281,7 @@ def execute_sql(
273281
param_types,
274282
query_mode,
275283
query_options=query_options,
284+
request_options=request_options,
276285
retry=retry,
277286
timeout=timeout,
278287
)
@@ -319,9 +328,12 @@ def run_in_transaction(self, func, *args, **kw):
319328
320329
:type kw: dict
321330
:param kw: (Optional) keyword arguments to be passed to ``func``.
322-
If passed, "timeout_secs" will be removed and used to
331+
If passed:
332+
"timeout_secs" will be removed and used to
323333
override the default retry timeout which defines maximum timestamp
324334
to continue retrying the transaction.
335+
"commit_request_options" will be removed and used to set the
336+
request options for the commit request.
325337
326338
:rtype: Any
327339
:returns: The return value of ``func``.
@@ -330,6 +342,7 @@ def run_in_transaction(self, func, *args, **kw):
330342
reraises any non-ABORT exceptions raised by ``func``.
331343
"""
332344
deadline = time.time() + kw.pop("timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS)
345+
commit_request_options = kw.pop("commit_request_options", None)
333346
attempts = 0
334347

335348
while True:
@@ -355,7 +368,10 @@ def run_in_transaction(self, func, *args, **kw):
355368
raise
356369

357370
try:
358-
txn.commit(return_commit_stats=self._database.log_commit_stats)
371+
txn.commit(
372+
return_commit_stats=self._database.log_commit_stats,
373+
request_options=commit_request_options,
374+
)
359375
except Aborted as exc:
360376
del self._transaction
361377
_delay_until_retry(exc, deadline, attempts)

google/cloud/spanner_v1/snapshot.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from google.cloud.spanner_v1._helpers import _SessionWrapper
3535
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
3636
from google.cloud.spanner_v1.streamed import StreamedResultSet
37+
from google.cloud.spanner_v1 import RequestOptions
3738

3839
_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
3940
"RST_STREAM",
@@ -124,6 +125,7 @@ def read(
124125
index="",
125126
limit=0,
126127
partition=None,
128+
request_options=None,
127129
*,
128130
retry=gapic_v1.method.DEFAULT,
129131
timeout=gapic_v1.method.DEFAULT,
@@ -152,6 +154,13 @@ def read(
152154
from :meth:`partition_read`. Incompatible with
153155
``limit``.
154156
157+
:type request_options:
158+
:class:`google.cloud.spanner_v1.types.RequestOptions`
159+
:param request_options:
160+
(Optional) Common options for this request.
161+
If a dict is provided, it must be of the same form as the protobuf
162+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
163+
155164
:type retry: :class:`~google.api_core.retry.Retry`
156165
:param retry: (Optional) The retry settings for this request.
157166
@@ -176,6 +185,9 @@ def read(
176185
metadata = _metadata_with_prefix(database.name)
177186
transaction = self._make_txn_selector()
178187

188+
if type(request_options) == dict:
189+
request_options = RequestOptions(request_options)
190+
179191
request = ReadRequest(
180192
session=self._session.name,
181193
table=table,
@@ -185,6 +197,7 @@ def read(
185197
index=index,
186198
limit=limit,
187199
partition_token=partition,
200+
request_options=request_options,
188201
)
189202
restart = functools.partial(
190203
api.streaming_read,
@@ -217,6 +230,7 @@ def execute_sql(
217230
param_types=None,
218231
query_mode=None,
219232
query_options=None,
233+
request_options=None,
220234
partition=None,
221235
retry=gapic_v1.method.DEFAULT,
222236
timeout=gapic_v1.method.DEFAULT,
@@ -249,6 +263,13 @@ def execute_sql(
249263
If a dict is provided, it must be of the same form as the protobuf
250264
message :class:`~google.cloud.spanner_v1.types.QueryOptions`
251265
266+
:type request_options:
267+
:class:`google.cloud.spanner_v1.types.RequestOptions`
268+
:param request_options:
269+
(Optional) Common options for this request.
270+
If a dict is provided, it must be of the same form as the protobuf
271+
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
272+
252273
:type partition: bytes
253274
:param partition: (Optional) one of the partition tokens returned
254275
from :meth:`partition_query`.
@@ -291,6 +312,9 @@ def execute_sql(
291312
default_query_options = database._instance._client._query_options
292313
query_options = _merge_query_options(default_query_options, query_options)
293314

315+
if type(request_options) == dict:
316+
request_options = RequestOptions(request_options)
317+
294318
request = ExecuteSqlRequest(
295319
session=self._session.name,
296320
sql=sql,
@@ -301,6 +325,7 @@ def execute_sql(
301325
partition_token=partition,
302326
seqno=self._execute_sql_count,
303327
query_options=query_options,
328+
request_options=request_options,
304329
)
305330
restart = functools.partial(
306331
api.execute_streaming_sql,

0 commit comments

Comments
 (0)