diff --git a/RMS/Routines/CustomPyqtgraphClasses.py b/RMS/Routines/CustomPyqtgraphClasses.py index f2d2a6156..5d238d290 100644 --- a/RMS/Routines/CustomPyqtgraphClasses.py +++ b/RMS/Routines/CustomPyqtgraphClasses.py @@ -15,6 +15,7 @@ from RMS.Formats.FFfile import reconstructFrame as reconstructFrameFF from RMS.Routines import Image from RMS.Routines.DebruijnSequence import findAllInDeBruijnSequence, generateDeBruijnSequence +from PyQt5.QtWidgets import QDialog, QVBoxLayout, QLabel, QComboBox, QPushButton, QSizePolicy import time import re @@ -169,6 +170,37 @@ def setIcon(self, icon): pass +class ComboDialog(QDialog): + def __init__(self, options_dict, window_title="Select", label="Options"): + super().__init__() + + self.resize(300,100) + self.setWindowTitle(window_title) + + # Layout + layout = QVBoxLayout() + + # Label + self.label = QLabel(label) + layout.addWidget(self.label) + + # Combo Box + self.combo = QComboBox() + self.combo.addItems(options_dict.keys()) + layout.addWidget(self.combo) + + # Button + + ok_button = QPushButton("OK") + ok_button.clicked.connect(self.accept) + layout.addWidget(ok_button) + + self.setLayout(layout) + + def get_selection(self): + return self.combo.currentText() + + class TextItemList(pg.GraphicsObject): """ Allows for a list of TextItems without having to constantly add items to a widget diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 7c790dbac..4dea8dd0d 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -18,6 +18,12 @@ import scipy.optimize import pyqtgraph as pg import random +import tempfile +import tarfile +import shutil +import paramiko +import subprocess + from RMS.Astrometry.ApplyAstrometry import xyToRaDecPP, raDecToXYPP, \ rotationWrtHorizon, rotationWrtHorizonToPosAngle, computeFOVSize, photomLine, photometryFit, \ @@ -444,9 +450,9 @@ def __len__(self): class PlateTool(QtWidgets.QMainWindow): - def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None, use_fr_files=False, + def __init__(self, input_path, config, config_path=os.getcwd(), beginning_time=None, fps=None, gamma=None, use_fr_files=False, geo_points_input=None, startUI=True, mask=None, nobg=False, peribg=False, flipud=False, - flatbiassub=False): + flatbiassub=False, platepar_file=None, fits_file_to_open=None): """ SkyFit interactive window. Arguments: @@ -469,10 +475,23 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None coloured mask instead of the avepixel. False by default. flipud: [bool] Flip the image upside down. False by default. flatbiassub: [bool] Subtract flat and bias frames. False by default. + platepar_file: [str] path to platepar file to load + fits_file: [str] path to fits file to open """ super(PlateTool, self).__init__() + + mask_path, fits_file, star_count_max = None, None, 0 + if input_path is None: + input_path, platepar_file, mask_path, mask, fits_file, star_count_max, config, config_path = handleNoInputPath(input_path=input_path) + else: + if len(input_path) == 6: + input_path, platepar_file, mask_path, mask, fits_file, star_count_max, config, config_path = handleNoInputPath( + input_path=input_path) + + fits_file_to_open = fits_file + print("Starting with file {}".format(fits_file_to_open)) # Mode of operation - skyfit for fitting astrometric plates, manualreduction for manual picking # of position on frames and photometry self.mode = 'skyfit' @@ -488,6 +507,7 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None self.config = config + # Store forced time of first frame self.beginning_time = beginning_time @@ -608,6 +628,8 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None # Detect data input type and init the image handle self.detectInputType(load=True, beginning_time=beginning_time, use_fr_files=self.use_fr_files) + if fits_file_to_open is not None and os.path.exists(fits_file_to_open): + self.img_handle.setCurrentFF(os.path.basename(fits_file_to_open)) # Update the FPS if it's forced self.setFPS() @@ -639,11 +661,13 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None self.loadCalstars() + ################################################################################################### # PLATEPAR # Load the platepar file - self.loadPlatepar() + print("Platepar is {}".format(platepar_file)) + self.loadPlatepar(platepar_file=platepar_file) # Set the given gamma value to platepar @@ -657,11 +681,16 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None ################################################################################################### - print() + + + if fits_file is not None: + print("Loading {}".format(fits_file)) + + # INIT WINDOW if startUI: - self.setupUI() + self.setupUI(starting_ff=fits_file) def setFPS(self): @@ -674,13 +703,15 @@ def setFPS(self): print("Forcing video FPS to:", self.fps) - def setupUI(self, loaded_file=False): + def setupUI(self, loaded_file=False, starting_ff=None): """ Setup pyqt UI with widgets. No variables worth saving should be defined here. Keyword arguments: loaded_file: [bool] Loaded a state from a file. False by default. """ + + self.central = QtWidgets.QWidget() self.setCentralWidget(self.central) @@ -4490,6 +4521,7 @@ def detectInputType(self, beginning_time=None, use_fr_files=False, load=False): self.img_handle = img_handle + def loadCalstars(self): """ Loads data from calstars file and updates self.calstars """ @@ -4589,6 +4621,7 @@ def loadPlatepar(self, update=False, platepar_file = None): if self.config.platepar_name in os.listdir(self.dir_path): initial_file = os.path.join(self.dir_path, self.config.platepar_name) + platepar_file = initial_file else: initial_file = self.dir_path @@ -6498,6 +6531,7 @@ def saveECSV(self): + def getRollingShutterCorrectedFrameNo(self, frame, pick): """ Given a pick object, return rolling shutter corrected (or not, depending on the config) frame number. @@ -6539,7 +6573,7 @@ def furthestStar(self, miss_this_one=False, min_separation=15): miss_this_one: Return coordinates of a different star at random, but don't mark anything. min_separation: Minimum separation in pixels between stars. - Returns: + Return: (x,y) integers of the image location of the furthest star away from all other matched stars. """ @@ -6564,10 +6598,16 @@ def furthestStar(self, miss_this_one=False, min_separation=15): def getMarkedStars(include_unsuitable=True): + """Returns: a list of stars which are either marked as paired, or bad in image coordinates. - """ + Arguments: - Returns: a list of stars which are either marked as paired, or bad in image coordinates + Keyword Arguments: + include_unsuitable: [bool] Include stars marked as unsuitable. + + Return: + marked_x: [list] of x coordinates. + marked_y: [list] of y coordindates. """ @@ -6588,17 +6628,19 @@ def getMarkedStars(include_unsuitable=True): def isDouble(x,y, reference_x_list, reference_y_list, min_separation=5): - """ - Are x,y coordinates which are very close to, but distinct from all coordinates in reference list + """ Are x,y coordinates which are very close to, but distinct from all coordinates in reference list. + + Arguments: + x: [int] Image coordinates of star. + y: [int] Image coordinates of star. + reference_x_list: [list] List of x image coordinates. + reference_y_list: [list] List of y image coordinates. - Args: - x: image coordinates of star - y: image coordinates of star - reference_x_list: list of x image coordinates - reference_y_list: list of y image coordinates + Keyword Arguments: + min_separation: [int] Minimum separation not to be considered a double star. - Returns: - [bool] True if star is within min_separation of another star + Return: + [bool] True if star is within min_separation of another star. """ for reference_x, reference_y in zip(reference_x_list, reference_y_list): @@ -6615,17 +6657,19 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min """ From the catalogue of filtered stars return a lists of coordinates stars which are not marked, - and another list which is the distance to the nearest marked star + and another list which is the distance to the nearest marked star. - Args: - marked_x_list: list of marked star x coordinates - marked_y_list: list of marked star y coordinates - min_separation: minimum separation to be regarded as a different stra + Arguments: + marked_x_list: [list] list of marked star x coordinates. + marked_y_list: [list] list of marked star y coordinates. - Returns: - unmarked_x_list: list of unmarked star x coordinates - unmarked_y_list: list of unmarked star x coordinates - dist_nearest_marked_list: distance of the nearest marked star for returned star coordinates + Keyword Arguments: + min_separation: [int] Minimum seperation not be regarded as a double star. + + Return: + unmarked_x_list: list of unmarked star x coordinates. + unmarked_y_list: list of unmarked star x coordinates. + dist_nearest_marked_list: distance of the nearest marked star for returned star coordinates. """ @@ -6727,6 +6771,872 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min +def handleBZ2(bz2_path, mask_path): + """Passed a path to a bz2 file, unpack and prepare a working area for PlateTool, and launch. + + Arguments: + bz2_path: [path] Path to a bz2 file. + + Return: + working_dir: [path] Path to a directory containing .config, fits files and if available, a mask. + """ + + bz2_path = os.path.expanduser(bz2_path) + bz2_basename = os.path.basename(bz2_path) + stationID = bz2_basename.split("_")[0] + + with tempfile.TemporaryDirectory() as working_dir: + print("Extracting {}".format(bz2_basename)) + with tarfile.open(bz2_path, 'r:bz2') as tar: + tar.extractall(path=working_dir) + config_path = os.path.join(working_dir, ".config") + if os.path.exists(config_path): + config = cr.parse(config_path) + else: + print("No config file found in {}".format(bz2_basename)) + print("Quitting") + exit() + if mask_path is None: + mask_path = os.path.join(working_dir, config.mask_file) + else: + if os.path.isfile(os.path.expanduser(mask_path)): + mask_path = os.path.expanduser(mask_path) + else: + mask_path = os.path.join(os.path.expanduser(mask_path), config.mask_file) + if os.path.exists(mask_path): + mask = getMaskFile(".", config) + + # If the dimensions of the mask do not match the config file, ignore the mask + if (mask is not None) and (not mask.checkResolution(config.width, config.height)): + print( + "Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, config.width, config.height)) + mask = None + + # Init SkyFit + plate_tool = PlateTool(working_dir, config, beginning_time=beginning_time, fps=cml_args.fps, \ + gamma=cml_args.gamma, use_fr_files=cml_args.fr, geo_points_input=cml_args.geopoints, + mask=mask, nobg=cml_args.nobg, peribg=cml_args.peribg, flipud=cml_args.flipud, + flatbiassub=cml_args.flatbiassub) + + # Run the GUI app + a = app.exec_() + temp_platepar_location = os.path.join(working_dir, config.platepar_name) + platepar_destination = os.path.join(os.getcwd(), "{}.cal".format(config.stationID.lower())) + print("Writing modified platepar to {}".format(platepar_destination)) + shutil.copy2(temp_platepar_location, platepar_destination) + sys.exit(a) + +def lsRemote(host, username, port, remote_path): + """Return the files in a remote directory. + + Arguments: + host: [str] remote host. + username: [str] user account to use. + port: [int] remote port number. + remote_pat: [str] path of remote directory to list. + + Return: + files: [list of strings] Names of remote files. + """ + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys + ssh.connect(hostname=host, port=port, username=username) + + try: + sftp = ssh.open_sftp() + print("Remote path {}".format(remote_path)) + files = sftp.listdir(remote_path) + return files + finally: + sftp.close() + ssh.close() + +def downloadFile(host, username, port, remote_path, local_path): + """Download a single file try compressed rsync first, then fall back to Paramiko + + Arguments: + host: [str] hostname of remote machine. + username: [str] username for remote machine. + port: [str] port. + remote_path: [path] full path to destination. + local_path: [path] full path of local target. + + Return: + Nothing. + """ + + try: + + remote = "{}@{}:{}".format(username, host, remote_path) + result = subprocess.run(['rsync', '-z', remote], capture_output=True, text=True) + if "No such file or directory" in result.stderr : + print("Remote file {} was not found.".format(os.path.basename(remote))) + return + else: + result = subprocess.run(['rsync', '-z', remote, local_path], capture_output=True, text=True) + if not os.path.exists(os.path.expanduser(local_path)): + print("Download of {} from {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(remote_path, username,host)) + quit() + return + except: + pass + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys + try: + ssh.connect(hostname=host, port=port, username=username) + except: + print("Login to {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(username,host)) + quit() + try: + sftp = ssh.open_sftp() + remote_file_list = sftp.listdir(os.path.dirname(remote_path)) + if remote_file_list: + sftp.get(remote_path, local_path) + + finally: + sftp.close() + ssh.close() + + return + +def nItemsFromList(number, input_list, drop_first=False, drop_last=False, sort=True): + """Return a list of length number, containing equally spaced items from input list. + + + + Arguments: + number: [int] Number of list items to return. Can be more than the length of the input list, + in which case items will be duplicated to pad to length. + input_list: [list] Input list. + + Keyword arguments: + drop_first: [bool] If true, remove the first item from the list. + drop_last: [bool] If true, remove the last item from the list. + + Return: + output_list: [list] list of length number + """ + + if number is None: + return input_list + + # Avoid divide by zero error + if number == 1: + return [input_list[-1]] + + # Avoid working on empty list + + if len(input_list) < 1: + return[] + + # Truncate as required + input_list = input_list[1:] if drop_first else input_list + input_list = input_list[:-1] if drop_last else input_list + + # Sort the list + if sort: + input_list.sort() + + output_list, n, gap = [], 0, (len(input_list)) / (number) + for i in range(0, number): + output_list.append(input_list[round(n)]) + n += gap + + return output_list + +def getFiles(host, user, port, remote_path, local_path, files_list, number=None): + """Passed a list of files, get from remote path and put in local path. + + Arguments: + host: [str] hostname. + user: [str] user account. + port: [str] port. + remote_path: [str] remote path to get files from. + local_path: [str] local path to put files in. + + Keyword Arguments: + number: [int] Optional, default None. The number of files to download from the list. If none, + or more than the number of files in the list, download all. If 1, download penultimate, if 0, + download middle. + + + Return: + local_target_list: [list] list of retrieved files. + """ + + files_list.sort() + if number == 0: + # Pick approximately the middle from the fil + files_list = [files_list[len(files_list) // 2]] + else: + files_list = nItemsFromList(number, files_list, drop_last=True, sort=True) + local_target_list = [] + for f in files_list: + local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) + text = highlight("Downloading ", files_list, f) + print(text, end='\r') + downloadFile(host, user, port, remote_target, local_target) + local_target_list.append(local_target) + text = highlight("Downloading ", files_list, f, all_done = True) + print(text) + return local_target_list + +def uploadFile(host, username, port, local_path, remote_path): + """Upload a single file. + + Arguments: + host: [str] hostname of remote machine. + username: [str] username for remote machine. + port: [str] port. + local_path: [path] full path to file to be uploaded. + remote_path: [path] full path to destination. + + Return: + Nothing. + """ + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys + try: + ssh.connect(hostname=host, port=port, username=username) + except: + print("Login to {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(username,host)) + + try: + sftp = ssh.open_sftp() + sftp.put(local_path, remote_path) + print("Uploaded {} to {}".format(local_path, remote_path)) + finally: + sftp.close() + ssh.close() + +def putFiles(host, user, port, local_path, remote_path, files_list): + """Passed a list of files, put from local_path to remote path. + + Arguments: + host: [str] hostname. + user: [str] user account. + port: [str] port. + local_path: [str] local path to get files from. + remote_path: [str] remote path to put files in. + + + Return: + local_target_list: [list] list of retrieved files. + """ + + local_target_list = [] + for f in files_list: + local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) + if os.path.exists(local_target): + uploadFile(host, user, port, local_target, remote_target) + local_target_list.append(local_target) + else: + print("Upload target {} not found on local.".format(local_target)) + return local_target_list + +def getUserHostPortPath(path): + """Passed a user@host:port:path, or user@host:path return components, assuming port 22, and + if no path, assume source/RMS/ + + Arguments: + path: [str] path to be broken apart. + + Return: + user: [str] e.g. rms + host: [str] e.g. raspberrypi + port: [str] e.g. 22 + path: [str] e.g. 192.168.1.2 + """ + + pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' + match_port = re.match(pattern_port, path) + + if match_port: + user, host, port, path = match_port.groups() + return user, host, port, path + + pattern = r'^([^@:\s]+)@([^@:\s]+):([^\s]+)$' + match = re.match(pattern, path) + + if match: + user, host, path = match.groups() + return user, host, 22, path + + pattern = r'^([\w\.-]+)@([\w\.-]+)$' + match_username_hostname = re.match(pattern, path) + + if match_username_hostname: + user, host = match_username_hostname.groups() + return user, host, 22, "source/RMS/" + + + return None, None, None, None + +def highlight(custom_text, list, highlight, all_done=False): + + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + + output = HEADER + custom_text + OKBLUE + if len(list) > 5: + output += "\n" + i = 0 + highlight_passed = False + for item in list: + i += 1 + if item == highlight and not all_done: + highlight_passed = True + output += WARNING + output += "{}".format(item) + output += ENDC + " " + elif highlight_passed == True and not all_done: + output += OKBLUE + output += "{}".format(item) + output += ENDC + " " + else: + output += OKGREEN + output += "{}".format(item) + output += ENDC + " " + + if i % 6 == 0: + output += "\n" + " " * (len(custom_text) + 2) + output += ENDC + + return output + +def getRemoteCapturedDirsPath(rc): + """Passed a config, get the path to the captured files on the remote machine. + + config parser will expond the ~ in any remote path as though it is on the local machine. + This function strips off the local ~ and joins the remote ~ + + Arguments: + rc: [config] the remote RMS config instance. + + Return: + [path] path to remote captured files. + """ + + len_local_home_directory = len(os.path.expanduser("~")) + len("/") + return os.path.join(rc.data_dir[len_local_home_directory:], rc.captured_dir) + +def getLatestCapturedDirectory(r, host, user, port): + """Get the latest captured directory from the remote machine. + + Arguments: + r: [config] the remote RMS config instance. + host: [str] remote host. + user: [str] remote user. + port: [str] remote port. + + Return: + [path] path to remote captured files directory. + """ + + remote_directory = getRemoteCapturedDirsPath(r) + print("Getting directory list of {}@{}:{}:{}".format(user, host, port, remote_directory)) + remote_captured_directory_list = lsRemote(host, user, port, remote_directory) + remote_captured_directory_list = [d for d in remote_captured_directory_list if d.startswith(r.stationID)] + remote_captured_directory_list.sort(reverse=True) + return remote_captured_directory_list[0] + +def isLoginPath(path): + """Passed a path see if it is a path to a remote RMS installation. + + Arguments: + path: [str] String to be tested. + + Return: + is_login_path: [bool] True if a network path, else false. + """ + pattern_username_hostname = r'\b[\w\.-]+@[\w\.-]+\b' + pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' + pattern = r'^[^@:\s]+@[^@:\s]+:[^\s]+$' + is_login_path = re.match(pattern, path) or re.match(pattern_port, path) or re.match(pattern_username_hostname, path) + + return is_login_path + +def handleLoginPath(login_path, number_of_fits=None): + """Passed a login path, retrieve necessary files and start the platetool. + + Arguments: + login_path: [str] user@host:port:path/to/.config or user@host:path/to/.config + + + Return: + Nothing. + """ + # If no argyment was passsed then set as one, return approximiate middle file + number_of_fits = 1 if number_of_fits is None else int(number_of_fits) + user, host, port, remote_path = getUserHostPortPath(login_path) + + # If no config path passed in then assume ~/source/RMS for .config + remote_path = "source/RMS" if not len(remote_path) else remote_path + config_file_name = '.config' + if remote_path.endswith(config_file_name): + remote_path = remote_path[:-len(config_file_name)] + print("Getting .config from {}@{}:{}:{}".format(user, host, port, remote_path)) + # Create temporary directory + with tempfile.TemporaryDirectory() as local_path: + + # First get the .config file + files_list = ['.config'] + + # Record a start time + time_started_getting_files = datetime.datetime.now(datetime.timezone.utc) + + # Parse the remote configuration + remote_config = cr.parse(getFiles(host, user, port, remote_path, local_path, files_list)[0]) + + # Start making a list of files to get; the platepar, and the mask + platepar_mask_list = [remote_config.platepar_name, remote_config.mask_file] + + # Get the platepar and and the mask + getFiles(host, user, port, remote_path, local_path, platepar_mask_list) + + # Now get the remote paths of the remote directories of interest + latest_cap_dir = os.path.join(getRemoteCapturedDirsPath(remote_config), getLatestCapturedDirectory(remote_config, host, user, port)) + latest_captured_files = lsRemote(host, user, port, latest_cap_dir) + + # Filter only for fits files for the remote station + fits_files = [f for f in latest_captured_files if f.endswith(".fits") and f.startswith("FF_{}".format(remote_config.stationID))] + + # Get the files + getFiles(host, user, port, latest_cap_dir, local_path, fits_files, number=number_of_fits) + + # Record and print the time taken + time_finished_getting_files = datetime.datetime.now(datetime.timezone.utc) + time_taken = (time_finished_getting_files - time_started_getting_files).total_seconds() + print("Files retrieved in {:.1f} seconds".format(time_taken)) + + # Load the mask structure + mask_path = os.path.join(local_path, remote_config.mask_file) + if os.path.exists(mask_path): + mask_dir = os.path.dirname(mask_path) + mask = getMaskFile(mask_dir, remote_config) + + # If the dimensions of the mask do not match the config file, ignore the mask + if (mask is not None) and (not mask.checkResolution(remote_config.width, remote_config.height)): + print( + "Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, remote_config.width, remote_config.height)) + mask = None + + # Init SkyFit + plate_tool = PlateTool(local_path, remote_config, beginning_time=beginning_time, fps=cml_args.fps, \ + gamma=cml_args.gamma, use_fr_files=cml_args.fr, geo_points_input=cml_args.geopoints, + mask=mask, nobg=cml_args.nobg, peribg=cml_args.peribg, flipud=cml_args.flipud, + flatbiassub=cml_args.flatbiassub) + + # Run the GUI app + a = app.exec_() + files_list = [remote_config.platepar_name] + putFiles(host, user, port, local_path, remote_path, files_list) + sys.exit(a) + + +def getFITSMostStars(calstars_full_path): + """ + Use the calstars file to find the fits file with the most stars. + + Argument: + calstars_full_path: [str] full path to the calstars file. + + Return: + best_fits_file:[str] full path to the fits file with the most stars. + star_count_max[int]: number of stars on that fits file. + """ + + best_fits_file, star_count_max = None, 0 + + # Try and load in the calstars file + if os.path.exists(calstars_full_path): + captured_directory_full_path = os.path.dirname(calstars_full_path) + calstars_file_name = os.path.basename(calstars_full_path) + calstar_data, _ = CALSTARS.readCALSTARS(captured_directory_full_path, calstars_file_name) + + # Iterate through the calstar files, looking for the fits file which exists, and has the most stars + for calstar_entry in calstar_data: + fits_file = calstar_entry[0] + star_count = len(calstar_entry[1]) + if star_count > star_count_max: + if os.path.exists(os.path.join(captured_directory_full_path, fits_file)): + best_fits_file = os.path.join(captured_directory_full_path, fits_file) + star_count_max = star_count + + return best_fits_file, star_count_max + +def getCalstarsPath(captured_directory, config): + + """ + Get the path to the calstars file. + + Arguments: + captured_directory: [str] RMS captured files directory. + config: [config] RMS config instance. + + Return: + calstars_full_path:[str] full path to the calstars file. + """ + + dir_date = captured_directory.split("_")[1] + dir_time = captured_directory.split("_")[2] + dir_us = captured_directory.split("_")[3] + captured_directory_full_path = os.path.join(config.data_dir, config.captured_dir, captured_directory) + calstars_file_name = "CALSTARS_{}_{}_{}_{}.txt".format(config.stationID, dir_date, dir_time, dir_us) + calstars_full_path = os.path.join(captured_directory_full_path, calstars_file_name) + + return calstars_full_path + +def expandUserList(path_list, file_name): + """ + Given a list of paths, expand each path and add file_name to the end if the path is a directory. + + Arguments: + path_list: [list] list of paths to be expanded. + file_name: [str] file name to be appended to each list item, if the list item is not this file already, and + the directory is a target. + + Return: + target_path_list: [list] returned list of paths, expanded, and with the file_type appended to each one. + """ + + target_path_list = [] + if path_list is None: + return path_list + for path in path_list: + target = os.path.expanduser(path) + if not os.path.basename(target) == file_name and os.path.isdir(target): + target = os.path.join(target, file_name) + if os.path.isfile(target): + target_path_list.append(target) + else: + target_path_list.append(path) + return target_path_list + +def getPlateparFilePath(config): + """ + + Arguments: + config: [config] RMS config instance. + + Return: + platepar_file_path: [str] full path to a platepar file else None. + """ + + potential_platepar_path = os.path.join(os.getcwd(), config.platepar_name) + if os.path.exists(potential_platepar_path): + platepar_file_path = potential_platepar_path + else: + platepar_file_path = None + + return platepar_file_path + +def handleNoInputPath(input_path=None): + """ + If no input path is specified then check to see if a single station name was passed, i.e. au000d, + or if nothing was passed open a dialog box with a list of stations. + + Return: + captured_directory_full_path: [str] full path to the most recent captured directory which contains at least one fits file + platepar_file: [str] full path to the platepar file + mask_path: [str] full path to the mask file + mask: [img] the mask + best_fits_file: [str] full path to the fits file with the most stars + star_count_max: [int] the number of stars on the best_fits_file + c: [config] rms config instance + cml_args.config_path: [path] path to the config file to use + + + """ + station_from_command_line = None + if input_path is not None: + if len(input_path) == 6 and input_path[0:2].isalpha(): + station_from_command_line = input_path.upper() + else: + station_from_command_line = None + # This will hold the configs, and paths to platepars and masks for all valid stations found + config_platepar_mask_dict = {} + + best_fits_file = None + star_count_max = 0 + + # Load the config in ~/source/RMS/.config + if cml_args.config is None: + c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) + cml_args.config_path = os.path.expanduser(os.path.join(os.getcwd(), ".config")) + else: + c = cr.parse(os.path.expanduser(cml_args.config)) + cml_args.config_path = os.path.expanduser(cml_args.config) + + # Handle the single station per user account case + if not c.stationID.startswith("XX"): + # If we have fits, then populate + if anyFits(verifyCapturedDirectories(getCapturedDirectoryObjects(c), c), c): + config_platepar_mask_dict[c.stationID] = [ c, + cml_args.config_path, + getPlateparPath(os.getcwd), + getMaskPath(os.getcwd()) + ] + + # Are we in a multiple camera per username environment + # Check to see if there is a XX at the start of the stationID or the ~/source/Stations directory exists + multi_cam_stations_directory = os.path.join(os.path.dirname(os.getcwd()), "Stations") + if c.stationID.startswith("XX") or os.path.exists(multi_cam_stations_directory): + + # This dictionary will hold lists [config, platepar_path, mask_path], key will be station directory + if os.path.exists(multi_cam_stations_directory): + potential_station_directory_list = sorted(os.listdir(multi_cam_stations_directory)) + # Iterate over the stations and collect what we can + for potential_station_directory in potential_station_directory_list: + full_path_potential_station_directory = os.path.join(multi_cam_stations_directory, potential_station_directory) + config_path = os.path.join(full_path_potential_station_directory, os.path.basename(c.config_file_name)) + + # If we have a valid config, load it, if it is not valid, then move to the next stations + try: + # Load the multi-cam config + mc_c = cr.parse(os.path.expanduser(config_path)) + except: + continue + + # If no fits file was found, then skip this station + if not anyFits(verifyCapturedDirectories(getCapturedDirectoryObjects(mc_c), mc_c), mc_c): + continue + + if mc_c.stationID == potential_station_directory: + config_platepar_mask_dict[potential_station_directory] = \ + [mc_c, + config_path, + getPlateparPath(potential_station_directory, multi_cam=True), + getMaskPath(potential_station_directory, multi_cam=True)] + + # Is the station passed in from the command line available in the dictionary of stations + if station_from_command_line in config_platepar_mask_dict.keys(): + selected_station = station_from_command_line + else: + # If not, the open a dialog box to select from known stations with data + dialog = ComboDialog(config_platepar_mask_dict, + window_title="Select station to calibrate", + label="Stations available for calibration:") + if dialog.exec_() == QDialog.Accepted: + selected_station = dialog.get_selection() + print("Selected station {}".format(selected_station)) + else: + sys.exit() + + + # Start to set the variables to launch the platetool + station_data = config_platepar_mask_dict[selected_station] + c = station_data[0] + cml_args.config_path = station_data[1] + platepar_file = station_data[2] + cml_args.mask = station_data[3] + + if cml_args.mask is None: + mask_path = os.path.expanduser(os.path.join(os.getcwd(), c.mask_file)) + if os.path.exists(mask_path): + cml_args.mask = mask_path + else: + mask_path = cml_args.mask + + # Use the correct config file to build the paths + captured_directory_path = os.path.expanduser(os.path.join(str(c.data_dir), str(c.captured_dir))) + station = c.stationID + captured_directory_list = os.listdir(captured_directory_path) + captured_directory_full_path = None + + # Check there are still some captured directories to use - there must be + if not len(captured_directory_list): + print("No captured directories found, cannot continue") + sys.exit() + + # Now work back from the newest directory + for potential_captured_directory in sorted(verifyCapturedDirectories(getCapturedDirectoryObjects(c), c), reverse=True): + one_valid_fits = False + captured_directory_full_path = os.path.join(captured_directory_path, potential_captured_directory) + potential_captured_directory = os.path.basename(potential_captured_directory) + if potential_captured_directory.startswith("{}_".format(station)) \ + and os.path.isdir(captured_directory_full_path): + # Maybe we have a calstar file and can get a best_fits_file + best_fits_file, star_count_max = getFITSMostStars(getCalstarsPath(potential_captured_directory, c)) + if anyFits([captured_directory_full_path], c): + break + + mask = None + cml_args.mask = os.path.dirname(mask_path) + if os.path.exists(cml_args.mask): + mask = getMaskFile(cml_args.mask, c) + + if (mask is not None) and (not mask.checkResolution(c.width, c.height)): + print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, c.width, c.height)) + mask = None + + return captured_directory_full_path, platepar_file, mask_path, mask, best_fits_file, star_count_max, c, cml_args.config_path + + +def getMaskPath(station_directory, multi_cam=False): + + """ + Work out the path to the mask. + + Arguments: + station_directory: [str] The directory for this station, i.e. ~/source/RMS/ or ~/source/RMSStations + Keyword Arguments: + multi_cam: [bool] Optional, default false, if true work using a multicam linux file structure. + + Returns: + [str] path to the mask + """ + + if multi_cam: + multi_cam_stations_directory = os.path.join(os.path.dirname(os.getcwd()), "Stations") + full_path_potential_station_directory = os.path.join(multi_cam_stations_directory, station_directory) + c = cr.parse(os.path.expanduser(os.path.join(full_path_potential_station_directory, ".config"))) + mask_path = os.path.join(full_path_potential_station_directory, c.mask_file) + else: + c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) + mask_path = os.path.join(os.getcwd(), c.mask_file) + + if not os.path.exists(mask_path): + mask_path = None + return mask_path + + +def getPlateparPath(station_directory, multi_cam=False): + """ + + Get the path to a file which can be read as a platepar. + + Arguments: + station_directory: the directory of a specific station i.e. AU000A + + Keyword Arguments: + multi_cam: True if this is a multi_cam environment, default false + + Return: + [str] path to the platepar if one exists, else None + """ + + + if multi_cam: + multi_cam_stations_directory = os.path.join(os.path.dirname(os.getcwd()), "Stations") + full_path_potential_station_directory = os.path.join(multi_cam_stations_directory, station_directory) + c = cr.parse(os.path.expanduser(os.path.join(full_path_potential_station_directory, ".config"))) + platepar_path = os.path.join(full_path_potential_station_directory, c.platepar_name) + else: + c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) + platepar_path = os.path.join(os.getcwd(), c.platepar_name) + + if os.path.exists(platepar_path): + pp = Platepar() + try: + pp.read(platepar_path) + except: + pp = None + if pp is None: + return None + else: + if pp.station_code != station_directory and multi_cam: + pp = None + platepar_path = None + else: + return platepar_path + + return None + + +def anyFits(directory_list, config, prefix="FF", extension=".fits", delimiter="_", no_of_parts=6): + + """ + + Arguments: + directory_list: [list] list of directories to be searched + config: config file for stationID + + Keyword arguments + prefix: [str] optional default FF + extension: [str] optional default .fits + delimiter: [char] optional default _ + no_of_parts: [int] optional default 6 + + Returns: + [bool] True if any file matching is found + """ + + # Does at least one of the captured_directories contain a fits file + one_fits_file_found = False + for captured_directory in directory_list: + file_list = os.listdir(captured_directory) + for test_file in file_list: + if test_file.startswith("{}{}{}{}".format(prefix,delimiter,config.stationID,delimiter)) \ + and test_file.endswith(extension) \ + and len(test_file.split("_")) == no_of_parts: + # This is probably a fits file + one_fits_file_found = True + break + if one_fits_file_found: + break + return one_fits_file_found + + +def verifyCapturedDirectories(directory_list, config): + """ + Given a list of file system objects, and a config file, return of list of directories which match the style + of an RMS captured directory associated with the station in the config file. + + Args: + directory_list: [list] list of file system objects. + config: [config] RMS config instance. + + Return: + verified directory list: Only the directories which match the expected format, sorted ascending. + + """ + + + # Filter these list of files in the captured directory for directories which match expected pattern + full_path_to_captured_files_directory = os.path.join(config.data_dir, config.captured_dir) + verified_directory_list = [] + for potential_directory in directory_list: + if not os.path.isdir(os.path.join(full_path_to_captured_files_directory, potential_directory)): + # If it is not a directory, continue + continue + # Check as much as we reasonably can that this is not some random directory saved here + if potential_directory.startswith("{}_".format(config.stationID)) \ + and len(potential_directory.split("_")) == 4: + verified_directory_list.append(os.path.join(full_path_to_captured_files_directory, potential_directory)) + return sorted(verified_directory_list) + + +def getCapturedDirectoryObjects(config): + """ + Get all the file system objects in captured directory pointed to by the config file. + + Arguments: + config: [config] RMS config instance. + + Return: + captured_directory_list sorted ascending + """ + + full_path_to_captured_files_directory = os.path.join(config.data_dir, config.captured_dir) + captured_directory_list = [] + if os.path.exists(full_path_to_captured_files_directory): + captured_directory_list = os.listdir(full_path_to_captured_files_directory) + return sorted(captured_directory_list) + if __name__ == '__main__': ### COMMAND LINE ARGUMENTS @@ -6734,16 +7644,18 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min # Init the command line arguments parser arg_parser = argparse.ArgumentParser(description="Tool for fitting astrometry plates and photometric calibration.") - arg_parser.add_argument('input_path', metavar='INPUT_PATH', type=str, - help='Path to the folder with FF or image files, path to a video file, or to a state file.' + arg_parser.add_argument('input_path', metavar='INPUT_PATH', type=str, nargs='?', + help='Path to the folder with FF or image files, path to a video file, ' + ' to a state file, an RMS bz2 file, or user@host:path/to/config/ . for remote platepar fitting' + ' if no path is given, the .config is assumed to be at ~/source/RMS/.config' ' If images or videos are given, their names must be in the format: YYYYMMDD_hhmmss.uuuuuu') + arg_parser.add_argument('-c', '--config', nargs=1, metavar='CONFIG_PATH', type=str, help="Path to a config file which will be used instead of the default one." " To load the .config file in the given data directory, write '.' (dot).") - arg_parser.add_argument('-r', '--fr', action="store_true", \ - help="""Use FR files. """) + arg_parser.add_argument('-r', '--fr', action="store_true", help="""Use FR files. """) arg_parser.add_argument('-t', '--timebeg', nargs=1, metavar='TIME', type=str, help="The beginning time of the video file in the YYYYMMDD_hhmmss.uuuuuu format.") @@ -6777,17 +7689,33 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min arg_parser.add_argument('-m', '--mask', metavar='MASK_PATH', type=str, help="Path to a mask file which will be applied to the star catalog") - + + arg_parser.add_argument('-u', '--number_of_fits', metavar='NUMBER_OF_FITS', type=int, + help="When working remotely, number of fits files to download. \n" + "If not specified, then pick the middle file" + "1 - Pick the penultimate file by time. \n" + "0 - Pick the file in the approximate middle of most recent capture session. \n") + + + arg_parser.add_argument('--flatbiassub', action="store_true", \ help="Subtract the bias from the flat. False by default.") - # Parse the command line arguments cml_args = arg_parser.parse_args() ######################### + platepar_file = None + best_fits_file = None + mask_file = None + + + # expand the user for the list of cml_args.config + cml_args.config = expandUserList(cml_args.config, ".config") + + print(cml_args.config) # Parse the beginning time into a datetime object if cml_args.timebeg is not None: @@ -6811,77 +7739,104 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min app = QtWidgets.QApplication(sys.argv) - # If the state file was given, load the state - if cml_args.input_path.endswith('.state'): + if cml_args.input_path is None: + input_path = None + config = None + mask = None + elif len(cml_args.input_path) == 6: + input_path = cml_args.input_path + config = None + mask = None + else: + cml_args.input_path = os.path.expanduser(cml_args.input_path) + config_path_list = [] - dir_path, state_name = os.path.split(cml_args.input_path) - config = cr.loadConfigFromDirectory(cml_args.config, cml_args.input_path) + if not cml_args.mask is None: + if os.path.isfile(cml_args.mask) or cml_args.mask.endswith(".bmp"): + cml_args.mask = os.path.dirname(cml_args.mask) - # Create plate_tool without calling its constructor then calling loadstate - plate_tool = PlateTool.__new__(PlateTool) - super(PlateTool, plate_tool).__init__() - if cml_args.mask is not None: - print("Given a path to a mask at {}".format(cml_args.mask)) - mask = getMaskFile(os.path.expanduser(cml_args.mask), config) - elif os.path.exists(os.path.join(config.rms_root_dir, config.mask_file)): - print("No mask specified loading mask from {}".format(os.path.join(config.rms_root_dir, config.mask_file))) - mask = getMaskFile(config.rms_root_dir, config) - elif os.path.exists("mask.bmp"): - mask = getMaskFile(".", config) + # If the state file was given, load the state + if cml_args.input_path.endswith('.state'): - elif True: - mask = None + dir_path, state_name = os.path.split(cml_args.input_path) + config = cr.loadConfigFromDirectory(cml_args.config, cml_args.input_path) - # If the dimensions of the mask do not match the config file, ignore the mask - if (mask is not None) and (not mask.checkResolution(config.width, config.height)): - print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( - mask.width, mask.height, config.width, config.height)) - mask = None + # Create plate_tool without calling its constructor then calling loadstate + plate_tool = PlateTool.__new__(PlateTool) + super(PlateTool, plate_tool).__init__() - plate_tool.loadState(dir_path, state_name, beginning_time=beginning_time, mask=mask) + if cml_args.mask is not None: + print("Given a path to a mask at {}".format(cml_args.mask)) + mask = getMaskFile(os.path.expanduser(cml_args.mask), config) - else: + elif os.path.exists(os.path.join(config.rms_root_dir, config.mask_file)): + print("No mask specified loading mask from {}".format(os.path.join(config.rms_root_dir, config.mask_file))) + mask = getMaskFile(config.rms_root_dir, config) + + elif os.path.exists("mask.bmp"): + mask = getMaskFile(".", config) + + elif True: + mask = None + + # If the dimensions of the mask do not match the config file, ignore the mask + if (mask is not None) and (not mask.checkResolution(config.width, config.height)): + print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, config.width, config.height)) + mask = None + + plate_tool.loadState(dir_path, state_name, beginning_time=beginning_time, mask=mask) + + elif cml_args.input_path.endswith('.bz2'): + handleBZ2(cml_args.input_path, cml_args.mask) + + elif isLoginPath(cml_args.input_path): + handleLoginPath(cml_args.input_path, cml_args.number_of_fits) - # Extract the data directory path - input_path = cml_args.input_path.replace('"', '') - if os.path.isfile(input_path): - dir_path = os.path.dirname(input_path) else: - dir_path = input_path - # Load the config file - config = cr.loadConfigFromDirectory(cml_args.config, dir_path) + # Extract the data directory path + input_path = cml_args.input_path.replace('"', '') + if os.path.isfile(input_path): + dir_path = os.path.dirname(input_path) + else: + dir_path = input_path + # Load the config file + config = cr.loadConfigFromDirectory(cml_args.config, dir_path) - if cml_args.mask is not None: - print("Given a path to a mask at {}".format(cml_args.mask)) - mask = getMaskFile(os.path.expanduser(cml_args.mask), config) - elif os.path.exists(os.path.join(config.rms_root_dir, config.mask_file)): + if cml_args.mask is not None: + print("Given a path to a mask at {}".format(cml_args.mask)) + mask = getMaskFile(os.path.expanduser(cml_args.mask), config) - print("No mask specified loading mask from {}".format(os.path.join(config.rms_root_dir, config.mask_file))) - mask = getMaskFile(config.rms_root_dir, config) + elif os.path.exists(os.path.join(config.rms_root_dir, config.mask_file)): - elif os.path.exists("mask.bmp"): - mask = getMaskFile(".", config) + print("No mask specified loading mask from {}".format(os.path.join(config.rms_root_dir, config.mask_file))) + mask = getMaskFile(config.rms_root_dir, config) + + elif os.path.exists("mask.bmp"): + mask = getMaskFile(".", config) + + else: + mask = None + + # If the dimensions of the mask do not match the config file, ignore the mask + if (mask is not None) and (not mask.checkResolution(config.width, config.height)): + print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, config.width, config.height)) + mask = None - else: - mask = None - # If the dimensions of the mask do not match the config file, ignore the mask - if (mask is not None) and (not mask.checkResolution(config.width, config.height)): - print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( - mask.width, mask.height, config.width, config.height)) - mask = None # Init SkyFit - plate_tool = PlateTool(input_path, config, beginning_time=beginning_time, fps=cml_args.fps, \ + plate_tool = PlateTool(input_path, config, beginning_time=beginning_time, fps=cml_args.fps, \ gamma=cml_args.gamma, use_fr_files=cml_args.fr, geo_points_input=cml_args.geopoints, mask=mask, nobg=cml_args.nobg, peribg=cml_args.peribg, flipud=cml_args.flipud, - flatbiassub=cml_args.flatbiassub) + flatbiassub=cml_args.flatbiassub, platepar_file=platepar_file, fits_file_to_open=None) # Run the GUI app