diff --git a/CMakeLists.txt b/CMakeLists.txt
index c7f245d..c244e05 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -21,6 +21,7 @@ add_library(Caffeine
     src/input/InputManager.cpp
     src/debug/LogSystem.cpp
     src/debug/Profiler.cpp
+    src/threading/JobSystem.cpp
 )
 
 if(SDL3_FOUND)
diff --git a/src/threading/JobSystem.cpp b/src/threading/JobSystem.cpp
new file mode 100644
index 0000000..a0773ef
--- /dev/null
+++ b/src/threading/JobSystem.cpp
@@ -0,0 +1,272 @@
+// ============================================================================
+// @file  JobSystem.cpp
+// @brief Job System implementation — work-stealing thread pool
+// @note  Part of threading/ module
+// ============================================================================
+
+#include "threading/JobSystem.hpp"
+
+// NOTE(review): the extracted patch lost the targets of these includes;
+// restored to the headers this TU uses directly — confirm against the
+// original commit.
+#include <chrono>
+#include <mutex>
+#include <thread>
+
+namespace Caffeine::Threading {
+
+// ============================================================================
+// JobHandle
+// ============================================================================
+
+JobHandle::JobHandle(u32 index, u32 version, std::atomic<u32>* completionFlag)
+    : m_index(index)
+    , m_version(version)
+    , m_completionFlag(completionFlag) {}
+
+// Spin-yield until the job's completion flag is raised.
+// NOTE(review): m_version is stored but never validated against the slot's
+// current version, so the advertised ABA protection is not actually enforced:
+// a handle that outlives slot reuse (MAX_SLOTS schedules later, when
+// allocateSlot() resets the flag to 0) can spin on a recycled slot. Confirm
+// whether handles are guaranteed shorter-lived than MAX_SLOTS schedules.
+void JobHandle::wait() const {
+    if (!m_completionFlag) return;
+    while (m_completionFlag->load(std::memory_order_acquire) == 0) {
+        std::this_thread::yield();
+    }
+}
+
+// True once the scheduled job has finished executing.
+bool JobHandle::isComplete() const {
+    if (!m_completionFlag) return false;
+    return m_completionFlag->load(std::memory_order_acquire) != 0;
+}
+
+// ============================================================================
+// JobBarrier
+// ============================================================================
+
+JobBarrier::JobBarrier(u32 targetCount)
+    : m_count(targetCount) {}
+
+// Register one more pending release() before wait() may return.
+void JobBarrier::add() {
+    m_count.fetch_add(1, std::memory_order_acq_rel);
+}
+
+// Signal one completed unit; wakes waiters when the count reaches zero.
+// The notify is issued under the mutex so it cannot race a waiter that is
+// between its predicate check and its sleep.
+void JobBarrier::release() {
+    u32 prev = m_count.fetch_sub(1, std::memory_order_acq_rel);
+    if (prev == 1) {
+        std::lock_guard<std::mutex> lock(m_mutex);
+        m_cv.notify_all();
+    }
+}
+
+// Block until the pending count reaches zero (fast path: already zero).
+void JobBarrier::wait() {
+    if (m_count.load(std::memory_order_acquire) == 0) return;
+    std::unique_lock<std::mutex> lock(m_mutex);
+    m_cv.wait(lock, [this]() {
+        return m_count.load(std::memory_order_acquire) == 0;
+    });
+}
+
+// ============================================================================
+// WorkStealDeque — owner pushes/pops at the front, thieves steal at the back
+// ============================================================================
+
+void JobSystem::WorkStealDeque::push(JobEntry&& entry) {
+    std::lock_guard<std::mutex> lock(mutex);
+    jobs.push_front(std::move(entry));
+}
+
+bool JobSystem::WorkStealDeque::pop(JobEntry& out) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (jobs.empty()) return false;
+    out = std::move(jobs.front());
+    jobs.pop_front();
+    return true;
+}
+
+bool JobSystem::WorkStealDeque::steal(JobEntry& out) {
+    std::lock_guard<std::mutex> lock(mutex);
+    if (jobs.empty()) return false;
+    out = std::move(jobs.back());
+    jobs.pop_back();
+    return true;
+}
+
+// NOTE(review): empty()/size() read the deque without taking the mutex (they
+// are const and the mutex member is not mutable, so they cannot lock). The
+// results are best-effort snapshots used only by stats(); this is a benign
+// approximation but formally a data race — consider a mutable mutex.
+bool JobSystem::WorkStealDeque::empty() const {
+    return jobs.empty();
+}
+
+u32 JobSystem::WorkStealDeque::size() const {
+    return static_cast<u32>(jobs.size());
+}
+
+// ============================================================================
+// JobSystem
+// ============================================================================
+
+// workerCount == 0 selects hardware_concurrency() - 1 (min 1), leaving one
+// core for the main thread.
+JobSystem::JobSystem(u32 workerCount) {
+    if (workerCount == 0) {
+        u32 hw = std::thread::hardware_concurrency();
+        m_workerCount = (hw > 1) ? (hw - 1) : 1;
+    } else {
+        m_workerCount = workerCount;
+    }
+
+    // One deque per (worker, priority) pair; flat layout
+    // [worker * PRIORITY_COUNT + priority].
+    m_localQueues.reserve(static_cast<usize>(m_workerCount) * PRIORITY_COUNT);
+    for (usize i = 0; i < static_cast<usize>(m_workerCount) * PRIORITY_COUNT; ++i) {
+        m_localQueues.push_back(std::make_unique<WorkStealDeque>());
+    }
+
+    m_running.store(true, std::memory_order_release);
+
+    m_workers.reserve(m_workerCount);
+    for (u32 i = 0; i < m_workerCount; ++i) {
+        m_workers.emplace_back(&JobSystem::workerMain, this, i);
+    }
+}
+
+// Drains all pending work, then stops and joins every worker. Workers use a
+// bounded wait_for (1 ms), so a notify lost between predicate check and sleep
+// only delays shutdown by that bound.
+JobSystem::~JobSystem() {
+    waitAll();
+    m_running.store(false, std::memory_order_release);
+    m_wakeCV.notify_all();
+
+    for (auto& t : m_workers) {
+        if (t.joinable()) t.join();
+    }
+}
+
+// Reserve a completion slot from the fixed ring and reset its flag.
+// NOTE(review): with more than MAX_SLOTS jobs in flight a live slot is
+// recycled; see the caveat on JobHandle::wait().
+std::pair<u32, u32> JobSystem::allocateSlot() {
+    u32 idx = m_nextSlot.fetch_add(1, std::memory_order_relaxed) % MAX_SLOTS;
+    u32 ver = m_slots[idx].version.fetch_add(1, std::memory_order_relaxed) + 1;
+    m_slots[idx].flag.store(0, std::memory_order_release);
+    return {idx, ver};
+}
+
+// Enqueue a job on the global queue for its priority and wake one worker.
+// The barrier, if any, must already account for this job (caller contract).
+JobHandle JobSystem::schedule(std::unique_ptr<IJob> job,
+                              JobBarrier* barrier,
+                              JobPriority prio) {
+    auto [idx, ver] = allocateSlot();
+
+    JobEntry entry;
+    entry.job         = std::move(job);
+    entry.barrier     = barrier;
+    entry.slotIndex   = idx;
+    entry.slotVersion = ver;
+
+    // Incremented before the push so a waking worker's predicate sees it.
+    m_pendingJobs.fetch_add(1, std::memory_order_acq_rel);
+
+    u32 prioIdx = static_cast<u32>(prio);
+    m_globalQueues[prioIdx].push(std::move(entry));
+
+    m_wakeCV.notify_one();
+
+    return JobHandle(idx, ver, &m_slots[idx].flag);
+}
+
+// Route an entry to a specific worker's local deque, or to the global queue
+// when the hint is out of range.
+void JobSystem::pushToQueue(JobEntry&& entry, JobPriority prio, u32 workerHint) {
+    u32 prioIdx = static_cast<u32>(prio);
+    if (workerHint < m_workerCount) {
+        m_localQueues[static_cast<usize>(workerHint) * PRIORITY_COUNT + prioIdx]->push(std::move(entry));
+    } else {
+        m_globalQueues[prioIdx].push(std::move(entry));
+    }
+}
+
+// Fetch and run one job. Search order: own local deques (Critical first),
+// then global queues, then steal from other workers. Returns false when no
+// work was found anywhere.
+bool JobSystem::tryExecuteOne(u32 workerIndex) {
+    JobEntry entry;
+
+    // 1. Try local queues in priority order (Critical first)
+    for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
+        auto& localQ = *m_localQueues[static_cast<usize>(workerIndex) * PRIORITY_COUNT + p];
+        if (localQ.pop(entry)) goto execute;
+    }
+
+    // 2. Try global queues in priority order
+    for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
+        if (m_globalQueues[p].pop(entry)) goto execute;
+    }
+
+    // 3. Work-stealing: try to steal from other workers (priority order)
+    for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
+        for (u32 w = 1; w <= m_workerCount; ++w) {
+            u32 victim = (workerIndex + w) % m_workerCount;
+            auto& victimQ = *m_localQueues[static_cast<usize>(victim) * PRIORITY_COUNT + p];
+            if (victimQ.steal(entry)) goto execute;
+        }
+    }
+
+    return false;
+
+execute:
+    m_activeWorkers.fetch_add(1, std::memory_order_relaxed);
+
+    auto startTime = std::chrono::high_resolution_clock::now();
+
+    entry.job->execute();
+
+    auto endTime = std::chrono::high_resolution_clock::now();
+    auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime).count();
+    m_totalJobTimeNs.fetch_add(static_cast<u64>(elapsed), std::memory_order_relaxed);
+
+    // Publish completion before releasing the barrier so a barrier waiter
+    // observes the handle as complete.
+    m_slots[entry.slotIndex].flag.store(1, std::memory_order_release);
+
+    if (entry.barrier) {
+        entry.barrier->release();
+    }
+
+    m_completedTotal.fetch_add(1, std::memory_order_relaxed);
+    m_activeWorkers.fetch_sub(1, std::memory_order_relaxed);
+
+    u32 prev = m_pendingJobs.fetch_sub(1, std::memory_order_acq_rel);
+    if (prev == 1) {
+        std::lock_guard<std::mutex> lock(m_waitAllMutex);
+        m_waitAllCV.notify_all();
+    }
+
+    return true;
+}
+
+// Worker loop: execute work when available, otherwise sleep up to 1 ms on the
+// wake CV. On shutdown, drain whatever is still queued.
+void JobSystem::workerMain(u32 workerIndex) {
+    while (m_running.load(std::memory_order_acquire)) {
+        if (!tryExecuteOne(workerIndex)) {
+            std::unique_lock<std::mutex> lock(m_wakeMutex);
+            m_wakeCV.wait_for(lock, std::chrono::milliseconds(1), [this]() {
+                return !m_running.load(std::memory_order_acquire) ||
+                       m_pendingJobs.load(std::memory_order_acquire) > 0;
+            });
+        }
+    }
+
+    // Drain remaining jobs on shutdown
+    while (tryExecuteOne(workerIndex)) {}
+}
+
+// Block until every scheduled job (queued or executing) has completed.
+void JobSystem::waitAll() {
+    if (m_pendingJobs.load(std::memory_order_acquire) == 0) return;
+    std::unique_lock<std::mutex> lock(m_waitAllMutex);
+    m_waitAllCV.wait(lock, [this]() {
+        return m_pendingJobs.load(std::memory_order_acquire) == 0;
+    });
+}
+
+// Best-effort snapshot of system activity. Queue sizes are read without
+// stopping the world, so pendingJobs is approximate under load.
+JobSystem::Stats JobSystem::stats() const {
+    Stats s;
+    s.activeWorkers     = m_activeWorkers.load(std::memory_order_relaxed);
+    s.completedJobsTotal = m_completedTotal.load(std::memory_order_relaxed);
+
+    u32 pending = 0;
+    for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
+        pending += m_globalQueues[p].size();
+    }
+    for (u32 w = 0; w < m_workerCount; ++w) {
+        for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
+            pending += m_localQueues[static_cast<usize>(w) * PRIORITY_COUNT + p]->size();
+        }
+    }
+    s.pendingJobs = pending;
+
+    u64 completed = s.completedJobsTotal;
+    if (completed > 0) {
+        u64 totalNs = m_totalJobTimeNs.load(std::memory_order_relaxed);
+        s.avgJobMs = static_cast<f64>(totalNs) / static_cast<f64>(completed) / 1e6;
+    } else {
+        s.avgJobMs = 0.0;
+    }
+
+    return s;
+}
+
+} // namespace Caffeine::Threading
diff --git a/src/threading/JobSystem.hpp b/src/threading/JobSystem.hpp
new file mode 100644
index 0000000..6728383
--- /dev/null
+++ b/src/threading/JobSystem.hpp
@@ -0,0 +1,262 @@
+// ============================================================================
+// @file  JobSystem.hpp
+// @brief Job System with work-stealing thread pool (RF2.2-RF2.6)
+// @note  Part of threading/ module — namespace Caffeine::Threading
+// ============================================================================
+#pragma once
+
+#include "../core/Types.hpp"
+
+// NOTE(review): include targets restored from usage after patch garbling —
+// verify against the original commit.
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+namespace Caffeine::Threading {
+
+using namespace Caffeine;
+
+// ============================================================================
+// JobPriority — lower numeric value = higher scheduling priority
+// ============================================================================
+
+enum class JobPriority : u8 {
+    Critical   = 0,
+    Normal     = 1,
+    Background = 2
+};
+
+constexpr u32 PRIORITY_COUNT = 3;
+
+// ============================================================================
+// IJob / JobWithData
+// ============================================================================
+
+// Base interface for schedulable work. Polymorphic, so the destructor is
+// virtual; priority() is advisory and defaults to Normal.
+struct IJob {
+    virtual ~IJob() = default;
+    virtual void execute() = 0;
+    virtual JobPriority priority() const { return JobPriority::Normal; }
+};
+
+// Convenience job carrying a payload and a callable invoked on it.
+// NOTE(review): execute() does not guard against an empty func — calling it
+// unset throws std::bad_function_call. Callers always set it in this repo.
+template <typename DataT>
+struct JobWithData : IJob {
+    DataT data;
+    std::function<void(DataT&)> func;
+    JobPriority prio = JobPriority::Normal;
+
+    void execute() override { func(data); }
+    JobPriority priority() const override { return prio; }
+};
+
+// ============================================================================
+// JobHandle — index + version to avoid ABA problem
+// ============================================================================
+
+class JobHandle {
+public:
+    JobHandle() = default;
+    explicit JobHandle(u32 index, u32 version, std::atomic<u32>* completionFlag);
+
+    void wait() const;          // spin-yield until complete
+    bool isComplete() const;
+    explicit operator bool() const { return isComplete(); }
+
+private:
+    u32 m_index   = ~u32(0);
+    u32 m_version = ~u32(0);    // NOTE(review): stored but never checked (see .cpp)
+    std::atomic<u32>* m_completionFlag = nullptr;
+};
+
+// ============================================================================
+// JobBarrier — wait() blocks until N jobs release()
+// ============================================================================
+
+class JobBarrier {
+public:
+    explicit JobBarrier(u32 targetCount = 0);
+
+    void add();       // register one more pending release
+    void release();   // one unit done; wakes waiters at zero
+    void wait();      // block until count reaches zero
+
+private:
+    std::atomic<u32> m_count{0};
+    std::mutex m_mutex;
+    std::condition_variable m_cv;
+};
+
+// ============================================================================
+// JobSystem
+// ============================================================================
+
+class JobSystem {
+public:
+    // workerCount == 0 → hardware_concurrency() - 1 (min 1).
+    explicit JobSystem(u32 workerCount = 0);
+    ~JobSystem();
+
+    JobSystem(const JobSystem&) = delete;
+    JobSystem& operator=(const JobSystem&) = delete;
+
+    JobHandle schedule(std::unique_ptr<IJob> job,
+                       JobBarrier* barrier = nullptr,
+                       JobPriority prio = JobPriority::Normal);
+
+    // Sugar: wrap data + callable into a JobWithData and schedule it.
+    template <typename DataT, typename FuncT>
+    JobHandle scheduleData(DataT data,
+                           FuncT&& func,
+                           JobBarrier* barrier = nullptr,
+                           JobPriority prio = JobPriority::Normal) {
+        auto job = std::make_unique<JobWithData<DataT>>();
+        job->data = std::move(data);
+        job->func = std::forward<FuncT>(func);
+        job->prio = prio;
+        return schedule(std::move(job), barrier, prio);
+    }
+
+    // Split [0, count) into ~workerCount chunks; func(i) is called once per
+    // index. The returned handle (and the optional barrier, counted as ONE
+    // unit) completes when the last chunk finishes.
+    template <typename FuncT>
+    JobHandle scheduleParallelFor(u32 count,
+                                  FuncT&& func,
+                                  JobBarrier* barrier = nullptr,
+                                  JobPriority prio = JobPriority::Normal);
+
+    void waitAll();
+    u32 workerCount() const { return m_workerCount; }
+
+    struct Stats {
+        u32 activeWorkers;
+        u32 pendingJobs;        // approximate snapshot
+        u64 completedJobsTotal;
+        f64 avgJobMs;
+    };
+    Stats stats() const;
+
+private:
+    struct JobEntry {
+        std::unique_ptr<IJob> job;
+        JobBarrier* barrier = nullptr;
+        u32 slotIndex   = 0;
+        u32 slotVersion = 0;
+    };
+
+    // Mutex-protected deque: owner uses push/pop (front), thieves steal (back).
+    struct WorkStealDeque {
+        std::mutex mutex;
+        std::deque<JobEntry> jobs;
+
+        void push(JobEntry&& entry);
+        bool pop(JobEntry& out);
+        bool steal(JobEntry& out);
+        bool empty() const;     // unsynchronized snapshot (see .cpp note)
+        u32 size() const;       // unsynchronized snapshot (see .cpp note)
+    };
+
+    struct CompletionSlot {
+        std::atomic<u32> flag{0};
+        std::atomic<u32> version{0};
+    };
+
+    void workerMain(u32 workerIndex);
+    bool tryExecuteOne(u32 workerIndex);
+    void pushToQueue(JobEntry&& entry, JobPriority prio, u32 workerHint);
+    std::pair<u32, u32> allocateSlot();
+
+    u32 m_workerCount = 0;
+    std::vector<std::thread> m_workers;
+    std::atomic<bool> m_running{false};
+    std::atomic<u32> m_activeWorkers{0};
+
+    // Per-worker deques (one set of 3 priority queues per worker)
+    // Layout: m_localQueues[workerIndex * PRIORITY_COUNT + priorityLevel]
+    std::vector<std::unique_ptr<WorkStealDeque>> m_localQueues;
+
+    // Global overflow queues (one per priority level)
+    WorkStealDeque m_globalQueues[PRIORITY_COUNT];
+
+    std::mutex m_wakeMutex;
+    std::condition_variable m_wakeCV;
+
+    // Completion tracking — fixed ring; slots recycle after MAX_SLOTS jobs.
+    static constexpr u32 MAX_SLOTS = 4096;
+    CompletionSlot m_slots[MAX_SLOTS];
+    std::atomic<u32> m_nextSlot{0};
+
+    // Stats
+    std::atomic<u64> m_completedTotal{0};
+    std::atomic<u64> m_totalJobTimeNs{0};
+
+    // Pending job count for waitAll
+    std::atomic<u32> m_pendingJobs{0};
+    std::mutex m_waitAllMutex;
+    std::condition_variable m_waitAllCV;
+};
+
+// ============================================================================
+// scheduleParallelFor — template implementation
+// ============================================================================
+
+template <typename FuncT>
+JobHandle JobSystem::scheduleParallelFor(u32 count,
+                                         FuncT&& func,
+                                         JobBarrier* barrier,
+                                         JobPriority prio) {
+    // Empty range: hand back an already-completed handle.
+    if (count == 0) {
+        auto [idx, ver] = allocateSlot();
+        m_slots[idx].flag.store(1, std::memory_order_release);
+        return JobHandle(idx, ver, &m_slots[idx].flag);
+    }
+
+    u32 chunkCount = m_workerCount > 0 ? m_workerCount : 1;
+    if (chunkCount > count) chunkCount = count;
+    u32 chunkSize = count / chunkCount;
+    u32 remainder = count % chunkCount;
+
+    auto [idx, ver] = allocateSlot();
+
+    struct ParallelChunk {
+        u32 begin;
+        u32 end;
+    };
+
+    // Shared countdown of outstanding chunks. shared_ptr (was a raw new +
+    // delete-by-last-chunk) so the counter cannot leak if a chunk is dropped.
+    auto chunkCounter = std::make_shared<std::atomic<u32>>(chunkCount);
+    auto* slotFlag = &m_slots[idx].flag;
+
+    if (barrier) {
+        barrier->add(); // one add per parallelFor
+        // We already have chunkCount sub-jobs; barrier tracks the parallelFor as 1 unit
+    }
+
+    u32 offset = 0;
+    for (u32 c = 0; c < chunkCount; ++c) {
+        u32 begin = offset;
+        // First `remainder` chunks take one extra element.
+        u32 end = begin + chunkSize + (c < remainder ? 1 : 0);
+        offset = end;
+
+        // Capture func by value for each chunk
+        auto chunkFunc = func; // copy the callable
+        auto chunkJob = std::make_unique<JobWithData<ParallelChunk>>();
+        chunkJob->data = ParallelChunk{begin, end};
+        chunkJob->prio = prio;
+        chunkJob->func = [chunkFunc, chunkCounter, slotFlag, barrier](ParallelChunk& chunk) mutable {
+            for (u32 i = chunk.begin; i < chunk.end; ++i) {
+                chunkFunc(i);
+            }
+            u32 prev = chunkCounter->fetch_sub(1, std::memory_order_acq_rel);
+            if (prev == 1) {
+                // Last chunk done: publish completion, then release barrier.
+                slotFlag->store(1, std::memory_order_release);
+                if (barrier) barrier->release();
+            }
+        };
+
+        // Schedule chunk but don't attach to barrier directly (handled above)
+        schedule(std::move(chunkJob), nullptr, prio);
+    }
+
+    return JobHandle(idx, ver, slotFlag);
+}
+
+} // namespace Caffeine::Threading
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 9381dc1..f4556b5 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -9,6 +9,7 @@ set(CAFFEINE_TEST_SOURCES
     test_gameloop.cpp
     test_input.cpp
    test_debug.cpp
+    test_jobsystem.cpp
 )
 
 if(SDL3_FOUND)
