Skip to content

Commit 287420b

Browse files
committed
Add ignore param to createDocuments for silent duplicate handling
- Add `bool $ignore` param to `createDocuments` across all adapters (MariaDB: INSERT IGNORE, Postgres: ON CONFLICT DO NOTHING, SQLite: INSERT OR IGNORE, MongoDB: ordered:false outside transaction)
- Pre-fetch existing document IDs to skip relationship writes and filter known duplicates from adapter batches
- Deduplicate intra-batch documents by ID (first occurrence wins) to prevent ACL drift and overcounted results
- Handle tenant-per-document mode with composite keys for pre-fetch
- Chunk all find queries by maxQueryValues
- Optimize upsertDocuments: batch-fetch existing docs with find instead of per-row getDocument calls (~2x speedup)
- Add ignore param to Mirror::createDocuments
- Add e2e tests: mixed duplicates, all duplicates, intra-batch duplicates with onNext callback assertions
1 parent 72ee161 commit 287420b

9 files changed

Lines changed: 421 additions & 22 deletions

File tree

src/Database/Adapter.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document
729729
*
730730
* @param Document $collection
731731
* @param array<Document> $documents
732+
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
732733
*
733734
* @return array<Document>
734735
*
735736
* @throws DatabaseException
736737
*/
737-
abstract public function createDocuments(Document $collection, array $documents): array;
738+
abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array;
738739

739740
/**
740741
* Update Document

src/Database/Adapter/Mongo.php

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,11 +1460,18 @@ public function castingBefore(Document $collection, Document $document): Documen
14601460
* @throws DuplicateException
14611461
* @throws DatabaseException
14621462
*/
1463-
public function createDocuments(Document $collection, array $documents): array
1463+
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
14641464
{
14651465
$name = $this->getNamespace() . '_' . $this->filter($collection->getId());
14661466

1467-
$options = $this->getTransactionOptions();
1467+
if ($ignore) {
1468+
// Run outside transaction — MongoDB aborts transactions on any write error,
1469+
// so ordered:false + session would roll back even successfully inserted docs.
1470+
$options = ['ordered' => false];
1471+
} else {
1472+
$options = $this->getTransactionOptions();
1473+
}
1474+
14681475
$records = [];
14691476
$hasSequence = null;
14701477
$documents = \array_map(fn ($doc) => clone $doc, $documents);
@@ -1490,7 +1497,16 @@ public function createDocuments(Document $collection, array $documents): array
14901497
try {
14911498
$documents = $this->client->insertMany($name, $records, $options);
14921499
} catch (MongoException $e) {
1493-
throw $this->processException($e);
1500+
$processed = $this->processException($e);
1501+
1502+
if ($ignore && $processed instanceof DuplicateException) {
1503+
// Race condition: a doc was inserted between pre-filter and insertMany.
1504+
// With ordered:false outside transaction, non-duplicate inserts persist.
1505+
// Return empty — we cannot determine which docs succeeded without querying.
1506+
return [];
1507+
}
1508+
1509+
throw $processed;
14941510
}
14951511

14961512
foreach ($documents as $index => $document) {

src/Database/Adapter/Pool.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume
268268
return $this->delegate(__FUNCTION__, \func_get_args());
269269
}
270270

271-
public function createDocuments(Document $collection, array $documents): array
271+
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
272272
{
273273
return $this->delegate(__FUNCTION__, \func_get_args());
274274
}

src/Database/Adapter/Postgres.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum
13651365
return $document;
13661366
}
13671367

