Skip to content

Commit e0615f2

Browse files
committed
PYCBC-1730: Migrate Cluster & Bucket objects away from Wrapper decorators
Motivation ========== Two features are needed to be added to the Python client: observability and additional protocols. While the observability feature is coming soon and the additional protocol later, both changes requires touching the entire surface area of the client. The changes in this migration are aimed at preventing duplicate work by reorganizing the multiple APIs supported by the Python client to provide more centralized logic and make the addition of future protocols straight-forward. Modification ============ * Add client_adapter for sync and async operations (async adapter used by acouchbase and txcouchbase APIs) * Add cluster and bucket impl classes for all APIs (txcouchbase utilizes the acouchbase impl logic and converts futures/coroutines to deferreds) * Add BindingMap which maps all operations to the corresponding method in the bindings * Add cluster and bucket request builders (this is where the options processing logic resides) * Update transactions __init__ params to adjust to new client_adapter * Move cluster __init__ logic to cluster_settings.py module and create ClusterSettings object to house the various settings related to initialization (timeouts, cluster opts, tracing, transactions config, etc.) * start to move transforms and validation away from _utils and into specific modules (will be used more extensively once options processing logic is updated) * Add overload_registry module (will take place of the overloading used in certain mgmt operations) Change-Id: I57a737c289bea6ec530f2625aaa828ce9d285c5c Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/239235 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com>
1 parent f614e46 commit e0615f2

55 files changed

Lines changed: 3284 additions & 921 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

acouchbase/bucket.py

Lines changed: 26 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,25 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
from __future__ import annotations
17+
1618
from typing import (TYPE_CHECKING,
1719
Any,
18-
Awaitable,
1920
Dict)
2021

2122
from acouchbase.collection import Collection
22-
from acouchbase.logic import AsyncWrapper
23+
from acouchbase.logic.bucket_impl import AsyncBucketImpl
2324
from acouchbase.management.collections import CollectionManager
2425
from acouchbase.management.views import ViewIndexManager
2526
from acouchbase.scope import Scope
26-
from acouchbase.views import AsyncViewRequest, ViewQuery
27-
from couchbase.logic.bucket import BucketLogic
2827
from couchbase.result import PingResult, ViewResult
2928

3029
if TYPE_CHECKING:
31-
from acouchbase.cluster import Cluster
30+
from acouchbase.cluster import AsyncCluster
3231
from couchbase.options import PingOptions, ViewOptions
3332

3433

35-
class AsyncBucket(BucketLogic):
34+
class AsyncBucket:
3635
"""Create a Couchbase Bucket instance.
3736
3837
Exposes the operations which are available to be performed against a bucket. Namely the ability to
@@ -47,44 +46,14 @@ class AsyncBucket(BucketLogic):
4746
4847
"""
4948

50-
def __init__(self, cluster, # type: Cluster
51-
bucket_name # type: str
52-
):
53-
super().__init__(cluster, bucket_name)
54-
self._close_ftr = None
55-
self._connect_ftr = self._open_bucket()
49+
def __init__(self, cluster: AsyncCluster, bucket_name: str) -> None:
50+
self._impl = AsyncBucketImpl(bucket_name, cluster)
5651

5752
@property
58-
def loop(self):
59-
"""
60-
**INTERNAL**
61-
"""
62-
return self._cluster.loop
63-
64-
# def _connect_bucket(self):
65-
# """
66-
# **INTERNAL**
67-
# Used w/in wrappers if a collection op is called when not connected
68-
# """
69-
# if self._connect_ftr is not None:
70-
# return
71-
72-
# if not self._cluster.connected:
73-
# self._connect_ftr = self.loop.create_future()
74-
# ft = self._cluster.on_connect()
75-
# ft.add_done_callback(partial(AsyncWrapper.chain_connect_callbacks, self))
76-
# else:
77-
# self._connect_ftr = super()._open_or_close_bucket(open_bucket=True)
78-
79-
@AsyncWrapper.inject_bucket_open_callbacks()
80-
def _open_bucket(self, **kwargs) -> Awaitable:
81-
super()._open_or_close_bucket(open_bucket=True, **kwargs)
82-
83-
@AsyncWrapper.inject_close_callbacks()
84-
def _close_bucket(self, **kwargs):
85-
super()._open_or_close_bucket(open_bucket=False, **kwargs)
86-
87-
def on_connect(self) -> Awaitable:
53+
def name(self) -> str:
54+
return self._impl.bucket_name
55+
56+
async def on_connect(self) -> None:
8857
"""Returns an awaitable future that indicates connecting to the Couchbase bucket has completed.
8958
9059
Returns:
@@ -94,11 +63,7 @@ def on_connect(self) -> Awaitable:
9463
Raises:
9564
:class:`~couchbase.exceptions.UnAmbiguousTimeoutException`: If an error occured while trying to connect.
9665
"""
97-
if not (self._connect_ftr or self.connected):
98-
self._connect_ftr = self._open_bucket()
99-
self._close_ftr = None
100-
101-
return self._connect_ftr
66+
await self._impl.wait_until_bucket_connected()
10267

