diff --git a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 8bed0cbe7..21efb31ad 100644 --- a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -63,19 +63,6 @@ constexpr int MAX_AIC_PER_THREAD = PLATFORM_MAX_AIC_PER_THREAD; constexpr int MAX_AIV_PER_THREAD = PLATFORM_MAX_AIV_PER_THREAD; constexpr int MAX_CORES_PER_THREAD = PLATFORM_MAX_CORES_PER_THREAD; -// Maximum tasks for ready queue (PTO2 mode uses shared memory task count) -constexpr int AICPU_MAX_READY_TASKS = 16384; -constexpr int AICPU_READY_MASK = AICPU_MAX_READY_TASKS - 1; -// One shard per scheduler thread: push to own shard (thread_idx % shards), pop own first + work stealing -// Runtime-configurable via env var PTO2_READY_QUEUE_SHARDS (1..MAX_AICPU_THREADS). Default=3. - -// Lightweight spinlock (avoids futex syscall overhead of std::mutex) -struct SpinLock { - std::atomic flag{0}; - void lock() { while (flag.exchange(1, std::memory_order_acquire) != 0) { PTO2_SPIN_PAUSE_LIGHT(); } } - void unlock() { flag.store(0, std::memory_order_release); } -}; - // Core information for discovery (with register address for fast dispatch) struct CoreInfo { int worker_id; // Index in runtime.workers[] @@ -84,7 +71,11 @@ struct CoreInfo { CoreType core_type; }; + +static PTO2Runtime *rt{nullptr}; + struct AicpuExecutor { + // ===== Thread management state ===== std::atomic thread_idx_{0}; std::atomic initialized_{false}; @@ -113,18 +104,7 @@ struct AicpuExecutor { // Track executing task_id per core (AICPU_TASK_INVALID = idle) int executing_task_ids_[MAX_CORES_PER_THREAD]; - // ===== N shards per type: push to own shard (thread_idx % N), pop own first + work stealing ===== - // active_shards_ is set at runtime (1..MAX_AICPU_THREADS) via env PTO2_READY_QUEUE_SHARDS - int active_shards_{3}; - SpinLock ready_queue_aic_lock_[MAX_AICPU_THREADS]; - int ready_queue_aic_[MAX_AICPU_THREADS][AICPU_MAX_READY_TASKS]; - int ready_queue_aic_head_[MAX_AICPU_THREADS]{0}; - int ready_queue_aic_tail_[MAX_AICPU_THREADS]{0}; - - SpinLock ready_queue_aiv_lock_[MAX_AICPU_THREADS]; - int ready_queue_aiv_[MAX_AICPU_THREADS][AICPU_MAX_READY_TASKS]; - int ready_queue_aiv_head_[MAX_AICPU_THREADS]{0}; - int ready_queue_aiv_tail_[MAX_AICPU_THREADS]{0}; + // ===== Task queue state (managed by scheduler ready queues) ===== // Task execution tracking std::atomic completed_tasks_{0}; @@ -133,16 +113,8 @@ struct AicpuExecutor { // Device orchestration: set by Thread 3 when graph is built; workers wait for it std::atomic orchestrator_done_{false}; std::atomic pto2_init_done_{false}; + std::atomic runtime_init_ready_{false}; std::atomic pto2_init_complete_{false}; // init block finished; others wait for this - std::atomic next_scan_index_{0}; - std::atomic sm_header_ready_{false}; // Thread 3 sets after SM header init - std::atomic orch_pointers_ready_{false}; // Thread 3 sets after aicpu parallel mode pointers + orch_ready_queue are configured - - // Orchestrator ready queue pointers (set by Thread 3, read by scheduler threads) - volatile int32_t* orch_ready_queue_{nullptr}; - volatile int32_t* orch_ready_tail_{nullptr}; - volatile int32_t* orch_ready_head_{nullptr}; - int32_t orch_ready_capacity_{0}; // Orchestration SO handle - defer dlclose until all tasks complete void* orch_so_handle_{nullptr}; @@ -162,69 +134,15 @@ struct AicpuExecutor { void deinit(); void diagnose_stuck_state(Runtime* runtime, int 
thread_idx, const int* cur_thread_cores, int core_num, Handshake* hank); - -private: - // Helper: enqueue a ready task to the appropriate shard with profiling - inline void enqueue_ready_task_with_profiling( - int32_t task_id, - int32_t worker_type, - int thread_idx -#if PTO2_ORCH_PROFILING - , uint64_t& wait_counter, - uint64_t& hold_counter -#endif - ); }; static AicpuExecutor g_aicpu_executor; -// PTO2 device-mode state (shared memory view + per-task fanin refcount) -static constexpr int PTO2_MAX_SLOTS = PTO2_TASK_WINDOW_SIZE; -static int s_pto2_fanin_refcount[PTO2_MAX_SLOTS]; -static volatile int32_t s_pto2_task_completed[PTO2_MAX_SLOTS]; -static int32_t s_pto2_completed_by_task[PTO2_MAX_SLOTS]; // task_id that set completed state (for slot-reuse validation) +// PTO2 device-mode state (per-core dispatch payloads) static PTO2DispatchPayload s_pto2_payload_per_core[RUNTIME_MAX_WORKER]; // ===== AicpuExecutor Method Implementations ===== -// Helper: enqueue a ready task to the appropriate shard with profiling -inline void AicpuExecutor::enqueue_ready_task_with_profiling( - int32_t task_id, - int32_t worker_type, - int thread_idx -#if PTO2_ORCH_PROFILING - , uint64_t& wait_counter, - uint64_t& hold_counter -#endif -) { - int my_shard = thread_idx % active_shards_; -#if PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(), _l1, _l2; -#endif - - if (worker_type == PTO2_WORKER_CUBE) { - ready_queue_aic_lock_[my_shard].lock(); -#if PTO2_ORCH_PROFILING - _l1 = get_sys_cnt_aicpu(); -#endif - ready_queue_aic_[my_shard][ready_queue_aic_tail_[my_shard]++ & AICPU_READY_MASK] = task_id; - ready_queue_aic_lock_[my_shard].unlock(); - } else { - ready_queue_aiv_lock_[my_shard].lock(); -#if PTO2_ORCH_PROFILING - _l1 = get_sys_cnt_aicpu(); -#endif - ready_queue_aiv_[my_shard][ready_queue_aiv_tail_[my_shard]++ & AICPU_READY_MASK] = task_id; - ready_queue_aiv_lock_[my_shard].unlock(); - } - -#if PTO2_ORCH_PROFILING - _l2 = get_sys_cnt_aicpu(); - wait_counter += (_l1 - _l0); - hold_counter += (_l2 - _l1); -#endif -} - /** * Handshake with all cores and discover their types * Sets up register addresses for fast dispatch. @@ -397,17 +315,7 @@ int AicpuExecutor::init(Runtime* runtime) { DEV_INFO("Init: orch_built_on_host=%d", orch_on_host ? 
1 : 0); orchestrator_done_.store(orch_on_host, std::memory_order_release); - // Read ready queue shard count from Runtime (already validated by host) - active_shards_ = runtime->ready_queue_shards; - DEV_ALWAYS("Ready queue shards: %d (max=%d)", active_shards_, MAX_AICPU_THREADS); - - // Initial ready tasks will be populated from PTO2 shared memory in resolve_and_dispatch_pto2 - for (int s = 0; s < MAX_AICPU_THREADS; s++) { - ready_queue_aic_head_[s] = 0; - ready_queue_aic_tail_[s] = 0; - ready_queue_aiv_head_[s] = 0; - ready_queue_aiv_tail_[s] = 0; - } + // Initial ready tasks will be populated via scheduler ready queues // Reset per-core dispatch timestamps and task counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { @@ -485,14 +393,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } DEV_INFO("Thread %d: sm_base=%p", thread_idx, sm_base); - // Device orchestration: wait for last thread to initialize SM header - if (thread_num_ > 1 && !runtime->get_orch_built_on_host()) { - while (!sm_header_ready_.load(std::memory_order_acquire)) { - } - } - PTO2SharedMemoryHeader* header = static_cast(sm_base); - void* gm_heap_base = runtime->get_pto2_gm_heap_ptr(); // For heap_tail offset calc DEV_INFO("Thread %d: header=%p, task_desc_offset=%d, dep_pool_offset=%d, window_size=%d", thread_idx, (void*)header, header->task_descriptors_offset, header->dep_list_pool_offset, header->task_window_size); @@ -505,19 +406,16 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, thread_idx, (void*)task_descriptors, (void*)dep_list_pool); int32_t window_size = header->task_window_size; - if (window_size <= 0 || window_size > PTO2_MAX_SLOTS) window_size = PTO2_MAX_SLOTS; + if (window_size <= 0 || window_size > PTO2_TASK_WINDOW_SIZE) window_size = PTO2_TASK_WINDOW_SIZE; int32_t window_mask = window_size - 1; Handshake* hank = static_cast(runtime->workers); DEV_INFO("Thread %d: hank=%p, window_size=%d", thread_idx, (void*)hank, window_size); - // One-time init: clear refcount and completed arrays (one thread does it; others wait) + // One-time init: assign perf buffers (one thread does it; others wait) if (!pto2_init_done_.exchange(true, std::memory_order_acq_rel)) { DEV_INFO("Thread %d: doing one-time init", thread_idx); - std::memset(s_pto2_fanin_refcount, 0, sizeof(s_pto2_fanin_refcount)); - std::memset((void*)s_pto2_task_completed, 0, sizeof(s_pto2_task_completed)); - std::memset(s_pto2_completed_by_task, -1, sizeof(s_pto2_completed_by_task)); // Assign perf buffers to cores early so profiling captures all tasks // (total_tasks written to header later when orchestrator completes) @@ -533,13 +431,6 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, pto2_init_complete_.store(true, std::memory_order_release); } else { while (!pto2_init_complete_.load(std::memory_order_acquire)) { - } - } - - // Wait for last thread to finish setting up aicpu parallel mode pointers - // and orch_ready_queue before entering the scheduling loop. 
- if (thread_num_ > 1 && !runtime->get_orch_built_on_host()) { - while (!orch_pointers_ready_.load(std::memory_order_acquire)) { std::this_thread::yield(); } } @@ -561,37 +452,19 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, // Scheduler profiling counters #if PTO2_ORCH_PROFILING uint64_t sched_scan_cycle = 0; - uint64_t sched_early_ready_cycle = 0; + uint64_t sched_orch_drain_cycle = 0; uint64_t sched_complete_cycle = 0; uint64_t sched_dispatch_cycle = 0; + uint64_t sched_yield_cycle = 0; uint64_t sched_loop_count = 0; - uint64_t sched_scan_ready_wait = 0, sched_scan_ready_hold = 0; - uint64_t sched_early_ready_wait = 0, sched_early_ready_hold = 0; - uint64_t sched_complete_ready_wait = 0, sched_complete_ready_hold = 0; - uint64_t sched_dispatch_hit_wait = 0, sched_dispatch_hit_hold = 0; - uint64_t sched_dispatch_miss_wait = 0, sched_dispatch_miss_hold = 0; - uint64_t ready_pop_own = 0, ready_pop_steal = 0; + uint64_t sched_yield_count = 0; #endif - // Phase profiling: per-phase task counters - uint32_t phase_complete_count = 0; - uint32_t phase_dispatch_count = 0; - uint32_t phase_scan_count = 0; - uint32_t phase_early_ready_count = 0; - // Fanout traversal statistics: how many downstream deps were notified after task completions - uint64_t fanout_edges_notified = 0; - int32_t fanout_max_degree = 0; while (true) { #if PTO2_ORCH_PROFILING sched_loop_count++; #endif CYCLE_COUNT_START(); - // Phase profiling: record start time for this iteration - uint64_t _t0_phase = _t0; - phase_complete_count = 0; - phase_dispatch_count = 0; - phase_scan_count = 0; - phase_early_ready_count = 0; // Dynamic task_count (Thread 3 sets total_tasks_ when orchestration completes) int32_t task_count = total_tasks_.load(std::memory_order_acquire); bool orch_done = orchestrator_done_.load(std::memory_order_acquire); @@ -635,6 +508,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id]; int32_t task_id = executing_task_ids_[core_id]; + pto2_scheduler_on_task_complete(&rt->scheduler, task_id); executing_task_ids_[core_id] = AICPU_TASK_INVALID; // Write AICPU dispatch/finish timestamps into the PerfRecord @@ -654,141 +528,12 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } } - PTO2TaskDescriptor* pto2_task = &task_descriptors[task_id & window_mask]; - DEV_DEBUG("Thread %d: Core %d completed PTO2 task %d", thread_idx, core_id, task_id); - // Mark completed (state=2), then snapshot fanout_head under the per-task spinlock. - // - // WHY THE LOCK IS REQUIRED (device orchestration / AICPU parallel mode): - // The orchestrator (Thread 3) runs concurrently with the scheduler threads and - // may still be adding consumers to this task's fanout list via - // pto2_add_consumer_to_producer(). That function holds fanout_lock while it - // (a) checks the completion state and (b) prepends to fanout_head. - // - // Without the lock here we have a TOCTOU race: - // 1. Orch acquires lock, checks state=0 (task still running), plans insert. - // 2. Task finishes; we store state=2 (RELEASE) but haven't acquired the lock. - // 3. Orch inserts consumer X into fanout_head, releases lock. - // 4. We read the OLD fanout_head (before X was inserted) → X is never woken. - // - // By acquiring the lock AFTER storing state=2 we guarantee mutual exclusion: - // • If Orch holds the lock first → it writes fanout_head → we read it with X. 
-      //     • If we acquire the lock first → Orch's subsequent lock-acquire sees state=2
-      //       via the release/acquire pair and takes the early-return path, directly
-      //       incrementing X's fanin_refcount instead of touching fanout_head.
-      //     Either way every consumer is accounted for exactly once.
-      __atomic_store_n(&s_pto2_completed_by_task[task_id & window_mask], task_id, __ATOMIC_RELEASE);
-      __atomic_store_n(&s_pto2_task_completed[task_id & window_mask], 2, __ATOMIC_RELEASE);
-      pto2_fanout_lock(pto2_task);
-      int32_t fanout_head = (int32_t)pto2_task->fanout_head;
-      pto2_fanout_unlock(pto2_task);
-
-      // Traverse fanout (no lock)
-      //
-      // SEQ_CST on the refcount increment and fanin_count load breaks the IRIW
-      // (Independent Reads of Independent Writes) hazard with the orchestrator's
-      // Step 5 / Step 5b:
-      //
-      //   Thread 0 (here):                  Thread 3 (orchestrator Step 5/5b):
-      //     fetch_add(refcount, SEQ_CST)      store(fanin_count=N, SEQ_CST)
-      //     load(fanin_count, SEQ_CST)        load(refcount, SEQ_CST)
-      //
-      // On ARM (IRIW is architecturally allowed with ACQ/REL), both threads could
-      // simultaneously read stale values — this thread sees fanin_count=0 and Step 5b
-      // sees refcount < fanin_count, so neither side enqueues the consumer.
-      int32_t fanout_len = 0;
-      int32_t current = fanout_head;
-      while (current > 0) {
-        fanout_len++;
-        PTO2DepListEntry* entry = &dep_list_pool[current];
-        int32_t consumer_id = entry->task_id;
-        int32_t consumer_slot = consumer_id & window_mask;
-        int prev = __atomic_fetch_add(&s_pto2_fanin_refcount[consumer_slot], 1, __ATOMIC_SEQ_CST);
-        PTO2TaskDescriptor* consumer_desc = &task_descriptors[consumer_slot];
-        int32_t fanin_count = __atomic_load_n(&consumer_desc->fanin_count, __ATOMIC_SEQ_CST);
-        if (prev + 1 == fanin_count) {
-          __atomic_store_n(&s_pto2_task_completed[consumer_slot], 1, __ATOMIC_RELEASE);
-          enqueue_ready_task_with_profiling(
-              consumer_id, consumer_desc->worker_type, thread_idx
-#if PTO2_ORCH_PROFILING
-              , sched_complete_ready_wait, sched_complete_ready_hold
-#endif
-          );
-        }
-        current = entry->next_offset;
-      }
-      fanout_edges_notified += fanout_len;
-      if (fanout_len > fanout_max_degree) fanout_max_degree = fanout_len;
-
      cur_thread_tasks_in_flight--;
      cur_thread_completed++;
-      phase_complete_count++;
      made_progress = true;
      completed_tasks_.fetch_add(1, std::memory_order_release);
-
-      // Advance last_task_alive for TaskRing flow control.
-      // Mark this task as fully consumed (state=3), then try to
-      // advance the watermark using lock-free CAS.
-      //
-      // ORDERING: Mark completed as state=3 and reset refcount BEFORE advancing last_task_alive.
-      //           Once last_task_alive advances past a slot, the orchestrator can
-      //           immediately reuse it. The early-return path in
-      //           pto2_add_consumer_to_producer checks aicpu_task_completed[prod_slot];
-      //           if we reset AFTER the CAS, the orchestrator could see stale state=3
-      //           from the old task and incorrectly skip dependency setup.
-      __atomic_store_n(&s_pto2_task_completed[task_id & window_mask], 3, __ATOMIC_RELEASE);
-      {
-        int32_t la = __atomic_load_n(&header->last_task_alive, __ATOMIC_ACQUIRE);
-        int32_t cti = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE);
-        while (la < cti) {
-          int32_t la_slot = la & window_mask;
-          if (__atomic_load_n(&s_pto2_task_completed[la_slot], __ATOMIC_ACQUIRE) < 3)
-            break;
-          // Only reset refcount — the orchestrator's early-return path
-          // (pto2_add_consumer_to_producer) MUST see completed >= 2 when
-          // the producer has actually finished, per the fanout lock protocol.
-          // completed_by_task guards against stale state from recycled slots:
-          // the old task's completed_by_task won't match the new producer_id.
- __atomic_store_n(&s_pto2_fanin_refcount[la_slot], 0, __ATOMIC_RELEASE); - // Advance last_task_alive to make this slot available. - int32_t expected = la; - if (__atomic_compare_exchange_n(&header->last_task_alive, &expected, la + 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) { - // Serialize heap_tail writes via ticket-based generation counter. - // Without this, concurrent CAS winners can interleave their - // heap_tail stores, causing stale regression (see design note below). - // - // DESIGN: heap_tail_gen tracks which task's tail was last written. - // Each CAS winner waits for gen==la (its ticket), writes heap_tail, - // then advances gen to la+1. The critical section is ~3 instructions, - // so the spin is effectively zero in the common (no-preemption) case. - while (__atomic_load_n(&header->heap_tail_gen, __ATOMIC_ACQUIRE) != la) { - } - - // Advance heap_tail for HeapRing flow control - PTO2TaskDescriptor* consumed_t = &task_descriptors[la_slot]; - if (consumed_t->packed_buffer_end != nullptr) { - uint64_t new_tail = (uint64_t)((char*)consumed_t->packed_buffer_end - (char*)gm_heap_base); - if (new_tail <= header->heap_size) { - __atomic_store_n(&header->heap_tail, new_tail, __ATOMIC_RELEASE); - } - } - - // Release next writer - __atomic_store_n(&header->heap_tail_gen, la + 1, __ATOMIC_RELEASE); - - la = la + 1; - } else { - break; - } - } - } - // Debug: periodic progress (thread 0 only) to find which task hangs if (thread_idx == 0 && task_count > 0) { int32_t c = completed_tasks_.load(std::memory_order_relaxed); @@ -800,13 +545,6 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } } CYCLE_COUNT_LAP(sched_complete_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_COMPLETE, - _t0_phase, _t1, static_cast(sched_loop_count), phase_complete_count); - _t0_phase = _t1; - } -#endif // Phase 2: Dispatch ready tasks to idle cores (register-based dispatch) if (cur_thread_tasks_in_flight < core_num) { @@ -817,76 +555,8 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, int reg_state = EXTRACT_TASK_STATE(reg_val); if (reg_state == TASK_FIN_STATE && executing_task_ids_[core_id] == AICPU_TASK_INVALID) { Handshake* h = &hank[core_id]; - int32_t task_id = AICPU_TASK_INVALID; -#if PTO2_ORCH_PROFILING - bool found_task = false; - bool is_stolen = false; -#endif - int my_shard = thread_idx % active_shards_; - if (h->core_type == CoreType::AIC) { - for (int k = 0; k < active_shards_ && task_id < 0; k++) { - int shard = (my_shard + k) % active_shards_; -#if PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(); -#endif - ready_queue_aic_lock_[shard].lock(); -#if PTO2_ORCH_PROFILING - uint64_t _l1 = get_sys_cnt_aicpu(); -#endif - if (ready_queue_aic_head_[shard] < ready_queue_aic_tail_[shard]) { - task_id = ready_queue_aic_[shard][ready_queue_aic_head_[shard]++ & AICPU_READY_MASK]; - ready_queue_aic_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_hit_wait += (_l1 - _l0); - sched_dispatch_hit_hold += (_l2 - _l1); - found_task = true; - is_stolen = (k != 0); -#endif - break; - } - ready_queue_aic_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_miss_wait += (_l1 - _l0); - sched_dispatch_miss_hold += (_l2 - _l1); -#endif - } - } else { - for (int k = 0; k < active_shards_ && task_id < 0; k++) { - int shard = (my_shard + k) % active_shards_; -#if 
PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(); -#endif - ready_queue_aiv_lock_[shard].lock(); -#if PTO2_ORCH_PROFILING - uint64_t _l1 = get_sys_cnt_aicpu(); -#endif - if (ready_queue_aiv_head_[shard] < ready_queue_aiv_tail_[shard]) { - task_id = ready_queue_aiv_[shard][ready_queue_aiv_head_[shard]++ & AICPU_READY_MASK]; - ready_queue_aiv_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_hit_wait += (_l1 - _l0); - sched_dispatch_hit_hold += (_l2 - _l1); - found_task = true; - is_stolen = (k != 0); -#endif - break; - } - ready_queue_aiv_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_miss_wait += (_l1 - _l0); - sched_dispatch_miss_hold += (_l2 - _l1); -#endif - } - } -#if PTO2_ORCH_PROFILING - if (found_task) { - if (is_stolen) ready_pop_steal++; else ready_pop_own++; - } -#endif + PTO2WorkerType wt = (h->core_type == CoreType::AIC) ? PTO2_WORKER_CUBE : PTO2_WORKER_VECTOR; + int32_t task_id = pto2_scheduler_get_ready_task(&rt->scheduler, wt); if (task_id >= 0) { PTO2TaskDescriptor* task = &task_descriptors[task_id & window_mask]; PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id]; @@ -900,11 +570,9 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } core_dispatch_counts_[core_id]++; } - write_reg(reg_addr, RegId::DATA_MAIN_BASE, static_cast(task_id + 1)); executing_task_ids_[core_id] = task_id; cur_thread_tasks_in_flight++; - phase_dispatch_count++; made_progress = true; DEV_DEBUG("Thread %d: Dispatching PTO2 task %d to core %d", thread_idx, task_id, core_id); } @@ -912,19 +580,10 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } } CYCLE_COUNT_LAP(sched_dispatch_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_DISPATCH, - _t0_phase, _t1, static_cast(sched_loop_count), phase_dispatch_count); - _t0_phase = _t1; - } -#endif - // Incremental scan: discover root tasks (fanin_count == 0) + // Update perf header total_tasks if visible tasks have changed { int32_t visible = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); - - // Update perf header total_tasks if visible tasks have changed if (profiling_enabled && visible > 0 && visible != last_reported_task_count) { perf_aicpu_update_total_tasks(runtime, static_cast(visible)); @@ -933,79 +592,8 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, last_reported_task_count = visible; } - - while (true) { - int32_t idx = next_scan_index_.load(std::memory_order_acquire); - if (idx >= visible) break; - if (!next_scan_index_.compare_exchange_weak(idx, idx + 1, - std::memory_order_acq_rel, std::memory_order_acquire)) continue; - - int32_t slot = idx & window_mask; - - PTO2TaskDescriptor* t = &task_descriptors[slot]; - int32_t fanin_count = __atomic_load_n(&t->fanin_count, __ATOMIC_ACQUIRE); - if (fanin_count == 0) { - // Mark as enqueued (state=1) to prevent double-enqueue - __atomic_store_n(&s_pto2_task_completed[slot], 1, __ATOMIC_RELEASE); - enqueue_ready_task_with_profiling( - idx, t->worker_type, thread_idx -#if PTO2_ORCH_PROFILING - , sched_scan_ready_wait, sched_scan_ready_hold -#endif - ); - phase_scan_count++; - made_progress = true; - } - } } CYCLE_COUNT_LAP(sched_scan_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_SCAN, - _t0_phase, _t1, static_cast(sched_loop_count), 
phase_scan_count); - _t0_phase = _t1; - } -#endif - - // Early-ready drain: tasks whose deps were already met at submit time - // (orchestrator detected all producers completed → pushed to orch_ready_queue_) - if (orch_ready_queue_ != nullptr) { - while (true) { - int32_t head = __atomic_load_n(orch_ready_head_, __ATOMIC_ACQUIRE); - int32_t tail = __atomic_load_n(orch_ready_tail_, __ATOMIC_ACQUIRE); - if (head == tail) break; // queue empty - - // CAS to claim this slot (multiple scheduler threads compete) - if (!__atomic_compare_exchange_n(orch_ready_head_, &head, head + 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) continue; - - int32_t task_id = orch_ready_queue_[head & (orch_ready_capacity_ - 1)]; - int32_t slot = task_id & window_mask; - - // CAS from 0 → 1 to claim enqueue rights (may already be enqueued by fanout path) - int32_t expected = 0; - if (!__atomic_compare_exchange_n(&s_pto2_task_completed[slot], &expected, 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) continue; - - PTO2TaskDescriptor* t = &task_descriptors[slot]; - enqueue_ready_task_with_profiling( - task_id, t->worker_type, thread_idx -#if PTO2_ORCH_PROFILING - , sched_early_ready_wait, sched_early_ready_hold -#endif - ); - phase_early_ready_count++; - made_progress = true; - } - } - CYCLE_COUNT_LAP(sched_early_ready_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_EARLY_READY, - _t0_phase, _t1, static_cast(sched_loop_count), phase_early_ready_count); - _t0_phase = _t1; - } -#endif if (!made_progress) { idle_iterations++; @@ -1013,31 +601,30 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, int32_t c = completed_tasks_.load(std::memory_order_relaxed); DEV_ALWAYS("PTO2 stall: no progress for %d iterations, completed=%d total=%d", idle_iterations, c, task_count); - // Scan all task slots to find truly stuck tasks - // state=0: not yet completed (may be waiting for deps or ready but not enqueued) - // state=1: enqueued in ready queue or dispatched to hardware - // state=2: completed by Phase 1 + // Scan all task slots to find truly stuck tasks using scheduler state + PTO2SchedulerState* sched = &rt->scheduler; int cnt_ready = 0, cnt_waiting = 0, cnt_inflight = 0; for (int si = 0; si < task_count; si++) { - int32_t st = __atomic_load_n(&s_pto2_task_completed[si], __ATOMIC_RELAXED); - int32_t rc = __atomic_load_n(&s_pto2_fanin_refcount[si], __ATOMIC_RELAXED); - int32_t fi = __atomic_load_n(&task_descriptors[si].fanin_count, __ATOMIC_RELAXED); - int32_t kid = task_descriptors[si].kernel_id; - if (st == 2) continue; // Already done - if (st == 1) { cnt_inflight++; continue; } - // st == 0 + int32_t slot = si & window_mask; + PTO2TaskState st = (PTO2TaskState)__atomic_load_n(&sched->task_state[slot], __ATOMIC_RELAXED); + int32_t rc = __atomic_load_n(&sched->fanin_refcount[slot], __ATOMIC_RELAXED); + int32_t fi = __atomic_load_n(&task_descriptors[slot].fanin_count, __ATOMIC_RELAXED); + int32_t kid = task_descriptors[slot].kernel_id; + if (st >= PTO2_TASK_COMPLETED) continue; // Already done + if (st == PTO2_TASK_READY || st == PTO2_TASK_RUNNING) { cnt_inflight++; continue; } + // PENDING if (rc >= fi) { // Ready (all deps satisfied) but not enqueued — this is the real bug cnt_ready++; if (cnt_ready <= STALL_DUMP_READY_MAX) { - DEV_ALWAYS(" STUCK-READY slot=%d kernel_id=%d refcount=%d fanin=%d", - si, kid, rc, fi); + DEV_ALWAYS(" STUCK-READY slot=%d kernel_id=%d refcount=%d fanin=%d state=%d", + slot, kid, rc, fi, (int)st); 
} } else { cnt_waiting++; if (cnt_waiting <= STALL_DUMP_WAIT_MAX) { - DEV_ALWAYS(" STUCK-WAIT slot=%d kernel_id=%d refcount=%d fanin=%d", - si, kid, rc, fi); + DEV_ALWAYS(" STUCK-WAIT slot=%d kernel_id=%d refcount=%d fanin=%d state=%d", + slot, kid, rc, fi, (int)st); } } } @@ -1069,6 +656,10 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } else { SPIN_WAIT_HINT(); } +#if PTO2_ORCH_PROFILING + sched_yield_count++; +#endif + CYCLE_COUNT_LAP(sched_yield_cycle); } else { idle_iterations = 0; } @@ -1076,56 +667,33 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, #if PTO2_ORCH_PROFILING uint64_t sched_total = - sched_scan_cycle + sched_early_ready_cycle + sched_complete_cycle + sched_dispatch_cycle; + sched_scan_cycle + sched_orch_drain_cycle + sched_complete_cycle + sched_dispatch_cycle + sched_yield_cycle; if (sched_total == 0) sched_total = 1; // avoid div-by-zero - double tasks_per_loop = sched_loop_count > 0 ? (double)cur_thread_completed / sched_loop_count : 0.0; - - // === Summary === - DEV_ALWAYS("Thread %d: === PTO2 Scheduler Summary ===", thread_idx); - DEV_ALWAYS("Thread %d: completed=%d tasks in %.0fus (%llu loops, %.1f tasks/loop)", - thread_idx, cur_thread_completed, cycles_to_us(sched_total), - (unsigned long long)sched_loop_count, tasks_per_loop); - - // --- Phase Breakdown (execution order) --- - DEV_ALWAYS("Thread %d: --- Phase Breakdown (execution order) ---", thread_idx); - DEV_ALWAYS("Thread %d: scan: %8.0fus (%4.1f%%)", - thread_idx, cycles_to_us(sched_scan_cycle), sched_scan_cycle * 100.0 / sched_total); - DEV_ALWAYS("Thread %d: early_ready: %8.0fus (%4.1f%%) (deps already met at submit time)", - thread_idx, cycles_to_us(sched_early_ready_cycle), sched_early_ready_cycle * 100.0 / sched_total); - DEV_ALWAYS("Thread %d: complete: %8.0fus (%4.1f%%) [fanout: edges=%llu, max_degree=%d, avg=%.1f]", - thread_idx, cycles_to_us(sched_complete_cycle), sched_complete_cycle * 100.0 / sched_total, - (unsigned long long)fanout_edges_notified, fanout_max_degree, - cur_thread_completed > 0 ? (double)fanout_edges_notified / cur_thread_completed : 0.0); - DEV_ALWAYS("Thread %d: dispatch: %8.0fus (%4.1f%%) [steal: own=%llu, steal=%llu, pct=%.1f%%]", - thread_idx, cycles_to_us(sched_dispatch_cycle), sched_dispatch_cycle * 100.0 / sched_total, - (unsigned long long)ready_pop_own, (unsigned long long)ready_pop_steal, - (ready_pop_own + ready_pop_steal) > 0 ? 
100.0 * (double)ready_pop_steal / (double)(ready_pop_own + ready_pop_steal) : 0.0); - - // --- Lock Contention (ready_q) --- - DEV_ALWAYS("Thread %d: --- Lock Contention (ready_q) ---", thread_idx); - DEV_ALWAYS("Thread %d: total: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_scan_ready_wait + sched_early_ready_wait + sched_complete_ready_wait + sched_dispatch_hit_wait + sched_dispatch_miss_wait), - (double)cycles_to_us(sched_scan_ready_hold + sched_early_ready_hold + sched_complete_ready_hold + sched_dispatch_hit_hold + sched_dispatch_miss_hold)); - DEV_ALWAYS("Thread %d: scan: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_scan_ready_wait), (double)cycles_to_us(sched_scan_ready_hold)); - DEV_ALWAYS("Thread %d: early_ready: wait=%5.0fus hold=%5.0fus", + DEV_ALWAYS("Thread %d: PTO2 scheduler stats: loops=%llu, completed=%d, total=%.3fus", thread_idx, - (double)cycles_to_us(sched_early_ready_wait), (double)cycles_to_us(sched_early_ready_hold)); - DEV_ALWAYS("Thread %d: complete: wait=%5.0fus hold=%5.0fus", + (unsigned long long)sched_loop_count, + cur_thread_completed, + cycles_to_us(sched_total)); + DEV_ALWAYS( + "Thread %d: scan=%.3fus (%.1f%%), orch_drain=%.3fus (%.1f%%), complete=%.3fus (%.1f%%), dispatch=%.3fus " + "(%.1f%%)", thread_idx, - (double)cycles_to_us(sched_complete_ready_wait), (double)cycles_to_us(sched_complete_ready_hold)); - DEV_ALWAYS("Thread %d: dispatch: wait=%5.0fus hold=%5.0fus", + cycles_to_us(sched_scan_cycle), + sched_scan_cycle * 100.0 / sched_total, + cycles_to_us(sched_orch_drain_cycle), + sched_orch_drain_cycle * 100.0 / sched_total, + cycles_to_us(sched_complete_cycle), + sched_complete_cycle * 100.0 / sched_total, + cycles_to_us(sched_dispatch_cycle), + sched_dispatch_cycle * 100.0 / sched_total); + DEV_ALWAYS("Thread %d: yield=%.3fus (%.1f%%, %llu calls, avg=%.1fus)", thread_idx, - (double)cycles_to_us(sched_dispatch_hit_wait + sched_dispatch_miss_wait), - (double)cycles_to_us(sched_dispatch_hit_hold + sched_dispatch_miss_hold)); - DEV_ALWAYS("Thread %d: hit: wait=%5.0fus hold=%5.0fus (dequeued task)", - thread_idx, - (double)cycles_to_us(sched_dispatch_hit_wait), (double)cycles_to_us(sched_dispatch_hit_hold)); - DEV_ALWAYS("Thread %d: miss: wait=%5.0fus hold=%5.0fus (empty queue)", - thread_idx, - (double)cycles_to_us(sched_dispatch_miss_wait), (double)cycles_to_us(sched_dispatch_miss_hold)); + cycles_to_us(sched_yield_cycle), + sched_yield_cycle * 100.0 / sched_total, + (unsigned long long)sched_yield_count, + sched_yield_count > 0 ? 
cycles_to_us(sched_yield_cycle) / sched_yield_count : 0.0); + + DEV_ALWAYS("Thread %d: PTO2 execution complete, completed %d tasks", thread_idx, cur_thread_completed); #endif // Flush performance buffers for cores managed by this thread @@ -1146,6 +714,7 @@ int AicpuExecutor::run(Runtime* runtime) { // Thread 3 when 4 AICPU threads: orchestrator (no cores) if (thread_num_ == 4 && thread_idx == 3) { + rt = nullptr; if (runtime->get_orch_built_on_host()) { DEV_INFO("Thread 3: Host orchestration mode, no-op"); } else { @@ -1256,6 +825,13 @@ int AicpuExecutor::run(Runtime* runtime) { DEV_INFO("Thread 3: No config function, using defaults"); } + if (expected_arg_count > 0 && arg_count < expected_arg_count) { + DEV_ERROR("Thread 3: arg_count %d < expected %d", arg_count, expected_arg_count); + dlclose(handle); + unlink(so_path); + return -1; + } + // Apply ring buffer size overrides from Runtime (set by host env vars) if (runtime->pto2_task_window_size > 0) { task_window_size = runtime->pto2_task_window_size; @@ -1269,16 +845,8 @@ int AicpuExecutor::run(Runtime* runtime) { DEV_INFO("Thread 3: Ring sizes: task_window=%lu, heap=%lu, dep_pool=%lu", (unsigned long)task_window_size, (unsigned long)heap_size, (unsigned long)dep_list_pool_size); - if (expected_arg_count > 0 && arg_count < expected_arg_count) { - DEV_ERROR("Thread 3: arg_count %d < expected %d", arg_count, expected_arg_count); - dlclose(handle); - unlink(so_path); - return -1; - } - // Get GM heap from runtime (dedicated field) void* sm_ptr = runtime->get_pto2_gm_sm_ptr(); - PTO2SharedMemoryHeader* header = static_cast(sm_ptr); void* gm_heap = runtime->get_pto2_gm_heap_ptr(); // Create shared memory handle and runtime (ops table populated inside) @@ -1293,10 +861,7 @@ int AicpuExecutor::run(Runtime* runtime) { return -1; } - // Signal scheduler threads that SM header is initialized - sm_header_ready_.store(true, std::memory_order_release); - - PTO2Runtime* rt = pto2_runtime_create_from_sm(PTO2_MODE_EXECUTE, + rt = pto2_runtime_create_from_sm(PTO2_MODE_EXECUTE, sm_handle, gm_heap, heap_size); if (!rt) { DEV_ERROR("Thread 3: Failed to create PTO2Runtime"); @@ -1306,27 +871,13 @@ int AicpuExecutor::run(Runtime* runtime) { return -1; } - // Wait for scheduler's one-time init to complete (ensures memset has executed) + runtime_init_ready_.store(true, std::memory_order_release); + + // Wait for scheduler's one-time init to complete while (!pto2_init_complete_.load(std::memory_order_acquire)) { + std::this_thread::yield(); } - // Set orchestrator's aicpu parallel mode pointers - uint64_t ws = header->task_window_size; - if (ws == 0 || ws > PTO2_MAX_SLOTS) ws = PTO2_MAX_SLOTS; - rt->orchestrator.aicpu_fanin_refcount = s_pto2_fanin_refcount; - rt->orchestrator.aicpu_task_completed = s_pto2_task_completed; - rt->orchestrator.aicpu_completed_by_task = s_pto2_completed_by_task; - rt->orchestrator.aicpu_window_mask = ws - 1; - - // Expose orchestrator ready queue to scheduler threads - orch_ready_queue_ = rt->orchestrator.orch_ready_queue; - orch_ready_tail_ = &rt->orchestrator.orch_ready_tail; - orch_ready_head_ = &rt->orchestrator.orch_ready_head; - orch_ready_capacity_ = PTO2OrchestratorState::ORCH_READY_QUEUE_SIZE; - - // Signal scheduler threads: all pointers are ready, safe to start scheduling. 
- orch_pointers_ready_.store(true, std::memory_order_release); - // Call orchestration wrapped in outer scope (matches old PTO2_ORCHESTRATION behavior) DEV_ALWAYS("Thread 3: Calling aicpu_orchestration_entry from SO"); uint64_t orch_cycle_start = get_sys_cnt_aicpu(); @@ -1419,6 +970,13 @@ int AicpuExecutor::run(Runtime* runtime) { } else { // Note: Handshake already completed in init() via handshake_all_cores() + // Device orchestration: wait for Thread 3 to initialize SM header + if (!runtime->get_orch_built_on_host()) { + while (!runtime_init_ready_.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + } + always_assert(rt != nullptr); DEV_INFO("Thread %d: Starting PTO2 dispatch", thread_idx); int completed = resolve_and_dispatch_pto2(runtime, thread_idx, cur_thread_cores, my_cores); DEV_INFO("Thread %d: Executed %d tasks from runtime", thread_idx, completed); @@ -1442,14 +1000,6 @@ int AicpuExecutor::run(Runtime* runtime) { } void AicpuExecutor::deinit() { - // Cleanup runtime execution state (clear all max slots for safety) - for (int s = 0; s < MAX_AICPU_THREADS; s++) { - ready_queue_aic_head_[s] = 0; - ready_queue_aic_tail_[s] = 0; - ready_queue_aiv_head_[s] = 0; - ready_queue_aiv_tail_[s] = 0; - } - // Reset per-core dispatch timestamps and task counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; @@ -1462,13 +1012,7 @@ void AicpuExecutor::deinit() { orchestrator_done_.store(false, std::memory_order_release); pto2_init_done_.store(false, std::memory_order_release); pto2_init_complete_.store(false, std::memory_order_release); - next_scan_index_.store(0, std::memory_order_release); - sm_header_ready_.store(false, std::memory_order_release); - orch_pointers_ready_.store(false, std::memory_order_release); - orch_ready_queue_ = nullptr; - orch_ready_tail_ = nullptr; - orch_ready_head_ = nullptr; - orch_ready_capacity_ = 0; + runtime_init_ready_.store(false, std::memory_order_release); // Reset core discovery state aic_count_ = 0; @@ -1503,12 +1047,13 @@ void AicpuExecutor::diagnose_stuck_state(Runtime* runtime, int thread_idx, DEV_ALWAYS("Progress: %d/%d tasks (%.1f%%)", completed, total, total > 0 ? 
completed * 100.0 / total : 0.0); - int aic_ready = 0, aiv_ready = 0; - for (int s = 0; s < active_shards_; s++) { - aic_ready += ready_queue_aic_tail_[s] - ready_queue_aic_head_[s]; - aiv_ready += ready_queue_aiv_tail_[s] - ready_queue_aiv_head_[s]; + uint64_t aic_ready = 0, aiv_ready = 0; + if (rt) { + PTO2SchedulerState* sched = &rt->scheduler; + aic_ready = pto2_ready_queue_count(&sched->ready_queues[PTO2_WORKER_CUBE]); + aiv_ready = pto2_ready_queue_count(&sched->ready_queues[PTO2_WORKER_VECTOR]); } - DEV_ALWAYS("Ready Queues (%d shards, per-thread push + work-steal pop): AIC=%d, AIV=%d", active_shards_, aic_ready, aiv_ready); + DEV_ALWAYS("Ready Queues: AIC=%lu, AIV=%lu", aic_ready, aiv_ready); int busy_cores = 0; int idle_cores = 0; diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 547791b4c..fc6707d2d 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -213,10 +213,6 @@ void pto2_add_consumer_to_producer( if (orch->aicpu_task_completed) { int32_t prod_slot = producer_id & orch->aicpu_window_mask; if (__atomic_load_n(&orch->aicpu_task_completed[prod_slot], __ATOMIC_ACQUIRE) >= 2 && - // RELAXED is sufficient: the ACQUIRE on aicpu_task_completed above - // synchronizes with the RELEASE on task_completed in the scheduler, - // and completed_by_task is stored (with RELEASE) sequenced-before - // task_completed — so it is visible after the ACQUIRE load above. __atomic_load_n(&orch->aicpu_completed_by_task[prod_slot], __ATOMIC_RELAXED) == producer_id) { int32_t cons_slot = consumer_id & orch->aicpu_window_mask; __atomic_fetch_add(&orch->aicpu_fanin_refcount[cons_slot], 1, __ATOMIC_ACQ_REL); @@ -245,22 +241,6 @@ void pto2_add_consumer_to_producer( pto2_fanout_unlock(producer); } -void* pto2_alloc_packed_buffer(PTO2OrchestratorState* orch, int32_t total_size) { - if (total_size <= 0) { - return NULL; - } - - void* buffer = orch->heap_ring.pto2_heap_ring_alloc(total_size); - - orch->buffers_allocated++; - orch->bytes_allocated += total_size; - - // Update shared memory with new heap top - PTO2_STORE_RELEASE(&orch->sm_handle->header->heap_top, orch->heap_ring.top); - - return buffer; -} - void pto2_submit_task(PTO2OrchestratorState* orch, int32_t kernel_id, PTO2WorkerType worker_type, @@ -309,16 +289,13 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // Register this task in its owning scope scope_tasks_push(orch, task_id); - // Temporary storage for collecting output sizes - int32_t total_output_size = 0; - // Temporary storage for fanin int32_t fanin_temp[PTO2_MAX_INPUTS]; int32_t fanin_count = 0; task->param_count = num_params; for (int i = 0; i < num_params; i++) { - task->params[i].type = params[i].type; + task->params[i].type = params[i].type; if (params[i].type == PTOParamType::SCALAR) { task->params[i].scalar_value = params[i].scalar_value; } else { @@ -328,8 +305,28 @@ void pto2_submit_task(PTO2OrchestratorState* orch, CYCLE_COUNT_LAP_RECORD(g_orch_params_cycle, AicpuPhaseId::ORCH_PARAMS); - // === STEP 2: First pass - collect output sizes and process inputs === + // Temporary storage for collecting output sizes + int32_t total_output_size = 0; + for (int i = 0; i < num_params; i++) { + PTOParam& p = task->params[i]; + if (p.type != PTOParamType::OUTPUT) { + continue; + } + auto& tensor_data = p.tensor.data(); + // Only allocate from ring buffer when caller did not provide an 
address
+      if (tensor_data.buffer.addr == 0) {
+        total_output_size += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN);
+      }
+    }
+    if (total_output_size > 0) {
+      task->packed_buffer_base = orch->pto2_alloc_packed_buffer(total_output_size);
+      task->packed_buffer_end = (char*)task->packed_buffer_base + total_output_size;
+    }
+    CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP);
+
+    // === STEP 2: First pass - set output addr and process tensor ===
+    int32_t offset = 0;
+
    for (int i = 0; i < num_params; i++) {
      PTOParam& p = task->params[i];
@@ -358,10 +355,6 @@
          if (fanin_count < PTO2_MAX_INPUTS) {
            fanin_temp[fanin_count++] = producer_task_id;
          }
-
-          // Add this task to producer's fanout list (with spinlock)
-          PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, producer_task_id);
-          pto2_add_consumer_to_producer(orch, producer, producer_task_id, task_id);
        }
        if (p.type == PTOParamType::INOUT && overlap_status == OverlapStatus::COVERED) {
          // INOUT: because it will be inserted into the tensor map again,
@@ -378,9 +371,13 @@
      case PTOParamType::OUTPUT: {
        auto &tensor_data = p.tensor.data();
-        // Only allocate from ring buffer when caller did not provide an address
+        // Offsets: each output at 1024B-aligned slot; slot size = ALIGN_UP(size, 1024)
+        // Allocation happens here only; no memcpy of buffer content. Caller's tensor gets addr written back.
        if (tensor_data.buffer.addr == 0) {
-          total_output_size += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN);
+          uint64_t alloc_addr = reinterpret_cast<uint64_t>((char*)task->packed_buffer_base + offset);
+          tensor_data.buffer.addr = alloc_addr;
+          offset += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN);
+
        }
        break;
      }
@@ -391,29 +388,6 @@
    CYCLE_COUNT_LAP_RECORD(g_orch_lookup_cycle, AicpuPhaseId::ORCH_LOOKUP);
-    // === STEP 3: Allocate packed buffer from Heap Ring (may stall) ===
-    // Each output slot is aligned to PTO2_PACKED_OUTPUT_ALIGN (1024B); gap after data is padding.
-    if (total_output_size > 0) {
-      task->packed_buffer_base = orch->pto2_alloc_packed_buffer(total_output_size);
-      task->packed_buffer_end = (char*)task->packed_buffer_base + total_output_size;
-
-      // Offsets: each output at 1024B-aligned slot; slot size = ALIGN_UP(size, 1024)
-      // Allocation happens here only; no memcpy of buffer content. Caller's tensor gets addr written back.
- int32_t offset = 0; - for (int i = 0; i < task->param_count; i++) { - PTOParam& p = task->params[i]; - if (p.type == PTOParamType::OUTPUT) { - auto &tensor_data = p.tensor.data(); - if (tensor_data.buffer.addr == 0) { - uint64_t alloc_addr = reinterpret_cast((char*)task->packed_buffer_base + offset); - tensor_data.buffer.addr = alloc_addr; - offset += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); - } - } - } - } - - CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP); // === STEP 4: Second pass - register outputs in TensorMap === for (int i = 0; i < num_params; i++) { @@ -429,35 +403,44 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // === STEP 5: Finalize fanin list === // First build the fanin list - for (int i = 0; i < fanin_count; i++) { - task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, fanin_temp[i]); + if (orch->scheduler) { + PTO2SchedulerState* sched = orch->scheduler; + int32_t slot = sched->pto2_task_slot(task_id); + + int32_t early_finished = 0; + task->fanin_count = fanin_count + 1; // +1 redundance for not being ready too early + for (int i = 0; i < fanin_count; i++) { + int32_t producer_task_id = fanin_temp[i]; + // Add this task to producer's fanout list (with spinlock) + PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, producer_task_id); + pto2_fanout_lock(producer); + producer->fanout_head = pto2_dep_list_prepend(&orch->dep_pool, producer->fanout_head, task_id); + producer->fanout_count++; + // Normal path: prepend consumer to producer's fanout list + task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, producer_task_id); + + int32_t prod_slot = sched->pto2_task_slot(producer_task_id); + int32_t prod_state = __atomic_load_n(&sched->task_state[prod_slot], __ATOMIC_ACQUIRE); + if (prod_state >= PTO2_TASK_COMPLETED) { + early_finished++; + } + pto2_fanout_unlock(producer); + } + if (early_finished > 0) { + __atomic_fetch_add(&sched->fanin_refcount[slot], early_finished, __ATOMIC_SEQ_CST); + } + } else { + // No scheduler: just build fanin list + add to producers using pto2_add_consumer_to_producer + for (int i = 0; i < fanin_count; i++) { + task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, fanin_temp[i]); + PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, fanin_temp[i]); + pto2_add_consumer_to_producer(orch, producer, fanin_temp[i], task_id); + } + __atomic_store_n(&task->fanin_count, fanin_count, __ATOMIC_SEQ_CST); } - // SEQ_CST store: participates in the global total order with Phase 1's SEQ_CST - // fetch_add on s_pto2_fanin_refcount to prevent the IRIW hazard on ARM. - // (See comment above the fetch_add in aicpu_executor.cpp Phase 1 for details.) - __atomic_store_n(&task->fanin_count, fanin_count, __ATOMIC_SEQ_CST); CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN); - // === STEP 5b: Check if task is already ready (all producers completed via early-return) === - // In AICPU parallel mode, early-return in pto2_add_consumer_to_producer may have - // already incremented aicpu_fanin_refcount for this task. Now that fanin_count is - // finalized, check if the task is already satisfied and push it to the orchestrator - // ready queue so scheduler threads can pick it up without an O(N) scan. 
- if (orch->aicpu_fanin_refcount && fanin_count > 0) { - int32_t slot = task_id & orch->aicpu_window_mask; - int32_t refcount = __atomic_load_n(&orch->aicpu_fanin_refcount[slot], __ATOMIC_SEQ_CST); - if (refcount >= fanin_count) { - // All producers already completed — push to orch ready queue - int32_t tail = orch->orch_ready_tail; - int32_t capacity = PTO2OrchestratorState::ORCH_READY_QUEUE_SIZE; - int32_t head = __atomic_load_n(&orch->orch_ready_head, __ATOMIC_ACQUIRE); - if (((tail + 1) & (capacity - 1)) != (head & (capacity - 1))) { - orch->orch_ready_queue[tail & (capacity - 1)] = task_id; - __atomic_store_n(&orch->orch_ready_tail, tail + 1, __ATOMIC_RELEASE); - } - } - } // === STEP 6: Initialize task in scheduler === // In multi-threaded mode, scheduler thread handles task initialization via polling diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h index 1185b9865..129c24243 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h @@ -82,18 +82,6 @@ struct PTO2OrchestratorState { int32_t* aicpu_completed_by_task; // task_id that set the completed state (for slot-reuse validation) int32_t aicpu_window_mask; - // === ORCHESTRATOR READY QUEUE (early-return path → scheduler) === - // When the orchestrator discovers a producer already completed, it - // increments the consumer's refcount directly. If that makes the - // consumer ready, the consumer_id is pushed here so scheduler threads - // can pick it up without an O(N) scan. - // SPSC-ish ring: orchestrator writes (single producer), scheduler - // threads read via CAS on orch_ready_head (multiple consumers). - static constexpr int32_t ORCH_READY_QUEUE_SIZE = 4096; - volatile int32_t orch_ready_queue[4096]; - volatile int32_t orch_ready_tail; // written by orchestrator only - volatile int32_t orch_ready_head; // advanced by scheduler via CAS - /** * Allocate packed output buffer for a task */ @@ -229,17 +217,6 @@ void pto2_orchestrator_wait_all(PTO2OrchestratorState* orch); */ bool pto2_orchestrator_has_space(PTO2OrchestratorState* orch); -// ============================================================================= -// Internal Helpers -// ============================================================================= - -/** - * Add consumer to producer's fanout list (with spinlock) - * Also checks if producer has already completed and updates consumer's fanin_refcount - */ -void pto2_add_consumer_to_producer( - PTO2OrchestratorState* orch, PTO2TaskDescriptor* producer, int32_t producer_id, int32_t consumer_id); - // ============================================================================= // Debug Utilities // ============================================================================= diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp index 34c922e2d..988aa5877 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp @@ -113,6 +113,7 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, // Initialize local copies of ring pointers sched->last_task_alive = 0; + sched->last_heap_consumed = 0; sched->heap_tail = 0; // Allocate per-task state arrays (dynamically sized based on runtime window_size) @@ -174,8 +175,8 @@ void pto2_scheduler_destroy(PTO2SchedulerState* sched) { 
void pto2_scheduler_reset(PTO2SchedulerState* sched) { sched->last_task_alive = 0; + sched->last_heap_consumed = 0; sched->heap_tail = 0; - memset(sched->task_state, 0, sched->task_window_size * sizeof(PTO2TaskState)); memset(sched->fanin_refcount, 0, sched->task_window_size * sizeof(int32_t)); memset(sched->fanout_refcount, 0, sched->task_window_size * sizeof(int32_t)); @@ -202,50 +203,6 @@ int32_t pto2_scheduler_get_ready_task(PTO2SchedulerState* sched, // Task Completion Handling // ============================================================================= -/** - * Check if task can transition to CONSUMED and handle if so - * - * NOTE: fanout_refcount is accessed atomically because it can be modified - * by both orchestrator thread (via scope_end) and scheduler thread (via task_complete). - */ -static void check_and_handle_consumed(PTO2SchedulerState* sched, - int32_t task_id, - PTO2TaskDescriptor* task) { - int32_t slot = sched->pto2_task_slot(task_id); - - // Read fanout_count (set by orchestrator, only grows) - int32_t fanout_count = __atomic_load_n(&task->fanout_count, __ATOMIC_ACQUIRE); - - // Read fanout_refcount atomically (modified by both orchestrator and scheduler threads) - int32_t refcount = __atomic_load_n(&sched->fanout_refcount[slot], __ATOMIC_ACQUIRE); - - if (refcount != fanout_count) { - return; // Not all references released yet - } - - // Use CAS to atomically transition COMPLETED -> CONSUMED - // This prevents multiple threads from transitioning the same task - PTO2TaskState expected = PTO2_TASK_COMPLETED; - if (!__atomic_compare_exchange_n(&sched->task_state[slot], &expected, PTO2_TASK_CONSUMED, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) { - // CAS failed - either not COMPLETED or another thread already transitioned - return; - } - - // Successfully transitioned to CONSUMED - __atomic_fetch_add(&sched->tasks_consumed, 1, __ATOMIC_RELAXED); - - // Reset refcounts for slot reuse (ring buffer will reuse this slot) - // Use atomic store for fanout_refcount - __atomic_store_n(&sched->fanout_refcount[slot], 0, __ATOMIC_RELEASE); - __atomic_store_n(&sched->fanin_refcount[slot], 0, __ATOMIC_RELEASE); - - // Try to advance ring pointers - if (task_id == sched->last_task_alive) { - pto2_scheduler_advance_ring_pointers(sched); // RISK: Multiple entries - } -} - void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) { int32_t slot = sched->pto2_task_slot(task_id); PTO2TaskDescriptor* task = pto2_sm_get_task(sched->sm_handle, task_id); @@ -261,7 +218,7 @@ void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) int32_t fanout_head = PTO2_LOAD_ACQUIRE(&task->fanout_head); pto2_fanout_unlock(task); - // Traverse fanout chain OUTSIDE the lock to avoid blocking orchestrator + // Traverse fanout chain OUTSIDE the lock to notify consumers int32_t current = fanout_head; while (current > 0) { PTO2DepListEntry* entry = pto2_dep_pool_get(sched->dep_pool, current); @@ -276,40 +233,54 @@ void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) current = entry->next_offset; } - // === STEP 2: Update fanout_refcount of all producers === - // This task is a consumer of its fanin producers - release references - current = task->fanin_head; - - while (current > 0) { - PTO2DepListEntry* entry = pto2_dep_pool_get(sched->dep_pool, current); - if (!entry) break; - - int32_t producer_id = entry->task_id; - pto2_scheduler_release_producer(sched, producer_id); + // === STEP 2: Mark CONSUMED and CAS-advance ring pointers 
=== + // Mark this task as fully processed. Once CONSUMED is visible, the CAS loop + // below (or another thread's) can advance last_task_alive past this slot. + __atomic_store_n(&sched->task_state[slot], PTO2_TASK_CONSUMED, __ATOMIC_RELEASE); + __atomic_fetch_add(&sched->tasks_consumed, 1, __ATOMIC_RELAXED); - current = entry->next_offset; + // CAS-based lock-free advancement of last_task_alive (matches pre-migration logic). + // Multiple threads race to advance; CAS serializes winners. + PTO2SharedMemoryHeader* header = sched->sm_handle->header; + int32_t la = PTO2_LOAD_ACQUIRE(&header->last_task_alive); + int32_t cti = PTO2_LOAD_ACQUIRE(&header->current_task_index); + + while (la < cti) { + int32_t la_slot = la & sched->task_window_mask; + if (__atomic_load_n(&sched->task_state[la_slot], __ATOMIC_ACQUIRE) != PTO2_TASK_CONSUMED) + break; + + // Reset fanin_refcount before exposing slot for reuse + __atomic_store_n(&sched->fanin_refcount[la_slot], 0, __ATOMIC_RELEASE); + + // Atomically advance last_task_alive by 1 + int32_t expected = la; + if (__atomic_compare_exchange_n(&header->last_task_alive, &expected, la + 1, + false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) { + // Ticket-based heap_tail serialization: wait for our turn, then write + while (__atomic_load_n(&header->heap_tail_gen, __ATOMIC_ACQUIRE) != la) { + PTO2_SPIN_PAUSE_LIGHT(); + } + PTO2TaskDescriptor* consumed_t = pto2_sm_get_task(sched->sm_handle, la); + if (consumed_t->packed_buffer_end != NULL) { + uint64_t new_tail = (uint64_t)((char*)consumed_t->packed_buffer_end - (char*)sched->heap_base); + PTO2_STORE_RELEASE(&header->heap_tail, new_tail); + } + PTO2_STORE_RELEASE(&header->heap_tail_gen, la + 1); + la = la + 1; + } else { + break; + } } - - // === STEP 3: Check if this task can transition to CONSUMED === - check_and_handle_consumed(sched, task_id, task); } void pto2_scheduler_on_scope_end(PTO2SchedulerState* sched, const int32_t* task_ids, int32_t count) { - for (int32_t i = 0; i < count; i++) { - pto2_scheduler_release_producer(sched, task_ids[i]); - } -} - -void pto2_scheduler_release_producer(PTO2SchedulerState* sched, int32_t producer_id) { - int32_t slot = sched->pto2_task_slot(producer_id); - PTO2TaskDescriptor* producer = pto2_sm_get_task(sched->sm_handle, producer_id); - - // Increment fanout_refcount atomically (called from both orchestrator and scheduler threads) - __atomic_fetch_add(&sched->fanout_refcount[slot], 1, __ATOMIC_ACQ_REL); - - // Check if producer can transition to CONSUMED - check_and_handle_consumed(sched, producer_id, producer); + // Scope references are no longer on the critical path for ring advancement. + // Tasks transition to CONSUMED directly in on_task_complete. 
+ (void)sched; + (void)task_ids; + (void)count; } // ============================================================================= @@ -317,41 +288,14 @@ void pto2_scheduler_release_producer(PTO2SchedulerState* sched, int32_t producer // ============================================================================= void pto2_scheduler_advance_ring_pointers(PTO2SchedulerState* sched) { - PTO2SharedMemoryHeader* header = sched->sm_handle->header; - int32_t current_task_index = PTO2_LOAD_ACQUIRE(&header->current_task_index); - - // Advance last_task_alive while tasks at that position are CONSUMED - while (sched->last_task_alive < current_task_index) { - int32_t slot = sched->pto2_task_slot(sched->last_task_alive); - - if (sched->task_state[slot] != PTO2_TASK_CONSUMED) { - break; // Found non-consumed task, stop advancing - } - - sched->last_task_alive++; - } - - // Update heap_tail based on last consumed task's buffer - if (sched->last_task_alive > 0) { - int32_t last_consumed_id = sched->last_task_alive - 1; - PTO2TaskDescriptor* last_consumed = pto2_sm_get_task(sched->sm_handle, last_consumed_id); - - if (last_consumed->packed_buffer_end != NULL) { - sched->heap_tail = (uint64_t)((char*)last_consumed->packed_buffer_end - (char*)sched->heap_base); - } - } - - // Write to shared memory for orchestrator flow control - pto2_scheduler_sync_to_sm(sched); + // Ring advancement is now handled inline by the CAS loop in on_task_complete. + // This function is retained for API compatibility but is a no-op. + (void)sched; } void pto2_scheduler_sync_to_sm(PTO2SchedulerState* sched) { - PTO2SharedMemoryHeader* header = sched->sm_handle->header; - - PTO2_STORE_RELEASE(&header->last_task_alive, sched->last_task_alive); - PTO2_STORE_RELEASE(&header->heap_tail, sched->heap_tail); - // Keep generation in sync so AICPU mode sees a consistent starting state - PTO2_STORE_RELEASE(&header->heap_tail_gen, sched->last_task_alive); + // Sync is now handled inline by the CAS loop in on_task_complete. 
+ (void)sched; } // ============================================================================= @@ -367,9 +311,10 @@ bool pto2_scheduler_is_done(PTO2SchedulerState* sched) { return false; } - // Check if all tasks have been consumed + // Check if all tasks have been consumed (read directly from shared memory) int32_t current_task_index = PTO2_LOAD_ACQUIRE(&header->current_task_index); - return sched->last_task_alive >= current_task_index; + int32_t last_alive = PTO2_LOAD_ACQUIRE(&header->last_task_alive); + return last_alive >= current_task_index; } // ============================================================================= diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h index 1a294d410..8efd2631e 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h @@ -60,7 +60,8 @@ struct PTO2SchedulerState { PTO2SharedMemoryHandle* sm_handle; // Local copies of ring pointers (written to shared memory after update) - int32_t last_task_alive; // Task ring tail + int32_t last_task_alive; // Task ring tail (advances on COMPLETED for slot reuse) + int32_t last_heap_consumed; // Heap watermark (advances on CONSUMED for buffer reuse) uint64_t heap_tail; // Heap ring tail (offset from heap_base) // Heap base address (for converting absolute pointers to offsets) @@ -268,16 +269,6 @@ void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) void pto2_scheduler_on_scope_end(PTO2SchedulerState* sched, const int32_t* task_ids, int32_t count); -/** - * Increment fanout_refcount and check CONSUMED - * - * Used when consumer completes or scope ends. - * - * @param sched Scheduler state - * @param producer_id Producer task ID - */ -void pto2_scheduler_release_producer(PTO2SchedulerState* sched, int32_t producer_id); - // ============================================================================= // Ring Pointer Management // =============================================================================
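Reviewer note (illustrative, not part of the patch): the executor-side change above amounts to delegating all readiness bookkeeping to PTO2SchedulerState. Below is a minimal sketch of the per-core hand-off under that assumption; poll_core_finished() and dispatch_to_core() are hypothetical stand-ins for the register polling and the RegId::DATA_MAIN_BASE write in aicpu_executor.cpp, while the other names are taken from the diff.

// Sketch only; not the literal executor code.
static void service_core(PTO2Runtime* rt, Handshake* hank, int core_id,
                         int32_t executing_task_ids[]) {
  // 1. Reap a finished task: completion goes straight to the scheduler, which
  //    releases consumers, marks the slot CONSUMED, and CAS-advances
  //    last_task_alive / heap_tail (see pto2_scheduler_on_task_complete above).
  if (poll_core_finished(core_id) &&
      executing_task_ids[core_id] != AICPU_TASK_INVALID) {
    pto2_scheduler_on_task_complete(&rt->scheduler, executing_task_ids[core_id]);
    executing_task_ids[core_id] = AICPU_TASK_INVALID;
  }
  // 2. Pull the next ready task for this core's worker type; this replaces the
  //    deleted per-thread sharded ready queues and work stealing.
  PTO2WorkerType wt = (hank[core_id].core_type == CoreType::AIC)
                          ? PTO2_WORKER_CUBE : PTO2_WORKER_VECTOR;
  int32_t task_id = pto2_scheduler_get_ready_task(&rt->scheduler, wt);
  if (task_id >= 0) {
    dispatch_to_core(core_id, task_id);  // register write in the real executor
    executing_task_ids[core_id] = task_id;
  }
}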