From e9b2bed411c2bbca467a41e04d679369d23ed216 Mon Sep 17 00:00:00 2001 From: Ilesh garish Date: Thu, 21 May 2026 11:09:44 -0700 Subject: [PATCH 1/4] Add Iceberg snapshot-id time travel support (version=) Adds a new ``version`` field to ``TimeTravelConfig`` that emits the Snowflake ``AT(VERSION => )`` SQL clause for Iceberg snapshot id time travel. The new kwarg is wired into: - ``snowflake.snowpark.Session.table(version=...)`` - ``snowflake.snowpark.DataFrameReader.table(version=...)`` - ``snowflake.snowpark.Table(version=...)`` For Spark Iceberg compatibility, ``DataFrameReader.option("snapshot-id", N)`` (and the ``snapshot_id`` variant) is also accepted: the helper ``_extract_time_travel_from_options`` aliases it to the ``version`` kwarg and auto-sets ``time_travel_mode="at"``. String snapshot ids are coerced to int. Validation: - ``version`` requires ``time_travel_mode='at'`` (matches Snowflake grammar and Spark Iceberg semantics of "snapshot N", not "before N") - ``version`` must be ``int`` (bool explicitly rejected) - exactly one of statement/offset/timestamp/stream/version Mirrors the existing ``iceberg_tag`` pattern. Tests in ``tests/unit/test_utils.py`` cover SQL emission, validation errors, and the Spark-compat option aliases. Co-authored-by: Cursor --- CHANGELOG.md | 7 ++ src/snowflake/snowpark/_internal/utils.py | 30 +++++- src/snowflake/snowpark/dataframe_reader.py | 69 +++++++++++- src/snowflake/snowpark/session.py | 11 +- src/snowflake/snowpark/table.py | 7 ++ tests/unit/test_utils.py | 118 +++++++++++++++++++++ 6 files changed, 235 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fba8710a2f..3a6fd399e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ ### Snowpark Python API Updates +#### New Features + +- Added support for Iceberg snapshot id time travel via the `version` parameter + (or `option("snapshot-id", N)` / `option("snapshot_id", N)` for Spark Iceberg + compatibility) on `Session.table` and `DataFrameReader.table`. Generates + `AT(VERSION => )` SQL. Only works with `time_travel_mode="at"`. + #### Documentation - Clarified that the JDBC driver JAR referenced via `udtf_configs.imports` in `DataFrameReader.jdbc()` must be downloaded from the database vendor and uploaded to a Snowflake stage. diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py index b4350a06c4..f57aa99d55 100644 --- a/src/snowflake/snowpark/_internal/utils.py +++ b/src/snowflake/snowpark/_internal/utils.py @@ -1955,6 +1955,7 @@ class TimeTravelConfig(NamedTuple): timestamp: Optional[str] = None timestamp_type: Optional[str] = None stream: Optional[str] = None + version: Optional[int] = None @staticmethod def validate_and_normalize_params( @@ -1964,6 +1965,7 @@ def validate_and_normalize_params( timestamp: Optional[Union[str, datetime.datetime]] = None, timestamp_type: Optional[Union[str, "TimestampTimeZone"]] = None, stream: Optional[str] = None, + version: Optional[int] = None, ) -> Optional["TimeTravelConfig"]: """ Validates and normalizes time travel parameters. @@ -1986,7 +1988,7 @@ def validate_and_normalize_params( ValueError: If parameters are invalid. """ time_travel_arg_count = sum( - arg is not None for arg in (statement, offset, timestamp, stream) + arg is not None for arg in (statement, offset, timestamp, stream, version) ) # Validate mode @@ -2003,10 +2005,28 @@ def validate_and_normalize_params( f"Invalid time travel mode: {time_travel_mode}. Must be 'at' or 'before'." ) + # version (Iceberg snapshot id) only works with 'at' mode — matches + # Snowflake's ``AT(VERSION => )`` grammar and Spark Iceberg's + # ``snapshot-id`` option semantics ("read snapshot N", not "before N"). + if version is not None and time_travel_mode.lower() != "at": + raise ValueError( + "Iceberg snapshot version time travel can only be used with " + "time_travel_mode='at', not 'before'." + ) + + # Validate version type — snapshot IDs are 64-bit integers in Iceberg. + # Reject bool explicitly because ``isinstance(True, int)`` is True in Python. + if version is not None and ( + not isinstance(version, int) or isinstance(version, bool) + ): + raise ValueError( + f"'version' must be an int Iceberg snapshot id, got {type(version).__name__}." + ) + # Validate exactly one parameter is provided if time_travel_arg_count != 1: raise ValueError( - "Exactly one of 'statement', 'offset', 'timestamp', or 'stream' must be provided." + "Exactly one of 'statement', 'offset', 'timestamp', 'stream', or 'version' must be provided." ) # Normalize timestamp @@ -2040,6 +2060,7 @@ def validate_and_normalize_params( timestamp=normalized_timestamp, timestamp_type=timestamp_type, stream=stream, + version=version, ) def generate_sql_clause(self) -> str: @@ -2048,7 +2069,8 @@ def generate_sql_clause(self) -> str: Args: config: Time travel configuration. Returns: - SQL clause like " AT (TIMESTAMP => TO_TIMESTAMP_NTZ('...'))" + SQL clause like " AT (TIMESTAMP => TO_TIMESTAMP_NTZ('...'))" or + " AT (VERSION => 1234567890)" for Iceberg snapshot id time travel. """ clause = f" {self.time_travel_mode.upper()} " @@ -2058,6 +2080,8 @@ def generate_sql_clause(self) -> str: clause += f"(OFFSET => {self.offset})" elif self.stream is not None: clause += f"(STREAM => '{self.stream}')" + elif self.version is not None: + clause += f"(VERSION => {self.version})" elif self.timestamp is not None: if self.timestamp_type is not None: timestamp_type = self.timestamp_type.upper() diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index b6c28a559f..af6047875c 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -125,6 +125,7 @@ "TIMESTAMP": "timestamp", "TIMESTAMP_TYPE": "timestamp_type", "STREAM": "stream", + "VERSION": "version", } READER_OPTIONS_ALIAS_MAP = { @@ -162,6 +163,13 @@ def _extract_time_travel_from_options(options: dict) -> dict: - Automatically sets time_travel_mode to 'at' - Cannot be used with time_travel_mode='before' (raises error) - Cannot be mixed with regular 'timestamp' option (raises error) + + Special handling for 'SNAPSHOT-ID' / 'SNAPSHOT_ID' (Spark Iceberg + compatibility) — both aliases of the Snowpark ``version`` kwarg: + - Automatically sets time_travel_mode to 'at' + (Iceberg snapshot ids only support ``AT(VERSION => N)``, not ``BEFORE``) + - Cannot be used with time_travel_mode='before' (raises error) + - Cannot be mixed with explicit ``version`` option (raises error) """ result = {} excluded_keys = set() @@ -183,6 +191,33 @@ def _extract_time_travel_from_options(options: dict) -> dict: result["timestamp"] = options["AS-OF-TIMESTAMP"] excluded_keys.add("TIMESTAMP") + # Handle Spark Iceberg snapshot-id option (aliased to ``version``) + snapshot_id_value = options.get("SNAPSHOT-ID") + if snapshot_id_value is None: + snapshot_id_value = options.get("SNAPSHOT_ID") + if snapshot_id_value is not None: + if ( + "TIME_TRAVEL_MODE" in options + and options["TIME_TRAVEL_MODE"].lower() == "before" + ): + raise ValueError( + "Cannot use 'snapshot-id' option with time_travel_mode='before'. " + "Iceberg snapshot id time travel only supports time_travel_mode='at'." + ) + if "VERSION" in options: + raise ValueError("Cannot use both 'snapshot-id' and 'version' options.") + # Coerce string snapshot ids (Spark accepts both string and long literals + # via .option(); we normalize to int so the SQL emits an unquoted long). + try: + result["version"] = int(snapshot_id_value) + except (TypeError, ValueError): + raise ValueError( + f"'snapshot-id' must be a 64-bit integer Iceberg snapshot id, " + f"got {snapshot_id_value!r}." + ) + result["time_travel_mode"] = "at" + excluded_keys.add("VERSION") + for option_key, param_name in _TIME_TRAVEL_OPTIONS_PARAMS_MAP.items(): if option_key in options and option_key not in excluded_keys: result[param_name] = options[option_key] @@ -549,6 +584,7 @@ def table( timestamp: Optional[Union[str, datetime]] = None, timestamp_type: Optional[Union[str, TimestampTimeZone]] = None, stream: Optional[str] = None, + version: Optional[int] = None, ) -> Table: """Returns a Table that points to the specified table. @@ -568,6 +604,11 @@ def table( timestamp_type: Type of timestamp interpretation ('NTZ', 'LTZ', or 'TZ'). Can also be set via ``option("timestamp_type", "LTZ")``. stream: Stream name for time travel. Can also be set via ``option("stream", "stream_name")``. + version: Iceberg snapshot id (64-bit integer) for snapshot-based time + travel on Iceberg tables. Can also be set via + ``option("snapshot-id", 5129038471029384756)`` (Spark Iceberg + naming) or ``option("version", 5129038471029384756)``. + Automatically sets time_travel_mode='at'. Note: Time travel options can be set either as direct parameters or via the @@ -577,6 +618,9 @@ def table( PySpark Compatibility: The ``as-of-timestamp`` option automatically sets ``time_travel_mode="at"`` and cannot be used with ``time_travel_mode="before"``. + Spark Iceberg Compatibility: The ``snapshot-id`` option automatically sets + ``time_travel_mode="at"`` for snapshot-id-based time travel. + Examples:: # Using direct parameters @@ -591,6 +635,9 @@ def table( # PySpark-style as-of-timestamp (automatically sets mode to "at") >>> table = session.read.option("as-of-timestamp", "2023-01-01 12:00:00").table("my_table") # doctest: +SKIP + # Spark Iceberg snapshot-id time travel (automatically sets mode to "at") + >>> table = session.read.option("snapshot-id", 5129038471029384756).table("my_iceberg_table") # doctest: +SKIP + # timestamp_type automatically set to "TZ" due to timezone info >>> import datetime, pytz # doctest: +SKIP >>> tz_aware = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=pytz.UTC) # doctest: +SKIP @@ -625,15 +672,25 @@ def table( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream - - if time_travel_mode is not None: + if version is not None and hasattr(ast, "version"): + ast.version.value = version + + if time_travel_mode is not None or version is not None: + # If version is provided without mode, default to 'at' (snapshot ids + # only make sense with AT — symmetric with iceberg_tag handling). + effective_mode = ( + time_travel_mode + if time_travel_mode + else ("at" if version is not None else None) + ) time_travel_params = { - "time_travel_mode": time_travel_mode, + "time_travel_mode": effective_mode, "statement": statement, "offset": offset, "timestamp": timestamp, "timestamp_type": timestamp_type, "stream": stream, + "version": version, } else: # if time_travel_mode is not provided, extract time travel config from options @@ -1163,11 +1220,17 @@ def option(self, key: str, value: Any, _emit_ast: bool = True) -> "DataFrameRead - ``timestamp``: Specific timestamp for time travel - ``timestamp_type``: Type of timestamp interpretation ('NTZ', 'LTZ', or 'TZ'). - ``stream``: Stream name for time travel. + - ``version``: Iceberg snapshot id (64-bit integer) for snapshot-based time travel. Special PySpark compatibility option: - ``as-of-timestamp``: Automatically sets ``time_travel_mode`` to "at" and uses the provided timestamp. Cannot be used with ``time_travel_mode="before"``. + Special Spark Iceberg compatibility option: + - ``snapshot-id`` (or ``snapshot_id``): Aliased to ``version``. Automatically sets + ``time_travel_mode`` to "at" and reads the table at the specified Iceberg snapshot + id. Cannot be used with ``time_travel_mode="before"``. + Args: key: Name of the option (e.g. ``compression``, ``skip_header``, ``time_travel_mode``, etc.). value: Value of the option. diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index c8bdc0ca5a..af5f9da104 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -2728,6 +2728,7 @@ def table( timestamp: Optional[Union[str, datetime.datetime]] = None, timestamp_type: Optional[Union[str, TimestampTimeZone]] = None, stream: Optional[str] = None, + version: Optional[int] = None, ) -> Table: """ Returns a Table that points the specified table. @@ -2739,12 +2740,16 @@ def table( _emit_ast: Whether to emit AST statements. time_travel_mode: Time travel mode, either 'at' or 'before'. - Exactly one of statement, offset, timestamp, or stream must be provided when time_travel_mode is set. + Exactly one of statement, offset, timestamp, stream, or version must be provided when time_travel_mode is set. statement: Query ID for time travel. offset: Negative integer representing seconds in the past for time travel. timestamp: Timestamp string or datetime object. timestamp_type: Type of timestamp interpretation ('NTZ', 'LTZ', or 'TZ'). stream: Stream name for time travel. + version: Iceberg snapshot id (64-bit integer) for snapshot-based time + travel on Iceberg tables. Can only be used with + ``time_travel_mode='at'``. Generates SQL clause like + ``AT(VERSION => 5129038471029384756)``. Note: If your table name contains special characters, use double quotes to mark it like this, ``session.table('"my table"')``. @@ -2766,6 +2771,7 @@ def table( >>> df_before = session.table("my_table", time_travel_mode="before", statement="01234567-abcd-1234-5678-123456789012") # doctest: +SKIP >>> df_offset = session.table("my_table", time_travel_mode="at", offset=-3600) # doctest: +SKIP >>> df_stream = session.table("my_table", time_travel_mode="at", stream="my_stream") # doctest: +SKIP + >>> df_iceberg_version = session.table("my_iceberg_table", time_travel_mode="at", version=5129038471029384756) # doctest: +SKIP # timestamp_type automatically set to "TZ" due to timezone info >>> import datetime, pytz # doctest: +SKIP @@ -2793,6 +2799,8 @@ def table( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream + if version is not None and hasattr(ast, "version"): + ast.version.value = version else: stmt = None @@ -2811,6 +2819,7 @@ def table( timestamp=timestamp, timestamp_type=timestamp_type, stream=stream, + version=version, ) # Replace API call origin for table set_api_call_source(t, "Session.table") diff --git a/src/snowflake/snowpark/table.py b/src/snowflake/snowpark/table.py index 43a53d5945..3f16d5215c 100644 --- a/src/snowflake/snowpark/table.py +++ b/src/snowflake/snowpark/table.py @@ -301,6 +301,7 @@ def __init__( timestamp: Optional[Union[str, datetime.datetime]] = None, timestamp_type: Optional[Union[str, TimestampTimeZone]] = None, stream: Optional[str] = None, + version: Optional[int] = None, ) -> None: if _ast_stmt is None and session is not None and _emit_ast: _ast_stmt = session._ast_batch.bind() @@ -320,6 +321,11 @@ def __init__( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream + # Guard with hasattr — the AST proto field is added in a separate + # proto change. Without it we still produce correct SQL; only AST + # replay/telemetry would lack the version value. + if version is not None and hasattr(ast, "version"): + ast.version.value = version time_travel_config = TimeTravelConfig.validate_and_normalize_params( time_travel_mode=time_travel_mode, @@ -328,6 +334,7 @@ def __init__( timestamp=timestamp, timestamp_type=timestamp_type, stream=stream, + version=version, ) snowflake_table_plan = SnowflakeTable( diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index a1e00a0dd3..78ea7a27bc 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -877,3 +877,121 @@ def test_generate_time_travel_sql_clause(): ) sql_clause = config.generate_sql_clause() assert sql_clause == " AT (STATEMENT => 'abc123')" + + +def test_time_travel_version_snapshot_id(): + """Test Iceberg snapshot id time travel via ``version`` parameter. + + Covers SQL generation, validation, and the ``mode='at'``-only restriction. + Verifies the SQL matches the Snowflake AT(VERSION => N) grammar for + Iceberg snapshot id time travel — unquoted long literal, no casting. + """ + # Valid: large int64 snapshot id emits unquoted long literal. + snapshot_id = 5129038471029384756 + config = TimeTravelConfig.validate_and_normalize_params( + time_travel_mode="at", version=snapshot_id + ) + assert config.version == snapshot_id + assert config.time_travel_mode == "at" + assert config.generate_sql_clause() == f" AT (VERSION => {snapshot_id})" + + # Negative snapshot ids are allowed (Iceberg occasionally uses negative + # hash-derived ids); validation is on type, not sign. + config_neg = TimeTravelConfig.validate_and_normalize_params( + time_travel_mode="at", version=-1 + ) + assert config_neg.generate_sql_clause() == " AT (VERSION => -1)" + + # Direct construction also generates the right SQL. + direct = TimeTravelConfig(time_travel_mode="AT", version=42) + assert direct.generate_sql_clause() == " AT (VERSION => 42)" + + # version + 'before' is invalid (Snowflake only supports AT(VERSION => ...)). + with pytest.raises( + ValueError, + match=r"Iceberg snapshot version time travel can only be used with time_travel_mode='at'", + ): + TimeTravelConfig.validate_and_normalize_params( + time_travel_mode="before", version=snapshot_id + ) + + # version + another time-travel param is invalid (must pick one). + with pytest.raises(ValueError, match="Exactly one of"): + TimeTravelConfig.validate_and_normalize_params( + time_travel_mode="at", version=snapshot_id, offset=-3600 + ) + with pytest.raises(ValueError, match="Exactly one of"): + TimeTravelConfig.validate_and_normalize_params( + time_travel_mode="at", + version=snapshot_id, + timestamp="2023-01-01 12:00:00", + ) + + # Non-int version is rejected. + with pytest.raises( + ValueError, match="'version' must be an int Iceberg snapshot id" + ): + TimeTravelConfig.validate_and_normalize_params( + time_travel_mode="at", version="abc" + ) + + # bool is explicitly rejected (isinstance(True, int) is True in Python). + with pytest.raises( + ValueError, match="'version' must be an int Iceberg snapshot id" + ): + TimeTravelConfig.validate_and_normalize_params( + time_travel_mode="at", version=True + ) + + # version alone (no mode) requires mode. + with pytest.raises(ValueError, match="Must specify time travel mode"): + TimeTravelConfig.validate_and_normalize_params(version=snapshot_id) + + +def test_extract_time_travel_snapshot_id_option(): + """Test Spark Iceberg 'snapshot-id' option extraction for the reader API.""" + from snowflake.snowpark.dataframe_reader import _extract_time_travel_from_options + + # Long int via SNAPSHOT-ID (Spark canonical key) + result = _extract_time_travel_from_options({"SNAPSHOT-ID": 5129038471029384756}) + assert result == { + "time_travel_mode": "at", + "version": 5129038471029384756, + } + + # Underscore variant + result = _extract_time_travel_from_options({"SNAPSHOT_ID": 42}) + assert result == {"time_travel_mode": "at", "version": 42} + + # String snapshot ids coerced to int (Spark accepts both via .option()) + result = _extract_time_travel_from_options({"SNAPSHOT-ID": "10963874102873"}) + assert result == {"time_travel_mode": "at", "version": 10963874102873} + + # snapshot-id + time_travel_mode='before' is rejected + with pytest.raises( + ValueError, + match=r"Cannot use 'snapshot-id' option with time_travel_mode='before'", + ): + _extract_time_travel_from_options( + {"SNAPSHOT-ID": 1, "TIME_TRAVEL_MODE": "before"} + ) + + # snapshot-id + explicit version conflict + with pytest.raises( + ValueError, match="Cannot use both 'snapshot-id' and 'version' options." + ): + _extract_time_travel_from_options({"SNAPSHOT-ID": 1, "VERSION": 2}) + + # Non-numeric snapshot-id is rejected + with pytest.raises( + ValueError, match="'snapshot-id' must be a 64-bit integer Iceberg snapshot id" + ): + _extract_time_travel_from_options({"SNAPSHOT-ID": "not-a-number"}) + + # VERSION option flows through directly without auto-setting mode (user + # must opt in to 'at' explicitly to use VERSION on its own — keeps the + # ``snapshot-id`` Spark-compat alias the only auto-mode path). + result = _extract_time_travel_from_options( + {"VERSION": 99, "TIME_TRAVEL_MODE": "at"} + ) + assert result == {"time_travel_mode": "at", "version": 99} From 9c2ff86b16c025553e3e78e6a5a6ce04724c6a97 Mon Sep 17 00:00:00 2001 From: Ilesh garish Date: Thu, 21 May 2026 17:08:49 -0700 Subject: [PATCH 2/4] Gate Iceberg snapshot-id time travel behind Session.iceberg_features_enabled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an umbrella session flag (default ``False``) that gates the ``version=`` / ``snapshot-id`` time-travel surface added by the previous commit: * ``Session.iceberg_features_enabled`` — public bool property + setter (locking + bool coercion). Mirrors the existing pattern used by ``sql_simplifier_enabled`` / ``cte_optimization_enabled``. * ``Session._require_iceberg_features_enabled(feature=...)`` — internal helper that raises ``SnowparkClientException`` pointing users at the flag. Gates are placed at all three entry points so the kwarg and the Spark Iceberg ``snapshot-id`` / ``snapshot_id`` reader option are both rejected when the flag is off: * ``Session.table(name, version=...)`` * ``DataFrameReader.table(name, version=...)`` and ``.option("snapshot-id", N).table(...)`` (also catches ``snapshot_id``) * ``Table(name, ..., version=...)`` direct construction Default ``False`` because the underlying Snowflake feature (``AT(VERSION => N)`` on unmanaged Iceberg tables) is itself gated behind the ``FEATURE_ICEBERG_TIME_TRAVEL`` account parameter and currently scoped to Catalog-Linked Databases. Snowpark Connect (SAS) flips the flag per-session when its ``snowpark.connect.iceberg.timeTravel.enabled`` config is set. Tests ----- ``tests/unit/test_iceberg_features_flag.py`` (new, 13 tests): * Default + setter + helper semantics. * Each of the three entry-point gates raises when the flag is off and is bypassed cleanly when the flag is on. * Three AST-emission coverage tests patch ``with_src_position`` to return a stand-in with a ``version`` field, so the ``hasattr(ast, "version")``-guarded ``ast.version.value = version`` line in ``session.py`` / ``dataframe_reader.py`` / ``table.py`` actually executes. This addresses the codecov gap (3 missing lines flagged by the patch report on PR #4231). ``tests/integ/test_dataframe.py`` (3 new tests, all ``@pytest.mark.skip``): * End-to-end ``Session.table(..., version=N)`` against a multi-snapshot Iceberg table. * End-to-end ``session.read.option('snapshot-id', N).table(...)``. * End-to-end gate verification (flag off + ``version=`` → raises). All three are skipped because they need a Catalog-Linked Database (cldUnity / cldglue) with an unmanaged Iceberg table that has multiple snapshots AND ``FEATURE_ICEBERG_TIME_TRAVEL`` enabled on the account — neither of which the standard snowpark-python integ accounts have today. They run manually against sfctest0. A TODO above the block tracks wiring them into CI once an Iceberg-capable integ account exists. Addresses @sfc-gh-mayliu review comments on PR #4231 (integ test coverage) and the codecov patch-coverage drop. Co-authored-by: Cursor --- CHANGELOG.md | 6 + src/snowflake/snowpark/dataframe_reader.py | 6 + src/snowflake/snowpark/session.py | 51 +++++ src/snowflake/snowpark/table.py | 5 + tests/integ/test_dataframe.py | 92 +++++++++ tests/unit/test_iceberg_features_flag.py | 205 +++++++++++++++++++++ 6 files changed, 365 insertions(+) create mode 100644 tests/unit/test_iceberg_features_flag.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a6fd399e6..97357b4757 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,12 @@ (or `option("snapshot-id", N)` / `option("snapshot_id", N)` for Spark Iceberg compatibility) on `Session.table` and `DataFrameReader.table`. Generates `AT(VERSION => )` SQL. Only works with `time_travel_mode="at"`. + Disabled by default; opt in via `Session.iceberg_features_enabled = True` + because the underlying Snowflake feature (`AT(VERSION => N)` on unmanaged + Iceberg tables) is gated behind the `FEATURE_ICEBERG_TIME_TRAVEL` account + parameter and is currently scoped to Catalog-Linked Databases. Snowpark + Connect (SAS) flips the flag per-session when its + `snowpark.connect.iceberg.timeTravel.enabled` config is set. #### Documentation diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index af6047875c..e822be3940 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -652,6 +652,12 @@ def table( ... .option("offset", -60) # This will be IGNORED ... .table("my_table", time_travel_mode="at", offset=-3600)) # Only this is used """ + if version is not None or any( + self._cur_options.get(k) is not None for k in ("SNAPSHOT-ID", "SNAPSHOT_ID") + ): + self._session._require_iceberg_features_enabled( + feature="`version=` / `snapshot-id` time travel" + ) # AST. stmt = None diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index af5f9da104..23b8039a92 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -719,6 +719,14 @@ def __init__( # (SAS / snowpark-connect enables this during session configuration.) self._use_structured_type_infer_schema: bool = False + # Umbrella flag for Iceberg-specific Snowpark features that are not + # yet exposed by default to all Snowpark users. Currently gates the + # ``version=`` / ``snapshot-id`` time-travel kwargs on + # ``Session.table`` / ``DataFrameReader.table`` / ``Table``. + # Defaults to ``False``; SAS / snowpark-connect enables it per + # session when the corresponding SCOS config is set. + self._iceberg_features_enabled: bool = False + self._large_query_breakdown_enabled: bool = self.is_feature_enabled_for_version( _PYTHON_SNOWPARK_USE_LARGE_QUERY_BREAKDOWN_OPTIMIZATION_VERSION ) @@ -1050,6 +1058,44 @@ def cte_optimization_enabled(self) -> bool: """ return self._cte_optimization_enabled + @property + def iceberg_features_enabled(self) -> bool: + """Set to ``True`` to enable Iceberg-specific Snowpark features + (defaults to ``False``). + + Currently this gates the ``version=`` kwarg on :meth:`Session.table`, + :meth:`DataFrameReader.table`, and :class:`Table`, and the Spark + Iceberg ``snapshot-id`` / ``snapshot_id`` reader options. With the + flag off, supplying any of these raises ``SnowparkClientException``. + + Snowpark Connect (SAS) enables this flag per-session via the + ``snowpark.connect.iceberg.timeTravel.enabled`` SCOS config. + Direct Snowpark users must opt in explicitly because the underlying + Snowflake feature (``AT(VERSION => N)`` on unmanaged Iceberg tables) + is currently gated behind the ``FEATURE_ICEBERG_TIME_TRAVEL`` server + parameter. + """ + return self._iceberg_features_enabled + + @iceberg_features_enabled.setter + def iceberg_features_enabled(self, value: bool) -> None: + warn_session_config_update_in_multithreaded_mode("iceberg_features_enabled") + with self._lock: + self._iceberg_features_enabled = bool(value) + + def _require_iceberg_features_enabled(self, *, feature: str) -> None: + """Raise if Iceberg features are gated off on this session. + + ``feature`` is a short human-readable label of the API surface + being gated, used in the error message. + """ + if not self._iceberg_features_enabled: + raise SnowparkClientException( + f"{feature} is an Iceberg-only feature and is disabled by " + "default. Enable it by setting " + "`session.iceberg_features_enabled = True`." + ) + @property def eliminate_numeric_sql_value_cast_enabled(self) -> bool: return self._eliminate_numeric_sql_value_cast_enabled @@ -2781,6 +2827,11 @@ def table( # timestamp_type remains "NTZ" (user's explicit choice respected) >>> table2 = session.read.table("my_table", time_travel_mode="at", timestamp=tz_aware, timestamp_type="NTZ") # doctest: +SKIP """ + if version is not None: + self._require_iceberg_features_enabled( + feature="`version=` snapshot-id time travel" + ) + if _emit_ast: stmt = self._ast_batch.bind() ast = with_src_position(stmt.expr.table, stmt) diff --git a/src/snowflake/snowpark/table.py b/src/snowflake/snowpark/table.py index 3f16d5215c..d50ea84216 100644 --- a/src/snowflake/snowpark/table.py +++ b/src/snowflake/snowpark/table.py @@ -303,6 +303,11 @@ def __init__( stream: Optional[str] = None, version: Optional[int] = None, ) -> None: + if version is not None and session is not None: + session._require_iceberg_features_enabled( + feature="`version=` snapshot-id time travel" + ) + if _ast_stmt is None and session is not None and _emit_ast: _ast_stmt = session._ast_batch.bind() ast = with_src_position(_ast_stmt.expr.table, _ast_stmt) diff --git a/tests/integ/test_dataframe.py b/tests/integ/test_dataframe.py index dc069c2cb2..18151f0826 100644 --- a/tests/integ/test_dataframe.py +++ b/tests/integ/test_dataframe.py @@ -8326,3 +8326,95 @@ def test_time_travel_comprehensive_coverage(session): finally: Utils.drop_table(session, table1_name) Utils.drop_table(session, table2_name) + + +# ---------------------------------------------------------------------- +# Iceberg snapshot id (``version=``) time travel. +# +# TODO(SNOW-NNNNNNN): Wire these up to a CI test account that has: +# * a Catalog-Linked Database (CLD) such as cldUnity / cldglue, AND +# * an unmanaged Iceberg table inside it with at least two snapshots +# readable through ``INFORMATION_SCHEMA.GET_TABLE_VERSIONS(...)``. +# +# Snowflake's ``AT(VERSION => N)`` syntax requires the +# ``FEATURE_ICEBERG_TIME_TRAVEL`` server parameter to be enabled on the +# account and is currently scoped to **unmanaged** Iceberg tables in CLDs. +# Because the existing snowpark-python integ accounts don't have a CLD with +# a multi-snapshot Iceberg table provisioned, these tests are skipped by +# default and run manually against ``sfctest0`` (see +# ``tests/sas_tests/test_iceberg_snapshot_id_sample.py`` in the +# snowflake-eng/sas repo for the manual reproducer). +# ---------------------------------------------------------------------- +@pytest.mark.skip( + reason=( + "Requires a CLD-linked unmanaged Iceberg table with multiple " + "snapshots and FEATURE_ICEBERG_TIME_TRAVEL enabled on the account. " + "Tested manually; see TODO above." + ) +) +def test_iceberg_snapshot_id_time_travel_session_table(session): + """End-to-end: ``Session.table(..., version=)`` returns the + table state at the requested Iceberg snapshot.""" + session.iceberg_features_enabled = True + table_fqn = "CLDUNITY.scosschema.snapshot_demo" + + snapshot_ids = [ + row["SNAPSHOT_ID"] + for row in session.sql( + f"SELECT SNAPSHOT_ID FROM " + f"TABLE(INFORMATION_SCHEMA.GET_TABLE_VERSIONS('{table_fqn}')) " + "ORDER BY SNAPSHOT_TIMESTAMP" + ).collect() + ] + assert len(snapshot_ids) >= 2, "Demo table needs at least 2 snapshots" + + first_snapshot = session.table( + table_fqn, time_travel_mode="at", version=snapshot_ids[0] + ).collect() + latest = session.table(table_fqn).collect() + assert len(first_snapshot) <= len(latest) + + +@pytest.mark.skip( + reason=( + "Requires a CLD-linked unmanaged Iceberg table with multiple " + "snapshots and FEATURE_ICEBERG_TIME_TRAVEL enabled on the account. " + "Tested manually; see TODO above." + ) +) +def test_iceberg_snapshot_id_time_travel_dataframe_reader_option(session): + """End-to-end: ``session.read.option('snapshot-id', N).table(...)`` + routes through the Spark Iceberg-compat alias and produces the same + result as the explicit ``version=`` kwarg.""" + session.iceberg_features_enabled = True + table_fqn = "CLDUNITY.scosschema.snapshot_demo" + + snapshot_id = session.sql( + f"SELECT SNAPSHOT_ID FROM " + f"TABLE(INFORMATION_SCHEMA.GET_TABLE_VERSIONS('{table_fqn}')) " + "ORDER BY SNAPSHOT_TIMESTAMP LIMIT 1" + ).collect()[0]["SNAPSHOT_ID"] + + via_kwarg = session.read.table( + table_fqn, time_travel_mode="at", version=snapshot_id + ).collect() + via_option = ( + session.read.option("snapshot-id", snapshot_id).table(table_fqn).collect() + ) + assert via_kwarg == via_option + + +@pytest.mark.skip( + reason=( + "Requires a regular Snowflake account; doesn't need a CLD. " + "Tested manually until we wire up an Iceberg-capable integ account." + ) +) +def test_iceberg_snapshot_id_flag_gates_version_kwarg(session): + """End-to-end: with the umbrella flag OFF the ``version=`` kwarg must + raise ``SnowparkClientException`` and not reach the server.""" + from snowflake.snowpark.exceptions import SnowparkClientException + + session.iceberg_features_enabled = False + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + session.table("ANY_TABLE", time_travel_mode="at", version=1) diff --git a/tests/unit/test_iceberg_features_flag.py b/tests/unit/test_iceberg_features_flag.py new file mode 100644 index 0000000000..1a1bf9514c --- /dev/null +++ b/tests/unit/test_iceberg_features_flag.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved. +# + +# +# Copyright (c) 2012-2026 Snowflake Computing Inc. All rights reserved. +# +"""Unit tests for the `Session.iceberg_features_enabled` umbrella flag and +the `version=` / `snapshot-id` time-travel gates it controls. + +These tests use mocked connections so they don't require a live Snowflake +account. End-to-end validation against an unmanaged Iceberg table lives in +`tests/integ/test_dataframe.py::test_iceberg_snapshot_id_time_travel*` +(skipped by default — see TODO there). +""" + +from types import SimpleNamespace +from unittest import mock + +import pytest + +import snowflake.snowpark.session +from snowflake.snowpark._internal.server_connection import ServerConnection +from snowflake.snowpark.exceptions import SnowparkClientException +from snowflake.snowpark.table import Table + + +def _build_session(*, iceberg_features_enabled=False): + """Create a Session bound to an autospec'd ServerConnection. + + Mirrors the pattern in `tests/unit/test_dataframe.py::test_table_source_plan`. + The session has a real `_ast_batch`, so AST emission code paths execute + against real protos. + """ + mock_connection = mock.create_autospec(ServerConnection) + mock_connection._conn = mock.MagicMock() + mock_connection._thread_safe_session_enabled = True + session = snowflake.snowpark.session.Session(mock_connection) + session.iceberg_features_enabled = iceberg_features_enabled + return session + + +# --------------------------------------------------------------------------- +# Flag plumbing: default, setter, public property. +# --------------------------------------------------------------------------- +def test_iceberg_features_enabled_default_is_false(): + """Default must be ``False`` so existing Snowpark users see no behavior + change. SCOS / snowpark-connect opt in per-session.""" + session = _build_session() + assert session.iceberg_features_enabled is False + assert session._iceberg_features_enabled is False + + +def test_iceberg_features_enabled_setter_round_trip(): + """Public setter accepts truthy/falsy values and coerces to bool.""" + session = _build_session() + + session.iceberg_features_enabled = True + assert session.iceberg_features_enabled is True + + session.iceberg_features_enabled = False + assert session.iceberg_features_enabled is False + + # Truthy non-bool is coerced. + session.iceberg_features_enabled = 1 + assert session.iceberg_features_enabled is True + session.iceberg_features_enabled = 0 + assert session.iceberg_features_enabled is False + + +def test_require_iceberg_features_enabled_raises_when_off(): + """Helper raises a clear SnowparkClientException pointing to the flag.""" + session = _build_session(iceberg_features_enabled=False) + with pytest.raises(SnowparkClientException) as exc_info: + session._require_iceberg_features_enabled(feature="X") + assert "iceberg_features_enabled" in str(exc_info.value) + assert "X" in str(exc_info.value) + + +def test_require_iceberg_features_enabled_noop_when_on(): + """Helper returns without raising when the flag is on.""" + session = _build_session(iceberg_features_enabled=True) + session._require_iceberg_features_enabled(feature="X") # no raise + + +# --------------------------------------------------------------------------- +# Session.table gate. +# --------------------------------------------------------------------------- +def test_session_table_rejects_version_when_flag_off(): + """`Session.table(name, version=N)` must raise with the flag off.""" + session = _build_session(iceberg_features_enabled=False) + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + session.table("MY_TABLE", time_travel_mode="at", version=12345) + + +def test_session_table_does_not_raise_without_version(): + """Plain `session.table("...")` still works with the flag off.""" + session = _build_session(iceberg_features_enabled=False) + # Not asserting on the returned Table internals here — just that the + # gate doesn't fire when ``version`` is not supplied. + t = session.table("MY_TABLE") + assert isinstance(t, Table) + + +# --------------------------------------------------------------------------- +# DataFrameReader.table gate (covers both ``version=`` kwarg and the Spark +# Iceberg ``snapshot-id`` reader option path). +# --------------------------------------------------------------------------- +def test_dataframe_reader_table_rejects_version_kwarg_when_flag_off(): + session = _build_session(iceberg_features_enabled=False) + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + session.read.table("MY_TABLE", time_travel_mode="at", version=1) + + +@pytest.mark.parametrize("opt_key", ["snapshot-id", "snapshot_id"]) +def test_dataframe_reader_table_rejects_snapshot_id_option_when_flag_off(opt_key): + """The Spark-compat ``snapshot-id`` / ``snapshot_id`` option must also be + gated — otherwise a user could bypass the flag through `.option(...)`.""" + session = _build_session(iceberg_features_enabled=False) + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + session.read.option(opt_key, 12345).table("MY_TABLE") + + +# --------------------------------------------------------------------------- +# Table.__init__ gate (direct construction). +# --------------------------------------------------------------------------- +def test_table_init_rejects_version_when_flag_off(): + session = _build_session(iceberg_features_enabled=False) + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + Table("MY_TABLE", session=session, time_travel_mode="at", version=1) + + +# --------------------------------------------------------------------------- +# AST `version` emission coverage. The Snowpark AST proto's Table / +# ReadTable messages don't yet carry a `version` field; the source guards +# the assignment with ``hasattr(ast, "version")``. To exercise the +# assignment (and credit coverage to the line) we patch ``with_src_position`` +# to return a stand-in object that *does* have a `version` attribute, so the +# hasattr check is True. +# --------------------------------------------------------------------------- +def _make_ast_sentinel(): + """Return a SimpleNamespace masquerading as an AST proto with a + `version` field that has a writable `.value`. + + All other fields fall through to a MagicMock so unrelated assignments + (e.g. `ast.time_travel_mode.value = ...`) succeed without raising. + """ + base = mock.MagicMock() + base.version = SimpleNamespace(value=None) + return base + + +def test_session_table_emits_version_into_ast_when_proto_supports_it(): + """Covers `session.py` ``ast.version.value = version`` branch. + + ``_emit_ast=True`` is passed explicitly because the ``@publicapi`` + decorator otherwise overrides it to ``is_ast_enabled()`` (False on a + plain mock-connected session). + """ + session = _build_session(iceberg_features_enabled=True) + sentinel = _make_ast_sentinel() + with mock.patch( + "snowflake.snowpark.session.with_src_position", return_value=sentinel + ): + session.table("MY_TABLE", _emit_ast=True, time_travel_mode="at", version=98765) + assert sentinel.version.value == 98765 + + +def test_dataframe_reader_table_emits_version_into_ast_when_proto_supports_it(): + """Covers `dataframe_reader.py` ``ast.version.value = version`` branch. + + The reader's AST block additionally requires ``self._ast is not None``; + AST is globally disabled on a plain mock-connected session, so we both + pass ``_emit_ast=True`` and assign a non-None ``_ast`` placeholder on the + reader. + """ + session = _build_session(iceberg_features_enabled=True) + reader = session.read + reader._ast = mock.MagicMock() # satisfy `self._ast is not None` guard + + sentinel = _make_ast_sentinel() + with mock.patch( + "snowflake.snowpark.dataframe_reader.with_src_position", + return_value=sentinel, + ): + reader.table("MY_TABLE", _emit_ast=True, time_travel_mode="at", version=42) + assert sentinel.version.value == 42 + + +def test_table_init_emits_version_into_ast_when_proto_supports_it(): + """Covers `table.py` ``ast.version.value = version`` branch.""" + session = _build_session(iceberg_features_enabled=True) + sentinel = _make_ast_sentinel() + with mock.patch( + "snowflake.snowpark.table.with_src_position", return_value=sentinel + ): + Table( + "MY_TABLE", + session=session, + _emit_ast=True, + time_travel_mode="at", + version=7, + ) + assert sentinel.version.value == 7 From 6abaad4800d378c043ff7dea96564173cde24bc2 Mon Sep 17 00:00:00 2001 From: Ilesh garish Date: Tue, 26 May 2026 11:52:38 -0700 Subject: [PATCH 3/4] Address review feedback from May Lieu (SNOW-3525585) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Make `option("version", ...)` auto-set `time_travel_mode="at"` so all three aliases (snapshot-id / snapshot_id / version) share the same semantics and the docstring claim "Automatically sets time_travel_mode='at'" holds for every documented path. Reject `option("version", N) + time_travel_mode="before"` for the same reason Snowflake rejects it for snapshot-id: AT(VERSION => N) is the only valid form. - Extend the `iceberg_features_enabled` gate in `DataFrameReader.table` to also cover `option("version", ...)`. Previously a user could bypass the umbrella flag through the Snowpark-native reader option. - Drop the dead `version` AST emission lines (and matching unit tests) in `Session.table`, `DataFrameReader.table`, and `Table.__init__`. The Table / ReadTable protos have no `version` field and the feature is parameter-protected (gated behind `iceberg_features_enabled`, consumed only by Snowpark Connect), so AST replay/telemetry is not warranted today. Comments at each call site point to the one-line restore when the proto is extended. - Drop the Iceberg snapshot-id entry from `CHANGELOG.md` since the feature is gated off by default and only Snowpark Connect flips it. - Unskip `test_iceberg_snapshot_id_flag_gates_version_kwarg` — it only exercises the client-side raise and needs neither a CLD nor an Iceberg table. Also broaden it to cover the reader paths (`session.read.table(version=...)`, `option("snapshot-id", ...)`, `option("version", ...)`). - Swap the `TODO(SNOW-NNNNNNN)` placeholder above the skipped CLD integ tests for the actual parent JIRA, SNOW-3525585. Co-authored-by: Cursor --- CHANGELOG.md | 13 ---- src/snowflake/snowpark/dataframe_reader.py | 44 +++++++---- src/snowflake/snowpark/session.py | 7 +- src/snowflake/snowpark/table.py | 10 +-- tests/integ/test_dataframe.py | 34 ++++++--- tests/unit/test_iceberg_features_flag.py | 87 +++------------------- tests/unit/test_utils.py | 37 +++++++-- 7 files changed, 101 insertions(+), 131 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97357b4757..fba8710a2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,19 +4,6 @@ ### Snowpark Python API Updates -#### New Features - -- Added support for Iceberg snapshot id time travel via the `version` parameter - (or `option("snapshot-id", N)` / `option("snapshot_id", N)` for Spark Iceberg - compatibility) on `Session.table` and `DataFrameReader.table`. Generates - `AT(VERSION => )` SQL. Only works with `time_travel_mode="at"`. - Disabled by default; opt in via `Session.iceberg_features_enabled = True` - because the underlying Snowflake feature (`AT(VERSION => N)` on unmanaged - Iceberg tables) is gated behind the `FEATURE_ICEBERG_TIME_TRAVEL` account - parameter and is currently scoped to Catalog-Linked Databases. Snowpark - Connect (SAS) flips the flag per-session when its - `snowpark.connect.iceberg.timeTravel.enabled` config is set. - #### Documentation - Clarified that the JDBC driver JAR referenced via `udtf_configs.imports` in `DataFrameReader.jdbc()` must be downloaded from the database vendor and uploaded to a Snowflake stage. diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index e822be3940..a094130ac8 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -165,11 +165,12 @@ def _extract_time_travel_from_options(options: dict) -> dict: - Cannot be mixed with regular 'timestamp' option (raises error) Special handling for 'SNAPSHOT-ID' / 'SNAPSHOT_ID' (Spark Iceberg - compatibility) — both aliases of the Snowpark ``version`` kwarg: - - Automatically sets time_travel_mode to 'at' + compatibility) — both aliases of the Snowpark ``version`` kwarg. + All three (``SNAPSHOT-ID``, ``SNAPSHOT_ID``, ``VERSION``): + - Automatically set time_travel_mode to 'at' (Iceberg snapshot ids only support ``AT(VERSION => N)``, not ``BEFORE``) - Cannot be used with time_travel_mode='before' (raises error) - - Cannot be mixed with explicit ``version`` option (raises error) + - Cannot be mixed with each other (raises error) """ result = {} excluded_keys = set() @@ -191,29 +192,38 @@ def _extract_time_travel_from_options(options: dict) -> dict: result["timestamp"] = options["AS-OF-TIMESTAMP"] excluded_keys.add("TIMESTAMP") - # Handle Spark Iceberg snapshot-id option (aliased to ``version``) + # Handle Iceberg snapshot id (Spark `snapshot-id` / `snapshot_id` plus the + # Snowpark-native `version` option). All three aliases auto-set mode='at' + # since `AT(VERSION => N)` is the only valid Snowflake form. snapshot_id_value = options.get("SNAPSHOT-ID") + snapshot_id_source = "snapshot-id" if snapshot_id_value is None: snapshot_id_value = options.get("SNAPSHOT_ID") + snapshot_id_source = "snapshot_id" + if snapshot_id_value is not None and "VERSION" in options: + raise ValueError("Cannot use both 'snapshot-id' and 'version' options.") + if snapshot_id_value is None and "VERSION" in options: + snapshot_id_value = options["VERSION"] + snapshot_id_source = "version" if snapshot_id_value is not None: if ( "TIME_TRAVEL_MODE" in options and options["TIME_TRAVEL_MODE"].lower() == "before" ): raise ValueError( - "Cannot use 'snapshot-id' option with time_travel_mode='before'. " - "Iceberg snapshot id time travel only supports time_travel_mode='at'." + f"Cannot use '{snapshot_id_source}' option with " + "time_travel_mode='before'. Iceberg snapshot id time travel " + "only supports time_travel_mode='at'." ) - if "VERSION" in options: - raise ValueError("Cannot use both 'snapshot-id' and 'version' options.") - # Coerce string snapshot ids (Spark accepts both string and long literals - # via .option(); we normalize to int so the SQL emits an unquoted long). + # Coerce string snapshot ids (Spark accepts both string and long + # literals via .option(); we normalize to int so the SQL emits an + # unquoted long). try: result["version"] = int(snapshot_id_value) except (TypeError, ValueError): raise ValueError( - f"'snapshot-id' must be a 64-bit integer Iceberg snapshot id, " - f"got {snapshot_id_value!r}." + f"'{snapshot_id_source}' must be a 64-bit integer Iceberg " + f"snapshot id, got {snapshot_id_value!r}." ) result["time_travel_mode"] = "at" excluded_keys.add("VERSION") @@ -653,7 +663,8 @@ def table( ... .table("my_table", time_travel_mode="at", offset=-3600)) # Only this is used """ if version is not None or any( - self._cur_options.get(k) is not None for k in ("SNAPSHOT-ID", "SNAPSHOT_ID") + self._cur_options.get(k) is not None + for k in ("VERSION", "SNAPSHOT-ID", "SNAPSHOT_ID") ): self._session._require_iceberg_features_enabled( feature="`version=` / `snapshot-id` time travel" @@ -678,8 +689,11 @@ def table( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream - if version is not None and hasattr(ast, "version"): - ast.version.value = version + # NOTE: ``version`` is intentionally NOT emitted to the AST. The + # ReadTable proto has no ``version`` field and the feature is + # parameter-protected (gated behind `iceberg_features_enabled`, + # consumed by Snowpark Connect only). When the proto is extended, + # restore a single ``ast.version.value = version`` line here. if time_travel_mode is not None or version is not None: # If version is provided without mode, default to 'at' (snapshot ids diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index 23b8039a92..ddecbd93a0 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -2850,8 +2850,11 @@ def table( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream - if version is not None and hasattr(ast, "version"): - ast.version.value = version + # NOTE: ``version`` is intentionally NOT emitted to the AST. The + # Table proto has no ``version`` field and the feature is + # parameter-protected (gated behind `iceberg_features_enabled`, + # consumed by Snowpark Connect only). When the proto is extended, + # restore a single ``ast.version.value = version`` line here. else: stmt = None diff --git a/src/snowflake/snowpark/table.py b/src/snowflake/snowpark/table.py index d50ea84216..c811e77442 100644 --- a/src/snowflake/snowpark/table.py +++ b/src/snowflake/snowpark/table.py @@ -326,11 +326,11 @@ def __init__( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream - # Guard with hasattr — the AST proto field is added in a separate - # proto change. Without it we still produce correct SQL; only AST - # replay/telemetry would lack the version value. - if version is not None and hasattr(ast, "version"): - ast.version.value = version + # NOTE: ``version`` is intentionally NOT emitted to the AST. The + # Table proto has no ``version`` field and the feature is + # parameter-protected (gated behind `iceberg_features_enabled`, + # consumed by Snowpark Connect only). When the proto is extended, + # restore a single ``ast.version.value = version`` line here. time_travel_config = TimeTravelConfig.validate_and_normalize_params( time_travel_mode=time_travel_mode, diff --git a/tests/integ/test_dataframe.py b/tests/integ/test_dataframe.py index 18151f0826..4bfe5f6583 100644 --- a/tests/integ/test_dataframe.py +++ b/tests/integ/test_dataframe.py @@ -8331,7 +8331,7 @@ def test_time_travel_comprehensive_coverage(session): # ---------------------------------------------------------------------- # Iceberg snapshot id (``version=``) time travel. # -# TODO(SNOW-NNNNNNN): Wire these up to a CI test account that has: +# TODO(SNOW-3525585): Wire these up to a CI test account that has: # * a Catalog-Linked Database (CLD) such as cldUnity / cldglue, AND # * an unmanaged Iceberg table inside it with at least two snapshots # readable through ``INFORMATION_SCHEMA.GET_TABLE_VERSIONS(...)``. @@ -8404,17 +8404,27 @@ def test_iceberg_snapshot_id_time_travel_dataframe_reader_option(session): assert via_kwarg == via_option -@pytest.mark.skip( - reason=( - "Requires a regular Snowflake account; doesn't need a CLD. " - "Tested manually until we wire up an Iceberg-capable integ account." - ) -) def test_iceberg_snapshot_id_flag_gates_version_kwarg(session): - """End-to-end: with the umbrella flag OFF the ``version=`` kwarg must - raise ``SnowparkClientException`` and not reach the server.""" + """The umbrella flag-off path raises client-side, before any SQL is + emitted — so this test needs neither a CLD nor an Iceberg table; any + session will do. + """ from snowflake.snowpark.exceptions import SnowparkClientException - session.iceberg_features_enabled = False - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - session.table("ANY_TABLE", time_travel_mode="at", version=1) + original = session.iceberg_features_enabled + try: + session.iceberg_features_enabled = False + # Session.table path + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + session.table("ANY_TABLE", time_travel_mode="at", version=1) + # DataFrameReader.table path via the ``version=`` kwarg + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + session.read.table("ANY_TABLE", time_travel_mode="at", version=1) + # DataFrameReader.table path via the ``option("snapshot-id", ...)`` alias + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + session.read.option("snapshot-id", 1).table("ANY_TABLE") + # DataFrameReader.table path via the ``option("version", ...)`` alias + with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): + session.read.option("version", 1).table("ANY_TABLE") + finally: + session.iceberg_features_enabled = original diff --git a/tests/unit/test_iceberg_features_flag.py b/tests/unit/test_iceberg_features_flag.py index 1a1bf9514c..ea89f9738e 100644 --- a/tests/unit/test_iceberg_features_flag.py +++ b/tests/unit/test_iceberg_features_flag.py @@ -15,7 +15,6 @@ (skipped by default — see TODO there). """ -from types import SimpleNamespace from unittest import mock import pytest @@ -113,10 +112,12 @@ def test_dataframe_reader_table_rejects_version_kwarg_when_flag_off(): session.read.table("MY_TABLE", time_travel_mode="at", version=1) -@pytest.mark.parametrize("opt_key", ["snapshot-id", "snapshot_id"]) +@pytest.mark.parametrize("opt_key", ["snapshot-id", "snapshot_id", "version"]) def test_dataframe_reader_table_rejects_snapshot_id_option_when_flag_off(opt_key): - """The Spark-compat ``snapshot-id`` / ``snapshot_id`` option must also be - gated — otherwise a user could bypass the flag through `.option(...)`.""" + """Every reader option that resolves to ``version=`` must be gated — + ``snapshot-id`` / ``snapshot_id`` (Spark Iceberg compat) AND the + Snowpark-native ``version`` key. Otherwise a user could bypass the + flag through ``.option(...)``.""" session = _build_session(iceberg_features_enabled=False) with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): session.read.option(opt_key, 12345).table("MY_TABLE") @@ -131,75 +132,9 @@ def test_table_init_rejects_version_when_flag_off(): Table("MY_TABLE", session=session, time_travel_mode="at", version=1) -# --------------------------------------------------------------------------- -# AST `version` emission coverage. The Snowpark AST proto's Table / -# ReadTable messages don't yet carry a `version` field; the source guards -# the assignment with ``hasattr(ast, "version")``. To exercise the -# assignment (and credit coverage to the line) we patch ``with_src_position`` -# to return a stand-in object that *does* have a `version` attribute, so the -# hasattr check is True. -# --------------------------------------------------------------------------- -def _make_ast_sentinel(): - """Return a SimpleNamespace masquerading as an AST proto with a - `version` field that has a writable `.value`. - - All other fields fall through to a MagicMock so unrelated assignments - (e.g. `ast.time_travel_mode.value = ...`) succeed without raising. - """ - base = mock.MagicMock() - base.version = SimpleNamespace(value=None) - return base - - -def test_session_table_emits_version_into_ast_when_proto_supports_it(): - """Covers `session.py` ``ast.version.value = version`` branch. - - ``_emit_ast=True`` is passed explicitly because the ``@publicapi`` - decorator otherwise overrides it to ``is_ast_enabled()`` (False on a - plain mock-connected session). - """ - session = _build_session(iceberg_features_enabled=True) - sentinel = _make_ast_sentinel() - with mock.patch( - "snowflake.snowpark.session.with_src_position", return_value=sentinel - ): - session.table("MY_TABLE", _emit_ast=True, time_travel_mode="at", version=98765) - assert sentinel.version.value == 98765 - - -def test_dataframe_reader_table_emits_version_into_ast_when_proto_supports_it(): - """Covers `dataframe_reader.py` ``ast.version.value = version`` branch. - - The reader's AST block additionally requires ``self._ast is not None``; - AST is globally disabled on a plain mock-connected session, so we both - pass ``_emit_ast=True`` and assign a non-None ``_ast`` placeholder on the - reader. - """ - session = _build_session(iceberg_features_enabled=True) - reader = session.read - reader._ast = mock.MagicMock() # satisfy `self._ast is not None` guard - - sentinel = _make_ast_sentinel() - with mock.patch( - "snowflake.snowpark.dataframe_reader.with_src_position", - return_value=sentinel, - ): - reader.table("MY_TABLE", _emit_ast=True, time_travel_mode="at", version=42) - assert sentinel.version.value == 42 - - -def test_table_init_emits_version_into_ast_when_proto_supports_it(): - """Covers `table.py` ``ast.version.value = version`` branch.""" - session = _build_session(iceberg_features_enabled=True) - sentinel = _make_ast_sentinel() - with mock.patch( - "snowflake.snowpark.table.with_src_position", return_value=sentinel - ): - Table( - "MY_TABLE", - session=session, - _emit_ast=True, - time_travel_mode="at", - version=7, - ) - assert sentinel.version.value == 7 +# NOTE: AST coverage for ``version`` was intentionally removed. The Snowpark +# AST proto's Table / ReadTable messages don't carry a ``version`` field and +# this feature is parameter-protected (gated behind ``iceberg_features_enabled`` +# and consumed only by Snowpark Connect), so the AST emission is left out by +# design. When the proto is extended, restore one assignment line per call +# site and add coverage tests then. diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 78ea7a27bc..54aaf18ccd 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -949,7 +949,12 @@ def test_time_travel_version_snapshot_id(): def test_extract_time_travel_snapshot_id_option(): - """Test Spark Iceberg 'snapshot-id' option extraction for the reader API.""" + """Test Iceberg snapshot id option extraction for the reader API. + + All three aliases (``snapshot-id`` / ``snapshot_id`` / ``version``) + share the same auto-mode semantics so the docstring claim + ``Automatically sets time_travel_mode='at'`` holds for every path. + """ from snowflake.snowpark.dataframe_reader import _extract_time_travel_from_options # Long int via SNAPSHOT-ID (Spark canonical key) @@ -967,6 +972,16 @@ def test_extract_time_travel_snapshot_id_option(): result = _extract_time_travel_from_options({"SNAPSHOT-ID": "10963874102873"}) assert result == {"time_travel_mode": "at", "version": 10963874102873} + # ``option("version", N)`` (Snowpark-native key) also auto-sets mode='at' + result = _extract_time_travel_from_options({"VERSION": 99}) + assert result == {"time_travel_mode": "at", "version": 99} + + # ``option("version", N)`` explicit ``at`` is a no-op overlap + result = _extract_time_travel_from_options( + {"VERSION": 99, "TIME_TRAVEL_MODE": "at"} + ) + assert result == {"time_travel_mode": "at", "version": 99} + # snapshot-id + time_travel_mode='before' is rejected with pytest.raises( ValueError, @@ -976,6 +991,14 @@ def test_extract_time_travel_snapshot_id_option(): {"SNAPSHOT-ID": 1, "TIME_TRAVEL_MODE": "before"} ) + # ``version`` + ``before`` is rejected too — Iceberg snapshot ids only + # support AT(VERSION => N), not BEFORE. + with pytest.raises( + ValueError, + match=r"Cannot use 'version' option with time_travel_mode='before'", + ): + _extract_time_travel_from_options({"VERSION": 1, "TIME_TRAVEL_MODE": "before"}) + # snapshot-id + explicit version conflict with pytest.raises( ValueError, match="Cannot use both 'snapshot-id' and 'version' options." @@ -988,10 +1011,8 @@ def test_extract_time_travel_snapshot_id_option(): ): _extract_time_travel_from_options({"SNAPSHOT-ID": "not-a-number"}) - # VERSION option flows through directly without auto-setting mode (user - # must opt in to 'at' explicitly to use VERSION on its own — keeps the - # ``snapshot-id`` Spark-compat alias the only auto-mode path). - result = _extract_time_travel_from_options( - {"VERSION": 99, "TIME_TRAVEL_MODE": "at"} - ) - assert result == {"time_travel_mode": "at", "version": 99} + # Non-numeric ``version`` is rejected through the same code path + with pytest.raises( + ValueError, match="'version' must be a 64-bit integer Iceberg snapshot id" + ): + _extract_time_travel_from_options({"VERSION": "not-a-number"}) From 5c4a7df0c2834f3b9a833810add086b0e1656ac6 Mon Sep 17 00:00:00 2001 From: Ilesh garish Date: Wed, 27 May 2026 15:08:32 -0700 Subject: [PATCH 4/4] Drop iceberg_features_enabled flag; hide version= behind **kwargs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback from May Lieu and Aling: - Aling: drop the umbrella ``Session.iceberg_features_enabled`` property and ``_require_iceberg_features_enabled`` helper. The flag added more mental burden to direct Snowpark users without giving us a real isolation benefit — the underlying ``AT(VERSION => N)`` syntax is already account-gated by ``FEATURE_ICEBERG_TIME_TRAVEL`` server-side, so any caller without it on will get a server error anyway. - May Lieu: drop ``tests/unit/test_iceberg_features_flag.py`` entirely (the only test for the removed flag), which also resolves the duplicate copyright header she flagged in that file. To keep the ``version=`` time-travel kwarg an internal-only surface consumed by Snowpark Connect (rather than a first-class public API), ``version`` is no longer in the public signatures of ``Session.table`` / ``DataFrameReader.table`` / ``Table.__init__``. Each method now accepts ``**kwargs`` and pops ``version`` inside, so direct callers can still pass it but it's not advertised in the docstring, doctest examples, or Sphinx-rendered signature. For the same reason, the public ``option("version", N)`` alias is dropped (``VERSION`` removed from ``_TIME_TRAVEL_OPTIONS_PARAMS_MAP`` and from ``_extract_time_travel_from_options``). The Spark Iceberg ``option("snapshot-id", N)`` / ``option("snapshot_id", N)`` aliases remain — those are Spark-compat surface, not Snowpark-native, and they auto-set ``time_travel_mode='at'`` exactly as before. Tests ----- - ``tests/unit/test_utils.py::test_extract_time_travel_snapshot_id_option`` is trimmed to cover only ``snapshot-id`` / ``snapshot_id`` (the ``option("version", ...)`` cases that no longer exist are removed). - ``tests/unit/test_utils.py::test_time_travel_version_snapshot_id`` is kept — ``version`` is still a valid ``TimeTravelConfig`` field, just internal-only. - Integ tests in ``tests/integ/test_dataframe.py``: the ``test_iceberg_snapshot_id_flag_gates_version_kwarg`` test is removed (no flag to gate), and ``session.iceberg_features_enabled = True`` setup lines are dropped from the two remaining (skipped) CLD-required E2E tests. Local: ``tests/unit/test_utils.py`` (15 tests) and ``tests/unit/test_dataframe.py`` + ``tests/unit/test_session.py`` (88+1 skipped) all green. Co-authored-by: Cursor --- src/snowflake/snowpark/dataframe_reader.py | 55 ++------ src/snowflake/snowpark/session.py | 71 ++--------- src/snowflake/snowpark/table.py | 18 +-- tests/integ/test_dataframe.py | 28 ----- tests/unit/test_iceberg_features_flag.py | 140 --------------------- tests/unit/test_utils.py | 37 +----- 6 files changed, 36 insertions(+), 313 deletions(-) delete mode 100644 tests/unit/test_iceberg_features_flag.py diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index a094130ac8..065ae038d5 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -125,7 +125,6 @@ "TIMESTAMP": "timestamp", "TIMESTAMP_TYPE": "timestamp_type", "STREAM": "stream", - "VERSION": "version", } READER_OPTIONS_ALIAS_MAP = { @@ -165,12 +164,11 @@ def _extract_time_travel_from_options(options: dict) -> dict: - Cannot be mixed with regular 'timestamp' option (raises error) Special handling for 'SNAPSHOT-ID' / 'SNAPSHOT_ID' (Spark Iceberg - compatibility) — both aliases of the Snowpark ``version`` kwarg. - All three (``SNAPSHOT-ID``, ``SNAPSHOT_ID``, ``VERSION``): + compatibility) — both aliases map to the internal ``version`` time + travel parameter: - Automatically set time_travel_mode to 'at' (Iceberg snapshot ids only support ``AT(VERSION => N)``, not ``BEFORE``) - Cannot be used with time_travel_mode='before' (raises error) - - Cannot be mixed with each other (raises error) """ result = {} excluded_keys = set() @@ -192,19 +190,13 @@ def _extract_time_travel_from_options(options: dict) -> dict: result["timestamp"] = options["AS-OF-TIMESTAMP"] excluded_keys.add("TIMESTAMP") - # Handle Iceberg snapshot id (Spark `snapshot-id` / `snapshot_id` plus the - # Snowpark-native `version` option). All three aliases auto-set mode='at' - # since `AT(VERSION => N)` is the only valid Snowflake form. + # Handle Iceberg snapshot id (Spark ``snapshot-id`` / ``snapshot_id``). + # Auto-sets mode='at' since ``AT(VERSION => N)`` is the only valid form. snapshot_id_value = options.get("SNAPSHOT-ID") snapshot_id_source = "snapshot-id" if snapshot_id_value is None: snapshot_id_value = options.get("SNAPSHOT_ID") snapshot_id_source = "snapshot_id" - if snapshot_id_value is not None and "VERSION" in options: - raise ValueError("Cannot use both 'snapshot-id' and 'version' options.") - if snapshot_id_value is None and "VERSION" in options: - snapshot_id_value = options["VERSION"] - snapshot_id_source = "version" if snapshot_id_value is not None: if ( "TIME_TRAVEL_MODE" in options @@ -226,7 +218,6 @@ def _extract_time_travel_from_options(options: dict) -> dict: f"snapshot id, got {snapshot_id_value!r}." ) result["time_travel_mode"] = "at" - excluded_keys.add("VERSION") for option_key, param_name in _TIME_TRAVEL_OPTIONS_PARAMS_MAP.items(): if option_key in options and option_key not in excluded_keys: @@ -594,7 +585,7 @@ def table( timestamp: Optional[Union[str, datetime]] = None, timestamp_type: Optional[Union[str, TimestampTimeZone]] = None, stream: Optional[str] = None, - version: Optional[int] = None, + **kwargs, ) -> Table: """Returns a Table that points to the specified table. @@ -614,11 +605,6 @@ def table( timestamp_type: Type of timestamp interpretation ('NTZ', 'LTZ', or 'TZ'). Can also be set via ``option("timestamp_type", "LTZ")``. stream: Stream name for time travel. Can also be set via ``option("stream", "stream_name")``. - version: Iceberg snapshot id (64-bit integer) for snapshot-based time - travel on Iceberg tables. Can also be set via - ``option("snapshot-id", 5129038471029384756)`` (Spark Iceberg - naming) or ``option("version", 5129038471029384756)``. - Automatically sets time_travel_mode='at'. Note: Time travel options can be set either as direct parameters or via the @@ -628,9 +614,6 @@ def table( PySpark Compatibility: The ``as-of-timestamp`` option automatically sets ``time_travel_mode="at"`` and cannot be used with ``time_travel_mode="before"``. - Spark Iceberg Compatibility: The ``snapshot-id`` option automatically sets - ``time_travel_mode="at"`` for snapshot-id-based time travel. - Examples:: # Using direct parameters @@ -645,9 +628,6 @@ def table( # PySpark-style as-of-timestamp (automatically sets mode to "at") >>> table = session.read.option("as-of-timestamp", "2023-01-01 12:00:00").table("my_table") # doctest: +SKIP - # Spark Iceberg snapshot-id time travel (automatically sets mode to "at") - >>> table = session.read.option("snapshot-id", 5129038471029384756).table("my_iceberg_table") # doctest: +SKIP - # timestamp_type automatically set to "TZ" due to timezone info >>> import datetime, pytz # doctest: +SKIP >>> tz_aware = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=pytz.UTC) # doctest: +SKIP @@ -662,12 +642,14 @@ def table( ... .option("offset", -60) # This will be IGNORED ... .table("my_table", time_travel_mode="at", offset=-3600)) # Only this is used """ - if version is not None or any( - self._cur_options.get(k) is not None - for k in ("VERSION", "SNAPSHOT-ID", "SNAPSHOT_ID") - ): - self._session._require_iceberg_features_enabled( - feature="`version=` / `snapshot-id` time travel" + # ``version`` (Iceberg snapshot id) is intentionally not in the public + # signature — it's consumed by Snowpark Connect and may be removed + # once a first-class API lands. Accept it through **kwargs so direct + # callers can still pass it without us advertising it. + version = kwargs.pop("version", None) + if kwargs: + raise TypeError( + f"table() got unexpected keyword arguments: {sorted(kwargs)}" ) # AST. @@ -689,11 +671,6 @@ def table( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream - # NOTE: ``version`` is intentionally NOT emitted to the AST. The - # ReadTable proto has no ``version`` field and the feature is - # parameter-protected (gated behind `iceberg_features_enabled`, - # consumed by Snowpark Connect only). When the proto is extended, - # restore a single ``ast.version.value = version`` line here. if time_travel_mode is not None or version is not None: # If version is provided without mode, default to 'at' (snapshot ids @@ -1240,17 +1217,11 @@ def option(self, key: str, value: Any, _emit_ast: bool = True) -> "DataFrameRead - ``timestamp``: Specific timestamp for time travel - ``timestamp_type``: Type of timestamp interpretation ('NTZ', 'LTZ', or 'TZ'). - ``stream``: Stream name for time travel. - - ``version``: Iceberg snapshot id (64-bit integer) for snapshot-based time travel. Special PySpark compatibility option: - ``as-of-timestamp``: Automatically sets ``time_travel_mode`` to "at" and uses the provided timestamp. Cannot be used with ``time_travel_mode="before"``. - Special Spark Iceberg compatibility option: - - ``snapshot-id`` (or ``snapshot_id``): Aliased to ``version``. Automatically sets - ``time_travel_mode`` to "at" and reads the table at the specified Iceberg snapshot - id. Cannot be used with ``time_travel_mode="before"``. - Args: key: Name of the option (e.g. ``compression``, ``skip_header``, ``time_travel_mode``, etc.). value: Value of the option. diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index ddecbd93a0..c2ebffbe91 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -719,14 +719,6 @@ def __init__( # (SAS / snowpark-connect enables this during session configuration.) self._use_structured_type_infer_schema: bool = False - # Umbrella flag for Iceberg-specific Snowpark features that are not - # yet exposed by default to all Snowpark users. Currently gates the - # ``version=`` / ``snapshot-id`` time-travel kwargs on - # ``Session.table`` / ``DataFrameReader.table`` / ``Table``. - # Defaults to ``False``; SAS / snowpark-connect enables it per - # session when the corresponding SCOS config is set. - self._iceberg_features_enabled: bool = False - self._large_query_breakdown_enabled: bool = self.is_feature_enabled_for_version( _PYTHON_SNOWPARK_USE_LARGE_QUERY_BREAKDOWN_OPTIMIZATION_VERSION ) @@ -1058,44 +1050,6 @@ def cte_optimization_enabled(self) -> bool: """ return self._cte_optimization_enabled - @property - def iceberg_features_enabled(self) -> bool: - """Set to ``True`` to enable Iceberg-specific Snowpark features - (defaults to ``False``). - - Currently this gates the ``version=`` kwarg on :meth:`Session.table`, - :meth:`DataFrameReader.table`, and :class:`Table`, and the Spark - Iceberg ``snapshot-id`` / ``snapshot_id`` reader options. With the - flag off, supplying any of these raises ``SnowparkClientException``. - - Snowpark Connect (SAS) enables this flag per-session via the - ``snowpark.connect.iceberg.timeTravel.enabled`` SCOS config. - Direct Snowpark users must opt in explicitly because the underlying - Snowflake feature (``AT(VERSION => N)`` on unmanaged Iceberg tables) - is currently gated behind the ``FEATURE_ICEBERG_TIME_TRAVEL`` server - parameter. - """ - return self._iceberg_features_enabled - - @iceberg_features_enabled.setter - def iceberg_features_enabled(self, value: bool) -> None: - warn_session_config_update_in_multithreaded_mode("iceberg_features_enabled") - with self._lock: - self._iceberg_features_enabled = bool(value) - - def _require_iceberg_features_enabled(self, *, feature: str) -> None: - """Raise if Iceberg features are gated off on this session. - - ``feature`` is a short human-readable label of the API surface - being gated, used in the error message. - """ - if not self._iceberg_features_enabled: - raise SnowparkClientException( - f"{feature} is an Iceberg-only feature and is disabled by " - "default. Enable it by setting " - "`session.iceberg_features_enabled = True`." - ) - @property def eliminate_numeric_sql_value_cast_enabled(self) -> bool: return self._eliminate_numeric_sql_value_cast_enabled @@ -2774,7 +2728,7 @@ def table( timestamp: Optional[Union[str, datetime.datetime]] = None, timestamp_type: Optional[Union[str, TimestampTimeZone]] = None, stream: Optional[str] = None, - version: Optional[int] = None, + **kwargs, ) -> Table: """ Returns a Table that points the specified table. @@ -2786,16 +2740,12 @@ def table( _emit_ast: Whether to emit AST statements. time_travel_mode: Time travel mode, either 'at' or 'before'. - Exactly one of statement, offset, timestamp, stream, or version must be provided when time_travel_mode is set. + Exactly one of statement, offset, timestamp, or stream must be provided when time_travel_mode is set. statement: Query ID for time travel. offset: Negative integer representing seconds in the past for time travel. timestamp: Timestamp string or datetime object. timestamp_type: Type of timestamp interpretation ('NTZ', 'LTZ', or 'TZ'). stream: Stream name for time travel. - version: Iceberg snapshot id (64-bit integer) for snapshot-based time - travel on Iceberg tables. Can only be used with - ``time_travel_mode='at'``. Generates SQL clause like - ``AT(VERSION => 5129038471029384756)``. Note: If your table name contains special characters, use double quotes to mark it like this, ``session.table('"my table"')``. @@ -2817,7 +2767,6 @@ def table( >>> df_before = session.table("my_table", time_travel_mode="before", statement="01234567-abcd-1234-5678-123456789012") # doctest: +SKIP >>> df_offset = session.table("my_table", time_travel_mode="at", offset=-3600) # doctest: +SKIP >>> df_stream = session.table("my_table", time_travel_mode="at", stream="my_stream") # doctest: +SKIP - >>> df_iceberg_version = session.table("my_iceberg_table", time_travel_mode="at", version=5129038471029384756) # doctest: +SKIP # timestamp_type automatically set to "TZ" due to timezone info >>> import datetime, pytz # doctest: +SKIP @@ -2827,9 +2776,14 @@ def table( # timestamp_type remains "NTZ" (user's explicit choice respected) >>> table2 = session.read.table("my_table", time_travel_mode="at", timestamp=tz_aware, timestamp_type="NTZ") # doctest: +SKIP """ - if version is not None: - self._require_iceberg_features_enabled( - feature="`version=` snapshot-id time travel" + # ``version`` (Iceberg snapshot id) is intentionally not in the public + # signature — it's consumed by Snowpark Connect and may be removed + # once a first-class API lands. Accept it through **kwargs so direct + # callers can still pass it without us advertising it. + version = kwargs.pop("version", None) + if kwargs: + raise TypeError( + f"table() got unexpected keyword arguments: {sorted(kwargs)}" ) if _emit_ast: @@ -2850,11 +2804,6 @@ def table( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream - # NOTE: ``version`` is intentionally NOT emitted to the AST. The - # Table proto has no ``version`` field and the feature is - # parameter-protected (gated behind `iceberg_features_enabled`, - # consumed by Snowpark Connect only). When the proto is extended, - # restore a single ``ast.version.value = version`` line here. else: stmt = None diff --git a/src/snowflake/snowpark/table.py b/src/snowflake/snowpark/table.py index c811e77442..53508e64e5 100644 --- a/src/snowflake/snowpark/table.py +++ b/src/snowflake/snowpark/table.py @@ -301,11 +301,16 @@ def __init__( timestamp: Optional[Union[str, datetime.datetime]] = None, timestamp_type: Optional[Union[str, TimestampTimeZone]] = None, stream: Optional[str] = None, - version: Optional[int] = None, + **kwargs, ) -> None: - if version is not None and session is not None: - session._require_iceberg_features_enabled( - feature="`version=` snapshot-id time travel" + # ``version`` (Iceberg snapshot id) is intentionally not in the public + # signature — it's consumed by Snowpark Connect and may be removed + # once a first-class API lands. Accept it through **kwargs so direct + # callers can still pass it without us advertising it. + version = kwargs.pop("version", None) + if kwargs: + raise TypeError( + f"Table() got unexpected keyword arguments: {sorted(kwargs)}" ) if _ast_stmt is None and session is not None and _emit_ast: @@ -326,11 +331,6 @@ def __init__( ast.timestamp_type.value = str(timestamp_type) if stream is not None: ast.stream.value = stream - # NOTE: ``version`` is intentionally NOT emitted to the AST. The - # Table proto has no ``version`` field and the feature is - # parameter-protected (gated behind `iceberg_features_enabled`, - # consumed by Snowpark Connect only). When the proto is extended, - # restore a single ``ast.version.value = version`` line here. time_travel_config = TimeTravelConfig.validate_and_normalize_params( time_travel_mode=time_travel_mode, diff --git a/tests/integ/test_dataframe.py b/tests/integ/test_dataframe.py index 4bfe5f6583..cf3eed77ae 100644 --- a/tests/integ/test_dataframe.py +++ b/tests/integ/test_dataframe.py @@ -8355,7 +8355,6 @@ def test_time_travel_comprehensive_coverage(session): def test_iceberg_snapshot_id_time_travel_session_table(session): """End-to-end: ``Session.table(..., version=)`` returns the table state at the requested Iceberg snapshot.""" - session.iceberg_features_enabled = True table_fqn = "CLDUNITY.scosschema.snapshot_demo" snapshot_ids = [ @@ -8386,7 +8385,6 @@ def test_iceberg_snapshot_id_time_travel_dataframe_reader_option(session): """End-to-end: ``session.read.option('snapshot-id', N).table(...)`` routes through the Spark Iceberg-compat alias and produces the same result as the explicit ``version=`` kwarg.""" - session.iceberg_features_enabled = True table_fqn = "CLDUNITY.scosschema.snapshot_demo" snapshot_id = session.sql( @@ -8402,29 +8400,3 @@ def test_iceberg_snapshot_id_time_travel_dataframe_reader_option(session): session.read.option("snapshot-id", snapshot_id).table(table_fqn).collect() ) assert via_kwarg == via_option - - -def test_iceberg_snapshot_id_flag_gates_version_kwarg(session): - """The umbrella flag-off path raises client-side, before any SQL is - emitted — so this test needs neither a CLD nor an Iceberg table; any - session will do. - """ - from snowflake.snowpark.exceptions import SnowparkClientException - - original = session.iceberg_features_enabled - try: - session.iceberg_features_enabled = False - # Session.table path - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - session.table("ANY_TABLE", time_travel_mode="at", version=1) - # DataFrameReader.table path via the ``version=`` kwarg - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - session.read.table("ANY_TABLE", time_travel_mode="at", version=1) - # DataFrameReader.table path via the ``option("snapshot-id", ...)`` alias - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - session.read.option("snapshot-id", 1).table("ANY_TABLE") - # DataFrameReader.table path via the ``option("version", ...)`` alias - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - session.read.option("version", 1).table("ANY_TABLE") - finally: - session.iceberg_features_enabled = original diff --git a/tests/unit/test_iceberg_features_flag.py b/tests/unit/test_iceberg_features_flag.py deleted file mode 100644 index ea89f9738e..0000000000 --- a/tests/unit/test_iceberg_features_flag.py +++ /dev/null @@ -1,140 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved. -# - -# -# Copyright (c) 2012-2026 Snowflake Computing Inc. All rights reserved. -# -"""Unit tests for the `Session.iceberg_features_enabled` umbrella flag and -the `version=` / `snapshot-id` time-travel gates it controls. - -These tests use mocked connections so they don't require a live Snowflake -account. End-to-end validation against an unmanaged Iceberg table lives in -`tests/integ/test_dataframe.py::test_iceberg_snapshot_id_time_travel*` -(skipped by default — see TODO there). -""" - -from unittest import mock - -import pytest - -import snowflake.snowpark.session -from snowflake.snowpark._internal.server_connection import ServerConnection -from snowflake.snowpark.exceptions import SnowparkClientException -from snowflake.snowpark.table import Table - - -def _build_session(*, iceberg_features_enabled=False): - """Create a Session bound to an autospec'd ServerConnection. - - Mirrors the pattern in `tests/unit/test_dataframe.py::test_table_source_plan`. - The session has a real `_ast_batch`, so AST emission code paths execute - against real protos. - """ - mock_connection = mock.create_autospec(ServerConnection) - mock_connection._conn = mock.MagicMock() - mock_connection._thread_safe_session_enabled = True - session = snowflake.snowpark.session.Session(mock_connection) - session.iceberg_features_enabled = iceberg_features_enabled - return session - - -# --------------------------------------------------------------------------- -# Flag plumbing: default, setter, public property. -# --------------------------------------------------------------------------- -def test_iceberg_features_enabled_default_is_false(): - """Default must be ``False`` so existing Snowpark users see no behavior - change. SCOS / snowpark-connect opt in per-session.""" - session = _build_session() - assert session.iceberg_features_enabled is False - assert session._iceberg_features_enabled is False - - -def test_iceberg_features_enabled_setter_round_trip(): - """Public setter accepts truthy/falsy values and coerces to bool.""" - session = _build_session() - - session.iceberg_features_enabled = True - assert session.iceberg_features_enabled is True - - session.iceberg_features_enabled = False - assert session.iceberg_features_enabled is False - - # Truthy non-bool is coerced. - session.iceberg_features_enabled = 1 - assert session.iceberg_features_enabled is True - session.iceberg_features_enabled = 0 - assert session.iceberg_features_enabled is False - - -def test_require_iceberg_features_enabled_raises_when_off(): - """Helper raises a clear SnowparkClientException pointing to the flag.""" - session = _build_session(iceberg_features_enabled=False) - with pytest.raises(SnowparkClientException) as exc_info: - session._require_iceberg_features_enabled(feature="X") - assert "iceberg_features_enabled" in str(exc_info.value) - assert "X" in str(exc_info.value) - - -def test_require_iceberg_features_enabled_noop_when_on(): - """Helper returns without raising when the flag is on.""" - session = _build_session(iceberg_features_enabled=True) - session._require_iceberg_features_enabled(feature="X") # no raise - - -# --------------------------------------------------------------------------- -# Session.table gate. -# --------------------------------------------------------------------------- -def test_session_table_rejects_version_when_flag_off(): - """`Session.table(name, version=N)` must raise with the flag off.""" - session = _build_session(iceberg_features_enabled=False) - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - session.table("MY_TABLE", time_travel_mode="at", version=12345) - - -def test_session_table_does_not_raise_without_version(): - """Plain `session.table("...")` still works with the flag off.""" - session = _build_session(iceberg_features_enabled=False) - # Not asserting on the returned Table internals here — just that the - # gate doesn't fire when ``version`` is not supplied. - t = session.table("MY_TABLE") - assert isinstance(t, Table) - - -# --------------------------------------------------------------------------- -# DataFrameReader.table gate (covers both ``version=`` kwarg and the Spark -# Iceberg ``snapshot-id`` reader option path). -# --------------------------------------------------------------------------- -def test_dataframe_reader_table_rejects_version_kwarg_when_flag_off(): - session = _build_session(iceberg_features_enabled=False) - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - session.read.table("MY_TABLE", time_travel_mode="at", version=1) - - -@pytest.mark.parametrize("opt_key", ["snapshot-id", "snapshot_id", "version"]) -def test_dataframe_reader_table_rejects_snapshot_id_option_when_flag_off(opt_key): - """Every reader option that resolves to ``version=`` must be gated — - ``snapshot-id`` / ``snapshot_id`` (Spark Iceberg compat) AND the - Snowpark-native ``version`` key. Otherwise a user could bypass the - flag through ``.option(...)``.""" - session = _build_session(iceberg_features_enabled=False) - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - session.read.option(opt_key, 12345).table("MY_TABLE") - - -# --------------------------------------------------------------------------- -# Table.__init__ gate (direct construction). -# --------------------------------------------------------------------------- -def test_table_init_rejects_version_when_flag_off(): - session = _build_session(iceberg_features_enabled=False) - with pytest.raises(SnowparkClientException, match="iceberg_features_enabled"): - Table("MY_TABLE", session=session, time_travel_mode="at", version=1) - - -# NOTE: AST coverage for ``version`` was intentionally removed. The Snowpark -# AST proto's Table / ReadTable messages don't carry a ``version`` field and -# this feature is parameter-protected (gated behind ``iceberg_features_enabled`` -# and consumed only by Snowpark Connect), so the AST emission is left out by -# design. When the proto is extended, restore one assignment line per call -# site and add coverage tests then. diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 54aaf18ccd..eac924ff48 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -951,9 +951,10 @@ def test_time_travel_version_snapshot_id(): def test_extract_time_travel_snapshot_id_option(): """Test Iceberg snapshot id option extraction for the reader API. - All three aliases (``snapshot-id`` / ``snapshot_id`` / ``version``) - share the same auto-mode semantics so the docstring claim - ``Automatically sets time_travel_mode='at'`` holds for every path. + The Spark-compat aliases ``snapshot-id`` / ``snapshot_id`` map to the + internal ``version`` time-travel parameter and auto-set + ``time_travel_mode='at'`` (``AT(VERSION => N)`` is the only valid form + for Iceberg snapshot id time travel). """ from snowflake.snowpark.dataframe_reader import _extract_time_travel_from_options @@ -972,16 +973,6 @@ def test_extract_time_travel_snapshot_id_option(): result = _extract_time_travel_from_options({"SNAPSHOT-ID": "10963874102873"}) assert result == {"time_travel_mode": "at", "version": 10963874102873} - # ``option("version", N)`` (Snowpark-native key) also auto-sets mode='at' - result = _extract_time_travel_from_options({"VERSION": 99}) - assert result == {"time_travel_mode": "at", "version": 99} - - # ``option("version", N)`` explicit ``at`` is a no-op overlap - result = _extract_time_travel_from_options( - {"VERSION": 99, "TIME_TRAVEL_MODE": "at"} - ) - assert result == {"time_travel_mode": "at", "version": 99} - # snapshot-id + time_travel_mode='before' is rejected with pytest.raises( ValueError, @@ -991,28 +982,8 @@ def test_extract_time_travel_snapshot_id_option(): {"SNAPSHOT-ID": 1, "TIME_TRAVEL_MODE": "before"} ) - # ``version`` + ``before`` is rejected too — Iceberg snapshot ids only - # support AT(VERSION => N), not BEFORE. - with pytest.raises( - ValueError, - match=r"Cannot use 'version' option with time_travel_mode='before'", - ): - _extract_time_travel_from_options({"VERSION": 1, "TIME_TRAVEL_MODE": "before"}) - - # snapshot-id + explicit version conflict - with pytest.raises( - ValueError, match="Cannot use both 'snapshot-id' and 'version' options." - ): - _extract_time_travel_from_options({"SNAPSHOT-ID": 1, "VERSION": 2}) - # Non-numeric snapshot-id is rejected with pytest.raises( ValueError, match="'snapshot-id' must be a 64-bit integer Iceberg snapshot id" ): _extract_time_travel_from_options({"SNAPSHOT-ID": "not-a-number"}) - - # Non-numeric ``version`` is rejected through the same code path - with pytest.raises( - ValueError, match="'version' must be a 64-bit integer Iceberg snapshot id" - ): - _extract_time_travel_from_options({"VERSION": "not-a-number"})