Skip to content

Commit 8113359

Browse files
committed
PYCBC-1594: Replica reads from preferred server group
Motivation ========== Operations do not currently take into account server groups. For reading from replica servers we need to provide a way for users to be able to read from specific groups, which can correspond to availablity zones. Doing so helps users to reduce costs of sending requests across availability zones. Changes ======= * Add a preferred_server_group option to ClusterOptions and propagate it to the C++ core * Add a read_preference option in replica read operations and propagate it to the C++ core Results ======= All tests pass Change-Id: Ib98a92d9cf141e4e6dbbbdeb9e43d17aa5811d6e Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/221585 Reviewed-by: Jared Casey <jared.casey@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent cab9866 commit 8113359

18 files changed

Lines changed: 317 additions & 44 deletions

acouchbase/tests/collection_t.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@
3232
InvalidArgumentException,
3333
PathNotFoundException,
3434
TemporaryFailException)
35-
from couchbase.options import (GetOptions,
35+
from couchbase.options import (GetAllReplicasOptions,
36+
GetAnyReplicaOptions,
37+
GetOptions,
3638
InsertOptions,
3739
ReplaceOptions,
3840
UpsertOptions)
41+
from couchbase.replica_reads import ReadPreference
3942
from couchbase.result import (ExistsResult,
4043
GetReplicaResult,
4144
GetResult,
@@ -94,6 +97,10 @@ def check_xattr_supported(self, cb_env):
9497
def check_not_locked_supported(self, cb_env):
9598
cb_env.check_if_feature_supported('kv_not_locked')
9699

100+
@pytest.fixture(scope='class')
101+
def check_server_groups_supported(self, cb_env):
102+
cb_env.check_if_feature_supported('server_groups')
103+
97104
@pytest_asyncio.fixture(name="new_kvp")
98105
async def new_key_and_value_with_reset(self, cb_env) -> KVPair:
99106
key, value = await cb_env.get_new_key_value()
@@ -626,6 +633,17 @@ async def test_get_any_replica_fail(self, cb_env):
626633
with pytest.raises(DocumentUnretrievableException):
627634
await cb_env.collection.get_any_replica('not-a-key')
628635

636+
@pytest.mark.usefixtures("check_multi_node")
637+
@pytest.mark.usefixtures("check_replicas")
638+
@pytest.mark.usefixtures("check_server_groups_supported")
639+
@pytest.mark.asyncio
640+
async def test_get_any_replica_read_preference(self, cb_env, default_kvp):
641+
# No preferred server group was specified in the cluster options so this should raise
642+
# DocumentUnretrievableException
643+
with pytest.raises(DocumentUnretrievableException):
644+
await cb_env.collection.get_any_replica(
645+
default_kvp.key, GetAnyReplicaOptions(read_preference=ReadPreference.SELECTED_SERVER_GROUP))
646+
629647
@pytest.mark.usefixtures("check_multi_node")
630648
@pytest.mark.usefixtures("check_replicas")
631649
@pytest.mark.asyncio
@@ -667,6 +685,17 @@ async def test_get_all_replicas_results(self, cb_env, default_kvp):
667685
assert active_cnt == 1
668686
assert replica_cnt >= active_cnt
669687

688+
@pytest.mark.usefixtures("check_multi_node")
689+
@pytest.mark.usefixtures("check_replicas")
690+
@pytest.mark.usefixtures("check_server_groups_supported")
691+
@pytest.mark.asyncio
692+
async def test_get_all_replicas_read_preference(self, cb_env, default_kvp):
693+
# No preferred server group was specified in the cluster options so this should raise
694+
# DocumentUnretrievableException
695+
with pytest.raises(DocumentUnretrievableException):
696+
await cb_env.collection.get_all_replicas(
697+
default_kvp.key, GetAllReplicasOptions(read_preference=ReadPreference.SELECTED_SERVER_GROUP))
698+
670699
# @TODO(jc): - should an expiry of -1 raise an InvalidArgumentException?
671700
@pytest.mark.usefixtures("check_xattr_supported")
672701
@pytest.mark.asyncio

acouchbase/tests/subdoc_t.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
LookupInAnyReplicaOptions,
3333
LookupInOptions,
3434
MutateInOptions)
35+
from couchbase.replica_reads import ReadPreference
3536
from couchbase.result import (GetResult,
3637
LookupInReplicaResult,
3738
LookupInResult,
@@ -72,6 +73,7 @@ class SubDocumentTestSuite:
7273
'test_lookup_in_all_replicas_get_full',
7374
'test_lookup_in_all_replicas_multiple_specs',
7475
'test_lookup_in_all_replicas_with_timeout',
76+
'test_lookup_in_all_replicas_read_preference',
7577
'test_lookup_in_any_replica_bad_key',
7678
'test_lookup_in_any_replica_exists',
7779
'test_lookup_in_any_replica_exists_bad_path',
@@ -80,6 +82,7 @@ class SubDocumentTestSuite:
8082
'test_lookup_in_any_replica_get_full',
8183
'test_lookup_in_any_replica_multiple_specs',
8284
'test_lookup_in_any_replica_with_timeout',
85+
'test_lookup_in_any_replica_read_preference',
8386
'test_lookup_in_multiple_specs',
8487
'test_lookup_in_one_path_not_found',
8588
'test_lookup_in_simple_exists',
@@ -134,6 +137,13 @@ def check_replica_read_supported(self, cb_env):
134137
cb_env.server_version_short,
135138
cb_env.mock_server_type)
136139

140+
@pytest.fixture(scope='class')
141+
def check_server_groups_supported(self, cb_env):
142+
EnvironmentFeatures.check_if_feature_supported('server_groups',
143+
cb_env.server_version_short,
144+
cb_env.mock_server_type,
145+
cb_env.server_version_patch)
146+
137147
@pytest.mark.asyncio
138148
@pytest.mark.usefixtures('skip_if_go_caves')
139149
async def test_array_add_unique(self, cb_env):
@@ -455,6 +465,19 @@ async def test_lookup_in_all_replicas_with_timeout(self, cb_env):
455465
active_count += not result.is_replica
456466
assert active_count == 1
457467

468+
@pytest.mark.asyncio
469+
@pytest.mark.usefixtures('check_replica_read_supported')
470+
@pytest.mark.usefixtures('check_server_groups_supported')
471+
async def test_lookup_in_all_replicas_read_preference(self, cb_env):
472+
key, value = cb_env.get_existing_doc_by_type('vehicle')
473+
# No preferred server group was specified in the cluster options so this should raise
474+
# DocumentUnretrievableException
475+
with pytest.raises(DocumentUnretrievableException):
476+
await cb_env.collection.lookup_in_all_replicas(
477+
key,
478+
[SD.get('batch')],
479+
LookupInAllReplicasOptions(read_preference=ReadPreference.SELECTED_SERVER_GROUP))
480+
458481
@pytest.mark.asyncio
459482
@pytest.mark.usefixtures('check_replica_read_supported')
460483
async def test_lookup_in_any_replica_bad_key(self, cb_env):
@@ -527,6 +550,17 @@ async def test_lookup_in_any_replica_with_timeout(self, cb_env):
527550
assert result.content_as[str](0) == value['batch']
528551
assert result.is_replica is not None
529552

553+
@pytest.mark.asyncio
554+
@pytest.mark.usefixtures('check_replica_read_supported')
555+
@pytest.mark.usefixtures('check_server_groups_supported')
556+
async def test_lookup_in_any_replica_read_preference(self, cb_env):
557+
key, value = cb_env.get_existing_doc_by_type('vehicle')
558+
# No preferred server group was specified in the cluster options so this should raise
559+
# DocumentUnretrievableException
560+
with pytest.raises(DocumentUnretrievableException):
561+
await cb_env.collection.lookup_in_any_replica(
562+
key, [SD.get('batch')], LookupInAnyReplicaOptions(read_preference=ReadPreference.SELECTED_SERVER_GROUP))
563+
530564
@pytest.mark.asyncio
531565
@pytest.mark.usefixtures("check_xattr_supported")
532566
async def test_lookup_in_multiple_specs(self, cb_env):

couchbase/logic/options.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from couchbase.metrics import CouchbaseMeter
4747
from couchbase.mutation_state import MutationState
4848
from couchbase.n1ql import QueryProfile, QueryScanConsistency
49+
from couchbase.replica_reads import ReadPreference
4950
from couchbase.search import (Facet,
5051
HighlightStyle,
5152
SearchScanConsistency,
@@ -119,6 +120,7 @@ def get_valid_args(
119120
'project': lambda x: x,
120121
'delta': lambda x: x,
121122
'initial': lambda x: x,
123+
'read_preference': lambda x: x.value,
122124
'per_key_options': lambda x: x,
123125
'return_exceptions': validate_bool
124126
}
@@ -556,6 +558,7 @@ class ClusterOptionsBase(dict):
556558
"dns_nameserver": {"dns_nameserver": validate_str},
557559
"dns_port": {"dns_port": validate_int},
558560
"dump_configuration": {"dump_configuration": validate_bool},
561+
"preferred_server_group": {"preferred_server_group": validate_str},
559562
}
560563

561564
@overload
@@ -596,7 +599,8 @@ def __init__(
596599
dns_nameserver=None, # type: Optional[str]
597600
dns_port=None, # type: Optional[int]
598601
disable_mozilla_ca_certificates=None, # type: Optional[bool]
599-
dump_configuration=None, # type: Optional[bool]
602+
dump_configuration=None, # type: Optional[bool],
603+
preferred_server_group=None, # type: Optional[str]
600604
):
601605
"""ClusterOptions instance."""
602606

@@ -913,7 +917,8 @@ class GetAllReplicasOptionsBase(OptionsTimeoutBase):
913917
@overload
914918
def __init__(self,
915919
timeout=None, # type: Optional[timedelta]
916-
transcoder=None # type: Optional[Transcoder]
920+
transcoder=None, # type: Optional[Transcoder]
921+
read_preference=None, # type: Optional[ReadPreference]
917922
):
918923
pass
919924

@@ -952,7 +957,8 @@ class GetAnyReplicaOptionsBase(OptionsTimeoutBase):
952957
@overload
953958
def __init__(self,
954959
timeout=None, # type: Optional[timedelta]
955-
transcoder=None # type: Optional[Transcoder]
960+
transcoder=None, # type: Optional[Transcoder]
961+
read_preference=None, # type: Optional[ReadPreference]
956962
):
957963
pass
958964

@@ -995,6 +1001,7 @@ def __init__(self,
9951001
timeout=None, # type: Optional[timedelta]
9961002
span=None, # type: Optional[Any]
9971003
serializer=None, # type: Optional[Serializer]
1004+
read_preference=None, # type: Optional[ReadPreference]
9981005
) -> None:
9991006
pass
10001007

@@ -1009,6 +1016,7 @@ def __init__(self,
10091016
timeout=None, # type: Optional[timedelta]
10101017
span=None, # type: Optional[Any]
10111018
serializer=None, # type: Optional[Serializer]
1019+
read_preference=None, # type: Optional[ReadPreference]
10121020
) -> None:
10131021
pass
10141022

couchbase/options.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
from couchbase.collection import Collection
9797
from couchbase.durability import DurabilityType, ServerDurability
9898
from couchbase.n1ql import QueryScanConsistency
99+
from couchbase.replica_reads import ReadPreference
99100
from couchbase.transactions import TransactionKeyspace
100101
from couchbase.transcoder import Transcoder
101102

@@ -362,6 +363,7 @@ class ClusterOptions(ClusterOptionsBase):
362363
dns_nameserver (str, optional): **VOLATILE** This API is subject to change at any time. Set to configure custom DNS nameserver. Defaults to None.
363364
dns_port (int, optional): **VOLATILE** This API is subject to change at any time. Set to configure custom DNS port. Defaults to None.
364365
dump_configuration (bool, optional): Set to True to dump every new configuration when TRACE level logging. Defaults to False (disabled).
366+
preferred_server_group (str, optional): Specifies the preferred server group to be used for replica reads with 'selected server group' read preference.
365367
""" # noqa: E501
366368

367369
def apply_profile(self,
@@ -507,6 +509,8 @@ class GetAllReplicasOptions(GetAllReplicasOptionsBase):
507509
key-value operation timeout.
508510
transcoder (:class:`~.transcoder.Transcoder`, optional): Specifies an explicit transcoder
509511
to use for this specific operation. Defaults to :class:`~.transcoder.JsonTranscoder`.
512+
read_preference(:class:`~couchbase.replica_reads.ReadPreference`, optional): Specifies how the replica nodes
513+
will be selected. Defaults to no preference.
510514
"""
511515

512516

@@ -552,6 +556,8 @@ class GetAnyReplicaOptions(GetAnyReplicaOptionsBase):
552556
key-value operation timeout.
553557
transcoder (:class:`~.transcoder.Transcoder`, optional): Specifies an explicit transcoder
554558
to use for this specific operation. Defaults to :class:`~.transcoder.JsonTranscoder`.
559+
read_preference(:class:`~couchbase.replica_reads.ReadPreference`, optional): Specifies how the replica nodes
560+
will be selected. Defaults to no preference.
555561
"""
556562

557563

@@ -706,6 +712,8 @@ class LookupInAnyReplicaOptions(LookupInAnyReplicaOptionsBase):
706712
Args:
707713
timeout (timedelta, optional): The timeout for this operation. Defaults to global
708714
subdocument operation timeout.
715+
read_preference(:class:`~couchbase.replica_reads.ReadPreference`, optional): Specifies how the replica nodes
716+
will be selected. Defaults to no preference.
709717
"""
710718

711719

@@ -719,6 +727,8 @@ class LookupInAllReplicasOptions(LookupInAllReplicasOptionsBase):
719727
Args:
720728
timeout (timedelta, optional): The timeout for this operation. Defaults to global
721729
subdocument operation timeout.
730+
read_preference(:class:`~couchbase.replica_reads.ReadPreference`, optional): Specifies how the replica nodes
731+
will be selected. Defaults to no preference.
722732
"""
723733

724734

@@ -834,6 +844,8 @@ class GetAllReplicasMultiOptions(dict):
834844
key-value operation timeout.
835845
transcoder (:class:`~couchbase.transcoder.Transcoder`, optional): Specifies an explicit transcoder
836846
to use for this specific operation. Defaults to :class:`~.transcoder.JsonTranscoder`.
847+
read_preference(:class:`~couchbase.replica_reads.ReadPreference`, optional): Specifies how the replica nodes
848+
will be selected. Defaults to no preference.
837849
per_key_options (Dict[str, :class:`.GetAllReplicasOptions`], optional): Specify
838850
:class:`.GetAllReplicasOptions` per key.
839851
return_exceptions(bool, optional): If False, raise an Exception when encountered. If True return the
@@ -842,9 +854,10 @@ class GetAllReplicasMultiOptions(dict):
842854
@overload
843855
def __init__(
844856
self,
845-
transcoder=None, # type: Transcoder
846-
per_key_options=None, # type: Dict[str, GetAllReplicasOptions]
847-
return_exceptions=None # type: Optional[bool]
857+
transcoder=None, # type: Optional[Transcoder]
858+
read_preference=None, # type: Optional[ReadPreference]
859+
per_key_options=None, # type: Dict[str, GetAllReplicasOptions]
860+
return_exceptions=None # type: Optional[bool]
848861
):
849862
pass
850863

@@ -854,7 +867,7 @@ def __init__(self, **kwargs):
854867

855868
@classmethod
856869
def get_valid_keys(cls):
857-
return ['timeout', 'transcoder', 'per_key_options', 'return_exceptions']
870+
return ['timeout', 'transcoder', 'read_preference', 'per_key_options', 'return_exceptions']
858871

859872

860873
class GetAnyReplicaMultiOptions(dict):
@@ -868,6 +881,8 @@ class GetAnyReplicaMultiOptions(dict):
868881
key-value operation timeout.
869882
transcoder (:class:`~couchbase.transcoder.Transcoder`, optional): Specifies an explicit transcoder
870883
to use for this specific operation. Defaults to :class:`~.transcoder.JsonTranscoder`.
884+
read_preference(:class:`~couchbase.replica_reads.ReadPreference`, optional): Specifies how the replica nodes
885+
will be selected. Defaults to no preference.
871886
per_key_options (Dict[str, :class:`.GetAnyReplicaOptions`], optional): Specify
872887
:class:`.GetAnyReplicaOptions` per key.
873888
return_exceptions(bool, optional): If False, raise an Exception when encountered. If True return the
@@ -876,9 +891,10 @@ class GetAnyReplicaMultiOptions(dict):
876891
@overload
877892
def __init__(
878893
self,
879-
transcoder=None, # type: Transcoder
880-
per_key_options=None, # type: Dict[str, GetAnyReplicaOptions]
881-
return_exceptions=None # type: Optional[bool]
894+
transcoder=None, # type: Optional[Transcoder]
895+
read_preference=None, # type: Optional[ReadPreference]
896+
per_key_options=None, # type: Dict[str, GetAnyReplicaOptions]
897+
return_exceptions=None # type: Optional[bool]
882898
):
883899
pass
884900

@@ -888,7 +904,7 @@ def __init__(self, **kwargs):
888904

889905
@classmethod
890906
def get_valid_keys(cls):
891-
return ['timeout', 'transcoder', 'per_key_options', 'return_exceptions']
907+
return ['timeout', 'transcoder', 'read_preference', 'per_key_options', 'return_exceptions']
892908

893909

894910
class GetMultiOptions(dict):
@@ -1820,7 +1836,8 @@ def arg_mapping(self):
18201836
"report_id": {"report_id": lambda x: str(x)},
18211837
"batch_byte_limit": {"batch_byte_limit": validate_int},
18221838
"batch_item_limit": {"batch_item_limit": validate_int},
1823-
"concurrency": {"concurrency": validate_int}
1839+
"concurrency": {"concurrency": validate_int},
1840+
"read_preference": {"read_preference": lambda r: r.value}
18241841
}
18251842

18261843

couchbase/replica_reads.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from enum import IntEnum
2+
3+
4+
class ReadPreference(IntEnum):
5+
NO_PREFERENCE = 0
6+
SELECTED_SERVER_GROUP = 1

0 commit comments

Comments
 (0)