Skip to content
This repository was archived by the owner on Mar 26, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ export class Bigtable {
static Cluster: Cluster;
_metricsConfigManager: ClientSideMetricsConfigManager;
admin: admin.BigtableAdmin;
closed = false;

constructor(options: BigtableOptions = {}) {
// Determine what scopes are needed.
Expand Down Expand Up @@ -904,6 +905,27 @@ export class Bigtable {
let gaxStream: gax.CancellableStream;
let stream: AbortableDuplex;

if (this.closed) {
const error = Object.assign(
new Error('The client has already been closed.'),
{
name: 'Closed',
code: grpc.status.ABORTED,
details: 'The client has already been closed.',
metadata: new grpc.Metadata(),
},
);
if (isStreamMode) {
stream = streamEvents(new PassThrough({objectMode: true}));
stream.abort = () => {};
setImmediate(() => stream.destroy(error));
return stream;
} else {
callback?.(error as ServiceError);
return;
}
}

const prepareGaxRequest = (
callback: (err: Error | null, fn?: Function) => void,
) => {
Expand Down Expand Up @@ -1021,11 +1043,22 @@ export class Bigtable {
* Close all bigtable clients. New requests will be rejected but it will not
* kill connections with pending requests.
*/
close(): Promise<void[]> {
async close(): Promise<void[]> {
// Close all of the clients.
const combined = Object.keys(this.api).map(clientType =>
this.api[clientType].close(),
);
return Promise.all(combined);
const results = await Promise.all(combined);

// Clear them out of our cache.
Object.keys(this.api).forEach(clientType => {
delete this.api[clientType];
});

// Mark as closed.
this.closed = true;

return results;
}

/**
Expand Down
5 changes: 3 additions & 2 deletions src/row.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {CallOptions} from 'google-gax';
import {ServiceError} from 'google-gax';
import {google} from '../protos/protos';
import {RowDataUtils, RowProperties} from './row-data-utils';
import {TabularApiSurface} from './tabular-api-surface';
import {GetRowsOptions, TabularApiSurface} from './tabular-api-surface';
import {getRowsInternal} from './utils/getRowsInternal';
import {
MethodName,
Expand Down Expand Up @@ -667,9 +667,10 @@ export class Row {
filter = arrify(filter).concat(options.filter);
}

const getRowsOptions = Object.assign({}, options, {
const getRowsOptions: GetRowsOptions = Object.assign({}, options, {
keys: [this.id],
filter,
limit: 1,
});

const metricsCollector =
Expand Down
5 changes: 5 additions & 0 deletions src/tabular-api-surface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
reqOpts,
gaxOpts: Object.assign({}, gaxOptions),
});
if (!requestStream) {
throw new Error(
'Failed to create request stream -- is the client already closed?',
);
}
metricsCollector.wrapRequest(requestStream);
const stream = pumpify.obj([requestStream, rowKeysStream]);
stream.on('end', () => {
Expand Down
20 changes: 0 additions & 20 deletions testproxy/known_failures.txt
Original file line number Diff line number Diff line change
@@ -1,20 +0,0 @@
TestMutateRow_Generic_DeadlineExceeded\|
TestMutateRow_Generic_CloseClient\|
TestMutateRow_Generic_DeadlineExceeded\|
TestReadModifyWriteRow_Generic_DeadlineExceeded\|
TestReadRow_Generic_DeadlineExceeded\|
TestMutateRows_Retry_WithRoutingCookie\|
TestReadRow_Generic_DeadlineExceeded\|
TestReadRow_Retry_WithRoutingCookie\|
TestReadRow_Retry_WithRetryInfo\|
TestReadRows_ReverseScans_FeatureFlag_Enabled\|
TestReadRows_NoRetry_OutOfOrderError_Reverse\|
TestReadRows_Retry_LastScannedRow_Reverse\|
TestReadRows_Retry_WithRoutingCookie\|
TestReadRows_Retry_WithRoutingCookie_MultipleErrorResponses\|
TestReadRows_Retry_WithRetryInfo\|
TestReadRows_Retry_WithRetryInfo_MultipleErrorResponse\|
TestSampleRowKeys_Retry_WithRoutingCookie\|
TestSampleRowKeys_Generic_CloseClient\|
TestSampleRowKeys_Generic_Headers\|
TestSampleRowKeys_NoRetry_NoEmptyKey\|
13 changes: 13 additions & 0 deletions testproxy/services/close-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

import {google} from '../protos/protos';
import {ClientImplMaker, normalizeCallback} from './utils';
import {closeBigtableClient} from './utils/bigtable-client';
type ICloseClientRequest = google.bigtable.testproxy.ICloseClientRequest;
type ICloseClientResponse = google.bigtable.testproxy.ICloseClientResponse;

import {log} from './utils/log';

export const closeClient: ClientImplMaker<
ICloseClientRequest,
ICloseClientResponse
Expand All @@ -26,8 +29,18 @@ export const closeClient: ClientImplMaker<
const {clientId} = request;
const bigtable = clientMap.get(clientId!);

log.info(
'close client %s (%s)',
clientId,
bigtable ? 'exists' : "doesn't exist",
);

if (bigtable) {
// closeBigtableClient closes the BigtableClient, but not the Bigtable
// object itself. We need to close the Bigtable object as well.
await closeBigtableClient(bigtable);
await bigtable.close();
log.info('client %s closed', clientId);
}
return {};
});
8 changes: 8 additions & 0 deletions testproxy/services/create-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {google} from '../protos/protos';
import * as grpc from '@grpc/grpc-js';
import {Bigtable} from '../../src';
import {createBigtableClient} from './utils/bigtable-client';
import {log} from './utils/log';

function durationToMilliseconds(
duration: google.protobuf.Duration | google.protobuf.IDuration,
Expand Down Expand Up @@ -68,6 +69,12 @@ export const createClient: ClientImplMaker<
);
}

log.info(
'create client %s (%s)',
clientId,
clientMap.has(clientId) ? 'exists' : "doesn't exist",
);

if (clientMap.has(clientId)) {
throw Object.assign(new Error(`Client ${clientId} already exists`), {
code: grpc.status.ALREADY_EXISTS,
Expand Down Expand Up @@ -100,6 +107,7 @@ export const createClient: ClientImplMaker<
appProfileId: appProfileId!,
clientConfig,
};
log.info('created bigtable client %s', clientId);
const bigtable = new Bigtable(options);
createBigtableClient(bigtable);
clientMap.set(clientId!, bigtable);
Expand Down
29 changes: 20 additions & 9 deletions testproxy/services/mutate-row.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,25 @@ export const mutateRow: ClientImplMaker<
const appProfileId = bigtable.appProfileId;
const client = getBigtableClient(bigtable);

await client.mutateRow({
appProfileId,
mutations,
tableName,
rowKey,
});
try {
await client.mutateRow({
appProfileId,
mutations,
tableName,
rowKey,
});

return {
status: {code: grpc.status.OK, details: []},
};
return {
status: {code: grpc.status.OK, details: []},
};
} catch (e) {
const error = e as GoogleError;
return {
status: {
code: error.code ? error.code : grpc.status.UNKNOWN,
message: error.message,
details: [],
},
};
}
});
31 changes: 22 additions & 9 deletions testproxy/services/read-row.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
getRowResponse,
getTableInfo,
} from './utils';
import {GoogleError} from 'google-gax';

export const readRow: ClientImplMaker<IReadRowRequest, IRowResult> = ({
clientMap,
Expand All @@ -32,15 +33,27 @@ export const readRow: ClientImplMaker<IReadRowRequest, IRowResult> = ({
const {clientId, rowKey, tableName} = rawRequest.request;
const columns = {};

const bigtable = clientMap.get(clientId!);
const table = getTableInfo(bigtable, tableName!);
const row = table.row(rowKey!);
try {
const bigtable = clientMap.get(clientId!);
const table = getTableInfo(bigtable, tableName!);
const row = table.row(rowKey!);

const res = await row.get(columns);
const firstRow = getRowResponse(res[0]);
const res = await row.get(columns);
const firstRow = getRowResponse(res[0]);

return {
status: {code: grpc.status.OK, details: []},
row: firstRow,
};
return {
status: {code: grpc.status.OK, details: []},
row: firstRow,
};
} catch (e) {
const error = e as GoogleError;
return {
status: {
code: error.code || grpc.status.FAILED_PRECONDITION,
// e.details must be in an empty array for the test runner to return the status. This is tracked in b/383096533.
details: [],
message: error.message,
},
};
}
});
3 changes: 2 additions & 1 deletion testproxy/services/read-rows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ export const readRows: ClientImplMaker<IReadRowsRequest, IRowsResult> = ({
const error = e as GoogleError;
return {
status: {
code: error.code,
// This might be zero/undefined if it's a disconnected client error.
code: error.code || grpc.status.FAILED_PRECONDITION,
// e.details must be in an empty array for the test runner to return the status. This is tracked in b/383096533.
details: [],
message: error.message,
Expand Down
21 changes: 14 additions & 7 deletions testproxy/services/sample-row-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

import * as grpc from '@grpc/grpc-js';
import {GoogleError} from 'google-gax';
import {getSRKRequest} from './utils/request/sampleRowKeys';
import {ClientImplMaker, normalizeCallback} from './utils';

import {google} from '../protos/protos';
import {log} from './utils/log';
type ISampleRowKeysRequest = google.bigtable.testproxy.ISampleRowKeysRequest;
type ISampleRowKeysResult = google.bigtable.testproxy.ISampleRowKeysResult;

Expand All @@ -28,25 +28,32 @@ export const sampleRowKeys: ClientImplMaker<
normalizeCallback(async rawRequest => {
const {request} = rawRequest;
const {clientId, request: sampleRowKeysRequest} = request;
const {appProfileId, tableName} = sampleRowKeysRequest!;
const {tableName} = sampleRowKeysRequest!;

const bigtable = clientMap.get(clientId!);
bigtable.appProfileId = appProfileId!;

const [, , , instanceId, , tableId] = tableName!.split('/');
const instance = bigtable.instance(instanceId);
const table = instance.table(tableId);

try {
const response = await getSRKRequest(bigtable, {appProfileId, tableName});
const response = await table.sampleRowKeys();

log.info('sampleRowKeys response %o', response);
return {
status: {code: grpc.status.OK, details: []},
response,
samples: response[0].map(sample => ({
rowKey: sample.key,
offsetBytes: sample.offset,
})),
};
} catch (e) {
const error = e as GoogleError;
console.error('Error:', error.code);

return {
status: {
code: error.code,
// This might be zero/undefined if it's a disconnected client error.
code: error.code || grpc.status.FAILED_PRECONDITION,
details: [],
},
};
Expand Down
13 changes: 10 additions & 3 deletions testproxy/services/utils/bigtable-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,24 @@ export function createBigtableClient(bigtable: Bigtable) {
bigtableAny[v2] = new BigtableClient(bigtable.options.BigtableClient);
}

export function getBigtableClient(bigtable: Bigtable) {
export function getBigtableClient(bigtable: Bigtable): BigtableClient {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return (bigtable as any)[v2];
}

export async function deleteBigtableClient(bigtable: Bigtable) {
export async function closeBigtableClient(bigtable: Bigtable) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const bigtableAny = bigtable as any;

const bigtableClient = bigtableAny[v2];
const bigtableClient = bigtableAny[v2] as BigtableClient;
await bigtableClient.close();
}

export async function deleteBigtableClient(bigtable: Bigtable) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const bigtableAny = bigtable as any;

await closeBigtableClient(bigtable);

delete bigtableAny[v2];
}
18 changes: 18 additions & 0 deletions testproxy/services/utils/log.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2025-2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {loggingUtils as logging} from 'google-gax';

// Debug logger to use with the testproxy.
export const log = logging.log('cbt-testproxy');
Loading
Loading