diff --git a/tests/test_jobsystem.cpp b/tests/test_jobsystem.cpp
new file mode 100644
index 0000000..ec8cfc5
--- /dev/null
+++ b/tests/test_jobsystem.cpp
@@ -0,0 +1,595 @@
+// ============================================================================
+// @file  test_jobsystem.cpp
+// @brief Tests for Caffeine::Threading Job System (RF2.2-RF2.6)
+// @note  TDD: tests written first, implementation follows
+// ============================================================================
+#include "catch.hpp"
+
+// NOTE(review): include targets restored from usage after patch garbling.
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <thread>
+#include <vector>
+
+#include "../src/threading/JobSystem.hpp"
+
+using namespace Caffeine;
+using namespace Caffeine::Threading;
+
+// ============================================================================
+// JobPriority — enum values
+// ============================================================================
+
+TEST_CASE("JobPriority - Critical is highest priority (lowest value)", "[threading][priority]") {
+    REQUIRE(static_cast<u8>(JobPriority::Critical) == 0);
+    REQUIRE(static_cast<u8>(JobPriority::Normal) == 1);
+    REQUIRE(static_cast<u8>(JobPriority::Background) == 2);
+}
+
+TEST_CASE("JobPriority - Critical < Normal < Background ordering", "[threading][priority]") {
+    REQUIRE(static_cast<u8>(JobPriority::Critical) < static_cast<u8>(JobPriority::Normal));
+    REQUIRE(static_cast<u8>(JobPriority::Normal) < static_cast<u8>(JobPriority::Background));
+}
+
+// ============================================================================
+// IJob / JobWithData — job types
+// ============================================================================
+
+TEST_CASE("IJob - default priority is Normal", "[threading][job]") {
+    struct TestJob : IJob {
+        void execute() override {}
+    };
+    TestJob job;
+    REQUIRE(job.priority() == JobPriority::Normal);
+}
+
+TEST_CASE("IJob - custom priority override", "[threading][job]") {
+    struct CriticalJob : IJob {
+        void execute() override {}
+        JobPriority priority() const override { return JobPriority::Critical; }
+    };
+    CriticalJob job;
+    REQUIRE(job.priority() == JobPriority::Critical);
+}
+
+TEST_CASE("IJob - virtual destructor is safe", "[threading][job]") {
+    struct CounterJob : IJob {
+        int* counter;
+        explicit CounterJob(int* c) : counter(c) {}
+        ~CounterJob() override { (*counter)++; }
+        void execute() override {}
+    };
+    int destructorCount = 0;
+    {
+        // Deleting through the base pointer must run the derived destructor.
+        std::unique_ptr<IJob> job = std::make_unique<CounterJob>(&destructorCount);
+    }
+    REQUIRE(destructorCount == 1);
+}
+
+TEST_CASE("JobWithData - stores data and executes function", "[threading][job]") {
+    int result = 0;
+    JobWithData<int> job;
+    job.data = 42;
+    job.func = [&result](int& d) { result = d * 2; };
+    job.prio = JobPriority::Normal;
+
+    job.execute();
+    REQUIRE(result == 84);
+}
+
+TEST_CASE("JobWithData - priority is configurable", "[threading][job]") {
+    JobWithData<int> job;
+    job.data = 0;
+    job.func = [](int&) {};
+    job.prio = JobPriority::Background;
+
+    REQUIRE(job.priority() == JobPriority::Background);
+}
+
+TEST_CASE("JobWithData - complex data type", "[threading][job]") {
+    struct PhysicsData {
+        f32 x, y, z;
+        f32 mass;
+    };
+
+    f32 totalMass = 0.0f;
+    JobWithData<PhysicsData> job;
+    job.data = PhysicsData{1.0f, 2.0f, 3.0f, 10.5f};
+    job.func = [&totalMass](PhysicsData& d) { totalMass = d.mass; };
+    job.prio = JobPriority::Critical;
+
+    job.execute();
+    REQUIRE(totalMass == Approx(10.5f));
+    REQUIRE(job.priority() == JobPriority::Critical);
+}
+
+// ============================================================================
+// JobHandle — index + version tracking
+// ============================================================================
+
+TEST_CASE("JobHandle - default constructed is not complete", "[threading][handle]") {
+    JobHandle handle;
+    REQUIRE_FALSE(handle.isComplete());
+}
+
+TEST_CASE("JobHandle - bool conversion matches isComplete", "[threading][handle]") {
+    JobHandle handle;
+    REQUIRE(static_cast<bool>(handle) == handle.isComplete());
+}
+
+// ============================================================================
+// JobBarrier — group synchronization
+// ============================================================================
+
+TEST_CASE("JobBarrier - initial count zero means already done", "[threading][barrier]") {
+    JobBarrier barrier(0);
+    // wait() should return immediately
+    barrier.wait();
+    REQUIRE(true); // If we got here, wait() didn't hang
+}
+
+TEST_CASE("JobBarrier - release decrements count", "[threading][barrier]") {
+    JobBarrier barrier(2);
+    barrier.release();
+    barrier.release();
+    barrier.wait(); // Should return now
+    REQUIRE(true);
+}
+
+TEST_CASE("JobBarrier - add increments pending count", "[threading][barrier]") {
+    JobBarrier barrier(0);
+    barrier.add();
+    barrier.add();
+    barrier.release();
+    barrier.release();
+    barrier.wait();
+    REQUIRE(true);
+}
+
+TEST_CASE("JobBarrier - wait blocks until all released", "[threading][barrier]") {
+    JobBarrier barrier(3);
+    std::atomic<bool> done{false};
+
+    std::thread waiter([&]() {
+        barrier.wait();
+        done.store(true, std::memory_order_release);
+    });
+
+    // Release 2 of 3 — should still be waiting
+    barrier.release();
+    barrier.release();
+    std::this_thread::sleep_for(std::chrono::milliseconds(20));
+    REQUIRE_FALSE(done.load(std::memory_order_acquire));
+
+    // Release final one
+    barrier.release();
+    waiter.join();
+    REQUIRE(done.load(std::memory_order_acquire));
+}
+
+// ============================================================================
+// JobSystem — construction and lifecycle
+// ============================================================================
+
+TEST_CASE("JobSystem - default constructor creates workers", "[threading][system]") {
+    JobSystem system;
+    REQUIRE(system.workerCount() > 0);
+}
+
+TEST_CASE("JobSystem - explicit worker count", "[threading][system]") {
+    JobSystem system(2);
+    REQUIRE(system.workerCount() == 2);
+}
+
+TEST_CASE("JobSystem - zero means auto (hardware_concurrency - 1)", "[threading][system]") {
+    JobSystem system(0);
+    u32 expected = std::thread::hardware_concurrency();
+    if (expected > 1) expected -= 1;
+    if (expected == 0) expected = 1;
+    REQUIRE(system.workerCount() == expected);
+}
+
+TEST_CASE("JobSystem - destructor joins all workers cleanly", "[threading][system]") {
+    {
+        JobSystem system(2);
+        // Just let it destruct
+    }
+    REQUIRE(true); // No hang, no crash
+}
+
+// ============================================================================
+// JobSystem — scheduling and execution
+// ============================================================================
+
+TEST_CASE("JobSystem - schedule single job executes", "[threading][system]") {
+    JobSystem system(2);
+    std::atomic<int> counter{0};
+
+    struct IncrementJob : IJob {
+        std::atomic<int>* counter;
+        explicit IncrementJob(std::atomic<int>* c) : counter(c) {}
+        void execute() override {
+            counter->fetch_add(1, std::memory_order_relaxed);
+        }
+    };
+
+    auto handle = system.schedule(std::make_unique<IncrementJob>(&counter));
+    handle.wait();
+    REQUIRE(counter.load() == 1);
+}
+
+TEST_CASE("JobSystem - schedule returns valid handle", "[threading][system]") {
+    JobSystem system(2);
+    std::atomic<int> dummy{0};
+
+    struct DummyJob : IJob {
+        std::atomic<int>* d;
+        explicit DummyJob(std::atomic<int>* d) : d(d) {}
+        void execute() override { d->fetch_add(1); }
+    };
+
+    auto handle = system.schedule(std::make_unique<DummyJob>(&dummy));
+    handle.wait();
+    REQUIRE(handle.isComplete());
+}
+
+TEST_CASE("JobSystem - schedule with barrier", "[threading][system]") {
+    JobSystem system(2);
+    std::atomic<int> counter{0};
+    JobBarrier barrier(3);
+
+    struct AddJob : IJob {
+        std::atomic<int>* c;
+        explicit AddJob(std::atomic<int>* c) : c(c) {}
+        void execute() override { c->fetch_add(1); }
+    };
+
+    system.schedule(std::make_unique<AddJob>(&counter), &barrier);
+    system.schedule(std::make_unique<AddJob>(&counter), &barrier);
+    system.schedule(std::make_unique<AddJob>(&counter), &barrier);
+
+    barrier.wait();
+    REQUIRE(counter.load() == 3);
+}
+
+TEST_CASE("JobSystem - schedule with explicit priority", "[threading][system]") {
+    JobSystem system(2);
+    std::atomic<int> counter{0};
+
+    struct PrioJob : IJob {
+        std::atomic<int>* c;
+        JobPriority p;
+        PrioJob(std::atomic<int>* c, JobPriority p) : c(c), p(p) {}
+        void execute() override { c->fetch_add(1); }
+        JobPriority priority() const override { return p; }
+    };
+
+    auto h1 = system.schedule(
+        std::make_unique<PrioJob>(&counter, JobPriority::Critical), nullptr, JobPriority::Critical);
+    auto h2 = system.schedule(
+        std::make_unique<PrioJob>(&counter, JobPriority::Background), nullptr, JobPriority::Background);
+
+    h1.wait();
+    h2.wait();
+    REQUIRE(counter.load() == 2);
+}
+
+// ============================================================================
+// JobSystem — scheduleData sugar
+// ============================================================================
+
+TEST_CASE("JobSystem - scheduleData with int", "[threading][system]") {
+    JobSystem system(2);
+    std::atomic<int> result{0};
+
+    auto handle = system.scheduleData(
+        42,
+        [&result](int& val) { result.store(val * 2); }
+    );
+    handle.wait();
+    REQUIRE(result.load() == 84);
+}
+
+TEST_CASE("JobSystem - scheduleData with struct", "[threading][system]") {
+    JobSystem system(2);
+
+    struct Payload { int a; int b; };
+    std::atomic<int> sum{0};
+
+    auto handle = system.scheduleData(
+        Payload{10, 20},
+        [&sum](Payload& p) { sum.store(p.a + p.b); }
+    );
+    handle.wait();
+    REQUIRE(sum.load() == 30);
+}
+
+TEST_CASE("JobSystem - scheduleData with barrier", "[threading][system]") {
+    JobSystem system(2);
+    std::atomic<int> total{0};
+    JobBarrier barrier(4);
+
+    for (int i = 1; i <= 4; ++i) {
+        system.scheduleData(
+            i,
+            [&total](int& val) { total.fetch_add(val); },
+            &barrier
+        );
+    }
+
+    barrier.wait();
+    REQUIRE(total.load() == 10); // 1 + 2 + 3 + 4
+}
+
+TEST_CASE("JobSystem - scheduleData with priority", "[threading][system]") {
+    JobSystem system(2);
+    std::atomic<int> counter{0};
+
+    auto handle = system.scheduleData(
+        1,
+        [&counter](int& val) { counter.fetch_add(val); },
+        nullptr,
+        JobPriority::Critical
+    );
+    handle.wait();
+    REQUIRE(counter.load() == 1);
+}
+
+// ============================================================================
+// JobSystem — scheduleParallelFor
+// ============================================================================
+
+TEST_CASE("JobSystem - parallelFor processes all elements", "[threading][parallel]") {
+    JobSystem system(4);
+    constexpr u32 COUNT = 1000;
+    std::vector<std::atomic<u32>> data(COUNT);
+    for (auto& d : data) d.store(0);
+
+    auto handle = system.scheduleParallelFor(COUNT,
+        [&data](u32 i) { data[i].store(static_cast<u32>(i * 2)); }
+    );
+    handle.wait();
+
+    for (u32 i = 0; i < COUNT; ++i) {
+        REQUIRE(data[i].load() == static_cast<u32>(i * 2));
+    }
+}
+
+TEST_CASE("JobSystem - parallelFor with zero count", "[threading][parallel]") {
+    JobSystem system(2);
+    auto handle = system.scheduleParallelFor(0, [](u32) {});
+    handle.wait();
+    REQUIRE(handle.isComplete());
+}
+
+TEST_CASE("JobSystem - parallelFor with barrier", "[threading][parallel]") {
+    JobSystem system(4);
+    constexpr u32 COUNT = 500;
+    std::atomic<u32> sum{0};
+    JobBarrier barrier(0); // parallelFor will set barrier count internally
+
+    auto handle = system.scheduleParallelFor(COUNT,
+        [&sum](u32 i) { sum.fetch_add(i); },
+        &barrier
+    );
+
+    barrier.wait();
+    // sum of 0..499 = 499*500/2 = 124750
+    REQUIRE(sum.load() == 124750);
+}
+
+TEST_CASE("JobSystem - parallelFor with single element", "[threading][parallel]") {
+    JobSystem system(2);
+    std::atomic<u32> value{0};
+
+    auto handle = system.scheduleParallelFor(1,
+        [&value](u32) { value.store(42); }
+    );
+    handle.wait();
+    REQUIRE(value.load() == 42);
+}
+
+// ============================================================================
+// JobSystem — waitAll
+// ============================================================================
+
+TEST_CASE("JobSystem - waitAll completes all pending jobs", "[threading][system]") {
+    JobSystem system(2);
+    std::atomic<int> counter{0};
+
+    for (int i = 0; i < 50; ++i) {
+        system.scheduleData(
+            1,
+            [&counter](int& val) { counter.fetch_add(val); }
+        );
+    }
+
+    system.waitAll();
+    REQUIRE(counter.load() == 50);
+}
+
+TEST_CASE("JobSystem - waitAll on empty system returns immediately", "[threading][system]") {
+    JobSystem system(2);
+    system.waitAll();
+    REQUIRE(true);
+}
+
+// ============================================================================
+// JobSystem — stats
+// ============================================================================
+
+TEST_CASE("JobSystem - stats returns valid data", "[threading][stats]") {
+    JobSystem system(2);
+    auto s = system.stats();
+
+    REQUIRE(s.activeWorkers <= system.workerCount());
+    REQUIRE(s.completedJobsTotal == 0);
+}
+
+TEST_CASE("JobSystem - stats tracks completed jobs", "[threading][stats]") {
+    JobSystem system(2);
+    std::atomic<int> dummy{0};
+
+    for (int i = 0; i < 10; ++i) {
+        auto h = system.scheduleData(1, [&dummy](int& v) { dummy.fetch_add(v); });
+        h.wait();
+    }
+
+    auto s = system.stats();
+    REQUIRE(s.completedJobsTotal >= 10);
+}
+
+// ============================================================================
+// Stress tests — concurrency correctness
+// ============================================================================
+
+TEST_CASE("Stress - 1K jobs all complete correctly", "[threading][stress]") {
+    JobSystem system;
+    constexpr int JOB_COUNT = 1000;
+    std::atomic<int> counter{0};
+
+    JobBarrier barrier(JOB_COUNT);
+
+    for (int i = 0; i < JOB_COUNT; ++i) {
+        system.scheduleData(
+            1,
+            [&counter](int& val) { counter.fetch_add(val); },
+            &barrier
+        );
+    }
+
+    barrier.wait();
+    REQUIRE(counter.load() == JOB_COUNT);
+}
+
+TEST_CASE("Stress - 10K jobs complete", "[threading][stress]") {
+    JobSystem system;
+    constexpr int JOB_COUNT = 10000;
+    std::atomic<int> counter{0};
+
+    JobBarrier barrier(JOB_COUNT);
+
+    for (int i = 0; i < JOB_COUNT; ++i) {
+        system.scheduleData(
+            1,
+            [&counter](int& val) { counter.fetch_add(val); },
+            &barrier
+        );
+    }
+
+    barrier.wait();
+    REQUIRE(counter.load() == JOB_COUNT);
+}
+
+TEST_CASE("Stress - parallelFor 100K elements", "[threading][stress]") {
+    JobSystem system;
+    constexpr u32 COUNT = 100000;
+    std::atomic<u64> sum{0};
+
+    auto handle = system.scheduleParallelFor(COUNT,
+        [&sum](u32 i) { sum.fetch_add(i); }
+    );
+    handle.wait();
+
+    // sum of 0..99999 = 99999*100000/2 = 4999950000
+    REQUIRE(sum.load() == 4999950000ULL);
+}
+
+TEST_CASE("Stress - mixed priorities all complete", "[threading][stress]") {
+    JobSystem system;
+    constexpr int PER_LEVEL = 500;
+    std::atomic<int> critical{0};
+    std::atomic<int> normal{0};
+    std::atomic<int> background{0};
+
+    JobBarrier barrier(PER_LEVEL * 3);
+
+    for (int i = 0; i < PER_LEVEL; ++i) {
+        system.scheduleData(1, [&critical](int& v) { critical.fetch_add(v); },
+                            &barrier, JobPriority::Critical);
+        system.scheduleData(1, [&normal](int& v) { normal.fetch_add(v); },
+                            &barrier, JobPriority::Normal);
+        system.scheduleData(1, [&background](int& v) { background.fetch_add(v); },
+                            &barrier, JobPriority::Background);
+    }
+
+    barrier.wait();
+    REQUIRE(critical.load() == PER_LEVEL);
+    REQUIRE(normal.load() == PER_LEVEL);
+    REQUIRE(background.load() == PER_LEVEL);
+}
+
+TEST_CASE("Stress - multiple barriers independent", "[threading][stress]") {
+    JobSystem system;
+    constexpr int COUNT = 200;
+
+    std::atomic<int> sumA{0};
+    std::atomic<int> sumB{0};
+    JobBarrier barrierA(COUNT);
+    JobBarrier barrierB(COUNT);
+
+    for (int i = 0; i < COUNT; ++i) {
+        system.scheduleData(1, [&sumA](int& v) { sumA.fetch_add(v); }, &barrierA);
+        system.scheduleData(2, [&sumB](int& v) { sumB.fetch_add(v); }, &barrierB);
+    }
+
+    barrierA.wait();
+    barrierB.wait();
+
+    REQUIRE(sumA.load() == COUNT);
+    REQUIRE(sumB.load() == COUNT * 2);
+}
+
+TEST_CASE("Stress - rapid create-destroy cycles", "[threading][stress]") {
+    for (int cycle = 0; cycle < 5; ++cycle) {
+        JobSystem system(2);
+        std::atomic<int> counter{0};
+
+        for (int i = 0; i < 100; ++i) {
+            system.scheduleData(1, [&counter](int& v) { counter.fetch_add(v); });
+        }
+        system.waitAll();
+        REQUIRE(counter.load() == 100);
+    }
+}
+
+// ============================================================================
+// Background jobs never starve critical jobs
+// ============================================================================
+
+TEST_CASE("Priority - critical jobs complete before background under load", "[threading][priority]") {
+    JobSystem system(2);
+
+    // Flood with background jobs
+    std::atomic<int> bgDone{0};
+    for (int i = 0; i < 200; ++i) {
+        system.scheduleData(
+            1,
+            [&bgDone](int& v) {
+                // Simulate slow work
+                volatile int sink = 0;
+                for (int j = 0; j < 1000; ++j) sink += j;
+                (void)sink;
+                bgDone.fetch_add(v);
+            },
+            nullptr,
+            JobPriority::Background
+        );
+    }
+
+    // Schedule a critical job after the flood
+    std::atomic<bool> critDone{false};
+    auto critHandle = system.scheduleData(
+        0,
+        [&critDone](int&) { critDone.store(true); },
+        nullptr,
+        JobPriority::Critical
+    );
+
+    critHandle.wait();
+    REQUIRE(critDone.load());
+
+    // Wait for everything to finish
+    system.waitAll();
+    REQUIRE(bgDone.load() == 200);
+}