Skip to content

Commit 986e7d6

Browse files
committed
PYCBC-1733: Migrate Bucket management away from Wrapper decorators
Changes ------- * Add bucket mgmt types, request builder * Add bucket mgmt impl for all 3 APIs * Update APIs bucket manager classes to use impl and remove wrapper decorators * Remove pytest.skip() from bucket mgmt tests * Add unit tests to validate how we handle BucketSettings and CreateBucketSettings Change-Id: I2e9ad26b5c570c7b92790edb70c4e33749288941 Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/239325 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com>
1 parent 852b5e4 commit 986e7d6

12 files changed

Lines changed: 1511 additions & 136 deletions

File tree

acouchbase/management/buckets.py

Lines changed: 60 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@
2121
Dict,
2222
List)
2323

24-
from acouchbase.management.logic.wrappers import AsyncMgmtWrapper
25-
from couchbase.management.logic import ManagementType
26-
from couchbase.management.logic.buckets_logic import (BucketDescribeResult,
27-
BucketManagerLogic,
28-
BucketSettings,
29-
CreateBucketSettings)
24+
from acouchbase.management.logic.bucket_mgmt_impl import AsyncBucketMgmtImpl
25+
from couchbase.management.logic.bucket_mgmt_types import (BucketDescribeResult,
26+
BucketSettings,
27+
CreateBucketSettings)
3028

3129
if TYPE_CHECKING:
3230
from acouchbase.logic.client_adapter import AsyncClientAdapter
@@ -39,25 +37,15 @@
3937
UpdateBucketOptions)
4038

4139

42-
class BucketManager(BucketManagerLogic):
43-
40+
class BucketManager:
4441
def __init__(self, client_adapter: AsyncClientAdapter) -> None:
45-
super().__init__(client_adapter.connection)
46-
self._loop = client_adapter.loop
42+
self._impl = AsyncBucketMgmtImpl(client_adapter)
4743

