Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 51 additions & 16 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public function createUuid(): string
* @return stdClass|array|int Query result
* @throws Exception
*/
public function query(array $command, ?string $db = null): stdClass|array|int
public function query(array $command, ?string $db = null, bool $ignoreDuplicateErrors = false): stdClass|array|int
{
// Validate connection state before each operation
$this->validateConnection();
Expand Down Expand Up @@ -351,7 +351,7 @@ public function query(array $command, ?string $db = null): stdClass|array|int

$sections = Document::fromPHP($params);
$message = pack('V*', 21 + strlen($sections), $this->id, 0, 2013, 0) . "\0" . $sections;
$result = $this->send($message);
$result = $this->send($message, $ignoreDuplicateErrors);

$this->updateCausalConsistency($result);

Expand All @@ -371,7 +371,7 @@ public function query(array $command, ?string $db = null): stdClass|array|int
* @return stdClass|array|int
* @throws Exception
*/
public function send(mixed $data): stdClass|array|int
public function send(mixed $data, bool $ignoreDuplicateErrors = false): stdClass|array|int
{
// Check if connection is alive, connect if not
if (!$this->client->isConnected()) {
Expand All @@ -390,14 +390,14 @@ public function send(mixed $data): stdClass|array|int
}
}

return $this->receive();
return $this->receive($ignoreDuplicateErrors);
}

/**
* Receive a message from connection.
* @throws Exception
*/
private function receive(): stdClass|array|int
private function receive(bool $ignoreDuplicateErrors = false): stdClass|array|int
{
$chunks = [];
$receivedLength = 0;
Expand Down Expand Up @@ -461,7 +461,7 @@ private function receive(): stdClass|array|int

$res = \implode('', $chunks);

return $this->parseResponse($res, $responseLength);
return $this->parseResponse($res, $responseLength, $ignoreDuplicateErrors);
}

/**
Expand Down Expand Up @@ -717,8 +717,9 @@ public function insert(string $collection, array $document, array $options = [])
* - readConcern: Read concern specification
* - readPreference: Read preference
* - maxTimeMS: Operation timeout in milliseconds
* - ordered: Whether to stop on first error (default: true)
* - ordered: Whether to stop on first error (default: true, forced to false when ignoreDuplicates is true)
* - batchSize: Number of documents per batch (default: 1000)
* - ignoreDuplicates: When true, silently skip duplicate key errors (11000/11001) and return only successfully inserted documents
* @return array Array of inserted documents with generated _ids
* @throws Exception
*/
Expand All @@ -729,7 +730,8 @@ public function insertMany(string $collection, array $documents, array $options
}

$batchSize = $options['batchSize'] ?? 1000;
$ordered = $options['ordered'] ?? true;
$ignoreDuplicates = $options['ignoreDuplicates'] ?? false;
$ordered = $ignoreDuplicates ? false : ($options['ordered'] ?? true);
$insertedDocs = [];

// Process documents in batches for better performance
Expand Down Expand Up @@ -775,12 +777,30 @@ public function insertMany(string $collection, array $documents, array $options
}

// Add other options (excluding those we've already handled)
$otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize']));
$otherOptions = array_diff_key($options, array_flip(['session', 'writeConcern', 'readConcern', 'batchSize', 'ignoreDuplicates', 'ordered']));
$command = array_merge($command, $otherOptions);

$this->query($command);
$result = $this->query($command, null, $ignoreDuplicates);

if ($ignoreDuplicates && \is_object($result) && \property_exists($result, 'writeErrors')) {
// Use writeError indices to identify which docs were skipped
$failedIndices = [];
foreach ($result->writeErrors as $err) {
if (($err->code ?? 0) === 11000 || ($err->code ?? 0) === 11001) {
$failedIndices[$err->index] = true;
}
}

$insertedDocs = array_merge($insertedDocs, $this->toArray($docObjs));
$filtered = [];
foreach ($docObjs as $i => $obj) {
if (!isset($failedIndices[$i])) {
$filtered[] = $obj;
}
}
$insertedDocs = \array_merge($insertedDocs, $this->toArray($filtered));
} else {
$insertedDocs = \array_merge($insertedDocs, $this->toArray($docObjs));
}
}

