61 changes: 42 additions & 19 deletions src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md
@@ -84,43 +84,67 @@ struct PTO2RingFlowControl {
std::atomic<uint64_t> heap_tail; // heap reclaim pointer
};

struct PTO2SharedMemoryRingHeader {
struct alignas(64) PTO2SharedMemoryRingHeader {
PTO2RingFlowControl fc;

// Layout metadata (set once at init)
uint64_t task_window_size;
int32_t task_window_mask; // task_window_size - 1
uint64_t heap_size;
uint64_t task_descriptors_offset;

// Per-ring data pointers (host-side, set by pto2_sm_setup_pointers)
PTO2TaskDescriptor *task_descriptors;
PTO2TaskPayload *task_payloads;
PTO2TaskSlotState *slot_states;

// Accessors (slot = local_id & task_window_mask)
PTO2TaskDescriptor &get_task_by_slot(int32_t slot);
PTO2TaskDescriptor &get_task_by_task_id(int32_t local_id);
PTO2TaskPayload &get_payload_by_slot(int32_t slot);
PTO2TaskPayload &get_payload_by_task_id(int32_t local_id);
PTO2TaskSlotState &get_slot_state_by_slot(int32_t slot);
PTO2TaskSlotState &get_slot_state_by_task_id(int32_t local_id);
};

// In header:
PTO2SharedMemoryRingHeader rings[PTO2_MAX_RING_DEPTH];
```
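
A minimal sketch of how these accessors could be implemented, assuming `task_window_size` is a power of two so that masking with `task_window_mask` wraps local task IDs (the bodies are illustrative, not the actual implementation):

```cpp
// Sketch only: slot = local_id & task_window_mask, per the comment above.
inline PTO2TaskDescriptor &
PTO2SharedMemoryRingHeader::get_task_by_slot(int32_t slot) {
    return task_descriptors[slot];
}

inline PTO2TaskDescriptor &
PTO2SharedMemoryRingHeader::get_task_by_task_id(int32_t local_id) {
    return task_descriptors[local_id & task_window_mask];
}

inline PTO2TaskSlotState &
PTO2SharedMemoryRingHeader::get_slot_state_by_task_id(int32_t local_id) {
    return slot_states[local_id & task_window_mask];
}
```

The mask form avoids a modulo on every hot-path access, which is why `task_window_size` must be a power of two.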

The global `heap_tail_gen` ticket counter is removed; each ring's scheduler state serializes ring-advance via a per-ring try-lock.
Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving watermark writes within the same ring. `FaninPool`/`DepListPool` `reclaim`/`ensure_space` take `PTO2SharedMemoryRingHeader&` directly (no `ring_id` or `fc` parameters).

### 4.4 PTO2SharedMemoryHandle (modified)
### 4.4 PTO2SharedMemoryHandle (lifecycle-only)

Per-ring descriptor and payload arrays:
Slimmed to lifecycle management only. Per-ring data pointers now live in `PTO2SharedMemoryRingHeader` (§4.3). Runtime components (orchestrator, scheduler) store `PTO2SharedMemoryHeader*` directly, eliminating one indirection on every per-ring access.

```cpp
PTO2TaskDescriptor* task_descriptors[PTO2_MAX_RING_DEPTH];
PTO2TaskPayload* task_payloads[PTO2_MAX_RING_DEPTH];
struct PTO2SharedMemoryHandle {
void *sm_base;
uint64_t sm_size;
PTO2SharedMemoryHeader *header;
bool is_owner;
};
```
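
As a sketch of what `pto2_sm_setup_pointers` might do when rebuilding the per-ring pointers after mapping: the region order below follows the layout in RUNTIME_LOGIC §3, but `align64`, the contiguity of the three arrays, and the exact field usage are assumptions:

```cpp
// Sketch: rebuild host-side per-ring data pointers from the mapped base.
// Assumes task_descriptors_offset is a byte offset from sm_base and that each
// ring's payload and slot-state arrays follow its descriptor array, 64B-aligned.
static inline uint64_t align64(uint64_t n) { return (n + 63) & ~uint64_t{63}; }

void pto2_sm_setup_pointers(PTO2SharedMemoryHandle &h) {
    char *base = static_cast<char *>(h.sm_base);
    for (int r = 0; r < PTO2_MAX_RING_DEPTH; ++r) {
        PTO2SharedMemoryRingHeader &ring = h.header->rings[r];
        char *p = base + ring.task_descriptors_offset;
        ring.task_descriptors = reinterpret_cast<PTO2TaskDescriptor *>(p);
        p += align64(ring.task_window_size * sizeof(PTO2TaskDescriptor));
        ring.task_payloads = reinterpret_cast<PTO2TaskPayload *>(p);
        p += align64(ring.task_window_size * sizeof(PTO2TaskPayload));
        ring.slot_states = reinterpret_cast<PTO2TaskSlotState *>(p);
    }
}
```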

### 4.5 PTO2SchedulerState (modified)

```cpp
struct RingSchedState {
PTO2TaskSlotState* slot_states;
int32_t task_window_size;
int32_t task_window_mask;
std::atomic<int32_t> advance_lock;
alignas(64) PTO2DepListPool dep_pool; // fanout wiring dep pool (thread 0 only, cache-isolated)
// Cache Line 0: ring pointer (read-only) + hot path (read-write)
PTO2SharedMemoryRingHeader *ring; // direct pointer, no indirection
int32_t last_task_alive;
std::atomic<int32_t> advance_lock; // multi-thread CAS

// Cache Line 1+: Thread 0 only (wiring dep_pool, cache-isolated)
alignas(64) PTO2DepListPool dep_pool;
};

RingSchedState ring_sched_states[PTO2_MAX_RING_DEPTH];
PTO2SpscQueue wiring_queue; // global SPSC queue: orchestrator pushes, scheduler thread 0 drains
```

`slot_states`, `task_window_size`, and `task_window_mask` are no longer duplicated — callers access them via `ring->get_slot_state_by_*()` and other ring header accessors. The ring pointer shares cache line 0 with `last_task_alive` and `advance_lock`.

### 4.6 PTO2TensorMap (modified)

```cpp
@@ -154,13 +178,12 @@ bool entry_valid(const PTO2TensorMapEntry& e) {
Each ring's `last_task_alive` advances independently:

```text
advance_ring_pointers(ring_id):
la = rings[ring_id].fc.last_task_alive
while task_state[la & mask] >= CONSUMED:
advance heap_tail from packed_buffer_end
reset fanin_refcount
CAS(last_task_alive, la, la+1)
advance_ring_pointers(ring_id): // protected by per-ring advance_lock
la = ring->fc.last_task_alive
while ring->get_slot_state_by_task_id(la).task_state >= CONSUMED:
reset slot for reuse
la++
sync_to_sm() // release-store last_task_alive
```
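
In C++ the body might look as follows. This is a sketch: `TASK_CONSUMED`, `reset_slot`, and the atomic type of `fc.last_task_alive` are assumptions, and the caller is presumed to already hold the ring's `advance_lock` from §4.5:

```cpp
// Sketch: advance one ring's last_task_alive watermark. Runs single-threaded
// per ring because the caller won the per-ring advance_lock. Assumes a slot
// whose task has not yet reached CONSUMED reads < TASK_CONSUMED, which
// bounds the while loop.
void advance_ring_pointers(RingSchedState &rs) {
    int32_t la = rs.last_task_alive;
    while (rs.ring->get_slot_state_by_task_id(la).task_state >= TASK_CONSUMED) {
        reset_slot(rs.ring->get_slot_state_by_task_id(la)); // refcounts, marks, state
        ++la;
    }
    rs.last_task_alive = la; // scheduler-local cache (cache line 0 of RingSchedState)
    // sync_to_sm(): release-store so the orchestrator's acquire-load of
    // last_task_alive observes the slot resets before it reuses those slots.
    rs.ring->fc.last_task_alive.store(la, std::memory_order_release);
}
```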

Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving heap_tail writes within the same ring.
@@ -180,9 +203,9 @@ DepPool is exclusively managed by scheduler thread 0 (allocation during wiring,
```text
// Called by scheduler thread 0 during wiring_queue drain:
dep_pool_reclaim(ring_id):
la = rings[ring_id].fc.last_task_alive
la = ring->fc.last_task_alive
newest_consumed = la - 1
mark = slot_states[slot(newest_consumed)].dep_pool_mark
mark = ring->get_slot_state_by_task_id(newest_consumed).dep_pool_mark
if mark > 0:
ring_sched_states[ring_id].dep_pool.advance_tail(mark)
```
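
The same reclaim in C++, as a sketch: the `dep_pool_mark` field and `advance_tail` are as described above, while the atomic orderings and integer widths are assumptions:

```cpp
// Sketch: free dep-pool entries up to the mark recorded by the newest
// CONSUMED task. Thread 0 only, so dep_pool itself needs no lock.
// Assumes at least one task has been consumed (la > 0).
void dep_pool_reclaim(int32_t ring_id) {
    RingSchedState &rs = ring_sched_states[ring_id];
    int32_t la = rs.ring->fc.last_task_alive.load(std::memory_order_acquire);
    int32_t newest_consumed = la - 1;
    uint32_t mark =
        rs.ring->get_slot_state_by_task_id(newest_consumed).dep_pool_mark;
    if (mark > 0)
        rs.dep_pool.advance_tail(mark); // everything older than mark is dead
}
```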
40 changes: 20 additions & 20 deletions src/a2a3/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md
@@ -107,11 +107,12 @@ The orchestrator and schedulers communicate through a contiguous shared memory r

```text
┌─────────────────────────────┐ offset 0
│ PTO2SharedMemoryHeader │ (flow control, config, sync flags)
│ PTO2SharedMemoryHeader │ (per-ring flow control + layout, global flags)
├─────────────────────────────┤ aligned
│ PTO2TaskDescriptor[N] │ N = task_window_size (default 65536)
├─────────────────────────────┤ aligned
│ PTO2DepListEntry[M+1] │ M = dep_list_pool_size (entry 0 = NULL sentinel)
│ Per-ring regions ×4: │
│ PTO2TaskDescriptor[N] │ N = task_window_size per ring
│ PTO2TaskPayload[N] │
│ PTO2TaskSlotState[N] │
└─────────────────────────────┘
```

@@ -123,22 +124,21 @@ The orchestrator and schedulers communicate through a contiguous shared memory r
| `last_task_alive` | Scheduler | Orchestrator | Oldest still-active task (task ring tail) |
| `heap_top` | Orchestrator | Scheduler | Heap ring allocation pointer |
| `heap_tail` | Scheduler | Orchestrator | Heap ring reclamation pointer |
| `heap_tail_gen` | Scheduler | Scheduler | Ticket counter for serialized `heap_tail` writes |
| `orchestrator_done` | Orchestrator | Scheduler | Signals orchestration completion |
| `task_window_size` | Init | Both | Number of task slots |
| `heap_size` | Init | Both | Heap total size |
| `dep_list_pool_size` | Init | Both | Dependency list pool size |
| `task_descriptors_offset` | Init | Both | Offset to TaskDescriptor array in SM |
| `dep_list_pool_offset` | Init | Both | Offset to DepListPool in SM |
| `task_window_size` | Init | Both | Number of task slots (per-ring, in `PTO2SharedMemoryRingHeader`) |
| `heap_size` | Init | Both | Heap total size (per-ring, in `PTO2SharedMemoryRingHeader`) |
| `task_descriptors_offset` | Init | Both | Offset to TaskDescriptor array in SM (per-ring) |
| `total_size` | Init | Both | Total shared memory size |
| `graph_output_ptr` | Orchestrator | Host | Address of final output (packed buffer) |
| `graph_output_size` | Orchestrator | Host | Size of final output in bytes |

### 3.2 Size Calculation

```text
total = ALIGN(Header) + ALIGN(window_size * sizeof(TaskDescriptor))
+ ALIGN((dep_pool_size + 1) * sizeof(DepListEntry))
total = ALIGN(Header)
+ Σ_ring [ ALIGN(window_size * sizeof(TaskDescriptor))
+ ALIGN(window_size * sizeof(TaskPayload))
+ ALIGN(window_size * sizeof(TaskSlotState)) ]
```

Alignment is 64 bytes (`PTO2_ALIGN_SIZE`).
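
A sketch of that computation; parameter names are illustrative, and per-ring window sizes may differ:

```cpp
#include <cstdint>

constexpr uint64_t PTO2_ALIGN = 64; // PTO2_ALIGN_SIZE

constexpr uint64_t align64(uint64_t n) {
    return (n + PTO2_ALIGN - 1) & ~(PTO2_ALIGN - 1);
}

// Sketch: total shared-memory size per the formula above.
uint64_t pto2_sm_total_size(uint64_t header_size, int num_rings,
                            const uint64_t *window_size, uint64_t desc_size,
                            uint64_t payload_size, uint64_t slot_size) {
    uint64_t total = align64(header_size);
    for (int r = 0; r < num_rings; ++r) {
        total += align64(window_size[r] * desc_size);
        total += align64(window_size[r] * payload_size);
        total += align64(window_size[r] * slot_size);
    }
    return total;
}
```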
@@ -395,7 +395,7 @@ Key members:
| 2 | Initialize task descriptor + slot state, copy parameters |
| 3 | **Lookup**: for each INPUT/INOUT param, search TensorMap for producers; collect producer pointers in `PTO2FaninBuilder` |
| 4 | **Insert**: register OUTPUT/INOUT args in TensorMap |
| 5 | **Record fanin metadata**: store producer pointers in `payload->fanin_inline_slot_states[]` (+ spill pool if >16); increment each producer's `fanout_count` (no lock needed — single writer) |
| 5 | **Record fanin metadata**: store producer pointers in `payload->fanin_inline_slot_states[]` (+ spill pool if >16); increment each producer's `fanout_count` (no lock needed — single writer). This step runs **before** `payload.init()`. |
| 6 | **Push to wiring queue**: push to global `PTO2SpscQueue`; scheduler thread 0 asynchronously wires fanout edges (lock + dep_pool + early_finished check + ready push) |

> **Note**: Fanout wiring (Steps 4–7 in earlier versions) has been moved from the
@@ -489,15 +489,15 @@ Ready queues use a lock-free bounded MPMC (Vyukov) design:
After a task reaches state CONSUMED (4), the scheduler tries to advance `last_task_alive`:

```text
while la < current_task_index:
if task_state[la & mask] < CONSUMED: break
reset fanin_refcount[la & mask] = 0
CAS(last_task_alive, la, la+1)
advance heap_tail from task's packed_buffer_end
la++
advance_ring_pointers(ring_id): // protected by per-ring advance_lock
while la < current_task_index:
if task_state[la & mask] < CONSUMED: break
reset slot for reuse
la++
sync_to_sm() // release-store last_task_alive
```

This is lock-free (CAS-based) and multiple scheduler threads can attempt it concurrently. The `heap_tail_gen` ticket counter serializes `heap_tail` writes to ensure tasks' buffer regions are freed in order.
This is protected by a per-ring try-lock (`advance_lock`) in `RingSchedState`, ensuring only one scheduler thread advances a given ring's watermark at a time.
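
One way the try-lock could look, wrapping the `advance_ring_pointers` loop above (a sketch; the 0/1 lock encoding and memory orderings are assumptions):

```cpp
// Sketch: non-blocking per-ring guard. A thread that loses the race simply
// skips the advance; any slots that reach CONSUMED in the meantime are
// folded in by whichever thread wins the lock next.
void try_advance_ring(RingSchedState &rs) {
    int32_t expected = 0;
    if (!rs.advance_lock.compare_exchange_strong(expected, 1,
                                                 std::memory_order_acquire))
        return; // another scheduler thread is already advancing this ring
    advance_ring_pointers(rs); // the watermark loop shown above
    rs.advance_lock.store(0, std::memory_order_release);
}
```

Skipping rather than spinning keeps losers on useful work; correctness only needs the watermark to advance eventually, not on every attempt.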

---

61 changes: 42 additions & 19 deletions src/a5/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md
@@ -84,43 +84,67 @@ struct PTO2RingFlowControl {
std::atomic<uint64_t> heap_tail; // heap reclaim pointer
};

struct PTO2SharedMemoryRingHeader {
struct alignas(64) PTO2SharedMemoryRingHeader {
PTO2RingFlowControl fc;

// Layout metadata (set once at init)
uint64_t task_window_size;
int32_t task_window_mask; // task_window_size - 1
uint64_t heap_size;
uint64_t task_descriptors_offset;

// Per-ring data pointers (host-side, set by pto2_sm_setup_pointers)
PTO2TaskDescriptor *task_descriptors;
PTO2TaskPayload *task_payloads;
PTO2TaskSlotState *slot_states;

// Accessors (slot = local_id & task_window_mask)
PTO2TaskDescriptor &get_task_by_slot(int32_t slot);
PTO2TaskDescriptor &get_task_by_task_id(int32_t local_id);
PTO2TaskPayload &get_payload_by_slot(int32_t slot);
PTO2TaskPayload &get_payload_by_task_id(int32_t local_id);
PTO2TaskSlotState &get_slot_state_by_slot(int32_t slot);
PTO2TaskSlotState &get_slot_state_by_task_id(int32_t local_id);
};

// In header:
PTO2SharedMemoryRingHeader rings[PTO2_MAX_RING_DEPTH];
```

The global `heap_tail_gen` ticket counter is removed; each ring's scheduler state serializes ring-advance via a per-ring try-lock.
Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving watermark writes within the same ring. `FaninPool`/`DepListPool` `reclaim`/`ensure_space` take `PTO2SharedMemoryRingHeader&` directly (no `ring_id` or `fc` parameters).

### 4.4 PTO2SharedMemoryHandle (modified)
### 4.4 PTO2SharedMemoryHandle (lifecycle-only)

Per-ring descriptor and payload arrays:
Slimmed to lifecycle management only. Per-ring data pointers now live in `PTO2SharedMemoryRingHeader` (§4.3). Runtime components (orchestrator, scheduler) store `PTO2SharedMemoryHeader*` directly, eliminating one indirection on every per-ring access.

```cpp
PTO2TaskDescriptor* task_descriptors[PTO2_MAX_RING_DEPTH];
PTO2TaskPayload* task_payloads[PTO2_MAX_RING_DEPTH];
struct PTO2SharedMemoryHandle {
void *sm_base;
uint64_t sm_size;
PTO2SharedMemoryHeader *header;
bool is_owner;
};
```

### 4.5 PTO2SchedulerState (modified)

```cpp
struct RingSchedState {
PTO2TaskSlotState* slot_states;
int32_t task_window_size;
int32_t task_window_mask;
std::atomic<int32_t> advance_lock;
alignas(64) PTO2DepListPool dep_pool; // fanout wiring dep pool (thread 0 only, cache-isolated)
// Cache Line 0: ring pointer (read-only) + hot path (read-write)
PTO2SharedMemoryRingHeader *ring; // direct pointer, no indirection
int32_t last_task_alive;
std::atomic<int32_t> advance_lock; // multi-thread CAS

// Cache Line 1+: Thread 0 only (wiring dep_pool, cache-isolated)
alignas(64) PTO2DepListPool dep_pool;
};

RingSchedState ring_sched_states[PTO2_MAX_RING_DEPTH];
PTO2SpscQueue wiring_queue; // global SPSC queue: orchestrator pushes, scheduler thread 0 drains
```

`slot_states`, `task_window_size`, and `task_window_mask` are no longer duplicated — callers access them via `ring->get_slot_state_by_*()` and other ring header accessors. The ring pointer shares cache line 0 with `last_task_alive` and `advance_lock`.

### 4.6 PTO2TensorMap (modified)

```cpp
@@ -154,13 +178,12 @@ bool entry_valid(const PTO2TensorMapEntry& e) {
Each ring's `last_task_alive` advances independently:

```text
advance_ring_pointers(ring_id):
la = rings[ring_id].fc.last_task_alive
while task_state[la & mask] >= CONSUMED:
advance heap_tail from packed_buffer_end
reset fanin_refcount
CAS(last_task_alive, la, la+1)
advance_ring_pointers(ring_id): // protected by per-ring advance_lock
la = ring->fc.last_task_alive
while ring->get_slot_state_by_task_id(la).task_state >= CONSUMED:
reset slot for reuse
la++
sync_to_sm() // release-store last_task_alive
```

Per-ring try-locks in the scheduler state prevent concurrent scheduler threads from interleaving heap_tail writes within the same ring.
@@ -180,9 +203,9 @@ DepPool is exclusively managed by scheduler thread 0 (allocation during wiring,
```text
// Called by scheduler thread 0 during wiring_queue drain:
dep_pool_reclaim(ring_id):
la = rings[ring_id].fc.last_task_alive
la = ring->fc.last_task_alive
newest_consumed = la - 1
mark = slot_states[slot(newest_consumed)].dep_pool_mark
mark = ring->get_slot_state_by_task_id(newest_consumed).dep_pool_mark
if mark > 0:
ring_sched_states[ring_id].dep_pool.advance_tail(mark)
```
40 changes: 20 additions & 20 deletions src/a5/runtime/tensormap_and_ringbuffer/docs/RUNTIME_LOGIC.md
@@ -107,11 +107,12 @@ The orchestrator and schedulers communicate through a contiguous shared memory r

```text
┌─────────────────────────────┐ offset 0
│ PTO2SharedMemoryHeader │ (flow control, config, sync flags)
│ PTO2SharedMemoryHeader │ (per-ring flow control + layout, global flags)
├─────────────────────────────┤ aligned
│ PTO2TaskDescriptor[N] │ N = task_window_size (default 65536)
├─────────────────────────────┤ aligned
│ PTO2DepListEntry[M+1] │ M = dep_list_pool_size (entry 0 = NULL sentinel)
│ Per-ring regions ×4: │
│ PTO2TaskDescriptor[N] │ N = task_window_size per ring
│ PTO2TaskPayload[N] │
│ PTO2TaskSlotState[N] │
└─────────────────────────────┘
```

@@ -123,22 +124,21 @@ The orchestrator and schedulers communicate through a contiguous shared memory r
| `last_task_alive` | Scheduler | Orchestrator | Oldest still-active task (task ring tail) |
| `heap_top` | Orchestrator | Scheduler | Heap ring allocation pointer |
| `heap_tail` | Scheduler | Orchestrator | Heap ring reclamation pointer |
| `heap_tail_gen` | Scheduler | Scheduler | Ticket counter for serialized `heap_tail` writes |
| `orchestrator_done` | Orchestrator | Scheduler | Signals orchestration completion |
| `task_window_size` | Init | Both | Number of task slots |
| `heap_size` | Init | Both | Heap total size |
| `dep_list_pool_size` | Init | Both | Dependency list pool size |
| `task_descriptors_offset` | Init | Both | Offset to TaskDescriptor array in SM |
| `dep_list_pool_offset` | Init | Both | Offset to DepListPool in SM |
| `task_window_size` | Init | Both | Number of task slots (per-ring, in `PTO2SharedMemoryRingHeader`) |
| `heap_size` | Init | Both | Heap total size (per-ring, in `PTO2SharedMemoryRingHeader`) |
| `task_descriptors_offset` | Init | Both | Offset to TaskDescriptor array in SM (per-ring) |
| `total_size` | Init | Both | Total shared memory size |
| `graph_output_ptr` | Orchestrator | Host | Address of final output (packed buffer) |
| `graph_output_size` | Orchestrator | Host | Size of final output in bytes |

### 3.2 Size Calculation

```text
total = ALIGN(Header) + ALIGN(window_size * sizeof(TaskDescriptor))
+ ALIGN((dep_pool_size + 1) * sizeof(DepListEntry))
total = ALIGN(Header)
+ Σ_ring [ ALIGN(window_size * sizeof(TaskDescriptor))
+ ALIGN(window_size * sizeof(TaskPayload))
+ ALIGN(window_size * sizeof(TaskSlotState)) ]
```

Alignment is 64 bytes (`PTO2_ALIGN_SIZE`).
@@ -395,7 +395,7 @@ Key members:
| 2 | Initialize task descriptor + slot state, copy parameters |
| 3 | **Lookup**: for each INPUT/INOUT param, search TensorMap for producers; collect producer pointers in `PTO2FaninBuilder` |
| 4 | **Insert**: register OUTPUT/INOUT args in TensorMap |
| 5 | **Record fanin metadata**: store producer pointers in `payload->fanin_inline_slot_states[]` (+ spill pool if >16); increment each producer's `fanout_count` (no lock needed — single writer) |
| 5 | **Record fanin metadata**: store producer pointers in `payload->fanin_inline_slot_states[]` (+ spill pool if >16); increment each producer's `fanout_count` (no lock needed — single writer). This step runs **before** `payload.init()`. |
| 6 | **Push to wiring queue**: push to global `PTO2SpscQueue`; scheduler thread 0 asynchronously wires fanout edges (lock + dep_pool + early_finished check + ready push) |

> **Note**: Fanout wiring (Steps 4–7 in earlier versions) has been moved from the
@@ -489,15 +489,15 @@ Ready queues use a lock-free bounded MPMC (Vyukov) design:
After a task reaches state CONSUMED (4), the scheduler tries to advance `last_task_alive`:

```text
while la < current_task_index:
if task_state[la & mask] < CONSUMED: break
reset fanin_refcount[la & mask] = 0
CAS(last_task_alive, la, la+1)
advance heap_tail from task's packed_buffer_end
la++
advance_ring_pointers(ring_id): // protected by per-ring advance_lock
while la < current_task_index:
if task_state[la & mask] < CONSUMED: break
reset slot for reuse
la++
sync_to_sm() // release-store last_task_alive
```

This is lock-free (CAS-based) and multiple scheduler threads can attempt it concurrently. The `heap_tail_gen` ticket counter serializes `heap_tail` writes to ensure tasks' buffer regions are freed in order.
This is protected by a per-ring try-lock (`advance_lock`) in `RingSchedState`, ensuring only one scheduler thread advances a given ring's watermark at a time.

---
