Skip to content

Commit 1db7fe0

Browse files
committed
[155] encodedsession: Support ResultSet with a huge header
The ResultSet consists of a header (labels for each returned column) plus some number of rows. The maximum size of a single "fetch" operation is 100,000 bytes. If the header happens to exceed that size then the fetch operation will always raise an EndOfStream exception.

The protocol for result sets is a little fragile: we send back a row of data followed by an optional integer. If the integer is 1 then there is another row to be read in this same fetch result message. If the integer is 0 then there are no more rows to fetch (the ResultSet is complete). If the integer is missing completely then there are more rows, which must be fetched by sending another request to the server.

In the driver we were always assuming there would be the headers plus at least one row before we ran out of room in the message, but if the header was large enough (> 100,000 bytes for labels) that might not be true.

Add a new encodedsession._hasBytes() method that allows us to check whether there is data left in the message. Use this to rewrite the fetch_result_set() and fetch_result_set_next() methods to simplify the logic and avoid the error above.
1 parent 6b7b59e commit 1db7fe0

1 file changed

Lines changed: 27 additions & 32 deletions

File tree

pynuodb/encodedsession.py

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""A module for housing the EncodedSession class.
22
3-
(C) Copyright 2013-2020 NuoDB, Inc. All Rights Reserved.
3+
(C) Copyright 2013-2021 NuoDB, Inc. All Rights Reserved.
44
55
This software is licensed under a BSD 3-Clause License.
66
See the LICENSE file provided with this software.
@@ -438,30 +438,27 @@ def fetch_result_set(self, statement):
438438
handle = self.getInt()
439439
colcount = self.getInt()
440440

441-
col_num_iter = range(colcount)
442-
for _ in col_num_iter:
441+
# skip the header labels
442+
for _ in range(colcount):
443443
self.getString()
444444

445445
complete = False
446446
init_results = []
447-
next_row = self.getInt()
448447

449-
while next_row == 1:
448+
# If we hit the end of the stream without next==0, there are more
449+
# results to fetch.
450+
while self._hasBytes(1):
451+
next_row = self.getInt()
452+
if next_row == 0:
453+
complete = True
454+
break
455+
450456
row = [None] * colcount
451-
for i in col_num_iter:
457+
for i in range(colcount):
452458
row[i] = self.getValue()
453459

454460
init_results.append(tuple(row))
455461

456-
try:
457-
next_row = self.getInt()
458-
except EndOfStream:
459-
break
460-
461-
# the first chunk might be all of the data
462-
if next_row == 0:
463-
complete = True
464-
465462
return ResultSet(handle, colcount, init_results, complete)
466463

467464
def fetch_result_set_next(self, result_set):
@@ -471,26 +468,19 @@ def fetch_result_set_next(self, result_set):
471468
self._putMessageId(protocol.NEXT).putInt(result_set.handle)
472469
self._exchangeMessages()
473470

474-
col_num_iter = range(result_set.col_count)
475-
476471
result_set.clear_results()
477472

478-
next_row = self.getInt()
479-
while next_row == 1:
473+
while self._hasBytes(1):
474+
if self.getInt() == 0:
475+
result_set.complete = True
476+
break
477+
480478
row = [None] * result_set.col_count
481-
for i in col_num_iter:
479+
for i in range(result_set.col_count):
482480
row[i] = self.getValue()
483481

484482
result_set.add_row(tuple(row))
485483

486-
try:
487-
next_row = self.getInt()
488-
except EndOfStream:
489-
break
490-
491-
if next_row == 0:
492-
result_set.complete = True
493-
494484
def fetch_result_set_description(self, result_set):
495485
"""
496486
:type result_set: ResultSet
@@ -1102,7 +1092,6 @@ def _exchangeMessages(self, getResponse=True):
11021092
db_error_handler(protocol.OPERATION_TIMEOUT, "timed out")
11031093

11041094
error = self.getInt()
1105-
11061095
if error != 0:
11071096
db_error_handler(error, self.getString())
11081097
else:
@@ -1131,13 +1120,19 @@ def _setup_statement(self, handle, msgId):
11311120

11321121
return self
11331122

1123+
def _hasBytes(self, length):
1124+
return self.__inpos + length <= len(self.__input)
1125+
11341126
def _peekTypeCode(self):
11351127
"""Looks at the next Type Code off the session. (Does not move inpos)"""
1128+
if not self._hasBytes(1):
1129+
raise EndOfStream('end of stream reached')
1130+
11361131
return ord(self.__input[self.__inpos])
11371132

11381133
def _getTypeCode(self):
11391134
"""Read the next Type Code off the session."""
1140-
if self.__inpos >= len(self.__input):
1135+
if not self._hasBytes(1):
11411136
raise EndOfStream('end of stream reached')
11421137

11431138
try:
@@ -1151,8 +1146,8 @@ def _takeBytes(self, length):
11511146
:type length: int
11521147
:rtype: bytes
11531148
"""
1154-
if self.__inpos + length > len(self.__input):
1155-
raise EndOfStream('end of stream reached')
1149+
if not self._hasBytes(length):
1150+
raise EndOfStream('end of stream reached (need %d bytes)' % (length))
11561151

11571152
try:
11581153
return self.__input[self.__inpos:self.__inpos + length]

0 commit comments

Comments
 (0)