Skip to content

Commit f2b9465

Browse files
committed
additional data pipeline test
1 parent 37178d4 commit f2b9465

1 file changed

Lines changed: 359 additions & 0 deletions

File tree

tests/test_data_pipeline.py

Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,359 @@
1+
"""
2+
Unit tests for the ml_grid.pipeline.data.pipe class.
3+
4+
This test suite validates the core functionality of the data pipeline, ensuring
5+
that data is loaded, cleaned, transformed, and split correctly according to
6+
various configurations.
7+
"""
8+
9+
import unittest
10+
import pandas as pd
11+
import warnings
12+
import numpy as np
13+
import sys
14+
import tempfile
15+
import shutil
16+
from pathlib import Path
17+
18+
# Ensure the project root is in the Python path to allow for module imports
19+
try:
20+
from ml_grid.pipeline.data import pipe, NoFeaturesError
21+
from ml_grid.util.global_params import global_parameters
22+
except ImportError:
23+
# This allows the test to be run from the project root directory
24+
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
25+
from ml_grid.pipeline.data import pipe, NoFeaturesError
26+
from ml_grid.util.global_params import global_parameters
27+
28+
29+
class TestDataPipeline(unittest.TestCase):
30+
"""Test suite for the data.pipe class."""
31+
32+
@classmethod
33+
def setUpClass(cls):
34+
"""Set up resources that are shared across all tests."""
35+
cls.project_root = Path(__file__).resolve().parents[1]
36+
cls.test_data_path = cls.project_root / "notebooks" / "test_data_hfe_1yr_m_small_multiclass.csv"
37+
38+
if not cls.test_data_path.exists():
39+
raise FileNotFoundError(f"Test data file not found at {cls.test_data_path}")
40+
41+
print(f"Using test data: {cls.test_data_path}")
42+
43+
def setUp(self):
44+
"""Set up a temporary environment for each test."""
45+
self.test_dir = tempfile.mkdtemp()
46+
47+
# Configure global parameters for testing
48+
global_parameters.verbose = 1
49+
global_parameters.error_raise = True
50+
global_parameters.bayessearch = False
51+
52+
# Define a base configuration for the pipeline
53+
self.base_local_param_dict = {
54+
'outcome_var_n': 1,
55+
'param_space_size': 'small',
56+
'scale': True,
57+
'feature_n': 100, # Use all features by default
58+
'use_embedding': False,
59+
'embedding_method': 'pca',
60+
'embedding_dim': 10,
61+
'scale_features_before_embedding': True,
62+
'percent_missing': 50,
63+
'correlation_threshold': 0.98,
64+
'corr': 0.98,
65+
'test_size': 0.25,
66+
'resample': None,
67+
'random_state': 42,
68+
'feature_selection_method': 'anova',
69+
'data': {
70+
'age': True, 'sex': True, 'bmi': True, 'ethnicity': True,
71+
'bloods': True, 'diagnostic_order': True, 'drug_order': True,
72+
'annotation_n': True, 'meta_sp_annotation_n': True,
73+
'annotation_mrc_n': True, 'meta_sp_annotation_mrc_n': True,
74+
'core_02': True, 'bed': True, 'vte_status': True,
75+
'hosp_site': True, 'core_resus': True, 'news': True,
76+
'date_time_stamp': True, 'appointments': True,
77+
}
78+
}
79+
self.drop_term_list = ['chrom', 'hfe', 'phlebo']
80+
self.model_class_dict = {'LogisticRegression_class': True}
81+
82+
def tearDown(self):
83+
"""Clean up the temporary directory after each test."""
84+
shutil.rmtree(self.test_dir, ignore_errors=True)
85+
86+
def test_pipeline_initialization_successful(self):
87+
"""Test that the pipeline initializes and runs without errors."""
88+
try:
89+
pipeline = pipe(
90+
file_name=str(self.test_data_path),
91+
drop_term_list=self.drop_term_list,
92+
experiment_dir=self.test_dir,
93+
base_project_dir=str(self.project_root),
94+
local_param_dict=self.base_local_param_dict,
95+
param_space_index=0,
96+
model_class_dict=self.model_class_dict
97+
)
98+
# Assert that key attributes are created and have the correct types
99+
self.assertIsInstance(pipeline.X_train, pd.DataFrame)
100+
self.assertIsInstance(pipeline.y_train, pd.Series)
101+
self.assertGreater(len(pipeline.final_column_list), 0)
102+
self.assertGreater(len(pipeline.model_class_list), 0)
103+
self.assertEqual(pipeline.outcome_variable, 'outcome_var_1')
104+
105+
except Exception as e:
106+
self.fail(f"Pipeline initialization failed with an unexpected error: {e}")
107+
108+
def test_no_constant_columns_in_final_X_train(self):
109+
"""Verify that the final X_train contains no constant columns."""
110+
pipeline = pipe(
111+
file_name=str(self.test_data_path),
112+
drop_term_list=self.drop_term_list,
113+
experiment_dir=self.test_dir,
114+
base_project_dir=str(self.project_root),
115+
local_param_dict=self.base_local_param_dict,
116+
param_space_index=1,
117+
model_class_dict=self.model_class_dict
118+
)
119+
# A constant column has a variance of 0
120+
variances = pipeline.X_train.var(axis=0)
121+
constant_columns = variances[variances == 0].index.tolist()
122+
self.assertEqual(len(constant_columns), 0,
123+
f"Found constant columns in final X_train: {constant_columns}")
124+
125+
def test_data_quality_in_final_data(self):
126+
"""Check for NaN or infinite values in the final training data."""
127+
pipeline = pipe(
128+
file_name=str(self.test_data_path),
129+
drop_term_list=self.drop_term_list,
130+
experiment_dir=self.test_dir,
131+
base_project_dir=str(self.project_root),
132+
local_param_dict=self.base_local_param_dict,
133+
param_space_index=2,
134+
model_class_dict=self.model_class_dict
135+
)
136+
self.assertEqual(pipeline.X_train.isna().sum().sum(), 0,
137+
"Found NaN values in final X_train.")
138+
numeric_cols = pipeline.X_train.select_dtypes(include=np.number)
139+
self.assertEqual(np.isinf(numeric_cols).sum().sum(), 0,
140+
"Found infinite values in final X_train.")
141+
142+
def test_feature_importance_selection(self):
143+
"""Test that feature importance selection correctly reduces column count."""
144+
params = self.base_local_param_dict.copy()
145+
params['feature_n'] = 50 # Select top 50% of features
146+
params['percent_missing'] = 100 # Disable missing value pruning
147+
params['corr'] = 1.0 # Disable correlation pruning
148+
149+
with warnings.catch_warnings():
150+
warnings.simplefilter("ignore", RuntimeWarning)
151+
pipeline = pipe(
152+
file_name=str(self.test_data_path),
153+
drop_term_list=self.drop_term_list,
154+
experiment_dir=self.test_dir,
155+
base_project_dir=str(self.project_root),
156+
local_param_dict=params,
157+
param_space_index=3,
158+
model_class_dict=self.model_class_dict
159+
)
160+
161+
# Get the number of features *before* importance selection
162+
log = pipeline.feature_transformation_log
163+
importance_rows = log[log['step'] == 'Feature Importance']
164+
165+
if len(importance_rows) > 0:
166+
features_before_importance = importance_rows['features_before'].iloc[0]
167+
expected_features = int(features_before_importance * 0.50)
168+
169+
# Allow for slight rounding differences
170+
self.assertAlmostEqual(pipeline.X_train.shape[1], expected_features, delta=2,
171+
msg=f"Feature importance did not reduce features to ~50%. "
172+
f"Expected ~{expected_features}, got {pipeline.X_train.shape[1]}")
173+
else:
174+
self.fail("Feature Importance step was not found in the transformation log.")
175+
176+
def test_embedding_application(self):
177+
"""Test that embedding correctly reduces features to the target dimension."""
178+
params = self.base_local_param_dict.copy()
179+
params['use_embedding'] = True
180+
params['embedding_dim'] = 5 # Request a valid number of dimensions
181+
params['percent_missing'] = 100 # Disable missing value pruning
182+
params['corr'] = 1.0 # Disable correlation pruning
183+
params['feature_n'] = 100 # Ensure feature selection is off
184+
185+
pipeline = pipe(
186+
file_name=str(self.test_data_path),
187+
drop_term_list=self.drop_term_list,
188+
experiment_dir=self.test_dir,
189+
base_project_dir=str(self.project_root),
190+
local_param_dict=params,
191+
param_space_index=4,
192+
model_class_dict=self.model_class_dict
193+
)
194+
195+
# Embedding might create constant columns that are then removed
196+
self.assertLessEqual(pipeline.X_train.shape[1], params['embedding_dim'],
197+
"Embedding created more features than expected.")
198+
self.assertGreater(pipeline.X_train.shape[1], 0,
199+
"All features were removed after embedding.")
200+
self.assertTrue(all(c.startswith('embed_') for c in pipeline.X_train.columns),
201+
"Not all columns have the 'embed_' prefix.")
202+
203+
def test_index_alignment(self):
204+
"""Test that all final data splits have aligned indices."""
205+
pipeline = pipe(
206+
file_name=str(self.test_data_path),
207+
drop_term_list=self.drop_term_list,
208+
experiment_dir=self.test_dir,
209+
base_project_dir=str(self.project_root),
210+
local_param_dict=self.base_local_param_dict,
211+
param_space_index=5,
212+
model_class_dict=self.model_class_dict
213+
)
214+
self.assertTrue(pipeline.X_train.index.equals(pipeline.y_train.index),
215+
"X_train and y_train indices are not aligned.")
216+
self.assertTrue(pipeline.X_test.index.equals(pipeline.y_test.index),
217+
"X_test and y_test indices are not aligned.")
218+
self.assertTrue(pipeline.X_test_orig.index.equals(pipeline.y_test_orig.index),
219+
"X_test_orig and y_test_orig indices are not aligned.")
220+
221+
def test_safety_net_activation(self):
222+
"""Test that the safety net retains features when all are pruned."""
223+
params = self.base_local_param_dict.copy()
224+
# Create a config that will prune all features
225+
params['data'] = {key: False for key in params['data']}
226+
params['percent_missing'] = 0 # Drop any column with missing values
227+
params['correlation_threshold'] = 0.01 # Drop almost everything
228+
params['corr'] = 0.01
229+
230+
pipeline = pipe(
231+
file_name=str(self.test_data_path),
232+
drop_term_list=self.drop_term_list,
233+
experiment_dir=self.test_dir,
234+
base_project_dir=str(self.project_root),
235+
local_param_dict=params,
236+
param_space_index=6,
237+
model_class_dict=self.model_class_dict
238+
)
239+
240+
# Check that the safety net was activated and retained some features
241+
log = pipeline.feature_transformation_log
242+
self.assertTrue('Safety Net' in log['step'].values,
243+
"Safety Net step was not logged.")
244+
self.assertGreater(pipeline.X_train.shape[1], 0,
245+
"Safety net failed to retain any features.")
246+
247+
def test_index_alignment_with_resampling(self):
248+
"""Test index alignment after applying resampling."""
249+
params = self.base_local_param_dict.copy()
250+
params['resample'] = 'oversample'
251+
252+
pipeline = pipe(
253+
file_name=str(self.test_data_path),
254+
drop_term_list=self.drop_term_list,
255+
experiment_dir=self.test_dir,
256+
base_project_dir=str(self.project_root),
257+
local_param_dict=params,
258+
param_space_index=7,
259+
model_class_dict=self.model_class_dict
260+
)
261+
262+
# The most critical check is that the final training data is aligned
263+
self.assertTrue(pipeline.X_train.index.equals(pipeline.y_train.index),
264+
"X_train and y_train indices are not aligned after resampling.")
265+
266+
def test_final_data_integrity_after_complex_pipeline(self):
267+
"""
268+
Test for constant columns and index alignment in all final data splits
269+
after a complex pipeline run involving resampling and feature selection.
270+
"""
271+
params = self.base_local_param_dict.copy()
272+
params['resample'] = 'oversample'
273+
params['feature_n'] = 75
274+
params['corr'] = 1.0
275+
params['percent_missing'] = 100
276+
277+
with warnings.catch_warnings():
278+
warnings.simplefilter("ignore", RuntimeWarning)
279+
pipeline = pipe(
280+
file_name=str(self.test_data_path),
281+
drop_term_list=self.drop_term_list,
282+
experiment_dir=self.test_dir,
283+
base_project_dir=str(self.project_root),
284+
local_param_dict=params,
285+
param_space_index=11,
286+
model_class_dict=self.model_class_dict
287+
)
288+
289+
# 1. Check for constant columns in X_train
290+
train_variances = pipeline.X_train.var(axis=0)
291+
constant_columns_train = train_variances[train_variances == 0].index.tolist()
292+
self.assertEqual(len(constant_columns_train), 0,
293+
f"Found constant columns in final X_train: {constant_columns_train}")
294+
295+
# For test sets, constant columns are acceptable (no data leakage)
296+
for name, df in [('X_test', pipeline.X_test), ('X_test_orig', pipeline.X_test_orig)]:
297+
if (df.var(axis=0) == 0).any():
298+
constant_cols = df.columns[df.var(axis=0) == 0].tolist()
299+
warnings.warn(
300+
f"Found constant columns in final {name}: {constant_cols}. "
301+
f"This is acceptable as they were not constant in X_train."
302+
)
303+
304+
# 2. Check for index alignment in all final data splits
305+
self.assertTrue(pipeline.X_train.index.equals(pipeline.y_train.index),
306+
"Final X_train and y_train indices are not aligned.")
307+
self.assertTrue(pipeline.X_test.index.equals(pipeline.y_test.index),
308+
"Final X_test and y_test indices are not aligned.")
309+
self.assertTrue(pipeline.X_test_orig.index.equals(pipeline.y_test_orig.index),
310+
"Final X_test_orig and y_test_orig indices are not aligned.")
311+
312+
def test_final_data_integrity_with_embedding_and_resampling(self):
313+
"""
314+
Test for constant columns and index alignment after a pipeline
315+
run involving resampling and embedding.
316+
"""
317+
params = self.base_local_param_dict.copy()
318+
params['resample'] = 'undersample'
319+
params['use_embedding'] = True
320+
params['embedding_dim'] = 4
321+
params['feature_n'] = 100 # Disable feature selection
322+
params['corr'] = 1.0
323+
params['percent_missing'] = 100
324+
325+
pipeline = pipe(
326+
file_name=str(self.test_data_path),
327+
drop_term_list=self.drop_term_list,
328+
experiment_dir=self.test_dir,
329+
base_project_dir=str(self.project_root),
330+
local_param_dict=params,
331+
param_space_index=12,
332+
model_class_dict=self.model_class_dict
333+
)
334+
335+
# 1. Check for constant columns in the final training set
336+
train_variances = pipeline.X_train.var(axis=0)
337+
constant_columns_train = train_variances[train_variances == 0].index.tolist()
338+
self.assertEqual(len(constant_columns_train), 0,
339+
f"Found constant columns in final X_train after embedding: {constant_columns_train}")
340+
341+
# 2. Check for index alignment in all final data splits
342+
self.assertTrue(pipeline.X_train.index.equals(pipeline.y_train.index),
343+
"Final X_train and y_train indices are not aligned.")
344+
self.assertTrue(pipeline.X_test.index.equals(pipeline.y_test.index),
345+
"Final X_test and y_test indices are not aligned.")
346+
self.assertTrue(pipeline.X_test_orig.index.equals(pipeline.y_test_orig.index),
347+
"Final X_test_orig and y_test_orig indices are not aligned.")
348+
349+
# 3. Check that embedding was applied correctly
350+
self.assertLessEqual(pipeline.X_train.shape[1], params['embedding_dim'],
351+
"Embedding created more features than expected.")
352+
self.assertGreater(pipeline.X_train.shape[1], 0,
353+
"Embedding and cleaning removed all features.")
354+
self.assertTrue(all(c.startswith('embed_') for c in pipeline.X_train.columns),
355+
"Not all columns have the 'embed_' prefix after embedding.")
356+
357+
358+
# Allow running this file directly. argv is stubbed so unittest does not try
# to parse the host interpreter's command-line arguments, and exit=False
# prevents unittest.main from calling sys.exit, which would terminate an
# interactive session (e.g. a notebook) hosting the run.
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

0 commit comments

Comments
 (0)