Skip to content

Commit 6505cd8

Browse files
committed
various zrw / gen-worker indexing fixes. defensive programming around worker indexes and zrws. mypy types. vibe-coded new test cases for test_mpi_runners_zrw_subnode_uneven
1 parent e0d4cba commit 6505cd8

7 files changed

Lines changed: 35 additions & 17 deletions

File tree

libensemble/libE.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ def libE_mpi(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE
385385
# Run manager or worker code, depending
386386
if is_manager:
387387
if resources is not None:
388-
resources.set_resource_manager(nworkers)
388+
resources.set_resource_manager(nworkers + 1)
389389
return libE_mpi_manager(
390390
mpi_comm, sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0
391391
)

libensemble/resources/resources.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: st
6767
"""Initiate a new resources object"""
6868
self.top_level_dir = top_level_dir or os.getcwd()
6969
self.glob_resources = GlobalResources(libE_specs=libE_specs, platform_info=platform_info, top_level_dir="")
70-
self.resource_manager = None # For Manager
71-
self.worker_resources = None # For Workers
70+
self.resource_manager: ResourceManager | None = None # For Manager
71+
self.worker_resources: WorkerResources | None = None # For Workers
7272

7373
def set_worker_resources(self, num_workers: int, workerid: int) -> None:
7474
"""Initiate the worker resources component of resources"""
@@ -167,8 +167,6 @@ def __init__(self, libE_specs: dict, platform_info: dict = {}, top_level_dir: st
167167
self.top_level_dir = top_level_dir
168168
self.dedicated_mode = libE_specs.get("dedicated_mode", False)
169169
self.zero_resource_workers = libE_specs.get("zero_resource_workers", [])
170-
if 0 not in self.zero_resource_workers:
171-
self.zero_resource_workers.append(0)
172170
self.num_resource_sets = libE_specs.get("num_resource_sets", None)
173171
self.enforce_worker_core_bounds = libE_specs.get("enforce_worker_core_bounds", False)
174172
self.gpus_per_group = libE_specs.get("gpus_per_group")

libensemble/resources/worker_resources.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def free_rsets(self, worker=None):
105105
def get_index_list(num_workers: int, num_rsets: int, zero_resource_list: list[int | Any]) -> list[int | None]:
106106
"""Map WorkerID to index into a nodelist"""
107107
index = 0
108-
index_list = []
108+
index_list: list[int | None] = []
109109
for i in range(0, num_workers):
110110
if i in zero_resource_list:
111111
index_list.append(None)
@@ -116,6 +116,11 @@ def get_index_list(num_workers: int, num_rsets: int, zero_resource_list: list[in
116116
else:
117117
index_list.append(index)
118118
index += 1
119+
120+
for i in zero_resource_list:
121+
if i >= num_workers:
122+
logger.warning(f"Worker index {i} from zero_resource_workers is out of range (0-{num_workers - 1})")
123+
119124
return index_list
120125

121126

@@ -364,7 +369,7 @@ def get_local_nodelist(
364369
local_nodelist = list(OrderedDict.fromkeys(team_list)) # Maintain order of nodes
365370
logger.debug(f"Worker's local_nodelist is {local_nodelist}")
366371

367-
slots = {}
372+
slots: dict[str, list[int]] = {}
368373
for node in local_nodelist:
369374
slots[node] = []
370375

libensemble/tests/functionality_tests/test_mpi_gpu_settings.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
6767
if __name__ == "__main__":
6868
nworkers, is_manager, libE_specs, _ = parse_args()
69-
libE_specs["num_resource_sets"] = nworkers - 1 # Persistent gen does not need resources
69+
libE_specs["num_resource_sets"] = nworkers # Persistent gen does not need resources
7070
libE_specs["use_workflow_dir"] = True # Only a place for Open MPI machinefiles
7171

7272
if libE_specs["comms"] == "tcp":
@@ -88,8 +88,8 @@
8888
"persis_in": ["f", "x", "sim_id"],
8989
"out": [("priority", float), ("resource_sets", int), ("x", float, n)],
9090
"user": {
91-
"initial_batch_size": nworkers - 1,
92-
"max_resource_sets": nworkers - 1, # Any sim created can req. 1 worker up to all.
91+
"initial_batch_size": nworkers,
92+
"max_resource_sets": nworkers, # Any sim created can req. 1 worker up to all.
9393
"lb": np.array([-3, -2]),
9494
"ub": np.array([3, 2]),
9595
},

libensemble/tests/functionality_tests/test_mpi_gpu_settings_env.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
4444
if __name__ == "__main__":
4545
nworkers, is_manager, libE_specs, _ = parse_args()
46-
libE_specs["num_resource_sets"] = nworkers - 1 # Persistent gen does not need resources
46+
libE_specs["num_resource_sets"] = nworkers # Persistent gen does not need resources
4747
libE_specs["use_workflow_dir"] = True # Only a place for Open MPI machinefiles
4848

4949
# Optional for organization of output scripts
@@ -70,8 +70,8 @@
7070
"persis_in": ["f", "x", "sim_id"],
7171
"out": [("priority", float), ("resource_sets", int), ("x", float, n)],
7272
"user": {
73-
"initial_batch_size": nworkers - 1,
74-
"max_resource_sets": nworkers - 1, # Any sim created can req. 1 worker up to all.
73+
"initial_batch_size": nworkers,
74+
"max_resource_sets": nworkers, # Any sim created can req. 1 worker up to all.
7575
"lb": np.array([-3, -2]),
7676
"ub": np.array([3, 2]),
7777
},

libensemble/tests/functionality_tests/test_mpi_runners_zrw_subnode_uneven.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,26 +139,39 @@
139139
exp_srun.append(srun_p1 + str(nodename) + srun_p2 + str(ntasks) + srun_p3 + str(ntasks) + srun_p4)
140140

141141
test_list = test_list_base
142-
exp_list = exp_srun
142+
exp_list_dynamic = exp_srun.copy()
143+
if nworkers == 5:
144+
# Iteration 0 (Dynamic): Workers 1, 2 -> node-2; 3, 4, 5 -> node-1
145+
n1 = srun_p1 + "node-1" + srun_p2 + "5" + srun_p3 + "5" + srun_p4
146+
n2 = srun_p1 + "node-2" + srun_p2 + "8" + srun_p3 + "8" + srun_p4
147+
exp_list_dynamic = [n2, n2, n1, n1, n1]
148+
# Iteration 1 (Static): Worker 1 is gen. Workers 2, 3 -> node-1; 4, 5 -> node-2
149+
exp_list_static = [n1, n1, n2, n2]
150+
143151
sim_specs["user"] = {
144152
"tests": test_list,
145-
"expect": exp_list,
146153
"persis_gens": n_gens,
147154
}
148155

149156
iterations = 2
150157
for prob_id in range(iterations):
151158
if prob_id == 0:
152159
# Uses dynamic scheduler - will find node 2 slots first (as fewer)
153-
libE_specs["num_resource_sets"] = nworkers - 1 # Any worker can be the gen
154-
sim_specs["user"]["offset_for_scheduler"] = True # Changes expected values
160+
libE_specs["gen_on_worker"] = False
161+
libE_specs["num_resource_sets"] = nworkers
162+
sim_specs["user"]["expect"] = exp_list_dynamic
163+
sim_specs["user"]["offset_for_scheduler"] = False
164+
sim_specs["user"]["persis_gens"] = 0
155165
persis_info = add_unique_random_streams({}, nworkers + 1)
156166

157167
else:
158168
# Uses static scheduler - will find node 1 slots first
169+
libE_specs["gen_on_worker"] = True
170+
sim_specs["user"]["expect"] = exp_list_static
159171
del libE_specs["num_resource_sets"]
160172
libE_specs["zero_resource_workers"] = [1] # Gen must be worker 1
161173
sim_specs["user"]["offset_for_scheduler"] = False
174+
sim_specs["user"]["persis_gens"] = 1
162175
persis_info = add_unique_random_streams({}, nworkers + 1)
163176

164177
# Perform the run

libensemble/tests/functionality_tests/test_persistent_uniform_gen_decides_stop.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
n = 2
3737
init_batch_size = nworkers - ngens
3838

39+
libE_specs["gen_on_worker"] = True
40+
3941
if ngens >= nworkers:
4042
sys.exit("The number of generators must be less than the number of workers -- aborting...")
4143

0 commit comments

Comments
 (0)