This repository was archived by the owner on Mar 13, 2026. It is now read-only.

Commit dc62471

chore: fix biglake implementation

1 parent 1164fcc

4 files changed

Lines changed: 164 additions & 59 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ docs.metadata
 # Virtual environment
 env/
 venv/
+.venv/
 
 # Test logs
 coverage.xml

pandas_gbq/core/biglake.py

Lines changed: 55 additions & 39 deletions
@@ -2,60 +2,76 @@
 # Use of this source code is governed by a BSD-style
 # license that can be found in the LICENSE file.
 
+"""
+Utilities for working with BigLake tables.
+"""
+
+# TODO(tswast): Synchronize with bigframes/session/iceberg.py, which uses
+# pyiceberg and the BigLake APIs, rather than relying on dry run.
+
 from __future__ import annotations
 
 import dataclasses
+from typing import Sequence
 
 import google.auth.transport.requests
+import google.cloud.bigquery
 import google.oauth2.credentials
 
-_ICEBERG_REST_CATALOG_URI = "https://biglake.googleapis.com/iceberg/v1/restcatalog"
-_TABLE_METADATA_PATH = (
-    "/v1/projects/{project}/catalogs/{catalog}/namespaces/{namespace}/tables/{table}"
-)
+import pandas_gbq.core.resource_references
+
+
+_DRY_RUN_TEMPLATE = """
+SELECT *
+FROM `{project}.{catalog}.{namespace}.{table}`
+"""
+
 
+_COUNT_TEMPLATE = """
+SELECT COUNT(*) as total_rows
+FROM `{project}.{catalog}.{namespace}.{table}`
+"""
 
 @dataclasses.dataclass(frozen=True)
-class BigLakeTableId:
-    project: str
-    catalog: str
-    namespace: str
-    table: str
+class BigLakeTableMetadata:
+    schema: Sequence[google.cloud.bigquery.SchemaField]
+    num_rows: int
 
 
 def get_table_metadata(
     *,
-    table_id: str,
-    credentials: google.oauth2.credentials.Credentials,
-    billing_project_id: str,
-):
+    reference: pandas_gbq.core.resource_references.BigLakeTableId,
+    bqclient: google.cloud.bigquery.Client,
+) -> BigLakeTableMetadata:
     """
-    Docstring for get_table_metadata
+    Get the schema for a BigLake table.
 
-    https://iceberg.apache.org/spec/#metrics;
-
-    curl -X GET -H "Authorization: Bearer \"$(gcloud auth application-default print-access-token)\"" \
-    -H "Content-Type: application/json; charset=utf-8" \
-    -H 'x-goog-user-project: swast-scratch' \
-    -H 'X-Iceberg-Access-Delegation: vended-credentials' \
+    Currently, this does some BigQuery queries. In the future, we'll want to get
+    other metadata like the number of rows and storage bytes so that we can do a
+    more accurate estimate of how many rows to sample.
     """
-    # https://iceberg.apache.org/spec/#metrics
-    # total-files-size
-    project, catalog, namespace, table = table_id.split(".")
-    session = google.auth.transport.requests.AuthorizedSession(credentials=credentials)
-    path = _TABLE_METADATA_PATH.format(
-        project=project,
-        catalog=catalog,
-        namespace=namespace,
-        table=table,
+    dry_run_config = google.cloud.bigquery.QueryJobConfig(dry_run=True)
+    query = _DRY_RUN_TEMPLATE.format(
+        project=reference.project,
+        catalog=reference.catalog,
+        namespace=".".join(reference.namespace),
+        table=reference.table,
     )
-    return session.get(
-        f"{_ICEBERG_REST_CATALOG_URI}.{path}",
-        headers={
-            "x-goog-user-project": billing_project_id,
-            "Content-Type": "application/json; charset=utf-8",
-            # TODO(tswast): parameter for this option (or get from catalog metadata?)
-            # /iceberg/{$api_version}/restcatalog/extensions/{name=projects/*/catalogs/*}
-            "X-Iceberg-Access-Delegation": "vended-credentials",
-        },
-    ).json()
+    job = bqclient.query(query, job_config=dry_run_config)
+    job.result()
+    schema = job.schema
+
+    count_rows = list(bqclient.query_and_wait(_COUNT_TEMPLATE.format(
+        project=reference.project,
+        catalog=reference.catalog,
+        namespace=".".join(reference.namespace),
+        table=reference.table,
+    )))
+    assert len(count_rows) == 1, "got unexpected query response when determining number of rows"
+    total_rows = count_rows[0].total_rows
+
+    return BigLakeTableMetadata(
+        schema=schema if schema is not None else [],
+        num_rows=total_rows,
+    )
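Note: as a rough usage sketch of the new API (the project, catalog, namespace, and table names below are placeholders, not from this commit), get_table_metadata takes a parsed BigLakeTableId plus an existing BigQuery client and returns the dry-run schema and the COUNT(*) row count:

    import google.cloud.bigquery

    import pandas_gbq.core.biglake
    import pandas_gbq.core.resource_references

    # Hypothetical four-part BigLake table ID: project.catalog.namespace.table.
    reference = pandas_gbq.core.resource_references.parse_table_id(
        "my-project.my_catalog.my_namespace.my_table"
    )
    bqclient = google.cloud.bigquery.Client(project="my-project")

    # Dry run supplies the schema; the COUNT(*) query supplies the row count.
    metadata = pandas_gbq.core.biglake.get_table_metadata(
        reference=reference, bqclient=bqclient
    )
    print(len(metadata.schema), metadata.num_rows)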
pandas_gbq/core/resource_references.py

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+# Copyright (c) 2026 pandas-gbq Authors All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+import dataclasses
+import re
+
+
+_TABLE_REFEREENCE_PATTERN = re.compile(
+    # In the past, organizations could prefix their project IDs with a domain
+    # name. Such projects still exist, especially at Google.
+    r"^(?P<legacy_project_domain>[^:]+:)?"
+    r"(?P<project>[^.]+)\."
+    # Dataset for native BigQuery tables, catalog + namespace(s) for BigLake.
+    r"(?P<inner_parts>([^.\s]+\.?)+)\."
+    # Table names can't contain ".", as that's used as the separator.
+    r"(?P<table>[^.]+)$"
+)
+
+
+@dataclasses.dataclass(frozen=True)
+class BigLakeTableId:
+    project: str
+    catalog: str
+    namespace: tuple[str, ...]
+    table: str
+
+
+@dataclasses.dataclass(frozen=True)
+class BigQueryTableId:
+    project_id: str
+    dataset_id: str
+    table_id: str
+
+
+def parse_table_id(table_id: str) -> BigLakeTableId | BigQueryTableId:
+    """Turn a string into a BigLakeTableId or BigQueryTableId.
+
+    Raises:
+        ValueError: If the table ID is invalid.
+    """
+    regex_match = _TABLE_REFEREENCE_PATTERN.match(table_id)
+    if not regex_match:
+        raise ValueError(f"Invalid table ID: {table_id}")
+
+    inner_parts = regex_match.group("inner_parts").split(".")
+    if len(inner_parts) == 1:
+        return BigQueryTableId(
+            project_id=regex_match.group("project"),
+            dataset_id=inner_parts[0],
+            table_id=regex_match.group("table"),
+        )
+
+    return BigLakeTableId(
+        project=regex_match.group("project"),
+        catalog=inner_parts[0],
+        namespace=tuple(inner_parts[1:]),
+        table=regex_match.group("table"),
+    )
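Note: a short sketch of how the new parser is expected to behave (the table IDs below are made up). A three-part ID yields a BigQueryTableId, while four or more parts yield a BigLakeTableId with the extra parts collected into the namespace tuple:

    import pandas_gbq.core.resource_references as refs

    # Three parts: project.dataset.table -> native BigQuery table.
    bq = refs.parse_table_id("my-project.my_dataset.my_table")
    assert isinstance(bq, refs.BigQueryTableId)

    # Four (or more) parts: project.catalog.namespace(...).table -> BigLake.
    bl = refs.parse_table_id("my-project.my_catalog.my_namespace.my_table")
    assert isinstance(bl, refs.BigLakeTableId)
    assert bl.namespace == ("my_namespace",)

    # Anything that doesn't match the pattern raises ValueError.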

pandas_gbq/core/sample.py

Lines changed: 49 additions & 20 deletions
@@ -16,6 +16,7 @@
 import pandas_gbq.core.read
 import pandas_gbq.core.biglake
 import pandas_gbq.gbq_connector
+import pandas_gbq.core.resource_references
 
 # Only import at module-level at type checking time to avoid circular
 # dependencies in the pandas package, which has an optional dependency on
@@ -53,7 +54,6 @@
 # TODO(tswast): Choose an estimate based on actual BigQuery stats.
 _ARRAY_LENGTH_ESTIMATE = 5
 _UNKNOWN_TYPE_SIZE_ESTIMATE = 4
-_MAX_ROW_BYTES = 100 * pandas_gbq.constants.BYTES_IN_MIB
 _MAX_AUTO_TARGET_BYTES = 1 * pandas_gbq.constants.BYTES_IN_GIB
 
 
@@ -62,15 +62,15 @@ def _calculate_target_bytes(target_mb: Optional[int]) -> int:
         return target_mb * pandas_gbq.constants.BYTES_IN_MIB
 
     mem = psutil.virtual_memory()
-    return min(_MAX_AUTO_TARGET_BYTES, max(_MAX_ROW_BYTES, mem.available // 4))
+    return min(_MAX_AUTO_TARGET_BYTES, mem.available // 4)
 
 
 def _estimate_limit(
     *,
-    target_bytes: int,
-    table_bytes: Optional[int],
-    table_rows: Optional[int],
     fields: Sequence[google.cloud.bigquery.SchemaField],
+    target_bytes: int,
+    table_bytes: Optional[int] = None,
+    table_rows: Optional[int] = None,
 ) -> int:
     if table_bytes and table_rows:
         proportion = target_bytes / table_bytes
@@ -119,8 +119,8 @@ def _estimate_row_bytes(fields: Sequence[google.cloud.bigquery.SchemaField]) ->
     Returns:
         An integer representing the estimated total row size in logical bytes.
     """
-    total_size = min(
-        _MAX_ROW_BYTES,
+    total_size = max(
+        1,
         sum(_estimate_field_bytes(field) for field in fields),
     )
     return total_size
@@ -165,10 +165,11 @@ def _sample_with_tablesample(
     progress_bar_type: Optional[str] = None,
     use_bqstorage_api: bool = True,
 ) -> Optional[pandas.DataFrame]:
+    sample_percent = min(100, max(1, int(proportion * 100)))
     query = f"""
     SELECT *
-    FROM `{table_id}`
-    TABLESAMPLE SYSTEM ({float(proportion) * 100.0} PERCENT)
+    FROM `{table_id}` t
+    TABLESAMPLE SYSTEM ({sample_percent} PERCENT)
     ORDER BY RAND() DESC
     LIMIT {int(target_row_count)};
     """
@@ -206,25 +207,55 @@ def _sample_with_limit(
 
 def _sample_biglake_table(
     *,
-    table_id: str,
-    credentials: google.oauth2.credentials.Credentials,
+    reference: pandas_gbq.core.resource_references.BigLakeTableId,
     bqclient: google.cloud.bigquery.Client,
     target_bytes: int,
     progress_bar_type: str | None,
     use_bqstorage_api: bool,
 ) -> Optional[pandas.DataFrame]:
-    pass
+    metadata = pandas_gbq.core.biglake.get_table_metadata(
+        reference=reference,
+        bqclient=bqclient,
+    )
+    total_rows = metadata.num_rows
+
+    # Avoid divide by 0 when calculating proportions.
+    if total_rows == 0:
+        total_rows = 1
+
+    target_row_count = _estimate_limit(
+        target_bytes=target_bytes,
+        fields=metadata.schema,
+        table_rows=total_rows,
+    )
+    proportion = max(0.01, target_row_count / total_rows)
+
+    # BigLake tables should always support table sample, since they are backed
+    # by parquet files.
+    return _sample_with_tablesample(
+        f"{reference.project}.{reference.catalog}.{'.'.join(reference.namespace)}.{reference.table}",
+        bqclient=bqclient,
+        proportion=proportion,
+        target_row_count=target_row_count,
+        progress_bar_type=progress_bar_type,
+        use_bqstorage_api=use_bqstorage_api,
+    )
 
 
 def _sample_bq_table(
     *,
-    table_id: str,
+    reference: pandas_gbq.core.resource_references.BigQueryTableId,
     bqclient: google.cloud.bigquery.Client,
     target_bytes: int,
     progress_bar_type: str | None,
     use_bqstorage_api: bool,
 ) -> Optional[pandas.DataFrame]:
-    table = bqclient.get_table(table_id)
+    table = bqclient.get_table(google.cloud.bigquery.TableReference(
+        google.cloud.bigquery.DatasetReference(
+            reference.project_id, reference.dataset_id
+        ),
+        reference.table_id
+    ))
     num_rows = table.num_rows
     num_bytes = table.num_bytes
     table_type = table.table_type
@@ -342,24 +373,22 @@ def sample(
     connector = pandas_gbq.gbq_connector.GbqConnector(
         project_id=billing_project_id, credentials=credentials
     )
-    credentials = cast(google.oauth2.credentials.Credentials, connector.credentials)
    bqclient = connector.get_client()
 
     # BigLake tables can't be read directly by the BQ Storage Read API, so make
     # sure we run a query first.
-    parts = table_id.split(".")
-    if len(parts) == 4:
+    reference = pandas_gbq.core.resource_references.parse_table_id(table_id)
+    if isinstance(reference, pandas_gbq.core.resource_references.BigLakeTableId):
         return _sample_biglake_table(
-            table_id=table_id,
-            credentials=credentials,
+            reference=reference,
             bqclient=bqclient,
             target_bytes=target_bytes,
             progress_bar_type=progress_bar_type,
             use_bqstorage_api=use_bqstorage_api,
         )
     else:
         return _sample_bq_table(
-            table_id=table_id,
+            reference=reference,
             bqclient=bqclient,
             target_bytes=target_bytes,
             progress_bar_type=progress_bar_type,
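Note: a rough worked example of the new sampling math (numbers are illustrative only, and the row-limit step is a paraphrase of _estimate_limit / _estimate_row_bytes, whose bodies are not fully shown in this diff). The BigLake path turns the target byte budget into a row count, floors the sampling proportion at 1%, and _sample_with_tablesample now clamps the TABLESAMPLE SYSTEM percentage into the 1-100 range:

    # Illustrative numbers: a 10,000,000-row table, ~100 logical bytes per row,
    # and a 100 MiB target keeps roughly 1,048,576 rows.
    target_bytes = 100 * 1024 * 1024
    row_bytes = 100
    total_rows = 10_000_000

    target_row_count = target_bytes // row_bytes               # 1,048,576
    proportion = max(0.01, target_row_count / total_rows)      # ~0.105
    sample_percent = min(100, max(1, int(proportion * 100)))   # 10 PERCENT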
