diff --git a/csrc/multidevice/c10d_mock.h b/csrc/multidevice/c10d_mock.h index 33a061e1481..bc51fe02dfb 100644 --- a/csrc/multidevice/c10d_mock.h +++ b/csrc/multidevice/c10d_mock.h @@ -20,8 +20,12 @@ #pragma once +#include #include #include +#include +#include +#include #include namespace c10d { @@ -34,7 +38,7 @@ class Work : public torch::CustomClassHolder { }; struct ReduceOp : torch::CustomClassHolder { - enum RedOpType { + enum RedOpType : std::uint8_t { SUM, AVG, PRODUCT, @@ -211,4 +215,58 @@ class TCPStore : public torch::CustomClassHolder { } }; +class ProcessGroup : public torch::CustomClassHolder { + public: +}; + +inline c10::intrusive_ptr resolve_process_group( + const std::string& group_name) { + return c10::make_intrusive(); +} + +inline void register_process_group( + const std::string& group_name, + const c10::intrusive_ptr& group) {} + +inline void unregister_process_group(const std::string& group_name) {} + } // namespace c10d + +namespace c10d::symmetric_memory { + +class SymmetricMemory : public torch::CustomClassHolder { + public: + ~SymmetricMemory() override = default; + virtual bool has_multicast_support() { + return false; + } + virtual void* get_multicast_ptr() { + return nullptr; + } + at::Tensor get_remote_tensor( + int peer, + c10::IntArrayRef sizes, + c10::ScalarType dtype) { + return at::empty(sizes, at::TensorOptions().dtype(dtype)); + } +}; + +inline void set_backend(const std::string&) {} + +inline at::Tensor empty_strided_p2p( + c10::IntArrayRef size, + c10::IntArrayRef stride, + c10::ScalarType dtype, + c10::Device device, + const std::optional& group_name, + std::optional alloc_id) { + return at::empty(size, at::TensorOptions().dtype(dtype)); +} + +inline c10::intrusive_ptr rendezvous( + const at::Tensor& tensor, + const std::optional& group_name = std::nullopt) { + return c10::make_intrusive(); +} + +} // namespace c10d::symmetric_memory diff --git a/csrc/multidevice/communicator.cpp b/csrc/multidevice/communicator.cpp index dbd65ba4610..afe2ff1308d 100644 --- a/csrc/multidevice/communicator.cpp +++ b/csrc/multidevice/communicator.cpp @@ -14,6 +14,7 @@ #include #ifdef NVFUSER_DISTRIBUTED +#include #include #include #ifdef USE_C10D_NCCL @@ -121,7 +122,8 @@ bool parseEnv( } // retrieves master port - if ((env = std::getenv("NVFUSER_MASTER_PORT")) != nullptr) { + env = std::getenv("NVFUSER_MASTER_PORT"); + if (env != nullptr) { master_port = std::atoi(env); } else { LOG(INFO) << "The environment variable NVFUSER_MASTER_PORT has not been " @@ -248,10 +250,10 @@ void waitForDebuggerAtRanks( std::cerr << "Process " << pid << " is waiting for the debugger. To continue debugging, " << "start gdb, `attach " << pid - << "`, `set var waiting=false`, and `fini`." << std::endl; + << "`, `set var waiting=false`, and `fini`.\n"; while (waiting) { // Please change `waiting` in the debugger. } - std::cerr << "Process " << getpid() << " finished waiting." << std::endl; + std::cerr << "Process " << getpid() << " finished waiting.\n"; } if (communicator->is_available()) { @@ -331,6 +333,13 @@ Communicator& Communicator::getInstance() { return *communicator; } +void Communicator::registerProcessGroup( + const std::string& name, + const c10::intrusive_ptr& pg) { + c10d::register_process_group(name, pg); + process_groups_[name] = pg; +} + void Communicator::cleanup() { static bool cleaned_up = false; NVF_CHECK( @@ -349,12 +358,13 @@ void Communicator::cleanup() { store_ = nullptr; -#if defined(NVFUSER_DISTRIBUTED) && defined(USE_C10D_NCCL) +#if defined(NVFUSER_DISTRIBUTED) +#if defined(USE_C10D_NCCL) // Sort backends to work around a NCCL bug (nvbugs/4889623). Closing backends // in different orders between ranks have been causing a hang. std::vector>> keyed_backends(backends_.begin(), backends_.end()); - std::sort(keyed_backends.begin(), keyed_backends.end()); + std::ranges::sort(keyed_backends.begin(), keyed_backends.end()); for (auto& [key, backend] : keyed_backends) { // Call shutdown before destructing a ProcessGroupNCCL as instructed by // https://github.com/pytorch/pytorch/blob/e62073d7997c9e63896cb5289ffd0874a8cc1838/torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp#L1164-L1170. @@ -362,6 +372,11 @@ void Communicator::cleanup() { pg_nccl->shutdown(); } } +#endif + for (const auto& entry : process_groups_) { + c10d::unregister_process_group(entry.first); + } + process_groups_.clear(); #endif backends_.clear(); } @@ -382,16 +397,16 @@ c10d::Backend* Communicator::getBackendForTeam( // generate a string key which is unique to the team // create the team and cache it std::string team_key = prefix + getTeamKey(team, b); + // check that the caller's rank belongs to the requested team + auto rank_it = std::ranges::find(team.begin(), team.end(), deviceId()); + if (rank_it == team.end()) { + return nullptr; + } // check if backend associated with the team is present in the cache if (backends_.find(team_key) == backends_.end()) { // create the backend and cache it #ifdef NVFUSER_DISTRIBUTED backends_[team_key] = [&]() -> c10::intrusive_ptr { - // check that the caller's rank belongs to the requested team - auto rank_it = std::find(team.begin(), team.end(), deviceId()); - if (rank_it == team.end()) { - return nullptr; - } // retrieve the caller's rank index/position in the team RankType team_rank = std::distance(team.begin(), rank_it); return createBackend( @@ -404,6 +419,26 @@ c10d::Backend* Communicator::getBackendForTeam( backends_[team_key] = nullptr; #endif } +#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED) + if (process_groups_.find(team_key) == process_groups_.end()) { + if (b == CommunicatorBackend::kNccl) { + RankType team_rank = std::distance(team.begin(), rank_it); + + auto pg = c10::make_intrusive( + c10::make_intrusive(team_key, store_), + team_rank, + static_cast(team.size())); + pg->setBackend( + c10::DeviceType::CUDA, + c10d::ProcessGroup::BackendType::NCCL, + backends_[team_key]); + pg->setDefaultBackend(c10d::ProcessGroup::BackendType::NCCL); + pg->setGroupName(team_key); + + registerProcessGroup(team_key, pg); + } + } +#endif return backends_.at(team_key).get(); } diff --git a/csrc/multidevice/communicator.h b/csrc/multidevice/communicator.h index b56e6fee3aa..f19707334b2 100644 --- a/csrc/multidevice/communicator.h +++ b/csrc/multidevice/communicator.h @@ -11,8 +11,9 @@ #include #include -#ifdef NVFUSER_DISTRIBUTED +#if defined(NVFUSER_DISTRIBUTED) #include +#include #include #include #else @@ -110,6 +111,10 @@ class NVF_API Communicator { c10d::Backend* getWorld( std::optional backend = std::nullopt); + void registerProcessGroup( + const std::string& name, + const c10::intrusive_ptr& pg); + // returns if a backend is available for creation bool isBackendAvailable(CommunicatorBackend backend) const { if (backend == CommunicatorBackend::kUcc) { @@ -153,6 +158,10 @@ class NVF_API Communicator { c10::intrusive_ptr store_; // cache for the created backends. The keys are strings generated from Teams std::unordered_map> backends_; + // c10d process-group wrappers registered for symmetric-memory rendezvous. + // Keeps track of the process groups created for the rendezvous. + std::unordered_map> + process_groups_; }; } // namespace nvfuser diff --git a/csrc/multidevice/ipc_utils.cpp b/csrc/multidevice/ipc_utils.cpp index 656b4ee5e24..ae5461e4e70 100644 --- a/csrc/multidevice/ipc_utils.cpp +++ b/csrc/multidevice/ipc_utils.cpp @@ -38,7 +38,7 @@ int createIpcSocket(const std::string& path) { int sockfd = socket(AF_UNIX, SOCK_STREAM, 0); NVF_CHECK(sockfd >= 0, "Failed to create socket: ", strerror(errno)); - struct sockaddr_un addr; + struct sockaddr_un addr{}; setupSockAddr(addr, path); // For abstract namespace, len is usually calculated specifically, but for @@ -69,7 +69,7 @@ void sendFd( int sockfd = socket(AF_UNIX, SOCK_STREAM, 0); NVF_CHECK(sockfd >= 0, "Failed to create socket: ", strerror(errno)); - struct sockaddr_un addr; + struct sockaddr_un addr{}; setupSockAddr(addr, path); socklen_t addrlen = sizeof(addr.sun_family) + path.length(); @@ -77,8 +77,9 @@ void sendFd( int ret = -1; for (int i = 0; i < 100; ++i) { ret = connect(sockfd, (struct sockaddr*)&addr, addrlen); - if (ret == 0) + if (ret == 0) { break; + } usleep(10000); // 10ms } if (ret < 0) { @@ -86,14 +87,16 @@ void sendFd( NVF_CHECK(false, "Failed to connect to ", path, ": ", strerror(errno)); } - struct msghdr msg = {0}; - struct cmsghdr* cmsg; + struct msghdr msg{}; + struct cmsghdr* cmsg = nullptr; + // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays, modernize-avoid-c-arrays) char buf[CMSG_SPACE(sizeof(int))]; // If no header data, send at least one byte char dummy = '.'; - struct iovec iov; + struct iovec iov{}; if (header_data && header_len > 0) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) iov.iov_base = const_cast(header_data); iov.iov_len = header_len; } else { @@ -121,21 +124,22 @@ void sendFd( } int recvFd(int socket_fd, void* header_data, size_t header_len) { - struct sockaddr_un client_addr; + struct sockaddr_un client_addr{}; socklen_t client_len = sizeof(client_addr); int client_fd = accept(socket_fd, (struct sockaddr*)&client_addr, &client_len); NVF_CHECK(client_fd >= 0, "Failed to accept connection: ", strerror(errno)); - struct msghdr msg = {0}; - struct cmsghdr* cmsg; + struct msghdr msg{}; + struct cmsghdr* cmsg = nullptr; + // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays, modernize-avoid-c-arrays) char buf[CMSG_SPACE(sizeof(int))]; // If header_len > 0, we expect that much data. // Note: recvmsg might return fewer bytes if strict requirements aren't met, // but for local unix sockets with small payloads, it usually delivers all. - char dummy; - struct iovec iov; + char dummy = '.'; + struct iovec iov{}; if (header_data && header_len > 0) { iov.iov_base = header_data; iov.iov_len = header_len; @@ -168,7 +172,7 @@ int recvFd(int socket_fd, void* header_data, size_t header_len) { int recv_fd = -1; cmsg = CMSG_FIRSTHDR(&msg); - if (cmsg != NULL && cmsg->cmsg_len == CMSG_LEN(sizeof(int))) { + if (cmsg != nullptr && cmsg->cmsg_len == CMSG_LEN(sizeof(int))) { if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { memcpy(&recv_fd, CMSG_DATA(cmsg), sizeof(int)); } @@ -191,4 +195,22 @@ MulticastProtocol getMulticastProtocol() { return MulticastProtocol::BatchMemcpy; } +SymmetricMemoryBackend getSymmetricMemoryBackend() { + if (isOptionEnabled(EnableOption::SymmetricMemoryBackend)) { + if (hasEnableOptionArgument( + EnableOption::SymmetricMemoryBackend, "pytorch_nccl")) { + return SymmetricMemoryBackend::PyTorchNccl; + } + if (hasEnableOptionArgument( + EnableOption::SymmetricMemoryBackend, "pytorch_nvshmem")) { + return SymmetricMemoryBackend::PyTorchNvshmem; + } + if (hasEnableOptionArgument( + EnableOption::SymmetricMemoryBackend, "pytorch_cuda")) { + return SymmetricMemoryBackend::PyTorchCuda; + } + } + return SymmetricMemoryBackend::Native; +} + } // namespace nvfuser diff --git a/csrc/multidevice/ipc_utils.h b/csrc/multidevice/ipc_utils.h index bac466d74f8..f0b69fcdbf1 100644 --- a/csrc/multidevice/ipc_utils.h +++ b/csrc/multidevice/ipc_utils.h @@ -29,10 +29,24 @@ const T& fromBytes(const std::vector& bytes) { // IPC Utils for sharing file descriptors -enum class MulticastProtocol { Memcpy, Multimem, BatchMemcpy }; +enum class MulticastProtocol : uint8_t { Memcpy, Multimem, BatchMemcpy }; MulticastProtocol getMulticastProtocol(); +// Backend for symmetric memory allocation and rendezvous. +// Native: Fuser's own CUDA VMM + IPC implementation (default, maintained). +// PyTorch*: Use PyTorch's symmetric memory +// (torch.distributed._symmetric_memory) with the given transport backend (Nccl, +// Nvshmem, or Cuda). +enum class SymmetricMemoryBackend : uint8_t { + Native, + PyTorchNccl, + PyTorchNvshmem, + PyTorchCuda, +}; + +SymmetricMemoryBackend getSymmetricMemoryBackend(); + // Creates a listening Unix domain socket bound to path. // If path starts with '@', it uses the abstract namespace (replaced with \0). // Returns the socket file descriptor. diff --git a/csrc/multidevice/symmetric_tensor.cpp b/csrc/multidevice/symmetric_tensor.cpp index 553e96638bb..862a40045e3 100644 --- a/csrc/multidevice/symmetric_tensor.cpp +++ b/csrc/multidevice/symmetric_tensor.cpp @@ -19,6 +19,63 @@ namespace nvfuser { namespace { +std::string initSymmMemBackendAndGetGroup(SymmetricMemoryBackend backend) { + static std::once_flag once; + std::call_once(once, [backend]() { + const char* name = nullptr; + switch (backend) { + case SymmetricMemoryBackend::PyTorchNccl: + name = "NCCL"; + c10d::symmetric_memory::set_backend(name); + break; + case SymmetricMemoryBackend::PyTorchNvshmem: + name = "NVSHMEM"; + c10d::symmetric_memory::set_backend(name); + break; + case SymmetricMemoryBackend::PyTorchCuda: + name = "CUDA"; + // set_backend(name) is not required for CUDA backend + break; + default: + NVF_ERROR(false, "Unexpected PyTorch symmetric memory backend"); + } + }); + + Communicator& comm = Communicator::getInstance(); + NVF_CHECK( + comm.is_available(), "Communicator not available for symmetric memory"); + + NVF_CHECK( + comm.isBackendAvailable(CommunicatorBackend::kNccl), + "kNccl backend is required for non-native symmetric memory backend"); + + std::vector all_ranks(comm.size()); + std::iota(all_ranks.begin(), all_ranks.end(), 0); + (void)comm.getBackendForTeam(all_ranks, CommunicatorBackend::kNccl); + std::string group_name = std::accumulate( + std::begin(all_ranks), + std::end(all_ranks), + std::string("nccl"), + [](const std::string& a, const RankType& b) { + return a.empty() ? std::to_string(b) : a + ',' + std::to_string(b); + }); + if (backend == SymmetricMemoryBackend::PyTorchNvshmem) { + static std::once_flag pg0_once; + std::call_once(pg0_once, [&]() { + try { + (void)c10d::resolve_process_group("0"); + } catch (const std::exception&) { + // resolve_process_group throws c10d Error + // (derives from std::exception) + auto pg = c10d::resolve_process_group(group_name); + comm.registerProcessGroup("0", pg); + } + }); + } + + return group_name; +} + // Returns the allocation granularity for symmetric memory. // - query_mcast_granularity: if true, considers multicast granularity // - query_mcast_recommended_granularity: if true, uses recommended (larger) @@ -34,7 +91,7 @@ size_t getGranularityForSymmetricMemory( #if (CUDA_VERSION >= NVF_MIN_CUDA_FOR_MCAST) if (!query_mcast_granularity) { - return alloc_granularity; + return static_cast(alloc_granularity); } // Check if device supports multicast before querying multicast granularity @@ -45,7 +102,7 @@ size_t getGranularityForSymmetricMemory( prop.location.id)); if (is_multicast_supported == 0) { - return alloc_granularity; + return static_cast(alloc_granularity); } // Device supports multicast, query multicast granularity @@ -73,7 +130,7 @@ size_t getGranularityForSymmetricMemory( granularity = mcast_rec_granularity; } - return std::max(alloc_granularity, granularity); + return static_cast(std::max(alloc_granularity, granularity)); #else (void)requested_size_bytes; (void)query_mcast_granularity; @@ -88,6 +145,30 @@ at::Tensor SymmetricTensor::allocate( at::IntArrayRef sizes, at::ScalarType dtype, at::Device device) { + SymmetricMemoryBackend backend = getSymmetricMemoryBackend(); + + if (backend != SymmetricMemoryBackend::Native) { + const std::string group_name = initSymmMemBackendAndGetGroup(backend); + std::vector strides(sizes.size()); + strides.back() = 1; + for (int64_t i = (int64_t)strides.size() - 2; i >= 0; --i) { + strides[i] = strides[i + 1] * sizes[i + 1]; + } + // NCCLSymmetricMemoryAllocator::alloc must not be called with a group_name. + c10::optional alloc_group_name = + (backend == SymmetricMemoryBackend::PyTorchNccl || + backend == SymmetricMemoryBackend::PyTorchNvshmem) + ? c10::nullopt + : c10::optional(group_name); + return c10d::symmetric_memory::empty_strided_p2p( + sizes, + strides, + dtype, + device, + alloc_group_name, + /*alloc_id=*/c10::nullopt); + } + int is_vmm_supported = 0; NVFUSER_CUDA_SAFE_CALL(cuDeviceGetAttribute( &is_vmm_supported, @@ -128,7 +209,7 @@ at::Tensor SymmetricTensor::allocate( std::vector strides(sizes.size()); strides.back() = 1; - for (int64_t i = static_cast(strides.size()) - 2; i >= 0; --i) { + for (int64_t i = std::ssize(strides) - 2; i >= 0; --i) { strides[i] = strides[i + 1] * sizes[i + 1]; } @@ -145,6 +226,9 @@ at::Tensor SymmetricTensor::allocate( } std::string SymmetricTensor::validate(at::Tensor tensor) { + if (getSymmetricMemoryBackend() != SymmetricMemoryBackend::Native) { + return ""; + } int is_vmm_supported = 0; NVFUSER_CUDA_SAFE_CALL(cuDeviceGetAttribute( &is_vmm_supported, @@ -214,6 +298,15 @@ SymmetricTensor::SymmetricTensor(const at::Tensor& local_tensor) "Expected CUDA tensor, got: ", local_tensor.device()); + SymmetricMemoryBackend backend = getSymmetricMemoryBackend(); + if (backend != SymmetricMemoryBackend::Native) { + Communicator& comm = Communicator::getInstance(); + world_size_ = comm.size(); + my_device_id_ = comm.deviceId(); + requested_size_ = local_tensor.numel() * local_tensor.element_size(); + return; + } + std::string error = SymmetricTensor::validate(local_tensor); NVF_CHECK(error.empty(), "Invalid symmetric allocation: ", error); @@ -256,6 +349,9 @@ SymmetricTensor::SymmetricTensor(const at::Tensor& local_tensor) } SymmetricTensor::~SymmetricTensor() { + if (torch_symm_handle_) { + return; // PyTorch backend: no native VMM cleanup + } #if (CUDA_VERSION >= 13000) if (is_multicast_setup_) { if (mc_base_ptr_) { @@ -307,6 +403,21 @@ void SymmetricTensor::setupRemoteHandles(const std::string& tag) { return; } Communicator& comm = Communicator::getInstance(); + // PyTorch backend: perform rendezvous here (lazy, on first + // setupRemoteHandles). + SymmetricMemoryBackend backend = getSymmetricMemoryBackend(); + if (backend != SymmetricMemoryBackend::Native) { + const std::string group_name = initSymmMemBackendAndGetGroup(backend); + comm.barrier(CommunicatorBackend::kNccl); + torch_symm_handle_ = + c10d::symmetric_memory::rendezvous(local_tensor_, group_name); + are_remote_tensors_setup_ = true; + if (torch_symm_handle_->has_multicast_support()) { + is_multicast_setup_ = true; + multicast_ptr_ = torch_symm_handle_->get_multicast_ptr(); + } + return; + } CUmemGenericAllocationHandle local_handle = alloc_handles_[my_device_id_]; CUdeviceptr local_ptr = remote_ptrs_[my_device_id_]; @@ -400,6 +511,13 @@ at::Tensor SymmetricTensor::remoteTensor(int64_t rank) const { return local_tensor_; } + if (torch_symm_handle_) { + return torch_symm_handle_->get_remote_tensor( + static_cast(rank), + local_tensor_.sizes(), + local_tensor_.scalar_type()); + } + NVF_CHECK(are_remote_tensors_setup_ == true, "Remote tensors not setup"); return at::from_blob( // NOLINTNEXTLINE(performance-no-int-to-ptr) @@ -413,14 +531,19 @@ at::Tensor SymmetricTensor::remoteTensor(int64_t rank) const { void* SymmetricTensor::multicastPtr() const { NVF_CHECK(is_multicast_setup_, "Multicast not setup"); - return mc_ptr_; + return multicast_ptr_; } void SymmetricTensor::setupContiguousView(const std::string& tag) { if (is_contiguous_view_setup_) { return; } - + if (torch_symm_handle_) { + NVF_THROW( + "Contiguous view is not yet supported for PyTorch symmetric memory " + "backend." + "Use native backend for SymmetricContiguousView."); + } NVF_CHECK( are_remote_tensors_setup_ == true, "Remote tensors must be setup before setupContiguousView"); @@ -485,6 +608,11 @@ void SymmetricTensor::setupContiguousView(const std::string& tag) { } at::Tensor SymmetricTensor::getContiguousView() const { + if (torch_symm_handle_) { + NVF_THROW( + "Contiguous view is not yet supported for PyTorch symmetric memory " + "backend."); + } NVF_CHECK(is_contiguous_view_setup_, "Contiguous view not setup"); return contiguous_view_; } @@ -496,7 +624,15 @@ void SymmetricTensor::setupMulticast( if (is_multicast_setup_) { return; } - + if (getSymmetricMemoryBackend() != SymmetricMemoryBackend::Native) { + if (!torch_symm_handle_) { + setupRemoteHandles(tag); + NVF_CHECK( + torch_symm_handle_->has_multicast_support(), + "Multicast not supported"); + } + return; + } Communicator& comm = Communicator::getInstance(); const int64_t my_rank = comm.deviceId(); const int64_t local_rank = comm.local_rank(); @@ -591,7 +727,7 @@ void SymmetricTensor::setupMulticast( NVFUSER_CUDA_SAFE_CALL(cuMemSetAccess(mc_ptr, aligned_size_, &access, 1)); // NOLINTNEXTLINE(performance-no-int-to-ptr) - mc_ptr_ = reinterpret_cast(mc_ptr + offset_diff); + multicast_ptr_ = reinterpret_cast(mc_ptr + offset_diff); mc_base_ptr_ = mc_ptr; is_multicast_setup_ = true; diff --git a/csrc/multidevice/symmetric_tensor.h b/csrc/multidevice/symmetric_tensor.h index 395486b302d..ae10cb3dcbc 100644 --- a/csrc/multidevice/symmetric_tensor.h +++ b/csrc/multidevice/symmetric_tensor.h @@ -10,6 +10,13 @@ #include #include +#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED) +#include +#include +#else +#include "multidevice/c10d_mock.h" +#endif + namespace nvfuser { // SymmetricTensor wraps a local symmetric memory allocation and enables: @@ -18,13 +25,15 @@ namespace nvfuser { // - Contiguous view creation across all ranks // // Design: Decouples local allocation from IPC handle exchange for better -// interoperability and support for pre-allocated user buffers +// interoperability and support for pre-allocated user buffers. // -// TODO: Long term plan is to integrate pytorch's native symmetric memory as a -// possible backend. One important reason to use pytorch's allocator is to use -// pytorch's memory pool to let the framework own the memory stack and not -// further fragment the memory. On the other hand, having our own implementation -// allows us to experiment more advanced features like contigous view creation. +// Backends (see SymmetricMemoryBackend in ipc_utils.h): +// - Native (default): Fuser's own CUDA VMM + IPC implementation +// - PyTorch (Nccl, Nvshmem, Cuda): Use PyTorch's symmetric memory +// (torch.distributed._symmetric_memory) with the chosen transport backend. +// Select via +// NVFUSER_ENABLE=symmetric_memory_backend(native|pytorch_nccl|pytorch_nvshmem|pytorch_cuda). +// Native remains the default when the option is not set. class SymmetricTensor { public: // Wrap pre-allocated symmetric tensor (must use allocate()) @@ -80,12 +89,14 @@ class SymmetricTensor { bool is_multicast_setup_ = false; CUmemGenericAllocationHandle mcast_handle_{}; CUdevice cu_dev_{}; - void* mc_ptr_{nullptr}; + void* multicast_ptr_{nullptr}; CUdeviceptr mc_base_ptr_{0}; int exporter_rank_{-1}; int peer_fd_{-1}; bool is_contiguous_view_setup_ = false; at::Tensor contiguous_view_; + c10::intrusive_ptr + torch_symm_handle_; }; } // namespace nvfuser diff --git a/csrc/options.cpp b/csrc/options.cpp index 046ffc0e0e1..39affb51306 100644 --- a/csrc/options.cpp +++ b/csrc/options.cpp @@ -182,6 +182,7 @@ const std::unordered_map& getEnableOptions() { {"p2p_protocol", EnableOption::P2pProtocol}, {"p2p_transport", EnableOption::P2pTransport}, {"multicast_protocol", EnableOption::MulticastProtocol}, + {"symmetric_memory_backend", EnableOption::SymmetricMemoryBackend}, }; return available_options; } diff --git a/csrc/options.h b/csrc/options.h index 54afd132354..2ac38a830d6 100644 --- a/csrc/options.h +++ b/csrc/options.h @@ -129,6 +129,8 @@ enum class EnableOption : std::uint8_t { //! CopyEngine) MulticastProtocol, //! Prescribe multicast protocol: //! memcpy|multimem|batch_memcpy + SymmetricMemoryBackend, //! Prescribe symmetric memory backend: + //! native|pytorch_nccl|pytorch_nvshmem|pytorch_cuda EndOfOption //! Placeholder for counting the number of elements }; diff --git a/tests/cpp/test_multidevice_symmetric_tensor.cpp b/tests/cpp/test_multidevice_symmetric_tensor.cpp index cb5e427cf37..ec166697939 100644 --- a/tests/cpp/test_multidevice_symmetric_tensor.cpp +++ b/tests/cpp/test_multidevice_symmetric_tensor.cpp @@ -6,6 +6,7 @@ */ // clang-format on #include "multidevice/cuda_p2p.h" +#include "multidevice/ipc_utils.h" #include "multidevice/symmetric_tensor.h" #include "tests/cpp/multidevice.h" @@ -240,6 +241,9 @@ TEST_F(SymmetricTensorTest, ContiguousView) { if (communicator_->size() == 1) { GTEST_SKIP() << "Skipping test for single device"; } + if (getSymmetricMemoryBackend() != SymmetricMemoryBackend::Native) { + GTEST_SKIP() << "Skipping test for Pytorch symmetric memory backend"; + } const int64_t rank = communicator_->deviceId(); const int64_t world_size = communicator_->size();