return $insertedDocs;
Expand Down Expand Up @@ -1584,7 +1604,7 @@ public function close(): void
* @return stdClass|array|int Parsed response
* @throws Exception
*/
private function parseResponse(string $response, int $responseLength): stdClass|array|int
private function parseResponse(string $response, int $responseLength, bool $ignoreDuplicateErrors = false): stdClass|array|int
{
/*
* The first 21 bytes of the MongoDB wire protocol response consist of:
Expand Down Expand Up @@ -1634,10 +1654,25 @@ private function parseResponse(string $response, int $responseLength): stdClass|

// Check for write errors (duplicate key, etc.)
if (\property_exists($result, 'writeErrors') && !empty($result->writeErrors)) {
throw new Exception(
$result->writeErrors[0]->errmsg,
$result->writeErrors[0]->code
);
if ($ignoreDuplicateErrors) {
$nonDuplicateErrors = \array_filter(
(array)$result->writeErrors,
fn ($err) => ($err->code ?? 0) !== 11000 && ($err->code ?? 0) !== 11001
);

if (!empty($nonDuplicateErrors)) {
$first = \reset($nonDuplicateErrors);
throw new Exception($first->errmsg, $first->code);
}

// Return the full result so insertMany() can read writeErrors indices
return $result;
} else {
throw new Exception(
$result->writeErrors[0]->errmsg,
$result->writeErrors[0]->code
);
}
}

// Check for general MongoDB errors
Expand Down
95 changes: 95 additions & 0 deletions tests/MongoTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,101 @@ public function testToArrayNestedConversion()
self::assertEquals([42], $client->toArray(42));
}

public function testInsertManyIgnoreDuplicates()
{
$collectionName = 'ignore_duplicates';
$this->getDatabase()->createCollection($collectionName);
try {
// Seed two documents
$this->getDatabase()->insertMany($collectionName, [
['_id' => 'doc1', 'name' => 'Original A'],
['_id' => 'doc2', 'name' => 'Original B'],
]);

// Without ignoreDuplicates, inserting a duplicate throws
try {
$this->getDatabase()->insertMany($collectionName, [
['_id' => 'doc1', 'name' => 'Duplicate A'],
]);
self::fail('Expected duplicate key exception');
} catch (Exception $e) {
self::assertTrue($e->isDuplicateKeyError());
}

// With ignoreDuplicates, duplicate is skipped and new doc is inserted
$result = $this->getDatabase()->insertMany($collectionName, [
['_id' => 'doc1', 'name' => 'Duplicate A'],
['_id' => 'doc3', 'name' => 'New C'],
], ['ignoreDuplicates' => true]);

self::assertCount(1, $result);
self::assertEquals('doc3', $result[0]['_id']);

// Original doc1 unchanged
$docs = $this->getDatabase()->find($collectionName, ['_id' => 'doc1'])->cursor->firstBatch ?? [];
self::assertCount(1, $docs);
self::assertEquals('Original A', $docs[0]->name);

// Total should be 3
$total = $this->getDatabase()->count($collectionName);
self::assertEquals(3, $total);
} finally {
$this->getDatabase()->dropCollection($collectionName);
}
}

public function testInsertManyIgnoreIntraBatchDuplicates()
{
$collectionName = 'ignore_intra_batch';
$this->getDatabase()->createCollection($collectionName);
try {
// Same ID twice in one batch — first wins
$result = $this->getDatabase()->insertMany($collectionName, [
['_id' => 'dup', 'name' => 'First'],
['_id' => 'dup', 'name' => 'Second'],
['_id' => 'unique1', 'name' => 'Unique'],
], ['ignoreDuplicates' => true]);

self::assertCount(2, $result);

$doc = $this->getDatabase()->find($collectionName, ['_id' => 'dup'])->cursor->firstBatch ?? [];
self::assertCount(1, $doc);
self::assertEquals('First', $doc[0]->name);

$total = $this->getDatabase()->count($collectionName);
self::assertEquals(2, $total);
} finally {
$this->getDatabase()->dropCollection($collectionName);
}
}

public function testInsertManyIgnoreAllDuplicates()
{
$collectionName = 'ignore_all_dups';
$this->getDatabase()->createCollection($collectionName);
try {
// Seed one document
$this->getDatabase()->insert($collectionName, ['_id' => 'existing', 'name' => 'Original']);

// Insert only duplicates with ignoreDuplicates
$result = $this->getDatabase()->insertMany($collectionName, [
['_id' => 'existing', 'name' => 'Duplicate'],
], ['ignoreDuplicates' => true]);

self::assertCount(0, $result);

// Original unchanged
$docs = $this->getDatabase()->find($collectionName, ['_id' => 'existing'])->cursor->firstBatch ?? [];
self::assertEquals('Original', $docs[0]->name);

// Still only 1 document
$total = $this->getDatabase()->count($collectionName);
self::assertEquals(1, $total);
} finally {
$this->getDatabase()->dropCollection($collectionName);
}
}

public function testCountMethod()
{
$collectionName = 'count_test';
Expand Down
Loading