1368+
protected function getInsertKeyword(bool $ignore): string
1369+
{
1370+
return 'INSERT INTO';
1371+
}
1372+
1373+
protected function getInsertSuffix(bool $ignore, string $table): string
1374+
{
1375+
if (!$ignore) {
1376+
return '';
1377+
}
1378+
1379+
$conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';
1380+
1381+
return "ON CONFLICT {$conflictTarget} DO NOTHING";
1382+
}
1383+
1384+
protected function getInsertPermissionsSuffix(bool $ignore): string
1385+
{
1386+
if (!$ignore) {
1387+
return '';
1388+
}
1389+
1390+
$conflictTarget = $this->sharedTables
1391+
? '("_type", "_permission", "_document", "_tenant")'
1392+
: '("_type", "_permission", "_document")';
1393+
1394+
return "ON CONFLICT {$conflictTarget} DO NOTHING";
1395+
}
1396+
13681397
/**
13691398
* @param string $tableName
13701399
* @param string $columns

src/Database/Adapter/SQL.php

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2471,7 +2471,7 @@ protected function execute(mixed $stmt): bool
24712471
* @throws DuplicateException
24722472
* @throws \Throwable
24732473
*/
2474-
public function createDocuments(Document $collection, array $documents): array
2474+
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
24752475
{
24762476
if (empty($documents)) {
24772477
return $documents;
@@ -2573,8 +2573,9 @@ public function createDocuments(Document $collection, array $documents): array
25732573
$batchKeys = \implode(', ', $batchKeys);
25742574

25752575
$stmt = $this->getPDO()->prepare("
2576-
INSERT INTO {$this->getSQLTable($name)} {$columns}
2576+
{$this->getInsertKeyword($ignore)} {$this->getSQLTable($name)} {$columns}
25772577
VALUES {$batchKeys}
2578+
{$this->getInsertSuffix($ignore, $name)}
25782579
");
25792580

25802581
foreach ($bindValues as $key => $value) {
@@ -2588,8 +2589,9 @@ public function createDocuments(Document $collection, array $documents): array
25882589
$permissions = \implode(', ', $permissions);
25892590

25902591
$sqlPermissions = "
2591-
INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
2592-
VALUES {$permissions};
2592+
{$this->getInsertKeyword($ignore)} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
2593+
VALUES {$permissions}
2594+
{$this->getInsertPermissionsSuffix($ignore)}
25932595
";
25942596

25952597
$stmtPermissions = $this->getPDO()->prepare($sqlPermissions);
@@ -2608,6 +2610,33 @@ public function createDocuments(Document $collection, array $documents): array
26082610
return $documents;
26092611
}
26102612

2613+
/**
2614+
* Returns the INSERT keyword, optionally with IGNORE for duplicate handling.
2615+
* Override in adapter subclasses for DB-specific syntax.
2616+
*/
2617+
protected function getInsertKeyword(bool $ignore): string
2618+
{
2619+
return $ignore ? 'INSERT IGNORE INTO' : 'INSERT INTO';
2620+
}
2621+
2622+
/**
2623+
* Returns a suffix appended after VALUES clause for duplicate handling.
2624+
* Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING).
2625+
*/
2626+
protected function getInsertSuffix(bool $ignore, string $table): string
2627+
{
2628+
return '';
2629+
}
2630+
2631+
/**
2632+
* Returns a suffix for the permissions INSERT statement when ignoring duplicates.
2633+
* Override in adapter subclasses for DB-specific syntax.
2634+
*/
2635+
protected function getInsertPermissionsSuffix(bool $ignore): string
2636+
{
2637+
return '';
2638+
}
2639+
26112640
/**
26122641
* @param Document $collection
26132642
* @param string $attribute

src/Database/Adapter/SQLite.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
*/
3535
class SQLite extends MariaDB
3636
{
37+
protected function getInsertKeyword(bool $ignore): string
38+
{
39+
return $ignore ? 'INSERT OR IGNORE INTO' : 'INSERT INTO';
40+
}
41+
3742
/**
3843
* @inheritDoc
3944
*/

src/Database/Database.php

Lines changed: 133 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5621,6 +5621,7 @@ public function createDocument(string $collection, Document $document): Document
56215621
* @param int $batchSize
56225622
* @param (callable(Document): void)|null $onNext
56235623
* @param (callable(Throwable): void)|null $onError
5624+
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
56245625
* @return int
56255626
* @throws AuthorizationException
56265627
* @throws StructureException
@@ -5633,6 +5634,7 @@ public function createDocuments(
56335634
int $batchSize = self::INSERT_BATCH_SIZE,
56345635
?callable $onNext = null,
56355636
?callable $onError = null,
5637+
bool $ignore = false,
56365638
): int {
56375639
if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) {
56385640
throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.');
@@ -5653,6 +5655,71 @@ public function createDocuments(
56535655
$time = DateTime::now();
56545656
$modified = 0;
56555657

5658+
// Deduplicate intra-batch documents by ID when ignore mode is on.
5659+
// Keeps the first occurrence, mirrors upsertDocuments' seenIds check.
5660+
if ($ignore) {
5661+
$seenIds = [];
5662+
$deduplicated = [];
5663+
foreach ($documents as $document) {
5664+
$docId = $document->getId();
5665+
if ($docId !== '' && isset($seenIds[$docId])) {
5666+
continue;
5667+
}
5668+
if ($docId !== '') {
5669+
$seenIds[$docId] = true;
5670+
}
5671+
$deduplicated[] = $document;
5672+
}
5673+
$documents = $deduplicated;
5674+
}
5675+
5676+
// When ignore mode is on, pre-fetch the IDs of documents that already exist so that
5677+
// relationship writes are skipped for them and they are filtered out of the adapter batches.
5678+
$preExistingIds = [];
5679+
$tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument();
5680+
if ($ignore) {
5681+
if ($tenantPerDocument) {
5682+
$idsByTenant = [];
5683+
foreach ($documents as $doc) {
5684+
$idsByTenant[$doc->getTenant()][] = $doc->getId();
5685+
}
5686+
foreach ($idsByTenant as $tenant => $tenantIds) {
5687+
$tenantIds = \array_values(\array_unique($tenantIds));
5688+
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
5689+
$existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
5690+
$collection->getId(),
5691+
[
5692+
Query::equal('$id', $idChunk),
5693+
Query::select(['$id']),
5694+
Query::limit(\count($idChunk)),
5695+
]
5696+
))));
5697+
foreach ($existing as $doc) {
5698+
$preExistingIds[$tenant . ':' . $doc->getId()] = true;
5699+
}
5700+
}
5701+
}
5702+
} else {
5703+
$inputIds = \array_values(\array_unique(\array_filter(
5704+
\array_map(fn (Document $doc) => $doc->getId(), $documents)
5705+
)));
5706+
5707+
foreach (\array_chunk($inputIds, \max(1, $this->maxQueryValues)) as $idChunk) {
5708+
$existing = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
5709+
$collection->getId(),
5710+
[
5711+
Query::equal('$id', $idChunk),
5712+
Query::select(['$id']),
5713+
Query::limit(\count($idChunk)),
5714+
]
5715+
)));
5716+
foreach ($existing as $doc) {
5717+
$preExistingIds[$doc->getId()] = true;
5718+
}
5719+
}
5720+
}
5721+
}
5722+
56565723
foreach ($documents as $document) {
56575724
$createdAt = $document->getCreatedAt();
56585725
$updatedAt = $document->getUpdatedAt();
@@ -5693,15 +5760,33 @@ public function createDocuments(
56935760
}
56945761

56955762
if ($this->resolveRelationships) {
5696-
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
5763+
$preExistKey = $tenantPerDocument
5764+
? $document->getTenant() . ':' . $document->getId()
5765+
: $document->getId();
5766+
5767+
if (!isset($preExistingIds[$preExistKey])) {
5768+
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
5769+
}
56975770
}
56985771

56995772
$document = $this->adapter->castingBefore($collection, $document);
57005773
}
57015774

