33from typing import TypeVar
44
55import numpy as np
6+ import scipy
67
78from imap_l3_processing .constants import ELECTRON_MASS_KG , PROTON_CHARGE_COULOMBS , METERS_PER_KILOMETER
89from imap_l3_processing .pitch_angles import calculate_pitch_angle , calculate_unit_vector , calculate_gyrophase
910from imap_l3_processing .swe .l3 .models import SweConfiguration
10-
11+ from imap_l3_processing . swe . quality_flags import SweL3Flags
1112
1213def piece_wise_model (x : np .ndarray , b0 : float , b1 : float ,
1314 b2 : float , b3 : float , b4 : float , b5 : float ) -> np .ndarray :
@@ -18,6 +19,151 @@ def piece_wise_model(x: np.ndarray, b0: float, b1: float,
1819 lambda x : b0 * np .exp (b2 * (b3 - b1 )) * np .exp (b4 * (b5 - b3 )) * np .exp (- b5 * x ),
1920 ]))
2021
22+ def mec_breakpoint_finder (energies : np .ndarray , averaged_psd : np .ndarray ) -> tuple [float , float , SweL3Flags ]:
23+ """
24+ Input:
25+ energies - energy bins
26+ averaged_psd - phase space density, either averaged over all CEMs or individual CEMs
27+ Output:
28+ sc_pot_output, ch_break_output, total_flag - tuple for spacecraft potential, core-halo break point, and all flags thrown
29+ """
30+ log_energy = np .log (energies )
31+ log_psd = np .log (averaged_psd )
32+
33+ # Check to see if first 4 points are nearly linear
34+ # If True, then potential is likely less than 2.7 V (lower than first energy bin)
35+ def line_model (params , x ):
36+ return params [0 ] + params [1 ] * x
37+ from scipy import odr
38+ odr_model = odr .Model (line_model )
39+ x = log_energy [:4 ]
40+ y = log_psd [:4 ]
41+ mydata = odr .RealData (x = x ,y = y )
42+ myodr = odr .ODR (mydata , odr_model , beta0 = [y .max (),- 5 ])
43+ myodr .set_job (fit_type = 2 )
44+ myoutput = myodr .run ()
45+ if myoutput .res_var <= 0.01 :
46+ FALLBACK_POTENTIAL_ESTIMATE = SweL3Flags .FALLBACK_POTENTIAL_ESTIMATE
47+ return_value = 2.5
48+ else :
49+ FALLBACK_POTENTIAL_ESTIMATE = SweL3Flags .NONE
50+
51+ # Use a smoothed spline on log_psd for spectral break finding routine as a fall back only!
52+ # Mirror real point as fake point to left of first energy bin to improve spline concavity
53+ ewidth = np .nanmean (log_energy [1 :] - log_energy [:- 1 ])
54+ from scipy .interpolate import UnivariateSpline as uspline
55+ spline = uspline (np .concatenate ([[log_energy [0 ]- ewidth ],log_energy ]),
56+ np .concatenate ([[log_psd [2 ]],log_psd ]), s = .25 )
57+ spline_energies = np .geomspace (energies .min ()* np .exp (- ewidth ), energies .max (), 100 )
58+ spline_derivative = spline .derivative (2 )
59+ curvature = spline_derivative (np .log (spline_energies ))
60+ try :
61+ peaks = scipy .signal .find_peaks (curvature )[0 ]
62+ sc_pot = spline_energies [peaks [0 ]]
63+ ch_break = spline_energies [peaks [1 ]]
64+ BACKUP_SPLINE_UNRESOLVED = SweL3Flags .NONE
65+ except :
66+ # Spline peak finder did not work
67+ BACKUP_SPLINE_UNRESOLVED = SweL3Flags .BACKUP_SPLINE_UNRESOLVED
68+ sc_pot = np .nan
69+ ch_break = np .nan
70+
71+ def piece_wise_model_mec (x , b0 , b1 , b2 , b3 ):
72+ """
73+ Modified Piecewise to fit Potential and Core-Halo Break separately
74+ The breakpoint is b2
75+ """
76+ return np .piecewise (x , [x <= b2 , x > b2 ],
77+ [lambda x : b0 - b1 * x , lambda x : b0 + b2 * (b3 - b1 ) - b3 * x ])
78+
79+ def refine_breakpoint_value (energy , psd , breakpoint_value , num_points ):
80+ """
81+ Function to use lines from num_points to left and right to find intersection
82+ energy at which the lines intersect is the refined_breakpoint
83+ """
84+ # Find Nearest energy bin to breakpoint_value
85+ nearest_energy_idx = np .argmin (np .abs (energy - breakpoint_value ))
86+ if np .abs (breakpoint_value - energy [nearest_energy_idx ])/ energy [nearest_energy_idx ] <= .075 :
87+ # Breakpoint_value was within FWHM of nearest energy bin
88+ # return that energy bin as refined_breakpoint
89+ return energy [nearest_energy_idx ]
90+ # Get left and right spectrum of breakpoint_value
91+ e_left = energy [energy < breakpoint_value ]
92+ e_right = energy [energy > breakpoint_value ]
93+ psd_left = psd [energy < breakpoint_value ]
94+ psd_right = psd [energy > breakpoint_value ]
95+ if len (e_left ) < num_points :
96+ # Not enough points to the left (really only possible for s/c potential)
97+ return breakpoint_value
98+ # Fit a line to the num_points left and right of breakpoint_value
99+ z_left = np .polyfit (e_left [- num_points :], psd_left [- num_points :], 1 )
100+ z_right = np .polyfit (e_right [:num_points ], psd_right [:num_points ], 1 )
101+ # Determine energy of their intersection
102+ refined_breakpoint = (z_left [1 ]- z_right [1 ]) / (z_right [0 ]- z_left [0 ])
103+ if (refined_breakpoint < z_left [- 1 ]) | (refined_breakpoint > z_right [0 ]):
104+ # Refined breakpoint lies outside of expected range
105+ return breakpoint_value
106+ return refined_breakpoint
107+
108+ # Prepare masking for the two separate fits
109+ mask_sc = log_energy <= np .log (30 )
110+ mask_ch = (log_energy > np .log (30 )) & (log_energy < np .log (400 ))
111+
112+ log_energy_sc = log_energy [mask_sc ]; log_psd_sc = log_psd [mask_sc ]
113+ log_energy_ch = log_energy [mask_ch ]; log_psd_ch = log_psd [mask_ch ]
114+
115+ fitting_model = piece_wise_model_mec
116+ # Try Spacecraft Potential Fit
117+ POTENTIAL_FIT_UNCONVERGED = SweL3Flags .NONE
118+ if FALLBACK_POTENTIAL_ESTIMATE == SweL3Flags .NONE :
119+ try :
120+ initial_guess = [log_psd [0 ],1 ,7 ,1 ]
121+ z , cov = scipy .optimize .curve_fit (fitting_model , np .exp (log_energy_sc ), log_psd_sc , p0 = initial_guess )
122+ # Make sure the fit converged
123+ if ((z [0 ] == initial_guess [0 ]) | (z [1 ] == initial_guess [1 ])
124+ | (z [2 ] == initial_guess [2 ]) | (z [3 ] == initial_guess [3 ])):
125+ # Fall back on Spline method
126+ # Fit did not converge
127+ POTENTIAL_FIT_UNCONVERGED = SweL3Flags .POTENTIAL_FIT_UNCONVERGED
128+ sc_pot_output = sc_pot
129+ else :
130+ # Fit worked
131+ # Check whether breakpoint has two points to left and right
132+ # If so, then find intersection of linear fits to each side
133+ sc_pot_output = refine_breakpoint_value (np .exp (log_energy_sc ), log_psd_sc , z [2 ], 2 )
134+ # spline not used
135+ BACKUP_SPLINE_UNRESOLVED = SweL3Flags .NONE
136+ except :
137+ # Fall back on Spline method
138+ # Fit did not converge
139+ BACKUP_SPLINE_UNRESOLVED = SweL3Flags .BACKUP_SPLINE_UNRESOLVED
140+ sc_pot_output = sc_pot
141+ else :
142+ sc_pot_output = return_value
143+ # Try Core-Halo Breakpoint Fit
144+ BREAKPOINT_FIT_UNCONVERGED = SweL3Flags .NONE
145+ try :
146+ initial_guess = [log_psd_ch [0 ],1 ,65 ,1 ]
147+ z , cov = scipy .optimize .curve_fit (fitting_model , np .exp (log_energy_ch ), log_psd_ch , p0 = initial_guess )
148+ # Make sure the fit converged
149+ if ((z [0 ] == initial_guess [0 ]) & (z [1 ] == initial_guess [1 ])
150+ & (z [2 ] == initial_guess [2 ]) & (z [3 ] == initial_guess [3 ])):
151+ # Fall back on Spline method
152+ # Fit did not converge
153+ BREAKPOINT_FIT_UNCONVERGED = SweL3Flags .BREAKPOINT_FIT_UNCONVERGED
154+ ch_break_output = ch_break
155+ else :
156+ # Fit worked
157+ # Check whether breakpoint has three points to left and right
158+ # If so, then find intersection of linear fits to each side
159+ ch_break_output = refine_breakpoint_value (np .exp (log_energy_ch ), log_psd_ch , z [2 ], 3 )
160+ except :
161+ # Fall back on Spline method
162+ # Fit did not converge
163+ BREAKPOINT_FIT_UNCONVERGED = SweL3Flags .BREAKPOINT_FIT_UNCONVERGED
164+ ch_break_output = ch_break
165+ return sc_pot_output , ch_break_output , FALLBACK_POTENTIAL_ESTIMATE | BACKUP_SPLINE_UNRESOLVED | POTENTIAL_FIT_UNCONVERGED | BREAKPOINT_FIT_UNCONVERGED
166+
21167
22168def find_breakpoints (energies : np .ndarray , averaged_psd : np .ndarray , latest_spacecraft_potentials : list [float ],
23169 latest_core_halo_break_points : list [float ],
0 commit comments