diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php index a7b385cce..757d0c464 100644 --- a/src/Database/Adapter.php +++ b/src/Database/Adapter.php @@ -33,6 +33,36 @@ abstract class Adapter protected bool $alterLocks = false; + protected bool $skipDuplicates = false; + + /** + * Run a callback with skipDuplicates enabled. + * Duplicate key errors during createDocuments() will be silently skipped + * instead of thrown. Nestable — saves and restores previous state. + * Pass $enable = false to run the callback without toggling the flag + * (useful for conditional forwarding from wrappers like Pool/Mirror). + * + * @template T + * @param callable(): T $callback + * @param bool $enable + * @return T + */ + public function skipDuplicates(callable $callback, bool $enable = true): mixed + { + if (!$enable) { + return $callback(); + } + + $previous = $this->skipDuplicates; + $this->skipDuplicates = true; + + try { + return $callback(); + } finally { + $this->skipDuplicates = $previous; + } + } + /** * @var array */ diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 7ddde43d3..3f6bbc354 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1449,6 +1449,42 @@ public function castingBefore(Document $collection, Document $document): Documen return $document; } + /** + * Find existing `_uid` values matching the given filters, fully walking the cursor. + * + * @param string $name + * @param array $filters + * @param int $batchSize Hint: expected max result count — lets MongoDB return everything in one batch + * @return array + * + * @throws MongoException + */ + private function findExistingUids(string $name, array $filters, int $batchSize): array + { + $options = $this->getTransactionOptions([ + 'projection' => ['_uid' => 1], + 'batchSize' => $batchSize, + ]); + + $response = $this->client->find($name, $filters, $options); + $uids = []; + + foreach ($response->cursor->firstBatch ?? [] as $doc) { + $uids[] = $doc->_uid; + } + + $cursorId = $response->cursor->id ?? 0; + while ($cursorId != 0) { + $more = $this->client->getMore($cursorId, $name, $batchSize); + foreach ($more->cursor->nextBatch ?? [] as $doc) { + $uids[] = $doc->_uid; + } + $cursorId = $more->cursor->id ?? 0; + } + + return $uids; + } + /** * Create Documents in batches * @@ -1487,6 +1523,72 @@ public function createDocuments(Document $collection, array $documents): array $records[] = $record; } + // Pre-filter duplicates within the session — MongoDB has no INSERT IGNORE, + // so sending a duplicate would abort the entire transaction. + if ($this->skipDuplicates && !empty($records)) { + $tenantPerDoc = $this->sharedTables && $this->tenantPerDocument; + $recordKey = fn (array $record): string => $tenantPerDoc + ? ($record['_tenant'] ?? $this->getTenant()) . ':' . ($record['_uid'] ?? '') + : ($record['_uid'] ?? ''); + + $existingKeys = []; + + try { + if ($tenantPerDoc) { + $idsByTenant = []; + foreach ($records as $record) { + $uid = $record['_uid'] ?? ''; + if ($uid === '') { + continue; + } + $tenant = $record['_tenant'] ?? $this->getTenant(); + $idsByTenant[$tenant][] = $uid; + } + + foreach ($idsByTenant as $tenant => $tenantUids) { + $tenantUids = \array_values(\array_unique($tenantUids)); + $filters = ['_uid' => ['$in' => $tenantUids], '_tenant' => $tenant]; + foreach ($this->findExistingUids($name, $filters, \count($tenantUids)) as $uid) { + $existingKeys[$tenant . ':' . $uid] = true; + } + } + } else { + $uids = \array_values(\array_unique(\array_filter( + \array_map(fn ($r) => $r['_uid'] ?? null, $records), + fn ($v) => $v !== null + ))); + if (!empty($uids)) { + $filters = ['_uid' => ['$in' => $uids]]; + if ($this->sharedTables) { + $filters['_tenant'] = $this->getTenantFilters($collection->getId()); + } + foreach ($this->findExistingUids($name, $filters, \count($uids)) as $uid) { + $existingKeys[$uid] = true; + } + } + } + } catch (MongoException $e) { + throw $this->processException($e); + } + + if (!empty($existingKeys)) { + $filteredRecords = []; + $filteredDocuments = []; + foreach ($records as $i => $record) { + if (!isset($existingKeys[$recordKey($record)])) { + $filteredRecords[] = $record; + $filteredDocuments[] = $documents[$i]; + } + } + $records = $filteredRecords; + $documents = $filteredDocuments; + } + + if (empty($records)) { + return []; + } + } + try { $documents = $this->client->insertMany($name, $records, $options); } catch (MongoException $e) { diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php index 668753387..648372402 100644 --- a/src/Database/Adapter/Pool.php +++ b/src/Database/Adapter/Pool.php @@ -43,7 +43,10 @@ public function __construct(UtopiaPool $pool) public function delegate(string $method, array $args): mixed { if ($this->pinnedAdapter !== null) { - return $this->pinnedAdapter->{$method}(...$args); + return $this->pinnedAdapter->skipDuplicates( + fn () => $this->pinnedAdapter->{$method}(...$args), + $this->skipDuplicates + ); } return $this->pool->use(function (Adapter $adapter) use ($method, $args) { @@ -66,7 +69,10 @@ public function delegate(string $method, array $args): mixed $adapter->setMetadata($key, $value); } - return $adapter->{$method}(...$args); + return $adapter->skipDuplicates( + fn () => $adapter->{$method}(...$args), + $this->skipDuplicates + ); }); } @@ -146,7 +152,10 @@ public function withTransaction(callable $callback): mixed $this->pinnedAdapter = $adapter; try { - return $adapter->withTransaction($callback); + return $adapter->skipDuplicates( + fn () => $adapter->withTransaction($callback), + $this->skipDuplicates + ); } finally { $this->pinnedAdapter = null; } diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php index 8dcf72025..03036dd2f 100644 --- a/src/Database/Adapter/Postgres.php +++ b/src/Database/Adapter/Postgres.php @@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum return $document; } + protected function getInsertKeyword(): string + { + return 'INSERT INTO'; + } + + protected function getInsertSuffix(string $table): string + { + if (!$this->skipDuplicates) { + return ''; + } + + $conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + + protected function getInsertPermissionsSuffix(): string + { + if (!$this->skipDuplicates) { + return ''; + } + + $conflictTarget = $this->sharedTables + ? '("_type", "_permission", "_document", "_tenant")' + : '("_type", "_permission", "_document")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + /** * @param string $tableName * @param string $columns diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 6864e6aee..f2e84fa30 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -2460,6 +2460,68 @@ protected function execute(mixed $stmt): bool return $stmt->execute(); } + /** + * Build a `_uid IN (...) [AND _tenant ...]` WHERE clause (and matching binds) + * for a batch of documents. Used by the skipDuplicates pre-filter and the + * race-condition reconciliation path. + * + * @param array $documents + * @param string $bindPrefix Prefix for bind keys; must differ between concurrent uses. + * @return array{where: string, tenantSelect: string, binds: array} + */ + private function buildUidTenantLookup(array $documents, string $bindPrefix): array + { + $binds = []; + $placeholders = []; + $uids = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getId(), $documents), + fn ($v) => $v !== null + ))); + foreach ($uids as $i => $uid) { + $key = ':' . $bindPrefix . 'uid_' . $i; + $placeholders[] = $key; + $binds[$key] = $uid; + } + + $tenantFilter = ''; + $tenantSelect = ''; + if ($this->sharedTables) { + if ($this->tenantPerDocument) { + $tenants = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getTenant(), $documents), + fn ($v) => $v !== null + ))); + $tenantPlaceholders = []; + foreach ($tenants as $j => $tenant) { + $tKey = ':' . $bindPrefix . 'tenant_' . $j; + $tenantPlaceholders[] = $tKey; + $binds[$tKey] = $tenant; + } + $tenantFilter = ' AND _tenant IN (' . \implode(', ', $tenantPlaceholders) . ')'; + $tenantSelect = ', _tenant'; + } else { + $tenantKey = ':' . $bindPrefix . 'tenant'; + $tenantFilter = ' AND _tenant = ' . $tenantKey; + $binds[$tenantKey] = $this->getTenant(); + } + } + + // No UIDs to match → return a clause that matches nothing without leaving dangling binds. + if (empty($placeholders)) { + return [ + 'where' => '1=0', + 'tenantSelect' => $tenantSelect, + 'binds' => [], + ]; + } + + return [ + 'where' => '_uid IN (' . \implode(', ', $placeholders) . ')' . $tenantFilter, + 'tenantSelect' => $tenantSelect, + 'binds' => $binds, + ]; + } + /** * Create Documents in batches * @@ -2476,6 +2538,52 @@ public function createDocuments(Document $collection, array $documents): array if (empty($documents)) { return $documents; } + + // Pre-filter existing UIDs to prevent race-condition duplicates. + if ($this->skipDuplicates) { + $name = $this->filter($collection->getId()); + $lookup = $this->buildUidTenantLookup($documents, '_dup_'); + + if (!empty($lookup['binds'])) { + try { + $sql = 'SELECT _uid' . $lookup['tenantSelect'] + . ' FROM ' . $this->getSQLTable($name) + . ' WHERE ' . $lookup['where']; + + $stmt = $this->getPDO()->prepare($sql); + foreach ($lookup['binds'] as $k => $v) { + $stmt->bindValue($k, $v, $this->getPDOType($v)); + } + $stmt->execute(); + $rows = $stmt->fetchAll(); + $stmt->closeCursor(); + + if ($this->sharedTables && $this->tenantPerDocument) { + $existingKeys = []; + foreach ($rows as $row) { + $existingKeys[$row['_tenant'] . ':' . $row['_uid']] = true; + } + $documents = \array_values(\array_filter( + $documents, + fn (Document $doc) => !isset($existingKeys[$doc->getTenant() . ':' . $doc->getId()]) + )); + } else { + $existingUids = \array_flip(\array_column($rows, '_uid')); + $documents = \array_values(\array_filter( + $documents, + fn (Document $doc) => !isset($existingUids[$doc->getId()]) + )); + } + } catch (PDOException $e) { + throw $this->processException($e); + } + } + + if (empty($documents)) { + return []; + } + } + $spatialAttributes = $this->getSpatialAttributes($collection); $collection = $collection->getId(); try { @@ -2573,8 +2681,9 @@ public function createDocuments(Document $collection, array $documents): array $batchKeys = \implode(', ', $batchKeys); $stmt = $this->getPDO()->prepare(" - INSERT INTO {$this->getSQLTable($name)} {$columns} + {$this->getInsertKeyword()} {$this->getSQLTable($name)} {$columns} VALUES {$batchKeys} + {$this->getInsertSuffix($name)} "); foreach ($bindValues as $key => $value) { @@ -2583,13 +2692,80 @@ public function createDocuments(Document $collection, array $documents): array $this->execute($stmt); + // Reconcile returned docs with actual inserts when a race condition skipped rows. + if ($this->skipDuplicates && $stmt->rowCount() < \count($documents)) { + $expectedTimestamps = []; + foreach ($documents as $doc) { + $eKey = ($this->sharedTables && $this->tenantPerDocument) + ? $doc->getTenant() . ':' . $doc->getId() + : $doc->getId(); + $expectedTimestamps[$eKey] = $doc->getCreatedAt(); + } + + $lookup = $this->buildUidTenantLookup($documents, '_vfy_'); + $verifySql = 'SELECT _uid' . $lookup['tenantSelect'] . ', ' . $this->quote($this->filter('_createdAt')) + . ' FROM ' . $this->getSQLTable($name) + . ' WHERE ' . $lookup['where']; + + $verifyStmt = $this->getPDO()->prepare($verifySql); + foreach ($lookup['binds'] as $k => $v) { + $verifyStmt->bindValue($k, $v, $this->getPDOType($v)); + } + $verifyStmt->execute(); + $rows = $verifyStmt->fetchAll(); + $verifyStmt->closeCursor(); + + // Normalise timestamps — Postgres omits .000 for round seconds. + $normalizeTimestamp = fn (?string $ts): string => $ts !== null + ? (new \DateTime($ts))->format('Y-m-d H:i:s.v') + : ''; + + $actualTimestamps = []; + foreach ($rows as $row) { + $key = ($this->sharedTables && $this->tenantPerDocument) + ? $row['_tenant'] . ':' . $row['_uid'] + : $row['_uid']; + $actualTimestamps[$key] = $normalizeTimestamp($row['_createdAt']); + } + + $insertedDocs = []; + foreach ($documents as $doc) { + $key = ($this->sharedTables && $this->tenantPerDocument) + ? $doc->getTenant() . ':' . $doc->getId() + : $doc->getId(); + if (isset($actualTimestamps[$key]) && $actualTimestamps[$key] === $normalizeTimestamp($expectedTimestamps[$key] ?? null)) { + $insertedDocs[] = $doc; + } + } + $documents = $insertedDocs; + + // Rebuild permissions for actually-inserted docs only + $permissions = []; + $bindValuesPermissions = []; + foreach ($documents as $index => $document) { + foreach (Database::PERMISSIONS as $type) { + foreach ($document->getPermissionsByType($type) as $permission) { + $tenantBind = $this->sharedTables ? ", :_tenant_{$index}" : ''; + $permission = \str_replace('"', '', $permission); + $permission = "('{$type}', '{$permission}', :_uid_{$index} {$tenantBind})"; + $permissions[] = $permission; + $bindValuesPermissions[":_uid_{$index}"] = $document->getId(); + if ($this->sharedTables) { + $bindValuesPermissions[":_tenant_{$index}"] = $document->getTenant(); + } + } + } + } + } + if (!empty($permissions)) { $tenantColumn = $this->sharedTables ? ', _tenant' : ''; $permissions = \implode(', ', $permissions); $sqlPermissions = " - INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) - VALUES {$permissions}; + {$this->getInsertKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) + VALUES {$permissions} + {$this->getInsertPermissionsSuffix()} "; $stmtPermissions = $this->getPDO()->prepare($sqlPermissions); @@ -2608,6 +2784,33 @@ public function createDocuments(Document $collection, array $documents): array return $documents; } + /** + * Returns the INSERT keyword, optionally with IGNORE for duplicate handling. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertKeyword(): string + { + return $this->skipDuplicates ? 'INSERT IGNORE INTO' : 'INSERT INTO'; + } + + /** + * Returns a suffix appended after VALUES clause for duplicate handling. + * Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING). + */ + protected function getInsertSuffix(string $table): string + { + return ''; + } + + /** + * Returns a suffix for the permissions INSERT statement when ignoring duplicates. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertPermissionsSuffix(): string + { + return ''; + } + /** * @param Document $collection * @param string $attribute diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php index 3c25987eb..2732f784b 100644 --- a/src/Database/Adapter/SQLite.php +++ b/src/Database/Adapter/SQLite.php @@ -34,6 +34,11 @@ */ class SQLite extends MariaDB { + protected function getInsertKeyword(): string + { + return $this->skipDuplicates ? 'INSERT OR IGNORE INTO' : 'INSERT INTO'; + } + /** * @inheritDoc */ diff --git a/src/Database/Database.php b/src/Database/Database.php index ac58d72f0..626334c2b 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -417,6 +417,8 @@ class Database protected bool $preserveDates = false; + protected bool $skipDuplicates = false; + protected bool $preserveSequence = false; protected int $maxQueryValues = 5000; @@ -842,6 +844,99 @@ public function skipRelationshipsExistCheck(callable $callback): mixed } } + public function skipDuplicates(callable $callback, bool $enable = true): mixed + { + if (!$enable) { + return $callback(); + } + + $previous = $this->skipDuplicates; + $this->skipDuplicates = true; + + try { + return $callback(); + } finally { + $this->skipDuplicates = $previous; + } + } + + /** + * Build a tenant-aware identity key for a document. + * Returns ":" in tenant-per-document shared-table mode, otherwise just the id. + */ + private function tenantKey(Document $document): string + { + return ($this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + } + + /** + * Batch-fetch documents by ID from a collection, keyed by tenantKey(). + * Chunks by maxQueryValues and groups queries by tenant in tenant-per-document mode. + * + * @param Document $collection + * @param array $documents Source documents (only IDs are read) + * @param bool $idsOnly When true, uses Query::select(['$id']) for a lighter fetch + * @return array + */ + private function fetchExistingByIds(Document $collection, array $documents, bool $idsOnly = false): array + { + $collectionId = $collection->getId(); + $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); + $result = []; + + $buildQueries = function (array $chunk) use ($idsOnly) { + $queries = [Query::equal('$id', $chunk)]; + if ($idsOnly) { + $queries[] = Query::select(['$id']); + } + $queries[] = Query::limit(\count($chunk)); + return $queries; + }; + + if ($tenantPerDocument) { + $idsByTenant = []; + foreach ($documents as $doc) { + $id = $doc->getId(); + if ($id !== '') { + $idsByTenant[$doc->getTenant()][] = $id; + } + } + foreach ($idsByTenant as $tenant => $tenantIds) { + $tenantIds = \array_values(\array_unique($tenantIds)); + foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $fetched = $this->authorization->skip( + fn () => $this->withTenant( + $tenant, + fn () => $this->silent(fn () => $this->find($collectionId, $buildQueries($idChunk))) + ) + ); + foreach ($fetched as $doc) { + $result[$tenant . ':' . $doc->getId()] = $doc; + } + } + } + + return $result; + } + + $ids = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getId(), $documents), + fn ($v) => $v !== null && $v !== '' + ))); + foreach (\array_chunk($ids, \max(1, $this->maxQueryValues)) as $idChunk) { + $fetched = $this->authorization->skip( + fn () => $this->silent(fn () => $this->find($collectionId, $buildQueries($idChunk))) + ); + foreach ($fetched as $doc) { + $result[$doc->getId()] = $doc; + } + } + + return $result; + } + /** * Trigger callback for events * @@ -5653,6 +5748,35 @@ public function createDocuments( $time = DateTime::now(); $modified = 0; + // Deduplicate intra-batch documents by ID (tenant-aware). First occurrence wins. + if ($this->skipDuplicates) { + $seenIds = []; + $deduplicated = []; + foreach ($documents as $document) { + if ($document->getId() !== '') { + $dedupeKey = $this->tenantKey($document); + if (isset($seenIds[$dedupeKey])) { + continue; + } + $seenIds[$dedupeKey] = true; + } + $deduplicated[] = $document; + } + $documents = $deduplicated; + } + + // Pre-fetch existing IDs to skip relationship writes for known duplicates + $preExistingIds = $this->skipDuplicates + ? $this->fetchExistingByIds($collection, $documents, idsOnly: true) + : []; + + /** @var array> $deferredRelationships */ + $deferredRelationships = []; + $relationships = []; + if ($this->skipDuplicates && $this->resolveRelationships) { + $relationships = \array_filter($collection->getAttribute('attributes', []), fn ($attr) => $attr['type'] === self::VAR_RELATIONSHIP); + } + foreach ($documents as $document) { $createdAt = $document->getCreatedAt(); $updatedAt = $document->getUpdatedAt(); @@ -5692,7 +5816,21 @@ public function createDocuments( } } - if ($this->resolveRelationships) { + if ($this->resolveRelationships && !empty($relationships)) { + // Defer relationship writes until after INSERT so we don't orphan records for duplicates. + $relationshipData = []; + foreach ($relationships as $rel) { + $key = $rel['key']; + $value = $document->getAttribute($key); + if ($value !== null) { + $relationshipData[$key] = $value; + } + $document->removeAttribute($key); + } + if (!empty($relationshipData)) { + $deferredRelationships[$this->tenantKey($document)] = $relationshipData; + } + } elseif ($this->resolveRelationships) { $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); } @@ -5700,9 +5838,35 @@ public function createDocuments( } foreach (\array_chunk($documents, $batchSize) as $chunk) { - $batch = $this->withTransaction(function () use ($collection, $chunk) { - return $this->adapter->createDocuments($collection, $chunk); - }); + if ($this->skipDuplicates && !empty($preExistingIds)) { + $chunk = \array_values(\array_filter( + $chunk, + fn (Document $doc) => !isset($preExistingIds[$this->tenantKey($doc)]) + )); + if (empty($chunk)) { + continue; + } + } + + $batch = $this->withTransaction(fn () => $this->adapter->skipDuplicates( + fn () => $this->adapter->createDocuments($collection, $chunk), + $this->skipDuplicates + )); + + // Create deferred relationships only for docs that were actually inserted + if ($this->skipDuplicates && $this->resolveRelationships && \count($deferredRelationships) > 0) { + foreach ($batch as $insertedDoc) { + $deferKey = $this->tenantKey($insertedDoc); + if (\array_key_exists($deferKey, $deferredRelationships)) { + $relDoc = clone $insertedDoc; + foreach ($deferredRelationships[$deferKey] as $key => $value) { + $relDoc->setAttribute($key, $value); + } + $this->silent(fn () => $this->createDocumentRelationships($collection, $relDoc)); + unset($deferredRelationships[$deferKey]); + } + } + } $batch = $this->adapter->getSequences($collection->getId(), $batch); @@ -7116,18 +7280,12 @@ public function upsertDocumentsWithIncrease( $created = 0; $updated = 0; $seenIds = []; + + // Batch-fetch existing documents in one query instead of N individual getDocument() calls + $existingDocs = $this->fetchExistingByIds($collection, $documents); + foreach ($documents as $key => $document) { - if ($this->getSharedTables() && $this->getTenantPerDocument()) { - $old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - )))); - } else { - $old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - ))); - } + $old = $existingDocs[$this->tenantKey($document)] ?? new Document(); // Extract operators early to avoid comparison issues $documentArray = $document->getArrayCopy(); @@ -7294,7 +7452,7 @@ public function upsertDocumentsWithIncrease( $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); } - $seenIds[] = $document->getId(); + $seenIds[] = $this->tenantKey($document); $old = $this->adapter->castingBefore($collection, $old); $document = $this->adapter->castingBefore($collection, $document); diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php index f740cab3e..5d5777a5d 100644 --- a/src/Database/Mirror.php +++ b/src/Database/Mirror.php @@ -601,12 +601,15 @@ public function createDocuments( ?callable $onNext = null, ?callable $onError = null, ): int { - $modified = $this->source->createDocuments( - $collection, - $documents, - $batchSize, - $onNext, - $onError, + $modified = $this->source->skipDuplicates( + fn () => $this->source->createDocuments( + $collection, + $documents, + $batchSize, + $onNext, + $onError, + ), + $this->skipDuplicates ); if ( @@ -639,13 +642,15 @@ public function createDocuments( $clones[] = $clone; } - $this->destination->withPreserveDates( - fn () => - $this->destination->createDocuments( - $collection, - $clones, - $batchSize, - ) + $this->destination->skipDuplicates( + fn () => $this->destination->withPreserveDates( + fn () => $this->destination->createDocuments( + $collection, + $clones, + $batchSize, + ) + ), + $this->skipDuplicates ); foreach ($clones as $clone) { diff --git a/tests/e2e/Adapter/Scopes/DocumentTests.php b/tests/e2e/Adapter/Scopes/DocumentTests.php index d16004d32..5f82cc025 100644 --- a/tests/e2e/Adapter/Scopes/DocumentTests.php +++ b/tests/e2e/Adapter/Scopes/DocumentTests.php @@ -7722,4 +7722,208 @@ public function testRegexInjection(): void // } // $database->deleteCollection($collectionName); // } + + public function testCreateDocumentsIgnoreDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial documents + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Original A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc2', + 'name' => 'Original B', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // Without ignore, duplicates should throw + try { + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + $this->fail('Expected DuplicateException'); + } catch (DuplicateException $e) { + $this->assertNotEmpty($e->getMessage()); + } + + // With skipDuplicates, duplicates should be silently skipped + $emittedIds = []; + $collection = __FUNCTION__; + $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + return $database->createDocuments($collection, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc3', + 'name' => 'New C', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + // Only doc3 was new, doc1 was skipped as duplicate + $this->assertSame(1, $count); + $this->assertCount(1, $emittedIds); + $this->assertSame('doc3', $emittedIds[0]); + + // doc3 should exist, doc1 should retain original value + $doc1 = $database->getDocument(__FUNCTION__, 'doc1'); + $this->assertSame('Original A', $doc1->getAttribute('name')); + + $doc3 = $database->getDocument(__FUNCTION__, 'doc3'); + $this->assertSame('New C', $doc3->getAttribute('name')); + + // Total should be 3 (doc1, doc2, doc3) + $all = $database->find(__FUNCTION__); + $this->assertCount(3, $all); + } + + public function testCreateDocumentsIgnoreIntraBatchDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + $col = 'createDocsIgnoreIntraBatch'; + + $database->createCollection($col); + $database->createAttribute($col, 'name', Database::VAR_STRING, 128, true); + + // Two docs with same ID in one batch — first wins, second is deduplicated + $emittedIds = []; + $count = $database->skipDuplicates(function () use ($database, $col, &$emittedIds) { + return $database->createDocuments($col, [ + new Document([ + '$id' => 'dup', + 'name' => 'First', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'dup', + 'name' => 'Second', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::update(Role::user('extra')), + ], + ]), + new Document([ + '$id' => 'unique1', + 'name' => 'Unique', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + $this->assertSame(2, $count); + $this->assertCount(2, $emittedIds); + + // First occurrence wins + $doc = $database->getDocument($col, 'dup'); + $this->assertSame('First', $doc->getAttribute('name')); + + // Second doc's extra permission should NOT exist (no ACL drift) + $perms = $doc->getPermissions(); + foreach ($perms as $perm) { + $this->assertStringNotContainsString('extra', $perm); + } + + // unique1 should exist + $unique = $database->getDocument($col, 'unique1'); + $this->assertSame('Unique', $unique->getAttribute('name')); + + // Total: 2 documents + $all = $database->find($col); + $this->assertCount(2, $all); + } + + public function testCreateDocumentsIgnoreAllDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial document + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'existing', + 'name' => 'Original', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // With skipDuplicates, inserting only duplicates should succeed with no new rows + $emittedIds = []; + $collection = __FUNCTION__; + $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + return $database->createDocuments($collection, [ + new Document([ + '$id' => 'existing', + 'name' => 'Duplicate', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + // All duplicates skipped, nothing inserted + $this->assertSame(0, $count); + $this->assertSame([], $emittedIds); + + // Original document should be unchanged + $doc = $database->getDocument(__FUNCTION__, 'existing'); + $this->assertSame('Original', $doc->getAttribute('name')); + + // Still only 1 document + $all = $database->find(__FUNCTION__); + $this->assertCount(1, $all); + } }