diff --git a/src/Client.php b/src/Client.php index 7481249..e5895be 100644 --- a/src/Client.php +++ b/src/Client.php @@ -246,7 +246,7 @@ public function createUuid(): string * @return stdClass|array|int Query result * @throws Exception */ - public function query(array $command, ?string $db = null): stdClass|array|int + public function query(array $command, ?string $db = null, bool $ignoreDuplicateErrors = false): stdClass|array|int { // Validate connection state before each operation $this->validateConnection(); @@ -351,7 +351,7 @@ public function query(array $command, ?string $db = null): stdClass|array|int $sections = Document::fromPHP($params); $message = pack('V*', 21 + strlen($sections), $this->id, 0, 2013, 0) . "\0" . $sections; - $result = $this->send($message); + $result = $this->send($message, $ignoreDuplicateErrors); $this->updateCausalConsistency($result); @@ -371,7 +371,7 @@ public function query(array $command, ?string $db = null): stdClass|array|int * @return stdClass|array|int * @throws Exception */ - public function send(mixed $data): stdClass|array|int + public function send(mixed $data, bool $ignoreDuplicateErrors = false): stdClass|array|int { // Check if connection is alive, connect if not if (!$this->client->isConnected()) { @@ -390,14 +390,14 @@ public function send(mixed $data): stdClass|array|int } } - return $this->receive(); + return $this->receive($ignoreDuplicateErrors); } /** * Receive a message from connection. * @throws Exception */ - private function receive(): stdClass|array|int + private function receive(bool $ignoreDuplicateErrors = false): stdClass|array|int { $chunks = []; $receivedLength = 0; @@ -461,7 +461,7 @@ private function receive(): stdClass|array|int $res = \implode('', $chunks); - return $this->parseResponse($res, $responseLength); + return $this->parseResponse($res, $responseLength, $ignoreDuplicateErrors); } /** @@ -717,8 +717,9 @@ public function insert(string $collection, array $document, array $options = []) * - readConcern: Read concern specification * - readPreference: Read preference * - maxTimeMS: Operation timeout in milliseconds - * - ordered: Whether to stop on first error (default: true) + * - ordered: Whether to stop on first error (default: true, forced to false when ignoreDuplicates is true) * - batchSize: Number of documents per batch (default: 1000) + * - ignoreDuplicates: When true, silently skip duplicate key errors (11000/11001) and return only successfully inserted documents * @return array Array of inserted documents with generated _ids * @throws Exception */ @@ -729,7 +730,8 @@ public function insertMany(string $collection, array $documents, array $options } $batchSize = $options['batchSize'] ?? 1000; - $ordered = $options['ordered'] ?? true; + $ignoreDuplicates = $options['ignoreDuplicates'] ?? false; + $ordered = $ignoreDuplicates ? false : ($options['ordered'] ?? true); $insertedDocs = []; // Process documents in batches for better performance @@ -775,12 +777,30 @@ public function insertMany(string $collection, array $documents, array $options } // Add other options (excluding those we've already handled) - $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize'])); + $otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize', 'ignoreDuplicates', 'ordered'])); $command = array_merge($command, $otherOptions); - $this->query($command); + $result = $this->query($command, null, $ignoreDuplicates); + + if ($ignoreDuplicates && \is_object($result) && \property_exists($result, 'writeErrors')) { + // Use writeError indices to identify which docs were skipped + $failedIndices = []; + foreach ($result->writeErrors as $err) { + if (($err->code ?? 0) === 11000 || ($err->code ?? 0) === 11001) { + $failedIndices[$err->index] = true; + } + } - $insertedDocs = array_merge($insertedDocs, $this->toArray($docObjs)); + $filtered = []; + foreach ($docObjs as $i => $obj) { + if (!isset($failedIndices[$i])) { + $filtered[] = $obj; + } + } + $insertedDocs = \array_merge($insertedDocs, $this->toArray($filtered)); + } else { + $insertedDocs = \array_merge($insertedDocs, $this->toArray($docObjs)); + } } return $insertedDocs; @@ -1584,7 +1604,7 @@ public function close(): void * @return stdClass|array|int Parsed response * @throws Exception */ - private function parseResponse(string $response, int $responseLength): stdClass|array|int + private function parseResponse(string $response, int $responseLength, bool $ignoreDuplicateErrors = false): stdClass|array|int { /* * The first 21 bytes of the MongoDB wire protocol response consist of: @@ -1634,10 +1654,25 @@ private function parseResponse(string $response, int $responseLength): stdClass| // Check for write errors (duplicate key, etc.) if (\property_exists($result, 'writeErrors') && !empty($result->writeErrors)) { - throw new Exception( - $result->writeErrors[0]->errmsg, - $result->writeErrors[0]->code - ); + if ($ignoreDuplicateErrors) { + $nonDuplicateErrors = \array_filter( + (array)$result->writeErrors, + fn ($err) => ($err->code ?? 0) !== 11000 && ($err->code ?? 0) !== 11001 + ); + + if (!empty($nonDuplicateErrors)) { + $first = \reset($nonDuplicateErrors); + throw new Exception($first->errmsg, $first->code); + } + + // Return the full result so insertMany() can read writeErrors indices + return $result; + } else { + throw new Exception( + $result->writeErrors[0]->errmsg, + $result->writeErrors[0]->code + ); + } } // Check for general MongoDB errors diff --git a/tests/MongoTest.php b/tests/MongoTest.php index 2d48716..f497ed4 100644 --- a/tests/MongoTest.php +++ b/tests/MongoTest.php @@ -389,6 +389,101 @@ public function testToArrayNestedConversion() self::assertEquals([42], $client->toArray(42)); } + public function testInsertManyIgnoreDuplicates() + { + $collectionName = 'ignore_duplicates'; + $this->getDatabase()->createCollection($collectionName); + try { + // Seed two documents + $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'doc1', 'name' => 'Original A'], + ['_id' => 'doc2', 'name' => 'Original B'], + ]); + + // Without ignoreDuplicates, inserting a duplicate throws + try { + $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'doc1', 'name' => 'Duplicate A'], + ]); + self::fail('Expected duplicate key exception'); + } catch (Exception $e) { + self::assertTrue($e->isDuplicateKeyError()); + } + + // With ignoreDuplicates, duplicate is skipped and new doc is inserted + $result = $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'doc1', 'name' => 'Duplicate A'], + ['_id' => 'doc3', 'name' => 'New C'], + ], ['ignoreDuplicates' => true]); + + self::assertCount(1, $result); + self::assertEquals('doc3', $result[0]['_id']); + + // Original doc1 unchanged + $docs = $this->getDatabase()->find($collectionName, ['_id' => 'doc1'])->cursor->firstBatch ?? []; + self::assertCount(1, $docs); + self::assertEquals('Original A', $docs[0]->name); + + // Total should be 3 + $total = $this->getDatabase()->count($collectionName); + self::assertEquals(3, $total); + } finally { + $this->getDatabase()->dropCollection($collectionName); + } + } + + public function testInsertManyIgnoreIntraBatchDuplicates() + { + $collectionName = 'ignore_intra_batch'; + $this->getDatabase()->createCollection($collectionName); + try { + // Same ID twice in one batch — first wins + $result = $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'dup', 'name' => 'First'], + ['_id' => 'dup', 'name' => 'Second'], + ['_id' => 'unique1', 'name' => 'Unique'], + ], ['ignoreDuplicates' => true]); + + self::assertCount(2, $result); + + $doc = $this->getDatabase()->find($collectionName, ['_id' => 'dup'])->cursor->firstBatch ?? []; + self::assertCount(1, $doc); + self::assertEquals('First', $doc[0]->name); + + $total = $this->getDatabase()->count($collectionName); + self::assertEquals(2, $total); + } finally { + $this->getDatabase()->dropCollection($collectionName); + } + } + + public function testInsertManyIgnoreAllDuplicates() + { + $collectionName = 'ignore_all_dups'; + $this->getDatabase()->createCollection($collectionName); + try { + // Seed one document + $this->getDatabase()->insert($collectionName, ['_id' => 'existing', 'name' => 'Original']); + + // Insert only duplicates with ignoreDuplicates + $result = $this->getDatabase()->insertMany($collectionName, [ + ['_id' => 'existing', 'name' => 'Duplicate'], + ], ['ignoreDuplicates' => true]); + + self::assertCount(0, $result); + + // Original unchanged + $docs = $this->getDatabase()->find($collectionName, ['_id' => 'existing'])->cursor->firstBatch ?? []; + self::assertEquals('Original', $docs[0]->name); + + // Still only 1 document + $total = $this->getDatabase()->count($collectionName); + self::assertEquals(1, $total); + } finally { + $this->getDatabase()->dropCollection($collectionName); + } + } + public function testCountMethod() { $collectionName = 'count_test';