From 28e9c2712022bc5e37af84e3d0620933152b287f Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Fri, 10 Apr 2026 06:42:37 +0100 Subject: [PATCH 1/3] Add ignoreDuplicates option to insertMany for silent duplicate handling - Use ordered:false so MongoDB continues inserting after a duplicate key error - Suppress duplicate key writeErrors (11000/11001) in receive() via instance flag - Use writeError indices to filter out failed docs from the returned array - Non-duplicate writeErrors still throw as before --- src/Client.php | 67 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/src/Client.php b/src/Client.php index 7481249..21b5ebe 100644 --- a/src/Client.php +++ b/src/Client.php @@ -43,6 +43,12 @@ class Client */ private bool $isConnected = false; + /** + * When true, duplicate key writeErrors (11000/11001) are suppressed in receive(). + * Set temporarily by insertMany() when ignoreDuplicates is enabled. + */ + private bool $ignoreDuplicateErrors = false; + /** * Defines commands Mongo uses over wire protocol. */ @@ -717,8 +723,9 @@ public function insert(string $collection, array $document, array $options = []) * - readConcern: Read concern specification * - readPreference: Read preference * - maxTimeMS: Operation timeout in milliseconds - * - ordered: Whether to stop on first error (default: true) + * - ordered: Whether to stop on first error (default: true, forced to false when ignoreDuplicates is true) * - batchSize: Number of documents per batch (default: 1000) + * - ignoreDuplicates: When true, silently skip duplicate key errors (11000/11001) and return only successfully inserted documents * @return array Array of inserted documents with generated _ids * @throws Exception */ @@ -729,7 +736,8 @@ public function insertMany(string $collection, array $documents, array $options } $batchSize = $options['batchSize'] ?? 1000; - $ordered = $options['ordered'] ?? true; + $ignoreDuplicates = $options['ignoreDuplicates'] ?? false; + $ordered = $ignoreDuplicates ? false : ($options['ordered'] ?? true); $insertedDocs = []; // Process documents in batches for better performance @@ -775,12 +783,38 @@ public function insertMany(string $collection, array $documents, array $options } // Add other options (excluding those we've already handled) - $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize'])); + $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize', 'ignoreDuplicates'])); $command = array_merge($command, $otherOptions); - $this->query($command); + if ($ignoreDuplicates) { + $this->ignoreDuplicateErrors = true; + } + + try { + $result = $this->query($command); + } finally { + $this->ignoreDuplicateErrors = false; + } + + if ($ignoreDuplicates && \is_object($result) && \property_exists($result, 'writeErrors')) { + // Use writeError indices to identify which docs were skipped + $failedIndices = []; + foreach ($result->writeErrors as $err) { + if (($err->code ?? 0) === 11000 || ($err->code ?? 0) === 11001) { + $failedIndices[$err->index] = true; + } + } - $insertedDocs = array_merge($insertedDocs, $this->toArray($docObjs)); + $filtered = []; + foreach ($docObjs as $i => $obj) { + if (!isset($failedIndices[$i])) { + $filtered[] = $obj; + } + } + $insertedDocs = \array_merge($insertedDocs, $this->toArray($filtered)); + } else { + $insertedDocs = \array_merge($insertedDocs, $this->toArray($docObjs)); + } } return $insertedDocs; @@ -1634,10 +1668,25 @@ private function parseResponse(string $response, int $responseLength): stdClass| // Check for write errors (duplicate key, etc.) if (\property_exists($result, 'writeErrors') && !empty($result->writeErrors)) { - throw new Exception( - $result->writeErrors[0]->errmsg, - $result->writeErrors[0]->code - ); + if ($this->ignoreDuplicateErrors) { + $nonDuplicateErrors = \array_filter( + (array)$result->writeErrors, + fn ($err) => ($err->code ?? 0) !== 11000 && ($err->code ?? 0) !== 11001 + ); + + if (!empty($nonDuplicateErrors)) { + $first = \reset($nonDuplicateErrors); + throw new Exception($first->errmsg, $first->code); + } + + // Return the full result so insertMany() can read writeErrors indices + return $result; + } else { + throw new Exception( + $result->writeErrors[0]->errmsg, + $result->writeErrors[0]->code + ); + } } // Check for general MongoDB errors From 08c94f423bf6a0473ea34b8217599cb53e177cbc Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Fri, 10 Apr 2026 06:54:56 +0100 Subject: [PATCH 2/3] Pass ignoreDuplicateErrors through call chain instead of class-level flag Addresses review feedback: - Remove mutable $ignoreDuplicateErrors property to prevent Swoole coroutine races - Thread the flag as a parameter through query() -> send() -> receive() -> parseResponse() - Add 'ordered' to array_diff_key exclusion list to prevent caller override --- src/Client.php | 34 ++++++++++------------------------ 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/src/Client.php b/src/Client.php index 21b5ebe..e5895be 100644 --- a/src/Client.php +++ b/src/Client.php @@ -43,12 +43,6 @@ class Client */ private bool $isConnected = false; - /** - * When true, duplicate key writeErrors (11000/11001) are suppressed in receive(). - * Set temporarily by insertMany() when ignoreDuplicates is enabled. - */ - private bool $ignoreDuplicateErrors = false; - /** * Defines commands Mongo uses over wire protocol. */ @@ -252,7 +246,7 @@ public function createUuid(): string * @return stdClass|array|int Query result * @throws Exception */ - public function query(array $command, ?string $db = null): stdClass|array|int + public function query(array $command, ?string $db = null, bool $ignoreDuplicateErrors = false): stdClass|array|int { // Validate connection state before each operation $this->validateConnection(); @@ -357,7 +351,7 @@ public function query(array $command, ?string $db = null): stdClass|array|int $sections = Document::fromPHP($params); $message = pack('V*', 21 + strlen($sections), $this->id, 0, 2013, 0) . "\0" . $sections; - $result = $this->send($message); + $result = $this->send($message, $ignoreDuplicateErrors); $this->updateCausalConsistency($result); @@ -377,7 +371,7 @@ public function query(array $command, ?string $db = null): stdClass|array|int * @return stdClass|array|int * @throws Exception */ - public function send(mixed $data): stdClass|array|int + public function send(mixed $data, bool $ignoreDuplicateErrors = false): stdClass|array|int { // Check if connection is alive, connect if not if (!$this->client->isConnected()) { @@ -396,14 +390,14 @@ public function send(mixed $data): stdClass|array|int } } - return $this->receive(); + return $this->receive($ignoreDuplicateErrors); } /** * Receive a message from connection. * @throws Exception */ - private function receive(): stdClass|array|int + private function receive(bool $ignoreDuplicateErrors = false): stdClass|array|int { $chunks = []; $receivedLength = 0; @@ -467,7 +461,7 @@ private function receive(): stdClass|array|int $res = \implode('', $chunks); - return $this->parseResponse($res, $responseLength); + return $this->parseResponse($res, $responseLength, $ignoreDuplicateErrors); } /** @@ -783,18 +777,10 @@ public function insertMany(string $collection, array $documents, array $options } // Add other options (excluding those we've already handled) - $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize', 'ignoreDuplicates'])); + $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize', 'ignoreDuplicates', 'ordered'])); $command = array_merge($command, $otherOptions); - if ($ignoreDuplicates) { - $this->ignoreDuplicateErrors = true; - } - - try { - $result = $this->query($command); - } finally { - $this->ignoreDuplicateErrors = false; - } + $result = $this->query($command, null, $ignoreDuplicates); if ($ignoreDuplicates && \is_object($result) && \property_exists($result, 'writeErrors')) { // Use writeError indices to identify which docs were skipped @@ -1618,7 +1604,7 @@ public function close(): void * @return stdClass|array|int Parsed response * @throws Exception */ - private function parseResponse(string $response, int $responseLength): stdClass|array|int + private function parseResponse(string $response, int $responseLength, bool $ignoreDuplicateErrors = false): stdClass|array|int { /* * The first 21 bytes of the MongoDB wire protocol response consist of: @@ -1668,7 +1654,7 @@ private function parseResponse(string $response, int $responseLength): stdClass| // Check for write errors (duplicate key, etc.) if (\property_exists($result, 'writeErrors') && !empty($result->writeErrors)) { - if ($this->ignoreDuplicateErrors) { + if ($ignoreDuplicateErrors) { $nonDuplicateErrors = \array_filter( (array)$result->writeErrors, fn ($err) => ($err->code ?? 0) !== 11000 && ($err->code ?? 0) !== 11001 From b9db7e3861e4df05a54dcbebf23f4e931f76280f Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Fri, 10 Apr 2026 07:05:22 +0100 Subject: [PATCH 3/3] Add tests for insertMany ignoreDuplicates option - testInsertManyIgnoreDuplicates: mixed batch with existing + new docs - testInsertManyIgnoreIntraBatchDuplicates: same ID twice, first wins - testInsertManyIgnoreAllDuplicates: all duplicates, zero inserts --- tests/MongoTest.php | 95 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/tests/MongoTest.php b/tests/MongoTest.php index 2d48716..f497ed4 100644 --- a/tests/MongoTest.php +++ b/tests/MongoTest.php @@ -389,6 +389,101 @@ public function testToArrayNestedConversion() self::assertEquals([42], $client->toArray(42)); } + public function testInsertManyIgnoreDuplicates() + { + $collectionName = 'ignore_duplicates'; + $this->getDatabase()->createCollection($collectionName); + try { + // Seed two documents + $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'doc1', 'name' => 'Original A'], + ['_id' => 'doc2', 'name' => 'Original B'], + ]); + + // Without ignoreDuplicates, inserting a duplicate throws + try { + $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'doc1', 'name' => 'Duplicate A'], + ]); + self::fail('Expected duplicate key exception'); + } catch (Exception $e) { + self::assertTrue($e->isDuplicateKeyError()); + } + + // With ignoreDuplicates, duplicate is skipped and new doc is inserted + $result = $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'doc1', 'name' => 'Duplicate A'], + ['_id' => 'doc3', 'name' => 'New C'], + ], ['ignoreDuplicates' => true]); + + self::assertCount(1, $result); + self::assertEquals('doc3', $result[0]['_id']); + + // Original doc1 unchanged + $docs = $this->getDatabase()->find($collectionName, ['_id' => 'doc1'])->cursor->firstBatch ?? []; + self::assertCount(1, $docs); + self::assertEquals('Original A', $docs[0]->name); + + // Total should be 3 + $total = $this->getDatabase()->count($collectionName); + self::assertEquals(3, $total); + } finally { + $this->getDatabase()->dropCollection($collectionName); + } + } + + public function testInsertManyIgnoreIntraBatchDuplicates() + { + $collectionName = 'ignore_intra_batch'; + $this->getDatabase()->createCollection($collectionName); + try { + // Same ID twice in one batch — first wins + $result = $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'dup', 'name' => 'First'], + ['_id' => 'dup', 'name' => 'Second'], + ['_id' => 'unique1', 'name' => 'Unique'], + ], ['ignoreDuplicates' => true]); + + self::assertCount(2, $result); + + $doc = $this->getDatabase()->find($collectionName, ['_id' => 'dup'])->cursor->firstBatch ?? []; + self::assertCount(1, $doc); + self::assertEquals('First', $doc[0]->name); + + $total = $this->getDatabase()->count($collectionName); + self::assertEquals(2, $total); + } finally { + $this->getDatabase()->dropCollection($collectionName); + } + } + + public function testInsertManyIgnoreAllDuplicates() + { + $collectionName = 'ignore_all_dups'; + $this->getDatabase()->createCollection($collectionName); + try { + // Seed one document + $this->getDatabase()->insert($collectionName, ['_id' => 'existing', 'name' => 'Original']); + + // Insert only duplicates with ignoreDuplicates + $result = $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'existing', 'name' => 'Duplicate'], + ], ['ignoreDuplicates' => true]); + + self::assertCount(0, $result); + + // Original unchanged + $docs = $this->getDatabase()->find($collectionName, ['_id' => 'existing'])->cursor->firstBatch ?? []; + self::assertEquals('Original', $docs[0]->name); + + // Still only 1 document + $total = $this->getDatabase()->count($collectionName); + self::assertEquals(1, $total); + } finally { + $this->getDatabase()->dropCollection($collectionName); + } + } + public function testCountMethod() { $collectionName = 'count_test';