From 287420b01b49f3ca1934b6317be17310aa48edc0 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Wed, 8 Apr 2026 17:14:22 +0100 Subject: [PATCH 1/4] Add ignore param to createDocuments for silent duplicate handling - Add `bool $ignore` param to `createDocuments` across all adapters (MariaDB: INSERT IGNORE, Postgres: ON CONFLICT DO NOTHING, SQLite: INSERT OR IGNORE, MongoDB: ordered:false outside transaction) - Pre-fetch existing document IDs to skip relationship writes and filter known duplicates from adapter batches - Deduplicate intra-batch documents by ID (first occurrence wins) to prevent ACL drift and overcounted results - Handle tenant-per-document mode with composite keys for pre-fetch - Chunk all find queries by maxQueryValues - Optimize upsertDocuments: batch-fetch existing docs with find instead of per-row getDocument calls (~2x speedup) - Add ignore param to Mirror::createDocuments - Add e2e tests: mixed duplicates, all duplicates, intra-batch duplicates with onNext callback assertions --- src/Database/Adapter.php | 3 +- src/Database/Adapter/Mongo.php | 22 ++- src/Database/Adapter/Pool.php | 2 +- src/Database/Adapter/Postgres.php | 29 +++ src/Database/Adapter/SQL.php | 37 +++- src/Database/Adapter/SQLite.php | 5 + src/Database/Database.php | 146 +++++++++++++-- src/Database/Mirror.php | 3 + tests/e2e/Adapter/Scopes/DocumentTests.php | 196 +++++++++++++++++++++ 9 files changed, 421 insertions(+), 22 deletions(-) diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php index a7b385cce..9cc83d141 100644 --- a/src/Database/Adapter.php +++ b/src/Database/Adapter.php @@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document * * @param Document $collection * @param array $documents + * @param bool $ignore If true, silently ignore duplicate documents instead of throwing * * @return array * * @throws DatabaseException */ - abstract public function createDocuments(Document $collection, array $documents): array; + abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array; /** * Update Document diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 7ddde43d3..6ecae691d 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1460,11 +1460,18 @@ public function castingBefore(Document $collection, Document $document): Documen * @throws DuplicateException * @throws DatabaseException */ - public function createDocuments(Document $collection, array $documents): array + public function createDocuments(Document $collection, array $documents, bool $ignore = false): array { $name = $this->getNamespace() . '_' . $this->filter($collection->getId()); - $options = $this->getTransactionOptions(); + if ($ignore) { + // Run outside transaction — MongoDB aborts transactions on any write error, + // so ordered:false + session would roll back even successfully inserted docs. + $options = ['ordered' => false]; + } else { + $options = $this->getTransactionOptions(); + } + $records = []; $hasSequence = null; $documents = \array_map(fn ($doc) => clone $doc, $documents); @@ -1490,7 +1497,16 @@ public function createDocuments(Document $collection, array $documents): array try { $documents = $this->client->insertMany($name, $records, $options); } catch (MongoException $e) { - throw $this->processException($e); + $processed = $this->processException($e); + + if ($ignore && $processed instanceof DuplicateException) { + // Race condition: a doc was inserted between pre-filter and insertMany. + // With ordered:false outside transaction, non-duplicate inserts persist. + // Return empty — we cannot determine which docs succeeded without querying. + return []; + } + + throw $processed; } foreach ($documents as $index => $document) { diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php index 668753387..ec508c3e4 100644 --- a/src/Database/Adapter/Pool.php +++ b/src/Database/Adapter/Pool.php @@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume return $this->delegate(__FUNCTION__, \func_get_args()); } - public function createDocuments(Document $collection, array $documents): array + public function createDocuments(Document $collection, array $documents, bool $ignore = false): array { return $this->delegate(__FUNCTION__, \func_get_args()); } diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php index 8dcf72025..814ecc8bc 100644 --- a/src/Database/Adapter/Postgres.php +++ b/src/Database/Adapter/Postgres.php @@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum return $document; } + protected function getInsertKeyword(bool $ignore): string + { + return 'INSERT INTO'; + } + + protected function getInsertSuffix(bool $ignore, string $table): string + { + if (!$ignore) { + return ''; + } + + $conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + + protected function getInsertPermissionsSuffix(bool $ignore): string + { + if (!$ignore) { + return ''; + } + + $conflictTarget = $this->sharedTables + ? '("_type", "_permission", "_document", "_tenant")' + : '("_type", "_permission", "_document")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + /** * @param string $tableName * @param string $columns diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 6864e6aee..dbe66ce12 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -2471,7 +2471,7 @@ protected function execute(mixed $stmt): bool * @throws DuplicateException * @throws \Throwable */ - public function createDocuments(Document $collection, array $documents): array + public function createDocuments(Document $collection, array $documents, bool $ignore = false): array { if (empty($documents)) { return $documents; @@ -2573,8 +2573,9 @@ public function createDocuments(Document $collection, array $documents): array $batchKeys = \implode(', ', $batchKeys); $stmt = $this->getPDO()->prepare(" - INSERT INTO {$this->getSQLTable($name)} {$columns} + {$this->getInsertKeyword($ignore)} {$this->getSQLTable($name)} {$columns} VALUES {$batchKeys} + {$this->getInsertSuffix($ignore, $name)} "); foreach ($bindValues as $key => $value) { @@ -2588,8 +2589,9 @@ public function createDocuments(Document $collection, array $documents): array $permissions = \implode(', ', $permissions); $sqlPermissions = " - INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) - VALUES {$permissions}; + {$this->getInsertKeyword($ignore)} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) + VALUES {$permissions} + {$this->getInsertPermissionsSuffix($ignore)} "; $stmtPermissions = $this->getPDO()->prepare($sqlPermissions); @@ -2608,6 +2610,33 @@ public function createDocuments(Document $collection, array $documents): array return $documents; } + /** + * Returns the INSERT keyword, optionally with IGNORE for duplicate handling. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertKeyword(bool $ignore): string + { + return $ignore ? 'INSERT IGNORE INTO' : 'INSERT INTO'; + } + + /** + * Returns a suffix appended after VALUES clause for duplicate handling. + * Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING). + */ + protected function getInsertSuffix(bool $ignore, string $table): string + { + return ''; + } + + /** + * Returns a suffix for the permissions INSERT statement when ignoring duplicates. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertPermissionsSuffix(bool $ignore): string + { + return ''; + } + /** * @param Document $collection * @param string $attribute diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php index 3c25987eb..8e7ef6b5b 100644 --- a/src/Database/Adapter/SQLite.php +++ b/src/Database/Adapter/SQLite.php @@ -34,6 +34,11 @@ */ class SQLite extends MariaDB { + protected function getInsertKeyword(bool $ignore): string + { + return $ignore ? 'INSERT OR IGNORE INTO' : 'INSERT INTO'; + } + /** * @inheritDoc */ diff --git a/src/Database/Database.php b/src/Database/Database.php index ac58d72f0..5e66334fb 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -5621,6 +5621,7 @@ public function createDocument(string $collection, Document $document): Document * @param int $batchSize * @param (callable(Document): void)|null $onNext * @param (callable(Throwable): void)|null $onError + * @param bool $ignore If true, silently ignore duplicate documents instead of throwing * @return int * @throws AuthorizationException * @throws StructureException @@ -5633,6 +5634,7 @@ public function createDocuments( int $batchSize = self::INSERT_BATCH_SIZE, ?callable $onNext = null, ?callable $onError = null, + bool $ignore = false, ): int { if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) { throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.'); @@ -5653,6 +5655,71 @@ public function createDocuments( $time = DateTime::now(); $modified = 0; + // Deduplicate intra-batch documents by ID when ignore mode is on. + // Keeps the first occurrence, mirrors upsertDocuments' seenIds check. + if ($ignore) { + $seenIds = []; + $deduplicated = []; + foreach ($documents as $document) { + $docId = $document->getId(); + if ($docId !== '' && isset($seenIds[$docId])) { + continue; + } + if ($docId !== '') { + $seenIds[$docId] = true; + } + $deduplicated[] = $document; + } + $documents = $deduplicated; + } + + // When ignore mode is on and relationships are being resolved, + // pre-fetch existing document IDs so we skip relationship writes for duplicates + $preExistingIds = []; + $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); + if ($ignore) { + if ($tenantPerDocument) { + $idsByTenant = []; + foreach ($documents as $doc) { + $idsByTenant[$doc->getTenant()][] = $doc->getId(); + } + foreach ($idsByTenant as $tenant => $tenantIds) { + $tenantIds = \array_values(\array_unique($tenantIds)); + foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find( + $collection->getId(), + [ + Query::equal('$id', $idChunk), + Query::select(['$id']), + Query::limit(\count($idChunk)), + ] + )))); + foreach ($existing as $doc) { + $preExistingIds[$tenant . ':' . $doc->getId()] = true; + } + } + } + } else { + $inputIds = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getId(), $documents) + ))); + + foreach (\array_chunk($inputIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $existing = $this->authorization->skip(fn () => $this->silent(fn () => $this->find( + $collection->getId(), + [ + Query::equal('$id', $idChunk), + Query::select(['$id']), + Query::limit(\count($idChunk)), + ] + ))); + foreach ($existing as $doc) { + $preExistingIds[$doc->getId()] = true; + } + } + } + } + foreach ($documents as $document) { $createdAt = $document->getCreatedAt(); $updatedAt = $document->getUpdatedAt(); @@ -5693,15 +5760,33 @@ public function createDocuments( } if ($this->resolveRelationships) { - $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); + $preExistKey = $tenantPerDocument + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + + if (!isset($preExistingIds[$preExistKey])) { + $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); + } } $document = $this->adapter->castingBefore($collection, $document); } foreach (\array_chunk($documents, $batchSize) as $chunk) { - $batch = $this->withTransaction(function () use ($collection, $chunk) { - return $this->adapter->createDocuments($collection, $chunk); + if ($ignore && !empty($preExistingIds)) { + $chunk = \array_values(\array_filter($chunk, function (Document $doc) use ($preExistingIds, $tenantPerDocument) { + $key = $tenantPerDocument + ? $doc->getTenant() . ':' . $doc->getId() + : $doc->getId(); + return !isset($preExistingIds[$key]); + })); + if (empty($chunk)) { + continue; + } + } + + $batch = $this->withTransaction(function () use ($collection, $chunk, $ignore) { + return $this->adapter->createDocuments($collection, $chunk, $ignore); }); $batch = $this->adapter->getSequences($collection->getId(), $batch); @@ -7116,18 +7201,53 @@ public function upsertDocumentsWithIncrease( $created = 0; $updated = 0; $seenIds = []; - foreach ($documents as $key => $document) { - if ($this->getSharedTables() && $this->getTenantPerDocument()) { - $old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - )))); + + // Batch-fetch existing documents in one query instead of N individual getDocument() calls + $ids = \array_filter(\array_map(fn ($doc) => $doc->getId(), $documents)); + $existingDocs = []; + $upsertTenantPerDocument = $this->getSharedTables() && $this->getTenantPerDocument(); + + if (!empty($ids)) { + $uniqueIds = \array_values(\array_unique($ids)); + + if ($upsertTenantPerDocument) { + // Group IDs by tenant and fetch each group separately + // Use composite key tenant:id to avoid cross-tenant collisions + $idsByTenant = []; + foreach ($documents as $doc) { + $tenant = $doc->getTenant(); + $idsByTenant[$tenant][] = $doc->getId(); + } + foreach ($idsByTenant as $tenant => $tenantIds) { + $tenantIds = \array_values(\array_unique($tenantIds)); + foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find( + $collection->getId(), + [Query::equal('$id', $idChunk), Query::limit(\count($idChunk))], + )))); + foreach ($fetched as $doc) { + $existingDocs[$tenant . ':' . $doc->getId()] = $doc; + } + } + } } else { - $old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - ))); + foreach (\array_chunk($uniqueIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $fetched = $this->authorization->skip(fn () => $this->silent(fn () => $this->find( + $collection->getId(), + [Query::equal('$id', $idChunk), Query::limit(\count($idChunk))], + ))); + foreach ($fetched as $doc) { + $existingDocs[$doc->getId()] = $doc; + } + } } + } + + foreach ($documents as $key => $document) { + $lookupKey = $upsertTenantPerDocument + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + $old = $existingDocs[$lookupKey] ?? new Document(); // Extract operators early to avoid comparison issues $documentArray = $document->getArrayCopy(); diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php index f740cab3e..636d273dd 100644 --- a/src/Database/Mirror.php +++ b/src/Database/Mirror.php @@ -600,6 +600,7 @@ public function createDocuments( int $batchSize = self::INSERT_BATCH_SIZE, ?callable $onNext = null, ?callable $onError = null, + bool $ignore = false, ): int { $modified = $this->source->createDocuments( $collection, @@ -607,6 +608,7 @@ public function createDocuments( $batchSize, $onNext, $onError, + $ignore, ); if ( @@ -645,6 +647,7 @@ public function createDocuments( $collection, $clones, $batchSize, + ignore: $ignore, ) ); diff --git a/tests/e2e/Adapter/Scopes/DocumentTests.php b/tests/e2e/Adapter/Scopes/DocumentTests.php index d16004d32..34a463200 100644 --- a/tests/e2e/Adapter/Scopes/DocumentTests.php +++ b/tests/e2e/Adapter/Scopes/DocumentTests.php @@ -7722,4 +7722,200 @@ public function testRegexInjection(): void // } // $database->deleteCollection($collectionName); // } + + public function testCreateDocumentsIgnoreDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial documents + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Original A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc2', + 'name' => 'Original B', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // Without ignore, duplicates should throw + try { + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + $this->fail('Expected DuplicateException'); + } catch (DuplicateException $e) { + $this->assertNotEmpty($e->getMessage()); + } + + // With ignore, duplicates should be silently skipped + $emittedIds = []; + $count = $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc3', + 'name' => 'New C', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }, ignore: true); + + // Only doc3 was new, doc1 was skipped as duplicate + $this->assertSame(1, $count); + $this->assertCount(1, $emittedIds); + $this->assertSame('doc3', $emittedIds[0]); + + // doc3 should exist, doc1 should retain original value + $doc1 = $database->getDocument(__FUNCTION__, 'doc1'); + $this->assertSame('Original A', $doc1->getAttribute('name')); + + $doc3 = $database->getDocument(__FUNCTION__, 'doc3'); + $this->assertSame('New C', $doc3->getAttribute('name')); + + // Total should be 3 (doc1, doc2, doc3) + $all = $database->find(__FUNCTION__); + $this->assertCount(3, $all); + } + + public function testCreateDocumentsIgnoreIntraBatchDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + $col = 'createDocsIgnoreIntraBatch'; + + $database->createCollection($col); + $database->createAttribute($col, 'name', Database::VAR_STRING, 128, true); + + // Two docs with same ID in one batch — first wins, second is deduplicated + $emittedIds = []; + $count = $database->createDocuments($col, [ + new Document([ + '$id' => 'dup', + 'name' => 'First', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'dup', + 'name' => 'Second', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::update(Role::user('extra')), + ], + ]), + new Document([ + '$id' => 'unique1', + 'name' => 'Unique', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }, ignore: true); + + $this->assertSame(2, $count); + $this->assertCount(2, $emittedIds); + + // First occurrence wins + $doc = $database->getDocument($col, 'dup'); + $this->assertSame('First', $doc->getAttribute('name')); + + // Second doc's extra permission should NOT exist (no ACL drift) + $perms = $doc->getPermissions(); + foreach ($perms as $perm) { + $this->assertStringNotContainsString('extra', $perm); + } + + // unique1 should exist + $unique = $database->getDocument($col, 'unique1'); + $this->assertSame('Unique', $unique->getAttribute('name')); + + // Total: 2 documents + $all = $database->find($col); + $this->assertCount(2, $all); + } + + public function testCreateDocumentsIgnoreAllDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial document + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'existing', + 'name' => 'Original', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // With ignore, inserting only duplicates should succeed with no new rows + $emittedIds = []; + $count = $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'existing', + 'name' => 'Duplicate', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }, ignore: true); + + // All duplicates skipped, nothing inserted + $this->assertSame(0, $count); + $this->assertSame([], $emittedIds); + + // Original document should be unchanged + $doc = $database->getDocument(__FUNCTION__, 'existing'); + $this->assertSame('Original', $doc->getAttribute('name')); + + // Still only 1 document + $all = $database->find(__FUNCTION__); + $this->assertCount(1, $all); + } } From 2b4fd1efba0f9e0f31c4c0f4506321c944ce1842 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Wed, 8 Apr 2026 21:38:29 +0100 Subject: [PATCH 2/4] Fix MongoDB transaction safety, tenant-aware dedup, and empty ID filtering - Pre-filter duplicates inside MongoDB session instead of bypassing the transaction, ensuring ignore mode works correctly within transactions - Use tenant-aware composite keys for intra-batch dedup in tenant-per-document mode - Add array_filter() to tenant prefetch paths to skip empty IDs, consistent with non-tenant paths --- src/Database/Adapter/Mongo.php | 50 ++++++++++++++++++++++------------ src/Database/Database.php | 19 ++++++++----- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 6ecae691d..ad136d5e5 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1463,14 +1463,7 @@ public function castingBefore(Document $collection, Document $document): Documen public function createDocuments(Document $collection, array $documents, bool $ignore = false): array { $name = $this->getNamespace() . '_' . $this->filter($collection->getId()); - - if ($ignore) { - // Run outside transaction — MongoDB aborts transactions on any write error, - // so ordered:false + session would roll back even successfully inserted docs. - $options = ['ordered' => false]; - } else { - $options = $this->getTransactionOptions(); - } + $options = $this->getTransactionOptions(); $records = []; $hasSequence = null; @@ -1494,19 +1487,42 @@ public function createDocuments(Document $collection, array $documents, bool $ig $records[] = $record; } - try { - $documents = $this->client->insertMany($name, $records, $options); - } catch (MongoException $e) { - $processed = $this->processException($e); + // In ignore mode, pre-filter duplicates within the same session to avoid + // BulkWriteException which would abort the transaction. + if ($ignore && !empty($records)) { + $uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records)); + if (!empty($uids)) { + $findOptions = $this->getTransactionOptions(['projection' => ['_uid' => 1]]); + $result = $this->client->find($name, ['_uid' => ['$in' => \array_values($uids)]], $findOptions); + $existingUids = []; + foreach ($result->cursor->firstBatch ?? [] as $doc) { + $existingUids[$doc->_uid] = true; + } - if ($ignore && $processed instanceof DuplicateException) { - // Race condition: a doc was inserted between pre-filter and insertMany. - // With ordered:false outside transaction, non-duplicate inserts persist. - // Return empty — we cannot determine which docs succeeded without querying. + if (!empty($existingUids)) { + $filteredRecords = []; + $filteredDocuments = []; + foreach ($records as $i => $record) { + $uid = $record['_uid'] ?? ''; + if (!isset($existingUids[$uid])) { + $filteredRecords[] = $record; + $filteredDocuments[] = $documents[$i]; + } + } + $records = $filteredRecords; + $documents = $filteredDocuments; + } + } + + if (empty($records)) { return []; } + } - throw $processed; + try { + $documents = $this->client->insertMany($name, $records, $options); + } catch (MongoException $e) { + throw $this->processException($e); } foreach ($documents as $index => $document) { diff --git a/src/Database/Database.php b/src/Database/Database.php index 5e66334fb..6bfa9d69b 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -5655,18 +5655,24 @@ public function createDocuments( $time = DateTime::now(); $modified = 0; + $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); + // Deduplicate intra-batch documents by ID when ignore mode is on. // Keeps the first occurrence, mirrors upsertDocuments' seenIds check. + // In tenant-per-document mode, dedupe by tenant+id to allow same ID across tenants. if ($ignore) { $seenIds = []; $deduplicated = []; foreach ($documents as $document) { $docId = $document->getId(); - if ($docId !== '' && isset($seenIds[$docId])) { - continue; - } if ($docId !== '') { - $seenIds[$docId] = true; + $dedupeKey = $tenantPerDocument + ? $document->getTenant() . ':' . $docId + : $docId; + if (isset($seenIds[$dedupeKey])) { + continue; + } + $seenIds[$dedupeKey] = true; } $deduplicated[] = $document; } @@ -5676,7 +5682,6 @@ public function createDocuments( // When ignore mode is on and relationships are being resolved, // pre-fetch existing document IDs so we skip relationship writes for duplicates $preExistingIds = []; - $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); if ($ignore) { if ($tenantPerDocument) { $idsByTenant = []; @@ -5684,7 +5689,7 @@ public function createDocuments( $idsByTenant[$doc->getTenant()][] = $doc->getId(); } foreach ($idsByTenant as $tenant => $tenantIds) { - $tenantIds = \array_values(\array_unique($tenantIds)); + $tenantIds = \array_values(\array_unique(\array_filter($tenantIds))); foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { $existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find( $collection->getId(), @@ -7219,7 +7224,7 @@ public function upsertDocumentsWithIncrease( $idsByTenant[$tenant][] = $doc->getId(); } foreach ($idsByTenant as $tenant => $tenantIds) { - $tenantIds = \array_values(\array_unique($tenantIds)); + $tenantIds = \array_values(\array_unique(\array_filter($tenantIds))); foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { $fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find( $collection->getId(), From 37aace4e18a8a81f073e216e283d3183cd6ce252 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Wed, 8 Apr 2026 21:46:41 +0100 Subject: [PATCH 3/4] Pre-filter duplicates in SQL adapter to prevent race-condition overcount --- src/Database/Adapter/SQL.php | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index dbe66ce12..5910d6348 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -2476,6 +2476,54 @@ public function createDocuments(Document $collection, array $documents, bool $ig if (empty($documents)) { return $documents; } + + // Pre-filter duplicates inside the transaction to prevent race conditions. + // Query which UIDs already exist and remove them from the batch. + if ($ignore) { + $collectionId = $collection->getId(); + $name = $this->filter($collectionId); + $uids = \array_filter(\array_map(fn (Document $doc) => $doc->getId(), $documents)); + + if (!empty($uids)) { + $placeholders = []; + $binds = []; + foreach (\array_values(\array_unique($uids)) as $i => $uid) { + $key = ':_dup_uid_' . $i; + $placeholders[] = $key; + $binds[$key] = $uid; + } + + $tenantFilter = ''; + if ($this->sharedTables) { + $tenantFilter = ' AND _tenant = :_dup_tenant'; + $binds[':_dup_tenant'] = $this->getTenant(); + } + + $sql = 'SELECT _uid FROM ' . $this->getSQLTable($name) + . ' WHERE _uid IN (' . \implode(', ', $placeholders) . ')' + . $tenantFilter; + + $stmt = $this->getPDO()->prepare($sql); + foreach ($binds as $k => $v) { + $stmt->bindValue($k, $v, $this->getPDOType($v)); + } + $stmt->execute(); + $existingUids = \array_flip(\array_column($stmt->fetchAll(), '_uid')); + $stmt->closeCursor(); + + if (!empty($existingUids)) { + $documents = \array_values(\array_filter( + $documents, + fn (Document $doc) => !isset($existingUids[$doc->getId()]) + )); + } + } + + if (empty($documents)) { + return []; + } + } + $spatialAttributes = $this->getSpatialAttributes($collection); $collection = $collection->getId(); try { From 9653052ed281296f14b399fdd434a9c4d918cae9 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Wed, 8 Apr 2026 21:52:32 +0100 Subject: [PATCH 4/4] Add tenant filter to Mongo adapter pre-filter query in ignore mode --- src/Database/Adapter/Mongo.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index ad136d5e5..1700a92ab 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1493,7 +1493,11 @@ public function createDocuments(Document $collection, array $documents, bool $ig $uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records)); if (!empty($uids)) { $findOptions = $this->getTransactionOptions(['projection' => ['_uid' => 1]]); - $result = $this->client->find($name, ['_uid' => ['$in' => \array_values($uids)]], $findOptions); + $filters = ['_uid' => ['$in' => \array_values($uids)]]; + if ($this->sharedTables) { + $filters['_tenant'] = $this->getTenantFilters($collection->getId()); + } + $result = $this->client->find($name, $filters, $findOptions); $existingUids = []; foreach ($result->cursor->firstBatch ?? [] as $doc) { $existingUids[$doc->_uid] = true;