Skip to content

Commit c995cb2

Browse files
committed
PYCBC-1685: Remove timeout logic when waiting for C++ core HTTP response
Motivation
==========
Since the SDK uses a timeout (the same one it passes down to the C++ core) when waiting for streaming results from the C++ core, it is possible for it to raise a `StopIteration` incorrectly. Instead, the SDK should continue to wait for the C++ core to return a response.

Modification
============
* Remove logic to raise a `StopIteration` exception when waiting for a response for streaming operations. Instead a message is logged (at the DEBUG level) to provide potentially useful information.
* Update tests to confirm changes.

Change-Id: Ia4ea33ebe9dbf3560874a5f9317b476a5ec5315a
Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/227431
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dimitris Christodoulou <dimitris.christodoulou@couchbase.com>
1 parent 8a65250 commit c995cb2

6 files changed

Lines changed: 83 additions & 52 deletions

File tree

acouchbase/tests/analytics_t.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ class AnalyticsTestSuite:
172172
TEST_MANIFEST = [
173173
'test_analytics_metadata',
174174
'test_analytics_with_metrics',
175+
'test_query_large_result_set',
175176
'test_query_named_parameters',
176177
'test_query_named_parameters_no_options',
177178
'test_query_named_parameters_override',
@@ -229,6 +230,21 @@ async def test_analytics_with_metrics(self, cb_env):
229230
assert isinstance(metrics.processed_objects(), UnsignedInt64)
230231
assert metrics.error_count() == UnsignedInt64(0)
231232

233+
@pytest.mark.asyncio
234+
async def test_query_large_result_set(self, cb_env):
235+
# Prior to PYCBC-1685, this would raise a StopIteration b/c the timeout was
236+
# reached on the Python side prior to the C++ core returning the result set.
237+
# It is difficult to determine the timeout value in the Jenkins environment,
238+
# so allow an AmbiguousTimeoutException.
239+
count = 100000
240+
statement = f'SELECT {{"x1": 1, "x2": 2, "x3": 3}} FROM range(1, {count}) r;'
241+
try:
242+
result = cb_env.cluster.analytics_query(statement, timeout=timedelta(seconds=2))
243+
row_count = [1 async for _ in result.rows()]
244+
assert len(row_count) == count
245+
except AmbiguousTimeoutException:
246+
pass
247+
232248
@pytest.mark.asyncio
233249
async def test_query_named_parameters(self, cb_env):
234250
result = cb_env.cluster.analytics_query(f'SELECT * FROM `{cb_env.DATASET_NAME}` WHERE `type` = $atype LIMIT 1',
@@ -291,19 +307,17 @@ async def test_query_timeout(self, cb_env):
291307
username, pw = cb_env.config.get_username_and_pw()
292308
auth = PasswordAuthenticator(username, pw)
293309
# Prior to PYCBC-1521, this test would fail as each request would override the cluster level analytics_timeout.
294-
# If a timeout was not provided in the request, the default 75s timeout would be used. PYCBC-1521 corrects
295-
# this behavior so this test will pass as we are essentially forcing an AmbiguousTimeoutException because
296-
# we are setting the cluster level analytics_timeout such a small value.
297-
timeout_opts = ClusterTimeoutOptions(analytics_timeout=timedelta(milliseconds=1))
310+
# If a timeout was not provided in the request, the default 75s timeout would be used.
311+
timeout_opts = ClusterTimeoutOptions(analytics_timeout=timedelta(seconds=1))
298312
cluster = await Cluster.connect(f'{conn_string}', ClusterOptions(auth, timeout_options=timeout_opts))
299313
# don't need to do this except for older server versions
300314
_ = cluster.bucket(f'{cb_env.bucket.name}')
301-
q_str = f'SELECT * FROM `{cb_env.DATASET_NAME}` LIMIT 1;'
315+
q_str = 'SELECT sleep("some value", 1500) AS some_field;'
302316
with pytest.raises(AmbiguousTimeoutException):
303317
res = cluster.analytics_query(q_str)
304318
[r async for r in res.rows()]
305319
# if we override the timeout w/in the request the query should succeed.
306-
res = cluster.analytics_query(q_str, timeout=timedelta(seconds=10))
320+
res = cluster.analytics_query(q_str, timeout=timedelta(seconds=2))
307321
rows = [r async for r in res.rows()]
308322
assert len(rows) > 0
309323

acouchbase/tests/query_t.py

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -205,19 +205,6 @@ class QueryTestSuite:
205205
'test_simple_query_without_options_with_kwargs_positional_params',
206206
]
207207

208-
@pytest_asyncio.fixture(name='setup_udf')
209-
async def setup_teardown_udf(self, cb_env):
210-
EnvironmentFeatures.check_if_feature_supported('query_user_defined_functions',
211-
cb_env.server_version_short,
212-
cb_env.mock_server_type)
213-
await AsyncTestEnvironment.try_n_times(3,
214-
1,
215-
cb_env.load_udf)
216-
yield
217-
await AsyncTestEnvironment.try_n_times(3,
218-
1,
219-
cb_env.drop_udf)
220-
221208
@pytest.fixture(scope='class')
222209
def check_preserve_expiry_supported(self, cb_env):
223210
EnvironmentFeatures.check_if_feature_supported('preserve_expiry',
@@ -243,11 +230,15 @@ async def test_mixed_positional_parameters(self, cb_env):
243230
QueryOptions(positional_parameters=['xgfflq']), f'{cb_env.get_batch_id()}')
244231
await cb_env.assert_rows(result, 1)
245232

246-
@pytest.mark.usefixtures('setup_udf')
247233
@pytest.mark.asyncio
248234
async def test_non_blocking(self, cb_env):
235+
249236
async def run_query(cluster, idx):
250-
result = cluster.query("EXECUTE FUNCTION loop(1000000000)")
237+
slow_query = ['SELECT COUNT (1) AS c FROM',
238+
'ARRAY_RANGE(0,100) AS d1,'
239+
'ARRAY_RANGE(0,100) AS d2,'
240+
'ARRAY_RANGE(0,100) AS d3']
241+
result = cluster.query(' '.join(slow_query))
251242
rows = []
252243
async for r in result:
253244
rows.append(r)
@@ -363,25 +354,29 @@ async def test_query_ryow(self, cb_env):
363354
@pytest.mark.flaky(reruns=5, reruns_delay=1)
364355
@pytest.mark.asyncio
365356
async def test_query_timeout(self, cb_env):
357+
if cb_env.server_version_short < 7.1:
358+
pytest.skip("Query used in test only available on server versions >= 7.1")
366359
from acouchbase.cluster import Cluster
367360
from couchbase.auth import PasswordAuthenticator
368361
from couchbase.options import ClusterOptions, ClusterTimeoutOptions
369362
conn_string = cb_env.config.get_connection_string()
370363
username, pw = cb_env.config.get_username_and_pw()
371364
auth = PasswordAuthenticator(username, pw)
372365
# Prior to PYCBC-1521, this test would fail as each request would override the cluster level query_timeout.
373-
# If a timeout was not provided in the request, the default 75s timeout would be used. PYCBC-1521 corrects
374-
# this behavior so this test will pass as we are essentially forcing an AmbiguousTimeoutException because
375-
# we are setting the cluster level query_timeout such a small value.
376-
timeout_opts = ClusterTimeoutOptions(query_timeout=timedelta(milliseconds=1))
366+
# If a timeout was not provided in the request, the default 75s timeout would be used.
367+
timeout_opts = ClusterTimeoutOptions(query_timeout=timedelta(seconds=1.5))
377368
cluster = await Cluster.connect(f'{conn_string}', ClusterOptions(auth, timeout_options=timeout_opts))
378369
# don't need to do this except for older server versions
379370
_ = cluster.bucket(f'{cb_env.bucket.name}')
380-
q_str = f'SELECT * FROM `{cb_env.bucket.name}` LIMIT 10;'
371+
slow_query = ' '.join(['SELECT COUNT (1) AS c FROM',
372+
'ARRAY_RANGE(0,110) AS d1,'
373+
'ARRAY_RANGE(0,110) AS d2,'
374+
'ARRAY_RANGE(0,110) AS d3'])
381375
with pytest.raises(AmbiguousTimeoutException):
382-
await cluster.query(q_str).execute()
376+
await cluster.query(slow_query).execute()
383377
# If we override the timeout w/in the request the query should succeed.
384-
rows = await cluster.query(q_str, timeout=timedelta(seconds=10)).execute()
378+
# NOTE: a timeout of < 10s is most likely acceptable, but for the Jenkins environment we give plenty of room.
379+
rows = await cluster.query(slow_query, timeout=timedelta(seconds=30)).execute()
385380
assert len(rows) > 0
386381

387382
@pytest.mark.asyncio

couchbase/tests/analytics_t.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ class AnalyticsTestSuite:
186186
'test_analytics_metadata',
187187
'test_analytics_query_in_thread',
188188
'test_analytics_with_metrics',
189+
'test_query_large_result_set',
189190
'test_query_named_parameters',
190191
'test_query_named_parameters_no_options',
191192
'test_query_named_parameters_override',
@@ -264,6 +265,20 @@ def test_analytics_with_metrics(self, cb_env):
264265
assert isinstance(metrics.processed_objects(), UnsignedInt64)
265266
assert metrics.error_count() == UnsignedInt64(0)
266267

268+
def test_query_large_result_set(self, cb_env):
269+
# Prior to PYCBC-1685, this would raise a StopIteration b/c the timeout was
270+
# reached on the Python side prior to the C++ core returning the result set.
271+
# It is difficult to determine the timeout value in the Jenkins environment,
272+
# so allow an AmbiguousTimeoutException.
273+
count = 100000
274+
statement = f'SELECT {{"x1": 1, "x2": 2, "x3": 3}} FROM range(1, {count}) r;'
275+
try:
276+
result = cb_env.cluster.analytics_query(statement, timeout=timedelta(seconds=2))
277+
row_count = [1 for _ in result.rows()]
278+
assert len(row_count) == count
279+
except AmbiguousTimeoutException:
280+
pass
281+
267282
def test_query_named_parameters(self, cb_env):
268283
result = cb_env.cluster.analytics_query(f'SELECT * FROM `{cb_env.DATASET_NAME}` WHERE `type` = $atype LIMIT 1',
269284
AnalyticsOptions(named_parameters={'atype': 'vehicle'}))
@@ -318,20 +333,18 @@ def test_query_timeout(self, cb_env):
318333
username, pw = cb_env.config.get_username_and_pw()
319334
auth = PasswordAuthenticator(username, pw)
320335
# Prior to PYCBC-1521, this test would fail as each request would override the cluster level analytics_timeout.
321-
# If a timeout was not provided in the request, the default 75s timeout would be used. PYCBC-1521 corrects
322-
# this behavior so this test will pass as we are essentially forcing an AmbiguousTimeoutException because
323-
# we are setting the cluster level analytics_timeout such a small value.
324-
timeout_opts = ClusterTimeoutOptions(analytics_timeout=timedelta(milliseconds=1))
336+
# If a timeout was not provided in the request, the default 75s timeout would be used.
337+
timeout_opts = ClusterTimeoutOptions(analytics_timeout=timedelta(seconds=1))
325338
cluster = Cluster.connect(f'{conn_string}', ClusterOptions(auth, timeout_options=timeout_opts))
326339
# don't need to do this except for older server versions
327340
_ = cluster.bucket(f'{cb_env.bucket.name}')
328-
q_str = f'SELECT * FROM `{cb_env.DATASET_NAME}` LIMIT 1;'
341+
q_str = 'SELECT sleep("some value", 1500) AS some_field;'
329342
with pytest.raises(AmbiguousTimeoutException):
330343
res = cluster.analytics_query(q_str)
331344
[r for r in res.rows()]
332345

333346
# if we override the timeout w/in the request the query should succeed.
334-
res = cluster.analytics_query(q_str, timeout=timedelta(seconds=10))
347+
res = cluster.analytics_query(q_str, timeout=timedelta(seconds=2))
335348
rows = [r for r in res.rows()]
336349
assert len(rows) > 0
337350

couchbase/tests/query_t.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -336,26 +336,30 @@ def test_query_raw_options(self, cb_env):
336336
# creating a new connection, allow retries
337337
@pytest.mark.flaky(reruns=5, reruns_delay=1)
338338
def test_query_timeout(self, cb_env):
339+
if cb_env.server_version_short < 7.1:
340+
pytest.skip("Query used in test only available on server versions >= 7.1")
339341
from couchbase.auth import PasswordAuthenticator
340342
from couchbase.cluster import Cluster
341343
from couchbase.options import ClusterOptions, ClusterTimeoutOptions
342344
conn_string = cb_env.config.get_connection_string()
343345
username, pw = cb_env.config.get_username_and_pw()
344346
auth = PasswordAuthenticator(username, pw)
345347
# Prior to PYCBC-1521, this test would fail as each request would override the cluster level query_timeout.
346-
# If a timeout was not provided in the request, the default 75s timeout would be used. PYCBC-1521 corrects
347-
# this behavior so this test will pass as we are essentially forcing an AmbiguousTimeoutException because
348-
# we are setting the cluster level query_timeout such a small value.
349-
timeout_opts = ClusterTimeoutOptions(query_timeout=timedelta(milliseconds=1))
348+
# If a timeout was not provided in the request, the default 75s timeout would be used.
349+
timeout_opts = ClusterTimeoutOptions(query_timeout=timedelta(seconds=1.5))
350350
cluster = Cluster.connect(f'{conn_string}', ClusterOptions(auth, timeout_options=timeout_opts))
351351
# don't need to do this except for older server versions
352352
_ = cluster.bucket(f'{cb_env.bucket.name}')
353-
q_str = f'SELECT * FROM `{cb_env.bucket.name}` LIMIT 10;'
353+
slow_query = ' '.join(['SELECT COUNT (1) AS c FROM',
354+
'ARRAY_RANGE(0,110) AS d1,'
355+
'ARRAY_RANGE(0,110) AS d2,'
356+
'ARRAY_RANGE(0,110) AS d3'])
354357
with pytest.raises(AmbiguousTimeoutException):
355-
cluster.query(q_str).execute()
358+
cluster.query(slow_query).execute()
356359

357360
# If we override the timeout w/in the request the query should succeed.
358-
rows = cluster.query(q_str, timeout=timedelta(seconds=10)).execute()
361+
# NOTE: a timeout of < 10s is most likely acceptable, but for the Jenkins environment we give plenty of room.
362+
rows = cluster.query(slow_query, timeout=timedelta(seconds=30)).execute()
359363
assert len(rows) > 0
360364

361365
def test_query_ryow(self, cb_env):

src/result.cxx

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -230,13 +230,7 @@ streamed_result_iternext(PyObject* self)
230230
Py_BEGIN_ALLOW_THREADS row = s_res->rows->get(s_res->timeout_ms);
231231
Py_END_ALLOW_THREADS
232232
}
233-
234-
if (row != nullptr) {
235-
return row;
236-
} else {
237-
PyErr_SetString(PyExc_StopIteration, "Timeout occurred waiting for next item in queue.");
238-
return nullptr;
239-
}
233+
return row;
240234
}
241235

