Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Database/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document
*
* @param Document $collection
* @param array<Document> $documents
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
*
* @return array<Document>
*
* @throws DatabaseException
*/
abstract public function createDocuments(Document $collection, array $documents): array;
abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array;

/**
* Update Document
Expand Down
93 changes: 91 additions & 2 deletions src/Database/Adapter/Mongo.php
Original file line number Diff line number Diff line change
Expand Up @@ -1460,11 +1460,11 @@ public function castingBefore(Document $collection, Document $document): Documen
* @throws DuplicateException
* @throws DatabaseException
*/
public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
$name = $this->getNamespace() . '_' . $this->filter($collection->getId());

$options = $this->getTransactionOptions();

$records = [];
$hasSequence = null;
$documents = \array_map(fn ($doc) => clone $doc, $documents);
Expand All @@ -1487,6 +1487,95 @@ public function createDocuments(Document $collection, array $documents): array
$records[] = $record;
}

// Pre-filter duplicates within the session to avoid aborting the transaction.
if ($ignore && !empty($records)) {
$existingKeys = [];

try {
if ($this->sharedTables && $this->tenantPerDocument) {
$idsByTenant = [];
foreach ($records as $record) {
$uid = $record['_uid'] ?? '';
if ($uid === '') {
continue;
}
$tenant = $record['_tenant'] ?? $this->getTenant();
$idsByTenant[$tenant][] = $uid;
}

foreach ($idsByTenant as $tenant => $tenantUids) {
$tenantUids = \array_values(\array_unique($tenantUids));
$findOptions = $this->getTransactionOptions([
'projection' => ['_uid' => 1],
'batchSize' => \count($tenantUids),
]);
$filters = ['_uid' => ['$in' => $tenantUids], '_tenant' => $tenant];
$response = $this->client->find($name, $filters, $findOptions);
foreach ($response->cursor->firstBatch ?? [] as $doc) {
$existingKeys[$tenant . ':' . $doc->_uid] = true;
}
$cursorId = $response->cursor->id ?? 0;
while ($cursorId != 0) {
$more = $this->client->getMore($cursorId, $name, \count($tenantUids));
foreach ($more->cursor->nextBatch ?? [] as $doc) {
$existingKeys[$tenant . ':' . $doc->_uid] = true;
}
$cursorId = $more->cursor->id ?? 0;
}
}
} else {
$uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records), fn ($v) => $v !== null);
if (!empty($uids)) {
$uidValues = \array_values(\array_unique($uids));
$findOptions = $this->getTransactionOptions([
'projection' => ['_uid' => 1],
'batchSize' => \count($uidValues),
]);
$filters = ['_uid' => ['$in' => $uidValues]];
if ($this->sharedTables) {
$filters['_tenant'] = $this->getTenantFilters($collection->getId());
}
$response = $this->client->find($name, $filters, $findOptions);
foreach ($response->cursor->firstBatch ?? [] as $doc) {
$existingKeys[$doc->_uid] = true;
}
$cursorId = $response->cursor->id ?? 0;
while ($cursorId != 0) {
$more = $this->client->getMore($cursorId, $name, \count($uidValues));
foreach ($more->cursor->nextBatch ?? [] as $doc) {
$existingKeys[$doc->_uid] = true;
}
$cursorId = $more->cursor->id ?? 0;
}
}
}
} catch (MongoException $e) {
throw $this->processException($e);
}

if (!empty($existingKeys)) {
$filteredRecords = [];
$filteredDocuments = [];
$tenantPerDoc = $this->sharedTables && $this->tenantPerDocument;
foreach ($records as $i => $record) {
$uid = $record['_uid'] ?? '';
$key = $tenantPerDoc
? ($record['_tenant'] ?? $this->getTenant()) . ':' . $uid
: $uid;
if (!isset($existingKeys[$key])) {
$filteredRecords[] = $record;
$filteredDocuments[] = $documents[$i];
}
}
$records = $filteredRecords;
$documents = $filteredDocuments;
}

if (empty($records)) {
return [];
}
}

try {
$documents = $this->client->insertMany($name, $records, $options);
} catch (MongoException $e) {
Expand Down
2 changes: 1 addition & 1 deletion src/Database/Adapter/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume
return $this->delegate(__FUNCTION__, \func_get_args());
}

public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
return $this->delegate(__FUNCTION__, \func_get_args());
}
Expand Down
29 changes: 29 additions & 0 deletions src/Database/Adapter/Postgres.php
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum
return $document;
}

protected function getInsertKeyword(bool $ignore): string
{
return 'INSERT INTO';
}

protected function getInsertSuffix(bool $ignore, string $table): string
{
if (!$ignore) {
return '';
}

$conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

protected function getInsertPermissionsSuffix(bool $ignore): string
{
if (!$ignore) {
return '';
}

$conflictTarget = $this->sharedTables
? '("_type", "_permission", "_document", "_tenant")'
: '("_type", "_permission", "_document")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

/**
* @param string $tableName
* @param string $columns
Expand Down
Loading
Loading