From 3be34a0dc81315ed22ad1135b291ed19f8c4500c Mon Sep 17 00:00:00 2001 From: wcwxy <26245345+ChaoWao@users.noreply.github.com> Date: Tue, 3 Mar 2026 10:24:30 +0800 Subject: [PATCH] Refactor: migrate executor to scheduler API with lock-free orch_pending - Replace inline scheduling logic in aicpu_executor (ready queue, SpinLock, fanout traversal) with scheduler API calls (get_ready_task, on_task_complete) - Restructure orchestrator STEP 5 fanin path to use scheduler task_state for early-return optimization with +1 fanin protocol - Add lock-free orch_pending queue (SPSC producer, MPMC consumer) so orchestrator never touches ready_queue spinlock in init_task - Split release_fanin_and_check_ready into mark_ready (CAS only) and check_ready (CAS + enqueue) for orch vs scheduler paths - Reorder scheduler loop: complete -> scan (drain orch_pending) -> dispatch -> idle, ensuring pending tasks dispatch same iteration - Add scan_enqueue_count profiling counter with [enqueue: N] output - Rename SCHED_EARLY_READY phase to SCHED_IDLE_WAIT for clarity - Move PTO-ISA fallback retry into ci.sh: -c flag specifies fallback commit, first failure pins and retries, subsequent examples reuse - Update sched_overhead_analysis.py parser for new log format --- .github/workflows/ci.yml | 10 +- ci.sh | 33 +- src/platform/include/common/perf_profiling.h | 2 +- .../src/host/performance_collector.cpp | 2 +- .../aicpu/aicpu_executor.cpp | 695 ++++-------------- .../runtime/pto_orchestrator.cpp | 142 ++-- .../runtime/pto_orchestrator.h | 23 - .../runtime/pto_scheduler.cpp | 33 +- .../runtime/pto_scheduler.h | 91 ++- tools/sched_overhead_analysis.py | 217 ++---- tools/swimlane_converter.py | 2 +- 11 files changed, 423 insertions(+), 827 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6be51d6fb..210362783 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,15 +57,7 @@ jobs: pip install torch --index-url https://download.pytorch.org/whl/cpu - name: Run simulation examples - id: sim - run: ./ci.sh -p a2a3sim - continue-on-error: true - - - name: Retry sim with pinned PTO-ISA on failure - if: steps.sim.outcome == 'failure' - run: | - rm -rf examples/scripts/_deps/pto-isa - ./ci.sh -p a2a3sim --pto-isa-commit 1b22fea + run: ./ci.sh -p a2a3sim -c 1b22fea run-example-on-device: runs-on: self-hosted diff --git a/ci.sh b/ci.sh index e9994a3a6..5681a85c8 100755 --- a/ci.sh +++ b/ci.sh @@ -92,11 +92,25 @@ cleanup() { trap cleanup INT TERM trap 'rm -rf "$LOG_DIR"' EXIT -# Build commit flag for run_example.py +# commit_flag starts empty (try latest PTO-ISA first). +# If -c is given AND a test fails, pin_pto_isa_on_failure sets commit_flag. commit_flag=() -if [[ -n "$PTO_ISA_COMMIT" ]]; then + +# Pin PTO-ISA to the specified commit on first failure. +# On first failure: cleans cached clone, sets commit_flag, returns 0 (caller retries). +# On subsequent failures (already pinned): returns 1 (real failure). +pin_pto_isa_on_failure() { + if [[ -z "$PTO_ISA_COMMIT" ]]; then + return 1 # No fallback commit configured + fi + if [[ ${#commit_flag[@]} -gt 0 ]]; then + return 1 # Already pinned, real failure + fi + echo "[CI] First failure detected, pinning PTO-ISA to commit $PTO_ISA_COMMIT" + rm -rf examples/scripts/_deps/pto-isa commit_flag=(-c "$PTO_ISA_COMMIT") -fi + return 0 # Pinned, caller should retry +} # ---- Discover all tasks ---- EXAMPLES_DIR="examples" @@ -215,7 +229,7 @@ if [[ "$PARALLEL" == "false" ]]; then fi done done - # SIM tasks + # SIM tasks (with pin-on-first-failure for PTO-ISA) for i in "${!SIM_TASK_NAMES[@]}"; do name="${SIM_TASK_NAMES[$i]}" dir="${SIM_TASK_DIRS[$i]}" @@ -226,6 +240,15 @@ if [[ "$PARALLEL" == "false" ]]; then -k "${dir}/kernels" -g "${dir}/golden.py" \ -p a2a3sim "${commit_flag[@]}"; then echo "${name}:a2a3sim|PASS" >> "$RESULTS_FILE" + elif pin_pto_isa_on_failure; then + echo "[CI] Retrying: $name with pinned PTO-ISA" + if python examples/scripts/run_example.py \ + -k "${dir}/kernels" -g "${dir}/golden.py" \ + -p a2a3sim "${commit_flag[@]}"; then + echo "${name}:a2a3sim|PASS" >> "$RESULTS_FILE" + else + echo "${name}:a2a3sim|FAIL" >> "$RESULTS_FILE" + fi else echo "${name}:a2a3sim|FAIL" >> "$RESULTS_FILE" fi @@ -245,7 +268,7 @@ else echo "Running: $name (a2a3sim)" echo "========================================" if python examples/scripts/run_example.py \ - -k "${dir}/kernels" -g "${dir}/golden.py" -p a2a3sim $COMMIT_FLAG; then + -k "${dir}/kernels" -g "${dir}/golden.py" -p a2a3sim "${commit_flag[@]}"; then echo "${name}:a2a3sim|PASS" >> "$RESULTS_FILE" else echo "${name}:a2a3sim|FAIL" >> "$RESULTS_FILE" diff --git a/src/platform/include/common/perf_profiling.h b/src/platform/include/common/perf_profiling.h index 596f52a16..298e336ba 100644 --- a/src/platform/include/common/perf_profiling.h +++ b/src/platform/include/common/perf_profiling.h @@ -205,7 +205,7 @@ enum class AicpuPhaseId : uint32_t { SCHED_COMPLETE = 0, // Process completed tasks (fanout traversal) SCHED_DISPATCH = 1, // Dispatch ready tasks to idle cores SCHED_SCAN = 2, // Incremental scan for root tasks - SCHED_EARLY_READY = 3, // Drain orchestrator's early-ready queue + SCHED_IDLE_WAIT = 3, // Idle time (no progress made) // Orchestrator phases (16-24) ORCH_SYNC = 16, // tensormap sync ORCH_ALLOC = 17, // task_ring_alloc diff --git a/src/platform/src/host/performance_collector.cpp b/src/platform/src/host/performance_collector.cpp index ea7a86cab..fb3b95fda 100644 --- a/src/platform/src/host/performance_collector.cpp +++ b/src/platform/src/host/performance_collector.cpp @@ -511,7 +511,7 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { case AicpuPhaseId::SCHED_COMPLETE: phase_name = "complete"; break; case AicpuPhaseId::SCHED_DISPATCH: phase_name = "dispatch"; break; case AicpuPhaseId::SCHED_SCAN: phase_name = "scan"; break; - case AicpuPhaseId::SCHED_EARLY_READY: phase_name = "early_ready"; break; + case AicpuPhaseId::SCHED_IDLE_WAIT: phase_name = "idle"; break; default: break; } diff --git a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 8bed0cbe7..5e50b9ebb 100644 --- a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -63,19 +63,6 @@ constexpr int MAX_AIC_PER_THREAD = PLATFORM_MAX_AIC_PER_THREAD; constexpr int MAX_AIV_PER_THREAD = PLATFORM_MAX_AIV_PER_THREAD; constexpr int MAX_CORES_PER_THREAD = PLATFORM_MAX_CORES_PER_THREAD; -// Maximum tasks for ready queue (PTO2 mode uses shared memory task count) -constexpr int AICPU_MAX_READY_TASKS = 16384; -constexpr int AICPU_READY_MASK = AICPU_MAX_READY_TASKS - 1; -// One shard per scheduler thread: push to own shard (thread_idx % shards), pop own first + work stealing -// Runtime-configurable via env var PTO2_READY_QUEUE_SHARDS (1..MAX_AICPU_THREADS). Default=3. - -// Lightweight spinlock (avoids futex syscall overhead of std::mutex) -struct SpinLock { - std::atomic flag{0}; - void lock() { while (flag.exchange(1, std::memory_order_acquire) != 0) { PTO2_SPIN_PAUSE_LIGHT(); } } - void unlock() { flag.store(0, std::memory_order_release); } -}; - // Core information for discovery (with register address for fast dispatch) struct CoreInfo { int worker_id; // Index in runtime.workers[] @@ -84,7 +71,11 @@ struct CoreInfo { CoreType core_type; }; + +static PTO2Runtime *rt{nullptr}; + struct AicpuExecutor { + // ===== Thread management state ===== std::atomic thread_idx_{0}; std::atomic initialized_{false}; @@ -113,18 +104,7 @@ struct AicpuExecutor { // Track executing task_id per core (AICPU_TASK_INVALID = idle) int executing_task_ids_[MAX_CORES_PER_THREAD]; - // ===== N shards per type: push to own shard (thread_idx % N), pop own first + work stealing ===== - // active_shards_ is set at runtime (1..MAX_AICPU_THREADS) via env PTO2_READY_QUEUE_SHARDS - int active_shards_{3}; - SpinLock ready_queue_aic_lock_[MAX_AICPU_THREADS]; - int ready_queue_aic_[MAX_AICPU_THREADS][AICPU_MAX_READY_TASKS]; - int ready_queue_aic_head_[MAX_AICPU_THREADS]{0}; - int ready_queue_aic_tail_[MAX_AICPU_THREADS]{0}; - - SpinLock ready_queue_aiv_lock_[MAX_AICPU_THREADS]; - int ready_queue_aiv_[MAX_AICPU_THREADS][AICPU_MAX_READY_TASKS]; - int ready_queue_aiv_head_[MAX_AICPU_THREADS]{0}; - int ready_queue_aiv_tail_[MAX_AICPU_THREADS]{0}; + // ===== Task queue state (managed by scheduler ready queues) ===== // Task execution tracking std::atomic completed_tasks_{0}; @@ -133,16 +113,8 @@ struct AicpuExecutor { // Device orchestration: set by Thread 3 when graph is built; workers wait for it std::atomic orchestrator_done_{false}; std::atomic pto2_init_done_{false}; + std::atomic runtime_init_ready_{false}; std::atomic pto2_init_complete_{false}; // init block finished; others wait for this - std::atomic next_scan_index_{0}; - std::atomic sm_header_ready_{false}; // Thread 3 sets after SM header init - std::atomic orch_pointers_ready_{false}; // Thread 3 sets after aicpu parallel mode pointers + orch_ready_queue are configured - - // Orchestrator ready queue pointers (set by Thread 3, read by scheduler threads) - volatile int32_t* orch_ready_queue_{nullptr}; - volatile int32_t* orch_ready_tail_{nullptr}; - volatile int32_t* orch_ready_head_{nullptr}; - int32_t orch_ready_capacity_{0}; // Orchestration SO handle - defer dlclose until all tasks complete void* orch_so_handle_{nullptr}; @@ -162,69 +134,15 @@ struct AicpuExecutor { void deinit(); void diagnose_stuck_state(Runtime* runtime, int thread_idx, const int* cur_thread_cores, int core_num, Handshake* hank); - -private: - // Helper: enqueue a ready task to the appropriate shard with profiling - inline void enqueue_ready_task_with_profiling( - int32_t task_id, - int32_t worker_type, - int thread_idx -#if PTO2_ORCH_PROFILING - , uint64_t& wait_counter, - uint64_t& hold_counter -#endif - ); }; static AicpuExecutor g_aicpu_executor; -// PTO2 device-mode state (shared memory view + per-task fanin refcount) -static constexpr int PTO2_MAX_SLOTS = PTO2_TASK_WINDOW_SIZE; -static int s_pto2_fanin_refcount[PTO2_MAX_SLOTS]; -static volatile int32_t s_pto2_task_completed[PTO2_MAX_SLOTS]; -static int32_t s_pto2_completed_by_task[PTO2_MAX_SLOTS]; // task_id that set completed state (for slot-reuse validation) +// PTO2 device-mode state (per-core dispatch payloads) static PTO2DispatchPayload s_pto2_payload_per_core[RUNTIME_MAX_WORKER]; // ===== AicpuExecutor Method Implementations ===== -// Helper: enqueue a ready task to the appropriate shard with profiling -inline void AicpuExecutor::enqueue_ready_task_with_profiling( - int32_t task_id, - int32_t worker_type, - int thread_idx -#if PTO2_ORCH_PROFILING - , uint64_t& wait_counter, - uint64_t& hold_counter -#endif -) { - int my_shard = thread_idx % active_shards_; -#if PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(), _l1, _l2; -#endif - - if (worker_type == PTO2_WORKER_CUBE) { - ready_queue_aic_lock_[my_shard].lock(); -#if PTO2_ORCH_PROFILING - _l1 = get_sys_cnt_aicpu(); -#endif - ready_queue_aic_[my_shard][ready_queue_aic_tail_[my_shard]++ & AICPU_READY_MASK] = task_id; - ready_queue_aic_lock_[my_shard].unlock(); - } else { - ready_queue_aiv_lock_[my_shard].lock(); -#if PTO2_ORCH_PROFILING - _l1 = get_sys_cnt_aicpu(); -#endif - ready_queue_aiv_[my_shard][ready_queue_aiv_tail_[my_shard]++ & AICPU_READY_MASK] = task_id; - ready_queue_aiv_lock_[my_shard].unlock(); - } - -#if PTO2_ORCH_PROFILING - _l2 = get_sys_cnt_aicpu(); - wait_counter += (_l1 - _l0); - hold_counter += (_l2 - _l1); -#endif -} - /** * Handshake with all cores and discover their types * Sets up register addresses for fast dispatch. @@ -397,17 +315,7 @@ int AicpuExecutor::init(Runtime* runtime) { DEV_INFO("Init: orch_built_on_host=%d", orch_on_host ? 1 : 0); orchestrator_done_.store(orch_on_host, std::memory_order_release); - // Read ready queue shard count from Runtime (already validated by host) - active_shards_ = runtime->ready_queue_shards; - DEV_ALWAYS("Ready queue shards: %d (max=%d)", active_shards_, MAX_AICPU_THREADS); - - // Initial ready tasks will be populated from PTO2 shared memory in resolve_and_dispatch_pto2 - for (int s = 0; s < MAX_AICPU_THREADS; s++) { - ready_queue_aic_head_[s] = 0; - ready_queue_aic_tail_[s] = 0; - ready_queue_aiv_head_[s] = 0; - ready_queue_aiv_tail_[s] = 0; - } + // Initial ready tasks will be populated via scheduler ready queues // Reset per-core dispatch timestamps and task counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { @@ -485,14 +393,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } DEV_INFO("Thread %d: sm_base=%p", thread_idx, sm_base); - // Device orchestration: wait for last thread to initialize SM header - if (thread_num_ > 1 && !runtime->get_orch_built_on_host()) { - while (!sm_header_ready_.load(std::memory_order_acquire)) { - } - } - PTO2SharedMemoryHeader* header = static_cast(sm_base); - void* gm_heap_base = runtime->get_pto2_gm_heap_ptr(); // For heap_tail offset calc DEV_INFO("Thread %d: header=%p, task_desc_offset=%d, dep_pool_offset=%d, window_size=%d", thread_idx, (void*)header, header->task_descriptors_offset, header->dep_list_pool_offset, header->task_window_size); @@ -505,19 +406,16 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, thread_idx, (void*)task_descriptors, (void*)dep_list_pool); int32_t window_size = header->task_window_size; - if (window_size <= 0 || window_size > PTO2_MAX_SLOTS) window_size = PTO2_MAX_SLOTS; + if (window_size <= 0 || window_size > PTO2_TASK_WINDOW_SIZE) window_size = PTO2_TASK_WINDOW_SIZE; int32_t window_mask = window_size - 1; Handshake* hank = static_cast(runtime->workers); DEV_INFO("Thread %d: hank=%p, window_size=%d", thread_idx, (void*)hank, window_size); - // One-time init: clear refcount and completed arrays (one thread does it; others wait) + // One-time init: assign perf buffers (one thread does it; others wait) if (!pto2_init_done_.exchange(true, std::memory_order_acq_rel)) { DEV_INFO("Thread %d: doing one-time init", thread_idx); - std::memset(s_pto2_fanin_refcount, 0, sizeof(s_pto2_fanin_refcount)); - std::memset((void*)s_pto2_task_completed, 0, sizeof(s_pto2_task_completed)); - std::memset(s_pto2_completed_by_task, -1, sizeof(s_pto2_completed_by_task)); // Assign perf buffers to cores early so profiling captures all tasks // (total_tasks written to header later when orchestrator completes) @@ -533,13 +431,6 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, pto2_init_complete_.store(true, std::memory_order_release); } else { while (!pto2_init_complete_.load(std::memory_order_acquire)) { - } - } - - // Wait for last thread to finish setting up aicpu parallel mode pointers - // and orch_ready_queue before entering the scheduling loop. - if (thread_num_ > 1 && !runtime->get_orch_built_on_host()) { - while (!orch_pointers_ready_.load(std::memory_order_acquire)) { std::this_thread::yield(); } } @@ -561,37 +452,28 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, // Scheduler profiling counters #if PTO2_ORCH_PROFILING uint64_t sched_scan_cycle = 0; - uint64_t sched_early_ready_cycle = 0; uint64_t sched_complete_cycle = 0; uint64_t sched_dispatch_cycle = 0; + uint64_t sched_idle_cycle = 0; uint64_t sched_loop_count = 0; - uint64_t sched_scan_ready_wait = 0, sched_scan_ready_hold = 0; - uint64_t sched_early_ready_wait = 0, sched_early_ready_hold = 0; - uint64_t sched_complete_ready_wait = 0, sched_complete_ready_hold = 0; - uint64_t sched_dispatch_hit_wait = 0, sched_dispatch_hit_hold = 0; - uint64_t sched_dispatch_miss_wait = 0, sched_dispatch_miss_hold = 0; - uint64_t ready_pop_own = 0, ready_pop_steal = 0; + uint64_t notify_edges_total = 0; + int32_t notify_max_degree = 0; + uint64_t notify_tasks_enqueued = 0; + uint64_t pop_hit = 0; + uint64_t pop_miss = 0; + uint64_t scan_enqueue_count = 0; #endif - // Phase profiling: per-phase task counters - uint32_t phase_complete_count = 0; - uint32_t phase_dispatch_count = 0; - uint32_t phase_scan_count = 0; - uint32_t phase_early_ready_count = 0; - // Fanout traversal statistics: how many downstream deps were notified after task completions - uint64_t fanout_edges_notified = 0; - int32_t fanout_max_degree = 0; while (true) { #if PTO2_ORCH_PROFILING sched_loop_count++; #endif CYCLE_COUNT_START(); - // Phase profiling: record start time for this iteration +#if PTO2_ORCH_PROFILING uint64_t _t0_phase = _t0; - phase_complete_count = 0; - phase_dispatch_count = 0; - phase_scan_count = 0; - phase_early_ready_count = 0; + uint32_t phase_complete_count = 0; + uint32_t phase_dispatch_count = 0; +#endif // Dynamic task_count (Thread 3 sets total_tasks_ when orchestration completes) int32_t task_count = total_tasks_.load(std::memory_order_acquire); bool orch_done = orchestrator_done_.load(std::memory_order_acquire); @@ -635,7 +517,17 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id]; int32_t task_id = executing_task_ids_[core_id]; + PTO2CompletionStats cstats = pto2_scheduler_on_task_complete(&rt->scheduler, task_id); executing_task_ids_[core_id] = AICPU_TASK_INVALID; +#if PTO2_ORCH_PROFILING + notify_edges_total += cstats.fanout_edges; + if (cstats.fanout_edges > notify_max_degree) { + notify_max_degree = cstats.fanout_edges; + } + notify_tasks_enqueued += cstats.tasks_enqueued; +#else + (void)cstats; +#endif // Write AICPU dispatch/finish timestamps into the PerfRecord if (profiling_enabled) { @@ -654,141 +546,15 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } } - PTO2TaskDescriptor* pto2_task = &task_descriptors[task_id & window_mask]; - DEV_DEBUG("Thread %d: Core %d completed PTO2 task %d", thread_idx, core_id, task_id); - // Mark completed (state=2), then snapshot fanout_head under the per-task spinlock. - // - // WHY THE LOCK IS REQUIRED (device orchestration / AICPU parallel mode): - // The orchestrator (Thread 3) runs concurrently with the scheduler threads and - // may still be adding consumers to this task's fanout list via - // pto2_add_consumer_to_producer(). That function holds fanout_lock while it - // (a) checks the completion state and (b) prepends to fanout_head. - // - // Without the lock here we have a TOCTOU race: - // 1. Orch acquires lock, checks state=0 (task still running), plans insert. - // 2. Task finishes; we store state=2 (RELEASE) but haven't acquired the lock. - // 3. Orch inserts consumer X into fanout_head, releases lock. - // 4. We read the OLD fanout_head (before X was inserted) → X is never woken. - // - // By acquiring the lock AFTER storing state=2 we guarantee mutual exclusion: - // • If Orch holds the lock first → it writes fanout_head → we read it with X. - // • If we acquire the lock first → Orch's subsequent lock-acquire sees state=2 - // via the release/acquire pair and takes the early-return path, directly - // incrementing X's fanin_refcount instead of touching fanout_head. - // Either way every consumer is accounted for exactly once. - __atomic_store_n(&s_pto2_completed_by_task[task_id & window_mask], task_id, __ATOMIC_RELEASE); - __atomic_store_n(&s_pto2_task_completed[task_id & window_mask], 2, __ATOMIC_RELEASE); - pto2_fanout_lock(pto2_task); - int32_t fanout_head = (int32_t)pto2_task->fanout_head; - pto2_fanout_unlock(pto2_task); - - // Traverse fanout (no lock) - // - // SEQ_CST on the refcount increment and fanin_count load breaks the IRIW - // (Independent Reads of Independent Writes) hazard with the orchestrator's - // Step 5 / Step 5b: - // - // Thread 0 (here): Thread 3 (orchestrator Step 5/5b): - // fetch_add(refcount, SEQ_CST) store(fanin_count=N, SEQ_CST) - // load(fanin_count, SEQ_CST) load(refcount, SEQ_CST) - // - // On ARM (IRIW is architecturally allowed with ACQ/REL), both threads could - // simultaneously read stale values — this thread sees fanin_count=0 and Step 5b - // sees refcount 0) { - fanout_len++; - PTO2DepListEntry* entry = &dep_list_pool[current]; - int32_t consumer_id = entry->task_id; - int32_t consumer_slot = consumer_id & window_mask; - int prev = __atomic_fetch_add(&s_pto2_fanin_refcount[consumer_slot], 1, __ATOMIC_SEQ_CST); - PTO2TaskDescriptor* consumer_desc = &task_descriptors[consumer_slot]; - int32_t fanin_count = __atomic_load_n(&consumer_desc->fanin_count, __ATOMIC_SEQ_CST); - if (prev + 1 == fanin_count) { - __atomic_store_n(&s_pto2_task_completed[consumer_slot], 1, __ATOMIC_RELEASE); - enqueue_ready_task_with_profiling( - consumer_id, consumer_desc->worker_type, thread_idx -#if PTO2_ORCH_PROFILING - , sched_complete_ready_wait, sched_complete_ready_hold -#endif - ); - } - current = entry->next_offset; - } - fanout_edges_notified += fanout_len; - if (fanout_len > fanout_max_degree) fanout_max_degree = fanout_len; - cur_thread_tasks_in_flight--; cur_thread_completed++; - phase_complete_count++; made_progress = true; +#if PTO2_ORCH_PROFILING + phase_complete_count++; +#endif completed_tasks_.fetch_add(1, std::memory_order_release); - - // Advance last_task_alive for TaskRing flow control. - // Mark this task as fully consumed (state=3), then try to - // advance the watermark using lock-free CAS. - // - // ORDERING: Mark completed as state=3 and reset refcount BEFORE advancing last_task_alive. - // Once last_task_alive advances past a slot, the orchestrator can - // immediately reuse it. The early-return path in - // pto2_add_consumer_to_producer checks aicpu_task_completed[prod_slot]; - // if we reset AFTER the CAS, the orchestrator could see stale state=3 - // from the old task and incorrectly skip dependency setup. - __atomic_store_n(&s_pto2_task_completed[task_id & window_mask], 3, __ATOMIC_RELEASE); - { - int32_t la = __atomic_load_n(&header->last_task_alive, __ATOMIC_ACQUIRE); - int32_t cti = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); - while (la < cti) { - int32_t la_slot = la & window_mask; - if (__atomic_load_n(&s_pto2_task_completed[la_slot], __ATOMIC_ACQUIRE) < 3) - break; - // Only reset refcount — the orchestrator's early-return path - // (pto2_add_consumer_to_producer) MUST see completed >= 2 when - // the producer has actually finished, per the fanout lock protocol. - // completed_by_task guards against stale state from recycled slots: - // the old task's completed_by_task won't match the new producer_id. - __atomic_store_n(&s_pto2_fanin_refcount[la_slot], 0, __ATOMIC_RELEASE); - // Advance last_task_alive to make this slot available. - int32_t expected = la; - if (__atomic_compare_exchange_n(&header->last_task_alive, &expected, la + 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) { - // Serialize heap_tail writes via ticket-based generation counter. - // Without this, concurrent CAS winners can interleave their - // heap_tail stores, causing stale regression (see design note below). - // - // DESIGN: heap_tail_gen tracks which task's tail was last written. - // Each CAS winner waits for gen==la (its ticket), writes heap_tail, - // then advances gen to la+1. The critical section is ~3 instructions, - // so the spin is effectively zero in the common (no-preemption) case. - while (__atomic_load_n(&header->heap_tail_gen, __ATOMIC_ACQUIRE) != la) { - } - - // Advance heap_tail for HeapRing flow control - PTO2TaskDescriptor* consumed_t = &task_descriptors[la_slot]; - if (consumed_t->packed_buffer_end != nullptr) { - uint64_t new_tail = (uint64_t)((char*)consumed_t->packed_buffer_end - (char*)gm_heap_base); - if (new_tail <= header->heap_size) { - __atomic_store_n(&header->heap_tail, new_tail, __ATOMIC_RELEASE); - } - } - - // Release next writer - __atomic_store_n(&header->heap_tail_gen, la + 1, __ATOMIC_RELEASE); - - la = la + 1; - } else { - break; - } - } - } - // Debug: periodic progress (thread 0 only) to find which task hangs if (thread_idx == 0 && task_count > 0) { int32_t c = completed_tasks_.load(std::memory_order_relaxed); @@ -808,86 +574,61 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } #endif - // Phase 2: Dispatch ready tasks to idle cores (register-based dispatch) - if (cur_thread_tasks_in_flight < core_num) { - for (int i = 0; i < core_num; i++) { - int core_id = cur_thread_cores[i]; - uint64_t reg_addr = core_id_to_reg_addr_[core_id]; - uint64_t reg_val = read_reg(reg_addr, RegId::COND); - int reg_state = EXTRACT_TASK_STATE(reg_val); - if (reg_state == TASK_FIN_STATE && executing_task_ids_[core_id] == AICPU_TASK_INVALID) { - Handshake* h = &hank[core_id]; - int32_t task_id = AICPU_TASK_INVALID; -#if PTO2_ORCH_PROFILING - bool found_task = false; - bool is_stolen = false; -#endif - int my_shard = thread_idx % active_shards_; - if (h->core_type == CoreType::AIC) { - for (int k = 0; k < active_shards_ && task_id < 0; k++) { - int shard = (my_shard + k) % active_shards_; -#if PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(); -#endif - ready_queue_aic_lock_[shard].lock(); -#if PTO2_ORCH_PROFILING - uint64_t _l1 = get_sys_cnt_aicpu(); -#endif - if (ready_queue_aic_head_[shard] < ready_queue_aic_tail_[shard]) { - task_id = ready_queue_aic_[shard][ready_queue_aic_head_[shard]++ & AICPU_READY_MASK]; - ready_queue_aic_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_hit_wait += (_l1 - _l0); - sched_dispatch_hit_hold += (_l2 - _l1); - found_task = true; - is_stolen = (k != 0); -#endif - break; - } - ready_queue_aic_lock_[shard].unlock(); -#if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_miss_wait += (_l1 - _l0); - sched_dispatch_miss_hold += (_l2 - _l1); -#endif - } - } else { - for (int k = 0; k < active_shards_ && task_id < 0; k++) { - int shard = (my_shard + k) % active_shards_; + // Phase 2: Scan (drain orch_pending into ready_queues + update perf header) + { #if PTO2_ORCH_PROFILING - uint64_t _l0 = get_sys_cnt_aicpu(); + uint32_t phase_scan_count = 0; #endif - ready_queue_aiv_lock_[shard].lock(); + // Drain orch_pending: move tasks enqueued by orchestrator into ready_queues + int32_t pending_task_id; + while (rt->scheduler.orch_pending_try_pop(&pending_task_id)) { + PTO2TaskDescriptor* pending_task = &task_descriptors[pending_task_id & window_mask]; + pto2_ready_queue_push(&rt->scheduler.ready_queues[pending_task->worker_type], pending_task_id); + made_progress = true; #if PTO2_ORCH_PROFILING - uint64_t _l1 = get_sys_cnt_aicpu(); + phase_scan_count++; + scan_enqueue_count++; #endif - if (ready_queue_aiv_head_[shard] < ready_queue_aiv_tail_[shard]) { - task_id = ready_queue_aiv_[shard][ready_queue_aiv_head_[shard]++ & AICPU_READY_MASK]; - ready_queue_aiv_lock_[shard].unlock(); + } + + // Update perf header total_tasks if visible tasks have changed + int32_t visible = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); + if (profiling_enabled && visible > 0 && visible != last_reported_task_count) { + perf_aicpu_update_total_tasks(runtime, static_cast(visible)); + + DEV_INFO("Thread %d: Updated perf total_tasks to %d%s", + thread_idx, visible, orch_done ? " (final)" : ""); + + last_reported_task_count = visible; + } #if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_hit_wait += (_l1 - _l0); - sched_dispatch_hit_hold += (_l2 - _l1); - found_task = true; - is_stolen = (k != 0); + (void)phase_scan_count; #endif - break; - } - ready_queue_aiv_lock_[shard].unlock(); + } + CYCLE_COUNT_LAP(sched_scan_cycle); #if PTO2_ORCH_PROFILING - uint64_t _l2 = get_sys_cnt_aicpu(); - sched_dispatch_miss_wait += (_l1 - _l0); - sched_dispatch_miss_hold += (_l2 - _l1); + if (profiling_enabled) { + perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_SCAN, + _t0_phase, _t1, static_cast(sched_loop_count), 0); + _t0_phase = _t1; + } #endif - } - } + + // Phase 3: Dispatch ready tasks to idle cores (register-based dispatch) + if (cur_thread_tasks_in_flight < core_num) { + for (int i = 0; i < core_num; i++) { + int core_id = cur_thread_cores[i]; + uint64_t reg_addr = core_id_to_reg_addr_[core_id]; + uint64_t reg_val = read_reg(reg_addr, RegId::COND); + int reg_state = EXTRACT_TASK_STATE(reg_val); + if (reg_state == TASK_FIN_STATE && executing_task_ids_[core_id] == AICPU_TASK_INVALID) { + Handshake* h = &hank[core_id]; + PTO2WorkerType wt = (h->core_type == CoreType::AIC) ? PTO2_WORKER_CUBE : PTO2_WORKER_VECTOR; + int32_t task_id = pto2_scheduler_get_ready_task(&rt->scheduler, wt); + if (task_id >= 0) { #if PTO2_ORCH_PROFILING - if (found_task) { - if (is_stolen) ready_pop_steal++; else ready_pop_own++; - } + pop_hit++; #endif - if (task_id >= 0) { PTO2TaskDescriptor* task = &task_descriptors[task_id & window_mask]; PTO2DispatchPayload* payload = &s_pto2_payload_per_core[core_id]; build_pto2_payload(payload, runtime, task, task_descriptors, dep_list_pool, window_size); @@ -900,13 +641,18 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } core_dispatch_counts_[core_id]++; } - write_reg(reg_addr, RegId::DATA_MAIN_BASE, static_cast(task_id + 1)); executing_task_ids_[core_id] = task_id; cur_thread_tasks_in_flight++; - phase_dispatch_count++; made_progress = true; +#if PTO2_ORCH_PROFILING + phase_dispatch_count++; +#endif DEV_DEBUG("Thread %d: Dispatching PTO2 task %d to core %d", thread_idx, task_id, core_id); + } else { +#if PTO2_ORCH_PROFILING + pop_miss++; +#endif } } } @@ -920,124 +666,36 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } #endif - // Incremental scan: discover root tasks (fanin_count == 0) - { - int32_t visible = __atomic_load_n(&header->current_task_index, __ATOMIC_ACQUIRE); - - // Update perf header total_tasks if visible tasks have changed - if (profiling_enabled && visible > 0 && visible != last_reported_task_count) { - perf_aicpu_update_total_tasks(runtime, static_cast(visible)); - - DEV_INFO("Thread %d: Updated perf total_tasks to %d%s", - thread_idx, visible, orch_done ? " (final)" : ""); - - last_reported_task_count = visible; - } - - while (true) { - int32_t idx = next_scan_index_.load(std::memory_order_acquire); - if (idx >= visible) break; - if (!next_scan_index_.compare_exchange_weak(idx, idx + 1, - std::memory_order_acq_rel, std::memory_order_acquire)) continue; - - int32_t slot = idx & window_mask; - - PTO2TaskDescriptor* t = &task_descriptors[slot]; - int32_t fanin_count = __atomic_load_n(&t->fanin_count, __ATOMIC_ACQUIRE); - if (fanin_count == 0) { - // Mark as enqueued (state=1) to prevent double-enqueue - __atomic_store_n(&s_pto2_task_completed[slot], 1, __ATOMIC_RELEASE); - enqueue_ready_task_with_profiling( - idx, t->worker_type, thread_idx -#if PTO2_ORCH_PROFILING - , sched_scan_ready_wait, sched_scan_ready_hold -#endif - ); - phase_scan_count++; - made_progress = true; - } - } - } - CYCLE_COUNT_LAP(sched_scan_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_SCAN, - _t0_phase, _t1, static_cast(sched_loop_count), phase_scan_count); - _t0_phase = _t1; - } -#endif - - // Early-ready drain: tasks whose deps were already met at submit time - // (orchestrator detected all producers completed → pushed to orch_ready_queue_) - if (orch_ready_queue_ != nullptr) { - while (true) { - int32_t head = __atomic_load_n(orch_ready_head_, __ATOMIC_ACQUIRE); - int32_t tail = __atomic_load_n(orch_ready_tail_, __ATOMIC_ACQUIRE); - if (head == tail) break; // queue empty - - // CAS to claim this slot (multiple scheduler threads compete) - if (!__atomic_compare_exchange_n(orch_ready_head_, &head, head + 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) continue; - - int32_t task_id = orch_ready_queue_[head & (orch_ready_capacity_ - 1)]; - int32_t slot = task_id & window_mask; - - // CAS from 0 → 1 to claim enqueue rights (may already be enqueued by fanout path) - int32_t expected = 0; - if (!__atomic_compare_exchange_n(&s_pto2_task_completed[slot], &expected, 1, - false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) continue; - - PTO2TaskDescriptor* t = &task_descriptors[slot]; - enqueue_ready_task_with_profiling( - task_id, t->worker_type, thread_idx -#if PTO2_ORCH_PROFILING - , sched_early_ready_wait, sched_early_ready_hold -#endif - ); - phase_early_ready_count++; - made_progress = true; - } - } - CYCLE_COUNT_LAP(sched_early_ready_cycle); -#if PTO2_ORCH_PROFILING - if (profiling_enabled) { - perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_EARLY_READY, - _t0_phase, _t1, static_cast(sched_loop_count), phase_early_ready_count); - _t0_phase = _t1; - } -#endif - if (!made_progress) { idle_iterations++; if (thread_idx == 0 && task_count > 0 && idle_iterations % STALL_LOG_INTERVAL == 0) { int32_t c = completed_tasks_.load(std::memory_order_relaxed); DEV_ALWAYS("PTO2 stall: no progress for %d iterations, completed=%d total=%d", idle_iterations, c, task_count); - // Scan all task slots to find truly stuck tasks - // state=0: not yet completed (may be waiting for deps or ready but not enqueued) - // state=1: enqueued in ready queue or dispatched to hardware - // state=2: completed by Phase 1 + // Scan all task slots to find truly stuck tasks using scheduler state + PTO2SchedulerState* sched = &rt->scheduler; int cnt_ready = 0, cnt_waiting = 0, cnt_inflight = 0; for (int si = 0; si < task_count; si++) { - int32_t st = __atomic_load_n(&s_pto2_task_completed[si], __ATOMIC_RELAXED); - int32_t rc = __atomic_load_n(&s_pto2_fanin_refcount[si], __ATOMIC_RELAXED); - int32_t fi = __atomic_load_n(&task_descriptors[si].fanin_count, __ATOMIC_RELAXED); - int32_t kid = task_descriptors[si].kernel_id; - if (st == 2) continue; // Already done - if (st == 1) { cnt_inflight++; continue; } - // st == 0 + int32_t slot = si & window_mask; + PTO2TaskState st = (PTO2TaskState)__atomic_load_n(&sched->task_state[slot], __ATOMIC_RELAXED); + int32_t rc = __atomic_load_n(&sched->fanin_refcount[slot], __ATOMIC_RELAXED); + int32_t fi = __atomic_load_n(&task_descriptors[slot].fanin_count, __ATOMIC_RELAXED); + int32_t kid = task_descriptors[slot].kernel_id; + if (st >= PTO2_TASK_COMPLETED) continue; // Already done + if (st == PTO2_TASK_READY || st == PTO2_TASK_RUNNING) { cnt_inflight++; continue; } + // PENDING if (rc >= fi) { // Ready (all deps satisfied) but not enqueued — this is the real bug cnt_ready++; if (cnt_ready <= STALL_DUMP_READY_MAX) { - DEV_ALWAYS(" STUCK-READY slot=%d kernel_id=%d refcount=%d fanin=%d", - si, kid, rc, fi); + DEV_ALWAYS(" STUCK-READY slot=%d kernel_id=%d refcount=%d fanin=%d state=%d", + slot, kid, rc, fi, (int)st); } } else { cnt_waiting++; if (cnt_waiting <= STALL_DUMP_WAIT_MAX) { - DEV_ALWAYS(" STUCK-WAIT slot=%d kernel_id=%d refcount=%d fanin=%d", - si, kid, rc, fi); + DEV_ALWAYS(" STUCK-WAIT slot=%d kernel_id=%d refcount=%d fanin=%d state=%d", + slot, kid, rc, fi, (int)st); } } } @@ -1069,6 +727,14 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, } else { SPIN_WAIT_HINT(); } + CYCLE_COUNT_LAP(sched_idle_cycle); +#if PTO2_ORCH_PROFILING + if (profiling_enabled) { + perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_IDLE_WAIT, + _t0_phase, _t1, static_cast(sched_loop_count), 0); + _t0_phase = _t1; + } +#endif } else { idle_iterations = 0; } @@ -1076,56 +742,34 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx, #if PTO2_ORCH_PROFILING uint64_t sched_total = - sched_scan_cycle + sched_early_ready_cycle + sched_complete_cycle + sched_dispatch_cycle; + sched_scan_cycle + sched_complete_cycle + sched_dispatch_cycle + sched_idle_cycle; if (sched_total == 0) sched_total = 1; // avoid div-by-zero + double total_us_f = cycles_to_us(sched_total); + double scan_us_f = cycles_to_us(sched_scan_cycle); + double complete_us_f = cycles_to_us(sched_complete_cycle); + double dispatch_us_f = cycles_to_us(sched_dispatch_cycle); + double idle_us_f = cycles_to_us(sched_idle_cycle); double tasks_per_loop = sched_loop_count > 0 ? (double)cur_thread_completed / sched_loop_count : 0.0; + double notify_avg = cur_thread_completed > 0 ? (double)notify_edges_total / cur_thread_completed : 0.0; + uint64_t pop_total = pop_hit + pop_miss; + double pop_hit_rate = pop_total > 0 ? pop_hit * 100.0 / pop_total : 0.0; - // === Summary === - DEV_ALWAYS("Thread %d: === PTO2 Scheduler Summary ===", thread_idx); + // Output format compatible with tools/sched_overhead_analysis.py DEV_ALWAYS("Thread %d: completed=%d tasks in %.0fus (%llu loops, %.1f tasks/loop)", - thread_idx, cur_thread_completed, cycles_to_us(sched_total), + thread_idx, cur_thread_completed, total_us_f, (unsigned long long)sched_loop_count, tasks_per_loop); - - // --- Phase Breakdown (execution order) --- - DEV_ALWAYS("Thread %d: --- Phase Breakdown (execution order) ---", thread_idx); - DEV_ALWAYS("Thread %d: scan: %8.0fus (%4.1f%%)", - thread_idx, cycles_to_us(sched_scan_cycle), sched_scan_cycle * 100.0 / sched_total); - DEV_ALWAYS("Thread %d: early_ready: %8.0fus (%4.1f%%) (deps already met at submit time)", - thread_idx, cycles_to_us(sched_early_ready_cycle), sched_early_ready_cycle * 100.0 / sched_total); - DEV_ALWAYS("Thread %d: complete: %8.0fus (%4.1f%%) [fanout: edges=%llu, max_degree=%d, avg=%.1f]", - thread_idx, cycles_to_us(sched_complete_cycle), sched_complete_cycle * 100.0 / sched_total, - (unsigned long long)fanout_edges_notified, fanout_max_degree, - cur_thread_completed > 0 ? (double)fanout_edges_notified / cur_thread_completed : 0.0); - DEV_ALWAYS("Thread %d: dispatch: %8.0fus (%4.1f%%) [steal: own=%llu, steal=%llu, pct=%.1f%%]", - thread_idx, cycles_to_us(sched_dispatch_cycle), sched_dispatch_cycle * 100.0 / sched_total, - (unsigned long long)ready_pop_own, (unsigned long long)ready_pop_steal, - (ready_pop_own + ready_pop_steal) > 0 ? 100.0 * (double)ready_pop_steal / (double)(ready_pop_own + ready_pop_steal) : 0.0); - - // --- Lock Contention (ready_q) --- - DEV_ALWAYS("Thread %d: --- Lock Contention (ready_q) ---", thread_idx); - DEV_ALWAYS("Thread %d: total: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_scan_ready_wait + sched_early_ready_wait + sched_complete_ready_wait + sched_dispatch_hit_wait + sched_dispatch_miss_wait), - (double)cycles_to_us(sched_scan_ready_hold + sched_early_ready_hold + sched_complete_ready_hold + sched_dispatch_hit_hold + sched_dispatch_miss_hold)); - DEV_ALWAYS("Thread %d: scan: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_scan_ready_wait), (double)cycles_to_us(sched_scan_ready_hold)); - DEV_ALWAYS("Thread %d: early_ready: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_early_ready_wait), (double)cycles_to_us(sched_early_ready_hold)); - DEV_ALWAYS("Thread %d: complete: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_complete_ready_wait), (double)cycles_to_us(sched_complete_ready_hold)); - DEV_ALWAYS("Thread %d: dispatch: wait=%5.0fus hold=%5.0fus", - thread_idx, - (double)cycles_to_us(sched_dispatch_hit_wait + sched_dispatch_miss_wait), - (double)cycles_to_us(sched_dispatch_hit_hold + sched_dispatch_miss_hold)); - DEV_ALWAYS("Thread %d: hit: wait=%5.0fus hold=%5.0fus (dequeued task)", - thread_idx, - (double)cycles_to_us(sched_dispatch_hit_wait), (double)cycles_to_us(sched_dispatch_hit_hold)); - DEV_ALWAYS("Thread %d: miss: wait=%5.0fus hold=%5.0fus (empty queue)", - thread_idx, - (double)cycles_to_us(sched_dispatch_miss_wait), (double)cycles_to_us(sched_dispatch_miss_hold)); + DEV_ALWAYS("Thread %d: --- Phase Breakdown ---", thread_idx); + DEV_ALWAYS("Thread %d: complete: %.0fus (%.1f%%) [notify: edges=%llu, max_degree=%d, avg=%.1f]", + thread_idx, complete_us_f, sched_complete_cycle * 100.0 / sched_total, + (unsigned long long)notify_edges_total, notify_max_degree, notify_avg); + DEV_ALWAYS("Thread %d: scan: %.0fus (%.1f%%) [enqueue: %llu]", + thread_idx, scan_us_f, sched_scan_cycle * 100.0 / sched_total, + (unsigned long long)scan_enqueue_count); + DEV_ALWAYS("Thread %d: dispatch: %.0fus (%.1f%%) [pop: hit=%llu, miss=%llu, hit_rate=%.1f%%]", + thread_idx, dispatch_us_f, sched_dispatch_cycle * 100.0 / sched_total, + (unsigned long long)pop_hit, (unsigned long long)pop_miss, pop_hit_rate); + DEV_ALWAYS("Thread %d: idle: %.0fus (%.1f%%)", + thread_idx, idle_us_f, sched_idle_cycle * 100.0 / sched_total); #endif // Flush performance buffers for cores managed by this thread @@ -1146,6 +790,7 @@ int AicpuExecutor::run(Runtime* runtime) { // Thread 3 when 4 AICPU threads: orchestrator (no cores) if (thread_num_ == 4 && thread_idx == 3) { + rt = nullptr; if (runtime->get_orch_built_on_host()) { DEV_INFO("Thread 3: Host orchestration mode, no-op"); } else { @@ -1256,6 +901,13 @@ int AicpuExecutor::run(Runtime* runtime) { DEV_INFO("Thread 3: No config function, using defaults"); } + if (expected_arg_count > 0 && arg_count < expected_arg_count) { + DEV_ERROR("Thread 3: arg_count %d < expected %d", arg_count, expected_arg_count); + dlclose(handle); + unlink(so_path); + return -1; + } + // Apply ring buffer size overrides from Runtime (set by host env vars) if (runtime->pto2_task_window_size > 0) { task_window_size = runtime->pto2_task_window_size; @@ -1269,16 +921,8 @@ int AicpuExecutor::run(Runtime* runtime) { DEV_INFO("Thread 3: Ring sizes: task_window=%lu, heap=%lu, dep_pool=%lu", (unsigned long)task_window_size, (unsigned long)heap_size, (unsigned long)dep_list_pool_size); - if (expected_arg_count > 0 && arg_count < expected_arg_count) { - DEV_ERROR("Thread 3: arg_count %d < expected %d", arg_count, expected_arg_count); - dlclose(handle); - unlink(so_path); - return -1; - } - // Get GM heap from runtime (dedicated field) void* sm_ptr = runtime->get_pto2_gm_sm_ptr(); - PTO2SharedMemoryHeader* header = static_cast(sm_ptr); void* gm_heap = runtime->get_pto2_gm_heap_ptr(); // Create shared memory handle and runtime (ops table populated inside) @@ -1293,10 +937,7 @@ int AicpuExecutor::run(Runtime* runtime) { return -1; } - // Signal scheduler threads that SM header is initialized - sm_header_ready_.store(true, std::memory_order_release); - - PTO2Runtime* rt = pto2_runtime_create_from_sm(PTO2_MODE_EXECUTE, + rt = pto2_runtime_create_from_sm(PTO2_MODE_EXECUTE, sm_handle, gm_heap, heap_size); if (!rt) { DEV_ERROR("Thread 3: Failed to create PTO2Runtime"); @@ -1306,27 +947,13 @@ int AicpuExecutor::run(Runtime* runtime) { return -1; } - // Wait for scheduler's one-time init to complete (ensures memset has executed) + runtime_init_ready_.store(true, std::memory_order_release); + + // Wait for scheduler's one-time init to complete while (!pto2_init_complete_.load(std::memory_order_acquire)) { + std::this_thread::yield(); } - // Set orchestrator's aicpu parallel mode pointers - uint64_t ws = header->task_window_size; - if (ws == 0 || ws > PTO2_MAX_SLOTS) ws = PTO2_MAX_SLOTS; - rt->orchestrator.aicpu_fanin_refcount = s_pto2_fanin_refcount; - rt->orchestrator.aicpu_task_completed = s_pto2_task_completed; - rt->orchestrator.aicpu_completed_by_task = s_pto2_completed_by_task; - rt->orchestrator.aicpu_window_mask = ws - 1; - - // Expose orchestrator ready queue to scheduler threads - orch_ready_queue_ = rt->orchestrator.orch_ready_queue; - orch_ready_tail_ = &rt->orchestrator.orch_ready_tail; - orch_ready_head_ = &rt->orchestrator.orch_ready_head; - orch_ready_capacity_ = PTO2OrchestratorState::ORCH_READY_QUEUE_SIZE; - - // Signal scheduler threads: all pointers are ready, safe to start scheduling. - orch_pointers_ready_.store(true, std::memory_order_release); - // Call orchestration wrapped in outer scope (matches old PTO2_ORCHESTRATION behavior) DEV_ALWAYS("Thread 3: Calling aicpu_orchestration_entry from SO"); uint64_t orch_cycle_start = get_sys_cnt_aicpu(); @@ -1419,6 +1046,13 @@ int AicpuExecutor::run(Runtime* runtime) { } else { // Note: Handshake already completed in init() via handshake_all_cores() + // Device orchestration: wait for Thread 3 to initialize SM header + if (!runtime->get_orch_built_on_host()) { + while (!runtime_init_ready_.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + } + always_assert(rt != nullptr); DEV_INFO("Thread %d: Starting PTO2 dispatch", thread_idx); int completed = resolve_and_dispatch_pto2(runtime, thread_idx, cur_thread_cores, my_cores); DEV_INFO("Thread %d: Executed %d tasks from runtime", thread_idx, completed); @@ -1442,14 +1076,6 @@ int AicpuExecutor::run(Runtime* runtime) { } void AicpuExecutor::deinit() { - // Cleanup runtime execution state (clear all max slots for safety) - for (int s = 0; s < MAX_AICPU_THREADS; s++) { - ready_queue_aic_head_[s] = 0; - ready_queue_aic_tail_[s] = 0; - ready_queue_aiv_head_[s] = 0; - ready_queue_aiv_tail_[s] = 0; - } - // Reset per-core dispatch timestamps and task counters for (int i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; @@ -1462,13 +1088,7 @@ void AicpuExecutor::deinit() { orchestrator_done_.store(false, std::memory_order_release); pto2_init_done_.store(false, std::memory_order_release); pto2_init_complete_.store(false, std::memory_order_release); - next_scan_index_.store(0, std::memory_order_release); - sm_header_ready_.store(false, std::memory_order_release); - orch_pointers_ready_.store(false, std::memory_order_release); - orch_ready_queue_ = nullptr; - orch_ready_tail_ = nullptr; - orch_ready_head_ = nullptr; - orch_ready_capacity_ = 0; + runtime_init_ready_.store(false, std::memory_order_release); // Reset core discovery state aic_count_ = 0; @@ -1503,12 +1123,13 @@ void AicpuExecutor::diagnose_stuck_state(Runtime* runtime, int thread_idx, DEV_ALWAYS("Progress: %d/%d tasks (%.1f%%)", completed, total, total > 0 ? completed * 100.0 / total : 0.0); - int aic_ready = 0, aiv_ready = 0; - for (int s = 0; s < active_shards_; s++) { - aic_ready += ready_queue_aic_tail_[s] - ready_queue_aic_head_[s]; - aiv_ready += ready_queue_aiv_tail_[s] - ready_queue_aiv_head_[s]; + uint64_t aic_ready = 0, aiv_ready = 0; + if (rt) { + PTO2SchedulerState* sched = &rt->scheduler; + aic_ready = pto2_ready_queue_count(&sched->ready_queues[PTO2_WORKER_CUBE]); + aiv_ready = pto2_ready_queue_count(&sched->ready_queues[PTO2_WORKER_VECTOR]); } - DEV_ALWAYS("Ready Queues (%d shards, per-thread push + work-steal pop): AIC=%d, AIV=%d", active_shards_, aic_ready, aiv_ready); + DEV_ALWAYS("Ready Queues: AIC=%lu, AIV=%lu", aic_ready, aiv_ready); int busy_cores = 0; int idle_cores = 0; diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 547791b4c..67c897aee 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -213,10 +213,6 @@ void pto2_add_consumer_to_producer( if (orch->aicpu_task_completed) { int32_t prod_slot = producer_id & orch->aicpu_window_mask; if (__atomic_load_n(&orch->aicpu_task_completed[prod_slot], __ATOMIC_ACQUIRE) >= 2 && - // RELAXED is sufficient: the ACQUIRE on aicpu_task_completed above - // synchronizes with the RELEASE on task_completed in the scheduler, - // and completed_by_task is stored (with RELEASE) sequenced-before - // task_completed — so it is visible after the ACQUIRE load above. __atomic_load_n(&orch->aicpu_completed_by_task[prod_slot], __ATOMIC_RELAXED) == producer_id) { int32_t cons_slot = consumer_id & orch->aicpu_window_mask; __atomic_fetch_add(&orch->aicpu_fanin_refcount[cons_slot], 1, __ATOMIC_ACQ_REL); @@ -245,22 +241,6 @@ void pto2_add_consumer_to_producer( pto2_fanout_unlock(producer); } -void* pto2_alloc_packed_buffer(PTO2OrchestratorState* orch, int32_t total_size) { - if (total_size <= 0) { - return NULL; - } - - void* buffer = orch->heap_ring.pto2_heap_ring_alloc(total_size); - - orch->buffers_allocated++; - orch->bytes_allocated += total_size; - - // Update shared memory with new heap top - PTO2_STORE_RELEASE(&orch->sm_handle->header->heap_top, orch->heap_ring.top); - - return buffer; -} - void pto2_submit_task(PTO2OrchestratorState* orch, int32_t kernel_id, PTO2WorkerType worker_type, @@ -309,16 +289,13 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // Register this task in its owning scope scope_tasks_push(orch, task_id); - // Temporary storage for collecting output sizes - int32_t total_output_size = 0; - // Temporary storage for fanin int32_t fanin_temp[PTO2_MAX_INPUTS]; int32_t fanin_count = 0; task->param_count = num_params; for (int i = 0; i < num_params; i++) { - task->params[i].type = params[i].type; + task->params[i].type = params[i].type; if (params[i].type == PTOParamType::SCALAR) { task->params[i].scalar_value = params[i].scalar_value; } else { @@ -328,8 +305,28 @@ void pto2_submit_task(PTO2OrchestratorState* orch, CYCLE_COUNT_LAP_RECORD(g_orch_params_cycle, AicpuPhaseId::ORCH_PARAMS); - // === STEP 2: First pass - collect output sizes and process inputs === + // Temporary storage for collecting output sizes + int32_t total_output_size = 0; + for (int i = 0; i < num_params; i++) { + PTOParam& p = task->params[i]; + if (p.type != PTOParamType::OUTPUT) { + continue; + } + auto& tensor_data = p.tensor.data(); + // Only allocate from ring buffer when caller did not provide an address + if (tensor_data.buffer.addr == 0) { + total_output_size += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); + } + } + if (total_output_size > 0) { + task->packed_buffer_base = orch->pto2_alloc_packed_buffer(total_output_size); + task->packed_buffer_end = (char*)task->packed_buffer_base + total_output_size; + } + CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP); + + // === STEP 2: First pass - set output addr and process tensor === + int32_t offset = 0; for (int i = 0; i < num_params; i++) { PTOParam& p = task->params[i]; @@ -358,10 +355,6 @@ void pto2_submit_task(PTO2OrchestratorState* orch, if (fanin_count < PTO2_MAX_INPUTS) { fanin_temp[fanin_count++] = producer_task_id; } - - // Add this task to producer's fanout list (with spinlock) - PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, producer_task_id); - pto2_add_consumer_to_producer(orch, producer, producer_task_id, task_id); } if (p.type == PTOParamType::INOUT && overlap_status == OverlapStatus::COVERED) { // inout因为会再次insert进tensor map, @@ -378,9 +371,13 @@ void pto2_submit_task(PTO2OrchestratorState* orch, case PTOParamType::OUTPUT: { auto &tensor_data = p.tensor.data(); - // Only allocate from ring buffer when caller did not provide an address + // Offsets: each output at 1024B-aligned slot; slot size = ALIGN_UP(size, 1024) + // Allocation happens here only; no memcpy of buffer content. Caller's tensor gets addr written back. if (tensor_data.buffer.addr == 0) { - total_output_size += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); + uint64_t alloc_addr = reinterpret_cast((char*)task->packed_buffer_base + offset); + tensor_data.buffer.addr = alloc_addr; + offset += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); + } break; } @@ -391,29 +388,6 @@ void pto2_submit_task(PTO2OrchestratorState* orch, CYCLE_COUNT_LAP_RECORD(g_orch_lookup_cycle, AicpuPhaseId::ORCH_LOOKUP); - // === STEP 3: Allocate packed buffer from Heap Ring (may stall) === - // Each output slot is aligned to PTO2_PACKED_OUTPUT_ALIGN (1024B); gap after data is padding. - if (total_output_size > 0) { - task->packed_buffer_base = orch->pto2_alloc_packed_buffer(total_output_size); - task->packed_buffer_end = (char*)task->packed_buffer_base + total_output_size; - - // Offsets: each output at 1024B-aligned slot; slot size = ALIGN_UP(size, 1024) - // Allocation happens here only; no memcpy of buffer content. Caller's tensor gets addr written back. - int32_t offset = 0; - for (int i = 0; i < task->param_count; i++) { - PTOParam& p = task->params[i]; - if (p.type == PTOParamType::OUTPUT) { - auto &tensor_data = p.tensor.data(); - if (tensor_data.buffer.addr == 0) { - uint64_t alloc_addr = reinterpret_cast((char*)task->packed_buffer_base + offset); - tensor_data.buffer.addr = alloc_addr; - offset += PTO2_ALIGN_UP(tensor_data.buffer.size, PTO2_PACKED_OUTPUT_ALIGN); - } - } - } - } - - CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP); // === STEP 4: Second pass - register outputs in TensorMap === for (int i = 0; i < num_params; i++) { @@ -429,35 +403,45 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // === STEP 5: Finalize fanin list === // First build the fanin list - for (int i = 0; i < fanin_count; i++) { - task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, fanin_temp[i]); + if (orch->scheduler) { + PTO2SchedulerState* sched = orch->scheduler; + int32_t slot = sched->pto2_task_slot(task_id); + + int32_t early_finished = 0; + task->fanin_count = fanin_count + 1; // +1 redundance for not being ready too early + for (int i = 0; i < fanin_count; i++) { + int32_t producer_task_id = fanin_temp[i]; + // Add this task to producer's fanout list (with spinlock) + PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, producer_task_id); + pto2_fanout_lock(producer); + producer->fanout_head = pto2_dep_list_prepend(&orch->dep_pool, producer->fanout_head, task_id); + producer->fanout_count++; + // Normal path: prepend consumer to producer's fanout list + task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, producer_task_id); + + int32_t prod_slot = sched->pto2_task_slot(producer_task_id); + int32_t prod_state = __atomic_load_n(&sched->task_state[prod_slot], __ATOMIC_ACQUIRE); + if (prod_state == PTO2_TASK_COMPLETED) { + // Early return optimization: if producer already completed, we can skip adding dependency and directly decrement fanin_count + early_finished++; + } + pto2_fanout_unlock(producer); + } + if (early_finished > 0) { + __atomic_fetch_add(&sched->fanin_refcount[slot], early_finished, __ATOMIC_SEQ_CST); + } + } else { + // No scheduler: just build fanin list + add to producers using pto2_add_consumer_to_producer + for (int i = 0; i < fanin_count; i++) { + task->fanin_head = pto2_dep_list_prepend(&orch->dep_pool, task->fanin_head, fanin_temp[i]); + PTO2TaskDescriptor* producer = pto2_task_ring_get(&orch->task_ring, fanin_temp[i]); + pto2_add_consumer_to_producer(orch, producer, fanin_temp[i], task_id); + } + __atomic_store_n(&task->fanin_count, fanin_count, __ATOMIC_SEQ_CST); } - // SEQ_CST store: participates in the global total order with Phase 1's SEQ_CST - // fetch_add on s_pto2_fanin_refcount to prevent the IRIW hazard on ARM. - // (See comment above the fetch_add in aicpu_executor.cpp Phase 1 for details.) - __atomic_store_n(&task->fanin_count, fanin_count, __ATOMIC_SEQ_CST); CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN); - // === STEP 5b: Check if task is already ready (all producers completed via early-return) === - // In AICPU parallel mode, early-return in pto2_add_consumer_to_producer may have - // already incremented aicpu_fanin_refcount for this task. Now that fanin_count is - // finalized, check if the task is already satisfied and push it to the orchestrator - // ready queue so scheduler threads can pick it up without an O(N) scan. - if (orch->aicpu_fanin_refcount && fanin_count > 0) { - int32_t slot = task_id & orch->aicpu_window_mask; - int32_t refcount = __atomic_load_n(&orch->aicpu_fanin_refcount[slot], __ATOMIC_SEQ_CST); - if (refcount >= fanin_count) { - // All producers already completed — push to orch ready queue - int32_t tail = orch->orch_ready_tail; - int32_t capacity = PTO2OrchestratorState::ORCH_READY_QUEUE_SIZE; - int32_t head = __atomic_load_n(&orch->orch_ready_head, __ATOMIC_ACQUIRE); - if (((tail + 1) & (capacity - 1)) != (head & (capacity - 1))) { - orch->orch_ready_queue[tail & (capacity - 1)] = task_id; - __atomic_store_n(&orch->orch_ready_tail, tail + 1, __ATOMIC_RELEASE); - } - } - } // === STEP 6: Initialize task in scheduler === // In multi-threaded mode, scheduler thread handles task initialization via polling diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h index 1185b9865..129c24243 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h @@ -82,18 +82,6 @@ struct PTO2OrchestratorState { int32_t* aicpu_completed_by_task; // task_id that set the completed state (for slot-reuse validation) int32_t aicpu_window_mask; - // === ORCHESTRATOR READY QUEUE (early-return path → scheduler) === - // When the orchestrator discovers a producer already completed, it - // increments the consumer's refcount directly. If that makes the - // consumer ready, the consumer_id is pushed here so scheduler threads - // can pick it up without an O(N) scan. - // SPSC-ish ring: orchestrator writes (single producer), scheduler - // threads read via CAS on orch_ready_head (multiple consumers). - static constexpr int32_t ORCH_READY_QUEUE_SIZE = 4096; - volatile int32_t orch_ready_queue[4096]; - volatile int32_t orch_ready_tail; // written by orchestrator only - volatile int32_t orch_ready_head; // advanced by scheduler via CAS - /** * Allocate packed output buffer for a task */ @@ -229,17 +217,6 @@ void pto2_orchestrator_wait_all(PTO2OrchestratorState* orch); */ bool pto2_orchestrator_has_space(PTO2OrchestratorState* orch); -// ============================================================================= -// Internal Helpers -// ============================================================================= - -/** - * Add consumer to producer's fanout list (with spinlock) - * Also checks if producer has already completed and updates consumer's fanin_refcount - */ -void pto2_add_consumer_to_producer( - PTO2OrchestratorState* orch, PTO2TaskDescriptor* producer, int32_t producer_id, int32_t consumer_id); - // ============================================================================= // Debug Utilities // ============================================================================= diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp index 34c922e2d..7b289e175 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp @@ -148,6 +148,21 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, } } + // Initialize orch_pending lock-free queue + sched->orch_pending_capacity = window_size; + sched->orch_pending = (int32_t*)malloc(window_size * sizeof(int32_t)); + if (!sched->orch_pending) { + for (int i = 0; i < PTO2_NUM_WORKER_TYPES; i++) { + pto2_ready_queue_destroy(&sched->ready_queues[i]); + } + free(sched->fanout_refcount); + free(sched->fanin_refcount); + free(sched->task_state); + return false; + } + sched->orch_pending_head = 0; + sched->orch_pending_tail = 0; + return true; } @@ -170,6 +185,11 @@ void pto2_scheduler_destroy(PTO2SchedulerState* sched) { for (int i = 0; i < PTO2_NUM_WORKER_TYPES; i++) { pto2_ready_queue_destroy(&sched->ready_queues[i]); } + + if (sched->orch_pending) { + free(sched->orch_pending); + sched->orch_pending = NULL; + } } void pto2_scheduler_reset(PTO2SchedulerState* sched) { @@ -184,6 +204,9 @@ void pto2_scheduler_reset(PTO2SchedulerState* sched) { pto2_ready_queue_reset(&sched->ready_queues[i]); } + sched->orch_pending_head = 0; + sched->orch_pending_tail = 0; + sched->tasks_completed = 0; sched->tasks_consumed = 0; } @@ -246,7 +269,8 @@ static void check_and_handle_consumed(PTO2SchedulerState* sched, } } -void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) { +PTO2CompletionStats pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) { + PTO2CompletionStats stats = {0, 0}; int32_t slot = sched->pto2_task_slot(task_id); PTO2TaskDescriptor* task = pto2_sm_get_task(sched->sm_handle, task_id); @@ -271,7 +295,10 @@ void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) PTO2TaskDescriptor* consumer = pto2_sm_get_task(sched->sm_handle, consumer_id); // Atomically increment consumer's fanin_refcount and check if consumer is now ready - sched->release_fanin_and_check_ready(consumer_id, consumer); + stats.fanout_edges++; + if (sched->release_fanin_and_check_ready(consumer_id, consumer)) { + stats.tasks_enqueued++; + } current = entry->next_offset; } @@ -292,6 +319,8 @@ void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id) // === STEP 3: Check if this task can transition to CONSUMED === check_and_handle_consumed(sched, task_id, task); + + return stats; } void pto2_scheduler_on_scope_end(PTO2SchedulerState* sched, diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h index 1a294d410..551aebbc8 100644 --- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h +++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h @@ -22,6 +22,19 @@ #include "pto_shared_memory.h" #include "pto_ring_buffer.h" +// ============================================================================= +// Completion Statistics (returned by on_task_complete for profiling) +// ============================================================================= + +/** + * Statistics from a single on_task_complete call. + * Used by executor profiling to replace old fanout/steal counters. + */ +struct PTO2CompletionStats { + int32_t fanout_edges; // Number of fanout edges traversed + int32_t tasks_enqueued; // Number of consumers that became READY +}; + // ============================================================================= // Ready Queue Structure // ============================================================================= @@ -80,6 +93,14 @@ struct PTO2SchedulerState { // Ready queues (one per worker type) PTO2ReadyQueue ready_queues[PTO2_NUM_WORKER_TYPES]; + // Lock-free orchestrator pending queue (SPSC producer, MPMC consumer) + // Orchestrator pushes newly-ready tasks here instead of touching ready_queue spinlock. + // Scanner threads drain this into ready_queues during the scan phase. + int32_t* orch_pending; // Circular buffer of task IDs + volatile int64_t orch_pending_head; // Consumer position (CAS-based pop) + volatile int64_t orch_pending_tail; // Producer position (store-release only) + int64_t orch_pending_capacity; // = task_window_size + // Dependency list pool reference PTO2DepListPool* dep_pool; @@ -103,15 +124,14 @@ struct PTO2SchedulerState { // ============================================================================= /** - * Signal that one fanin dependency has been satisfied + * Mark task READY if all fanin dependencies are satisfied (CAS only, no enqueue). * - * Atomically increments fanin_refcount. If the new count equals - * fanin_count, CAS PENDING -> READY and enqueue. + * Atomically increments fanin_refcount. If the new count equals fanin_count, + * CAS PENDING -> READY. Does NOT push to any queue — caller decides where. * - * @param task_id Task ID - * @param task Task descriptor + * @return true if the task transitioned to READY */ - void release_fanin_and_check_ready(int32_t task_id, PTO2TaskDescriptor* task) { + bool release_fanin_and_mark_ready(int32_t task_id, PTO2TaskDescriptor* task) { int32_t slot = pto2_task_slot(task_id); // Atomically increment fanin_refcount and check if all producers are done @@ -125,8 +145,58 @@ struct PTO2SchedulerState { PTO2TaskState expected = PTO2_TASK_PENDING; if (__atomic_compare_exchange_n(&task_state[slot], &expected, PTO2_TASK_READY, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) { - pto2_ready_queue_push(&ready_queues[task->worker_type], task_id); + return true; + } + } + return false; + } + + /** + * Signal that one fanin dependency has been satisfied (on_task_complete path). + * + * Calls release_fanin_and_mark_ready, then pushes to the ready_queue if READY. + * + * @return true if the task was enqueued (became READY) + */ + bool release_fanin_and_check_ready(int32_t task_id, PTO2TaskDescriptor* task) { + if (release_fanin_and_mark_ready(task_id, task)) { + pto2_ready_queue_push(&ready_queues[task->worker_type], task_id); + return true; + } + return false; + } + + /** + * Push task_id to orch_pending (single producer, lock-free). + * Only called by orchestrator thread. + */ + void orch_pending_push(int32_t task_id) { + int64_t tail = __atomic_load_n(&orch_pending_tail, __ATOMIC_RELAXED); + orch_pending[tail % orch_pending_capacity] = task_id; + __atomic_store_n(&orch_pending_tail, tail + 1, __ATOMIC_RELEASE); + } + + /** + * Try to pop one task_id from orch_pending (multi-consumer, CAS-based). + * Called by scanner threads. + * + * @param out_task_id receives the popped task_id on success + * @return true if a task was popped + */ + bool orch_pending_try_pop(int32_t* out_task_id) { + while (true) { + int64_t head = __atomic_load_n(&orch_pending_head, __ATOMIC_ACQUIRE); + int64_t tail = __atomic_load_n(&orch_pending_tail, __ATOMIC_ACQUIRE); + if (head >= tail) { + return false; // empty + } + int32_t task_id = orch_pending[head % orch_pending_capacity]; + if (__atomic_compare_exchange_n(&orch_pending_head, &head, head + 1, + false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) { + *out_task_id = task_id; + return true; } + // CAS failed — another consumer won; retry } } @@ -140,7 +210,9 @@ struct PTO2SchedulerState { // concurrent on_task_complete between Step 5 and Step 6. fanout_refcount[slot] = 0; - release_fanin_and_check_ready(task_id, task); + if (release_fanin_and_mark_ready(task_id, task)) { + orch_pending_push(task_id); // Lock-free, no spinlock + } } }; @@ -252,8 +324,9 @@ int32_t pto2_scheduler_get_ready_task(PTO2SchedulerState* sched, * * @param sched Scheduler state * @param task_id Completed task ID + * @return Completion statistics for profiling */ -void pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id); +PTO2CompletionStats pto2_scheduler_on_task_complete(PTO2SchedulerState* sched, int32_t task_id); /** * Handle scope end (called when orchestrator ends a scope) diff --git a/tools/sched_overhead_analysis.py b/tools/sched_overhead_analysis.py index e6ea363df..fb66edc86 100644 --- a/tools/sched_overhead_analysis.py +++ b/tools/sched_overhead_analysis.py @@ -37,19 +37,11 @@ def parse_scheduler_threads(log_path): Expected log format (per thread): Thread N: completed=X tasks in Yus (Z loops, W tasks/loop) - Thread N: --- Phase Breakdown (execution order) --- - Thread N: scan: Xus (Y%) - Thread N: early_ready: Xus (Y%) (deps already met at submit time) - Thread N: complete: Xus (Y%) [fanout: edges=A, max_degree=B, avg=C] - Thread N: dispatch: Xus (Y%) [steal: own=A, steal=B, pct=C%] - Thread N: --- Lock Contention (ready_q) --- - Thread N: total: wait= Xus hold= Yus - Thread N: scan: wait= Xus hold= Yus - Thread N: early_ready: wait= Xus hold= Yus - Thread N: complete: wait= Xus hold= Yus - Thread N: dispatch: wait= Xus hold= Yus - Thread N: hit: wait= Xus hold= Yus (dequeued task) - Thread N: miss: wait= Xus hold= Yus (empty queue) + Thread N: --- Phase Breakdown --- + Thread N: complete: Xus (Y%) [notify: edges=A, max_degree=B, avg=C] + Thread N: dispatch: Xus (Y%) [pop: hit=A, miss=B, hit_rate=C%] + Thread N: scan: Xus (Y%) + Thread N: idle: Xus (Y%) """ threads = {} with open(log_path, 'r', errors='ignore') as f: @@ -65,99 +57,45 @@ def parse_scheduler_threads(log_path): 'tasks_per_loop': float(m.group(5)), } - # Phase: scan (distinguished from lock scan by absence of "wait=") - m = re.search(r'Thread (\d+):\s+scan:\s+([\d.]+)us \(\s*([\d.]+)%\)', line) - if m: - tid = int(m.group(1)) - if tid in threads: - threads[tid]['scan_us'] = float(m.group(2)) - threads[tid]['scan_pct'] = float(m.group(3)) - - # Phase: early_ready - m = re.search(r'Thread (\d+):\s+early_ready:\s+([\d.]+)us \(\s*([\d.]+)%\)', line) - if m: - tid = int(m.group(1)) - if tid in threads: - threads[tid]['early_ready_us'] = float(m.group(2)) - threads[tid]['early_ready_pct'] = float(m.group(3)) - - # Phase: complete [fanout: edges=X, max_degree=Y, avg=Z] - m = re.search(r'Thread (\d+):\s+complete:\s+([\d.]+)us \(\s*([\d.]+)%\)\s+\[fanout: edges=(\d+), max_degree=(\d+), avg=([\d.]+)\]', line) + # Phase: complete [notify: edges=X, max_degree=Y, avg=Z] + m = re.search(r'Thread (\d+):\s+complete:\s+([\d.]+)us \(\s*([\d.]+)%\)\s+\[notify: edges=(\d+), max_degree=(\d+), avg=([\d.]+)\]', line) if m: tid = int(m.group(1)) if tid in threads: threads[tid]['complete_us'] = float(m.group(2)) threads[tid]['complete_pct'] = float(m.group(3)) - threads[tid]['fanout_edges'] = int(m.group(4)) - threads[tid]['fanout_max_degree'] = int(m.group(5)) - threads[tid]['fanout_avg'] = float(m.group(6)) + threads[tid]['notify_edges'] = int(m.group(4)) + threads[tid]['notify_max_degree'] = int(m.group(5)) + threads[tid]['notify_avg'] = float(m.group(6)) - # Phase: dispatch [steal: own=X, steal=Y, pct=Z%] - m = re.search(r'Thread (\d+):\s+dispatch:\s+([\d.]+)us \(\s*([\d.]+)%\)\s+\[steal: own=(\d+), steal=(\d+), pct=([\d.]+)%\]', line) + # Phase: dispatch [pop: hit=X, miss=Y, hit_rate=Z%] + m = re.search(r'Thread (\d+):\s+dispatch:\s+([\d.]+)us \(\s*([\d.]+)%\)\s+\[pop: hit=(\d+), miss=(\d+), hit_rate=([\d.]+)%\]', line) if m: tid = int(m.group(1)) if tid in threads: threads[tid]['dispatch_us'] = float(m.group(2)) threads[tid]['dispatch_pct'] = float(m.group(3)) - threads[tid]['steal_own'] = int(m.group(4)) - threads[tid]['steal_steal'] = int(m.group(5)) - threads[tid]['steal_pct'] = float(m.group(6)) - - # Lock: total - m = re.search(r'Thread (\d+):\s+total:\s+wait=\s*(\d+)us hold=\s*(\d+)us', line) - if m: - tid = int(m.group(1)) - if tid in threads: - threads[tid]['lock_wait_us'] = int(m.group(2)) - threads[tid]['lock_hold_us'] = int(m.group(3)) - - # Lock: scan - m = re.search(r'Thread (\d+):\s+scan:\s+wait=\s*(\d+)us hold=\s*(\d+)us', line) - if m: - tid = int(m.group(1)) - if tid in threads: - threads[tid]['lock_scan_wait'] = int(m.group(2)) - threads[tid]['lock_scan_hold'] = int(m.group(3)) + threads[tid]['pop_hit'] = int(m.group(4)) + threads[tid]['pop_miss'] = int(m.group(5)) + threads[tid]['pop_hit_rate'] = float(m.group(6)) - # Lock: early_ready - m = re.search(r'Thread (\d+):\s+early_ready:\s+wait=\s*(\d+)us hold=\s*(\d+)us', line) + # Phase: scan with optional [enqueue: N] + m = re.search(r'Thread (\d+):\s+scan:\s+([\d.]+)us \(\s*([\d.]+)%\)(?:\s+\[enqueue: (\d+)\])?', line) if m: tid = int(m.group(1)) if tid in threads: - threads[tid]['lock_early_ready_wait'] = int(m.group(2)) - threads[tid]['lock_early_ready_hold'] = int(m.group(3)) - - # Lock: complete - m = re.search(r'Thread (\d+):\s+complete:\s+wait=\s*(\d+)us hold=\s*(\d+)us', line) - if m: - tid = int(m.group(1)) - if tid in threads: - threads[tid]['lock_complete_wait'] = int(m.group(2)) - threads[tid]['lock_complete_hold'] = int(m.group(3)) - - # Lock: dispatch - m = re.search(r'Thread (\d+):\s+dispatch:\s+wait=\s*(\d+)us hold=\s*(\d+)us', line) - if m: - tid = int(m.group(1)) - if tid in threads: - threads[tid]['lock_dispatch_wait'] = int(m.group(2)) - threads[tid]['lock_dispatch_hold'] = int(m.group(3)) - - # Lock: dispatch hit - m = re.search(r'Thread (\d+):\s+hit:\s+wait=\s*(\d+)us hold=\s*(\d+)us', line) - if m: - tid = int(m.group(1)) - if tid in threads: - threads[tid]['lock_dispatch_hit_wait'] = int(m.group(2)) - threads[tid]['lock_dispatch_hit_hold'] = int(m.group(3)) + threads[tid]['scan_us'] = float(m.group(2)) + threads[tid]['scan_pct'] = float(m.group(3)) + if m.group(4) is not None: + threads[tid]['scan_enqueue'] = int(m.group(4)) - # Lock: dispatch miss - m = re.search(r'Thread (\d+):\s+miss:\s+wait=\s*(\d+)us hold=\s*(\d+)us', line) + # Phase: idle + m = re.search(r'Thread (\d+):\s+idle:\s+([\d.]+)us \(\s*([\d.]+)%\)', line) if m: tid = int(m.group(1)) if tid in threads: - threads[tid]['lock_dispatch_miss_wait'] = int(m.group(2)) - threads[tid]['lock_dispatch_miss_hold'] = int(m.group(3)) + threads[tid]['idle_us'] = float(m.group(2)) + threads[tid]['idle_pct'] = float(m.group(3)) return threads @@ -304,12 +242,12 @@ def run_analysis(perf_path, log_path, print_sources=True, selection_strategy=Non print() # Phase breakdown - phases = ['scan', 'early_ready', 'complete', 'dispatch'] + phases = ['complete', 'scan', 'dispatch', 'idle'] phase_labels = { - 'scan': 'Scan (discover new root tasks)', - 'early_ready': 'Early ready (deps met at submit time)', - 'complete': 'Complete (poll handshake, resolve fanout)', - 'dispatch': 'Dispatch (pop queue, build payload, flush)', + 'complete': 'Complete (poll handshake, notify consumers)', + 'scan': 'Scan (drain orch_pending, perf header update)', + 'dispatch': 'Dispatch (pop queue, build payload, register write)', + 'idle': 'Idle (no progress, spinning)', } fmt3 = " {:<50} {:>11} {:>10} {:>14}" @@ -325,45 +263,25 @@ def run_analysis(perf_path, log_path, print_sources=True, selection_strategy=Non print(fmt3.format(phase_labels[p], f'{tot:.1f}', f'{pct:.1f}%', f'{avg:.2f}')) print() - # Fanout stats (from complete phase) - fanout_edges = sum(t.get('fanout_edges', 0) for t in threads.values()) - fanout_max = max((t.get('fanout_max_degree', 0) for t in threads.values()), default=0) - fanout_avg_weighted = sum(t.get('fanout_avg', 0) * t.get('fanout_edges', 0) for t in threads.values()) - fanout_avg = fanout_avg_weighted / fanout_edges if fanout_edges > 0 else 0 - print(f' Fanout: total edges={fanout_edges}, max_degree={fanout_max}, avg_degree={fanout_avg:.1f}') + # Notify stats (from complete phase) + notify_edges = sum(t.get('notify_edges', 0) for t in threads.values()) + notify_max = max((t.get('notify_max_degree', 0) for t in threads.values()), default=0) + notify_avg_weighted = sum(t.get('notify_avg', 0) * t.get('notify_edges', 0) for t in threads.values()) + notify_avg = notify_avg_weighted / notify_edges if notify_edges > 0 else 0 + print(f' Notify: total edges={notify_edges}, max_degree={notify_max}, avg_degree={notify_avg:.1f}') print() - # Work stealing stats (from dispatch phase) - steal_own = sum(t.get('steal_own', 0) for t in threads.values()) - steal_steal = sum(t.get('steal_steal', 0) for t in threads.values()) - steal_total = steal_own + steal_steal - steal_pct = steal_steal / steal_total * 100 if steal_total > 0 else 0 - print(f' Work stealing: own={steal_own}, stolen={steal_steal} ({steal_pct:.1f}% steal rate)') - print() - - # Lock contention breakdown - fmt4 = " {:<50} {:>11} {:>10}" - print(fmt4.format('Lock contention (ready_q)', 'Total (us)', '% of total')) - print(' ' + '-' * 75) - lock_wait = sum(t.get('lock_wait_us', 0) for t in threads.values()) - lock_hold = sum(t.get('lock_hold_us', 0) for t in threads.values()) - print(fmt4.format(' wait (spinning for lock)', str(lock_wait), f'{lock_wait/total_us*100:.1f}%' if total_us > 0 else '0.0%')) - print(fmt4.format(' hold (inside critical section)', str(lock_hold), f'{lock_hold/total_us*100:.1f}%' if total_us > 0 else '0.0%')) - print() + # Pop efficiency stats (from dispatch phase) + pop_hit = sum(t.get('pop_hit', 0) for t in threads.values()) + pop_miss = sum(t.get('pop_miss', 0) for t in threads.values()) + pop_total = pop_hit + pop_miss + pop_hit_rate = pop_hit / pop_total * 100 if pop_total > 0 else 0 + print(f' Pop efficiency: hit={pop_hit}, miss={pop_miss}, hit_rate={pop_hit_rate:.1f}%') - # Lock wait breakdown by phase - print(' Lock wait by phase:') - for p in phases: - w = sum(t.get(f'lock_{p}_wait', 0) for t in threads.values()) - h = sum(t.get(f'lock_{p}_hold', 0) for t in threads.values()) - print(f' {p:<14} wait={w:>6} us hold={h:>6} us') - # Dispatch hit/miss sub-breakdown - hit_w = sum(t.get('lock_dispatch_hit_wait', 0) for t in threads.values()) - hit_h = sum(t.get('lock_dispatch_hit_hold', 0) for t in threads.values()) - miss_w = sum(t.get('lock_dispatch_miss_wait', 0) for t in threads.values()) - miss_h = sum(t.get('lock_dispatch_miss_hold', 0) for t in threads.values()) - print(f' {"hit":<12} wait={hit_w:>6} us hold={hit_h:>6} us (dequeued task)') - print(f' {"miss":<12} wait={miss_w:>6} us hold={miss_h:>6} us (empty queue)') + # Enqueue stats (from scan phase) + scan_enqueue = sum(t.get('scan_enqueue', 0) for t in threads.values()) + if scan_enqueue > 0: + print(f' Scan enqueue: {scan_enqueue} tasks moved from orch_pending to ready_queue') print() print('=' * 90) @@ -397,33 +315,23 @@ def run_analysis(perf_path, log_path, print_sources=True, selection_strategy=Non print(' Scheduler CPU time breakdown (per completed task):') - # Build phase data with sub-items for sorting + # Build phase data for sorting phase_details = { 'dispatch': { - 'label': 'Dispatch phase (build payload + cache flush)', + 'label': 'Dispatch phase (pop queue + build payload + register write)', 'total': phase_totals.get('dispatch', 0), - 'sub_items': [ - ('Lock wait (ready_q pop)', sum(t.get('lock_dispatch_wait', 0) for t in threads.values())), - ('Lock hold + build + dc cvac/civac + dsb sy', phase_totals.get('dispatch', 0) - sum(t.get('lock_dispatch_wait', 0) for t in threads.values())), - ] }, 'complete': { - 'label': 'Complete phase (poll + fanout resolve)', + 'label': 'Complete phase (poll + notify consumers)', 'total': phase_totals.get('complete', 0), - 'sub_items': [ - ('Lock wait (ready_q push)', sum(t.get('lock_complete_wait', 0) for t in threads.values())), - ('Fanout traversal + atomic ops', phase_totals.get('complete', 0) - sum(t.get('lock_complete_wait', 0) for t in threads.values())), - ] }, 'scan': { - 'label': 'Scan phase (new task discovery)', + 'label': 'Scan phase (drain orch_pending + perf header update)', 'total': phase_totals.get('scan', 0), - 'sub_items': [] }, - 'early_ready': { - 'label': 'Early ready (deps met at submit time)', - 'total': phase_totals.get('early_ready', 0), - 'sub_items': [] + 'idle': { + 'label': 'Idle (spinning, no progress)', + 'total': phase_totals.get('idle', 0), }, } @@ -432,34 +340,23 @@ def run_analysis(perf_path, log_path, print_sources=True, selection_strategy=Non per_task = detail['total'] / total_completed if total_completed > 0 else 0 pct = detail['total'] / total_us * 100 if total_us > 0 else 0 print(f' - {detail["label"]:<50} {per_task:.2f} us/task ({pct:.1f}% of scheduler CPU)') - for sub_label, sub_total in detail['sub_items']: - sub_per_task = sub_total / total_completed if total_completed > 0 else 0 - print(f' {sub_label:<48} {sub_per_task:.2f} us/task') print() print(f' Avg Tail OH = {avg_tail_oh:.1f} us ~= {loop_ratio:.1f} x avg loop iteration ({avg_loop_us:.1f} us)') print(f' -> On average, a completed task waits ~{loop_ratio:.1f} loop iterations before being detected') print() - # Data-driven insight: find the dominant phase (excluding early_ready which is typically trivial) + # Data-driven insight: find the dominant phase (excluding idle) work_phases = {p: phase_totals.get(p, 0) for p in ['scan', 'complete', 'dispatch']} dominant_phase = max(work_phases, key=work_phases.get) dominant_pct = work_phases[dominant_phase] / total_us * 100 if total_us > 0 else 0 print(f' Key insight: {phase_labels[dominant_phase].split(" (")[0]} phase consumes ~{dominant_pct:.0f}% of scheduler CPU.') if dominant_phase == 'dispatch': - dispatch_total = phase_totals.get('dispatch', 0) - dispatch_lock_pct = sum(t.get('lock_dispatch_wait', 0) for t in threads.values()) / dispatch_total * 100 if dispatch_total > 0 else 0 - print(f' Within dispatch, lock contention accounts for {dispatch_lock_pct:.0f}% of time.') - if miss_w > hit_w: - print(f' Dispatch miss (empty queue) dominates lock wait: miss={miss_w}us vs hit={hit_w}us.') - print(' Cache flush (dc cvac + dsb sy) is the dominant non-lock cost.') + print(f' Pop hit rate = {pop_hit_rate:.1f}% — low hit rate means cores idle waiting for ready tasks.') elif dominant_phase == 'complete': - complete_total = phase_totals.get('complete', 0) - complete_lock_pct = sum(t.get('lock_complete_wait', 0) for t in threads.values()) / complete_total * 100 if complete_total > 0 else 0 - print(f' Within complete, lock contention accounts for {complete_lock_pct:.0f}% of time.') - print(' Fanout traversal and atomic ops dominate the non-lock cost.') + print(f' Notify avg_degree = {notify_avg:.1f} — high degree means many consumers per task.') elif dominant_phase == 'scan': - print(' Scan phase overhead indicates too many root tasks or inefficient task graph traversal.') + print(' Scan phase overhead indicates orch_pending drain and/or perf header updates.') print('=' * 90) return 0 diff --git a/tools/swimlane_converter.py b/tools/swimlane_converter.py index db1ec714c..1f2c9e958 100644 --- a/tools/swimlane_converter.py +++ b/tools/swimlane_converter.py @@ -564,7 +564,7 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose "complete": "good", # green "dispatch": "terrible", # red "scan": "thread_state_running", # blue - "early_ready": "yellow", # yellow + "idle": "yellow", # yellow } for thread_idx, thread_records in enumerate(scheduler_phases):