Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions csrc/multidevice/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "base.h"
#include "cuda_utils.h"
#include "multidevice/ipc_utils.h"
#include "options.h"

namespace nvfuser {
Expand Down Expand Up @@ -156,6 +157,11 @@ c10::intrusive_ptr<c10d::Backend> createBackend(
#ifdef USE_C10D_NCCL
if (backend == CommunicatorBackend::kNccl) {
auto pg_opts = c10::make_intrusive<::c10d::ProcessGroupNCCL::Options>();
#ifdef NCCL_HAS_CTA_POLICY
if (getSymmetricMemoryBackend() == SymmetricMemoryBackend::PyTorchNccl) {
pg_opts->config.CTAPolicy = NCCL_CTA_POLICY_ZERO;
}
Comment on lines +160 to +163
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 CTAPolicy=ZERO applied to every NCCL PG, not just the world PG

createBackend is called lazily by getBackendForTeam whenever any new team key is created — including sub-team PGs (e.g., tensor-parallel or pipeline-parallel groups). When PyTorchNccl is active, every future NCCL process group will be created with CTAPolicy=ZERO, not only the world communicator used for the symmetric-memory allgather. Operations on those sub-team groups (allreduce, reduce-scatter, broadcast, etc.) that cannot use the CE path will be executed under the ZERO-CTA policy, which may meaningfully change their latency/throughput relative to the default policy.

#endif
return c10::make_intrusive<::c10d::ProcessGroupNCCL>(
store, rank, size, pg_opts);
}
Expand Down
62 changes: 62 additions & 0 deletions tests/cpp/test_multidevice_symmetric_tensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
* SPDX-License-Identifier: BSD-3-Clause
*/
// clang-format on
#include <numeric>

#include "multidevice/cuda_p2p.h"
#include "multidevice/communicator.h"
#include "multidevice/ipc_utils.h"
#include "multidevice/symmetric_tensor.h"
#include "tests/cpp/multidevice.h"
Expand Down Expand Up @@ -385,4 +388,63 @@ TEST_F(SymmetricTensorTest, SmallAllocationMulticast) {
#endif
}

// Verifies that allgather over symm_mem-allocated tensors produces correct
// results when the NCCL PG is configured with CTAPolicy=ZERO (CE path).
// Run with NVFUSER_ENABLE=symmetric_memory_backend(pytorch_nccl).
TEST_F(SymmetricTensorTest, CopyEngineAllgather) {
if (getSymmetricMemoryBackend() != SymmetricMemoryBackend::PyTorchNccl) {
GTEST_SKIP()
<< "Test requires NVFUSER_ENABLE=symmetric_memory_backend(pytorch_nccl)";
}
if (communicator_->size() == 1) {
GTEST_SKIP() << "Skipping single-device run";
}
if (!communicator_->isBackendAvailable(CommunicatorBackend::kNccl)) {
GTEST_SKIP() << "NCCL backend not available";
}

const int64_t rank = communicator_->deviceId();
const int64_t world_size = communicator_->size();
// 4MB per rank — large enough that CE scheduling overhead is worthwhile.
constexpr int64_t kNumElems = 1024 * 1024;

// Allocate via empty_strided_p2p so NCCL can window-register the buffers,
// which is required for the Copy Engine collective path.
at::Tensor input = SymmetricTensor::allocate(
{kNumElems}, at::ScalarType::Float, communicator_->device());
at::Tensor output = SymmetricTensor::allocate(
{world_size * kNumElems}, at::ScalarType::Float, communicator_->device());

// setupRemoteHandles triggers c10d::symmetric_memory::rendezvous, which
// performs the NCCL window registration on both buffers.
SymmetricTensor input_sym(input);
SymmetricTensor output_sym(output);
input_sym.setupRemoteHandles();
output_sym.setupRemoteHandles();

// Each rank fills its input with a unique value (rank+1).
input.fill_(static_cast<float>(rank + 1));

// getBackendForTeam returns the NCCL PG created with CTAPolicy=ZERO by our
// change, so _allgather_base will use the Copy Engine when both conditions
// (CTAPolicy=ZERO + window-registered buffers) are met.
Team all_ranks(world_size);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
c10d::Backend* backend =
communicator_->getBackendForTeam(all_ranks, CommunicatorBackend::kNccl);
Comment on lines +431 to +434
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The test manually replicates the logic inside Communicator::getWorld() — constructing a full-rank Team and calling getBackendForTeam. Using getWorld directly is cleaner and is the existing idiom in the codebase.

Suggested change
Team all_ranks(world_size);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
c10d::Backend* backend =
communicator_->getBackendForTeam(all_ranks, CommunicatorBackend::kNccl);
c10d::Backend* backend = communicator_->getWorld(CommunicatorBackend::kNccl);

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

ASSERT_NE(backend, nullptr);

auto work = backend->_allgather_base(output, input, {});
work->wait();

// Validate: gathered slice for rank r must equal r+1 on every rank.
at::Tensor output_cpu = output.cpu();
for (int64_t r = 0; r < world_size; ++r) {
at::Tensor slice =
output_cpu.slice(0, r * kNumElems, (r + 1) * kNumElems);
EXPECT_TRUE(slice.eq(static_cast<float>(r + 1)).all().item<bool>())
<< "Rank " << rank << ": allgather mismatch for source rank " << r;
}
}

} // namespace nvfuser
Loading