242236
static PyObject*

src/result.hxx

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,21 @@ public:
4949
std::unique_lock<std::mutex> lock(mut_);
5050

5151
while (rows_.empty()) {
52-
auto now = std::chrono::system_clock::now();
53-
if (cv_.wait_until(lock, now + timeout_ms) == std::cv_status::timeout) {
54-
// this will cause iternext to return nullptr, which stops iteration
55-
return nullptr;
52+
if (cv_.wait_for(lock, timeout_ms) == std::cv_status::timeout) {
53+
// This timeout (i.e. timeout_ms) is the same timeout we pass to the C++ core.
54+
// If we time out on the Python side this means:
55+
// - Edge case where the C++ core is about to timeout. We want to use the C++ core error
56+
// details,
57+
// so wait a little longer to get the C++ core timeout.
58+
// - The result set is large and since we don't have streaming support yet, we have to
59+
// wait for
60+
// the entire result set to be returned. Again we should wait until we get the results.
61+
// PYCBC-1685: Instead of trying to do some tricky error handling we instead wait for the
62+
// C++ core results and log a message that can provide insight to users about the SDK
63+
// behavior.
64+
CB_LOG_DEBUG(
65+
"PYCBC: No results received from C++ core after {}ms. Continue to wait for results.",
66+
timeout_ms.count());
5667
}
5768
}
5869

0 commit comments

Comments
 (0)