-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTestingApp.py
More file actions
797 lines (633 loc) · 34.5 KB
/
TestingApp.py
File metadata and controls
797 lines (633 loc) · 34.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
import ttkbootstrap as ttk
from ttkbootstrap import validation, scrolled, window
from tkinter import filedialog
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import time
import os
import webbrowser
import nidaqmx
import scipy.optimize, scipy.signal
import requests
from threading import Thread
from constants import *
from plots import *
from messages import *
from config import *
from scope import *
from ni_daq import *
from timer import *
from visa_comms import *
from console import *
from analysis import *
from indicator import *
# Change nidaqmx read/write to this format? https://github.com/AppliedAcousticsChalmers/nidaqmxAio
# Tkinter has quite a high learning curve. If attempting to edit this source code without experience, I highly
# recommend going through some tutorials. The documentation on tkinter is also quite poor, but
# this website has the best I can find (http://www.tcl.tk/man/tcl8.5/TkCmd/contents.htm). At times you may
# need to manually go into the tkinter source code to investigate the behavior/capabilities of some code.
class TestingApp(ttk.Window):
def __init__(self):
super().__init__(themename=themename)
style = ttk.Style()
self.colors = style.colors
style.configure('TButton', **button_opts)
style.configure('TFrame', **frame_opts)
style.configure('TLabelframe.Label', **text_opts)
style.configure('TEntry', **entry_opts)
style.configure('TLabel', **text_opts)
style.configure('TRadiobutton', **text_opts)
style.configure('TNotebook.Tab', **text_opts)
style.configure('TCheckbutton', **text_opts)
self.option_add('*TCombobox*Listbox.font', text_opts)
# Special code for styling big buttons
style.configure('bigRed.TButton', font=(None, 24,), justify='center', background=self.colors.danger)
style.configure('bigOrange.TButton', font=(None, 24,), justify='center', background=self.colors.warning)
style.map('bigRed.TButton', background=[('active', '#e74c3c')])
style.map('bigOrange.TButton', background=[('active', '#f39c12')])
# # Initialize pins to default values
self.scopePins = scopeChannelDefaults
self.charge_ao_Pins = charge_ao_defaults
self.systemStatus_Pins = systemStatus_defaults
self.do_Pins = do_defaults
self.di_Pins = di_defaults
self.diagnostics_Pins = diagnostics_defaults
self.counters_Pins = counters_defaults
self.pulse_Pins = pulse_defaults
def center_app(self):
self.update_idletasks()
width = self.winfo_width()
frm_width = self.winfo_rootx() - self.winfo_x()
win_width = width + 2 * frm_width
height = self.winfo_height()
titlebar_height = self.winfo_rooty() - self.winfo_y()
win_height = height + titlebar_height + frm_width
x = self.winfo_screenwidth() // 2 - win_width // 2
y = self.winfo_screenheight() // 2 - win_height // 2
self.geometry(f'+{x}+{0}')
self.deiconify()
# There are two pieces of hardware important for communication with the test cart
# The NI panel extender provides an analog output and two analog inputs to read/write to the power supply during charging
# The Oscilloscope is triggered when the capacitor discharges and reads waveform from the discharge
def init_DAQ(self):
# We need both an analog input and output
self.NI_DAQ = NI_DAQ(systemStatus_sample_rate, systemStatus_channels=self.systemStatus_Pins,
charge_ao_channels=self.charge_ao_Pins, di_channels=self.di_Pins, diagnostics=self.diagnostics_Pins,
pulse_channels=self.pulse_Pins, counters=self.counters_Pins, n_pulses=n_pulses)
# Discharge the power supply on startup
self.powerSupplyRamp(action='discharge')
# Initialize the scope over ethernet
if USING_SCOPE:
try:
self.scope = Oscilloscope(self.scopePins)
except visa.errors.VisaIOError:
scopeErrorName = 'Oscilloscope Connection'
scopeErrorText = 'Cannot connect to oscilloscope because IP address is either incorrect or not present. Please make sure instrument is on and has IP address. The IP can be found on the instrument or NI MAX.'
scopeErrorWindow = MessageWindow(self, scopeErrorName, scopeErrorText)
scopeErrorWindow.wait_window()
# If the user presses the Okay button, charging begins
if scopeErrorWindow.OKPress:
self.on_closing()
def init_visaInstruments(self):
self.pulseGenerator = PulseGenerator()
# Setup delays
for variable, trigger_values in pulseGeneratorOutputs.items():
ref = trigger_values['ref']
delay = trigger_values['delay']
self.pulseGenerator.setDelay(variable, delay, ref_chan=ref)
def setRunNumber(self):
# Create master file if it does not already exist
self.runNumber = f'{0:05d}'
if resultsMasterName in os.listdir(self.saveFolder):
resultsMaster_df = pd.read_csv(f'{self.saveFolder}/{resultsMasterName}', low_memory=False)
runNumbers = resultsMaster_df[master_columns['runNumber']]
runNumber = int(max(runNumbers)) + 1
self.runNumber = f'{runNumber:05d}'
def saveResults(self):
'''MASTER FILE SAVE'''
master_columnsNames = [master_columns[variable] for variable in master_columns if hasattr(self, variable)]
# Create master file if it does not already exist
if resultsMasterName not in os.listdir(self.saveFolder):
resultsMaster_df = pd.DataFrame(columns=master_columnsNames)
resultsMaster_df.to_csv(f'{self.saveFolder}/{resultsMasterName}', index=False)
else:
resultsMaster_df = pd.read_csv(f'{self.saveFolder}/{resultsMasterName}')
# Save master results
resultMaster = []
for variable in master_columns:
if hasattr(self, variable):
resultMaster.append(getattr(self, variable))
resultMaster_df = pd.DataFrame(columns=master_columnsNames)
resultMaster_df.loc[0] = resultMaster
resultsMaster_df = pd.concat([resultsMaster_df, resultMaster_df])
resultsMaster_df.to_csv(f'{self.saveFolder}/{resultsMasterName}', index=False)
'''RUN FILE SAVE'''
# Create a folder for today's date if it doesn't already exist
if self.runDate not in os.listdir(self.saveFolder):
os.mkdir(f'{self.saveFolder}/{self.runDate}')
if SHOT_MODE:
self.filename = f'CMFX_{self.runNumber}.csv'
else:
self.filename = f'CMFX_{self.serialNumber}.csv'
# These results are listed in accordance with the 'columns' variable in constants.py
# If the user would like to add or remove fields please make those changes in constant.py
results = [getattr(self, variable) for variable in single_columns if hasattr(self, variable)]
# Creates a data frame which is easier to save to csv formats
results_df = pd.DataFrame([pd.Series(val, dtype='object') for val in results]).T
results_df.columns = [single_columns[variable]['name'] for variable in single_columns if hasattr(self, variable)]
results_df.to_csv(f'{self.saveFolder}/{self.runDate}/{self.filename}', index=False)
def saveScopeResults(self):
'''OSCILLOSCOPE FILE SAVE'''
scope_filename = f'CMFX_{self.runNumber}_scope.csv'
self.scope.set_runNumber(self.runNumber)
# These results are listed in accordance with the 'columns' variable in constants.py
# If the user would like to add or remove fields please make those changes in constant.py
results = [getattr(self.scope, variable) for variable in scope_columns if hasattr(self.scope, variable)]
# Creates a data frame which is easier to save to csv formats
results_df = pd.concat([pd.Series(val) for val in results], axis=1)
results_df.columns = [scope_columns[variable]['name'] for variable in scope_columns if hasattr(self.scope, variable)]
results_df.to_csv(f'{self.saveFolder}/{self.runDate}/{scope_filename}', index=False)
print('Done saving scope!')
# Read in a csv file and plot those results
def readResults(self):
readFile = filedialog.askopenfilename(filetypes=[('Comma separated values', '.csv')])
if readFile != '':
results_df = pd.read_csv(readFile, low_memory=False)
# Reset program and allow user to reset
self.resetButton.configure(state='normal')
for variable, description in single_columns.items():
if description['type'] == 'scalar':
if description['name'] in results_df:
scalar_data = results_df[description['name']].values[0]
setattr(self, variable, scalar_data)
else:
setattr(self, variable, 0)
else:
if description['name'] in results_df:
array_data = results_df[description['name']].dropna().values
setattr(self, variable, array_data)
else:
setattr(self, variable, np.array([]))
# This is to account for difference in DC and capacitor discharges
if hasattr(self, 'ignitronDelay'):
self.hvStart = 0
else:
self.ignitronDelay = 0
# Place values for all user inputs and plots
if SHOT_MODE:
for variable, entry in self.userEntries.items():
entry.delete(0, 'end')
entry.insert(0, getattr(self, variable))
# Load pre- and post-shot notes
self.preShotNotesEntry.text.delete('1.0', 'end')
self.postShotNotesEntry.text.delete('1.0', 'end')
self.preShotNotesEntry.text.insert('1.0', self.preShotNotes)
self.postShotNotesEntry.text.insert('1.0', self.postShotNotes)
self.replotCharge()
# Load the results plots
self.setData(self.resultsPlotData)
self.resultsPlotViewer.replot()
# Load the analysis plots
self.performAnalysis()
self.analysisPlotViewer.replot()
self.resultsSaved = True
self.filename = readFile.split('/')[-1]
def openSite(self):
webbrowser.open(githubSite)
def setSaveLocation(self):
# Creates folder dialog for user to choose save directory
self.saveFolder = filedialog.askdirectory(initialdir=os.path.dirname(os.path.realpath(__file__)), title='Select directory for saving results.')
if self.saveFolder != '':
self.saveFolderSet = True
# If the user inputs have already been set, enable the checklist button
if self.userInputsSet:
self.checklistButton.configure(state='normal')
def pinSelector(self):
# Create popup window with fields for username and password
self.setPinWindow = window.Toplevel(padx=setPinsPaddingX, pady=setPinsPaddingY)
self.setPinWindow.title('Set Pins')
# Bring pop up to the center and top
self.eval(f'tk::PlaceWindow {str(self.setPinWindow)} center')
self.setPinWindow.attributes('-topmost', True)
# This function places pin labels and dropdown menus in the popup window
def selectPins(channelDefaults, options):
pins = {}
nCols, nRows = self.setPinWindow.grid_size()
for i, channel in enumerate(channelDefaults):
channelVariable = ttk.StringVar()
channelVariable.set(channelDefaults[channel])
label = ttk.Label(self.setPinWindow, text=channel, **text_opts)
drop = ttk.OptionMenu(self.setPinWindow, channelVariable, channelDefaults[channel], *options)
label.grid(row=nRows + i, column=0, sticky='w', padx=(0, setPinsPaddingX), pady=(0, setPinsPaddingY))
drop.grid(row=nRows + i, column=1, sticky='w', padx=(setPinsPaddingX, 0), pady=(0, setPinsPaddingY))
pins[channel] = channelVariable
return pins
scopePinsOptions = selectPins(scopeChannelDefaults, scopeChannelOptions)
charge_ao_PinsOptions = selectPins(charge_ao_defaults, charge_ao_options)
systemStatus_PinsOptions = selectPins(systemStatus_defaults, systemStatus_options)
do_PinsOptions = selectPins(do_defaults, do_options)
di_PinsOptions = selectPins(di_defaults, di_options)
diagnostics_PinsOptions = selectPins(diagnostics_defaults, diagnostics_options)
counter_PinsOptions = selectPins(counters_defaults, counters_options)
pulse_PinsOptions = selectPins(pulse_defaults, pulse_options)
# Button on the bottom
nCols, nRows = self.setPinWindow.grid_size()
buttonFrame = ttk.Frame(self.setPinWindow)
buttonFrame.grid(row=nRows + 1, columnspan=2)
# Once the okay button is pressed, assign the pins
def assignPins():
for channel in scopePinsOptions:
self.scopePins[channel] = scopePinsOptions[channel].get()
for channel in charge_ao_PinsOptions:
self.charge_ao_Pins[channel] = charge_ao_PinsOptions[channel].get()
for channel in systemStatus_PinsOptions:
self.systemStatus_Pins[channel] = systemStatus_PinsOptions[channel].get()
for channel in do_PinsOptions:
self.do_Pins[channel] = do_PinsOptions[channel].get()
for channel in di_PinsOptions:
self.di_Pins[channel] = di_PinsOptions[channel].get()
for channel in diagnostics_PinsOptions:
self.diagnostics_Pins[channel] = diagnostics_PinsOptions[channel].get()
for channel in counter_PinsOptions:
self.counters_Pins[channel] = counter_PinsOptions[channel].get()
for channel in di_PinsOptions:
self.pulse_Pins[channel] = pulse_PinsOptions[channel].get()
print(self.scopePins)
print(self.charge_ao_Pins)
print(self.systemStatus_Pins)
print(self.do_Pins)
print(self.di_Pins)
print(self.diagnostics_Pins)
print(self.counters_Pins)
print(self.pulse_Pins)
self.setPinWindow.destroy()
okayButton = ttk.Button(buttonFrame, text='Set Pins', command=assignPins, bootstyle='success')
okayButton.pack()
self.setPinWindow.wait_window()
# Disable all buttons in the button frame
def disableButtons(self):
for w in self.buttons.winfo_children():
if isinstance(w, ttk.Button):
w.configure(state='disabled')
# Enable normal operation of all buttons in the button frame
def enableButtons(self):
for w in self.buttons.winfo_children():
if isinstance(w, ttk.Button):
w.configure(state='normal')
def checklist(self):
# Any time the checklist is opened, all buttons are disabled except for the save, help, and checklist
self.disableButtons()
self.checklistButton.configure(state='normal')
# Popup window appears to confirm charging
checklistName = 'Checklist Complete?'
checklistText = 'Has the checklist been completed?'
checklistWindow = MessageWindow(self, checklistName, checklistText)
checklistWindow.OKButton['text'] = 'Yes'
cancelButton = ttk.Button(checklistWindow.bottomFrame, text='Cancel', command=checklistWindow.destroy, bootstyle='danger')
cancelButton.pack(side='left')
checklistWindow.wait_window()
if checklistWindow.OKPress and self.userInputsSet:
self.enableButtons()
def operateSwitch(self, switchName, state):
# If state is false, power supply switch opens and load switch closes
# If state is true, power supply switch closes and load switch opens
if not DEBUG_MODE:
try:
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(f'{self.do_Pins[switchName]}')
task.write(state)
print(f'{switchName} in {state} state')
except Exception as e:
print(e)
print('Cannot operate switches')
# Sends signal from NI analog output to charge or discharge the capacitor
def powerSupplyRamp(self, action='discharge'):
if action == 'charge':
voltageValue = self.chargeVoltage / maxVoltagePowerSupply[POWER_SUPPLY] * maxAnalogInput * 1000
# The Genvolt power supplies requires that a current be prescribed in addition to voltage
if POWER_SUPPLY == 'PLEIADES' or POWER_SUPPLY == 'EB-100':
currentValue = self.chargeCurrent / maxCurrentPowerSupply[POWER_SUPPLY] * maxAnalogInput / 1000 # Charge with maximum current
else:
currentValue = 0
else:
voltageValue = 0
currentValue = 0
if not DEBUG_MODE:
self.NI_DAQ.write_value(voltageValue, currentValue)
def charge(self):
# Determine whether there are any faults in the HV supply
# if any([not self.booleanIndicators['HV On'].state,
# self.booleanIndicators['Interlock Closed'].state,
# self.booleanIndicators['Spark'].state,
# self.booleanIndicators['Over Temp Fault'].state,
# self.booleanIndicators['AC Fault'].state,
# not self.booleanIndicators['Door Closed 1'].state,
# not self.booleanIndicators['Door Closed 2'].state]):
if False:
name = 'Power Supply Fault'
text = 'Please evaluate power supply status. HV may not be enabled, interlock may be open, the door to the lab may be open, or there may be a fault.'
else:
name = 'Begin charging'
text = 'Are you sure you want to begin charging?'
# Popup window appears to confirm charging
chargeConfirmWindow = MessageWindow(self, name, text)
cancelButton = ttk.Button(chargeConfirmWindow.bottomFrame, text='Cancel', command=chargeConfirmWindow.destroy, bootstyle='danger')
cancelButton.pack(side='left')
chargeConfirmWindow.wait_window()
# If the user presses the Okay button, charging begins
if chargeConfirmWindow.OKPress:
self.NI_DAQ.reset_systemStatus() # Only start gathering data when beginning to charge
self.idleMode = False
# Arm lab computers over http
self.arm_http()
### Operate switches ###
# Close power supply switch
self.operateSwitch('Power Supply Switch', True)
time.sleep(switchWaitTime)
# Open dump switch
self.operateSwitch('Dump Switch', True)
time.sleep(switchWaitTime)
# Actually begin charging power supply
self.powerSupplyRamp(action='charge')
# Enable HV only AFTER writing charge values
self.operateSwitch('Enable HV', True)
self.charging = True
if SHOT_MODE:
# Record base pressures when charging begins
self.recordPressure()
def discharge(self):
def discharge_switch():
if not IGNITRON_MODE:
self.operateSwitch('Load Switch', True)
time.sleep(gasPuffWaitTime) # Hold central conductor at high voltage for a while to avoid bouncing switch until gas puff starts
# Start the pulse generator
self.runDate, self.runTime = self.pulseGenerator.triggerStart()
# Wait a while before closing the mechanical dump switch
time.sleep(hardCloseWaitTime)
self.operateSwitch('Dump Switch', False)
# Read from DAQ
self.NI_DAQ.read_discharge()
else:
# Start the pulse generator
self.runDate, self.runTime = self.pulseGenerator.triggerStart()
# Read from DAQ
self.NI_DAQ.read_discharge()
# Wait a while before closing the mechanical dump switch
time.sleep(hardCloseWaitTime)
self.operateSwitch('Dump Switch', False)
self.charging = False
self.discharged = True
# Save discharge on a separate thread
self.saveDischarge_thread = Thread(target=self.saveDischarge)
self.saveDischarge_thread.start()
def popup():
# Popup window to confirm discharge
dischargeConfirmName = 'Discharge'
dischargeConfirmText = 'Are you sure you want to discharge?'
dischargeConfirmWindow = MessageWindow(self, dischargeConfirmName, dischargeConfirmText)
cancelButton = ttk.Button(dischargeConfirmWindow.bottomFrame, text='Cancel', command=dischargeConfirmWindow.destroy, bootstyle='danger')
cancelButton.pack(side='left')
dischargeConfirmWindow.wait_window()
if dischargeConfirmWindow.OKPress:
# Reset the trigger just before discharging
self.NI_DAQ.reset_discharge_trigger()
# Force power supply to discharge
self.powerSupplyRamp(action='discharge')
self.NI_DAQ.remove_tasks(self.NI_DAQ.dump_task_names + self.NI_DAQ.switch_task_names)
# Operate switches
self.operateSwitch('Power Supply Switch', False)
time.sleep(switchWaitTime)
discharge_switch()
if not self.idleMode:
if not self.charged:
popup()
else:
discharge_switch()
else:
popup()
# Disable all buttons except for reset, if logged in
self.disableButtons()
if self.loggedIn:
self.resetButton.configure(state='normal')
def startDirectDrive(self):
# Determine whether there are any faults in the HV supply
# if any([not self.booleanIndicators['HV On'].state,
# self.booleanIndicators['Interlock Closed'].state,
# self.booleanIndicators['Spark'].state,
# self.booleanIndicators['Over Temp Fault'].state,
# self.booleanIndicators['AC Fault'].state,
# not self.booleanIndicators['Door Closed 1'].state,
# not self.booleanIndicators['Door Closed 2'].state]):
if False:
name = 'Power Supply Fault'
text = 'Please evaluate power supply status. HV may not be enabled, interlock may be open, the door to the lab may be open, or there may be a fault.'
else:
name = 'Begin Run'
text = 'Are you sure you want to begin the run?'
# Popup window appears to confirm charging
confirmWindow = MessageWindow(self, name, text)
cancelButton = ttk.Button(confirmWindow.bottomFrame, text='Cancel', command=confirmWindow.destroy, bootstyle='danger')
cancelButton.pack(side='left')
confirmWindow.wait_window()
# If the user presses the Okay button, charging begins
if confirmWindow.OKPress:
self.NI_DAQ.reset_systemStatus() # Only start gathering data when beginning to charge
# Reset the trigger just before discharging
self.NI_DAQ.reset_discharge_trigger()
self.idleMode = False
self.charging = True
# Arming lab computers over http
self.arm_http()
self.countdownStart = time.time()
print(f'Starting countdown for {countdownTime} seconds')
def dischargeDirectDrive(self):
def discharge_switch():
### Operate charging ###
print('Charging')
if SHOT_MODE:
# Record base pressures when charging begins
self.recordPressure()
# Close the load switch
self.operateSwitch('Load Switch', True)
time.sleep(switchWaitTime)
# Actually begin charging power supply
self.powerSupplyRamp(action='charge')
# Start the pulse generator
self.runDate, self.runTime = self.pulseGenerator.triggerStart()
# Read from DAQ
self.NI_DAQ.read_discharge()
# Actually begin discharging power supply before opening load switch
self.powerSupplyRamp(action='discharge')
# Wait a while before closing the mechanical dump switch
time.sleep(hardCloseWaitTime)
self.operateSwitch('Load Switch', False)
self.charging = False
self.discharged = True
# Save discharge on a separate thread
self.saveDischarge_thread = Thread(target=self.saveDischarge)
self.saveDischarge_thread.start()
def popup():
# Popup window to confirm discharge
dischargeConfirmName = 'Discharge'
dischargeConfirmText = 'Are you sure you want to discharge?'
dischargeConfirmWindow = MessageWindow(self, dischargeConfirmName, dischargeConfirmText)
cancelButton = ttk.Button(dischargeConfirmWindow.bottomFrame, text='Cancel', command=dischargeConfirmWindow.destroy, bootstyle='danger')
cancelButton.pack(side='left')
dischargeConfirmWindow.wait_window()
if dischargeConfirmWindow.OKPress:
# Reset the trigger just before discharging
self.NI_DAQ.reset_discharge_trigger()
self.NI_DAQ.remove_tasks(self.NI_DAQ.dump_task_names + self.NI_DAQ.switch_task_names)
discharge_switch()
if not self.idleMode:
discharge_switch()
else:
popup()
# Disable all buttons except for reset, if logged in
self.disableButtons()
if self.loggedIn:
self.resetButton.configure(state='normal')
def arm_http(self):
print('Arming lab computers over local network...')
# Send packet to local computer
try:
Thread(target=requests.get, args=(f'http://169.254.146.111/arm_diagnostics?n={self.runNumber}&dsc=1&time={max(1, self.dumpDelay / 1000 + post_dump_duration + pretrigger_duration)}',)).start()
Thread(target=requests.get, args=(f'http://169.254.146.131/arm_diagnostics?n={self.runNumber}&dsc=1&time={max(1, self.dumpDelay / 1000 + post_dump_duration + pretrigger_duration)}',)).start()
except requests.exceptions.ConnectionError:
print('Lab computers are not connected')
def replotCharge(self):
# Don't execute if using direct drive power supply
if POWER_SUPPLY == 'EB-100':
return
self.chargeVoltageLine.set_data(self.chargeTime, np.abs(self.chargeVoltagePS) / 1000)
self.chargeCurrentLine.set_data(self.chargeTime, self.chargeCurrentPS * 1000)
# If the capacitor is only being read every so often, only plot non nan values
try:
nanIndices = np.isnan(self.capacitorVoltage)
self.capacitorVoltageLine.set_data(self.chargeTime[~nanIndices], self.capacitorVoltage[~nanIndices] / 1000)
except IndexError:
print('Mismatch in shape')
if self.timePoint + 0.2 * plotTimeLimit > plotTimeLimit:
self.chargePlot.ax.set_xlim(self.timePoint - 0.8 * plotTimeLimit, self.timePoint + 0.2 * plotTimeLimit)
else:
self.chargePlot.ax.set_xlim(0, plotTimeLimit)
if len(self.capacitorVoltage) != 0:
if 1.2 * max(np.abs(self.capacitorVoltage)) / 1000 > voltageYLim:
self.chargePlot.ax.set_ylim(0, 1.2 * max(np.abs(self.capacitorVoltage)) / 1000)
if 1.2 * max(np.abs(self.chargeCurrentPS)) * 1000 > currentYLim:
self.chargeCurrentAxis.set_ylim(0, 1.2 * max(np.abs(self.chargeCurrentPS)) * 1000)
try:
self.bm.update()
except ValueError:
print('Mismatch in shape')
# Turn on safety lights inside the control room and outside the lab
def safetyLights(self):
print('Turn on safety lights')
def emergency_off(self):
print('Emergency Off')
# Force power supply to discharge
self.powerSupplyRamp(action='discharge')
# Close the switch tasks so we can close the switches with hardware instead of software
if hasattr(self, 'NI_DAQ'):
self.NI_DAQ.remove_tasks(self.NI_DAQ.dump_task_names + self.NI_DAQ.switch_task_names)
# Operate switches
self.operateSwitch('Enable HV', False)
self.operateSwitch('Power Supply Switch', False)
time.sleep(switchWaitTime)
self.operateSwitch('Dump Switch', False)
self.operateSwitch('Load Switch', False)
self.charging = False
self.discharged = True
self.idleMode = True
def validateLogin(self):
# If someone is not logged in then the buttons remain deactivated
def checkLogin(event):
# Obtain login status. To change valid usernames and password please see constants.py
self.username = self.usernameEntry.get()
self.password = self.passwordEntry.get()
self.loggedIn = self.username in acceptableUsernames and self.password in acceptablePasswords
# Once logged in, enable the save location and help buttons
if self.loggedIn:
self.loginWindow.destroy()
# If incorrect username or password, create popup notifying the user
else:
incorrectLoginName = 'Incorrect Login'
incorrectLoginText = 'You have entered either a wrong name or password. Please reenter your credentials or contact nickschw@umd.edu for help'
incorrectLoginWindow = MessageWindow(self, incorrectLoginName, incorrectLoginText)
# Clear username and password entries
self.usernameEntry.delete(0, 'end')
self.passwordEntry.delete(0, 'end')
# Create popup window with fields for username and password
self.loginWindow = window.Toplevel(padx=loginPadding, pady=loginPadding)
self.loginWindow.title('Login Window')
# Center and bring popup to the top
self.loginWindow.attributes('-topmost', True)
self.eval(f'tk::PlaceWindow {str(self.loginWindow)} center')
login_text = 'Please enter UMD username.'
password_text = 'Please enter password.'
button_text = 'Login'
self.usernameLabel = ttk.Label(self.loginWindow, text=login_text, **text_opts)
self.usernameEntry = ttk.Entry(self.loginWindow, **entry_opts)
self.passwordLabel = ttk.Label(self.loginWindow, text=password_text, **text_opts)
self.passwordEntry = ttk.Entry(self.loginWindow, show='*', **entry_opts)
self.loginButton = ttk.Button(self.loginWindow, text=button_text, command=lambda event='Okay Press': checkLogin(event), bootstyle='success')
# User can press 'Return' key to login instead of loginButton
self.loginWindow.bind('<Return>', checkLogin)
self.usernameLabel.pack()
self.usernameEntry.pack()
self.passwordLabel.pack()
self.passwordEntry.pack()
self.loginButton.pack()
self.loginWindow.wait_window()
# Special function for closing the window and program
def on_closing(self):
self.powerSupplyRamp(action='discharge')
# Close the switch tasks so we can close the switches with hardware instead of software
if hasattr(self, 'NI_DAQ'):
self.NI_DAQ.remove_tasks(self.NI_DAQ.dump_task_names + self.NI_DAQ.switch_task_names)
# Open power supply and Cap Switch and close load switch
self.operateSwitch('Enable HV', False)
self.operateSwitch('Power Supply Switch', False)
time.sleep(switchWaitTime)
self.operateSwitch('Dump Switch', False)
self.operateSwitch('Load Switch', False)
if not DEBUG_MODE:
# Stop NI communication
self.NI_DAQ.close()
# Close visa communication with scope
if hasattr(self, 'scope'):
try:
self.scope.inst.close()
except visa.errors.VisaIOError:
pass
# Close plots
plt.close('all')
# Cancel all scheduled callbacks
for after_id in self.tk.eval('after info').split():
self.after_cancel(after_id)
self.quit()
self.destroy()
def getChargingTestVoltages(self):
if hasattr(self, 'waveform') and not self.discharged and len(self.waveform) != 0:
powerSupplyVoltage = np.abs(self.waveform[0] + (np.random.rand() - 0.5) * 0.01)
capacitorVoltage = np.abs(self.waveform[0] + (np.random.rand() - 0.5) * 0.01)
powerSupplyCurrent = np.random.rand() * 0.01
values = [powerSupplyVoltage, powerSupplyCurrent, capacitorVoltage]
# remove the first element so that the next element is acquired on the next iteration
self.waveform = self.waveform[1:]
else:
powerSupplyVoltage = np.random.rand() * 0.01
powerSupplyCurrent = np.random.rand() * 0.01
capacitorVoltage = np.random.rand() * 0.01
values = [powerSupplyVoltage, powerSupplyCurrent, capacitorVoltage]
return values
def getDischargeTestValues(self):
time = np.linspace(0, 1, 100)
tUnit = 's'
voltage = self.chargeVoltage * voltageDivider * np.exp( - time / RCTime)
current = pearsonCoilDischarge * np.exp( - time / RCTime)
return (voltage, current, time, tUnit)
# Popup window for help
def help(self):
webbrowser.open(f'{githubSite}/blob/main/README.md')