Skip to content

Commit 85a806b

Browse files
committed
Receive chunking
1 parent 11a919b commit 85a806b

1 file changed

Lines changed: 40 additions & 29 deletions

File tree

src/Client.php

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -377,9 +377,9 @@ public function send(mixed $data): stdClass|array|int
377377
*/
378378
private function receive(): stdClass|array|int
379379
{
380+
$chunks = [];
380381
$receivedLength = 0;
381382
$responseLength = null;
382-
$res = '';
383383
$attempts = 0;
384384
$maxAttempts = 10000;
385385
$sleepTime = 100;
@@ -407,26 +407,37 @@ private function receive(): stdClass|array|int
407407
$attempts = 0;
408408
$sleepTime = 100; // Reset to 0.1ms
409409

410-
$receivedLength += \strlen($chunk);
411-
$res .= $chunk;
410+
$chunkLen = \strlen($chunk);
411+
$receivedLength += $chunkLen;
412+
$chunks[] = $chunk;
412413

413414
// Parse message length from first 4 bytes
414-
if ((!isset($responseLength)) && (strlen($res) >= 4)) {
415-
$responseLength = \unpack('Vl', substr($res, 0, 4))['l'];
415+
if ($responseLength === null && $receivedLength >= 4) {
416+
$firstData = $chunks[0];
417+
418+
if (\strlen($firstData) < 4) {
419+
$firstData = \implode('', $chunks);
420+
}
421+
422+
$responseLength = \unpack('Vl', substr($firstData, 0, 4))['l'];
416423

417424
// Validate response length before allocating memory to prevent memory exhaustion
418425
if ($responseLength > 16777216) { // 16MB limit
419426
throw new Exception('Response too large: ' . $responseLength . ' bytes');
420427
}
421428

422-
// Additional validation for negative or tiny values
429+
// Validate for negative or tiny values
423430
if ($responseLength < 21) { // Minimum MongoDB message size
424431
throw new Exception('Invalid response length: ' . $responseLength . ' bytes');
425432
}
426433
}
427-
} while (
428-
(!isset($responseLength)) || ($receivedLength < $responseLength)
429-
);
434+
435+
if ($responseLength !== null && $receivedLength >= $responseLength) {
436+
break;
437+
}
438+
} while (true);
439+
440+
$res = \implode('', $chunks);
430441

431442
return $this->parseResponse($res, $responseLength);
432443
}
@@ -1569,25 +1580,25 @@ private function parseResponse(string $response, int $responseLength): stdClass|
15691580
* These 21 bytes are protocol metadata and precede the actual BSON-encoded document in the response.
15701581
*/
15711582

