From 7b8d88fa7b254070ce5e820edad590ee61b9ce3f Mon Sep 17 00:00:00 2001 From: zhusy54 Date: Wed, 15 Apr 2026 20:33:59 +0800 Subject: [PATCH] Add: parallel for iteration isolation in tensormap and orchestrator (a2a3/a5) Introduces PTO2_PARALLEL_FOR macro and supporting orchestrator APIs (pto2_parallel_for_begin/end, pto2_parallel_scope_begin/end) to isolate tensormap lookups per loop iteration. Initializes iter_start_local_ids to -1 in PTO2TensorMap::init. Updates alternating_matmul_add, batch_paged_attention, benchmark_bgemm, and paged_attention_unroll scene tests to use PTO2_PARALLEL_FOR. --- .../orchestration/pto_orchestration_api.h | 100 ++++++++++++++++++ .../runtime/pto_orchestrator.cpp | 43 ++++++++ .../runtime/pto_orchestrator.h | 28 +++++ .../runtime/pto_ring_buffer.h | 1 + .../runtime/pto_runtime2.cpp | 12 +++ .../runtime/pto_runtime2.h | 6 ++ .../runtime/pto_tensormap.cpp | 1 + .../runtime/pto_tensormap.h | 16 +++ .../orchestration/pto_orchestration_api.h | 100 ++++++++++++++++++ .../runtime/pto_orchestrator.cpp | 43 ++++++++ .../runtime/pto_orchestrator.h | 28 +++++ .../runtime/pto_ring_buffer.h | 1 + .../runtime/pto_runtime2.cpp | 12 +++ .../runtime/pto_runtime2.h | 6 ++ .../runtime/pto_tensormap.cpp | 1 + .../runtime/pto_tensormap.h | 16 +++ .../orchestration/alternating_orch.cpp | 2 +- .../orchestration/paged_attention_orch.cpp | 12 +-- .../kernels/orchestration/bgemm_orch.cpp | 4 +- .../orchestration/paged_attention_orch.cpp | 8 +- 20 files changed, 426 insertions(+), 14 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h b/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h index e025c91ba..6486505cb 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h @@ -136,6 +136,12 @@ typedef struct PTO2RuntimeOps { PTO2Runtime *rt, const Tensor &tensor, 
uint32_t ndims, const uint32_t indices[], uint64_t value ); TaskOutputTensors (*alloc_tensors)(PTO2Runtime *rt, const Arg &args); + + // Parallel for iteration isolation + void (*parallel_for_begin)(PTO2Runtime *rt); + void (*parallel_scope_begin)(PTO2Runtime *rt); + void (*parallel_scope_end)(PTO2Runtime *rt); + void (*parallel_for_end)(PTO2Runtime *rt); } PTO2RuntimeOps; /** @@ -255,6 +261,38 @@ static inline void pto2_rt_scope_end() { rt->ops->scope_end(rt); } +static inline void pto2_rt_parallel_for_begin() { + PTO2Runtime *rt = pto2_current_runtime(); + if (rt->ops->is_fatal(rt)) { + return; + } + rt->ops->parallel_for_begin(rt); +} + +static inline void pto2_rt_parallel_scope_begin() { + PTO2Runtime *rt = pto2_current_runtime(); + if (rt->ops->is_fatal(rt)) { + return; + } + rt->ops->parallel_scope_begin(rt); +} + +static inline void pto2_rt_parallel_scope_end() { + PTO2Runtime *rt = pto2_current_runtime(); + if (rt->ops->is_fatal(rt)) { + return; + } + rt->ops->parallel_scope_end(rt); +} + +static inline void pto2_rt_parallel_for_end() { + PTO2Runtime *rt = pto2_current_runtime(); + if (rt->ops->is_fatal(rt)) { + return; + } + rt->ops->parallel_for_end(rt); +} + static inline void pto2_rt_orchestration_done() { PTO2Runtime *rt = pto2_current_runtime(); rt->ops->orchestration_done(rt); @@ -381,6 +419,68 @@ class PTO2ScopeGuard { */ #define PTO2_SCOPE() if (PTO2_SCOPE_GUARD(); true) +/** + * RAII guard for parallel for region (calls parallel_for_begin/end) + */ +class PTO2ParallelForGuard { +public: // NOLINT(whitespace/indent) + PTO2ParallelForGuard() : + rt_(pto2_current_runtime()) { + if (!rt_->ops->is_fatal(rt_)) { + rt_->ops->parallel_for_begin(rt_); + } + } + ~PTO2ParallelForGuard() { + if (!rt_->ops->is_fatal(rt_)) { + rt_->ops->parallel_for_end(rt_); + } + } + +private: // NOLINT(whitespace/indent) + PTO2Runtime *rt_; +}; + +/** + * RAII guard for parallel scope (one iteration; calls parallel_scope_begin/end) + */ +class PTO2ParallelScopeGuard { 
+public: // NOLINT(whitespace/indent) + PTO2ParallelScopeGuard() : + rt_(pto2_current_runtime()) { + if (!rt_->ops->is_fatal(rt_)) { + rt_->ops->parallel_scope_begin(rt_); + } + } + ~PTO2ParallelScopeGuard() { + if (!rt_->ops->is_fatal(rt_)) { + rt_->ops->parallel_scope_end(rt_); + } + } + +private: // NOLINT(whitespace/indent) + PTO2Runtime *rt_; +}; + +#define PTO2_PARALLEL_FOR_GUARD() [[maybe_unused]] PTO2ParallelForGuard _PTO2_CONCATENATE(pf_guard_, __COUNTER__) +#define PTO2_PARALLEL_SCOPE_GUARD() [[maybe_unused]] PTO2ParallelScopeGuard _PTO2_CONCATENATE(ps_guard_, __COUNTER__) + +/** + * Parallel for loop with automatic iteration isolation: + * PTO2_PARALLEL_FOR(i, N) { + * submit_iter_tasks(i); + * } + */ +#define PTO2_PARALLEL_FOR(var, count) \ + if (PTO2_PARALLEL_FOR_GUARD(); true) \ + for (int var = 0; var < (count); ++var) \ + if (PTO2_PARALLEL_SCOPE_GUARD(); true) + +/** + * Single parallel scope (for manual loop control): + * PTO2_PARALLEL_SCOPE() { submit_iter_tasks(); } + */ +#define PTO2_PARALLEL_SCOPE() if (PTO2_PARALLEL_SCOPE_GUARD(); true) + // ============================================================================= // Orchestration Config // ============================================================================= diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 85420e245..1d815ea37 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -516,6 +516,49 @@ void pto2_scope_end(PTO2OrchestratorState *orch) { #endif } +// ============================================================================= +// Parallel For Iteration Isolation +// ============================================================================= + +void pto2_parallel_for_begin(PTO2OrchestratorState *orch) { + if (orch->fatal) { + return; + } + // 
Currently a marker; the real work is done per-iteration in
+    // parallel_scope_begin. Reserved for future diagnostics/assertions.
+}
+
+void pto2_parallel_scope_begin(PTO2OrchestratorState *orch) {
+    if (orch->fatal) {
+        return;
+    }
+    uint8_t outer_ring = orch->current_ring_id();
+    pto2_scope_begin(orch);
+    uint8_t inner_ring = orch->current_ring_id();
+    if (inner_ring != outer_ring) {
+        // Normal case: a new ring was allocated; set the iteration filter.
+        int32_t next_id = orch->rings[inner_ring].task_allocator.next_local_id();
+        orch->tensor_map.iter_start_local_ids[inner_ring] = next_id;
+    }
+    // else: depth overflow (clamped) — silently degrade to a plain scope.
+}
+
+void pto2_parallel_scope_end(PTO2OrchestratorState *orch) {
+    // iter_start_local_ids is NOT cleared here; the next iteration's
+    // parallel_scope_begin will overwrite it. parallel_for_end clears it.
+    pto2_scope_end(orch);
+}
+
+void pto2_parallel_for_end(PTO2OrchestratorState *orch) {
+    if (orch->fatal) {
+        return;
+    }
+    // Scopes are closed here: reset rings deeper than the current one (ids assumed to grow with depth).
+    for (int r = orch->current_ring_id() + 1; r < PTO2_MAX_RING_DEPTH; r++) {
+        orch->tensor_map.iter_start_local_ids[r] = -1;
+    }
+}
+
 // =============================================================================
 // Task Submission
 // =============================================================================
diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h
index 0ad5e6873..f64075c0a 100644
--- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h
+++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h
@@ -168,6 +168,34 @@ void pto2_scope_begin(PTO2OrchestratorState *orch);
  */
 void pto2_scope_end(PTO2OrchestratorState *orch);
 
+// =============================================================================
+// Parallel For Iteration Isolation
+// =============================================================================
+
+/**
+ * Begin a parallel for region.
+ * Currently a no-op marker; reserved for future diagnostics/assertions.
+ */
+void pto2_parallel_for_begin(PTO2OrchestratorState *orch);
+
+/**
+ * Begin a parallel scope (one iteration of a parallel for).
+ * Combines scope_begin + setting the iteration filter boundary.
+ */
+void pto2_parallel_scope_begin(PTO2OrchestratorState *orch);
+
+/**
+ * End a parallel scope (one iteration of a parallel for).
+ * Calls scope_end; does NOT clear the filter (next iteration overwrites it).
+ */
+void pto2_parallel_scope_end(PTO2OrchestratorState *orch);
+
+/**
+ * End a parallel for region.
+ * Clears the iteration filter so subsequent lookups see all entries.
+ */ +void pto2_parallel_for_end(PTO2OrchestratorState *orch); + // ============================================================================= // Task Submission // ============================================================================= diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h index 1116b4028..ffd114566 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h @@ -209,6 +209,7 @@ class PTO2TaskAllocator { uint64_t heap_top() const { return heap_top_; } uint64_t heap_capacity() const { return heap_size_; } + int32_t next_local_id() const { return local_task_id_; } private: // --- Task Ring --- diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp index 0ab584d60..f8bffd168 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp @@ -50,6 +50,14 @@ void pto2_rt_scope_begin(PTO2Runtime *rt) { pto2_scope_begin(&rt->orchestrator); void pto2_rt_scope_end(PTO2Runtime *rt) { pto2_scope_end(&rt->orchestrator); } +static void pto2_rt_parallel_for_begin(PTO2Runtime *rt) { pto2_parallel_for_begin(&rt->orchestrator); } + +static void pto2_rt_parallel_scope_begin(PTO2Runtime *rt) { pto2_parallel_scope_begin(&rt->orchestrator); } + +static void pto2_rt_parallel_scope_end(PTO2Runtime *rt) { pto2_parallel_scope_end(&rt->orchestrator); } + +static void pto2_rt_parallel_for_end(PTO2Runtime *rt) { pto2_parallel_for_end(&rt->orchestrator); } + void pto2_rt_orchestration_done(PTO2Runtime *rt) { pto2_orchestrator_done(&rt->orchestrator); } static bool is_fatal_impl(PTO2Runtime *rt) { return rt->orchestrator.fatal; } @@ -206,6 +214,10 @@ static const PTO2RuntimeOps s_runtime_ops = { 
.get_tensor_data = pto2_get_tensor_data, .set_tensor_data = pto2_set_tensor_data, .alloc_tensors = alloc_tensors_impl, + .parallel_for_begin = pto2_rt_parallel_for_begin, + .parallel_scope_begin = pto2_rt_parallel_scope_begin, + .parallel_scope_end = pto2_rt_parallel_scope_end, + .parallel_for_end = pto2_rt_parallel_for_end, }; // ============================================================================= diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h index 44b71aab3..5145644ae 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h @@ -87,6 +87,12 @@ struct PTO2RuntimeOps { PTO2Runtime *rt, const Tensor &tensor, uint32_t ndims, const uint32_t indices[], uint64_t value ); TaskOutputTensors (*alloc_tensors)(PTO2Runtime *rt, const Arg &args); + + // Parallel for iteration isolation + void (*parallel_for_begin)(PTO2Runtime *rt); + void (*parallel_scope_begin)(PTO2Runtime *rt); + void (*parallel_scope_end)(PTO2Runtime *rt); + void (*parallel_for_end)(PTO2Runtime *rt); }; /** diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp index 3c7447362..6e31ec477 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp @@ -129,6 +129,7 @@ bool PTO2TensorMap::init( for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { last_task_alives[r] = 0; last_cleanup[r] = 0; + iter_start_local_ids[r] = -1; } return true; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h index 61524348a..d909947ea 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h +++ 
b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h
@@ -216,6 +216,11 @@ struct PTO2TensorMap {
     // Per-ring validity threshold (for lazy invalidation)
     int32_t last_task_alives[PTO2_MAX_RING_DEPTH];  // Cached from shared memory per ring
 
+    // Per-ring iteration isolation for parallel for.
+    // -1 = normal mode (no filtering); >= 0 = parallel for mode, entries with
+    // local_id < iter_start on the same ring are filtered out during lookup.
+    int32_t iter_start_local_ids[PTO2_MAX_RING_DEPTH];
+
     // Per-ring cleanup progress (for periodic cleanup_retired)
     int32_t last_cleanup[PTO2_MAX_RING_DEPTH]{};
 
@@ -328,6 +333,17 @@ struct PTO2TensorMap {
                 continue;
             }
 
+            // Parallel for iteration isolation: skip entries from prior iterations
+            // on the same ring. Outer-ring entries have iter_start_local_ids == -1
+            // and pass through unconditionally.
+            {
+                int32_t iter_start = iter_start_local_ids[cur_entry->producer_task_id.ring()];
+                if (iter_start >= 0 && static_cast<int32_t>(cur_entry->producer_task_id.local()) < iter_start) {
+                    cur_entry = next_entry;
+                    continue;
+                }
+            }
+
             // Entry is valid - check if regions OVERLAP (not just exact match)
             // Since we hash only by base_ptr, all entries in this bucket have
             // potential to overlap. We must check actual byte-range overlap.
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h b/src/a5/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h index e8b0a08b6..a3e1dba93 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h @@ -136,6 +136,12 @@ typedef struct PTO2RuntimeOps { PTO2Runtime *rt, const Tensor &tensor, uint32_t ndims, const uint32_t indices[], uint64_t value ); TaskOutputTensors (*alloc_tensors)(PTO2Runtime *rt, const Arg &args); + + // Parallel for iteration isolation + void (*parallel_for_begin)(PTO2Runtime *rt); + void (*parallel_scope_begin)(PTO2Runtime *rt); + void (*parallel_scope_end)(PTO2Runtime *rt); + void (*parallel_for_end)(PTO2Runtime *rt); } PTO2RuntimeOps; /** @@ -255,6 +261,38 @@ static inline void pto2_rt_scope_end() { rt->ops->scope_end(rt); } +static inline void pto2_rt_parallel_for_begin() { + PTO2Runtime *rt = pto2_current_runtime(); + if (rt->ops->is_fatal(rt)) { + return; + } + rt->ops->parallel_for_begin(rt); +} + +static inline void pto2_rt_parallel_scope_begin() { + PTO2Runtime *rt = pto2_current_runtime(); + if (rt->ops->is_fatal(rt)) { + return; + } + rt->ops->parallel_scope_begin(rt); +} + +static inline void pto2_rt_parallel_scope_end() { + PTO2Runtime *rt = pto2_current_runtime(); + if (rt->ops->is_fatal(rt)) { + return; + } + rt->ops->parallel_scope_end(rt); +} + +static inline void pto2_rt_parallel_for_end() { + PTO2Runtime *rt = pto2_current_runtime(); + if (rt->ops->is_fatal(rt)) { + return; + } + rt->ops->parallel_for_end(rt); +} + static inline void pto2_rt_orchestration_done() { PTO2Runtime *rt = pto2_current_runtime(); rt->ops->orchestration_done(rt); @@ -381,6 +419,68 @@ class PTO2ScopeGuard { */ #define PTO2_SCOPE() if (PTO2_SCOPE_GUARD(); true) +/** + * RAII guard for parallel for region (calls parallel_for_begin/end) + */ +class PTO2ParallelForGuard { +public: // 
NOLINT(whitespace/indent) + PTO2ParallelForGuard() : + rt_(pto2_current_runtime()) { + if (!rt_->ops->is_fatal(rt_)) { + rt_->ops->parallel_for_begin(rt_); + } + } + ~PTO2ParallelForGuard() { + if (!rt_->ops->is_fatal(rt_)) { + rt_->ops->parallel_for_end(rt_); + } + } + +private: // NOLINT(whitespace/indent) + PTO2Runtime *rt_; +}; + +/** + * RAII guard for parallel scope (one iteration; calls parallel_scope_begin/end) + */ +class PTO2ParallelScopeGuard { +public: // NOLINT(whitespace/indent) + PTO2ParallelScopeGuard() : + rt_(pto2_current_runtime()) { + if (!rt_->ops->is_fatal(rt_)) { + rt_->ops->parallel_scope_begin(rt_); + } + } + ~PTO2ParallelScopeGuard() { + if (!rt_->ops->is_fatal(rt_)) { + rt_->ops->parallel_scope_end(rt_); + } + } + +private: // NOLINT(whitespace/indent) + PTO2Runtime *rt_; +}; + +#define PTO2_PARALLEL_FOR_GUARD() [[maybe_unused]] PTO2ParallelForGuard _PTO2_CONCATENATE(pf_guard_, __COUNTER__) +#define PTO2_PARALLEL_SCOPE_GUARD() [[maybe_unused]] PTO2ParallelScopeGuard _PTO2_CONCATENATE(ps_guard_, __COUNTER__) + +/** + * Parallel for loop with automatic iteration isolation: + * PTO2_PARALLEL_FOR(i, N) { + * submit_iter_tasks(i); + * } + */ +#define PTO2_PARALLEL_FOR(var, count) \ + if (PTO2_PARALLEL_FOR_GUARD(); true) \ + for (int var = 0; var < (count); ++var) \ + if (PTO2_PARALLEL_SCOPE_GUARD(); true) + +/** + * Single parallel scope (for manual loop control): + * PTO2_PARALLEL_SCOPE() { submit_iter_tasks(); } + */ +#define PTO2_PARALLEL_SCOPE() if (PTO2_PARALLEL_SCOPE_GUARD(); true) + // ============================================================================= // Orchestration Config // ============================================================================= diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 54f03f347..b803caadb 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ 
b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp
@@ -516,6 +516,49 @@ void pto2_scope_end(PTO2OrchestratorState *orch) {
 #endif
 }
 
+// =============================================================================
+// Parallel For Iteration Isolation
+// =============================================================================
+
+void pto2_parallel_for_begin(PTO2OrchestratorState *orch) {
+    if (orch->fatal) {
+        return;
+    }
+    // Currently a marker; the real work is done per-iteration in
+    // parallel_scope_begin. Reserved for future diagnostics/assertions.
+}
+
+void pto2_parallel_scope_begin(PTO2OrchestratorState *orch) {
+    if (orch->fatal) {
+        return;
+    }
+    uint8_t outer_ring = orch->current_ring_id();
+    pto2_scope_begin(orch);
+    uint8_t inner_ring = orch->current_ring_id();
+    if (inner_ring != outer_ring) {
+        // Normal case: a new ring was allocated; set the iteration filter.
+        int32_t next_id = orch->rings[inner_ring].task_allocator.next_local_id();
+        orch->tensor_map.iter_start_local_ids[inner_ring] = next_id;
+    }
+    // else: depth overflow (clamped) — silently degrade to a plain scope.
+}
+
+void pto2_parallel_scope_end(PTO2OrchestratorState *orch) {
+    // iter_start_local_ids is NOT cleared here; the next iteration's
+    // parallel_scope_begin will overwrite it. parallel_for_end clears it.
+    pto2_scope_end(orch);
+}
+
+void pto2_parallel_for_end(PTO2OrchestratorState *orch) {
+    if (orch->fatal) {
+        return;
+    }
+    // Scopes are closed here: reset rings deeper than the current one (ids assumed to grow with depth).
+    for (int r = orch->current_ring_id() + 1; r < PTO2_MAX_RING_DEPTH; r++) {
+        orch->tensor_map.iter_start_local_ids[r] = -1;
+    }
+}
+
 // =============================================================================
 // Task Submission
 // =============================================================================
diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h
index 0ad5e6873..f64075c0a 100644
--- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h
+++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h
@@ -168,6 +168,34 @@ void pto2_scope_begin(PTO2OrchestratorState *orch);
  */
 void pto2_scope_end(PTO2OrchestratorState *orch);
 
+// =============================================================================
+// Parallel For Iteration Isolation
+// =============================================================================
+
+/**
+ * Begin a parallel for region.
+ * Currently a no-op marker; reserved for future diagnostics/assertions.
+ */
+void pto2_parallel_for_begin(PTO2OrchestratorState *orch);
+
+/**
+ * Begin a parallel scope (one iteration of a parallel for).
+ * Combines scope_begin + setting the iteration filter boundary.
+ */
+void pto2_parallel_scope_begin(PTO2OrchestratorState *orch);
+
+/**
+ * End a parallel scope (one iteration of a parallel for).
+ * Calls scope_end; does NOT clear the filter (next iteration overwrites it).
+ */
+void pto2_parallel_scope_end(PTO2OrchestratorState *orch);
+
+/**
+ * End a parallel for region.
+ * Clears the iteration filter so subsequent lookups see all entries.
+ */ +void pto2_parallel_for_end(PTO2OrchestratorState *orch); + // ============================================================================= // Task Submission // ============================================================================= diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h index 1116b4028..ffd114566 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h @@ -209,6 +209,7 @@ class PTO2TaskAllocator { uint64_t heap_top() const { return heap_top_; } uint64_t heap_capacity() const { return heap_size_; } + int32_t next_local_id() const { return local_task_id_; } private: // --- Task Ring --- diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp index 0ab584d60..f8bffd168 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp @@ -50,6 +50,14 @@ void pto2_rt_scope_begin(PTO2Runtime *rt) { pto2_scope_begin(&rt->orchestrator); void pto2_rt_scope_end(PTO2Runtime *rt) { pto2_scope_end(&rt->orchestrator); } +static void pto2_rt_parallel_for_begin(PTO2Runtime *rt) { pto2_parallel_for_begin(&rt->orchestrator); } + +static void pto2_rt_parallel_scope_begin(PTO2Runtime *rt) { pto2_parallel_scope_begin(&rt->orchestrator); } + +static void pto2_rt_parallel_scope_end(PTO2Runtime *rt) { pto2_parallel_scope_end(&rt->orchestrator); } + +static void pto2_rt_parallel_for_end(PTO2Runtime *rt) { pto2_parallel_for_end(&rt->orchestrator); } + void pto2_rt_orchestration_done(PTO2Runtime *rt) { pto2_orchestrator_done(&rt->orchestrator); } static bool is_fatal_impl(PTO2Runtime *rt) { return rt->orchestrator.fatal; } @@ -206,6 +214,10 @@ static const PTO2RuntimeOps s_runtime_ops = { .get_tensor_data = 
pto2_get_tensor_data, .set_tensor_data = pto2_set_tensor_data, .alloc_tensors = alloc_tensors_impl, + .parallel_for_begin = pto2_rt_parallel_for_begin, + .parallel_scope_begin = pto2_rt_parallel_scope_begin, + .parallel_scope_end = pto2_rt_parallel_scope_end, + .parallel_for_end = pto2_rt_parallel_for_end, }; // ============================================================================= diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h index 44b71aab3..5145644ae 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.h @@ -87,6 +87,12 @@ struct PTO2RuntimeOps { PTO2Runtime *rt, const Tensor &tensor, uint32_t ndims, const uint32_t indices[], uint64_t value ); TaskOutputTensors (*alloc_tensors)(PTO2Runtime *rt, const Arg &args); + + // Parallel for iteration isolation + void (*parallel_for_begin)(PTO2Runtime *rt); + void (*parallel_scope_begin)(PTO2Runtime *rt); + void (*parallel_scope_end)(PTO2Runtime *rt); + void (*parallel_for_end)(PTO2Runtime *rt); }; /** diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp index 3c7447362..6e31ec477 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp @@ -129,6 +129,7 @@ bool PTO2TensorMap::init( for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { last_task_alives[r] = 0; last_cleanup[r] = 0; + iter_start_local_ids[r] = -1; } return true; diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h index 61524348a..d909947ea 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h @@ -216,6 +216,11 
@@ struct PTO2TensorMap {
     // Per-ring validity threshold (for lazy invalidation)
     int32_t last_task_alives[PTO2_MAX_RING_DEPTH];  // Cached from shared memory per ring
 
+    // Per-ring iteration isolation for parallel for.
+    // -1 = normal mode (no filtering); >= 0 = parallel for mode, entries with
+    // local_id < iter_start on the same ring are filtered out during lookup.
+    int32_t iter_start_local_ids[PTO2_MAX_RING_DEPTH];
+
     // Per-ring cleanup progress (for periodic cleanup_retired)
     int32_t last_cleanup[PTO2_MAX_RING_DEPTH]{};
 
@@ -328,6 +333,17 @@ struct PTO2TensorMap {
                 continue;
             }
 
+            // Parallel for iteration isolation: skip entries from prior iterations
+            // on the same ring. Outer-ring entries have iter_start_local_ids == -1
+            // and pass through unconditionally.
+            {
+                int32_t iter_start = iter_start_local_ids[cur_entry->producer_task_id.ring()];
+                if (iter_start >= 0 && static_cast<int32_t>(cur_entry->producer_task_id.local()) < iter_start) {
+                    cur_entry = next_entry;
+                    continue;
+                }
+            }
+
             // Entry is valid - check if regions OVERLAP (not just exact match)
             // Since we hash only by base_ptr, all entries in this bucket have
             // potential to overlap. We must check actual byte-range overlap.
diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp
index 308a1d66e..e7ff178c8 100644
--- a/tests/st/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp
+++ b/tests/st/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp
@@ -79,7 +79,7 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(const Chip
     int max_groups = num_matmul_groups > num_add_groups ?
num_matmul_groups : num_add_groups; // Interleaved submit: matmul and add groups alternate - for (int group_idx = 0; group_idx < max_groups; group_idx++) { + PTO2_PARALLEL_FOR(group_idx, max_groups) { if (group_idx < num_matmul_groups) { int start_task_idx = group_idx * matmul_batch; uint64_t offset = static_cast(start_task_idx) * MATMUL_ELEMS; diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp index 68b794f6b..0d6d2d3c2 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp +++ b/tests/st/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp @@ -113,15 +113,15 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(const Chip constexpr uint64_t IN_CORE_BATCH = 16; uint64_t num_chunks = (batch + IN_CORE_BATCH - 1) / IN_CORE_BATCH; - for (uint64_t q_idx = 0; q_idx < q_loop; q_idx++) { - uint64_t q_offset = q_idx * q_tile; + PTO2_PARALLEL_FOR(q_idx, (int)q_loop) { + uint64_t q_offset = (uint64_t)q_idx * q_tile; - for (uint64_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) { - uint64_t chunk_bc = batch - chunk_idx * IN_CORE_BATCH; + PTO2_PARALLEL_FOR(chunk_idx, (int)num_chunks) { + uint64_t chunk_bc = batch - (uint64_t)chunk_idx * IN_CORE_BATCH; if (chunk_bc > IN_CORE_BATCH) chunk_bc = IN_CORE_BATCH; - uint64_t batch_start = chunk_idx * IN_CORE_BATCH; + uint64_t batch_start = (uint64_t)chunk_idx * IN_CORE_BATCH; - PTO2_SCOPE() { + { uint32_t oi_acc_shapes[2] = {static_cast(chunk_bc * q_tile), static_cast(head_dim)}; uint32_t scalar_acc_shapes[1] = {static_cast(chunk_bc * q_tile)}; TensorCreateInfo oi_batch_ci(oi_acc_shapes, 2, DataType::FLOAT32); diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/benchmark_bgemm/kernels/orchestration/bgemm_orch.cpp 
b/tests/st/a2a3/tensormap_and_ringbuffer/benchmark_bgemm/kernels/orchestration/bgemm_orch.cpp index b313dbe41..749f2d0d0 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/benchmark_bgemm/kernels/orchestration/bgemm_orch.cpp +++ b/tests/st/a2a3/tensormap_and_ringbuffer/benchmark_bgemm/kernels/orchestration/bgemm_orch.cpp @@ -78,9 +78,7 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(const Chip // A/B layout: [num_groups, grid_k, incore_loop, tile_size, tile_size] // C layout: [incore_loop * num_groups, tile_size, tile_size] - for (int group_idx = 0; group_idx < num_groups; group_idx++) { - PTO2_SCOPE_GUARD(); - + PTO2_PARALLEL_FOR(group_idx, num_groups) { uint32_t c_elem_offset = static_cast(static_cast(group_idx) * group_tile_elems); uint32_t c_view_offsets[1] = {c_elem_offset}; Tensor C_view = ext_C.view(group_shapes, c_view_offsets); diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll/kernels/orchestration/paged_attention_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll/kernels/orchestration/paged_attention_orch.cpp index 954ee478f..c69416feb 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll/kernels/orchestration/paged_attention_orch.cpp +++ b/tests/st/a2a3/tensormap_and_ringbuffer/paged_attention_unroll/kernels/orchestration/paged_attention_orch.cpp @@ -151,15 +151,15 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(const Chip CYCLE_COUNT_LAP(prof_make_tensor); #endif - for (uint64_t b_idx = 0; b_idx < batch; b_idx++) { + PTO2_PARALLEL_FOR(b_idx, (int)batch) { uint32_t cl_idx[1] = {static_cast(b_idx)}; uint64_t cur_seq = static_cast(get_tensor_data(context_lens, 1, cl_idx)); uint64_t bn_this_batch = (cur_seq + block_size - 1) / block_size; - for (uint64_t q_idx = 0; q_idx < q_loop; q_idx++) { + PTO2_PARALLEL_FOR(q_idx, (int)q_loop) { CYCLE_COUNT_LAP(prof_scope_and_loop); - PTO2_SCOPE() { - uint64_t cur_offset = b_idx * q_head_num + q_idx * 
q_tile; + { + uint64_t cur_offset = (uint64_t)b_idx * q_head_num + (uint64_t)q_idx * q_tile; uint32_t qi_shapes[2] = {static_cast(q_tile), static_cast(head_dim)}; uint32_t qi_offsets[2] = {static_cast(cur_offset), 0};