57025775
foreach (\array_chunk($documents, $batchSize) as $chunk) {
5703-
$batch = $this->withTransaction(function () use ($collection, $chunk) {
5704-
return $this->adapter->createDocuments($collection, $chunk);
5776+
if ($ignore && !empty($preExistingIds)) {
5777+
$chunk = \array_values(\array_filter($chunk, function (Document $doc) use ($preExistingIds, $tenantPerDocument) {
5778+
$key = $tenantPerDocument
5779+
? $doc->getTenant() . ':' . $doc->getId()
5780+
: $doc->getId();
5781+
return !isset($preExistingIds[$key]);
5782+
}));
5783+
if (empty($chunk)) {
5784+
continue;
5785+
}
5786+
}
5787+
5788+
$batch = $this->withTransaction(function () use ($collection, $chunk, $ignore) {
5789+
return $this->adapter->createDocuments($collection, $chunk, $ignore);
57055790
});
57065791

57075792
$batch = $this->adapter->getSequences($collection->getId(), $batch);
@@ -7116,18 +7201,53 @@ public function upsertDocumentsWithIncrease(
71167201
$created = 0;
71177202
$updated = 0;
71187203
$seenIds = [];
7119-
foreach ($documents as $key => $document) {
7120-
if ($this->getSharedTables() && $this->getTenantPerDocument()) {
7121-
$old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument(
7122-
$collection->getId(),
7123-
$document->getId(),
7124-
))));
7204+
7205+
// Batch-fetch existing documents in one query instead of N individual getDocument() calls
7206+
$ids = \array_filter(\array_map(fn ($doc) => $doc->getId(), $documents));
7207+
$existingDocs = [];
7208+
$upsertTenantPerDocument = $this->getSharedTables() && $this->getTenantPerDocument();
7209+
7210+
if (!empty($ids)) {
7211+
$uniqueIds = \array_values(\array_unique($ids));
7212+
7213+
if ($upsertTenantPerDocument) {
7214+
// Group IDs by tenant and fetch each group separately
7215+
// Use composite key tenant:id to avoid cross-tenant collisions
7216+
$idsByTenant = [];
7217+
foreach ($documents as $doc) {
7218+
$tenant = $doc->getTenant();
7219+
$idsByTenant[$tenant][] = $doc->getId();
7220+
}
7221+
foreach ($idsByTenant as $tenant => $tenantIds) {
7222+
$tenantIds = \array_values(\array_unique($tenantIds));
7223+
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
7224+
$fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
7225+
$collection->getId(),
7226+
[Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
7227+
))));
7228+
foreach ($fetched as $doc) {
7229+
$existingDocs[$tenant . ':' . $doc->getId()] = $doc;
7230+
}
7231+
}
7232+
}
71257233
} else {
7126-
$old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument(
7127-
$collection->getId(),
7128-
$document->getId(),
7129-
)));
7234+
foreach (\array_chunk($uniqueIds, \max(1, $this->maxQueryValues)) as $idChunk) {
7235+
$fetched = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
7236+
$collection->getId(),
7237+
[Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
7238+
)));
7239+
foreach ($fetched as $doc) {
7240+
$existingDocs[$doc->getId()] = $doc;
7241+
}
7242+
}
71307243
}
7244+
}
7245+
7246+
foreach ($documents as $key => $document) {
7247+
$lookupKey = $upsertTenantPerDocument
7248+
? $document->getTenant() . ':' . $document->getId()
7249+
: $document->getId();
7250+
$old = $existingDocs[$lookupKey] ?? new Document();
71317251

71327252
// Extract operators early to avoid comparison issues
71337253
$documentArray = $document->getArrayCopy();

src/Database/Mirror.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,13 +600,15 @@ public function createDocuments(
600600
int $batchSize = self::INSERT_BATCH_SIZE,
601601
?callable $onNext = null,
602602
?callable $onError = null,
603+
bool $ignore = false,
603604
): int {
604605
$modified = $this->source->createDocuments(
605606
$collection,
606607
$documents,
607608
$batchSize,
608609
$onNext,
609610
$onError,
611+
$ignore,
610612
);
611613

612614
if (
@@ -645,6 +647,7 @@ public function createDocuments(
645647
$collection,
646648
$clones,
647649
$batchSize,
650+
ignore: $ignore,
648651
)
649652
);
650653

0 commit comments

Comments
 (0)