Skip to content

Commit df43573

Browse files
authored
Merge pull request #1537 from keboola/ms/dmd-177-ctas-om-as-api-param
DMD-177 Deduplication parameter in import
2 parents b15bdbb + cf28290 commit df43573

4 files changed

Lines changed: 261 additions & 12 deletions

File tree

apiary.apib

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3001,6 +3001,11 @@ Further information can be found in the [Developers Documentation](https://devel
30013001
+ escapedBy (optional, string) - Escape character used in the CSV file. The default value is an empty value - no escape character is used. (Note: you can specify either `enclosure` or `escapedBy` parameter, not both.)
30023002
+ columns[] (optional, string) - List of columns present in the CSV file; the first line of the file will not be treated as a header!
30033003
+ withoutHeaders (optional, boolean) - The CSV file doesn't contain headers, columns are matched by their order. If this option is used, columns option is ignored.
3004+
+ deduplicationStrategy (optional, enum[string]) - (Snowflake only) Specifies how to handle duplicate rows in the imported data.
3005+
+ Members
3006+
+ upsert - Existing rows are updated with the new data, and new rows are added.
3007+
+ insert - New rows are added, and existing rows are not updated.
3008+
+ Default: upsert
30043009
30053010
+ Request (application/json)
30063011
+ Headers

src/Keboola/StorageApi/Client.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,6 +1336,7 @@ private function writeTableOptionsPrepare($options)
13361336
'treatValuesAsNull',
13371337
'ignoredLinesCount',
13381338
'useTimestampFromDataFile',
1339+
'deduplicationStrategy',
13391340
];
13401341

13411342
$filteredOptions = array_intersect_key($options, array_flip($allowedOptions));
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\StorageApi\Options\TableImport;
6+
7+
/**
 * Values accepted by the `deduplicationStrategy` table import option.
 *
 * According to the API description the option is Snowflake only and defaults to `upsert`.
 */
enum DeduplicationStrategy: string
8+
{
9+
case INSERT = 'insert'; // new rows are added, existing rows are not updated
10+
case UPSERT = 'upsert'; // existing rows are updated with the new data, new rows are added
11+
}

tests/Backend/Snowflake/TypedTableInWorkspaceTest.php

Lines changed: 244 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace Keboola\Test\Backend\Snowflake;
44

55
use Keboola\Datatype\Definition\Snowflake;
6-
use Keboola\StorageApi\ClientException;
6+
use Keboola\StorageApi\Options\TableImport\DeduplicationStrategy;
77
use Keboola\StorageApi\Workspaces;
88
use Keboola\TableBackendUtils\Column\ColumnCollection;
99
use Keboola\TableBackendUtils\Column\ColumnInterface;
@@ -109,7 +109,249 @@ public function testUnloadFromWSToTypedTable(): void
109109
$this->unloadAndAssert($workspace['id'], $tableId);
110110
}
111111

112-
public function testUnloadFromWSToTypedTableCTAS(): void
112+
public function testUnloadFromWSToTypedTableWithoutDeduplication(): void
113+
{
114+
// create workspace and source table in workspace
115+
$workspace = $this->initTestWorkspace();
116+
117+
$tableId = 'Languages3';
118+
119+
$backend = WorkspaceBackendFactory::createWorkspaceBackend($workspace);
120+
$backend->dropTableIfExists($tableId);
121+
122+
$connection = $workspace['connection'];
123+
124+
$db = $backend->getDb();
125+
126+
$quotedTableId = sprintf(
127+
'"%s"."%s"',
128+
$connection['schema'],
129+
$tableId,
130+
);
131+
// exact length is required
132+
$sql = sprintf(
133+
'
134+
CREATE TABLE %s (
135+
"id" INT NOT NULL,
136+
"name" VARCHAR(16777216),
137+
PRIMARY KEY ("id")
138+
)
139+
',
140+
$quotedTableId,
141+
);
142+
$db->query($sql);
143+
$db->query(sprintf("INSERT INTO %s VALUES (1, 'john');", $quotedTableId));
144+
$db->query(sprintf("INSERT INTO %s VALUES (2, 'does');", $quotedTableId));
145+
146+
$this->unloadAndAssert($workspace['id'], $tableId);
147+
148+
// recreate the workspace table, this time with a duplicated primary key value, and unload it again
149+
$db->query(sprintf('DROP TABLE %s', $quotedTableId));
150+
$sql = sprintf(
151+
'
152+
CREATE TABLE %s (
153+
"id" INT NOT NULL,
154+
"name" VARCHAR(16777216),
155+
PRIMARY KEY ("id")
156+
)
157+
',
158+
$quotedTableId,
159+
);
160+
$db->query($sql);
161+
$db->query(sprintf("INSERT INTO %s VALUES (1, 'john');", $quotedTableId));
162+
$db->query(sprintf("INSERT INTO %s VALUES (2, 'does');", $quotedTableId));
163+
// snowflake does not enforce primary key so this is allowed
164+
// normally such value would be deduplicated in storage
165+
// but the "insert" deduplication strategy used below does not deduplicate values
166+
$db->query(sprintf("INSERT INTO %s VALUES (2, 'doesToo');", $quotedTableId));
167+
168+
$runId = $this->_client->generateRunId();
169+
$this->_client->setRunId($runId);
170+
$this->initEvents($this->_client);
171+
172+
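// do full load (NOTE(review): the enum case itself is passed here, while the incremental load below passes ->value; confirm this is intentional)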
173+
$this->_client->writeTableAsyncDirect($this->tableId, [
174+
'dataWorkspaceId' => $workspace['id'],
175+
'dataTableName' => $tableId,
176+
'deduplicationStrategy' => DeduplicationStrategy::INSERT,
177+
]);
178+
$eventAssertCallback = function ($events) {
179+
$this->assertCount(1, $events);
180+
$this->assertArrayHasKey('performance', $events[0]);
181+
$this->assertIsArray($events[0]['performance']);
182+
$this->assertArrayHasKey('importDecomposed', $events[0]['performance']);
183+
$this->assertCount(1, $events[0]['performance']['importDecomposed']);
184+
$this->assertArrayHasKey('name', $events[0]['performance']['importDecomposed'][0]);
185+
// there is only one stage in full load
186+
$this->assertSame('ctasLoad', $events[0]['performance']['importDecomposed'][0]['name']);
187+
188+
// set event as last event
189+
$this->lastEventId = $events[0]['uuid'];
190+
};
191+
$query = new EventsQueryBuilder();
192+
$query->setEvent('storage.tableImportDone')
193+
->setTokenId($this->tokenId)
194+
->setRunId($runId);
195+
$this->assertEventWithRetries($this->_client, $eventAssertCallback, $query);
196+
197+
$expectedFullLoad = [
198+
[
199+
[
200+
'columnName' => 'id',
201+
'value' => '1',
202+
'isTruncated' => false,
203+
],
204+
[
205+
'columnName' => 'name',
206+
'value' => 'john',
207+
'isTruncated' => false,
208+
],
209+
],
210+
[
211+
[
212+
'columnName' => 'id',
213+
'value' => '2',
214+
'isTruncated' => false,
215+
],
216+
[
217+
'columnName' => 'name',
218+
'value' => 'does',
219+
'isTruncated' => false,
220+
],
221+
],
222+
[
223+
[
224+
'columnName' => 'id',
225+
'value' => '2',
226+
'isTruncated' => false,
227+
],
228+
[
229+
'columnName' => 'name',
230+
'value' => 'doesToo',
231+
'isTruncated' => false,
232+
],
233+
],
234+
];
235+
236+
/** @var array $data */
237+
$data = $this->_client->getTableDataPreview($this->tableId, [
238+
'format' => 'json',
239+
]);
240+
self::assertEquals($expectedFullLoad, $data['rows']);
241+
242+
// do incremental load
243+
// incremental load also does not deduplicate values
244+
// as a result, all values are present twice
245+
$this->_client->writeTableAsyncDirect($this->tableId, [
246+
'dataWorkspaceId' => $workspace['id'],
247+
'dataTableName' => $tableId,
248+
'incremental' => true,
249+
'deduplicationStrategy' => DeduplicationStrategy::INSERT->value,
250+
]);
251+
252+
$eventAssertCallback = function ($events) {
253+
$this->assertCount(1, $events);
254+
$this->assertArrayHasKey('performance', $events[0]);
255+
$this->assertIsArray($events[0]['performance']);
256+
$this->assertArrayHasKey('importDecomposed', $events[0]['performance']);
257+
$this->assertCount(1, $events[0]['performance']['importDecomposed']);
258+
$this->assertArrayHasKey('name', $events[0]['performance']['importDecomposed'][0]);
259+
// there is only one stage in incremental load
260+
$this->assertSame('insertIntoTargetFromStaging', $events[0]['performance']['importDecomposed'][0]['name']);
261+
};
262+
$query = new EventsQueryBuilder();
263+
$query->setEvent('storage.tableImportDone')
264+
->setTokenId($this->tokenId)
265+
->setRunId($runId);
266+
$this->assertEventWithRetries($this->_client, $eventAssertCallback, $query);
267+
268+
$expectedIncrementalLoad = [
269+
[
270+
[
271+
'columnName' => 'id',
272+
'value' => '1',
273+
'isTruncated' => false,
274+
],
275+
[
276+
'columnName' => 'name',
277+
'value' => 'john',
278+
'isTruncated' => false,
279+
],
280+
],
281+
[
282+
[
283+
'columnName' => 'id',
284+
'value' => '2',
285+
'isTruncated' => false,
286+
],
287+
[
288+
'columnName' => 'name',
289+
'value' => 'does',
290+
'isTruncated' => false,
291+
],
292+
],
293+
[
294+
[
295+
'columnName' => 'id',
296+
'value' => '2',
297+
'isTruncated' => false,
298+
],
299+
[
300+
'columnName' => 'name',
301+
'value' => 'doesToo',
302+
'isTruncated' => false,
303+
],
304+
],
305+
[
306+
[
307+
'columnName' => 'id',
308+
'value' => '1',
309+
'isTruncated' => false,
310+
],
311+
[
312+
'columnName' => 'name',
313+
'value' => 'john',
314+
'isTruncated' => false,
315+
],
316+
],
317+
[
318+
[
319+
'columnName' => 'id',
320+
'value' => '2',
321+
'isTruncated' => false,
322+
],
323+
[
324+
'columnName' => 'name',
325+
'value' => 'does',
326+
'isTruncated' => false,
327+
],
328+
],
329+
[
330+
[
331+
'columnName' => 'id',
332+
'value' => '2',
333+
'isTruncated' => false,
334+
],
335+
[
336+
'columnName' => 'name',
337+
'value' => 'doesToo',
338+
'isTruncated' => false,
339+
],
340+
],
341+
];
342+
343+
/** @var array $data */
344+
$data = $this->_client->getTableDataPreview($this->tableId, [
345+
'format' => 'json',
346+
]);
347+
self::assertEquals($expectedIncrementalLoad, $data['rows']);
348+
}
349+
350+
/**
351+
* "Copy" of the test referenced below, kept while the "ctas-om" feature still exists.
352+
* @see TypedTableInWorkspaceTest::testUnloadFromWSToTypedTableWithoutDeduplication
353+
*/
354+
public function testUnloadFromWSToTypedTableWithoutDeduplicationByFeature(): void
113355
{
114356
if (!in_array('ctas-om', $this->_client->verifyToken()['owner']['features'], true)) {
115357
$this->markTestSkipped(
@@ -348,16 +590,6 @@ public function testUnloadFromWSToTypedTableCTAS(): void
348590
'format' => 'json',
349591
]);
350592
self::assertEquals($expectedIncrementalLoad, $data['rows']);
351-
352-
$this->expectException(ClientException::class);
353-
// create table does not support typed tables
354-
// tables are created by runner and this is never called
355-
$this->expectExceptionMessage('Table import error: Source destination columns mismatch. "id NUMBER (38,0) NOT NULL"->"id VARCHAR NOT NULL DEFAULT \'\'');
356-
$this->_client->createTableAsyncDirect($this->getTestBucketId(self::STAGE_IN), [
357-
'name' => 'languagesNew',
358-
'dataWorkspaceId' => $workspace['id'],
359-
'dataObject' => $tableId,
360-
]);
361593
}
362594

363595
private function unloadAndAssert(int $id, string $tableId): void

0 commit comments

Comments
 (0)