1- import thrift from 'thrift' ;
1+ import thrift , { HttpHeaders } from 'thrift' ;
22
33import { EventEmitter } from 'events' ;
44import TCLIService from '../thrift/TCLIService' ;
55import { TProtocolVersion } from '../thrift/TCLIService_types' ;
6- import IDBSQLClient , { ConnectionOptions , OpenSessionRequest , ClientOptions } from './contracts/IDBSQLClient' ;
6+ import IDBSQLClient , { ClientOptions , ConnectionOptions , OpenSessionRequest } from './contracts/IDBSQLClient' ;
77import HiveDriver from './hive/HiveDriver' ;
88import { Int64 } from './hive/Types' ;
99import DBSQLSession from './DBSQLSession' ;
1010import IDBSQLSession from './contracts/IDBSQLSession' ;
11- import IThriftConnection from './connection/contracts/IThriftConnection' ;
12- import IConnectionProvider from './connection/contracts/IConnectionProvider' ;
1311import IAuthentication from './connection/contracts/IAuthentication' ;
1412import HttpConnection from './connection/connections/HttpConnection' ;
1513import IConnectionOptions from './connection/contracts/IConnectionOptions' ;
1614import Status from './dto/Status' ;
1715import HiveDriverError from './errors/HiveDriverError' ;
18- import { buildUserAgentString , definedOrError } from './utils' ;
16+ import { areHeadersEqual , buildUserAgentString , definedOrError } from './utils' ;
1917import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication' ;
2018import DatabricksOAuth from './connection/auth/DatabricksOAuth' ;
2119import IDBSQLLogger , { LogLevel } from './contracts/IDBSQLLogger' ;
@@ -42,26 +40,25 @@ function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
4240}
4341
4442export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
45- private client : TCLIService . Client | null ;
43+ private client : TCLIService . Client | null = null ;
4644
47- private connection : IThriftConnection | null ;
45+ private authProvider : IAuthentication | null = null ;
4846
49- private connectionProvider : IConnectionProvider ;
47+ private connectionOptions : ConnectionOptions | null = null ;
48+
49+ private additionalHeaders : HttpHeaders = { } ;
5050
5151 private readonly logger : IDBSQLLogger ;
5252
5353 private readonly thrift = thrift ;
5454
5555 constructor ( options ?: ClientOptions ) {
5656 super ( ) ;
57- this . connectionProvider = new HttpConnection ( ) ;
5857 this . logger = options ?. logger || new DBSQLLogger ( ) ;
59- this . client = null ;
60- this . connection = null ;
6158 this . logger . log ( LogLevel . info , 'Created DBSQLClient' ) ;
6259 }
6360
64- private getConnectionOptions ( options : ConnectionOptions ) : IConnectionOptions {
61+ private getConnectionOptions ( options : ConnectionOptions , headers : HttpHeaders ) : IConnectionOptions {
6562 const {
6663 host,
6764 port,
@@ -85,6 +82,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
8582 https : true ,
8683 ...otherOptions ,
8784 headers : {
85+ ...headers ,
8886 'User-Agent' : buildUserAgentString ( options . clientId ) ,
8987 } ,
9088 } ,
@@ -126,39 +124,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
126124 * const session = client.connect({host, path, token});
127125 */
128126 public async connect ( options : ConnectionOptions , authProvider ?: IAuthentication ) : Promise < IDBSQLClient > {
129- authProvider = this . getAuthProvider ( options , authProvider ) ;
130-
131- this . connection = await this . connectionProvider . connect ( this . getConnectionOptions ( options ) , authProvider ) ;
132-
133- this . client = this . thrift . createClient ( TCLIService , this . connection . getConnection ( ) ) ;
134-
135- this . connection . getConnection ( ) . on ( 'error' , ( error : Error ) => {
136- // Error.stack already contains error type and message, so log stack if available,
137- // otherwise fall back to just error type + message
138- this . logger . log ( LogLevel . error , error . stack || `${ error . name } : ${ error . message } ` ) ;
139- try {
140- this . emit ( 'error' , error ) ;
141- } catch ( e ) {
142- // EventEmitter will throw unhandled error when emitting 'error' event.
143- // Since we already logged it few lines above, just suppress this behaviour
144- }
145- } ) ;
146-
147- this . connection . getConnection ( ) . on ( 'reconnecting' , ( params : { delay : number ; attempt : number } ) => {
148- this . logger . log ( LogLevel . debug , `Reconnecting, params: ${ JSON . stringify ( params ) } ` ) ;
149- this . emit ( 'reconnecting' , params ) ;
150- } ) ;
151-
152- this . connection . getConnection ( ) . on ( 'close' , ( ) => {
153- this . logger . log ( LogLevel . debug , 'Closing connection.' ) ;
154- this . emit ( 'close' ) ;
155- } ) ;
156-
157- this . connection . getConnection ( ) . on ( 'timeout' , ( ) => {
158- this . logger . log ( LogLevel . debug , 'Connection timed out.' ) ;
159- this . emit ( 'timeout' ) ;
160- } ) ;
161-
127+ this . authProvider = this . getAuthProvider ( options , authProvider ) ;
128+ this . connectionOptions = options ;
162129 return this ;
163130 }
164131
@@ -172,11 +139,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
172139 * const session = await client.openSession();
173140 */
174141 public async openSession ( request : OpenSessionRequest = { } ) : Promise < IDBSQLSession > {
175- if ( ! this . connection ?. isConnected ( ) ) {
176- throw new HiveDriverError ( 'DBSQLClient: connection is lost' ) ;
177- }
178-
179- const driver = new HiveDriver ( this . getClient ( ) ) ;
142+ const driver = new HiveDriver ( ( ) => this . getClient ( ) ) ;
180143
181144 const response = await driver . openSession ( {
182145 client_protocol_i64 : new Int64 ( TProtocolVersion . SPARK_CLI_SERVICE_PROTOCOL_V6 ) ,
@@ -187,23 +150,64 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
187150 return new DBSQLSession ( driver , definedOrError ( response . sessionHandle ) , this . logger ) ;
188151 }
189152
190- public getClient ( ) {
191- if ( ! this . client ) {
192- throw new HiveDriverError ( 'DBSQLClient: client is not initialized' ) ;
153+ private async getClient ( ) {
154+ if ( ! this . connectionOptions || ! this . authProvider ) {
155+ throw new HiveDriverError ( 'DBSQLClient: not connected' ) ;
156+ }
157+
158+ const authHeaders = await this . authProvider . authenticate ( ) ;
159+ // When auth headers change - recreate client. Thrift library does not provide API for updating
160+ // changed options, therefore we have to recreate both connection and client to apply new headers
161+ if ( ! this . client || ! areHeadersEqual ( this . additionalHeaders , authHeaders ) ) {
162+ this . logger . log ( LogLevel . info , 'DBSQLClient: initializing thrift client' ) ;
163+ this . additionalHeaders = authHeaders ;
164+ const connectionOptions = this . getConnectionOptions ( this . connectionOptions , this . additionalHeaders ) ;
165+
166+ const connection = await this . createConnection ( connectionOptions ) ;
167+ this . client = this . thrift . createClient ( TCLIService , connection . getConnection ( ) ) ;
193168 }
194169
195170 return this . client ;
196171 }
197172
198- public async close ( ) : Promise < void > {
199- if ( this . connection ) {
200- const thriftConnection = this . connection . getConnection ( ) ;
173+ private async createConnection ( options : IConnectionOptions ) {
174+ const connectionProvider = new HttpConnection ( ) ;
175+ const connection = await connectionProvider . connect ( options ) ;
176+ const thriftConnection = connection . getConnection ( ) ;
201177
202- if ( typeof thriftConnection . end === 'function' ) {
203- this . connection . getConnection ( ) . end ( ) ;
178+ thriftConnection . on ( 'error' , ( error : Error ) => {
179+ // Error.stack already contains error type and message, so log stack if available,
180+ // otherwise fall back to just error type + message
181+ this . logger . log ( LogLevel . error , error . stack || `${ error . name } : ${ error . message } ` ) ;
182+ try {
183+ this . emit ( 'error' , error ) ;
184+ } catch ( e ) {
185+ // EventEmitter will throw unhandled error when emitting 'error' event.
186+ // Since we already logged it few lines above, just suppress this behaviour
204187 }
188+ } ) ;
205189
206- this . connection = null ;
207- }
190+ thriftConnection . on ( 'reconnecting' , ( params : { delay : number ; attempt : number } ) => {
191+ this . logger . log ( LogLevel . debug , `Reconnecting, params: ${ JSON . stringify ( params ) } ` ) ;
192+ this . emit ( 'reconnecting' , params ) ;
193+ } ) ;
194+
195+ thriftConnection . on ( 'close' , ( ) => {
196+ this . logger . log ( LogLevel . debug , 'Closing connection.' ) ;
197+ this . emit ( 'close' ) ;
198+ } ) ;
199+
200+ thriftConnection . on ( 'timeout' , ( ) => {
201+ this . logger . log ( LogLevel . debug , 'Connection timed out.' ) ;
202+ this . emit ( 'timeout' ) ;
203+ } ) ;
204+
205+ return connection ;
206+ }
207+
208+ public async close ( ) : Promise < void > {
209+ this . client = null ;
210+ this . authProvider = null ;
211+ this . connectionOptions = null ;
208212 }
209213}
0 commit comments