-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgroup_analysis.py
More file actions
169 lines (137 loc) · 7.63 KB
/
group_analysis.py
File metadata and controls
169 lines (137 loc) · 7.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
from scipy.optimize import minimize_scalar, root_scalar
from cost_functions import compute_phi_raw, penalty_function
def compute_group_economic_profit(group, group_time, get_production_line_by_id):
setup_savings = 0
downtime_savings = 0
increase_in_cost = 0
production_line_ids = set()
# --- CM Cost Increase Calculation ---
for component in group.components:
production_line_ids.add(component.production_line_id)
delta_t = abs(group_time - component.optimal_execution_time)
cc = component.corrective_maintenance_cost
mtbf = component.mean_time_between_failures
lambd = component.lamda_efr
x_star = component.optimal_execution_time
phi_plus = compute_phi_raw(cc, mtbf, lambd, x_star + delta_t)
phi_minus = compute_phi_raw(cc, mtbf, lambd, x_star - delta_t)
phi_star = compute_phi_raw(cc, mtbf, lambd, x_star)
increase_in_cost += (phi_plus + phi_minus - 2 * phi_star)
# --- Setup Savings: 1 setup per line ---
for line_id in production_line_ids:
line = get_production_line_by_id(line_id)
num_components = sum(1 for c in group.components if c.production_line_id == line_id)
setup_savings += line.preventive_maintenance_set_up_cost * (num_components - 1)
# --- Downtime Cost Savings (Equation 15) ---
for line_id in production_line_ids:
components_on_line = [c for c in group.components if c.production_line_id == line_id]
line = get_production_line_by_id(line_id)
total_individual_duration = sum(c.preventive_maintenance_duration for c in components_on_line)
max_parallel_duration = max(c.preventive_maintenance_duration for c in components_on_line)
line_downtime_savings = line.downtime_cost_rate * (total_individual_duration - max_parallel_duration)
downtime_savings += line_downtime_savings
# --- Final Economic Profit ---
economic_profit = setup_savings + downtime_savings - increase_in_cost
return economic_profit, {
"downtime_savings": downtime_savings,
"setup_savings": setup_savings,
"increased_CM_cost": increase_in_cost
}
# This function is used to calculate the penalty for a group of components. The penalty minimization helps find the group's optimal execution time
def group_penalty_function(t, group):
penalty = 0
for comp in group.components:
delta_t = t - comp.optimal_execution_time
phi_shifted = compute_phi_raw(comp.corrective_maintenance_cost, comp.mean_time_between_failures, comp.lamda_efr, comp.optimal_execution_time + delta_t)
phi_star = compute_phi_raw(comp.corrective_maintenance_cost, comp.mean_time_between_failures, comp.lamda_efr, comp.optimal_execution_time)
penalty += (phi_shifted - phi_star - delta_t * comp.long_term_cost_rate)
return penalty
def find_optimal_group_time(group, use_effective_time=False):
"""
Find optimal execution time for a group.
If use_effective_time is True and group has effective_execution_time set,
use that instead of optimizing.
"""
if use_effective_time and hasattr(group, 'effective_execution_time') and group.effective_execution_time > 0:
return group.effective_execution_time, group_penalty_function(group.effective_execution_time, group)
result = minimize_scalar(lambda t: group_penalty_function(t, group), bounds=(0, 365), method='bounded')
# Store as planned execution time if not using effective time
if not use_effective_time:
group.planned_execution_time = result.x
return result.x, result.fun
# This function finds for a given component the feasible interval within which the penalty for shifting away from the optimal execution time does not exceed the setup cost of the production line
def find_feasible_interval(component, get_production_line_by_id):
production_line = get_production_line_by_id(component.production_line_id)
S = production_line.preventive_maintenance_set_up_cost
x_star = component.optimal_execution_time
def root_eq(delta_t):
return penalty_function(delta_t, component) - S
# Search for Δt⁻ in the negative direction
try:
sol_neg = root_scalar(root_eq, bracket=[-x_star + 0.01, 0], method='brentq') # Root scalar is a root-finding algorithm based on Bolzano's condition
delta_t_minus = sol_neg.root if sol_neg.converged else None
except:
delta_t_minus = None
# Search for Δt⁺ in the positive direction
try:
sol_pos = root_scalar(root_eq, bracket=[0, 365 - x_star], method='brentq')
delta_t_plus = sol_pos.root if sol_pos.converged else None
except:
delta_t_plus = None
if delta_t_minus is not None and delta_t_plus is not None:
return (x_star + delta_t_minus, x_star + delta_t_plus)
else:
return None # No feasible interval
def find_non_intersecting_pairs(components, find_feasible_interval, get_production_line_by_id):
non_intersecting_pairs = []
for i in range(len(components)):
for j in range(i + 1, len(components)):
interval_i = components[i].feasible_interval
interval_j = components[j].feasible_interval
if interval_i and interval_j:
# Check if intervals do not intersect
if interval_i[1] < interval_j[0] or interval_j[1] < interval_i[0]:
non_intersecting_pairs.append((components[i], components[j]))
return non_intersecting_pairs
def compute_grouping_structure_cost(groups, get_production_line_fn, d_PH):
# Step 1: Calculate the total individual cost
total_individual_cost = 0
all_components = []
for group in groups:
all_components.extend(group.components)
total_individual_cost = d_PH * sum(comp.long_term_cost_rate for comp in all_components)
# Step 2: Calculate total group economic profit
total_economic_profit = 0
for group in groups:
# Compute optimal time for this group
group_time = find_optimal_group_time(group)[0]
profit, _ = compute_group_economic_profit(group, group_time, get_production_line_fn)
total_economic_profit += profit
# Step 3: Calculate grouped structure cost
grouped_structure_cost = total_individual_cost - total_economic_profit
return grouped_structure_cost
def compute_grouping_structure_cost_with_cascading(groups, get_production_line_fn, d_PH):
"""
Compute grouping structure cost considering cascading delays.
This should be called AFTER the constructive heuristic completes.
"""
# Step 1: Calculate the total individual cost
total_individual_cost = 0
all_components = []
for group in groups:
all_components.extend(group.components)
total_individual_cost = d_PH * sum(comp.long_term_cost_rate for comp in all_components)
# Step 2: Apply cascading delays
from group import group_components_by_line, apply_cascading_delays
groups_by_line = group_components_by_line(groups)
apply_cascading_delays(groups_by_line, get_production_line_fn)
# Step 3: Calculate total group economic profit using effective execution times
total_economic_profit = 0
for group in groups:
group_time = group.effective_execution_time if hasattr(group, 'effective_execution_time') and group.effective_execution_time > 0 else find_optimal_group_time(group)[0]
profit, components_dict = compute_group_economic_profit(group, group_time, get_production_line_fn)
group.economic_profit = (profit, components_dict)
total_economic_profit += profit
# Step 4: Calculate grouped structure cost
grouped_structure_cost = total_individual_cost - total_economic_profit
return grouped_structure_cost