Skip to content

Commit 087231b

Browse files
committed
PYCBC-1647: Replica reads from preferred server group in transactions
Motivation ========== The transactions spec now includes a get_replica_from_preferred_server_group in the attempt context. We should add support for it, utilizing the corresponding method in the C++ core. Changes ======= Add a get_replica_from_preferred_server_group method to the attempt context in both the blocking & async APIs. Change-Id: Iff7311a45b0bfc18aab363a3697949d25af6256a Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/221686 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Jared Casey <jared.casey@couchbase.com>
1 parent 8113359 commit 087231b

8 files changed

Lines changed: 234 additions & 8 deletions

File tree

acouchbase/tests/transactions_t.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from couchbase.exceptions import (BucketNotFoundException,
2525
DocumentExistsException,
2626
DocumentNotFoundException,
27+
DocumentUnretrievableException,
2728
FeatureUnavailableException,
2829
ParsingFailedException,
2930
TransactionExpired,
@@ -58,6 +59,8 @@ class TransactionTestSuite:
5859
'test_get',
5960
'test_get_lambda_raises_doc_not_found',
6061
'test_get_inner_exc_doc_not_found',
62+
'test_get_replica_from_preferred_server_group_unretrievable',
63+
'test_get_replica_from_preferred_server_group_propagate_unretrievable_exc',
6164
'test_insert',
6265
'test_insert_lambda_raises_doc_exists',
6366
'test_insert_inner_exc_doc_exists',
@@ -111,6 +114,13 @@ def check_binary_txns_not_supported(self, cb_env):
111114
cb_env.mock_server_type,
112115
cb_env.server_version_patch)
113116

117+
@pytest.fixture(scope='class')
118+
def check_server_groups_supported(self, cb_env):
119+
EnvironmentFeatures.check_if_feature_supported('server_groups',
120+
cb_env.server_version_short,
121+
cb_env.mock_server_type,
122+
cb_env.server_version_patch)
123+
114124
@pytest.mark.parametrize('adhoc', [True, False])
115125
def test_adhoc(self, adhoc):
116126
cfg = TransactionQueryOptions(adhoc=adhoc)
@@ -287,6 +297,43 @@ async def txn_logic(ctx):
287297

288298
assert num_attempts == 1
289299

300+
@pytest.mark.asyncio
301+
@pytest.mark.usefixtures('check_server_groups_supported')
302+
async def test_get_replica_from_preferred_server_group_unretrievable(self, cb_env):
303+
key = cb_env.get_new_doc(key_only=True)
304+
num_attempts = 0
305+
306+
async def txn_logic(ctx):
307+
nonlocal num_attempts
308+
num_attempts += 1
309+
with pytest.raises(DocumentUnretrievableException):
310+
await ctx.get_replica_from_preferred_server_group(cb_env.collection, key)
311+
312+
await cb_env.cluster.transactions.run(txn_logic)
313+
314+
assert num_attempts == 1
315+
316+
@pytest.mark.asyncio
317+
@pytest.mark.usefixtures('check_server_groups_supported')
318+
async def test_get_replica_from_preferred_server_group_propagate_unretrievable_exc(self, cb_env):
319+
key = cb_env.get_new_doc(key_only=True)
320+
num_attempts = 0
321+
322+
async def txn_logic(ctx):
323+
nonlocal num_attempts
324+
num_attempts += 1
325+
await ctx.get_replica_from_preferred_server_group(cb_env.collection, key)
326+
327+
try:
328+
await cb_env.cluster.transactions.run(txn_logic)
329+
except TransactionFailed as ex:
330+
assert ex.inner_cause is not None
331+
assert isinstance(ex.inner_cause, DocumentUnretrievableException)
332+
except Exception as ex:
333+
pytest.fail(f"Expected to raise TransactionFailed, not {ex.__class__.__name__}")
334+
335+
assert num_attempts == 1
336+
290337
@pytest.mark.asyncio
291338
async def test_insert(self, cb_env):
292339
key, value = cb_env.get_new_doc()

