From 07cfb027a77f93667f0651f670545119a1e021b1 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Wed, 25 Jun 2025 14:29:42 +0000 Subject: [PATCH 01/22] Read bz2 files and write out pp as xx0001.cal --- Utils/SkyFit2.py | 106 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 86 insertions(+), 20 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 7c790dbac..b69792893 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -18,6 +18,9 @@ import scipy.optimize import pyqtgraph as pg import random +import tempfile +import tarfile +import shutil from RMS.Astrometry.ApplyAstrometry import xyToRaDecPP, raDecToXYPP, \ rotationWrtHorizon, rotationWrtHorizonToPosAngle, computeFOVSize, photomLine, photometryFit, \ @@ -6498,6 +6501,7 @@ def saveECSV(self): + def getRollingShutterCorrectedFrameNo(self, frame, pick): """ Given a pick object, return rolling shutter corrected (or not, depending on the config) frame number. @@ -6564,10 +6568,16 @@ def furthestStar(self, miss_this_one=False, min_separation=15): def getMarkedStars(include_unsuitable=True): + """Returns: a list of stars which are either marked as paired, or bad in image coordinates. - """ + Arguments: + + Keyword Arguments: + include_unsuitable: [bool] Include stars marked as unsuitable. - Returns: a list of stars which are either marked as paired, or bad in image coordinates + Return + marked_x: [list] of x coordinates. + marked_y: [list] of y coordindates. """ @@ -6588,17 +6598,19 @@ def getMarkedStars(include_unsuitable=True): def isDouble(x,y, reference_x_list, reference_y_list, min_separation=5): - """ - Are x,y coordinates which are very close to, but distinct from all coordinates in reference list + """ Are x,y coordinates which are very close to, but distinct from all coordinates in reference list. - Args: - x: image coordinates of star - y: image coordinates of star - reference_x_list: list of x image coordinates - reference_y_list: list of y image coordinates + Arguments: + x: [int] Image coordinates of star. + y: [int] Image coordinates of star. + reference_x_list: [list] List of x image coordinates. + reference_y_list: [list] List of y image coordinates. - Returns: - [bool] True if star is within min_separation of another star + Keyword Arguments: + min_separation: [int] Minimum separation not to be considered a double star. + + Return: + [bool] True if star is within min_separation of another star. """ for reference_x, reference_y in zip(reference_x_list, reference_y_list): @@ -6615,17 +6627,19 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min """ From the catalogue of filtered stars return a lists of coordinates stars which are not marked, - and another list which is the distance to the nearest marked star + and another list which is the distance to the nearest marked star. - Args: - marked_x_list: list of marked star x coordinates - marked_y_list: list of marked star y coordinates - min_separation: minimum separation to be regarded as a different stra + Arguments: + marked_x_list: [list] list of marked star x coordinates. + marked_y_list: [list] list of marked star y coordinates. + + Keyword Arguments + min_separation: [int] Minimum seperation not be regarded as a double star. Returns: - unmarked_x_list: list of unmarked star x coordinates - unmarked_y_list: list of unmarked star x coordinates - dist_nearest_marked_list: distance of the nearest marked star for returned star coordinates + unmarked_x_list: list of unmarked star x coordinates. + unmarked_y_list: list of unmarked star x coordinates. + dist_nearest_marked_list: distance of the nearest marked star for returned star coordinates. """ @@ -6725,7 +6739,55 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min return unmarked_x_list[next_star_index], unmarked_y_list[next_star_index], max_distance_between_paired +def handleBZ2(bz2_path): + """Passed a path to a bz2 file, unpack and prepare a working area for PlateTool, and launch. + + Arguments: + bz2_path: [path] Path to a bz2 file. + + Returns: + working_dir: [path] Path to a directory containing .config, fits files and if available, a mask. + """ + bz2_path = os.path.expanduser(bz2_path) + bz2_basename = os.path.basename(bz2_path) + stationID = bz2_basename.split("_")[0] + + with tempfile.TemporaryDirectory() as working_dir: + print("Extracting {}".format(bz2_basename)) + with tarfile.open(bz2_path, 'r:bz2') as tar: + tar.extractall(path=working_dir) + config_path = os.path.join(working_dir, ".config") + if os.path.exists(config_path): + config = cr.parse(config_path) + else: + print("No config file found in {}".format(bz2_basename)) + print("Quitting") + exit() + mask_path = os.path.join(working_dir, config.mask_file) + if os.path.exists(mask_path): + mask = getMaskFile(".", config) + + # If the dimensions of the mask do not match the config file, ignore the mask + if (mask is not None) and (not mask.checkResolution(config.width, config.height)): + print( + "Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, config.width, config.height)) + mask = None + + # Init SkyFit + plate_tool = PlateTool(working_dir, config, beginning_time=beginning_time, fps=cml_args.fps, \ + gamma=cml_args.gamma, use_fr_files=cml_args.fr, geo_points_input=cml_args.geopoints, + mask=mask, nobg=cml_args.nobg, peribg=cml_args.peribg, flipud=cml_args.flipud, + flatbiassub=cml_args.flatbiassub) + + # Run the GUI app + a = app.exec_() + temp_platepar_location = os.path.join(working_dir, config.platepar_name) + platepar_destination = os.path.join(os.getcwd(), "{}.cal".format(config.stationID.lower())) + print("Writing modified platepar to {}".format(platepar_destination)) + shutil.copy2(temp_platepar_location, platepar_destination) + sys.exit(a) if __name__ == '__main__': @@ -6735,7 +6797,8 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min arg_parser = argparse.ArgumentParser(description="Tool for fitting astrometry plates and photometric calibration.") arg_parser.add_argument('input_path', metavar='INPUT_PATH', type=str, - help='Path to the folder with FF or image files, path to a video file, or to a state file.' + help='Path to the folder with FF or image files, path to a video file, ' + ' to a state file, or an RMS bz2 file.' ' If images or videos are given, their names must be in the format: YYYYMMDD_hhmmss.uuuuuu') arg_parser.add_argument('-c', '--config', nargs=1, metavar='CONFIG_PATH', type=str, @@ -6843,6 +6906,9 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min plate_tool.loadState(dir_path, state_name, beginning_time=beginning_time, mask=mask) + elif cml_args.input_path.endswith('.bz2'): + handleBZ2(cml_args.input_path) + else: # Extract the data directory path From 8486e7805d9b0469bbb31c5fe769a42e02b50f36 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Thu, 26 Jun 2025 14:12:25 +0000 Subject: [PATCH 02/22] Run over a network such as the internet --- Utils/SkyFit2.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index b69792893..c18e63f1f 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -21,6 +21,8 @@ import tempfile import tarfile import shutil +import paramiko + from RMS.Astrometry.ApplyAstrometry import xyToRaDecPP, raDecToXYPP, \ rotationWrtHorizon, rotationWrtHorizonToPosAngle, computeFOVSize, photomLine, photometryFit, \ @@ -6790,6 +6792,55 @@ def handleBZ2(bz2_path): sys.exit(a) +def lsRemote(host, username, port, remote_path): + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys + ssh.connect(hostname=host, port=port, username=username) + + try: + sftp = ssh.open_sftp() + files = sftp.listdir(remote_path) + return files + finally: + sftp.close() + ssh.close() + + +def downloadFile(host, username, port, remote_path, local_path): + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys + try: + ssh.connect(hostname=host, port=port, username=username) + except: + print("Login to {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(username,host)) + + try: + sftp = ssh.open_sftp() + print("Downloading {} to {}".format(remote_path, local_path)) + sftp.get(remote_path, local_path) + + finally: + sftp.close() + ssh.close() + +def uploadFile(host, username, port, local_path, remote_path): + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys + try: + ssh.connect(hostname=host, port=port, username=username) + except: + print("Login to {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(username,host)) + + try: + sftp = ssh.open_sftp() + sftp.put(local_path, remote_path) + print("Uploaded {} to {}".format(local_path, remote_path)) + finally: + sftp.close() + ssh.close() + if __name__ == '__main__': ### COMMAND LINE ARGUMENTS From 11c764c659f02a4501771a05f6e67f9e6e4efcd0 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Thu, 26 Jun 2025 14:26:14 +0000 Subject: [PATCH 03/22] Handle when port not given --- Utils/SkyFit2.py | 143 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index c18e63f1f..e9e72eea3 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6841,6 +6841,146 @@ def uploadFile(host, username, port, local_path, remote_path): sftp.close() ssh.close() + + +def isLoginPath(path): + """Passed a path see if it is a path to a remote RMS installation + + Arguments: + path: [str] String to be tested + + Returns: + is_login_path: [bool] True if a network path, else false + """ + + pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' + pattern = r'^[^@:\s]+@[^@:\s]+:[^\s]+$' + is_login_path = re.match(pattern, path) or re.match(pattern_port, path) + + return is_login_path + +def getUserHostPortPath(path): + """Passed a user@host:path, return components + + Arguments: + path: [str] path to be broken appart + + Returns: + user: [str] e.g. rms + host: [str] e.g. raspberrypi + port: [str] e.g. 22 + path: [str] e.g. 192.168.1.2 + """ + + + + + pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' + match_port = re.match(pattern_port, path) + + if match_port: + user, host, port, path = match_port.groups() + return user, host, port, path + + pattern = r'^([^@:\s]+)@([^@:\s]+):([^\s]+)$' + match = re.match(pattern, path) + + if match: + user, host, path = match.groups() + return user, host, 22, path + + + return None, None, None, None + +def getFiles(host, user, port, remote_path, local_path, files_list): + + local_target_list = [] + for f in files_list: + local_target = os.path.join(local_path, f) + remote_target = os.path.join(remote_path, f) + downloadFile(host, user, port, remote_target, local_target) + local_target_list.append(local_target) + return local_target_list + + +def getRemoteCapturedDirsPath(rc): + len_local_home_directory = len(os.path.expanduser("~")) + len("/") + return os.path.join(rc.data_dir[len_local_home_directory:], rc.captured_dir) + +def getLatestCapturedDirectory(r, host, user, port): + + remote_captured_directory_list = lsRemote(host, user, port, getRemoteCapturedDirsPath(r)) + remote_captured_directory_list = [d for d in remote_captured_directory_list if d.startswith(r.stationID)] + remote_captured_directory_list.sort(reverse=True) + return remote_captured_directory_list[0] + +def putFiles(host, user, port, local_path, remote_path, files_list): + + local_target_list = [] + for f in files_list: + local_target = os.path.join(local_path, f) + remote_target = os.path.join(remote_path, f) + uploadFile(host, user, port, local_target, remote_target) + local_target_list.append(local_target) + return local_target_list + +def handleLoginPath(login_path): + + user, host, port, remote_path = getUserHostPortPath(login_path) + config_file_name = ('.config') + if remote_path.endswith(config_file_name): + remote_path = remote_path[:-len(config_file_name)] + print("Working on {}@{}:{}:{}".format(user, host, port, remote_path)) + # Create temporary directory + with tempfile.TemporaryDirectory() as local_path: + + # First get the .config file + files_list = ['.config'] + remote_config = cr.parse(getFiles(host, user, port, remote_path, local_path, files_list)[0]) + + files_list = [remote_config.platepar_name, remote_config.mask_file] + getFiles(host, user, port, remote_path, local_path, files_list) + latest_cap_dir = os.path.join(getRemoteCapturedDirsPath(remote_config), getLatestCapturedDirectory(remote_config, host, user, port)) + latest_captured_files = lsRemote(host, user, port, latest_cap_dir) + fits_files = [f for f in latest_captured_files if f.endswith(".fits") and f.startswith("FF_{}".format(remote_config.stationID))] + fits_files.sort(reverse=True) + num_fits_files = len(fits_files) + gap = int(num_fits_files / 5) + fits_to_download = [] + fits_to_download.append(fits_files[1 * gap]) + fits_to_download.append(fits_files[2 * gap]) + fits_to_download.append(fits_files[3 * gap]) + + # Ignore the latest file, might still be being written, but get the two before + getFiles(host, user, port, latest_cap_dir, local_path, fits_to_download) + + mask_path = os.path.join(local_path, remote_config.mask_file) + if os.path.exists(mask_path): + mask = getMaskFile(".", remote_config) + + # If the dimensions of the mask do not match the config file, ignore the mask + if (mask is not None) and (not mask.checkResolution(remote_config.width, remote_config.height)): + print( + "Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, remote_config.width, remote_config.height)) + mask = None + + # Init SkyFit + plate_tool = PlateTool(local_path, remote_config, beginning_time=beginning_time, fps=cml_args.fps, \ + gamma=cml_args.gamma, use_fr_files=cml_args.fr, geo_points_input=cml_args.geopoints, + mask=mask, nobg=cml_args.nobg, peribg=cml_args.peribg, flipud=cml_args.flipud, + flatbiassub=cml_args.flatbiassub) + + # Run the GUI app + a = app.exec_() + files_list = [remote_config.platepar_name] + putFiles(host, user, port, local_path, remote_path, files_list) + sys.exit(a) + + + pass + + if __name__ == '__main__': ### COMMAND LINE ARGUMENTS @@ -6960,6 +7100,9 @@ def uploadFile(host, username, port, local_path, remote_path): elif cml_args.input_path.endswith('.bz2'): handleBZ2(cml_args.input_path) + elif isLoginPath(cml_args.input_path): + handleLoginPath(cml_args.input_path) + else: # Extract the data directory path From a13babb057aab2da6252c628937e321a6b35eec5 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Thu, 26 Jun 2025 14:40:41 +0000 Subject: [PATCH 04/22] Update docstrings --- Utils/SkyFit2.py | 110 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 24 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index e9e72eea3..ef8f063d8 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6825,6 +6825,19 @@ def downloadFile(host, username, port, remote_path, local_path): ssh.close() def uploadFile(host, username, port, local_path, remote_path): + """Upload a single file. + + Arguments: + host: [str] hostname of remote machine. + username: [str] username of remote machine. + port: [str] port. + local_path: [path] full path to file to be uploaded. + remote_path: [path] full path to destination. + + Return: + Nothing. + """ + ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys @@ -6844,13 +6857,13 @@ def uploadFile(host, username, port, local_path, remote_path): def isLoginPath(path): - """Passed a path see if it is a path to a remote RMS installation + """Passed a path see if it is a path to a remote RMS installation. - Arguments: - path: [str] String to be tested + Arguments: + path: [str] String to be tested. - Returns: - is_login_path: [bool] True if a network path, else false + Return: + is_login_path: [bool] True if a network path, else false. """ pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' @@ -6860,20 +6873,17 @@ def isLoginPath(path): return is_login_path def getUserHostPortPath(path): - """Passed a user@host:path, return components - - Arguments: - path: [str] path to be broken appart - - Returns: - user: [str] e.g. rms - host: [str] e.g. raspberrypi - port: [str] e.g. 22 - path: [str] e.g. 192.168.1.2 - """ - + """Passed a user@host:port:path, or user@host:path return components, assuming port 22 + Arguments: + path: [str] path to be broken apart. + Return: + user: [str] e.g. rms + host: [str] e.g. raspberrypi + port: [str] e.g. 22 + path: [str] e.g. 192.168.1.2 + """ pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' match_port = re.match(pattern_port, path) @@ -6893,21 +6903,55 @@ def getUserHostPortPath(path): return None, None, None, None def getFiles(host, user, port, remote_path, local_path, files_list): + """Passed a list of files, get from remote path and put in local path. + + Arguments: + host: [str] hostname. + user: [str] user account. + port: [str] port. + remote_path: [str] remote path to get files from. + local_path: [str] local path to put files in. + + Return: + local_target_list: [list] list of retrieved files. + """ local_target_list = [] for f in files_list: - local_target = os.path.join(local_path, f) - remote_target = os.path.join(remote_path, f) + local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) downloadFile(host, user, port, remote_target, local_target) local_target_list.append(local_target) return local_target_list def getRemoteCapturedDirsPath(rc): + """Passed a config, get the path to the captured files on the remote machine. + + config parser will expond the ~ in any remote path as though it is on the local machine. + This function strips off the local ~ and joins the remote ~ + + Arguments: + rc: [config] the remote RMS config instance. + + Return: + [path] path to remote captured files. + """ + len_local_home_directory = len(os.path.expanduser("~")) + len("/") return os.path.join(rc.data_dir[len_local_home_directory:], rc.captured_dir) def getLatestCapturedDirectory(r, host, user, port): + """Get the latest captured directory from the remote machine. + + Arguments: + r: [config] the remote RMS config instance. + host: [str] remote host. + user: [str] remote user. + port: [str] remote port. + + Return: + [path] path to remote captured files directory. + """ remote_captured_directory_list = lsRemote(host, user, port, getRemoteCapturedDirsPath(r)) remote_captured_directory_list = [d for d in remote_captured_directory_list if d.startswith(r.stationID)] @@ -6915,16 +6959,37 @@ def getLatestCapturedDirectory(r, host, user, port): return remote_captured_directory_list[0] def putFiles(host, user, port, local_path, remote_path, files_list): + """Passed a list of files, put from local_path to remote path. + + Arguments: + host: [str] hostname. + user: [str] user account. + port: [str] port. + local_path: [str] local path to get files from. + remote_path: [str] remote path to put files in. + + + Return: + local_target_list: [list] list of retrieved files. + """ local_target_list = [] for f in files_list: - local_target = os.path.join(local_path, f) - remote_target = os.path.join(remote_path, f) + local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) uploadFile(host, user, port, local_target, remote_target) local_target_list.append(local_target) return local_target_list def handleLoginPath(login_path): + """Passed a login path, retrieve necessary files and start the platetool. + + Arguments: + login_path: [str] user@host:port:path/to/.config or user@host:path/to/.config + + + Return: + Nothing. + """ user, host, port, remote_path = getUserHostPortPath(login_path) config_file_name = ('.config') @@ -6978,9 +7043,6 @@ def handleLoginPath(login_path): sys.exit(a) - pass - - if __name__ == '__main__': ### COMMAND LINE ARGUMENTS From 55abd0a3c5ea36afd4c5c66f58db065653075669 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Fri, 27 Jun 2025 06:09:19 +0000 Subject: [PATCH 05/22] Use rsync for speed if available --- Utils/SkyFit2.py | 189 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 160 insertions(+), 29 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index ef8f063d8..edcd3da09 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -22,6 +22,7 @@ import tarfile import shutil import paramiko +import subprocess from RMS.Astrometry.ApplyAstrometry import xyToRaDecPP, raDecToXYPP, \ @@ -6807,6 +6808,29 @@ def lsRemote(host, username, port, remote_path): def downloadFile(host, username, port, remote_path, local_path): + """Download a single file try compressed rsync first, then fall back to Paramiko + + Arguments: + host: [str] hostname of remote machine. + username: [str] username for remote machine. + port: [str] port. + remote_path: [path] full path to destination. + local_path: [path] full path of local target. + + Return: + Nothing. + """ + + try: + + remote = "{}@{}:{}".format(username, host, remote_path) + result = subprocess.run(['rsync', '-z', remote, local_path], capture_output=True, text=True) + if not os.path.exists(os.path.expanduser(local_path)): + print("Login to {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(username,host)) + quit() + return + except: + pass ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys @@ -6814,22 +6838,22 @@ def downloadFile(host, username, port, remote_path, local_path): ssh.connect(hostname=host, port=port, username=username) except: print("Login to {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(username,host)) - + quit() try: sftp = ssh.open_sftp() - print("Downloading {} to {}".format(remote_path, local_path)) sftp.get(remote_path, local_path) finally: sftp.close() ssh.close() + return def uploadFile(host, username, port, local_path, remote_path): """Upload a single file. Arguments: host: [str] hostname of remote machine. - username: [str] username of remote machine. + username: [str] username for remote machine. port: [str] port. local_path: [path] full path to file to be uploaded. remote_path: [path] full path to destination. @@ -6838,7 +6862,6 @@ def uploadFile(host, username, port, local_path, remote_path): Nothing. """ - ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys try: @@ -6902,9 +6925,18 @@ def getUserHostPortPath(path): return None, None, None, None -def getFiles(host, user, port, remote_path, local_path, files_list): - """Passed a list of files, get from remote path and put in local path. +def rsyncAvailable(path): + try: + result = subprocess.run(['rsync', '-l'], capture_output=True, text=True) + return True + except: + return False + + +def getFiles(host, user, port, remote_path, local_path, files_list, number = None): + """Passed a list of files, get from remote path and put in local path. + Arguments: host: [str] hostname. user: [str] user account. @@ -6912,17 +6944,71 @@ def getFiles(host, user, port, remote_path, local_path, files_list): remote_path: [str] remote path to get files from. local_path: [str] local path to put files in. + Keyword Arguments: + number: [int] Optional, default None. The number of files to download from the list. If none, + or more than the number of files in the list, download all. If 1, download penultimate, if 0, + download middle. + + Return: local_target_list: [list] list of retrieved files. """ + + + files_list.sort() + if number == 0: + # Pick approximately the middle from the fil + files_list = [files_list[len(files_list) // 2]] + else: + files_list = nItemsFromList(number, files_list, drop_last=True, sort=True) local_target_list = [] for f in files_list: local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) + text = highlight("Downloading ", files_list, f) + print(text , end='\r') downloadFile(host, user, port, remote_target, local_target) local_target_list.append(local_target) + print() return local_target_list +def highlight(custom_text, list, highlight): + + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + + output = HEADER + custom_text + OKBLUE + if len(list) > 5: + output += "\n" + i = 0 + for item in list: + i += 1 + if item == highlight: + output += WARNING + output += "{}".format(item) + output += ENDC + " " + else: + output += OKBLUE + output += "{}".format(item) + output += ENDC + " " + + if i % 6 == 0: + output += "\n" + " " * (len(custom_text) + 2) + output += ENDC + + return output + + + + + def getRemoteCapturedDirsPath(rc): """Passed a config, get the path to the captured files on the remote machine. @@ -6980,7 +7066,53 @@ def putFiles(host, user, port, local_path, remote_path, files_list): local_target_list.append(local_target) return local_target_list -def handleLoginPath(login_path): +def nItemsFromList(number, input_list, drop_first=False, drop_last=False, sort=True): + """Return a list of length number, containing equally spaced items from input list. + + + + Arguments: + number: [int] Number of list items to return. Can be more than the length of the input list, + in which case items will be duplicated to pad to length. + input_list: [list] Input list. + + Keyword arguments: + drop_first: [bool] If true, remove the first item from the list. + drop_last: [bool] If true, remove the last item from the list. + + Return: + output_list: [list] list of length number + """ + + if number is None: + return input_list + + # Avoid divide by zero error + if number < 1: + return [] + + # Avoid working on empty list + + if len(input_list) < 1: + return[] + + # Truncate as required + input_list = input_list[1:] if drop_first else input_list + input_list = input_list[:-1] if drop_last else input_list + + # Sort the list + if sort: + input_list.sort() + + output_list, n, gap = [], 0, (len(input_list)) / (number) + for i in range(0, number): + output_list.append(input_list[round(n)]) + n += gap + + return output_list + + +def handleLoginPath(login_path, number_of_fits=None): """Passed a login path, retrieve necessary files and start the platetool. Arguments: @@ -6990,35 +7122,28 @@ def handleLoginPath(login_path): Return: Nothing. """ - + number_of_fits = 1 if number_of_fits is None else int(number_of_fits) user, host, port, remote_path = getUserHostPortPath(login_path) config_file_name = ('.config') if remote_path.endswith(config_file_name): remote_path = remote_path[:-len(config_file_name)] - print("Working on {}@{}:{}:{}".format(user, host, port, remote_path)) + print("Getting .config from {}@{}:{}:{}".format(user, host, port, remote_path)) # Create temporary directory with tempfile.TemporaryDirectory() as local_path: # First get the .config file files_list = ['.config'] + time_started_getting_files = datetime.datetime.now(datetime.timezone.utc) remote_config = cr.parse(getFiles(host, user, port, remote_path, local_path, files_list)[0]) - - files_list = [remote_config.platepar_name, remote_config.mask_file] - getFiles(host, user, port, remote_path, local_path, files_list) + platepar_mask_list = [remote_config.platepar_name, remote_config.mask_file] + getFiles(host, user, port, remote_path, local_path, platepar_mask_list) latest_cap_dir = os.path.join(getRemoteCapturedDirsPath(remote_config), getLatestCapturedDirectory(remote_config, host, user, port)) latest_captured_files = lsRemote(host, user, port, latest_cap_dir) fits_files = [f for f in latest_captured_files if f.endswith(".fits") and f.startswith("FF_{}".format(remote_config.stationID))] - fits_files.sort(reverse=True) - num_fits_files = len(fits_files) - gap = int(num_fits_files / 5) - fits_to_download = [] - fits_to_download.append(fits_files[1 * gap]) - fits_to_download.append(fits_files[2 * gap]) - fits_to_download.append(fits_files[3 * gap]) - - # Ignore the latest file, might still be being written, but get the two before - getFiles(host, user, port, latest_cap_dir, local_path, fits_to_download) - + getFiles(host, user, port, latest_cap_dir, local_path, fits_files, number=number_of_fits) + time_finished_getting_files = datetime.datetime.now(datetime.timezone.utc) + time_taken = (time_finished_getting_files - time_started_getting_files).total_seconds() + print("Files retrieved in {:.1f} seconds".format(time_taken)) mask_path = os.path.join(local_path, remote_config.mask_file) if os.path.exists(mask_path): mask = getMaskFile(".", remote_config) @@ -7051,15 +7176,15 @@ def handleLoginPath(login_path): arg_parser.add_argument('input_path', metavar='INPUT_PATH', type=str, help='Path to the folder with FF or image files, path to a video file, ' - ' to a state file, or an RMS bz2 file.' + ' to a state file, an RMS bz2 file, or user@host:path/to/config/ .' ' If images or videos are given, their names must be in the format: YYYYMMDD_hhmmss.uuuuuu') + arg_parser.add_argument('-c', '--config', nargs=1, metavar='CONFIG_PATH', type=str, help="Path to a config file which will be used instead of the default one." " To load the .config file in the given data directory, write '.' (dot).") - arg_parser.add_argument('-r', '--fr', action="store_true", \ - help="""Use FR files. """) + arg_parser.add_argument('-r', '--fr', action="store_true", help="""Use FR files. """) arg_parser.add_argument('-t', '--timebeg', nargs=1, metavar='TIME', type=str, help="The beginning time of the video file in the YYYYMMDD_hhmmss.uuuuuu format.") @@ -7093,12 +7218,18 @@ def handleLoginPath(login_path): arg_parser.add_argument('-m', '--mask', metavar='MASK_PATH', type=str, help="Path to a mask file which will be applied to the star catalog") - + + arg_parser.add_argument('-u', '--number_of_fits', metavar='NUMBER_OF_FITS', type=int, + help="When working remotely, number of fits files to download. \n" + "1 - Pick the penultimate file by time. \n" + "0 - Pick the file in the approximate middle of most recent capture session. \n") + + + arg_parser.add_argument('--flatbiassub', action="store_true", \ help="Subtract the bias from the flat. False by default.") - # Parse the command line arguments cml_args = arg_parser.parse_args() @@ -7163,7 +7294,7 @@ def handleLoginPath(login_path): handleBZ2(cml_args.input_path) elif isLoginPath(cml_args.input_path): - handleLoginPath(cml_args.input_path) + handleLoginPath(cml_args.input_path, cml_args.number_of_fits) else: From c2d9b936ad3830e0e16080b7aa277dbc93c0f546 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Fri, 27 Jun 2025 08:47:44 +0000 Subject: [PATCH 06/22] Completed work on remote access --- Utils/SkyFit2.py | 252 +++++++++++++++++++++++------------------------ 1 file changed, 122 insertions(+), 130 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index edcd3da09..a76bc994d 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6742,6 +6742,8 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min return unmarked_x_list[next_star_index], unmarked_y_list[next_star_index], max_distance_between_paired + + def handleBZ2(bz2_path): """Passed a path to a bz2 file, unpack and prepare a working area for PlateTool, and launch. @@ -6792,7 +6794,6 @@ def handleBZ2(bz2_path): shutil.copy2(temp_platepar_location, platepar_destination) sys.exit(a) - def lsRemote(host, username, port, remote_path): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys @@ -6806,7 +6807,6 @@ def lsRemote(host, username, port, remote_path): sftp.close() ssh.close() - def downloadFile(host, username, port, remote_path, local_path): """Download a single file try compressed rsync first, then fall back to Paramiko @@ -6824,9 +6824,14 @@ def downloadFile(host, username, port, remote_path, local_path): try: remote = "{}@{}:{}".format(username, host, remote_path) - result = subprocess.run(['rsync', '-z', remote, local_path], capture_output=True, text=True) + result = subprocess.run(['rsync', '-z', remote], capture_output=True, text=True) + if "No such file or directory" in result.stderr : + print("Remote file {} was not found.".format(os.path.basename(remote))) + return + else: + result = subprocess.run(['rsync', '-z', remote, local_path], capture_output=True, text=True) if not os.path.exists(os.path.expanduser(local_path)): - print("Login to {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(username,host)) + print("Download of {} from {}@{} failed. You need to add your keys to remote using ssh-copy-id.".format(remote_path, username,host)) quit() return except: @@ -6841,13 +6846,97 @@ def downloadFile(host, username, port, remote_path, local_path): quit() try: sftp = ssh.open_sftp() - sftp.get(remote_path, local_path) + remote_file_list = sftp.listdir(remote_path) + if remote_file_list: + sftp.get(remote_path, local_path) finally: sftp.close() ssh.close() return + +def nItemsFromList(number, input_list, drop_first=False, drop_last=False, sort=True): + """Return a list of length number, containing equally spaced items from input list. + + + + Arguments: + number: [int] Number of list items to return. Can be more than the length of the input list, + in which case items will be duplicated to pad to length. + input_list: [list] Input list. + + Keyword arguments: + drop_first: [bool] If true, remove the first item from the list. + drop_last: [bool] If true, remove the last item from the list. + + Return: + output_list: [list] list of length number + """ + + if number is None: + return input_list + + # Avoid divide by zero error + if number < 1: + return [] + + # Avoid working on empty list + + if len(input_list) < 1: + return[] + + # Truncate as required + input_list = input_list[1:] if drop_first else input_list + input_list = input_list[:-1] if drop_last else input_list + + # Sort the list + if sort: + input_list.sort() + + output_list, n, gap = [], 0, (len(input_list)) / (number) + for i in range(0, number): + output_list.append(input_list[round(n)]) + n += gap + + return output_list + +def getFiles(host, user, port, remote_path, local_path, files_list, number=None): + """Passed a list of files, get from remote path and put in local path. + + Arguments: + host: [str] hostname. + user: [str] user account. + port: [str] port. + remote_path: [str] remote path to get files from. + local_path: [str] local path to put files in. + + Keyword Arguments: + number: [int] Optional, default None. The number of files to download from the list. If none, + or more than the number of files in the list, download all. If 1, download penultimate, if 0, + download middle. + + + Return: + local_target_list: [list] list of retrieved files. + """ + + files_list.sort() + if number == 0: + # Pick approximately the middle from the fil + files_list = [files_list[len(files_list) // 2]] + else: + files_list = nItemsFromList(number, files_list, drop_last=True, sort=True) + local_target_list = [] + for f in files_list: + local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) + text = highlight("Downloading ", files_list, f) + print(text, end='\r') + downloadFile(host, user, port, remote_target, local_target) + local_target_list.append(local_target) + print() + return local_target_list + def uploadFile(host, username, port, local_path, remote_path): """Upload a single file. @@ -6877,23 +6966,30 @@ def uploadFile(host, username, port, local_path, remote_path): sftp.close() ssh.close() - - -def isLoginPath(path): - """Passed a path see if it is a path to a remote RMS installation. +def putFiles(host, user, port, local_path, remote_path, files_list): + """Passed a list of files, put from local_path to remote path. Arguments: - path: [str] String to be tested. + host: [str] hostname. + user: [str] user account. + port: [str] port. + local_path: [str] local path to get files from. + remote_path: [str] remote path to put files in. - Return: - is_login_path: [bool] True if a network path, else false. - """ - pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' - pattern = r'^[^@:\s]+@[^@:\s]+:[^\s]+$' - is_login_path = re.match(pattern, path) or re.match(pattern_port, path) + Return: + local_target_list: [list] list of retrieved files. + """ - return is_login_path + local_target_list = [] + for f in files_list: + local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) + if os.path.exists(local_target): + uploadFile(host, user, port, local_target, remote_target) + local_target_list.append(local_target) + else: + print("Upload target {} not found on local.".format(local_target)) + return local_target_list def getUserHostPortPath(path): """Passed a user@host:port:path, or user@host:path return components, assuming port 22 @@ -6925,53 +7021,6 @@ def getUserHostPortPath(path): return None, None, None, None -def rsyncAvailable(path): - - try: - result = subprocess.run(['rsync', '-l'], capture_output=True, text=True) - return True - except: - return False - - -def getFiles(host, user, port, remote_path, local_path, files_list, number = None): - """Passed a list of files, get from remote path and put in local path. - - Arguments: - host: [str] hostname. - user: [str] user account. - port: [str] port. - remote_path: [str] remote path to get files from. - local_path: [str] local path to put files in. - - Keyword Arguments: - number: [int] Optional, default None. The number of files to download from the list. If none, - or more than the number of files in the list, download all. If 1, download penultimate, if 0, - download middle. - - - Return: - local_target_list: [list] list of retrieved files. - """ - - - - files_list.sort() - if number == 0: - # Pick approximately the middle from the fil - files_list = [files_list[len(files_list) // 2]] - else: - files_list = nItemsFromList(number, files_list, drop_last=True, sort=True) - local_target_list = [] - for f in files_list: - local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) - text = highlight("Downloading ", files_list, f) - print(text , end='\r') - downloadFile(host, user, port, remote_target, local_target) - local_target_list.append(local_target) - print() - return local_target_list - def highlight(custom_text, list, highlight): HEADER = '\033[95m' @@ -7005,11 +7054,6 @@ def highlight(custom_text, list, highlight): return output - - - - - def getRemoteCapturedDirsPath(rc): """Passed a config, get the path to the captured files on the remote machine. @@ -7044,73 +7088,21 @@ def getLatestCapturedDirectory(r, host, user, port): remote_captured_directory_list.sort(reverse=True) return remote_captured_directory_list[0] -def putFiles(host, user, port, local_path, remote_path, files_list): - """Passed a list of files, put from local_path to remote path. - - Arguments: - host: [str] hostname. - user: [str] user account. - port: [str] port. - local_path: [str] local path to get files from. - remote_path: [str] remote path to put files in. - - - Return: - local_target_list: [list] list of retrieved files. - """ - - local_target_list = [] - for f in files_list: - local_target, remote_target = os.path.join(local_path, f), os.path.join(remote_path, f) - uploadFile(host, user, port, local_target, remote_target) - local_target_list.append(local_target) - return local_target_list - -def nItemsFromList(number, input_list, drop_first=False, drop_last=False, sort=True): - """Return a list of length number, containing equally spaced items from input list. - - +def isLoginPath(path): + """Passed a path see if it is a path to a remote RMS installation. Arguments: - number: [int] Number of list items to return. Can be more than the length of the input list, - in which case items will be duplicated to pad to length. - input_list: [list] Input list. - - Keyword arguments: - drop_first: [bool] If true, remove the first item from the list. - drop_last: [bool] If true, remove the last item from the list. + path: [str] String to be tested. Return: - output_list: [list] list of length number - """ - - if number is None: - return input_list - - # Avoid divide by zero error - if number < 1: - return [] - - # Avoid working on empty list - - if len(input_list) < 1: - return[] - - # Truncate as required - input_list = input_list[1:] if drop_first else input_list - input_list = input_list[:-1] if drop_last else input_list - - # Sort the list - if sort: - input_list.sort() - - output_list, n, gap = [], 0, (len(input_list)) / (number) - for i in range(0, number): - output_list.append(input_list[round(n)]) - n += gap + is_login_path: [bool] True if a network path, else false. + """ - return output_list + pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' + pattern = r'^[^@:\s]+@[^@:\s]+:[^\s]+$' + is_login_path = re.match(pattern, path) or re.match(pattern_port, path) + return is_login_path def handleLoginPath(login_path, number_of_fits=None): """Passed a login path, retrieve necessary files and start the platetool. From f71ee44b56d38832820485cd1a6f762b845c755c Mon Sep 17 00:00:00 2001 From: g7gpr Date: Fri, 27 Jun 2025 11:34:24 +0000 Subject: [PATCH 07/22] Small tweak to cli --- Utils/SkyFit2.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index a76bc994d..945f64b83 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6934,7 +6934,8 @@ def getFiles(host, user, port, remote_path, local_path, files_list, number=None) print(text, end='\r') downloadFile(host, user, port, remote_target, local_target) local_target_list.append(local_target) - print() + text = highlight("Downloading ", files_list, f, all_done = True) + print(text) return local_target_list def uploadFile(host, username, port, local_path, remote_path): @@ -7021,7 +7022,7 @@ def getUserHostPortPath(path): return None, None, None, None -def highlight(custom_text, list, highlight): +def highlight(custom_text, list, highlight, all_done=False): HEADER = '\033[95m' OKBLUE = '\033[94m' @@ -7037,16 +7038,22 @@ def highlight(custom_text, list, highlight): if len(list) > 5: output += "\n" i = 0 + highlight_passed = False for item in list: i += 1 - if item == highlight: + if item == highlight and not all_done: + highlight_passed = True output += WARNING output += "{}".format(item) output += ENDC + " " - else: + elif highlight_passed == True and not all_done: output += OKBLUE output += "{}".format(item) output += ENDC + " " + else: + output += OKGREEN + output += "{}".format(item) + output += ENDC + " " if i % 6 == 0: output += "\n" + " " * (len(custom_text) + 2) From 10c37f48a951182fddb863f2dd5189bb26ad5502 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Sun, 13 Jul 2025 11:21:40 +0000 Subject: [PATCH 08/22] Collect correct mask file --- Utils/SkyFit2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 945f64b83..693adc803 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -7145,7 +7145,7 @@ def handleLoginPath(login_path, number_of_fits=None): print("Files retrieved in {:.1f} seconds".format(time_taken)) mask_path = os.path.join(local_path, remote_config.mask_file) if os.path.exists(mask_path): - mask = getMaskFile(".", remote_config) + mask = getMaskFile(mask_path, remote_config) # If the dimensions of the mask do not match the config file, ignore the mask if (mask is not None) and (not mask.checkResolution(remote_config.width, remote_config.height)): From c5674aef40ae2eb5c4023fb70a055f95de2551ca Mon Sep 17 00:00:00 2001 From: g7gpr Date: Mon, 14 Jul 2025 01:03:58 +0000 Subject: [PATCH 09/22] Set a default path if none given --- Utils/SkyFit2.py | 45 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 693adc803..ed595aa19 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6878,8 +6878,8 @@ def nItemsFromList(number, input_list, drop_first=False, drop_last=False, sort=T return input_list # Avoid divide by zero error - if number < 1: - return [] + if number == 1: + return input_list[-1] # Avoid working on empty list @@ -7019,6 +7019,14 @@ def getUserHostPortPath(path): user, host, path = match.groups() return user, host, 22, path + pattern = r'^([\w\.-]+)@([\w\.-]+)$' + match_username_hostname = re.match(pattern, path) + + if match_username_hostname: + print(match_username_hostname.groups()) + user, host = match_username_hostname.groups() + return user, host, 22, "source/RMS/" + return None, None, None, None @@ -7104,10 +7112,10 @@ def isLoginPath(path): Return: is_login_path: [bool] True if a network path, else false. """ - + pattern_username_hostname = r'\b[\w\.-]+@[\w\.-]+\b' pattern_port = r'^([\w.-]+)@([\w.-]+):(\d+):(.*)$' pattern = r'^[^@:\s]+@[^@:\s]+:[^\s]+$' - is_login_path = re.match(pattern, path) or re.match(pattern_port, path) + is_login_path = re.match(pattern, path) or re.match(pattern_port, path) or re.match(pattern_username_hostname, path) return is_login_path @@ -7121,8 +7129,12 @@ def handleLoginPath(login_path, number_of_fits=None): Return: Nothing. """ + # If no argyment was passsed then set as one, return approximiate middle file number_of_fits = 1 if number_of_fits is None else int(number_of_fits) user, host, port, remote_path = getUserHostPortPath(login_path) + + # If no config path passed in then assume ~/source/RMS for .config + remote_path = "source/RMS" if not len(remote_path) else remote_path config_file_name = ('.config') if remote_path.endswith(config_file_name): remote_path = remote_path[:-len(config_file_name)] @@ -7132,20 +7144,39 @@ def handleLoginPath(login_path, number_of_fits=None): # First get the .config file files_list = ['.config'] + + # Record a start time time_started_getting_files = datetime.datetime.now(datetime.timezone.utc) + + # Parse the remote configuration remote_config = cr.parse(getFiles(host, user, port, remote_path, local_path, files_list)[0]) + + # Start making a list of files to get; the platepar, and the mask platepar_mask_list = [remote_config.platepar_name, remote_config.mask_file] + + # Get the platepar and and the mask getFiles(host, user, port, remote_path, local_path, platepar_mask_list) + + # Now get the remote paths of the remote directories of interest latest_cap_dir = os.path.join(getRemoteCapturedDirsPath(remote_config), getLatestCapturedDirectory(remote_config, host, user, port)) latest_captured_files = lsRemote(host, user, port, latest_cap_dir) + + # Filter only for fits files for the remote station fits_files = [f for f in latest_captured_files if f.endswith(".fits") and f.startswith("FF_{}".format(remote_config.stationID))] + + # Get the files getFiles(host, user, port, latest_cap_dir, local_path, fits_files, number=number_of_fits) + + # Record and print the time taken time_finished_getting_files = datetime.datetime.now(datetime.timezone.utc) time_taken = (time_finished_getting_files - time_started_getting_files).total_seconds() print("Files retrieved in {:.1f} seconds".format(time_taken)) + + # Load the mask structure mask_path = os.path.join(local_path, remote_config.mask_file) if os.path.exists(mask_path): - mask = getMaskFile(mask_path, remote_config) + mask_dir = os.path.dirname(mask_path) + mask = getMaskFile(mask_dir, remote_config) # If the dimensions of the mask do not match the config file, ignore the mask if (mask is not None) and (not mask.checkResolution(remote_config.width, remote_config.height)): @@ -7175,7 +7206,8 @@ def handleLoginPath(login_path, number_of_fits=None): arg_parser.add_argument('input_path', metavar='INPUT_PATH', type=str, help='Path to the folder with FF or image files, path to a video file, ' - ' to a state file, an RMS bz2 file, or user@host:path/to/config/ .' + ' to a state file, an RMS bz2 file, or user@host:path/to/config/ . for remote platepar fitting' + ' if no path is given, the .config is assumedd to be at ~/source/RMS/.config' ' If images or videos are given, their names must be in the format: YYYYMMDD_hhmmss.uuuuuu') @@ -7220,6 +7252,7 @@ def handleLoginPath(login_path, number_of_fits=None): arg_parser.add_argument('-u', '--number_of_fits', metavar='NUMBER_OF_FITS', type=int, help="When working remotely, number of fits files to download. \n" + "If not specified, then pick the middle file" "1 - Pick the penultimate file by time. \n" "0 - Pick the file in the approximate middle of most recent capture session. \n") From b8497702caa0618286bb39e5df0fa037146750fe Mon Sep 17 00:00:00 2001 From: g7gpr Date: Mon, 14 Jul 2025 01:12:13 +0000 Subject: [PATCH 10/22] Update docstrings --- Utils/SkyFit2.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index ed595aa19..c20d8d8f0 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6750,7 +6750,7 @@ def handleBZ2(bz2_path): Arguments: bz2_path: [path] Path to a bz2 file. - Returns: + Return: working_dir: [path] Path to a directory containing .config, fits files and if available, a mask. """ @@ -6795,6 +6795,18 @@ def handleBZ2(bz2_path): sys.exit(a) def lsRemote(host, username, port, remote_path): + """Return the files in a remote directory. + + Arguments: + host: [str] remote host. + username: [str] user account to use. + port: [int] remote port number. + remote_pat: [str] path of remote directory to list. + + Return: + files: [list of strings] Names of remote files. + """ + ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # Accept unknown host keys ssh.connect(hostname=host, port=port, username=username) @@ -6879,7 +6891,7 @@ def nItemsFromList(number, input_list, drop_first=False, drop_last=False, sort=T # Avoid divide by zero error if number == 1: - return input_list[-1] + return [input_list[-1]] # Avoid working on empty list @@ -6993,7 +7005,8 @@ def putFiles(host, user, port, local_path, remote_path, files_list): return local_target_list def getUserHostPortPath(path): - """Passed a user@host:port:path, or user@host:path return components, assuming port 22 + """Passed a user@host:port:path, or user@host:path return components, assuming port 22, and + if no path, assume source/RMS/ Arguments: path: [str] path to be broken apart. @@ -7023,7 +7036,6 @@ def getUserHostPortPath(path): match_username_hostname = re.match(pattern, path) if match_username_hostname: - print(match_username_hostname.groups()) user, host = match_username_hostname.groups() return user, host, 22, "source/RMS/" From af1b551162bd780edd4012eeafdc775a92a561c0 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Mon, 14 Jul 2025 01:16:46 +0000 Subject: [PATCH 11/22] Add a print call when getting remote directory list --- Utils/SkyFit2.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index c20d8d8f0..49707ecaf 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -7110,7 +7110,9 @@ def getLatestCapturedDirectory(r, host, user, port): [path] path to remote captured files directory. """ - remote_captured_directory_list = lsRemote(host, user, port, getRemoteCapturedDirsPath(r)) + remote_directory = getRemoteCapturedDirsPath(r) + print("Getting directory list of {}@{}:{}:{}".format(user, host, port, remote_directory)) + remote_captured_directory_list = lsRemote(host, user, port, remote_directory) remote_captured_directory_list = [d for d in remote_captured_directory_list if d.startswith(r.stationID)] remote_captured_directory_list.sort(reverse=True) return remote_captured_directory_list[0] From 433d42fbf8049004453dc7ec7ac1155e4e9f2938 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Fri, 18 Jul 2025 13:46:34 +0000 Subject: [PATCH 12/22] use dirname --- Utils/SkyFit2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 49707ecaf..803b37f4a 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6813,7 +6813,7 @@ def lsRemote(host, username, port, remote_path): try: sftp = ssh.open_sftp() - files = sftp.listdir(remote_path) + files = sftp.listdir(os.path.basname(remote_path)) return files finally: sftp.close() From 8a9dddf24d68f36a5a4593e4d3509d31124964b6 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Fri, 18 Jul 2025 13:50:41 +0000 Subject: [PATCH 13/22] Fix paths --- Utils/SkyFit2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 803b37f4a..a9d6054da 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6813,7 +6813,7 @@ def lsRemote(host, username, port, remote_path): try: sftp = ssh.open_sftp() - files = sftp.listdir(os.path.basname(remote_path)) + files = sftp.listdir(remote_path) return files finally: sftp.close() @@ -6858,7 +6858,7 @@ def downloadFile(host, username, port, remote_path, local_path): quit() try: sftp = ssh.open_sftp() - remote_file_list = sftp.listdir(remote_path) + remote_file_list = sftp.listdir(os.path.dirname(remote_path)) if remote_file_list: sftp.get(remote_path, local_path) From caf1f3c026ef03282d29743bb1993d83c772a117 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Fri, 18 Jul 2025 14:46:18 +0000 Subject: [PATCH 14/22] Fix paths --- Utils/SkyFit2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index a9d6054da..3ac315ed0 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6813,6 +6813,7 @@ def lsRemote(host, username, port, remote_path): try: sftp = ssh.open_sftp() + print("Remote path {}".format(remote_path)) files = sftp.listdir(remote_path) return files finally: From facd1cfe565f4d58c44cbcadefd965a2163bbd11 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Fri, 1 Aug 2025 14:55:51 +0000 Subject: [PATCH 15/22] Running without input path --- Utils/SkyFit2.py | 92 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 86 insertions(+), 6 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 3ac315ed0..7a160daef 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -452,7 +452,7 @@ def __len__(self): class PlateTool(QtWidgets.QMainWindow): def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None, use_fr_files=False, geo_points_input=None, startUI=True, mask=None, nobg=False, peribg=False, flipud=False, - flatbiassub=False): + flatbiassub=False, platepar_file=None, fits_file=None): """ SkyFit interactive window. Arguments: @@ -475,6 +475,8 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None coloured mask instead of the avepixel. False by default. flipud: [bool] Flip the image upside down. False by default. flatbiassub: [bool] Subtract flat and bias frames. False by default. + platepar_file: [str] path to platepar file to load + fits_file: [str] path to fits file to open """ super(PlateTool, self).__init__() @@ -494,6 +496,7 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None self.config = config + # Store forced time of first frame self.beginning_time = beginning_time @@ -614,6 +617,10 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None # Detect data input type and init the image handle self.detectInputType(load=True, beginning_time=beginning_time, use_fr_files=self.use_fr_files) + if fits_file is not None: + print("Loading {}".format(fits_file)) + self.img_handle.setCurrentFF(fits_file) + # Update the FPS if it's forced self.setFPS() @@ -649,7 +656,7 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None # PLATEPAR # Load the platepar file - self.loadPlatepar() + self.loadPlatepar(platepar_file=platepar_file) # Set the given gamma value to platepar @@ -7150,7 +7157,7 @@ def handleLoginPath(login_path, number_of_fits=None): # If no config path passed in then assume ~/source/RMS for .config remote_path = "source/RMS" if not len(remote_path) else remote_path - config_file_name = ('.config') + config_file_name = '.config' if remote_path.endswith(config_file_name): remote_path = remote_path[:-len(config_file_name)] print("Getting .config from {}@{}:{}:{}".format(user, host, port, remote_path)) @@ -7219,10 +7226,10 @@ def handleLoginPath(login_path, number_of_fits=None): # Init the command line arguments parser arg_parser = argparse.ArgumentParser(description="Tool for fitting astrometry plates and photometric calibration.") - arg_parser.add_argument('input_path', metavar='INPUT_PATH', type=str, + arg_parser.add_argument('input_path', metavar='INPUT_PATH', type=str, nargs='?', help='Path to the folder with FF or image files, path to a video file, ' ' to a state file, an RMS bz2 file, or user@host:path/to/config/ . for remote platepar fitting' - ' if no path is given, the .config is assumedd to be at ~/source/RMS/.config' + ' if no path is given, the .config is assumed to be at ~/source/RMS/.config' ' If images or videos are given, their names must be in the format: YYYYMMDD_hhmmss.uuuuuu') @@ -7281,7 +7288,78 @@ def handleLoginPath(login_path, number_of_fits=None): cml_args = arg_parser.parse_args() ######################### + platepar_file = None + best_fits_file = None + mask_file = None + if cml_args.input_path is None: + print("No input path specified") + platepar_file = None + if cml_args.config is None: + c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) + cml_args.config_path = os.path.expanduser(os.path.join(os.getcwd(), ".config")) + else: + c = cr.parse(os.path.expanduser(cml_args.config)) + cml_args.config_path = os.path.expanduser(cml_args.config) + if cml_args.mask is None: + mask_path = os.path.expanduser(os.path.join(os.getcwd(), c.mask_file)) + if os.path.exists(mask_path): + cml_args.mask_path = mask_path + + captured_directory_path = os.path.expanduser(os.path.join(c.data_dir, c.captured_dir)) + station = c.stationID + captured_directory_list = os.listdir(captured_directory_path) + captured_directory_list.sort(reverse=True) + captured_directory_calstars = {} + + for potential_captured_directory in captured_directory_list: + one_valid_fits = False + captured_directory_full_path = os.path.join(captured_directory_path, potential_captured_directory) + if potential_captured_directory.startswith("{}_".format(station)) and os.path.isdir( + captured_directory_full_path): + dir_date = potential_captured_directory.split("_")[1] + dir_time = potential_captured_directory.split("_")[2] + dir_us = potential_captured_directory.split("_")[3] + calstars_file_name = "CALSTARS_{}_{}_{}_{}.txt".format(station, dir_date, dir_time, dir_us) + calstars_full_path = os.path.join(captured_directory_full_path, calstars_file_name) + if os.path.exists(calstars_full_path): + calstar_data, _ = CALSTARS.readCALSTARS(captured_directory_full_path, calstars_file_name) + star_count_max = 0 + best_fits_file = None + for calstar_entry in calstar_data: + fits_file = calstar_entry[0] + star_count = len(calstar_entry[1]) + if star_count > star_count_max: + if os.path.exists(os.path.join(captured_directory_full_path, fits_file)): + best_fits_file = fits_file + star_count_max = star_count + print("Best fits file {} has {} stars".format(best_fits_file, star_count_max)) + captured_directory_calstars[captured_directory_full_path] = [best_fits_file, star_count_max] + + # Now check to see if the directory contains at least one valid FITS file + captured_directory_contents = os.listdir(captured_directory_full_path) + for file_name in captured_directory_contents: + if file_name.startswith("FF_{}".format(station.upper())) and file_name.endswith(".fits"): + one_valid_fits = True + break + if one_valid_fits: + break + + print("In directory {} we found fits file {}".format(captured_directory_full_path, file_name)) + if len(captured_directory_calstars): + print("We found a calstars file for directory {}".format(captured_directory_full_path)) + print("Best file was {} with {} stars".format(best_fits_file, star_count_max)) + else: + print("We did not find calstars for directory {}".format(captured_directory_full_path)) + + potential_platepar_path = os.path.join(os.getcwd(), "platepar_cmn2010.cal") + if os.path.exists(potential_platepar_path): + platepar_file = potential_platepar_path + else: + platepar_file = None + + cml_args.input_path = captured_directory_full_path + pass # Parse the beginning time into a datetime object if cml_args.timebeg is not None: @@ -7377,11 +7455,13 @@ def handleLoginPath(login_path, number_of_fits=None): mask.width, mask.height, config.width, config.height)) mask = None + + # Init SkyFit plate_tool = PlateTool(input_path, config, beginning_time=beginning_time, fps=cml_args.fps, \ gamma=cml_args.gamma, use_fr_files=cml_args.fr, geo_points_input=cml_args.geopoints, mask=mask, nobg=cml_args.nobg, peribg=cml_args.peribg, flipud=cml_args.flipud, - flatbiassub=cml_args.flatbiassub) + flatbiassub=cml_args.flatbiassub, platepar_file=platepar_file, fits_file=best_fits_file) # Run the GUI app From fa13319eac333e843df2dbb2c2cbee7b79d93b98 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Fri, 1 Aug 2025 16:32:24 +0000 Subject: [PATCH 16/22] Updated docstrings --- Utils/SkyFit2.py | 208 +++++++++++++++++++++++++++++++---------------- 1 file changed, 137 insertions(+), 71 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 7a160daef..2b0186ce9 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -617,9 +617,6 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None # Detect data input type and init the image handle self.detectInputType(load=True, beginning_time=beginning_time, use_fr_files=self.use_fr_files) - if fits_file is not None: - print("Loading {}".format(fits_file)) - self.img_handle.setCurrentFF(fits_file) # Update the FPS if it's forced self.setFPS() @@ -672,6 +669,11 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None print() + if fits_file is not None: + print("Loading {}".format(fits_file)) + self.img_handle.setCurrentFF(fits_file) + + # INIT WINDOW if startUI: self.setupUI() @@ -7220,6 +7222,137 @@ def handleLoginPath(login_path, number_of_fits=None): sys.exit(a) +def getFITSMostStars(calstars_full_path): + """ + Use the calstars file to find the fits file which exists in the respective directory with the most stars + Args: + calstars_full_path: [str] full path to the calstars file + + Return: + best_fits_file:[str] full path to the fits file with the most stars + star_count_max[int]: number of stars on that fits file + """ + + best_fits_file, star_count_max = None, 0 + if os.path.exists(calstars_full_path): + captured_directory_full_path = os.path.dirname(calstars_full_path) + calstars_file_name = os.path.basename(calstars_full_path) + calstar_data, _ = CALSTARS.readCALSTARS(captured_directory_full_path, calstars_file_name) + for calstar_entry in calstar_data: + fits_file = calstar_entry[0] + star_count = len(calstar_entry[1]) + if star_count > star_count_max: + if os.path.exists(os.path.join(captured_directory_full_path, fits_file)): + best_fits_file = os.path.join(captured_directory_full_path, fits_file) + star_count_max = star_count + print("Best fits file {} has {} stars".format(best_fits_file, star_count_max)) + + + return best_fits_file, star_count_max + +def getCalstarsPath(captured_directory, config): + """ + + Arguments: + captured_directory: [str] RMS captured files directory + config: [config] RMS config instance + + Return: + calstars_full_path:[str] full path to the calstars file + """ + + dir_date = captured_directory.split("_")[1] + dir_time = captured_directory.split("_")[2] + dir_us = captured_directory.split("_")[3] + captured_directory_full_path = os.path.join(config.data_dir, config.captured_dir, captured_directory) + calstars_file_name = "CALSTARS_{}_{}_{}_{}.txt".format(config.stationID, dir_date, dir_time, dir_us) + calstars_full_path = os.path.join(captured_directory_full_path, calstars_file_name) + + return calstars_full_path + + +def getPlateparFilePath(config): + """ + + Arguments: + config: [config] RMS config instance + + Return: + platepar_file_path: [str] full path to a platepar file else None + """ + + potential_platepar_path = os.path.join(os.getcwd(), config.platepar_name) + if os.path.exists(potential_platepar_path): + platepar_file_path = potential_platepar_path + else: + platepar_file_path = None + + return platepar_file_path + +def handleNoInputPath(): + """ + If no input path is specified then determine some good parameters for starting the platetool + + The most recent captured files directory is chosen. + If this contains a calstar file, then the path to the fits file with the most stars is returned + If no calstar file, then none is returned + The default config and mask are also returned + + Return: + captured_directory_full_path: [str] full path to the most recent captured directory which contains at least one fits file + platepar_file: [str] full path to the platepar file + mask_path: [str] full path to the mask kile + best_fits_file: [str] full path to the fits file with the most stars + c: [config] rms config instance + + + """ + + + print("No input path specified, determining folder to use") + + if cml_args.config is None: + c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) + cml_args.config_path = os.path.expanduser(os.path.join(os.getcwd(), ".config")) + else: + c = cr.parse(os.path.expanduser(cml_args.config)) + cml_args.config_path = os.path.expanduser(cml_args.config) + + if cml_args.mask is None: + mask_path = os.path.expanduser(os.path.join(os.getcwd(), c.mask_file)) + if os.path.exists(mask_path): + cml_args.mask_path = mask_path + else: + mask_path = cml_args.mask_path + + captured_directory_path = os.path.expanduser(os.path.join(str(c.data_dir), str(c.captured_dir))) + station = c.stationID + captured_directory_list = os.listdir(captured_directory_path) + captured_directory_full_path = None + if not len(captured_directory_list): + print("No captured directories found, cannot continue") + quit() + + for potential_captured_directory in sorted(captured_directory_list, reverse=True): + one_valid_fits = False + captured_directory_full_path = os.path.join(captured_directory_path, potential_captured_directory) + if potential_captured_directory.startswith("{}_".format(station)) and os.path.isdir( + captured_directory_full_path): + best_fits_file, star_count_max = getFITSMostStars(getCalstarsPath(potential_captured_directory, c)) + + # Now check to see if the directory contains at least one valid FITS file + captured_directory_contents = os.listdir(captured_directory_full_path) + for file_name in captured_directory_contents: + if file_name.startswith("FF_{}".format(station.upper())) and file_name.endswith(".fits"): + one_valid_fits = True + break + if one_valid_fits: + break + + platepar_file = getPlateparFilePath(c) + + return captured_directory_full_path, platepar_file, mask_path, best_fits_file, c + if __name__ == '__main__': ### COMMAND LINE ARGUMENTS @@ -7292,74 +7425,7 @@ def handleLoginPath(login_path, number_of_fits=None): best_fits_file = None mask_file = None if cml_args.input_path is None: - print("No input path specified") - platepar_file = None - if cml_args.config is None: - c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) - cml_args.config_path = os.path.expanduser(os.path.join(os.getcwd(), ".config")) - else: - c = cr.parse(os.path.expanduser(cml_args.config)) - cml_args.config_path = os.path.expanduser(cml_args.config) - if cml_args.mask is None: - mask_path = os.path.expanduser(os.path.join(os.getcwd(), c.mask_file)) - if os.path.exists(mask_path): - cml_args.mask_path = mask_path - - captured_directory_path = os.path.expanduser(os.path.join(c.data_dir, c.captured_dir)) - station = c.stationID - captured_directory_list = os.listdir(captured_directory_path) - captured_directory_list.sort(reverse=True) - captured_directory_calstars = {} - - for potential_captured_directory in captured_directory_list: - one_valid_fits = False - captured_directory_full_path = os.path.join(captured_directory_path, potential_captured_directory) - if potential_captured_directory.startswith("{}_".format(station)) and os.path.isdir( - captured_directory_full_path): - dir_date = potential_captured_directory.split("_")[1] - dir_time = potential_captured_directory.split("_")[2] - dir_us = potential_captured_directory.split("_")[3] - calstars_file_name = "CALSTARS_{}_{}_{}_{}.txt".format(station, dir_date, dir_time, dir_us) - calstars_full_path = os.path.join(captured_directory_full_path, calstars_file_name) - if os.path.exists(calstars_full_path): - calstar_data, _ = CALSTARS.readCALSTARS(captured_directory_full_path, calstars_file_name) - star_count_max = 0 - best_fits_file = None - for calstar_entry in calstar_data: - fits_file = calstar_entry[0] - star_count = len(calstar_entry[1]) - if star_count > star_count_max: - if os.path.exists(os.path.join(captured_directory_full_path, fits_file)): - best_fits_file = fits_file - star_count_max = star_count - print("Best fits file {} has {} stars".format(best_fits_file, star_count_max)) - captured_directory_calstars[captured_directory_full_path] = [best_fits_file, star_count_max] - - # Now check to see if the directory contains at least one valid FITS file - captured_directory_contents = os.listdir(captured_directory_full_path) - for file_name in captured_directory_contents: - if file_name.startswith("FF_{}".format(station.upper())) and file_name.endswith(".fits"): - one_valid_fits = True - break - if one_valid_fits: - break - - print("In directory {} we found fits file {}".format(captured_directory_full_path, file_name)) - - if len(captured_directory_calstars): - print("We found a calstars file for directory {}".format(captured_directory_full_path)) - print("Best file was {} with {} stars".format(best_fits_file, star_count_max)) - else: - print("We did not find calstars for directory {}".format(captured_directory_full_path)) - - potential_platepar_path = os.path.join(os.getcwd(), "platepar_cmn2010.cal") - if os.path.exists(potential_platepar_path): - platepar_file = potential_platepar_path - else: - platepar_file = None - - cml_args.input_path = captured_directory_full_path - pass + cml_args.input_path, platepar_file, cml_args.mask_path, best_fits_file, config = handleNoInputPath() # Parse the beginning time into a datetime object if cml_args.timebeg is not None: From 3218306887c81dd96d4e01f8cb271f310fddb3ce Mon Sep 17 00:00:00 2001 From: g7gpr Date: Sun, 3 Aug 2025 08:59:47 +0000 Subject: [PATCH 17/22] New dialog box to select stations --- RMS/Routines/CustomPyqtgraphClasses.py | 32 +++ Utils/SkyFit2.py | 348 ++++++++++++++++++++----- 2 files changed, 312 insertions(+), 68 deletions(-) diff --git a/RMS/Routines/CustomPyqtgraphClasses.py b/RMS/Routines/CustomPyqtgraphClasses.py index f2d2a6156..5d238d290 100644 --- a/RMS/Routines/CustomPyqtgraphClasses.py +++ b/RMS/Routines/CustomPyqtgraphClasses.py @@ -15,6 +15,7 @@ from RMS.Formats.FFfile import reconstructFrame as reconstructFrameFF from RMS.Routines import Image from RMS.Routines.DebruijnSequence import findAllInDeBruijnSequence, generateDeBruijnSequence +from PyQt5.QtWidgets import QDialog, QVBoxLayout, QLabel, QComboBox, QPushButton, QSizePolicy import time import re @@ -169,6 +170,37 @@ def setIcon(self, icon): pass +class ComboDialog(QDialog): + def __init__(self, options_dict, window_title="Select", label="Options"): + super().__init__() + + self.resize(300,100) + self.setWindowTitle(window_title) + + # Layout + layout = QVBoxLayout() + + # Label + self.label = QLabel(label) + layout.addWidget(self.label) + + # Combo Box + self.combo = QComboBox() + self.combo.addItems(options_dict.keys()) + layout.addWidget(self.combo) + + # Button + + ok_button = QPushButton("OK") + ok_button.clicked.connect(self.accept) + layout.addWidget(ok_button) + + self.setLayout(layout) + + def get_selection(self): + return self.combo.currentText() + + class TextItemList(pg.GraphicsObject): """ Allows for a list of TextItems without having to constantly add items to a widget diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 2b0186ce9..59898aae6 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -450,9 +450,9 @@ def __len__(self): class PlateTool(QtWidgets.QMainWindow): - def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None, use_fr_files=False, + def __init__(self, input_path, config, config_path=os.getcwd(), beginning_time=None, fps=None, gamma=None, use_fr_files=False, geo_points_input=None, startUI=True, mask=None, nobg=False, peribg=False, flipud=False, - flatbiassub=False, platepar_file=None, fits_file=None): + flatbiassub=False, platepar_file=None, fits_file_to_open=None): """ SkyFit interactive window. Arguments: @@ -481,6 +481,38 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None super(PlateTool, self).__init__() + + mask_path, fits_file, star_count_max = None, None, 0 + if input_path is None: + input_path, platepar_file, mask_path, mask, fits_file, star_count_max, config, config_path = handleNoInputPath(input_path=input_path) + else: + if len(input_path) == 6: + input_path, platepar_file, mask_path, mask, fits_file, star_count_max, config, config_path = handleNoInputPath( + input_path=input_path) + + + fits_file_to_open = fits_file + if fits_file is not None: + print("Opening {} which has {} stars".format(fits_file, star_count_max)) + # This message box is just for debugging + if False: + message = "" + message += "Config directory {} \n".format(os.path.dirname(config_path)) + if platepar_file is None: + message += "No valid platepar found for this station\n" + else: + message += "Platepar directory {} \n".format(os.path.dirname(platepar_file)) + if mask_path is None: + message += "No mask given" + else: + message += "Mask directory {} \n".format(os.path.dirname(mask_path)) + message += "fits directory {} \n".format(os.path.basename(input_path)) + if fits_file_to_open is None and fits_file is not None: + fits_file_to_open = os.path.basename(fits_file) + message += "Opening {}, which has {} stars".format(os.path.basename(fits_file_to_open), star_count_max) + qmessagebox(title='Station {} has data available'.format(config.stationID), \ + message=message, + message_type="information") # Mode of operation - skyfit for fitting astrometric plates, manualreduction for manual picking # of position on frames and photometry self.mode = 'skyfit' @@ -617,7 +649,8 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None # Detect data input type and init the image handle self.detectInputType(load=True, beginning_time=beginning_time, use_fr_files=self.use_fr_files) - + if fits_file_to_open is not None and os.path.exists(fits_file_to_open): + self.img_handle.setCurrentFF(os.path.basename(fits_file_to_open)) # Update the FPS if it's forced self.setFPS() @@ -649,10 +682,12 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None self.loadCalstars() + ################################################################################################### # PLATEPAR # Load the platepar file + print("Platepar is {}".format(platepar_file)) self.loadPlatepar(platepar_file=platepar_file) @@ -667,16 +702,16 @@ def __init__(self, input_path, config, beginning_time=None, fps=None, gamma=None ################################################################################################### - print() + if fits_file is not None: print("Loading {}".format(fits_file)) - self.img_handle.setCurrentFF(fits_file) + # INIT WINDOW if startUI: - self.setupUI() + self.setupUI(starting_ff=fits_file) def setFPS(self): @@ -689,13 +724,15 @@ def setFPS(self): print("Forcing video FPS to:", self.fps) - def setupUI(self, loaded_file=False): + def setupUI(self, loaded_file=False, starting_ff=None): """ Setup pyqt UI with widgets. No variables worth saving should be defined here. Keyword arguments: loaded_file: [bool] Loaded a state from a file. False by default. """ + + self.central = QtWidgets.QWidget() self.setCentralWidget(self.central) @@ -4505,6 +4542,7 @@ def detectInputType(self, beginning_time=None, use_fr_files=False, load=False): self.img_handle = img_handle + def loadCalstars(self): """ Loads data from calstars file and updates self.calstars """ @@ -7289,7 +7327,7 @@ def getPlateparFilePath(config): return platepar_file_path -def handleNoInputPath(): +def handleNoInputPath(input_path=None): """ If no input path is specified then determine some good parameters for starting the platetool @@ -7307,9 +7345,19 @@ def handleNoInputPath(): """ + station_from_command_line = None + if input_path is not None: + if len(input_path) == 6 and input_path[0:2].isalpha(): + station_from_command_line = input_path.upper() + else: + station_from_command_line = None + # This will hold the configs, and paths to platepars and masks for all valid stations found + config_platepar_mask_dict = {} + best_fits_file = None + star_count_max = 0 - print("No input path specified, determining folder to use") + # Load the config in ~/source/RMS/.config if cml_args.config is None: c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) @@ -7318,12 +7366,73 @@ def handleNoInputPath(): c = cr.parse(os.path.expanduser(cml_args.config)) cml_args.config_path = os.path.expanduser(cml_args.config) + # Handle the single station per user account case + if not c.stationID.startswith("XX"): + # If we have fits, then populate + if anyFits(verifyCapturedDirectories(getCapturedDirectoryObjects(c), c), c): + config_platepar_mask_dict[c.stationID] = [ + c, + cml_args.config_path, + getPlateparPath(os.getcwd), + getMaskPath(os.getcwd())] + + # Are we in a multiple camera per username environment + # Check to see if there is a XX at the start of the stationID or the ~/source/Stations directory exists + multi_cam_stations_directory = os.path.join(os.path.dirname(os.getcwd()), "Stations") + if c.stationID.startswith("XX") or os.path.exists(multi_cam_stations_directory): + + # This dictionary will hold lists [config, platepar_path, mask_path], key will be station directory + if os.path.exists(multi_cam_stations_directory): + potential_station_directory_list = sorted(os.listdir(multi_cam_stations_directory)) + # Iterate over the stations and collect what we can + for potential_station_directory in potential_station_directory_list: + full_path_potential_station_directory = os.path.join(multi_cam_stations_directory, potential_station_directory) + config_path = os.path.join(full_path_potential_station_directory, os.path.basename(c.config_file_name)) + + # If we have a valid config, load it, if it is not valid, then move to the next stations + try: + # Load the multi-cam config + mc_c = cr.parse(os.path.expanduser(config_path)) + except: + continue + + # If no fits file was found, then skip this station + if not anyFits(verifyCapturedDirectories(getCapturedDirectoryObjects(mc_c), mc_c), mc_c): + continue + + if mc_c.stationID == potential_station_directory: + config_platepar_mask_dict[potential_station_directory] = \ + [mc_c, + config_path, + getPlateparPath(potential_station_directory, multi_cam=True), + getMaskPath(potential_station_directory, multi_cam=True)] + + if station_from_command_line in config_platepar_mask_dict.keys(): + selected_station = station_from_command_line + else: + + dialog = ComboDialog(config_platepar_mask_dict, + window_title="Select station to calibrate", + label="Stations available for calibration:") + if dialog.exec_() == QDialog.Accepted: + selected_station = dialog.get_selection() + print("Selected station {}".format(selected_station)) + else: + sys.exit() + + + station_data = config_platepar_mask_dict[selected_station] + c = station_data[0] + cml_args.config_path = station_data[1] + platepar_file = station_data[2] + cml_args.mask = station_data[3] + if cml_args.mask is None: mask_path = os.path.expanduser(os.path.join(os.getcwd(), c.mask_file)) if os.path.exists(mask_path): - cml_args.mask_path = mask_path + cml_args.mask = mask_path else: - mask_path = cml_args.mask_path + mask_path = cml_args.mask captured_directory_path = os.path.expanduser(os.path.join(str(c.data_dir), str(c.captured_dir))) station = c.stationID @@ -7349,9 +7458,104 @@ def handleNoInputPath(): if one_valid_fits: break - platepar_file = getPlateparFilePath(c) + mask = None + cml_args.mask = os.path.dirname(mask_path) + if os.path.exists(cml_args.mask): + mask = getMaskFile(cml_args.mask, c) + + if (mask is not None) and (not mask.checkResolution(c.width, c.height)): + print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, c.width, c.height)) + mask = None + + return captured_directory_full_path, platepar_file, mask_path, mask, best_fits_file, star_count_max, c, cml_args.config_path + + +def getMaskPath(station_directory, multi_cam=False): + # Determine the mask path + if multi_cam: + multi_cam_stations_directory = os.path.join(os.path.dirname(os.getcwd()), "Stations") + full_path_potential_station_directory = os.path.join(multi_cam_stations_directory, station_directory) + c = cr.parse(os.path.expanduser(os.path.join(full_path_potential_station_directory, ".config"))) + mask_path = os.path.join(full_path_potential_station_directory, c.mask_file) + else: + c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) + mask_path = os.path.join(os.getcwd(), c.mask_file) + + if not os.path.exists(mask_path): + mask_path = None + return mask_path + + +def getPlateparPath(station_directory, multi_cam=False): + # Determine the platepar_path + if multi_cam: + multi_cam_stations_directory = os.path.join(os.path.dirname(os.getcwd()), "Stations") + full_path_potential_station_directory = os.path.join(multi_cam_stations_directory, station_directory) + c = cr.parse(os.path.expanduser(os.path.join(full_path_potential_station_directory, ".config"))) + platepar_path = os.path.join(full_path_potential_station_directory, c.platepar_name) + else: + c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) + platepar_path = os.path.join(os.getcwd(), c.platepar_name) + + if os.path.exists(platepar_path): + pp = Platepar() + try: + pp.read(platepar_path) + except: + pp = None + if pp is None: + return None + else: + if pp.station_code != station_directory and multi_cam: + pp = None + platepar_path = None + else: + return platepar_path + + return None + + +def anyFits(directory_list, mc_c): + # Does at least one of the captured_directories contain a fits file + one_fits_file_found = False + for captured_directory in directory_list: + file_list = os.listdir(captured_directory) + for test_file in file_list: + if test_file.startswith("FF_{}_".format(mc_c.stationID)) \ + and test_file.endswith(".fits") \ + and len(test_file.split("_")) == 6: + # This is probably a fits file + one_fits_file_found = True + break + if one_fits_file_found: + break + return one_fits_file_found + + +def verifyCapturedDirectories(directory_list, mc_c): + # Filter these list of files in the captured directory for directories which match expected pattern + full_path_to_captured_files_directory = os.path.join(mc_c.data_dir, mc_c.captured_dir) + verified_directory_list = [] + for potential_directory in directory_list: + if not os.path.isdir(os.path.join(full_path_to_captured_files_directory, potential_directory)): + # If it is not a directory, continue + continue + # Check as much as we reasonably can that this is not some random directory saved here + if potential_directory.startswith("{}_".format(mc_c.stationID)) \ + and len(potential_directory.split("_")) == 4: + verified_directory_list.append(os.path.join(full_path_to_captured_files_directory, potential_directory)) + return sorted(verified_directory_list) + + +def getCapturedDirectoryObjects(config): + # Get the list of objects in the captured directory, if this directory exists + full_path_to_captured_files_directory = os.path.join(config.data_dir, config.captured_dir) + captured_directory_list = [] + if os.path.exists(full_path_to_captured_files_directory): + captured_directory_list = os.listdir(full_path_to_captured_files_directory) + return sorted(captured_directory_list) - return captured_directory_full_path, platepar_file, mask_path, best_fits_file, c if __name__ == '__main__': ### COMMAND LINE ARGUMENTS @@ -7424,8 +7628,7 @@ def handleNoInputPath(): platepar_file = None best_fits_file = None mask_file = None - if cml_args.input_path is None: - cml_args.input_path, platepar_file, cml_args.mask_path, best_fits_file, config = handleNoInputPath() + # Parse the beginning time into a datetime object if cml_args.timebeg is not None: @@ -7449,85 +7652,94 @@ def handleNoInputPath(): app = QtWidgets.QApplication(sys.argv) - # If the state file was given, load the state - if cml_args.input_path.endswith('.state'): - - dir_path, state_name = os.path.split(cml_args.input_path) - config = cr.loadConfigFromDirectory(cml_args.config, cml_args.input_path) + if cml_args.input_path is None: + input_path = None + config = None + mask = None + elif len(cml_args.input_path) == 6: + input_path = cml_args.input_path + config = None + mask = None + else: + # If the state file was given, load the state + if cml_args.input_path.endswith('.state'): - # Create plate_tool without calling its constructor then calling loadstate - plate_tool = PlateTool.__new__(PlateTool) - super(PlateTool, plate_tool).__init__() + dir_path, state_name = os.path.split(cml_args.input_path) + config = cr.loadConfigFromDirectory(cml_args.config, cml_args.input_path) - if cml_args.mask is not None: - print("Given a path to a mask at {}".format(cml_args.mask)) - mask = getMaskFile(os.path.expanduser(cml_args.mask), config) + # Create plate_tool without calling its constructor then calling loadstate + plate_tool = PlateTool.__new__(PlateTool) + super(PlateTool, plate_tool).__init__() - elif os.path.exists(os.path.join(config.rms_root_dir, config.mask_file)): - print("No mask specified loading mask from {}".format(os.path.join(config.rms_root_dir, config.mask_file))) - mask = getMaskFile(config.rms_root_dir, config) + if cml_args.mask is not None: + print("Given a path to a mask at {}".format(cml_args.mask)) + mask = getMaskFile(os.path.expanduser(cml_args.mask), config) - elif os.path.exists("mask.bmp"): - mask = getMaskFile(".", config) + elif os.path.exists(os.path.join(config.rms_root_dir, config.mask_file)): + print("No mask specified loading mask from {}".format(os.path.join(config.rms_root_dir, config.mask_file))) + mask = getMaskFile(config.rms_root_dir, config) - elif True: - mask = None + elif os.path.exists("mask.bmp"): + mask = getMaskFile(".", config) - # If the dimensions of the mask do not match the config file, ignore the mask - if (mask is not None) and (not mask.checkResolution(config.width, config.height)): - print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( - mask.width, mask.height, config.width, config.height)) - mask = None + elif True: + mask = None - plate_tool.loadState(dir_path, state_name, beginning_time=beginning_time, mask=mask) + # If the dimensions of the mask do not match the config file, ignore the mask + if (mask is not None) and (not mask.checkResolution(config.width, config.height)): + print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, config.width, config.height)) + mask = None - elif cml_args.input_path.endswith('.bz2'): - handleBZ2(cml_args.input_path) + plate_tool.loadState(dir_path, state_name, beginning_time=beginning_time, mask=mask) - elif isLoginPath(cml_args.input_path): - handleLoginPath(cml_args.input_path, cml_args.number_of_fits) + elif cml_args.input_path.endswith('.bz2'): + handleBZ2(cml_args.input_path) - else: + elif isLoginPath(cml_args.input_path): + handleLoginPath(cml_args.input_path, cml_args.number_of_fits) - # Extract the data directory path - input_path = cml_args.input_path.replace('"', '') - if os.path.isfile(input_path): - dir_path = os.path.dirname(input_path) else: - dir_path = input_path - # Load the config file - config = cr.loadConfigFromDirectory(cml_args.config, dir_path) + # Extract the data directory path + input_path = cml_args.input_path.replace('"', '') + if os.path.isfile(input_path): + dir_path = os.path.dirname(input_path) + else: + dir_path = input_path + # Load the config file + config = cr.loadConfigFromDirectory(cml_args.config, dir_path) - if cml_args.mask is not None: - print("Given a path to a mask at {}".format(cml_args.mask)) - mask = getMaskFile(os.path.expanduser(cml_args.mask), config) - elif os.path.exists(os.path.join(config.rms_root_dir, config.mask_file)): + if cml_args.mask is not None: + print("Given a path to a mask at {}".format(cml_args.mask)) + mask = getMaskFile(os.path.expanduser(cml_args.mask), config) - print("No mask specified loading mask from {}".format(os.path.join(config.rms_root_dir, config.mask_file))) - mask = getMaskFile(config.rms_root_dir, config) + elif os.path.exists(os.path.join(config.rms_root_dir, config.mask_file)): - elif os.path.exists("mask.bmp"): - mask = getMaskFile(".", config) + print("No mask specified loading mask from {}".format(os.path.join(config.rms_root_dir, config.mask_file))) + mask = getMaskFile(config.rms_root_dir, config) - else: - mask = None + elif os.path.exists("mask.bmp"): + mask = getMaskFile(".", config) - # If the dimensions of the mask do not match the config file, ignore the mask - if (mask is not None) and (not mask.checkResolution(config.width, config.height)): - print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( - mask.width, mask.height, config.width, config.height)) - mask = None + else: + mask = None + + # If the dimensions of the mask do not match the config file, ignore the mask + if (mask is not None) and (not mask.checkResolution(config.width, config.height)): + print("Mask resolution ({:d}, {:d}) does not match the image resolution ({:d}, {:d}). Ignoring the mask.".format( + mask.width, mask.height, config.width, config.height)) + mask = None # Init SkyFit - plate_tool = PlateTool(input_path, config, beginning_time=beginning_time, fps=cml_args.fps, \ + plate_tool = PlateTool(input_path, config, beginning_time=beginning_time, fps=cml_args.fps, \ gamma=cml_args.gamma, use_fr_files=cml_args.fr, geo_points_input=cml_args.geopoints, mask=mask, nobg=cml_args.nobg, peribg=cml_args.peribg, flipud=cml_args.flipud, - flatbiassub=cml_args.flatbiassub, platepar_file=platepar_file, fits_file=best_fits_file) + flatbiassub=cml_args.flatbiassub, platepar_file=platepar_file, fits_file_to_open=None) # Run the GUI app From a172a8ee6bc79fd37afa85dfa0b5bf1b0b53a9b1 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Sun, 3 Aug 2025 10:16:48 +0000 Subject: [PATCH 18/22] Support ~ expansion --- Utils/SkyFit2.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 59898aae6..036231629 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -4642,6 +4642,7 @@ def loadPlatepar(self, update=False, platepar_file = None): if self.config.platepar_name in os.listdir(self.dir_path): initial_file = os.path.join(self.dir_path, self.config.platepar_name) + platepar_file = initial_file else: initial_file = self.dir_path @@ -7308,6 +7309,18 @@ def getCalstarsPath(captured_directory, config): return calstars_full_path +def expandUserList(path_list, file_type): + + target_path_list = [] + for path in path_list: + target = os.path.expanduser(path) + if not os.path.basename(target) == file_type and os.path.isdir(target): + target = os.path.join(target, file_type) + if os.path.isfile(target): + target_path_list.append(target) + else: + target_path_list.append(path) + return target_path_list def getPlateparFilePath(config): """ @@ -7630,6 +7643,12 @@ def getCapturedDirectoryObjects(config): mask_file = None + # expand the user for the list of cml_args.config + + cml_args.config = expandUserList(cml_args.config, ".config") + + print(cml_args.config) + # Parse the beginning time into a datetime object if cml_args.timebeg is not None: @@ -7661,6 +7680,15 @@ def getCapturedDirectoryObjects(config): config = None mask = None else: + cml_args.input_path = os.path.expanduser(cml_args.input_path) + config_path_list = [] + + if os.path.isfile(cml_args.mask) or cml_args.mask.endswith(".bmp"): + cml_args.mask = os.path.dirname(cml_args.mask) + + + + # If the state file was given, load the state if cml_args.input_path.endswith('.state'): From 7c422d2353ab10314d5bb6a1d9ef812ff310285c Mon Sep 17 00:00:00 2001 From: g7gpr Date: Sun, 3 Aug 2025 11:04:50 +0000 Subject: [PATCH 19/22] Code tidying --- Utils/SkyFit2.py | 163 +++++++++++++++++++++++++++-------------------- 1 file changed, 94 insertions(+), 69 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 036231629..c59d06256 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -490,29 +490,8 @@ def __init__(self, input_path, config, config_path=os.getcwd(), beginning_time=N input_path, platepar_file, mask_path, mask, fits_file, star_count_max, config, config_path = handleNoInputPath( input_path=input_path) - fits_file_to_open = fits_file - if fits_file is not None: - print("Opening {} which has {} stars".format(fits_file, star_count_max)) - # This message box is just for debugging - if False: - message = "" - message += "Config directory {} \n".format(os.path.dirname(config_path)) - if platepar_file is None: - message += "No valid platepar found for this station\n" - else: - message += "Platepar directory {} \n".format(os.path.dirname(platepar_file)) - if mask_path is None: - message += "No mask given" - else: - message += "Mask directory {} \n".format(os.path.dirname(mask_path)) - message += "fits directory {} \n".format(os.path.basename(input_path)) - if fits_file_to_open is None and fits_file is not None: - fits_file_to_open = os.path.basename(fits_file) - message += "Opening {}, which has {} stars".format(os.path.basename(fits_file_to_open), star_count_max) - qmessagebox(title='Station {} has data available'.format(config.stationID), \ - message=message, - message_type="information") + print("Starting with file {}".format(fits_file_to_open)) # Mode of operation - skyfit for fitting astrometric plates, manualreduction for manual picking # of position on frames and photometry self.mode = 'skyfit' @@ -6594,7 +6573,7 @@ def furthestStar(self, miss_this_one=False, min_separation=15): miss_this_one: Return coordinates of a different star at random, but don't mark anything. min_separation: Minimum separation in pixels between stars. - Returns: + Return: (x,y) integers of the image location of the furthest star away from all other matched stars. """ @@ -6626,7 +6605,7 @@ def getMarkedStars(include_unsuitable=True): Keyword Arguments: include_unsuitable: [bool] Include stars marked as unsuitable. - Return + Return: marked_x: [list] of x coordinates. marked_y: [list] of y coordindates. @@ -6684,10 +6663,10 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min marked_x_list: [list] list of marked star x coordinates. marked_y_list: [list] list of marked star y coordinates. - Keyword Arguments + Keyword Arguments: min_separation: [int] Minimum seperation not be regarded as a double star. - Returns: + Return: unmarked_x_list: list of unmarked star x coordinates. unmarked_y_list: list of unmarked star x coordinates. dist_nearest_marked_list: distance of the nearest marked star for returned star coordinates. @@ -7263,13 +7242,14 @@ def handleLoginPath(login_path, number_of_fits=None): def getFITSMostStars(calstars_full_path): """ - Use the calstars file to find the fits file which exists in the respective directory with the most stars - Args: - calstars_full_path: [str] full path to the calstars file + Use the calstars file to find the fits file with the most stars. + + Argument: + calstars_full_path: [str] full path to the calstars file. Return: - best_fits_file:[str] full path to the fits file with the most stars - star_count_max[int]: number of stars on that fits file + best_fits_file:[str] full path to the fits file with the most stars. + star_count_max[int]: number of stars on that fits file. """ best_fits_file, star_count_max = None, 0 @@ -7290,14 +7270,16 @@ def getFITSMostStars(calstars_full_path): return best_fits_file, star_count_max def getCalstarsPath(captured_directory, config): + """ + Get the path to the calstars file. Arguments: - captured_directory: [str] RMS captured files directory - config: [config] RMS config instance + captured_directory: [str] RMS captured files directory. + config: [config] RMS config instance. Return: - calstars_full_path:[str] full path to the calstars file + calstars_full_path:[str] full path to the calstars file. """ dir_date = captured_directory.split("_")[1] @@ -7309,13 +7291,26 @@ def getCalstarsPath(captured_directory, config): return calstars_full_path -def expandUserList(path_list, file_type): +def expandUserList(path_list, file_name): + """ + Given a list of paths, expand each path and add file_name to the end if the path is a directory. + + Arguments: + path_list: [list] list of paths to be expanded. + file_name: [str] file name to be appended to each list item, if the list item is not this file already, and + the directory is a target. + + Return: + target_path_list: [list] returned list of paths, expanded, and with the file_type appended to each one. + """ target_path_list = [] + if path_list is None: + return path_list for path in path_list: target = os.path.expanduser(path) - if not os.path.basename(target) == file_type and os.path.isdir(target): - target = os.path.join(target, file_type) + if not os.path.basename(target) == file_name and os.path.isdir(target): + target = os.path.join(target, file_name) if os.path.isfile(target): target_path_list.append(target) else: @@ -7326,10 +7321,10 @@ def getPlateparFilePath(config): """ Arguments: - config: [config] RMS config instance + config: [config] RMS config instance. Return: - platepar_file_path: [str] full path to a platepar file else None + platepar_file_path: [str] full path to a platepar file else None. """ potential_platepar_path = os.path.join(os.getcwd(), config.platepar_name) @@ -7342,19 +7337,18 @@ def getPlateparFilePath(config): def handleNoInputPath(input_path=None): """ - If no input path is specified then determine some good parameters for starting the platetool - - The most recent captured files directory is chosen. - If this contains a calstar file, then the path to the fits file with the most stars is returned - If no calstar file, then none is returned - The default config and mask are also returned + If no input path is specified then check to see if a single station name was passed, i.e. au000d, + or if nothing was passed open a dialog box with a list of stations. Return: captured_directory_full_path: [str] full path to the most recent captured directory which contains at least one fits file platepar_file: [str] full path to the platepar file - mask_path: [str] full path to the mask kile + mask_path: [str] full path to the mask file + mask: [img] the mask best_fits_file: [str] full path to the fits file with the most stars + star_count_max: [int] the number of stars on the best_fits_file c: [config] rms config instance + cml_args.config_path: [path] path to the config file to use """ @@ -7371,7 +7365,6 @@ def handleNoInputPath(input_path=None): star_count_max = 0 # Load the config in ~/source/RMS/.config - if cml_args.config is None: c = cr.parse(os.path.expanduser(os.path.join(os.getcwd(), ".config"))) cml_args.config_path = os.path.expanduser(os.path.join(os.getcwd(), ".config")) @@ -7383,11 +7376,11 @@ def handleNoInputPath(input_path=None): if not c.stationID.startswith("XX"): # If we have fits, then populate if anyFits(verifyCapturedDirectories(getCapturedDirectoryObjects(c), c), c): - config_platepar_mask_dict[c.stationID] = [ - c, + config_platepar_mask_dict[c.stationID] = [ c, cml_args.config_path, getPlateparPath(os.getcwd), - getMaskPath(os.getcwd())] + getMaskPath(os.getcwd()) + ] # Are we in a multiple camera per username environment # Check to see if there is a XX at the start of the stationID or the ~/source/Stations directory exists @@ -7420,10 +7413,11 @@ def handleNoInputPath(input_path=None): getPlateparPath(potential_station_directory, multi_cam=True), getMaskPath(potential_station_directory, multi_cam=True)] + # Is the station passed in from the command line available in the dictionary of stations if station_from_command_line in config_platepar_mask_dict.keys(): selected_station = station_from_command_line else: - + # If not, the open a dialog box to select from known stations with data dialog = ComboDialog(config_platepar_mask_dict, window_title="Select station to calibrate", label="Stations available for calibration:") @@ -7434,6 +7428,7 @@ def handleNoInputPath(input_path=None): sys.exit() + # Start to set the variables to launch the platetool station_data = config_platepar_mask_dict[selected_station] c = station_data[0] cml_args.config_path = station_data[1] @@ -7447,29 +7442,28 @@ def handleNoInputPath(input_path=None): else: mask_path = cml_args.mask + # Use the correct config file to build the paths captured_directory_path = os.path.expanduser(os.path.join(str(c.data_dir), str(c.captured_dir))) station = c.stationID captured_directory_list = os.listdir(captured_directory_path) captured_directory_full_path = None + + # Check there are still some captured directories to use - there must be if not len(captured_directory_list): print("No captured directories found, cannot continue") - quit() + sys.exit() - for potential_captured_directory in sorted(captured_directory_list, reverse=True): + # Now work back from the newest directory + for potential_captured_directory in sorted(verifyCapturedDirectories(getCapturedDirectoryObjects(c), c), reverse=True): one_valid_fits = False captured_directory_full_path = os.path.join(captured_directory_path, potential_captured_directory) - if potential_captured_directory.startswith("{}_".format(station)) and os.path.isdir( - captured_directory_full_path): + potential_captured_directory = os.path.basename(potential_captured_directory) + if potential_captured_directory.startswith("{}_".format(station)) \ + and os.path.isdir(captured_directory_full_path): + # Maybe we have a calstar file and can get a best_fits_file best_fits_file, star_count_max = getFITSMostStars(getCalstarsPath(potential_captured_directory, c)) - - # Now check to see if the directory contains at least one valid FITS file - captured_directory_contents = os.listdir(captured_directory_full_path) - for file_name in captured_directory_contents: - if file_name.startswith("FF_{}".format(station.upper())) and file_name.endswith(".fits"): - one_valid_fits = True - break - if one_valid_fits: - break + if anyFits([captured_directory_full_path], c): + break mask = None cml_args.mask = os.path.dirname(mask_path) @@ -7501,7 +7495,21 @@ def getMaskPath(station_directory, multi_cam=False): def getPlateparPath(station_directory, multi_cam=False): - # Determine the platepar_path + """ + + Get the path to a file which can be read as a platepar. + + Arguments: + station_directory: the directory of a specific station i.e. AU000A + + Keyword Arguments: + multi_cam: True if this is a multi_cam environment, default false + + Return: + [str] path to the platepar if one exists, else None + """ + + if multi_cam: multi_cam_stations_directory = os.path.join(os.path.dirname(os.getcwd()), "Stations") full_path_potential_station_directory = os.path.join(multi_cam_stations_directory, station_directory) @@ -7529,15 +7537,32 @@ def getPlateparPath(station_directory, multi_cam=False): return None -def anyFits(directory_list, mc_c): +def anyFits(directory_list, config, prefix="FF", extension=".fits", delimiter="_", no_of_parts=6): + + """ + + Arguments: + directory_list: [list] list of directories to be searched + config: config file for stationID + + Keyword arguments + prefix: [str] optional default FF + extension: [str] optional default .fits + delimiter: [char] optional default _ + no_of_parts: [int] optional default 6 + + Returns: + [bool] True if any file matching is found + """ + # Does at least one of the captured_directories contain a fits file one_fits_file_found = False for captured_directory in directory_list: file_list = os.listdir(captured_directory) for test_file in file_list: - if test_file.startswith("FF_{}_".format(mc_c.stationID)) \ - and test_file.endswith(".fits") \ - and len(test_file.split("_")) == 6: + if test_file.startswith("{}{}{}{}".format(prefix,delimiter,config.stationID,delimiter)) \ + and test_file.endswith(extension) \ + and len(test_file.split("_")) == no_of_parts: # This is probably a fits file one_fits_file_found = True break From 784724aa94073ae17d0217b96449240e9ce320cf Mon Sep 17 00:00:00 2001 From: g7gpr Date: Sun, 3 Aug 2025 11:17:20 +0000 Subject: [PATCH 20/22] Doc strings --- Utils/SkyFit2.py | 51 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index c59d06256..75d724b7f 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -7253,10 +7253,14 @@ def getFITSMostStars(calstars_full_path): """ best_fits_file, star_count_max = None, 0 + + # Try and load in the calstars file if os.path.exists(calstars_full_path): captured_directory_full_path = os.path.dirname(calstars_full_path) calstars_file_name = os.path.basename(calstars_full_path) calstar_data, _ = CALSTARS.readCALSTARS(captured_directory_full_path, calstars_file_name) + + # Iterate through the calstar files, looking for the fits file which exists, and has the most stars for calstar_entry in calstar_data: fits_file = calstar_entry[0] star_count = len(calstar_entry[1]) @@ -7264,8 +7268,6 @@ def getFITSMostStars(calstars_full_path): if os.path.exists(os.path.join(captured_directory_full_path, fits_file)): best_fits_file = os.path.join(captured_directory_full_path, fits_file) star_count_max = star_count - print("Best fits file {} has {} stars".format(best_fits_file, star_count_max)) - return best_fits_file, star_count_max @@ -7479,7 +7481,19 @@ def handleNoInputPath(input_path=None): def getMaskPath(station_directory, multi_cam=False): - # Determine the mask path + + """ + Work out the path to the mask. + + Arguments: + station_directory: [str] The directory for this station, i.e. ~/source/RMS/ or ~/source/RMSStations + Keyword Arguments: + multi_cam: [bool] Optional, default false, if true work using a multicam linux file structure. + + Returns: + [str] path to the mask + """ + if multi_cam: multi_cam_stations_directory = os.path.join(os.path.dirname(os.getcwd()), "Stations") full_path_potential_station_directory = os.path.join(multi_cam_stations_directory, station_directory) @@ -7571,23 +7585,46 @@ def anyFits(directory_list, config, prefix="FF", extension=".fits", delimiter="_ return one_fits_file_found -def verifyCapturedDirectories(directory_list, mc_c): +def verifyCapturedDirectories(directory_list, config): + """ + Given a list of file system objects, and a config file, return of list of directories which match the style + of an RMS captured directory associated with the station in the config file. + + Args: + directory_list: [list] list of file system objects. + config: [config] RMS config instance. + + Return: + verified directory list: Only the directories which match the expected format, sorted ascending. + + """ + + # Filter these list of files in the captured directory for directories which match expected pattern - full_path_to_captured_files_directory = os.path.join(mc_c.data_dir, mc_c.captured_dir) + full_path_to_captured_files_directory = os.path.join(config.data_dir, config.captured_dir) verified_directory_list = [] for potential_directory in directory_list: if not os.path.isdir(os.path.join(full_path_to_captured_files_directory, potential_directory)): # If it is not a directory, continue continue # Check as much as we reasonably can that this is not some random directory saved here - if potential_directory.startswith("{}_".format(mc_c.stationID)) \ + if potential_directory.startswith("{}_".format(config.stationID)) \ and len(potential_directory.split("_")) == 4: verified_directory_list.append(os.path.join(full_path_to_captured_files_directory, potential_directory)) return sorted(verified_directory_list) def getCapturedDirectoryObjects(config): - # Get the list of objects in the captured directory, if this directory exists + """ + Get all the file system objects in captured directory pointed to by the config file. + + Arguments: + config: [config] RMS config instance. + + Return: + captured_directory_list sorted ascending + """ + full_path_to_captured_files_directory = os.path.join(config.data_dir, config.captured_dir) captured_directory_list = [] if os.path.exists(full_path_to_captured_files_directory): From 715064b944bac7dcb113472323a159a573b57ee5 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Mon, 25 Aug 2025 02:44:02 +0000 Subject: [PATCH 21/22] Handle None mask --- Utils/SkyFit2.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index 75d724b7f..d60b761c6 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -7745,8 +7745,9 @@ def getCapturedDirectoryObjects(config): cml_args.input_path = os.path.expanduser(cml_args.input_path) config_path_list = [] - if os.path.isfile(cml_args.mask) or cml_args.mask.endswith(".bmp"): - cml_args.mask = os.path.dirname(cml_args.mask) + if not cml_args.mask is None: + if os.path.isfile(cml_args.mask) or cml_args.mask.endswith(".bmp"): + cml_args.mask = os.path.dirname(cml_args.mask) From 025759ecb3f7ed2f5e5091b67114bf33f449c2a0 Mon Sep 17 00:00:00 2001 From: g7gpr Date: Mon, 25 Aug 2025 03:30:07 +0000 Subject: [PATCH 22/22] Allow mask, or path to mask to be specified --- Utils/SkyFit2.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/Utils/SkyFit2.py b/Utils/SkyFit2.py index d60b761c6..4dea8dd0d 100644 --- a/Utils/SkyFit2.py +++ b/Utils/SkyFit2.py @@ -6771,7 +6771,7 @@ def getVisibleUnmarkedStarsAndDistanceToMarked(marked_x_list, marked_y_list, min -def handleBZ2(bz2_path): +def handleBZ2(bz2_path, mask_path): """Passed a path to a bz2 file, unpack and prepare a working area for PlateTool, and launch. Arguments: @@ -6796,7 +6796,13 @@ def handleBZ2(bz2_path): print("No config file found in {}".format(bz2_basename)) print("Quitting") exit() - mask_path = os.path.join(working_dir, config.mask_file) + if mask_path is None: + mask_path = os.path.join(working_dir, config.mask_file) + else: + if os.path.isfile(os.path.expanduser(mask_path)): + mask_path = os.path.expanduser(mask_path) + else: + mask_path = os.path.join(os.path.expanduser(mask_path), config.mask_file) if os.path.exists(mask_path): mask = getMaskFile(".", config) @@ -7785,7 +7791,7 @@ def getCapturedDirectoryObjects(config): plate_tool.loadState(dir_path, state_name, beginning_time=beginning_time, mask=mask) elif cml_args.input_path.endswith('.bz2'): - handleBZ2(cml_args.input_path) + handleBZ2(cml_args.input_path, cml_args.mask) elif isLoginPath(cml_args.input_path): handleLoginPath(cml_args.input_path, cml_args.number_of_fits)