Skip to content

Commit 1360e5e

Browse files
paddymulclaude
and committed
refactor: abstract CustomizableDataflow into pandas/polars subclasses
Make CustomizableDataflow an abstract base class with concrete PandasCustomizableDataflow and PolarsCustomizableDataflow subclasses. Widgets select their backend via a dataflow_klass attribute. This decouples pandas from the core dataflow module, enabling pandas to become an optional dependency in the future. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 66b7b4c commit 1360e5e

9 files changed

Lines changed: 142 additions & 67 deletions

File tree

buckaroo/buckaroo_widget.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from buckaroo.extension_utils import copy_extend
2828

2929
from .serialization_utils import EMPTY_DF_WHOLE, check_and_fix_df, pd_to_obj, to_parquet, sd_to_parquet_b64
30-
from .dataflow.dataflow import CustomizableDataflow
30+
from .dataflow.pandas_dataflow import PandasCustomizableDataflow
3131
from .dataflow.dataflow_extras import (Sampling, exception_protect)
3232
from .dataflow.styling_core import (ComponentConfig, DFViewerConfig, DisplayArgs, OverrideColumnConfig, PinnedRowConfig, StylingAnalysis, merge_column_config, EMPTY_DFVIEWER_CONFIG)
3333
from .dataflow.autocleaning import PandasAutocleaning
@@ -124,16 +124,13 @@ def __init__(self, orig_df, debug=False,
124124
self.record_transcript = record_transcript
125125
self.exception = None
126126
kls = self.__class__
127-
class InnerDataFlow(CustomizableDataflow):
127+
class InnerDataFlow(kls.dataflow_klass):
128128
sampling_klass = kls.sampling_klass
129129
autocleaning_klass = kls.autocleaning_klass
130130
DFStatsClass = kls.DFStatsClass
131131
autoclean_conf= kls.autoclean_conf
132132
analysis_klasses = kls.analysis_klasses
133133

134-
def _df_to_obj(idfself, df:pd.DataFrame):
135-
return self._df_to_obj(df)
136-
137134
self.dataflow = InnerDataFlow(
138135
orig_df,
139136
debug=debug,column_config_overrides=column_config_overrides,
@@ -162,6 +159,7 @@ def _df_to_obj(self, df:pd.DataFrame):
162159
render_func_name = Unicode("baked").tag(sync=True)
163160

164161

162+
dataflow_klass = PandasCustomizableDataflow
165163
sampling_klass = PdSampling
166164
autocleaning_klass = PandasAutocleaning #override the base CustomizableDataFlow klass
167165
DFStatsClass = DfStatsV2 # Pandas Specific

buckaroo/dataflow/dataflow.py

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1+
from abc import abstractmethod
12
from typing import List, Literal, Tuple, Type, TypedDict, Dict as TDict, Any as TAny, Union
23
from typing_extensions import override
34
import six
45
import warnings
5-
import pandas as pd
66
from traitlets import Unicode, Any, observe, Dict
77

88
from buckaroo.pluggable_analysis_framework.col_analysis import ColAnalysis, SDType
9-
from ..serialization_utils import pd_to_obj, sd_to_parquet_b64
9+
from ..serialization_utils import sd_to_parquet_b64
1010
from buckaroo.pluggable_analysis_framework.utils import (filter_analysis)
11-
from buckaroo.pluggable_analysis_framework.df_stats_v2 import DfStatsV2
1211
from .autocleaning import SentinelAutocleaning
1312
from .dataflow_extras import (exception_protect, Sampling)
1413
from .styling_core import (
@@ -92,7 +91,7 @@ def __init__(self, raw_df):
9291

9392

9493

95-
def _compute_sampled_df(self, raw_df:pd.DataFrame, sample_method:str):
94+
def _compute_sampled_df(self, raw_df, sample_method):
9695
if sample_method == "first":
9796
return raw_df[:1]
9897
return raw_df
@@ -185,7 +184,7 @@ def processed_sd(self) -> SDType:
185184
return self.processed_result[1]
186185
return {}
187186

188-
def _get_summary_sd(self, df:pd.DataFrame) -> Tuple[SDType, TAny]:
187+
def _get_summary_sd(self, df) -> Tuple[SDType, TAny]:
189188
analysis_klasses = self.analysis_klasses
190189
if analysis_klasses == "foo":
191190
return {'some-col': {'foo':8}}, {}
@@ -236,12 +235,15 @@ def _widget_config(self, change):
236235

237236
class CustomizableDataflow(DataFlow):
238237
"""
239-
This allows targeted extension and customization of DataFlow
238+
This allows targeted extension and customization of DataFlow.
239+
240+
This is an abstract base class — use PandasCustomizableDataflow or
241+
PolarsCustomizableDataflow for concrete implementations.
240242
"""
241243
#analysis_klasses = [StylingAnalysis]
242244
analysis_klasses: List[Type[ColAnalysis]] = [StylingAnalysis]
243245
command_config = Dict({}).tag(sync=True)
244-
DFStatsClass = DfStatsV2
246+
DFStatsClass = None
245247
sampling_klass = Sampling
246248

247249
df_display_klasses: TDict[str, Type[StylingAnalysis]] = {}
@@ -323,7 +325,8 @@ def setup_options_from_analysis(self):
323325
empty_df_display_args[kls.df_display_name] = EMPTY_DF_DISPLAY_ARG
324326

325327

326-
self.DFStatsClass.verify_analysis_objects(self.analysis_klasses)
328+
if self.DFStatsClass is not None:
329+
self.DFStatsClass.verify_analysis_objects(self.analysis_klasses)
327330

328331
self.post_processing_klasses = filter_analysis(self.analysis_klasses, "post_processing_method")
329332

@@ -379,38 +382,20 @@ def run_code_generator(self, operations):
379382
self.ac_obj.run_code_generator(operations)
380383
### end code interpreter block
381384

382-
@override
383-
def _compute_processed_result(self, cleaned_df:pd.DataFrame, post_processing_method:str) -> Tuple[pd.DataFrame, SDType]:
384-
if post_processing_method == '':
385-
return (cleaned_df, {})
386-
else:
387-
post_analysis = self.post_processing_klasses[post_processing_method]
388-
try:
389-
ret_df, sd = post_analysis.post_process_df(cleaned_df)
390-
return (ret_df, sd)
391-
except Exception as e:
392-
return (self._build_error_dataframe(e), {})
385+
@abstractmethod
386+
def _compute_processed_result(self, cleaned_df, post_processing_method):
387+
...
393388

389+
@abstractmethod
394390
def _build_error_dataframe(self, e):
395-
return pd.DataFrame({'err': [str(e)]})
391+
...
396392

397393

398394
### start summary stats block
399395
#TAny closer to some error type
400-
@override
401-
def _get_summary_sd(self, processed_df:pd.DataFrame) -> Tuple[SDType, TDict[str, TAny]]:
402-
stats = self.DFStatsClass(
403-
processed_df,
404-
self.analysis_klasses,
405-
self.df_name, debug=self.debug)
406-
sdf = stats.sdf
407-
if stats.errs:
408-
if self.debug:
409-
raise Exception("Error executing analysis")
410-
else:
411-
return {}, stats.errs
412-
else:
413-
return sdf, {}
396+
@abstractmethod
397+
def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]:
398+
...
414399

415400

416401
# ### end summary stats block
@@ -422,13 +407,16 @@ def _sd_to_jsondf(self, sd:SDType):
422407
"""
423408
return sd_to_parquet_b64(sd)
424409

425-
def _df_to_obj(self, df:pd.DataFrame) -> TDict[str, TAny]:
426-
return pd_to_obj(self.sampling_klass.serialize_sample(df))
410+
@abstractmethod
411+
def _df_to_obj(self, df) -> TDict[str, TAny]:
412+
...
427413

428414
def add_analysis(self, analysis_klass:Type[ColAnalysis]) -> None:
429415
"""
430416
same as get_summary_sd, call whatever to set summary_sd and trigger further comps
431417
"""
418+
if self.DFStatsClass is None:
419+
return
432420

433421
stats = self.DFStatsClass(
434422
self.processed_df,

buckaroo/dataflow/dataflow_extras.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import logging
33

44

5-
import pandas as pd
65

76
logger = logging.getLogger()
87

@@ -13,8 +12,6 @@
1312
'summary_stats_key': 'empty'}
1413

1514

16-
SENTINEL_DF_1 = pd.DataFrame({'foo' :[10, 20], 'bar' : ["asdf", "iii"]})
17-
SENTINEL_DF_2 = pd.DataFrame({'col1' :[55, 55], 'col2': ["pppp", "333"]})
1815

1916
SENTINEL_COLUMN_CONFIG_1 = "ASDF"
2017
SENTINEL_COLUMN_CONFIG_2 = "FOO-BAR"
@@ -65,18 +62,15 @@ def pre_stats_sample(kls, df):
6562
print("Removing excess columns, found %d columns" % len(df.columns))
6663
df = df[df.columns[:kls.max_columns]]
6764
if kls.pre_limit and len(df) > kls.pre_limit:
68-
sampled = df.sample(kls.pre_limit)
69-
if isinstance(sampled, pd.DataFrame):
70-
return sampled.sort_index()
71-
return sampled
65+
return df.sample(kls.pre_limit)
7266
return df
7367

7468

7569
@classmethod
7670
def serialize_sample(kls, df):
7771
if kls.serialize_limit and len(df) > kls.serialize_limit:
7872
sampled = df.sample(kls.serialize_limit)
79-
if isinstance(sampled, pd.DataFrame):
73+
if hasattr(sampled, 'sort_index'):
8074
return sampled.sort_index()
8175
return sampled
8276
return df
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from typing import Tuple, Dict as TDict, Any as TAny
2+
3+
import pandas as pd
4+
from typing_extensions import override
5+
6+
from buckaroo.pluggable_analysis_framework.col_analysis import SDType
7+
from buckaroo.pluggable_analysis_framework.df_stats_v2 import DfStatsV2
8+
from ..serialization_utils import pd_to_obj
9+
from .dataflow import CustomizableDataflow
10+
11+
12+
class PandasCustomizableDataflow(CustomizableDataflow):
    """Concrete pandas implementation of CustomizableDataflow.

    Supplies the pandas-specific pieces the abstract base leaves open:
    summary-stats computation (via DfStatsV2), post-processing dispatch,
    error-frame construction, and DataFrame-to-JSON serialization.
    """

    # pandas stats engine used by _get_summary_sd and add_analysis.
    DFStatsClass = DfStatsV2

    @override
    def _compute_processed_result(self, cleaned_df, post_processing_method):
        """Apply the named post-processing analysis to *cleaned_df*.

        Returns a (df, summary_dict) tuple; on failure the df is replaced
        by a one-row error frame and the summary dict is empty.
        """
        # Empty string means "no post-processing selected" — pass through.
        if post_processing_method == '':
            return (cleaned_df, {})
        post_analysis = self.post_processing_klasses[post_processing_method]
        try:
            # Unpack inside the try so a malformed return is also trapped.
            ret_df, sd = post_analysis.post_process_df(cleaned_df)
        except Exception as e:
            # Surface the failure in-band as an error DataFrame rather
            # than propagating — keeps the widget rendering.
            return (self._build_error_dataframe(e), {})
        return (ret_df, sd)

    @override
    def _build_error_dataframe(self, e):
        """Return a single-row frame whose 'err' column holds str(e)."""
        return pd.DataFrame({'err': [str(e)]})

    @override
    def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]:
        """Run the configured analyses; return (summary_dict, errors)."""
        stats = self.DFStatsClass(
            processed_df,
            self.analysis_klasses,
            self.df_name, debug=self.debug)
        sdf = stats.sdf
        # Success path: summary dict plus an empty error mapping.
        if not stats.errs:
            return sdf, {}
        # In debug mode fail loudly so analysis bugs aren't swallowed.
        if self.debug:
            raise Exception("Error executing analysis")
        return {}, stats.errs

    @override
    def _df_to_obj(self, df) -> TDict[str, TAny]:
        """Downsample *df* via the sampling class, then serialize to obj."""
        sample = self.sampling_klass.serialize_sample(df)
        return pd_to_obj(sample)
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from typing import Tuple, Dict as TDict, Any as TAny
2+
3+
import pandas as pd
4+
import polars as pl
5+
from typing_extensions import override
6+
7+
from buckaroo.pluggable_analysis_framework.col_analysis import SDType
8+
from buckaroo.pluggable_analysis_framework.df_stats_v2 import PlDfStatsV2
9+
from ..serialization_utils import pd_to_obj
10+
from .dataflow import CustomizableDataflow
11+
12+
13+
class PolarsCustomizableDataflow(CustomizableDataflow):
    """Concrete polars implementation of CustomizableDataflow.

    Mirrors the pandas backend but builds error frames with polars and
    converts polars frames to pandas at the serialization boundary.
    """

    # polars stats engine used by _get_summary_sd and add_analysis.
    DFStatsClass = PlDfStatsV2

    @override
    def _compute_processed_result(self, cleaned_df, post_processing_method):
        """Apply the named post-processing analysis to *cleaned_df*.

        Returns a (df, summary_dict) tuple; on failure the df is replaced
        by a one-row error frame and the summary dict is empty.
        """
        # Empty string means "no post-processing selected" — pass through.
        if post_processing_method == '':
            return (cleaned_df, {})
        post_analysis = self.post_processing_klasses[post_processing_method]
        try:
            # Unpack inside the try so a malformed return is also trapped.
            ret_df, sd = post_analysis.post_process_df(cleaned_df)
        except Exception as e:
            # Surface the failure in-band as an error DataFrame rather
            # than propagating — keeps the widget rendering.
            return (self._build_error_dataframe(e), {})
        return (ret_df, sd)

    @override
    def _build_error_dataframe(self, e):
        """Return a single-row polars frame whose 'err' column holds str(e)."""
        return pl.DataFrame({'err': [str(e)]})

    @override
    def _get_summary_sd(self, processed_df) -> Tuple[SDType, TDict[str, TAny]]:
        """Run the configured analyses; return (summary_dict, errors)."""
        stats = self.DFStatsClass(
            processed_df,
            self.analysis_klasses,
            self.df_name, debug=self.debug)
        sdf = stats.sdf
        # Success path: summary dict plus an empty error mapping.
        if not stats.errs:
            return sdf, {}
        # In debug mode fail loudly so analysis bugs aren't swallowed.
        if self.debug:
            raise Exception("Error executing analysis")
        return {}, stats.errs

    @override
    def _df_to_obj(self, df) -> TDict[str, TAny]:
        """Serialize *df* to obj, converting polars frames to pandas first.

        Accepts either backend because upstream stages may hand back a
        pandas frame (e.g. after certain post-processing steps).
        """
        frame = df if isinstance(df, pd.DataFrame) else df.to_pandas()
        return pd_to_obj(self.sampling_klass.serialize_sample(frame))

buckaroo/polars_buckaroo.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
from .pluggable_analysis_framework.df_stats_v2 import PlDfStatsV2
1010
from .pluggable_analysis_framework.polars_analysis_management import PlDfStats
1111
from .customizations.pl_stats_v2 import PL_ANALYSIS_V2
12-
from .serialization_utils import pd_to_obj, sd_to_parquet_b64
12+
from .serialization_utils import sd_to_parquet_b64
1313
from .customizations.styling import DefaultSummaryStatsStyling, DefaultMainStyling
1414
from .customizations.pl_autocleaning_conf import NoCleaningConfPl
1515
from .dataflow.dataflow import Sampling
16+
from .dataflow.polars_dataflow import PolarsCustomizableDataflow
1617
from .dataflow.autocleaning import PandasAutocleaning
1718
from .dataflow.widget_extension_utils import configure_buckaroo
1819

@@ -50,6 +51,7 @@ def make_origs(raw_df, cleaned_df, cleaning_sd):
5051
class PolarsBuckarooWidget(BuckarooWidget):
5152
"""TODO: Add docstring here
5253
"""
54+
dataflow_klass = PolarsCustomizableDataflow
5355
analysis_klasses = local_analysis_klasses
5456
autocleaning_klass = PandasAutocleaning #override the base CustomizableDataFlow klass
5557
autoclean_conf = tuple([NoCleaningConfPl]) #override the base CustomizableDataFlow conf
@@ -60,17 +62,6 @@ def _sd_to_jsondf(self, sd):
6062
"""Serialize summary stats dict as parquet-b64."""
6163
return sd_to_parquet_b64(sd)
6264

63-
def _build_error_dataframe(self, e):
64-
return pl.DataFrame({'err': [str(e)]})
65-
66-
def _df_to_obj(self, df):
67-
# I want to this, but then row numbers are lost
68-
#return pd_to_obj(self.sampling_klass.serialize_sample(df).to_pandas())
69-
import pandas as pd
70-
if isinstance(df, pd.DataFrame):
71-
return pd_to_obj(self.sampling_klass.serialize_sample(df))
72-
return pd_to_obj(self.sampling_klass.serialize_sample(df.to_pandas()))
73-
7465

7566
def prepare_df_for_serialization(df:pl.DataFrame) -> pl.DataFrame:
7667
# I don't like this copy. modify to keep the same data with different names

buckaroo/server/data_loading.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from buckaroo.serialization_utils import to_parquet, pd_to_obj, check_and_fix_df
77
from buckaroo.df_util import old_col_new_col, to_chars
88

9-
from buckaroo.dataflow.dataflow import CustomizableDataflow
9+
from buckaroo.dataflow.pandas_dataflow import PandasCustomizableDataflow
1010
from buckaroo.dataflow.dataflow_extras import Sampling
1111
from buckaroo.dataflow.autocleaning import PandasAutocleaning
1212
from buckaroo.dataflow.styling_core import StylingAnalysis
@@ -36,7 +36,7 @@ def pre_stats_sample(kls, df):
3636
return df
3737

3838

39-
class ServerDataflow(CustomizableDataflow):
39+
class ServerDataflow(PandasCustomizableDataflow):
4040
"""Headless dataflow matching BuckarooInfiniteWidget's pipeline."""
4141
sampling_klass = ServerSampling
4242
autocleaning_klass = PandasAutocleaning

tests/unit/dataflow/autocleaning_pd_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
SafeInt, DropCol, FillNA, GroupBy, NoOp, Search, OnlyOutliers
1414
)
1515
from buckaroo.customizations.pd_autoclean_conf import (NoCleaningConf)
16-
from buckaroo.dataflow.dataflow import CustomizableDataflow
16+
from buckaroo.dataflow.pandas_dataflow import PandasCustomizableDataflow
1717

1818
dirty_df = pd.DataFrame(
1919
{'a':[10, 20, 30, 40, 10, 20.3, 5, None, None, None],
@@ -500,7 +500,7 @@ def test_autoclean_dataflow():
500500
"""
501501
verify that different autocleaning confs are actually called
502502
"""
503-
class SentinelDataflow(CustomizableDataflow):
503+
class SentinelDataflow(PandasCustomizableDataflow):
504504
autocleaning_klass = PandasAutocleaning
505505
autoclean_conf = tuple([SentinelConfig, NoCleaningConf])
506506

0 commit comments

Comments
 (0)