Skip to content

Commit cf28290

Browse files
committed
test(table import): Deduplication strategy by feature "ctas-om"
- DMD-177 - Until the deduplication setting by parameter is fully implemented, we still need to maintain the original deduplication strategy by feature flag.
1 parent 5fc0739 commit cf28290

1 file changed

Lines changed: 246 additions & 1 deletion

File tree

tests/Backend/Snowflake/TypedTableInWorkspaceTest.php

Lines changed: 246 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public function testUnloadFromWSToTypedTableWithoutDeduplication(): void
173173
$this->_client->writeTableAsyncDirect($this->tableId, [
174174
'dataWorkspaceId' => $workspace['id'],
175175
'dataTableName' => $tableId,
176-
'deduplicationStrategy' => DeduplicationStrategy::INSERT->value,
176+
'deduplicationStrategy' => DeduplicationStrategy::INSERT,
177177
]);
178178
$eventAssertCallback = function ($events) {
179179
$this->assertCount(1, $events);
@@ -347,6 +347,251 @@ public function testUnloadFromWSToTypedTableWithoutDeduplication(): void
347347
self::assertEquals($expectedIncrementalLoad, $data['rows']);
348348
}
349349

350+
/**
351+
* "Copy" of until there is a "ctas-om" feature.
352+
* @see TypedTableInWorkspaceTest::testUnloadFromWSToTypedTableWithoutDeduplication
353+
*/
354+
public function testUnloadFromWSToTypedTableWithoutDeduplicationByFeature(): void
355+
{
356+
if (!in_array('ctas-om', $this->_client->verifyToken()['owner']['features'], true)) {
357+
$this->markTestSkipped(
358+
'CTAS is not enabled for this project, skipping test.',
359+
);
360+
}
361+
// create workspace and source table in workspace
362+
$workspace = $this->initTestWorkspace();
363+
364+
$tableId = 'Languages3';
365+
366+
$backend = WorkspaceBackendFactory::createWorkspaceBackend($workspace);
367+
$backend->dropTableIfExists($tableId);
368+
369+
$connection = $workspace['connection'];
370+
371+
$db = $backend->getDb();
372+
373+
$quotedTableId = sprintf(
374+
'"%s"."%s"',
375+
$connection['schema'],
376+
$tableId,
377+
);
378+
// exact length is required
379+
$sql = sprintf(
380+
'
381+
CREATE TABLE %s (
382+
"id" INT NOT NULL,
383+
"name" VARCHAR(16777216),
384+
PRIMARY KEY ("id")
385+
)
386+
',
387+
$quotedTableId,
388+
);
389+
$db->query($sql);
390+
$db->query(sprintf("INSERT INTO %s VALUES (1, 'john');", $quotedTableId));
391+
$db->query(sprintf("INSERT INTO %s VALUES (2, 'does');", $quotedTableId));
392+
393+
$this->unloadAndAssert($workspace['id'], $tableId);
394+
395+
// test unload from workspace with _timestamp column exist
396+
$db->query(sprintf('DROP TABLE %s', $quotedTableId));
397+
$sql = sprintf(
398+
'
399+
CREATE TABLE %s (
400+
"id" INT NOT NULL,
401+
"name" VARCHAR(16777216),
402+
PRIMARY KEY ("id")
403+
)
404+
',
405+
$quotedTableId,
406+
);
407+
$db->query($sql);
408+
$db->query(sprintf("INSERT INTO %s VALUES (1, 'john');", $quotedTableId));
409+
$db->query(sprintf("INSERT INTO %s VALUES (2, 'does');", $quotedTableId));
410+
// snowflake does not enforce primary key so this is allowed
411+
// normally such value would be deduplicated in storage
412+
// but ctas-om does not deduplicate values
413+
$db->query(sprintf("INSERT INTO %s VALUES (2, 'doesToo');", $quotedTableId));
414+
415+
$runId = $this->_client->generateRunId();
416+
$this->_client->setRunId($runId);
417+
$this->initEvents($this->_client);
418+
419+
// do full load
420+
$this->_client->writeTableAsyncDirect($this->tableId, [
421+
'dataWorkspaceId' => $workspace['id'],
422+
'dataTableName' => $tableId,
423+
]);
424+
$eventAssertCallback = function ($events) {
425+
$this->assertCount(1, $events);
426+
$this->assertArrayHasKey('performance', $events[0]);
427+
$this->assertIsArray($events[0]['performance']);
428+
$this->assertArrayHasKey('importDecomposed', $events[0]['performance']);
429+
$this->assertCount(1, $events[0]['performance']['importDecomposed']);
430+
$this->assertArrayHasKey('name', $events[0]['performance']['importDecomposed'][0]);
431+
// there is only one stage in full load
432+
$this->assertSame('ctasLoad', $events[0]['performance']['importDecomposed'][0]['name']);
433+
434+
// set event as last event
435+
$this->lastEventId = $events[0]['uuid'];
436+
};
437+
$query = new EventsQueryBuilder();
438+
$query->setEvent('storage.tableImportDone')
439+
->setTokenId($this->tokenId)
440+
->setRunId($runId);
441+
$this->assertEventWithRetries($this->_client, $eventAssertCallback, $query);
442+
443+
$expectedFullLoad = [
444+
[
445+
[
446+
'columnName' => 'id',
447+
'value' => '1',
448+
'isTruncated' => false,
449+
],
450+
[
451+
'columnName' => 'name',
452+
'value' => 'john',
453+
'isTruncated' => false,
454+
],
455+
],
456+
[
457+
[
458+
'columnName' => 'id',
459+
'value' => '2',
460+
'isTruncated' => false,
461+
],
462+
[
463+
'columnName' => 'name',
464+
'value' => 'does',
465+
'isTruncated' => false,
466+
],
467+
],
468+
[
469+
[
470+
'columnName' => 'id',
471+
'value' => '2',
472+
'isTruncated' => false,
473+
],
474+
[
475+
'columnName' => 'name',
476+
'value' => 'doesToo',
477+
'isTruncated' => false,
478+
],
479+
],
480+
];
481+
482+
/** @var array $data */
483+
$data = $this->_client->getTableDataPreview($this->tableId, [
484+
'format' => 'json',
485+
]);
486+
self::assertEquals($expectedFullLoad, $data['rows']);
487+
488+
// do incremental load
489+
// incremental load also does not deduplicate values
490+
// this will cause that all values are present twice
491+
$this->_client->writeTableAsyncDirect($this->tableId, [
492+
'dataWorkspaceId' => $workspace['id'],
493+
'dataTableName' => $tableId,
494+
'incremental' => true,
495+
]);
496+
497+
$eventAssertCallback = function ($events) {
498+
$this->assertCount(1, $events);
499+
$this->assertArrayHasKey('performance', $events[0]);
500+
$this->assertIsArray($events[0]['performance']);
501+
$this->assertArrayHasKey('importDecomposed', $events[0]['performance']);
502+
$this->assertCount(1, $events[0]['performance']['importDecomposed']);
503+
$this->assertArrayHasKey('name', $events[0]['performance']['importDecomposed'][0]);
504+
// there is only one stage in incremental load
505+
$this->assertSame('insertIntoTargetFromStaging', $events[0]['performance']['importDecomposed'][0]['name']);
506+
};
507+
$query = new EventsQueryBuilder();
508+
$query->setEvent('storage.tableImportDone')
509+
->setTokenId($this->tokenId)
510+
->setRunId($runId);
511+
$this->assertEventWithRetries($this->_client, $eventAssertCallback, $query);
512+
513+
$expectedIncrementalLoad = [
514+
[
515+
[
516+
'columnName' => 'id',
517+
'value' => '1',
518+
'isTruncated' => false,
519+
],
520+
[
521+
'columnName' => 'name',
522+
'value' => 'john',
523+
'isTruncated' => false,
524+
],
525+
],
526+
[
527+
[
528+
'columnName' => 'id',
529+
'value' => '2',
530+
'isTruncated' => false,
531+
],
532+
[
533+
'columnName' => 'name',
534+
'value' => 'does',
535+
'isTruncated' => false,
536+
],
537+
],
538+
[
539+
[
540+
'columnName' => 'id',
541+
'value' => '2',
542+
'isTruncated' => false,
543+
],
544+
[
545+
'columnName' => 'name',
546+
'value' => 'doesToo',
547+
'isTruncated' => false,
548+
],
549+
],
550+
[
551+
[
552+
'columnName' => 'id',
553+
'value' => '1',
554+
'isTruncated' => false,
555+
],
556+
[
557+
'columnName' => 'name',
558+
'value' => 'john',
559+
'isTruncated' => false,
560+
],
561+
],
562+
[
563+
[
564+
'columnName' => 'id',
565+
'value' => '2',
566+
'isTruncated' => false,
567+
],
568+
[
569+
'columnName' => 'name',
570+
'value' => 'does',
571+
'isTruncated' => false,
572+
],
573+
],
574+
[
575+
[
576+
'columnName' => 'id',
577+
'value' => '2',
578+
'isTruncated' => false,
579+
],
580+
[
581+
'columnName' => 'name',
582+
'value' => 'doesToo',
583+
'isTruncated' => false,
584+
],
585+
],
586+
];
587+
588+
/** @var array $data */
589+
$data = $this->_client->getTableDataPreview($this->tableId, [
590+
'format' => 'json',
591+
]);
592+
self::assertEquals($expectedIncrementalLoad, $data['rows']);
593+
}
594+
350595
private function unloadAndAssert(int $id, string $tableId): void
351596
{
352597
// should be OK tables types are matching

0 commit comments

Comments
 (0)