Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 10 additions & 27 deletions datastore/infrastructure/engines/bigquery/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
column_defs,
delete_sql,
drop_columns_sql,
format_select_column,
insert_sql,
merge_sql,
normalize_pk,
Expand Down Expand Up @@ -448,8 +449,7 @@ def upsert(
schema = self._read_schema(resource_id)
if schema is None:
raise NotFoundError(
f"resource {resource_id!r} is not declared; call "
"datastore_create before upsert"
f"resource {resource_id!r} not found."
)

rows = records or []
Expand Down Expand Up @@ -521,8 +521,7 @@ def search(
schema = self._read_schema(resource_id)
if schema is None:
raise NotFoundError(
f"resource {resource_id!r} is not declared; call "
"datastore_create first"
f"resource {resource_id!r} not found."
)

try:
Expand Down Expand Up @@ -867,8 +866,7 @@ def info(self, resource_id: str) -> InfoResult:
schema = self._read_schema(resource_id)
if schema is None:
raise NotFoundError(
f"resource {resource_id!r} is not declared; call "
"datastore_create first"
f"resource {resource_id!r} not found."
)

total = self._count_rows(resource_id)
Expand Down Expand Up @@ -1284,30 +1282,15 @@ def _build_export_select(schema: Any, fmt: str) -> str:
"""SELECT column list for EXPORT DATA.

Parquet preserves native logical types → `*` is enough. For CSV /
NDJSON, cast TIMESTAMP and DATETIME columns to ISO 8601 (BigQuery's
default text format uses a space separator and `UTC` suffix, which
most clients reject as non-ISO). DATE and TIME already serialise as
ISO and pass through.
NDJSON, every column goes through `format_select_column` (in
`bigquery/lib.py`) — the same helper `datastore_search` uses — so a
given column renders identically in a dump and in a search response.
"""
if fmt == "parquet":
return "*"
parts: list[str] = []
for field in schema:
ftype = (field.field_type or "").upper()
if ftype == "TIMESTAMP":
# `%E*S` keeps all fractional seconds; trailing Z marks UTC.
parts.append(
f"FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%E*SZ', "
f"`{field.name}`, 'UTC') AS `{field.name}`"
)
elif ftype == "DATETIME":
parts.append(
f"FORMAT_DATETIME('%Y-%m-%dT%H:%M:%E*S', `{field.name}`) "
f"AS `{field.name}`"
)
else:
parts.append(f"`{field.name}`")
return ", ".join(parts)
return ", ".join(
format_select_column(f.name, f.field_type) for f in schema
)


def _is_export_too_large(exc: BaseException) -> bool:
Expand Down
24 changes: 24 additions & 0 deletions datastore/infrastructure/engines/bigquery/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,30 @@ def _system_col_insert_list(include_updated_at: bool) -> str:
return "`_id`, `_updated_at`" if include_updated_at else "`_id`"


def format_select_column(name: str, bq_type: str | None) -> str:
"""Render a SELECT-list expression for one column, casting TIMESTAMP /
DATETIME values to a fixed-shape ISO 8601 string
(`YYYY-MM-DDTHH:MM:SS` — UTC implicit, no offset, no fractional).
Other types pass through unchanged.

Shared by `datastore_search` (engine-built projection) and the
`datastore_dump` `EXPORT DATA` SELECT, so both read paths emit the
same timestamp shape from BigQuery's side. TIMESTAMP is rendered in
UTC; the resulting string carries no offset, so consumers must treat
any timestamp value as UTC.
"""
bq = (bq_type or "").upper()
if bq == "TIMESTAMP":
return (
f"FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', `{name}`, 'UTC') AS `{name}`"
)
if bq == "DATETIME":
return (
f"FORMAT_DATETIME('%Y-%m-%dT%H:%M:%S', `{name}`) AS `{name}`"
)
return f"`{name}`"


def normalize_pk(schema: dict) -> list[str]:
"""`schema.primaryKey` as a list (str → 1-elem, missing → empty)."""
pk = schema.get("primaryKey")
Expand Down
31 changes: 28 additions & 3 deletions datastore/infrastructure/engines/bigquery/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from datastore.infrastructure.engines.bigquery.lib import (
JSON_FRICTIONLESS_TYPES,
SYSTEM_COLUMN_NAMES,
format_select_column,
)
from datastore.infrastructure.engines.bigquery.types import bigquery_type

if TYPE_CHECKING:
from google.cloud import bigquery
Expand Down Expand Up @@ -203,6 +205,19 @@ def _build_where(
return " AND ".join(clauses)


def _project_column(col: str, type_map: dict[str, str]) -> str:
"""Render a projected column for `datastore_search` / `build_count`.

Translates the Frictionless type to BigQuery's name (via
`bigquery_type`) and delegates to `lib.format_select_column` — the
same helper `datastore_dump` uses for `EXPORT DATA`. Net effect:
TIMESTAMP / DATETIME columns come back as the fixed ISO 8601 string
`2026-06-08T00:00:00` (UTC implicit) in both endpoints; other
columns pass through as the native type.
"""
return format_select_column(col, bigquery_type(type_map.get(col)))


def build_search(
*,
table_ref: str,
Expand Down Expand Up @@ -265,16 +280,22 @@ def build_search(
)

parts: list[str] = []
projection = ", ".join(f"`{c}`" for c in projected)
projection = ", ".join(_project_column(c, type_map) for c in projected)
parts.append(
f"SELECT {'DISTINCT ' if distinct else ''}{projection} "
f"FROM {table_ref} AS t"
)
if where:
parts.append(f"WHERE {where}")
if sort_pairs:
# Sort against the underlying table column (`t.<col>`) rather than
# the projected alias — the datetime projection rewrites
# `col` → `FORMAT_TIMESTAMP(...) AS col`, and we want ORDER BY to
# operate on the native TIMESTAMP, not the formatted string. (The
# ISO format happens to sort lexicographically the same way, but
# the explicit reference future-proofs us if the format changes.)
parts.append(
"ORDER BY " + ", ".join(f"`{c}` {d}" for c, d in sort_pairs)
"ORDER BY " + ", ".join(f"t.`{c}` {d}" for c, d in sort_pairs)
)
parts.append(f"LIMIT {int(limit)} OFFSET {int(offset)}")

Expand Down Expand Up @@ -322,7 +343,11 @@ def build_count(
table_alias="t", params=params,
)

projection = ", ".join(f"`{c}`" for c in projected)
# Same projection rewrite as `build_search` — datetime columns are
# FORMAT_TIMESTAMP-wrapped. Matters when `distinct=True`: the COUNT
# then dedupes on the formatted (second-precision) string, matching
# what the user sees in the data response.
projection = ", ".join(_project_column(c, type_map) for c in projected)
inner_parts = [
f"SELECT {'DISTINCT ' if distinct else ''}{projection} "
f"FROM {table_ref} AS t"
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
# it a dummy when the env doesn't carry one — tests override the CKAN
# client via DI, so this URL is never contacted.
os.environ.setdefault("CKAN_URL", "http://test-ckan.local")
# Force CKAN auth for the suite: the endpoint tests are built around
# `CKANAuthProvider` + `FakeCKAN`, and the dict-resource create branch +
# the read-only force guard are explicitly gated on `AUTH_TYPE == "ckan"`.
# A developer .env that flips this to `anonymous` would otherwise reroute
# those code paths and silently break ~7 endpoint tests locally (CI is
# fine because it has no .env). `os.environ[...] =` forces an override,
# unlike `setdefault` above which respects a CI-supplied value.
os.environ["AUTH_TYPE"] = "ckan"

from collections.abc import Iterator # noqa: E402
from typing import Any # noqa: E402
Expand Down
20 changes: 14 additions & 6 deletions tests/engines/bigquery/test_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ def test_upsert_undeclared_resource_raises_not_found(
backend = _backend(mock_client)
backend._read_schema = MagicMock(return_value=None)

with pytest.raises(NotFoundError, match="not declared"):
with pytest.raises(NotFoundError, match="not found"):
backend.upsert(
"ghost", [{"a": 1}], method="upsert", include_total=False
)
Expand Down Expand Up @@ -963,7 +963,7 @@ def test_info_raises_not_found_for_undeclared_resource(
backend = _backend(mock_client)
backend._read_schema = MagicMock(return_value=None)

with pytest.raises(NotFoundError, match="not declared"):
with pytest.raises(NotFoundError, match="not found"):
backend.info("ghost")
# No COUNT runs when the resource isn't declared.
mock_client.query.assert_not_called()
Expand Down Expand Up @@ -1050,7 +1050,10 @@ def test_build_search_renders_full_param_set() -> None:
assert sql.startswith("SELECT `auction_id`, `product_code` FROM `p.d.r` AS t")
assert "WHERE `product_code` = @f0 AND `accepted` = @f1" in sql
assert "SEARCH(t, @f2)" in sql
assert "ORDER BY `auction_id` DESC" in sql
# ORDER BY references the underlying table column (`t.<col>`) so the
# FORMAT_TIMESTAMP projection alias for datetime columns doesn't shadow
# the native value at sort time.
assert "ORDER BY t.`auction_id` DESC" in sql
assert sql.rstrip().endswith("LIMIT 100 OFFSET 25")
# Parameter types track the schema (STRING / BOOL / STRING for q).
by_name = {p.name: p for p in params}
Expand Down Expand Up @@ -1106,7 +1109,7 @@ def test_build_search_default_sort_is_id_asc() -> None:
limit=10,
offset=0,
)
assert "ORDER BY `_id` ASC" in sql
assert "ORDER BY t.`_id` ASC" in sql


def test_build_search_rejects_unknown_columns() -> None:
Expand Down Expand Up @@ -1257,8 +1260,13 @@ def test_search_unfiltered_uses_cheap_row_count(
sqls = [call.args[0] for call in mock_client.query.call_args_list]
# `_updated_at` rides along in default projection because the
# MagicMock config returns truthy for `INCLUDE_UPDATED_AT`.
# `_updated_at` is a `datetime` column → projection now wraps it in
# FORMAT_TIMESTAMP (UTC, no fractional, no offset) so the search
# response matches the dump endpoint's timestamp shape.
assert sqls[0].startswith(
"SELECT `_id`, `a`, `_updated_at` FROM `proj-1.ds-1.res-1` AS t"
"SELECT `_id`, `a`, "
"FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', `_updated_at`, 'UTC') "
"AS `_updated_at` FROM `proj-1.ds-1.res-1` AS t"
)
assert sqls[1] == (
"SELECT COUNT(*) AS n FROM `proj-1.ds-1.res-1`"
Expand All @@ -1274,7 +1282,7 @@ def test_search_raises_not_found_for_undeclared_resource(
backend = _backend(mock_client)
backend._read_schema = MagicMock(return_value=None)

with pytest.raises(NotFoundError, match="not declared"):
with pytest.raises(NotFoundError, match="not found"):
backend.search(
resource_id="ghost",
filters=None, q=None, distinct=False, plain=True,
Expand Down
11 changes: 9 additions & 2 deletions tests/test_datastore_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ def test_dump_with_denied_key_returns_403(


def test_build_export_select_iso_casts_timestamp_and_datetime() -> None:
"""TIMESTAMP / DATETIME columns render as `YYYY-MM-DDTHH:MM:SS` —
no timezone suffix, no fractional seconds. TIMESTAMP is formatted
in UTC (clients should assume UTC even though the string carries
no offset)."""
schema = [
_bq_field("auction_id", "INT64"),
_bq_field("delivery_start", "TIMESTAMP"),
Expand All @@ -165,13 +169,16 @@ def test_build_export_select_iso_casts_timestamp_and_datetime() -> None:
]
select = _build_export_select(schema, fmt="csv")
assert (
"FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%E*SZ', `delivery_start`, 'UTC')"
"FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', `delivery_start`, 'UTC')"
in select
)
assert (
"FORMAT_DATETIME('%Y-%m-%dT%H:%M:%E*S', `delivery_local`)"
"FORMAT_DATETIME('%Y-%m-%dT%H:%M:%S', `delivery_local`)"
in select
)
# No `Z` suffix and no `%E*S` (which would re-introduce fractional seconds).
assert "Z'," not in select
assert "%E*S" not in select
assert "`auction_id`" in select
assert "`delivery_day`" in select

Expand Down
Loading