-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathxanes_exporter.py
More file actions
357 lines (311 loc) · 12.6 KB
/
xanes_exporter.py
File metadata and controls
357 lines (311 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
from prefect import flow, task, get_run_logger
from data_validation import get_run
import time as ttime
import numpy as np
import xraylib as xrl
import pandas as pd
def xanes_textout(
    scanid=-1,
    header=None,
    userheader=None,
    column=None,
    usercolumn=None,
    usercolumnname=None,
    output=True,
    api_key=None,
):
    """
    Write XANES step-scan data to an XDI-style text file.

    scanid: can be scan_id (integer) or uid (string). default=-1 (last scan run)
    header: a list of items that exist in the event data to be put into
        the header
    userheader: a dictionary defined by user to put into the header
    column: a list of items that exist in the event data to be put into
        the column data
    usercolumn: a dictionary of user-supplied column data keyed by name;
        each value must be indexable by row number
    usercolumnname: the names (and write order) of the user-supplied columns
    output: print all header fields. if output = False, only print the ones
        that were able to be written
        default = True
    api_key: passed through to get_run for authentication
    """
    # Normalize sentinels here instead of using mutable default arguments,
    # which would be shared across calls (classic Python pitfall).
    header = [] if header is None else header
    userheader = {} if userheader is None else userheader
    column = [] if column is None else column
    usercolumn = {} if usercolumn is None else usercolumn
    usercolumnname = [] if usercolumnname is None else usercolumnname

    h = get_run(scanid, api_key=api_key)

    # Commissioning proposals live under a dedicated directory tree.
    if (
        "Beamline Commissioning (beamline staff only)".lower()
        in h.start["proposal"]["type"].lower()
    ):
        filepath = f"/nsls2/data/srx/proposals/commissioning/{h.start['data_session']}/scan_{h.start['scan_id']}_xanes.txt"
    else:
        filepath = f"/nsls2/data/srx/proposals/{h.start['cycle']}/{h.start['data_session']}/scan_{h.start['scan_id']}_xanes.txt"  # noqa: E501

    with open(filepath, "w") as f:
        dataset_client = h["primary"]["data"]

        # Fixed XDI-style preamble common to every exported file.
        staticheader = (
            "# XDI/1.0 MX/2.0\n"
            + "# Beamline.name: "
            + h.start["beamline_id"]
            + "\n"
            + "# Facility.name: NSLS-II\n"
            + "# Facility.ring_current:"
            + str(dataset_client["ring_current"][0])
            + "\n"
            + "# Scan.start.uid: "
            + h.start["uid"]
            + "\n"
            + "# Scan.start.time: "
            + str(h.start["time"])
            + "\n"
            + "# Scan.start.ctime: "
            + ttime.ctime(h.start["time"])
            + "\n"
            + "# Mono.name: Si 111\n"
        )
        f.write(staticheader)

        # Header entries pulled from the event data; absent items are skipped.
        for item in header:
            if item in dataset_client.keys():
                f.write("# " + item + ": " + str(dataset_client[item]) + "\n")
                if output is True:
                    print(f"{item} is written")
            else:
                print(f"{item} is not in the scan")

        # User-supplied header entries are always written.
        for key in userheader:
            f.write("# " + key + ": " + str(userheader[key]) + "\n")
            if output is True:
                print(f"{key} is written")

        file_data = {}
        for idx, item in enumerate(column):
            if item in dataset_client.keys():
                # retrieve the data from tiled that is going to be used
                # in the file
                file_data[item] = dataset_client[item].read()
                f.write("# Column." + str(idx + 1) + ": " + item + "\n")

        # Column-name row: event-data columns first, then user columns.
        f.write("# ")
        for item in column:
            if item in dataset_client.keys():
                f.write(str(item) + "\t")
        for item in usercolumnname:
            f.write(item + "\t")
        f.write("\n")
        f.flush()

        # User columns may be 1-indexed (e.g. a pandas Series renumbered from
        # seq_num); on the first KeyError switch to an idx+1 offset and keep
        # it for the rest of the file.
        offset = False
        for idx in range(len(file_data[column[0]])):
            for item in column:
                if item in file_data:
                    f.write("{0:8.6g} ".format(file_data[item][idx]))
            for item in usercolumnname:
                if item in usercolumn:
                    if offset is False:
                        try:
                            f.write("{0:8.6g} ".format(usercolumn[item][idx]))
                        except KeyError:
                            offset = True
                            f.write("{0:8.6g} ".format(usercolumn[item][idx + 1]))
                    else:
                        f.write("{0:8.6g} ".format(usercolumn[item][idx + 1]))
            f.write("\n")
@task
def xas_step_exporter(scanid, api_key=None, dry_run=False):
    """Export a XAS_STEP scan to a column text file via xanes_textout.

    scanid: scan id (or uid) resolvable by get_run
    api_key: passed through to get_run / xanes_textout for authentication
    dry_run: if True, log instead of writing the output file

    Raises KeyError if the SRS scaler ('sclr1') is not among the detectors.
    """
    logger = get_run_logger()

    # Custom header list
    headeritem = []

    # Load header for our scan and confirm it is a step scan.
    h = get_run(scanid, api_key=api_key)
    if h.start["scan"].get("type") != "XAS_STEP":
        logger.info("Incorrect document type. Not running exporter on this document.")
        return

    # Construct basic header information
    userheaderitem = {}
    userheaderitem["uid"] = h.start["uid"]
    userheaderitem["sample.name"] = h.start["scan"]["sample_name"]

    # Create columns for data file
    columnitem = ["energy_energy", "energy_bragg", "energy_c2_x"]

    # Include I_M, I_0, and I_t from the SRS
    if "sclr1" in h.start["detectors"]:
        if "sclr_i0" in h["primary"].descriptors[0]["object_keys"]["sclr1"]:
            columnitem.extend(["sclr_im", "sclr_i0", "sclr_it"])
        else:
            columnitem.extend(["sclr1_mca3", "sclr1_mca2", "sclr1_mca4"])
    else:
        raise KeyError("SRS not found in data!")

    # Include fluorescence data if present, allow multiple rois
    roinum = []
    xs_channels = []
    all_roi_keys = []  # every matching ROI channel key, across ALL ROIs
    if "xs" in h.start["detectors"]:
        if "ROI" in h.start["scan"].keys():
            roinum = list(h.start["scan"]["ROI"])
        else:
            roinum = [1]  # if no ROI key found, assume ROI 1
        logger.info(roinum)
        xs_channels = h["primary"].descriptors[0]["object_keys"]["xs"]
        for i in roinum:
            logger.info(f"Current roinumber: {i}")
            roi_name = "roi{:02}".format(i)
            roi_key = []
            for xs_channel in xs_channels:
                logger.info(f"Current xs_channel: {xs_channel}")
                if "mca" + roi_name in xs_channel and "total_rbv" in xs_channel:  # noqa: E501
                    roi_key.append(xs_channel)
            columnitem.extend(roi_key)
            all_roi_keys.extend(roi_key)

    # Construct user convenience columns allowing prescaling of ion chamber,
    # diode and fluorescence detector data
    usercolumnitem = {}
    datatablenames = []
    if "xs" in h.start["detectors"]:
        # BUGFIX: previously only the LAST ROI's keys were added here, so
        # summing earlier ROIs of a multi-ROI scan raised KeyError below.
        datatablenames.extend(all_roi_keys)
    if "xs2" in h.start["detectors"]:
        # BUGFIX: previously referenced roi_key, which was undefined (NameError)
        # whenever 'xs' was absent.
        datatablenames.extend(all_roi_keys)
    if "sclr1" in h.start["detectors"]:
        if "sclr_im" in h["primary"].descriptors[0]["object_keys"]["sclr1"]:
            datatablenames.extend(["sclr_im", "sclr_i0", "sclr_it"])
        else:
            datatablenames.extend(["sclr1_mca2", "sclr1_mca3", "sclr1_mca4"])
        datatable = h["primary"].read(datatablenames)
    else:
        raise KeyError("SRS not found in data!")

    # Calculate sums for xspress3 channels of interest
    if "xs" in h.start["detectors"]:
        for i in roinum:
            roi_name = "roi{:02}".format(i)
            roi_key = []
            for xs_channel in xs_channels:
                if "mca" + roi_name in xs_channel and "total_rbv" in xs_channel:  # noqa: E501
                    roi_key.append(xs_channel)
            # Sum the per-channel ROI counts, then re-index to 1-based
            # seq_num so rows line up with the event data.
            roisum = sum(datatable[roi_key].to_array()).to_series()
            roisum = roisum.rename_axis("seq_num").rename(lambda x: x + 1)
            usercolumnitem["If-{:02}".format(i)] = roisum

    if dry_run:
        logger.info("Dry run: Not exporting xanes")
    else:
        xanes_textout(
            scanid=scanid,
            header=headeritem,
            userheader=userheaderitem,
            column=columnitem,
            usercolumn=usercolumnitem,
            usercolumnname=usercolumnitem.keys(),
            output=False,
            api_key=api_key,
        )
@task
def xas_fly_exporter(uid, api_key=None, dry_run=False):
    """Export a XAS_FLY scan: one text file per scan stream.

    For each non-baseline, non-monitor stream, the xspress3 channel data are
    summed over the ROI energy window (derived from the element/line name via
    xraylib) and written, with an XDI-style header, to
    scan_<scan_id>_<stream>.txt in the proposal directory.

    uid: scan uid (or id) resolvable by get_run
    api_key: passed through to get_run for authentication
    dry_run: if True, log what would be written instead of writing files
    """
    logger = get_run_logger()

    # Get a scan header
    hdr = get_run(uid, api_key=api_key)
    start_doc = hdr.start

    # Get proposal directory location
    is_commissioning = "beamline commissioning" in start_doc["proposal"]["type"].lower()
    cycle = "commissioning" if is_commissioning else start_doc["cycle"]
    root = f"/nsls2/data/srx/proposals/{cycle}/{start_doc['data_session']}/"

    # Identify scan streams
    scan_streams = [s for s in hdr if s != "baseline" and "monitor" not in s]

    # ROI information: roi_names entries look like "<symbol>_<line>", e.g. "Fe_Ka"
    roi_num = start_doc["scan"]["roi_num"]
    roi_name = start_doc["scan"]["roi_names"][roi_num - 1]
    roi_symbol, roi_line = roi_name.split("_")
    roi_Z = xrl.SymbolToAtomicNumber(roi_symbol)
    if "ka" in roi_line.lower():
        roi_line_ind = xrl.KA_LINE
    elif "kb" in roi_line.lower():
        roi_line_ind = xrl.KB_LINE
    elif "la" in roi_line.lower():
        roi_line_ind = xrl.LA_LINE
    elif "lb" in roi_line.lower():
        roi_line_ind = xrl.LB_LINE
    else:
        logger.info("Line identification failed")
        return

    # Get bin values: center bin from the line energy, +/- E_width bins
    E = xrl.LineEnergy(roi_Z, roi_line_ind)
    E_bin = np.round(E * 100, decimals=0).astype(int)
    E_width = 10
    E_min = E_bin - E_width
    E_max = E_bin + E_width

    # Get ring current
    ring_current_start = np.round(
        hdr["baseline"]["data"]["ring_current"][0], decimals=0
    ).astype(str)

    # Static header, shared (unmodified) by every stream's file
    staticheader = (
        "# XDI/1.0 MX/2.0\n"
        + f"# Beamline.name: {hdr.start['beamline_id']}\n"
        + "# Facility.name: NSLS-II\n"
        + f"# Facility.ring_current: {ring_current_start}\n"
        + f"# IVU.harmonic: {hdr.start['scan']['harmonic']}\n"
        + "# Mono.name: Si 111\n"
        + f"# Scan.start.uid: {hdr.start['uid']}\n"
        + f"# Scan.start.scanid: {hdr.start['scan_id']}\n"
        + f"# Scan.start.time: {hdr.start['time']}\n"
        + f"# Scan.start.ctime: {ttime.ctime(hdr.start['time'])}\n"
        + f"# Scan.ROI.name: {roi_name}\n"
        + f"# Scan.ROI.number: {roi_num}\n"
        + f"# Scan.ROI.range: {f'[{E_min}:{E_max}]'}\n"
        + "# \n"
    )

    for stream in sorted(scan_streams):
        # Set a filename
        fname = f"scan_{hdr.start['scan_id']}_{stream}.txt"
        fname = root + fname

        # Get the full table
        tbl = hdr[stream]["data"]
        df = pd.DataFrame()
        keys = [k for k in tbl.keys()[:] if "time" not in k]
        for k in keys:
            if "channel" in k:
                # We will process later
                continue
            df[k] = np.squeeze(tbl[k].read())
        df.set_index("energy", drop=True, inplace=True)

        # Sum each detector channel over the ROI energy window
        ch_names = [ch for ch in keys if "channel" in ch]
        for ch in ch_names:
            ch_data = np.squeeze(tbl[ch].read())
            df[ch] = np.sum(ch_data[:, E_min:E_max], axis=1)
            df.rename(columns={ch: ch.split("_")[-1]}, inplace=True)
        df["ch_sum"] = df[[ch for ch in df.keys() if "channel" in ch]].sum(axis=1)

        # Prepare for export. BUGFIX: build the per-stream header on a copy;
        # previously the column lines were appended to staticheader itself,
        # so each stream's file also carried every earlier stream's columns.
        stream_header = staticheader
        col_names = [df.index.name] + list(df.columns)
        for i, col in enumerate(col_names):
            stream_header += f"# Column {i + 1:02}: {col}\n"
        # Trailing "# " comments out the CSV header row appended below.
        stream_header += "# \n# "

        # Export data to file
        if dry_run:
            logger.info("Dry run: xas fly exporter")
            # BUGFIX: these messages were missing the f-prefix, so the
            # {placeholders} were logged literally instead of interpolated.
            if len(df) >= 2:
                logger.info(
                    f"Dry run: first and last row: {pd.concat([df.head(1), df.tail(1)])}"
                )
            elif len(df) == 1:
                logger.info(f"Dry run: row: {df}")
            else:
                logger.info("Dry run: (no data)")
        else:
            with open(fname, "w") as f:
                f.write(stream_header)
            df.to_csv(fname, float_format="%.3f", sep=" ", mode="a")
@flow(log_prints=True)
def xanes_exporter(ref, api_key=None, dry_run=False):
    """Dispatch a run to the matching XANES exporter based on its scan type.

    ref: scan id or uid resolvable by get_run
    api_key: passed through to get_run and the chosen exporter
    dry_run: forwarded to the exporter; if True nothing is written to disk
    """
    logger = get_run_logger()
    logger.info("Start writing file with xanes_exporter...")

    # Look up the scan type recorded in the run's start document.
    start_doc = get_run(ref, api_key=api_key).start
    scan_type = start_doc.get("scan", {}).get("type", "unknown")

    # Map each supported scan type to its exporter task and log label.
    dispatch = {
        "XAS_STEP": (xas_step_exporter, "step-scan"),
        "XAS_FLY": (xas_fly_exporter, "fly-scan"),
    }
    entry = dispatch.get(scan_type)
    if entry is None:
        logger.info(f"xanes exporter for {scan_type=} not available")
        return

    exporter, label = entry
    logger.info(f"Starting xanes {label} exporter.")
    exporter(ref, api_key=api_key, dry_run=dry_run)
    logger.info(f"Finished writing file with xanes {label} exporter.")