From a7b6126b4971744fde7b5fd2f055a849e6ae0127 Mon Sep 17 00:00:00 2001 From: wcwxy <26245345+ChaoWao@users.noreply.github.com> Date: Mon, 2 Mar 2026 09:40:42 +0800 Subject: [PATCH] Feature: AICPU phase profiling with per-task orchestrator recording and dependency arrows - Add scheduler phase profiling: record COMPLETE/DISPATCH/SCAN/EARLY_READY phases per loop iteration with per-thread buffers in shared memory - Add per-task orchestrator phase recording (sync/alloc/params/lookup/heap/ insert/fanin/finalize/scope_end) using AicpuPhaseRecord with dedicated buffer slot, exported as aicpu_orchestrator_phases JSON array - Write cumulative AicpuOrchSummary to shared memory for backward compat - Host-side collection reads both scheduler and orchestrator phase records, exports version 2 JSON with scheduler phases, orchestrator summary, and per-task orchestrator phases - Swimlane converter renders scheduler phases as color-coded bars on pid=3, per-task orchestrator phases on pid=4 with per-phase colors - Add AICPU View (pid=2) fanout dependency arrows mirroring AICore View - Add Scheduler DISPATCH to AICore/AICPU task execution flow arrows - Set process sort order: Orchestrator, Scheduler, AICPU View, AICore View --- src/platform/a2a3/host/device_runner.cpp | 3 +- src/platform/a2a3sim/aicpu/CMakeLists.txt | 2 + src/platform/a2a3sim/host/device_runner.cpp | 3 +- .../aicpu/performance_collector_aicpu.h | 64 ++++ src/platform/include/common/perf_profiling.h | 137 +++++++- .../include/host/performance_collector.h | 14 + .../src/aicpu/performance_collector_aicpu.cpp | 97 ++++++ .../src/host/performance_collector.cpp | 209 ++++++++++- .../aicpu/aicpu_executor.cpp | 65 ++++ .../runtime/pto_orchestrator.cpp | 35 +- tools/swimlane_converter.py | 327 +++++++++++++++++- 11 files changed, 931 insertions(+), 25 deletions(-) diff --git a/src/platform/a2a3/host/device_runner.cpp b/src/platform/a2a3/host/device_runner.cpp index 97474f501..a692818cf 100644 --- a/src/platform/a2a3/host/device_runner.cpp +++ b/src/platform/a2a3/host/device_runner.cpp @@ -427,8 +427,9 @@ int DeviceRunner::run(Runtime& runtime, return rc; } - // Print collected performance data (after stream sync) + // Collect phase data and print performance data (after stream sync) if (runtime.enable_profiling) { + perf_collector_.collect_phase_data(); export_swimlane_json(); } diff --git a/src/platform/a2a3sim/aicpu/CMakeLists.txt b/src/platform/a2a3sim/aicpu/CMakeLists.txt index a7d39e4be..42731556c 100644 --- a/src/platform/a2a3sim/aicpu/CMakeLists.txt +++ b/src/platform/a2a3sim/aicpu/CMakeLists.txt @@ -50,6 +50,8 @@ target_compile_options(aicpu_kernel PRIVATE -Wall -Wextra + -Werror + -Wno-error=class-memaccess -fPIC -O3 -g diff --git a/src/platform/a2a3sim/host/device_runner.cpp b/src/platform/a2a3sim/host/device_runner.cpp index af85db934..fd8cd37b7 100644 --- a/src/platform/a2a3sim/host/device_runner.cpp +++ b/src/platform/a2a3sim/host/device_runner.cpp @@ -297,8 +297,9 @@ int DeviceRunner::run(Runtime& runtime, LOG_INFO("All threads completed"); - // Print performance data after execution completes + // Collect AICPU phase data and print performance data after execution completes if (runtime.enable_profiling) { + perf_collector_.collect_phase_data(); export_swimlane_json(); } diff --git a/src/platform/include/aicpu/performance_collector_aicpu.h b/src/platform/include/aicpu/performance_collector_aicpu.h index 7a831ebaa..11695c3f4 100644 --- a/src/platform/include/aicpu/performance_collector_aicpu.h +++ 
b/src/platform/include/aicpu/performance_collector_aicpu.h @@ -78,4 +78,68 @@ void perf_aicpu_flush_buffers(Runtime* runtime, */ void perf_aicpu_update_total_tasks(Runtime* runtime, uint32_t total_tasks); +/** + * Initialize AICPU phase profiling + * + * Sets up AicpuPhaseHeader and clears per-thread phase record buffers. + * Must be called once from thread 0 after perf_aicpu_init_profiling(). + * + * @param runtime Runtime instance pointer + * @param num_sched_threads Number of scheduler threads + */ +void perf_aicpu_init_phase_profiling(Runtime* runtime, int num_sched_threads); + +/** + * Record a single scheduler phase + * + * Appends an AicpuPhaseRecord to the specified thread's buffer. + * Silently drops records when the buffer is full. + * + * @param thread_idx Scheduler thread index + * @param phase_id Phase identifier + * @param start_time Phase start timestamp + * @param end_time Phase end timestamp + * @param loop_iter Current loop iteration number + * @param tasks_processed Number of tasks processed in this phase + */ +void perf_aicpu_record_phase(int thread_idx, + AicpuPhaseId phase_id, + uint64_t start_time, uint64_t end_time, + uint32_t loop_iter, uint32_t tasks_processed); + +/** + * Write orchestrator cumulative summary + * + * Writes the orchestrator's accumulated profiling data to shared memory + * for host-side collection. + * + * @param src Pointer to populated AicpuOrchSummary (magic field is set internally) + */ +void perf_aicpu_write_orch_summary(const AicpuOrchSummary* src); + +/** + * Set orchestrator thread index for per-task phase recording + * + * Must be called once from the orchestrator thread before any + * perf_aicpu_record_orch_phase() calls. + * + * @param thread_idx Thread index for the orchestrator (typically num_sched_threads) + */ +void perf_aicpu_set_orch_thread_idx(int thread_idx); + +/** + * Record a single orchestrator phase + * + * Appends an AicpuPhaseRecord for one sub-step of pto2_submit_task(). + * Uses the orchestrator's dedicated buffer slot (set via set_orch_thread_idx). + * + * @param phase_id Orchestrator phase identifier (ORCH_SYNC..ORCH_SCOPE_END) + * @param start_time Phase start timestamp + * @param end_time Phase end timestamp + * @param submit_idx Task submission index (acts as loop_iter) + */ +void perf_aicpu_record_orch_phase(AicpuPhaseId phase_id, + uint64_t start_time, uint64_t end_time, + uint32_t submit_idx); + #endif // PLATFORM_AICPU_PERFORMANCE_COLLECTOR_AICPU_H_ diff --git a/src/platform/include/common/perf_profiling.h b/src/platform/include/common/perf_profiling.h index a1673e1b1..596f52a16 100644 --- a/src/platform/include/common/perf_profiling.h +++ b/src/platform/include/common/perf_profiling.h @@ -2,7 +2,7 @@ * @file perf_profiling.h * @brief Performance profiling data structures * - * Architecture: Fixed header + dynamic tail + * Architecture: Fixed header + dynamic tail + optional phase profiling region * * Memory layout: * ┌─────────────────────────────────────────────────────────────┐ @@ -19,9 +19,21 @@ * │ ... 
│ * ├─────────────────────────────────────────────────────────────┤ * │ DoubleBuffer[num_cores-1] │ + * ├─────────────────────────────────────────────────────────────┤ + * │ AicpuPhaseHeader (optional, present when phase profiling) │ + * │ - magic, num_sched_threads, records_per_thread │ + * │ - buffer_counts[PLATFORM_MAX_AICPU_THREADS] │ + * │ - orch_summary │ + * ├─────────────────────────────────────────────────────────────┤ + * │ AicpuPhaseRecord[thread0][0..records_per_thread-1] │ + * ├─────────────────────────────────────────────────────────────┤ + * │ AicpuPhaseRecord[thread1][0..records_per_thread-1] │ + * ├─────────────────────────────────────────────────────────────┤ + * │ ... │ * └─────────────────────────────────────────────────────────────┘ * - * Total size = sizeof(PerfDataHeader) + num_cores * sizeof(DoubleBuffer) + * Base size = sizeof(PerfDataHeader) + num_cores * sizeof(DoubleBuffer) + * With phases = Base + sizeof(AicpuPhaseHeader) + num_sched_threads * records_per_thread * sizeof(AicpuPhaseRecord) */ #ifndef PLATFORM_COMMON_PERF_PROFILING_H_ @@ -178,6 +190,90 @@ struct PerfDataHeader { volatile uint32_t total_tasks; // Total tasks (AICPU writes after orchestration) } __attribute__((aligned(64))); +// ============================================================================= +// AICPU Phase Profiling - Scheduler and Orchestrator Records +// ============================================================================= + +/** + * AICPU phase identifier + * + * Scheduler phases (0-3): four phases in each scheduler loop iteration. + * Orchestrator phases (16-24): sub-steps within each pto2_submit_task() call. + */ +enum class AicpuPhaseId : uint32_t { + // Scheduler phases (0-3) + SCHED_COMPLETE = 0, // Process completed tasks (fanout traversal) + SCHED_DISPATCH = 1, // Dispatch ready tasks to idle cores + SCHED_SCAN = 2, // Incremental scan for root tasks + SCHED_EARLY_READY = 3, // Drain orchestrator's early-ready queue + // Orchestrator phases (16-24) + ORCH_SYNC = 16, // tensormap sync + ORCH_ALLOC = 17, // task_ring_alloc + ORCH_PARAMS = 18, // param copy + ORCH_LOOKUP = 19, // tensormap lookup + dep + ORCH_HEAP = 20, // heap alloc + ORCH_INSERT = 21, // tensormap insert + ORCH_FANIN = 22, // fanin + early-ready + ORCH_FINALIZE = 23, // scheduler init + SM + ORCH_SCOPE_END = 24 // scope_end +}; + +/** + * Single AICPU scheduler phase record (32 bytes) + * + * Records one phase within one loop iteration of a scheduler thread. + * No thread_id field: identity is derived from array index (position = identity). + */ +struct AicpuPhaseRecord { + uint64_t start_time; // Phase start timestamp + uint64_t end_time; // Phase end timestamp + uint32_t loop_iter; // Loop iteration number + AicpuPhaseId phase_id; // Phase type + uint32_t tasks_processed; // Tasks processed in this phase + uint32_t padding; // Alignment padding +}; + +/** + * AICPU orchestrator cumulative summary + * + * Contains accumulated cycle counts from the orchestrator thread. + * Written once after orchestration completes. 
+ */ +struct AicpuOrchSummary { + uint64_t start_time; // Orchestrator start timestamp + uint64_t end_time; // Orchestrator end timestamp + uint64_t sync_cycle; // sync_tensormap phase + uint64_t alloc_cycle; // task_ring_alloc phase + uint64_t params_cycle; // param_copy phase + uint64_t lookup_cycle; // lookup+dep phase + uint64_t heap_cycle; // heap_alloc phase + uint64_t insert_cycle; // tensormap_insert phase + uint64_t fanin_cycle; // fanin+ready phase + uint64_t finalize_cycle; // finalize+SM phase + uint64_t scope_end_cycle; // scope_end phase + int64_t submit_count; // Total tasks submitted + uint32_t magic; // Validation magic (AICPU_PHASE_MAGIC) + uint32_t padding; // Alignment padding +} __attribute__((aligned(64))); + +constexpr uint32_t AICPU_PHASE_MAGIC = 0x41435048; // "ACPH" +constexpr int PLATFORM_PHASE_RECORDS_PER_THREAD = 16384; // ~512KB per thread + +/** + * AICPU phase profiling header + * + * Located after the DoubleBuffer array in shared memory. + * Contains metadata and per-thread record counts. + */ +struct AicpuPhaseHeader { + uint32_t magic; // Validation magic (AICPU_PHASE_MAGIC) + uint32_t num_sched_threads; // Number of scheduler threads + uint32_t records_per_thread; // Max records per thread + uint32_t padding; // Alignment padding + volatile uint32_t buffer_counts[PLATFORM_MAX_AICPU_THREADS]; // Per-thread record counts + AicpuOrchSummary orch_summary; // Orchestrator cumulative data +} __attribute__((aligned(64))); + // ============================================================================= // Helper Functions - Memory Layout // ============================================================================= @@ -250,6 +346,43 @@ inline void get_buffer_and_status(DoubleBuffer* db, uint32_t buffer_id, } } +/** + * Calculate total memory size including phase profiling region + * + * @param num_cores Number of AICore instances + * @param num_sched_threads Number of scheduler threads (typically 3) + * @return Total bytes needed + */ +inline size_t calc_perf_data_size_with_phases(int num_cores, int num_sched_threads) { + return calc_perf_data_size(num_cores) + + sizeof(AicpuPhaseHeader) + + num_sched_threads * PLATFORM_PHASE_RECORDS_PER_THREAD * sizeof(AicpuPhaseRecord); +} + +/** + * Get AicpuPhaseHeader pointer (located after DoubleBuffer array) + * + * @param base_ptr Shared memory base address + * @param num_cores Number of AICore instances + * @return AicpuPhaseHeader pointer + */ +inline AicpuPhaseHeader* get_phase_header(void* base_ptr, int num_cores) { + return (AicpuPhaseHeader*)((char*)base_ptr + calc_perf_data_size(num_cores)); +} + +/** + * Get AicpuPhaseRecord array for specified thread + * + * @param base_ptr Shared memory base address + * @param num_cores Number of AICore instances + * @param thread_idx Scheduler thread index + * @return AicpuPhaseRecord array pointer + */ +inline AicpuPhaseRecord* get_phase_records(void* base_ptr, int num_cores, int thread_idx) { + char* phase_start = (char*)get_phase_header(base_ptr, num_cores) + sizeof(AicpuPhaseHeader); + return (AicpuPhaseRecord*)(phase_start + thread_idx * PLATFORM_PHASE_RECORDS_PER_THREAD * sizeof(AicpuPhaseRecord)); +} + #ifdef __cplusplus } #endif diff --git a/src/platform/include/host/performance_collector.h b/src/platform/include/host/performance_collector.h index d5578d981..f1c7fd172 100644 --- a/src/platform/include/host/performance_collector.h +++ b/src/platform/include/host/performance_collector.h @@ -135,6 +135,14 @@ class PerformanceCollector { */ bool is_initialized() const 
{ return perf_shared_mem_host_ != nullptr; }
+    /**
+     * Collect AICPU phase profiling data from shared memory
+     *
+     * Reads scheduler phase records and orchestrator summary from the
+     * phase profiling region. Must be called after AICPU threads have joined.
+     */
+    void collect_phase_data();
+
     /**
      * Get collected records (for testing)
      */
@@ -152,6 +160,12 @@ class PerformanceCollector {
 
     // Collected data (per-core vectors, indexed by core_index)
     std::vector> collected_perf_records_;
+
+    // AICPU phase profiling data
+    std::vector<std::vector<AicpuPhaseRecord>> collected_phase_records_;
+    std::vector<AicpuPhaseRecord> collected_orch_phase_records_;
+    AicpuOrchSummary collected_orch_summary_{};
+    bool has_phase_data_{false};
 };
 
 #endif // PLATFORM_HOST_PERFORMANCE_COLLECTOR_H_
diff --git a/src/platform/src/aicpu/performance_collector_aicpu.cpp b/src/platform/src/aicpu/performance_collector_aicpu.cpp
index 77f5ad0c6..71533b854 100644
--- a/src/platform/src/aicpu/performance_collector_aicpu.cpp
+++ b/src/platform/src/aicpu/performance_collector_aicpu.cpp
@@ -8,6 +8,13 @@
 #include "common/unified_log.h"
 #include "common/platform_config.h"
 
+#include <cstring>
+
+// Cached phase profiling pointers (set during init, used on hot path)
+static AicpuPhaseHeader* s_phase_header = nullptr;
+static AicpuPhaseRecord* s_phase_records[PLATFORM_MAX_AICPU_THREADS] = {};
+static int s_orch_thread_idx = -1;
+
 /**
  * Enqueue ready buffer to per-thread queue
  *
@@ -290,3 +297,93 @@ void perf_aicpu_update_total_tasks(Runtime* runtime, uint32_t total_tasks) {
     header->total_tasks = total_tasks;
     wmb();
 }
+
+void perf_aicpu_init_phase_profiling(Runtime* runtime, int num_sched_threads) {
+    void* perf_base = (void*)runtime->perf_data_base;
+    if (perf_base == nullptr) {
+        LOG_ERROR("perf_data_base is NULL, cannot initialize phase profiling");
+        return;
+    }
+
+    s_phase_header = get_phase_header(perf_base, runtime->worker_count);
+
+    s_phase_header->magic = AICPU_PHASE_MAGIC;
+    s_phase_header->num_sched_threads = num_sched_threads;
+    s_phase_header->records_per_thread = PLATFORM_PHASE_RECORDS_PER_THREAD;
+    s_phase_header->padding = 0;
+
+    for (int i = 0; i < PLATFORM_MAX_AICPU_THREADS; i++) {
+        s_phase_header->buffer_counts[i] = 0;
+    }
+
+    memset(&s_phase_header->orch_summary, 0, sizeof(AicpuOrchSummary));
+
+    // Cache per-thread record pointers and clear buffers
+    // Include orchestrator slot (index = num_sched_threads) if within bounds
+    int total_threads = (num_sched_threads < PLATFORM_MAX_AICPU_THREADS)
+                            ? 
num_sched_threads + 1 : num_sched_threads; + for (int t = 0; t < total_threads; t++) { + s_phase_records[t] = get_phase_records(perf_base, runtime->worker_count, t); + memset(s_phase_records[t], 0, PLATFORM_PHASE_RECORDS_PER_THREAD * sizeof(AicpuPhaseRecord)); + } + + wmb(); + + LOG_INFO("Phase profiling initialized: %d scheduler threads (+1 orch), %d records/thread", + num_sched_threads, PLATFORM_PHASE_RECORDS_PER_THREAD); +} + +void perf_aicpu_record_phase(int thread_idx, + AicpuPhaseId phase_id, + uint64_t start_time, uint64_t end_time, + uint32_t loop_iter, uint32_t tasks_processed) { + if (s_phase_header == nullptr) { + return; + } + + uint32_t idx = s_phase_header->buffer_counts[thread_idx]; + + if (idx >= PLATFORM_PHASE_RECORDS_PER_THREAD) { + return; // Buffer full, silently drop + } + + AicpuPhaseRecord* record = &s_phase_records[thread_idx][idx]; + + record->start_time = start_time; + record->end_time = end_time; + record->loop_iter = loop_iter; + record->phase_id = phase_id; + record->tasks_processed = tasks_processed; + record->padding = 0; + + s_phase_header->buffer_counts[thread_idx] = idx + 1; +} + +void perf_aicpu_write_orch_summary(const AicpuOrchSummary* src) { + if (s_phase_header == nullptr) { + return; + } + + AicpuOrchSummary* dst = &s_phase_header->orch_summary; + + memcpy(dst, src, sizeof(AicpuOrchSummary)); + dst->magic = AICPU_PHASE_MAGIC; + dst->padding = 0; + + wmb(); + + LOG_INFO("Orchestrator summary written: %lld tasks, %.3fus", + (long long)src->submit_count, + cycles_to_us(src->end_time - src->start_time)); +} + +void perf_aicpu_set_orch_thread_idx(int thread_idx) { + s_orch_thread_idx = thread_idx; +} + +void perf_aicpu_record_orch_phase(AicpuPhaseId phase_id, + uint64_t start_time, uint64_t end_time, + uint32_t submit_idx) { + if (s_orch_thread_idx < 0 || s_phase_header == nullptr) return; + perf_aicpu_record_phase(s_orch_thread_idx, phase_id, start_time, end_time, submit_idx, 0); +} diff --git a/src/platform/src/host/performance_collector.cpp b/src/platform/src/host/performance_collector.cpp index 09cab5955..ea7a86cab 100644 --- a/src/platform/src/host/performance_collector.cpp +++ b/src/platform/src/host/performance_collector.cpp @@ -44,8 +44,10 @@ int PerformanceCollector::initialize(Runtime& runtime, device_id_ = device_id; num_aicore_ = num_aicore; - // Step 1: Calculate total memory size - size_t total_size = calc_perf_data_size(num_aicore); + // Step 1: Calculate total memory size (with phase profiling region) + // All PLATFORM_MAX_AICPU_THREADS slots: scheduler threads + orchestrator thread + int num_phase_threads = PLATFORM_MAX_AICPU_THREADS; + size_t total_size = calc_perf_data_size_with_phases(num_aicore, num_phase_threads); size_t header_size = sizeof(PerfDataHeader); size_t single_db_size = sizeof(DoubleBuffer); size_t buffers_size = num_aicore * single_db_size; @@ -274,6 +276,79 @@ void PerformanceCollector::poll_and_collect(int expected_tasks) { LOG_INFO("Performance data collection complete"); } +void PerformanceCollector::collect_phase_data() { + if (perf_shared_mem_host_ == nullptr) { + return; + } + + rmb(); + + AicpuPhaseHeader* phase_header = get_phase_header(perf_shared_mem_host_, num_aicore_); + + // Validate magic + if (phase_header->magic != AICPU_PHASE_MAGIC) { + LOG_INFO("No phase profiling data found (magic mismatch: 0x%x vs 0x%x)", + phase_header->magic, AICPU_PHASE_MAGIC); + return; + } + + int num_sched_threads = phase_header->num_sched_threads; + if (num_sched_threads > PLATFORM_MAX_AICPU_THREADS) { + LOG_ERROR("Invalid 
num_sched_threads %d from shared memory (max=%d)", + num_sched_threads, PLATFORM_MAX_AICPU_THREADS); + return; + } + LOG_INFO("Collecting phase data: %d scheduler threads", num_sched_threads); + + // Read per-thread phase records + collected_phase_records_.clear(); + collected_phase_records_.resize(num_sched_threads); + + int total_phase_records = 0; + for (int t = 0; t < num_sched_threads; t++) { + uint32_t count = phase_header->buffer_counts[t]; + if (count > PLATFORM_PHASE_RECORDS_PER_THREAD) { + count = PLATFORM_PHASE_RECORDS_PER_THREAD; + } + + AicpuPhaseRecord* records = get_phase_records(perf_shared_mem_host_, num_aicore_, t); + collected_phase_records_[t].assign(records, records + count); + total_phase_records += count; + LOG_INFO(" Thread %d: %u phase records", t, count); + } + + // Read orchestrator per-task phase records (slot = num_sched_threads) + collected_orch_phase_records_.clear(); + if (num_sched_threads < PLATFORM_MAX_AICPU_THREADS) { + uint32_t orch_count = phase_header->buffer_counts[num_sched_threads]; + if (orch_count > PLATFORM_PHASE_RECORDS_PER_THREAD) { + orch_count = PLATFORM_PHASE_RECORDS_PER_THREAD; + } + if (orch_count > 0) { + AicpuPhaseRecord* orch_records = get_phase_records(perf_shared_mem_host_, num_aicore_, num_sched_threads); + collected_orch_phase_records_.assign(orch_records, orch_records + orch_count); + total_phase_records += orch_count; + LOG_INFO(" Orchestrator: %u per-task phase records", orch_count); + } + } + + // Read orchestrator summary + collected_orch_summary_ = phase_header->orch_summary; + bool orch_valid = (collected_orch_summary_.magic == AICPU_PHASE_MAGIC); + + if (orch_valid) { + LOG_INFO(" Orchestrator: %lld tasks, %.3fus", + (long long)collected_orch_summary_.submit_count, + cycles_to_us(collected_orch_summary_.end_time - collected_orch_summary_.start_time)); + } else { + LOG_INFO(" Orchestrator: no summary data"); + } + + has_phase_data_ = (total_phase_records > 0 || orch_valid); + LOG_INFO("Phase data collection complete: %d records (%zu orch), orch_summary=%s", + total_phase_records, collected_orch_phase_records_.size(), orch_valid ? 
"yes" : "no"); +} + int PerformanceCollector::export_swimlane_json(const std::string& output_path) { // Step 1: Validate collected data bool has_any_records = false; @@ -320,7 +395,7 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { return a.record->task_id < b.record->task_id; }); - // Step 4: Calculate base time (minimum kernel_ready_time) + // Step 4: Calculate base time (minimum kernel_ready_time, including phase timestamps) uint64_t base_time_cycles = UINT64_MAX; for (const auto& tagged : tagged_records) { if (tagged.record->kernel_ready_time < base_time_cycles) { @@ -333,6 +408,27 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { } } + // Include phase record timestamps in base_time calculation + if (has_phase_data_) { + for (const auto& thread_records : collected_phase_records_) { + for (const auto& pr : thread_records) { + if (pr.start_time > 0 && pr.start_time < base_time_cycles) { + base_time_cycles = pr.start_time; + } + } + } + for (const auto& pr : collected_orch_phase_records_) { + if (pr.start_time > 0 && pr.start_time < base_time_cycles) { + base_time_cycles = pr.start_time; + } + } + if (collected_orch_summary_.magic == AICPU_PHASE_MAGIC && + collected_orch_summary_.start_time > 0 && + collected_orch_summary_.start_time < base_time_cycles) { + base_time_cycles = collected_orch_summary_.start_time; + } + } + // Step 5: Generate filename with timestamp (YYYYMMDD_HHMMSS) std::time_t now = time(nullptr); std::tm* timeinfo = std::localtime(&now); @@ -349,8 +445,9 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { } // Step 7: Write JSON data + int version = has_phase_data_ ? 2 : 1; outfile << "{\n"; - outfile << " \"version\": 1,\n"; + outfile << " \"version\": " << version << ",\n"; outfile << " \"tasks\": [\n"; for (size_t i = 0; i < tagged_records.size(); ++i) { @@ -395,8 +492,105 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { } outfile << "\n"; } - outfile << " ]\n"; - outfile << "}\n"; + outfile << " ]"; + + // Step 8: Write phase profiling data (version 2) + if (has_phase_data_) { + // AICPU scheduler phases + outfile << ",\n \"aicpu_scheduler_phases\": [\n"; + for (size_t t = 0; t < collected_phase_records_.size(); t++) { + outfile << " [\n"; + const auto& thread_records = collected_phase_records_[t]; + for (size_t r = 0; r < thread_records.size(); r++) { + const auto& pr = thread_records[r]; + double start_us = cycles_to_us(pr.start_time - base_time_cycles); + double end_us = cycles_to_us(pr.end_time - base_time_cycles); + + const char* phase_name = "unknown"; + switch (pr.phase_id) { + case AicpuPhaseId::SCHED_COMPLETE: phase_name = "complete"; break; + case AicpuPhaseId::SCHED_DISPATCH: phase_name = "dispatch"; break; + case AicpuPhaseId::SCHED_SCAN: phase_name = "scan"; break; + case AicpuPhaseId::SCHED_EARLY_READY: phase_name = "early_ready"; break; + default: break; + } + + outfile << " {\"start_time_us\": " << std::fixed << std::setprecision(3) << start_us + << ", \"end_time_us\": " << std::fixed << std::setprecision(3) << end_us + << ", \"phase\": \"" << phase_name << "\"" + << ", \"loop_iter\": " << pr.loop_iter + << ", \"tasks_processed\": " << pr.tasks_processed + << "}"; + if (r < thread_records.size() - 1) outfile << ","; + outfile << "\n"; + } + outfile << " ]"; + if (t < collected_phase_records_.size() - 1) outfile << ","; + outfile << "\n"; + } + outfile << " ]"; + + // AICPU orchestrator summary + if 
(collected_orch_summary_.magic == AICPU_PHASE_MAGIC) { + double orch_start_us = cycles_to_us(collected_orch_summary_.start_time - base_time_cycles); + double orch_end_us = cycles_to_us(collected_orch_summary_.end_time - base_time_cycles); + + outfile << ",\n \"aicpu_orchestrator\": {\n"; + outfile << " \"start_time_us\": " << std::fixed << std::setprecision(3) << orch_start_us << ",\n"; + outfile << " \"end_time_us\": " << std::fixed << std::setprecision(3) << orch_end_us << ",\n"; + outfile << " \"submit_count\": " << collected_orch_summary_.submit_count << ",\n"; + outfile << " \"phase_us\": {\n"; + outfile << " \"sync\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.sync_cycle) << ",\n"; + outfile << " \"alloc\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.alloc_cycle) << ",\n"; + outfile << " \"params\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.params_cycle) << ",\n"; + outfile << " \"lookup\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.lookup_cycle) << ",\n"; + outfile << " \"heap\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.heap_cycle) << ",\n"; + outfile << " \"insert\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.insert_cycle) << ",\n"; + outfile << " \"fanin\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.fanin_cycle) << ",\n"; + outfile << " \"finalize\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.finalize_cycle) << ",\n"; + outfile << " \"scope_end\": " << std::fixed << std::setprecision(3) << cycles_to_us(collected_orch_summary_.scope_end_cycle) << "\n"; + outfile << " }\n"; + outfile << " }"; + } + + // Per-task orchestrator phase records + if (!collected_orch_phase_records_.empty()) { + outfile << ",\n \"aicpu_orchestrator_phases\": [\n"; + + // Map orchestrator phase IDs to names + auto orch_phase_name = [](AicpuPhaseId id) -> const char* { + switch (id) { + case AicpuPhaseId::ORCH_SYNC: return "orch_sync"; + case AicpuPhaseId::ORCH_ALLOC: return "orch_alloc"; + case AicpuPhaseId::ORCH_PARAMS: return "orch_params"; + case AicpuPhaseId::ORCH_LOOKUP: return "orch_lookup"; + case AicpuPhaseId::ORCH_HEAP: return "orch_heap"; + case AicpuPhaseId::ORCH_INSERT: return "orch_insert"; + case AicpuPhaseId::ORCH_FANIN: return "orch_fanin"; + case AicpuPhaseId::ORCH_FINALIZE: return "orch_finalize"; + case AicpuPhaseId::ORCH_SCOPE_END: return "orch_scope_end"; + default: return "unknown"; + } + }; + + for (size_t r = 0; r < collected_orch_phase_records_.size(); r++) { + const auto& pr = collected_orch_phase_records_[r]; + double start_us = cycles_to_us(pr.start_time - base_time_cycles); + double end_us = cycles_to_us(pr.end_time - base_time_cycles); + + outfile << " {\"phase\": \"" << orch_phase_name(pr.phase_id) << "\"" + << ", \"start_time_us\": " << std::fixed << std::setprecision(3) << start_us + << ", \"end_time_us\": " << std::fixed << std::setprecision(3) << end_us + << ", \"submit_idx\": " << pr.loop_iter + << "}"; + if (r < collected_orch_phase_records_.size() - 1) outfile << ","; + outfile << "\n"; + } + outfile << " ]"; + } + } + + outfile << "\n}\n"; // Step 9: Close file outfile.close(); @@ -438,6 +632,9 @@ int PerformanceCollector::finalize(PerfUnregisterCallback unregister_cb, perf_shared_mem_host_ = nullptr; was_registered_ = false; collected_perf_records_.clear(); + 
collected_phase_records_.clear();
+    collected_orch_phase_records_.clear();
+    has_phase_data_ = false;
     device_id_ = -1;
 
     LOG_DEBUG("Performance profiling cleanup complete");
diff --git a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
index dbd0246bd..8bed0cbe7 100644
--- a/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
+++ b/src/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
@@ -523,6 +523,10 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
         // (total_tasks written to header later when orchestrator completes)
         if (runtime->enable_profiling) {
             perf_aicpu_init_profiling(runtime);
+            // Initialize phase profiling for scheduler threads + orchestrator
+            int sched_threads = (thread_num_ == 4) ? 3 : thread_num_;
+            perf_aicpu_init_phase_profiling(runtime, sched_threads);
+            perf_aicpu_set_orch_thread_idx(sched_threads);
         }
         DEV_INFO("Thread %d: one-time init done", thread_idx);
@@ -568,6 +572,11 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
     uint64_t sched_dispatch_miss_wait = 0, sched_dispatch_miss_hold = 0;
     uint64_t ready_pop_own = 0, ready_pop_steal = 0;
 #endif
+    // Phase profiling: per-phase task counters
+    uint32_t phase_complete_count = 0;
+    uint32_t phase_dispatch_count = 0;
+    uint32_t phase_scan_count = 0;
+    uint32_t phase_early_ready_count = 0;
     // Fanout traversal statistics: how many downstream deps were notified after task completions
     uint64_t fanout_edges_notified = 0;
     int32_t fanout_max_degree = 0;
@@ -577,6 +586,12 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
         sched_loop_count++;
 #endif
         CYCLE_COUNT_START();
+        // Phase profiling: record start time for this iteration
+        uint64_t _t0_phase = _t0;
+        phase_complete_count = 0;
+        phase_dispatch_count = 0;
+        phase_scan_count = 0;
+        phase_early_ready_count = 0;
         // Dynamic task_count (Thread 3 sets total_tasks_ when orchestration completes)
         int32_t task_count = total_tasks_.load(std::memory_order_acquire);
         bool orch_done = orchestrator_done_.load(std::memory_order_acquire);
@@ -712,6 +727,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
                 cur_thread_tasks_in_flight--;
                 cur_thread_completed++;
+                phase_complete_count++;
                 made_progress = true;
                 completed_tasks_.fetch_add(1, std::memory_order_release);
@@ -784,6 +800,13 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
             }
         }
         CYCLE_COUNT_LAP(sched_complete_cycle);
+#if PTO2_ORCH_PROFILING
+        if (profiling_enabled) {
+            perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_COMPLETE,
+                                    _t0_phase, _t1, static_cast<uint32_t>(sched_loop_count), phase_complete_count);
+            _t0_phase = _t1;
+        }
+#endif
         // Phase 2: Dispatch ready tasks to idle cores (register-based dispatch)
         if (cur_thread_tasks_in_flight < core_num) {
@@ -881,6 +904,7 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
                     write_reg(reg_addr, RegId::DATA_MAIN_BASE, static_cast(task_id + 1));
                     executing_task_ids_[core_id] = task_id;
                     cur_thread_tasks_in_flight++;
+                    phase_dispatch_count++;
                     made_progress = true;
                     DEV_DEBUG("Thread %d: Dispatching PTO2 task %d to core %d", thread_idx, task_id, core_id);
                 }
@@ -888,6 +912,13 @@
             }
         }
         CYCLE_COUNT_LAP(sched_dispatch_cycle);
+#if PTO2_ORCH_PROFILING
+        if (profiling_enabled) {
+            perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_DISPATCH,
+                                    _t0_phase, _t1, static_cast<uint32_t>(sched_loop_count), phase_dispatch_count);
+            _t0_phase = _t1;
+        }
+#endif
         // Incremental scan: discover root tasks (fanin_count == 0)
         {
@@ -922,11 +953,19 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
                     , sched_scan_ready_wait, sched_scan_ready_hold
 #endif
                 );
+                phase_scan_count++;
                 made_progress = true;
             }
         }
         }
         CYCLE_COUNT_LAP(sched_scan_cycle);
+#if PTO2_ORCH_PROFILING
+        if (profiling_enabled) {
+            perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_SCAN,
+                                    _t0_phase, _t1, static_cast<uint32_t>(sched_loop_count), phase_scan_count);
+            _t0_phase = _t1;
+        }
+#endif
         // Early-ready drain: tasks whose deps were already met at submit time
         // (orchestrator detected all producers completed → pushed to orch_ready_queue_)
@@ -955,10 +994,18 @@ int AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int thread_idx,
                 , sched_early_ready_wait, sched_early_ready_hold
 #endif
             );
+            phase_early_ready_count++;
             made_progress = true;
         }
         }
         CYCLE_COUNT_LAP(sched_early_ready_cycle);
+#if PTO2_ORCH_PROFILING
+        if (profiling_enabled) {
+            perf_aicpu_record_phase(thread_idx, AicpuPhaseId::SCHED_EARLY_READY,
+                                    _t0_phase, _t1, static_cast<uint32_t>(sched_loop_count), phase_early_ready_count);
+            _t0_phase = _t1;
+        }
+#endif
         if (!made_progress) {
             idle_iterations++;
@@ -1319,6 +1366,24 @@ int AicpuExecutor::run(Runtime* runtime) {
         DEV_ALWAYS(" overlap checks : %llu, hits=%llu (%.1f%%)",
                    (unsigned long long)tp.overlap_checks, (unsigned long long)tp.overlap_hits,
                    tp.overlap_checks > 0 ? tp.overlap_hits * 100.0 / tp.overlap_checks : 0.0);
+
+        // Write orchestrator summary to shared memory for host-side export
+        if (runtime->enable_profiling) {
+            AicpuOrchSummary orch_summary = {};
+            orch_summary.start_time = orch_cycle_start;
+            orch_summary.end_time = orch_cycle_end;
+            orch_summary.sync_cycle = p.sync_cycle;
+            orch_summary.alloc_cycle = p.alloc_cycle;
+            orch_summary.params_cycle = p.params_cycle;
+            orch_summary.lookup_cycle = p.lookup_cycle;
+            orch_summary.heap_cycle = p.heap_cycle;
+            orch_summary.insert_cycle = p.insert_cycle;
+            orch_summary.fanin_cycle = p.fanin_cycle;
+            orch_summary.finalize_cycle = p.finalize_cycle;
+            orch_summary.scope_end_cycle = p.scope_end_cycle;
+            orch_summary.submit_count = p.submit_count;
+            perf_aicpu_write_orch_summary(&orch_summary);
+        }
     }
 #endif
diff --git a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp
index 0d26a3bac..2a9908a08 100644
--- a/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp
+++ b/src/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp
@@ -22,9 +22,14 @@
 // =============================================================================
 #if PTO2_ORCH_PROFILING
 #include "aicpu/device_time.h"
+#include "aicpu/performance_collector_aicpu.h"
 // Weak fallback for builds that don't link device_time.cpp (e.g. host).
 // The strong symbol from platform/.../device_time.cpp wins in the AICPU build.
 __attribute__((weak)) uint64_t get_sys_cnt_aicpu() { return 0; }
+// Weak fallback for builds that don't link performance_collector_aicpu.cpp.
+// The strong symbol from the AICPU build wins when profiling is available.
+__attribute__((weak)) void perf_aicpu_record_orch_phase( + AicpuPhaseId, uint64_t, uint64_t, uint32_t) {} // Accumulated nanoseconds per sub-step static uint64_t g_orch_sync_cycle = 0; // tensormap sync static uint64_t g_orch_alloc_cycle = 0; // task ring alloc @@ -36,11 +41,19 @@ static uint64_t g_orch_fanin_cycle = 0; // fanin list + early-return check static uint64_t g_orch_finalize_cycle = 0; // scheduler init + SM update static uint64_t g_orch_scope_end_cycle = 0; // scope_end overhead static int64_t g_orch_submit_count = 0; +static uint32_t g_orch_submit_idx = 0; #define CYCLE_COUNT_START() uint64_t _t0 = get_sys_cnt_aicpu(), _t1 #define CYCLE_COUNT_LAP(acc) do { _t1 = get_sys_cnt_aicpu(); acc += (_t1 - _t0); _t0 = _t1; } while(0) +#define CYCLE_COUNT_LAP_RECORD(acc, phase_id) do { \ + _t1 = get_sys_cnt_aicpu(); \ + acc += (_t1 - _t0); \ + perf_aicpu_record_orch_phase((phase_id), _t0, _t1, g_orch_submit_idx); \ + _t0 = _t1; \ +} while(0) #else #define CYCLE_COUNT_START() #define CYCLE_COUNT_LAP(acc) +#define CYCLE_COUNT_LAP_RECORD(acc, phase_id) #endif // ============================================================================= @@ -176,7 +189,9 @@ void pto2_scope_end(PTO2OrchestratorState* orch) { orch->scope_tasks_size = begin; #if PTO2_ORCH_PROFILING - g_orch_scope_end_cycle += (get_sys_cnt_aicpu() - _se0); + uint64_t _se1 = get_sys_cnt_aicpu(); + g_orch_scope_end_cycle += (_se1 - _se0); + perf_aicpu_record_orch_phase(AicpuPhaseId::ORCH_SCOPE_END, _se0, _se1, g_orch_submit_idx); #endif } @@ -255,7 +270,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // === STEP 0: Sync TensorMap validity and optional cleanup === pto2_orchestrator_sync_tensormap(&orch->tensor_map); - CYCLE_COUNT_LAP(g_orch_sync_cycle); + CYCLE_COUNT_LAP_RECORD(g_orch_sync_cycle, AicpuPhaseId::ORCH_SYNC); // Submission without an open scope is illegal assert(orch->scope_stack_top >= 0 && "Cannot submit task outside a scope"); @@ -263,7 +278,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // === STEP 1: Allocate task slot from Task Ring (blocks until available) === int32_t task_id = pto2_task_ring_alloc(&orch->task_ring); - CYCLE_COUNT_LAP(g_orch_alloc_cycle); + CYCLE_COUNT_LAP_RECORD(g_orch_alloc_cycle, AicpuPhaseId::ORCH_ALLOC); PTO2TaskDescriptor* task = pto2_task_ring_get(&orch->task_ring, task_id); @@ -311,7 +326,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, } } - CYCLE_COUNT_LAP(g_orch_params_cycle); + CYCLE_COUNT_LAP_RECORD(g_orch_params_cycle, AicpuPhaseId::ORCH_PARAMS); // === STEP 2: First pass - collect output sizes and process inputs === @@ -375,7 +390,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, } } - CYCLE_COUNT_LAP(g_orch_lookup_cycle); + CYCLE_COUNT_LAP_RECORD(g_orch_lookup_cycle, AicpuPhaseId::ORCH_LOOKUP); // === STEP 3: Allocate packed buffer from Heap Ring (may stall) === // Each output slot is aligned to PTO2_PACKED_OUTPUT_ALIGN (1024B); gap after data is padding. 
@@ -400,7 +415,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, } } - CYCLE_COUNT_LAP(g_orch_heap_cycle); + CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP); // === STEP 4: Second pass - register outputs in TensorMap === for (int i = 0; i < num_params; i++) { @@ -412,7 +427,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, } } - CYCLE_COUNT_LAP(g_orch_insert_cycle); + CYCLE_COUNT_LAP_RECORD(g_orch_insert_cycle, AicpuPhaseId::ORCH_INSERT); // === STEP 5: Finalize fanin list === // First build the fanin list @@ -424,7 +439,7 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // (See comment above the fetch_add in aicpu_executor.cpp Phase 1 for details.) __atomic_store_n(&task->fanin_count, fanin_count, __ATOMIC_SEQ_CST); - CYCLE_COUNT_LAP(g_orch_fanin_cycle); + CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN); // === STEP 5b: Check if task is already ready (all producers completed via early-return) === // In AICPU parallel mode, early-return in pto2_add_consumer_to_producer may have @@ -455,11 +470,12 @@ void pto2_submit_task(PTO2OrchestratorState* orch, // === STEP 7: Update shared memory with current task index === PTO2_STORE_RELEASE(&orch->sm_handle->header->current_task_index, orch->task_ring.current_index); - CYCLE_COUNT_LAP(g_orch_finalize_cycle); + CYCLE_COUNT_LAP_RECORD(g_orch_finalize_cycle, AicpuPhaseId::ORCH_FINALIZE); orch->tasks_submitted++; #if PTO2_ORCH_PROFILING g_orch_submit_count++; + g_orch_submit_idx++; #endif } @@ -535,6 +551,7 @@ PTO2OrchProfilingData pto2_orchestrator_get_profiling() { g_orch_lookup_cycle = g_orch_heap_cycle = g_orch_insert_cycle = 0; g_orch_fanin_cycle = g_orch_finalize_cycle = g_orch_scope_end_cycle = 0; g_orch_submit_count = 0; + g_orch_submit_idx = 0; return d; } #endif diff --git a/tools/swimlane_converter.py b/tools/swimlane_converter.py index da8c2ee28..db1ec714c 100644 --- a/tools/swimlane_converter.py +++ b/tools/swimlane_converter.py @@ -55,8 +55,8 @@ def read_perf_data(filepath): raise ValueError(f"Missing required field: {field}") # Validate version - if data['version'] != 1: - raise ValueError(f"Unsupported version: {data['version']} (expected 1)") + if data['version'] not in [1, 2]: + raise ValueError(f"Unsupported version: {data['version']} (expected 1 or 2)") return data @@ -286,7 +286,9 @@ def print_task_statistics(tasks, func_id_to_name=None, sched_cpu_us_per_task=Non -def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose=False): +def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose=False, + scheduler_phases=None, orchestrator_data=None, + orchestrator_phases=None): """Generate Chrome Trace Event Format JSON from task data. 
Args: @@ -299,10 +301,15 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose output_path: Path to output JSON file func_id_to_name: Optional dict mapping func_id to function name verbose: Print progress information + scheduler_phases: Optional list of per-thread phase record lists (version 2) + orchestrator_data: Optional dict with orchestrator summary (version 2) + orchestrator_phases: Optional list of per-task orchestrator phase records (version 2) - Generates two processes in the trace: + Generates processes in the trace: - pid=1 "AICore View": start_time_us to end_time_us (kernel execution) - pid=2 "AICPU View": dispatch_time_us to finish_time_us (AICPU perspective) + - pid=3 "AICPU Scheduler": scheduler phase bars (version 2) + - pid=4 "AICPU Orchestrator": orchestrator phase bars or summary (version 2) """ if verbose: print(f"Generating Chrome Trace JSON...") @@ -327,7 +334,8 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose # Step 2: Generate JSON events events = [] - # Metadata event: Process names + # Metadata event: Process names and sort order + # Display order: Orchestrator (pid=4) → Scheduler (pid=3) → AICPU View (pid=2) → AICore View (pid=1) events.append({ "args": {"name": "AICore View"}, "cat": "__metadata", @@ -335,6 +343,13 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose "ph": "M", "pid": 1 }) + events.append({ + "args": {"sort_index": 4}, + "cat": "__metadata", + "name": "process_sort_index", + "ph": "M", + "pid": 1 + }) # Check if any task has AICPU timestamps has_aicpu_data = any( @@ -350,6 +365,13 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose "ph": "M", "pid": 2 }) + events.append({ + "args": {"sort_index": 3}, + "cat": "__metadata", + "name": "process_sort_index", + "ph": "M", + "pid": 2 + }) # Metadata events: Thread names (one per core) for core_id, tid in core_to_tid.items(): @@ -519,6 +541,281 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose flow_id += 1 + # AICPU Scheduler phase events (version 2) + if scheduler_phases: + # Process metadata + events.append({ + "args": {"name": "AICPU Scheduler"}, + "cat": "__metadata", + "name": "process_name", + "ph": "M", + "pid": 3 + }) + events.append({ + "args": {"sort_index": 2}, + "cat": "__metadata", + "name": "process_sort_index", + "ph": "M", + "pid": 3 + }) + + # Phase color mapping + phase_colors = { + "complete": "good", # green + "dispatch": "terrible", # red + "scan": "thread_state_running", # blue + "early_ready": "yellow", # yellow + } + + for thread_idx, thread_records in enumerate(scheduler_phases): + tid = 3000 + thread_idx + + # Thread name metadata + events.append({ + "args": {"name": f"Sched_{thread_idx}"}, + "cat": "__metadata", + "name": "thread_name", + "ph": "M", + "pid": 3, + "tid": tid + }) + + for record in thread_records: + phase = record.get("phase", "unknown") + start_us = record["start_time_us"] + end_us = record["end_time_us"] + dur = end_us - start_us + tasks_processed = record.get("tasks_processed", 0) + + event = { + "args": { + "phase": phase, + "loop_iter": record.get("loop_iter", 0), + "tasks_processed": tasks_processed + }, + "cat": "scheduler", + "cname": phase_colors.get(phase, "generic_work"), + "name": f"{phase}({tasks_processed})", + "ph": "X", + "pid": 3, + "tid": tid, + "ts": start_us, + "dur": dur + } + events.append(event) + + # AICPU Orchestrator event (version 2) + if orchestrator_phases or 
orchestrator_data: + # Process metadata + events.append({ + "args": {"name": "AICPU Orchestrator"}, + "cat": "__metadata", + "name": "process_name", + "ph": "M", + "pid": 4 + }) + events.append({ + "args": {"sort_index": 1}, + "cat": "__metadata", + "name": "process_sort_index", + "ph": "M", + "pid": 4 + }) + + # Thread name metadata + events.append({ + "args": {"name": "Orchestrator"}, + "cat": "__metadata", + "name": "thread_name", + "ph": "M", + "pid": 4, + "tid": 4000 + }) + + if orchestrator_phases: + # Per-task orchestrator phase bars + orch_phase_colors = { + "orch_sync": "thread_state_iowait", # orange + "orch_alloc": "terrible", # red + "orch_params": "good", # green + "orch_lookup": "thread_state_running", # blue + "orch_heap": "yellow", + "orch_insert": "olive", + "orch_fanin": "rail_animation", + "orch_finalize": "cq_build_passed", + "orch_scope_end": "generic_work", + } + + for record in orchestrator_phases: + phase = record.get("phase", "unknown") + start_us = record["start_time_us"] + end_us = record["end_time_us"] + dur = end_us - start_us + submit_idx = record.get("submit_idx", 0) + + # Strip "orch_" prefix for display name + display_name = phase.replace("orch_", "") if phase.startswith("orch_") else phase + + event = { + "args": { + "phase": phase, + "submit_idx": submit_idx + }, + "cat": "orchestrator", + "cname": orch_phase_colors.get(phase, "generic_work"), + "name": f"{display_name}({submit_idx})", + "ph": "X", + "pid": 4, + "tid": 4000, + "ts": start_us, + "dur": dur + } + events.append(event) + + elif orchestrator_data: + # Fallback: cumulative summary as single bar + orch_start = orchestrator_data["start_time_us"] + orch_end = orchestrator_data["end_time_us"] + orch_dur = orch_end - orch_start + phase_us = orchestrator_data.get("phase_us", {}) + + # Build args with phase breakdown (cumulative totals, shown in detail panel) + orch_args = { + "submit_count": orchestrator_data.get("submit_count", 0), + } + total_phase_us = sum(phase_us.values()) + if total_phase_us > 0: + for phase_name, dur in phase_us.items(): + if dur > 0: + pct = dur / total_phase_us * 100 + orch_args[f"{phase_name}_us"] = round(dur, 3) + orch_args[f"{phase_name}_%"] = round(pct, 1) + + # Total orchestrator bar + events.append({ + "args": orch_args, + "cat": "orchestrator", + "name": f"Orchestrator({orchestrator_data.get('submit_count', 0)} tasks)", + "ph": "X", + "pid": 4, + "tid": 4000, + "ts": orch_start, + "dur": orch_dur + }) + + # AICPU View fanout arrows (duplicate AICore View flow events using AICPU timestamps) + if has_aicpu_data: + for task in tasks: + src_finish_us = task.get('finish_time_us', 0) + if src_finish_us <= 0: + continue + + src_tid = core_to_tid[task['core_id']] + + for succ_task_id in task['fanout']: + if succ_task_id not in task_map: + continue + + succ_task = task_map[succ_task_id] + dst_dispatch_us = succ_task.get('dispatch_time_us', 0) + if dst_dispatch_us <= 0: + continue + + dst_tid = core_to_tid[succ_task['core_id']] + + events.append({ + "cat": "flow", + "id": flow_id, + "name": "dependency", + "ph": "s", + "pid": 2, + "tid": src_tid, + "ts": src_finish_us - 0.01 + }) + events.append({ + "cat": "flow", + "id": flow_id, + "name": "dependency", + "ph": "f", + "pid": 2, + "tid": dst_tid, + "ts": dst_dispatch_us, + "bp": "e" + }) + flow_id += 1 + + # Scheduler DISPATCH → task execution arrows + if scheduler_phases and has_aicpu_data: + # Build index of DISPATCH phase records per thread for fast lookup + dispatch_phases_by_thread = {} + for thread_idx, thread_records in 
enumerate(scheduler_phases): + dispatch_records = [r for r in thread_records if r.get("phase") == "dispatch"] + if dispatch_records: + dispatch_phases_by_thread[thread_idx] = dispatch_records + + for task in tasks: + dispatch_us = task.get('dispatch_time_us', 0) + if dispatch_us <= 0: + continue + + # Find the scheduler DISPATCH phase that contains this dispatch timestamp + matched_thread = None + for thread_idx, dispatch_records in dispatch_phases_by_thread.items(): + for dr in dispatch_records: + if dr["start_time_us"] <= dispatch_us <= dr["end_time_us"]: + matched_thread = thread_idx + break + if matched_thread is not None: + break + + if matched_thread is not None: + sched_tid = 3000 + matched_thread + core_tid = core_to_tid[task['core_id']] + + # Flow: scheduler DISPATCH → AICore View task start + events.append({ + "cat": "flow", + "id": flow_id, + "name": "dispatch", + "ph": "s", + "pid": 3, + "tid": sched_tid, + "ts": dispatch_us + }) + events.append({ + "cat": "flow", + "id": flow_id, + "name": "dispatch", + "ph": "f", + "pid": 1, + "tid": core_tid, + "ts": task['start_time_us'], + "bp": "e" + }) + flow_id += 1 + + # Flow: scheduler DISPATCH → AICPU View task start + events.append({ + "cat": "flow", + "id": flow_id, + "name": "dispatch", + "ph": "s", + "pid": 3, + "tid": sched_tid, + "ts": dispatch_us + }) + events.append({ + "cat": "flow", + "id": flow_id, + "name": "dispatch", + "ph": "f", + "pid": 2, + "tid": core_tid, + "ts": dispatch_us, + "bp": "e" + }) + flow_id += 1 + if verbose: print(f" Total events: {len(events)}") print(f" Flow events: {flow_id}") @@ -638,8 +935,26 @@ def main(): perf_path=input_path, ) + # Extract version 2 data if available + scheduler_phases = data.get('aicpu_scheduler_phases') + orchestrator_data = data.get('aicpu_orchestrator') + orchestrator_phases = data.get('aicpu_orchestrator_phases') + + if args.verbose and data['version'] == 2: + if scheduler_phases: + total_phase_records = sum(len(t) for t in scheduler_phases) + print(f" Scheduler threads: {len(scheduler_phases)}") + print(f" Total phase records: {total_phase_records}") + if orchestrator_data: + print(f" Orchestrator: {orchestrator_data.get('submit_count', 0)} tasks") + if orchestrator_phases: + print(f" Orchestrator phases: {len(orchestrator_phases)} per-task records") + # Generate Perfetto JSON - generate_chrome_trace_json(data['tasks'], str(output_path), func_names, args.verbose) + generate_chrome_trace_json(data['tasks'], str(output_path), func_names, args.verbose, + scheduler_phases=scheduler_phases, + orchestrator_data=orchestrator_data, + orchestrator_phases=orchestrator_phases) print(f"\n✓ Conversion complete") print(f" Input: {input_path}")
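
Note (not part of the patch): a minimal sketch of consuming the version-2 fields that export_swimlane_json() writes above (aicpu_scheduler_phases and aicpu_orchestrator_phases); the field names match the JSON emitted by the patch, while the input filename is a placeholder for the timestamped file the exporter produces.

    import json
    from collections import defaultdict

    # Placeholder filename; point this at the JSON written by export_swimlane_json().
    with open("perf_data_20260302_094042.json") as f:
        data = json.load(f)

    if data.get("version") == 2:
        # Sum per-phase durations across all scheduler threads.
        sched_totals = defaultdict(float)
        for thread_records in data.get("aicpu_scheduler_phases", []):
            for rec in thread_records:
                sched_totals[rec["phase"]] += rec["end_time_us"] - rec["start_time_us"]
        print("scheduler phase totals (us):", dict(sched_totals))

        # Sum per-phase durations across all per-task orchestrator records.
        orch_totals = defaultdict(float)
        for rec in data.get("aicpu_orchestrator_phases", []):
            orch_totals[rec["phase"]] += rec["end_time_us"] - rec["start_time_us"]
        print("orchestrator phase totals (us):", dict(orch_totals))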