Skip to content

Commit 507a3de

Browse files
committed
Fix BayesSearchCV stability and implement time-series pipeline dispatch
- Add `PatchedBayesSearchCV` in `hyperparameter_search.py` to fix `skopt` issues with non-scalar parameters and optimizer state corruption.
- Update `HyperparameterSearch` to detect `aeon` deep learning models, forcing single-threaded execution and disabling verbosity to prevent multiprocessing hangs and pickling errors.
- Refactor `main.py` to dynamically select between `grid_search_crossvalidate` and `grid_search_crossvalidate_ts` based on the pipeline mode.
- Force threading backend for Keras/aeon models during hyperparameter search execution.
1 parent c98ec9c commit 507a3de

2 files changed

Lines changed: 145 additions & 13 deletions

File tree

ml_grid/pipeline/hyperparameter_search.py

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import logging
22
import warnings
33
from typing import Any, Dict, List, Union
4+
import numpy as np
5+
import joblib
46

57
import pandas as pd
68
import tensorflow as tf
79
from sklearn.base import BaseEstimator, is_classifier
810
from sklearn.exceptions import ConvergenceWarning
911
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
1012
from skopt import BayesSearchCV
13+
from skopt.utils import point_asdict
1114
from ml_grid.model_classes.AutoKerasClassifierWrapper import AutoKerasClassifierWrapper
1215
from ml_grid.model_classes.FLAMLClassifierWrapper import FLAMLClassifierWrapper
1316
from ml_grid.model_classes.H2OAutoMLClassifier import H2OAutoMLClassifier
@@ -30,6 +33,86 @@
3033
from ml_grid.util.validate_parameters import validate_parameters_helper
3134

3235

36+
class PatchedBayesSearchCV(BayesSearchCV):
    """BayesSearchCV with a patched ``_step`` for older skopt releases.

    The stock ``_step`` coerces every asked point to a Python scalar, which
    raises ``ValueError: can only convert an array of size 1 to a Python
    scalar`` for non-scalar Categorical values (e.g. tuples). This subclass
    re-implements ``_step`` without that coercion and works around a buggy
    batch ``tell`` that can corrupt the optimizer state.
    """

    def _step(
        self,
        search_space,
        optimizer,
        score_name=None,
        evaluate_candidates=None,
        n_points=1,
    ):
        """Ask the optimizer for candidates, evaluate them, and tell scores back.

        Mirrors the original skopt ``_step`` contract but omits the
        scalar-coercion line so tuple-valued parameters survive intact.
        Returns ``(results_dict, metric_name)`` as the parent search loop expects.
        """
        # Ask for the next batch of candidate points.
        params = optimizer.ask(n_points=n_points)

        # NOTE: the upstream line
        #   params = [[np.array(v).item() for v in p] for p in params]
        # is deliberately omitted so non-scalar values (tuples, lists) pass through.

        # Turn each point (a list of values) into a parameter dict, converting
        # numpy scalar types to native Python as we go (avoids H2OTypeError).
        params_dict = [
            {
                key: (value.item() if hasattr(value, "item") else value)
                for key, value in point_asdict(search_space, point).items()
            }
            for point in params
        ]

        # Run cross-validation on every candidate.
        all_results = evaluate_candidates(params_dict)

        # Pick the score column to feed back to the optimizer. ``multimetric_``
        # is not present in every sklearn/skopt combination, so a dict-valued
        # ``scoring`` is the reliable multimetric indicator. ``score_name`` from
        # the caller can be stale in older skopt loops, hence ``self.refit``.
        if isinstance(self.scoring, dict):
            mean_test_score = all_results[f"mean_test_{self.refit}"]
        else:
            mean_test_score = all_results["mean_test_score"]

        # Flatten to a 1D float array (handles scalars, lists, nested lists),
        # then negate: skopt minimizes, our scores are maximized.
        y_tell = (-np.asarray(mean_test_score, dtype=float).flatten()).tolist()

        # WORKAROUND: batch ``tell`` in older skopt can corrupt Xi/yi and raise
        # IndexError. Feed points one at a time, deferring the surrogate-model
        # fit until the final point of the batch.
        if params:
            for point, negated_score in zip(params[:-1], y_tell[:-1]):
                optimizer.tell(point, negated_score, fit=False)
            optimizer.tell(params[-1], y_tell[-1], fit=True)

        results = {
            "params": params,
            "mean_test_score": mean_test_score,
            "all_results": all_results,
        }
        # Return the base refit metric name (not ``score_name``) so the parent
        # ``_run_search`` loop is not polluted on later iterations.
        return results, self.refit if self.refit else "score"
114+
115+
33116
class HyperparameterSearch:
34117
"""Orchestrates hyperparameter search using GridSearchCV, RandomizedSearchCV, or BayesSearchCV."""
35118

@@ -203,6 +286,32 @@ def run_search(self, X_train: pd.DataFrame, y_train: pd.Series) -> BaseEstimator
203286
), # KNNWrapper,
204287
)
205288