1572-
if (strlen($response) < 21) {
1583+
if (\strlen($response) < 21) {
15731584
throw new Exception('Invalid response: too short');
15741585
}
15751586

15761587
// Extract message header
1577-
$header = substr($response, 0, 16);
1578-
$headerData = unpack('VmessageLength/VrequestID/VresponseTo/VopCode', $header);
1588+
$header = \substr($response, 0, 16);
1589+
$headerData = \unpack('VmessageLength/VrequestID/VresponseTo/VopCode', $header);
15791590

15801591
// Validate message length
15811592
if ($headerData['messageLength'] !== $responseLength) {
15821593
throw new Exception('Response length mismatch');
15831594
}
15841595

15851596
// Extract flag bits and payload type
1586-
$flagBits = unpack('V', substr($response, 16, 4))[1];
1587-
$payloadType = ord(substr($response, 20, 1));
1597+
$flagBits = \unpack('V', \substr($response, 16, 4))[1];
1598+
$payloadType = \ord(\substr($response, 20, 1));
15881599

15891600
// Extract BSON document (skip header + flagBits + payloadType)
1590-
$bsonString = substr($response, 21, $responseLength - 21);
1601+
$bsonString = \substr($response, 21, $responseLength - 21);
15911602

15921603
if (empty($bsonString)) {
15931604
return new \stdClass();
@@ -1598,32 +1609,32 @@ private function parseResponse(string $response, int $responseLength): stdClass|
15981609
$result = Document::fromBSON($bsonString)->toPHP();
15991610

16001611
// Convert array to stdClass if needed
1601-
if (is_array($result)) {
1612+
if (\is_array($result)) {
16021613
$result = (object)$result;
16031614
}
16041615

16051616
// Check for write errors (duplicate key, etc.)
1606-
if (property_exists($result, 'writeErrors') && !empty($result->writeErrors)) {
1617+
if (\property_exists($result, 'writeErrors') && !empty($result->writeErrors)) {
16071618
throw new Exception(
16081619
$result->writeErrors[0]->errmsg,
16091620
$result->writeErrors[0]->code
16101621
);
16111622
}
16121623

16131624
// Check for general MongoDB errors
1614-
if (property_exists($result, 'errmsg')) {
1625+
if (\property_exists($result, 'errmsg')) {
16151626
throw new Exception(
16161627
'E' . $result->code . ' ' . $result->codeName . ': ' . $result->errmsg,
16171628
$result->code
16181629
);
16191630
}
16201631

16211632
// Check for operation success
1622-
if (property_exists($result, 'n') && $result->ok === 1.0) {
1633+
if (\property_exists($result, 'n') && $result->ok === 1.0) {
16231634
return $result->n;
16241635
}
16251636

1626-
if (property_exists($result, 'nonce') && $result->ok === 1.0) {
1637+
if (\property_exists($result, 'nonce') && $result->ok === 1.0) {
16271638
return $result;
16281639
}
16291640

@@ -1636,7 +1647,7 @@ private function parseResponse(string $response, int $responseLength): stdClass|
16361647
throw new Exception('Failed to parse BSON response: ' . $e->getMessage());
16371648
} catch (\Exception $e) {
16381649
if ($e instanceof Exception) {
1639-
throw $e; // Re-throw our own exceptions
1650+
throw $e;
16401651
}
16411652
throw new Exception('Error parsing response: ' . $e->getMessage());
16421653
}
@@ -1671,11 +1682,11 @@ public function isTransientTransactionError(Exception $exception): bool
16711682
13436, // NotMasterOrSecondary
16721683
];
16731684

1674-
return in_array($code, $transientCodes) ||
1675-
str_contains($message, self::TRANSIENT_TRANSACTION_ERROR) ||
1676-
str_contains($message, 'connection') ||
1677-
str_contains($message, 'timeout') ||
1678-
str_contains($message, 'network');
1685+
return \in_array($code, $transientCodes) ||
1686+
\str_contains($message, self::TRANSIENT_TRANSACTION_ERROR) ||
1687+
\str_contains($message, 'connection') ||
1688+
\str_contains($message, 'timeout') ||
1689+
\str_contains($message, 'network');
16791690
}
16801691

16811692
/**
@@ -1702,8 +1713,8 @@ public function isUnknownTransactionCommitResult(Exception $exception): bool
17021713
13436, // NotMasterOrSecondary
17031714
];
17041715

1705-
return in_array($code, $unknownCommitCodes) ||
1706-
str_contains($message, self::UNKNOWN_TRANSACTION_COMMIT_RESULT);
1716+
return \in_array($code, $unknownCommitCodes) ||
1717+
\str_contains($message, self::UNKNOWN_TRANSACTION_COMMIT_RESULT);
17071718
}
17081719

17091720
/**
@@ -1769,7 +1780,7 @@ public function withTransaction(array $session, callable $callback, array $optio
17691780
if ($this->isTransientTransactionError($e) && $attempt < $maxRetries) {
17701781
$attempt++;
17711782
if ($retryDelayMs > 0) {
1772-
usleep($retryDelayMs * 1000);
1783+
\usleep($retryDelayMs * 1000);
17731784
}
17741785
continue;
17751786
}

0 commit comments

Comments
 (0)