10368
async def close(self) -> None:
10469
"""Shuts down this bucket instance. Cleaning up all resources associated with it.
@@ -109,15 +74,9 @@ async def close(self) -> None:
10974
is necessary and in those types of applications, this method might be beneficial.
11075
11176
"""
112-
if self.connected and not self._close_ftr:
113-
self._close_ftr = self._close_bucket()
114-
self._connect_ftr = None
115-
116-
await self._close_ftr
117-
super()._destroy_connection()
77+
await self._impl.close_bucket()
11878

119-
def default_scope(self
120-
) -> Scope:
79+
def default_scope(self) -> Scope:
12180
"""Creates a :class:`~acouchbase.scope.Scope` instance of the default scope.
12281
12382
Returns:
@@ -126,8 +85,7 @@ def default_scope(self
12685
"""
12786
return self.scope(Scope.default_name())
12887

129-
def scope(self, name # type: str
130-
) -> Scope:
88+
def scope(self, name: str) -> Scope:
13189
"""Creates a :class:`~acouchbase.scope.Scope` instance of the specified scope.
13290
13391
Args:
@@ -139,7 +97,7 @@ def scope(self, name # type: str
13997
"""
14098
return Scope(self, name)
14199

142-
def collection(self, collection_name):
100+
def collection(self, collection_name: str) -> Collection:
143101
"""Creates a :class:`~acouchbase.collection.Collection` instance of the specified collection.
144102
145103
Args:
@@ -161,11 +119,10 @@ def default_collection(self):
161119
scope = self.default_scope()
162120
return scope.collection(Collection.default_name())
163121

164-
@AsyncWrapper.inject_cluster_callbacks(PingResult)
165-
def ping(self,
166-
*opts, # type: PingOptions
167-
**kwargs # type: Dict[str, Any]
168-
) -> Awaitable[PingResult]:
122+
async def ping(self,
123+
*opts, # type: PingOptions
124+
**kwargs # type: Dict[str, Any]
125+
) -> PingResult:
169126
"""Performs a ping operation against the bucket.
170127
171128
The ping operation pings the services which are specified
@@ -180,7 +137,8 @@ def ping(self,
180137
operations which were performed.
181138
182139
"""
183-
return super().ping(*opts, **kwargs)
140+
req = self._impl.request_builder.build_ping_request(*opts, **kwargs)
141+
return await self._impl.ping(req)
184142

185143
def view_query(self,
186144
design_doc, # type: str
@@ -231,17 +189,8 @@ def view_query(self,
231189
print(f'Found row: {row}')
232190
233191
"""
234-
request_args = dict(default_serialize=self.default_serializer,
235-
streaming_timeout=self.streaming_timeouts.get('view_timeout', None))
236-
num_workers = kwargs.pop('num_workers', None)
237-
if num_workers:
238-
request_args['num_workers'] = num_workers
239-
240-
query = ViewQuery.create_view_query_object(self.name, design_doc, view_name, *view_options, **kwargs)
241-
return ViewResult(AsyncViewRequest.generate_view_request(self.connection,
242-
self.loop,
243-
query.as_encodable(),
244-
**request_args))
192+
req = self._impl.request_builder.build_view_query_request(design_doc, view_name, *view_options, **kwargs)
193+
return self._impl.view_query(req)
245194

246195
def collections(self) -> CollectionManager:
247196
"""
@@ -251,7 +200,7 @@ def collections(self) -> CollectionManager:
251200
Returns:
252201
:class:`~acouchbase.management.collections.CollectionManager`: A :class:`~couchbase.management.collections.CollectionManager` instance.
253202
""" # noqa: E501
254-
return CollectionManager(self.connection, self.loop, self.name)
203+
return CollectionManager(self._impl.connection, self._impl.loop, self.name)
255204

256205
def view_indexes(self) -> ViewIndexManager:
257206
"""
@@ -267,7 +216,7 @@ def view_indexes(self) -> ViewIndexManager:
267216
Returns:
268217
:class:`~acouchbase.management.views.ViewIndexManager`: A :class:`~couchbase.management.views.ViewIndexManager` instance.
269218
""" # noqa: E501
270-
return ViewIndexManager(self.connection, self.loop, self.name)
219+
return ViewIndexManager(self._impl.connection, self._impl.loop, self.name)
271220

272221

273222
Bucket = AsyncBucket

0 commit comments

Comments
 (0)