diff --git a/.gitmodules b/.gitmodules index 4b188d6bb1..e531c95507 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,7 @@ [submodule "3rdparty/cutlass"] path = 3rdparty/cutlass url = https://github.com/NVIDIA/cutlass.git +[submodule "3rdparty/nccl"] + path = 3rdparty/nccl + url = https://github.com/NVIDIA/nccl.git + branch = v2.30u1 diff --git a/3rdparty/nccl b/3rdparty/nccl new file mode 160000 index 0000000000..146496ac88 --- /dev/null +++ b/3rdparty/nccl @@ -0,0 +1 @@ +Subproject commit 146496ac881bc504ed1a52be0ae7b707ce41e706 diff --git a/memory/MEMORY.md b/memory/MEMORY.md new file mode 100644 index 0000000000..81a9edfd29 --- /dev/null +++ b/memory/MEMORY.md @@ -0,0 +1 @@ +- [Commit message: no TE subsystem prefix](feedback_commit_message_no_te_subsystem_prefix.md) — don't prefix commit subjects with "Expert Parallelism:" or "EP:" in this repo diff --git a/memory/feedback_commit_message_no_te_subsystem_prefix.md b/memory/feedback_commit_message_no_te_subsystem_prefix.md new file mode 100644 index 0000000000..16aa7abc94 --- /dev/null +++ b/memory/feedback_commit_message_no_te_subsystem_prefix.md @@ -0,0 +1,12 @@ +--- +name: feedback-commit-message-no-te-subsystem-prefix +description: For this TE repo, commit messages should not prefix the subject with "Expert Parallelism:" or "EP:" — go straight to the change itself. +metadata: + type: feedback +--- + +When writing commit messages in this Transformer Engine repo, do not prefix the subject with subsystem labels like "Expert Parallelism:" or "EP:". + +**Why:** User feedback during the EP reviewer-feedback session ("in your commit message, don't need to mention 'expert parallelism or EP'"). Subject lines should describe the change itself. + +**How to apply:** Lead with the actual action (e.g. "require token_dtype in NVTEEpGroupConfig and enforce at dispatch", "consolidate EP tests into single test_ep.cu"). Path/file context implicitly identifies the subsystem. 
diff --git a/qa/L1_cpp_distributed/test.sh b/qa/L1_cpp_distributed/test.sh index 8d767a4efb..7e5ce2cf0d 100755 --- a/qa/L1_cpp_distributed/test.sh +++ b/qa/L1_cpp_distributed/test.sh @@ -14,4 +14,7 @@ if [[ $(nvidia-smi --list-gpus | wc -l) -ge 4 ]]; then cmake -GNinja -S. -Bbuild cmake --build build mpirun --allow-run-as-root --np 4 --oversubscribe ./build/test_comm_gemm + + # EP suites; runner self-skips on pre-Hopper GPUs. + bash ./run_test_ep.sh 4 ./build fi diff --git a/setup.py b/setup.py index ec277b6349..34a3abfd99 100644 --- a/setup.py +++ b/setup.py @@ -83,6 +83,34 @@ def setup_common_extension() -> CMakeExtension: cusolvermp_dir = os.getenv("CUSOLVERMP_HOME", "/usr") cmake_flags.append(f"-DCUSOLVERMP_DIR={cusolvermp_dir}") + # NCCL EP: on by default; auto-disabled if no arch >= 90. + # Set NVTE_BUILD_WITH_NCCL_EP=0/1 to force off/on. + nccl_ep_env = os.getenv("NVTE_BUILD_WITH_NCCL_EP") + explicit_nccl_ep = nccl_ep_env is not None + build_with_nccl_ep = bool(int(nccl_ep_env)) if explicit_nccl_ep else True + + if build_with_nccl_ep: + arch_tokens = [a.strip() for a in str(archs or "").split(";") if a.strip()] + has_hopper_or_newer = any(t.lower() == "native" for t in arch_tokens) or any( + int(t.rstrip("af")) >= 90 for t in arch_tokens if t.rstrip("af").isdigit() + ) + if not has_hopper_or_newer: + if explicit_nccl_ep: + raise RuntimeError( + "NVTE_BUILD_WITH_NCCL_EP=1 requires at least one CUDA arch >= 90 in " + f"NVTE_CUDA_ARCHS (got '{archs}'). Add '90' or unset NVTE_BUILD_WITH_NCCL_EP." + ) + print( + "[NCCL EP] No CUDA arch >= 90 in NVTE_CUDA_ARCHS" + f" ('{archs}'); auto-disabling NCCL EP (nvte_ep_* will throw at runtime)." 
def _discover_nccl_home() -> str:
    """Locate the install prefix of the NCCL core library.

    Resolution order:

    1. The ``NCCL_HOME`` environment variable, accepted only when
       ``<NCCL_HOME>/include/nccl.h`` exists; otherwise a warning is printed
       and probing continues.
    2. Well-known prefixes (``/opt/nvidia/nccl``, ``/usr/local/nccl``,
       ``/usr``) that contain both ``include/nccl.h`` and ``libnccl.so`` or
       ``libnccl.so.2`` under ``lib``, ``lib64`` or a Debian/Ubuntu multiarch
       sub-directory.
    3. The ``ldconfig -p`` cache: for each ``libnccl.so`` entry, walk two and
       three directories up from the library and accept the first directory
       that contains ``include/nccl.h``.

    Returns:
        The install prefix as a string.

    Raises:
        RuntimeError: if none of the probes finds a usable prefix.
    """
    env_home = os.environ.get("NCCL_HOME")
    if env_home:
        if (Path(env_home) / "include" / "nccl.h").exists():
            return env_home
        print(
            f"[NCCL EP] WARNING: NCCL_HOME='{env_home}' is set but "
            f"'{env_home}/include/nccl.h' was not found; falling back to system probes."
        )

    lib_names = ("libnccl.so", "libnccl.so.2")
    # Include Debian/Ubuntu multiarch subdirs (e.g. lib/aarch64-linux-gnu).
    lib_subdirs = ("lib", "lib64", "lib/aarch64-linux-gnu", "lib/x86_64-linux-gnu")
    for cand in ("/opt/nvidia/nccl", "/usr/local/nccl", "/usr"):
        prefix = Path(cand)
        if (prefix / "include" / "nccl.h").exists() and any(
            (prefix / sub / name).exists() for sub in lib_subdirs for name in lib_names
        ):
            return str(prefix)

    # Best-effort probe: a missing, non-executable or failing ldconfig must not
    # abort the build here; we fall through to the explanatory error below.
    try:
        out = subprocess.check_output(
            ["ldconfig", "-p"], stderr=subprocess.DEVNULL
        ).decode(errors="replace")
    except (subprocess.CalledProcessError, OSError):
        out = ""

    for line in out.splitlines():
        if "libnccl.so" not in line or "=>" not in line:
            continue
        lib_path = Path(line.split("=>")[-1].strip())
        # Walk upward so multiarch layouts (.../lib/<triple>/libnccl.so)
        # resolve to the prefix that contains include/nccl.h.
        for root in (lib_path.parent.parent, lib_path.parent.parent.parent):
            if (root / "include" / "nccl.h").exists():
                return str(root)

    raise RuntimeError(
        "Could not locate NCCL core (nccl.h + libnccl.so). Set NCCL_HOME to the install prefix."
    )


def build_nccl_ep_submodule() -> str:
    """Build libnccl_ep.so from the 3rdparty/nccl submodule.

    NCCL EP is on by default; the system NCCL core (libnccl.so) supplies the
    headers and runtime symbols. The library is built with ``make`` only when
    ``build/lib/libnccl_ep.so`` does not exist yet. Afterwards the top-level
    host headers of the discovered NCCL install are symlinked into
    ``build/include``.

    Returns:
        The submodule build directory as a string.

    Raises:
        RuntimeError: if the submodule is not checked out, or NCCL core cannot
            be located (see ``_discover_nccl_home``).
        subprocess.CalledProcessError: if the ``make`` invocation fails.
    """
    nccl_root = current_file_path / "3rdparty" / "nccl"
    if not (nccl_root / "Makefile").exists():
        raise RuntimeError(
            f"NCCL submodule not found at {nccl_root}. "
            "Run `git submodule update --init --recursive`."
        )

    build_dir = nccl_root / "build"
    nccl_ep_lib = build_dir / "lib" / "libnccl_ep.so"

    # Keep only numeric archs >= 90 (an 'a'/'f' suffix is dropped); default to 90.
    archs = cuda_archs() or "90"
    stripped = (a.strip().rstrip("af") for a in str(archs).split(";"))
    arch_list = [a for a in stripped if a.isdigit() and int(a) >= 90] or ["90"]
    gencode = " ".join(f"-gencode=arch=compute_{a},code=sm_{a}" for a in arch_list)

    nproc = os.cpu_count() or 8
    env = os.environ.copy()
    env["NVCC_GENCODE"] = gencode
    # NCCL EP needs the core NCCL headers + libnccl.so; write NCCL EP build
    # outputs to the submodule's local build/ tree.
    nccl_home = _discover_nccl_home()
    env["NCCL_HOME"] = nccl_home
    env["NCCL_EP_BUILDDIR"] = str(build_dir)

    if not nccl_ep_lib.exists():
        print(f"[NCCL EP] Building libnccl_ep.so (gencode='{gencode}')")
        subprocess.check_call(
            ["make", "-j", str(nproc), "-C", "contrib/nccl_ep", "lib"],
            cwd=str(nccl_root),
            env=env,
        )

    # TE's CMake expects nccl.h under 3rdparty/nccl/build/include/ for its
    # version check. Mirror the top-level host headers from the system NCCL
    # install — DON'T mirror nccl_device/ because the submodule ships its own
    # newer copy at src/include/nccl_device/ with device-side templates that
    # conflict with older system versions, and the JIT include path picks the
    # submodule's.
    nccl_include = build_dir / "include"
    nccl_include.mkdir(parents=True, exist_ok=True)
    for inc_dir in (Path(nccl_home) / "include", Path("/usr/include")):
        if not (inc_dir / "nccl.h").exists():
            continue
        for name in ("nccl.h", "nccl_net.h", "nccl_tuner.h"):
            src = inc_dir / name
            if not src.exists():
                continue
            dst = nccl_include / name
            # Path.exists() follows symlinks, so a dangling link left by an
            # earlier build reports False and symlink_to() would then raise
            # FileExistsError. Drop links that are dangling or that point at
            # a different NCCL install so the mirror tracks nccl_home.
            # Regular files placed here by hand are left untouched.
            if dst.is_symlink() and Path(os.readlink(dst)) != src:
                dst.unlink()
            if not dst.exists():
                dst.symlink_to(src)
        # Only the first include directory that has nccl.h is mirrored.
        break

    return str(build_dir)
