diff --git a/doc/changelog.md b/doc/changelog.md index a1bca3ade..e81226c3f 100644 --- a/doc/changelog.md +++ b/doc/changelog.md @@ -28,6 +28,11 @@ Description Detailed Notes +- Enable control over monitoring of Models launched with `experiment.start()` by + adding an optional boolean argument determining whether to monitor the particular + model or not. The argument is set to True by default, so no changes are needed for + the default behavior of monitoring all Models launched. + ([SmartSim-PR788](https://github.com/CrayLabs/SmartSim/pull/788)) - Updated tests which would create experiment in root directory, patched tests which would not work on some Slurm systems, added an environment variable to control how long to wait for Redis server to be available. diff --git a/smartsim/_core/control/controller.py b/smartsim/_core/control/controller.py index 51b69b13a..90e15c89b 100644 --- a/smartsim/_core/control/controller.py +++ b/smartsim/_core/control/controller.py @@ -106,6 +106,7 @@ def start( manifest: Manifest, block: bool = True, kill_on_interrupt: bool = True, + monitor: bool = True, ) -> None: """Start the passed SmartSim entities @@ -121,7 +122,7 @@ def start( SignalInterceptionStack.get(signal.SIGINT).push_unique( self._jobs.signal_interrupt ) - self._launch(exp_name, exp_path, manifest) + self._launch(exp_name, exp_path, manifest, monitor) # start the job manager thread if not already started if not self._jobs.actively_monitoring: @@ -155,7 +156,7 @@ def poll( :param kill_on_interrupt: flag for killing jobs when SIGINT is received """ self._jobs.kill_on_interrupt = kill_on_interrupt - to_monitor = self._jobs.jobs + to_monitor = self._jobs.monitor_jobs while len(to_monitor) > 0: time.sleep(interval) @@ -370,7 +371,9 @@ def symlink_output_files( "Symlinking files failed." ) - def _launch(self, _exp_name: str, exp_path: str, manifest: Manifest) -> None: + def _launch( + self, _exp_name: str, exp_path: str, manifest: Manifest, monitor: bool = True + ) -> None: """Main launching function of the controller Orchestrators are always launched first so that the @@ -379,6 +382,7 @@ def _launch(self, _exp_name: str, exp_path: str, manifest: Manifest) -> None: :param exp_name: The name of the launching experiment :param exp_path: path to location of ``Experiment`` directory if generated :param manifest: Manifest of deployables to launch + :param monitor: boolean to signal whether to monitor deployables """ # Create a unique timestamp for this launch to ensure unique metadata @@ -454,7 +458,7 @@ def _launch(self, _exp_name: str, exp_path: str, manifest: Manifest) -> None: # launch and symlink steps for step, entity in steps: - self._launch_step(step, entity) + self._launch_step(step, entity, monitor) self.symlink_output_files(step, entity) # symlink substeps to maintain directory structure @@ -533,11 +537,13 @@ def _launch_step( self, job_step: Step, entity: SmartSimEntity | EntitySequence[SmartSimEntity], + monitor: bool = True, ) -> None: """Use the launcher to launch a job step :param job_step: a job step instance :param entity: entity instance + :param monitor: boolean determining whether to monitor job :raises SmartSimError: if launch fails """ # attempt to retrieve entity name in JobManager.completed @@ -582,10 +588,10 @@ def _launch_step( if self._jobs.query_restart(entity.name): logger.debug(f"Restarting {entity.name}") - self._jobs.restart_job(job_step.name, job_id, entity.name, is_task) + self._jobs.restart_job(job_step.name, job_id, entity.name, is_task, monitor) else: logger.debug(f"Launching {entity.name}") - self._jobs.add_job(job_step.name, job_id, entity, is_task) + self._jobs.add_job(job_step.name, job_id, entity, is_task, monitor) def _create_batch_job_step( self, diff --git a/smartsim/_core/control/jobmanager.py b/smartsim/_core/control/jobmanager.py index d253c02c8..9efcf7862 100644 --- a/smartsim/_core/control/jobmanager.py +++ b/smartsim/_core/control/jobmanager.py @@ -65,6 +65,7 @@ def __init__(self, lock: RLock, launcher: Launcher | None = None) -> None: # active jobs self.jobs: dict[str, Job] = {} + self.monitor_jobs: dict[str, Job] = {} self.db_jobs: dict[str, Job] = {} # completed jobs @@ -132,6 +133,8 @@ def move_to_completed(self, job: Job) -> None: del self.db_jobs[job.ename] elif job.ename in self.jobs: del self.jobs[job.ename] + if job.ename in self.monitor_jobs: + del self.monitor_jobs[job.ename] def __getitem__(self, entity_name: str) -> Job: """Return the job associated with the name of the entity @@ -165,12 +168,14 @@ def add_job( job_id: str | None, entity: SmartSimEntity | EntitySequence[SmartSimEntity], is_task: bool = True, + monitor: bool = True, ) -> None: """Add a job to the job manager which holds specific jobs by type. :param job_name: name of the job step :param job_id: job step id created by launcher :param entity: entity that was launched on job step + :param monitor: boolean to monitor job :param is_task: process monitored by TaskManager (True) or the WLM (True) """ launcher = str(self._launcher) @@ -180,6 +185,8 @@ def add_job( self.db_jobs[entity.name] = job else: self.jobs[entity.name] = job + if monitor: + self.monitor_jobs[entity.name] = job def is_finished(self, entity: SmartSimEntity) -> bool: """Detect if a job has completed @@ -264,6 +271,7 @@ def restart_job( job_id: str | None, entity_name: str, is_task: bool = True, + monitor: bool = True, ) -> None: """Function to reset a job to record history and be ready to launch again. @@ -272,6 +280,7 @@ def restart_job( :param job_id: new job id :param entity_name: name of the entity of the job :param is_task: process monitored by TaskManager (True) or the WLM (True) + :param monitor: boolean to monitor job """ with self._lock: @@ -283,6 +292,8 @@ def restart_job( self.db_jobs[entity_name] = job else: self.jobs[entity_name] = job + if monitor: + self.monitor_jobs[entity_name] = job def get_db_host_addresses(self) -> dict[str, list[str]]: """Retrieve the list of hosts for the database diff --git a/smartsim/experiment.py b/smartsim/experiment.py index e04ff5fe7..b687529fc 100644 --- a/smartsim/experiment.py +++ b/smartsim/experiment.py @@ -165,6 +165,7 @@ def start( block: bool = True, summary: bool = False, kill_on_interrupt: bool = True, + monitor: bool = True, ) -> None: """Start passed instances using Experiment launcher @@ -205,11 +206,16 @@ def start( that all jobs launched by this experiment will be killed, and the zombie processes will need to be manually killed. + If `monitor=True`, all the jobs being started will be monitored + by the Controller. If `monitor=False`, the jobs will not be + monitored, meaning that their status will not be reported. + :param block: block execution until all non-database jobs are finished :param summary: print a launch summary prior to launch :param kill_on_interrupt: flag for killing jobs when ^C (SIGINT) signal is received. + :param monitor: monitor the jobs being started """ start_manifest = Manifest(*args) self._create_entity_dir(start_manifest) @@ -222,6 +228,7 @@ def start( manifest=start_manifest, block=block, kill_on_interrupt=kill_on_interrupt, + monitor=monitor, ) except SmartSimError as e: logger.error(e) diff --git a/smartsim/settings/palsSettings.py b/smartsim/settings/palsSettings.py index 061ce2201..14e909c34 100644 --- a/smartsim/settings/palsSettings.py +++ b/smartsim/settings/palsSettings.py @@ -158,6 +158,16 @@ def set_broadcast(self, dest_path: str | None = None) -> None: ) self.run_args["transfer"] = None + def set_launcher_args( + self, arguments: t.Dict[str, t.Union[int, str, float, None]] + ) -> None: + """Set any other task launcher argument + + :param arguments: dictionary with string name and value + """ + for name, value in arguments.items(): + self.run_args[name] = value + def set_walltime(self, walltime: str) -> None: """Set the maximum number of seconds that a job will run diff --git a/tests/test_model.py b/tests/test_model.py index 882461c84..9e60be575 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -93,12 +93,18 @@ def _monkeypatch_exp_controller(exp): entity_steps = [] def start_wo_job_manager( - self, exp_name, exp_path, manifest, block=True, kill_on_interrupt=True + self, + exp_name, + exp_path, + manifest, + block=True, + kill_on_interrupt=True, + monitor=True, ): self._launch(exp_name, exp_path, manifest) return None - def launch_step_nop(self, step, entity): + def launch_step_nop(self, step, entity, monitor): entity_steps.append((step, entity)) monkeypatch.setattr( diff --git a/tests/test_pals_settings.py b/tests/test_pals_settings.py index 9d6c87b3c..94d5b1b3a 100644 --- a/tests/test_pals_settings.py +++ b/tests/test_pals_settings.py @@ -61,6 +61,12 @@ # func(None) +def test_set_launcher_args(): + settings = PalsMpiexecSettings(default_exe, **default_kwargs) + settings.set_launcher_args({"mem-bind": "none", "line-buffer": ""}) + assert settings.format_run_args() == ["--mem-bind", "none", "--line-buffer"] + + def test_affinity_script(): settings = PalsMpiexecSettings(default_exe, **default_kwargs) settings.set_gpu_affinity_script("/path/to/set_affinity_gpu.sh", 1, 2)