Skip to content

Commit 28e55f3

Browse files
[PECO-964] Initial Staging Ingestion implementation (#164)
* Initial commit * Initial commit * Got test working except delete (on current dbr version) * Code style; remove irrelevand changes; replace axios with node-fetch (avoid new dependency) Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Remove accidentally added comment Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Remove leftover file; minor fixes to code Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Check for isStagingOperation flag before processing staging operation Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Refine code, extract staging operation processing to a separate method Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Remove duplicate test Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Make CI pass; minor code polishing Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Fix: check operation status before getting its metadata Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Update client protocol version; improve staging operations handling Signed-off-by: Levko Kravets <levko.ne@gmail.com> * Temporarily disable ingestion tests; fix types Signed-off-by: Levko Kravets <levko.ne@gmail.com> --------- Signed-off-by: Levko Kravets <levko.ne@gmail.com> Co-authored-by: Levko Kravets <levko.ne@gmail.com>
1 parent 89ad8ec commit 28e55f3

8 files changed

Lines changed: 202 additions & 4 deletions

File tree

lib/DBSQLClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
147147
const driver = new HiveDriver(() => this.getClient());
148148

149149
const response = await driver.openSession({
150-
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6),
150+
client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8),
151151
...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema),
152152
});
153153

lib/DBSQLOperation/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,12 @@ export default class DBSQLOperation implements IOperation {
231231
return metadata.schema ?? null;
232232
}
233233