289+
# Detect aeon deep learning models (MLPClassifier, TimeCNNClassifier, etc.)
290+
# These use TensorFlow/Keras and hang with joblib multiprocessing
291+
if (
292+
not is_single_threaded_search
293+
and hasattr(self.algorithm, "__module__")
294+
and "aeon" in self.algorithm.__module__
295+
and "deep_learning" in self.algorithm.__module__
296+
):
297+
is_single_threaded_search = True
298+
# Force verbose=0 to prevent progress bar hangs in captured stdout environments
299+
if hasattr(self.algorithm, "verbose"):
300+
self.algorithm.verbose = 0
301+
302+
# Force verbose=0 in parameter space to prevent search from re-enabling it
303+
if isinstance(self.parameter_space, dict):
304+
if "verbose" in self.parameter_space:
305+
self.parameter_space["verbose"] = [0]
306+
if "model__verbose" in self.parameter_space:
307+
self.parameter_space["model__verbose"] = [0]
308+
elif isinstance(self.parameter_space, list):
309+
for params in self.parameter_space:
310+
if "verbose" in params:
311+
params["verbose"] = [0]
312+
if "model__verbose" in params:
313+
params["model__verbose"] = [0]
314+
206315
if is_h2o_model or is_single_threaded_search or bayessearch:
207316
if verbose > 0:
208317
self.ml_grid_object.logger.info(
@@ -297,7 +406,7 @@ def run_search(self, X_train: pd.DataFrame, y_train: pd.Series) -> BaseEstimator
297406

298407
if bayessearch:
299408
# Bayesian Optimization
300-
grid = BayesSearchCV(
409+
grid = PatchedBayesSearchCV(
301410
estimator=self.algorithm,
302411
search_spaces=parameters,
303412
n_iter=self.max_iter,
@@ -342,8 +451,13 @@ def run_search(self, X_train: pd.DataFrame, y_train: pd.Series) -> BaseEstimator
342451
f"Starting hyperparameter search with {len(X_train_reset)} samples"
343452
)
344453

345-
# Fit the grid search with pandas DataFrames/Series (retains feature names)
346-
grid.fit(X_train_reset, y_train_reset)
454+
# Fit the grid search
455+
# Use threading backend for Keras/aeon models to avoid pickling errors (AttributeError: _metrics)
456+
if is_single_threaded_search:
457+
with joblib.parallel_backend("threading"):
458+
grid.fit(X_train_reset, y_train_reset)
459+
else:
460+
grid.fit(X_train_reset, y_train_reset)
347461

348462
best_model = grid.best_estimator_
349463

ml_grid/pipeline/main.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from sklearn.model_selection import ParameterGrid
1010
from skopt.space import Categorical, Integer, Real
1111

12-
from ml_grid.pipeline import grid_search_cross_validate
1312
from ml_grid.pipeline.data import pipe
1413
from ml_grid.util.bayes_utils import calculate_combinations
1514
from ml_grid.util.global_params import global_parameters
@@ -317,10 +316,21 @@ def execute_single_model(self, args: Tuple) -> float:
317316
if timeout is None:
318317
timeout = args[3].global_params.model_eval_time_limit
319318

320-
with time_limit(timeout):
321-
gscv_instance = grid_search_cross_validate.grid_search_crossvalidate(
322-
*args
319+
# Conditionally select the grid search class based on time-series mode
320+
ml_grid_object = args[3]
321+
if ml_grid_object.time_series_mode:
322+
from ml_grid.pipeline.grid_search_cross_validate_ts import (
323+
grid_search_crossvalidate_ts,
323324
)
325+
326+
gscv_class = grid_search_crossvalidate_ts
327+
else:
328+
from ml_grid.pipeline import grid_search_cross_validate
329+
330+
gscv_class = grid_search_cross_validate.grid_search_crossvalidate
331+
332+
with time_limit(timeout):
333+
gscv_instance = gscv_class(*args)
324334
score = gscv_instance.grid_search_cross_validate_score_result
325335

326336
self.logger.info(f"Score for {args[2]}: {score:.4f}")
@@ -362,6 +372,19 @@ def execute(self) -> Tuple[List[List[Any]], float]:
362372
self.model_error_list = []
363373
self.highest_score = 0
364374

375+
# Determine which grid search class to use based on the pipeline mode
376+
is_ts = self.ml_grid_object.time_series_mode
377+
if is_ts:
378+
from ml_grid.pipeline.grid_search_cross_validate_ts import (
379+
grid_search_crossvalidate_ts,
380+
)
381+
382+
gscv_class = grid_search_crossvalidate_ts
383+
else:
384+
from ml_grid.pipeline import grid_search_cross_validate
385+
386+
gscv_class = grid_search_cross_validate.grid_search_crossvalidate
387+
365388
if self.multiprocess:
366389

367390
def multi_run_wrapper(args: Tuple) -> Any:
@@ -388,12 +411,7 @@ def multi_run_wrapper(args: Tuple) -> Any:
388411
timeout = self.global_params.model_eval_time_limit
389412

390413
with time_limit(timeout):
391-
gscv_instance = (
392-
grid_search_cross_validate.grid_search_crossvalidate(
393-
*self.arg_list[k] # Unpack all arguments
394-
)
395-
)
396-
414+
gscv_instance = gscv_class(*self.arg_list[k])
397415
self.highest_score = max(
398416
self.highest_score,
399417
gscv_instance.grid_search_cross_validate_score_result,

0 commit comments

Comments (0)