+set(NCCL_SUBMODULE_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/../../3rdparty/nccl/build/include") +set(NCCL_SUBMODULE_SRC_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/../../3rdparty/nccl/src/include") +if(EXISTS "${NCCL_SUBMODULE_INCLUDE}/nccl.h") + set(NCCL_INCLUDE_DIR "${NCCL_SUBMODULE_INCLUDE}") +elseif(EXISTS "${NCCL_SUBMODULE_SRC_INCLUDE}/nccl.h") + set(NCCL_INCLUDE_DIR "${NCCL_SUBMODULE_SRC_INCLUDE}") +elseif(DEFINED ENV{NCCL_HOME}) + set(NCCL_INCLUDE_DIR "$ENV{NCCL_HOME}/include") +endif() target_include_directories(test_comm_gemm PRIVATE ${MPI_CXX_INCLUDE_PATH} $ENV{CUBLASMP_HOME}/include) target_link_libraries(test_comm_gemm PUBLIC CUDA::cuda_driver CUDA::cudart GTest::gtest ${TE_LIB} CUDA::nvrtc MPI::MPI_CXX ${NCCL_LIB} OpenMP::OpenMP_CXX) include(GoogleTest) gtest_discover_tests(test_comm_gemm DISCOVERY_TIMEOUT 600) + +# ── EP distributed tests ────────────────────────────────────────────────────── +# Launched via mpirun; ncclUniqueId exchange uses MPI_Bcast (see test_ep_common.h). +# Headers + libs come from the in-tree 3rdparty/nccl submodule build. +set(NCCL_EP_SUBMODULE_ROOT + "${CMAKE_CURRENT_SOURCE_DIR}/../../3rdparty/nccl") +find_library(NCCL_EP_LIB + NAMES nccl_ep libnccl_ep + HINTS ${NCCL_EP_SUBMODULE_ROOT}/build/lib + NO_DEFAULT_PATH + REQUIRED) + +set(NCCL_EP_INCLUDE_DIR "${NCCL_EP_SUBMODULE_ROOT}/contrib/nccl_ep/include") +if(NOT EXISTS "${NCCL_EP_INCLUDE_DIR}/nccl_ep.h") + message(FATAL_ERROR + "NCCL EP header not found at ${NCCL_EP_INCLUDE_DIR}/nccl_ep.h. " + "Run `git submodule update --init --recursive` to checkout 3rdparty/nccl.") +endif() +message(STATUS "EP test: NCCL EP headers: ${NCCL_EP_INCLUDE_DIR}") + +# Collect NCCL include dirs shared by all EP test targets (nccl_ep.h + nccl.h). 
+set(EP_TEST_NCCL_INCLUDES ${NCCL_EP_INCLUDE_DIR}) +if(DEFINED NCCL_INCLUDE_DIR) + list(APPEND EP_TEST_NCCL_INCLUDES ${NCCL_INCLUDE_DIR}) + message(STATUS "EP test: NCCL headers: ${NCCL_INCLUDE_DIR}") +endif() + +set(EP_TEST_COMMON_INCLUDES + ${EP_TEST_NCCL_INCLUDES} + ${MPI_CXX_INCLUDE_PATH} + ../../transformer_engine/common/include + ../../transformer_engine/common + ${CMAKE_CURRENT_SOURCE_DIR}) + +# nvrtc must follow TE_LIB so symbols referenced from libtransformer_engine.so +# (loaded via dlopen in Python; not in its DT_NEEDED) resolve through nvrtc. +set(EP_TEST_COMMON_LIBS + CUDA::cuda_driver + CUDA::cudart + GTest::gtest + ${TE_LIB} + CUDA::nvrtc + ${NCCL_LIB} + ${NCCL_EP_LIB} + MPI::MPI_CXX + OpenMP::OpenMP_CXX) + +# ── EP distributed tests (per-op + full pipeline + zero-copy symm) ─────────── +add_executable(test_ep test_ep.cu ../cpp/test_common.cu) +target_include_directories(test_ep PRIVATE ${EP_TEST_COMMON_INCLUDES}) +target_link_libraries(test_ep PUBLIC ${EP_TEST_COMMON_LIBS}) + +# Do NOT use gtest_discover_tests — these binaries require multi-process +# launch via run_test_ep.sh, not direct single-process execution. +message(STATUS "EP distributed tests enabled: ${NCCL_EP_LIB}") diff --git a/tests/cpp_distributed/run_test_ep.sh b/tests/cpp_distributed/run_test_ep.sh new file mode 100755 index 0000000000..13e86fa02d --- /dev/null +++ b/tests/cpp_distributed/run_test_ep.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +# Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# See LICENSE for license information. +# +# Run TE EP distributed unit tests via mpirun. Each MPI rank pins to one GPU +# (rank % device_count) and exchanges ncclUniqueId through MPI_Bcast. 
+# +# Usage: +# bash run_test_ep.sh [num_gpus] [build_dir] +# +# Defaults: +# num_gpus = number of GPUs visible to nvidia-smi +# build_dir = ${SCRIPT_DIR}/build (the build/ directory next to this script) +# +# Environment variables: +# GTEST_FILTER — forwarded to all processes (e.g., "EPPipelineTest.*") +# MPIRUN — override the mpirun binary (default: mpirun) +# MPIRUN_EXTRA — extra flags forwarded to mpirun + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BUILD_DIR="${2:-${SCRIPT_DIR}/build}" +NUM_GPUS="${1:-$(nvidia-smi -L 2>/dev/null | wc -l)}" +MPIRUN="${MPIRUN:-mpirun}" + +# Skip cleanly on pre-Hopper: NCCL EP requires SM>=90. +MIN_SM=$(nvidia-smi --query-gpu=compute_cap --format=csv,noheader 2>/dev/null \ + | awk -F. 'NR==1 || ($1*10+$2) 0 && MIN_SM < 90 )); then + echo "NCCL EP requires SM>=90 (lowest visible GPU is SM${MIN_SM}); SKIPPING." + exit 0 +fi + +TEST_BIN="${BUILD_DIR}/test_ep" +if [[ ! -x "${TEST_BIN}" ]]; then + echo "ERROR: binary not found: ${TEST_BIN}" + echo "Build: cd ${SCRIPT_DIR} && mkdir -p build && cd build && cmake .. && make" + exit 1 +fi + +if (( NUM_GPUS < 2 )); then + echo "EP Tests: requires at least 2 GPUs, found ${NUM_GPUS}. Skipping." + exit 0 +fi + +GTEST_ARGS="${GTEST_FILTER:+--gtest_filter=${GTEST_FILTER}}" + +echo "=== EP Tests ===" +echo " GPUs: ${NUM_GPUS} Binary: ${TEST_BIN}" +echo + +"${MPIRUN}" -n "${NUM_GPUS}" ${MPIRUN_EXTRA:-} "${TEST_BIN}" ${GTEST_ARGS} diff --git a/tests/cpp_distributed/test_ep.cu b/tests/cpp_distributed/test_ep.cu new file mode 100644 index 0000000000..bcf4ca3c98 --- /dev/null +++ b/tests/cpp_distributed/test_ep.cu @@ -0,0 +1,805 @@ +/************************************************************************* + * Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See LICENSE for license information. + ************************************************************************/ + +/* + * EP pipeline tests: smallest-scope first. 
+ * + * EPDispatchTest/PrepareAndDispatch — exact recv values + per-expert counts + * EPCombineTest/Combine — round-trip: out == top_k * tokens + * EPCombineBwdTest/CombineBwdCheck — exact grad_expert values + * EPDispatchBwdTest/DispatchBwdCheck — exact grad_tokens + * EPDispatchBwdGradWeightsTest/RoundTrip — exact per-(t, k) grad_topk_weights + * EPPipelineTest/FullForwardBackward — fwd + bwd NaN/Inf check + * + * Routing: token t on rank r → expert (r * num_local_experts + t * top_k + k) % num_experts + * Token values: rank r, token t → all hidden dims = (r * num_tokens + t + 1) / 256 (see token_value()) + * + * Closed-form expected values: + * dispatch recv: multiset of source-token values routed to this rank's experts + * combine: result[t] == top_k * tokens[t] + * combine_bwd: grad_expert[slot] == d_result[t] (no weighting) + * dispatch_bwd: grad_tokens[t] == top_k * d_result[t] + */ + +#include "test_ep_common.h" + +#include +#include +#include +#include + +// ── Deterministic routing helpers ───────────────────────────────────────────── + +// Token value for (rank, t): (rank * num_tokens + t + 1) / 256. Step 1/256 is +// bf16-exact and unique across (rank, t) when rank * num_tokens + t < 256. +static inline float token_value(int rank, int t, int num_tokens) { + return static_cast(rank * num_tokens + t + 1) * (1.0f / 256.0f); +} + +// Per-element host-side conversion helpers used by templated test code. 
+inline float tok_to_float(nv_bfloat16 v) { return __bfloat162float(v); } +inline float tok_to_float(__half v) { return __half2float(v); } +inline float tok_to_float(float v) { return v; } + +template T tok_from_float(float v); +template <> inline nv_bfloat16 tok_from_float(float v) { return __float2bfloat16(v); } +template <> inline __half tok_from_float<__half> (float v) { return __float2half(v); } +template <> inline float tok_from_float (float v) { return v; } + +template +static std::vector generate_tokens(int rank, int num_tokens, int hidden_dim) { + std::vector v(num_tokens * hidden_dim); + for (int t = 0; t < num_tokens; ++t) { + T val = tok_from_float(token_value(rank, t, num_tokens)); + for (int h = 0; h < hidden_dim; ++h) + v[t * hidden_dim + h] = val; + } + return v; +} + +static std::vector expected_token_counts( + int recv_rank, int num_processes, int num_tokens, int top_k, + int num_experts, int num_local_experts) { + int base = recv_rank * num_local_experts; + std::vector cnt(num_local_experts, 0); + for (int src = 0; src < num_processes; ++src) { + auto idx = routing_balanced(src, num_tokens, top_k, num_experts, num_local_experts); + for (int t = 0; t < num_tokens; ++t) + for (int k = 0; k < top_k; ++k) { + int64_t e = idx[t * top_k + k]; + if (e >= base && e < base + num_local_experts) ++cnt[e - base]; + } + } + return cnt; +} + +static std::vector expected_recv_values_sorted( + int recv_rank, int num_processes, int num_tokens, int top_k, + int num_experts, int num_local_experts) { + int base = recv_rank * num_local_experts; + std::vector vals; + for (int src = 0; src < num_processes; ++src) { + auto idx = routing_balanced(src, num_tokens, top_k, num_experts, num_local_experts); + for (int t = 0; t < num_tokens; ++t) + for (int k = 0; k < top_k; ++k) { + int64_t e = idx[t * top_k + k]; + if (e >= base && e < base + num_local_experts) { + float raw = token_value(src, t, num_tokens); + vals.push_back(__bfloat162float(__float2bfloat16(raw))); + } + } 
+ } + std::sort(vals.begin(), vals.end()); + return vals; +} + +// 2^-5 relative tolerance for BF16 (matches mantissa precision with margin), +// plus a small atol floor for near-zero expected values. +static constexpr float kBf16Rtol = 1.0f / 32.0f; +static constexpr float kBf16Atol = 1e-3f; +static float bf16_tol(float magnitude) { + return kBf16Atol + kBf16Rtol * std::fabs(magnitude); +} + +template +static bool check_no_nan_inf(const T* dev, int count, const char* name) { + std::vector h(count); + cudaMemcpy(h.data(), dev, count * sizeof(T), cudaMemcpyDeviceToHost); + for (int i = 0; i < count; ++i) { + float v = tok_to_float(h[i]); + if (std::isnan(v) || std::isinf(v)) { + fprintf(stderr, "Rank %d: %s in %s[%d]\n", + g_process_id, std::isnan(v) ? "NaN" : "Inf", name, i); + return false; + } + } + return true; +} + +// ── Forward buffer set with RAII ────────────────────────────────────────────── + +template +struct EPBuffers { + // Forward + DevBuf topk_idx; + DevBuf topk_weights; + DevBuf tokens; + DevBuf token_counts; + DevBuf handle_mem; + DevBuf recv_tokens; + DevBuf recv_topk_weights; + DevBuf result; + // Backward + DevBuf grad_result; + DevBuf grad_expert; + DevBuf grad_tokens; + DevBuf g_recv_topk_weights; + DevBuf grad_topk_weights; + + uint64_t handle_id = 0; + size_t handle_mem_size = 0; + size_t recv_capacity = 0; + int top_k_ = 0; + + void alloc(int num_tokens, int top_k, int hidden_dim, int num_local_experts, + int ep_size, int max_tokens_per_rank, size_t alignment = 0) { + top_k_ = top_k; + recv_capacity = static_cast(ep_size) * max_tokens_per_rank * 2; + + topk_idx.alloc(num_tokens * top_k); + topk_weights.alloc(num_tokens * top_k); + tokens.alloc(num_tokens * hidden_dim); + token_counts.alloc(num_local_experts); + recv_tokens.alloc(recv_capacity * hidden_dim); + recv_topk_weights.alloc(recv_capacity); + result.alloc(num_tokens * hidden_dim); + + NVTEEpLayerConfig cfg{num_local_experts, top_k, alignment}; + handle_id = 
nvte_ep_register_layer(cfg, &handle_mem_size); + handle_mem.alloc(handle_mem_size); + + grad_result.alloc(num_tokens * hidden_dim); + grad_expert.alloc(recv_capacity * hidden_dim); + grad_tokens.alloc(num_tokens * hidden_dim); + g_recv_topk_weights.alloc(recv_capacity); + grad_topk_weights.alloc(num_tokens * top_k); + } +}; + +// Bundled NVTETensor views over an EPBuffers, with the shapes the EP C API +// expects. +template +struct EPTensors { + TensorWrapper topk_idx, topk_weights, token_counts, handle_mem, tokens; + TensorWrapper recv_tokens, recv_topk_weights, result; + TensorWrapper grad_result, grad_expert, grad_tokens; + TensorWrapper g_recv_topk_weights, grad_topk_weights; + + EPTensors(EPBuffers& b, int num_tokens, int top_k, int hidden_dim, + int num_local_experts) { + constexpr DType kTokDType = test::TypeInfo::dtype; + using Shape = std::vector; + topk_idx = TensorWrapper(b.topk_idx.get(), + Shape{(size_t)num_tokens, (size_t)top_k}, DType::kInt64); + topk_weights = TensorWrapper(b.topk_weights.get(), + Shape{(size_t)num_tokens, (size_t)top_k}, DType::kFloat32); + token_counts = TensorWrapper(b.token_counts.get(), + Shape{(size_t)num_local_experts}, DType::kInt32); + handle_mem = TensorWrapper(b.handle_mem.get(), + Shape{b.handle_mem_size}, DType::kByte); + tokens = TensorWrapper(b.tokens.get(), + Shape{(size_t)num_tokens, (size_t)hidden_dim}, kTokDType); + recv_tokens = TensorWrapper(b.recv_tokens.get(), + Shape{b.recv_capacity, (size_t)hidden_dim}, kTokDType); + recv_topk_weights = TensorWrapper(b.recv_topk_weights.get(), + Shape{b.recv_capacity}, DType::kFloat32); + result = TensorWrapper(b.result.get(), + Shape{(size_t)num_tokens, (size_t)hidden_dim}, kTokDType); + grad_result = TensorWrapper(b.grad_result.get(), + Shape{(size_t)num_tokens, (size_t)hidden_dim}, kTokDType); + grad_expert = TensorWrapper(b.grad_expert.get(), + Shape{b.recv_capacity, (size_t)hidden_dim}, kTokDType); + grad_tokens = TensorWrapper(b.grad_tokens.get(), + 
Shape{(size_t)num_tokens, (size_t)hidden_dim}, kTokDType); + g_recv_topk_weights = TensorWrapper(b.g_recv_topk_weights.get(), + Shape{b.recv_capacity}, DType::kFloat32); + grad_topk_weights = TensorWrapper(b.grad_topk_weights.get(), + Shape{(size_t)num_tokens, (size_t)top_k}, DType::kFloat32); + } +}; + +// ── Shared fixture base ─────────────────────────────────────────────────────── + +class EpOpTestBase : public ::testing::Test { + protected: + int ep_size_, num_experts_, num_local_experts_, hidden_dim_; + int max_tokens_per_rank_, top_k_, num_tokens_; + + void SetUp() override { + if (g_sm_major < 9) + GTEST_SKIP() << "EP requires SM_90+ (device is SM_" << g_sm_major << "0)"; + ASSERT_GE(g_num_processes, 2); + ASSERT_TRUE(g_ep_initialized); + + ep_size_ = g_ep_size; + num_experts_ = g_num_experts; + num_local_experts_ = num_experts_ / ep_size_; + hidden_dim_ = g_hidden_dim; + max_tokens_per_rank_ = g_max_tokens_per_rank; + top_k_ = 2; + num_tokens_ = 32; + } + + template + void upload_inputs(EPBuffers& buf, int rank = -1) { + if (rank < 0) rank = g_process_id; + auto h_idx = routing_balanced(rank, num_tokens_, top_k_, + num_experts_, num_local_experts_); + std::vector h_w(num_tokens_ * top_k_, 1.0f / top_k_); + auto h_tok = generate_tokens(rank, num_tokens_, hidden_dim_); + + NVTE_CHECK_CUDA(cudaMemcpy(buf.topk_idx.get(), h_idx.data(), + h_idx.size() * sizeof(int64_t), cudaMemcpyHostToDevice)); + NVTE_CHECK_CUDA(cudaMemcpy(buf.topk_weights.get(), h_w.data(), + h_w.size() * sizeof(float), cudaMemcpyHostToDevice)); + NVTE_CHECK_CUDA(cudaMemcpy(buf.tokens.get(), h_tok.data(), + h_tok.size() * sizeof(T), cudaMemcpyHostToDevice)); + } + + NVTEEpLayerConfig layer_config(size_t alignment = 0) const { + return NVTEEpLayerConfig{num_local_experts_, top_k_, alignment}; + } + + // NVTE_CHECK_CUDA (fprintf+exit) so this non-void helper stays legal. 
+ template + int read_total_recv(const EPBuffers& buf) const { + std::vector cnt(num_local_experts_); + NVTE_CHECK_CUDA(cudaMemcpy(cnt.data(), buf.token_counts.get(), + num_local_experts_ * sizeof(int32_t), cudaMemcpyDeviceToHost)); + int total = 0; + for (int c : cnt) total += c; + return total; + } +}; + +// ============================================================================= +// EPDispatchTest: exact recv values and per-expert counts. +// ============================================================================= + +class EPDispatchTest : public EpOpTestBase {}; + +TEST_F(EPDispatchTest, PrepareAndDispatch) { + EPBuffers<> buf; + buf.alloc(num_tokens_, top_k_, hidden_dim_, num_local_experts_, + ep_size_, max_tokens_per_rank_); + upload_inputs(buf); + EPTensors<> t(buf, num_tokens_, top_k_, hidden_dim_, num_local_experts_); + + NVTE_CHECK_CUDA(cudaMemset(buf.recv_tokens.get(), 0, buf.recv_tokens.bytes())); + + cudaStream_t stream; + NVTE_CHECK_CUDA(cudaStreamCreate(&stream)); + + uint64_t handle_id = buf.handle_id; + ASSERT_NO_THROW(nvte_ep_prepare(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), t.token_counts.data(), /*alignment=*/0, stream)); + ASSERT_NO_THROW(nvte_ep_dispatch(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), + t.tokens.data(), NVTECommWindow{}, t.topk_weights.data(), + NVTECommWindow{}, t.recv_tokens.data(), NVTECommWindow{}, + t.recv_topk_weights.data(), NVTECommWindow{}, stream)); + NVTE_CHECK_CUDA(cudaStreamSynchronize(stream)); + + // 1. Per-expert counts. 
+ std::vector got_counts(num_local_experts_); + NVTE_CHECK_CUDA(cudaMemcpy(got_counts.data(), buf.token_counts.get(), + num_local_experts_ * sizeof(int32_t), cudaMemcpyDeviceToHost)); + auto exp_counts = expected_token_counts(g_process_id, g_num_processes, num_tokens_, top_k_, + num_experts_, num_local_experts_); + int total_recv = 0; + for (int i = 0; i < num_local_experts_; ++i) { + EXPECT_EQ(got_counts[i], exp_counts[i]) << "local expert " << i; + total_recv += exp_counts[i]; + } + ASSERT_LE(total_recv, static_cast(buf.recv_capacity)) + << "total_recv exceeded recv_capacity — overflow would corrupt downstream memory"; + + // 2. Recv values: read only the filled prefix per local-expert zone, not the + // whole recv buffer — avoids false positives from legitimate-zero token values. + std::vector h_recv(buf.recv_capacity * hidden_dim_); + NVTE_CHECK_CUDA(cudaMemcpy(h_recv.data(), buf.recv_tokens.get(), + h_recv.size() * sizeof(nv_bfloat16), cudaMemcpyDeviceToHost)); + + std::vector got_vals; + got_vals.reserve(total_recv); + size_t slot = 0; + for (int e = 0; e < num_local_experts_; ++e) { + for (int i = 0; i < got_counts[e]; ++i) { + got_vals.push_back(__bfloat162float(h_recv[slot * hidden_dim_])); + ++slot; + } + } + std::sort(got_vals.begin(), got_vals.end()); + + auto exp_vals = expected_recv_values_sorted(g_process_id, g_num_processes, num_tokens_, + top_k_, num_experts_, num_local_experts_); + + ASSERT_EQ(got_vals.size(), exp_vals.size()); + for (size_t i = 0; i < exp_vals.size(); ++i) + EXPECT_NEAR(got_vals[i], exp_vals[i], bf16_tol(exp_vals[i])) + << "recv value mismatch at sorted index " << i; + + // 3. recv_topk_weights: every filled slot must equal the per-token weight (1/top_k). 
+ std::vector h_w(buf.recv_capacity); + NVTE_CHECK_CUDA(cudaMemcpy(h_w.data(), buf.recv_topk_weights.get(), + h_w.size() * sizeof(float), cudaMemcpyDeviceToHost)); + const float exp_w = 1.0f / static_cast(top_k_); + for (int i = 0; i < total_recv; ++i) + EXPECT_NEAR(h_w[i], exp_w, 1e-6f) << "recv_topk_weights[" << i << "]"; + + if (g_process_id == 0) + printf(" PrepareAndDispatch: passed (recv=%d, values + weights exact)\n", total_recv); + + NVTE_CHECK_CUDA(cudaStreamDestroy(stream)); +} + +// ============================================================================= +// EPCombineTest: round-trip identity expert → result == top_k * tokens. +// ============================================================================= + +class EPCombineTest : public EpOpTestBase {}; + +TEST_F(EPCombineTest, Combine) { + EPBuffers<> buf; + buf.alloc(num_tokens_, top_k_, hidden_dim_, num_local_experts_, + ep_size_, max_tokens_per_rank_); + upload_inputs(buf); + EPTensors<> t(buf, num_tokens_, top_k_, hidden_dim_, num_local_experts_); + + cudaStream_t stream; + NVTE_CHECK_CUDA(cudaStreamCreate(&stream)); + + uint64_t handle_id = buf.handle_id; + ASSERT_NO_THROW(nvte_ep_prepare(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), t.token_counts.data(), /*alignment=*/0, stream)); + ASSERT_NO_THROW(nvte_ep_dispatch(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), + t.tokens.data(), NVTECommWindow{}, t.topk_weights.data(), + NVTECommWindow{}, t.recv_tokens.data(), NVTECommWindow{}, + t.recv_topk_weights.data(), NVTECommWindow{}, stream)); + ASSERT_NO_THROW(nvte_ep_combine(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.recv_tokens.data(), NVTECommWindow{}, + t.result.data(), stream)); + NVTE_CHECK_CUDA(cudaStreamSynchronize(stream)); + + std::vector h_result(num_tokens_ * hidden_dim_); + NVTE_CHECK_CUDA(cudaMemcpy(h_result.data(), buf.result.get(), + h_result.size() * sizeof(nv_bfloat16), cudaMemcpyDeviceToHost)); + auto h_tok = 
generate_tokens(g_process_id, num_tokens_, hidden_dim_); + for (int tok = 0; tok < num_tokens_; ++tok) { + float exp = __bfloat162float(h_tok[tok * hidden_dim_]) * static_cast(top_k_); + for (int p = 0; p < hidden_dim_; ++p) { + float got = __bfloat162float(h_result[tok * hidden_dim_ + p]); + EXPECT_NEAR(got, exp, bf16_tol(exp)) + << "token " << tok << " rank " << g_process_id << " hidden " << p; + } + } + + if (g_process_id == 0) + printf(" Combine: passed (result == top_k * tokens)\n"); + + NVTE_CHECK_CUDA(cudaStreamDestroy(stream)); +} + +// ============================================================================= +// EPCombineBwdTest: filled slots in grad_expert == d_result (unweighted). +// ============================================================================= + +class EPCombineBwdTest : public EpOpTestBase {}; + +TEST_F(EPCombineBwdTest, CombineBwdCheck) { + EPBuffers<> buf; + buf.alloc(num_tokens_, top_k_, hidden_dim_, num_local_experts_, + ep_size_, max_tokens_per_rank_); + upload_inputs(buf); + EPTensors<> t(buf, num_tokens_, top_k_, hidden_dim_, num_local_experts_); + + cudaStream_t stream; + NVTE_CHECK_CUDA(cudaStreamCreate(&stream)); + + uint64_t handle_id = buf.handle_id; + ASSERT_NO_THROW(nvte_ep_prepare(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), t.token_counts.data(), /*alignment=*/0, stream)); + ASSERT_NO_THROW(nvte_ep_dispatch(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), + t.tokens.data(), NVTECommWindow{}, t.topk_weights.data(), + NVTECommWindow{}, t.recv_tokens.data(), NVTECommWindow{}, + t.recv_topk_weights.data(), NVTECommWindow{}, stream)); + ASSERT_NO_THROW(nvte_ep_combine(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.recv_tokens.data(), NVTECommWindow{}, + t.result.data(), stream)); + + std::vector h_grad_r(num_tokens_ * hidden_dim_, __float2bfloat16(0.1f)); + NVTE_CHECK_CUDA(cudaMemcpyAsync(buf.grad_result.get(), h_grad_r.data(), + h_grad_r.size() * sizeof(nv_bfloat16), + 
cudaMemcpyHostToDevice, stream)); + NVTE_CHECK_CUDA(cudaMemsetAsync(buf.grad_expert.get(), 0, buf.grad_expert.bytes(), stream)); + + ASSERT_NO_THROW(nvte_ep_combine_bwd(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.grad_result.data(), NVTECommWindow{}, + t.grad_expert.data(), NVTECommWindow{}, stream)); + NVTE_CHECK_CUDA(cudaStreamSynchronize(stream)); + + int total_recv = read_total_recv(buf); + + std::vector cnt(num_local_experts_); + NVTE_CHECK_CUDA(cudaMemcpy(cnt.data(), buf.token_counts.get(), + num_local_experts_ * sizeof(int32_t), cudaMemcpyDeviceToHost)); + std::vector h_ge(buf.recv_capacity * hidden_dim_); + NVTE_CHECK_CUDA(cudaMemcpy(h_ge.data(), buf.grad_expert.get(), + h_ge.size() * sizeof(nv_bfloat16), cudaMemcpyDeviceToHost)); + + // Walk filled slots by per-expert zone (no v != 0 heuristic). + const float kExpGrad = 0.1f; + size_t slot = 0; + int filled = 0; + for (int e = 0; e < num_local_experts_; ++e) { + for (int i = 0; i < cnt[e]; ++i) { + for (int p = 0; p < hidden_dim_; ++p) { + float v = __bfloat162float(h_ge[slot * hidden_dim_ + p]); + EXPECT_NEAR(v, kExpGrad, bf16_tol(kExpGrad)) + << "grad_expert expert " << e << " slot " << i + << " (linear " << slot << ") hidden " << p; + } + ++filled; ++slot; + } + } + EXPECT_EQ(filled, total_recv); + + if (g_process_id == 0) + printf(" CombineBwdCheck: passed (filled=%d)\n", filled); + + NVTE_CHECK_CUDA(cudaStreamDestroy(stream)); +} + +// ============================================================================= +// EPDispatchBwdTest: grad_tokens == top_k * d_result. 
+// =============================================================================
+
+class EPDispatchBwdTest : public EpOpTestBase {};
+
+// Full forward (prepare/dispatch/combine) followed by combine_bwd and
+// dispatch_bwd with a constant 0.1 upstream gradient; every grad_tokens
+// element is expected to equal top_k * 0.1.
+TEST_F(EPDispatchBwdTest, DispatchBwdCheck) {
+  EPBuffers<> buf;
+  buf.alloc(num_tokens_, top_k_, hidden_dim_, num_local_experts_,
+            ep_size_, max_tokens_per_rank_);
+  upload_inputs(buf);
+  EPTensors<> t(buf, num_tokens_, top_k_, hidden_dim_, num_local_experts_);
+
+  cudaStream_t stream;
+  NVTE_CHECK_CUDA(cudaStreamCreate(&stream));
+
+  uint64_t handle_id = buf.handle_id;
+  ASSERT_NO_THROW(nvte_ep_prepare(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), t.token_counts.data(), /*alignment=*/0, stream));
+  ASSERT_NO_THROW(nvte_ep_dispatch(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(),
+                                   t.tokens.data(), NVTECommWindow{}, t.topk_weights.data(),
+                                   NVTECommWindow{}, t.recv_tokens.data(), NVTECommWindow{},
+                                   t.recv_topk_weights.data(), NVTECommWindow{}, stream));
+  ASSERT_NO_THROW(nvte_ep_combine(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.recv_tokens.data(), NVTECommWindow{},
+                                  t.result.data(), stream));
+
+  // Constant upstream gradient; all backward scratch/output buffers start at zero.
+  std::vector h_grad(num_tokens_ * hidden_dim_, __float2bfloat16(0.1f));
+  NVTE_CHECK_CUDA(cudaMemcpyAsync(buf.grad_result.get(), h_grad.data(),
+                                  h_grad.size() * sizeof(nv_bfloat16),
+                                  cudaMemcpyHostToDevice, stream));
+  NVTE_CHECK_CUDA(cudaMemsetAsync(buf.grad_expert.get(), 0, buf.grad_expert.bytes(), stream));
+  NVTE_CHECK_CUDA(cudaMemsetAsync(buf.g_recv_topk_weights.get(), 0, buf.g_recv_topk_weights.bytes(), stream));
+  NVTE_CHECK_CUDA(cudaMemsetAsync(buf.grad_topk_weights.get(), 0, buf.grad_topk_weights.bytes(), stream));
+
+  // Backward runs in reverse op order: combine_bwd feeds dispatch_bwd via grad_expert.
+  ASSERT_NO_THROW(nvte_ep_combine_bwd(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.grad_result.data(), NVTECommWindow{},
+                                      t.grad_expert.data(), NVTECommWindow{}, stream));
+  ASSERT_NO_THROW(nvte_ep_dispatch_bwd(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.grad_expert.data(), NVTECommWindow{},
+                                       t.g_recv_topk_weights.data(), NVTECommWindow{},
+                                       t.grad_tokens.data(), t.grad_topk_weights.data(), stream));
+  NVTE_CHECK_CUDA(cudaStreamSynchronize(stream));
+
+  std::vector h_gt(num_tokens_ * hidden_dim_);
+  NVTE_CHECK_CUDA(cudaMemcpy(h_gt.data(), buf.grad_tokens.get(),
+                             h_gt.size() * sizeof(nv_bfloat16), cudaMemcpyDeviceToHost));
+  const float kExpGrad = static_cast(top_k_) * 0.1f;
+  for (int tok = 0; tok < num_tokens_; ++tok)
+    for (int p = 0; p < hidden_dim_; ++p)
+      EXPECT_NEAR(__bfloat162float(h_gt[tok * hidden_dim_ + p]), kExpGrad,
+                  bf16_tol(kExpGrad))
+          << "grad_tokens token " << tok << " hidden " << p;
+
+  if (g_process_id == 0)
+    printf(" DispatchBwdCheck: passed (grad_tokens == %.2f)\n", kExpGrad);
+
+  NVTE_CHECK_CUDA(cudaStreamDestroy(stream));
+}
+
+// =============================================================================
+// EPDispatchBwdGradWeightsTest: round-trip per-(t, k) weights.
+// =============================================================================
+
+class EPDispatchBwdGradWeightsTest : public EpOpTestBase {};
+
+// Dispatches distinct per-(t, k) weights, then passes the received weights
+// straight back through nvte_ep_dispatch_bwd as the weight gradient; the
+// resulting grad_topk_weights must reproduce the original weights.
+TEST_F(EPDispatchBwdGradWeightsTest, RoundTrip) {
+  EPBuffers<> buf;
+  buf.alloc(num_tokens_, top_k_, hidden_dim_, num_local_experts_,
+            ep_size_, max_tokens_per_rank_);
+  upload_inputs(buf);
+  EPTensors<> t(buf, num_tokens_, top_k_, hidden_dim_, num_local_experts_);
+
+  // Distinct per-(rank, t, k) weights so each slot carries a unique value.
+  std::vector h_w(num_tokens_ * top_k_);
+  for (int tok = 0; tok < num_tokens_; ++tok)
+    for (int k = 0; k < top_k_; ++k)
+      h_w[tok * top_k_ + k] = 0.1f + 0.01f * tok + 0.001f * k
+                              + 0.0001f * (g_process_id + 1);
+  // Overwrites the weights that upload_inputs() placed in buf.topk_weights.
+  NVTE_CHECK_CUDA(cudaMemcpy(buf.topk_weights.get(), h_w.data(),
+                             h_w.size() * sizeof(float), cudaMemcpyHostToDevice));
+
+  cudaStream_t stream;
+  NVTE_CHECK_CUDA(cudaStreamCreate(&stream));
+
+  uint64_t handle_id = buf.handle_id;
+  ASSERT_NO_THROW(nvte_ep_prepare(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), t.token_counts.data(), /*alignment=*/0, stream));
+  NVTE_CHECK_CUDA(cudaMemsetAsync(buf.recv_topk_weights.get(), 0,
+                                  buf.recv_topk_weights.bytes(), stream));
+  ASSERT_NO_THROW(nvte_ep_dispatch(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(),
+                                   t.tokens.data(), NVTECommWindow{}, t.topk_weights.data(),
+                                   NVTECommWindow{}, t.recv_tokens.data(), NVTECommWindow{},
+                                   t.recv_topk_weights.data(), NVTECommWindow{}, stream));
+
+  // Sentinel: NaN so any (t, k) the bwd kernel fails to write is immediately visible.
+  std::vector h_nan(num_tokens_ * top_k_,
+                    std::numeric_limits::quiet_NaN());
+  NVTE_CHECK_CUDA(cudaMemcpyAsync(buf.grad_topk_weights.get(), h_nan.data(),
+                                  h_nan.size() * sizeof(float),
+                                  cudaMemcpyHostToDevice, stream));
+  NVTE_CHECK_CUDA(cudaMemsetAsync(buf.grad_expert.get(), 0, buf.grad_expert.bytes(), stream));
+
+  // g_recv_topk_weights := recv_topk_weights (the round-trip input).
+  auto g_recv_t = TensorWrapper(buf.recv_topk_weights.get(),
+                                std::vector{buf.recv_capacity}, DType::kFloat32);
+  ASSERT_NO_THROW(nvte_ep_dispatch_bwd(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.grad_expert.data(),
+                                       NVTECommWindow{}, g_recv_t.data(), NVTECommWindow{},
+                                       t.grad_tokens.data(), t.grad_topk_weights.data(), stream));
+  NVTE_CHECK_CUDA(cudaStreamSynchronize(stream));
+
+  std::vector h_grad_w(num_tokens_ * top_k_);
+  NVTE_CHECK_CUDA(cudaMemcpy(h_grad_w.data(), buf.grad_topk_weights.get(),
+                             h_grad_w.size() * sizeof(float), cudaMemcpyDeviceToHost));
+
+  // errs counts mismatching or NaN entries (only the first 8 are printed);
+  // k0_eq_k1 counts tokens whose first two gradients are (near-)identical.
+  const float kTol = 1e-5f;
+  int errs = 0, k0_eq_k1 = 0;
+  for (int tok = 0; tok < num_tokens_; ++tok) {
+    for (int k = 0; k < top_k_; ++k) {
+      float got = h_grad_w[tok * top_k_ + k];
+      float exp = h_w[tok * top_k_ + k];
+      if (std::isnan(got) || std::fabs(got - exp) > kTol) {
+        if (errs < 8)
+          fprintf(stderr, "Rank %d: grad_topk_weights[%d, %d]: got %.6f, expected %.6f\n",
+                  g_process_id, tok, k, got, exp);
+        ++errs;
+      }
+    }
+    if (top_k_ >= 2 &&
+        std::fabs(h_grad_w[tok * top_k_ + 0] - h_grad_w[tok * top_k_ + 1]) < 1e-7f)
+      ++k0_eq_k1;
+  }
+  EXPECT_EQ(errs, 0);
+  EXPECT_EQ(k0_eq_k1, 0) << "per-token-average regression: grad[t, 0] == grad[t, 1]";
+
+  if (g_process_id == 0 && errs == 0 && k0_eq_k1 == 0)
+    printf(" RoundTrip: passed (%d (t, k) gradients)\n", num_tokens_ * top_k_);
+
+  NVTE_CHECK_CUDA(cudaStreamDestroy(stream));
+}
+
+// =============================================================================
+// Integrated FwdBwd: NaN/Inf check end-to-end.
+// ============================================================================= + +class EPPipelineTest : public EpOpTestBase, public ::testing::WithParamInterface { + protected: + template + void run_full_forward_backward() { + EPBuffers buf; + buf.alloc(num_tokens_, top_k_, hidden_dim_, num_local_experts_, + ep_size_, max_tokens_per_rank_); + upload_inputs(buf); + EPTensors t(buf, num_tokens_, top_k_, hidden_dim_, num_local_experts_); + + cudaStream_t stream; + NVTE_CHECK_CUDA(cudaStreamCreate(&stream)); + + uint64_t handle_id = buf.handle_id; + ASSERT_NO_THROW(nvte_ep_prepare(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), t.token_counts.data(), /*alignment=*/0, stream)); + ASSERT_NO_THROW(nvte_ep_dispatch(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.topk_idx.data(), + t.tokens.data(), NVTECommWindow{}, t.topk_weights.data(), + NVTECommWindow{}, t.recv_tokens.data(), NVTECommWindow{}, + t.recv_topk_weights.data(), NVTECommWindow{}, stream)); + ASSERT_NO_THROW(nvte_ep_combine(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.recv_tokens.data(), NVTECommWindow{}, + t.result.data(), stream)); + + std::vector h_grad(num_tokens_ * hidden_dim_, tok_from_float(0.1f)); + NVTE_CHECK_CUDA(cudaMemcpyAsync(buf.grad_result.get(), h_grad.data(), + h_grad.size() * sizeof(Tok), + cudaMemcpyHostToDevice, stream)); + NVTE_CHECK_CUDA(cudaMemsetAsync(buf.grad_expert.get(), 0, buf.grad_expert.bytes(), stream)); + NVTE_CHECK_CUDA(cudaMemsetAsync(buf.g_recv_topk_weights.get(), 0, buf.g_recv_topk_weights.bytes(), stream)); + NVTE_CHECK_CUDA(cudaMemsetAsync(buf.grad_topk_weights.get(), 0, buf.grad_topk_weights.bytes(), stream)); + + ASSERT_NO_THROW(nvte_ep_combine_bwd(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.grad_result.data(), NVTECommWindow{}, + t.grad_expert.data(), NVTECommWindow{}, stream)); + ASSERT_NO_THROW(nvte_ep_dispatch_bwd(NVTEEpHandle{handle_id, t.handle_mem.data()}, t.grad_expert.data(), NVTECommWindow{}, + t.g_recv_topk_weights.data(), 
NVTECommWindow{}, + t.grad_tokens.data(), t.grad_topk_weights.data(), stream)); + NVTE_CHECK_CUDA(cudaStreamSynchronize(stream)); + + ASSERT_TRUE(check_no_nan_inf(buf.result.get(), num_tokens_ * hidden_dim_, "result")); + ASSERT_TRUE(check_no_nan_inf(buf.grad_tokens.get(), num_tokens_ * hidden_dim_, "grad_tokens")); + + NVTE_CHECK_CUDA(cudaStreamDestroy(stream)); + } +}; + +TEST_P(EPPipelineTest, FullForwardBackward) { + const DType dtype = GetParam(); + // NCCL EP backend currently asserts ncclBfloat16 in ncclEpDispatch + // (contrib/nccl_ep/nccl_ep.cc); skip FP16/FP32 until the backend supports them. + if (dtype != DType::kBFloat16) { + GTEST_SKIP() << test::typeName(dtype) << " not yet supported by NCCL EP backend"; + } + switch (dtype) { + case DType::kBFloat16: run_full_forward_backward(); break; + case DType::kFloat16: run_full_forward_backward<__half> (); break; + case DType::kFloat32: run_full_forward_backward (); break; + default: FAIL() << "unsupported token dtype " << static_cast(dtype); + } + if (g_process_id == 0) + printf(" FullForwardBackward[%s]: passed\n", test::typeName(dtype).c_str()); +} + +INSTANTIATE_TEST_SUITE_P( + Dtypes, EPPipelineTest, + ::testing::Values(DType::kBFloat16, DType::kFloat16, DType::kFloat32), + [](const ::testing::TestParamInfo& info) { + return test::typeName(info.param); + }); + +// ============================================================================= +// EPZeroCopyTest: dispatch/combine with NCCL symmetric-memory windows attached +// to payload tensors (zero-copy fast path via ncclEpTensorCreateFromWindow). +// Symm-mem requirements per spec: input&output of Dispatch, input of Combine, +// input&output of Combine bwd, input of Dispatch bwd. +// ============================================================================= + +namespace { + +// Caller-owned ncclMemAlloc'd buffer with a registered symmetric window. +// Frees in destructor (deregister + ncclMemFree). Non-copyable, move-only. 
+struct SymmBuf {
+  void* ptr = nullptr;         // device memory obtained from ncclMemAlloc
+  size_t bytes = 0;            // size passed to alloc()
+  ncclWindow_t win = nullptr;  // window registered on g_ep_comm for ptr
+
+  SymmBuf() = default;
+  SymmBuf(const SymmBuf&) = delete;
+  SymmBuf& operator=(const SymmBuf&) = delete;
+  // Move leaves the source empty so its destructor releases nothing.
+  SymmBuf(SymmBuf&& o) noexcept : ptr(o.ptr), bytes(o.bytes), win(o.win) {
+    o.ptr = nullptr; o.win = nullptr; o.bytes = 0;
+  }
+  // Deregisters the window before freeing the memory; return codes are ignored.
+  ~SymmBuf() {
+    if (win) ncclCommWindowDeregister(g_ep_comm, win);
+    if (ptr) ncclMemFree(ptr);
+  }
+
+  // Allocates n_bytes, zero-fills them and registers a symmetric window.
+  // NOTE(review): a second alloc() on the same object would overwrite ptr/win
+  // without releasing the previous ones; callers in this file call it once.
+  void alloc(size_t n_bytes) {
+    bytes = n_bytes;
+    NVTE_CHECK_NCCL(ncclMemAlloc(&ptr, bytes));
+    NVTE_CHECK_CUDA(cudaMemset(ptr, 0, bytes));
+    NVTE_CHECK_NCCL(ncclCommWindowRegister(g_ep_comm, ptr, bytes, &win,
+                                           NCCL_WIN_COLL_SYMMETRIC));
+  }
+};
+
+// Build an NVTECommWindow descriptor pointing at a SymmBuf's window (offset 0).
+static inline NVTECommWindow symm_window(const SymmBuf& b) {
+  return NVTECommWindow{b.win, /*offset=*/0};
+}
+
+}  // namespace
+
+class EPZeroCopyTest : public EpOpTestBase {};
+
+// Identity round-trip with symm-mem on dispatch i/o + combine input. Bit-exact
+// vs HBM reference (same routing, same input).
+TEST_F(EPZeroCopyTest, IdentityAllSymm) {
+  // HBM reference run.
+  EPBuffers<> ref_buf;
+  ref_buf.alloc(num_tokens_, top_k_, hidden_dim_, num_local_experts_,
+                ep_size_, max_tokens_per_rank_);
+  upload_inputs(ref_buf);
+  EPTensors<> ref_t(ref_buf, num_tokens_, top_k_, hidden_dim_, num_local_experts_);
+
+  cudaStream_t stream;
+  NVTE_CHECK_CUDA(cudaStreamCreate(&stream));
+
+  uint64_t ref_hid = ref_buf.handle_id;
+  ASSERT_NO_THROW(nvte_ep_prepare(NVTEEpHandle{ref_hid, ref_t.handle_mem.data()}, ref_t.topk_idx.data(), ref_t.token_counts.data(), /*alignment=*/0, stream));
+  ASSERT_NO_THROW(nvte_ep_dispatch(NVTEEpHandle{ref_hid, ref_t.handle_mem.data()}, ref_t.topk_idx.data(),
+                                   ref_t.tokens.data(), NVTECommWindow{}, ref_t.topk_weights.data(),
+                                   NVTECommWindow{}, ref_t.recv_tokens.data(), NVTECommWindow{},
+                                   ref_t.recv_topk_weights.data(), NVTECommWindow{}, stream));
+  ASSERT_NO_THROW(nvte_ep_combine(NVTEEpHandle{ref_hid, ref_t.handle_mem.data()}, ref_t.recv_tokens.data(), NVTECommWindow{},
+                                  ref_t.result.data(), stream));
+  NVTE_CHECK_CUDA(cudaStreamSynchronize(stream));
+
+  // Host copies of the reference outputs, compared against the symm run below.
+  std::vector ref_recv(ref_buf.recv_capacity * hidden_dim_);
+  std::vector ref_result(num_tokens_ * hidden_dim_);
+  NVTE_CHECK_CUDA(cudaMemcpy(ref_recv.data(), ref_buf.recv_tokens.get(),
+                             ref_recv.size() * sizeof(nv_bfloat16), cudaMemcpyDeviceToHost));
+  NVTE_CHECK_CUDA(cudaMemcpy(ref_result.data(), ref_buf.result.get(),
+                             ref_result.size() * sizeof(nv_bfloat16), cudaMemcpyDeviceToHost));
+
+  // Symm-mem run: tokens, recv_tokens, combine_input (== recv_tokens) all symm.
+  EPBuffers<> sym_buf; // alloc all buffers except the symm ones.
+  sym_buf.alloc(num_tokens_, top_k_, hidden_dim_, num_local_experts_,
+                ep_size_, max_tokens_per_rank_);
+  upload_inputs(sym_buf);
+
+  SymmBuf sym_tokens, sym_recv;
+  sym_tokens.alloc(num_tokens_ * hidden_dim_ * sizeof(nv_bfloat16));
+  sym_recv .alloc(sym_buf.recv_capacity * hidden_dim_ * sizeof(nv_bfloat16));
+
+  // Stage same tokens into the symm-mem input.
+  auto h_tok = generate_tokens(g_process_id, num_tokens_, hidden_dim_);
+  NVTE_CHECK_CUDA(cudaMemcpy(sym_tokens.ptr, h_tok.data(),
+                             h_tok.size() * sizeof(nv_bfloat16), cudaMemcpyHostToDevice));
+
+  EPTensors<> sym_t(sym_buf, num_tokens_, top_k_, hidden_dim_, num_local_experts_);
+  // Replace the tokens/recv_tokens views with ones pointing at the symm buffers.
+  sym_t.tokens = TensorWrapper(sym_tokens.ptr,
+      std::vector{(size_t)num_tokens_, (size_t)hidden_dim_}, DType::kBFloat16);
+  sym_t.recv_tokens = TensorWrapper(sym_recv.ptr,
+      std::vector{sym_buf.recv_capacity, (size_t)hidden_dim_}, DType::kBFloat16);
+
+  // Same op sequence as the reference, but the payload tensors carry their
+  // windows; the weight tensors still pass an empty NVTECommWindow.
+  uint64_t sym_hid = sym_buf.handle_id;
+  ASSERT_NO_THROW(nvte_ep_prepare(NVTEEpHandle{sym_hid, sym_t.handle_mem.data()}, sym_t.topk_idx.data(), sym_t.token_counts.data(), /*alignment=*/0, stream));
+  ASSERT_NO_THROW(nvte_ep_dispatch(NVTEEpHandle{sym_hid, sym_t.handle_mem.data()}, sym_t.topk_idx.data(),
+                                   sym_t.tokens.data(), symm_window(sym_tokens),
+                                   sym_t.topk_weights.data(), NVTECommWindow{},
+                                   sym_t.recv_tokens.data(), symm_window(sym_recv),
+                                   sym_t.recv_topk_weights.data(), NVTECommWindow{}, stream));
+  ASSERT_NO_THROW(nvte_ep_combine(NVTEEpHandle{sym_hid, sym_t.handle_mem.data()}, sym_t.recv_tokens.data(),
+                                  symm_window(sym_recv), sym_t.result.data(), stream));
+  NVTE_CHECK_CUDA(cudaStreamSynchronize(stream));
+
+  std::vector sym_recv_host(sym_buf.recv_capacity * hidden_dim_);
+  std::vector sym_result(num_tokens_ * hidden_dim_);
+  NVTE_CHECK_CUDA(cudaMemcpy(sym_recv_host.data(), sym_recv.ptr,
+                             sym_recv_host.size() * sizeof(nv_bfloat16), cudaMemcpyDeviceToHost));
+  NVTE_CHECK_CUDA(cudaMemcpy(sym_result.data(), sym_buf.result.get(),
+                             sym_result.size() * sizeof(nv_bfloat16), cudaMemcpyDeviceToHost));
+
+  // Compare per filled recv slot (HBM ref vs symm) and full result.
+  // Only the first total_recv rows of the receive buffer are compared.
+  int total_recv = read_total_recv(sym_buf);
+  for (int i = 0; i < total_recv * hidden_dim_; ++i)
+    ASSERT_EQ(__bfloat162float(sym_recv_host[i]), __bfloat162float(ref_recv[i]))
+        << "recv mismatch at " << i;
+  for (size_t i = 0; i < sym_result.size(); ++i)
+    ASSERT_EQ(__bfloat162float(sym_result[i]), __bfloat162float(ref_result[i]))
+        << "result mismatch at " << i;
+
+  if (g_process_id == 0)
+    printf(" IdentityAllSymm: passed (recv_slots=%d, bit-exact vs HBM)\n", total_recv);
+
+  NVTE_CHECK_CUDA(cudaStreamDestroy(stream));
+}
+
+
+// ── main ──────────────────────────────────────────────────────────────────────
+
+// A false return from ep_bootstrap() exits with status 0 without running any
+// test and without calling ep_teardown().
+int main(int argc, char* argv[]) {
+  if (!ep_bootstrap(argc, argv)) return 0;
+  int ret = RUN_ALL_TESTS();
+  ep_teardown();
+  return ret;
+}
diff --git a/tests/cpp_distributed/test_ep_common.h b/tests/cpp_distributed/test_ep_common.h
new file mode 100644
index 0000000000..135a39416e
--- /dev/null
+++ b/tests/cpp_distributed/test_ep_common.h
@@ -0,0 +1,184 @@
+/*************************************************************************
+ * Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See LICENSE for license information.
+ ************************************************************************/
+
+/*
+ * Shared TE EP test infrastructure. Include once per TU; ep_bootstrap() in
+ * each test binary's main() populates process-level globals.
+ * Defaults: 4 experts/rank, hidden_dim=256, max_tokens_per_rank=64.
+ */
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include "../cpp/test_common.h"
+#include "util/logging.h"
+
+using transformer_engine::DType;
+using transformer_engine::TensorWrapper;
+
+// Evaluates an MPI call once and raises through NVTE_CHECK unless it
+// returned MPI_SUCCESS.
+#define CHECK_MPI(expr) \
+  do { \
+    int _err_mpi = (expr); \
+    NVTE_CHECK(_err_mpi == MPI_SUCCESS, "MPI error: ", _err_mpi); \
+  } while (false)
+
+// ── Process-level state ───────────────────────────────────────────────────────
+
+// These are `static`, i.e. every translation unit including this header gets
+// its own copy (see "Include once per TU" in the file comment).
+static int g_process_id = -1;
+static int g_num_processes = -1;
+
+static int g_sm_major = -1; // set by ep_bootstrap; -1 until then
+static int g_ep_size = -1;
+static int g_num_experts = -1;
+static int g_hidden_dim = 256;
+static int g_max_tokens_per_rank = 64;
+static NVTEDType g_max_token_dtype = kNVTEFloat32; // staging-buffer sizing
+static bool g_ep_initialized = false;
+static ncclComm_t g_ep_comm = nullptr; // owned by harness, destroyed in ep_teardown
+
+// RAII owner for a cudaMalloc'd device buffer; element-count API on top of
+// test::CudaPtr.
+template
+struct DevBuf {
+  test::CudaPtr ptr;
+  size_t count = 0;  // number of T elements, not bytes
+
+  DevBuf() = default;
+  explicit DevBuf(size_t n) { alloc(n); }
+
+  // Replaces the held buffer with one of n elements; n == 0 yields an empty
+  // pointer instead of a zero-byte allocation.
+  void alloc(size_t n) {
+    count = n;
+    ptr = (n > 0) ? test::cuda_alloc(n * sizeof(T)) : test::CudaPtr{};
+  }
+  void reset() {
+    ptr.reset();
+    count = 0;
+  }
+
+  T* get() const { return ptr.get(); }
+  size_t bytes() const { return count * sizeof(T); }
+};
+
+// ── Shared routing helper ─────────────────────────────────────────────────────
+
+// Balanced round-robin routing: token t on rank r maps top_k experts to
+// (r * num_local_experts + t * top_k + k) % num_experts
+// The result is a flat vector of num_tokens * top_k entries, indexed t * top_k + k.
+static inline std::vector routing_balanced(
+    int rank, int num_tokens, int top_k, int num_experts, int num_local_experts) {
+  std::vector idx(num_tokens * top_k);
+  for (int t = 0; t < num_tokens; ++t)
+    for (int k = 0; k < top_k; ++k)
+      idx[t * top_k + k] = (rank * num_local_experts + t * top_k + k) % num_experts;
+  return idx;
+}
+
+// ── ncclUniqueId exchange via MPI ─────────────────────────────────────────────
+
+// Rank 0 creates the id; MPI_Bcast then copies it byte-wise to every rank.
+static void exchange_unique_id(ncclUniqueId* uid) {
+  if (g_process_id == 0) NVTE_CHECK_NCCL(ncclGetUniqueId(uid));
+  CHECK_MPI(MPI_Bcast(uid, sizeof(*uid), MPI_BYTE, 0, MPI_COMM_WORLD));
+}
+
+// ── CLI parsing ───────────────────────────────────────────────────────────────
+
+// Recognises only --max-token-dtype=<int>; all other arguments are ignored.
+// NOTE(review): std::stoi throws on non-numeric input and the value is cast
+// to the dtype enum without a range check - confirm that is acceptable here.
+static void ep_parse_args(int argc, char* argv[]) {
+  for (int i = 1; i < argc; ++i) {
+    std::string a(argv[i]);
+    if (a.rfind("--max-token-dtype=", 0) == 0)  // rfind(..., 0) == 0: prefix match
+      g_max_token_dtype = static_cast(std::stoi(a.substr(18)));  // 18 == prefix length
+  }
+}
+
+// ── Bootstrap / teardown ──────────────────────────────────────────────────────
+
+// Returns false if the binary should exit without running tests (wrong SM, etc.).
+// Brings up MPI, GoogleTest, the CUDA device, the NCCL communicator and the
+// EP backend for this rank, and fills in the g_* process-level globals.
+//
+// Returns true when tests should run. Returns false, after printing a SKIP
+// line on rank 0, when the device is older than SM 9.x or fewer than two
+// ranks were launched. On that path MPI has already been finalized, because
+// the caller exits without reaching ep_teardown().
+// MPI/CUDA/NCCL failures are reported through the CHECK_MPI / NVTE_CHECK_* macros.
+static bool ep_bootstrap(int argc, char* argv[]) {
+  int mpi_initialized = 0;
+  CHECK_MPI(MPI_Initialized(&mpi_initialized));
+  if (!mpi_initialized) CHECK_MPI(MPI_Init(&argc, &argv));
+  CHECK_MPI(MPI_Comm_rank(MPI_COMM_WORLD, &g_process_id));
+  CHECK_MPI(MPI_Comm_size(MPI_COMM_WORLD, &g_num_processes));
+
+  ep_parse_args(argc, argv);
+  ::testing::InitGoogleTest(&argc, argv);
+
+  // Skip paths return before ep_teardown() can run, so they finalize MPI
+  // themselves; otherwise the launcher sees a rank exit without MPI_Finalize.
+  const auto finalize_mpi = []() {
+    int finalized = 0;
+    MPI_Finalized(&finalized);
+    if (!finalized) MPI_Finalize();
+  };
+
+  // Initialised and validated so a failed query can never reach the modulo.
+  int device_count = 0;
+  NVTE_CHECK_CUDA(cudaGetDeviceCount(&device_count));
+  NVTE_CHECK(device_count > 0, "No CUDA device visible to rank ", g_process_id);
+  NVTE_CHECK_CUDA(cudaSetDevice(g_process_id % device_count));
+
+  int device = -1;
+  int major = 0;
+  NVTE_CHECK_CUDA(cudaGetDevice(&device));
+  NVTE_CHECK_CUDA(cudaDeviceGetAttribute(&major, cudaDevAttrComputeCapabilityMajor, device));
+  g_sm_major = major;
+  if (major < 9) {
+    if (g_process_id == 0)
+      printf("SKIP: EP requires SM_90+ (device is SM_%d0)\n", major);
+    finalize_mpi();
+    return false;
+  }
+  if (g_num_processes < 2) {
+    if (g_process_id == 0)
+      printf("SKIP: at least 2 processes required\n");
+    finalize_mpi();
+    return false;
+  }
+
+  g_ep_size = g_num_processes;
+  g_num_experts = g_ep_size * 4; // 4 experts per rank
+
+  ncclUniqueId uid{};
+  exchange_unique_id(&uid);
+
+  NVTEEpGroupConfig group_config{};
+  group_config.ep_size = g_ep_size;
+  group_config.num_experts = g_num_experts;
+  group_config.max_tokens_per_rank = g_max_tokens_per_rank;
+  // Worst-case for top_k fan-out: ep_size * max_tokens_per_rank * 2.
+  group_config.max_recv_tokens_per_rank = g_ep_size * g_max_tokens_per_rank * 2;
+  group_config.hidden_dim = g_hidden_dim;
+  group_config.max_token_dtype = g_max_token_dtype;
+
+  NVTE_CHECK_NCCL(ncclCommInitRank(&g_ep_comm, g_num_processes, uid, g_process_id));
+  // nvte_ep_initialize takes the communicator as an opaque void*.
+  nvte_ep_initialize(static_cast<void*>(g_ep_comm), group_config);
+
+  if (g_process_id == 0) {
+    printf("EP initialized: ep_size=%d num_experts=%d "
+           "hidden_dim=%d max_tokens_per_rank=%d\n",
+           g_ep_size, g_num_experts, g_hidden_dim, g_max_tokens_per_rank);
+  }
+
+  g_ep_initialized = true;
+  return true;
+}
+
+// Tear down in dependency order: backend's ep_group reads from ep_comm,
+// so destroy the group first, then the comm.
+// Safe to call when bootstrap never completed: the EP/NCCL part is guarded by
+// g_ep_initialized and MPI is finalized only if that has not happened yet.
+// NOTE(review): the return values of ncclCommDestroy and MPI_Finalize are
+// not checked.
+static void ep_teardown() {
+  if (g_ep_initialized) {
+    nvte_ep_shutdown();
+    if (g_ep_comm != nullptr) {
+      ncclCommDestroy(g_ep_comm);
+      g_ep_comm = nullptr;
+    }
+    g_ep_initialized = false;
+  }
+  int finalized = 0;
+  MPI_Finalized(&finalized);
+  if (!finalized) MPI_Finalize();
+}
diff --git a/transformer_engine/common/CMakeLists.txt b/transformer_engine/common/CMakeLists.txt
index 030023d949..c5f8dfb1ab 100644
--- a/transformer_engine/common/CMakeLists.txt
+++ b/transformer_engine/common/CMakeLists.txt
@@ -379,6 +379,96 @@ if (NVTE_WITH_CUSOLVERMP)
   message(STATUS "Using cuSolverMp at: ${CUSOLVERMP_DIR}")
 endif()
 
