Merged
244 changes: 154 additions & 90 deletions docs/orchestrator.md
@@ -119,7 +119,7 @@ SubmitResult Orchestrator::submit_next_level(Callable cb,

### Step details

**Step 1 — `ring_.alloc()`**: See [§5 Ring](#5-ring-slot--heap-allocator). Blocks the Orch thread
**Step 1 — `ring_.alloc()`**: See [§5 Ring](#5-ring-slot--per-scope-heap-allocator). Blocks the Orch thread
if all slots are in-flight; this is the system's back-pressure mechanism.

**Step 2 — store task data**: `TaskArgs` is moved (not copied). `config` is a
@@ -246,137 +246,201 @@ SubmitResult Orchestrator::submit_sub(Callable cb, TaskArgs args, const CallConf

---

## 5. Ring (slot + heap allocator)
## 5. Ring (slot + per-scope heap allocator)

`DistRing` owns three correlated per-task resources:

1. A **monotonic task id** — allocated by the Orchestrator, incremented
on every `alloc()`. There is no fixed window and no modulo wrap at
L3: slot state lives in parent-process heap (never crossed into
child workers), so the ring index L2 uses to address shmem
descriptors buys us nothing here (see plan Allowed Exception #6).
A monotonic `int32_t` gives ~2 billion ids per `reset_to_empty()`
interval, reset back to 0 at the end of every `Worker.run()`.
2. A **shared-memory heap slab** — bump-allocated under the same mutex,
FIFO-reclaimed via `last_alive_`. This still mirrors L2 (Strict-2):
the heap must be `mmap(MAP_SHARED)` and forked into child workers,
so it has to be pre-sized. `heap_ring_size` on the Worker ctor
controls the total bytes (default 1 GiB).
3. The **per-task slot state** (`DistTaskSlotState`) — stored in a
`std::deque<std::unique_ptr<...>>`. `std::deque::push_back` never
invalidates pointers to existing elements, so the pointer returned
by `slot_state(id)` stays valid until `reset_to_empty()` drops the
whole deque.
1. A **monotonic task id** — allocated on every `alloc()`, shared across
all rings. There is no fixed window and no modulo wrap at L3: slot
state lives in parent-process heap (never crossed into child workers),
so the ring index L2 uses to address shmem descriptors buys us nothing
here (see plan Allowed Exception #6). A monotonic `int32_t` gives ~2
billion ids per `reset_to_empty()` interval, reset to 0 at the end of
every `Worker.run()`.
2. **`DIST_MAX_RING_DEPTH = 4` independent shared-memory heap slabs**
(Strict-1; matches L2's `PTO2_MAX_RING_DEPTH`). Each slab has its own
`mmap(MAP_SHARED | MAP_ANONYMOUS)` region, bump cursor, FIFO
reclamation pointer, and mutex / cv. A task's ring is chosen by
**scope depth**:

```cpp
ring_idx = std::min(scope_depth, DIST_MAX_RING_DEPTH - 1);
```

so nested-scope tasks never share a FIFO head with outer-scope
long-lived allocations. The mapping is taken in the Worker ctor —
*before* any fork — so forked child workers inherit every ring at
the same virtual address range. `heap_ring_size` on the Worker ctor
is the **per-ring** size (default 1 GiB → 4 GiB total VA reservation;
physical pages stay lazy).
3. The **per-task slot state** (`DistTaskSlotState`) — stored in a single
`std::deque<std::unique_ptr<...>>` shared across rings. Each slot
records its `ring_idx` and `ring_slot_idx` (position within that
ring's FIFO order). `std::deque::push_back` never invalidates pointers
to existing elements, so the pointer returned by `slot_state(id)`
stays valid until `reset_to_empty()` drops the whole deque.

```cpp
struct DistAllocResult {
TaskSlot slot;
void *heap_ptr; // nullptr when alloc(0)
uint64_t heap_end_offset; // recorded per-slot for FIFO reclamation
uint64_t heap_end_offset; // byte offset within the selected ring
int32_t ring_idx; // which of the DIST_MAX_RING_DEPTH rings was used
};

class DistRing {
public:
void init(uint64_t heap_bytes, // default 1 GiB, Worker-configurable
uint32_t timeout_ms); // default 10 s

DistAllocResult alloc(uint64_t bytes = 0); // blocks on heap full, throws on timeout
void release(TaskSlot sid); // FIFO-advances last_alive
DistTaskSlotState *slot_state(TaskSlot sid); // pointer-stable until reset_to_empty
void reset_to_empty(); // drop per-task state at drain boundary
void shutdown();

void *heap_base() const;
uint64_t heap_size() const;
// Initialise DIST_MAX_RING_DEPTH heap rings, each of heap_bytes.
// Total VA = DIST_MAX_RING_DEPTH * heap_bytes.
void init(uint64_t heap_bytes, // per-ring, default 1 GiB
uint32_t timeout_ms); // default 10 s

// Pick ring = min(scope_depth, DIST_MAX_RING_DEPTH - 1); reserve a
// slab from that ring (blocks on its cv) and stamp the slot state
// with ring_idx / ring_slot_idx.
DistAllocResult alloc(uint64_t bytes = 0, int32_t scope_depth = 0);
void release(TaskSlot sid); // FIFO-advances THAT slot's ring
DistTaskSlotState *slot_state(TaskSlot sid);
void reset_to_empty(); // rewinds every ring
void shutdown();

// Per-ring introspection
void *heap_base(int32_t ring_idx) const;
uint64_t heap_size(int32_t ring_idx) const;
uint64_t heap_top (int32_t ring_idx) const;
uint64_t heap_tail(int32_t ring_idx) const;
};
```

**Back-pressure rationale**: only the heap can be full at L3. `alloc()`
spin-waits on a cv; if `timeout_ms` elapses with no progress, it throws
`std::runtime_error`. That surfaces as a Python exception so users can
enlarge `heap_ring_size` on the `Worker` instead of deadlocking.
**Back-pressure is per-ring**: only the selected ring's heap can be full
for a given `alloc()`. `alloc()` waits on that ring's cv, re-checking the
heap-full condition in a loop, while other rings remain fully usable; if
`timeout_ms` elapses with no progress, it throws `std::runtime_error`.
That surfaces as a Python exception, so users can enlarge
`heap_ring_size` on the `Worker` instead of deadlocking.
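
As an illustrative sketch only (Python stand-in with hypothetical names; the real code waits on a per-ring `std::condition_variable` in C++), the wait-with-timeout shape is:

```python
import threading

class RingFullTimeout(RuntimeError):
    """Raised when a ring's heap stays full past the timeout."""

class HeapRingModel:
    """Toy model of one ring's back-pressure wait."""

    def __init__(self, size: int, timeout_s: float = 10.0):
        self.size = size
        self.used = 0
        self.cv = threading.Condition()
        self.timeout_s = timeout_s

    def alloc(self, nbytes: int) -> int:
        with self.cv:
            # wait_for re-checks the predicate on every wakeup, so
            # spurious wakeups are harmless; a False return means timeout.
            fits = lambda: self.used + nbytes <= self.size
            if not self.cv.wait_for(fits, timeout=self.timeout_s):
                raise RingFullTimeout(
                    "heap ring full; enlarge heap_ring_size on the Worker")
            offset = self.used
            self.used += nbytes
            return offset

    def release_all(self) -> None:
        with self.cv:
            self.used = 0
            self.cv.notify_all()
```

Only the one ring's cv is involved, which is the whole point: a full ring 2 cannot stall an `alloc()` against ring 0.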

**Alignment**: every heap allocation is rounded up to `DIST_HEAP_ALIGN = 1024 B`
(matches L2's `PTO2_PACKED_OUTPUT_ALIGN`, Strict-3).
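
A quick sketch of that rounding (constant taken from the text; the helper name is hypothetical):

```python
DIST_HEAP_ALIGN = 1024  # bytes, per the Strict-3 rule above

def align_up(nbytes: int, align: int = DIST_HEAP_ALIGN) -> int:
    # Standard power-of-two round-up: add align-1, then clear the low bits.
    return (nbytes + align - 1) & ~(align - 1)
```

So a 1-byte allocation still consumes 1024 B of its ring, while `alloc(0)` consumes nothing (it returns a null `heap_ptr`, per `DistAllocResult`).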

**Heap mapping**: the heap region is a single `mmap(MAP_SHARED | MAP_ANONYMOUS)`
taken in the `DistWorker` ctor — *before* any fork — so forked child workers
inherit the same virtual address range.

**FIFO reclamation**: each `alloc()` records the slot's `heap_end_offset`.
`release(slot)` flags that slot consumed and advances `last_alive_` as long
as the next-oldest slot is also released, walking the `heap_tail_` forward
accordingly. Heap space is reclaimed implicitly; no per-slot `munmap` runs.

**End-of-run reset**: `DistOrchestrator::drain()` waits for `active_tasks_`
to hit 0, then calls `ring.reset_to_empty()` which clears the deque of
slot states, zeroes the heap cursors, and resets `next_task_id_` to 0.
Memory per `Worker.run()` is bounded by that run's peak alive task count;
nothing accumulates across runs.
**FIFO reclamation per ring**: each `alloc()` appends the slot's
`heap_end_offset` onto the selected ring's `slot_heap_end[]` vector, and
pushes a `released=0` byte. `release(slot)` looks up the slot's ring via
`slot.ring_idx` and advances **that ring's** `last_alive` as long as the
next-oldest in-ring slot is released, walking the ring's `heap_tail`
forward. Rings never touch each other — inner-scope tasks reclaim
without waiting for an outer-scope task to finish.
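
The per-ring walk can be modeled in a few lines (toy Python model; field names merely mirror the text, not the real layout):

```python
class RingFifoModel:
    """One ring's FIFO reclamation: tail advances only over the
    contiguous prefix of released slots."""

    def __init__(self):
        self.slot_heap_end = []  # per-slot end offsets, in alloc order
        self.released = []       # per-slot released flags
        self.last_alive = 0      # oldest not-yet-reclaimed in-ring slot
        self.tail = 0            # heap reclaimed up to this byte offset

    def alloc(self, end_offset: int) -> int:
        self.slot_heap_end.append(end_offset)
        self.released.append(False)
        return len(self.released) - 1  # ring_slot_idx

    def release(self, ring_slot_idx: int) -> None:
        self.released[ring_slot_idx] = True
        # Walk forward only while the oldest tracked slot is released.
        while (self.last_alive < len(self.released)
               and self.released[self.last_alive]):
            self.tail = self.slot_heap_end[self.last_alive]
            self.last_alive += 1
```

Releasing out of order is fine: the tail waits at the oldest live slot, then jumps past every already-released successor in one sweep.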

**End-of-run reset**: `DistOrchestrator::drain()` waits for
`active_tasks_` to hit 0, then calls `ring.reset_to_empty()` which
drops the whole slot-state deque *and* rewinds every ring's cursors /
`released[]` / `slot_heap_end[]` back to 0. Memory per `Worker.run()`
is bounded by that run's peak alive task count; nothing accumulates
across runs.

**Locking**: each ring has its own `mu` / `cv`; the shared
`next_task_id_` and slot deque are guarded by a separate `slots_mu_`.
`alloc()` holds ring.mu (back-pressure wait + reserve in-ring position),
releases it, then takes `slots_mu_` briefly to publish the new slot —
no nested locking. `reset_to_empty()` takes `slots_mu_` first and each
ring's mu sequentially (nested, outer is `slots_mu_`); readers that
need both lock in the same order.

**Ownership by role**:

| Field | Writer | Reader |
| ----- | ------ | ------ |
| `next_task_id_`, `heap_top_` | Orch (`alloc`, under `mu_`) | Allocator only |
| `last_alive_`, `heap_tail_`, `released_[]` | `release` (scheduler or Orch thread) | Allocator only |
| `slot_heap_end_[]`, `slot_states_[]` | Orch at alloc | `release` / `slot_state()` readers |

All shared state is guarded by a single mutex. The Orch thread is the only
writer of `next_task_id_` / `heap_top_`, so the mutex serves primarily to
coordinate with `release` and to protect the back-pressure condition
variable. `slot_state()` takes the mutex briefly to read the deque cell, but
the returned pointer is safe to dereference without the mutex because
`std::deque::push_back` doesn't invalidate existing elements.
| `next_task_id_`, `slot_states_` | `alloc` under `slots_mu_` | `slot_state`, `next_task_id`, `reset_to_empty` |
| `rings_[r].top`, `rings_[r].released[]`, `rings_[r].slot_heap_end[]` | `alloc` under `rings_[r].mu` | `release` under `rings_[r].mu`, introspection accessors |
| `rings_[r].tail`, `rings_[r].last_alive` | `release` under `rings_[r].mu` | same; `reset_to_empty` |
| `slot.ring_idx`, `slot.ring_slot_idx` | `alloc` (stamped before return) | `release` |

---

## 6. Scope

Scope solves: **"how do we release a task's ring slot if it has no downstream
consumer?"**
Scope solves two concerns at once:

Every slot has a `fanout_total` counter: the number of outstanding references
(downstream consumers + any scope refs). A slot transitions to `CONSUMED`
(ring slot freed) only when `fanout_total == fanout_released`.
1. **Lifetime anchoring** — release a task's ring slot even when it has no
downstream consumer, so leaf tasks don't strand heap bytes.
2. **Per-scope reclamation** — tasks submitted inside an inner scope bind
to a deeper HeapRing (§5), so a long-lived outer-scope task cannot hold
the FIFO head against inner-scope churn.

Without scope, a leaf task (no consumers, `fanout_total = 0`) would reach
COMPLETED but never transition further — but then all its outputs have been
observed at the earliest moment, so it's actually fine in this degenerate
case. The problem appears when user code does this:
Every slot has a `fanout_total` counter: the number of outstanding
references (downstream consumers + any scope refs). A slot transitions to
`CONSUMED` (slot + heap slab freed) only when `fanout_released` meets the
threshold (`>= total + 1`; see §8 fanout-release threshold).

```python
def my_orch(orch, args, cfg):
r = orch.submit_next_level(...) # produces tensor X
# no one consumes X within this DAG
# without scope: slot stays, ring fills up eventually
```

Scope adds a deferred reference that releases at `scope_end`:
Without scope, a leaf task (no consumers, `fanout_total = 0`) would reach
COMPLETED but never transition further. Scope adds a deferred reference
that releases at `scope_end`:

```cpp
class Scope {
class DistScope {
public:
void scope_begin();
void scope_end(const std::function<void(TaskSlot)> &release_ref);
void register_task(TaskSlot sid); // called by Orchestrator.submit_*
void scope_begin();
void scope_end(const std::function<void(TaskSlot)> &release_ref);
void register_task(TaskSlot sid); // called by Orchestrator.submit_*
int32_t depth() const; // 1-based count of open scopes; 0 = none open
int32_t current_depth() const; // 0-based: L2-style; used for ring selection
private:
std::vector<std::vector<TaskSlot>> depth_; // stack of scope levels
std::vector<std::vector<TaskSlot>> stack_;
};
```

`Worker::run` always opens one outer scope; user orch fns may nest up to
`DIST_MAX_SCOPE_DEPTH = 64` additional scopes on top. Ring selection uses
the 0-based `current_depth()`:

| Where you are | `depth()` | `current_depth()` | Ring |
| ------------- | --------- | ----------------- | ---- |
| outer (Worker.run-only) scope | 1 | 0 | 0 |
| `with orch.scope():` | 2 | 1 | 1 |
| nested x 2 | 3 | 2 | 2 |
| nested x 3 | 4 | 3 | 3 |
| nested x 4 or deeper | >= 5 | >= 4 | 3 (clamp) |
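
The Ring column is just the §5 clamp, restated as a sketch:

```python
DIST_MAX_RING_DEPTH = 4  # Strict-1, per §5

def dist_ring_idx_for_scope(current_depth: int) -> int:
    # 0-based scope depth, clamped into the fixed set of rings.
    return min(current_depth, DIST_MAX_RING_DEPTH - 1)
```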

Flow:

1. `scope_begin` pushes an empty vector onto the depth stack
2. Each `submit_*` calls `scope.register_task(sid)`, appending to the top
vector and bumping `slots_[sid].fanout_total` by 1
3. `scope_end` pops the top vector; for each `sid`, releases the scope ref
(`release_ref(sid)` decrements `fanout_total` bookkeeping and may
transition the slot to CONSUMED)
1. `scope_begin` pushes an empty frame onto `stack_`.
2. Each `submit_*` calls `scope.register_task(sid)`; the Orchestrator
has already set `slot.fanout_total = scope_ref` (1 when `depth() > 0`)
and stamped `slot.ring_idx = dist_ring_idx_for_scope(current_depth())`
before the call.
3. `scope_end` pops the frame; for each `sid`, invokes the release
callback (`Orchestrator::release_ref`) which bumps `fanout_released`
by 1 and transitions the slot to CONSUMED if the threshold is met.
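
The three steps above can be sketched as a toy Python model (`release_ref` stands in for `Orchestrator::release_ref`; names are illustrative):

```python
class ScopeModel:
    """Stack of scope frames; each frame holds the slots it registered."""

    def __init__(self, release_ref):
        self.stack = []
        self.release_ref = release_ref  # called once per task at scope_end

    def scope_begin(self) -> None:
        self.stack.append([])           # step 1: push an empty frame

    def register_task(self, sid) -> None:
        self.stack[-1].append(sid)      # step 2: record sid in the top frame

    def scope_end(self) -> None:
        for sid in self.stack.pop():    # step 3: pop the frame and release
            self.release_ref(sid)       #         one scope ref per task
```

Note that `scope_end` only invokes the release callback per task and returns; it never blocks, which matches the non-blocking contract this section describes.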

### User-facing API

```python
def my_orch(orch, args, cfg):
with orch.scope(): # ring 1
orch.submit_next_level(chip_a, a_args, cfg)
orch.submit_next_level(chip_b, b_args, cfg)
# Inner tasks are now eligible for reclamation on ring 1,
# without waiting for any outer-scope task.
orch.submit_next_level(chip_c, c_args, cfg) # ring 0 (outer)
```

`with orch.scope():` is the recommended form. Raw `orch.scope_begin()` /
`orch.scope_end()` are exposed too, primarily for advanced use where the
context manager would be awkward (e.g., scopes that span a user-level
state machine).

### Why `scope_end` is non-blocking

`scope_end` walks the scope's tasks and bumps each one's `fanout_released`
counter, then returns. A task whose `fanout_released` now meets the
threshold transitions to CONSUMED inline; others stay COMPLETED / RUNNING /
PENDING until the scheduler and consumers finish their own releases. This
mirrors L2's `pto2_scope_end`.

Nested scopes are supported (the stack structure). For now only `Worker::run`
opens a single top-level scope; nested scopes would be a future extension for
explicit user scoping.
Users who need a synchronous wait for *all* in-flight tasks must call
`drain()` (or let `Worker::run` finish — its outer `scope_end` is followed
by `drain()` before the call returns). There is deliberately no
per-scope drain primitive: the extra machinery (per-scope active counter
and cv) would only pay for itself in patterns we do not have yet.

---

39 changes: 21 additions & 18 deletions docs/roadmap.md
@@ -41,12 +41,11 @@ get if I pip install `main` today", this page.
`Orchestrator._scope_begin` / `_scope_end` / `_drain` are invoked by
the Python `Worker.run` facade only.
- **`orch.alloc(shape, dtype)`** — runtime-owned intermediate buffer
carved out of the Worker's HeapRing (a single
`mmap(MAP_SHARED | MAP_ANONYMOUS)` region taken in the `DistWorker`
ctor, before fork, inherited by child workers at the same virtual
address). Lifetime follows a synthetic task slot; the slab is
reclaimed implicitly by the allocator once all downstream consumers
have completed and `last_alive` sweeps over it (see
carved out of the Worker's HeapRing (per-scope ring chosen by the
caller's scope depth; see "Per-scope HeapRing" below). Lifetime
follows a synthetic task slot; the slab is reclaimed implicitly by
the allocator once all downstream consumers have completed and the
ring's `last_alive` sweeps over it (see
[orchestrator.md](orchestrator.md) §8b).
- **`OUTPUT` auto-allocation** — `OUTPUT`-tagged tensors submitted with
`data == 0` are auto-allocated from the same HeapRing as part of the
@@ -59,11 +58,24 @@ get if I pip install `main` today", this page.
(see [orchestrator.md](orchestrator.md) §8b "Tag semantics for
write-after-write"). `OUTPUT_EXISTING` is never auto-allocated.
- **`heap_ring_size` knob** — `Worker(level=3, heap_ring_size=...)`
selects the HeapRing size (default 1 GiB). The underlying
`DistWorker(level, heap_ring_size)` ctor also installs fork hygiene
(setenv of `OMP/BLIS/OPENBLAS/MKL_NUM_THREADS=1`, plus
selects the **per-ring** HeapRing size (default 1 GiB; total VA
reservation is `heap_ring_size * DIST_MAX_RING_DEPTH`). The
underlying `DistWorker(level, heap_ring_size)` ctor also installs
fork hygiene (setenv of `OMP/BLIS/OPENBLAS/MKL_NUM_THREADS=1`, plus
`KMP_DUPLICATE_LIB_OK=TRUE` on macOS, and a `pthread_atfork` landing
pad).
- **Per-scope HeapRing (Strict-1) + user-facing nested scope** —
`DistRing` owns `DIST_MAX_RING_DEPTH = 4` independent HeapRing
instances, each its own `mmap(MAP_SHARED | MAP_ANONYMOUS)` taken
before fork. Ring selection is driven by scope depth
(`min(scope_depth, DIST_MAX_RING_DEPTH - 1)`); every ring has its own
`mu` / `cv` / `last_alive`, so inner-scope tasks reclaim independently
of outer-scope tasks. `Orchestrator::scope_begin` / `scope_end` are
now user-facing (bound on the nanobind `DistOrchestrator`); the
Python facade adds a `with orch.scope():` context manager. Outermost
scope is still opened by `Worker::run`. Max user nesting is
`DIST_MAX_SCOPE_DEPTH = 64`; scopes deeper than the ring depth share
the innermost ring.

### Dispatch internals

@@ -98,15 +110,6 @@ get if I pip install `main` today", this page.

## In flight / not yet landed

### PR-Scope: user-facing nested scope + Strict-1 per-scope rings

- Expose `Orchestrator.scope_begin` / `scope_end` to the user's orch fn
(allow nesting up to `DIST_MAX_SCOPE_DEPTH = 64`).
- Refactor `DistRing` into `DIST_MAX_RING_DEPTH = 4` independent
HeapRing instances; `alloc_for_scope(depth)` picks one. Per-ring
`last_alive` so deeply nested scopes never contend on a single
reclamation pointer.

### PR-D: WorkerThread unification + per-shape ready queues

- Fold `DistChipProcess` / `DistSubWorker` into `WorkerThread` with