diff --git a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 8bed0cbe7..21efb31ad 100644 --- a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -63,19 +63,6 @@ constexpr int MAX_AIC_PER_THREAD = PLATFORM_MAX_AIC_PER_THREAD; constexpr int MAX_AIV_PER_THREAD = PLATFORM_MAX_AIV_PER_THREAD; constexpr int MAX_CORES_PER_THREAD = PLATFORM_MAX_CORES_PER_THREAD; -// Maximum tasks for ready queue (PTO2 mode uses shared memory task count) -constexpr int AICPU_MAX_READY_TASKS = 16384; -constexpr int AICPU_READY_MASK = AICPU_MAX_READY_TASKS - 1; -// One shard per scheduler thread: push to own shard (thread_idx % shards), pop own first + work stealing -// Runtime-configurable via env var PTO2_READY_QUEUE_SHARDS (1..MAX_AICPU_THREADS). Default=3. - -// Lightweight spinlock (avoids futex syscall overhead of std::mutex) -struct SpinLock { - std::atomic flag{0}; - void lock() { while (flag.exchange(1, std::memory_order_acquire) != 0) { PTO2_SPIN_PAUSE_LIGHT(); } } - void unlock() { flag.store(0, std::memory_order_release); } -}; - // Core information for discovery (with register address for fast dispatch) struct CoreInfo { int worker_id; // Index in runtime.workers[] @@ -84,7 +71,11 @@ struct CoreInfo { CoreType core_type; }; + +static PTO2Runtime *rt{nullptr}; + struct AicpuExecutor { + // ===== Thread management state ===== std::atomic thread_idx_{0}; std::atomic initialized_{false}; @@ -113,18 +104,7 @@ struct AicpuExecutor { // Track executing task_id per core (AICPU_TASK_INVALID = idle) int executing_task_ids_[MAX_CORES_PER_THREAD]; - // ===== N shards per type: push to own shard (thread_idx % N), pop own first + work stealing ===== - // active_shards_ is set at runtime (1..MAX_AICPU_THREADS) via env PTO2_READY_QUEUE_SHARDS - int active_shards_{3}; - SpinLock ready_queue_aic_lock_[MAX_AICPU_THREADS]; - int ready_queue_aic_[MAX_AICPU_THREADS][AICPU_MAX_READY_TASKS]; - int ready_queue_aic_head_[MAX_AICPU_THREADS]{0}; - int ready_queue_aic_tail_[MAX_AICPU_THREADS]{0}; - - SpinLock ready_queue_aiv_lock_[MAX_AICPU_THREADS]; - int ready_queue_aiv_[MAX_AICPU_THREADS][AICPU_MAX_READY_TASKS]; - int ready_queue_aiv_head_[MAX_AICPU_THREADS]{0}; - int ready_queue_aiv_tail_[MAX_AICPU_THREADS]{0}; + // ===== Task queue state (managed by scheduler ready queues) ===== // Task execution tracking std::atomic completed_tasks_{0}; @@ -133,16 +113,8 @@ struct AicpuExecutor { // Device orchestration: set by Thread 3 when graph is built; workers wait for it std::atomic orchestrator_done_{false}; std::atomic pto2_init_done_{false}; + std::atomic runtime_init_ready_{false}; std::atomic pto2_init_complete_{false}; // init block finished; others wait for this - std::atomic next_scan_index_{0}; - std::atomic sm_header_ready_{false}; // Thread 3 sets after SM header init - std::atomic orch_pointers_ready_{false}; // Thread 3 sets after aicpu parallel mode pointers + orch_ready_queue are configured - - // Orchestrator ready queue pointers (set by Thread 3, read by scheduler threads) - volatile int32_t* orch_ready_queue_{nullptr}; - volatile int32_t* orch_ready_tail_{nullptr}; - volatile int32_t* orch_ready_head_{nullptr}; - int32_t orch_ready_capacity_{0}; // Orchestration SO handle - defer dlclose until all tasks complete void* orch_so_handle_{nullptr}; @@ -162,69 +134,15 @@ struct AicpuExecutor { void deinit(); void diagnose_stuck_state(Runtime* runtime, int 
thread_idx, const int* cur_thread_cores, int core_num, Handshake* hank); - -private: - // Helper: enqueue a ready task to the appropriate shard with profiling - inline void enqueue_ready_task_with_profiling( - int32_t task_id, - int32_t worker_type, - int thread_idx -#if PTO2_ORCH_PROFILING - , uint64_t& wait_counter, - uint64_t& hold_counter -#endif - ); }; static AicpuExecutor g_aicpu_executor; -// PTO2 device-mode state (shared memory view + per-task fanin refcount) -static constexpr int PTO2_MAX_SLOTS = PTO2_TASK_WINDOW_SIZE; -static int s_pto2_fanin_refcount[PTO2_MAX_SLOTS]; -static volatile int32_t s_pto2_task_completed[PTO2_MAX_SLOTS]; -static int32_t s_pto2_completed_by_task[PTO2_MAX_SLOTS]; // task_id that set completed state (for slot-reuse validation) +// PTO2 device-mode state (per-core dispatch payloads) static PTO2DispatchPayload s_pto2_payload_per_core[RUNTIME_MAX_WORKER]; // ===== AicpuExecutor Method Implementations ===== -// Helper: enqueue a ready task to the appropriate shard with profiling -inline void AicpuExecutor::enqueue_ready_task_with_profiling( - int32_t task_id, - int32_t worker_type, - int thread_idx -#if PTO2_ORCH_PROFILING - , uint64_t& wait_counter, - uint64_t& hold_counter -#endif -) { - int my_shard = thread_idx % active_shards_; -#if PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(), _l1, _l2; -#endif - - if (worker_type == PTO2_WORKER_CUBE) { - ready_queue_aic_lock_[my_shard].lock(); -#if PTO2_ORCH_PROFILING - _l1 = get_sys_cnt_aicpu(); -#endif - ready_queue_aic_[my_shard][ready_queue_aic_tail_[my_shard]++ & AICPU_READY_MASK] = task_id; - ready_queue_aic_lock_[my_shard].unlock(); - } else { - ready_queue_aiv_lock_[my_shard].lock(); -#if PTO2_ORCH_PROFILING - _l1 = get_sys_cnt_aicpu(); -#endif - ready_queue_aiv_[my_shard][ready_queue_aiv_tail_[my_shard]++ & AICPU_READY_MASK] = task_id; - ready_queue_aiv_lock_[my_shard].unlock(); - } - -#if PTO2_ORCH_PROFILING - _l2 = get_sys_cnt_aicpu(); - wait_counter += (_l1 - _l0); - hold_counter += (_l2 - _l1); -#endif -} - /** * Handshake with all cores and discover their types * Sets up register addresses for fast dispatch. @@ -397,17 +315,7 @@ int AicpuExecutor::init(Runtime* runtime) { DEV_INFO("Init: orch_built_on_host=%d", orch_on_host ? 
1 : 0); orchestrator_done_.store(orch_on_host, std::memory_order_release); - // Read ready queue shard count from Runtime (already validated by host) - active_shards_ = runtime->ready_queue_shards; - DEV_ALWAYS("Ready queue shards: %d (max=%d)", active_shards_, MAX_AICPU_THREADS); - - // Initial ready tasks will be populated from PTO2 shared memory in resolve_and_dispatch_pto2 - for (int s = 0; s < MAX_AICPU_THREADS; s++) { - ready_queue_aic_head_[s] = 0; - ready_queue_aic_tail_[s] = 0; - ready_queue_aiv_head_[s] = 0; - ready_queue_aiv_tail_[s] = 0; - } + // Initial ready tasks will be populated via scheduler ready queues // Reset per-core dispatch timestamps and task counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { @@ -485,14 +393,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } DEV_INFO("Thread %d: sm_base=%p", thread_idx, sm_base); - // Device orchestration: wait for last thread to initialize SM header - if (thread_num_ > 1 && !runtime->get_orch_built_on_host()) { - while (!sm_header_ready_.load(std::memory_order_acquire)) { - } - } - PTO2SharedMemoryHeader* header = static_cast(sm_base); - void* gm_heap_base = runtime->get_pto2_gm_heap_ptr(); // For heap_tail offset calc DEV_INFO("Thread %d: header=%p, task_desc_offset=%d, dep_pool_offset=%d, window_size=%d", thread_idx, (void*)header, header->task_descriptors_offset, header->dep_list_pool_offset, header->task_window_size); @@ -505,19 +406,16 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, thread_idx, (void*)task_descriptors, (void*)dep_list_pool); int32_t window_size = header->task_window_size; - if (window_size <= 0 || window_size > PTO2_MAX_SLOTS) window_size = PTO2_MAX_SLOTS; + if (window_size <= 0 || window_size > PTO2_TASK_WINDOW_SIZE) window_size = PTO2_TASK_WINDOW_SIZE; int32_t window_mask = window_size - 1; Handshake* hank = static_cast(runtime->workers); DEV_INFO("Thread %d: hank=%p, window_size=%d", thread_idx, (void*)hank, window_size); - // One-time init: clear refcount and completed arrays (one thread does it; others wait) + // One-time init: assign perf buffers (one thread does it; others wait) if (!pto2_init_done_.exchange(true, std::memory_order_acq_rel)) { DEV_INFO("Thread %d: doing one-time init", thread_idx); - std::memset(s_pto2_fanin_refcount, 0, sizeof(s_pto2_fanin_refcount)); - std::memset((void*)s_pto2_task_completed, 0, sizeof(s_pto2_task_completed)); - std::memset(s_pto2_completed_by_task, -1, sizeof(s_pto2_completed_by_task)); // Assign perf buffers to cores early so profiling captures all tasks // (total_tasks written to header later when orchestrator completes) @@ -533,13 +431,6 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, pto2_init_complete_.store(true, std::memory_order_release); } else { while (!pto2_init_complete_.load(std::memory_order_acquire)) { - } - } - - // Wait for last thread to finish setting up aicpu parallel mode pointers - // and orch_ready_queue before entering the scheduling loop. 
- if (thread_num_ > 1 && !runtime->get_orch_built_on_host()) { - while (!orch_pointers_ready_.load(std::memory_order_acquire)) { std::this_thread::yield(); } } @@ -561,37 +452,19 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, // Scheduler profiling counters #if PTO2_ORCH_PROFILING uint64_t sched_scan_cycle = 0; - uint64_t sched_early_ready_cycle = 0; + uint64_t sched_orch_drain_cycle = 0; uint64_t sched_complete_cycle = 0; uint64_t sched_dispatch_cycle = 0; + uint64_t sched_yield_cycle = 0; uint64_t sched_loop_count = 0; - uint64_t sched_scan_ready_wait = 0, sched_scan_ready_hold = 0; - uint64_t sched_early_ready_wait = 0, sched_early_ready_hold = 0; - uint64_t sched_complete_ready_wait = 0, sched_complete_ready_hold = 0; - uint64_t sched_dispatch_hit_wait = 0, sched_dispatch_hit_hold = 0; - uint64_t sched_dispatch_miss_wait = 0, sched_dispatch_miss_hold = 0; - uint64_t ready_pop_own = 0, ready_pop_steal = 0; + uint64_t sched_yield_count = 0; #endif - // Phase profiling: per-phase task counters - uint32_t phase_complete_count = 0; - uint32_t phase_dispatch_count = 0; - uint32_t phase_scan_count = 0; - uint32_t phase_early_ready_count = 0; - // Fanout traversal statistics: how many downstream deps were notified after task completions - uint64_t fanout_edges_notified = 0; - int32_t fanout_max_degree = 0; while (true) { #if PTO2_ORCH_PROFILING sched_loop_count++; #endif CYCLE_COUNT_START(); - // Phase profiling: record start time for this iteration - uint64_t _t0_phase = _t0; - phase_complete_count = 0; - phase_dispatch_count = 0; - phase_scan_count = 0; - phase_early_ready_count = 0; // Dynamic task_count (Thread 3 sets total_tasks_ when orchestration completes) int32_t task_count = total_tasks_.load(std::memory_order_acquire); bool orch_done = orchestrator_done_.load(std::memory_order_acquire); @@ -635,6 +508,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id]; int32_t task_id = executing_task_ids_[core_id]; + pto2_scheduler_on_task_complete(&rt->scheduler, task_id); executing_task_ids_[core_id] = AICPU_TASK_INVALID; // Write AICPU dispatch/finish timestamps into the PerfRecord @@ -654,141 +528,12 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } } - PTO2TaskDescriptor* pto2_task = &task_descriptors[task_id & window_mask]; - DEV_DEBUG("Thread %d: Core %d completed PTO2 task %d", thread_idx, core_id, task_id); - // Mark completed (state=2), then snapshot fanout_head under the per-task spinlock. - // - // WHY THE LOCK IS REQUIRED (device orchestration / AICPU parallel mode): - // The orchestrator (Thread 3) runs concurrently with the scheduler threads and - // may still be adding consumers to this task's fanout list via - // pto2_add_consumer_to_producer(). That function holds fanout_lock while it - // (a) checks the completion state and (b) prepends to fanout_head. - // - // Without the lock here we have a TOCTOU race: - // 1. Orch acquires lock, checks state=0 (task still running), plans insert. - // 2. Task finishes; we store state=2 (RELEASE) but haven't acquired the lock. - // 3. Orch inserts consumer X into fanout_head, releases lock. - // 4. We read the OLD fanout_head (before X was inserted) → X is never woken. - // - // By acquiring the lock AFTER storing state=2 we guarantee mutual exclusion: - // • If Orch holds the lock first → it writes fanout_head → we read it with X. 
- // • If we acquire the lock first → Orch's subsequent lock-acquire sees state=2 - // via the release/acquire pair and takes the early-return path, directly - // incrementing X's fanin_refcount instead of touching fanout_head. - // Either way every consumer is accounted for exactly once. - __atomic_store_n(&s_pto2_completed_by_task[task_id & window_mask], task_id, __ATOMIC_RELEASE); - __atomic_store_n(&s_pto2_task_completed[task_id & window_mask], 2, __ATOMIC_RELEASE); - pto2_fanout_lock(pto2_task); - int32_t fanout_head = (int32_t)pto2_task->fanout_head; - pto2_fanout_unlock(pto2_task); - - // Traverse fanout (no lock) - // - // SEQ_CST on the refcount increment and fanin_count load breaks the IRIW - // (Independent Reads of Independent Writes) hazard with the orchestrator's - // Step 5 / Step 5b: - // - // Thread 0 (here): Thread 3 (orchestrator Step 5/5b): - // fetch_add(refcount, SEQ_CST) store(fanin_count=N, SEQ_CST) - // load(fanin_count, SEQ_CST) load(refcount, SEQ_CST) - // - // On ARM (IRIW is architecturally allowed with ACQ/REL), both threads could - // simultaneously read stale values — this thread sees fanin_count=0 and Step 5b - // sees refcount 0) { - fanout_len++; - PTO2DepListEntry* entry = &dep_list_pool[current]; - int32_t consumer_id = entry->task_id; - int32_t consumer_slot = consumer_id & window_mask; - int prev = __atomic_fetch_add(&s_pto2_fanin_refcount[consumer_slot], 1, __ATOMIC_SEQ_CST); - PTO2TaskDescriptor* consumer_desc = &task_descriptors[consumer_slot]; - int32_t fanin_count = __atomic_load_n(&consumer_desc->fanin_count, __ATOMIC_SEQ_CST); - if (prev + 1 == fanin_count) { - __atomic_store_n(&s_pto2_task_completed[consumer_slot], 1, __ATOMIC_RELEASE); - enqueue_ready_task_with_profiling( - consumer_id, consumer_desc->worker_type, thread_idx -#if PTO2_ORCH_PROFILING - , sched_complete_ready_wait, sched_complete_ready_hold -#endif - ); - } - current = entry->next_offset; - } - fanout_edges_notified += fanout_len; - if (fanout_len > fanout_max_degree) fanout_max_degree = fanout_len; - cur_thread_tasks_in_flight--; cur_thread_completed++; - phase_complete_count++; made_progress = true; completed_tasks_.fetch_add(1, std::memory_order_release); - - // Advance last_task_alive for TaskRing flow control. - // Mark this task as fully consumed (state=3), then try to - // advance the watermark using lock-free CAS. - // - // ORDERING: Mark completed as state=3 and reset refcount BEFORE advancing last_task_alive. - // Once last_task_alive advances past a slot, the orchestrator can - // immediately reuse it. The early-return path in - // pto2_add_consumer_to_producer checks aicpu_task_completed[prod_slot]; - // if we reset AFTER the CAS, the orchestrator could see stale state=3 - // from the old task and incorrectly skip dependency setup. - __atomic_store_n(&s_pto2_task_completed[task_id & window_mask], 3, __ATOMIC_RELEASE); - { - int32_t la = __atomic_load_n(&header->last_task_alive, __ATOMIC_ACQUIRE); - int32_t cti = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); - while (la < cti) { - int32_t la_slot = la & window_mask; - if (__atomic_load_n(&s_pto2_task_completed[la_slot], __ATOMIC_ACQUIRE) < 3) - break; - // Only reset refcount — the orchestrator's early-return path - // (pto2_add_consumer_to_producer) MUST see completed >= 2 when - // the producer has actually finished, per the fanout lock protocol. - // completed_by_task guards against stale state from recycled slots: - // the old task's completed_by_task won't match the new producer_id. 
- __atomic_store_n(&s_pto2_fanin_refcount[la_slot], 0, __ATOMIC_RELEASE); - // Advance last_task_alive to make this slot available. - int32_t expected = la; - if (__atomic_compare_exchange_n(&header->last_task_alive, &expected, la + 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) { - // Serialize heap_tail writes via ticket-based generation counter. - // Without this, concurrent CAS winners can interleave their - // heap_tail stores, causing stale regression (see design note below). - // - // DESIGN: heap_tail_gen tracks which task's tail was last written. - // Each CAS winner waits for gen==la (its ticket), writes heap_tail, - // then advances gen to la+1. The critical section is ~3 instructions, - // so the spin is effectively zero in the common (no-preemption) case. - while (__atomic_load_n(&header->heap_tail_gen, __ATOMIC_ACQUIRE) != la) { - } - - // Advance heap_tail for HeapRing flow control - PTO2TaskDescriptor* consumed_t = &task_descriptors[la_slot]; - if (consumed_t->packed_buffer_end != nullptr) { - uint64_t new_tail = (uint64_t)((char*)consumed_t->packed_buffer_end - (char*)gm_heap_base); - if (new_tail <= header->heap_size) { - __atomic_store_n(&header->heap_tail, new_tail, __ATOMIC_RELEASE); - } - } - - // Release next writer - __atomic_store_n(&header->heap_tail_gen, la + 1, __ATOMIC_RELEASE); - - la = la + 1; - } else { - break; - } - } - } - // Debug: periodic progress (thread 0 only) to find which task hangs if (thread_idx == 0 && task_count > 0) { int32_t c = completed_tasks_.load(std::memory_order_relaxed); @@ -800,13 +545,6 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } } CYCLE_COUNT_LAP(sched_complete_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_COMPLETE, - _t0_phase, _t1, static_cast(sched_loop_count), phase_complete_count); - _t0_phase = _t1; - } -#endif // Phase 2: Dispatch ready tasks to idle cores (register-based dispatch) if (cur_thread_tasks_in_flight < core_num) { @@ -817,76 +555,8 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, int reg_state = EXTRACT_TASK_STATE(reg_val); if (reg_state == TASK_FIN_STATE && executing_task_ids_[core_id] == AICPU_TASK_INVALID) { Handshake* h = &hank[core_id]; - int32_t task_id = AICPU_TASK_INVALID; -#if PTO2_ORCH_PROFILING - bool found_task = false; - bool is_stolen = false; -#endif - int my_shard = thread_idx % active_shards_; - if (h->core_type == CoreType::AIC) { - for (int k = 0; k < active_shards_ && task_id < 0; k++) { - int shard = (my_shard + k) % active_shards_; -#if PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(); -#endif - ready_queue_aic_lock_[shard].lock(); -#if PTO2_ORCH_PROFILING - uint64_t _l1 = get_sys_cnt_aicpu(); -#endif - if (ready_queue_aic_head_[shard] < ready_queue_aic_tail_[shard]) { - task_id = ready_queue_aic_[shard][ready_queue_aic_head_[shard]++ & AICPU_READY_MASK]; - ready_queue_aic_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_hit_wait += (_l1 - _l0); - sched_dispatch_hit_hold += (_l2 - _l1); - found_task = true; - is_stolen = (k != 0); -#endif - break; - } - ready_queue_aic_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_miss_wait += (_l1 - _l0); - sched_dispatch_miss_hold += (_l2 - _l1); -#endif - } - } else { - for (int k = 0; k < active_shards_ && task_id < 0; k++) { - int shard = (my_shard + k) % active_shards_; -#if 
PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(); -#endif - ready_queue_aiv_lock_[shard].lock(); -#if PTO2_ORCH_PROFILING - uint64_t _l1 = get_sys_cnt_aicpu(); -#endif - if (ready_queue_aiv_head_[shard] < ready_queue_aiv_tail_[shard]) { - task_id = ready_queue_aiv_[shard][ready_queue_aiv_head_[shard]++ & AICPU_READY_MASK]; - ready_queue_aiv_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_hit_wait += (_l1 - _l0); - sched_dispatch_hit_hold += (_l2 - _l1); - found_task = true; - is_stolen = (k != 0); -#endif - break; - } - ready_queue_aiv_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_miss_wait += (_l1 - _l0); - sched_dispatch_miss_hold += (_l2 - _l1); -#endif - } - } -#if PTO2_ORCH_PROFILING - if (found_task) { - if (is_stolen) ready_pop_steal++; else ready_pop_own++; - } -#endif + PTO2WorkerType wt = (h->core_type == CoreType::AIC) ? PTO2_WORKER_CUBE : PTO2_WORKER_VECTOR; + int32_t task_id = pto2_scheduler_get_ready_task(&rt->scheduler, wt); if (task_id >= 0) { PTO2TaskDescriptor* task = &task_descriptors[task_id & window_mask]; PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id]; @@ -900,11 +570,9 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } core_dispatch_counts_[core_id]++; } - write_reg(reg_addr, RegId::DATA_MAIN_BASE, static_cast(task_id + 1)); executing_task_ids_[core_id] = task_id; cur_thread_tasks_in_flight++; - phase_dispatch_count++; made_progress = true; DEV_DEBUG("Thread %d: Dispatching PTO2 task %d to core %d", thread_idx, task_id, core_id); } @@ -912,19 +580,10 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } } CYCLE_COUNT_LAP(sched_dispatch_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_DISPATCH, - _t0_phase, _t1, static_cast(sched_loop_count), phase_dispatch_count); - _t0_phase = _t1; - } -#endif - // Incremental scan: discover root tasks (fanin_count == 0) + // Update perf header total_tasks if visible tasks have changed { int32_t visible = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); - - // Update perf header total_tasks if visible tasks have changed if (profiling_enabled && visible > 0 && visible != last_reported_task_count) { perf_aicpu_update_total_tasks(runtime, static_cast(visible)); @@ -933,79 +592,8 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, last_reported_task_count = visible; } - - while (true) { - int32_t idx = next_scan_index_.load(std::memory_order_acquire); - if (idx >= visible) break; - if (!next_scan_index_.compare_exchange_weak(idx, idx + 1, - std::memory_order_acq_rel, std::memory_order_acquire)) continue; - - int32_t slot = idx & window_mask; - - PTO2TaskDescriptor* t = &task_descriptors[slot]; - int32_t fanin_count = __atomic_load_n(&t->fanin_count, __ATOMIC_ACQUIRE); - if (fanin_count == 0) { - // Mark as enqueued (state=1) to prevent double-enqueue - __atomic_store_n(&s_pto2_task_completed[slot], 1, __ATOMIC_RELEASE); - enqueue_ready_task_with_profiling( - idx, t->worker_type, thread_idx -#if PTO2_ORCH_PROFILING - , sched_scan_ready_wait, sched_scan_ready_hold -#endif - ); - phase_scan_count++; - made_progress = true; - } - } } CYCLE_COUNT_LAP(sched_scan_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_SCAN, - _t0_phase, _t1, static_cast(sched_loop_count), 
phase_scan_count); - _t0_phase = _t1; - } -#endif - - // Early-ready drain: tasks whose deps were already met at submit time - // (orchestrator detected all producers completed → pushed to orch_ready_queue_) - if (orch_ready_queue_ != nullptr) { - while (true) { - int32_t head = __atomic_load_n(orch_ready_head_, __ATOMIC_ACQUIRE); - int32_t tail = __atomic_load_n(orch_ready_tail_, __ATOMIC_ACQUIRE); - if (head == tail) break; // queue empty - - // CAS to claim this slot (multiple scheduler threads compete) - if (!__atomic_compare_exchange_n(orch_ready_head_, &head, head + 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) continue; - - int32_t task_id = orch_ready_queue_[head & (orch_ready_capacity_ - 1)]; - int32_t slot = task_id & window_mask; - - // CAS from 0 → 1 to claim enqueue rights (may already be enqueued by fanout path) - int32_t expected = 0; - if (!__atomic_compare_exchange_n(&s_pto2_task_completed[slot], &expected, 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) continue; - - PTO2TaskDescriptor* t = &task_descriptors[slot]; - enqueue_ready_task_with_profiling( - task_id, t->worker_type, thread_idx -#if PTO2_ORCH_PROFILING - , sched_early_ready_wait, sched_early_ready_hold -#endif - ); - phase_early_ready_count++; - made_progress = true; - } - } - CYCLE_COUNT_LAP(sched_early_ready_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_EARLY_READY, - _t0_phase, _t1, static_cast(sched_loop_count), phase_early_ready_count); - _t0_phase = _t1; - } -#endif if (!made_progress) { idle_iterations++; @@ -1013,31 +601,30 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, int32_t c = completed_tasks_.load(std::memory_order_relaxed); DEV_ALWAYS("PTO2 stall: no progress for %d iterations, completed=%d total=%d", idle_iterations, c, task_count); - // Scan all task slots to find truly stuck tasks - // state=0: not yet completed (may be waiting for deps or ready but not enqueued) - // state=1: enqueued in ready queue or dispatched to hardware - // state=2: completed by Phase 1 + // Scan all task slots to find truly stuck tasks using scheduler state + PTO2SchedulerState* sched = &rt->scheduler; int cnt_ready = 0, cnt_waiting = 0, cnt_inflight = 0; for (int si = 0; si < task_count; si++) { - int32_t st = __atomic_load_n(&s_pto2_task_completed[si], __ATOMIC_RELAXED); - int32_t rc = __atomic_load_n(&s_pto2_fanin_refcount[si], __ATOMIC_RELAXED); - int32_t fi = __atomic_load_n(&task_descriptors[si].fanin_count, __ATOMIC_RELAXED); - int32_t kid = task_descriptors[si].kernel_id; - if (st == 2) continue; // Already done - if (st == 1) { cnt_inflight++; continue; } - // st == 0 + int32_t slot = si & window_mask; + PTO2TaskState st = (PTO2TaskState)__atomic_load_n(&sched->task_state[slot], __ATOMIC_RELAXED); + int32_t rc = __atomic_load_n(&sched->fanin_refcount[slot], __ATOMIC_RELAXED); + int32_t fi = __atomic_load_n(&task_descriptors[slot].fanin_count, __ATOMIC_RELAXED); + int32_t kid = task_descriptors[slot].kernel_id; + if (st >= PTO2_TASK_COMPLETED) continue; // Already done + if (st == PTO2_TASK_READY || st == PTO2_TASK_RUNNING) { cnt_inflight++; continue; } + // PENDING if (rc >= fi) { // Ready (all deps satisfied) but not enqueued — this is the real bug cnt_ready++; if (cnt_ready <= STALL_DUMP_READY_MAX) { - DEV_ALWAYS(" STUCK-READY slot=%d kernel_id=%d refcount=%d fanin=%d", - si, kid, rc, fi); + DEV_ALWAYS(" STUCK-READY slot=%d kernel_id=%d refcount=%d fanin=%d state=%d", + slot, kid, rc, fi, (int)st); 
} } else { cnt_waiting++; if (cnt_waiting <= STALL_DUMP_WAIT_MAX) { - DEV_ALWAYS(" STUCK-WAIT slot=%d kernel_id=%d refcount=%d fanin=%d", - si, kid, rc, fi); + DEV_ALWAYS(" STUCK-WAIT slot=%d kernel_id=%d refcount=%d fanin=%d state=%d", + slot, kid, rc, fi, (int)st); } } } @@ -1069,6 +656,10 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } else { SPIN_WAIT_HINT(); } +#if PTO2_ORCH_PROFILING + sched_yield_count++; +#endif + CYCLE_COUNT_LAP(sched_yield_cycle); } else { idle_iterations = 0; } @@ -1076,56 +667,33 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, #if PTO2_ORCH_PROFILING uint64_t sched_total = - sched_scan_cycle + sched_early_ready_cycle + sched_complete_cycle + sched_dispatch_cycle; + sched_scan_cycle + sched_orch_drain_cycle + sched_complete_cycle + sched_dispatch_cycle + sched_yield_cycle; if (sched_total == 0) sched_total = 1; // avoid div-by-zero - double tasks_per_loop = sched_loop_count > 0 ? (double)cur_thread_completed / sched_loop_count : 0.0; - - // === Summary === - DEV_ALWAYS("Thread %d: === PTO2 Scheduler Summary ===", thread_idx); - DEV_ALWAYS("Thread %d: completed=%d tasks in %.0fus (%llu loops, %.1f tasks/loop)", - thread_idx, cur_thread_completed, cycles_to_us(sched_total), - (unsigned long long)sched_loop_count, tasks_per_loop); - - // --- Phase Breakdown (execution order) --- - DEV_ALWAYS("Thread %d: --- Phase Breakdown (execution order) ---", thread_idx); - DEV_ALWAYS("Thread %d: scan: %8.0fus (%4.1f%%)", - thread_idx, cycles_to_us(sched_scan_cycle), sched_scan_cycle * 100.0 / sched_total); - DEV_ALWAYS("Thread %d: early_ready: %8.0fus (%4.1f%%) (deps already met at submit time)", - thread_idx, cycles_to_us(sched_early_ready_cycle), sched_early_ready_cycle * 100.0 / sched_total); - DEV_ALWAYS("Thread %d: complete: %8.0fus (%4.1f%%) [fanout: edges=%llu, max_degree=%d, avg=%.1f]", - thread_idx, cycles_to_us(sched_complete_cycle), sched_complete_cycle * 100.0 / sched_total, - (unsigned long long)fanout_edges_notified, fanout_max_degree, - cur_thread_completed > 0 ? (double)fanout_edges_notified / cur_thread_completed : 0.0); - DEV_ALWAYS("Thread %d: dispatch: %8.0fus (%4.1f%%) [steal: own=%llu, steal=%llu, pct=%.1f%%]", - thread_idx, cycles_to_us(sched_dispatch_cycle), sched_dispatch_cycle * 100.0 / sched_total, - (unsigned long long)ready_pop_own, (unsigned long long)ready_pop_steal, - (ready_pop_own + ready_pop_steal) > 0 ? 
100.0 * (double)ready_pop_steal / (double)(ready_pop_own + ready_pop_steal) : 0.0); - - // --- Lock Contention (ready_q) --- - DEV_ALWAYS("Thread %d: --- Lock Contention (ready_q) ---", thread_idx); - DEV_ALWAYS("Thread %d: total: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_scan_ready_wait + sched_early_ready_wait + sched_complete_ready_wait + sched_dispatch_hit_wait + sched_dispatch_miss_wait), - (double)cycles_to_us(sched_scan_ready_hold + sched_early_ready_hold + sched_complete_ready_hold + sched_dispatch_hit_hold + sched_dispatch_miss_hold)); - DEV_ALWAYS("Thread %d: scan: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_scan_ready_wait), (double)cycles_to_us(sched_scan_ready_hold)); - DEV_ALWAYS("Thread %d: early_ready: wait=%5.0fus hold=%5.0fus", + DEV_ALWAYS("Thread %d: PTO2 scheduler stats: loops=%llu, completed=%d, total=%.3fus", thread_idx, - (double)cycles_to_us(sched_early_ready_wait), (double)cycles_to_us(sched_early_ready_hold)); - DEV_ALWAYS("Thread %d: complete: wait=%5.0fus hold=%5.0fus", + (unsigned long long)sched_loop_count, + cur_thread_completed, + cycles_to_us(sched_total)); + DEV_ALWAYS( + "Thread %d: scan=%.3fus (%.1f%%), orch_drain=%.3fus (%.1f%%), complete=%.3fus (%.1f%%), dispatch=%.3fus " + "(%.1f%%)", thread_idx, - (double)cycles_to_us(sched_complete_ready_wait), (double)cycles_to_us(sched_complete_ready_hold)); - DEV_ALWAYS("Thread %d: dispatch: wait=%5.0fus hold=%5.0fus", + cycles_to_us(sched_scan_cycle), + sched_scan_cycle * 100.0 / sched_total, + cycles_to_us(sched_orch_drain_cycle), + sched_orch_drain_cycle * 100.0 / sched_total, + cycles_to_us(sched_complete_cycle), + sched_complete_cycle * 100.0 / sched_total, + cycles_to_us(sched_dispatch_cycle), + sched_dispatch_cycle * 100.0 / sched_total); + DEV_ALWAYS("Thread %d: yield=%.3fus (%.1f%%, %llu calls, avg=%.1fus)", thread_idx, - (double)cycles_to_us(sched_dispatch_hit_wait + sched_dispatch_miss_wait), - (double)cycles_to_us(sched_dispatch_hit_hold + sched_dispatch_miss_hold)); - DEV_ALWAYS("Thread %d: hit: wait=%5.0fus hold=%5.0fus (dequeued task)", - thread_idx, - (double)cycles_to_us(sched_dispatch_hit_wait), (double)cycles_to_us(sched_dispatch_hit_hold)); - DEV_ALWAYS("Thread %d: miss: wait=%5.0fus hold=%5.0fus (empty queue)", - thread_idx, - (double)cycles_to_us(sched_dispatch_miss_wait), (double)cycles_to_us(sched_dispatch_miss_hold)); + cycles_to_us(sched_yield_cycle), + sched_yield_cycle * 100.0 / sched_total, + (unsigned long long)sched_yield_count, + sched_yield_count > 0 ? 
cycles_to_us(sched_yield_cycle) / sched_yield_count : 0.0); + + DEV_ALWAYS("Thread %d: PTO2 execution complete, completed %d tasks", thread_idx, cur_thread_completed); #endif // Flush performance buffers for cores managed by this thread @@ -1146,6 +714,7 @@ int AicpuExecutor::run(Runtime* runtime) { // Thread 3 when 4 AICPU threads: orchestrator (no cores) if (thread_num_ == 4 && thread_idx == 3) { + rt = nullptr; if (runtime->get_orch_built_on_host()) { DEV_INFO("Thread 3: Host orchestration mode, no-op"); } else { @@ -1256,6 +825,13 @@ int AicpuExecutor::run(Runtime* runtime) { DEV_INFO("Thread 3: No config function, using defaults"); } + if (expected_arg_count > 0 && arg_count < expected_arg_count) { + DEV_ERROR("Thread 3: arg_count %d < expected %d", arg_count, expected_arg_count); + dlclose(handle); + unlink(so_path); + return -1; + } + // Apply ring buffer size overrides from Runtime (set by host env vars) if (runtime->pto2_task_window_size > 0) { task_window_size = runtime->pto2_task_window_size; @@ -1269,16 +845,8 @@ int AicpuExecutor::run(Runtime* runtime) { DEV_INFO("Thread 3: Ring sizes: task_window=%lu, heap=%lu, dep_pool=%lu", (unsigned long)task_window_size, (unsigned long)heap_size, (unsigned long)dep_list_pool_size); - if (expected_arg_count > 0 && arg_count < expected_arg_count) { - DEV_ERROR("Thread 3: arg_count %d < expected %d", arg_count, expected_arg_count); - dlclose(handle); - unlink(so_path); - return -1; - } - // Get GM heap from runtime (dedicated field) void* sm_ptr = runtime->get_pto2_gm_sm_ptr(); - PTO2SharedMemoryHeader* header = static_cast(sm_ptr); void* gm_heap = runtime->get_pto2_gm_heap_ptr(); // Create shared memory handle and runtime (ops table populated inside) @@ -1293,10 +861,7 @@ int AicpuExecutor::run(Runtime* runtime) { return -1; } - // Signal scheduler threads that SM header is initialized - sm_header_ready_.store(true, std::memory_order_release); - - PTO2Runtime* rt = pto2_runtime_create_from_sm(PTO2_MODE_EXECUTE, + rt = pto2_runtime_create_from_sm(PTO2_MODE_EXECUTE, sm_handle, gm_heap, heap_size); if (!rt) { DEV_ERROR("Thread 3: Failed to create PTO2Runtime"); @@ -1306,27 +871,13 @@ int AicpuExecutor::run(Runtime* runtime) { return -1; } - // Wait for scheduler's one-time init to complete (ensures memset has executed) + runtime_init_ready_.store(true, std::memory_order_release); + + // Wait for scheduler's one-time init to complete while (!pto2_init_complete_.load(std::memory_order_acquire)) { + std::this_thread::yield(); } - // Set orchestrator's aicpu parallel mode pointers - uint64_t ws = header->task_window_size; - if (ws == 0 || ws > PTO2_MAX_SLOTS) ws = PTO2_MAX_SLOTS; - rt->orchestrator.aicpu_fanin_refcount = s_pto2_fanin_refcount; - rt->orchestrator.aicpu_task_completed = s_pto2_task_completed; - rt->orchestrator.aicpu_completed_by_task = s_pto2_completed_by_task; - rt->orchestrator.aicpu_window_mask = ws - 1; - - // Expose orchestrator ready queue to scheduler threads - orch_ready_queue_ = rt->orchestrator.orch_ready_queue; - orch_ready_tail_ = &rt->orchestrator.orch_ready_tail; - orch_ready_head_ = &rt->orchestrator.orch_ready_head; - orch_ready_capacity_ = PTO2OrchestratorState::ORCH_READY_QUEUE_SIZE; - - // Signal scheduler threads: all pointers are ready, safe to start scheduling. 
- orch_pointers_ready_.store(true, std::memory_order_release); - // Call orchestration wrapped in outer scope (matches old PTO2_ORCHESTRATION behavior) DEV_ALWAYS("Thread 3: Calling aicpu_orchestration_entry from SO"); uint64_t orch_cycle_start = get_sys_cnt_aicpu(); @@ -1419,6 +970,13 @@ int AicpuExecutor::run(Runtime* runtime) { } else { // Note: Handshake already completed in init() via handshake_all_cores() + // Device orchestration: wait for Thread 3 to initialize SM header + if (!runtime->get_orch_built_on_host()) { + while (!runtime_init_ready_.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + } + always_assert(rt != nullptr); DEV_INFO("Thread %d: Starting PTO2 dispatch", thread_idx); int completed = resolve_and_dispatch_pto2(runtime, thread_idx, cur_thread_cores, my_cores); DEV_INFO("Thread %d: Executed %d tasks from runtime", thread_idx, completed); @@ -1442,14 +1000,6 @@ int AicpuExecutor::run(Runtime* runtime) { } void AicpuExecutor::deinit() { - // Cleanup runtime execution state (clear all max slots for safety) - for (int s = 0; s < MAX_AICPU_THREADS; s++) { - ready_queue_aic_head_[s] = 0; - ready_queue_aic_tail_[s] = 0; - ready_queue_aiv_head_[s] = 0; - ready_queue_aiv_tail_[s] = 0; - } - // Reset per-core dispatch timestamps and task counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; @@ -1462,13 +1012,7 @@ void AicpuExecutor::deinit() { orchestrator_done_.store(false, std::memory_order_release); pto2_init_done_.store(false, std::memory_order_release); pto2_init_complete_.store(false, std::memory_order_release); - next_scan_index_.store(0, std::memory_order_release); - sm_header_ready_.store(false, std::memory_order_release); - orch_pointers_ready_.store(false, std::memory_order_release); - orch_ready_queue_ = nullptr; - orch_ready_tail_ = nullptr; - orch_ready_head_ = nullptr; - orch_ready_capacity_ = 0; + runtime_init_ready_.store(false, std::memory_order_release); // Reset core discovery state aic_count_ = 0; @@ -1503,12 +1047,13 @@ void AicpuExecutor::diagnose_stuck_state(Runtime* runtime, int thread_idx, DEV_ALWAYS("Progress: %d/%d tasks (%.1f%%)", completed, total, total > 0 ? 
completed * 100.0 / total : 0.0); - int aic_ready = 0, aiv_ready = 0; - for (int s = 0; s < active_shards_; s++) { - aic_ready += ready_queue_aic_tail_[s] - ready_queue_aic_head_[s]; - aiv_ready += ready_queue_aiv_tail_[s] - ready_queue_aiv_head_[s]; + uint64_t aic_ready = 0, aiv_ready = 0; + if (rt) { + PTO2SchedulerState* sched = &rt->scheduler; + aic_ready = pto2_ready_queue_count(&sched->ready_queues[PTO2_WORKER_CUBE]); + aiv_ready = pto2_ready_queue_count(&sched->ready_queues[PTO2_WORKER_VECTOR]); } - DEV_ALWAYS("Ready Queues (%d shards, per-thread push + work-steal pop): AIC=%d, AIV=%d", active_shards_, aic_ready, aiv_ready); + DEV_ALWAYS("Ready Queues: AIC=%lu, AIV=%lu", aic_ready, aiv_ready); int busy_cores = 0; int idle_cores = 0; diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 2a9908a08..67c897aee 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -15,6 +15,7 @@ #include "common/unified_log.h" #include "pto_tensormap.h" +#include "pto_types.h" #include "tensor.h" // ============================================================================= @@ -81,7 +82,7 @@ bool pto2_orchestrator_init( pto2_dep_pool_init(&orch->dep_pool, sm_handle->dep_list_pool, (int32_t)sm_handle->header->dep_list_pool_size); // Initialize TensorMap - if (!pto2_tensormap_init_default(&orch->tensor_map)) { + if (!orch->tensor_map.init_default()) { return false; } orch->tensor_map.orch = orch; @@ -95,7 +96,7 @@ bool pto2_orchestrator_init( if (!orch->scope_tasks || !orch->scope_begins) { free(orch->scope_tasks); free(orch->scope_begins); - pto2_tensormap_destroy(&orch->tensor_map); + orch->tensor_map.destroy(); return false; } orch->scope_tasks_size = 0; @@ -110,7 +111,7 @@ bool pto2_orchestrator_init( } void pto2_orchestrator_destroy(PTO2OrchestratorState* orch) { - pto2_tensormap_destroy(&orch->tensor_map); + orch->tensor_map.destroy(); free(orch->scope_tasks); orch->scope_tasks = NULL; @@ -122,7 +123,7 @@ void pto2_orchestrator_reset(PTO2OrchestratorState* orch) { pto2_heap_ring_reset(&orch->heap_ring); pto2_task_ring_reset(&orch->task_ring); pto2_dep_pool_reset(&orch->dep_pool); - pto2_tensormap_reset(&orch->tensor_map); + orch->tensor_map.reset(); orch->tensormap_last_cleanup = 0; orch->scope_stack_top = -1; @@ -212,10 +213,6 @@ void pto2_add_consumer_to_producer( if (orch->aicpu_task_completed) { int32_t prod_slot = producer_id & orch->aicpu_window_mask; if (__atomic_load_n(&orch->aicpu_task_completed[prod_slot], __ATOMIC_ACQUIRE) >= 2 && - // RELAXED is sufficient: the ACQUIRE on aicpu_task_completed above - // synchronizes with the RELEASE on task_completed in the scheduler, - // and completed_by_task is stored (with RELEASE) sequenced-before - // task_completed — so it is visible after the ACQUIRE load above. 
__atomic_load_n(&orch->aicpu_completed_by_task[prod_slot], __ATOMIC_RELAXED) == producer_id) { int32_t cons_slot = consumer_id & orch->aicpu_window_mask; __atomic_fetch_add(&orch->aicpu_fanin_refcount[cons_slot], 1, __ATOMIC_ACQ_REL); @@ -231,11 +228,11 @@ void pto2_add_consumer_to_producer( // Check if producer has already completed (scheduler mode) if (orch->scheduler) { PTO2SchedulerState* sched = orch->scheduler; - int32_t prod_slot = pto2_task_slot(sched, producer_id); + int32_t prod_slot = sched->pto2_task_slot(producer_id); int32_t prod_state = __atomic_load_n(&sched->task_state[prod_slot], __ATOMIC_ACQUIRE); if (prod_state >= PTO2_TASK_COMPLETED) { - int32_t cons_slot = pto2_task_slot(sched, consumer_id); + int32_t cons_slot = sched->pto2_task_slot(consumer_id); __atomic_fetch_add(&sched->fanin_refcount[cons_slot], 1, __ATOMIC_SEQ_CST); } } @@ -244,22 +241,6 @@ void pto2_add_consumer_to_producer( pto2_fanout_unlock(producer); } -void* pto2_alloc_packed_buffer(PTO2OrchestratorState* orch, int32_t total_size) { - if (total_size <= 0) { - return NULL; - } - - void* buffer = pto2_heap_ring_alloc(&orch->heap_ring, total_size); - - orch->buffers_allocated++; - orch->bytes_allocated += total_size; - - // Update shared memory with new heap top - PTO2_STORE_RELEASE(&orch->sm_handle->header->heap_top, orch->heap_ring.top); - - return buffer; -} - void pto2_submit_task(PTO2OrchestratorState* orch, int32_t kernel_id, PTO2WorkerType worker_type, @@ -268,15 +249,15 @@ void pto2_submit_task(PTO2OrchestratorState* orch, CYCLE_COUNT_START(); // === STEP 0: Sync TensorMap validity and optional cleanup === - pto2_orchestrator_sync_tensormap(&orch->tensor_map); + orch->tensor_map.sync_tensormap(); CYCLE_COUNT_LAP_RECORD(g_orch_sync_cycle, AicpuPhaseId::ORCH_SYNC); // Submission without an open scope is illegal - assert(orch->scope_stack_top >= 0 && "Cannot submit task outside a scope"); + always_assert(orch->scope_stack_top >= 0 && "Cannot submit task outside a scope"); // === STEP 1: Allocate task slot from Task Ring (blocks until available) === - int32_t task_id = pto2_task_ring_alloc(&orch->task_ring); + int32_t task_id = orch->task_ring.pto2_task_ring_alloc(); CYCLE_COUNT_LAP_RECORD(g_orch_alloc_cycle, AicpuPhaseId::ORCH_ALLOC); @@ -303,22 +284,18 @@ void pto2_submit_task(PTO2OrchestratorState* orch, task->fanout_count = 1; task->packed_buffer_base = NULL; task->packed_buffer_end = NULL; - task->num_outputs = 0; task->is_active = true; // Register this task in its owning scope scope_tasks_push(orch, task_id); - // Temporary storage for collecting output sizes - int32_t total_output_size = 0; - // Temporary storage for fanin int32_t fanin_temp[PTO2_MAX_INPUTS]; int32_t fanin_count = 0; task->param_count = num_params; for (int i = 0; i < num_params; i++) { - task->params[i].type = params[i].type; + task->params[i].type = params[i].type; if (params[i].type == PTOParamType::SCALAR) { task->params[i].scalar_value = params[i].scalar_value; } else { @@ -328,8 +305,28 @@ void pto2_submit_task(PTO2OrchestratorState* orch, CYCLE_COUNT_LAP_RECORD(g_orch_params_cycle, AicpuPhaseId::ORCH_PARAMS); - // === STEP 2: First pass - collect output sizes and process inputs === + // Temporary storage for collecting output sizes + int32_t total_output_size = 0; + for (int i = 0; i < num_params; i++) { + PTOParam& p = task->params[i]; + if (p.type != PTOParamType::OUTPUT) { + continue; + } + auto& tensor_data = p.tensor.data(); + // Only allocate from ring buffer when caller did not provide an address + if 
(tensor_data.buffer.addr == 0) { + total_output_size += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); + } + } + + if (total_output_size > 0) { + task->packed_buffer_base = orch->pto2_alloc_packed_buffer(total_output_size); + task->packed_buffer_end = (char*)task->packed_buffer_base + total_output_size; + } + CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP); + // === STEP 2: First pass - set output addr and process tensor === + int32_t offset = 0; for (int i = 0; i < num_params; i++) { PTOParam& p = task->params[i]; @@ -338,11 +335,10 @@ void pto2_submit_task(PTO2OrchestratorState* orch, case PTOParamType::INPUT: { // Look up producer via TensorMap PTO2LookupResult lookup_result; - pto2_tensormap_lookup(&orch->tensor_map, p.tensor, &lookup_result); + orch->tensor_map.lookup(p.tensor, lookup_result); for (int r = 0; r < lookup_result.count; r++) { - int32_t entry_idx = lookup_result.entries[r].entry_idx; - auto &entry = orch->tensor_map.entry_pool[entry_idx]; + PTO2TensorMapEntry &entry = *lookup_result.entries[r].entry; auto overlap_status = lookup_result.entries[r].overlap_status; // Check if this producer is already in fanin list (avoid duplicates) int producer_task_id = entry.producer_task_id; @@ -359,10 +355,6 @@ void pto2_submit_task(PTO2OrchestratorState* orch, if (fanin_count < PTO2_MAX_INPUTS) { fanin_temp[fanin_count++] = producer_task_id; } - - // Add this task to producer's fanout list (with spinlock) - PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, producer_task_id); - pto2_add_consumer_to_producer(orch, producer, producer_task_id, task_id); } if (p.type == PTOParamType::INOUT && overlap_status == OverlapStatus::COVERED) { // inout因为会再次insert进tensor map, @@ -370,7 +362,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // 应将前面的tensor从tensor map中剔除。 // 但是最开始的tensor除外,因为必须建立和最开始的task的依赖关系以保证tensor生命周期的正确管理 if (!entry.with_alloc) { - pto2_tensormap_remove_entry(orch->tensor_map, entry_idx); + orch->tensor_map.remove_entry(entry); } } } @@ -379,9 +371,13 @@ void pto2_submit_task(PTO2OrchestratorState* orch, case PTOParamType::OUTPUT: { auto &tensor_data = p.tensor.data(); - // Only allocate from ring buffer when caller did not provide an address + // Offsets: each output at 1024B-aligned slot; slot size = ALIGN_UP(size, 1024) + // Allocation happens here only; no memcpy of buffer content. Caller's tensor gets addr written back. if (tensor_data.buffer.addr == 0) { - total_output_size += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); + uint64_t alloc_addr = reinterpret_cast((char*)task->packed_buffer_base + offset); + tensor_data.buffer.addr = alloc_addr; + offset += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); + } break; } @@ -392,30 +388,6 @@ void pto2_submit_task(PTO2OrchestratorState* orch, CYCLE_COUNT_LAP_RECORD(g_orch_lookup_cycle, AicpuPhaseId::ORCH_LOOKUP); - // === STEP 3: Allocate packed buffer from Heap Ring (may stall) === - // Each output slot is aligned to PTO2_PACKED_OUTPUT_ALIGN (1024B); gap after data is padding. - if (total_output_size > 0) { - task->packed_buffer_base = pto2_alloc_packed_buffer(orch, total_output_size); - task->packed_buffer_end = (char*)task->packed_buffer_base + total_output_size; - - // Offsets: each output at 1024B-aligned slot; slot size = ALIGN_UP(size, 1024) - // Allocation happens here only; no memcpy of buffer content. Caller's tensor gets addr written back. 
- int32_t offset = 0; - for (int i = 0; i < task->param_count; i++) { - PTOParam& p = task->params[i]; - if (p.type == PTOParamType::OUTPUT) { - auto &tensor_data = p.tensor.data(); - if (tensor_data.buffer.addr == 0) { - uint64_t alloc_addr = reinterpret_cast((char*)task->packed_buffer_base + offset); - tensor_data.buffer.addr = alloc_addr; - offset += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); - } - task->output_index[task->num_outputs++] = i; - } - } - } - - CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP); // === STEP 4: Second pass - register outputs in TensorMap === for (int i = 0; i < num_params; i++) { @@ -423,7 +395,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, if (p.type == PTOParamType::OUTPUT || p.type == PTOParamType::INOUT) { // Register in TensorMap: this tensor is produced by task_id // Use task's tensor_copies (which has the heap-allocated address for outputs) - pto2_tensormap_insert(&orch->tensor_map, p.tensor, task_id, p.type == PTOParamType::OUTPUT); + orch->tensor_map.insert(p.tensor, task_id, p.type == PTOParamType::OUTPUT); } } @@ -431,40 +403,50 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // === STEP 5: Finalize fanin list === // First build the fanin list - for (int i = 0; i < fanin_count; i++) { - task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, fanin_temp[i]); + if (orch->scheduler) { + PTO2SchedulerState* sched = orch->scheduler; + int32_t slot = sched->pto2_task_slot(task_id); + + int32_t early_finished = 0; + task->fanin_count = fanin_count + 1; // +1 redundance for not being ready too early + for (int i = 0; i < fanin_count; i++) { + int32_t producer_task_id = fanin_temp[i]; + // Add this task to producer's fanout list (with spinlock) + PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, producer_task_id); + pto2_fanout_lock(producer); + producer->fanout_head = pto2_dep_list_prepend(&orch->dep_pool, producer->fanout_head, task_id); + producer->fanout_count++; + // Normal path: prepend consumer to producer's fanout list + task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, producer_task_id); + + int32_t prod_slot = sched->pto2_task_slot(producer_task_id); + int32_t prod_state = __atomic_load_n(&sched->task_state[prod_slot], __ATOMIC_ACQUIRE); + if (prod_state == PTO2_TASK_COMPLETED) { + // Early return optimization: if producer already completed, we can skip adding dependency and directly decrement fanin_count + early_finished++; + } + pto2_fanout_unlock(producer); + } + if (early_finished > 0) { + __atomic_fetch_add(&sched->fanin_refcount[slot], early_finished, __ATOMIC_SEQ_CST); + } + } else { + // No scheduler: just build fanin list + add to producers using pto2_add_consumer_to_producer + for (int i = 0; i < fanin_count; i++) { + task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, fanin_temp[i]); + PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, fanin_temp[i]); + pto2_add_consumer_to_producer(orch, producer, fanin_temp[i], task_id); + } + __atomic_store_n(&task->fanin_count, fanin_count, __ATOMIC_SEQ_CST); } - // SEQ_CST store: participates in the global total order with Phase 1's SEQ_CST - // fetch_add on s_pto2_fanin_refcount to prevent the IRIW hazard on ARM. - // (See comment above the fetch_add in aicpu_executor.cpp Phase 1 for details.) 
- __atomic_store_n(&task->fanin_count, fanin_count, __ATOMIC_SEQ_CST); CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN); - // === STEP 5b: Check if task is already ready (all producers completed via early-return) === - // In AICPU parallel mode, early-return in pto2_add_consumer_to_producer may have - // already incremented aicpu_fanin_refcount for this task. Now that fanin_count is - // finalized, check if the task is already satisfied and push it to the orchestrator - // ready queue so scheduler threads can pick it up without an O(N) scan. - if (orch->aicpu_fanin_refcount && fanin_count > 0) { - int32_t slot = task_id & orch->aicpu_window_mask; - int32_t refcount = __atomic_load_n(&orch->aicpu_fanin_refcount[slot], __ATOMIC_SEQ_CST); - if (refcount >= fanin_count) { - // All producers already completed — push to orch ready queue - int32_t tail = orch->orch_ready_tail; - int32_t capacity = PTO2OrchestratorState::ORCH_READY_QUEUE_SIZE; - int32_t head = __atomic_load_n(&orch->orch_ready_head, __ATOMIC_ACQUIRE); - if (((tail + 1) & (capacity - 1)) != (head & (capacity - 1))) { - orch->orch_ready_queue[tail & (capacity - 1)] = task_id; - __atomic_store_n(&orch->orch_ready_tail, tail + 1, __ATOMIC_RELEASE); - } - } - } // === STEP 6: Initialize task in scheduler === // In multi-threaded mode, scheduler thread handles task initialization via polling if (orch->scheduler && orch->init_task_on_submit) { - pto2_scheduler_init_task(orch->scheduler, task_id, task); + orch->scheduler->init_task(task_id, task); } // === STEP 7: Update shared memory with current task index === @@ -515,7 +497,7 @@ void pto2_orchestrator_print_stats(PTO2OrchestratorState* orch) { LOG_INFO("Task ring active: %d", pto2_task_ring_active_count(&orch->task_ring)); LOG_INFO("Heap ring used: %" PRIu64 " / %" PRIu64, orch->heap_ring.top, orch->heap_ring.size); LOG_INFO("Dep pool used: %d / %d", pto2_dep_pool_used(&orch->dep_pool), orch->dep_pool.capacity); - LOG_INFO("TensorMap valid: %d", pto2_tensormap_valid_count(&orch->tensor_map)); + LOG_INFO("TensorMap valid: %d", orch->tensor_map.valid_count()); LOG_INFO("==============================="); } diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h index acc72454b..129c24243 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h @@ -82,17 +82,24 @@ struct PTO2OrchestratorState { int32_t* aicpu_completed_by_task; // task_id that set the completed state (for slot-reuse validation) int32_t aicpu_window_mask; - // === ORCHESTRATOR READY QUEUE (early-return path → scheduler) === - // When the orchestrator discovers a producer already completed, it - // increments the consumer's refcount directly. If that makes the - // consumer ready, the consumer_id is pushed here so scheduler threads - // can pick it up without an O(N) scan. - // SPSC-ish ring: orchestrator writes (single producer), scheduler - // threads read via CAS on orch_ready_head (multiple consumers). 
- static constexpr int32_t ORCH_READY_QUEUE_SIZE = 4096; - volatile int32_t orch_ready_queue[4096]; - volatile int32_t orch_ready_tail; // written by orchestrator only - volatile int32_t orch_ready_head; // advanced by scheduler via CAS + /** + * Allocate packed output buffer for a task + */ + void* pto2_alloc_packed_buffer(int32_t total_size) { + if (total_size <= 0) { + return NULL; + } + + void* buffer = heap_ring.pto2_heap_ring_alloc(total_size); + + buffers_allocated++; + bytes_allocated += total_size; + + // Update shared memory with new heap top + PTO2_STORE_RELEASE(&sm_handle->header->heap_top, heap_ring.top); + + return buffer; + } }; // ============================================================================= @@ -210,22 +217,6 @@ void pto2_orchestrator_wait_all(PTO2OrchestratorState* orch); */ bool pto2_orchestrator_has_space(PTO2OrchestratorState* orch); -// ============================================================================= -// Internal Helpers -// ============================================================================= - -/** - * Add consumer to producer's fanout list (with spinlock) - * Also checks if producer has already completed and updates consumer's fanin_refcount - */ -void pto2_add_consumer_to_producer( - PTO2OrchestratorState* orch, PTO2TaskDescriptor* producer, int32_t producer_id, int32_t consumer_id); - -/** - * Allocate packed output buffer for a task - */ -void* pto2_alloc_packed_buffer(PTO2OrchestratorState* orch, int32_t total_size); - // ============================================================================= // Debug Utilities // ============================================================================= diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp index 6c2ab39b9..8bd0b0d19 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp @@ -13,11 +13,6 @@ #include // for exit() #include "common/unified_log.h" -// Set to 1 to enable periodic BLOCKED/Unblocked messages during spin-wait. 
-#ifndef PTO2_SPIN_VERBOSE_LOGGING -#define PTO2_SPIN_VERBOSE_LOGGING 1 -#endif - // ============================================================================= // Heap Ring Buffer Implementation // ============================================================================= @@ -30,134 +25,6 @@ void pto2_heap_ring_init(PTO2HeapRing* ring, void* base, uint64_t size, ring->tail_ptr = tail_ptr; } -// Block notification interval (in spin counts) -#define PTO2_BLOCK_NOTIFY_INTERVAL 10000 -// Heap ring spin limit - after this, report deadlock and exit -#define PTO2_HEAP_SPIN_LIMIT 100000 - -void* pto2_heap_ring_alloc(PTO2HeapRing* ring, uint64_t size) { - // Align size for DMA efficiency - size = PTO2_ALIGN_UP(size, PTO2_ALIGN_SIZE); - - // Spin-wait if insufficient space (back-pressure from Scheduler) - int spin_count = 0; -#if PTO2_SPIN_VERBOSE_LOGGING - bool notified = false; -#endif - - while (1) { - void* ptr = pto2_heap_ring_try_alloc(ring, size); - if (ptr != NULL) { -#if PTO2_SPIN_VERBOSE_LOGGING - if (notified) { - LOG_INFO("[HeapRing] Unblocked after %d spins", spin_count); - } -#endif - return ptr; - } - - // No space available, spin-wait - spin_count++; - -#if PTO2_SPIN_VERBOSE_LOGGING - // Periodic block notification - if (spin_count % PTO2_BLOCK_NOTIFY_INTERVAL == 0 && - spin_count < PTO2_HEAP_SPIN_LIMIT) { - uint64_t tail = PTO2_LOAD_ACQUIRE(ring->tail_ptr); - uint64_t available = pto2_heap_ring_available(ring); - LOG_WARN("[HeapRing] BLOCKED: requesting %" PRIu64 " bytes, available=%" PRIu64 ", " - "top=%" PRIu64 ", tail=%" PRIu64 ", spins=%d", - size, available, ring->top, tail, spin_count); - notified = true; - } -#endif - - if (spin_count >= PTO2_HEAP_SPIN_LIMIT) { - uint64_t tail = PTO2_LOAD_ACQUIRE(ring->tail_ptr); - uint64_t available = pto2_heap_ring_available(ring); - LOG_ERROR("========================================"); - LOG_ERROR("FATAL: Heap Ring Deadlock Detected!"); - LOG_ERROR("========================================"); - LOG_ERROR("Orchestrator blocked waiting for heap space after %d spins.", spin_count); - LOG_ERROR(" - Requested: %" PRIu64 " bytes", size); - LOG_ERROR(" - Available: %" PRIu64 " bytes", available); - LOG_ERROR(" - Heap top: %" PRIu64, ring->top); - LOG_ERROR(" - Heap tail: %" PRIu64, tail); - LOG_ERROR(" - Heap size: %" PRIu64, ring->size); - LOG_ERROR("Solution: Increase PTO2_HEAP_SIZE (e.g. 256*1024 for 4 x 64KB outputs)."); - LOG_ERROR("========================================"); - exit(1); - } - - PTO2_SPIN_PAUSE(); - } -} - -void* pto2_heap_ring_try_alloc(PTO2HeapRing* ring, uint64_t size) { - // Align size for DMA efficiency - size = PTO2_ALIGN_UP(size, PTO2_ALIGN_SIZE); - - // Read latest tail from shared memory (Scheduler updates this) - uint64_t tail = PTO2_LOAD_ACQUIRE(ring->tail_ptr); - uint64_t top = ring->top; - - if (top >= tail) { - // Case 1: top is at or ahead of tail (normal case) - // [....tail====top......] 
- // ^-- space_at_end = size - top - - uint64_t space_at_end = ring->size - top; - - if (space_at_end >= size) { - // Enough space at end - allocate here - void* ptr = (char*)ring->base + top; - ring->top = top + size; - return ptr; - } - - // Not enough space at end - check if we can wrap to beginning - // IMPORTANT: Don't split buffer, skip remaining space at end - if (tail > size) { - // Wrap to beginning (space available: [0, tail)) - ring->top = size; - return ring->base; - } - - // Not enough space anywhere - return NULL - return NULL; - - } else { - // Case 2: top has wrapped, tail is ahead - // [====top....tail=====] - // ^-- free space = tail - top - - uint64_t gap = tail - top; - if (gap >= size) { - void* ptr = (char*)ring->base + top; - ring->top = top + size; - return ptr; - } - - // Not enough space - return NULL - return NULL; - } -} - -uint64_t pto2_heap_ring_available(PTO2HeapRing* ring) { - uint64_t tail = PTO2_LOAD_ACQUIRE(ring->tail_ptr); - uint64_t top = ring->top; - - if (top >= tail) { - // Space at end + space at beginning (if any) - uint64_t at_end = ring->size - top; - uint64_t at_begin = tail; - return at_end > at_begin ? at_end : at_begin; // Max usable - } else { - // Contiguous space between top and tail - return tail - top; - } -} - void pto2_heap_ring_reset(PTO2HeapRing* ring) { ring->top = 0; } @@ -174,112 +41,6 @@ void pto2_task_ring_init(PTO2TaskRing* ring, PTO2TaskDescriptor* descriptors, ring->last_alive_ptr = last_alive_ptr; } -// Flow control spin limit - if exceeded, likely deadlock due to scope/fanout_count -#define PTO2_FLOW_CONTROL_SPIN_LIMIT 100000 - -int32_t pto2_task_ring_alloc(PTO2TaskRing* ring) { - // Spin-wait if window is full (back-pressure from Scheduler) - int spin_count = 0; -#if PTO2_SPIN_VERBOSE_LOGGING - bool notified = false; -#endif - - while (1) { - int32_t task_id = pto2_task_ring_try_alloc(ring); - if (task_id >= 0) { -#if PTO2_SPIN_VERBOSE_LOGGING - if (notified) { - LOG_INFO("[TaskRing] Unblocked after %d spins, task_id=%d", spin_count, task_id); - } -#endif - return task_id; - } - - // Window is full, spin-wait (with yield to prevent CPU starvation) - spin_count++; - -#if PTO2_SPIN_VERBOSE_LOGGING - // Periodic block notification - if (spin_count % PTO2_BLOCK_NOTIFY_INTERVAL == 0 && - spin_count < PTO2_FLOW_CONTROL_SPIN_LIMIT) { - int32_t last_alive = PTO2_LOAD_ACQUIRE(ring->last_alive_ptr); - int32_t active_count = ring->current_index - last_alive; - LOG_WARN("[TaskRing] BLOCKED (Flow Control): current=%d, last_alive=%d, " - "active=%d/%d (%.1f%%), spins=%d", - ring->current_index, last_alive, active_count, ring->window_size, - 100.0 * active_count / ring->window_size, spin_count); - notified = true; - } -#endif - - // Check for potential deadlock - if (spin_count >= PTO2_FLOW_CONTROL_SPIN_LIMIT) { - int32_t last_alive = PTO2_LOAD_ACQUIRE(ring->last_alive_ptr); - int32_t active_count = ring->current_index - last_alive; - - LOG_ERROR("========================================"); - LOG_ERROR("FATAL: Flow Control Deadlock Detected!"); - LOG_ERROR("========================================"); - LOG_ERROR("Task Ring is FULL and no progress after %d spins.", spin_count); - LOG_ERROR("Flow Control Status:"); - LOG_ERROR(" - Current task index: %d", ring->current_index); - LOG_ERROR(" - Last task alive: %d", last_alive); - LOG_ERROR(" - Active tasks: %d", active_count); - LOG_ERROR(" - Window size: %d", ring->window_size); - LOG_ERROR(" - Window utilization: %.1f%%", 100.0 * active_count / ring->window_size); - LOG_ERROR("Root 
Cause:"); - LOG_ERROR(" Tasks cannot transition to CONSUMED state because:"); - LOG_ERROR(" - fanout_count includes 1 for the owning scope"); - LOG_ERROR(" - scope_end() requires orchestrator to continue"); - LOG_ERROR(" - But orchestrator is blocked waiting for task ring space"); - LOG_ERROR(" This creates a circular dependency (deadlock)."); - LOG_ERROR("Solution:"); - LOG_ERROR(" Current task_window_size: %d", ring->window_size); - LOG_ERROR(" Default PTO2_TASK_WINDOW_SIZE: %d", PTO2_TASK_WINDOW_SIZE); - LOG_ERROR(" Recommended: %d (at least 2x current active tasks)", active_count * 2); - LOG_ERROR(" Option 1: Change PTO2_TASK_WINDOW_SIZE in pto_runtime2_types.h"); - LOG_ERROR(" Option 2: Use pto2_runtime_create_threaded_custom() with larger"); - LOG_ERROR(" task_window_size parameter."); - LOG_ERROR("========================================"); - - // Abort program - exit(1); - } - - PTO2_SPIN_PAUSE(); - } -} - -int32_t pto2_task_ring_try_alloc(PTO2TaskRing* ring) { - // Read latest last_task_alive from shared memory - int32_t last_alive = PTO2_LOAD_ACQUIRE(ring->last_alive_ptr); - int32_t current = ring->current_index; - - // Calculate number of active tasks (handles wrap-around) - int32_t active_count = current - last_alive; - - // Check if there's room for one more task - // Leave at least 1 slot empty to distinguish full from empty - if (active_count < ring->window_size - 1) { - int32_t task_id = current; - int32_t slot = task_id & (ring->window_size - 1); - - // Mark slot as occupied (skip full memset — pto2_submit_task - // explicitly initializes all fields it needs) - PTO2TaskDescriptor* task = &ring->descriptors[slot]; - task->task_id = task_id; - task->is_active = true; - - // Advance current index - ring->current_index = current + 1; - - return task_id; - } - - // Window is full - return -1; -} - int32_t pto2_task_ring_active_count(PTO2TaskRing* ring) { int32_t last_alive = PTO2_LOAD_ACQUIRE(ring->last_alive_ptr); return ring->current_index - last_alive; diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h index f7bd0a4bf..d0bd5421c 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h @@ -26,8 +26,25 @@ #ifndef PTO_RING_BUFFER_H #define PTO_RING_BUFFER_H +#include +#include // for exit() + #include "pto_runtime2_types.h" #include "pto_shared_memory.h" +#include "common/unified_log.h" + +// Set to 1 to enable periodic BLOCKED/Unblocked messages during spin-wait. +#ifndef PTO2_SPIN_VERBOSE_LOGGING +#define PTO2_SPIN_VERBOSE_LOGGING 1 +#endif + +// Block notification interval (in spin counts) +#define PTO2_BLOCK_NOTIFY_INTERVAL 10000 +// Heap ring spin limit - after this, report deadlock and exit +#define PTO2_HEAP_SPIN_LIMIT 100000 + +// Flow control spin limit - if exceeded, likely deadlock due to scope/fanout_count +#define PTO2_FLOW_CONTROL_SPIN_LIMIT 100000 // ============================================================================= // Heap Ring Buffer @@ -47,6 +64,144 @@ struct PTO2HeapRing { // Reference to shared memory tail (for back-pressure) volatile uint64_t* tail_ptr; // Points to header->heap_tail + /** + * Allocate memory from heap ring + * + * O(1) bump allocation with wrap-around. + * May STALL (spin-wait) if insufficient space (back-pressure). + * Never splits a buffer across the wrap-around boundary. 
+ * + * @param size Requested size in bytes + * @return Pointer to allocated memory, never NULL (stalls instead) + */ + void* pto2_heap_ring_alloc(uint64_t size) { + // Align size for DMA efficiency + size = PTO2_ALIGN_UP(size, PTO2_ALIGN_SIZE); + + // Spin-wait if insufficient space (back-pressure from Scheduler) + int spin_count = 0; +#if PTO2_SPIN_VERBOSE_LOGGING + bool notified = false; +#endif + + while (1) { + void* ptr = pto2_heap_ring_try_alloc(size); + if (ptr != NULL) { +#if PTO2_SPIN_VERBOSE_LOGGING + if (notified) { + LOG_INFO("[HeapRing] Unblocked after %d spins", spin_count); + } +#endif + return ptr; + } + + // No space available, spin-wait + spin_count++; + +#if PTO2_SPIN_VERBOSE_LOGGING + // Periodic block notification + if (spin_count % PTO2_BLOCK_NOTIFY_INTERVAL == 0 && spin_count < PTO2_HEAP_SPIN_LIMIT) { + uint64_t tail = PTO2_LOAD_ACQUIRE(tail_ptr); + uint64_t available = pto2_heap_ring_available(); + LOG_WARN("[HeapRing] BLOCKED: requesting %" PRIu64 " bytes, available=%" PRIu64 + ", top=%" PRIu64 ", tail=%" PRIu64 ", spins=%d", + size, available, top, tail, spin_count); + notified = true; + } +#endif + + if (spin_count >= PTO2_HEAP_SPIN_LIMIT) { + uint64_t tail = PTO2_LOAD_ACQUIRE(tail_ptr); + uint64_t available = pto2_heap_ring_available(); + LOG_ERROR("========================================"); + LOG_ERROR("FATAL: Heap Ring Deadlock Detected!"); + LOG_ERROR("========================================"); + LOG_ERROR("Orchestrator blocked waiting for heap space after %d spins.", spin_count); + LOG_ERROR(" - Requested: %" PRIu64 " bytes", size); + LOG_ERROR(" - Available: %" PRIu64 " bytes", available); + LOG_ERROR(" - Heap top: %" PRIu64, top); + LOG_ERROR(" - Heap tail: %" PRIu64, tail); + LOG_ERROR(" - Heap size: %" PRIu64, this->size); + LOG_ERROR("Solution: Increase PTO2_HEAP_SIZE (e.g. 256*1024 for 4 x 64KB outputs)."); + LOG_ERROR("========================================"); + exit(1); + } + + PTO2_SPIN_PAUSE(); + } + } + + /** + * Try to allocate memory without stalling + * + * @param size Requested size in bytes + * @return Pointer to allocated memory, or NULL if no space + */ + void* pto2_heap_ring_try_alloc(uint64_t alloc_size) { + // Align size for DMA efficiency + alloc_size = PTO2_ALIGN_UP(alloc_size, PTO2_ALIGN_SIZE); + + // Read latest tail from shared memory (Scheduler updates this) + uint64_t tail = PTO2_LOAD_ACQUIRE(tail_ptr); + + if (top >= tail) { + // Case 1: top is at or ahead of tail (normal case) + // [....tail====top......] 
+ // ^-- space_at_end = size - top + + uint64_t space_at_end = size - top; + + if (space_at_end >= alloc_size) { + // Enough space at end - allocate here + void* ptr = (char*)base + top; + top += alloc_size; + return ptr; + } + + // Not enough space at end - check if we can wrap to beginning + // IMPORTANT: Don't split buffer, skip remaining space at end + if (tail > alloc_size) { + // Wrap to beginning (space available: [0, tail)) + top = alloc_size; + return base; + } + + // Not enough space anywhere - return NULL + return NULL; + + } else { + // Case 2: top has wrapped, tail is ahead + // [====top....tail=====] + // ^-- free space = tail - top + + uint64_t gap = tail - top; + if (gap >= alloc_size) { + void* ptr = (char*)base + top; + top += alloc_size; + return ptr; + } + + // Not enough space - return NULL + return NULL; + } + } + + /** + * Get available space in heap ring + */ + uint64_t pto2_heap_ring_available() { + uint64_t tail = PTO2_LOAD_ACQUIRE(tail_ptr); + + if (top >= tail) { + // Space at end + space at beginning (if any) + uint64_t at_end = size - top; + uint64_t at_begin = tail; + return at_end > at_begin ? at_end : at_begin; // Max usable + } else { + // Contiguous space between top and tail + return tail - top; + } + } }; /** @@ -60,33 +215,6 @@ struct PTO2HeapRing { void pto2_heap_ring_init(PTO2HeapRing* ring, void* base, uint64_t size, volatile uint64_t* tail_ptr); -/** - * Allocate memory from heap ring - * - * O(1) bump allocation with wrap-around. - * May STALL (spin-wait) if insufficient space (back-pressure). - * Never splits a buffer across the wrap-around boundary. - * - * @param ring Heap ring - * @param size Requested size in bytes - * @return Pointer to allocated memory, never NULL (stalls instead) - */ -void* pto2_heap_ring_alloc(PTO2HeapRing* ring, uint64_t size); - -/** - * Try to allocate memory without stalling - * - * @param ring Heap ring - * @param size Requested size in bytes - * @return Pointer to allocated memory, or NULL if no space - */ -void* pto2_heap_ring_try_alloc(PTO2HeapRing* ring, uint64_t size); - -/** - * Get available space in heap ring - */ -uint64_t pto2_heap_ring_available(PTO2HeapRing* ring); - /** * Reset heap ring to initial state */ @@ -109,7 +237,121 @@ struct PTO2TaskRing { // Reference to shared memory last_task_alive (for back-pressure) volatile int32_t* last_alive_ptr; // Points to header->last_task_alive - + + /** + * Allocate a task slot from task ring + * + * May STALL (spin-wait) if window is full (back-pressure). + * Initializes the task descriptor to default values. 
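+     * (In this revision only task_id and is_active are set here;
+     * pto2_submit_task is expected to fill in the remaining fields.)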
+ * + * @return Allocated task ID (absolute, not wrapped) + */ + int32_t pto2_task_ring_alloc() { + // Spin-wait if window is full (back-pressure from Scheduler) + int spin_count = 0; +#if PTO2_SPIN_VERBOSE_LOGGING + bool notified = false; +#endif + + while (1) { + int32_t task_id = pto2_task_ring_try_alloc(); + if (task_id >= 0) { +#if PTO2_SPIN_VERBOSE_LOGGING + if (notified) { + LOG_INFO("[TaskRing] Unblocked after %d spins, task_id=%d", spin_count, task_id); + } +#endif + return task_id; + } + + // Window is full, spin-wait (with yield to prevent CPU starvation) + spin_count++; + +#if PTO2_SPIN_VERBOSE_LOGGING + // Periodic block notification + if (spin_count % PTO2_BLOCK_NOTIFY_INTERVAL == 0 && spin_count < PTO2_FLOW_CONTROL_SPIN_LIMIT) { + int32_t last_alive = PTO2_LOAD_ACQUIRE(last_alive_ptr); + int32_t active_count = current_index - last_alive; + LOG_WARN("[TaskRing] BLOCKED (Flow Control): current=%d, last_alive=%d, " + "active=%d/%d (%.1f%%), spins=%d", + current_index, last_alive, active_count, window_size, + 100.0 * active_count / window_size, spin_count); + notified = true; + } +#endif + + // Check for potential deadlock + if (spin_count >= PTO2_FLOW_CONTROL_SPIN_LIMIT) { + int32_t last_alive = PTO2_LOAD_ACQUIRE(last_alive_ptr); + int32_t active_count = current_index - last_alive; + + LOG_ERROR("========================================"); + LOG_ERROR("FATAL: Flow Control Deadlock Detected!"); + LOG_ERROR("========================================"); + LOG_ERROR("Task Ring is FULL and no progress after %d spins.", spin_count); + LOG_ERROR("Flow Control Status:"); + LOG_ERROR(" - Current task index: %d", current_index); + LOG_ERROR(" - Last task alive: %d", last_alive); + LOG_ERROR(" - Active tasks: %d", active_count); + LOG_ERROR(" - Window size: %d", window_size); + LOG_ERROR(" - Window utilization: %.1f%%", 100.0 * active_count / window_size); + LOG_ERROR("Root Cause:"); + LOG_ERROR(" Tasks cannot transition to CONSUMED state because:"); + LOG_ERROR(" - fanout_count includes 1 for the owning scope"); + LOG_ERROR(" - scope_end() requires orchestrator to continue"); + LOG_ERROR(" - But orchestrator is blocked waiting for task ring space"); + LOG_ERROR(" This creates a circular dependency (deadlock)."); + LOG_ERROR("Solution:"); + LOG_ERROR(" Current task_window_size: %d", window_size); + LOG_ERROR(" Default PTO2_TASK_WINDOW_SIZE: %d", PTO2_TASK_WINDOW_SIZE); + LOG_ERROR(" Recommended: %d (at least 2x current active tasks)", active_count * 2); + LOG_ERROR(" Option 1: Change PTO2_TASK_WINDOW_SIZE in pto_runtime2_types.h"); + LOG_ERROR(" Option 2: Use pto2_runtime_create_threaded_custom() with larger"); + LOG_ERROR(" task_window_size parameter."); + LOG_ERROR("========================================"); + + // Abort program + exit(1); + } + + PTO2_SPIN_PAUSE(); + } + } + + /** + * Try to allocate task slot without stalling + * + * @return Task ID, or -1 if window is full + */ + int32_t pto2_task_ring_try_alloc() { + // Read latest last_task_alive from shared memory + int32_t last_alive = PTO2_LOAD_ACQUIRE(last_alive_ptr); + int32_t current = current_index; + + // Calculate number of active tasks (handles wrap-around) + int32_t active_count = current - last_alive; + + // Check if there's room for one more task + // Leave at least 1 slot empty to distinguish full from empty + if (active_count < window_size - 1) { + int32_t task_id = current; + int32_t slot = task_id & (window_size - 1); + + // Mark slot as occupied (skip full memset — pto2_submit_task + // explicitly initializes all 
fields it needs) + PTO2TaskDescriptor* task = &descriptors[slot]; + task->task_id = task_id; + task->is_active = true; + + // Advance current index + current_index = current + 1; + + return task_id; + } + + // Window is full + return -1; + } }; /** @@ -123,25 +365,6 @@ struct PTO2TaskRing { void pto2_task_ring_init(PTO2TaskRing* ring, PTO2TaskDescriptor* descriptors, int32_t window_size, volatile int32_t* last_alive_ptr); -/** - * Allocate a task slot from task ring - * - * May STALL (spin-wait) if window is full (back-pressure). - * Initializes the task descriptor to default values. - * - * @param ring Task ring - * @return Allocated task ID (absolute, not wrapped) - */ -int32_t pto2_task_ring_alloc(PTO2TaskRing* ring); - -/** - * Try to allocate task slot without stalling - * - * @param ring Task ring - * @return Task ID, or -1 if window is full - */ -int32_t pto2_task_ring_try_alloc(PTO2TaskRing* ring); - /** * Get number of active tasks in window */ diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h index f0ad1c389..1bd521fe0 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h @@ -266,8 +266,6 @@ struct PTO2TaskDescriptor { // Packed output buffer (all outputs packed into single contiguous buffer) void* packed_buffer_base; // Start of packed buffer in GM Heap void* packed_buffer_end; // End of packed buffer (for heap reclamation) - int32_t output_index[PTO2_MAX_OUTPUTS]; // Offset of each output in params; - int32_t num_outputs; // Number of output buffers // Status flags bool is_active; // Task slot is in use diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp index 6b5752fd8..34c922e2d 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp @@ -1,8 +1,8 @@ /** * PTO Runtime2 - Scheduler Implementation - * + * * Implements scheduler state management, ready queues, and task lifecycle. 
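+ * Ready-queue push/pop are now guarded by a per-queue spinlock so that
+ * multiple scheduler threads can enqueue and dequeue concurrently.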
- * + * * Based on: docs/runtime_buffer_manager_methods.md */ @@ -41,6 +41,7 @@ bool pto2_ready_queue_init(PTO2ReadyQueue* queue, uint64_t capacity) { queue->tail = 0; queue->capacity = capacity; queue->count = 0; + queue->spinlock = 0; return true; } @@ -59,26 +60,35 @@ void pto2_ready_queue_reset(PTO2ReadyQueue* queue) { } bool pto2_ready_queue_push(PTO2ReadyQueue* queue, int32_t task_id) { - if (pto2_ready_queue_full(queue)) { - return false; + while (__atomic_exchange_n(&queue->spinlock, 1, __ATOMIC_ACQUIRE)) { + PTO2_SPIN_PAUSE_LIGHT(); } - - queue->task_ids[queue->tail] = task_id; - queue->tail = (queue->tail + 1) % queue->capacity; - queue->count++; - - return true; + + bool result = false; + if (!pto2_ready_queue_full(queue)) { + queue->task_ids[queue->tail] = task_id; + queue->tail = (queue->tail + 1) % queue->capacity; + queue->count++; + result = true; + } + + __atomic_store_n(&queue->spinlock, 0, __ATOMIC_RELEASE); + return result; } int32_t pto2_ready_queue_pop(PTO2ReadyQueue* queue) { - if (pto2_ready_queue_empty(queue)) { - return -1; + while (__atomic_exchange_n(&queue->spinlock, 1, __ATOMIC_ACQUIRE)) { + PTO2_SPIN_PAUSE_LIGHT(); + } + + int32_t task_id = -1; + if (!pto2_ready_queue_empty(queue)) { + task_id = queue->task_ids[queue->head]; + queue->head = (queue->head + 1) % queue->capacity; + queue->count--; } - - int32_t task_id = queue->task_ids[queue->head]; - queue->head = (queue->head + 1) % queue->capacity; - queue->count--; - + + __atomic_store_n(&queue->spinlock, 0, __ATOMIC_RELEASE); return task_id; } @@ -95,35 +105,35 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, sched->sm_handle = sm_handle; sched->dep_pool = dep_pool; sched->heap_base = heap_base; - + // Get runtime task_window_size from shared memory header uint64_t window_size = sm_handle->header->task_window_size; sched->task_window_size = window_size; sched->task_window_mask = window_size - 1; // For fast modulo (window_size must be power of 2) - + // Initialize local copies of ring pointers sched->last_task_alive = 0; sched->heap_tail = 0; - + // Allocate per-task state arrays (dynamically sized based on runtime window_size) sched->task_state = (PTO2TaskState*)calloc(window_size, sizeof(PTO2TaskState)); if (!sched->task_state) { return false; } - + sched->fanin_refcount = (int32_t*)calloc(window_size, sizeof(int32_t)); if (!sched->fanin_refcount) { free(sched->task_state); return false; } - + sched->fanout_refcount = (int32_t*)calloc(window_size, sizeof(int32_t)); if (!sched->fanout_refcount) { free(sched->fanin_refcount); free(sched->task_state); return false; } - + // Initialize ready queues for (int i = 0; i < PTO2_NUM_WORKER_TYPES; i++) { if (!pto2_ready_queue_init(&sched->ready_queues[i], PTO2_READY_QUEUE_SIZE)) { @@ -137,7 +147,7 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, return false; } } - + return true; } @@ -146,17 +156,17 @@ void pto2_scheduler_destroy(PTO2SchedulerState* sched) { free(sched->task_state); sched->task_state = NULL; } - + if (sched->fanin_refcount) { free(sched->fanin_refcount); sched->fanin_refcount = NULL; } - + if (sched->fanout_refcount) { free(sched->fanout_refcount); sched->fanout_refcount = NULL; } - + for (int i = 0; i < PTO2_NUM_WORKER_TYPES; i++) { pto2_ready_queue_destroy(&sched->ready_queues[i]); } @@ -166,60 +176,24 @@ void pto2_scheduler_reset(PTO2SchedulerState* sched) { sched->last_task_alive = 0; sched->heap_tail = 0; - memset(sched->task_state, 0, PTO2_TASK_WINDOW_SIZE * sizeof(PTO2TaskState)); - memset(sched->fanin_refcount, 0, 
PTO2_TASK_WINDOW_SIZE * sizeof(int32_t)); - memset(sched->fanout_refcount, 0, PTO2_TASK_WINDOW_SIZE * sizeof(int32_t)); - + memset(sched->task_state, 0, sched->task_window_size * sizeof(PTO2TaskState)); + memset(sched->fanin_refcount, 0, sched->task_window_size * sizeof(int32_t)); + memset(sched->fanout_refcount, 0, sched->task_window_size * sizeof(int32_t)); + for (int i = 0; i < PTO2_NUM_WORKER_TYPES; i++) { pto2_ready_queue_reset(&sched->ready_queues[i]); } - + sched->tasks_completed = 0; sched->tasks_consumed = 0; } -// ============================================================================= -// Task State Management -// ============================================================================= - -void pto2_scheduler_init_task(PTO2SchedulerState* sched, int32_t task_id, - PTO2TaskDescriptor* task) { - int32_t slot = pto2_task_slot(sched, task_id); - - // Initialize scheduler state for this task - sched->task_state[slot] = PTO2_TASK_PENDING; - sched->fanin_refcount[slot] = 0; - sched->fanout_refcount[slot] = 0; - - // Check if task is immediately ready (no dependencies) - if (task->fanin_count == 0) { - sched->task_state[slot] = PTO2_TASK_READY; - pto2_ready_queue_push(&sched->ready_queues[task->worker_type], task_id); - } -} - -void pto2_scheduler_check_ready(PTO2SchedulerState* sched, int32_t task_id, - PTO2TaskDescriptor* task) { - int32_t slot = pto2_task_slot(sched, task_id); - - // Only transition PENDING -> READY - if (sched->task_state[slot] != PTO2_TASK_PENDING) { - return; - } - - // Check if all producers have completed - if (sched->fanin_refcount[slot] == task->fanin_count) { - sched->task_state[slot] = PTO2_TASK_READY; - pto2_ready_queue_push(&sched->ready_queues[task->worker_type], task_id); - } -} - void pto2_scheduler_mark_running(PTO2SchedulerState* sched, int32_t task_id) { - int32_t slot = pto2_task_slot(sched, task_id); + int32_t slot = sched->pto2_task_slot(task_id); sched->task_state[slot] = PTO2_TASK_RUNNING; } -int32_t pto2_scheduler_get_ready_task(PTO2SchedulerState* sched, +int32_t pto2_scheduler_get_ready_task(PTO2SchedulerState* sched, PTO2WorkerType worker_type) { return pto2_ready_queue_pop(&sched->ready_queues[worker_type]); } @@ -230,21 +204,21 @@ int32_t pto2_scheduler_get_ready_task(PTO2SchedulerState* sched, /** * Check if task can transition to CONSUMED and handle if so - * + * * NOTE: fanout_refcount is accessed atomically because it can be modified * by both orchestrator thread (via scope_end) and scheduler thread (via task_complete). 
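+ * The transition fires only once fanout_refcount == fanout_count, i.e. every
+ * consumer and the owning scope have released their reference to this task.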
*/ -static void check_and_handle_consumed(PTO2SchedulerState* sched, +static void check_and_handle_consumed(PTO2SchedulerState* sched, int32_t task_id, PTO2TaskDescriptor* task) { - int32_t slot = pto2_task_slot(sched, task_id); - + int32_t slot = sched->pto2_task_slot(task_id); + // Read fanout_count (set by orchestrator, only grows) int32_t fanout_count = __atomic_load_n(&task->fanout_count, __ATOMIC_ACQUIRE); - + // Read fanout_refcount atomically (modified by both orchestrator and scheduler threads) - int32_t refcount = __atomic_load_n(&sched->fanout_refcount[slot], __ATOMIC_SEQ_CST); - + int32_t refcount = __atomic_load_n(&sched->fanout_refcount[slot], __ATOMIC_ACQUIRE); + if (refcount != fanout_count) { return; // Not all references released yet } @@ -253,69 +227,69 @@ static void check_and_handle_consumed(PTO2SchedulerState* sched, // This prevents multiple threads from transitioning the same task PTO2TaskState expected = PTO2_TASK_COMPLETED; if (!__atomic_compare_exchange_n(&sched->task_state[slot], &expected, PTO2_TASK_CONSUMED, - false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { + false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) { // CAS failed - either not COMPLETED or another thread already transitioned return; } - + // Successfully transitioned to CONSUMED - __atomic_fetch_add(&sched->tasks_consumed, 1, __ATOMIC_SEQ_CST); - + __atomic_fetch_add(&sched->tasks_consumed, 1, __ATOMIC_RELAXED); + // Reset refcounts for slot reuse (ring buffer will reuse this slot) // Use atomic store for fanout_refcount - __atomic_store_n(&sched->fanout_refcount[slot], 0, __ATOMIC_SEQ_CST); - __atomic_store_n(&sched->fanin_refcount[slot], 0, __ATOMIC_SEQ_CST); - + __atomic_store_n(&sched->fanout_refcount[slot], 0, __ATOMIC_RELEASE); + __atomic_store_n(&sched->fanin_refcount[slot], 0, __ATOMIC_RELEASE); + // Try to advance ring pointers if (task_id == sched->last_task_alive) { - pto2_scheduler_advance_ring_pointers(sched); + pto2_scheduler_advance_ring_pointers(sched); // RISK: Multiple entries } } void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) { - int32_t slot = pto2_task_slot(sched, task_id); + int32_t slot = sched->pto2_task_slot(task_id); PTO2TaskDescriptor* task = pto2_sm_get_task(sched->sm_handle, task_id); - - // Mark task as completed - sched->task_state[slot] = PTO2_TASK_COMPLETED; - sched->tasks_completed++; - - // === STEP 1: Update fanin_refcount of all consumers === - // Read fanout_list and increment each consumer's fanin_refcount + + // === STEP 1: Mark COMPLETED and snapshot fanout_head under lock === + // Acquire fanout_lock to safely read fanout_head (orchestrator may be appending). + // Release lock EARLY: once COMPLETED is visible, orchestrator's Step 5 will + // skip this producer (prod_state >= COMPLETED), so no new entries can be + // appended to the fanout list. Traversal outside the lock is safe. 
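+    // Illustrative race this ordering prevents: without the lock, the
+    // orchestrator could still see this producer as not yet COMPLETED, append
+    // a new consumer after fanout_head was snapshotted here, and that
+    // consumer's fanin release would then be missed.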
+ pto2_fanout_lock(task); + __atomic_store_n(&sched->task_state[slot], PTO2_TASK_COMPLETED, __ATOMIC_RELEASE); + __atomic_fetch_add(&sched->tasks_completed, 1, __ATOMIC_RELAXED); int32_t fanout_head = PTO2_LOAD_ACQUIRE(&task->fanout_head); + pto2_fanout_unlock(task); + + // Traverse fanout chain OUTSIDE the lock to avoid blocking orchestrator int32_t current = fanout_head; - while (current > 0) { PTO2DepListEntry* entry = pto2_dep_pool_get(sched->dep_pool, current); if (!entry) break; - + int32_t consumer_id = entry->task_id; - int32_t consumer_slot = pto2_task_slot(sched, consumer_id); PTO2TaskDescriptor* consumer = pto2_sm_get_task(sched->sm_handle, consumer_id); - - // Increment consumer's fanin_refcount - sched->fanin_refcount[consumer_slot]++; - - // Check if consumer is now ready - pto2_scheduler_check_ready(sched, consumer_id, consumer); - + + // Atomically increment consumer's fanin_refcount and check if consumer is now ready + sched->release_fanin_and_check_ready(consumer_id, consumer); + current = entry->next_offset; } - + // === STEP 2: Update fanout_refcount of all producers === // This task is a consumer of its fanin producers - release references current = task->fanin_head; - + while (current > 0) { PTO2DepListEntry* entry = pto2_dep_pool_get(sched->dep_pool, current); if (!entry) break; - + int32_t producer_id = entry->task_id; pto2_scheduler_release_producer(sched, producer_id); - + current = entry->next_offset; } - + // === STEP 3: Check if this task can transition to CONSUMED === check_and_handle_consumed(sched, task_id, task); } @@ -328,12 +302,12 @@ void pto2_scheduler_on_scope_end(PTO2SchedulerState* sched, } void pto2_scheduler_release_producer(PTO2SchedulerState* sched, int32_t producer_id) { - int32_t slot = pto2_task_slot(sched, producer_id); + int32_t slot = sched->pto2_task_slot(producer_id); PTO2TaskDescriptor* producer = pto2_sm_get_task(sched->sm_handle, producer_id); - + // Increment fanout_refcount atomically (called from both orchestrator and scheduler threads) - __atomic_fetch_add(&sched->fanout_refcount[slot], 1, __ATOMIC_SEQ_CST); - + __atomic_fetch_add(&sched->fanout_refcount[slot], 1, __ATOMIC_ACQ_REL); + // Check if producer can transition to CONSUMED check_and_handle_consumed(sched, producer_id, producer); } @@ -345,28 +319,28 @@ void pto2_scheduler_release_producer(PTO2SchedulerState* sched, int32_t producer void pto2_scheduler_advance_ring_pointers(PTO2SchedulerState* sched) { PTO2SharedMemoryHeader* header = sched->sm_handle->header; int32_t current_task_index = PTO2_LOAD_ACQUIRE(&header->current_task_index); - + // Advance last_task_alive while tasks at that position are CONSUMED while (sched->last_task_alive < current_task_index) { - int32_t slot = pto2_task_slot(sched, sched->last_task_alive); - + int32_t slot = sched->pto2_task_slot(sched->last_task_alive); + if (sched->task_state[slot] != PTO2_TASK_CONSUMED) { break; // Found non-consumed task, stop advancing } - + sched->last_task_alive++; } - + // Update heap_tail based on last consumed task's buffer if (sched->last_task_alive > 0) { int32_t last_consumed_id = sched->last_task_alive - 1; PTO2TaskDescriptor* last_consumed = pto2_sm_get_task(sched->sm_handle, last_consumed_id); - + if (last_consumed->packed_buffer_end != NULL) { sched->heap_tail = (uint64_t)((char*)last_consumed->packed_buffer_end - (char*)sched->heap_base); } } - + // Write to shared memory for orchestrator flow control pto2_scheduler_sync_to_sm(sched); } @@ -386,13 +360,13 @@ void 
pto2_scheduler_sync_to_sm(PTO2SchedulerState* sched) { bool pto2_scheduler_is_done(PTO2SchedulerState* sched) { PTO2SharedMemoryHeader* header = sched->sm_handle->header; - + // Check if orchestrator has finished int32_t orch_done = PTO2_LOAD_ACQUIRE(&header->orchestrator_done); if (!orch_done) { return false; } - + // Check if all tasks have been consumed int32_t current_task_index = PTO2_LOAD_ACQUIRE(&header->current_task_index); return sched->last_task_alive >= current_task_index; @@ -413,13 +387,13 @@ void pto2_scheduler_print_stats(PTO2SchedulerState* sched) { void pto2_scheduler_print_queues(PTO2SchedulerState* sched) { LOG_INFO("=== Ready Queues ==="); - + const char* worker_names[] = {"CUBE", "VECTOR", "AI_CPU", "ACCELERATOR"}; - + for (int i = 0; i < PTO2_NUM_WORKER_TYPES; i++) { LOG_INFO(" %s: count=%" PRIu64, worker_names[i], pto2_ready_queue_count(&sched->ready_queues[i])); } - + LOG_INFO("===================="); } diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h index e43604f57..1a294d410 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h @@ -36,8 +36,15 @@ typedef struct { uint64_t tail; // Enqueue position uint64_t capacity; // Queue capacity uint64_t count; // Current number of tasks in queue + int32_t spinlock; // Spinlock for thread-safe push/pop } PTO2ReadyQueue; +/** + * Push task to ready queue + * @return true if successful, false if queue is full + */ +bool pto2_ready_queue_push(PTO2ReadyQueue* queue, int32_t task_id); + // ============================================================================= // Scheduler State // ============================================================================= @@ -48,7 +55,7 @@ typedef struct { * Contains dynamic state updated during task execution. * Separated from shared memory for cache efficiency. */ -typedef struct PTO2SchedulerState { +struct PTO2SchedulerState { // Shared memory access PTO2SharedMemoryHandle* sm_handle; @@ -81,15 +88,61 @@ typedef struct PTO2SchedulerState { int64_t tasks_consumed; int64_t total_dispatch_cycles; -} PTO2SchedulerState; -/** - * Calculate task slot from task_id - * Uses runtime window_size instead of compile-time constant - */ -static inline int32_t pto2_task_slot(PTO2SchedulerState* sched, int32_t task_id) { - return task_id & sched->task_window_mask; -} + + /** + * Calculate task slot from task_id + * Uses runtime window_size instead of compile-time constant + */ + inline int32_t pto2_task_slot(int32_t task_id) { + return task_id & task_window_mask; + } + + // ============================================================================= + // Task State Management + // ============================================================================= + + /** + * Signal that one fanin dependency has been satisfied + * + * Atomically increments fanin_refcount. If the new count equals + * fanin_count, CAS PENDING -> READY and enqueue. + * + * @param task_id Task ID + * @param task Task descriptor + */ + void release_fanin_and_check_ready(int32_t task_id, PTO2TaskDescriptor* task) { + int32_t slot = pto2_task_slot(task_id); + + // Atomically increment fanin_refcount and check if all producers are done + // ACQ_REL on fanin_refcount already synchronizes with the orchestrator's + // release in init_task, making fanin_count visible — plain load suffices. 
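+        // (Called both from init_task on the orchestrator thread and from
+        // on_task_complete on scheduler threads, so everything below must
+        // tolerate concurrent callers.)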
+ int32_t new_refcount = __atomic_fetch_add(&fanin_refcount[slot], 1, __ATOMIC_ACQ_REL) + 1; + + // Check if all producers have completed + if (new_refcount == task->fanin_count) { + // CAS PENDING -> READY to prevent double-enqueue from concurrent threads + PTO2TaskState expected = PTO2_TASK_PENDING; + if (__atomic_compare_exchange_n(&task_state[slot], &expected, PTO2_TASK_READY, + false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) { + pto2_ready_queue_push(&ready_queues[task->worker_type], task_id); + } + } + } + + void init_task(int32_t task_id, PTO2TaskDescriptor* task) { + int32_t slot = pto2_task_slot(task_id); + + task_state[slot] = PTO2_TASK_PENDING; // Orchestrator is the unique owner + + // Reset fanout_refcount for new task lifecycle. + // Do NOT reset fanin_refcount — it may have been incremented by + // concurrent on_task_complete between Step 5 and Step 6. + fanout_refcount[slot] = 0; + + release_fanin_and_check_ready(task_id, task); + } +}; // ============================================================================= // Scheduler API @@ -138,12 +191,6 @@ void pto2_ready_queue_destroy(PTO2ReadyQueue* queue); */ void pto2_ready_queue_reset(PTO2ReadyQueue* queue); -/** - * Push task to ready queue - * @return true if successful, false if queue is full - */ -bool pto2_ready_queue_push(PTO2ReadyQueue* queue, int32_t task_id); - /** * Pop task from ready queue * @return task_id, or -1 if queue is empty @@ -175,31 +222,6 @@ static inline uint64_t pto2_ready_queue_count(PTO2ReadyQueue* queue) { // Task State Management // ============================================================================= -/** - * Initialize task in scheduler (called when task is submitted) - * - * Sets task state to PENDING or READY based on fanin_count. - * - * @param sched Scheduler state - * @param task_id Task ID - * @param task Task descriptor (from shared memory) - */ -void pto2_scheduler_init_task(PTO2SchedulerState* sched, int32_t task_id, - PTO2TaskDescriptor* task); - -/** - * Check if task should transition to READY - * - * Called after fanin_refcount is updated. - * If fanin_refcount == fanin_count, moves task to READY and enqueues. 
- * - * @param sched Scheduler state - * @param task_id Task ID - * @param task Task descriptor - */ -void pto2_scheduler_check_ready(PTO2SchedulerState* sched, int32_t task_id, - PTO2TaskDescriptor* task); - /** * Mark task as RUNNING (dispatched to worker) */ diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp index 2c4453b38..4d948b31a 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp @@ -21,7 +21,6 @@ #include "common.h" #include "common/unified_log.h" #include "pto_orchestrator.h" -#include "tensor.h" // ============================================================================= // TensorMap Lookup Chain Length Statistics (compile-time toggle) @@ -39,380 +38,136 @@ static uint64_t g_insert_count = 0; // Initialization and Destruction // ============================================================================= -bool pto2_tensormap_init(PTO2TensorMap* tm, int32_t num_buckets, int32_t pool_size) { +bool PTO2TensorMap::init(int32_t new_num_buckets, int32_t new_pool_size) { // Validate power of 2 for fast modulo - if ((num_buckets & (num_buckets - 1)) != 0) { + if ((new_num_buckets & (new_num_buckets - 1)) != 0) { return false; // num_buckets must be power of 2 } // Allocate buckets - tm->buckets = (int32_t*)malloc(num_buckets * sizeof(int32_t)); - if (!tm->buckets) { + buckets = (PTO2TensorMapEntry**)malloc(new_num_buckets * sizeof(PTO2TensorMapEntry*)); + if (!buckets) { return false; } // Initialize all buckets to empty (-1) - for (int32_t i = 0; i < num_buckets; i++) { - tm->buckets[i] = -1; + for (int32_t i = 0; i < new_num_buckets; i++) { + buckets[i] = nullptr; } - tm->num_buckets = num_buckets; + num_buckets = new_num_buckets; // Allocate entry pool - tm->entry_pool = (PTO2TensorMapEntry*)calloc(pool_size, sizeof(PTO2TensorMapEntry)); - if (!tm->entry_pool) { - free(tm->buckets); - tm->buckets = NULL; + entry_pool = (PTO2TensorMapEntry*)calloc(new_pool_size, sizeof(PTO2TensorMapEntry)); + if (!entry_pool) { + free(buckets); + buckets = NULL; return false; } // Allocate free entry list - tm->free_entry_list = (int32_t*)calloc(pool_size, sizeof(int32_t)); - if (!tm->free_entry_list) { - free(tm->buckets); - free(tm->entry_pool); - tm->buckets = NULL; - tm->entry_pool = NULL; + free_entry_list = (PTO2TensorMapEntry**)calloc(new_pool_size, sizeof(PTO2TensorMapEntry*)); + if (!free_entry_list) { + free(buckets); + free(entry_pool); + buckets = NULL; + entry_pool = NULL; return false; } - tm->pool_size = pool_size; - tm->next_entry_idx = 0; - tm->free_num = 0; + pool_size = new_pool_size; + next_entry_idx = 0; + free_num = 0; // Initialize all entries as not in bucket for (int32_t i = 0; i < pool_size; i++) { - tm->entry_pool[i].in_bucket = false; - tm->entry_pool[i].next_in_bucket = -1; - tm->entry_pool[i].prev_in_bucket = -1; - tm->entry_pool[i].next_in_task = -1; - tm->entry_pool[i].prev_in_task = -1; - tm->entry_pool[i].producer_task_id = -1; + entry_pool[i].bucket_index = -1; + entry_pool[i].next_in_bucket = nullptr; + entry_pool[i].prev_in_bucket = nullptr; + entry_pool[i].next_in_task = nullptr; + entry_pool[i].prev_in_task = nullptr; + entry_pool[i].producer_task_id = -1; } // Allocate per-task entry tracking - tm->task_entry_head = (int32_t*)malloc(PTO2_TASK_WINDOW_SIZE * sizeof(int32_t)); - if (!tm->task_entry_head) { - free(tm->entry_pool); - free(tm->buckets); - free(tm->free_entry_list); 
- tm->entry_pool = NULL; - tm->buckets = NULL; - tm->free_entry_list = NULL; + task_entry_head = (PTO2TensorMapEntry**)malloc(new_pool_size * sizeof(PTO2TensorMapEntry*)); + if (!task_entry_head) { + free(entry_pool); + free(buckets); + free(free_entry_list); + entry_pool = NULL; + buckets = NULL; + free_entry_list = NULL; return false; } // Initialize all task entry heads to -1 (no entries) for (int32_t i = 0; i < PTO2_TASK_WINDOW_SIZE; i++) { - tm->task_entry_head[i] = -1; + task_entry_head[i] = nullptr; } - tm->last_task_alive = 0; + last_task_alive = 0; return true; } -bool pto2_tensormap_init_default(PTO2TensorMap* tm) { - return pto2_tensormap_init(tm, PTO2_TENSORMAP_NUM_BUCKETS, PTO2_TENSORMAP_POOL_SIZE); +bool PTO2TensorMap::init_default() { + return init(PTO2_TENSORMAP_NUM_BUCKETS, PTO2_TENSORMAP_POOL_SIZE); } -void pto2_tensormap_destroy(PTO2TensorMap* tm) { - if (tm->buckets) { - free(tm->buckets); - tm->buckets = NULL; +void PTO2TensorMap::destroy() { + if (buckets) { + free(buckets); + buckets = NULL; + } + + if (entry_pool) { + free(entry_pool); + entry_pool = NULL; } - if (tm->entry_pool) { - free(tm->entry_pool); - tm->entry_pool = NULL; + if (task_entry_head) { + free(task_entry_head); + task_entry_head = NULL; } - if (tm->task_entry_head) { - free(tm->task_entry_head); - tm->task_entry_head = NULL; + if (free_entry_list) { + free(free_entry_list); + free_entry_list = NULL; } } -void pto2_tensormap_reset(PTO2TensorMap* tm) { +void PTO2TensorMap::reset() { // Reset all buckets to empty - for (int32_t i = 0; i < tm->num_buckets; i++) { - tm->buckets[i] = -1; + for (int32_t i = 0; i < num_buckets; i++) { + buckets[i] = nullptr; } // Reset all entries - for (int32_t i = 0; i < tm->pool_size; i++) { - tm->entry_pool[i].in_bucket = false; - tm->entry_pool[i].next_in_bucket = -1; - tm->entry_pool[i].prev_in_bucket = -1; - tm->entry_pool[i].next_in_task = -1; - tm->entry_pool[i].prev_in_task = -1; - tm->entry_pool[i].producer_task_id = -1; + for (int32_t i = 0; i < pool_size; i++) { + entry_pool[i].bucket_index = -1; + entry_pool[i].next_in_bucket = nullptr; + entry_pool[i].prev_in_bucket = nullptr; + entry_pool[i].next_in_task = nullptr; + entry_pool[i].prev_in_task = nullptr; + entry_pool[i].producer_task_id = -1; } // Reset per-task entry tracking - for (int32_t i = 0; i < PTO2_TASK_WINDOW_SIZE; i++) { - tm->task_entry_head[i] = -1; - } - - tm->next_entry_idx = 0; - tm->free_num = 0; - tm->last_task_alive = 0; -} - -// ============================================================================= -// Hash Function -// ============================================================================= - -uint32_t pto2_tensormap_hash(PTO2TensorMap* tm, const Tensor& tensor) { - // ======================================================================== - // CRITICAL: Hash ONLY by base_ptr for correct overlap detection! - // ======================================================================== - // - // For overlap detection to work, ALL regions accessing the same base - // tensor MUST be in the SAME hash bucket. This allows lookup to find - // and check all potentially overlapping regions. - // - // If we included offset in the hash, overlapping regions with different - // offsets would end up in different buckets and never be compared: - // Region A: base=X, offset=0 → bucket 5 - // Region B: base=X, offset=128 → bucket 12 (WRONG! Can't detect overlap) - // - // With base_ptr-only hash: - // Region A: base=X, offset=0 → bucket 5 - // Region B: base=X, offset=128 → bucket 5 (CORRECT! 
Same bucket) - // - uint64_t key = (uint64_t)(uintptr_t)tensor.data().buffer.addr; - - // Improve distribution by mixing bits (pointers often have aligned low bits) - key = key ^ (key >> 16); - key = key ^ (key >> 32); - - // Use bitwise AND for power-of-2 modulo (faster than %) - return (uint32_t)(key & (tm->num_buckets - 1)); -} - -// ============================================================================= -// Validity and Cleanup -// ============================================================================= - -void pto2_tensormap_sync_validity(PTO2TensorMap* tm, int32_t last_task_alive) { tm->last_task_alive = last_task_alive; } - -void pto2_tensormap_remove_entry(PTO2TensorMap& tm, int32_t entry_idx) { - pto2_tensormap_remove_from_task(&tm, entry_idx); - tm.free_entry(entry_idx); -} - -void pto2_tensormap_remove_from_task(PTO2TensorMap* tm, int32_t entry_idx) { - auto entry = &tm->entry_pool[entry_idx]; - // Update predecessor's next pointer (O(1) via prev_in_task) - if (entry->prev_in_task == -1) { - // Entry is the head of its task chain, update task_entry_head - int32_t task_slot = entry->producer_task_id & (PTO2_TASK_WINDOW_SIZE - 1); - tm->task_entry_head[task_slot] = entry->next_in_task; - } else { - tm->entry_pool[entry->prev_in_task].next_in_task = entry->next_in_task; - } - - // Update successor's prev pointer - if (entry->next_in_task >= 0) { - tm->entry_pool[entry->next_in_task].prev_in_task = entry->prev_in_task; - } - - entry->next_in_task = -1; - entry->prev_in_task = -1; -} - -void pto2_tensormap_cleanup_retired(PTO2TensorMap* tm, int32_t old_last_task_alive, int32_t new_last_task_alive) { - // Iterate through retired tasks and remove their entries from bucket chains - for (int32_t task_id = old_last_task_alive; task_id < new_last_task_alive; task_id++) { - int32_t task_slot = task_id & (PTO2_TASK_WINDOW_SIZE - 1); - int32_t offset = tm->task_entry_head[task_slot]; - - while (offset >= 0) { - PTO2TensorMapEntry* entry = &tm->entry_pool[offset]; - int32_t next = entry->next_in_task; // Save before clearing - // Only remove if this entry belongs to the retiring task - // (slot may have been reused by a newer task) - if (entry->producer_task_id == task_id) { - // Clear task chain pointers (entire chain is being destroyed) - tm->free_entry(offset); - } - offset = next; - } - - // Clear task's entry head (slot will be reused by task_id + TASK_WINDOW_SIZE) - tm->task_entry_head[task_slot] = -1; - } -} - -// ============================================================================= -// Lookup with Chain Truncation -// ============================================================================= - -void pto2_tensormap_lookup(PTO2TensorMap* tm, const Tensor &tensor, PTO2LookupResult* result) { - uint32_t bucket = pto2_tensormap_hash(tm, tensor); - int32_t* prev_ptr = &tm->buckets[bucket]; // For truncation - int32_t offset = *prev_ptr; - - result->count = 0; -#if PTO2_ORCH_PROFILING - int32_t chain_len = 0; -#endif - - while (offset >= 0) { - PTO2TensorMapEntry* entry = &tm->entry_pool[offset]; - - // Check validity first - if (!pto2_tensormap_entry_valid(tm, entry)) { - // ========== STALE ENTRY: Truncate chain here ========== - // All subsequent entries are guaranteed to be stale too! 
- // Truncate: unlink this and all following entries - *prev_ptr = -1; // Terminate chain at previous entry - - // Mark truncated entries as not in bucket (for correct reuse) - while (offset >= 0) { - PTO2TensorMapEntry* stale = &tm->entry_pool[offset]; - int32_t next = stale->next_in_bucket; - stale->in_bucket = false; - stale->next_in_bucket = -1; - stale->prev_in_bucket = -1; - offset = next; - } - -#if PTO2_ORCH_PROFILING - g_lookup_chain_total += chain_len; - g_lookup_count++; - if (chain_len > g_lookup_chain_max) g_lookup_chain_max = chain_len; -#endif - return; - } - -#if PTO2_ORCH_PROFILING - chain_len++; - g_lookup_overlap_checks++; -#endif - // Entry is valid - check if regions OVERLAP (not just exact match) - // Since we hash only by base_ptr, all entries in this bucket have - // potential to overlap. We must check actual byte-range overlap. - auto overlap_status = tensor.is_overlap(entry->tensor); - if (overlap_status != OverlapStatus::NO_OVERLAP) { - result->push(offset, overlap_status); -#if PTO2_ORCH_PROFILING - g_lookup_overlap_hits++; -#endif - } - - // Move to next entry - prev_ptr = &entry->next_in_bucket; - offset = *prev_ptr; - } - -#if PTO2_ORCH_PROFILING - g_lookup_chain_total += chain_len; - g_lookup_count++; - if (chain_len > g_lookup_chain_max) g_lookup_chain_max = chain_len; -#endif -} - -int32_t PTO2TensorMap::new_entry() { - if (free_num > 0) { - int32_t res = free_entry_list[--free_num]; - debug_assert(!entry_pool[res].in_bucket); - return res; - } - if (next_entry_idx < pool_size) { - int32_t res = next_entry_idx++; - debug_assert(!entry_pool[res].in_bucket); - return res; - } - - size_t wait_count = 0; - while (free_num == 0) { - pto2_orchestrator_sync_tensormap(this, true); - always_assert(wait_count++ <= 1000000000UL); - } - debug_assert(free_num > 0); - int32_t res = free_entry_list[--free_num]; - debug_assert(!entry_pool[res].in_bucket); - return res; -} - -void PTO2TensorMap::free_entry(int32_t entry_idx) { - auto entry = &entry_pool[entry_idx]; - if (!entry->in_bucket) { - return; // Already removed - } - - // Update predecessor's next pointer (O(1) via prev_in_bucket) - if (entry->prev_in_bucket == -1) { - // Entry is the head of its bucket chain, update bucket head - // Must compute hash BEFORE clearing tensor (tensor.data() needs valid tensor_pool) - uint32_t bucket = pto2_tensormap_hash(this, entry->tensor); - buckets[bucket] = entry->next_in_bucket; - } else { - entry_pool[entry->prev_in_bucket].next_in_bucket = entry->next_in_bucket; - } - - // Update successor's prev pointer - if (entry->next_in_bucket >= 0) { - entry_pool[entry->next_in_bucket].prev_in_bucket = entry->prev_in_bucket; + for (int32_t i = 0; i < pool_size; i++) { + task_entry_head[i] = nullptr; } - // Clear tensor AFTER bucket chain manipulation (hash computation needs valid tensor) - entry->tensor = Tensor(); - - free_entry_list[free_num++] = entry_idx; - entry->in_bucket = false; - entry->next_in_bucket = -1; - entry->prev_in_bucket = -1; - entry->next_in_task = -1; - entry->prev_in_task = -1; -} - -// ============================================================================= -// Insert -// ============================================================================= - -void pto2_tensormap_insert(PTO2TensorMap* tm, const Tensor& tensor, int32_t producer_task_id, bool with_alloc) { -#if PTO2_ORCH_PROFILING - g_insert_count++; -#endif - // Allocate entry from ring buffer pool - int32_t entry_offset = tm->new_entry(); - PTO2TensorMapEntry* entry = &tm->entry_pool[entry_offset]; 
- - // Initialize new entry - entry->tensor = tensor; - entry->producer_task_id = producer_task_id; - entry->with_alloc = with_alloc; - - // Insert at head of hash bucket (maintains task_id descending order) - uint32_t bucket = pto2_tensormap_hash(tm, tensor); - entry->next_in_bucket = tm->buckets[bucket]; - entry->prev_in_bucket = -1; // New head has no predecessor - // Update old head's prev pointer - if (entry->next_in_bucket >= 0) { - tm->entry_pool[entry->next_in_bucket].prev_in_bucket = entry_offset; - } - tm->buckets[bucket] = entry_offset; - entry->in_bucket = true; - - // Link to task's entry list (for cleanup) - int32_t task_slot = producer_task_id & (PTO2_TASK_WINDOW_SIZE - 1); - entry->next_in_task = tm->task_entry_head[task_slot]; - entry->prev_in_task = -1; // New head has no predecessor - // Update old head's prev pointer - if (entry->next_in_task >= 0) { - tm->entry_pool[entry->next_in_task].prev_in_task = entry_offset; - } - tm->task_entry_head[task_slot] = entry_offset; + next_entry_idx = 0; + free_num = 0; + last_task_alive = 0; } // ============================================================================= // Debug Utilities // ============================================================================= -void pto2_tensormap_print_stats(PTO2TensorMap* tm) { +void PTO2TensorMap::print_stats() { int32_t valid = 0; int32_t stale = 0; int32_t empty_buckets = 0; @@ -421,9 +176,9 @@ void pto2_tensormap_print_stats(PTO2TensorMap* tm) { int32_t non_empty_buckets = 0; // Count entries - for (int32_t i = 0; i < tm->pool_size; i++) { - if (tm->entry_pool[i].in_bucket) { - if (pto2_tensormap_entry_valid(tm, &tm->entry_pool[i])) { + for (int32_t i = 0; i < pool_size; i++) { + if (entry_pool[i].bucket_index != -1) { + if (entry_valid(entry_pool[i])) { valid++; } else { stale++; @@ -432,13 +187,13 @@ void pto2_tensormap_print_stats(PTO2TensorMap* tm) { } // Count bucket stats - for (int32_t b = 0; b < tm->num_buckets; b++) { + for (int32_t b = 0; b < num_buckets; b++) { int32_t chain_len = 0; - int32_t offset = tm->buckets[b]; + auto cur_entry = buckets[b]; - while (offset >= 0) { + while (cur_entry != nullptr) { chain_len++; - offset = tm->entry_pool[offset].next_in_bucket; + cur_entry = cur_entry->next_in_bucket; } if (chain_len == 0) { @@ -453,24 +208,24 @@ void pto2_tensormap_print_stats(PTO2TensorMap* tm) { } LOG_INFO("=== TensorMap Statistics ==="); - LOG_INFO("Pool size: %d", tm->pool_size); - LOG_INFO("Pool next entry idx: %d", tm->next_entry_idx); - LOG_INFO("Pool free_num: %d", tm->free_num); - LOG_INFO("Num buckets: %d", tm->num_buckets); + LOG_INFO("Pool size: %d", pool_size); + LOG_INFO("Pool next entry idx: %d", next_entry_idx); + LOG_INFO("Pool free_num: %d", free_num); + LOG_INFO("Num buckets: %d", num_buckets); LOG_INFO("Valid entries: %d", valid); LOG_INFO("Stale entries: %d", stale); LOG_INFO("Empty buckets: %d", empty_buckets); LOG_INFO("Max chain len: %d", max_chain); LOG_INFO("Avg chain len: %.2f", non_empty_buckets > 0 ? 
(float)total_chain / non_empty_buckets : 0); - LOG_INFO("Last task alive: %d", tm->last_task_alive); + LOG_INFO("Last task alive: %d", last_task_alive); LOG_INFO("============================"); } -int32_t pto2_tensormap_valid_count(PTO2TensorMap* tm) { +int32_t PTO2TensorMap::valid_count() { int32_t count = 0; - for (int32_t i = 0; i < tm->pool_size; i++) { - if (tm->entry_pool[i].in_bucket && pto2_tensormap_entry_valid(tm, &tm->entry_pool[i])) { + for (int32_t i = 0; i < pool_size; i++) { + if (entry_pool[i].bucket_index != -1 && entry_valid(entry_pool[i])) { count++; } } @@ -478,22 +233,19 @@ int32_t pto2_tensormap_valid_count(PTO2TensorMap* tm) { return count; } -// ============================================================================= -// TensorMap Synchronization -// ============================================================================= - -void pto2_orchestrator_sync_tensormap(PTO2TensorMap* tm, bool force) { - always_assert(tm->orch != nullptr); - // Read current last_task_alive from shared memory - int32_t new_last_task_alive = PTO2_LOAD_ACQUIRE(&tm->orch->sm_handle->header->last_task_alive); - - // Update TensorMap validity threshold - pto2_tensormap_sync_validity(tm, new_last_task_alive); - - // Periodically cleanup TensorMap to remove stale entries from bucket chains - if (force || (new_last_task_alive - tm->orch->tensormap_last_cleanup >= PTO2_TENSORMAP_CLEANUP_INTERVAL)) { - pto2_tensormap_cleanup_retired(tm, tm->orch->tensormap_last_cleanup, new_last_task_alive); - tm->orch->tensormap_last_cleanup = new_last_task_alive; +void PTO2TensorMap::sync_tensormap() { + constexpr int MIN_FREE_NUM = 1024; + always_assert(orch != nullptr); + while(true) { + // Read current last_task_alive from shared memory + int32_t new_last_task_alive = PTO2_LOAD_ACQUIRE(&orch->sm_handle->header->last_task_alive); + sync_validity(new_last_task_alive); + if ((pool_size - next_entry_idx + free_num < MIN_FREE_NUM) || new_last_task_alive - orch->tensormap_last_cleanup >= PTO2_TENSORMAP_CLEANUP_INTERVAL) { + cleanup_retired(orch->tensormap_last_cleanup, new_last_task_alive); + orch->tensormap_last_cleanup = new_last_task_alive; + } else { + break; + } } } diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h index 22c085e23..7b22c6e62 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h @@ -30,9 +30,9 @@ * Based on: docs/runtime_buffer_manager_methods.md */ -#ifndef PTO_TENSORMAP_H -#define PTO_TENSORMAP_H +#pragma once +#include "common.h" #include "pto_runtime2_types.h" #include "tensor.h" @@ -56,15 +56,38 @@ struct PTO2OrchestratorState; // forward declare * - When lookup hits stale entry, truncate rest of chain */ struct PTO2TensorMapEntry { - Tensor tensor; // Tensor descriptor key - int32_t producer_task_id; // Task that produces this region - int32_t next_in_bucket; // Offset to next entry in hash bucket (-1 = end) - int32_t prev_in_bucket; // Offset to prev entry in hash bucket (-1 = head is buckets[bucket]) - int32_t next_in_task; // Offset to next entry for same task (-1 = end) - int32_t prev_in_task; // Offset to prev entry for same task (-1 = head is task_entry_head[slot]) - bool in_bucket; // True if entry is linked in a bucket chain - // CRITICAL: Must be set false before overwriting! 
bool with_alloc{true};               // True if entry is task output, False if entry is task inout
+    int32_t producer_task_id;            // Task that produces this region
+    PTO2TensorMapEntry* next_in_bucket;  // Next entry in hash bucket (nullptr = end)
+    PTO2TensorMapEntry* prev_in_bucket;  // Prev entry in hash bucket (nullptr = this entry is the bucket head)
+    PTO2TensorMapEntry* next_in_task;    // Next entry for same task (nullptr = end)
+    PTO2TensorMapEntry* prev_in_task;    // Prev entry for same task (nullptr = this entry is the task-chain head)
+    int32_t bucket_index;                // != -1 if entry is linked in a bucket chain
+                                         // CRITICAL: Must be set -1 before overwriting!
+    uint64_t addr;                       // Cached tensor base address (hash/lookup key)
+    Tensor tensor;                       // Tensor descriptor key
+};
+
+/**
+ * Stack-allocated lookup result (avoids heap allocation per lookup)
+ */
+#define PTO2_LOOKUP_MAX_RESULTS 16
+// =============================================================================
+// Lookup Result
+// =============================================================================
+struct PTO2LookupResult {
+    struct Entry {
+        PTO2TensorMapEntry* entry;
+        OverlapStatus overlap_status;
+    };
+    Entry entries[PTO2_LOOKUP_MAX_RESULTS];
+    int32_t count{0};
+
+    void push(PTO2TensorMapEntry* entry, OverlapStatus s) {
+        if (count < PTO2_LOOKUP_MAX_RESULTS) {
+            entries[count++] = {entry, s};
+        }
+    }
 };
 
 /**
@@ -74,18 +97,18 @@ struct PTO2TensorMapEntry {
  */
 struct PTO2TensorMap {
     // Hash table buckets (fixed size, power of 2)
-    int32_t* buckets;              // Array of offsets into entry_pool (-1 = empty)
+    PTO2TensorMapEntry** buckets;  // Array of bucket-head entry pointers (nullptr = empty bucket)
    int32_t num_buckets;           // Must be power of 2 for fast modulo
 
     // Entry pool as ring buffer
     PTO2TensorMapEntry* entry_pool;        // Ring buffer of entries
-    int32_t* free_entry_list;       // free entry ids
+    PTO2TensorMapEntry** free_entry_list;  // Free entry pointers
     int32_t pool_size;             // Total pool capacity
-    int32_t next_entry_idx;           // id when next entry insert 
+    int32_t next_entry_idx;        // Index of the next never-used pool entry
     int32_t free_num;              // free entry number in entry pool
 
     // Per-task entry tracking (for efficient bucket cleanup)
-    int32_t* task_entry_head;      // Per-task head offset (-1 = no entries)
+    PTO2TensorMapEntry** task_entry_head;  // Per-task head entry (nullptr = no entries)
                                    // Indexed by task_id % TASK_WINDOW_SIZE
 
     // Validity threshold (for lazy invalidation)
@@ -93,155 +116,283 @@ struct PTO2TensorMap {
 
     PTO2OrchestratorState* orch{nullptr};
 
-    int32_t new_entry();
-    void free_entry(int32_t entry_idx);
-};
-
-// =============================================================================
-// TensorMap API
-// =============================================================================
-
-/**
- * Initialize TensorMap
- *
- * @param tm TensorMap to initialize
- * @param num_buckets Number of hash buckets (must be power of 2)
- * @param pool_size Size of entry pool
- * @return true on success, false on allocation failure
- */
-bool pto2_tensormap_init(PTO2TensorMap* tm, int32_t num_buckets, int32_t pool_size);
+    // new_entry only hands out an entry from the pool; it does not
+    // initialize the entry's fields (the caller is responsible for that).
+    PTO2TensorMapEntry* new_entry() {
+        if (free_num > 0) {
+            PTO2TensorMapEntry* res = free_entry_list[--free_num];
+            debug_assert(res->bucket_index == -1);
+            return res;
+        }
+        always_assert(next_entry_idx < pool_size);
+        PTO2TensorMapEntry* res = &entry_pool[next_entry_idx++];
+        debug_assert(res->bucket_index == -1);
+        return res;
+    }
 
-/**
- * Initialize TensorMap with default sizes
- */
-bool pto2_tensormap_init_default(PTO2TensorMap* tm);
+
+    void free_entry(PTO2TensorMapEntry& entry) {
+ void free_entry(PTO2TensorMapEntry& entry) {
+ always_assert(entry.bucket_index != -1); // Entry must still be linked in a bucket
-/**
- * Destroy TensorMap and free resources
- */
-void pto2_tensormap_destroy(PTO2TensorMap* tm);
+ // Update predecessor's next pointer (O(1) via prev_in_bucket)
+ if (entry.prev_in_bucket == nullptr) {
+ // Entry is the head of its bucket chain, update the bucket head
+ // (uses the cached bucket_index, so no hash recomputation is needed here)
+ buckets[entry.bucket_index] = entry.next_in_bucket;
+ } else {
+ entry.prev_in_bucket->next_in_bucket = entry.next_in_bucket;
+ }
-/**
- * Reset TensorMap to empty state
- */
-void pto2_tensormap_reset(PTO2TensorMap* tm);
+ // Update successor's prev pointer
+ if (entry.next_in_bucket != nullptr) {
+ entry.next_in_bucket->prev_in_bucket = entry.prev_in_bucket;
+ }
-/**
- * Update validity threshold from shared memory
- * Called periodically to refresh the lazy invalidation threshold.
- *
- * @param tm TensorMap
- * @param last_task_alive Current value from shared memory
- */
-void pto2_tensormap_sync_validity(PTO2TensorMap* tm, int32_t last_task_alive);
+ // Release the tensor descriptor once the entry is unlinked from its bucket chain
+ entry.tensor = Tensor();
-/**
- * Stack-allocated lookup result (avoids heap allocation per lookup)
- */
-#define PTO2_LOOKUP_MAX_RESULTS 16
-struct PTO2LookupResult {
- struct Entry {
- int32_t entry_idx;
- OverlapStatus overlap_status;
- };
- Entry entries[PTO2_LOOKUP_MAX_RESULTS];
- int32_t count{0};
+ free_entry_list[free_num++] = &entry;
+ entry.bucket_index = -1;
+ entry.next_in_bucket = nullptr;
+ entry.prev_in_bucket = nullptr;
+ entry.next_in_task = nullptr;
+ entry.prev_in_task = nullptr;
+ }
- void push(int32_t entry_idx, OverlapStatus s) {
- if (count < PTO2_LOOKUP_MAX_RESULTS) {
- entries[count++] = {entry_idx, s};
+ // =============================================================================
+ // TensorMap API
+ // =============================================================================
+
+ /**
+ * Initialize TensorMap
+ *
+ * @param num_buckets Number of hash buckets (must be power of 2)
+ * @param pool_size Size of entry pool
+ * @return true on success, false on allocation failure
+ */
+ bool init(int32_t num_buckets, int32_t pool_size);
+
+ /**
+ * Initialize TensorMap with default sizes
+ */
+ bool init_default();
+
+ /**
+ * Destroy TensorMap and free resources
+ */
+ void destroy();
+
+ /**
+ * Reset TensorMap to empty state
+ */
+ void reset();
+
+ /**
+ * Update validity threshold from shared memory
+ * Called periodically to refresh the lazy invalidation threshold.
+ *
+ * @param last_task_alive Current value from shared memory
+ */
+ void sync_validity(int32_t last_task_alive) { this->last_task_alive = last_task_alive; }
+
+ /**
+ * Lookup producer for a tensor region
+ *
+ * Searches the hash table for a matching region.
+ * Returns producer entries if found and still valid.
+ *
+ * Chain truncation: When the first stale entry is found, truncates
+ * the rest of the chain (all subsequent entries are also stale).
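+ * Why this is safe: insert() always pushes new entries at the head of a
+ * bucket chain, so entries appear in descending producer_task_id order;
+ * once one entry fails the last_task_alive check, every entry behind it
+ * fails it as well, so no live producer can be dropped.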
+ *
+ * @param tensor Tensor to look up
+ * @param result Output: stack-allocated result buffer
+ */
+ void lookup(const Tensor& tensor, PTO2LookupResult& result) {
+ auto& query_tensor_data = tensor.data();
+ uint32_t bucket_index = hash(query_tensor_data.buffer.addr);
+ PTO2TensorMapEntry** prev_ptr = &buckets[bucket_index]; // For truncation
+ PTO2TensorMapEntry* cur_entry = *prev_ptr;
+
+ result.count = 0;
+
+ while (cur_entry != nullptr) {
+ // Check validity first
+ if (!entry_valid(*cur_entry)) {
+ // ========== STALE ENTRY: Truncate chain here ==========
+ // All subsequent entries are guaranteed to be stale too!
+ // Truncate: unlink this and all following entries
+ *prev_ptr = nullptr; // Terminate chain at previous entry
+
+ // Mark truncated entries as not in bucket (for correct reuse)
+ while (cur_entry != nullptr) {
+ PTO2TensorMapEntry* next_entry = cur_entry->next_in_bucket;
+ remove_entry(*cur_entry);
+ cur_entry = next_entry;
+ }
+ return;
+ }
+
+ // Entry is valid - check if regions OVERLAP (not just exact match)
+ // Since we hash only by base_ptr, all entries in this bucket have
+ // potential to overlap. We must check actual byte-range overlap.
+ if (query_tensor_data.buffer.addr == cur_entry->addr) {
+ auto overlap_status = query_tensor_data.is_overlap(cur_entry->tensor.data());
+ if (overlap_status != OverlapStatus::NO_OVERLAP) {
+ result.push(cur_entry, overlap_status);
+ }
+ }
+
+ // Move to next entry
+ prev_ptr = &cur_entry->next_in_bucket;
+ cur_entry = *prev_ptr;
+ }
 }
 }
-};
-/**
- * Lookup producer for a tensor region
- *
- * Searches the hash table for a matching region.
- * Returns producer entry if found and valid.
- *
- * Chain truncation: When first stale entry is found, truncates
- * the rest of the chain (all subsequent entries are also stale).
- *
- * @param tm TensorMap
- * @param tensor Tensor to look up
- * @param result Output: stack-allocated result buffer
- */
-void pto2_tensormap_lookup(PTO2TensorMap* tm, const Tensor& tensor, PTO2LookupResult* result);
+ /**
+ * Insert a new entry (called when task produces output)
+ *
+ * Allocates from ring buffer pool, may overwrite stale entries.
+ * Inserts at head of hash bucket chain (maintains task_id ordering).
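+ * The entry is also linked onto the producer's per-task list
+ * (task_entry_head[producer_task_id % PTO2_TASK_WINDOW_SIZE]), which is what
+ * lets cleanup_retired() drop all of a retired task's entries without
+ * walking the bucket chains.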
+ *
+ * @param tensor Tensor produced
+ * @param producer_task_id Task ID of producer
+ * @param with_alloc True if the entry is a task output, false if it is a task inout
+ */
+ void insert(const Tensor& tensor, int32_t producer_task_id, bool with_alloc) {
+ // Allocate entry from ring buffer pool
+ PTO2TensorMapEntry* entry = new_entry();
+
+ // Initialize new entry
+ entry->tensor = tensor;
+ entry->producer_task_id = producer_task_id;
+ entry->with_alloc = with_alloc;
+
+ // Insert at head of hash bucket (maintains task_id descending order)
+ entry->addr = tensor.data().buffer.addr;
+ entry->bucket_index = hash(entry->addr);
+ entry->next_in_bucket = buckets[entry->bucket_index];
+ // Update old head's prev pointer
+ if (entry->next_in_bucket != nullptr) {
+ entry->next_in_bucket->prev_in_bucket = entry;
+ }
+ buckets[entry->bucket_index] = entry;
+ entry->prev_in_bucket = nullptr; // New head has no predecessor
+
+ // Link to task's entry list (for cleanup)
+ int32_t task_slot = producer_task_id & (PTO2_TASK_WINDOW_SIZE - 1);
+ entry->next_in_task = task_entry_head[task_slot];
+ entry->prev_in_task = nullptr; // New head has no predecessor
+ // Update old head's prev pointer
+ if (entry->next_in_task != nullptr) {
+ entry->next_in_task->prev_in_task = entry;
+ }
+ task_entry_head[task_slot] = entry;
+ }
-/**
- * Insert a new entry (called when task produces output)
- *
- * Allocates from ring buffer pool, may overwrite stale entries.
- * Inserts at head of hash bucket chain (maintains task_id ordering).
- *
- * @param tm TensorMap
- * @param tensor Tensor produced
- * @param producer_task_id Task ID of producer
- */
-void pto2_tensormap_insert(PTO2TensorMap* tm, const Tensor& tensor, int32_t producer_task_id, bool with_alloc);
+ /**
+ * Cleanup stale entries for retired tasks
+ *
+ * Called periodically by Orchestrator when last_task_alive advances.
+ * Removes entries from bucket chains for tasks in [old, new) range.
+ *
+ * @param old_last_task_alive Previous threshold
+ * @param new_last_task_alive New threshold
+ */
+ void cleanup_retired(int32_t old_last_task_alive, int32_t new_last_task_alive) {
+ // Iterate through retired tasks and remove their entries from bucket chains
+ for (int32_t task_id = old_last_task_alive; task_id < new_last_task_alive; task_id++) {
+ int32_t task_slot = task_id & (PTO2_TASK_WINDOW_SIZE - 1);
+ PTO2TensorMapEntry* cur_entry = task_entry_head[task_slot];
+
+ while (cur_entry != nullptr) {
+ PTO2TensorMapEntry* next_entry = cur_entry->next_in_task; // Save before clearing
+ // Every entry in this slot must still belong to the retiring task:
+ // the slot is only reused by task_id + TASK_WINDOW_SIZE after this cleanup
+ debug_assert(cur_entry->producer_task_id == task_id);
+ free_entry(*cur_entry);
+ cur_entry = next_entry;
+ }
+
+ // Clear task's entry head (slot will be reused by task_id + TASK_WINDOW_SIZE)
+ task_entry_head[task_slot] = nullptr;
+ }
+ }
-/**
- * Cleanup stale entries for retired tasks
- *
- * Called periodically by Orchestrator when last_task_alive advances.
- * Removes entries from bucket chains for tasks in [old, new) range.
- *
- * @param tm TensorMap
- * @param old_last_task_alive Previous threshold
- * @param new_last_task_alive New threshold
- */
-void pto2_tensormap_cleanup_retired(PTO2TensorMap* tm, int32_t old_last_task_alive, int32_t new_last_task_alive);
+ // =============================================================================
+ // Internal Helpers (exposed for testing)
+ // =============================================================================

-// =============================================================================
-// Internal Helpers (exposed for testing)
-// =============================================================================
+ /**
+ * Compute the hash bucket index for a tensor base address
+ */
+ uint32_t hash(uint64_t key) {
+ // Improve distribution by mixing bits (pointers often have aligned low bits)
+ key = key ^ (key >> 16);
+ key = key ^ (key >> 32);
-/**
- * Compute hash for tensor region
- */
-uint32_t pto2_tensormap_hash(PTO2TensorMap* tm, Tensor* tensor);
-
-/**
- * Check if entry is valid (producer has not retired)
- */
-static inline bool pto2_tensormap_entry_valid(PTO2TensorMap* tm, PTO2TensorMapEntry* entry) {
- return entry->producer_task_id >= tm->last_task_alive;
-}
-
-void pto2_tensormap_remove_entry(PTO2TensorMap& tm, int32_t entry_idx);
+ // Use bitwise AND for power-of-2 modulo (faster than %)
+ return (uint32_t)(key & (num_buckets - 1));
+ }
-/**
- * Remove entry from its task chain (O(1) with prev pointer)
- * Called during pool wrap-around to unlink reused entries.
- */
-void pto2_tensormap_remove_from_task(PTO2TensorMap* tm, int32_t entry_idx);
+ /**
+ * Check if entry is valid (producer has not retired)
+ */
+ bool entry_valid(const PTO2TensorMapEntry& entry) const {
+ return entry.producer_task_id >= last_task_alive;
+ }
-// =============================================================================
-// Debug Utilities
-// =============================================================================
+ void remove_entry(PTO2TensorMapEntry& entry) {
+ remove_from_task(entry);
+ free_entry(entry);
+ }
-/**
- * Print TensorMap statistics
- */
-void pto2_tensormap_print_stats(PTO2TensorMap* tm);
+ /**
+ * Remove entry from its task chain (O(1) with prev pointer)
+ * Called when a stale entry is unlinked (e.g. during lookup() chain truncation).
+ */
+ void remove_from_task(PTO2TensorMapEntry& entry) {
+ always_assert(entry.bucket_index != -1); // Entry must still be linked in a bucket
+ // Update predecessor's next pointer (O(1) via prev_in_task)
+ if (entry.prev_in_task == nullptr) {
+ // Entry is the head of its task chain, update task_entry_head
+ int32_t task_slot = entry.producer_task_id & (PTO2_TASK_WINDOW_SIZE - 1);
+ task_entry_head[task_slot] = entry.next_in_task;
+ } else {
+ entry.prev_in_task->next_in_task = entry.next_in_task;
+ }
-/**
- * Get count of valid entries
- */
-int32_t pto2_tensormap_valid_count(PTO2TensorMap* tm);
+ // Update successor's prev pointer
+ if (entry.next_in_task != nullptr) {
+ entry.next_in_task->prev_in_task = entry.prev_in_task;
+ }
-// =============================================================================
-// TensorMap Synchronization
-// =============================================================================
+ entry.next_in_task = nullptr;
+ entry.prev_in_task = nullptr;
+ }
-/**
- * Sync TensorMap validity threshold from shared memory
- *
- * Called periodically to refresh the lazy invalidation threshold.
- * Also triggers cleanup if threshold has advanced significantly.
- */
-void pto2_orchestrator_sync_tensormap(PTO2TensorMap* tm, bool force = false);
+ // =============================================================================
+ // Debug Utilities
+ // =============================================================================
+
+ /**
+ * Print TensorMap statistics
+ */
+ void print_stats();
+
+ /**
+ * Get count of valid entries
+ */
+ int32_t valid_count();
+
+ // =============================================================================
+ // TensorMap Synchronization
+ // =============================================================================
+
+ /**
+ * Sync TensorMap validity threshold from shared memory
+ *
+ * Called periodically to refresh the lazy invalidation threshold.
+ * Also triggers cleanup if threshold has advanced significantly.
+ */
+ void sync_tensormap();
+};

 // =============================================================================
 // TensorMap Lookup Profiling
@@ -262,5 +413,3 @@ struct PTO2TensorMapProfilingData {
 PTO2TensorMapProfilingData pto2_tensormap_get_profiling();
 #endif
-
-#endif // PTO_TENSORMAP_H
diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/tensor_pool.h b/src/runtime/tensormap_and_ringbuffer/runtime/tensor_pool.h
index 6452034ba..5a8dbe4b1 100644
--- a/src/runtime/tensormap_and_ringbuffer/runtime/tensor_pool.h
+++ b/src/runtime/tensormap_and_ringbuffer/runtime/tensor_pool.h
@@ -65,6 +65,7 @@ struct TensorData {
 this->shapes[i] = shapes[i];
 this->offsets[i] = offsets[i];
 }
+ ref_count = 1;
 }
 
 void init(const TensorData& other) {
@@ -77,6 +78,7 @@ struct TensorData {
 shapes[i] = other.shapes[i];
 offsets[i] = other.offsets[i];
 }
+ ref_count = 1;
 }
 
 void init_with_view(const TensorData& other, const uint64_t view_shapes[], const uint64_t view_offsets[]) {
@@ -89,6 +91,7 @@ struct TensorData {
 shapes[i] = view_shapes[i];
 offsets[i] = other.offsets[i] + view_offsets[i];
 }
+ ref_count = 1;
 }
 
 TensorData(TensorData&& other) = delete;
@@ -180,9 +183,7 @@ struct TensorData {
 bool is_same_memref(const TensorData& other) const { return buffer.addr == other.buffer.addr; }
 
 OverlapStatus is_overlap(const TensorData& pre_task_output) const {
- if (!is_same_memref(pre_task_output)) {
- return OverlapStatus::NO_OVERLAP;
- }
+ debug_assert(is_same_memref(pre_task_output));
 debug_assert(version >= pre_task_output.version);
 if (version > pre_task_output.version) {
 return OverlapStatus::OTHER;
@@ -192,26 +193,20 @@
 // and offsets[i] + shapes[i] <= raw_shapes[i] is guaranteed by is_valid_tensor(),
 // so hyper-rectangle wrapping cannot happen.
 bool contains = true;
- bool overlap = true;
 for (uint64_t i = 0; i < ndims; i++) {
 Segment input_range_dim_i{offsets[i], offsets[i] + shapes[i]};
 Segment output_range_dim_i{ pre_task_output.offsets[i], pre_task_output.offsets[i] + pre_task_output.shapes[i]};
 if (!input_range_dim_i.line_segment_intersection(output_range_dim_i)) {
- overlap = false;
- contains = false;
- break;
+ return OverlapStatus::NO_OVERLAP;
 } else if (!input_range_dim_i.contains(output_range_dim_i)) {
 contains = false;
 }
 }
 if (contains) {
 return OverlapStatus::COVERED;
- } else if (overlap) {
- return OverlapStatus::OTHER;
- } else {
- return OverlapStatus::NO_OVERLAP;
 }
+ return OverlapStatus::OTHER;
 }
 };
@@ -237,7 +232,6 @@ struct TensorPool {
 always_assert(next_index < TENSOR_DATA_MAX_SIZE);
 index = next_index++;
 }
- data[index].ref_count = 1;
 return index;
 }
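
Reviewer note: a minimal usage sketch of the new member-function TensorMap API, for orientation only. The call sites, out_tensor, in_tensor, task_id, add_dependency, and the threshold variables are illustrative assumptions and not part of this patch:

    PTO2TensorMap tm;
    tm.init_default();                                   // or tm.init(num_buckets, pool_size)

    // Producer side: record an output region for task_id.
    tm.insert(out_tensor, task_id, /*with_alloc=*/true);

    // Consumer side: find live producers whose outputs overlap an input.
    PTO2LookupResult result;
    tm.lookup(in_tensor, result);
    for (int32_t i = 0; i < result.count; i++) {
        add_dependency(task_id, result.entries[i].entry->producer_task_id);  // hypothetical helper
    }

    // Retirement: drop entries for tasks in [old, new), then advance the validity threshold.
    tm.cleanup_retired(old_last_task_alive, new_last_task_alive);
    tm.sync_validity(new_last_task_alive);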