From 60f1ff12d868921f8f405d41fbd28a74875d1380 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Thu, 9 Apr 2026 11:26:53 +0100 Subject: [PATCH 1/8] Add ignore param to createDocuments for silent duplicate handling Adapters: - MariaDB/MySQL: INSERT IGNORE INTO - PostgreSQL: ON CONFLICT DO NOTHING - SQLite: INSERT OR IGNORE INTO - MongoDB: session-scoped pre-filter before insertMany Database.php: - Intra-batch dedup by ID (tenant-aware, first occurrence wins) - Pre-fetch existing IDs to skip known duplicates - Deferred relationship creation for ignore mode (no orphans) - Race-condition reconciliation via _createdAt timestamp verification upsertDocuments: - Batch-fetch existing docs with find() instead of per-row getDocument() - Tenant-aware composite keys for seenIds duplicate check All paths are tenant-per-document aware with null-safe array_filter. --- src/Database/Adapter.php | 3 +- src/Database/Adapter/Mongo.php | 93 ++++++++++++- src/Database/Adapter/Pool.php | 2 +- src/Database/Adapter/Postgres.php | 29 +++++ src/Database/Adapter/SQL.php | 210 +++++++++++++++++++++++++++++- src/Database/Adapter/SQLite.php | 5 + src/Database/Database.php | 194 ++++++++++++++++++++++++--- src/Database/Mirror.php | 3 + 8 files changed, 516 insertions(+), 23 deletions(-) diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php index a7b385cce..9cc83d141 100644 --- a/src/Database/Adapter.php +++ b/src/Database/Adapter.php @@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document * * @param Document $collection * @param array $documents + * @param bool $ignore If true, silently ignore duplicate documents instead of throwing * * @return array * * @throws DatabaseException */ - abstract public function createDocuments(Document $collection, array $documents): array; + abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array; /** * Update Document diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 7ddde43d3..71bbc3454 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1460,11 +1460,11 @@ public function castingBefore(Document $collection, Document $document): Documen * @throws DuplicateException * @throws DatabaseException */ - public function createDocuments(Document $collection, array $documents): array + public function createDocuments(Document $collection, array $documents, bool $ignore = false): array { $name = $this->getNamespace() . '_' . $this->filter($collection->getId()); - $options = $this->getTransactionOptions(); + $records = []; $hasSequence = null; $documents = \array_map(fn ($doc) => clone $doc, $documents); @@ -1487,6 +1487,95 @@ public function createDocuments(Document $collection, array $documents): array $records[] = $record; } + // Pre-filter duplicates within the session to avoid aborting the transaction. + if ($ignore && !empty($records)) { + $existingKeys = []; + + try { + if ($this->sharedTables && $this->tenantPerDocument) { + $idsByTenant = []; + foreach ($records as $record) { + $uid = $record['_uid'] ?? ''; + if ($uid === '') { + continue; + } + $tenant = $record['_tenant'] ?? $this->getTenant(); + $idsByTenant[$tenant][] = $uid; + } + + foreach ($idsByTenant as $tenant => $tenantUids) { + $tenantUids = \array_values(\array_unique($tenantUids)); + $findOptions = $this->getTransactionOptions([ + 'projection' => ['_uid' => 1], + 'batchSize' => \count($tenantUids), + ]); + $filters = ['_uid' => ['$in' => $tenantUids], '_tenant' => $tenant]; + $response = $this->client->find($name, $filters, $findOptions); + foreach ($response->cursor->firstBatch ?? [] as $doc) { + $existingKeys[$tenant . ':' . $doc->_uid] = true; + } + $cursorId = $response->cursor->id ?? 0; + while ($cursorId != 0) { + $more = $this->client->getMore($cursorId, $name, \count($tenantUids)); + foreach ($more->cursor->nextBatch ?? [] as $doc) { + $existingKeys[$tenant . ':' . $doc->_uid] = true; + } + $cursorId = $more->cursor->id ?? 0; + } + } + } else { + $uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records), fn ($v) => $v !== null); + if (!empty($uids)) { + $uidValues = \array_values(\array_unique($uids)); + $findOptions = $this->getTransactionOptions([ + 'projection' => ['_uid' => 1], + 'batchSize' => \count($uidValues), + ]); + $filters = ['_uid' => ['$in' => $uidValues]]; + if ($this->sharedTables) { + $filters['_tenant'] = $this->getTenantFilters($collection->getId()); + } + $response = $this->client->find($name, $filters, $findOptions); + foreach ($response->cursor->firstBatch ?? [] as $doc) { + $existingKeys[$doc->_uid] = true; + } + $cursorId = $response->cursor->id ?? 0; + while ($cursorId != 0) { + $more = $this->client->getMore($cursorId, $name, \count($uidValues)); + foreach ($more->cursor->nextBatch ?? [] as $doc) { + $existingKeys[$doc->_uid] = true; + } + $cursorId = $more->cursor->id ?? 0; + } + } + } + } catch (MongoException $e) { + throw $this->processException($e); + } + + if (!empty($existingKeys)) { + $filteredRecords = []; + $filteredDocuments = []; + $tenantPerDoc = $this->sharedTables && $this->tenantPerDocument; + foreach ($records as $i => $record) { + $uid = $record['_uid'] ?? ''; + $key = $tenantPerDoc + ? ($record['_tenant'] ?? $this->getTenant()) . ':' . $uid + : $uid; + if (!isset($existingKeys[$key])) { + $filteredRecords[] = $record; + $filteredDocuments[] = $documents[$i]; + } + } + $records = $filteredRecords; + $documents = $filteredDocuments; + } + + if (empty($records)) { + return []; + } + } + try { $documents = $this->client->insertMany($name, $records, $options); } catch (MongoException $e) { diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php index 668753387..ec508c3e4 100644 --- a/src/Database/Adapter/Pool.php +++ b/src/Database/Adapter/Pool.php @@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume return $this->delegate(__FUNCTION__, \func_get_args()); } - public function createDocuments(Document $collection, array $documents): array + public function createDocuments(Document $collection, array $documents, bool $ignore = false): array { return $this->delegate(__FUNCTION__, \func_get_args()); } diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php index 8dcf72025..814ecc8bc 100644 --- a/src/Database/Adapter/Postgres.php +++ b/src/Database/Adapter/Postgres.php @@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum return $document; } + protected function getInsertKeyword(bool $ignore): string + { + return 'INSERT INTO'; + } + + protected function getInsertSuffix(bool $ignore, string $table): string + { + if (!$ignore) { + return ''; + } + + $conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + + protected function getInsertPermissionsSuffix(bool $ignore): string + { + if (!$ignore) { + return ''; + } + + $conflictTarget = $this->sharedTables + ? '("_type", "_permission", "_document", "_tenant")' + : '("_type", "_permission", "_document")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + /** * @param string $tableName * @param string $columns diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 6864e6aee..6c918ef15 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -2471,11 +2471,87 @@ protected function execute(mixed $stmt): bool * @throws DuplicateException * @throws \Throwable */ - public function createDocuments(Document $collection, array $documents): array + public function createDocuments(Document $collection, array $documents, bool $ignore = false): array { if (empty($documents)) { return $documents; } + + // Pre-filter existing UIDs to prevent race-condition duplicates. + if ($ignore) { + $collectionId = $collection->getId(); + $name = $this->filter($collectionId); + $uids = \array_filter(\array_map(fn (Document $doc) => $doc->getId(), $documents), fn ($v) => $v !== null); + + if (!empty($uids)) { + try { + $placeholders = []; + $binds = []; + foreach (\array_values(\array_unique($uids)) as $i => $uid) { + $key = ':_dup_uid_' . $i; + $placeholders[] = $key; + $binds[$key] = $uid; + } + + $tenantFilter = ''; + if ($this->sharedTables) { + if ($this->tenantPerDocument) { + $tenants = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getTenant(), $documents), + fn ($v) => $v !== null + ))); + $tenantPlaceholders = []; + foreach ($tenants as $j => $tenant) { + $tKey = ':_dup_tenant_' . $j; + $tenantPlaceholders[] = $tKey; + $binds[$tKey] = $tenant; + } + $tenantFilter = ' AND _tenant IN (' . \implode(', ', $tenantPlaceholders) . ')'; + } else { + $tenantFilter = ' AND _tenant = :_dup_tenant'; + $binds[':_dup_tenant'] = $this->getTenant(); + } + } + + $tenantSelect = $this->sharedTables && $this->tenantPerDocument ? ', _tenant' : ''; + $sql = 'SELECT _uid' . $tenantSelect . ' FROM ' . $this->getSQLTable($name) + . ' WHERE _uid IN (' . \implode(', ', $placeholders) . ')' + . $tenantFilter; + + $stmt = $this->getPDO()->prepare($sql); + foreach ($binds as $k => $v) { + $stmt->bindValue($k, $v, $this->getPDOType($v)); + } + $stmt->execute(); + $rows = $stmt->fetchAll(); + $stmt->closeCursor(); + + if ($this->sharedTables && $this->tenantPerDocument) { + $existingKeys = []; + foreach ($rows as $row) { + $existingKeys[$row['_tenant'] . ':' . $row['_uid']] = true; + } + $documents = \array_values(\array_filter( + $documents, + fn (Document $doc) => !isset($existingKeys[$doc->getTenant() . ':' . $doc->getId()]) + )); + } else { + $existingUids = \array_flip(\array_column($rows, '_uid')); + $documents = \array_values(\array_filter( + $documents, + fn (Document $doc) => !isset($existingUids[$doc->getId()]) + )); + } + } catch (PDOException $e) { + throw $this->processException($e); + } + } + + if (empty($documents)) { + return []; + } + } + $spatialAttributes = $this->getSpatialAttributes($collection); $collection = $collection->getId(); try { @@ -2573,8 +2649,9 @@ public function createDocuments(Document $collection, array $documents): array $batchKeys = \implode(', ', $batchKeys); $stmt = $this->getPDO()->prepare(" - INSERT INTO {$this->getSQLTable($name)} {$columns} + {$this->getInsertKeyword($ignore)} {$this->getSQLTable($name)} {$columns} VALUES {$batchKeys} + {$this->getInsertSuffix($ignore, $name)} "); foreach ($bindValues as $key => $value) { @@ -2583,13 +2660,111 @@ public function createDocuments(Document $collection, array $documents): array $this->execute($stmt); + // Reconcile returned docs with actual inserts when a race condition skipped rows. + if ($ignore && $stmt->rowCount() < \count($documents)) { + $expectedTimestamps = []; + foreach ($documents as $doc) { + $eKey = ($this->sharedTables && $this->tenantPerDocument) + ? $doc->getTenant() . ':' . $doc->getId() + : $doc->getId(); + $expectedTimestamps[$eKey] = $doc->getCreatedAt(); + } + + $verifyPlaceholders = []; + $verifyBinds = []; + $rawUids = \array_values(\array_unique(\array_map(fn (Document $doc) => $doc->getId(), $documents))); + foreach ($rawUids as $idx => $uid) { + $ph = ':_vfy_' . $idx; + $verifyPlaceholders[] = $ph; + $verifyBinds[$ph] = $uid; + } + + $tenantWhere = ''; + $tenantSelect = ''; + if ($this->sharedTables) { + if ($this->tenantPerDocument) { + $tenants = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getTenant(), $documents), + fn ($v) => $v !== null + ))); + $tenantPlaceholders = []; + foreach ($tenants as $j => $tenant) { + $tKey = ':_vfy_tenant_' . $j; + $tenantPlaceholders[] = $tKey; + $verifyBinds[$tKey] = $tenant; + } + $tenantWhere = ' AND _tenant IN (' . \implode(', ', $tenantPlaceholders) . ')'; + $tenantSelect = ', _tenant'; + } else { + $tenantWhere = ' AND _tenant = :_vfy_tenant'; + $verifyBinds[':_vfy_tenant'] = $this->getTenant(); + } + } + + $verifySql = 'SELECT _uid' . $tenantSelect . ', ' . $this->quote($this->filter('_createdAt')) + . ' FROM ' . $this->getSQLTable($name) + . ' WHERE _uid IN (' . \implode(', ', $verifyPlaceholders) . ')' + . $tenantWhere; + + $verifyStmt = $this->getPDO()->prepare($verifySql); + foreach ($verifyBinds as $k => $v) { + $verifyStmt->bindValue($k, $v, $this->getPDOType($v)); + } + $verifyStmt->execute(); + $rows = $verifyStmt->fetchAll(); + $verifyStmt->closeCursor(); + + // Normalise timestamps — Postgres omits .000 for round seconds. + $normalizeTimestamp = fn (?string $ts): string => $ts !== null + ? (new \DateTime($ts))->format('Y-m-d H:i:s.v') + : ''; + + $actualTimestamps = []; + foreach ($rows as $row) { + $key = ($this->sharedTables && $this->tenantPerDocument) + ? $row['_tenant'] . ':' . $row['_uid'] + : $row['_uid']; + $actualTimestamps[$key] = $normalizeTimestamp($row['_createdAt']); + } + + $insertedDocs = []; + foreach ($documents as $doc) { + $key = ($this->sharedTables && $this->tenantPerDocument) + ? $doc->getTenant() . ':' . $doc->getId() + : $doc->getId(); + if (isset($actualTimestamps[$key]) && $actualTimestamps[$key] === $normalizeTimestamp($expectedTimestamps[$key] ?? null)) { + $insertedDocs[] = $doc; + } + } + $documents = $insertedDocs; + + // Rebuild permissions for actually-inserted docs only + $permissions = []; + $bindValuesPermissions = []; + foreach ($documents as $index => $document) { + foreach (Database::PERMISSIONS as $type) { + foreach ($document->getPermissionsByType($type) as $permission) { + $tenantBind = $this->sharedTables ? ", :_tenant_{$index}" : ''; + $permission = \str_replace('"', '', $permission); + $permission = "('{$type}', '{$permission}', :_uid_{$index} {$tenantBind})"; + $permissions[] = $permission; + $bindValuesPermissions[":_uid_{$index}"] = $document->getId(); + if ($this->sharedTables) { + $bindValuesPermissions[":_tenant_{$index}"] = $document->getTenant(); + } + } + } + } + } + if (!empty($permissions)) { $tenantColumn = $this->sharedTables ? ', _tenant' : ''; $permissions = \implode(', ', $permissions); $sqlPermissions = " - INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) - VALUES {$permissions}; + {$this->getInsertKeyword($ignore)} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) + VALUES {$permissions} + {$this->getInsertPermissionsSuffix($ignore)} "; $stmtPermissions = $this->getPDO()->prepare($sqlPermissions); @@ -2608,6 +2783,33 @@ public function createDocuments(Document $collection, array $documents): array return $documents; } + /** + * Returns the INSERT keyword, optionally with IGNORE for duplicate handling. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertKeyword(bool $ignore): string + { + return $ignore ? 'INSERT IGNORE INTO' : 'INSERT INTO'; + } + + /** + * Returns a suffix appended after VALUES clause for duplicate handling. + * Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING). + */ + protected function getInsertSuffix(bool $ignore, string $table): string + { + return ''; + } + + /** + * Returns a suffix for the permissions INSERT statement when ignoring duplicates. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertPermissionsSuffix(bool $ignore): string + { + return ''; + } + /** * @param Document $collection * @param string $attribute diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php index 3c25987eb..8e7ef6b5b 100644 --- a/src/Database/Adapter/SQLite.php +++ b/src/Database/Adapter/SQLite.php @@ -34,6 +34,11 @@ */ class SQLite extends MariaDB { + protected function getInsertKeyword(bool $ignore): string + { + return $ignore ? 'INSERT OR IGNORE INTO' : 'INSERT INTO'; + } + /** * @inheritDoc */ diff --git a/src/Database/Database.php b/src/Database/Database.php index ac58d72f0..cbcc80273 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -5621,6 +5621,7 @@ public function createDocument(string $collection, Document $document): Document * @param int $batchSize * @param (callable(Document): void)|null $onNext * @param (callable(Throwable): void)|null $onError + * @param bool $ignore If true, silently ignore duplicate documents instead of throwing * @return int * @throws AuthorizationException * @throws StructureException @@ -5633,6 +5634,7 @@ public function createDocuments( int $batchSize = self::INSERT_BATCH_SIZE, ?callable $onNext = null, ?callable $onError = null, + bool $ignore = false, ): int { if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) { throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.'); @@ -5653,6 +5655,81 @@ public function createDocuments( $time = DateTime::now(); $modified = 0; + $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); + + // Deduplicate intra-batch documents by ID (tenant-aware). First occurrence wins. + if ($ignore) { + $seenIds = []; + $deduplicated = []; + foreach ($documents as $document) { + $docId = $document->getId(); + if ($docId !== '') { + $dedupeKey = $tenantPerDocument + ? $document->getTenant() . ':' . $docId + : $docId; + if (isset($seenIds[$dedupeKey])) { + continue; + } + $seenIds[$dedupeKey] = true; + } + $deduplicated[] = $document; + } + $documents = $deduplicated; + } + + // Pre-fetch existing IDs to skip relationship writes for known duplicates + $preExistingIds = []; + if ($ignore) { + if ($tenantPerDocument) { + $idsByTenant = []; + foreach ($documents as $doc) { + $idsByTenant[$doc->getTenant()][] = $doc->getId(); + } + foreach ($idsByTenant as $tenant => $tenantIds) { + $tenantIds = \array_values(\array_unique(\array_filter($tenantIds, fn ($v) => $v !== null))); + foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find( + $collection->getId(), + [ + Query::equal('$id', $idChunk), + Query::select(['$id']), + Query::limit(\count($idChunk)), + ] + )))); + foreach ($existing as $doc) { + $preExistingIds[$tenant . ':' . $doc->getId()] = true; + } + } + } + } else { + $inputIds = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getId(), $documents), + fn ($v) => $v !== null + ))); + + foreach (\array_chunk($inputIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $existing = $this->authorization->skip(fn () => $this->silent(fn () => $this->find( + $collection->getId(), + [ + Query::equal('$id', $idChunk), + Query::select(['$id']), + Query::limit(\count($idChunk)), + ] + ))); + foreach ($existing as $doc) { + $preExistingIds[$doc->getId()] = true; + } + } + } + } + + /** @var array> $deferredRelationships */ + $deferredRelationships = []; + $relationships = []; + if ($ignore && $this->resolveRelationships) { + $relationships = \array_filter($collection->getAttribute('attributes', []), fn ($attr) => $attr['type'] === self::VAR_RELATIONSHIP); + } + foreach ($documents as $document) { $createdAt = $document->getCreatedAt(); $updatedAt = $document->getUpdatedAt(); @@ -5692,18 +5769,70 @@ public function createDocuments( } } - if ($this->resolveRelationships) { - $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); + if ($this->resolveRelationships && !empty($relationships)) { + // Defer: store relationship data, strip attributes for INSERT. + $relationshipData = []; + foreach ($relationships as $rel) { + $key = $rel['key']; + $value = $document->getAttribute($key); + if ($value !== null) { + $relationshipData[$key] = $value; + } + $document->removeAttribute($key); + } + if (!empty($relationshipData)) { + $deferKey = $tenantPerDocument + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + $deferredRelationships[$deferKey] = $relationshipData; + } + } elseif ($this->resolveRelationships) { + $preExistKey = $tenantPerDocument + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + + if (!isset($preExistingIds[$preExistKey])) { + $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); + } } $document = $this->adapter->castingBefore($collection, $document); } foreach (\array_chunk($documents, $batchSize) as $chunk) { - $batch = $this->withTransaction(function () use ($collection, $chunk) { - return $this->adapter->createDocuments($collection, $chunk); + if ($ignore && !empty($preExistingIds)) { + $chunk = \array_values(\array_filter($chunk, function (Document $doc) use ($preExistingIds, $tenantPerDocument) { + $key = $tenantPerDocument + ? $doc->getTenant() . ':' . $doc->getId() + : $doc->getId(); + return !isset($preExistingIds[$key]); + })); + if (empty($chunk)) { + continue; + } + } + + $batch = $this->withTransaction(function () use ($collection, $chunk, $ignore) { + return $this->adapter->createDocuments($collection, $chunk, $ignore); }); + // Create deferred relationships only for docs that were actually inserted + if ($ignore && $this->resolveRelationships && \count($deferredRelationships) > 0) { + foreach ($batch as $insertedDoc) { + $deferKey = $tenantPerDocument + ? $insertedDoc->getTenant() . ':' . $insertedDoc->getId() + : $insertedDoc->getId(); + if (\array_key_exists($deferKey, $deferredRelationships)) { + $relDoc = clone $insertedDoc; + foreach ($deferredRelationships[$deferKey] as $key => $value) { + $relDoc->setAttribute($key, $value); + } + $this->silent(fn () => $this->createDocumentRelationships($collection, $relDoc)); + unset($deferredRelationships[$deferKey]); + } + } + } + $batch = $this->adapter->getSequences($collection->getId(), $batch); if (!$this->inBatchRelationshipPopulation && $this->resolveRelationships) { @@ -7116,18 +7245,51 @@ public function upsertDocumentsWithIncrease( $created = 0; $updated = 0; $seenIds = []; - foreach ($documents as $key => $document) { - if ($this->getSharedTables() && $this->getTenantPerDocument()) { - $old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - )))); + + // Batch-fetch existing documents in one query instead of N individual getDocument() calls + $ids = \array_filter(\array_map(fn ($doc) => $doc->getId(), $documents), fn ($v) => $v !== null); + $existingDocs = []; + $upsertTenantPerDocument = $this->getSharedTables() && $this->getTenantPerDocument(); + + if (!empty($ids)) { + $uniqueIds = \array_values(\array_unique($ids)); + + if ($upsertTenantPerDocument) { + $idsByTenant = []; + foreach ($documents as $doc) { + $tenant = $doc->getTenant(); + $idsByTenant[$tenant][] = $doc->getId(); + } + foreach ($idsByTenant as $tenant => $tenantIds) { + $tenantIds = \array_values(\array_unique(\array_filter($tenantIds, fn ($v) => $v !== null))); + foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find( + $collection->getId(), + [Query::equal('$id', $idChunk), Query::limit(\count($idChunk))], + )))); + foreach ($fetched as $doc) { + $existingDocs[$tenant . ':' . $doc->getId()] = $doc; + } + } + } } else { - $old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - ))); + foreach (\array_chunk($uniqueIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $fetched = $this->authorization->skip(fn () => $this->silent(fn () => $this->find( + $collection->getId(), + [Query::equal('$id', $idChunk), Query::limit(\count($idChunk))], + ))); + foreach ($fetched as $doc) { + $existingDocs[$doc->getId()] = $doc; + } + } } + } + + foreach ($documents as $key => $document) { + $lookupKey = $upsertTenantPerDocument + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + $old = $existingDocs[$lookupKey] ?? new Document(); // Extract operators early to avoid comparison issues $documentArray = $document->getArrayCopy(); @@ -7294,7 +7456,9 @@ public function upsertDocumentsWithIncrease( $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); } - $seenIds[] = $document->getId(); + $seenIds[] = $upsertTenantPerDocument + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); $old = $this->adapter->castingBefore($collection, $old); $document = $this->adapter->castingBefore($collection, $document); diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php index f740cab3e..636d273dd 100644 --- a/src/Database/Mirror.php +++ b/src/Database/Mirror.php @@ -600,6 +600,7 @@ public function createDocuments( int $batchSize = self::INSERT_BATCH_SIZE, ?callable $onNext = null, ?callable $onError = null, + bool $ignore = false, ): int { $modified = $this->source->createDocuments( $collection, @@ -607,6 +608,7 @@ public function createDocuments( $batchSize, $onNext, $onError, + $ignore, ); if ( @@ -645,6 +647,7 @@ public function createDocuments( $collection, $clones, $batchSize, + ignore: $ignore, ) ); From bee42d477cb6aa602ce0df88bd9b77a93874fed0 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Thu, 9 Apr 2026 11:27:17 +0100 Subject: [PATCH 2/8] Add e2e tests for createDocuments ignore mode - testCreateDocumentsIgnoreDuplicates: mixed batch with onNext assertions - testCreateDocumentsIgnoreIntraBatchDuplicates: first-wins, no ACL drift - testCreateDocumentsIgnoreAllDuplicates: zero inserts, empty onNext --- tests/e2e/Adapter/Scopes/DocumentTests.php | 196 +++++++++++++++++++++ 1 file changed, 196 insertions(+) diff --git a/tests/e2e/Adapter/Scopes/DocumentTests.php b/tests/e2e/Adapter/Scopes/DocumentTests.php index d16004d32..34a463200 100644 --- a/tests/e2e/Adapter/Scopes/DocumentTests.php +++ b/tests/e2e/Adapter/Scopes/DocumentTests.php @@ -7722,4 +7722,200 @@ public function testRegexInjection(): void // } // $database->deleteCollection($collectionName); // } + + public function testCreateDocumentsIgnoreDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial documents + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Original A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc2', + 'name' => 'Original B', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // Without ignore, duplicates should throw + try { + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + $this->fail('Expected DuplicateException'); + } catch (DuplicateException $e) { + $this->assertNotEmpty($e->getMessage()); + } + + // With ignore, duplicates should be silently skipped + $emittedIds = []; + $count = $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc3', + 'name' => 'New C', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }, ignore: true); + + // Only doc3 was new, doc1 was skipped as duplicate + $this->assertSame(1, $count); + $this->assertCount(1, $emittedIds); + $this->assertSame('doc3', $emittedIds[0]); + + // doc3 should exist, doc1 should retain original value + $doc1 = $database->getDocument(__FUNCTION__, 'doc1'); + $this->assertSame('Original A', $doc1->getAttribute('name')); + + $doc3 = $database->getDocument(__FUNCTION__, 'doc3'); + $this->assertSame('New C', $doc3->getAttribute('name')); + + // Total should be 3 (doc1, doc2, doc3) + $all = $database->find(__FUNCTION__); + $this->assertCount(3, $all); + } + + public function testCreateDocumentsIgnoreIntraBatchDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + $col = 'createDocsIgnoreIntraBatch'; + + $database->createCollection($col); + $database->createAttribute($col, 'name', Database::VAR_STRING, 128, true); + + // Two docs with same ID in one batch — first wins, second is deduplicated + $emittedIds = []; + $count = $database->createDocuments($col, [ + new Document([ + '$id' => 'dup', + 'name' => 'First', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'dup', + 'name' => 'Second', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::update(Role::user('extra')), + ], + ]), + new Document([ + '$id' => 'unique1', + 'name' => 'Unique', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }, ignore: true); + + $this->assertSame(2, $count); + $this->assertCount(2, $emittedIds); + + // First occurrence wins + $doc = $database->getDocument($col, 'dup'); + $this->assertSame('First', $doc->getAttribute('name')); + + // Second doc's extra permission should NOT exist (no ACL drift) + $perms = $doc->getPermissions(); + foreach ($perms as $perm) { + $this->assertStringNotContainsString('extra', $perm); + } + + // unique1 should exist + $unique = $database->getDocument($col, 'unique1'); + $this->assertSame('Unique', $unique->getAttribute('name')); + + // Total: 2 documents + $all = $database->find($col); + $this->assertCount(2, $all); + } + + public function testCreateDocumentsIgnoreAllDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial document + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'existing', + 'name' => 'Original', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // With ignore, inserting only duplicates should succeed with no new rows + $emittedIds = []; + $count = $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'existing', + 'name' => 'Duplicate', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }, ignore: true); + + // All duplicates skipped, nothing inserted + $this->assertSame(0, $count); + $this->assertSame([], $emittedIds); + + // Original document should be unchanged + $doc = $database->getDocument(__FUNCTION__, 'existing'); + $this->assertSame('Original', $doc->getAttribute('name')); + + // Still only 1 document + $all = $database->find(__FUNCTION__); + $this->assertCount(1, $all); + } } From 93a9136fa02e1b22b3108b9f2db9d07a936e3115 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Fri, 10 Apr 2026 07:50:16 +0100 Subject: [PATCH 3/8] Fix Mongo adapter ignore mode: pass ignoreDuplicates to client and fix _uid mapping - Pass ignoreDuplicates option to insertMany for race-condition safety - Extract _uid from raw array before replaceChars (excluded from transformation) - Map inserted records back to original Document objects by _uid --- src/Database/Adapter/Mongo.php | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 71bbc3454..d82e8f8cb 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1576,18 +1576,37 @@ public function createDocuments(Document $collection, array $documents, bool $ig } } + if ($ignore) { + $options['ignoreDuplicates'] = true; + } + try { - $documents = $this->client->insertMany($name, $records, $options); + $inserted = $this->client->insertMany($name, $records, $options); } catch (MongoException $e) { throw $this->processException($e); } - foreach ($documents as $index => $document) { - $documents[$index] = $this->replaceChars('_', '$', $this->client->toArray($document)); - $documents[$index] = new Document($documents[$index]); + $insertedUids = []; + foreach ($inserted as $record) { + $arr = $this->client->toArray($record); + // _uid is excluded from replaceChars transformation, so extract it before conversion + $uid = $arr['_uid'] ?? ''; + $arr = $this->replaceChars('_', '$', $arr); + $insertedUids[$uid] = new Document($arr); } - return $documents; + if ($ignore) { + $result = []; + foreach ($records as $i => $record) { + $uid = $record['_uid'] ?? ''; + if (isset($insertedUids[$uid])) { + $result[] = $documents[$i]; + } + } + return $result; + } + + return \array_values($insertedUids); } /** From 2906ddafadf4f734b4bd6fdbccbf262c9f1ac10d Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Fri, 10 Apr 2026 08:00:41 +0100 Subject: [PATCH 4/8] Revert "Fix Mongo adapter ignore mode: pass ignoreDuplicates to client and fix _uid mapping" This reverts commit 93a9136fa02e1b22b3108b9f2db9d07a936e3115. --- src/Database/Adapter/Mongo.php | 29 +++++------------------------ 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index d82e8f8cb..71bbc3454 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1576,37 +1576,18 @@ public function createDocuments(Document $collection, array $documents, bool $ig } } - if ($ignore) { - $options['ignoreDuplicates'] = true; - } - try { - $inserted = $this->client->insertMany($name, $records, $options); + $documents = $this->client->insertMany($name, $records, $options); } catch (MongoException $e) { throw $this->processException($e); } - $insertedUids = []; - foreach ($inserted as $record) { - $arr = $this->client->toArray($record); - // _uid is excluded from replaceChars transformation, so extract it before conversion - $uid = $arr['_uid'] ?? ''; - $arr = $this->replaceChars('_', '$', $arr); - $insertedUids[$uid] = new Document($arr); - } - - if ($ignore) { - $result = []; - foreach ($records as $i => $record) { - $uid = $record['_uid'] ?? ''; - if (isset($insertedUids[$uid])) { - $result[] = $documents[$i]; - } - } - return $result; + foreach ($documents as $index => $document) { + $documents[$index] = $this->replaceChars('_', '$', $this->client->toArray($document)); + $documents[$index] = new Document($documents[$index]); } - return \array_values($insertedUids); + return $documents; } /** From 63d9902530f272d6863960dfbc17089519deebb4 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Fri, 10 Apr 2026 13:45:47 +0100 Subject: [PATCH 5/8] Replace ignore param with skipDuplicates scope guard - Add skipDuplicates(callable) scope guard on Database, following existing pattern (skipRelationships, skipValidation, etc.) - Remove bool $ignore parameter from createDocuments signature - Mirror propagates skipDuplicates state to source and destination - Update tests to use $database->skipDuplicates(function() { ... }) --- src/Database/Database.php | 17 ++- src/Database/Mirror.php | 15 ++- tests/e2e/Adapter/Scopes/DocumentTests.php | 134 +++++++++++---------- 3 files changed, 96 insertions(+), 70 deletions(-) diff --git a/src/Database/Database.php b/src/Database/Database.php index cbcc80273..78a9c5a57 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -417,6 +417,8 @@ class Database protected bool $preserveDates = false; + protected bool $skipDuplicates = false; + protected bool $preserveSequence = false; protected int $maxQueryValues = 5000; @@ -842,6 +844,18 @@ public function skipRelationshipsExistCheck(callable $callback): mixed } } + public function skipDuplicates(callable $callback): mixed + { + $previous = $this->skipDuplicates; + $this->skipDuplicates = true; + + try { + return $callback(); + } finally { + $this->skipDuplicates = $previous; + } + } + /** * Trigger callback for events * @@ -5621,7 +5635,6 @@ public function createDocument(string $collection, Document $document): Document * @param int $batchSize * @param (callable(Document): void)|null $onNext * @param (callable(Throwable): void)|null $onError - * @param bool $ignore If true, silently ignore duplicate documents instead of throwing * @return int * @throws AuthorizationException * @throws StructureException @@ -5634,7 +5647,6 @@ public function createDocuments( int $batchSize = self::INSERT_BATCH_SIZE, ?callable $onNext = null, ?callable $onError = null, - bool $ignore = false, ): int { if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) { throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.'); @@ -5656,6 +5668,7 @@ public function createDocuments( $modified = 0; $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); + $ignore = $this->skipDuplicates; // Deduplicate intra-batch documents by ID (tenant-aware). First occurrence wins. if ($ignore) { diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php index 636d273dd..5f3ae640c 100644 --- a/src/Database/Mirror.php +++ b/src/Database/Mirror.php @@ -600,17 +600,19 @@ public function createDocuments( int $batchSize = self::INSERT_BATCH_SIZE, ?callable $onNext = null, ?callable $onError = null, - bool $ignore = false, ): int { - $modified = $this->source->createDocuments( + $createFn = fn () => $this->source->createDocuments( $collection, $documents, $batchSize, $onNext, $onError, - $ignore, ); + $modified = $this->skipDuplicates + ? $this->source->skipDuplicates($createFn) + : $createFn(); + if ( \in_array($collection, self::SOURCE_ONLY_COLLECTIONS) || $this->destination === null @@ -641,16 +643,19 @@ public function createDocuments( $clones[] = $clone; } - $this->destination->withPreserveDates( + $destFn = fn () => $this->destination->withPreserveDates( fn () => $this->destination->createDocuments( $collection, $clones, $batchSize, - ignore: $ignore, ) ); + $this->skipDuplicates + ? $this->destination->skipDuplicates($destFn) + : $destFn(); + foreach ($clones as $clone) { foreach ($this->writeFilters as $filter) { $filter->afterCreateDocument( diff --git a/tests/e2e/Adapter/Scopes/DocumentTests.php b/tests/e2e/Adapter/Scopes/DocumentTests.php index 34a463200..5f82cc025 100644 --- a/tests/e2e/Adapter/Scopes/DocumentTests.php +++ b/tests/e2e/Adapter/Scopes/DocumentTests.php @@ -7768,28 +7768,31 @@ public function testCreateDocumentsIgnoreDuplicates(): void $this->assertNotEmpty($e->getMessage()); } - // With ignore, duplicates should be silently skipped + // With skipDuplicates, duplicates should be silently skipped $emittedIds = []; - $count = $database->createDocuments(__FUNCTION__, [ - new Document([ - '$id' => 'doc1', - 'name' => 'Duplicate A', - '$permissions' => [ - Permission::read(Role::any()), - Permission::create(Role::any()), - ], - ]), - new Document([ - '$id' => 'doc3', - 'name' => 'New C', - '$permissions' => [ - Permission::read(Role::any()), - Permission::create(Role::any()), - ], - ]), - ], onNext: function (Document $doc) use (&$emittedIds) { - $emittedIds[] = $doc->getId(); - }, ignore: true); + $collection = __FUNCTION__; + $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + return $database->createDocuments($collection, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc3', + 'name' => 'New C', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); // Only doc3 was new, doc1 was skipped as duplicate $this->assertSame(1, $count); @@ -7819,35 +7822,37 @@ public function testCreateDocumentsIgnoreIntraBatchDuplicates(): void // Two docs with same ID in one batch — first wins, second is deduplicated $emittedIds = []; - $count = $database->createDocuments($col, [ - new Document([ - '$id' => 'dup', - 'name' => 'First', - '$permissions' => [ - Permission::read(Role::any()), - Permission::create(Role::any()), - ], - ]), - new Document([ - '$id' => 'dup', - 'name' => 'Second', - '$permissions' => [ - Permission::read(Role::any()), - Permission::create(Role::any()), - Permission::update(Role::user('extra')), - ], - ]), - new Document([ - '$id' => 'unique1', - 'name' => 'Unique', - '$permissions' => [ - Permission::read(Role::any()), - Permission::create(Role::any()), - ], - ]), - ], onNext: function (Document $doc) use (&$emittedIds) { - $emittedIds[] = $doc->getId(); - }, ignore: true); + $count = $database->skipDuplicates(function () use ($database, $col, &$emittedIds) { + return $database->createDocuments($col, [ + new Document([ + '$id' => 'dup', + 'name' => 'First', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'dup', + 'name' => 'Second', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::update(Role::user('extra')), + ], + ]), + new Document([ + '$id' => 'unique1', + 'name' => 'Unique', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); $this->assertSame(2, $count); $this->assertCount(2, $emittedIds); @@ -7891,20 +7896,23 @@ public function testCreateDocumentsIgnoreAllDuplicates(): void ]), ]); - // With ignore, inserting only duplicates should succeed with no new rows + // With skipDuplicates, inserting only duplicates should succeed with no new rows $emittedIds = []; - $count = $database->createDocuments(__FUNCTION__, [ - new Document([ - '$id' => 'existing', - 'name' => 'Duplicate', - '$permissions' => [ - Permission::read(Role::any()), - Permission::create(Role::any()), - ], - ]), - ], onNext: function (Document $doc) use (&$emittedIds) { - $emittedIds[] = $doc->getId(); - }, ignore: true); + $collection = __FUNCTION__; + $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + return $database->createDocuments($collection, [ + new Document([ + '$id' => 'existing', + 'name' => 'Duplicate', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); // All duplicates skipped, nothing inserted $this->assertSame(0, $count); From b0a8392d2c8e90c5ca0b49ed24758a36b13f7a0c Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Sun, 12 Apr 2026 11:43:46 +0100 Subject: [PATCH 6/8] Push skipDuplicates scope guard down to Adapter layer - Add $skipDuplicates property + skipDuplicates() scope guard to Adapter - Remove bool $ignore parameter from all adapter createDocuments signatures (Adapter, Pool, Mongo, SQL) - Remove bool $ignore from SQL helper methods (getInsertKeyword, getInsertSuffix, getInsertPermissionsSuffix) and Postgres/SQLite overrides - Pool delegate() and withTransaction() propagate skipDuplicates state to pooled/pinned adapter via the scope guard - Database::createDocuments() wraps adapter call in adapter->skipDuplicates() when the flag is set, drops the local $ignore variable --- src/Database/Adapter.php | 26 ++++++++++++++++++++++++-- src/Database/Adapter/Mongo.php | 4 ++-- src/Database/Adapter/Pool.php | 11 +++++++---- src/Database/Adapter/Postgres.php | 10 +++++----- src/Database/Adapter/SQL.php | 22 +++++++++++----------- src/Database/Adapter/SQLite.php | 4 ++-- src/Database/Database.php | 18 ++++++++++-------- 7 files changed, 61 insertions(+), 34 deletions(-) diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php index 9cc83d141..26fe8249c 100644 --- a/src/Database/Adapter.php +++ b/src/Database/Adapter.php @@ -33,6 +33,29 @@ abstract class Adapter protected bool $alterLocks = false; + protected bool $skipDuplicates = false; + + /** + * Run a callback with skipDuplicates enabled. + * Duplicate key errors during createDocuments() will be silently skipped + * instead of thrown. Nestable — saves and restores previous state. + * + * @template T + * @param callable(): T $callback + * @return T + */ + public function skipDuplicates(callable $callback): mixed + { + $previous = $this->skipDuplicates; + $this->skipDuplicates = true; + + try { + return $callback(); + } finally { + $this->skipDuplicates = $previous; + } + } + /** * @var array */ @@ -729,13 +752,12 @@ abstract public function createDocument(Document $collection, Document $document * * @param Document $collection * @param array $documents - * @param bool $ignore If true, silently ignore duplicate documents instead of throwing * * @return array * * @throws DatabaseException */ - abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array; + abstract public function createDocuments(Document $collection, array $documents): array; /** * Update Document diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index 71bbc3454..c788aebba 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1460,7 +1460,7 @@ public function castingBefore(Document $collection, Document $document): Documen * @throws DuplicateException * @throws DatabaseException */ - public function createDocuments(Document $collection, array $documents, bool $ignore = false): array + public function createDocuments(Document $collection, array $documents): array { $name = $this->getNamespace() . '_' . $this->filter($collection->getId()); $options = $this->getTransactionOptions(); @@ -1488,7 +1488,7 @@ public function createDocuments(Document $collection, array $documents, bool $ig } // Pre-filter duplicates within the session to avoid aborting the transaction. - if ($ignore && !empty($records)) { + if ($this->skipDuplicates && !empty($records)) { $existingKeys = []; try { diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php index ec508c3e4..10ae1de90 100644 --- a/src/Database/Adapter/Pool.php +++ b/src/Database/Adapter/Pool.php @@ -43,7 +43,8 @@ public function __construct(UtopiaPool $pool) public function delegate(string $method, array $args): mixed { if ($this->pinnedAdapter !== null) { - return $this->pinnedAdapter->{$method}(...$args); + $invoke = fn () => $this->pinnedAdapter->{$method}(...$args); + return $this->skipDuplicates ? $this->pinnedAdapter->skipDuplicates($invoke) : $invoke(); } return $this->pool->use(function (Adapter $adapter) use ($method, $args) { @@ -66,7 +67,8 @@ public function delegate(string $method, array $args): mixed $adapter->setMetadata($key, $value); } - return $adapter->{$method}(...$args); + $invoke = fn () => $adapter->{$method}(...$args); + return $this->skipDuplicates ? $adapter->skipDuplicates($invoke) : $invoke(); }); } @@ -146,7 +148,8 @@ public function withTransaction(callable $callback): mixed $this->pinnedAdapter = $adapter; try { - return $adapter->withTransaction($callback); + $invoke = fn () => $adapter->withTransaction($callback); + return $this->skipDuplicates ? $adapter->skipDuplicates($invoke) : $invoke(); } finally { $this->pinnedAdapter = null; } @@ -268,7 +271,7 @@ public function createDocument(Document $collection, Document $document): Docume return $this->delegate(__FUNCTION__, \func_get_args()); } - public function createDocuments(Document $collection, array $documents, bool $ignore = false): array + public function createDocuments(Document $collection, array $documents): array { return $this->delegate(__FUNCTION__, \func_get_args()); } diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php index 814ecc8bc..03036dd2f 100644 --- a/src/Database/Adapter/Postgres.php +++ b/src/Database/Adapter/Postgres.php @@ -1365,14 +1365,14 @@ public function updateDocument(Document $collection, string $id, Document $docum return $document; } - protected function getInsertKeyword(bool $ignore): string + protected function getInsertKeyword(): string { return 'INSERT INTO'; } - protected function getInsertSuffix(bool $ignore, string $table): string + protected function getInsertSuffix(string $table): string { - if (!$ignore) { + if (!$this->skipDuplicates) { return ''; } @@ -1381,9 +1381,9 @@ protected function getInsertSuffix(bool $ignore, string $table): string return "ON CONFLICT {$conflictTarget} DO NOTHING"; } - protected function getInsertPermissionsSuffix(bool $ignore): string + protected function getInsertPermissionsSuffix(): string { - if (!$ignore) { + if (!$this->skipDuplicates) { return ''; } diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 6c918ef15..4fb58ae3f 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -2471,14 +2471,14 @@ protected function execute(mixed $stmt): bool * @throws DuplicateException * @throws \Throwable */ - public function createDocuments(Document $collection, array $documents, bool $ignore = false): array + public function createDocuments(Document $collection, array $documents): array { if (empty($documents)) { return $documents; } // Pre-filter existing UIDs to prevent race-condition duplicates. - if ($ignore) { + if ($this->skipDuplicates) { $collectionId = $collection->getId(); $name = $this->filter($collectionId); $uids = \array_filter(\array_map(fn (Document $doc) => $doc->getId(), $documents), fn ($v) => $v !== null); @@ -2649,9 +2649,9 @@ public function createDocuments(Document $collection, array $documents, bool $ig $batchKeys = \implode(', ', $batchKeys); $stmt = $this->getPDO()->prepare(" - {$this->getInsertKeyword($ignore)} {$this->getSQLTable($name)} {$columns} + {$this->getInsertKeyword()} {$this->getSQLTable($name)} {$columns} VALUES {$batchKeys} - {$this->getInsertSuffix($ignore, $name)} + {$this->getInsertSuffix($name)} "); foreach ($bindValues as $key => $value) { @@ -2661,7 +2661,7 @@ public function createDocuments(Document $collection, array $documents, bool $ig $this->execute($stmt); // Reconcile returned docs with actual inserts when a race condition skipped rows. - if ($ignore && $stmt->rowCount() < \count($documents)) { + if ($this->skipDuplicates && $stmt->rowCount() < \count($documents)) { $expectedTimestamps = []; foreach ($documents as $doc) { $eKey = ($this->sharedTables && $this->tenantPerDocument) @@ -2762,9 +2762,9 @@ public function createDocuments(Document $collection, array $documents, bool $ig $permissions = \implode(', ', $permissions); $sqlPermissions = " - {$this->getInsertKeyword($ignore)} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) + {$this->getInsertKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) VALUES {$permissions} - {$this->getInsertPermissionsSuffix($ignore)} + {$this->getInsertPermissionsSuffix()} "; $stmtPermissions = $this->getPDO()->prepare($sqlPermissions); @@ -2787,16 +2787,16 @@ public function createDocuments(Document $collection, array $documents, bool $ig * Returns the INSERT keyword, optionally with IGNORE for duplicate handling. * Override in adapter subclasses for DB-specific syntax. */ - protected function getInsertKeyword(bool $ignore): string + protected function getInsertKeyword(): string { - return $ignore ? 'INSERT IGNORE INTO' : 'INSERT INTO'; + return $this->skipDuplicates ? 'INSERT IGNORE INTO' : 'INSERT INTO'; } /** * Returns a suffix appended after VALUES clause for duplicate handling. * Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING). */ - protected function getInsertSuffix(bool $ignore, string $table): string + protected function getInsertSuffix(string $table): string { return ''; } @@ -2805,7 +2805,7 @@ protected function getInsertSuffix(bool $ignore, string $table): string * Returns a suffix for the permissions INSERT statement when ignoring duplicates. * Override in adapter subclasses for DB-specific syntax. */ - protected function getInsertPermissionsSuffix(bool $ignore): string + protected function getInsertPermissionsSuffix(): string { return ''; } diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php index 8e7ef6b5b..2732f784b 100644 --- a/src/Database/Adapter/SQLite.php +++ b/src/Database/Adapter/SQLite.php @@ -34,9 +34,9 @@ */ class SQLite extends MariaDB { - protected function getInsertKeyword(bool $ignore): string + protected function getInsertKeyword(): string { - return $ignore ? 'INSERT OR IGNORE INTO' : 'INSERT INTO'; + return $this->skipDuplicates ? 'INSERT OR IGNORE INTO' : 'INSERT INTO'; } /** diff --git a/src/Database/Database.php b/src/Database/Database.php index 78a9c5a57..146ceacaf 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -5668,10 +5668,9 @@ public function createDocuments( $modified = 0; $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); - $ignore = $this->skipDuplicates; // Deduplicate intra-batch documents by ID (tenant-aware). First occurrence wins. - if ($ignore) { + if ($this->skipDuplicates) { $seenIds = []; $deduplicated = []; foreach ($documents as $document) { @@ -5692,7 +5691,7 @@ public function createDocuments( // Pre-fetch existing IDs to skip relationship writes for known duplicates $preExistingIds = []; - if ($ignore) { + if ($this->skipDuplicates) { if ($tenantPerDocument) { $idsByTenant = []; foreach ($documents as $doc) { @@ -5739,7 +5738,7 @@ public function createDocuments( /** @var array> $deferredRelationships */ $deferredRelationships = []; $relationships = []; - if ($ignore && $this->resolveRelationships) { + if ($this->skipDuplicates && $this->resolveRelationships) { $relationships = \array_filter($collection->getAttribute('attributes', []), fn ($attr) => $attr['type'] === self::VAR_RELATIONSHIP); } @@ -5813,7 +5812,7 @@ public function createDocuments( } foreach (\array_chunk($documents, $batchSize) as $chunk) { - if ($ignore && !empty($preExistingIds)) { + if ($this->skipDuplicates && !empty($preExistingIds)) { $chunk = \array_values(\array_filter($chunk, function (Document $doc) use ($preExistingIds, $tenantPerDocument) { $key = $tenantPerDocument ? $doc->getTenant() . ':' . $doc->getId() @@ -5825,12 +5824,15 @@ public function createDocuments( } } - $batch = $this->withTransaction(function () use ($collection, $chunk, $ignore) { - return $this->adapter->createDocuments($collection, $chunk, $ignore); + $batch = $this->withTransaction(function () use ($collection, $chunk) { + $createFn = fn () => $this->adapter->createDocuments($collection, $chunk); + return $this->skipDuplicates + ? $this->adapter->skipDuplicates($createFn) + : $createFn(); }); // Create deferred relationships only for docs that were actually inserted - if ($ignore && $this->resolveRelationships && \count($deferredRelationships) > 0) { + if ($this->skipDuplicates && $this->resolveRelationships && \count($deferredRelationships) > 0) { foreach ($batch as $insertedDoc) { $deferKey = $tenantPerDocument ? $insertedDoc->getTenant() . ':' . $insertedDoc->getId() From d8f647c33c3bd3073c210f4c9055319c2efb70e4 Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Sun, 12 Apr 2026 14:32:58 +0100 Subject: [PATCH 7/8] Refactor: extract helpers and collapse conditional wraps - skipDuplicates() takes optional $enable so Pool/Mirror/Database can forward state in a single call instead of branching - Database: tenantKey(Document) helper replaces ~8 inline ternaries - Database: fetchExistingByIds() helper replaces the tenant-grouped batched find() in createDocuments and upsertDocuments pre-fetch - Database: drop dead guard on non-defer relationship path - Mongo: findExistingUids() helper replaces the duplicated cursor walk in the skipDuplicates pre-filter (tenant and non-tenant variants) - SQL: buildUidTenantLookup() helper replaces the duplicated placeholder-and-binds builder shared by pre-filter and reconciliation --- src/Database/Adapter.php | 9 +- src/Database/Adapter/Mongo.php | 97 ++++++++------ src/Database/Adapter/Pool.php | 18 ++- src/Database/Adapter/SQL.php | 136 ++++++++++---------- src/Database/Database.php | 225 +++++++++++++++------------------ src/Database/Mirror.php | 39 +++--- 6 files changed, 259 insertions(+), 265 deletions(-) diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php index 26fe8249c..757d0c464 100644 --- a/src/Database/Adapter.php +++ b/src/Database/Adapter.php @@ -39,13 +39,20 @@ abstract class Adapter * Run a callback with skipDuplicates enabled. * Duplicate key errors during createDocuments() will be silently skipped * instead of thrown. Nestable — saves and restores previous state. + * Pass $enable = false to run the callback without toggling the flag + * (useful for conditional forwarding from wrappers like Pool/Mirror). * * @template T * @param callable(): T $callback + * @param bool $enable * @return T */ - public function skipDuplicates(callable $callback): mixed + public function skipDuplicates(callable $callback, bool $enable = true): mixed { + if (!$enable) { + return $callback(); + } + $previous = $this->skipDuplicates; $this->skipDuplicates = true; diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index c788aebba..3f6bbc354 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -1449,6 +1449,42 @@ public function castingBefore(Document $collection, Document $document): Documen return $document; } + /** + * Find existing `_uid` values matching the given filters, fully walking the cursor. + * + * @param string $name + * @param array $filters + * @param int $batchSize Hint: expected max result count — lets MongoDB return everything in one batch + * @return array + * + * @throws MongoException + */ + private function findExistingUids(string $name, array $filters, int $batchSize): array + { + $options = $this->getTransactionOptions([ + 'projection' => ['_uid' => 1], + 'batchSize' => $batchSize, + ]); + + $response = $this->client->find($name, $filters, $options); + $uids = []; + + foreach ($response->cursor->firstBatch ?? [] as $doc) { + $uids[] = $doc->_uid; + } + + $cursorId = $response->cursor->id ?? 0; + while ($cursorId != 0) { + $more = $this->client->getMore($cursorId, $name, $batchSize); + foreach ($more->cursor->nextBatch ?? [] as $doc) { + $uids[] = $doc->_uid; + } + $cursorId = $more->cursor->id ?? 0; + } + + return $uids; + } + /** * Create Documents in batches * @@ -1463,8 +1499,8 @@ public function castingBefore(Document $collection, Document $document): Documen public function createDocuments(Document $collection, array $documents): array { $name = $this->getNamespace() . '_' . $this->filter($collection->getId()); - $options = $this->getTransactionOptions(); + $options = $this->getTransactionOptions(); $records = []; $hasSequence = null; $documents = \array_map(fn ($doc) => clone $doc, $documents); @@ -1487,12 +1523,18 @@ public function createDocuments(Document $collection, array $documents): array $records[] = $record; } - // Pre-filter duplicates within the session to avoid aborting the transaction. + // Pre-filter duplicates within the session — MongoDB has no INSERT IGNORE, + // so sending a duplicate would abort the entire transaction. if ($this->skipDuplicates && !empty($records)) { + $tenantPerDoc = $this->sharedTables && $this->tenantPerDocument; + $recordKey = fn (array $record): string => $tenantPerDoc + ? ($record['_tenant'] ?? $this->getTenant()) . ':' . ($record['_uid'] ?? '') + : ($record['_uid'] ?? ''); + $existingKeys = []; try { - if ($this->sharedTables && $this->tenantPerDocument) { + if ($tenantPerDoc) { $idsByTenant = []; foreach ($records as $record) { $uid = $record['_uid'] ?? ''; @@ -1505,47 +1547,23 @@ public function createDocuments(Document $collection, array $documents): array foreach ($idsByTenant as $tenant => $tenantUids) { $tenantUids = \array_values(\array_unique($tenantUids)); - $findOptions = $this->getTransactionOptions([ - 'projection' => ['_uid' => 1], - 'batchSize' => \count($tenantUids), - ]); $filters = ['_uid' => ['$in' => $tenantUids], '_tenant' => $tenant]; - $response = $this->client->find($name, $filters, $findOptions); - foreach ($response->cursor->firstBatch ?? [] as $doc) { - $existingKeys[$tenant . ':' . $doc->_uid] = true; - } - $cursorId = $response->cursor->id ?? 0; - while ($cursorId != 0) { - $more = $this->client->getMore($cursorId, $name, \count($tenantUids)); - foreach ($more->cursor->nextBatch ?? [] as $doc) { - $existingKeys[$tenant . ':' . $doc->_uid] = true; - } - $cursorId = $more->cursor->id ?? 0; + foreach ($this->findExistingUids($name, $filters, \count($tenantUids)) as $uid) { + $existingKeys[$tenant . ':' . $uid] = true; } } } else { - $uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records), fn ($v) => $v !== null); + $uids = \array_values(\array_unique(\array_filter( + \array_map(fn ($r) => $r['_uid'] ?? null, $records), + fn ($v) => $v !== null + ))); if (!empty($uids)) { - $uidValues = \array_values(\array_unique($uids)); - $findOptions = $this->getTransactionOptions([ - 'projection' => ['_uid' => 1], - 'batchSize' => \count($uidValues), - ]); - $filters = ['_uid' => ['$in' => $uidValues]]; + $filters = ['_uid' => ['$in' => $uids]]; if ($this->sharedTables) { $filters['_tenant'] = $this->getTenantFilters($collection->getId()); } - $response = $this->client->find($name, $filters, $findOptions); - foreach ($response->cursor->firstBatch ?? [] as $doc) { - $existingKeys[$doc->_uid] = true; - } - $cursorId = $response->cursor->id ?? 0; - while ($cursorId != 0) { - $more = $this->client->getMore($cursorId, $name, \count($uidValues)); - foreach ($more->cursor->nextBatch ?? [] as $doc) { - $existingKeys[$doc->_uid] = true; - } - $cursorId = $more->cursor->id ?? 0; + foreach ($this->findExistingUids($name, $filters, \count($uids)) as $uid) { + $existingKeys[$uid] = true; } } } @@ -1556,13 +1574,8 @@ public function createDocuments(Document $collection, array $documents): array if (!empty($existingKeys)) { $filteredRecords = []; $filteredDocuments = []; - $tenantPerDoc = $this->sharedTables && $this->tenantPerDocument; foreach ($records as $i => $record) { - $uid = $record['_uid'] ?? ''; - $key = $tenantPerDoc - ? ($record['_tenant'] ?? $this->getTenant()) . ':' . $uid - : $uid; - if (!isset($existingKeys[$key])) { + if (!isset($existingKeys[$recordKey($record)])) { $filteredRecords[] = $record; $filteredDocuments[] = $documents[$i]; } diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php index 10ae1de90..648372402 100644 --- a/src/Database/Adapter/Pool.php +++ b/src/Database/Adapter/Pool.php @@ -43,8 +43,10 @@ public function __construct(UtopiaPool $pool) public function delegate(string $method, array $args): mixed { if ($this->pinnedAdapter !== null) { - $invoke = fn () => $this->pinnedAdapter->{$method}(...$args); - return $this->skipDuplicates ? $this->pinnedAdapter->skipDuplicates($invoke) : $invoke(); + return $this->pinnedAdapter->skipDuplicates( + fn () => $this->pinnedAdapter->{$method}(...$args), + $this->skipDuplicates + ); } return $this->pool->use(function (Adapter $adapter) use ($method, $args) { @@ -67,8 +69,10 @@ public function delegate(string $method, array $args): mixed $adapter->setMetadata($key, $value); } - $invoke = fn () => $adapter->{$method}(...$args); - return $this->skipDuplicates ? $adapter->skipDuplicates($invoke) : $invoke(); + return $adapter->skipDuplicates( + fn () => $adapter->{$method}(...$args), + $this->skipDuplicates + ); }); } @@ -148,8 +152,10 @@ public function withTransaction(callable $callback): mixed $this->pinnedAdapter = $adapter; try { - $invoke = fn () => $adapter->withTransaction($callback); - return $this->skipDuplicates ? $adapter->skipDuplicates($invoke) : $invoke(); + return $adapter->skipDuplicates( + fn () => $adapter->withTransaction($callback), + $this->skipDuplicates + ); } finally { $this->pinnedAdapter = null; } diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 4fb58ae3f..07994dbe9 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -2460,6 +2460,59 @@ protected function execute(mixed $stmt): bool return $stmt->execute(); } + /** + * Build a `_uid IN (...) [AND _tenant ...]` WHERE clause (and matching binds) + * for a batch of documents. Used by the skipDuplicates pre-filter and the + * race-condition reconciliation path. + * + * @param array $documents + * @param string $bindPrefix Prefix for bind keys; must differ between concurrent uses. + * @return array{where: string, tenantSelect: string, binds: array} + */ + private function buildUidTenantLookup(array $documents, string $bindPrefix): array + { + $binds = []; + $placeholders = []; + $uids = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getId(), $documents), + fn ($v) => $v !== null + ))); + foreach ($uids as $i => $uid) { + $key = ':' . $bindPrefix . 'uid_' . $i; + $placeholders[] = $key; + $binds[$key] = $uid; + } + + $tenantFilter = ''; + $tenantSelect = ''; + if ($this->sharedTables) { + if ($this->tenantPerDocument) { + $tenants = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getTenant(), $documents), + fn ($v) => $v !== null + ))); + $tenantPlaceholders = []; + foreach ($tenants as $j => $tenant) { + $tKey = ':' . $bindPrefix . 'tenant_' . $j; + $tenantPlaceholders[] = $tKey; + $binds[$tKey] = $tenant; + } + $tenantFilter = ' AND _tenant IN (' . \implode(', ', $tenantPlaceholders) . ')'; + $tenantSelect = ', _tenant'; + } else { + $tenantKey = ':' . $bindPrefix . 'tenant'; + $tenantFilter = ' AND _tenant = ' . $tenantKey; + $binds[$tenantKey] = $this->getTenant(); + } + } + + return [ + 'where' => '_uid IN (' . \implode(', ', $placeholders) . ')' . $tenantFilter, + 'tenantSelect' => $tenantSelect, + 'binds' => $binds, + ]; + } + /** * Create Documents in batches * @@ -2479,47 +2532,17 @@ public function createDocuments(Document $collection, array $documents): array // Pre-filter existing UIDs to prevent race-condition duplicates. if ($this->skipDuplicates) { - $collectionId = $collection->getId(); - $name = $this->filter($collectionId); - $uids = \array_filter(\array_map(fn (Document $doc) => $doc->getId(), $documents), fn ($v) => $v !== null); + $name = $this->filter($collection->getId()); + $lookup = $this->buildUidTenantLookup($documents, '_dup_'); - if (!empty($uids)) { + if (!empty($lookup['binds'])) { try { - $placeholders = []; - $binds = []; - foreach (\array_values(\array_unique($uids)) as $i => $uid) { - $key = ':_dup_uid_' . $i; - $placeholders[] = $key; - $binds[$key] = $uid; - } - - $tenantFilter = ''; - if ($this->sharedTables) { - if ($this->tenantPerDocument) { - $tenants = \array_values(\array_unique(\array_filter( - \array_map(fn (Document $doc) => $doc->getTenant(), $documents), - fn ($v) => $v !== null - ))); - $tenantPlaceholders = []; - foreach ($tenants as $j => $tenant) { - $tKey = ':_dup_tenant_' . $j; - $tenantPlaceholders[] = $tKey; - $binds[$tKey] = $tenant; - } - $tenantFilter = ' AND _tenant IN (' . \implode(', ', $tenantPlaceholders) . ')'; - } else { - $tenantFilter = ' AND _tenant = :_dup_tenant'; - $binds[':_dup_tenant'] = $this->getTenant(); - } - } - - $tenantSelect = $this->sharedTables && $this->tenantPerDocument ? ', _tenant' : ''; - $sql = 'SELECT _uid' . $tenantSelect . ' FROM ' . $this->getSQLTable($name) - . ' WHERE _uid IN (' . \implode(', ', $placeholders) . ')' - . $tenantFilter; + $sql = 'SELECT _uid' . $lookup['tenantSelect'] + . ' FROM ' . $this->getSQLTable($name) + . ' WHERE ' . $lookup['where']; $stmt = $this->getPDO()->prepare($sql); - foreach ($binds as $k => $v) { + foreach ($lookup['binds'] as $k => $v) { $stmt->bindValue($k, $v, $this->getPDOType($v)); } $stmt->execute(); @@ -2670,44 +2693,13 @@ public function createDocuments(Document $collection, array $documents): array $expectedTimestamps[$eKey] = $doc->getCreatedAt(); } - $verifyPlaceholders = []; - $verifyBinds = []; - $rawUids = \array_values(\array_unique(\array_map(fn (Document $doc) => $doc->getId(), $documents))); - foreach ($rawUids as $idx => $uid) { - $ph = ':_vfy_' . $idx; - $verifyPlaceholders[] = $ph; - $verifyBinds[$ph] = $uid; - } - - $tenantWhere = ''; - $tenantSelect = ''; - if ($this->sharedTables) { - if ($this->tenantPerDocument) { - $tenants = \array_values(\array_unique(\array_filter( - \array_map(fn (Document $doc) => $doc->getTenant(), $documents), - fn ($v) => $v !== null - ))); - $tenantPlaceholders = []; - foreach ($tenants as $j => $tenant) { - $tKey = ':_vfy_tenant_' . $j; - $tenantPlaceholders[] = $tKey; - $verifyBinds[$tKey] = $tenant; - } - $tenantWhere = ' AND _tenant IN (' . \implode(', ', $tenantPlaceholders) . ')'; - $tenantSelect = ', _tenant'; - } else { - $tenantWhere = ' AND _tenant = :_vfy_tenant'; - $verifyBinds[':_vfy_tenant'] = $this->getTenant(); - } - } - - $verifySql = 'SELECT _uid' . $tenantSelect . ', ' . $this->quote($this->filter('_createdAt')) + $lookup = $this->buildUidTenantLookup($documents, '_vfy_'); + $verifySql = 'SELECT _uid' . $lookup['tenantSelect'] . ', ' . $this->quote($this->filter('_createdAt')) . ' FROM ' . $this->getSQLTable($name) - . ' WHERE _uid IN (' . \implode(', ', $verifyPlaceholders) . ')' - . $tenantWhere; + . ' WHERE ' . $lookup['where']; $verifyStmt = $this->getPDO()->prepare($verifySql); - foreach ($verifyBinds as $k => $v) { + foreach ($lookup['binds'] as $k => $v) { $verifyStmt->bindValue($k, $v, $this->getPDOType($v)); } $verifyStmt->execute(); diff --git a/src/Database/Database.php b/src/Database/Database.php index 146ceacaf..626334c2b 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -844,8 +844,12 @@ public function skipRelationshipsExistCheck(callable $callback): mixed } } - public function skipDuplicates(callable $callback): mixed + public function skipDuplicates(callable $callback, bool $enable = true): mixed { + if (!$enable) { + return $callback(); + } + $previous = $this->skipDuplicates; $this->skipDuplicates = true; @@ -856,6 +860,83 @@ public function skipDuplicates(callable $callback): mixed } } + /** + * Build a tenant-aware identity key for a document. + * Returns ":" in tenant-per-document shared-table mode, otherwise just the id. + */ + private function tenantKey(Document $document): string + { + return ($this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + } + + /** + * Batch-fetch documents by ID from a collection, keyed by tenantKey(). + * Chunks by maxQueryValues and groups queries by tenant in tenant-per-document mode. + * + * @param Document $collection + * @param array $documents Source documents (only IDs are read) + * @param bool $idsOnly When true, uses Query::select(['$id']) for a lighter fetch + * @return array + */ + private function fetchExistingByIds(Document $collection, array $documents, bool $idsOnly = false): array + { + $collectionId = $collection->getId(); + $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); + $result = []; + + $buildQueries = function (array $chunk) use ($idsOnly) { + $queries = [Query::equal('$id', $chunk)]; + if ($idsOnly) { + $queries[] = Query::select(['$id']); + } + $queries[] = Query::limit(\count($chunk)); + return $queries; + }; + + if ($tenantPerDocument) { + $idsByTenant = []; + foreach ($documents as $doc) { + $id = $doc->getId(); + if ($id !== '') { + $idsByTenant[$doc->getTenant()][] = $id; + } + } + foreach ($idsByTenant as $tenant => $tenantIds) { + $tenantIds = \array_values(\array_unique($tenantIds)); + foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { + $fetched = $this->authorization->skip( + fn () => $this->withTenant( + $tenant, + fn () => $this->silent(fn () => $this->find($collectionId, $buildQueries($idChunk))) + ) + ); + foreach ($fetched as $doc) { + $result[$tenant . ':' . $doc->getId()] = $doc; + } + } + } + + return $result; + } + + $ids = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getId(), $documents), + fn ($v) => $v !== null && $v !== '' + ))); + foreach (\array_chunk($ids, \max(1, $this->maxQueryValues)) as $idChunk) { + $fetched = $this->authorization->skip( + fn () => $this->silent(fn () => $this->find($collectionId, $buildQueries($idChunk))) + ); + foreach ($fetched as $doc) { + $result[$doc->getId()] = $doc; + } + } + + return $result; + } + /** * Trigger callback for events * @@ -5667,18 +5748,13 @@ public function createDocuments( $time = DateTime::now(); $modified = 0; - $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument(); - // Deduplicate intra-batch documents by ID (tenant-aware). First occurrence wins. if ($this->skipDuplicates) { $seenIds = []; $deduplicated = []; foreach ($documents as $document) { - $docId = $document->getId(); - if ($docId !== '') { - $dedupeKey = $tenantPerDocument - ? $document->getTenant() . ':' . $docId - : $docId; + if ($document->getId() !== '') { + $dedupeKey = $this->tenantKey($document); if (isset($seenIds[$dedupeKey])) { continue; } @@ -5690,50 +5766,9 @@ public function createDocuments( } // Pre-fetch existing IDs to skip relationship writes for known duplicates - $preExistingIds = []; - if ($this->skipDuplicates) { - if ($tenantPerDocument) { - $idsByTenant = []; - foreach ($documents as $doc) { - $idsByTenant[$doc->getTenant()][] = $doc->getId(); - } - foreach ($idsByTenant as $tenant => $tenantIds) { - $tenantIds = \array_values(\array_unique(\array_filter($tenantIds, fn ($v) => $v !== null))); - foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { - $existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find( - $collection->getId(), - [ - Query::equal('$id', $idChunk), - Query::select(['$id']), - Query::limit(\count($idChunk)), - ] - )))); - foreach ($existing as $doc) { - $preExistingIds[$tenant . ':' . $doc->getId()] = true; - } - } - } - } else { - $inputIds = \array_values(\array_unique(\array_filter( - \array_map(fn (Document $doc) => $doc->getId(), $documents), - fn ($v) => $v !== null - ))); - - foreach (\array_chunk($inputIds, \max(1, $this->maxQueryValues)) as $idChunk) { - $existing = $this->authorization->skip(fn () => $this->silent(fn () => $this->find( - $collection->getId(), - [ - Query::equal('$id', $idChunk), - Query::select(['$id']), - Query::limit(\count($idChunk)), - ] - ))); - foreach ($existing as $doc) { - $preExistingIds[$doc->getId()] = true; - } - } - } - } + $preExistingIds = $this->skipDuplicates + ? $this->fetchExistingByIds($collection, $documents, idsOnly: true) + : []; /** @var array> $deferredRelationships */ $deferredRelationships = []; @@ -5782,7 +5817,7 @@ public function createDocuments( } if ($this->resolveRelationships && !empty($relationships)) { - // Defer: store relationship data, strip attributes for INSERT. + // Defer relationship writes until after INSERT so we don't orphan records for duplicates. $relationshipData = []; foreach ($relationships as $rel) { $key = $rel['key']; @@ -5793,19 +5828,10 @@ public function createDocuments( $document->removeAttribute($key); } if (!empty($relationshipData)) { - $deferKey = $tenantPerDocument - ? $document->getTenant() . ':' . $document->getId() - : $document->getId(); - $deferredRelationships[$deferKey] = $relationshipData; + $deferredRelationships[$this->tenantKey($document)] = $relationshipData; } } elseif ($this->resolveRelationships) { - $preExistKey = $tenantPerDocument - ? $document->getTenant() . ':' . $document->getId() - : $document->getId(); - - if (!isset($preExistingIds[$preExistKey])) { - $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); - } + $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); } $document = $this->adapter->castingBefore($collection, $document); @@ -5813,30 +5839,24 @@ public function createDocuments( foreach (\array_chunk($documents, $batchSize) as $chunk) { if ($this->skipDuplicates && !empty($preExistingIds)) { - $chunk = \array_values(\array_filter($chunk, function (Document $doc) use ($preExistingIds, $tenantPerDocument) { - $key = $tenantPerDocument - ? $doc->getTenant() . ':' . $doc->getId() - : $doc->getId(); - return !isset($preExistingIds[$key]); - })); + $chunk = \array_values(\array_filter( + $chunk, + fn (Document $doc) => !isset($preExistingIds[$this->tenantKey($doc)]) + )); if (empty($chunk)) { continue; } } - $batch = $this->withTransaction(function () use ($collection, $chunk) { - $createFn = fn () => $this->adapter->createDocuments($collection, $chunk); - return $this->skipDuplicates - ? $this->adapter->skipDuplicates($createFn) - : $createFn(); - }); + $batch = $this->withTransaction(fn () => $this->adapter->skipDuplicates( + fn () => $this->adapter->createDocuments($collection, $chunk), + $this->skipDuplicates + )); // Create deferred relationships only for docs that were actually inserted if ($this->skipDuplicates && $this->resolveRelationships && \count($deferredRelationships) > 0) { foreach ($batch as $insertedDoc) { - $deferKey = $tenantPerDocument - ? $insertedDoc->getTenant() . ':' . $insertedDoc->getId() - : $insertedDoc->getId(); + $deferKey = $this->tenantKey($insertedDoc); if (\array_key_exists($deferKey, $deferredRelationships)) { $relDoc = clone $insertedDoc; foreach ($deferredRelationships[$deferKey] as $key => $value) { @@ -7262,49 +7282,10 @@ public function upsertDocumentsWithIncrease( $seenIds = []; // Batch-fetch existing documents in one query instead of N individual getDocument() calls - $ids = \array_filter(\array_map(fn ($doc) => $doc->getId(), $documents), fn ($v) => $v !== null); - $existingDocs = []; - $upsertTenantPerDocument = $this->getSharedTables() && $this->getTenantPerDocument(); - - if (!empty($ids)) { - $uniqueIds = \array_values(\array_unique($ids)); - - if ($upsertTenantPerDocument) { - $idsByTenant = []; - foreach ($documents as $doc) { - $tenant = $doc->getTenant(); - $idsByTenant[$tenant][] = $doc->getId(); - } - foreach ($idsByTenant as $tenant => $tenantIds) { - $tenantIds = \array_values(\array_unique(\array_filter($tenantIds, fn ($v) => $v !== null))); - foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) { - $fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find( - $collection->getId(), - [Query::equal('$id', $idChunk), Query::limit(\count($idChunk))], - )))); - foreach ($fetched as $doc) { - $existingDocs[$tenant . ':' . $doc->getId()] = $doc; - } - } - } - } else { - foreach (\array_chunk($uniqueIds, \max(1, $this->maxQueryValues)) as $idChunk) { - $fetched = $this->authorization->skip(fn () => $this->silent(fn () => $this->find( - $collection->getId(), - [Query::equal('$id', $idChunk), Query::limit(\count($idChunk))], - ))); - foreach ($fetched as $doc) { - $existingDocs[$doc->getId()] = $doc; - } - } - } - } + $existingDocs = $this->fetchExistingByIds($collection, $documents); foreach ($documents as $key => $document) { - $lookupKey = $upsertTenantPerDocument - ? $document->getTenant() . ':' . $document->getId() - : $document->getId(); - $old = $existingDocs[$lookupKey] ?? new Document(); + $old = $existingDocs[$this->tenantKey($document)] ?? new Document(); // Extract operators early to avoid comparison issues $documentArray = $document->getArrayCopy(); @@ -7471,9 +7452,7 @@ public function upsertDocumentsWithIncrease( $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); } - $seenIds[] = $upsertTenantPerDocument - ? $document->getTenant() . ':' . $document->getId() - : $document->getId(); + $seenIds[] = $this->tenantKey($document); $old = $this->adapter->castingBefore($collection, $old); $document = $this->adapter->castingBefore($collection, $document); diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php index 5f3ae640c..5d5777a5d 100644 --- a/src/Database/Mirror.php +++ b/src/Database/Mirror.php @@ -601,18 +601,17 @@ public function createDocuments( ?callable $onNext = null, ?callable $onError = null, ): int { - $createFn = fn () => $this->source->createDocuments( - $collection, - $documents, - $batchSize, - $onNext, - $onError, + $modified = $this->source->skipDuplicates( + fn () => $this->source->createDocuments( + $collection, + $documents, + $batchSize, + $onNext, + $onError, + ), + $this->skipDuplicates ); - $modified = $this->skipDuplicates - ? $this->source->skipDuplicates($createFn) - : $createFn(); - if ( \in_array($collection, self::SOURCE_ONLY_COLLECTIONS) || $this->destination === null @@ -643,19 +642,17 @@ public function createDocuments( $clones[] = $clone; } - $destFn = fn () => $this->destination->withPreserveDates( - fn () => - $this->destination->createDocuments( - $collection, - $clones, - $batchSize, - ) + $this->destination->skipDuplicates( + fn () => $this->destination->withPreserveDates( + fn () => $this->destination->createDocuments( + $collection, + $clones, + $batchSize, + ) + ), + $this->skipDuplicates ); - $this->skipDuplicates - ? $this->destination->skipDuplicates($destFn) - : $destFn(); - foreach ($clones as $clone) { foreach ($this->writeFilters as $filter) { $filter->afterCreateDocument( From c88c6ac52715fe87ed22eec69a27b4d640341b1a Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Sun, 12 Apr 2026 14:52:46 +0100 Subject: [PATCH 8/8] fix: guard buildUidTenantLookup against empty UID set MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a batch has only null-ID documents but non-null tenants, the previous code emitted WHERE _uid IN () AND _tenant IN (...) — invalid SQL that the !empty(binds) call-site guard failed to catch because the tenant binds kept the binds array non-empty. Return 'where' => '1=0' with empty binds when there are no UIDs, so the query matches nothing and leaves no dangling placeholders. --- src/Database/Adapter/SQL.php | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 07994dbe9..f2e84fa30 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -2506,6 +2506,15 @@ private function buildUidTenantLookup(array $documents, string $bindPrefix): arr } } + // No UIDs to match → return a clause that matches nothing without leaving dangling binds. + if (empty($placeholders)) { + return [ + 'where' => '1=0', + 'tenantSelect' => $tenantSelect, + 'binds' => [], + ]; + } + return [ 'where' => '_uid IN (' . \implode(', ', $placeholders) . ')' . $tenantFilter, 'tenantSelect' => $tenantSelect,