From 5a26db72c7c62ef5c6e55efacc4f41fa65a6e196 Mon Sep 17 00:00:00 2001
From: saivishal1999
Date: Wed, 6 May 2026 04:04:41 -0700
Subject: [PATCH] multidevice: enable NCCL Copy Engine allgather via
 CTAPolicy=ZERO

When `SymmetricMemoryBackend::PyTorchNccl` is active, configure the
ProcessGroupNCCL with `NCCL_CTA_POLICY_ZERO` at creation time. Combined
with NCCL-window-registered buffers (allocated via `empty_strided_p2p`
and rendezvous'd through `SymmetricTensor::setupRemoteHandles`), this
causes NCCL to select the Copy Engine (DMA) path for allgather, freeing
SM/CTA resources.

Add `SymmetricTensorTest.CopyEngineAllgather` to verify correctness of an
allgather over symm_mem buffers on all 8 ranks.

Confirmed via NCCL logs:
- `Comm config CTA policy flags set to 2` on each rank
- `Inserted window ... into address map` for both input/output buffers
- `AllGather [Copy Engine]: ... -> cudaMemcpy; CE synchronization with NVLS`
---
 csrc/multidevice/communicator.cpp             |  6 ++
 .../cpp/test_multidevice_symmetric_tensor.cpp | 62 +++++++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/csrc/multidevice/communicator.cpp b/csrc/multidevice/communicator.cpp
index afe2ff1308d..f8ef81f07d8 100644
--- a/csrc/multidevice/communicator.cpp
+++ b/csrc/multidevice/communicator.cpp
@@ -27,6 +27,7 @@
 
 #include "base.h"
 #include "cuda_utils.h"
+#include "multidevice/ipc_utils.h"
 #include "options.h"
 
 namespace nvfuser {
@@ -156,6 +157,11 @@ c10::intrusive_ptr<c10d::Backend> createBackend(
 #ifdef USE_C10D_NCCL
   if (backend == CommunicatorBackend::kNccl) {
     auto pg_opts = c10::make_intrusive<::c10d::ProcessGroupNCCL::Options>();
+#ifdef NCCL_HAS_CTA_POLICY
+    if (getSymmetricMemoryBackend() == SymmetricMemoryBackend::PyTorchNccl) {
+      pg_opts->config.CTAPolicy = NCCL_CTA_POLICY_ZERO;
+    }
+#endif
     return c10::make_intrusive<::c10d::ProcessGroupNCCL>(
         store, rank, size, pg_opts);
   }
diff --git a/tests/cpp/test_multidevice_symmetric_tensor.cpp b/tests/cpp/test_multidevice_symmetric_tensor.cpp
index ec166697939..58ee796a814 100644
--- a/tests/cpp/test_multidevice_symmetric_tensor.cpp
+++ b/tests/cpp/test_multidevice_symmetric_tensor.cpp
@@ -5,7 +5,10 @@
  * SPDX-License-Identifier: BSD-3-Clause
  */
 // clang-format on
+#include <numeric>
+
 #include "multidevice/cuda_p2p.h"
+#include "multidevice/communicator.h"
 #include "multidevice/ipc_utils.h"
 #include "multidevice/symmetric_tensor.h"
 #include "tests/cpp/multidevice.h"
@@ -385,4 +388,63 @@ TEST_F(SymmetricTensorTest, SmallAllocationMulticast) {
 #endif
 }
 
+// Verifies that allgather over symm_mem-allocated tensors produces correct
+// results when the NCCL PG is configured with CTAPolicy=ZERO (CE path).
+// Run with NVFUSER_ENABLE=symmetric_memory_backend(pytorch_nccl).
+TEST_F(SymmetricTensorTest, CopyEngineAllgather) {
+  if (getSymmetricMemoryBackend() != SymmetricMemoryBackend::PyTorchNccl) {
+    GTEST_SKIP()
+        << "Test requires NVFUSER_ENABLE=symmetric_memory_backend(pytorch_nccl)";
+  }
+  if (communicator_->size() == 1) {
+    GTEST_SKIP() << "Skipping single-device run";
+  }
+  if (!communicator_->isBackendAvailable(CommunicatorBackend::kNccl)) {
+    GTEST_SKIP() << "NCCL backend not available";
+  }
+
+  const int64_t rank = communicator_->deviceId();
+  const int64_t world_size = communicator_->size();
+  // 4MB per rank — large enough that CE scheduling overhead is worthwhile.
+  constexpr int64_t kNumElems = 1024 * 1024;
+
+  // Allocate via empty_strided_p2p so NCCL can window-register the buffers,
+  // which is required for the Copy Engine collective path.
+  at::Tensor input = SymmetricTensor::allocate(
+      {kNumElems}, at::ScalarType::Float, communicator_->device());
+  at::Tensor output = SymmetricTensor::allocate(
+      {world_size * kNumElems}, at::ScalarType::Float, communicator_->device());
+
+  // setupRemoteHandles triggers c10d::symmetric_memory::rendezvous, which
+  // performs the NCCL window registration on both buffers.
+  SymmetricTensor input_sym(input);
+  SymmetricTensor output_sym(output);
+  input_sym.setupRemoteHandles();
+  output_sym.setupRemoteHandles();
+
+  // Each rank fills its input with a unique value (rank+1).
+  input.fill_(static_cast<float>(rank + 1));
+
+  // getBackendForTeam returns the NCCL PG created with CTAPolicy=ZERO by our
+  // change, so _allgather_base will use the Copy Engine when both conditions
+  // (CTAPolicy=ZERO + window-registered buffers) are met.
+  Team all_ranks(world_size);
+  std::iota(all_ranks.begin(), all_ranks.end(), 0);
+  c10d::Backend* backend =
+      communicator_->getBackendForTeam(all_ranks, CommunicatorBackend::kNccl);
+  ASSERT_NE(backend, nullptr);
+
+  auto work = backend->_allgather_base(output, input, {});
+  work->wait();
+
+  // Validate: gathered slice for rank r must equal r+1 on every rank.
+  at::Tensor output_cpu = output.cpu();
+  for (int64_t r = 0; r < world_size; ++r) {
+    at::Tensor slice =
+        output_cpu.slice(0, r * kNumElems, (r + 1) * kNumElems);
+    EXPECT_TRUE(slice.eq(static_cast<float>(r + 1)).all().item<bool>())
+        << "Rank " << rank << ": allgather mismatch for source rank " << r;
+  }
+}
+
 } // namespace nvfuser