234+
public async getMetadata(): Promise<TGetResultSetMetadataResp> {
235+
await this.failIfClosed();
236+
await this.waitUntilReady();
237+
return this.fetchMetadata();
238+
}
239+
234240
private async failIfClosed(): Promise<void> {
235241
if (this.closed) {
236242
throw new OperationStateError(OperationStateErrorCode.Closed);

lib/DBSQLSession.ts

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
import * as fs from 'fs';
2+
import * as path from 'path';
13
import { stringify, NIL, parse } from 'uuid';
4+
import fetch, { HeadersInit } from 'node-fetch';
25
import {
36
TSessionHandle,
47
TStatus,
@@ -30,6 +33,7 @@ import CloseableCollection from './utils/CloseableCollection';
3033
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
3134
import HiveDriverError from './errors/HiveDriverError';
3235
import globalConfig from './globalConfig';
36+
import StagingError from './errors/StagingError';
3337
import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter';
3438

3539
const defaultMaxRows = 100000;
@@ -163,7 +167,110 @@ export default class DBSQLSession implements IDBSQLSession {
163167
parameters: getQueryParameters(options.namedParameters),
164168
});
165169
const response = await this.handleResponse(operationPromise);
166-
return this.createOperation(response);
170+
const operation = this.createOperation(response);
171+
172+
// If `stagingAllowedLocalPath` is provided - assume that operation possibly may be a staging operation.
173+
// To know for sure, fetch metadata and check a `isStagingOperation` flag. If it happens that it wasn't
174+
// a staging operation - not a big deal, we just fetched metadata earlier, but operation is still usable
175+
// and user can get data from it.
176+
// If `stagingAllowedLocalPath` is not provided - don't do anything to the operation. In a case of regular
177+
// operation, everything will work as usual. In a case of staging operation, it will be processed like any
178+
// other query - it will be possible to get data from it as usual, or use other operation methods.
179+
if (options.stagingAllowedLocalPath !== undefined) {
180+
const metadata = await operation.getMetadata();
181+
if (metadata.isStagingOperation) {
182+
const allowedLocalPath = Array.isArray(options.stagingAllowedLocalPath)
183+
? options.stagingAllowedLocalPath
184+
: [options.stagingAllowedLocalPath];
185+
return this.handleStagingOperation(operation, allowedLocalPath);
186+
}
187+
}
188+
return operation;
189+
}
190+
191+
private async handleStagingOperation(operation: IOperation, allowedLocalPath: Array<string>): Promise<IOperation> {
192+
type StagingResponse = {
193+
presignedUrl: string;
194+
localFile?: string;
195+
headers: HeadersInit;
196+
operation: string;
197+
};
198+
const rows = await operation.fetchAll();
199+
if (rows.length !== 1) {
200+
throw new StagingError('Staging operation: expected only one row in result');
201+
}
202+
const row = rows[0] as StagingResponse;
203+
204+
// For REMOVE operation local file is not available, so no need to validate it
205+
if (row.localFile !== undefined) {
206+
let allowOperation = false;
207+
208+
for (const filepath of allowedLocalPath) {
209+
const relativePath = path.relative(filepath, row.localFile);
210+
211+
if (!relativePath.startsWith('..') && !path.isAbsolute(relativePath)) {
212+
allowOperation = true;
213+
}
214+
}
215+
216+
if (!allowOperation) {
217+
throw new StagingError('Staging path not a subset of allowed local paths.');
218+
}
219+
}
220+
221+
const { localFile, presignedUrl, headers } = row;
222+
223+
switch (row.operation) {
224+
case 'GET':
225+
await this.handleStagingGet(localFile, presignedUrl, headers);
226+
return operation;
227+
case 'PUT':
228+
await this.handleStagingPut(localFile, presignedUrl, headers);
229+
return operation;
230+
case 'REMOVE':
231+
await this.handleStagingRemove(presignedUrl, headers);
232+
return operation;
233+
default:
234+
throw new StagingError(`Staging query operation is not supported: ${row.operation}`);
235+
}
236+
}
237+
238+
private async handleStagingGet(
239+
localFile: string | undefined,
240+
presignedUrl: string,
241+
headers: HeadersInit,
242+
): Promise<void> {
243+
if (localFile === undefined) {
244+
throw new StagingError('Local file path not provided');
245+
}
246+
const response = await fetch(presignedUrl, { method: 'GET', headers });
247+
if (!response.ok) {
248+
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
249+
}
250+
const buffer = await response.arrayBuffer();
251+
fs.writeFileSync(localFile, Buffer.from(buffer));
252+
}
253+
254+
private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise<void> {
255+
const response = await fetch(presignedUrl, { method: 'DELETE', headers });
256+
if (!response.ok) {
257+
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
258+
}
259+
}
260+
261+
private async handleStagingPut(
262+
localFile: string | undefined,
263+
presignedUrl: string,
264+
headers: HeadersInit,
265+
): Promise<void> {
266+
if (localFile === undefined) {
267+
throw new StagingError('Local file path not provided');
268+
}
269+
const data = fs.readFileSync(localFile);
270+
const response = await fetch(presignedUrl, { method: 'PUT', headers, body: data });
271+
if (!response.ok) {
272+
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
273+
}
167274
}
168275

169276
/**
@@ -362,7 +469,7 @@ export default class DBSQLSession implements IDBSQLSession {
362469
return new Status(response.status);
363470
}
364471

365-
private createOperation(response: OperationResponseShape): IOperation {
472+
private createOperation(response: OperationResponseShape): DBSQLOperation {
366473
Status.assert(response.status);
367474
const handle = definedOrError(response.operationHandle);
368475
const operation = new DBSQLOperation(

lib/contracts/IDBSQLSession.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export type ExecuteStatementOptions = {
1212
runAsync?: boolean;
1313
maxRows?: number | null;
1414
useCloudFetch?: boolean;
15+
stagingAllowedLocalPath?: string | string[];
1516
namedParameters?: Record<string, DBSQLParameter | DBSQLParameterValue>;
1617
};
1718

lib/errors/OperationStateError.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export default class OperationStateError extends HiveDriverError {
2323
public response?: TGetOperationStatusResp;
2424

2525
constructor(errorCode: OperationStateErrorCode, response?: TGetOperationStatusResp) {
26-
super(errorMessages[errorCode]);
26+
super(response?.displayMessage ?? errorMessages[errorCode]);
2727

2828
this.errorCode = errorCode;
2929
this.response = response;

lib/errors/StagingError.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export default class StagingError extends Error {}

tests/e2e/staging/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
const { expect } = require('chai');
2+
const config = require('./utils/config');
3+
const { DBSQLClient } = require('../..');
4+
const fs = require('fs');
5+
6+
// TODO Temporarily disable those tests until we figure out issues with E2E test env
7+
describe.skip('Staging Test', () => {
8+
it('put staging data and receive it', async () => {
9+
const client = new DBSQLClient();
10+
await client.connect({
11+
host: config.host,
12+
path: config.path,
13+
token: config.token,
14+
});
15+
let tempPath = 'tests/e2e/staging/data';
16+
fs.writeFileSync(tempPath, 'Hello World!');
17+
18+
const session = await client.openSession({
19+
initialCatalog: config.database[0],
20+
initialSchema: config.database[1],
21+
});
22+
await session.executeStatement(
23+
`PUT '${tempPath}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,
24+
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
25+
);
26+
await session.executeStatement(
27+
`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,
28+
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
29+
);
30+
let result = fs.readFileSync('tests/e2e/staging/file');
31+
expect(result.toString() === 'Hello World!').to.be.true;
32+
});
33+
34+
it('put staging data and remove it', async () => {
35+
const client = new DBSQLClient();
36+
await client.connect({
37+
host: config.host,
38+
path: config.path,
39+
token: config.token,
40+
});
41+
let tempPath = 'tests/e2e/staging/data';
42+
fs.writeFileSync(tempPath, (data = 'Hello World!'));
43+
44+
let session = await client.openSession({
45+
initialCatalog: config.database[0],
46+
initialSchema: config.database[1],
47+
});
48+
await session.executeStatement(
49+
`PUT '${tempPath}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,
50+
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
51+
);
52+
await session.executeStatement(`REMOVE '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv'`, {
53+
stagingAllowedLocalPath: ['tests/e2e/staging'],
54+
});
55+
});
56+
57+
it('delete non-existent data', async () => {
58+
const client = new DBSQLClient();
59+
await client.connect({
60+
host: config.host,
61+
path: config.path,
62+
token: config.token,
63+
});
64+
let tempPath = 'tests/e2e/staging/data';
65+
fs.writeFileSync(tempPath, (data = 'Hello World!'));
66+
67+
let session = await client.openSession({
68+
initialCatalog: config.database[0],
69+
initialSchema: config.database[1],
70+
});
71+
await session.executeStatement(
72+
`PUT '${tempPath}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,
73+
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
74+
);
75+
await session.executeStatement(
76+
`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,
77+
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
78+
);
79+
let result = fs.readFileSync('tests/e2e/staging/file');
80+
expect(result.toString() === 'Hello World!').to.be.true;
81+
});
82+
});

0 commit comments

Comments
 (0)