Skip to content

Commit 2b4fd1e

Browse files
committed
Fix MongoDB transaction safety, tenant-aware dedup, and empty ID filtering
- Pre-filter duplicates inside MongoDB session instead of bypassing the transaction, ensuring ignore mode works correctly within transactions - Use tenant-aware composite keys for intra-batch dedup in tenant-per-document mode - Add array_filter() to tenant prefetch paths to skip empty IDs, consistent with non-tenant paths
1 parent 287420b commit 2b4fd1e

2 files changed

Lines changed: 45 additions & 24 deletions

File tree

src/Database/Adapter/Mongo.php

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,14 +1463,7 @@ public function castingBefore(Document $collection, Document $document): Documen
14631463
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
14641464
{
14651465
$name = $this->getNamespace() . '_' . $this->filter($collection->getId());
1466-
1467-
if ($ignore) {
1468-
// Run outside transaction — MongoDB aborts transactions on any write error,
1469-
// so ordered:false + session would roll back even successfully inserted docs.
1470-
$options = ['ordered' => false];
1471-
} else {
1472-
$options = $this->getTransactionOptions();
1473-
}
1466+
$options = $this->getTransactionOptions();
14741467

14751468
$records = [];
14761469
$hasSequence = null;
@@ -1494,19 +1487,42 @@ public function createDocuments(Document $collection, array $documents, bool $ig
14941487
$records[] = $record;
14951488
}
14961489

1497-
try {
1498-
$documents = $this->client->insertMany($name, $records, $options);
1499-
} catch (MongoException $e) {
1500-
$processed = $this->processException($e);
1490+
// In ignore mode, pre-filter duplicates within the same session to avoid
1491+
// BulkWriteException which would abort the transaction.
1492+
if ($ignore && !empty($records)) {
1493+
$uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records));
1494+
if (!empty($uids)) {
1495+
$findOptions = $this->getTransactionOptions(['projection' => ['_uid' => 1]]);
1496+
$result = $this->client->find($name, ['_uid' => ['$in' => \array_values($uids)]], $findOptions);
1497+
$existingUids = [];
1498+
foreach ($result->cursor->firstBatch ?? [] as $doc) {
1499+
$existingUids[$doc->_uid] = true;
1500+
}
15011501

1502-
if ($ignore && $processed instanceof DuplicateException) {
1503-
// Race condition: a doc was inserted between pre-filter and insertMany.
1504-
// With ordered:false outside transaction, non-duplicate inserts persist.
1505-
// Return empty — we cannot determine which docs succeeded without querying.
1502+
if (!empty($existingUids)) {
1503+
$filteredRecords = [];
1504+
$filteredDocuments = [];
1505+
foreach ($records as $i => $record) {
1506+
$uid = $record['_uid'] ?? '';
1507+
if (!isset($existingUids[$uid])) {
1508+
$filteredRecords[] = $record;
1509+
$filteredDocuments[] = $documents[$i];
1510+
}
1511+
}
1512+
$records = $filteredRecords;
1513+
$documents = $filteredDocuments;
1514+
}
1515+
}
1516+
1517+
if (empty($records)) {
15061518
return [];
15071519
}
1520+
}
15081521

1509-
throw $processed;
1522+
try {
1523+
$documents = $this->client->insertMany($name, $records, $options);
1524+
} catch (MongoException $e) {
1525+
throw $this->processException($e);
15101526
}
15111527

15121528
foreach ($documents as $index => $document) {

src/Database/Database.php

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5655,18 +5655,24 @@ public function createDocuments(
56555655
$time = DateTime::now();
56565656
$modified = 0;
56575657

5658+
$tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument();
5659+
56585660
// Deduplicate intra-batch documents by ID when ignore mode is on.
56595661
// Keeps the first occurrence, mirrors upsertDocuments' seenIds check.
5662+
// In tenant-per-document mode, dedupe by tenant+id to allow same ID across tenants.
56605663
if ($ignore) {
56615664
$seenIds = [];
56625665
$deduplicated = [];
56635666
foreach ($documents as $document) {
56645667
$docId = $document->getId();
5665-
if ($docId !== '' && isset($seenIds[$docId])) {
5666-
continue;
5667-
}
56685668
if ($docId !== '') {
5669-
$seenIds[$docId] = true;
5669+
$dedupeKey = $tenantPerDocument
5670+
? $document->getTenant() . ':' . $docId
5671+
: $docId;
5672+
if (isset($seenIds[$dedupeKey])) {
5673+
continue;
5674+
}
5675+
$seenIds[$dedupeKey] = true;
56705676
}
56715677
$deduplicated[] = $document;
56725678
}
@@ -5676,15 +5682,14 @@ public function createDocuments(
56765682
// When ignore mode is on and relationships are being resolved,
56775683
// pre-fetch existing document IDs so we skip relationship writes for duplicates
56785684
$preExistingIds = [];
5679-
$tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument();
56805685
if ($ignore) {
56815686
if ($tenantPerDocument) {
56825687
$idsByTenant = [];
56835688
foreach ($documents as $doc) {
56845689
$idsByTenant[$doc->getTenant()][] = $doc->getId();
56855690
}
56865691
foreach ($idsByTenant as $tenant => $tenantIds) {
5687-
$tenantIds = \array_values(\array_unique($tenantIds));
5692+
$tenantIds = \array_values(\array_unique(\array_filter($tenantIds)));
56885693
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
56895694
$existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
56905695
$collection->getId(),
@@ -7219,7 +7224,7 @@ public function upsertDocumentsWithIncrease(
72197224
$idsByTenant[$tenant][] = $doc->getId();
72207225
}
72217226
foreach ($idsByTenant as $tenant => $tenantIds) {
7222-
$tenantIds = \array_values(\array_unique($tenantIds));
7227+
$tenantIds = \array_values(\array_unique(\array_filter($tenantIds)));
72237228
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
72247229
$fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
72257230
$collection->getId(),

0 commit comments

Comments
 (0)