Skip to content

Commit a94a6c0

Browse files
committed
wip: launch jobs to instances
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent 9ea1e92 commit a94a6c0

13 files changed

Lines changed: 619 additions & 21 deletions

File tree

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99

1010
This library includes a cluster-agnostic language to set up a job (one unit of work in a jobspec).
1111
It is a transformational layer, or a simple language that converts steps needed to prepare a job
12-
for a specific clusters scheduler. We are currently prototyping off of the Flux JobSpec, and intent
13-
to derive some variant between that and something more. It is JobSpec... the next generation! 🚀️
12+
for a specific cluster's scheduler. It also provides a means to submit to a hierarchy.
13+
We are currently prototyping off of the Flux JobSpec, and intend to derive some variant
14+
between that and something more. It is JobSpec... the next generation! 🚀️
1415

1516
⭐️ [Documentation](https://compspec.github.io/jobspec) ⭐️
1617

docs/docs/commands.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,29 @@ If run from python, the function "satisfied" would return False and the broker c
3737
is expected to be in JSON and our jobspec files are in yaml, so we can throw them into the same examples directory without issue.
3838

3939

40+
### Submit to a Hierarchy
41+
42+
#### 1. Start Flux
43+
44+
Start up the development environment to find yourself in a container with flux. Start a test instance:
45+
46+
```bash
47+
flux start --test-size=4
48+
```
49+
50+
Note that we have 4 faux nodes and 40 faux cores.
51+
52+
```bash
53+
flux resource list
54+
```
55+
```console
56+
STATE NNODES NCORES NGPUS NODELIST
57+
free 4 40 0 194c2b9f4f3c,194c2b9f4f3c,194c2b9f4f3c,194c2b9f4f3c
58+
allocated 0 0 0
59+
down 0 0 0
60+
```
61+
62+
4063
### Run
4164

4265
#### 1. Start Flux

examples/launch/hierarchy.yaml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
version: 2
2+
3+
# The entrypoint is where to start the hierarchy.
4+
entrypoint: level-1
5+
6+
# Groups are templates for a level in the hierarchy.
7+
groups:
8+
# Each of these are intermediate groups - they only exist to launch children.
9+
- name: level-1
10+
resources: level1
11+
launch:
12+
- group: level-2
13+
count: 2 # Launch 2 instances of level-2
14+
15+
- name: level-2
16+
resources: level2
17+
launch:
18+
- group: level-3
19+
count: 2 # Each level-2 instance launches 2 instances of level-3
20+
21+
- name: level-3
22+
resources: level3
23+
launch:
24+
- group: level-4
25+
count: 2 # Each level-3 instance launches 2 instances of level-4
26+
27+
# This is a leaf group. No launch section == start a broker!
28+
- name: level-4
29+
resources: level4
30+
31+
# These are nested resources. If we have 8 at the top, we have to split that up
32+
resources:
33+
level1: # Consumed by the single level-1 broker
34+
count: 8
35+
type: core
36+
level2: # Consumed by each of the two level-2 brokers
37+
count: 4
38+
type: core
39+
level3: # Consumed by each of the four level-3 brokers
40+
count: 2
41+
type: core
42+
level4: # Consumed by each of the eight level-4 leaf brokers
43+
count: 1
44+
type: core

examples/launch/throughput.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/usr/bin/env python3
2+
3+
import time
4+
import argparse
5+
6+
from jobspec.transformer.flux import FluxHierarchy
7+
from jobspec.logger import LogColors
8+
9+
def parse_args():
10+
parser = argparse.ArgumentParser(description="Run job throughput test on a Flux Hierarchy")
11+
parser.add_argument("config", help="Path to the FluxHierarchy YAML configuration file")
12+
parser.add_argument("-n", "--njobs", type=int, metavar="N", help="Total number of jobs to run", default=100)
13+
parser.add_argument("-t", "--runtime", help="Simulated runtime of each job (default=1ms)", default="0.001s")
14+
parser.add_argument("-x", "--exec", help="Do not simulate, actually run jobs", action="store_true")
15+
parser.add_argument("-o", "--setopt", action="append", help="Set shell option OPT or OPT=VAL", metavar="OPT")
16+
parser.add_argument("--setattr", action="append", help="Set job attribute ATTR=VAL", metavar="ATTR=VAL")
17+
parser.add_argument("command", nargs=argparse.REMAINDER, default=["true"])
18+
return parser.parse_args()
19+
20+
def main():
21+
args = parse_args()
22+
23+
# Instantiate, build, and connect to the Flux Hierarchy!
24+
hierarchy = FluxHierarchy(args.config)
25+
hierarchy.start(interactive=False)
26+
27+
# Default to true if not set.
28+
if not args.command:
29+
args.command = ["true"]
30+
31+
# Run the throughput test using the specialized 'throughput' method
32+
time0 = time.time()
33+
jobs = hierarchy.throughput(args.command, args.njobs)
34+
35+
if not jobs:
36+
print(f"{LogColors.RED}No jobs were tracked. Cannot calculate throughput.{LogColors.ENDC}")
37+
return
38+
39+
# 3. Analyze and print results (logic is unchanged)
40+
first = jobs[min(jobs.keys(), key=lambda x: jobs[x].get("submit", type("o", (), {"timestamp": float('inf')})()).timestamp)]
41+
last = jobs[max(jobs.keys(), key=lambda x: jobs[x].get("clean", type("o", (), {"timestamp": float('-inf')})()).timestamp)]
42+
lastsubmit = jobs[max(jobs.keys(), key=lambda x: jobs[x]["t_submit"])]
43+
44+
submit_time = lastsubmit["t_submit"] - time0
45+
sjps = args.njobs / submit_time if submit_time > 0 else float('inf')
46+
script_runtime = time.time() - time0
47+
48+
job_runtime = last["clean"].timestamp - first["submit"].timestamp
49+
jps = args.njobs / job_runtime if job_runtime > 0 else float('inf')
50+
jpsb = args.njobs / script_runtime if script_runtime > 0 else float('inf')
51+
52+
print(f"\n--- Throughput Results ---")
53+
print(f"number of jobs: {args.njobs} (on {len(hierarchy.handles)} workers)")
54+
print(f" submit time: {submit_time:<6.3f}s ({sjps:5.1f} job/s)")
55+
print(f"script runtime: {script_runtime:<6.3f}s")
56+
print(f" job runtime: {job_runtime:<6.3f}s")
57+
print(f" throughput: {jps:<.1f} job/s (script: {jpsb:5.1f} job/s)")
58+
59+
60+
if __name__ == "__main__":
61+
main()

jobspec/cli/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ def get_parser():
4646
)
4747
subparsers.add_parser("version", description="show software version")
4848

