Commit a40e94e

Add files via upload
1 parent 267bbb5 commit a40e94e

1 file changed: multioptpy/Wrapper/mapper.py

Lines changed: 157 additions & 129 deletions
@@ -1209,9 +1209,8 @@ def _build_candidates(
         sub_symbols = [symbols[i] for i in atom_indices]
         m = len(atom_indices)
 
-        dmat = distance_matrix(sub_coords)
         ii, jj = np.triu_indices(m, k=1)
-        dists = dmat[ii, jj]
+        dists = pdist(sub_coords)
 
         # ── Distance window filter ────────────────────────────────────────
         dist_mask = (dists >= self.dist_lower_ang) & (dists <= self.dist_upper_ang)
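Aside (editor's illustration, not part of the commit): scipy's pdist returns the condensed distance vector in exactly the row-major upper-triangle order that np.triu_indices(m, k=1) enumerates, which is why the dense-matrix detour can be dropped without changing dists. A minimal runnable sketch of the equivalence, using squareform(pdist(...)) as a stand-in for the removed project-local distance_matrix helper:

import numpy as np
from scipy.spatial.distance import pdist, squareform

sub_coords = np.random.default_rng(0).random((5, 3))  # hypothetical (m, 3) coordinates
m = len(sub_coords)

# Old route: build the full m x m matrix, then read out the upper triangle.
dmat = squareform(pdist(sub_coords))
ii, jj = np.triu_indices(m, k=1)
dists_old = dmat[ii, jj]

# New route: pdist already yields that upper triangle as a flat vector.
dists_new = pdist(sub_coords)

assert np.allclose(dists_old, dists_new)  # identical values, identical ordering

The commit keeps the ii, jj = np.triu_indices(m, k=1) line, presumably so that positions in the masked dists vector can still be mapped back to atom-index pairs downstream.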
@@ -2300,92 +2299,119 @@ def run(self) -> None:
 
     def _run_sequential(self, history_log: str, priority_log: str) -> None:
         """Sequential execution loop (n_parallel == 1).
-
+
         Extracted from run() so that the parallel and sequential paths are
         symmetric and independently testable. Runs until one of:
-
+
         * queue exhausted (all pairs explored),
         * ``max_iterations`` reached, or
         * ``stop.txt`` sentinel file detected.
+
+        Executor lifetime
+        -----------------
+        A single ProcessPoolExecutor(max_workers=1, max_tasks_per_child=1) is
+        created here before the loop and torn down in the finally clause.
+        Previously _run_autots created and destroyed an executor on every
+        iteration, incurring one full spawn/join cycle per AutoTS call.
+        Hoisting the executor out of the loop removes that overhead while
+        preserving the CWD isolation guarantee: max_tasks_per_child=1 ensures
+        that each task still runs in a freshly spawned child process, so
+        os.chdir() inside _autots_worker cannot bleed across iterations.
         """
-        while True:
-            # ── stop.txt sentinel file ────────────────────────────────────
-            if os.path.isfile(os.path.join(self.output_dir, "stop.txt")):
-                logger.info("stop.txt detected in output_dir. Stopping.")
-                break
-
-            # ── Iteration limit ───────────────────────────────────────────
-            if self.max_iterations > 0 and self._iteration >= self.max_iterations:
-                logger.info("Reached max_iterations (%d). Stopping.", self.max_iterations)
-                break
-
-            # ── Re-weight queue after a new lowest-energy node is found ───
-            self.queue.refresh_priorities(self.graph.reference_energy())
-            task = self.queue.pop()
-
-            if task is not None:
-                # Parse the AFIR key once for both the skip check and record()
-                gamma_sign, atom_i, atom_j = self._parse_afir_task_key(task)
-
-                # Avoid duplicates when the queue was rebuilt on resume but
-                # explored_pairs log retains historical records
-                if self.explored_log.has(task.node_id, atom_i, atom_j, gamma_sign):
-                    logger.debug(
-                        "Skipping queued task (EQ%06d, %d-%d, %s): already explored.",
-                        task.node_id, atom_i, atom_j, gamma_sign,
-                    )
+        # ADDED: create the executor once for the entire sequential run.
+        # max_tasks_per_child=1 keeps per-task process isolation (os.chdir safety).
+        executor = ProcessPoolExecutor(
+            max_workers=1,
+            mp_context=self._mp_ctx,
+            max_tasks_per_child=1,
+        )
+        try:  # ADDED: wrapping try/finally to guarantee executor.shutdown()
+            while True:
+                # ── stop.txt sentinel file ────────────────────────────────────
+                if os.path.isfile(os.path.join(self.output_dir, "stop.txt")):
+                    logger.info("stop.txt detected in output_dir. Stopping.")
+                    break
+
+                # ── Iteration limit ───────────────────────────────────────────
+                if self.max_iterations > 0 and self._iteration >= self.max_iterations:
+                    logger.info("Reached max_iterations (%d). Stopping.", self.max_iterations)
+                    break
+
+                # ── Re-weight queue after a new lowest-energy node is found ───
+                self.queue.refresh_priorities(self.graph.reference_energy())
+                task = self.queue.pop()
+
+                if task is not None:
+                    # Parse the AFIR key once for both the skip check and record()
+                    gamma_sign, atom_i, atom_j = self._parse_afir_task_key(task)
+
+                    # Avoid duplicates when the queue was rebuilt on resume but
+                    # explored_pairs log retains historical records
+                    if self.explored_log.has(task.node_id, atom_i, atom_j, gamma_sign):
+                        logger.debug(
+                            "Skipping queued task (EQ%06d, %d-%d, %s): already explored.",
+                            task.node_id, atom_i, atom_j, gamma_sign,
+                        )
+                        self.queue.release((task.node_id, tuple(task.afir_params)))
+                        continue
+
+                else:
+                    # Queue is empty: deterministically rescan all nodes for
+                    # unexplored pairs
+                    logger.info("Queue empty; re-scanning all nodes for unexplored pairs.")
+                    for node in self.graph.all_nodes():
+                        self._enqueue_perturbations(node, force_add=True)
+                    if len(self.queue) == 0:
+                        logger.info("All candidate (EQ, pair) combinations exhausted. Stopping.")
+                        break
+                    continue
+
+                # ── Task execution ────────────────────────────────────────────
+                self._iteration += 1
+                self.graph.last_iteration = self._iteration
+                self._append_history(history_log, self._iteration, task)
+
+                run_dir = self._make_run_dir(task)
+                try:
+                    profile_dirs = self._run_autots(task, run_dir, executor)  # CHANGED: pass executor
+                except Exception as exc:
+                    logger.error("AutoTS failed for run %s: %s", run_dir, exc)
+                    # Do not call explored_log.record() on failure.
+                    # _in_flight (set by pop()) prevents duplicates within the
+                    # current run. Omitting record() allows transient failures
+                    # (OOM, segfault, etc.) to be retried on resume.
                     self.queue.release((task.node_id, tuple(task.afir_params)))
+                    self._finalize_iteration(run_dir, task, "FAILED", [], priority_log)
                     continue
-
-            else:
-                # Queue is empty: deterministically rescan all nodes for
-                # unexplored pairs
-                logger.info("Queue empty; re-scanning all nodes for unexplored pairs.")
-                for node in self.graph.all_nodes():
-                    self._enqueue_perturbations(node, force_add=True)
-                if len(self.queue) == 0:
-                    logger.info("All candidate (EQ, pair) combinations exhausted. Stopping.")
-                    break
-                continue
-
-            # ── Task execution ────────────────────────────────────────────
-            self._iteration += 1
-            self.graph.last_iteration = self._iteration
-            self._append_history(history_log, self._iteration, task)
-
-            run_dir = self._make_run_dir(task)
-            try:
-                profile_dirs = self._run_autots(task, run_dir)
-            except Exception as exc:
-                logger.error("AutoTS failed for run %s: %s", run_dir, exc)
-                # Do not call explored_log.record() on failure.
-                # _in_flight (set by pop()) prevents duplicates within the
-                # current run. Omitting record() allows transient failures
-                # (OOM, segfault, etc.) to be retried on resume.
+
+                # Persist the exploration record only after confirming success
+                self.explored_log.record(task.node_id, atom_i, atom_j, gamma_sign)
+                # Call release() after record() so that is_submitted() returns
+                # True throughout the entire [pop() → record() → release()] window
                 self.queue.release((task.node_id, tuple(task.afir_params)))
-                self._finalize_iteration(run_dir, task, "FAILED", [], priority_log)
-                continue
-
-            # Persist the exploration record only after confirming success
-            self.explored_log.record(task.node_id, atom_i, atom_j, gamma_sign)
-            # Call release() after record() so that is_submitted() returns
-            # True throughout the entire [pop() → record() → release()] window
-            self.queue.release((task.node_id, tuple(task.afir_params)))
-
-            logger.info(
-                "Iter %06d: _run_autots returned %d profile director%s.",
-                self._iteration, len(profile_dirs),
-                "y" if len(profile_dirs) == 1 else "ies",
-            )
-            for pdir in profile_dirs:
-                self._process_profile(pdir, run_dir)
-
-            # Notify queue of updated graph (required by RCMCQueue)
-            if hasattr(self.queue, "set_graph"):
-                self.queue.set_graph(self.graph)
-
-            self._finalize_iteration(run_dir, task, "DONE", profile_dirs, priority_log)
-
+
+                logger.info(
+                    "Iter %06d: _run_autots returned %d profile director%s.",
+                    self._iteration, len(profile_dirs),
+                    "y" if len(profile_dirs) == 1 else "ies",
+                )
+                for pdir in profile_dirs:
+                    self._process_profile(pdir, run_dir)
+
+                # Notify queue of updated graph (required by RCMCQueue)
+                if hasattr(self.queue, "set_graph"):
+                    self.queue.set_graph(self.graph)
+
+                self._finalize_iteration(run_dir, task, "DONE", profile_dirs, priority_log)
+
+        finally:
+            # ADDED: shut down the shared executor once the loop exits for any reason
+            # (exhausted, max_iterations, stop.txt, or unhandled exception).
+            # wait=True performs a clean join on any still-running worker, which is
+            # always safe here because the loop only reaches finally after the
+            # current task has either completed or been force-killed in _run_autots.
+            executor.shutdown(wait=True)
+
     def _append_history(
         self,
         path: str,
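Aside (editor's illustration, not part of the commit): stripped to its skeleton, the executor lifetime the new docstring describes (one pool for the whole run, a fresh spawned child per task, shutdown in a finally clause) looks like the sketch below. Here worker is a hypothetical stand-in for _autots_worker; max_tasks_per_child requires Python 3.11+ and is incompatible with the "fork" start method, hence the explicit "spawn" context:

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def worker(task_id: int) -> int:
    # Runs in a freshly spawned child every time (max_tasks_per_child=1),
    # so per-task state such as an os.chdir() cannot leak into later tasks.
    return task_id * 2

if __name__ == "__main__":
    executor = ProcessPoolExecutor(
        max_workers=1,
        mp_context=mp.get_context("spawn"),
        max_tasks_per_child=1,  # recycle the child after each task (Python 3.11+)
    )
    try:
        for task_id in range(3):
            # The per-task spawn cost remains, but the pool machinery
            # (call queue, management thread) is now built only once.
            print(executor.submit(worker, task_id).result())
    finally:
        executor.shutdown(wait=True)  # clean join however the loop exits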
@@ -3013,26 +3039,31 @@ def _make_autots_config(self, task: ExplorationTask, workspace: str) -> dict:
             config["run_step4"] = True
         return config
 
-    def _run_autots(self, task: ExplorationTask, run_dir: str) -> list[str]:
+    def _run_autots(
+        self,
+        task: ExplorationTask,
+        run_dir: str,
+        executor: ProcessPoolExecutor,  # ADDED: caller-owned executor passed in
+    ) -> list[str]:
         """Run AutoTSWorkflow in an isolated spawned subprocess.
-
-        Uses ProcessPoolExecutor(max_workers=1, max_tasks_per_child=1), the
-        same mechanism as _run_batch_parallel, so that crash detection,
-        timeout handling, and CWD isolation are identical between the
-        sequential and parallel paths.
-
+
+        The executor is owned and managed by the caller (_run_sequential).
+        This method only submits one future and waits for its result, so the
+        executor can be reused across iterations without being recreated each time.
+
         Crash detection:
             ProcessPoolExecutor automatically captures any exception raised
             inside _autots_worker and re-raises it via future.result(), so no
             manual polling loop or Queue is required.
-
+
         Timeout:
             future.result(timeout=worker_timeout_s) raises concurrent.futures.
            TimeoutError when the limit is exceeded. The handler cancels
-            pending work, force-kills the live worker via executor._processes
-            (same approach as _run_batch_parallel), then shuts the executor
-            down with wait=False to avoid blocking on a hung binary.
-
+            pending work and force-kills the live worker via executor._processes
+            (same approach as _run_parallel_rolling), then raises RuntimeError.
+            Executor shutdown is left to the caller so that a single timeout does
+            not tear down the shared executor prematurely.
+
         Return value:
             _autots_worker returns a sorted list of Step-4 profile directories;
             future.result() delivers that list directly to the caller.
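Aside (editor's illustration, not part of the commit): the "Crash detection" paragraph rests on standard concurrent.futures behavior: an exception raised inside the child is pickled and re-raised in the parent by future.result(). A minimal demonstration with a hypothetical crashing_worker:

from concurrent.futures import ProcessPoolExecutor

def crashing_worker() -> None:
    raise ValueError("boom")  # raised inside the child process

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        future = pool.submit(crashing_worker)
        try:
            future.result()  # the child's exception is re-raised here
        except ValueError as exc:
            print(f"propagated from worker: {exc}")
        # A worker that dies without raising (e.g. a segfault or OOM kill)
        # surfaces instead as BrokenProcessPool from the same result() call.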
@@ -3044,7 +3075,7 @@ def _run_autots(self, task: ExplorationTask, run_dir: str) -> list[str]:
         )
         workspace = os.path.join(run_dir, "autots_workspace")
         config = self._make_autots_config(task, workspace)
-
+
         # Written before the workflow starts so it survives a crash.
         try:
             with open(
@@ -3053,44 +3084,40 @@ def _run_autots(self, task: ExplorationTask, run_dir: str) -> list[str]:
                 json.dump(config, fh, indent=2, default=str)
         except Exception as exc:
             logger.warning("_run_autots: could not write config_used.json: %s", exc)
-
-        # max_tasks_per_child=1 guarantees a fresh process for this task,
-        # isolating os.chdir() inside _autots_worker from the parent process.
-        executor = ProcessPoolExecutor(
-            max_workers=1,
-            mp_context=self._mp_ctx,
-            max_tasks_per_child=1,
-        )
-        timed_out = False
+
+        # REMOVED: executor creation block.
+        # ProcessPoolExecutor is now created once in _run_sequential and passed in,
+        # eliminating the per-iteration spawn/join overhead.
+        # max_tasks_per_child=1 is still set on the shared executor (see
+        # _run_sequential) so each task still runs in a fresh child process,
+        # keeping os.chdir() isolation intact.
+
+        future = executor.submit(_autots_worker, config, run_dir, workspace)
         try:
-            future = executor.submit(_autots_worker, config, run_dir, workspace)
-            try:
-                return future.result(timeout=self.worker_timeout_s)
-            except (TimeoutError, FuturesTimeoutError):
-                timed_out = True
-                logger.error(
-                    "_run_autots: worker exceeded hard timeout of %ds — "
-                    "force-killing worker process.",
-                    self.worker_timeout_s,
-                )
-                future.cancel()
-                # No public API exposes individual worker handles; use the
-                # private _processes dict (same pattern as _run_batch_parallel).
-                worker_procs = getattr(executor, "_processes", {})
-                for pid, proc in list(worker_procs.items()):
-                    if proc.is_alive():
-                        logger.warning(
-                            "_run_autots: force-killing worker pid=%d", pid
-                        )
-                        proc.kill()
-                raise RuntimeError(
-                    f"_run_autots: worker exceeded hard timeout of "
-                    f"{self.worker_timeout_s}s."
-                )
-        finally:
-            # wait=False after a force-kill avoids blocking on a hung binary;
-            # wait=True in the normal path ensures a clean join.
-            executor.shutdown(wait=not timed_out, cancel_futures=timed_out)
+            return future.result(timeout=self.worker_timeout_s)
+        except (TimeoutError, FuturesTimeoutError):
+            logger.error(
+                "_run_autots: worker exceeded hard timeout of %ds — "
+                "force-killing worker process.",
+                self.worker_timeout_s,
+            )
+            future.cancel()
+            # No public API exposes individual worker handles; use the
+            # private _processes dict (same pattern as _run_parallel_rolling).
+            worker_procs = getattr(executor, "_processes", {})
+            for pid, proc in list(worker_procs.items()):
+                if proc.is_alive():
+                    logger.warning(
+                        "_run_autots: force-killing worker pid=%d", pid
+                    )
+                    proc.kill()
+            raise RuntimeError(
+                f"_run_autots: worker exceeded hard timeout of "
+                f"{self.worker_timeout_s}s."
+            )
+        # REMOVED: finally block with executor.shutdown().
+        # Shutdown is now the caller's responsibility (see _run_sequential finally clause).
+
     # ------------------------------------------------------------------ #
     # Energy back-fill                                                   #
     # ------------------------------------------------------------------ #
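Aside (editor's illustration, not part of the commit): the timeout path above, reduced to a self-contained sketch. hung_worker is hypothetical; the kill loop leans on the private executor._processes dict (a CPython implementation detail, as the in-code comment concedes), and unlike the patched _run_autots this standalone version must shut down its own executor:

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError as FuturesTimeoutError

def hung_worker() -> None:
    time.sleep(3600)  # stands in for a wedged external binary

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=1)
    future = executor.submit(hung_worker)
    try:
        future.result(timeout=2)
    except FuturesTimeoutError:
        future.cancel()  # returns False for a running future; harmless
        # No public API hands out worker handles; reach into the private dict.
        for pid, proc in list(getattr(executor, "_processes", {}).items()):
            if proc.is_alive():
                proc.kill()  # SIGKILL: never block on the hung binary
    finally:
        # wait=False so a just-killed worker cannot stall the join.
        executor.shutdown(wait=False, cancel_futures=True)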
@@ -3420,6 +3447,7 @@ def _enqueue_perturbations(self, node: EQNode, force_add: bool = False) -> None:
                 "graph has only 1 node — exclusion suppressed.",
                 node.node_id,
             )
+            force_add = True
         elif node.node_id in self.excluded_node_ids:
             logger.debug(
                 "_enqueue_perturbations: EQ%d is in excluded_node_ids and has "
