Skip to content

Commit a483557

Browse files
committed
Improved internal_parse_buffer and internal_socket_read
1 parent 34552de commit a483557

2 files changed

Lines changed: 120 additions & 101 deletions

File tree

C/sqcloud.c

Lines changed: 118 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,23 +1165,14 @@ static SQCloudResult *internal_parse_rowset_chunck (SQCloudConnection *connectio
11651165
static SQCloudResult *internal_parse_buffer (SQCloudConnection *connection, char *buffer, uint32_t blen, uint32_t cstart, bool isstatic, bool externalbuffer) {
11661166
if (blen <= 1) return false;
11671167

1168+
bool buffer_canbe_freed = (!isstatic && !externalbuffer);
1169+
11681170
// try to check if it is a OK reply: +2 OK
11691171
if ((blen == REPLY_OK_LEN) && (strncmp(buffer, REPLY_OK, REPLY_OK_LEN) == 0)) {
1172+
if (buffer_canbe_freed) mem_free(buffer);
11701173
return &SQCloudResultOK;
11711174
}
11721175

1173-
// if buffer is static (stack based allocation) then it must be duplicated
1174-
if (buffer[0] != CMD_ERROR && isstatic) {
1175-
char *clone = mem_alloc(blen);
1176-
if (!clone) {
1177-
internal_set_error(connection, INTERNAL_ERRCODE_MEMORY, "Unable to allocate memory: %d.", blen);
1178-
return NULL;
1179-
}
1180-
memcpy(clone, buffer, blen);
1181-
buffer = clone;
1182-
isstatic = false;
1183-
}
1184-
11851176
// check for compressed reply before the parse step
11861177
char *zdata = NULL;
11871178
if (buffer[0] == CMD_COMPRESSED) {
@@ -1200,11 +1191,12 @@ static SQCloudResult *internal_parse_buffer (SQCloudConnection *connection, char
12001191
char *hstart = &buffer[cstart1 + cstart2 + cstart3 + 1];
12011192

12021193
// try to allocate a buffer big enough to hold uncompressed data + raw header
1203-
uint32_t clonelen = ulen + (uint32_t)(hstart - buffer);
1194+
// 256 is an arbitrary memory cushion value
1195+
uint32_t clonelen = ulen + (uint32_t)(hstart - buffer) + 256;
12041196
char *clone = mem_alloc (clonelen);
12051197
if (!clone) {
12061198
internal_set_error(connection, INTERNAL_ERRCODE_MEMORY, "Unable to allocate memory to uncompress buffer: %d.", clonelen);
1207-
if (!isstatic && !externalbuffer) mem_free(buffer);
1199+
if (buffer_canbe_freed) mem_free(buffer);
12081200
return NULL;
12091201
}
12101202

@@ -1215,12 +1207,12 @@ static SQCloudResult *internal_parse_buffer (SQCloudConnection *connection, char
12151207
uint32_t rc = LZ4_decompress_safe(zdata, clone + (zdata - hstart), clen, ulen);
12161208
if (rc <= 0 || rc != ulen) {
12171209
internal_set_error(connection, INTERNAL_ERRCODE_GENERIC, "Unable to decompress buffer (err code: %d).", rc);
1218-
if (!isstatic && !externalbuffer) mem_free(buffer);
1210+
if (buffer_canbe_freed) mem_free(buffer);
12191211
return NULL;
12201212
}
12211213

12221214
// decompression is OK so replace buffer
1223-
if (!isstatic && !externalbuffer) mem_free(buffer);
1215+
if (buffer_canbe_freed) mem_free(buffer);
12241216

12251217
isstatic = false;
12261218
buffer = clone;
@@ -1229,8 +1221,25 @@ static SQCloudResult *internal_parse_buffer (SQCloudConnection *connection, char
12291221
// at this point the buffer used in the SQCloudResult is a newly allocated one (clone)
12301222
// so externalbuffer flag must be set to false
12311223
externalbuffer = false;
1224+
} else {
1225+
// if buffer is static (stack based allocation) then it must be duplicated
1226+
bool buffer_should_be_duplicated = (buffer[0] != CMD_ERROR);
1227+
if (buffer_should_be_duplicated && isstatic) {
1228+
char *clone = mem_alloc(blen);
1229+
if (!clone) {
1230+
internal_set_error(connection, INTERNAL_ERRCODE_MEMORY, "Unable to allocate memory: %d.", blen);
1231+
if (buffer_canbe_freed) mem_free(buffer);
1232+
return NULL;
1233+
}
1234+
memcpy(clone, buffer, blen);
1235+
buffer = clone;
1236+
isstatic = false;
1237+
}
12321238
}
12331239

1240+
// re-compute flag
1241+
buffer_canbe_freed = (!isstatic && !externalbuffer);
1242+
12341243
// parse reply
12351244
switch (buffer[0]) {
12361245
case CMD_ZEROSTRING:
@@ -1274,7 +1283,7 @@ static SQCloudResult *internal_parse_buffer (SQCloudConnection *connection, char
12741283
connection->errmsg[len] = 0;
12751284

12761285
// check free buffer
1277-
if (!isstatic && !externalbuffer) mem_free(buffer);
1286+
if (buffer_canbe_freed) mem_free(buffer);
12781287
return NULL;
12791288
}
12801289

@@ -1304,12 +1313,12 @@ static SQCloudResult *internal_parse_buffer (SQCloudConnection *connection, char
13041313
}
13051314

13061315
// check free buffer
1307-
if (!res && !isstatic && !externalbuffer) mem_free(buffer);
1316+
if (!res && buffer_canbe_freed) mem_free(buffer);
13081317
return res;
13091318
}
13101319

13111320
case CMD_NULL:
1312-
if (!isstatic && !externalbuffer) mem_free(buffer);
1321+
if (buffer_canbe_freed) mem_free(buffer);
13131322
return &SQCloudResultNULL;
13141323

13151324
case CMD_INT:
@@ -1319,7 +1328,7 @@ static SQCloudResult *internal_parse_buffer (SQCloudConnection *connection, char
13191328
SQCloudResult *res = internal_rowset_type(connection, buffer, blen, 1, (buffer[0] == CMD_INT) ? RESULT_INTEGER : RESULT_FLOAT);
13201329
if (res) res->externalbuffer = externalbuffer;
13211330

1322-
if (!res && !isstatic && !externalbuffer) mem_free(buffer);
1331+
if (!res && buffer_canbe_freed) mem_free(buffer);
13231332
return res;
13241333
}
13251334

@@ -1331,7 +1340,7 @@ static SQCloudResult *internal_parse_buffer (SQCloudConnection *connection, char
13311340
}
13321341
}
13331342

1334-
if (!isstatic && !externalbuffer) mem_free(buffer);
1343+
if (buffer_canbe_freed) mem_free(buffer);
13351344
return NULL;
13361345
}
13371346

@@ -1405,94 +1414,108 @@ static bool internal_socket_forward_read (SQCloudConnection *connection, bool (*
14051414
return false;
14061415
}
14071416

1408-
static SQCloudResult *internal_socket_read (SQCloudConnection *connection, bool mainfd) {
1409-
// most of the time one read will be sufficient
1410-
char header[4096];
1411-
char *buffer = (char *)&header;
1412-
uint32_t blen = sizeof(header);
1413-
uint32_t tread = 0;
1417+
static ssize_t internal_socket_read_nbytes (int fd, void *tlsp, char *buffer, ssize_t len) {
1418+
ssize_t total_read = 0;
14141419

1415-
uint32_t cstart = 0;
1416-
uint32_t clen = 0;
1420+
while (1) {
1421+
#ifndef SQLITECLOUD_DISABLE_TLS
1422+
ssize_t nread = (tlsp) ? tls_read((struct tls *)tlsp, buffer + total_read, len - total_read) : readsocket(fd, buffer + total_read, len - total_read);
1423+
if ((tlsp) && (nread == TLS_WANT_POLLIN || nread == TLS_WANT_POLLOUT)) continue;
1424+
#else
1425+
nread = readsocket(fd, buffer + total_read, len - total_read);
1426+
#endif
1427+
total_read += nread;
1428+
1429+
if (nread <= 0) return nread;
1430+
if (total_read == len) break;
1431+
};
1432+
1433+
return total_read;
1434+
}
14171435

1436+
static SQCloudResult *internal_socket_read (SQCloudConnection *connection, bool mainfd) {
14181437
int fd = (mainfd) ? connection->fd : connection->pubsubfd;
14191438
#ifndef SQLITECLOUD_DISABLE_TLS
14201439
struct tls *tls = (mainfd) ? connection->tls_context : connection->tls_pubsub_context;
1440+
#else
1441+
void *tls = NULL;
14211442
#endif
14221443

1423-
char *original = buffer;
1444+
ssize_t nread = 0;
1445+
uint32_t clen = 0;
1446+
uint32_t cstart = 0;
1447+
char header[64];
1448+
int header_index = 0;
1449+
1450+
char *buffer = NULL;
1451+
char static_buffer[4096];
1452+
1453+
// read the buffer one character at a time until a space is encountered
1454+
// see https://github.com/sqlitecloud/sdk/blob/master/PROTOCOL.md for more details about the protocol
1455+
// after this loop we can know the buffer type and len
14241456
while (1) {
1425-
#ifndef SQLITECLOUD_DISABLE_TLS
1426-
ssize_t nread = (tls) ? tls_read(tls, buffer, blen) : readsocket(fd, buffer, blen);
1427-
if ((tls) && (nread == TLS_WANT_POLLIN || nread == TLS_WANT_POLLOUT)) continue;
1428-
#else
1429-
ssize_t nread = readsocket(fd, buffer, blen);
1430-
#endif
1431-
1432-
if (nread < 0) {
1433-
const char *msg = "";
1434-
#ifndef SQLITECLOUD_DISABLE_TLS
1435-
if (tls) msg = tls_error(tls);
1436-
#endif
1437-
1438-
internal_set_error(connection, INTERNAL_ERRCODE_NETWORK, "An error occurred while reading data: %s (%s).", strerror(errno), msg);
1439-
goto abort_read;
1440-
}
1441-
1442-
if (nread == 0) {
1443-
const char *msg = "";
1444-
#ifndef SQLITECLOUD_DISABLE_TLS
1445-
if (tls) msg = tls_error(tls);
1446-
#endif
1447-
1448-
internal_set_error(connection, INTERNAL_ERRCODE_NETWORK, "Unexpected EOF found while reading data: %s (%s).", strerror(errno), msg);
1449-
goto abort_read;
1457+
nread = internal_socket_read_nbytes(fd, tls, &header[header_index], 1);
1458+
if (nread <= 0) goto abort_read;
1459+
if (header[header_index] == ' ') break;
1460+
++header_index;
1461+
1462+
// check for malformed header
1463+
if (header_index >= sizeof(header)) {
1464+
internal_set_error(connection, INTERNAL_ERRCODE_NETWORK, "Bad protocol reply from server: unable to find buffer size (type was %c).", header[0]);
1465+
return NULL;
14501466
}
1467+
}
1468+
1469+
// parse len (if any)
1470+
int header_size = header_index + 1; // +1 because ++header_index; is after the break clause
1471+
if (internal_has_commandlen(header[0])) {
1472+
clen = internal_parse_number (&header[1], header_size-1, &cstart);
14511473

1452-
tread += (uint32_t)nread;
1453-
blen -= (uint32_t)nread;
1454-
buffer += nread;
1455-
1456-
if (internal_has_commandlen(original[0])) {
1457-
// parse buffer looking for command length
1458-
if (clen == 0) clen = internal_parse_number (&original[1], tread-1, &cstart);
1459-
1460-
// check special zero-length value
1461-
if (clen == 0) {
1462-
if (!internal_canbe_zerolength(original[0])) continue;
1463-
}
1464-
1465-
// check if read is complete
1466-
// clen is the lenght parsed in the buffer
1467-
// cstart is the index of the first space
1468-
// +1 because we skipped the first character in the internal_parse_number function
1469-
if (clen + cstart + 1 != tread) {
1470-
// check buffer allocation and continue reading
1471-
if (clen + cstart - tread > blen) {
1472-
char *clone = mem_alloc(clen + cstart + 1);
1473-
if (!clone) {
1474-
internal_set_error(connection, INTERNAL_ERRCODE_MEMORY, "Unable to allocate memory: %d.", clen + cstart + 1);
1475-
goto abort_read;
1476-
}
1477-
memcpy(clone, original, tread);
1478-
buffer = original = clone;
1479-
blen = (clen + cstart + 1) - tread;
1480-
buffer += tread;
1481-
}
1482-
continue;
1474+
// check special zero-length value
1475+
if (clen == 0) {
1476+
if (internal_canbe_zerolength(header[0])) {
1477+
// it is perfectly legit to have a zero-bytes string or blob
1478+
return internal_parse_buffer(connection, header, header_size, 0, true, false);
1479+
} else {
1480+
// we parsed a zero-length header but we command does not allow that value, so return an error
1481+
internal_set_error(connection, INTERNAL_ERRCODE_NETWORK, "Bad protocol reply from server: the type %c cannot have a zero length buffer.", header[0]);
1482+
return NULL;
14831483
}
1484-
} else {
1485-
// it is a command with no explicit len
1486-
// so make sure that the final character is a space
1487-
if (original[tread-1] != ' ') continue;
14881484
}
1489-
1490-
// command is complete so parse it
1491-
return internal_parse_buffer(connection, original, tread, (clen) ? cstart : 0, (original == header), false);
1485+
} else {
1486+
// command does not have an explicit len so the header can be safely processed
1487+
return internal_parse_buffer(connection, header, header_size, (clen) ? cstart : 0, true, false);
14921488
}
14931489

1494-
abort_read:
1495-
if (original != (char *)&header) mem_free(original);
1490+
// header correctly parsed and len is greater than zero, check if allocate a buffer or use a static one
1491+
// the static buffer optimization was added because of the +2 OK messages
1492+
size_t blen = clen + header_size;
1493+
buffer = (blen <= sizeof(static_buffer)) ? static_buffer : mem_alloc(blen);
1494+
if (!buffer) {
1495+
internal_set_error(connection, INTERNAL_ERRCODE_MEMORY, "Unable to allocate memory: %d.", blen);
1496+
return NULL;
1497+
}
1498+
1499+
// copy header back to buffer
1500+
memcpy(buffer, header, header_size);
1501+
1502+
// read the remaing part of the command
1503+
nread = internal_socket_read_nbytes(fd, tls, &buffer[header_size], clen);
1504+
if (nread <= 0) goto abort_read;
1505+
1506+
// command is complete so parse it
1507+
return internal_parse_buffer(connection, buffer, (uint32_t)blen, (clen) ? cstart : 0, (buffer == static_buffer), false);
1508+
1509+
abort_read: {
1510+
const char *msg = "";
1511+
const char *format = (nread == 0) ? "Unexpected EOF found while reading data: %s (%s)." : "An error occurred while reading data: %s (%s).";
1512+
#ifndef SQLITECLOUD_DISABLE_TLS
1513+
if (tls) msg = tls_error(tls);
1514+
#endif
1515+
internal_set_error(connection, INTERNAL_ERRCODE_NETWORK, format, strerror(errno), msg);
1516+
}
1517+
1518+
if (buffer && buffer != static_buffer) mem_free(buffer);
14961519
return NULL;
14971520
}
14981521

@@ -2299,10 +2322,6 @@ bool _reserved6 (SQCloudConnection *connection, const char *buffer) {
22992322
return internal_socket_raw_write(connection, buffer);
23002323
}
23012324

2302-
SQCloudResult *_reserved7 (SQCloudConnection *connection) {
2303-
return internal_socket_read(connection, true);
2304-
}
2305-
23062325
bool _reserved8 (SQCloudConnection *connection, const char *dbname, const char *key, uint64_t snapshotid, bool isinternaldb, void *xdata, int64_t dbsize, int (*xCallback)(void *xdata, void *buffer, uint32_t *blen, int64_t ntot, int64_t nprogress)) {
23072326
return internal_upload_database(connection, dbname, key, true, snapshotid, isinternaldb, xdata, dbsize, xCallback);
23082327
}

C/sqcloud.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
extern "C" {
1616
#endif
1717

18-
#define SQCLOUD_SDK_VERSION "0.9.8"
19-
#define SQCLOUD_SDK_VERSION_NUM 0x000908
18+
#define SQCLOUD_SDK_VERSION "0.9.9"
19+
#define SQCLOUD_SDK_VERSION_NUM 0x000909
2020
#define SQCLOUD_DEFAULT_PORT 8860
2121
#define SQCLOUD_DEFAULT_TIMEOUT 12
2222
#define SQCLOUD_DEFAULT_UPLOAD_SIZE 512*1024

0 commit comments

Comments
 (0)