49+
# start an instance hierarchy
50+
start = subparsers.add_parser(
51+
"start",
52+
formatter_class=argparse.RawTextHelpFormatter,
53+
description="start an instance hierarchy",
54+
)
55+
start.add_argument("config", help="configuration file for hierarchy")
56+
start.add_argument("-o", "--out", help="output directory for hierarchy assets", default=None)
57+
4958
# Maybe this warrants a better name, but this seems to be what we'd want to do -
5059
# run a jobspec
5160
run = subparsers.add_parser(
@@ -125,6 +134,8 @@ def help(return_code=0):
125134
from .run import main
126135
elif args.command == "satisfy":
127136
from .satisfy import main
137+
elif args.command == "start":
138+
from .start import main
128139
else:
129140
help(1)
130141
main(args, extra)

jobspec/cli/start.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#!/usr/bin/env python
2+
3+
from jobspec.transformer.flux import FluxHierarchy
4+
5+
6+
def main(args, _):
7+
"""
8+
Start a set of nested instances to submit jobs to.
9+
"""
10+
hier = FluxHierarchy(args.config, outdir=args.out)
11+
# Note that this returns the uri lookup, if we wanted
12+
# to use it somewhere.
13+
hier.start()

jobspec/runner.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,21 @@ def run(self, filename):
5757
"""
5858
# Load the jobspec
5959
jobspec = self.load_jobspec(filename)
60+
print(jobspec.data)
6061

6162
# Get validated transformation steps
6263
# These will depend on the transformer logic
6364
steps = self.parse(jobspec)
65+
print(jobspec.data)
6466
self.announce()
6567

6668
# Run each step to submit the job, and that's it.
6769
for step in steps:
6870
step_runner.run(self.name, step)
71+
print("POST RUN")
72+
import IPython
73+
74+
IPython.embed()
6975

7076
def load_jobspec(self, filename):
7177
"""
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
1+
from .hierarchy import FluxHierarchy
12
from .workload import FluxWorkload as Transformer
3+
4+
assert Transformer
5+
assert FluxHierarchy

0 commit comments

Comments
 (0)