acouchbase/transactions/transactions.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from couchbase.exceptions import exception as BaseCouchbaseException
2828
from couchbase.logic.supportability import Supportability
2929
from couchbase.options import (TransactionGetOptions,
30+
TransactionGetReplicaFromPreferredServerGroupOptions,
3031
TransactionInsertOptions,
3132
TransactionQueryOptions,
3233
TransactionReplaceOptions)
@@ -170,14 +171,14 @@ def _commit(self,
170171
@AsyncWrapper.inject_callbacks(TransactionGetResult)
171172
def _get(self,
172173
coll, # type: AsyncCollection
173-
key, # type: JSONType
174+
key, # type: str
174175
**kwargs # type: Dict[str, Any]
175176
) -> Awaitable[TransactionGetResult]:
176177
return super().get(coll, key, **kwargs)
177178

178179
def get(self,
179180
coll, # type: AsyncCollection
180-
key, # type: JSONType
181+
key, # type: str
181182
options=None, # type: Optional[TransactionGetOptions]
182183
**kwargs # type: Dict[str, Any]
183184
) -> Awaitable[TransactionGetResult]:
@@ -203,6 +204,46 @@ def get(self,
203204
kwargs['transcoder'] = options.get('transcoder', None)
204205
return self._get(coll, key, **kwargs)
205206

207+
@AsyncWrapper.inject_callbacks(TransactionGetResult)
208+
def _get_replica_from_preferred_server_group(self,
209+
coll, # type: AsyncCollection
210+
key, # type: str
211+
**kwargs # type: Dict[str, Any]
212+
) -> TransactionGetResult:
213+
return super().get_replica_from_preferred_server_group(coll, key, **kwargs)
214+
215+
def get_replica_from_preferred_server_group(self,
216+
coll, # type: AsyncCollection
217+
key, # type: str
218+
options=None, # type: Optional[TransactionGetReplicaFromPreferredServerGroupOptions] # noqa: E501
219+
**kwargs # type: Dict[str, Any]
220+
) -> TransactionGetResult:
221+
"""
222+
Get a document within this transaction from any replica in the preferred server group that is specified in
223+
the :class:`couchbase.options.ClusterOptions`.
224+
225+
Args:
226+
coll (:class:`couchbase.collection.Collection`): Collection to use to find the document.
227+
key (str): document key.
228+
options (:class:`~couchbase.options.TransactionGetReplicaFromPreferredServerGroupOptions`): Optional
229+
parameters for this operation.
230+
**kwargs (Dict[str, Any]): keyword arguments that can be used in place or to
231+
override provided :class:`~couchbase.options.TransactionGetReplicaFromPreferredServerGroupOptions`
232+
233+
Returns:
234+
:class:`couchbase.transactions.TransactionGetResult`: Document in collection, in a form useful for passing
235+
to other transaction operations. Or `None` if the document was not found.
236+
Raises:
237+
:class:`couchbase.exceptions.TransactionOperationFailed`: If the operation failed. In practice, there is
238+
no need to handle the exception, as the transaction will rollback regardless.
239+
:class:`couchbase.exceptions.DocumentUnretrievableException`: If the document could not be retrieved from
240+
any replica in the preferred server group. The transaction will not rollback if this exception is
241+
caught.
242+
"""
243+
if 'transcoder' not in kwargs and isinstance(options, TransactionGetReplicaFromPreferredServerGroupOptions):
244+
kwargs['transcoder'] = options.get('transcoder', None)
245+
return self._get_replica_from_preferred_server_group(coll, key, **kwargs)
246+
206247
@AsyncWrapper.inject_callbacks(TransactionGetResult)
207248
def _insert(self,
208249
coll, # type: AsyncCollection

couchbase/options.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1984,6 +1984,26 @@ def __init__(self, **kwargs):
19841984
super().__init__(**kwargs)
19851985

19861986

1987+
class TransactionGetReplicaFromPreferredServerGroupOptions(dict):
1988+
"""Available options to for transaction get_replica_from_preferred_server_group operation.
1989+
1990+
Args:
1991+
transcoder (:class:`~.transcoder.Transcoder`, optional): Specifies an explicit transcoder
1992+
to use for this specific operation. Defaults to :class:`~.transcoder.JsonTranscoder`.
1993+
"""
1994+
1995+
@overload
1996+
def __init__(self,
1997+
transcoder=None # type: Optional[Transcoder]
1998+
):
1999+
...
2000+
2001+
def __init__(self, **kwargs):
2002+
2003+
kwargs = {k: v for k, v in kwargs.items() if v is not None}
2004+
super().__init__(**kwargs)
2005+
2006+
19872007
class TransactionInsertOptions(dict):
19882008
"""Available options to for transaction insert operation.
19892009

couchbase/tests/transactions_t.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from couchbase.exceptions import (BucketNotFoundException,
2323
DocumentExistsException,
2424
DocumentNotFoundException,
25+
DocumentUnretrievableException,
2526
FeatureUnavailableException,
2627
ParsingFailedException,
2728
TransactionExpired,
@@ -56,6 +57,8 @@ class TransactionTestSuite:
5657
'test_get',
5758
'test_get_lambda_raises_doc_not_found',
5859
'test_get_inner_exc_doc_not_found',
60+
'test_get_replica_from_preferred_server_group_unretrievable',
61+
'test_get_replica_from_preferred_server_group_propagate_unretrievable_exc',
5962
'test_insert',
6063
'test_insert_lambda_raises_doc_exists',
6164
'test_insert_inner_exc_doc_exists',
@@ -109,6 +112,13 @@ def check_binary_txns_not_supported(self, cb_env):
109112
cb_env.mock_server_type,
110113
cb_env.server_version_patch)
111114

115+
@pytest.fixture(scope='class')
116+
def check_server_groups_supported(self, cb_env):
117+
EnvironmentFeatures.check_if_feature_supported('server_groups',
118+
cb_env.server_version_short,
119+
cb_env.mock_server_type,
120+
cb_env.server_version_patch)
121+
112122
@pytest.mark.parametrize('adhoc', [True, False])
113123
def test_adhoc(self, adhoc):
114124
cfg = TransactionQueryOptions(adhoc=adhoc)
@@ -278,6 +288,41 @@ def txn_logic(ctx):
278288

279289
assert num_attempts == 1
280290

291+
@pytest.mark.usefixtures('check_server_groups_supported')
292+
def test_get_replica_from_preferred_server_group_unretrievable(self, cb_env):
293+
key = cb_env.get_new_doc(key_only=True)
294+
num_attempts = 0
295+
296+
def txn_logic(ctx):
297+
nonlocal num_attempts
298+
num_attempts += 1
299+
with pytest.raises(DocumentUnretrievableException):
300+
ctx.get_replica_from_preferred_server_group(cb_env.collection, key)
301+
302+
cb_env.cluster.transactions.run(txn_logic)
303+
304+
assert num_attempts == 1
305+
306+
@pytest.mark.usefixtures('check_server_groups_supported')
307+
def test_get_replica_from_preferred_server_group_propagate_unretrievable_exc(self, cb_env):
308+
key = cb_env.get_new_doc(key_only=True)
309+
num_attempts = 0
310+
311+
def txn_logic(ctx):
312+
nonlocal num_attempts
313+
num_attempts += 1
314+
ctx.get_replica_from_preferred_server_group(cb_env.collection, key)
315+
316+
try:
317+
cb_env.cluster.transactions.run(txn_logic)
318+
except TransactionFailed as ex:
319+
assert ex.inner_cause is not None
320+
assert isinstance(ex.inner_cause, DocumentUnretrievableException)
321+
except Exception as ex:
322+
pytest.fail(f"Expected to raise TransactionFailed, not {ex.__class__.__name__}")
323+
324+
assert num_attempts == 1
325+
281326
def test_insert(self, cb_env):
282327
key, value = cb_env.get_new_doc()
283328

couchbase/transactions/logic/attempt_context_logic.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,16 @@ def get(self, coll, key, **kwargs):
9999
log.debug('get calling transaction op with %s', kwargs)
100100
return transaction_op(**kwargs)
101101

102+
def get_replica_from_preferred_server_group(self, coll, key, **kwargs):
103+
kwargs.pop('transcoder', None)
104+
kwargs.update(coll._get_connection_args())
105+
kwargs.pop("conn")
106+
kwargs["key"] = key
107+
kwargs["ctx"] = self._ctx
108+
kwargs["op"] = transaction_operations.GET_REPLICA_FROM_PREFERRED_SERVER_GROUP.value
109+
log.debug('get_replica_from_preferred_server_group calling transaction op with %s', kwargs)
110+
return transaction_op(**kwargs)
111+
102112
def insert(self, coll, key, value, **kwargs):
103113
transcoder = kwargs.pop('transcoder', self._transcoder)
104114
kwargs.update(coll._get_connection_args())

couchbase/transactions/transactions.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from couchbase.exceptions import exception as BaseCouchbaseException
2828
from couchbase.logic.supportability import Supportability
2929
from couchbase.options import (TransactionGetOptions,
30+
TransactionGetReplicaFromPreferredServerGroupOptions,
3031
TransactionInsertOptions,
3132
TransactionQueryOptions,
3233
TransactionReplaceOptions)
@@ -174,6 +175,46 @@ def get(self,
174175
kwargs['transcoder'] = options.get('transcoder', None)
175176
return self._get(coll, key, **kwargs)
176177

178+
@BlockingWrapper.block(TransactionGetResult)
179+
def _get_replica_from_preferred_server_group(self,
180+
coll, # type: Collection
181+
key, # type: str
182+
**kwargs # type: Dict[str, Any]
183+
) -> TransactionGetResult:
184+
return super().get_replica_from_preferred_server_group(coll, key, **kwargs)
185+
186+
def get_replica_from_preferred_server_group(self,
187+
coll, # type: Collection
188+
key, # type: str
189+
options=None, # type: Optional[TransactionGetReplicaFromPreferredServerGroupOptions] # noqa: E501
190+
**kwargs # type: Dict[str, Any]
191+
) -> TransactionGetResult:
192+
"""
193+
Get a document within this transaction from any replica in the preferred server group that has been specified in
194+
the :class:`couchbase.options.ClusterOptions`.
195+
196+
Args:
197+
coll (:class:`couchbase.collection.Collection`): Collection to use to find the document.
198+
key (str): document key.
199+
options (:class:`~couchbase.options.TransactionGetReplicaFromPreferredServerGroupOptions`): Optional
200+
parameters for this operation.
201+
**kwargs (Dict[str, Any]): keyword arguments that can be used in place or to
202+
override provided :class:`~couchbase.options.TransactionGetReplicaFromPreferredServerGroupOptions`
203+
204+
Returns:
205+
:class:`couchbase.transactions.TransactionGetResult`: Document in collection, in a form useful for passing
206+
to other transaction operations. Or `None` if the document was not found.
207+
Raises:
208+
:class:`couchbase.exceptions.TransactionOperationFailed`: If the operation failed. In practice, there is
209+
no need to handle the exception, as the transaction will rollback regardless.
210+
:class:`couchbase.exceptions.DocumentUnretrievableException`: If the document could not be retrieved from
211+
any replica in the preferred server group. The transaction will not rollback if this exception is
212+
caught.
213+
"""
214+
if 'transcoder' not in kwargs and isinstance(options, TransactionGetReplicaFromPreferredServerGroupOptions):
215+
kwargs['transcoder'] = options.get('transcoder', None)
216+
return self._get_replica_from_preferred_server_group(coll, key, **kwargs)
217+
177218
@BlockingWrapper.block(TransactionGetResult)
178219
def _insert(self,
179220
coll, # type: Collection

src/transactions/transactions.cxx

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -995,11 +995,13 @@ handle_returning_void(PyObject* pyObj_callback,
995995
}
996996

997997
void
998-
handle_returning_transaction_get_result(PyObject* pyObj_callback,
999-
PyObject* pyObj_errback,
1000-
std::shared_ptr<std::promise<PyObject*>> barrier,
1001-
std::exception_ptr err,
1002-
std::optional<tx_core::transaction_get_result> res)
998+
handle_returning_transaction_get_result(
999+
PyObject* pyObj_callback,
1000+
PyObject* pyObj_errback,
1001+
std::shared_ptr<std::promise<PyObject*>> barrier,
1002+
std::exception_ptr err,
1003+
std::optional<couchbase::core::transactions::transaction_get_result> res,
1004+
bool is_replica_get = false)
10031005
{
10041006
// TODO: flesh out transaction_get_result and exceptions...
10051007
auto state = PyGILState_Ensure();
@@ -1022,7 +1024,9 @@ handle_returning_transaction_get_result(PyObject* pyObj_callback,
10221024
// operations once the underlying issue has been resolved.
10231025
if (!res.has_value()) {
10241026
pyObj_get_result = pycbc_build_exception(
1025-
couchbase::errc::make_error_code(couchbase::errc::key_value::document_not_found),
1027+
couchbase::errc::make_error_code((is_replica_get)
1028+
? couchbase::errc::key_value::document_irretrievable
1029+
: couchbase::errc::key_value::document_not_found),
10261030
__FILE__,
10271031
__LINE__,
10281032
"Txn get op: document not found.");
@@ -1231,6 +1235,22 @@ pycbc_txns::transaction_op([[maybe_unused]] PyObject* self, PyObject* args, PyOb
12311235
});
12321236
Py_END_ALLOW_THREADS break;
12331237
}
1238+
case TxOperations::GET_REPLICA_FROM_PREFERRED_SERVER_GROUP: {
1239+
if (nullptr == bucket || nullptr == scope || nullptr == collection || nullptr == key) {
1240+
PyErr_SetString(PyExc_ValueError,
1241+
"couldn't create document id for get_replica_from_preferred_server_group");
1242+
Py_RETURN_NONE;
1243+
}
1244+
couchbase::core::document_id id{ bucket, scope, collection, key };
1245+
Py_BEGIN_ALLOW_THREADS ctx->ctx->get_replica_from_preferred_server_group(
1246+
id,
1247+
[barrier, pyObj_callback, pyObj_errback](
1248+
std::exception_ptr err, std::optional<tx_core::transaction_get_result> res) {
1249+
handle_returning_transaction_get_result(
1250+
pyObj_callback, pyObj_errback, barrier, err, res, true);
1251+
});
1252+
Py_END_ALLOW_THREADS break;
1253+
}
12341254
case TxOperations::INSERT: {
12351255
if (nullptr == bucket || nullptr == scope || nullptr == collection || nullptr == key) {
12361256
PyErr_SetString(PyExc_ValueError, "couldn't create document id for insert");

src/transactions/transactions.hxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public:
4040
enum TxOperationType {
4141
UNKNOWN,
4242
GET,
43+
GET_REPLICA_FROM_PREFERRED_SERVER_GROUP,
4344
REPLACE,
4445
INSERT,
4546
REMOVE,
@@ -73,6 +74,7 @@ public:
7374
static const char* ALL_OPERATIONS(void)
7475
{
7576
const char* ops = "GET "
77+
"GET_REPLICA_FROM_PREFERRED_SERVER_GROUP "
7678
"REPLACE "
7779
"INSERT "
7880
"REMOVE "

0 commit comments

Comments
 (0)