-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathacquire_spectra.py
More file actions
131 lines (109 loc) · 4.76 KB
/
acquire_spectra.py
File metadata and controls
131 lines (109 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import numpy as np
import pandas as pd
from acquire_spectra_utils import (
get_datafile,
lorentzian_profile,
add_white_noise,
apply_cavity_mode_response,
param_check,
)
from scipy.signal import find_peaks as fp
def acquire_spectra(params: dict, window=25, resolution=0.001, fwhm=0.007, Q=10000, Pmax=1.0):
"""
For each spectral line in the data file corresponding to the molecule specified in params,
create a local frequency grid (spanning ±window around its doubled frequency) and compute its
broadened spectrum using a Lorentzian profile. The local spectra are then interpolated onto a
common grid and summed to produce the final spectrum.
"""
# verify user input is valid
if not param_check(params):
return {
"success": False,
"error": "One of the given parameters was invalid. Please change some settings and try again.",
}
# Retrieve the molecule parameter from params.
molecule = params.get("molecule")
# Retrieve vres parameter from params.
v_res = params.get("vres")
# Use the helper function to obtain the correct data file path.
datafile = get_datafile(molecule)
# Determine cropping bounds based on frequency mode.
frequencyMode = params.get("acquisitionType", "single")
if frequencyMode == "single":
crop_min = v_res - window
crop_max = v_res + window
else:
frequency_min = params.get("frequencyMin")
frequency_max = params.get("frequencyMax")
if frequency_min is None or frequency_max is None:
raise ValueError("For frequency range mode, 'frequencyMin' and 'frequencyMax' must be provided.")
crop_min = frequency_min - window
crop_max = frequency_max + window
# Read the data file.
df = pd.read_csv(datafile, sep=r"\s+", header=None, names=["Frequency", "Intensity"])
df["Frequency"] = pd.to_numeric(df["Frequency"], errors='coerce')
df["Intensity"] = pd.to_numeric(df["Intensity"], errors='coerce')
line_freq = df["Frequency"].values
line_intensity = df["Intensity"].values
# Constants
c_SI = 299792458.0 # Speed of light in m/s
vrms = 1760.0 # Helium velocity in m/s
# Filter out spectral lines that are outside the cropping bounds.
if crop_min is not None and crop_max is not None:
mask_lines = (line_freq + window >= crop_min) & (line_freq - window <= crop_max)
line_intensity = line_intensity[mask_lines]
line_freq = line_freq[mask_lines]
# Collect individual spectra (local grid and corresponding spectrum).
individual_spectra = []
for f, I in zip(line_freq, line_intensity):
local_grid = np.arange(f - window, f + window, resolution)
split_val = (f / c_SI) * vrms
add_split = (f + split_val)
subtract_split = (f - split_val)
profile_main = I * lorentzian_profile(local_grid, add_split, fwhm)
profile_split = I * lorentzian_profile(local_grid, subtract_split, fwhm)
local_spectrum = profile_main + profile_split
individual_spectra.append((local_grid, local_spectrum))
# Define the overall frequency grid.
final_grid = np.arange(crop_min, crop_max, resolution)
final_spectrum = np.zeros_like(final_grid, dtype=float)
# Interpolate each individual spectrum onto the overall grid and sum them.
for local_grid, local_spec in individual_spectra:
final_spectrum += np.interp(final_grid, local_grid, local_spec, left=0, right=0)
# Add white noise to the final spectrum, depending on the number of cycles per step.
cyclesPerStep = params.get("numCyclesPerStep")
final_spectrum = add_white_noise(final_spectrum, cyclesPerStep, is_cavity_mode=False)
# Apply cavity mode response
final_spectrum = apply_cavity_mode_response(params, final_grid, final_spectrum, v_res, Q, Pmax)
# Take absolute value of the final spectrum.
final_spectrum = np.abs(final_spectrum)
return {
"success": True,
"x": [f"{xi:.4f}" for xi in final_grid],
"y": [f"{yi:.6f}" for yi in final_spectrum],
}
def find_peaks(
x_data: list[float],
y_data: list[float],
threshold: float = 0,
min_distance: int = 100,
) -> dict:
"""
Use scipy.signal.find_peaks to locate peaks in y_data above an absolute threshold.
"""
try:
x = np.array(x_data)
y = np.array(y_data)
lines, props = fp(
y,
height=threshold,
distance=min_distance,
)
except Exception as e:
return {"success": False, "error": str(e)}
peaks = {}
for line in lines:
freq = x[line]
intensity = y[line]
peaks[f"{float(freq):.4f}"] = f"{float(intensity):.4f}"
return {"success": True, "peaks": peaks}