Skip to content

Commit 7a3d62a

Browse files
committed
PYCBC-1735: Migrate Query Index management away from Wrapper decorators
Changes ------- * Add query index mgmt types, request builder * Add query index mgmt impl for all 3 APIs * Update APIs query index manager classes to use impl and remove wrapper decorators * Remove pytest.skip() from query index mgmt tests Change-Id: I683d8812dcb58766f6bd1bf23c90b918b4bd0beb Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/239336 Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent b4b17de commit 7a3d62a

10 files changed

Lines changed: 1221 additions & 750 deletions

File tree

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# Copyright 2016-2023. Couchbase, Inc.
2+
# All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License")
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
18+
import asyncio
19+
import time
20+
from typing import (TYPE_CHECKING,
21+
Iterable,
22+
List)
23+
24+
from couchbase.exceptions import (AmbiguousTimeoutException,
25+
QueryIndexNotFoundException,
26+
WatchQueryIndexTimeoutException)
27+
from couchbase.management.logic.query_index_mgmt_req_builder import QueryIndexMgmtRequestBuilder
28+
from couchbase.management.logic.query_index_mgmt_req_types import (BuildDeferredIndexesRequest,
29+
CreateIndexRequest,
30+
DropIndexRequest,
31+
GetAllIndexesRequest,
32+
QueryIndex,
33+
WatchIndexesRequest)
34+
from couchbase.pycbc_core import mgmt_operations, query_index_mgmt_operations
35+
36+
if TYPE_CHECKING:
37+
from acouchbase.logic.client_adapter import AsyncClientAdapter
38+
39+
40+
class AsyncQueryIndexMgmtImpl:
41+
def __init__(self, client_adapter: AsyncClientAdapter) -> None:
42+
self._client_adapter = client_adapter
43+
self._request_builder = QueryIndexMgmtRequestBuilder()
44+
45+
@property
46+
def loop(self) -> asyncio.AbstractEventLoop:
47+
"""**INTERNAL**"""
48+
return self._client_adapter.loop
49+
50+
@property
51+
def request_builder(self) -> QueryIndexMgmtRequestBuilder:
52+
"""**INTERNAL**"""
53+
return self._request_builder
54+
55+
async def create_index(self, req: CreateIndexRequest) -> None:
56+
"""**INTERNAL**"""
57+
await self._client_adapter.execute_mgmt_request(req)
58+
59+
async def create_primary_index(self, req: CreateIndexRequest) -> None:
60+
"""**INTERNAL**"""
61+
await self._client_adapter.execute_mgmt_request(req)
62+
63+
async def drop_index(self, req: DropIndexRequest) -> None:
64+
"""**INTERNAL**"""
65+
await self._client_adapter.execute_mgmt_request(req)
66+
67+
async def drop_primary_index(self, req: DropIndexRequest) -> None:
68+
"""**INTERNAL**"""
69+
await self._client_adapter.execute_mgmt_request(req)
70+
71+
async def get_all_indexes(self, req: GetAllIndexesRequest) -> List[QueryIndex]:
72+
"""**INTERNAL**"""
73+
res = await self._client_adapter.execute_mgmt_request(req)
74+
indexes = []
75+
raw_indexes = res.raw_result.get('indexes', None)
76+
if raw_indexes:
77+
indexes = [QueryIndex.from_server(idx) for idx in raw_indexes]
78+
79+
return indexes
80+
81+
async def build_deferred_indexes(self, req: BuildDeferredIndexesRequest) -> None:
82+
"""**INTERNAL**"""
83+
await self._client_adapter.execute_mgmt_request(req)
84+
85+
async def watch_indexes(self, req: WatchIndexesRequest) -> None:
86+
"""**INTERNAL**"""
87+
current_time = time.monotonic()
88+
# timeout is converted to microsecs via options processing
89+
timeout = req.timeout / 1000 / 1000
90+
deadline = current_time + timeout
91+
delay = 0.1 # seconds
92+
# needs to be int b/c req.timeout expects int (this is what the bindings want)
93+
delay_us = int(delay * 1e6)
94+
95+
get_all_indexes_req = GetAllIndexesRequest(self._request_builder._error_map,
96+
mgmt_operations.QUERY_INDEX.value,
97+
query_index_mgmt_operations.GET_ALL_INDEXES.value,
98+
bucket_name=req.bucket_name,
99+
scope_name=req.scope_name,
100+
collection_name=req.collection_name,
101+
timeout=req.timeout)
102+
103+
while True:
104+
try:
105+
indexes = await self.get_all_indexes(get_all_indexes_req)
106+
except AmbiguousTimeoutException:
107+
pass # go ahead and move on, raise WatchQueryIndexTimeoutException later if needed
108+
109+
all_online = self._check_indexes(req.index_names, indexes)
110+
if all_online:
111+
break
112+
113+
current_time = time.monotonic()
114+
if deadline < (current_time + delay):
115+
raise WatchQueryIndexTimeoutException('Failed to find all indexes online within the alloted time.')
116+
await asyncio.sleep(delay)
117+
get_all_indexes_req.timeout -= delay_us
118+
119+
def _check_indexes(self, index_names: Iterable[str], indexes: Iterable[QueryIndex]):
120+
for idx_name in index_names:
121+
match = next((i for i in indexes if i.name == idx_name), None)
122+
if not match:
123+
raise QueryIndexNotFoundException(f'Cannot find index with name: {idx_name}')
124+
125+
return all(map(lambda i: i.state == 'online', indexes))

0 commit comments

Comments
 (0)