From cd0d8635208e0c9cbb6c2fd28a936e506e0fed80 Mon Sep 17 00:00:00 2001 From: poursoul Date: Wed, 22 Apr 2026 09:54:12 +0800 Subject: [PATCH] Docs: update MULTI_RING.md and RUNTIME_LOGIC.md for PR #622 refactor Align documentation with the PTO2SharedMemoryRingHeader consolidation: - RingHeader now carries per-ring data pointers and accessors - SharedMemoryHandle reduced to lifecycle-only role - RingSchedState uses ring pointer instead of cached members - Remove stale heap_tail_gen / dep_list_pool references - Update SM layout, size formula, and watermark pseudocode --- .../docs/MULTI_RING.md | 61 +++++++++++++------ .../docs/RUNTIME_LOGIC.md | 40 ++++++------ .../docs/MULTI_RING.md | 61 +++++++++++++------ .../docs/RUNTIME_LOGIC.md | 40 ++++++------ 4 files changed, 124 insertions(+), 78 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md index 09d732784..e146fd823 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md @@ -84,43 +84,67 @@ struct PTO2RingFlowControl { std::atomic heap_tail; // heap reclaim pointer }; -struct PTO2SharedMemoryRingHeader { +struct alignas(64) PTO2SharedMemoryRingHeader { PTO2RingFlowControl fc; + + // Layout metadata (set once at init) uint64_t task_window_size; + int32_t task_window_mask; // task_window_size - 1 uint64_t heap_size; uint64_t task_descriptors_offset; + + // Per-ring data pointers (host-side, set by pto2_sm_setup_pointers) + PTO2TaskDescriptor *task_descriptors; + PTO2TaskPayload *task_payloads; + PTO2TaskSlotState *slot_states; + + // Accessors (slot = local_id & task_window_mask) + PTO2TaskDescriptor &get_task_by_slot(int32_t slot); + PTO2TaskDescriptor &get_task_by_task_id(int32_t local_id); + PTO2TaskPayload &get_payload_by_slot(int32_t slot); + PTO2TaskPayload &get_payload_by_task_id(int32_t local_id); + PTO2TaskSlotState 
&get_slot_state_by_slot(int32_t slot); + PTO2TaskSlotState &get_slot_state_by_task_id(int32_t local_id); }; // In header: PTO2SharedMemoryRingHeader rings[PTO2_MAX_RING_DEPTH]; ``` -The global `heap_tail_gen` ticket counter is removed; each ring's scheduler state serializes ring-advance via a per-ring try-lock. +Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving watermark writes within the same ring. `FaninPool`/`DepListPool` `reclaim`/`ensure_space` take `PTO2SharedMemoryRingHeader&` directly (no `ring_id` or `fc` parameters). -### 4.4 PTO2SharedMemoryHandle (modified) +### 4.4 PTO2SharedMemoryHandle (lifecycle-only) -Per-ring descriptor and payload arrays: +Slimmed to lifecycle management only. Per-ring data pointers now live in `PTO2SharedMemoryRingHeader` (§4.3). Runtime components (orchestrator, scheduler) store `PTO2SharedMemoryHeader*` directly, eliminating one indirection on every per-ring access. ```cpp -PTO2TaskDescriptor* task_descriptors[PTO2_MAX_RING_DEPTH]; -PTO2TaskPayload* task_payloads[PTO2_MAX_RING_DEPTH]; +struct PTO2SharedMemoryHandle { + void *sm_base; + uint64_t sm_size; + PTO2SharedMemoryHeader *header; + bool is_owner; +}; ``` ### 4.5 PTO2SchedulerState (modified) ```cpp struct RingSchedState { - PTO2TaskSlotState* slot_states; - int32_t task_window_size; - int32_t task_window_mask; - std::atomic advance_lock; - alignas(64) PTO2DepListPool dep_pool; // fanout wiring dep pool (thread 0 only, cache-isolated) + // Cache Line 0: ring pointer (read-only) + hot path (read-write) + PTO2SharedMemoryRingHeader *ring; // direct pointer, no indirection + int32_t last_task_alive; + std::atomic advance_lock; // multi-thread CAS + + // Cache Line 1+: Thread 0 only (wiring dep_pool, cache-isolated) + alignas(64) PTO2DepListPool dep_pool; }; RingSchedState ring_sched_states[PTO2_MAX_RING_DEPTH]; PTO2SpscQueue wiring_queue; // global SPSC queue: orchestrator pushes, scheduler thread 0 drains ``` +`slot_states`, 
`task_window_size`, and `task_window_mask` are no longer duplicated — callers access them via `ring->get_slot_state_by_*()` and other ring header accessors. The ring pointer shares cache line 0 with `last_task_alive` and `advance_lock`. + ### 4.6 PTO2TensorMap (modified) ```cpp @@ -154,13 +178,12 @@ bool entry_valid(const PTO2TensorMapEntry& e) { Each ring's `last_task_alive` advances independently: ```text -advance_ring_pointers(ring_id): - la = rings[ring_id].fc.last_task_alive - while task_state[la & mask] >= CONSUMED: - advance heap_tail from packed_buffer_end - reset fanin_refcount - CAS(last_task_alive, la, la+1) +advance_ring_pointers(ring_id): // protected by per-ring advance_lock + la = ring->fc.last_task_alive + while ring->get_slot_state_by_task_id(la).task_state >= CONSUMED: + reset slot for reuse la++ + sync_to_sm() // release-store last_task_alive ``` -Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving heap_tail writes within the same ring. +Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving watermark advancement within the same ring. 
@@ -180,9 +203,9 @@ DepPool is exclusively managed by scheduler thread 0 (allocation during wiring, ```text // Called by scheduler thread 0 during wiring_queue drain: dep_pool_reclaim(ring_id): - la = rings[ring_id].fc.last_task_alive + la = ring->fc.last_task_alive newest_consumed = la - 1 - mark = slot_states[slot(newest_consumed)].dep_pool_mark + mark = ring->get_slot_state_by_task_id(newest_consumed).dep_pool_mark if mark > 0: ring_sched_states[ring_id].dep_pool.advance_tail(mark) ``` diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md index d18f04a01..e6a87585e 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md @@ -107,11 +107,12 @@ The orchestrator and schedulers communicate through a contiguous shared memory r ```text ┌─────────────────────────────┐ offset 0 -│ PTO2SharedMemoryHeader │ (flow control, config, sync flags) +│ PTO2SharedMemoryHeader │ (per-ring flow control + layout, global flags) ├─────────────────────────────┤ aligned -│ PTO2TaskDescriptor[N] │ N = task_window_size (default 65536) -├─────────────────────────────┤ aligned -│ PTO2DepListEntry[M+1] │ M = dep_list_pool_size (entry 0 = NULL sentinel) +│ Per-ring regions ×4: │ +│ PTO2TaskDescriptor[N] │ N = task_window_size per ring +│ PTO2TaskPayload[N] │ +│ PTO2TaskSlotState[N] │ └─────────────────────────────┘ ``` @@ -123,13 +124,10 @@ The orchestrator and schedulers communicate through a contiguous shared memory r | `last_task_alive` | Scheduler | Orchestrator | Oldest still-active task (task ring tail) | | `heap_top` | Orchestrator | Scheduler | Heap ring allocation pointer | | `heap_tail` | Scheduler | Orchestrator | Heap ring reclamation pointer | -| `heap_tail_gen` | Scheduler | Scheduler | Ticket counter for serialized `heap_tail` writes | | `orchestrator_done` | Orchestrator | Scheduler | Signals 
orchestration completion | -| `task_window_size` | Init | Both | Number of task slots | -| `heap_size` | Init | Both | Heap total size | -| `dep_list_pool_size` | Init | Both | Dependency list pool size | -| `task_descriptors_offset` | Init | Both | Offset to TaskDescriptor array in SM | -| `dep_list_pool_offset` | Init | Both | Offset to DepListPool in SM | +| `task_window_size` | Init | Both | Number of task slots (per-ring, in `PTO2SharedMemoryRingHeader`) | +| `heap_size` | Init | Both | Heap total size (per-ring, in `PTO2SharedMemoryRingHeader`) | +| `task_descriptors_offset` | Init | Both | Offset to TaskDescriptor array in SM (per-ring) | | `total_size` | Init | Both | Total shared memory size | | `graph_output_ptr` | Orchestrator | Host | Address of final output (packed buffer) | | `graph_output_size` | Orchestrator | Host | Size of final output in bytes | @@ -137,8 +135,10 @@ The orchestrator and schedulers communicate through a contiguous shared memory r ### 3.2 Size Calculation ```text -total = ALIGN(Header) + ALIGN(window_size * sizeof(TaskDescriptor)) - + ALIGN((dep_pool_size + 1) * sizeof(DepListEntry)) +total = ALIGN(Header) + + Σ_ring [ ALIGN(window_size * sizeof(TaskDescriptor)) + + ALIGN(window_size * sizeof(TaskPayload)) + + ALIGN(window_size * sizeof(TaskSlotState)) ] ``` Alignment is 64 bytes (`PTO2_ALIGN_SIZE`). 
@@ -395,7 +395,7 @@ Key members: | 2 | Initialize task descriptor + slot state, copy parameters | | 3 | **Lookup**: for each INPUT/INOUT param, search TensorMap for producers; collect producer pointers in `PTO2FaninBuilder` | | 4 | **Insert**: register OUTPUT/INOUT args in TensorMap | -| 5 | **Record fanin metadata**: store producer pointers in `payload->fanin_inline_slot_states[]` (+ spill pool if >16); increment each producer's `fanout_count` (no lock needed — single writer) | +| 5 | **Record fanin metadata**: store producer pointers in `payload->fanin_inline_slot_states[]` (+ spill pool if >16); increment each producer's `fanout_count` (no lock needed — single writer). This step runs **before** `payload.init()`. | | 6 | **Push to wiring queue**: push to global `PTO2SpscQueue`; scheduler thread 0 asynchronously wires fanout edges (lock + dep_pool + early_finished check + ready push) | > **Note**: Fanout wiring (Steps 4–7 in earlier versions) has been moved from the @@ -489,15 +489,15 @@ Ready queues use a lock-free bounded MPMC (Vyukov) design: After a task reaches state CONSUMED (4), the scheduler tries to advance `last_task_alive`: ```text -while la < current_task_index: - if task_state[la & mask] < CONSUMED: break - reset fanin_refcount[la & mask] = 0 - CAS(last_task_alive, la, la+1) - advance heap_tail from task's packed_buffer_end - la++ +advance_ring_pointers(ring_id): // protected by per-ring advance_lock + while la < current_task_index: + if task_state[la & mask] < CONSUMED: break + reset slot for reuse + la++ + sync_to_sm() // release-store last_task_alive ``` -This is lock-free (CAS-based) and multiple scheduler threads can attempt it concurrently. The `heap_tail_gen` ticket counter serializes `heap_tail` writes to ensure tasks' buffer regions are freed in order. +This is protected by a per-ring try-lock (`advance_lock`) in `RingSchedState`, ensuring only one scheduler thread advances a given ring's watermark at a time. 
--- diff --git a/src/a5/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md b/src/a5/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md index 09d732784..e146fd823 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md +++ b/src/a5/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md @@ -84,43 +84,67 @@ struct PTO2RingFlowControl { std::atomic heap_tail; // heap reclaim pointer }; -struct PTO2SharedMemoryRingHeader { +struct alignas(64) PTO2SharedMemoryRingHeader { PTO2RingFlowControl fc; + + // Layout metadata (set once at init) uint64_t task_window_size; + int32_t task_window_mask; // task_window_size - 1 uint64_t heap_size; uint64_t task_descriptors_offset; + + // Per-ring data pointers (host-side, set by pto2_sm_setup_pointers) + PTO2TaskDescriptor *task_descriptors; + PTO2TaskPayload *task_payloads; + PTO2TaskSlotState *slot_states; + + // Accessors (slot = local_id & task_window_mask) + PTO2TaskDescriptor &get_task_by_slot(int32_t slot); + PTO2TaskDescriptor &get_task_by_task_id(int32_t local_id); + PTO2TaskPayload &get_payload_by_slot(int32_t slot); + PTO2TaskPayload &get_payload_by_task_id(int32_t local_id); + PTO2TaskSlotState &get_slot_state_by_slot(int32_t slot); + PTO2TaskSlotState &get_slot_state_by_task_id(int32_t local_id); }; // In header: PTO2SharedMemoryRingHeader rings[PTO2_MAX_RING_DEPTH]; ``` -The global `heap_tail_gen` ticket counter is removed; each ring's scheduler state serializes ring-advance via a per-ring try-lock. +Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving watermark writes within the same ring. `FaninPool`/`DepListPool` `reclaim`/`ensure_space` take `PTO2SharedMemoryRingHeader&` directly (no `ring_id` or `fc` parameters). -### 4.4 PTO2SharedMemoryHandle (modified) +### 4.4 PTO2SharedMemoryHandle (lifecycle-only) -Per-ring descriptor and payload arrays: +Slimmed to lifecycle management only. Per-ring data pointers now live in `PTO2SharedMemoryRingHeader` (§4.3). 
Runtime components (orchestrator, scheduler) store `PTO2SharedMemoryHeader*` directly, eliminating one indirection on every per-ring access. ```cpp -PTO2TaskDescriptor* task_descriptors[PTO2_MAX_RING_DEPTH]; -PTO2TaskPayload* task_payloads[PTO2_MAX_RING_DEPTH]; +struct PTO2SharedMemoryHandle { + void *sm_base; + uint64_t sm_size; + PTO2SharedMemoryHeader *header; + bool is_owner; +}; ``` ### 4.5 PTO2SchedulerState (modified) ```cpp struct RingSchedState { - PTO2TaskSlotState* slot_states; - int32_t task_window_size; - int32_t task_window_mask; - std::atomic advance_lock; - alignas(64) PTO2DepListPool dep_pool; // fanout wiring dep pool (thread 0 only, cache-isolated) + // Cache Line 0: ring pointer (read-only) + hot path (read-write) + PTO2SharedMemoryRingHeader *ring; // direct pointer, no indirection + int32_t last_task_alive; + std::atomic advance_lock; // multi-thread CAS + + // Cache Line 1+: Thread 0 only (wiring dep_pool, cache-isolated) + alignas(64) PTO2DepListPool dep_pool; }; RingSchedState ring_sched_states[PTO2_MAX_RING_DEPTH]; PTO2SpscQueue wiring_queue; // global SPSC queue: orchestrator pushes, scheduler thread 0 drains ``` +`slot_states`, `task_window_size`, and `task_window_mask` are no longer duplicated — callers access them via `ring->get_slot_state_by_*()` and other ring header accessors. The ring pointer shares cache line 0 with `last_task_alive` and `advance_lock`. 
+ ### 4.6 PTO2TensorMap (modified) ```cpp @@ -154,13 +178,12 @@ bool entry_valid(const PTO2TensorMapEntry& e) { Each ring's `last_task_alive` advances independently: ```text -advance_ring_pointers(ring_id): - la = rings[ring_id].fc.last_task_alive - while task_state[la & mask] >= CONSUMED: - advance heap_tail from packed_buffer_end - reset fanin_refcount - CAS(last_task_alive, la, la+1) +advance_ring_pointers(ring_id): // protected by per-ring advance_lock + la = ring->fc.last_task_alive + while ring->get_slot_state_by_task_id(la).task_state >= CONSUMED: + reset slot for reuse la++ + sync_to_sm() // release-store last_task_alive ``` -Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving heap_tail writes within the same ring. +Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving watermark advancement within the same ring. @@ -180,9 +203,9 @@ DepPool is exclusively managed by scheduler thread 0 (allocation during wiring, ```text // Called by scheduler thread 0 during wiring_queue drain: dep_pool_reclaim(ring_id): - la = rings[ring_id].fc.last_task_alive + la = ring->fc.last_task_alive newest_consumed = la - 1 - mark = slot_states[slot(newest_consumed)].dep_pool_mark + mark = ring->get_slot_state_by_task_id(newest_consumed).dep_pool_mark if mark > 0: ring_sched_states[ring_id].dep_pool.advance_tail(mark) ``` diff --git a/src/a5/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md b/src/a5/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md index d18f04a01..e6a87585e 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md +++ b/src/a5/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md @@ -107,11 +107,12 @@ The orchestrator and schedulers communicate through a contiguous shared memory r ```text ┌─────────────────────────────┐ offset 0 -│ PTO2SharedMemoryHeader │ (flow control, config, sync flags) +│ PTO2SharedMemoryHeader │ (per-ring flow control + layout, global flags) ├─────────────────────────────┤ aligned -│ PTO2TaskDescriptor[N] │ N = task_window_size (default 65536) 
-├─────────────────────────────┤ aligned -│ PTO2DepListEntry[M+1] │ M = dep_list_pool_size (entry 0 = NULL sentinel) +│ Per-ring regions ×4: │ +│ PTO2TaskDescriptor[N] │ N = task_window_size per ring +│ PTO2TaskPayload[N] │ +│ PTO2TaskSlotState[N] │ └─────────────────────────────┘ ``` @@ -123,13 +124,10 @@ The orchestrator and schedulers communicate through a contiguous shared memory r | `last_task_alive` | Scheduler | Orchestrator | Oldest still-active task (task ring tail) | | `heap_top` | Orchestrator | Scheduler | Heap ring allocation pointer | | `heap_tail` | Scheduler | Orchestrator | Heap ring reclamation pointer | -| `heap_tail_gen` | Scheduler | Scheduler | Ticket counter for serialized `heap_tail` writes | | `orchestrator_done` | Orchestrator | Scheduler | Signals orchestration completion | -| `task_window_size` | Init | Both | Number of task slots | -| `heap_size` | Init | Both | Heap total size | -| `dep_list_pool_size` | Init | Both | Dependency list pool size | -| `task_descriptors_offset` | Init | Both | Offset to TaskDescriptor array in SM | -| `dep_list_pool_offset` | Init | Both | Offset to DepListPool in SM | +| `task_window_size` | Init | Both | Number of task slots (per-ring, in `PTO2SharedMemoryRingHeader`) | +| `heap_size` | Init | Both | Heap total size (per-ring, in `PTO2SharedMemoryRingHeader`) | +| `task_descriptors_offset` | Init | Both | Offset to TaskDescriptor array in SM (per-ring) | | `total_size` | Init | Both | Total shared memory size | | `graph_output_ptr` | Orchestrator | Host | Address of final output (packed buffer) | | `graph_output_size` | Orchestrator | Host | Size of final output in bytes | @@ -137,8 +135,10 @@ The orchestrator and schedulers communicate through a contiguous shared memory r ### 3.2 Size Calculation ```text -total = ALIGN(Header) + ALIGN(window_size * sizeof(TaskDescriptor)) - + ALIGN((dep_pool_size + 1) * sizeof(DepListEntry)) +total = ALIGN(Header) + + Σ_ring [ ALIGN(window_size * sizeof(TaskDescriptor)) 
+ + ALIGN(window_size * sizeof(TaskPayload)) + + ALIGN(window_size * sizeof(TaskSlotState)) ] ``` Alignment is 64 bytes (`PTO2_ALIGN_SIZE`). @@ -395,7 +395,7 @@ Key members: | 2 | Initialize task descriptor + slot state, copy parameters | | 3 | **Lookup**: for each INPUT/INOUT param, search TensorMap for producers; collect producer pointers in `PTO2FaninBuilder` | | 4 | **Insert**: register OUTPUT/INOUT args in TensorMap | -| 5 | **Record fanin metadata**: store producer pointers in `payload->fanin_inline_slot_states[]` (+ spill pool if >16); increment each producer's `fanout_count` (no lock needed — single writer) | +| 5 | **Record fanin metadata**: store producer pointers in `payload->fanin_inline_slot_states[]` (+ spill pool if >16); increment each producer's `fanout_count` (no lock needed — single writer). This step runs **before** `payload.init()`. | | 6 | **Push to wiring queue**: push to global `PTO2SpscQueue`; scheduler thread 0 asynchronously wires fanout edges (lock + dep_pool + early_finished check + ready push) | > **Note**: Fanout wiring (Steps 4–7 in earlier versions) has been moved from the @@ -489,15 +489,15 @@ Ready queues use a lock-free bounded MPMC (Vyukov) design: After a task reaches state CONSUMED (4), the scheduler tries to advance `last_task_alive`: ```text -while la < current_task_index: - if task_state[la & mask] < CONSUMED: break - reset fanin_refcount[la & mask] = 0 - CAS(last_task_alive, la, la+1) - advance heap_tail from task's packed_buffer_end - la++ +advance_ring_pointers(ring_id): // protected by per-ring advance_lock + while la < current_task_index: + if task_state[la & mask] < CONSUMED: break + reset slot for reuse + la++ + sync_to_sm() // release-store last_task_alive ``` -This is lock-free (CAS-based) and multiple scheduler threads can attempt it concurrently. The `heap_tail_gen` ticket counter serializes `heap_tail` writes to ensure tasks' buffer regions are freed in order. 
+This is protected by a per-ring try-lock (`advance_lock`) in `RingSchedState`, ensuring only one scheduler thread advances a given ring's watermark at a time. ---