
Commit 4ae75f9: correlation optimisations

Author: SamoraHunter
Parent: 13a567b (1 parent)

3 files changed: 172 additions & 165 deletions

Lines changed: 169 additions & 163 deletions
@@ -1,194 +1,200 @@
 import logging
-import sys
-from typing import Any, Dict, List
+from typing import Dict, List, Set

 import numpy as np
 import pandas as pd
 from tqdm import tqdm

+logger = logging.getLogger("ml_grid")

-def correlation_coefficient(col1: pd.Series, col2: pd.Series) -> float:
-    """Calculates the Pearson correlation coefficient between two pandas Series.

-    Args:
-        col1 (pd.Series): The first series.
-        col2 (pd.Series): The second series.
+def handle_correlation_matrix(
+    local_param_dict: Dict,
+    drop_list: List[str],
+    df: pd.DataFrame,
+    chunk_size: int = 1000,
+) -> List[str]:
+    """
+    Hybrid Correlation Optimizer.

-    Returns:
-        float: The correlation coefficient.
+    Features:
+    1. Respects existing 'drop_list' (adds to it, doesn't replace it).
+    2. Optimizes by skipping columns already in 'drop_list'.
+    3. Hybrid GPU/CPU execution with robust error handling.
     """
-    return col1.corr(col2)

+    threshold = local_param_dict.get("corr", 0.25)

-def calculate_correlation_chunk(
-    df: pd.DataFrame,
-    cols_chunk1: List[str],
-    cols_chunk2: List[str],
-) -> pd.DataFrame:
-    """Calculate correlation between two chunks of columns.
-
-    Args:
-        df (pd.DataFrame): The input DataFrame.
-        cols_chunk1 (List[str]): First set of column names.
-        cols_chunk2 (List[str]): Second set of column names.
-
-    Returns:
-        pd.DataFrame: Correlation matrix for the chunk pairs.
-    """
-    # Extract data for both chunks
-    data1 = df[cols_chunk1].values
-    data2 = df[cols_chunk2].values
+    # Filter to numeric columns only
+    numeric_columns = df.select_dtypes(include=["number"]).columns

-    # Calculate means and standard deviations
-    mean1 = np.mean(data1, axis=0)
-    mean2 = np.mean(data2, axis=0)
-    std1 = np.std(data1, axis=0, ddof=1)
-    std2 = np.std(data2, axis=0, ddof=1)
+    if len(numeric_columns) == 0:
+        return drop_list  # Return existing list if no new work to do

-    # Center the data
-    centered1 = data1 - mean1
-    centered2 = data2 - mean2
+    logger.info("Preparing data (converting to float32)...")
+    df_numeric = df[numeric_columns]
+    col_names = df_numeric.columns.tolist()

-    # Calculate correlation matrix chunk
-    n = len(df)
-    corr_chunk = np.dot(centered1.T, centered2) / (n - 1)
+    # Create a mapping for fast index lookups
+    col_to_idx = {name: i for i, name in enumerate(col_names)}

-    # Normalize by standard deviations
-    corr_chunk = corr_chunk / np.outer(std1, std2)
+    # Convert data to float32
+    data = df_numeric.values.astype(np.float32)

-    return pd.DataFrame(corr_chunk, index=cols_chunk1, columns=cols_chunk2)
+    # --- GPU DETECTION & SAFETY ---
+    use_gpu = False
+    try:
+        import cupy as cp

+        if cp.cuda.is_available():
+            free_mem = cp.cuda.Device().mem_info[0]
+            req_mem = (data.shape[1] ** 2) * 4  # 4 bytes per float32

-def handle_correlation_matrix(
-    local_param_dict: Dict[str, Any],
-    drop_list: List[Any],
-    df: pd.DataFrame,
-    chunk_size: int = 50,
-) -> List[Any]:
-    """Identifies highly correlated columns and adds them to a drop list.
-
-    This function calculates the correlation coefficient between numeric columns
-    in the input DataFrame in chunks to manage memory usage. Pairs of columns
-    with a correlation greater than the specified threshold are added to the
-    `drop_list` as tuples.
-
-    To avoid dropping both columns in a highly correlated pair, this function
-    only adds one of the columns from each pair to the `drop_list`.
-
-    Calculates the correlation coefficient between each column in the input DataFrame
-    using chunks to avoid memory issues. The correlation threshold is defined by
-    the 'corr' key in the local_param_dict dictionary.
-
-    Args:
-        local_param_dict (Dict[str, Any]): Dictionary containing local parameters,
-            including the 'corr' threshold.
-        drop_list (List[Any]): A list to which pairs of correlated columns will
-            be appended.
-        df (pd.DataFrame): The input DataFrame.
-        chunk_size (int, optional): The size of each chunk for correlation
-            calculation. Defaults to 50.
-
-    Returns:
-        List[Any]: The updated list containing unique pairs of correlated columns.
-    """
-    logger = logging.getLogger("ml_grid")
-    # Define the correlation threshold
-    threshold = (
-        local_param_dict.get("corr")
-        if local_param_dict.get("corr") is not None
-        else 0.98
+            if free_mem > req_mem * 1.2:
+                use_gpu = True
+                logger.info(
+                    f"GPU detected (device {cp.cuda.Device().id}). Free VRAM: {free_mem/1e9:.2f} GB."
+                )
+            else:
+                logger.warning(
+                    "GPU detected but insufficient VRAM. Falling back to CPU."
+                )
+    except Exception as e:
+        logger.warning(f"GPU acceleration unavailable (falling back to CPU): {e}")
+        use_gpu = False
+    # -----------------------------
+
+    # Convert input drop_list to a Set for O(1) lookups
+    existing_drops = set(drop_list)
+
+    if use_gpu:
+        try:
+            return _process_on_gpu(data, col_names, threshold, existing_drops)
+        except Exception as e:
+            logger.error(f"GPU processing failed: {e}. Retrying on CPU.")
+            # Fallthrough to CPU
+            pass
+
+    # CPU Fallback
+    return _process_on_cpu(
+        data, col_names, col_to_idx, threshold, chunk_size, existing_drops
     )

-    # Remove non-numeric columns
-    numeric_columns = df.select_dtypes(include=["number"]).columns.tolist()

-    if len(numeric_columns) == 0:
-        return drop_list
+def _process_on_gpu(
+    data: np.ndarray, col_names: List[str], threshold: float, existing_drops: Set[str]
+) -> List[str]:
+    import cupy as cp

-    df_numeric = df[numeric_columns]
-    n_cols = len(numeric_columns)
+    n_samples = data.shape[0]

-    # Adjust chunk size if necessary
-    if chunk_size >= n_cols:
-        chunk_size = n_cols
+    # Initialize the final set with what we already had
+    to_drop = existing_drops.copy()

-    # Track which columns to drop
-    to_drop = set()
+    # Move data to GPU
+    gpu_data = cp.asarray(data)

-    # Calculate number of chunks
-    n_chunks = int(np.ceil(n_cols / chunk_size))
+    # Standardize
+    means = gpu_data.mean(axis=0, keepdims=True)
+    stds = gpu_data.std(axis=0, keepdims=True)
+    stds[stds == 0] = 1.0
+    gpu_data = (gpu_data - means) / stds

-    logger.info(
-        f"Processing {n_cols} columns in {n_chunks} chunks of size {chunk_size}..."
-    )
+    scale_factor = 1.0 / n_samples  # 1/n pairs with the population std above to give exact Pearson r

-    # Process correlation matrix in chunks (upper triangle only)
-    for i in tqdm(range(n_chunks), desc="Processing column chunks"):
-        start_i = i * chunk_size
-        end_i = min((i + 1) * chunk_size, n_cols)
-        cols_i = numeric_columns[start_i:end_i]
-
-        # Process only the upper triangle (j <= i)
-        for j in range(i + 1):
-            start_j = j * chunk_size
-            end_j = min((j + 1) * chunk_size, n_cols)
-            cols_j = numeric_columns[start_j:end_j]
-
-            # Calculate correlation for this chunk pair
-            try:
-                if i == j:
-                    # Diagonal chunk - calculate self-correlation
-                    corr_chunk = df_numeric[cols_i].corr().abs()
-
-                    # Only look at upper triangle within this chunk
-                    for idx_i, col_i in enumerate(cols_i):
-                        if col_i in to_drop:
-                            continue
-                        for idx_j in range(idx_i):
-                            col_j = cols_j[idx_j]
-                            if col_j in to_drop:
-                                continue
-                            if corr_chunk.iloc[idx_j, idx_i] > threshold:
-                                to_drop.add(col_i)
-                                break
-                        if col_i in to_drop:
-                            break
-                else:
-                    # Off-diagonal chunk
-                    corr_chunk = calculate_correlation_chunk(
-                        df_numeric, cols_i, cols_j
-                    ).abs()
-
-                    # Check all pairs in this chunk
-                    for idx_i, col_i in enumerate(cols_i):
-                        if col_i in to_drop:
-                            continue
-                        for idx_j, col_j in enumerate(cols_j):
-                            if col_j in to_drop:
-                                continue
-                            if corr_chunk.iloc[idx_i, idx_j] > threshold:
-                                # Drop the column that appears later in the original list
-                                col_to_drop = (
-                                    col_i
-                                    if start_i + idx_i > start_j + idx_j
-                                    else col_j
-                                )
-                                to_drop.add(col_to_drop)
-                                if col_to_drop == col_i:
-                                    break
-                        if col_i in to_drop:
-                            break
-
-            except Exception as e:
-                logger.warning(
-                    f"Error processing chunk ({i}, {j}): {e}", file=sys.stderr
-                )
-                continue
+    # Matrix Multiplication
+    corr_matrix = cp.matmul(gpu_data.T, gpu_data)
+    corr_matrix *= scale_factor
+    corr_matrix = cp.abs(corr_matrix)
+
+    # Upper Triangle only (k=1)
+    upper_tri = cp.triu(corr_matrix, k=1)
+
+    # Get indices of high correlations
+    rows, cols = cp.where(upper_tri > threshold)
+
+    cpu_rows = cp.asnumpy(rows)
+    cpu_cols = cp.asnumpy(cols)
+
+    # Process pairs
+    for i, j in zip(cpu_rows, cpu_cols):
+        col_i = col_names[i]
+        col_j = col_names[j]
+
+        # KEY LOGIC: If col_i is already marked for drop (either from input list
+        # or from this loop), we skip. Otherwise, we drop col_j.
+        if col_i not in to_drop:
+            to_drop.add(col_j)
+
+    logger.info(f"GPU complete. Total columns to drop: {len(to_drop)}")
+    return sorted(list(to_drop))
+
+
+def _process_on_cpu(
+    data: np.ndarray,
+    col_names: List[str],
+    col_to_idx: Dict[str, int],
+    threshold: float,
+    chunk_size: int,
+    existing_drops: Set[str],
+) -> List[str]:
+
+    logger.info("Using optimized CPU processing...")
+    n_samples, n_cols = data.shape
+
+    # Standardize
+    means = data.mean(axis=0, keepdims=True)
+    stds = data.std(axis=0, keepdims=True)
+    stds[stds == 0] = 1.0
+    data = (data - means) / stds
+
+    scale_factor = 1.0 / n_samples  # match the population-std standardization (exact Pearson r)
+
+    # Initialize mask with PRE-EXISTING drops
+    # This optimizes the loop: we won't calculate correlations for columns
+    # that came in already dropped.
+    dropped_mask = np.zeros(n_cols, dtype=bool)
+
+    for col in existing_drops:
+        if col in col_to_idx:
+            dropped_mask[col_to_idx[col]] = True
+
+    effective_chunk_size = max(chunk_size, 500)
+
+    with tqdm(total=n_cols, desc="CPU Correlation") as pbar:
+        for i in range(0, n_cols, effective_chunk_size):
+            i_end = min(i + effective_chunk_size, n_cols)
+
+            chunk_data = data[:, i:i_end]
+
+            # Correlation Block
+            corr_chunk = np.matmul(chunk_data.T, data) * scale_factor
+            corr_chunk = np.abs(corr_chunk)
+
+            for local_row in range(corr_chunk.shape[0]):
+                global_current_idx = i + local_row
+
+                # OPTIMIZATION:
+                # If this column was in the input drop_list OR we just dropped it, SKIP.
+                if dropped_mask[global_current_idx]:
+                    continue
+
+                # Check neighbors to the right
+                candidates = corr_chunk[local_row, global_current_idx + 1 :]
+                hits = np.where(candidates > threshold)[0]
+
+                if hits.size > 0:
+                    # Add to mask
+                    dropped_mask[global_current_idx + 1 + hits] = True
+
+            pbar.update(i_end - i)

-    # Add the identified columns to the initial drop_list
-    drop_list.extend(list(to_drop))
+    # Convert mask back to list
+    dropped_indices = np.where(dropped_mask)[0]
+    newly_identified_drops = {col_names[i] for i in dropped_indices}

-    logger.info(f"Identified {len(to_drop)} columns to drop due to high correlation.")
+    # Merge with original list (in case original list had cols not in this dataframe)
+    final_drop_set = existing_drops.union(newly_identified_drops)

-    # Return a list of unique columns to drop
-    return list(set(drop_list))
+    logger.info(f"CPU complete. Total columns to drop: {len(final_drop_set)}")
+    return sorted(list(final_drop_set))
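A quick back-of-envelope check of the VRAM guard above (illustrative only, not part of the commit): a float32 correlation matrix for p columns needs p * p * 4 bytes, and the code requires 1.2x that to be free before taking the GPU path. Note the estimate budgets only the correlation matrix, not the standardized copy of the data that is also resident on the GPU.

p = 20_000                          # e.g. 20k features
req_mem = (p ** 2) * 4              # 1.6e9 bytes = 1.6 GB for the matrix itself
print(f"need {req_mem * 1.2 / 1e9:.1f} GB free")  # ~1.9 GB with the 1.2x headroom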
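The core speed-up in this commit replaces per-pair pd.Series.corr calls with a single matrix product over standardized data. A minimal standalone sketch of why that is valid (toy data, assuming the 1/n scaling used above): with z = (x - mean) / population_std, |z.T @ z| / n reproduces the absolute Pearson correlation matrix.

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame(rng.normal(size=(500, 4)), columns=list("abcd"))
df["e"] = 0.99 * df["a"] + rng.normal(scale=0.05, size=500)  # engineered high-corr pair

data = df.values.astype(np.float32)
z = (data - data.mean(axis=0, keepdims=True)) / data.std(axis=0, keepdims=True)
corr = np.abs(z.T @ z / data.shape[0])

assert np.allclose(corr, df.corr().abs().values, atol=1e-4)  # float32 tolerance
print(round(float(corr[0, 4]), 3))  # ~0.999 for the engineered pair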

pyproject.toml

Lines changed: 2 additions & 1 deletion

@@ -66,7 +66,8 @@ dependencies = [

     # Dev
     "black",
-    "ruff"
+    "ruff",
+
 ]

 [project.urls]

requirements.txt

Lines changed: 1 addition & 1 deletion

@@ -741,4 +741,4 @@ nbformat

 upsetplot==0.9.0
 # manually added for results processing
-
+cupy
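The unpinned cupy entry backs the optional GPU path; handle_correlation_matrix falls back to the CPU when it is unavailable. For reference, a minimal usage sketch of the new entry point (the import path is hypothetical, adjust to wherever the function lives in ml_grid):

import pandas as pd
from ml_grid.util.correlation import handle_correlation_matrix  # hypothetical path

df = pd.read_csv("features.csv")        # any wide numeric feature table
drop_list = ["already_excluded_col"]    # pre-existing entries are respected

drop_list = handle_correlation_matrix(
    local_param_dict={"corr": 0.9},     # |r| threshold; defaults to 0.25
    drop_list=drop_list,
    df=df,
    chunk_size=1000,                    # only affects the CPU path
)
df = df.drop(columns=[c for c in drop_list if c in df.columns])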
