Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions python/bindings/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ set(DIST_SOURCES
${DIST_SRC}/dist_ring.cpp
${DIST_SRC}/dist_scope.cpp
${DIST_SRC}/dist_orchestrator.cpp
${DIST_SRC}/dist_sub_worker.cpp
${DIST_SRC}/dist_chip_process.cpp
${DIST_SRC}/dist_worker_manager.cpp
${DIST_SRC}/dist_scheduler.cpp
${DIST_SRC}/dist_worker.cpp
Expand Down
99 changes: 29 additions & 70 deletions python/bindings/dist_worker_bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@
*/

/**
* Nanobind bindings for the distributed runtime (DistWorker, DistOrchestrator,
* mailbox helpers).
* Nanobind bindings for the distributed runtime (DistWorker, DistOrchestrator).
*
* Compiled into the same _task_interface extension module as task_interface.cpp.
* Call bind_dist_worker(m) from the NB_MODULE definition in task_interface.cpp.
*
* PR-D-2: `DistChipProcess` and `DistSubWorker` bindings are removed; their
* PROCESS-mode dispatch logic now lives inside `WorkerThread`. Python callers
* register PROCESS-mode workers via `add_next_level_process(mailbox_ptr)` /
* `add_sub_process(mailbox_ptr)` instead of wrapping an IWorker subclass.
*/

#pragma once
Expand All @@ -28,11 +32,10 @@

#include "chip_worker.h"
#include "dist_ring.h"
#include "dist_chip_process.h"
#include "dist_orchestrator.h"
#include "dist_sub_worker.h"
#include "dist_types.h"
#include "dist_worker.h"
#include "dist_worker_manager.h"

namespace nb = nanobind;

Expand All @@ -54,45 +57,7 @@ inline void bind_dist_worker(nb::module_ &m) {
return r.task_slot;
});

// --- DistSubWorker ---
// The fork + Python callable loop are managed from Python (HostWorker.__init__).
// This class only handles dispatch/poll via the shared-memory mailbox.
nb::class_<DistSubWorker>(m, "DistSubWorker")
.def(
"__init__",
[](DistSubWorker *self, uint64_t mailbox_ptr) {
new (self) DistSubWorker(reinterpret_cast<void *>(mailbox_ptr));
},
nb::arg("mailbox_ptr"), "Wrap a shared-memory mailbox pointer (uint64_t address)."
)
.def("shutdown", &DistSubWorker::shutdown);

// Python can use this constant to allocate mailboxes of the right size.
m.attr("DIST_SUB_MAILBOX_SIZE") = static_cast<int>(DIST_SUB_MAILBOX_SIZE);

// --- DistChipProcess ---
// Fork + host_runtime.so init are managed from Python (Worker.__init__).
// This class handles dispatch/poll via the chip mailbox (4096 bytes).
nb::class_<DistChipProcess>(m, "DistChipProcess")
.def(
"__init__",
[](DistChipProcess *self, uint64_t mailbox_ptr, size_t args_size) {
new (self) DistChipProcess(reinterpret_cast<void *>(mailbox_ptr), args_size);
},
nb::arg("mailbox_ptr"), nb::arg("args_size"),
"Wrap a chip mailbox pointer. args_size = sizeof(ChipStorageTaskArgs)."
)
.def("shutdown", &DistChipProcess::shutdown);

m.attr("DIST_CHIP_MAILBOX_SIZE") = static_cast<int>(DIST_CHIP_MAILBOX_SIZE);