48-
@property
49-
def loop(self):
50-
"""
51-
**INTERNAL**
52-
"""
53-
return self._loop
54-
55-
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
56-
def create_bucket(self,
57-
settings, # type: CreateBucketSettings
58-
*options, # type: CreateBucketOptions
59-
**kwargs # type: Any
60-
) -> Awaitable[None]:
44+
async def create_bucket(self,
45+
settings, # type: CreateBucketSettings
46+
*options, # type: CreateBucketOptions
47+
**kwargs # type: Any
48+
) -> None:
6149
"""
6250
Creates a new bucket.
6351
@@ -68,60 +56,53 @@ def create_bucket(self,
6856
:raises: BucketAlreadyExistsException
6957
:raises: InvalidArgumentsException
7058
"""
71-
super().create_bucket(settings, *options, **kwargs)
72-
73-
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
74-
def update_bucket(self,
75-
settings, # type: BucketSettings
76-
*options, # type: UpdateBucketOptions
77-
**kwargs # type: Dict[str, Any]
78-
) -> Awaitable[None]:
79-
80-
super().update_bucket(settings, *options, **kwargs)
81-
82-
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
83-
def drop_bucket(self,
84-
bucket_name, # type: str
85-
*options, # type: DropBucketOptions
86-
**kwargs # type: Dict[str, Any]
87-
) -> Awaitable[None]:
88-
89-
super().drop_bucket(bucket_name, *options, **kwargs)
90-
91-
@AsyncMgmtWrapper.inject_callbacks(BucketSettings, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
92-
def get_bucket(self,
93-
bucket_name, # type: str
94-
*options, # type: GetBucketOptions
95-
**kwargs # type: Dict[str, Any]
96-
) -> Awaitable[BucketSettings]:
97-
98-
super().get_bucket(bucket_name, *options, **kwargs)
99-
100-
@AsyncMgmtWrapper.inject_callbacks(BucketSettings, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
101-
def get_all_buckets(self,
102-
*options, # type: GetAllBucketOptions
103-
**kwargs # type: Dict[str, Any]
104-
) -> Awaitable[List[BucketSettings]]:
105-
106-
super().get_all_buckets(*options, **kwargs)
107-
108-
@AsyncMgmtWrapper.inject_callbacks(None, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
109-
def flush_bucket(self,
110-
bucket_name, # type: str
111-
*options, # type: FlushBucketOptions
112-
**kwargs # type: Dict[str, Any]
113-
) -> Awaitable[None]:
114-
115-
super().flush_bucket(bucket_name, *options, **kwargs)
116-
117-
@AsyncMgmtWrapper.inject_callbacks(BucketDescribeResult,
118-
ManagementType.BucketMgmt,
119-
BucketManagerLogic._ERROR_MAPPING)
120-
def bucket_describe(self,
121-
bucket_name, # type: str
122-
*options, # type: BucketDescribeOptions
123-
**kwargs # type: Dict[str, Any]
124-
) -> BucketDescribeResult:
59+
req = self._impl.request_builder.build_create_bucket_request(settings, *options, **kwargs)
60+
await self._impl.create_bucket(req)
61+
62+
async def update_bucket(self,
63+
settings, # type: BucketSettings
64+
*options, # type: UpdateBucketOptions
65+
**kwargs # type: Dict[str, Any]
66+
) -> None:
67+
req = self._impl.request_builder.build_update_bucket_request(settings, *options, **kwargs)
68+
await self._impl.update_bucket(req)
69+
70+
async def drop_bucket(self,
71+
bucket_name, # type: str
72+
*options, # type: DropBucketOptions
73+
**kwargs # type: Dict[str, Any]
74+
) -> None:
75+
req = self._impl.request_builder.build_drop_bucket_request(bucket_name, *options, **kwargs)
76+
await self._impl.drop_bucket(req)
77+
78+
async def get_bucket(self,
79+
bucket_name, # type: str
80+
*options, # type: GetBucketOptions
81+
**kwargs # type: Dict[str, Any]
82+
) -> Awaitable[BucketSettings]:
83+
req = self._impl.request_builder.build_get_bucket_request(bucket_name, *options, **kwargs)
84+
return await self._impl.get_bucket(req)
85+
86+
async def get_all_buckets(self,
87+
*options, # type: GetAllBucketOptions
88+
**kwargs # type: Dict[str, Any]
89+
) -> Awaitable[List[BucketSettings]]:
90+
req = self._impl.request_builder.build_get_all_buckets_request(*options, **kwargs)
91+
return await self._impl.get_all_buckets(req)
92+
93+
async def flush_bucket(self,
94+
bucket_name, # type: str
95+
*options, # type: FlushBucketOptions
96+
**kwargs # type: Dict[str, Any]
97+
) -> Awaitable[None]:
98+
req = self._impl.request_builder.build_flush_bucket_request(bucket_name, *options, **kwargs)
99+
await self._impl.flush_bucket(req)
100+
101+
async def bucket_describe(self,
102+
bucket_name, # type: str
103+
*options, # type: BucketDescribeOptions
104+
**kwargs # type: Dict[str, Any]
105+
) -> BucketDescribeResult:
125106
"""Provides details on provided the bucket.
126107
127108
Args:
@@ -137,4 +118,5 @@ def bucket_describe(self,
137118
Raises:
138119
:class:`~couchbase.exceptions.BucketDoesNotExistException`: If the bucket does not exist.
139120
"""
140-
super().bucket_describe(bucket_name, *options, **kwargs)
121+
req = self._impl.request_builder.build_bucket_describe_request(bucket_name, *options, **kwargs)
122+
return await self._impl.bucket_describe(req)
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright 2016-2023. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License")
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
18+
from asyncio import AbstractEventLoop
19+
from typing import (TYPE_CHECKING,
20+
List,
21+
Optional)
22+
23+
from couchbase.management.logic.bucket_mgmt_req_builder import BucketMgmtRequestBuilder
24+
from couchbase.management.logic.bucket_mgmt_types import BucketDescribeResult, BucketSettings
25+
26+
if TYPE_CHECKING:
27+
from acouchbase.logic.client_adapter import AsyncClientAdapter
28+
from couchbase.management.logic.bucket_mgmt_types import (BucketDescribeRequest,
29+
CreateBucketRequest,
30+
DropBucketRequest,
31+
FlushBucketRequest,
32+
GetAllBucketsRequest,
33+
GetBucketRequest,
34+
UpdateBucketRequest)
35+
36+
37+
class AsyncBucketMgmtImpl:
38+
def __init__(self, client_adapter: AsyncClientAdapter) -> None:
39+
self._client_adapter = client_adapter
40+
self._request_builder = BucketMgmtRequestBuilder()
41+
42+
@property
43+
def loop(self) -> AbstractEventLoop:
44+
"""**INTERNAL**"""
45+
return self._client_adapter.loop
46+
47+
@property
48+
def request_builder(self) -> BucketMgmtRequestBuilder:
49+
"""**INTERNAL**"""
50+
return self._request_builder
51+
52+
async def bucket_describe(self, req: BucketDescribeRequest) -> Optional[BucketDescribeResult]:
53+
"""**INTERNAL**"""
54+
res = await self._client_adapter.execute_mgmt_request(req)
55+
bucket_info = res.raw_result.get('bucket_info', None)
56+
if bucket_info:
57+
return BucketDescribeResult(**bucket_info)
58+
return None
59+
60+
async def create_bucket(self, req: CreateBucketRequest) -> None:
61+
"""**INTERNAL**"""
62+
await self._client_adapter.execute_mgmt_request(req)
63+
64+
async def drop_bucket(self, req: DropBucketRequest) -> None:
65+
"""**INTERNAL**"""
66+
await self._client_adapter.execute_mgmt_request(req)
67+
68+
async def flush_bucket(self, req: FlushBucketRequest) -> None:
69+
"""**INTERNAL**"""
70+
await self._client_adapter.execute_mgmt_request(req)
71+
72+
async def get_all_buckets(self, req: GetAllBucketsRequest) -> List[BucketSettings]:
73+
"""**INTERNAL**"""
74+
res = await self._client_adapter.execute_mgmt_request(req)
75+
raw_buckets = res.raw_result.get('buckets', None)
76+
buckets = []
77+
if raw_buckets:
78+
for b in raw_buckets:
79+
bucket_settings = BucketSettings.bucket_settings_from_server(b)
80+
buckets.append(bucket_settings)
81+
82+
return buckets
83+
84+
async def get_bucket(self, req: GetBucketRequest) -> BucketSettings:
85+
"""**INTERNAL**"""
86+
res = await self._client_adapter.execute_mgmt_request(req)
87+
raw_settings = res.raw_result.get('bucket_settings', None)
88+
bucket_settings = None
89+
if raw_settings:
90+
bucket_settings = BucketSettings.bucket_settings_from_server(raw_settings)
91+
92+
return bucket_settings
93+
94+
async def update_bucket(self, req: UpdateBucketRequest) -> None:
95+
"""**INTERNAL**"""
96+
await self._client_adapter.execute_mgmt_request(req)

acouchbase/tests/bucketmgmt_t.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
from ._test_utils import TestEnvironment
3636

3737

38-
@pytest.mark.skip(reason='Skip until PYCBC-1733')
3938
@pytest.mark.flaky(reruns=5)
4039
class BucketManagementTests:
4140

couchbase/management/buckets.py

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,16 @@
2020
Dict,
2121
List)
2222

23-
from couchbase.management.logic.buckets_logic import BucketType # noqa: F401
24-
from couchbase.management.logic.buckets_logic import CompressionMode # noqa: F401
25-
from couchbase.management.logic.buckets_logic import ConflictResolutionType # noqa: F401
26-
from couchbase.management.logic.buckets_logic import EjectionMethod # noqa: F401
27-
from couchbase.management.logic.buckets_logic import EvictionPolicyType # noqa: F401
28-
from couchbase.management.logic.buckets_logic import StorageBackend # noqa: F401
29-
from couchbase.management.logic.buckets_logic import (BucketDescribeResult,
30-
BucketManagerLogic,
31-
BucketSettings,
32-
CreateBucketSettings)
33-
from couchbase.management.logic.wrappers import BlockingMgmtWrapper, ManagementType
23+
from couchbase.management.logic.bucket_mgmt_impl import BucketMgmtImpl
24+
from couchbase.management.logic.bucket_mgmt_types import BucketType # noqa: F401
25+
from couchbase.management.logic.bucket_mgmt_types import CompressionMode # noqa: F401
26+
from couchbase.management.logic.bucket_mgmt_types import ConflictResolutionType # noqa: F401
27+
from couchbase.management.logic.bucket_mgmt_types import EjectionMethod # noqa: F401
28+
from couchbase.management.logic.bucket_mgmt_types import EvictionPolicyType # noqa: F401
29+
from couchbase.management.logic.bucket_mgmt_types import StorageBackend # noqa: F401
30+
from couchbase.management.logic.bucket_mgmt_types import (BucketDescribeResult,
31+
BucketSettings,
32+
CreateBucketSettings)
3433

3534
# @TODO: lets deprecate import of options from couchbase.management.buckets
3635
from couchbase.management.options import (BucketDescribeOptions,
@@ -45,11 +44,11 @@
4544
from couchbase.logic.client_adapter import ClientAdapter
4645

4746

48-
class BucketManager(BucketManagerLogic):
47+
class BucketManager:
48+
4949
def __init__(self, client_adapter: ClientAdapter) -> None:
50-
super().__init__(client_adapter.connection)
50+
self._impl = BucketMgmtImpl(client_adapter)
5151

52-
@BlockingMgmtWrapper.block(None, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
5352
def create_bucket(self,
5453
settings, # type: CreateBucketSettings
5554
*options, # type: CreateBucketOptions
@@ -69,9 +68,9 @@ def create_bucket(self,
6968
:class:`~couchbase.exceptions.InvalidArgumentsException`: If an invalid type or value is provided for the
7069
settings argument.
7170
"""
72-
return super().create_bucket(settings, *options, **kwargs)
71+
req = self._impl.request_builder.build_create_bucket_request(settings, *options, **kwargs)
72+
self._impl.create_bucket(req)
7373

74-
@BlockingMgmtWrapper.block(None, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
7574
def update_bucket(self,
7675
settings, # type: BucketSettings
7776
*options, # type: UpdateBucketOptions
@@ -90,9 +89,9 @@ def update_bucket(self,
9089
:class:`~couchbase.exceptions.InvalidArgumentsException`: If an invalid type or value is provided for the
9190
settings argument.
9291
"""
93-
return super().update_bucket(settings, *options, **kwargs)
92+
req = self._impl.request_builder.build_update_bucket_request(settings, *options, **kwargs)
93+
self._impl.update_bucket(req)
9494

95-
@BlockingMgmtWrapper.block(None, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
9695
def drop_bucket(self,
9796
bucket_name, # type: str
9897
*options, # type: DropBucketOptions
@@ -110,9 +109,9 @@ def drop_bucket(self,
110109
Raises:
111110
:class:`~couchbase.exceptions.BucketDoesNotExistException`: If the bucket does not exist.
112111
"""
113-
return super().drop_bucket(bucket_name, *options, **kwargs)
112+
req = self._impl.request_builder.build_drop_bucket_request(bucket_name, *options, **kwargs)
113+
self._impl.drop_bucket(req)
114114

115-
@BlockingMgmtWrapper.block(BucketSettings, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
116115
def get_bucket(self,
117116
bucket_name, # type: str
118117
*options, # type: GetBucketOptions
@@ -133,9 +132,9 @@ def get_bucket(self,
133132
Raises:
134133
:class:`~couchbase.exceptions.BucketDoesNotExistException`: If the bucket does not exist.
135134
"""
136-
return super().get_bucket(bucket_name, *options, **kwargs)
135+
req = self._impl.request_builder.build_get_bucket_request(bucket_name, *options, **kwargs)
136+
return self._impl.get_bucket(req)
137137

138-
@BlockingMgmtWrapper.block(BucketSettings, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
139138
def get_all_buckets(self,
140139
*options, # type: GetAllBucketOptions
141140
**kwargs # type: Dict[str, Any]
@@ -151,9 +150,9 @@ def get_all_buckets(self,
151150
Returns:
152151
List[:class:`.BucketSettings`]: A list of existing buckets in the cluster.
153152
"""
154-
return super().get_all_buckets(*options, **kwargs)
153+
req = self._impl.request_builder.build_get_all_buckets_request(*options, **kwargs)
154+
return self._impl.get_all_buckets(req)
155155

156-
@BlockingMgmtWrapper.block(None, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
157156
def flush_bucket(self,
158157
bucket_name, # type: str
159158
*options, # type: FlushBucketOptions
@@ -173,9 +172,9 @@ def flush_bucket(self,
173172
:class:`~couchbase.exceptions.BucketNotFlushableException`: If the bucket's settings have
174173
flushing disabled.
175174
"""
176-
return super().flush_bucket(bucket_name, *options, **kwargs)
175+
req = self._impl.request_builder.build_flush_bucket_request(bucket_name, *options, **kwargs)
176+
self._impl.flush_bucket(req)
177177

178-
@BlockingMgmtWrapper.block(BucketDescribeResult, ManagementType.BucketMgmt, BucketManagerLogic._ERROR_MAPPING)
179178
def bucket_describe(self,
180179
bucket_name, # type: str
181180
*options, # type: BucketDescribeOptions
@@ -196,4 +195,5 @@ def bucket_describe(self,
196195
Raises:
197196
:class:`~couchbase.exceptions.BucketDoesNotExistException`: If the bucket does not exist.
198197
"""
199-
return super().bucket_describe(bucket_name, *options, **kwargs)
198+
req = self._impl.request_builder.build_bucket_describe_request(bucket_name, *options, **kwargs)
199+
return self._impl.bucket_describe(req)

0 commit comments

Comments
 (0)