+# ── NCCL EP (on by default, HT mode only) ─────────────────────────────────
+# Set -DNVTE_WITH_NCCL_EP=OFF (or NVTE_BUILD_WITH_NCCL_EP=0 in setup.py) to
+# skip NCCL EP entirely — useful on older images whose system NCCL is below
+# the 2.30.4 EP minimum.
+option(NVTE_WITH_NCCL_EP "Build NCCL EP into libtransformer_engine.so" ON)
+if(NVTE_WITH_NCCL_EP)
+# SM>=90 and NCCL>=2.30.4 are gated at runtime in EPBackend::initialize.
+# ── NCCL EP headers ────────────────────────────────────────────────────────
+# Headers + libs are produced by the in-tree 3rdparty/nccl submodule build
+# (auto-built by setup.py via build_nccl_ep_submodule).
+set(NCCL_EP_SUBMODULE_ROOT
+    "${CMAKE_CURRENT_SOURCE_DIR}/../../3rdparty/nccl")
+set(NCCL_EP_INCLUDE_DIR "${NCCL_EP_SUBMODULE_ROOT}/contrib/nccl_ep/include")
+# Fail at configure time with an actionable hint if the submodule is missing.
+if(NOT EXISTS "${NCCL_EP_INCLUDE_DIR}/nccl_ep.h")
+  message(FATAL_ERROR
+    "NCCL EP header not found at ${NCCL_EP_INCLUDE_DIR}/nccl_ep.h. "
+    "Run `git submodule update --init --recursive` to checkout 3rdparty/nccl.")
+endif()
+message(STATUS "NCCL EP headers: ${NCCL_EP_INCLUDE_DIR}")
+
+# ── libnccl_ep.so ──────────────────────────────────────────────────────────
+# NO_DEFAULT_PATH restricts the search to the submodule build output.
+# NOTE(review): find_library(... REQUIRED) needs CMake >= 3.18 - confirm the
+# project's minimum version.
+set(NCCL_EP_LIB_DIR "${NCCL_EP_SUBMODULE_ROOT}/build/lib")
+find_library(NCCL_EP_LIB
+  NAMES nccl_ep libnccl_ep
+  HINTS ${NCCL_EP_LIB_DIR}
+  NO_DEFAULT_PATH
+  REQUIRED)
+
+# ── NCCL + GIN headers ─────────────────────────────────────────────────────
+# libnccl.so and all GIN headers (ncclGin.h, ncclWindow_t, ncclDevComm_t)
+# ship with the base CUDA Toolkit OR the 3rdparty/nccl submodule build
+# (preferred when present; auto-built by setup.py via build_nccl_ep_submodule).
+# A pre-set NCCL_LIB (e.g. passed with -DNCCL_LIB=...) skips the search.
+if(NOT NCCL_LIB)
+  find_library(NCCL_LIB
+    NAMES nccl libnccl
+    HINTS ${NCCL_EP_LIB_DIR} ${CUDAToolkit_LIBRARY_DIR}
+    PATH_SUFFIXES lib lib64
+    REQUIRED)
+endif()
+
+# Prefer the submodule's generated nccl.h; otherwise use the toolkit includes.
+set(NCCL_SUBMODULE_INCLUDE
+    "${CMAKE_CURRENT_SOURCE_DIR}/../../3rdparty/nccl/build/include")
+if(EXISTS "${NCCL_SUBMODULE_INCLUDE}/nccl.h")
+  set(NCCL_INCLUDE_DIRS_FOR_TE ${NCCL_SUBMODULE_INCLUDE})
+else()
+  set(NCCL_INCLUDE_DIRS_FOR_TE ${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES})
+endif()
+
+# Diagnostic: log detected NCCL header version (minimum enforced at runtime).
+find_file(_nvte_nccl_header_path nccl.h
+  PATHS ${NCCL_INCLUDE_DIRS_FOR_TE} ${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES}
+  NO_DEFAULT_PATH)
+if(_nvte_nccl_header_path)
+  file(READ "${_nvte_nccl_header_path}" _nvte_nccl_h)
+  string(REGEX MATCH "#define[ \t]+NCCL_MAJOR[ \t]+([0-9]+)" _ "${_nvte_nccl_h}")
+  set(_nvte_nccl_major "${CMAKE_MATCH_1}")
+  string(REGEX MATCH "#define[ \t]+NCCL_MINOR[ \t]+([0-9]+)" _ "${_nvte_nccl_h}")
+  set(_nvte_nccl_minor "${CMAKE_MATCH_1}")
+  string(REGEX MATCH "#define[ \t]+NCCL_PATCH[ \t]+([0-9]+)" _ "${_nvte_nccl_h}")
+  set(_nvte_nccl_patch "${CMAKE_MATCH_1}")
+  # Compare against the empty string rather than using if(<variable>): a
+  # version component of "0" is a CMake false constant and would otherwise
+  # suppress the log line for releases such as x.y.0.
+  if(NOT "${_nvte_nccl_major}" STREQUAL "" AND
+     NOT "${_nvte_nccl_minor}" STREQUAL "" AND
+     NOT "${_nvte_nccl_patch}" STREQUAL "")
+    message(STATUS "NCCL header: ${_nvte_nccl_header_path} (version ${_nvte_nccl_major}.${_nvte_nccl_minor}.${_nvte_nccl_patch})")
+  endif()
+endif()
+
+target_include_directories(transformer_engine PRIVATE
+  ${NCCL_EP_INCLUDE_DIR}
+  ${NCCL_INCLUDE_DIRS_FOR_TE}) # covers nccl.h + nccl_device/
+
+target_link_libraries(transformer_engine PUBLIC
+  ${NCCL_EP_LIB}
+  ${NCCL_LIB})
+
+# Embed rpath so the installed wheel finds libnccl_ep.so at runtime.
+# libnccl.so is already on the system via the Toolkit — no rpath needed for it.
+# NOTE(review): this replaces any INSTALL_RPATH set earlier on the target and
+# embeds the absolute submodule build path - confirm both are intended.
+set_target_properties(transformer_engine PROPERTIES
+  INSTALL_RPATH "$ORIGIN;${NCCL_EP_LIB_DIR}")
+
+target_sources(transformer_engine PRIVATE
+  ep/ep_backend.cpp
+  ep/ep_api.cpp)
+
+message(STATUS "NCCL EP enabled: ${NCCL_EP_LIB}")
+message(STATUS "NCCL EP include: ${NCCL_EP_INCLUDE_DIR}")
+else()
+  # NCCL EP off: export throwing nvte_ep_* stubs so framework bindings link.
+  target_sources(transformer_engine PRIVATE ep/ep_api_stub.cpp)
+  message(STATUS "NCCL EP disabled (NVTE_WITH_NCCL_EP=OFF) — using nvte_ep_* stubs")
+endif()
+
 # Number of philox4x32 rounds for stochastic rounding (build-time constant).
