Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions google/cloud/odbc/bq_driver/internal/odbc_internal_commons.cc
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,25 @@ StatusRecordOr<PostQueryResults> PostQueryWithoutResults(
}
// For now , we use default options.
// We can set timeout here as needed later.

// --- BENCHMARK START: PostQuery (Client Library Call) ---
auto start_post_query = std::chrono::high_resolution_clock::now();

auto pq_status = bq_client->PostQuery(post_query_request, options);

// --- BENCHMARK END: PostQuery (Client Library Call) ---
auto end_post_query = std::chrono::high_resolution_clock::now();
auto elapsed_post_query = std::chrono::duration_cast<std::chrono::milliseconds>(end_post_query - start_post_query);

// ADDED: Static variable to track cumulative time across function calls
static long long cumulative_post_query_ms = 0;
cumulative_post_query_ms += elapsed_post_query.count();

// UPDATED: Debugging statement to print both current and cumulative time
std::cout << "[BENCHMARK] PostQueryWithoutResults -> bq_client->PostQuery: "
<< elapsed_post_query.count() << " ms | Cumulative: "
<< cumulative_post_query_ms << " ms\n";

if (!pq_status) {
LOG(ERROR) << "PostQueryWithoutResults::PostQuery:: "
<< pq_status.GetStatusRecord().message;
Expand Down
140 changes: 116 additions & 24 deletions google/cloud/odbc/bq_driver/internal/odbc_sql_columns.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ using ::google::cloud::odbc_internal::SQLStates;
using ::google::cloud::odbc_internal::StatusRecord;
using ::google::cloud::odbc_internal::StatusRecordOr;

std::string const kTableAndViewTypes =
"TABLE,VIEW,MATERIALIZED VIEW,EXTERNAL,SNAPSHOT,CLONE";

namespace {

bool IsTableNotFound(StatusRecord const& status) {
Expand Down Expand Up @@ -383,49 +380,104 @@ StatusRecordOr<std::vector<Table>> FetchBQTablesData(
StatementHandle& stmt_handle, std::string const& catalog,
std::string const& dataset_pattern, std::string const& table_pattern,
SQLULEN metadata_id) {

// --- BENCHMARK START: Entire Function ---
auto start_total = std::chrono::high_resolution_clock::now();

// Helper to easily print total time on any return path
auto print_total_time = [&start_total](const std::string& exit_point) {
auto end_total = std::chrono::high_resolution_clock::now();
auto el_total = std::chrono::duration_cast<std::chrono::milliseconds>(end_total - start_total).count();
std::cout << "[BENCHMARK] FetchBQTablesData (Total - " << exit_point << "): " << el_total << " ms\n";
};

std::vector<Table> result;
if (catalog.empty()) {
LOG(ERROR)
<< "FetchBQTablesData:: Catalog cannot be empty for BQ Data source.";
print_total_time("Early Exit: Empty Catalog");
return StatusRecord{SQLStates::k_HY000(),
"Catalog cannot be empty for BQ Data source"};
}
if (dataset_pattern.empty()) {
LOG(ERROR) << "FetchBQTablesData:: Dataset pattern cannot be empty for BQ "
"Data source.";
print_total_time("Early Exit: Empty Dataset Pattern");
return StatusRecord{SQLStates::k_HY000(),
"Dataset pattern cannot be empty for BQ Data source"};
}
if (table_pattern.empty()) {
LOG(ERROR) << "FetchBQTablesData:: Table pattern cannot be empty for BQ "
"Data source.";
print_total_time("Early Exit: Empty Table Pattern");
return StatusRecord{SQLStates::k_HY000(),
"Table pattern cannot be empty for BQ Data source"};
}
ConnectionHandle& conn_handle = *(stmt_handle.GetConnectionHandle());
if (!conn_handle.IsConnected()) {
LOG(ERROR)
<< "FetchBQTablesData:: Connection to the data source is broken.";
print_total_time("Early Exit: Broken Connection");
return StatusRecord{SQLStates::k_08S01(),
"Connection to the data source is broken"};
}
auto bq_client = conn_handle.GetClient();
if (!bq_client) {
LOG(ERROR) << "FetchBQTablesData:: Invalid or null BQ Client within the "
"connection handle.";
print_total_time("Early Exit: Null BQ Client");
return StatusRecord{
SQLStates::k_HY000(),
"Invalid or null BQ Client within the connection handle"};
}
// Get Datasets based on search pattern in the dataset argument
StatusRecordOr<std::vector<std::string>> datasets_status =
GetFilteredDatasetIds(*bq_client, catalog, dataset_pattern, metadata_id);
if (!datasets_status) {
LOG(ERROR) << "FetchBQTablesData::GetFilteredDatasetIds:: "
<< datasets_status.GetStatusRecord().message;
return datasets_status.GetStatusRecord();

bool const case_sensitive_match = metadata_id != SQL_TRUE;

if (case_sensitive_match && !IsSearchPatternArgument(dataset_pattern) &&
!IsSearchPatternArgument(table_pattern)) {
auto bq_table_status =
FetchBQTableData(conn_handle, catalog, dataset_pattern, table_pattern);
if (!bq_table_status) {
auto const& status = bq_table_status.GetStatusRecord();
// A non-existent table yields an empty result set, matching the discovery
// path which simply finds no matching tables.
if (IsTableNotFound(status)) {
print_total_time("Success: Fast Path (table not found)");
return result;
}
LOG(ERROR) << "FetchBQTablesData::FetchBQTableData (fast path):: "
<< status.message;
print_total_time("Early Exit: FetchBQTableData (fast path) Error");
return status;
}
result.push_back(std::move(*bq_table_status));
print_total_time("Success: Fast Path");
return result;
}

// --- BENCHMARK START: GetFilteredDatasetIds ---
auto start_datasets = std::chrono::high_resolution_clock::now();

std::vector<std::string> dataset_ids;
if (case_sensitive_match && !IsSearchPatternArgument(dataset_pattern)) {
dataset_ids.push_back(dataset_pattern);
} else {
StatusRecordOr<std::vector<std::string>> datasets_status =
GetFilteredDatasetIds(*bq_client, catalog, dataset_pattern, metadata_id);
if (!datasets_status) {
LOG(ERROR) << "FetchBQTablesData::GetFilteredDatasetIds:: "
<< datasets_status.GetStatusRecord().message;
print_total_time("Early Exit: GetFilteredDatasetIds Error");
return datasets_status.GetStatusRecord();
}
dataset_ids = std::move(*datasets_status);
}

auto end_datasets = std::chrono::high_resolution_clock::now();
std::cout << "[BENCHMARK] FetchBQTablesData -> GetFilteredDatasetIds: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_datasets - start_datasets).count() << " ms\n";
// --- BENCHMARK END ---

struct TableTaskInput {
std::size_t index;
std::string dataset;
Expand All @@ -442,47 +494,73 @@ StatusRecordOr<std::vector<Table>> FetchBQTablesData(
};

std::vector<DatasetTaskInput> dataset_tasks;
dataset_tasks.reserve(datasets_status->size());
for (std::size_t i = 0; i < datasets_status->size(); ++i) {
dataset_tasks.push_back({i, datasets_status->at(i)});
dataset_tasks.reserve(dataset_ids.size());
for (std::size_t i = 0; i < dataset_ids.size(); ++i) {
dataset_tasks.push_back({i, dataset_ids[i]});
}

// Run broad SQLColumns discovery in parallel: first table listing per
// dataset, then table metadata retrieval.
auto trace_option = TraceOptions::GetTraceOption();
if (trace_option == nullptr || trace_option->max_threads <= 0) {
print_total_time("Early Exit: Invalid MaxThreads");
return StatusRecord{SQLStates::k_HY000(),
"MaxThreads must be configured with a positive value"};
}
int max_threads = trace_option->max_threads;


std::regex const table_regex = BuildRegex(table_pattern, metadata_id);
auto fetch_tables_for_dataset_task = [&](DatasetTaskInput const& dataset_task)
-> StatusRecordOr<DatasetTablesBatch> {
StatusRecordOr<std::vector<FilteredTableResponse>> tables_status =
GetFilteredTables(stmt_handle, catalog, dataset_task.dataset,
table_pattern, kTableAndViewTypes, metadata_id);
if (!tables_status) {
LOG(ERROR) << "FetchBQTablesData::GetFilteredTables:: "
<< tables_status.GetStatusRecord().message;
return tables_status.GetStatusRecord();
}
Options options;
options.set<MaxRetriesOption>(conn_handle.GetDsn().max_retries);
auto tables_status =
bq_client->ListAllTables(catalog, dataset_task.dataset, options);

DatasetTablesBatch batch{
dataset_task.dataset_index, dataset_task.dataset, {}};
batch.table_names.reserve(tables_status->size());
for (auto const& filtered_table : *tables_status) {
batch.table_names.push_back(filtered_table.table_name);
if (!tables_status) {
auto const& status = tables_status.GetStatusRecord();

if (IsTableNotFound(status)) {
LOG(WARNING) << "FetchBQTablesData:: Skipping dataset not found or with "
<< "no tables: '" << dataset_task.dataset
<< "': " << status.message;
return batch;
}
LOG(ERROR) << "FetchBQTablesData::ListAllTables:: " << status.message;
return status;
}

for (auto const& list_table : *tables_status) {
// Match (and store) the table's real-cased id so the subsequent
// tables.get lookup succeeds even under case-insensitive matching.
std::string const& table_id = list_table.table_reference.table_id;
if (std::regex_match(table_id, table_regex)) {
batch.table_names.push_back(table_id);
}
}
return batch;
};

// --- BENCHMARK START: ExecuteParallelTasks (GetFilteredTables) ---
auto start_exec_tables = std::chrono::high_resolution_clock::now();

auto dataset_tables_results_or =
ExecuteParallelTasks<DatasetTaskInput, DatasetTablesBatch>(
max_threads, dataset_tasks, fetch_tables_for_dataset_task);

auto end_exec_tables = std::chrono::high_resolution_clock::now();
std::cout << "[BENCHMARK] FetchBQTablesData -> ExecuteParallelTasks(ListAllTables): "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_exec_tables - start_exec_tables).count() << " ms\n";
// --- BENCHMARK END ---

if (!dataset_tables_results_or) {
LOG(ERROR)
<< "FetchBQTablesData::ExecuteParallelTasks(GetFilteredTables):: "
<< dataset_tables_results_or.GetStatusRecord().message;
print_total_time("Early Exit: ExecuteParallelTasks(GetFilteredTables) Error");
return dataset_tables_results_or.GetStatusRecord();
}

Expand Down Expand Up @@ -524,12 +602,22 @@ StatusRecordOr<std::vector<Table>> FetchBQTablesData(
IndexedTable{task_input.index, std::move(*bq_table_status)});
};

// --- BENCHMARK START: ExecuteParallelTasks (FetchBQTableData) ---
auto start_exec_table_data = std::chrono::high_resolution_clock::now();

auto table_results_or =
ExecuteParallelTasks<TableTaskInput, optional<IndexedTable>>(
max_threads, table_tasks, fetch_table_task);

auto end_exec_table_data = std::chrono::high_resolution_clock::now();
std::cout << "[BENCHMARK] FetchBQTablesData -> ExecuteParallelTasks(FetchBQTableData): "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_exec_table_data - start_exec_table_data).count() << " ms\n";
// --- BENCHMARK END ---

if (!table_results_or) {
LOG(ERROR) << "FetchBQTablesData::ExecuteParallelTasks:: "
<< table_results_or.GetStatusRecord().message;
print_total_time("Early Exit: ExecuteParallelTasks(FetchBQTableData) Error");
return table_results_or.GetStatusRecord();
}

Expand All @@ -552,6 +640,10 @@ StatusRecordOr<std::vector<Table>> FetchBQTablesData(
for (auto& indexed_table : indexed_tables) {
result.push_back(std::move(indexed_table.table));
}

// Entire function success exit
print_total_time("Success");

return result;
}
} // namespace google::cloud::odbc_bq_driver_internal
Original file line number Diff line number Diff line change
Expand Up @@ -1805,6 +1805,117 @@ TEST(CatalogTest, SQLTables_Filter_DefaultDataset_SchemaNull) {
EXPECT_EQ(Disconnect(conn), SQL_SUCCESS);
}

TEST(CatalogTest, SQLTables_FullCatalogEnumeration_WildcardCatalog) {
auto conn = std::make_shared<ODBCHandles>();

// Power BI commonly sends CatalogName="%"
// during initial metadata discovery.
std::string catalog_pattern = "%";

std::string conn_str =
kDefaultConnectionString +
";FilterTablesOnDefaultDataset=0;";

ASSERT_EQ(Connect(conn_str, conn), SQL_SUCCESS);

SQLRETURN status = SQLSetStmtAttr(conn->hstmt,
SQL_ATTR_METADATA_ID,
(SQLPOINTER)SQL_FALSE,
0);

CheckError(status, "SQLSetStmtAttr", conn);

auto start_enum =
std::chrono::high_resolution_clock::now();

std::vector<SQLTableResult> tables =
Catalog::GetTables(
conn,
catalog_pattern.c_str(),
nullptr,
nullptr,
nullptr);

auto end_enum =
std::chrono::high_resolution_clock::now();

auto elapsed_enum =
std::chrono::duration_cast<std::chrono::milliseconds>(
end_enum - start_enum);

std::cout << "[BENCHMARK] Catalog::GetTables "
<< "(Full Catalog Enumeration using Catalog='%') : "
<< elapsed_enum.count()
<< " ms" << std::endl;

ASSERT_FALSE(tables.empty());

// Validate that at least one dataset/schema is returned
std::set<std::string> discovered_datasets;

for (auto const& table : tables) {
if (table.dataset_name.has_value()) {
discovered_datasets.insert(table.dataset_name.value());
}
}

EXPECT_GE(discovered_datasets.size(), 1u);

EXPECT_EQ(Disconnect(conn), SQL_SUCCESS);
}
TEST(CatalogTest, SQLColumns_WildcardColumnSearch) {
auto conn = std::make_shared<ODBCHandles>();

std::string dataset = "kirltest";
std::string table_name = "new_timestamp_table";

// Matches:
// timestamp_col_1
// timestamp_col_2
// ...
std::string column_pattern = "%timestamp%";

std::string conn_str =
kDefaultConnectionString +
";DefaultDataset=" + dataset + ";";

ASSERT_EQ(Connect(conn_str, conn), SQL_SUCCESS);

auto start =
std::chrono::high_resolution_clock::now();

std::vector<SQLColumnsResult> columns =
Catalog::GetColumns(
conn,
kCatalogName,
dataset.c_str(),
table_name.c_str(),
column_pattern.c_str());

auto end =
std::chrono::high_resolution_clock::now();

auto elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(
end - start);

std::cout << "[BENCHMARK] Catalog::GetColumns "
<< "(Wildcard Column Search) : "
<< elapsed.count()
<< " ms" << std::endl;

ASSERT_FALSE(columns.empty());

for (auto const& column : columns) {
EXPECT_EQ(column.dataset_name, dataset);
EXPECT_EQ(column.table_name, table_name);

EXPECT_NE(column.column_name.find("timestamp"),
std::string::npos);
}

EXPECT_EQ(Disconnect(conn), SQL_SUCCESS);
}
#ifdef BQ_DRIVER_INTEGRATION_TESTS
// This test case currently crashes with the existing ODBC Driver for BigQuery
// v3.1.6.1026. The crash occurs in SQLColumns when schema_name is NULL,
Expand Down
Loading