// --- DistOrchestrator (DAG builder, exposed via DistWorker.get_orchestrator()) ---
//
// Returned as a reference borrowed from the parent DistWorker. Lifetime is
// tied to the DistWorker; using the handle after dw.close() / dw destruction
// is undefined behaviour. The Python facade in simpler/orchestrator.py keeps
// a strong reference to the parent DistWorker for the lifetime of the
// Orchestrator.
nb::class_<DistOrchestrator>(m, "DistOrchestrator")
.def(
"submit_next_level",
Expand Down Expand Up @@ -136,19 +101,11 @@ inline void bind_dist_worker(nb::module_ &m) {
"Allocate an intermediate ContinuousTensor from the orchestrator's MAP_SHARED "
"pool (visible to forked child workers). Lifetime: until the next Worker.run() call."
)
// User-facing nested scope (Strict-1). Tasks submitted between
// `scope_begin` and `scope_end` bind to a heap ring chosen by the
// current nesting depth (`min(depth, DIST_MAX_RING_DEPTH - 1)`),
// reclaiming independently of outer-scope tasks. Non-blocking:
// `scope_end` releases scope refs and returns; use `_drain()` for
// a synchronous wait.
.def(
"scope_begin", &DistOrchestrator::scope_begin,
"Open a nested scope. Max nesting depth = DIST_MAX_SCOPE_DEPTH (64)."
)
.def("scope_end", &DistOrchestrator::scope_end, "Close the innermost scope. Non-blocking.")
// Aliases used by the Python `Worker.run` facade to open/close the
// outermost scope without looking like a user-facing API.
.def("_scope_begin", &DistOrchestrator::scope_begin)
.def("_scope_end", &DistOrchestrator::scope_end)
.def(
Expand All @@ -157,50 +114,51 @@ inline void bind_dist_worker(nb::module_ &m) {
);

// --- DistWorker ---
//
// `heap_ring_size` is the MAP_SHARED|MAP_ANONYMOUS region the Orchestrator
// hands out for auto-allocated OUTPUT slabs and `orch.alloc()` buffers.
// The mapping is taken in the ctor, before the Python caller forks any
// child workers, so children see the same bytes at the same virtual
// address.
nb::class_<DistWorker>(m, "DistWorker")
.def(
nb::init<int32_t, uint64_t>(), nb::arg("level"), nb::arg("heap_ring_size") = DIST_DEFAULT_HEAP_RING_SIZE,
"Create a DistWorker for the given hierarchy level (3=L3, 4=L4, …). "
"`heap_ring_size` selects the MAP_SHARED heap mmap'd in the ctor "
"(default 1 GiB)."
"`heap_ring_size` selects the per-ring MAP_SHARED heap mmap'd in the ctor "
"(default 1 GiB; total VA = 4 × heap_ring_size)."
)

// THREAD-mode registration (parent calls worker->run directly).
.def(
"add_next_level_worker",
[](DistWorker &self, DistWorker &w) {
self.add_worker(WorkerType::NEXT_LEVEL, &w);
},
nb::arg("worker"), "Add a lower-level DistWorker as a NEXT_LEVEL sub-worker."
nb::arg("worker"), "Add a lower-level DistWorker as a NEXT_LEVEL sub-worker (THREAD mode)."
)

.def(
"add_next_level_worker",
[](DistWorker &self, ChipWorker &w) {
self.add_worker(WorkerType::NEXT_LEVEL, &w);
},
nb::arg("worker"), "Add a ChipWorker as a NEXT_LEVEL sub-worker."
nb::arg("worker"), "Add a ChipWorker as a NEXT_LEVEL sub-worker (THREAD mode)."
)

// PROCESS-mode registration (parent writes unified mailbox; child runs
// the real IWorker in its own address space).
.def(
"add_next_level_worker",
[](DistWorker &self, DistChipProcess &w) {
self.add_worker(WorkerType::NEXT_LEVEL, &w);
"add_next_level_process",
[](DistWorker &self, uint64_t mailbox_ptr) {
self.add_process_worker(WorkerType::NEXT_LEVEL, reinterpret_cast<void *>(mailbox_ptr));
},
nb::arg("worker"), "Add a forked process as a NEXT_LEVEL sub-worker."
nb::arg("mailbox_ptr"),
"Add a PROCESS-mode NEXT_LEVEL worker. `mailbox_ptr` is the address of a "
"DIST_MAILBOX_SIZE-byte MAP_SHARED region. The child process loop is "
"Python-managed (fork + _chip_process_loop)."
)

.def(
"add_sub_worker",
[](DistWorker &self, DistSubWorker &w) {
self.add_worker(WorkerType::SUB, &w);
"add_sub_process",
[](DistWorker &self, uint64_t mailbox_ptr) {
self.add_process_worker(WorkerType::SUB, reinterpret_cast<void *>(mailbox_ptr));
},
nb::arg("worker"), "Add a SubWorker (fork/shm) as a SUB sub-worker."
nb::arg("mailbox_ptr"),
"Add a PROCESS-mode SUB worker. `mailbox_ptr` is the address of a "
"DIST_MAILBOX_SIZE-byte MAP_SHARED region. The child process loop is "
"Python-managed (fork + _sub_worker_loop)."
)

.def("init", &DistWorker::init, "Start the Scheduler thread.")
Expand All @@ -212,6 +170,7 @@ inline void bind_dist_worker(nb::module_ &m) {
);

m.attr("DIST_DEFAULT_HEAP_RING_SIZE") = static_cast<uint64_t>(DIST_DEFAULT_HEAP_RING_SIZE);
m.attr("DIST_MAILBOX_SIZE") = static_cast<int>(DIST_MAILBOX_SIZE);
m.attr("DIST_MAX_RING_DEPTH") = static_cast<int32_t>(DIST_MAX_RING_DEPTH);
m.attr("DIST_MAX_SCOPE_DEPTH") = static_cast<int32_t>(DIST_MAX_SCOPE_DEPTH);
}
10 changes: 2 additions & 8 deletions python/simpler/task_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@

from _task_interface import ( # pyright: ignore[reportMissingImports]
CONTINUOUS_TENSOR_MAX_DIMS,
DIST_CHIP_MAILBOX_SIZE,
DIST_SUB_MAILBOX_SIZE,
DIST_MAILBOX_SIZE,
ArgDirection,
ChipCallable,
ChipCallConfig,
ChipStorageTaskArgs,
ContinuousTensor,
CoreCallable,
DataType,
DistChipProcess,
DistOrchestrator,
DistSubmitResult,
DistSubWorker,
DistWorker,
TaskArgs,
TaskState,
Expand Down Expand Up @@ -65,11 +62,8 @@
"TaskState",
"DistOrchestrator",
"DistSubmitResult",
"DistSubWorker",
"DistChipProcess",
"DistWorker",
"DIST_SUB_MAILBOX_SIZE",
"DIST_CHIP_MAILBOX_SIZE",
"DIST_MAILBOX_SIZE",
]


Expand Down
Loading
Loading