set(NVTE_BUILD_NUM_PHILOX_ROUNDS_STR $ENV{NVTE_BUILD_NUM_PHILOX_ROUNDS})
 if (NOT NVTE_BUILD_NUM_PHILOX_ROUNDS_STR)
diff --git a/transformer_engine/common/ep/ep_api.cpp b/transformer_engine/common/ep/ep_api.cpp
new file mode 100644
index 0000000000..89d8b38607
--- /dev/null
+++ b/transformer_engine/common/ep/ep_api.cpp
@@ -0,0 +1,76 @@
+/*************************************************************************
+ * Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See LICENSE for license information.
+ ************************************************************************/
+
+/*! \file ep_api.cpp
+ * \brief nvte_ep_* C API: thin delegations to the EPBackend singleton.
+ */
+
+#include
+#include
+
+#include "../common.h"
+#include "../util/logging.h"
+#include "ep_backend.h"
+
+using transformer_engine::ep::EPBackend;
+
+// Rejects a null communicator, then forwards to the static EPBackend::initialize.
+void nvte_ep_initialize(void* ep_comm, NVTEEpGroupConfig group_config) {
+  NVTE_CHECK(ep_comm != nullptr, "ep_comm must not be null");
+  EPBackend::initialize(static_cast(ep_comm), group_config);
+}
+
+void nvte_ep_shutdown(void) { EPBackend::shutdown(); }
+
+// Returns whatever EPBackend::register_layer returns; handle_mem_size is a
+// required out-parameter.
+uint64_t nvte_ep_register_layer(NVTEEpLayerConfig layer_config, size_t* handle_mem_size) {
+  NVTE_CHECK(handle_mem_size != nullptr, "handle_mem_size must not be null");
+  return EPBackend::get().register_layer(layer_config, handle_mem_size);
+}
+
+// Every per-op entry point below follows the same shape: resolve the handle's
+// memory tensor to a raw pointer, reject null, then delegate to the backend
+// with handle.id. EPBackend::get() itself checks that the backend is initialized.
+void nvte_ep_prepare(NVTEEpHandle handle, NVTETensor topk_idx, NVTETensor token_counts,
+                     size_t dispatch_output_per_expert_alignment, cudaStream_t stream) {
+  void* mem_ptr = nvte_tensor_data(handle.mem);
+  NVTE_CHECK(mem_ptr != nullptr, "handle_mem tensor data must not be null");
+  EPBackend::get().prepare(handle.id, topk_idx, token_counts, mem_ptr,
+                           dispatch_output_per_expert_alignment, stream);
+}
+
+void nvte_ep_dispatch(NVTEEpHandle handle, NVTETensor topk_idx, NVTETensor tokens,
+                      NVTECommWindow tokens_win, NVTETensor topk_weights,
+                      NVTECommWindow topk_weights_win, NVTETensor recv_tokens,
+                      NVTECommWindow recv_tokens_win, NVTETensor recv_topk_weights,
+                      NVTECommWindow recv_topk_weights_win, cudaStream_t stream) {
+  void* mem_ptr = nvte_tensor_data(handle.mem);
+  NVTE_CHECK(mem_ptr != nullptr, "handle_mem tensor data must not be null");
+  EPBackend::get().dispatch(handle.id, mem_ptr, topk_idx, tokens, tokens_win, topk_weights,
+                            topk_weights_win, recv_tokens, recv_tokens_win, recv_topk_weights,
+                            recv_topk_weights_win, stream);
+}
+
+void nvte_ep_combine(NVTEEpHandle handle, NVTETensor expert_out, NVTECommWindow expert_out_win,
+                     NVTETensor result, cudaStream_t stream) {
+  void* mem_ptr = nvte_tensor_data(handle.mem);
+  NVTE_CHECK(mem_ptr != nullptr, "handle_mem tensor data must not be null");
+  EPBackend::get().combine(handle.id, mem_ptr, expert_out, expert_out_win, result, stream);
+}
+
+void nvte_ep_dispatch_bwd(NVTEEpHandle handle, NVTETensor grad, NVTECommWindow grad_win,
+                          NVTETensor g_recv_topk_weights, NVTECommWindow g_recv_topk_weights_win,
+                          NVTETensor grad_tokens, NVTETensor grad_topk_weights,
+                          cudaStream_t stream) {
+  void* mem_ptr = nvte_tensor_data(handle.mem);
+  NVTE_CHECK(mem_ptr != nullptr, "handle_mem tensor data must not be null");
+  EPBackend::get().dispatch_bwd(handle.id, mem_ptr, grad, grad_win, g_recv_topk_weights,
+                                g_recv_topk_weights_win, grad_tokens, grad_topk_weights, stream);
+}
+
+void nvte_ep_combine_bwd(NVTEEpHandle handle, NVTETensor grad, NVTECommWindow grad_win,
+                         NVTETensor grad_expert_out, NVTECommWindow grad_expert_out_win,
+                         cudaStream_t stream) {
+  void* mem_ptr = nvte_tensor_data(handle.mem);
+  NVTE_CHECK(mem_ptr != nullptr, "handle_mem tensor data must not be null");
+  EPBackend::get().combine_bwd(handle.id, mem_ptr, grad, grad_win, grad_expert_out,
+                               grad_expert_out_win, stream);
+}
diff --git a/transformer_engine/common/ep/ep_api_stub.cpp b/transformer_engine/common/ep/ep_api_stub.cpp
new file mode 100644
index 0000000000..fe4127d87d
--- /dev/null
+++ b/transformer_engine/common/ep/ep_api_stub.cpp
@@ -0,0 +1,61 @@
+/*************************************************************************
+ * Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See LICENSE for license information.
+ ************************************************************************/
+
+/*! \file ep_api_stub.cpp
+ * \brief Throwing nvte_ep_* stubs compiled when NVTE_WITH_NCCL_EP=OFF.
+ */
+
+#include
+
+#include "../util/logging.h"
+
+namespace {
+// Single failure path shared by every stub. Declared [[noreturn]] so the
+// non-void stub below needs no return statement.
+[[noreturn]] void ep_not_built() {
+  NVTE_ERROR(
+      "NCCL EP is not built into this TransformerEngine. Rebuild TE with "
+      "NVTE_BUILD_WITH_NCCL_EP=1 and CUDA arch >= 90 (e.g. NVTE_CUDA_ARCHS=\"90\").");
+}
+}  // namespace
+
+void nvte_ep_initialize(void* /*ep_comm*/, NVTEEpGroupConfig /*group_config*/) { ep_not_built(); }
+
+// The only stub that does not raise: shutdown is a silent no-op.
+void nvte_ep_shutdown(void) {}
+
+uint64_t nvte_ep_register_layer(NVTEEpLayerConfig /*layer_config*/, size_t* /*handle_mem_size*/) {
+  ep_not_built();
+}
+
+void nvte_ep_prepare(NVTEEpHandle /*handle*/, NVTETensor /*topk_idx*/, NVTETensor /*token_counts*/,
+                     size_t /*dispatch_output_per_expert_alignment*/, cudaStream_t /*stream*/) {
+  ep_not_built();
+}
+
+void nvte_ep_dispatch(NVTEEpHandle /*handle*/, NVTETensor /*topk_idx*/, NVTETensor /*tokens*/,
+                      NVTECommWindow /*tokens_win*/, NVTETensor /*topk_weights*/,
+                      NVTECommWindow /*topk_weights_win*/, NVTETensor /*recv_tokens*/,
+                      NVTECommWindow /*recv_tokens_win*/, NVTETensor /*recv_topk_weights*/,
+                      NVTECommWindow /*recv_topk_weights_win*/, cudaStream_t /*stream*/) {
+  ep_not_built();
+}
+
+void nvte_ep_combine(NVTEEpHandle /*handle*/, NVTETensor /*expert_out*/,
+                     NVTECommWindow /*expert_out_win*/, NVTETensor /*result*/,
+                     cudaStream_t /*stream*/) {
+  ep_not_built();
+}
+
+void nvte_ep_dispatch_bwd(NVTEEpHandle /*handle*/, NVTETensor /*grad*/, NVTECommWindow /*grad_win*/,
+                          NVTETensor /*g_recv_topk_weights*/,
+                          NVTECommWindow /*g_recv_topk_weights_win*/, NVTETensor /*grad_tokens*/,
+                          NVTETensor /*grad_topk_weights*/, cudaStream_t /*stream*/) {
+  ep_not_built();
+}
+
+void nvte_ep_combine_bwd(NVTEEpHandle /*handle*/, NVTETensor /*grad*/, NVTECommWindow /*grad_win*/,
+                         NVTETensor /*grad_expert_out*/, NVTECommWindow /*grad_expert_out_win*/,
+                         cudaStream_t /*stream*/) {
+  ep_not_built();
+}
diff --git a/transformer_engine/common/ep/ep_backend.cpp b/transformer_engine/common/ep/ep_backend.cpp
new file mode 100644
index 0000000000..a5ae99b089
--- /dev/null
+++ b/transformer_engine/common/ep/ep_backend.cpp
@@ -0,0 +1,513 @@
+/*************************************************************************
+ * Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See LICENSE for license information.
+ ************************************************************************/
+
+/*! \file ep_backend.cpp
+ * \brief EPBackend implementation. See ep_backend.h for the op flow.
+ */
+
+#include "ep_backend.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "../common.h"
+#include "../util/cuda_runtime.h"
+#include "../util/logging.h"
+
+namespace transformer_engine {
+namespace ep {
+
+namespace {
+
+// Build a by-value ncclEpTensor_t descriptor. `sizes` is caller-owned and must
+// outlive any NCCL EP call that consumes the descriptor.
+// Fields other than ndim/datatype/data/sizes keep their NCCL_EP_TENSOR_INIT values.
+inline ncclEpTensor_t make_tensor(void* data, unsigned int ndim, ncclDataType_t datatype,
+                                  size_t* sizes) {
+  ncclEpTensor_t t = NCCL_EP_TENSOR_INIT;
+  t.ndim = ndim;
+  t.datatype = datatype;
+  t.data = data;
+  t.sizes = sizes;
+  return t;
+}
+
+// Payload descriptor: prefer the symmem window when set, else fall back to the
+// NVTETensor's raw device pointer.
+inline ncclEpTensor_t make_payload_tensor(const NVTETensor t, const NVTECommWindow& win, + unsigned int ndim, ncclDataType_t datatype, + size_t* sizes) { + ncclEpTensor_t desc = NCCL_EP_TENSOR_INIT; + desc.ndim = ndim; + desc.datatype = datatype; + desc.sizes = sizes; + if (win.window != nullptr) { + desc.win_hdl = win.window; + desc.win_offset = win.offset; + } else { + desc.data = nvte_tensor_data(t); + NVTE_CHECK(desc.data != nullptr, "payload tensor data must not be null"); + } + return desc; +} + +} // namespace + +// --------------------------------------------------------------------------- +// Singleton + bootstrap +// --------------------------------------------------------------------------- + +EPBackend& EPBackend::instance() { + static EPBackend inst; + return inst; +} + +EPBackend& EPBackend::get() { + EPBackend& inst = instance(); + NVTE_CHECK(inst.initialized_, "EPBackend not initialized. Call nvte_ep_initialize() first."); + return inst; +} + +void EPBackend::validate_config(const NVTEEpGroupConfig& config) { + NVTE_CHECK(config.ep_size > 0, "ep_size must be positive, got ", config.ep_size); + NVTE_CHECK(config.num_experts > 0, "num_experts must be positive, got ", config.num_experts); + NVTE_CHECK(config.max_tokens_per_rank > 0, "max_tokens_per_rank must be positive, got ", + config.max_tokens_per_rank); + NVTE_CHECK(config.max_recv_tokens_per_rank > 0, "max_recv_tokens_per_rank must be positive, got ", + config.max_recv_tokens_per_rank); + NVTE_CHECK(config.hidden_dim > 0, "hidden_dim must be positive, got ", config.hidden_dim); + NVTE_CHECK(config.max_token_dtype >= 0 && config.max_token_dtype < kNVTENumTypes, + "max_token_dtype out of range, got ", static_cast(config.max_token_dtype)); + const size_t elem_bytes = typeToSize(static_cast(config.max_token_dtype)); + NVTE_CHECK(config.hidden_dim * elem_bytes >= 16, + "hidden_dim * sizeof(max_token_dtype) must be >= 16 (NCCL EP 16B row alignment); " + "got hidden_dim=", + config.hidden_dim, ", 
element_bytes=", elem_bytes); + NVTE_CHECK(config.num_experts % config.ep_size == 0, "num_experts (", config.num_experts, + ") must be divisible by ep_size (", config.ep_size, ")"); + NVTE_CHECK(config.max_num_sms >= 0, "max_num_sms must be >= 0 (0 = auto), got ", + config.max_num_sms); + + int device, major; + NVTE_CHECK_CUDA(cudaGetDevice(&device)); + NVTE_CHECK_CUDA(cudaDeviceGetAttribute(&major, cudaDevAttrComputeCapabilityMajor, device)); + NVTE_CHECK(major >= 9, + "NCCL EP requires SM_90+ (Hopper or later), " + "but current device has compute capability ", + major, ".x"); + + // NCCL EP needs CUDA multicast (NVLS); init hangs without it. + NVTE_CHECK(cuda::supports_multicast(device), + "NCCL EP requires CUDA multicast (NVLS) support on device ", device, + " but CU_DEVICE_ATTRIBUTE_MULTICAST_SUPPORTED reports 0."); +} + +void EPBackend::initialize(ncclComm_t ep_comm, NVTEEpGroupConfig config) { + EPBackend& inst = instance(); + std::lock_guard lock(inst.mutex_); + NVTE_CHECK(!inst.initialized_, "EP already initialized. Call initialize only once per process."); + NVTE_CHECK(ep_comm != nullptr, "ep_comm must not be null"); + + // Runtime gate: NCCL >= 2.30.4 (matches the submodule pin). + constexpr int kMinNcclVersion = 23004; + int nccl_version = 0; + NVTE_CHECK_NCCL(ncclGetVersion(&nccl_version)); + NVTE_CHECK(nccl_version >= kMinNcclVersion, "NCCL EP requires NCCL >= 2.30.4, found ", + nccl_version / 10000, ".", (nccl_version / 100) % 100, ".", nccl_version % 100, + " at runtime."); + + validate_config(config); + + int comm_size = 0; + NVTE_CHECK_NCCL(ncclCommCount(ep_comm, &comm_size)); + NVTE_CHECK(comm_size == config.ep_size, "ep_comm size (", comm_size, ") must equal ep_size (", + config.ep_size, "). 
Pass the EP sub-communicator, not the world comm."); + + inst.init(ep_comm, config); +} + +void EPBackend::shutdown() { + EPBackend& inst = instance(); + std::lock_guard lock(inst.mutex_); + if (!inst.initialized_) return; + for (auto& kv : inst.handles_) { + if (kv.second.cached_handle != nullptr) { + ncclEpHandleDestroy(kv.second.cached_handle); + kv.second.cached_handle = nullptr; + kv.second.cached_handle_mem = nullptr; + } + } + inst.handles_.clear(); + // ncclEpGroupDestroy reads from ep_comm_; destroy group while comm is still alive. + if (inst.ep_group_ != nullptr) { + ncclEpGroupDestroy(inst.ep_group_); + inst.ep_group_ = nullptr; + } + inst.ep_comm_ = nullptr; // borrowed — caller destroys + inst.initialized_ = false; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +ncclDataType_t EPBackend::nvte_dtype_to_nccl(NVTEDType dtype) { + switch (dtype) { + case kNVTEFloat32: + return ncclFloat32; + case kNVTEFloat16: + return ncclFloat16; + case kNVTEBFloat16: + return ncclBfloat16; + case kNVTEInt32: + return ncclInt32; + case kNVTEInt64: + return ncclInt64; + case kNVTEByte: + return ncclUint8; + case kNVTEFloat8E4M3: + return ncclFloat8e4m3; + case kNVTEFloat8E5M2: + return ncclFloat8e5m2; + default: + NVTE_ERROR("Unsupported NVTEDType for NCCL EP conversion: ", static_cast(dtype)); + } + return ncclFloat32; // unreachable +} + +// Open a fresh ncclEpHandle over handle_mem. Caller (or cache) owns the result. 
+ncclEpHandle_t EPBackend::open_handle(void* handle_mem, size_t handle_mem_size, int num_topk, + size_t dispatch_output_per_expert_alignment) { + size_t hm_sizes[1] = {handle_mem_size}; + ncclEpTensor_t routing_desc = make_tensor(handle_mem, 1, ncclUint8, hm_sizes); + ncclEpHandleConfig_t hcfg = NCCL_EP_HANDLE_CONFIG_INIT; + hcfg.dispatch_output_per_expert_alignment = dispatch_output_per_expert_alignment; + ncclEpHandle_t handle; + NVTE_CHECK_NCCL(ncclEpInitHandle(&handle, ep_group_, NCCL_EP_LAYOUT_EXPERT_MAJOR, &hcfg, num_topk, + &routing_desc)); + return handle; +} + +// --------------------------------------------------------------------------- +// Lifecycle +// --------------------------------------------------------------------------- + +// Static-dtor teardown: skip NCCL calls (CUDA context / borrowed ep_comm_ may +// already be gone) and release in-memory state only. +EPBackend::~EPBackend() { + std::lock_guard lock(mutex_); + if (!initialized_) return; + handles_.clear(); + ep_group_ = nullptr; + ep_comm_ = nullptr; + initialized_ = false; +} + +void EPBackend::init(ncclComm_t ep_comm, NVTEEpGroupConfig group_config) { + NVTE_CHECK(!initialized_, "EPBackend already initialized"); + + group_config_ = group_config; + + ncclEpGroupConfig_t cfg = NCCL_EP_GROUP_CONFIG_INIT; + cfg.algorithm = NCCL_EP_ALGO_HIGH_THROUGHPUT; + cfg.num_experts = static_cast(group_config.num_experts); + cfg.max_dispatch_tokens_per_rank = static_cast(group_config.max_tokens_per_rank); + const size_t elem_bytes = typeToSize(static_cast(group_config.max_token_dtype)); + cfg.max_token_bytes = static_cast(group_config.hidden_dim * elem_bytes); + cfg.rdma_buffer_size = NCCL_EP_AUTO; + cfg.num_qp_per_rank = NCCL_EP_AUTO; + cfg.num_channels = NCCL_EP_AUTO; + cfg.max_num_sms = group_config.max_num_sms > 0 + ? static_cast(group_config.max_num_sms) + : NCCL_EP_AUTO; + // Must be > 0; NCCL EP errors out on 0. 
+ cfg.max_recv_tokens_per_rank = static_cast(group_config.max_recv_tokens_per_rank); + + NVTE_CHECK_NCCL(ncclEpCreateGroup(&ep_group_, ep_comm, &cfg)); + + ep_comm_ = ep_comm; + + initialized_ = true; +} + +// --------------------------------------------------------------------------- +// Per-handle_id config cache +// --------------------------------------------------------------------------- + +uint64_t EPBackend::insert_new_entry(size_t handle_mem_size, int top_k, size_t alignment) { + if (handle_cache_cap_ == 0) { + const char* cap_env = std::getenv("NVTE_EP_HANDLE_CACHE_SIZE"); + handle_cache_cap_ = (cap_env != nullptr) ? std::max(1, std::atoi(cap_env)) : 8192; + } + NVTE_CHECK(handles_.size() < handle_cache_cap_, "EP handle cache full (", handle_cache_cap_, + " entries). Raise via NVTE_EP_HANDLE_CACHE_SIZE."); + uint64_t id = next_handle_id_.fetch_add(1, std::memory_order_relaxed); + handles_.emplace(id, HandleEntry{handle_mem_size, alignment, top_k}); + return id; +} + +EPBackend::HandleEntry& EPBackend::lookup_config(uint64_t handle_id) { + auto it = handles_.find(handle_id); + NVTE_CHECK(it != handles_.end(), "ep op on handle_id=", handle_id, + " with no cached config — call nvte_ep_register_layer first."); + return it->second; +} + +ncclEpHandle_t EPBackend::get_or_open_handle(HandleEntry& cfg, void* handle_mem) { + if (cfg.cached_handle != nullptr && cfg.cached_handle_mem == handle_mem) { + return cfg.cached_handle; + } + if (cfg.cached_handle != nullptr) { + NVTE_CHECK(group_config_.allow_handle_mem_reloc != 0, + "EP handle_mem relocated for cached handle (old=", + reinterpret_cast(cfg.cached_handle_mem), + ", new=", reinterpret_cast(handle_mem), + "). 
Set NVTEEpGroupConfig.allow_handle_mem_reloc=1 to allow rebuild."); + ncclEpHandleDestroy(cfg.cached_handle); + cfg.cached_handle = nullptr; + cfg.cached_handle_mem = nullptr; + } + ncclEpHandle_t h = open_handle(handle_mem, cfg.handle_mem_size, cfg.top_k, cfg.alignment); + cfg.cached_handle = h; + cfg.cached_handle_mem = handle_mem; + return h; +} + +// --------------------------------------------------------------------------- +// Per-step operations +// --------------------------------------------------------------------------- + +uint64_t EPBackend::register_layer(NVTEEpLayerConfig layer_config, size_t* handle_mem_size) { + NVTE_CHECK(initialized_, "EPBackend not initialized"); + NVTE_CHECK(layer_config.top_k > 0, "NVTEEpLayerConfig.top_k must be > 0"); + NVTE_CHECK(handle_mem_size != nullptr, "handle_mem_size must not be null"); + ncclEpHandleConfig_t hcfg = NCCL_EP_HANDLE_CONFIG_INIT; + hcfg.dispatch_output_per_expert_alignment = layer_config.dispatch_output_per_expert_alignment; + size_t hm_size = 0; + NVTE_CHECK_NCCL(ncclEpHandleMemSize(ep_group_, NCCL_EP_LAYOUT_EXPERT_MAJOR, &hcfg, &hm_size, + layer_config.top_k)); + *handle_mem_size = hm_size; + std::lock_guard lock(mutex_); + return insert_new_entry(hm_size, layer_config.top_k, + layer_config.dispatch_output_per_expert_alignment); +} + +void EPBackend::prepare(uint64_t handle_id, const NVTETensor topk_idx, NVTETensor token_counts, + void* handle_mem, size_t dispatch_output_per_expert_alignment, + cudaStream_t stream) { + NVTE_CHECK(initialized_, "EPBackend not initialized"); + NVTE_CHECK(handle_mem != nullptr, "handle_mem must not be null"); + + NVTEShape idx_shape = nvte_tensor_shape(topk_idx); + void* idx_data = nvte_tensor_data(topk_idx); + NVTE_CHECK(idx_data != nullptr, "topk_idx data must not be null"); + + const size_t num_tokens = idx_shape.data[0]; + const size_t top_k = idx_shape.ndim > 1 ? 
idx_shape.data[1] : 1; + const size_t num_local_experts = + static_cast(group_config_.num_experts / group_config_.ep_size); + + size_t idx_sizes[2] = {num_tokens, top_k}; + ncclEpTensor_t nccl_topk_idx = make_tensor(idx_data, 2, ncclInt64, idx_sizes); + + // ncclEpUpdateHandle writes per-expert counts via expert_counters. + size_t cnt_sizes[1] = {num_local_experts}; + ncclEpTensor_t token_counts_desc; + void* token_counts_data = (token_counts != nullptr) ? nvte_tensor_data(token_counts) : nullptr; + if (token_counts_data != nullptr) { + token_counts_desc = make_tensor(token_counts_data, 1, ncclInt32, cnt_sizes); + } + ncclEpLayoutInfo_t layout_info = NCCL_EP_LAYOUT_INFO_INIT; + layout_info.expert_counters = (token_counts_data != nullptr) ? &token_counts_desc : nullptr; + + std::lock_guard lock(mutex_); + HandleEntry& cfg = lookup_config(handle_id); + NVTE_CHECK(cfg.alignment == dispatch_output_per_expert_alignment, + "ep_prepare: alignment mismatch for handle_id=", handle_id, " (cached=", cfg.alignment, + ", got=", dispatch_output_per_expert_alignment, ")"); + ncclEpHandle_t h = get_or_open_handle(cfg, handle_mem); + NVTE_CHECK_NCCL(ncclEpUpdateHandle(h, &nccl_topk_idx, &layout_info, stream)); +} + +void EPBackend::dispatch(uint64_t handle_id, void* handle_mem, const NVTETensor topk_idx, + const NVTETensor tokens, const NVTECommWindow& tokens_win, + const NVTETensor topk_weights, const NVTECommWindow& topk_weights_win, + NVTETensor recv_tokens, const NVTECommWindow& recv_tokens_win, + NVTETensor recv_topk_weights, const NVTECommWindow& recv_topk_weights_win, + cudaStream_t stream) { + NVTE_CHECK(initialized_, "EPBackend not initialized"); + NVTE_CHECK(handle_mem != nullptr, "handle_mem must not be null"); + + NVTEShape tok_shape = nvte_tensor_shape(tokens); + NVTEDType tok_dtype = nvte_tensor_type(tokens); + NVTE_CHECK(typeToSize(static_cast(tok_dtype)) <= + typeToSize(static_cast(group_config_.max_token_dtype)), + "tokens dtype (", static_cast(tok_dtype), ") wider 
than group max_token_dtype (", + static_cast(group_config_.max_token_dtype), ")"); + + const size_t num_tokens = tok_shape.data[0]; + const size_t hidden_dim = tok_shape.data[1]; + + size_t tok_sizes[2] = {num_tokens, hidden_dim}; + ncclEpTensor_t nccl_tokens_in = + make_payload_tensor(tokens, tokens_win, 2, nvte_dtype_to_nccl(tok_dtype), tok_sizes); + + const bool is_forward = (topk_weights != nullptr); + + // Routing is cached in handle_mem by ep_prepare; dispatch only needs + // topk_weights to reconstruct the sparse-to-dense prob map. + size_t weights_in_sizes[2] = {0, 0}; + ncclEpTensor_t nccl_topk_weights_in; + if (is_forward) { + NVTE_CHECK(topk_idx != nullptr, "topk_idx required in forward dispatch"); + NVTEShape idx_shape = nvte_tensor_shape(topk_idx); + const size_t top_k = idx_shape.ndim > 1 ? idx_shape.data[1] : 1; + weights_in_sizes[0] = num_tokens; + weights_in_sizes[1] = top_k; + nccl_topk_weights_in = + make_payload_tensor(topk_weights, topk_weights_win, 2, ncclFloat32, weights_in_sizes); + } + + NVTEShape recv_shape = nvte_tensor_shape(recv_tokens); + NVTEDType recv_dtype = nvte_tensor_type(recv_tokens); + NVTE_CHECK(typeToSize(static_cast(recv_dtype)) <= + typeToSize(static_cast(group_config_.max_token_dtype)), + "recv_tokens dtype (", static_cast(recv_dtype), + ") wider than group max_token_dtype (", + static_cast(group_config_.max_token_dtype), ")"); + + size_t recv_sizes[2] = {recv_shape.data[0], recv_shape.data[1]}; + ncclEpTensor_t nccl_tokens_out = make_payload_tensor(recv_tokens, recv_tokens_win, 2, + nvte_dtype_to_nccl(recv_dtype), recv_sizes); + + size_t weights_out_sizes[1] = {recv_shape.data[0]}; + ncclEpTensor_t nccl_topk_weights_out; + if (is_forward) { + NVTE_CHECK(recv_topk_weights != nullptr, + "recv_topk_weights must not be null in forward dispatch"); + NVTEShape recv_w_shape = nvte_tensor_shape(recv_topk_weights); + NVTE_CHECK(recv_w_shape.ndim == 1, "recv_topk_weights must be 1D [recv_capacity]"); + nccl_topk_weights_out = 
make_payload_tensor(recv_topk_weights, recv_topk_weights_win, 1, + ncclFloat32, weights_out_sizes); + } + + ncclEpDispatchInputs_t in_struct = NCCL_EP_DISPATCH_INPUTS_INIT; + in_struct.tokens = &nccl_tokens_in; + in_struct.topk_weights = is_forward ? &nccl_topk_weights_in : nullptr; + + ncclEpDispatchOutputs_t out_struct = NCCL_EP_DISPATCH_OUTPUTS_INIT; + out_struct.tokens = &nccl_tokens_out; + out_struct.topk_weights = is_forward ? &nccl_topk_weights_out : nullptr; + + ncclEpDispatchConfig_t dispatch_cfg = NCCL_EP_DISPATCH_CONFIG_INIT; + dispatch_cfg.pass_direction = is_forward ? NCCL_EP_FWD_PASS : NCCL_EP_BWD_PASS; + + std::lock_guard lock(mutex_); + HandleEntry& cfg = lookup_config(handle_id); + ncclEpHandle_t h = get_or_open_handle(cfg, handle_mem); + NVTE_CHECK_NCCL(ncclEpDispatch(h, &in_struct, &out_struct, + /*layout_info=*/nullptr, &dispatch_cfg, stream)); +} + +void EPBackend::combine(uint64_t handle_id, void* handle_mem, const NVTETensor expert_out, + const NVTECommWindow& expert_out_win, NVTETensor result, + cudaStream_t stream) { + NVTE_CHECK(initialized_, "EPBackend not initialized"); + NVTE_CHECK(handle_mem != nullptr, "handle_mem must not be null"); + + NVTEShape exp_shape = nvte_tensor_shape(expert_out); + NVTEDType exp_dtype = nvte_tensor_type(expert_out); + + size_t exp_sizes[2] = {exp_shape.data[0], exp_shape.data[1]}; + ncclEpTensor_t nccl_expert_in = + make_payload_tensor(expert_out, expert_out_win, 2, nvte_dtype_to_nccl(exp_dtype), exp_sizes); + + NVTEShape res_shape = nvte_tensor_shape(result); + void* res_data = nvte_tensor_data(result); + NVTEDType res_dtype = nvte_tensor_type(result); + NVTE_CHECK(res_data != nullptr, "result data must not be null"); + + size_t res_sizes[2] = {res_shape.data[0], res_shape.data[1]}; + ncclEpTensor_t nccl_result_out = + make_tensor(res_data, 2, nvte_dtype_to_nccl(res_dtype), res_sizes); + + ncclEpCombineInputs_t in_struct = NCCL_EP_COMBINE_INPUTS_INIT; + in_struct.tokens = &nccl_expert_in; + + 
ncclEpCombineOutputs_t out_struct = NCCL_EP_COMBINE_OUTPUTS_INIT; + out_struct.tokens = &nccl_result_out; + + std::lock_guard lock(mutex_); + HandleEntry& cfg = lookup_config(handle_id); + ncclEpHandle_t h = get_or_open_handle(cfg, handle_mem); + NVTE_CHECK_NCCL(ncclEpCombine(h, &in_struct, &out_struct, /*config=*/nullptr, stream)); +} + +void EPBackend::dispatch_bwd(uint64_t handle_id, void* handle_mem, const NVTETensor grad, + const NVTECommWindow& grad_win, const NVTETensor g_recv_topk_weights, + const NVTECommWindow& g_recv_topk_weights_win, NVTETensor grad_tokens, + NVTETensor grad_topk_weights, cudaStream_t stream) { + NVTE_CHECK(initialized_, "EPBackend not initialized"); + NVTE_CHECK(handle_mem != nullptr, "handle_mem must not be null"); + + NVTEShape g_shape = nvte_tensor_shape(grad); + NVTEDType g_dtype = nvte_tensor_type(grad); + size_t g_sizes[2] = {g_shape.data[0], g_shape.data[1]}; + ncclEpTensor_t nccl_tok_in = + make_payload_tensor(grad, grad_win, 2, nvte_dtype_to_nccl(g_dtype), g_sizes); + + // g_recv_topk_weights must be 1D [recv_capacity] — caller flattens. 
+ NVTEShape gw_shape = nvte_tensor_shape(g_recv_topk_weights); + NVTE_CHECK(gw_shape.ndim == 1, + "g_recv_topk_weights must be 1D [recv_capacity]; caller must flatten leading dims"); + size_t gw_sizes[1] = {gw_shape.data[0]}; + ncclEpTensor_t nccl_w_in = + make_payload_tensor(g_recv_topk_weights, g_recv_topk_weights_win, 1, ncclFloat32, gw_sizes); + + NVTEShape gt_shape = nvte_tensor_shape(grad_tokens); + void* gt_data = nvte_tensor_data(grad_tokens); + NVTE_CHECK(gt_data != nullptr, "grad_tokens data must not be null"); + size_t gt_sizes[2] = {gt_shape.data[0], gt_shape.data[1]}; + ncclEpTensor_t nccl_tok_out = make_tensor(gt_data, 2, nvte_dtype_to_nccl(g_dtype), gt_sizes); + + NVTEShape gtw_shape = nvte_tensor_shape(grad_topk_weights); + void* gtw_data = nvte_tensor_data(grad_topk_weights); + NVTE_CHECK(gtw_data != nullptr, "grad_topk_weights data must not be null"); + NVTE_CHECK(gtw_shape.ndim == 2, "grad_topk_weights must be 2D [T, top_k]"); + size_t gtw_sizes[2] = {gtw_shape.data[0], gtw_shape.data[1]}; + ncclEpTensor_t nccl_w_out = make_tensor(gtw_data, 2, ncclFloat32, gtw_sizes); + + ncclEpCombineInputs_t in_struct = NCCL_EP_COMBINE_INPUTS_INIT; + in_struct.tokens = &nccl_tok_in; + in_struct.topk_weights = &nccl_w_in; + + ncclEpCombineOutputs_t out_struct = NCCL_EP_COMBINE_OUTPUTS_INIT; + out_struct.tokens = &nccl_tok_out; + out_struct.topk_weights = &nccl_w_out; + + ncclEpCombineConfig_t cfg = NCCL_EP_COMBINE_CONFIG_INIT; + cfg.pass_direction = NCCL_EP_BWD_PASS; + + std::lock_guard lock(mutex_); + HandleEntry& entry = lookup_config(handle_id); + ncclEpHandle_t h = get_or_open_handle(entry, handle_mem); + NVTE_CHECK_NCCL(ncclEpCombine(h, &in_struct, &out_struct, &cfg, stream)); +} + +void EPBackend::combine_bwd(uint64_t handle_id, void* handle_mem, const NVTETensor grad, + const NVTECommWindow& grad_win, NVTETensor grad_expert_out, + const NVTECommWindow& grad_expert_out_win, cudaStream_t stream) { + // Backward of combine = reverse-direction dispatch. 
+ dispatch(handle_id, handle_mem, /*topk_idx=*/nullptr, grad, grad_win, /*topk_weights=*/nullptr, + /*topk_weights_win=*/NVTECommWindow{}, grad_expert_out, grad_expert_out_win, + /*recv_topk_weights=*/nullptr, /*recv_topk_weights_win=*/NVTECommWindow{}, stream); +} + +} // namespace ep +} // namespace transformer_engine diff --git a/transformer_engine/common/ep/ep_backend.h b/transformer_engine/common/ep/ep_backend.h new file mode 100644 index 0000000000..e82c974c3f --- /dev/null +++ b/transformer_engine/common/ep/ep_backend.h @@ -0,0 +1,122 @@ +/************************************************************************* + * Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See LICENSE for license information. + ************************************************************************/ + +/*! \file ep_backend.h + * \brief Internal NCCL EP singleton; not part of the public API. + * + * Per handle_id the cache stores the layer config plus a lazily opened ncclEpHandle bound to + * the handle_mem pointer; relocating handle_mem throws unless allow_handle_mem_reloc != 0. + * Cap: NVTE_EP_HANDLE_CACHE_SIZE (default 8192); overflow throws. + */ + +#ifndef TRANSFORMER_ENGINE_COMMON_EP_EP_BACKEND_H_ +#define TRANSFORMER_ENGINE_COMMON_EP_EP_BACKEND_H_ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace transformer_engine { +namespace ep { + +/*! \brief EP backend singleton — owns the NCCL EP group; borrows the comm. */ +class EPBackend { + public: + /*! \brief Access the singleton. Aborts if not initialized. */ + static EPBackend& get(); + + /*! \brief Bootstrap from an existing EP sub-communicator. + * ep_comm is borrowed; the caller keeps it alive until shutdown() returns + * and must span exactly config.ep_size ranks. + */ + static void initialize(ncclComm_t ep_comm, NVTEEpGroupConfig config); + + /*! \brief Tear down the backend. Idempotent. Does not destroy ep_comm_. 
*/ + static void shutdown(); + + // Host-only: reserve a fresh handle_id, cache the layer config, and report + // the handle_mem buffer size the caller must allocate. + uint64_t register_layer(NVTEEpLayerConfig layer_config, size_t* handle_mem_size); + + void prepare(uint64_t handle_id, const NVTETensor topk_idx, NVTETensor token_counts, + void* handle_mem, size_t dispatch_output_per_expert_alignment, cudaStream_t stream); + + void dispatch(uint64_t handle_id, void* handle_mem, const NVTETensor topk_idx, + const NVTETensor tokens, const NVTECommWindow& tokens_win, + const NVTETensor topk_weights, const NVTECommWindow& topk_weights_win, + NVTETensor recv_tokens, const NVTECommWindow& recv_tokens_win, + NVTETensor recv_topk_weights, const NVTECommWindow& recv_topk_weights_win, + cudaStream_t stream); + + void combine(uint64_t handle_id, void* handle_mem, const NVTETensor expert_out, + const NVTECommWindow& expert_out_win, NVTETensor result, cudaStream_t stream); + + // g_recv_topk_weights: 1D [recv_capacity] f32; grad_topk_weights: 2D [T, top_k] f32. + void dispatch_bwd(uint64_t handle_id, void* handle_mem, const NVTETensor grad, + const NVTECommWindow& grad_win, const NVTETensor g_recv_topk_weights, + const NVTECommWindow& g_recv_topk_weights_win, NVTETensor grad_tokens, + NVTETensor grad_topk_weights, cudaStream_t stream); + + void combine_bwd(uint64_t handle_id, void* handle_mem, const NVTETensor grad, + const NVTECommWindow& grad_win, NVTETensor grad_expert_out, + const NVTECommWindow& grad_expert_out_win, cudaStream_t stream); + + private: + EPBackend() = default; + ~EPBackend(); + EPBackend(const EPBackend&) = delete; + EPBackend& operator=(const EPBackend&) = delete; + + // ep_comm is borrowed — caller retains ownership across the backend lifetime. 
+ void init(ncclComm_t ep_comm, NVTEEpGroupConfig config); + + static EPBackend& instance(); // Meyers singleton accessor + static void validate_config(const NVTEEpGroupConfig& config); + + static ncclDataType_t nvte_dtype_to_nccl(NVTEDType dtype); + // Open a fresh ncclEpHandle over handle_mem; the caller (the per-handle_id + // cache) owns it. num_topk is the registered layer's top_k (> 0). + ncclEpHandle_t open_handle(void* handle_mem, size_t handle_mem_size, int num_topk, + size_t dispatch_output_per_expert_alignment); + + ncclEpGroup_t ep_group_{nullptr}; + ncclComm_t ep_comm_{nullptr}; + NVTEEpGroupConfig group_config_{}; + bool initialized_{false}; + std::mutex mutex_; + struct HandleEntry { + size_t handle_mem_size; + size_t alignment; + int top_k; + // Persistent ncclEpHandle bound to cached_handle_mem. Lazily opened on first + // op; reused while handle_mem ptr is unchanged. Destroyed in shutdown(). + ncclEpHandle_t cached_handle{nullptr}; + void* cached_handle_mem{nullptr}; + }; + std::unordered_map handles_; + std::atomic next_handle_id_{1}; // 0 reserved as "no id" + size_t handle_cache_cap_{0}; // set lazily from NVTE_EP_HANDLE_CACHE_SIZE + + // Caller must hold mutex_. Throws on cap overflow. + uint64_t insert_new_entry(size_t handle_mem_size, int top_k, size_t alignment); + HandleEntry& lookup_config(uint64_t handle_id); + // Caller must hold mutex_. Returns the cached handle if handle_mem matches. + // On mismatch: if group_config_.allow_handle_mem_reloc != 0, destroys the + // stale handle and opens a fresh one; otherwise throws. 
+ ncclEpHandle_t get_or_open_handle(HandleEntry& cfg, void* handle_mem); +}; + +} // namespace ep +} // namespace transformer_engine + +#endif // TRANSFORMER_ENGINE_COMMON_EP_EP_BACKEND_H_ diff --git a/transformer_engine/common/include/transformer_engine/comm_window.h b/transformer_engine/common/include/transformer_engine/comm_window.h new file mode 100644 index 0000000000..088ea7f0c3 --- /dev/null +++ b/transformer_engine/common/include/transformer_engine/comm_window.h @@ -0,0 +1,32 @@ +/************************************************************************* + * Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See LICENSE for license information. + ************************************************************************/ + +/*! \file comm_window.h + * \brief Borrowed symmetric-memory window + offset for zero-copy one-sided ops. + * Pass ``{NULL, 0}`` to use the raw-pointer path. + */ + +#ifndef TRANSFORMER_ENGINE_COMM_WINDOW_H_ +#define TRANSFORMER_ENGINE_COMM_WINDOW_H_ + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/*! \brief NCCL window + byte offset for a zero-copy payload tensor. */ +typedef struct { + ncclWindow_t window; /*!< NCCL window, or NULL to use the raw data pointer. */ + uint64_t offset; /*!< Byte offset of the payload within ``window``. */ +} NVTECommWindow; + +#ifdef __cplusplus +} +#endif + +#endif // TRANSFORMER_ENGINE_COMM_WINDOW_H_ diff --git a/transformer_engine/common/include/transformer_engine/ep.h b/transformer_engine/common/include/transformer_engine/ep.h new file mode 100644 index 0000000000..22e7ec48ac --- /dev/null +++ b/transformer_engine/common/include/transformer_engine/ep.h @@ -0,0 +1,177 @@ +/************************************************************************* + * Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See LICENSE for license information. 
+ ************************************************************************/ + +/*! \file ep.h + * \brief Public C API for Expert Parallelism. Per-step ops are allocation-free + * and CUDA graph-capturable. + */ + +#ifndef TRANSFORMER_ENGINE_EP_H_ +#define TRANSFORMER_ENGINE_EP_H_ + +#include +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* ── Config structs ─────────────────────────────────────────────────────── */ +/* TODO: add a struct_size/version field to these configs (and align with other + * TE public structs) once a TE-wide convention for ABI versioning lands. */ + +/*! \brief Group-level EP configuration (fixed for the EP group lifetime). */ +typedef struct { + int ep_size; /*!< EP world size. */ + int num_experts; /*!< Total experts across all ranks. */ + int max_tokens_per_rank; /*!< Upper bound on tokens this rank sends per dispatch. */ + /*! Upper bound on tokens received per dispatch (worst-case top_k fan-out; must be > 0). */ + int max_recv_tokens_per_rank; + int hidden_dim; /*!< Token hidden dimension. */ + int max_num_sms; /*!< Max SMs for EP kernels. 0 = auto. */ + /*! 0 (default): throw on relocated handle_mem for a cached handle_id. 1: silently rebuild. */ + int allow_handle_mem_reloc; + /*! Widest token dtype the group will dispatch. Sizes NCCL EP staging buffers + * at group create. Tensors passed to nvte_ep_dispatch may use any dtype whose + * element size is <= sizeof(max_token_dtype). */ + NVTEDType max_token_dtype; +} NVTEEpGroupConfig; + +/*! \brief Per-layer EP configuration. */ +typedef struct { + int num_local_experts; /*!< Reserved for ABI stability (derived from group config). */ + int top_k; /*!< Per-token expert fan-out. Required. */ + size_t dispatch_output_per_expert_alignment; + /*!< Per-expert zone alignment in tokens (pow2; 0/1 = no padding). Must match + * between nvte_ep_register_layer and nvte_ep_prepare. 
*/ +} NVTEEpLayerConfig; + +/* ── Bootstrap ──────────────────────────────────────────────────────────── */ + +/*! \brief Bootstrap from an existing NCCL EP sub-communicator. Requires SM>=90. + * + * ep_comm is borrowed and must span exactly group_config.ep_size ranks. + * The caller retains ownership and must keep ep_comm alive until + * nvte_ep_shutdown() returns; destroying it earlier is undefined behavior. + * Re-init after shutdown is allowed; double-init throws. + * + * One EP group per process, bound to the current CUDA device at initialize + * time. Multiple GPUs per process are not supported. + * + * \param[in] ep_comm Opaque ncclComm_t for the EP sub-group. + * \param[in] group_config Group-level EP configuration. + */ +void nvte_ep_initialize(void* ep_comm, NVTEEpGroupConfig group_config); + +/*! \brief Tear down the EP backend. Idempotent. Does not destroy ep_comm. */ +void nvte_ep_shutdown(void); + +/* ── Layer registration (host-only, eager) ───────────────────────────────── */ + +/*! \brief Reserve a handle_id for a layer config and report the handle_mem buffer + * size the caller must allocate. Host-only. + * + * Registration is intended to be static (once per layer at model init). There is + * no per-layer unregister API; all registrations are released by nvte_ep_shutdown. + * Re-registering the same layer config each step is not supported and will + * eventually exhaust the handle cache (NVTE_EP_HANDLE_CACHE_SIZE, default 8192). + * + * \param[in] layer_config Per-layer EP configuration. + * \param[out] handle_mem_size Bytes the caller must allocate for handle_mem. + * \return uint64_t handle_id (non-zero). + */ +uint64_t nvte_ep_register_layer(NVTEEpLayerConfig layer_config, size_t* handle_mem_size); + +/*! \brief Per-step handle: the registered handle_id paired with its handle_mem buffer. */ +typedef struct { + uint64_t id; /*!< Handle id from nvte_ep_register_layer. 
*/ + NVTETensor mem; /*!< Caller-allocated handle_mem buffer (size from nvte_ep_register_layer). */ +} NVTEEpHandle; + +/* ── Per-step ops (all allocation-free, CUDA graph-capturable) ──────────── */ + +/*! \brief AllGather the routing map; write per-expert counts and cache routing + * metadata in handle.mem for the subsequent dispatch/combine. + * + * \param[in] handle EP handle (id + mem buffer). + * \param[in] topk_idx [T, top_k] int64 routing indices. + * \param[out] token_counts [num_local_experts] int32 counts. + * \param[in] dispatch_output_per_expert_alignment Must match the handle_mem sizing. + * \param[in] stream CUDA stream. + */ +void nvte_ep_prepare(NVTEEpHandle handle, NVTETensor topk_idx, NVTETensor token_counts, + size_t dispatch_output_per_expert_alignment, cudaStream_t stream); + +/*! \brief Dispatch tokens (and routing weights) to expert ranks. + * + * \param[in] handle EP handle (id + mem buffer). + * \param[in] topk_idx [T, top_k] int64 sparse routing indices. + * \param[in] tokens [T, hidden_dim] input tokens. + * \param[in] tokens_win Optional symmem window for ``tokens``. + * \param[in] topk_weights [T, top_k] float32 weights, or null in backward. + * \param[in] topk_weights_win Optional symmem window for ``topk_weights``. + * \param[out] recv_tokens [recv_T, hidden_dim] received tokens. + * \param[in] recv_tokens_win Optional symmem window for ``recv_tokens``. + * \param[out] recv_topk_weights [recv_T] float32 per-slot weights, or null in backward. + * \param[in] recv_topk_weights_win Optional symmem window for ``recv_topk_weights``. + * \param[in] stream CUDA stream. + */ +void nvte_ep_dispatch(NVTEEpHandle handle, NVTETensor topk_idx, NVTETensor tokens, + NVTECommWindow tokens_win, NVTETensor topk_weights, + NVTECommWindow topk_weights_win, NVTETensor recv_tokens, + NVTECommWindow recv_tokens_win, NVTETensor recv_topk_weights, + NVTECommWindow recv_topk_weights_win, cudaStream_t stream); + +/*! 
\brief Scatter-sum expert outputs back to originating ranks. Unweighted — + * caller must pre-multiply expert_out by recv_topk_weights (and the + * valid-slot mask) before calling. + * + * \param[in] handle EP handle (id + mem buffer). + * \param[in] expert_out [recv_T, hidden_dim] pre-weighted expert outputs. + * \param[in] expert_out_win Optional symmem window for ``expert_out``. + * \param[out] result [T, hidden_dim] combined output. + * \param[in] stream CUDA stream. + */ +void nvte_ep_combine(NVTEEpHandle handle, NVTETensor expert_out, NVTECommWindow expert_out_win, + NVTETensor result, cudaStream_t stream); + +/*! \brief Backward of dispatch — routes token and weight grads back to source. + * + * \param[in] handle EP handle (id + mem buffer). + * \param[in] grad [recv_capacity, hidden_dim] grad w.r.t. recv_tokens. + * \param[in] grad_win Optional symmem window for ``grad``. + * \param[in] g_recv_topk_weights [recv_capacity] f32 grad w.r.t. recv_topk_weights. + * \param[in] g_recv_topk_weights_win Optional symmem window for ``g_recv_topk_weights``. + * \param[out] grad_tokens [T, hidden_dim] grad w.r.t. tokens. + * \param[out] grad_topk_weights [T, top_k] f32 grad w.r.t. topk_weights. + * \param[in] stream CUDA stream. + */ +void nvte_ep_dispatch_bwd(NVTEEpHandle handle, NVTETensor grad, NVTECommWindow grad_win, + NVTETensor g_recv_topk_weights, NVTECommWindow g_recv_topk_weights_win, + NVTETensor grad_tokens, NVTETensor grad_topk_weights, + cudaStream_t stream); + +/*! \brief Backward of combine. Padded slots in grad_expert_out are zeroed. + * + * \param[in] handle EP handle (id + mem buffer). + * \param[in] grad [T, hidden_dim] grad w.r.t. result. + * \param[in] grad_win Optional symmem window for ``grad``. + * \param[out] grad_expert_out [recv_capacity, hidden_dim] grad w.r.t. expert_out. + * \param[in] grad_expert_out_win Optional symmem window for ``grad_expert_out``. + * \param[in] stream CUDA stream. 
+ */ +void nvte_ep_combine_bwd(NVTEEpHandle handle, NVTETensor grad, NVTECommWindow grad_win, + NVTETensor grad_expert_out, NVTECommWindow grad_expert_out_win, + cudaStream_t stream); + +#ifdef __cplusplus +} +#endif + +#endif // TRANSFORMER_ENGINE_EP_H_ diff --git a/transformer_engine/common/util/logging.h b/transformer_engine/common/util/logging.h index da8b9b377d..3308bd22e4 100644 --- a/transformer_engine/common/util/logging.h +++ b/transformer_engine/common/util/logging.h @@ -98,6 +98,14 @@ } \ } while (false) +#define NVTE_CHECK_NCCL(expr) \ + do { \ + const ncclResult_t status_NVTE_CHECK_NCCL = (expr); \ + if (status_NVTE_CHECK_NCCL != ncclSuccess) { \ + NVTE_ERROR("NCCL Error: ", ncclGetErrorString(status_NVTE_CHECK_NCCL)); \ + } \ + } while (false) + #ifdef NVTE_WITH_CUBLASMP #define NVTE_CHECK_CUBLASMP(expr) \