625 changes: 85 additions & 540 deletions src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp

Large diffs are not rendered by default.

176 changes: 79 additions & 97 deletions src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp

Large diffs are not rendered by default.

45 changes: 18 additions & 27 deletions src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h
@@ -82,17 +82,24 @@ struct PTO2OrchestratorState {
int32_t* aicpu_completed_by_task; // task_id that set the completed state (for slot-reuse validation)
int32_t aicpu_window_mask;

// === ORCHESTRATOR READY QUEUE (early-return path → scheduler) ===
// When the orchestrator discovers a producer already completed, it
// increments the consumer's refcount directly. If that makes the
// consumer ready, the consumer_id is pushed here so scheduler threads
// can pick it up without an O(N) scan.
// SPSC-ish ring: orchestrator writes (single producer), scheduler
// threads read via CAS on orch_ready_head (multiple consumers).
static constexpr int32_t ORCH_READY_QUEUE_SIZE = 4096;
volatile int32_t orch_ready_queue[4096];
volatile int32_t orch_ready_tail; // written by orchestrator only
volatile int32_t orch_ready_head; // advanced by scheduler via CAS
/**
* Allocate packed output buffer for a task
*/
void* pto2_alloc_packed_buffer(int32_t total_size) {
if (total_size <= 0) {
return NULL;
}

void* buffer = heap_ring.pto2_heap_ring_alloc(total_size);

buffers_allocated++;
bytes_allocated += total_size;

// Update shared memory with new heap top
PTO2_STORE_RELEASE(&sm_handle->header->heap_top, heap_ring.top);

return buffer;
}
};
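// -----------------------------------------------------------------------------
// Illustrative sketch only; not part of this patch. The ready queue declared
// above is pushed by a single producer (the orchestrator) and popped by
// multiple scheduler threads that claim entries with a CAS on the head index.
// This stand-alone model uses std::atomic instead of the PTO2_* volatile
// fields and macros; the helper names push_ready()/pop_ready() are invented
// for illustration.
// -----------------------------------------------------------------------------
#include <atomic>
#include <cstdint>

static constexpr int32_t kQueueSize = 4096;  // power of two, like ORCH_READY_QUEUE_SIZE

struct ReadyQueue {
    int32_t slots[kQueueSize];
    std::atomic<int32_t> tail{0};  // written by the orchestrator only
    std::atomic<int32_t> head{0};  // advanced by scheduler threads via CAS
};

// Orchestrator (single producer): publish a consumer that just became ready.
void push_ready(ReadyQueue& q, int32_t consumer_id) {
    int32_t t = q.tail.load(std::memory_order_relaxed);
    q.slots[t & (kQueueSize - 1)] = consumer_id;
    q.tail.store(t + 1, std::memory_order_release);  // make the filled slot visible
}

// Scheduler thread (one of many consumers): claim one entry, or return -1 if empty.
int32_t pop_ready(ReadyQueue& q) {
    int32_t h = q.head.load(std::memory_order_acquire);
    while (h < q.tail.load(std::memory_order_acquire)) {
        if (q.head.compare_exchange_weak(h, h + 1, std::memory_order_acq_rel)) {
            return q.slots[h & (kQueueSize - 1)];
        }
        // CAS failure reloads h with the current head; loop and retry.
    }
    return -1;
}
// -----------------------------------------------------------------------------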

// =============================================================================
@@ -210,22 +217,6 @@ void pto2_orchestrator_wait_all(PTO2OrchestratorState* orch);
*/
bool pto2_orchestrator_has_space(PTO2OrchestratorState* orch);

// =============================================================================
// Internal Helpers
// =============================================================================

/**
* Add consumer to producer's fanout list (with spinlock)
* Also checks if producer has already completed and updates consumer's fanin_refcount
*/
void pto2_add_consumer_to_producer(
PTO2OrchestratorState* orch, PTO2TaskDescriptor* producer, int32_t producer_id, int32_t consumer_id);

/**
* Allocate packed output buffer for a task
*/
void* pto2_alloc_packed_buffer(PTO2OrchestratorState* orch, int32_t total_size);

// =============================================================================
// Debug Utilities
// =============================================================================
239 changes: 0 additions & 239 deletions src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp
@@ -13,11 +13,6 @@
#include <stdlib.h> // for exit()
#include "common/unified_log.h"

// Set to 1 to enable periodic BLOCKED/Unblocked messages during spin-wait.
#ifndef PTO2_SPIN_VERBOSE_LOGGING
#define PTO2_SPIN_VERBOSE_LOGGING 1
#endif

// =============================================================================
// Heap Ring Buffer Implementation
// =============================================================================
@@ -30,134 +25,6 @@ void pto2_heap_ring_init(PTO2HeapRing* ring, void* base, uint64_t size,
ring->tail_ptr = tail_ptr;
}

// Block notification interval (in spin counts)
#define PTO2_BLOCK_NOTIFY_INTERVAL 10000
// Heap ring spin limit - after this, report deadlock and exit
#define PTO2_HEAP_SPIN_LIMIT 100000

void* pto2_heap_ring_alloc(PTO2HeapRing* ring, uint64_t size) {
// Align size for DMA efficiency
size = PTO2_ALIGN_UP(size, PTO2_ALIGN_SIZE);

// Spin-wait if insufficient space (back-pressure from Scheduler)
int spin_count = 0;
#if PTO2_SPIN_VERBOSE_LOGGING
bool notified = false;
#endif

while (1) {
void* ptr = pto2_heap_ring_try_alloc(ring, size);
if (ptr != NULL) {
#if PTO2_SPIN_VERBOSE_LOGGING
if (notified) {
LOG_INFO("[HeapRing] Unblocked after %d spins", spin_count);
}
#endif
return ptr;
}

// No space available, spin-wait
spin_count++;

#if PTO2_SPIN_VERBOSE_LOGGING
// Periodic block notification
if (spin_count % PTO2_BLOCK_NOTIFY_INTERVAL == 0 &&
spin_count < PTO2_HEAP_SPIN_LIMIT) {
uint64_t tail = PTO2_LOAD_ACQUIRE(ring->tail_ptr);
uint64_t available = pto2_heap_ring_available(ring);
LOG_WARN("[HeapRing] BLOCKED: requesting %" PRIu64 " bytes, available=%" PRIu64 ", "
"top=%" PRIu64 ", tail=%" PRIu64 ", spins=%d",
size, available, ring->top, tail, spin_count);
notified = true;
}
#endif

if (spin_count >= PTO2_HEAP_SPIN_LIMIT) {
uint64_t tail = PTO2_LOAD_ACQUIRE(ring->tail_ptr);
uint64_t available = pto2_heap_ring_available(ring);
LOG_ERROR("========================================");
LOG_ERROR("FATAL: Heap Ring Deadlock Detected!");
LOG_ERROR("========================================");
LOG_ERROR("Orchestrator blocked waiting for heap space after %d spins.", spin_count);
LOG_ERROR(" - Requested: %" PRIu64 " bytes", size);
LOG_ERROR(" - Available: %" PRIu64 " bytes", available);
LOG_ERROR(" - Heap top: %" PRIu64, ring->top);
LOG_ERROR(" - Heap tail: %" PRIu64, tail);
LOG_ERROR(" - Heap size: %" PRIu64, ring->size);
LOG_ERROR("Solution: Increase PTO2_HEAP_SIZE (e.g. 256*1024 for 4 x 64KB outputs).");
LOG_ERROR("========================================");
exit(1);
}

PTO2_SPIN_PAUSE();
}
}

void* pto2_heap_ring_try_alloc(PTO2HeapRing* ring, uint64_t size) {
// Align size for DMA efficiency
size = PTO2_ALIGN_UP(size, PTO2_ALIGN_SIZE);

// Read latest tail from shared memory (Scheduler updates this)
uint64_t tail = PTO2_LOAD_ACQUIRE(ring->tail_ptr);
uint64_t top = ring->top;

if (top >= tail) {
// Case 1: top is at or ahead of tail (normal case)
// [....tail====top......]
// ^-- space_at_end = size - top

uint64_t space_at_end = ring->size - top;

if (space_at_end >= size) {
// Enough space at end - allocate here
void* ptr = (char*)ring->base + top;
ring->top = top + size;
return ptr;
}

// Not enough space at end - check if we can wrap to beginning
// IMPORTANT: Don't split buffer, skip remaining space at end
if (tail > size) {
// Wrap to beginning (space available: [0, tail))
ring->top = size;
return ring->base;
}

// Not enough space anywhere - return NULL
return NULL;

} else {
// Case 2: top has wrapped, tail is ahead
// [====top....tail=====]
// ^-- free space = tail - top

uint64_t gap = tail - top;
if (gap >= size) {
void* ptr = (char*)ring->base + top;
ring->top = top + size;
return ptr;
}

// Not enough space - return NULL
return NULL;
}
}

uint64_t pto2_heap_ring_available(PTO2HeapRing* ring) {
uint64_t tail = PTO2_LOAD_ACQUIRE(ring->tail_ptr);
uint64_t top = ring->top;

if (top >= tail) {
// Space at end + space at beginning (if any)
uint64_t at_end = ring->size - top;
uint64_t at_begin = tail;
return at_end > at_begin ? at_end : at_begin; // Max usable
} else {
// Contiguous space between top and tail
return tail - top;
}
}
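// -----------------------------------------------------------------------------
// Illustrative sketch only; not part of this patch. A stripped-down model of
// the "no split" policy above: a request that does not fit between top and the
// end of the buffer wraps to offset 0 (leaving the end gap unused) rather than
// being split, which is also why pto2_heap_ring_available() reports the larger
// of the two free regions instead of their sum. Plain integers stand in for
// the PTO2HeapRing fields; mini_alloc() is an invented name.
// -----------------------------------------------------------------------------
#include <cstdint>
#include <cstdio>

struct MiniRing { uint64_t size, top, tail; };

// Returns the allocation offset, or UINT64_MAX if the request does not fit.
uint64_t mini_alloc(MiniRing& r, uint64_t size) {
    if (r.top >= r.tail) {                                   // [....tail====top....]
        if (r.size - r.top >= size) { uint64_t off = r.top; r.top += size; return off; }
        if (r.tail > size)          { r.top = size; return 0; }  // wrap, skip the end gap
        return UINT64_MAX;
    }
    if (r.tail - r.top >= size) { uint64_t off = r.top; r.top += size; return off; }  // [====top....tail====]
    return UINT64_MAX;
}

int main() {
    MiniRing r{1024, 900, 300};  // 124 bytes free at the end, 300 free at the start
    // Only 124 bytes remain before the end, so a 200-byte request wraps to offset 0.
    std::printf("offset=%llu top=%llu\n",
                (unsigned long long)mini_alloc(r, 200), (unsigned long long)r.top);
}
// -----------------------------------------------------------------------------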

void pto2_heap_ring_reset(PTO2HeapRing* ring) {
ring->top = 0;
}
@@ -174,112 +41,6 @@ void pto2_task_ring_init(PTO2TaskRing* ring, PTO2TaskDescriptor* descriptors,
ring->last_alive_ptr = last_alive_ptr;
}

// Flow control spin limit - if exceeded, likely deadlock due to scope/fanout_count
#define PTO2_FLOW_CONTROL_SPIN_LIMIT 100000

int32_t pto2_task_ring_alloc(PTO2TaskRing* ring) {
// Spin-wait if window is full (back-pressure from Scheduler)
int spin_count = 0;
#if PTO2_SPIN_VERBOSE_LOGGING
bool notified = false;
#endif

while (1) {
int32_t task_id = pto2_task_ring_try_alloc(ring);
if (task_id >= 0) {
#if PTO2_SPIN_VERBOSE_LOGGING
if (notified) {
LOG_INFO("[TaskRing] Unblocked after %d spins, task_id=%d", spin_count, task_id);
}
#endif
return task_id;
}

// Window is full, spin-wait (with yield to prevent CPU starvation)
spin_count++;

#if PTO2_SPIN_VERBOSE_LOGGING
// Periodic block notification
if (spin_count % PTO2_BLOCK_NOTIFY_INTERVAL == 0 &&
spin_count < PTO2_FLOW_CONTROL_SPIN_LIMIT) {
int32_t last_alive = PTO2_LOAD_ACQUIRE(ring->last_alive_ptr);
int32_t active_count = ring->current_index - last_alive;
LOG_WARN("[TaskRing] BLOCKED (Flow Control): current=%d, last_alive=%d, "
"active=%d/%d (%.1f%%), spins=%d",
ring->current_index, last_alive, active_count, ring->window_size,
100.0 * active_count / ring->window_size, spin_count);
notified = true;
}
#endif

// Check for potential deadlock
if (spin_count >= PTO2_FLOW_CONTROL_SPIN_LIMIT) {
int32_t last_alive = PTO2_LOAD_ACQUIRE(ring->last_alive_ptr);
int32_t active_count = ring->current_index - last_alive;

LOG_ERROR("========================================");
LOG_ERROR("FATAL: Flow Control Deadlock Detected!");
LOG_ERROR("========================================");
LOG_ERROR("Task Ring is FULL and no progress after %d spins.", spin_count);
LOG_ERROR("Flow Control Status:");
LOG_ERROR(" - Current task index: %d", ring->current_index);
LOG_ERROR(" - Last task alive: %d", last_alive);
LOG_ERROR(" - Active tasks: %d", active_count);
LOG_ERROR(" - Window size: %d", ring->window_size);
LOG_ERROR(" - Window utilization: %.1f%%", 100.0 * active_count / ring->window_size);
LOG_ERROR("Root Cause:");
LOG_ERROR(" Tasks cannot transition to CONSUMED state because:");
LOG_ERROR(" - fanout_count includes 1 for the owning scope");
LOG_ERROR(" - scope_end() requires orchestrator to continue");
LOG_ERROR(" - But orchestrator is blocked waiting for task ring space");
LOG_ERROR(" This creates a circular dependency (deadlock).");
LOG_ERROR("Solution:");
LOG_ERROR(" Current task_window_size: %d", ring->window_size);
LOG_ERROR(" Default PTO2_TASK_WINDOW_SIZE: %d", PTO2_TASK_WINDOW_SIZE);
LOG_ERROR(" Recommended: %d (at least 2x current active tasks)", active_count * 2);
LOG_ERROR(" Option 1: Change PTO2_TASK_WINDOW_SIZE in pto_runtime2_types.h");
LOG_ERROR(" Option 2: Use pto2_runtime_create_threaded_custom() with larger");
LOG_ERROR(" task_window_size parameter.");
LOG_ERROR("========================================");

// Abort program
exit(1);
}

PTO2_SPIN_PAUSE();
}
}

int32_t pto2_task_ring_try_alloc(PTO2TaskRing* ring) {
// Read latest last_task_alive from shared memory
int32_t last_alive = PTO2_LOAD_ACQUIRE(ring->last_alive_ptr);
int32_t current = ring->current_index;

// Calculate number of active tasks (handles wrap-around)
int32_t active_count = current - last_alive;

// Check if there's room for one more task
// Leave at least 1 slot empty to distinguish full from empty
if (active_count < ring->window_size - 1) {
int32_t task_id = current;
int32_t slot = task_id & (ring->window_size - 1);

// Mark slot as occupied (skip full memset — pto2_submit_task
// explicitly initializes all fields it needs)
PTO2TaskDescriptor* task = &ring->descriptors[slot];
task->task_id = task_id;
task->is_active = true;

// Advance current index
ring->current_index = current + 1;

return task_id;
}

// Window is full
return -1;
}
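// -----------------------------------------------------------------------------
// Illustrative sketch only; not part of this patch. Worked numbers for the
// window bookkeeping above: task ids grow monotonically, the slot is the id
// masked by (window_size - 1) (so the window size must be a power of two), and
// one slot is always kept free so a full window can be told apart from an
// empty one.
// -----------------------------------------------------------------------------
#include <cstdint>
#include <cstdio>

int main() {
    const int32_t window_size = 8;             // power of two, as PTO2_TASK_WINDOW_SIZE is assumed to be
    int32_t last_alive = 5;                    // last task the scheduler has fully retired
    int32_t current    = 11;                   // next task id to hand out
    int32_t active     = current - last_alive;            // 6 tasks still in flight
    bool    can_alloc  = active < window_size - 1;         // 6 < 7: one slot stays free
    int32_t slot       = current & (window_size - 1);      // task 11 lands in slot 3
    std::printf("active=%d can_alloc=%d slot=%d\n", active, can_alloc, slot);
}
// -----------------------------------------------------------------------------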

int32_t pto2_task_ring_active_count(PTO2TaskRing* ring) {
int32_t last_alive = PTO2_LOAD_ACQUIRE(ring->last_alive_ptr);
return ring->current_index - last_alive;