Skip to content

Commit b6d0ca1

Browse files
authored
Adding CI Test to DSL Executor (#782)
1 parent b59e6d7 commit b6d0ca1

18 files changed

Lines changed: 3889 additions & 0 deletions
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Step template for the DSL executor CI test: deploy a single-node test VMSS,
# run the executor tests over MPI (2 ranks) against each execution plan,
# then stop the VMSS.
parameters:
  - name: subscription
    type: string
  - name: vmssName
    type: string
  - name: platform
    type: string
    default: 'cuda'
  - name: gpuArch
    type: string

steps:
  # Provision the VMSS configured for a single-node test run.
  - template: deploy.yml
    parameters:
      subscription: ${{ parameters.subscription }}
      vmssName: ${{ parameters.vmssName }}
      platform: ${{ parameters.platform }}
      gpuArch: ${{ parameters.gpuArch }}
      deployArgs: 'single-node-test true ${{ parameters.platform }}'

  # Install the package on the remote node and run every execution plan
  # through the executor test harness (2 MPI ranks, 32M buffers, in place).
  - template: run-remote-task.yml
    parameters:
      name: ExecutorTest
      displayName: Run executor tests
      remoteScript: |
        python3 -m pip install .
        PLANS_DIR=/root/mscclpp/test/executor-tests/execution-plans
        TEST_SCRIPT=/root/mscclpp/python/test/executor_test.py
        mpirun -np 2 --allow-run-as-root python3 $TEST_SCRIPT -path $PLANS_DIR/transfer_pack.json --size 32M --in_place
        mpirun -np 2 --allow-run-as-root python3 $TEST_SCRIPT -path $PLANS_DIR/transfer_pack_tbg.json --size 32M --in_place
        mpirun -np 2 --allow-run-as-root python3 $TEST_SCRIPT -path $PLANS_DIR/reduce.json --size 32M --in_place
        mpirun -np 2 --allow-run-as-root python3 $TEST_SCRIPT -path $PLANS_DIR/reduce_tbg.json --size 32M --in_place
        mpirun -np 2 --allow-run-as-root python3 $TEST_SCRIPT -path $PLANS_DIR/reduce_pack.json --size 32M --in_place
        mpirun -np 2 --allow-run-as-root python3 $TEST_SCRIPT -path $PLANS_DIR/reduce_pack_tbg.json --size 32M --in_place
        mpirun -np 2 --allow-run-as-root python3 $TEST_SCRIPT -path $PLANS_DIR/reduce_nvls.json --size 32M --in_place
        mpirun -np 2 --allow-run-as-root python3 $TEST_SCRIPT -path $PLANS_DIR/reduce_nvls_pipeline.json --size 32M --in_place

  # Always tear the VMSS back down after the test run.
  - template: stop.yml
    parameters:
      subscription: ${{ parameters.subscription }}
      vmssName: ${{ parameters.vmssName }}

.azure-pipelines/ut.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,24 @@ jobs:
148148
vmssName: mscclpp-mi300x-ci
149149
platform: rocm
150150
gpuArch: gfx942
151+
152+
# CI job running the DSL executor tests on the H100 pool.
# NOTE(review): `platform` is not passed to the template, so it falls back to
# the template default ('cuda') — confirm that is intended for this pool.
- job: UnitTestExecutor
  timeoutInMinutes: 60
  displayName: Test DSL Executor
  pool:
    name: msccl-ci-h100

  # Matrix over container images; currently a single CUDA 12.9 entry.
  strategy:
    matrix:
      cuda12:
        containerImage: ghcr.io/microsoft/mscclpp/mscclpp:base-dev-cuda12.9

  container:
    image: $(containerImage)

  steps:
    - template: templates/ut-executor.yml
      parameters:
        subscription: mscclpp-ci-h100
        vmssName: mscclpp-h100-ci
        gpuArch: '90'
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
"""
5+
Reduce Test
6+
7+
This file tests the PUT, GET, COPY, REDUCE_SEND and READ_REDUCE_SEND
8+
operations. It implements a 2-GPU allreduce using the Simple protocol
9+
with instruction fusion enabled.
10+
"""
11+
12+
import argparse
13+
from mscclpp.language.channel import *
14+
from mscclpp.language.rank import *
15+
from mscclpp.language.general import *
16+
from mscclpp.language.program import *
17+
from mscclpp.language.collectives import *
18+
19+
20+
def reduce(name, num_threads_per_block, min_message_size, max_message_size):
    """Emit a 2-GPU allreduce execution plan exercising the PUT, GET, COPY,
    REDUCE_SEND and READ_REDUCE_SEND operations.

    Uses the Simple protocol with instruction fusion enabled and prints the
    generated plan as JSON.

    Args:
        name: name embedded in the generated program.
        num_threads_per_block: threads per GPU thread block.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    collective = AllReduce(2, 2, True)
    with CollectiveProgram(
        name,
        collective,
        2,
        protocol="Simple",
        instr_fusion=True,
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # Ranks, memory channels, and input/scratch buffers for the two GPUs.
        rank0 = Rank(0)
        rank1 = Rank(1)
        chan0 = MemoryChannel(1, 0)  # channel endpoints between ranks 1 and 0
        chan1 = MemoryChannel(0, 1)  # channel endpoints between ranks 0 and 1
        input0 = rank0.get_input_buffer()
        input1 = rank1.get_input_buffer()
        scratch0 = Buffer(0, 4)
        scratch1 = Buffer(1, 4)

        # Stage each rank's input chunks into scratch so the peer can access them.
        rank0.copy(scratch0[2:4], input0[2:4], tb=0)
        rank1.copy(scratch1[0:2], input1[0:2], tb=0)

        # Barrier: make the staged scratch data visible to the remote rank.
        chan0.signal(tb=0)
        chan1.signal(tb=0)
        chan0.wait(tb=0)
        chan1.wait(tb=0)

        # Rank 0 reduces chunk 0 from rank 1's scratch and pushes the result
        # back so both ranks hold it.
        chan0.reduce(input0[0:1], [scratch1[0:1]], tb=0)
        chan0.put(input1[0:1], input0[0:1], tb=0)

        # Rank 0 pulls chunk 1 from rank 1's scratch, reduces it locally,
        # then pushes the result to both ranks.
        chan0.get(scratch0[1:2], scratch1[1:2], tb=0)
        rank0.reduce(input0[1:2], [scratch0[1:2]], tb=0)
        chan0.put(input1[1:2], input0[1:2], tb=0)

        # Rank 1 reduces chunks 2-3 from rank 0's input, stages the result in
        # scratch, and pushes it to both ranks.
        chan1.reduce(input1[2:4], [input0[2:4]], tb=0)
        rank1.copy(scratch1[2:4], input1[2:4], tb=0)
        chan1.put(input0[2:4], scratch1[2:4], tb=0)

        # Final barrier: all reduced data consistent across both ranks.
        chan0.signal(tb=0)
        chan1.signal(tb=0)
        chan0.wait(tb=0)
        chan1.wait(tb=0)

        print(JSON())
76+
77+
78+
if __name__ == "__main__":
    # Guard the CLI entry point so importing this module does not parse
    # sys.argv or emit a program as a side effect.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")

    args = parser.parse_args()
    reduce(args.name, args.num_threads_per_block, args.min_message_size, args.max_message_size)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
"""
5+
Reduce NVLS Test
6+
7+
This file tests the executor MULTI_LOAD_REDUCE_STORE operation using
8+
NVLS SwitchChannels. Each GPU reduces its chunk via the
9+
NVSwitch and broadcasts the result to all other GPUs.
10+
"""
11+
12+
import argparse
13+
from mscclpp.language.channel import *
14+
from mscclpp.language.rank import *
15+
from mscclpp.language.general import *
16+
from mscclpp.language.program import *
17+
from mscclpp.language.collectives import *
18+
19+
20+
def reduce_nvls(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit an allreduce plan testing MULTI_LOAD_REDUCE_STORE over NVLS
    SwitchChannels: each GPU reduces its chunk via the NVSwitch and
    broadcasts the result to all other GPUs. Prints the plan as JSON.

    Args:
        name: name embedded in the generated program.
        gpu_size: number of participating GPUs.
        num_threads_per_block: threads per GPU thread block.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllReduce(gpu_size, chunksperloop, True)
    with CollectiveProgram(
        name,
        collective,
        gpu_size,
        instances=1,
        protocol="Simple",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # NVLS switch channel spanning all ranks, plus a full mesh of memory
        # channels used only for synchronization.
        nvls_chan = SwitchChannel(rank_list=list(range(gpu_size)), buffer_type=BufferType.input)
        channels = {
            (peer, gpu): MemoryChannel(peer, gpu)
            for gpu in range(gpu_size)
            for peer in range(gpu_size)
            if peer != gpu
        }

        def _all_pairs_sync(signal_kwargs, wait_kwargs):
            # All-pairs barrier: per rank, signal every peer first, then wait
            # on every peer — emission order matches the original code.
            for rank in range(gpu_size):
                for peer in range(gpu_size):
                    if peer != rank:
                        channels[(peer, rank)].signal(tb=0, **signal_kwargs)
                for peer in range(gpu_size):
                    if peer != rank:
                        channels[(peer, rank)].wait(tb=0, **wait_kwargs)

        # Synchronization to ensure all the GPUs are ready.
        _all_pairs_sync({"relaxed": True}, {"relaxed": True, "data_sync": SyncType.after})

        # Each GPU reduces its own chunk through the switch and broadcasts the
        # result back into every rank's input buffer.
        for gpu in range(gpu_size):
            input_buffer = Rank(gpu).get_input_buffer()
            nvls_chan.at_rank(gpu).reduce(
                buffer_offset=gpu, size=1, dst_chunk=input_buffer[gpu : gpu + 1], tb=0
            )
            nvls_chan.at_rank(gpu).broadcast(
                src_chunk=input_buffer[gpu : gpu + 1], buffer_offset=gpu, size=1, tb=0
            )

        # Synchronization to ensure the GPUs finished.
        _all_pairs_sync({"relaxed": True, "data_sync": SyncType.before}, {"relaxed": True})

        print(JSON())
79+
80+
81+
if __name__ == "__main__":
    # Guard the CLI entry point so importing this module does not parse
    # sys.argv or emit a program as a side effect.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")

    args = parser.parse_args()
    reduce_nvls(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
"""
5+
Reduce NVLS Pipeline Test
6+
7+
This file tests the executor MULTI_LOAD_REDUCE_STORE operation in a
8+
pipeline context using SwitchChannel. Each GPU reduces
9+
its chunk via the NVSwitch and broadcasts the result, processing data
10+
in a pipelined loop over fixed-size iterations.
11+
"""
12+
13+
import argparse
14+
from mscclpp.language.channel import *
15+
from mscclpp.language.rank import *
16+
from mscclpp.language.general import *
17+
from mscclpp.language.program import *
18+
from mscclpp.language.collectives import *
19+
from mscclpp.language.loop import LoopIterationContext
20+
21+
22+
def reduce_nvls_pipeline(name, gpu_size, num_threads_per_block, min_message_size, max_message_size):
    """Emit an allreduce plan testing MULTI_LOAD_REDUCE_STORE in a pipelined
    context using SwitchChannel: each GPU reduces its chunk via the NVSwitch
    and broadcasts the result, processing data in a pipelined loop over
    fixed-size iterations. Prints the plan as JSON.

    Args:
        name: name embedded in the generated program.
        gpu_size: number of participating GPUs.
        num_threads_per_block: threads per GPU thread block.
        min_message_size: smallest message size the plan covers.
        max_message_size: largest message size the plan covers.
    """
    chunksperloop = 1
    collective = AllReduce(gpu_size, chunksperloop, True)
    with CollectiveProgram(
        name,
        collective,
        gpu_size,
        instances=1,
        protocol="Simple",
        num_threads_per_block=num_threads_per_block,
        use_double_scratch_buffer=False,
        min_message_size=min_message_size,
        max_message_size=max_message_size,
    ):
        # NVLS switch channel spanning all ranks, plus a full mesh of memory
        # channels used only for synchronization.
        nvls_chan = SwitchChannel(rank_list=list(range(gpu_size)), buffer_type=BufferType.input)
        channels = {
            (peer, gpu): MemoryChannel(peer, gpu)
            for gpu in range(gpu_size)
            for peer in range(gpu_size)
            if peer != gpu
        }

        def _all_pairs_sync(signal_kwargs, wait_kwargs):
            # All-pairs barrier: per rank, signal every peer first, then wait
            # on every peer — emission order matches the original code.
            for rank in range(gpu_size):
                for peer in range(gpu_size):
                    if peer != rank:
                        channels[(peer, rank)].signal(tb=0, **signal_kwargs)
                for peer in range(gpu_size):
                    if peer != rank:
                        channels[(peer, rank)].wait(tb=0, **wait_kwargs)

        # Synchronization to ensure all the GPUs are ready.
        _all_pairs_sync({"relaxed": True}, {"relaxed": True, "data_sync": SyncType.after})

        # Pipelined reduce/broadcast: process the data in 1 MiB iterations.
        with LoopIterationContext(unit=2**20, num_chunks=1):
            for gpu in range(gpu_size):
                input_buffer = Rank(gpu).get_input_buffer()
                nvls_chan.at_rank(gpu).reduce(
                    buffer_offset=gpu, size=1, dst_chunk=input_buffer[gpu : gpu + 1], tb=0
                )
                nvls_chan.at_rank(gpu).broadcast(
                    src_chunk=input_buffer[gpu : gpu + 1], buffer_offset=gpu, size=1, tb=0
                )

        # Synchronization to ensure the GPUs finished.
        _all_pairs_sync({"relaxed": True, "data_sync": SyncType.before}, {"relaxed": True})

        print(JSON())
82+
83+
84+
if __name__ == "__main__":
    # Guard the CLI entry point so importing this module does not parse
    # sys.argv or emit a program as a side effect.
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", type=str, help="name of the program")
    parser.add_argument("--num_gpus", type=int, help="number of gpus")
    parser.add_argument("--num_threads_per_block", type=int, default=1024, help="number of threads per block")
    parser.add_argument("--min_message_size", type=int, default=0, help="minimum message size")
    parser.add_argument("--max_message_size", type=int, default=2**64 - 1, help="maximum message size")

    args = parser.parse_args()
    reduce_nvls_pipeline(args.name, args.num_gpus, args.num_threads_per_block, args.min_message_size, args.max_message_size)

0 commit comments

Comments
 (0)