diff --git a/packages/bigframes/bigframes/core/blocks.py b/packages/bigframes/bigframes/core/blocks.py index b9a246fc0360..01ba73458993 100644 --- a/packages/bigframes/bigframes/core/blocks.py +++ b/packages/bigframes/bigframes/core/blocks.py @@ -696,6 +696,7 @@ def to_pandas_batches( page_size: Optional[int] = None, max_results: Optional[int] = None, allow_large_results: Optional[bool] = None, + cell_execution_count: Optional[int] = None, ) -> PandasBatches: """Download results one message at a time. @@ -713,6 +714,7 @@ def to_pandas_batches( execution_spec.ExecutionSpec( promise_under_10gb=under_10gb, ordered=True, + cell_execution_count=cell_execution_count, ), ) result_batches = execution_result.batches() diff --git a/packages/bigframes/bigframes/core/events.py b/packages/bigframes/bigframes/core/events.py index 61831f4cc399..db81eba06652 100644 --- a/packages/bigframes/bigframes/core/events.py +++ b/packages/bigframes/bigframes/core/events.py @@ -20,7 +20,7 @@ import datetime import threading import uuid -from typing import Any, Callable, Literal, Set +from typing import Any, Callable, Literal, Optional, Set import google.cloud.bigquery._job_helpers import google.cloud.bigquery.job.query @@ -129,6 +129,7 @@ class Event: class EventEnvelope: event: Event progress_bar: ProgressBarType = _DEFAULT + cell_execution_count: Optional[int] = None @dataclasses.dataclass(frozen=True) diff --git a/packages/bigframes/bigframes/core/global_session.py b/packages/bigframes/bigframes/core/global_session.py index 6ffb37ac5acf..fa2b7b2e2edb 100644 --- a/packages/bigframes/bigframes/core/global_session.py +++ b/packages/bigframes/bigframes/core/global_session.py @@ -19,7 +19,7 @@ import threading import traceback import warnings -from typing import TYPE_CHECKING, Callable, Optional, TypeVar +from typing import TYPE_CHECKING, Callable, Iterable, Optional, TypeVar import google.auth.exceptions @@ -124,12 +124,22 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T: return func_(get_global_session(), *args, **kwargs) -def execution_history() -> "bigframes.session._ExecutionHistory": +def execution_history( + *, + events: Optional[Iterable[bigframes.core.events.Event]] = None, + job_ids: Optional[Iterable[str]] = None, + all_cells: bool = True, +) -> "bigframes.session._ExecutionHistory": import pandas # noqa: F401 import bigframes.session - return with_default_session(bigframes.session.Session.execution_history) + return with_default_session( + bigframes.session.Session.execution_history, + events=events, + job_ids=job_ids, + all_cells=all_cells, + ) class _GlobalSessionContext: diff --git a/packages/bigframes/bigframes/core/utils.py b/packages/bigframes/bigframes/core/utils.py index b219335a516e..2250739a89c9 100644 --- a/packages/bigframes/bigframes/core/utils.py +++ b/packages/bigframes/bigframes/core/utils.py @@ -249,3 +249,16 @@ def timedelta_to_micros( ) * 1_000_000 + timedelta.microseconds raise TypeError(f"Unrecognized input type: {type(timedelta)}") + + +def get_ipython_execution_count() -> typing.Optional[int]: + """Returns the current IPython cell execution count if running in a notebook, else None.""" + try: + import IPython + + ipy = IPython.get_ipython() + if ipy is not None and hasattr(ipy, "execution_count"): + return ipy.execution_count + except (ImportError, NameError): + pass + return None diff --git a/packages/bigframes/bigframes/dataframe.py b/packages/bigframes/bigframes/dataframe.py index d7755517293e..1d6a6bebd692 100644 --- a/packages/bigframes/bigframes/dataframe.py +++ b/packages/bigframes/bigframes/dataframe.py @@ -1755,6 +1755,7 @@ def to_pandas_batches( max_results: Optional[int] = None, *, allow_large_results: Optional[bool] = None, + cell_execution_count: Optional[int] = None, ) -> blocks.PandasBatches: """Stream DataFrame results to an iterable of pandas DataFrame. @@ -1807,6 +1808,7 @@ def to_pandas_batches( page_size=page_size, max_results=max_results, allow_large_results=allow_large_results, + cell_execution_count=cell_execution_count, ) def _to_pandas_batches( @@ -1815,11 +1817,13 @@ def _to_pandas_batches( max_results: Optional[int] = None, *, allow_large_results: Optional[bool] = None, + cell_execution_count: Optional[int] = None, ) -> blocks.PandasBatches: return self._block.to_pandas_batches( page_size=page_size, max_results=max_results, allow_large_results=allow_large_results, + cell_execution_count=cell_execution_count, ) def _compute_dry_run(self) -> google.cloud.bigquery.job.QueryJob: diff --git a/packages/bigframes/bigframes/display/anywidget.py b/packages/bigframes/bigframes/display/anywidget.py index 90d285d1b0d7..08b19d820173 100644 --- a/packages/bigframes/bigframes/display/anywidget.py +++ b/packages/bigframes/bigframes/display/anywidget.py @@ -92,6 +92,10 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): self._dataframe = dataframe + from bigframes.core.utils import get_ipython_execution_count + + self._cell_execution_count = get_ipython_execution_count() + super().__init__() # Initialize attributes that might be needed by observers first @@ -286,7 +290,10 @@ def _reset_batch_cache(self) -> None: def _reset_batches_for_new_page_size(self) -> None: """Reset the batch iterator when page size changes.""" with bigframes.option_context("display.progress_bar", None): - self._batches = self._dataframe.to_pandas_batches(page_size=self.page_size) + self._batches = self._dataframe.to_pandas_batches( + page_size=self.page_size, + cell_execution_count=self._cell_execution_count, + ) self._reset_batch_cache() @@ -318,7 +325,8 @@ def _set_table_html(self) -> None: current_sort_state = _SortState(tuple(sort_columns), tuple(sort_ascending)) if self._last_sort_state != current_sort_state: self._batches = df_to_display.to_pandas_batches( - page_size=self.page_size + page_size=self.page_size, + cell_execution_count=self._cell_execution_count, ) self._reset_batch_cache() self._last_sort_state = current_sort_state diff --git a/packages/bigframes/bigframes/pandas/io/api.py b/packages/bigframes/bigframes/pandas/io/api.py index 6c83095ab3cd..75e6b6bef08d 100644 --- a/packages/bigframes/bigframes/pandas/io/api.py +++ b/packages/bigframes/bigframes/pandas/io/api.py @@ -300,6 +300,7 @@ def _try_read_gbq_colab_sessionless_dry_run( def _read_gbq_colab( # type: ignore[overload-overlap] query_or_table: str, *, + callback: Optional[Callable[[bigframes.core.events.EventEnvelope], None]] = ..., pyformat_args: Optional[Dict[str, Any]] = ..., dry_run: Literal[False] = ..., ) -> bigframes.dataframe.DataFrame: ... @@ -309,6 +310,7 @@ def _read_gbq_colab( # type: ignore[overload-overlap] def _read_gbq_colab( query_or_table: str, *, + callback: Optional[Callable[[bigframes.core.events.EventEnvelope], None]] = ..., pyformat_args: Optional[Dict[str, Any]] = ..., dry_run: Literal[True] = ..., ) -> pandas.Series: ... @@ -317,6 +319,7 @@ def _read_gbq_colab( def _read_gbq_colab( query_or_table: str, *, + callback: Optional[Callable[[bigframes.core.events.EventEnvelope], None]] = None, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: bool = False, ) -> bigframes.dataframe.DataFrame | pandas.Series: @@ -328,6 +331,8 @@ def _read_gbq_colab( Args: query_or_table (str): SQL query or table ID (table ID not yet supported). + callback (Optional[Callable[[bigframes.core.events.EventEnvelope], None]]): + Callback to receive query execution events. pyformat_args (Optional[Dict[str, Any]]): Parameters to format into the query string. dry_run (bool): @@ -379,6 +384,7 @@ def _read_gbq_colab( return global_session.with_default_session( bigframes.session.Session._read_gbq_colab, query_or_table, + callback=callback, pyformat_args=pyformat_args, dry_run=dry_run, ) diff --git a/packages/bigframes/bigframes/series.py b/packages/bigframes/bigframes/series.py index 1065744f1716..0a66f44516df 100644 --- a/packages/bigframes/bigframes/series.py +++ b/packages/bigframes/bigframes/series.py @@ -759,6 +759,7 @@ def to_pandas_batches( max_results: Optional[int] = None, *, allow_large_results: Optional[bool] = None, + cell_execution_count: Optional[int] = None, ) -> Iterable[pandas.Series]: """Stream Series results to an iterable of pandas Series. @@ -811,6 +812,7 @@ def to_pandas_batches( page_size=page_size, max_results=max_results, allow_large_results=allow_large_results, + cell_execution_count=cell_execution_count, ) return map(lambda df: cast(pandas.Series, df.squeeze(1)), batches) diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index 38e92a60321b..e8b5f4faf248 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -113,6 +113,18 @@ class _ExecutionHistory: def __init__(self, jobs: list[dict]): self._df = pandas.DataFrame(jobs) + if self._df.empty: + self._df = pandas.DataFrame( + columns=[ + "job_id", + "query_id", + "job_type", + "status", + "query", + "total_bytes_processed", + "job_url", + ] + ) def to_dataframe(self) -> pandas.DataFrame: """Returns the execution history as a pandas DataFrame.""" @@ -199,9 +211,10 @@ def __init__( self._clients_provider = clients_provider self._location = context.location or "US" else: - credentials, project = ( - bigframes._config.auth.resolve_credentials_and_project(context) - ) + ( + credentials, + project, + ) = bigframes._config.auth.resolve_credentials_and_project(context) if context.location is None: with bigquery.Client( project=project, @@ -430,12 +443,83 @@ def slot_millis_sum(self): """The sum of all slot time used by bigquery jobs in this session.""" return self._metrics.slot_millis - def execution_history(self) -> _ExecutionHistory: + def execution_history( + self, + *, + events: Optional[Iterable[bigframes.core.events.Event]] = None, + job_ids: Optional[Iterable[str]] = None, + all_cells: bool = True, + ) -> _ExecutionHistory: """Returns the history of executions initiated by BigFrames in the current session. Use `.to_dataframe()` on the result to get a pandas DataFrame. + + Args: + events (Iterable[Event], optional): + Filter execution history to only include jobs associated with the given events. + job_ids (Iterable[str], optional): + Filter execution history to only include jobs matching the given job IDs. + all_cells (bool, optional): + If True, do not filter execution history by notebook cell. If False, + and running in Colab/Jupyter, automatically filter history to only include + jobs executed within the current cell. Defaults to True. """ - return _ExecutionHistory([job.__dict__ for job in self._metrics.jobs]) + jobs = [job.__dict__ for job in self._metrics.jobs] + + if events is not None: + event_job_ids = { + getattr(event, "job_id", None) + for event in events + if getattr(event, "job_id", None) is not None + } + event_query_ids = { + getattr(event, "query_id", None) + for event in events + if getattr(event, "query_id", None) is not None + } + jobs = [ + job + for job in jobs + if ( + job.get("job_id") is not None and job.get("job_id") in event_job_ids + ) + or ( + job.get("query_id") is not None + and job.get("query_id") in event_query_ids + ) + ] + + elif job_ids is not None: + target_job_ids = set(job_ids) + jobs = [ + job + for job in jobs + if ( + job.get("job_id") is not None + and job.get("job_id") in target_job_ids + ) + or ( + job.get("query_id") is not None + and job.get("query_id") in target_job_ids + ) + ] + + elif not all_cells: + try: + import IPython + + ipy = IPython.get_ipython() + if ipy is not None and hasattr(ipy, "execution_count"): + current_count = ipy.execution_count + jobs = [ + job + for job in jobs + if job.get("cell_execution_count") == current_count + ] + except (ImportError, NameError): + pass + + return _ExecutionHistory(jobs) @property def _allows_ambiguity(self) -> bool: @@ -584,6 +668,7 @@ def _read_gbq_colab( self, query: str, *, + callback: Optional[Callable[[bigframes.core.events.EventEnvelope], None]] = ..., pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[False] = ..., ) -> dataframe.DataFrame: ... @@ -593,6 +678,7 @@ def _read_gbq_colab( self, query: str, *, + callback: Optional[Callable[[bigframes.core.events.EventEnvelope], None]] = ..., pyformat_args: Optional[Dict[str, Any]] = None, dry_run: Literal[True] = ..., ) -> pandas.Series: ... @@ -601,8 +687,10 @@ def _read_gbq_colab( def _read_gbq_colab( self, query: str, - # TODO: Add a callback parameter that takes some kind of Event object. *, + callback: Optional[ + Callable[[bigframes.core.events.EventEnvelope], None] + ] = None, pyformat_args: Optional[Dict[str, Any]] = None, dry_run: bool = False, ) -> Union[dataframe.DataFrame, pandas.Series]: @@ -615,6 +703,8 @@ def _read_gbq_colab( query (str): A SQL query string to execute. Results (if any) are turned into a DataFrame. + callback (Optional[Callable[[bigframes.core.events.EventEnvelope], None]]): + Callback to receive query execution events. pyformat_args (dict): A dictionary of potential variables to replace in ``query``. Note: strings are _not_ escaped. Use query parameters for these, @@ -634,13 +724,19 @@ def _read_gbq_colab( dry_run=dry_run, ) - return self._loader.read_gbq_query( - query=query, - index_col=bigframes.enums.DefaultIndexKind.NULL, - force_total_order=False, - dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run), - allow_large_results=allow_large_results, - ) + def _run_query(): + return self._loader.read_gbq_query( + query=query, + index_col=bigframes.enums.DefaultIndexKind.NULL, + force_total_order=False, + dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run), + allow_large_results=allow_large_results, + ) + + if callback is not None: + with self._publisher.subscribe(callback): + return _run_query() + return _run_query() @overload def read_gbq_query( # type: ignore[overload-overlap] diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index 17534e59273d..ac952397087f 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -64,6 +64,8 @@ def create_job_configs_labels( ) -> Dict[str, str]: if job_configs_labels is None: job_configs_labels = {} + else: + job_configs_labels = dict(job_configs_labels) if api_methods and "bigframes-api" not in job_configs_labels: job_configs_labels["bigframes-api"] = api_methods[0] @@ -261,7 +263,7 @@ def add_and_trim_labels( ) -def create_bq_event_callback(publisher): +def create_bq_event_callback(publisher, cell_execution_count=None): event_map = { google.cloud.bigquery._job_helpers.QueryFinishedEvent: ( bigframes.core.events.BigQueryFinishedEvent @@ -284,7 +286,9 @@ def publish_bq_event(event): bf_event = bf_type.from_bqclient(event) # type: ignore break envelope = bigframes.core.events.EventEnvelope( - event=bf_event, progress_bar=bigframes.core.events._DEFAULT + event=bf_event, + progress_bar=bigframes.core.events._DEFAULT, + cell_execution_count=cell_execution_count, ) publisher.publish(envelope) @@ -307,10 +311,16 @@ def start_query_with_job( job_retry: google.api_core.retry.Retry = (third_party_gcb_retry.DEFAULT_JOB_RETRY), # noqa: E501 publisher: bigframes.core.events.Publisher, session=None, + cell_execution_count: Optional[int] = None, ) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: """ Starts query job and waits for results. """ + if cell_execution_count is None: + from bigframes.core.utils import get_ipython_execution_count + + cell_execution_count = get_ipython_execution_count() + # Note: Ensure no additional labels are added to job_config after this # point, as `add_and_trim_labels` ensures the label count does not # exceed MAX_LABELS_COUNT. @@ -337,6 +347,7 @@ def start_query_with_job( sql=sql, publisher=publisher, metrics=metrics, + cell_execution_count=cell_execution_count, ) return results_iterator, query_job @@ -357,6 +368,7 @@ def start_query_job_optional( job_retry: google.api_core.retry.Retry = (third_party_gcb_retry.DEFAULT_JOB_RETRY), # noqa: E501 publisher: bigframes.core.events.Publisher, session=None, + cell_execution_count: Optional[int] = None, ) -> google.cloud.bigquery.table.RowIterator: """ Run a bigquery query, with job optional. @@ -364,6 +376,11 @@ def start_query_job_optional( See: https://docs.cloud.google.com/bigquery/docs/running-queries#optional-job-creation """ + if cell_execution_count is None: + from bigframes.core.utils import get_ipython_execution_count + + cell_execution_count = get_ipython_execution_count() + add_and_trim_labels(job_config, session=session) try: results_iterator = bq_client._query_and_wait_bigframes( @@ -373,10 +390,14 @@ def start_query_job_optional( project=project, api_timeout=timeout, job_retry=job_retry, - callback=create_bq_event_callback(publisher), + callback=create_bq_event_callback( + publisher, cell_execution_count=cell_execution_count + ), ) if metrics is not None: - metrics.count_job_stats(row_iterator=results_iterator) + metrics.count_job_stats( + row_iterator=results_iterator, cell_execution_count=cell_execution_count + ) return results_iterator except google.api_core.exceptions.Forbidden as ex: if "Drive credentials" in ex.message: @@ -390,35 +411,45 @@ def _publish_events( total_rows: Optional[int], publisher: bigframes.core.events.Publisher, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + cell_execution_count: Optional[int] = None, ): if not query_job.configuration.dry_run: publisher.publish( - bigframes.core.events.BigQuerySentEvent( - sql, - billing_project=query_job.project, - location=query_job.location, - job_id=query_job.job_id, - request_id=None, + bigframes.core.events.EventEnvelope( + event=bigframes.core.events.BigQuerySentEvent( + sql, + billing_project=query_job.project, + location=query_job.location, + job_id=query_job.job_id, + request_id=None, + ), + cell_execution_count=cell_execution_count, ) ) if not query_job.configuration.dry_run: publisher.publish( - bigframes.core.events.BigQueryFinishedEvent( - billing_project=query_job.project, - location=query_job.location, - job_id=query_job.job_id, - destination=query_job.destination, - total_rows=total_rows, - total_bytes_processed=query_job.total_bytes_processed, - slot_millis=query_job.slot_millis, - created=query_job.created, - started=query_job.started, - ended=query_job.ended, + bigframes.core.events.EventEnvelope( + event=bigframes.core.events.BigQueryFinishedEvent( + billing_project=query_job.project, + location=query_job.location, + query_id=query_job.query_id, + job_id=query_job.job_id, + destination=query_job.destination, + total_rows=total_rows, + total_bytes_processed=query_job.total_bytes_processed, + slot_millis=query_job.slot_millis, + created=query_job.created, + started=query_job.started, + ended=query_job.ended, + ), + cell_execution_count=cell_execution_count, ) ) if metrics is not None: - metrics.count_job_stats(query_job=query_job) + metrics.count_job_stats( + query_job=query_job, cell_execution_count=cell_execution_count + ) def delete_tables_matching_session_id( diff --git a/packages/bigframes/bigframes/session/bigquery_session.py b/packages/bigframes/bigframes/session/bigquery_session.py index a39c6136876d..18f8cdeaff49 100644 --- a/packages/bigframes/bigframes/session/bigquery_session.py +++ b/packages/bigframes/bigframes/session/bigquery_session.py @@ -122,7 +122,7 @@ def close(self): # Assume this is being called in the user thread, so we can access # this thread-local config. job_config=bigquery.QueryJobConfig( - labels=bigframes.options.compute.extra_query_labels + labels=dict(bigframes.options.compute.extra_query_labels) ), location=self.location, project=None, diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 9948480d5cac..0b9c5f804741 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -208,8 +208,9 @@ async def _execute_async( execution_spec, ) await self._publisher.publish_async( - bigframes.core.events.ExecutionFinished( - result=result, + bigframes.core.events.EventEnvelope( + event=bigframes.core.events.ExecutionFinished(result=result), + cell_execution_count=execution_spec.cell_execution_count, ) ) return result @@ -224,8 +225,11 @@ async def _try_execute_semi_executors( maybe_result = await exec.execute(plan, execution_spec) if maybe_result: await self._publisher.publish_async( - bigframes.core.events.ExecutionFinished( - result=maybe_result, + bigframes.core.events.EventEnvelope( + event=bigframes.core.events.ExecutionFinished( + result=maybe_result, + ), + cell_execution_count=execution_spec.cell_execution_count, ) ) return maybe_result diff --git a/packages/bigframes/bigframes/session/direct_gbq_execution.py b/packages/bigframes/bigframes/session/direct_gbq_execution.py index 24f48fe7d54b..c4d9aa729468 100644 --- a/packages/bigframes/bigframes/session/direct_gbq_execution.py +++ b/packages/bigframes/bigframes/session/direct_gbq_execution.py @@ -109,6 +109,7 @@ async def execute( job_config=job_config, query_with_job=(not can_skip_job), session=plan.session, + cell_execution_count=spec.cell_execution_count, ) result_bq_data = None if query_job and query_job.destination: @@ -158,6 +159,7 @@ def _run_execute_query( job_config: bq_job.QueryJobConfig, query_with_job: bool, session, + cell_execution_count: Optional[int] = None, ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts BigQuery query job and waits for results. @@ -171,6 +173,7 @@ def _run_execute_query( metrics=self._metrics, publisher=self._publisher, session=session, + cell_execution_count=cell_execution_count, ) else: return ( @@ -181,6 +184,7 @@ def _run_execute_query( metrics=self._metrics, publisher=self._publisher, session=session, + cell_execution_count=cell_execution_count, ), None, ) diff --git a/packages/bigframes/bigframes/session/execution_spec.py b/packages/bigframes/bigframes/session/execution_spec.py index fc5c11c1c403..31ab5b750bed 100644 --- a/packages/bigframes/bigframes/session/execution_spec.py +++ b/packages/bigframes/bigframes/session/execution_spec.py @@ -60,6 +60,7 @@ class ExecutionSpec: # BigQuery specific options bigquery_config: Optional[BqComputeOptions] = None + cell_execution_count: Optional[int] = None def with_bq_labels(self, labels: Mapping[str, str]) -> ExecutionSpec: bq_config = self.bigquery_config or BqComputeOptions() @@ -77,7 +78,18 @@ def with_compute_options(self, compute_options: ComputeOptions) -> ExecutionSpec new_bq_config = new_bq_config.push_labels( dict(self.bigquery_config.extra_query_labels) ) - return dataclasses.replace(self, bigquery_config=new_bq_config) + + cell_execution_count = self.cell_execution_count + if cell_execution_count is None: + from bigframes.core.utils import get_ipython_execution_count + + cell_execution_count = get_ipython_execution_count() + + return dataclasses.replace( + self, + bigquery_config=new_bq_config, + cell_execution_count=cell_execution_count, + ) # Used internally by execution diff --git a/packages/bigframes/bigframes/session/metrics.py b/packages/bigframes/bigframes/session/metrics.py index 3712cce80726..a9a444ecb389 100644 --- a/packages/bigframes/bigframes/session/metrics.py +++ b/packages/bigframes/bigframes/session/metrics.py @@ -51,12 +51,14 @@ class JobMetadata: input_bytes: Optional[int] = None output_rows: Optional[int] = None source_format: Optional[str] = None + cell_execution_count: Optional[int] = None @classmethod def from_job( cls, query_job: Union[QueryJob, LoadJob], exec_seconds: Optional[float] = None, + cell_execution_count: Optional[int] = None, ) -> "JobMetadata": query_text = getattr(query_job, "query", None) if query_text and len(query_text) > 1024: @@ -71,6 +73,11 @@ def from_job( f"{job_id}&page=queryresults" ) + if cell_execution_count is None: + from bigframes.core.utils import get_ipython_execution_count + + cell_execution_count = get_ipython_execution_count() + metadata = cls( job_id=query_job.job_id, location=query_job.location, @@ -84,6 +91,7 @@ def from_job( error_result=query_job.error_result, query=query_text, job_url=job_url, + cell_execution_count=cell_execution_count, ) if isinstance(query_job, QueryJob): metadata.cached = getattr(query_job, "cache_hit", None) @@ -117,6 +125,7 @@ def from_row_iterator( cls, row_iterator: bq_table.RowIterator, exec_seconds: Optional[float] = None, + cell_execution_count: Optional[int] = None, ) -> "JobMetadata": query_text = getattr(row_iterator, "query", None) if query_text and len(query_text) > 1024: @@ -132,6 +141,11 @@ def from_row_iterator( f"project={project}&j=bq:{location}:{job_id}&page=queryresults" ) + if cell_execution_count is None: + from bigframes.core.utils import get_ipython_execution_count + + cell_execution_count = get_ipython_execution_count() + # fmt: off return cls( job_id=job_id, @@ -151,6 +165,7 @@ def from_row_iterator( cached=getattr(row_iterator, "cache_hit", None), query=query_text, job_url=job_url, + cell_execution_count=cell_execution_count, ) # fmt: on @@ -169,6 +184,8 @@ def count_job_stats( self, query_job: Optional[Union[QueryJob, LoadJob]] = None, row_iterator: Optional[bq_table.RowIterator] = None, + *, + cell_execution_count: Optional[int] = None, ): if query_job is None: assert row_iterator is not None @@ -194,7 +211,9 @@ def count_job_stats( self.jobs.append( JobMetadata.from_row_iterator( - row_iterator, exec_seconds=exec_seconds + row_iterator, + exec_seconds=exec_seconds, + cell_execution_count=cell_execution_count, ) ) @@ -225,7 +244,9 @@ def count_job_stats( self.execution_secs += exec_seconds or 0 metadata = JobMetadata.from_job( - query_job, exec_seconds=exec_seconds + query_job, + exec_seconds=exec_seconds, + cell_execution_count=cell_execution_count, ) self.jobs.append(metadata) @@ -237,7 +258,11 @@ def count_job_stats( else None ) self.jobs.append( - JobMetadata.from_job(query_job, exec_seconds=duration) + JobMetadata.from_job( + query_job, + exec_seconds=duration, + cell_execution_count=cell_execution_count, + ) ) # For pytest runs only, log information about the query job @@ -284,6 +309,7 @@ def on_event(self, envelope: Any): # EventEnvelope, ensuring subscribers receive a consistent contract. assert isinstance(envelope, bigframes.core.events.EventEnvelope) event = envelope.event + cell_execution_count = envelope.cell_execution_count if isinstance(event, bigframes.core.events.ExecutionFinished): if event.result and isinstance(event.result, LocalExecuteResult): @@ -291,10 +317,16 @@ def on_event(self, envelope: Any): bytes_processed = event.result.total_bytes_processed or 0 self.bytes_processed += bytes_processed + if cell_execution_count is None: + from bigframes.core.utils import get_ipython_execution_count + + cell_execution_count = get_ipython_execution_count() + metadata = JobMetadata( job_type="polars", status="DONE", total_bytes_processed=bytes_processed, + cell_execution_count=cell_execution_count, ) self.jobs.append(metadata) diff --git a/packages/bigframes/tests/unit/display/test_anywidget.py b/packages/bigframes/tests/unit/display/test_anywidget.py index 5c9fd79a3542..8e7b2f7f335b 100644 --- a/packages/bigframes/tests/unit/display/test_anywidget.py +++ b/packages/bigframes/tests/unit/display/test_anywidget.py @@ -177,6 +177,25 @@ def test_page_size_change_resets_sort(mock_df): assert mock_df.to_pandas_batches.call_count >= 2 +def test_cell_execution_count_propagation(mock_df): + """Test that the captured cell_execution_count is propagated to to_pandas_batches.""" + mock_ipy = mock.Mock() + mock_ipy.execution_count = 42 + mock_ipython = mock.MagicMock() + mock_ipython.get_ipython.return_value = mock_ipy + + with mock.patch.dict("sys.modules", {"IPython": mock_ipython}): + with bigframes.option_context("display.render_mode", "anywidget"): + widget = TableWidget(mock_df) + + assert widget._cell_execution_count == 42 + + mock_df.to_pandas_batches.assert_called_with( + page_size=widget.page_size, + cell_execution_count=42, + ) + + def test_json_column_converted_to_string_for_display(): mock_block = mock.Mock(spec=Block) mock_block.column_labels = pd.Index(["col_json"]) diff --git a/packages/bigframes/tests/unit/session/test_metrics.py b/packages/bigframes/tests/unit/session/test_metrics.py index ebd6e210fbe2..4e550b1c77a3 100644 --- a/packages/bigframes/tests/unit/session/test_metrics.py +++ b/packages/bigframes/tests/unit/session/test_metrics.py @@ -268,3 +268,35 @@ def test_on_event_with_local_execute_result(): assert execution_metrics.jobs[0].job_type == "polars" assert execution_metrics.jobs[0].status == "DONE" assert execution_metrics.jobs[0].total_bytes_processed == 1024 + + +def test_count_job_stats_with_explicit_cell_execution_count(): + row_iterator = unittest.mock.create_autospec( + bigquery.table.RowIterator, instance=True + ) + row_iterator.total_bytes_processed = 1024 + row_iterator.query = "SELECT * FROM table" + row_iterator.slot_millis = 1234 + execution_metrics = metrics.ExecutionMetrics() + execution_metrics.count_job_stats( + row_iterator=row_iterator, cell_execution_count=42 + ) + + assert len(execution_metrics.jobs) == 1 + assert execution_metrics.jobs[0].cell_execution_count == 42 + + +def test_on_event_with_explicit_cell_execution_count(): + import bigframes.core.events + from bigframes.session.executor import LocalExecuteResult + + local_result = unittest.mock.create_autospec(LocalExecuteResult, instance=True) + local_result.total_bytes_processed = 1024 + + event = bigframes.core.events.ExecutionFinished(result=local_result) + envelope = bigframes.core.events.EventEnvelope(event=event, cell_execution_count=42) + execution_metrics = metrics.ExecutionMetrics() + execution_metrics.on_event(envelope) + + assert len(execution_metrics.jobs) == 1 + assert execution_metrics.jobs[0].cell_execution_count == 42 diff --git a/packages/bigframes/tests/unit/session/test_read_gbq_colab.py b/packages/bigframes/tests/unit/session/test_read_gbq_colab.py index bb2cba0c1093..fd377b693187 100644 --- a/packages/bigframes/tests/unit/session/test_read_gbq_colab.py +++ b/packages/bigframes/tests/unit/session/test_read_gbq_colab.py @@ -126,3 +126,94 @@ def test_read_gbq_colab_doesnt_set_destination_table(): assert query == "SELECT 'my-test-query';" assert config.destination is None + + +def test_read_gbq_colab_with_callback(): + """Make sure callback receives events during execution.""" + session = mocks.create_bigquery_session() + callback = mock.Mock() + + _ = session._read_gbq_colab("SELECT 'my-test-query';", callback=callback) + + assert callback.call_count > 0 + + +def test_read_gbq_colab_filters_by_cell(): + """Verify that callbacks are scoped to individual executions.""" + session = mocks.create_bigquery_session() + callback1 = mock.Mock() + callback2 = mock.Mock() + + _ = session._read_gbq_colab("SELECT 'cell_1_query';", callback=callback1) + callback1_initial_count = callback1.call_count + + _ = session._read_gbq_colab("SELECT 'cell_2_query';", callback=callback2) + + # Verify callback1 was automatically unsubscribed upon completion + # of the first query. + assert callback1.call_count == callback1_initial_count + assert callback2.call_count > 0 + + +def test_execution_history_filtering(): + """Verify that execution_history can be filtered by job_ids or events.""" + from bigframes.session import metrics + + session = mocks.create_bigquery_session() + + job1 = metrics.JobMetadata(job_id="job_1", job_type="query", query="SELECT 1") + job2 = metrics.JobMetadata(job_id="job_2", job_type="query", query="SELECT 2") + session._metrics.jobs.extend([job1, job2]) + + history_job1 = session.execution_history(job_ids=["job_1"]).to_dataframe() + assert len(history_job1) == 1 + assert history_job1.iloc[0]["job_id"] == "job_1" + + event2 = mock.Mock() + event2.job_id = "job_2" + history_job2 = session.execution_history(events=[event2]).to_dataframe() + assert len(history_job2) == 1 + assert history_job2.iloc[0]["job_id"] == "job_2" + + +def test_execution_history_returns_all_executions_by_default(): + """Verify that execution_history returns all executions by default.""" + from bigframes.session import metrics + + session = mocks.create_bigquery_session() + job1 = metrics.JobMetadata( + job_id="job_1", job_type="query", query="SELECT 1", cell_execution_count=10 + ) + job2 = metrics.JobMetadata( + job_id="job_2", job_type="query", query="SELECT 2", cell_execution_count=20 + ) + session._metrics.jobs.extend([job1, job2]) + + history = session.execution_history().to_dataframe() + + assert len(history) == 2 + + +def test_execution_history_filters_by_notebook_cell_when_all_cells_is_false(): + """Verify that execution_history filters to the current cell when all_cells is False.""" + from bigframes.session import metrics + + session = mocks.create_bigquery_session() + job1 = metrics.JobMetadata( + job_id="job_1", job_type="query", query="SELECT 1", cell_execution_count=10 + ) + job2 = metrics.JobMetadata( + job_id="job_2", job_type="query", query="SELECT 2", cell_execution_count=20 + ) + session._metrics.jobs.extend([job1, job2]) + + mock_ipy = mock.Mock() + mock_ipy.execution_count = 20 + mock_ipython = mock.MagicMock() + mock_ipython.get_ipython.return_value = mock_ipy + + with mock.patch.dict("sys.modules", {"IPython": mock_ipython}): + history = session.execution_history(all_cells=False).to_dataframe() + + assert len(history) == 1 + assert history.iloc[0]["job_id"] == "job_2"