Skip to content

Commit 34e9cc8

Browse files
authored
Merge pull request #47 from devscafecommunity/10-job-system-worker-threads
feat(threading): implement Job System with work-stealing thread pool …
2 parents 166c910 + 7f34033 commit 34e9cc8

5 files changed

Lines changed: 1131 additions & 0 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ add_library(Caffeine
2121
src/input/InputManager.cpp
2222
src/debug/LogSystem.cpp
2323
src/debug/Profiler.cpp
24+
src/threading/JobSystem.cpp
2425
)
2526

2627
if(SDL3_FOUND)

src/threading/JobSystem.cpp

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
// ============================================================================
2+
// @file JobSystem.cpp
3+
// @brief Job System implementation — work-stealing thread pool
4+
// @note Part of threading/ module
5+
// ============================================================================
6+
7+
#include "threading/JobSystem.hpp"
8+
9+
#include <algorithm>
10+
#include <chrono>
11+
#include <random>
12+
13+
namespace Caffeine::Threading {
14+
15+
// ============================================================================
16+
// JobHandle
17+
// ============================================================================
18+
19+
JobHandle::JobHandle(u32 index, u32 version, std::atomic<u32>* completionFlag)
20+
: m_index(index)
21+
, m_version(version)
22+
, m_completionFlag(completionFlag) {}
23+
24+
void JobHandle::wait() const {
25+
if (!m_completionFlag) return;
26+
while (m_completionFlag->load(std::memory_order_acquire) == 0) {
27+
std::this_thread::yield();
28+
}
29+
}
30+
31+
bool JobHandle::isComplete() const {
32+
if (!m_completionFlag) return false;
33+
return m_completionFlag->load(std::memory_order_acquire) != 0;
34+
}
35+
36+
// ============================================================================
37+
// JobBarrier
38+
// ============================================================================
39+
40+
JobBarrier::JobBarrier(u32 targetCount)
41+
: m_count(targetCount) {}
42+
43+
// Registers one more outstanding unit of work by incrementing the counter.
// Each add() must eventually be matched by one release().
void JobBarrier::add() {
    m_count.fetch_add(1u, std::memory_order_acq_rel);
}
46+
47+
void JobBarrier::release() {
48+
u32 prev = m_count.fetch_sub(1, std::memory_order_acq_rel);
49+
if (prev == 1) {
50+
std::lock_guard<std::mutex> lock(m_mutex);
51+
m_cv.notify_all();
52+
}
53+
}
54+
55+
void JobBarrier::wait() {
56+
if (m_count.load(std::memory_order_acquire) == 0) return;
57+
std::unique_lock<std::mutex> lock(m_mutex);
58+
m_cv.wait(lock, [this]() {
59+
return m_count.load(std::memory_order_acquire) == 0;
60+
});
61+
}
62+
63+
// ============================================================================
64+
// WorkStealDeque
65+
// ============================================================================
66+
67+
// Inserts `entry` at the front of the deque under the deque's mutex.
// The front is the end that pop() takes from, so the owner sees its most
// recently pushed job first.
void JobSystem::WorkStealDeque::push(JobEntry&& entry) {
    const std::lock_guard<std::mutex> guard(mutex);
    jobs.emplace_front(std::move(entry));
}
71+
72+
// Removes the job at the front of the deque (the most recently pushed one).
//
// @param out  Receives the job when one is available; untouched otherwise.
// @return true if a job was moved into `out`, false if the deque was empty.
bool JobSystem::WorkStealDeque::pop(JobEntry& out) {
    const std::lock_guard<std::mutex> guard(mutex);
    const bool hasJob = !jobs.empty();
    if (hasJob) {
        out = std::move(jobs.front());
        jobs.pop_front();
    }
    return hasJob;
}
79+
80+
// Removes the job at the back of the deque, i.e. the end opposite to the one
// push()/pop() operate on, so a thief takes the oldest queued job.
//
// @param out  Receives the job when one is available; untouched otherwise.
// @return true if a job was moved into `out`, false if the deque was empty.
bool JobSystem::WorkStealDeque::steal(JobEntry& out) {
    const std::lock_guard<std::mutex> guard(mutex);
    const bool hasJob = !jobs.empty();
    if (hasJob) {
        out = std::move(jobs.back());
        jobs.pop_back();
    }
    return hasJob;
}
87+
88+
bool JobSystem::WorkStealDeque::empty() const {
89+
return jobs.empty();
90+
}
91+
92+
// Returns the number of jobs currently queued, truncated to u32.
//
// The container is mutated by push()/pop()/steal() on other threads, so it
// must be read under the same mutex; an unlocked read is a data race. The
// result is only a snapshot and may be stale as soon as the lock is dropped.
//
// The const_cast lets this const member lock the mutex regardless of whether
// the header declares it `mutable`.
// TODO(review): declare the mutex `mutable` in the header and drop the cast.
u32 JobSystem::WorkStealDeque::size() const {
    std::lock_guard<std::mutex> lock(const_cast<std::mutex&>(mutex));
    return static_cast<u32>(jobs.size());
}
95+
96+
// ============================================================================
97+
// JobSystem
98+
// ============================================================================
99+
100+
// Starts the worker pool.
//
// @param workerCount  Number of worker threads. Zero means "auto": one less
//                     than std::thread::hardware_concurrency(), with a
//                     minimum of one worker.
//
// One local deque is created per (worker, priority) pair, laid out as
// worker * PRIORITY_COUNT + priority. The running flag is set before any
// thread is launched so that workers enter their main loop.
JobSystem::JobSystem(u32 workerCount) {
    u32 resolvedCount = workerCount;
    if (resolvedCount == 0) {
        const u32 hw = std::thread::hardware_concurrency();
        resolvedCount = (hw > 1) ? (hw - 1) : 1;
    }
    m_workerCount = resolvedCount;

    const usize queueCount = static_cast<usize>(m_workerCount) * PRIORITY_COUNT;
    m_localQueues.reserve(queueCount);
    for (usize q = 0; q < queueCount; ++q) {
        m_localQueues.push_back(std::make_unique<WorkStealDeque>());
    }

    m_running.store(true, std::memory_order_release);

    m_workers.reserve(m_workerCount);
    for (u32 workerIndex = 0; workerIndex < m_workerCount; ++workerIndex) {
        m_workers.emplace_back(&JobSystem::workerMain, this, workerIndex);
    }
}
120+
121+
// Shuts the pool down: waits for every pending job, clears the running flag,
// wakes all sleeping workers so they can observe it, then joins each thread.
JobSystem::~JobSystem() {
    waitAll();

    m_running.store(false, std::memory_order_release);
    m_wakeCV.notify_all();

    for (auto& worker : m_workers) {
        if (!worker.joinable()) {
            continue;
        }
        worker.join();
    }
}
130+
131+
std::pair<u32, u32> JobSystem::allocateSlot() {
132+
u32 idx = m_nextSlot.fetch_add(1, std::memory_order_relaxed) % MAX_SLOTS;
133+
u32 ver = m_slots[idx].version.fetch_add(1, std::memory_order_relaxed) + 1;
134+
m_slots[idx].flag.store(0, std::memory_order_release);
135+
return {idx, ver};
136+
}
137+
138+
// Queues `job` for execution on the global queue of the given priority.
//
// @param job      Job to run; ownership moves into the queue entry.
// @param barrier  Optional barrier; release() is called on it after the job
//                 has executed. This function does not call add() on it.
// @param prio     Priority; its integer value indexes the global queue array.
// @return Handle bound to the completion flag of the slot allocated here.
//
// The pending-job counter is incremented before the entry becomes visible in
// the queue, and one sleeping worker is notified afterwards.
JobHandle JobSystem::schedule(std::unique_ptr<IJob> job,
                              JobBarrier* barrier,
                              JobPriority prio) {
    const std::pair<u32, u32> slot = allocateSlot();

    JobEntry queued;
    queued.job = std::move(job);
    queued.barrier = barrier;
    queued.slotIndex = slot.first;
    queued.slotVersion = slot.second;

    m_pendingJobs.fetch_add(1, std::memory_order_acq_rel);

    m_globalQueues[static_cast<u32>(prio)].push(std::move(queued));
    m_wakeCV.notify_one();

    return JobHandle(slot.first, slot.second, &m_slots[slot.first].flag);
}
158+
159+
// Places `entry` on a queue of priority `prio`.
//
// @param entry       Job entry to enqueue (moved from).
// @param prio        Priority; its integer value selects the queue.
// @param workerHint  Index of the worker whose local queue should receive the
//                    entry. Any value >= the worker count routes the entry to
//                    the global queue instead.
//
// This helper only enqueues; it does not touch the pending-job counter or
// wake any worker.
void JobSystem::pushToQueue(JobEntry&& entry, JobPriority prio, u32 workerHint) {
    const u32 prioIdx = static_cast<u32>(prio);

    if (workerHint >= m_workerCount) {
        m_globalQueues[prioIdx].push(std::move(entry));
        return;
    }

    const usize localSlot = static_cast<usize>(workerHint) * PRIORITY_COUNT + prioIdx;
    m_localQueues[localSlot]->push(std::move(entry));
}
167+
168+
// Fetches at most one job on behalf of worker `workerIndex` and runs it.
//
// Lookup order:
//   1. the worker's own local queues, in priority order (Critical first);
//   2. the global queues, in priority order;
//   3. the local queues of the *other* workers (work stealing), in priority
//      order, starting with the next worker and wrapping around.
//
// After the job has run, the function records its duration, destroys the job
// object, publishes the slot's completion flag, releases the optional barrier,
// updates the statistics counters and finally decrements the pending-job
// counter, waking waitAll() when that counter reaches zero.
//
// @param workerIndex  Index of the calling worker (selects its local queues).
// @return true if a job was executed, false if every queue was empty.
//
// NOTE(review): an exception escaping IJob::execute() is not caught here; it
// would propagate out of the worker thread and skip all bookkeeping below.
bool JobSystem::tryExecuteOne(u32 workerIndex) {
    JobEntry entry;

    // Fills `entry` from the first non-empty queue in the documented order.
    const auto acquireJob = [this, workerIndex, &entry]() -> bool {
        // 1. Own local queues.
        const usize ownBase = static_cast<usize>(workerIndex) * PRIORITY_COUNT;
        for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
            if (m_localQueues[ownBase + p]->pop(entry)) return true;
        }

        // 2. Global queues.
        for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
            if (m_globalQueues[p].pop(entry)) return true;
        }

        // 3. Steal from the other workers. The offset stops short of
        //    m_workerCount so the worker never picks itself as a victim: its
        //    own queues were already found empty in step 1.
        for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
            for (u32 offset = 1; offset < m_workerCount; ++offset) {
                const u32 victim = (workerIndex + offset) % m_workerCount;
                auto& victimQ = *m_localQueues[static_cast<usize>(victim) * PRIORITY_COUNT + p];
                if (victimQ.steal(entry)) return true;
            }
        }

        return false;
    };

    if (!acquireJob()) {
        return false;
    }

    m_activeWorkers.fetch_add(1, std::memory_order_relaxed);

    const auto startTime = std::chrono::high_resolution_clock::now();
    entry.job->execute();
    const auto endTime = std::chrono::high_resolution_clock::now();

    const auto elapsed =
        std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime).count();
    m_totalJobTimeNs.fetch_add(static_cast<u64>(elapsed), std::memory_order_relaxed);

    // Destroy the job before signalling completion: once a waiter is released
    // it may tear down state the job refers to, so the job's destructor must
    // not run after that point.
    entry.job.reset();

    m_slots[entry.slotIndex].flag.store(1, std::memory_order_release);

    if (entry.barrier) {
        entry.barrier->release();
    }

    m_completedTotal.fetch_add(1, std::memory_order_relaxed);
    m_activeWorkers.fetch_sub(1, std::memory_order_relaxed);

    // The last finishing job wakes waitAll(). Taking the mutex before
    // notifying prevents the wake-up from slipping in between a waiter's
    // predicate check and its going to sleep.
    const u32 prev = m_pendingJobs.fetch_sub(1, std::memory_order_acq_rel);
    if (prev == 1) {
        std::lock_guard<std::mutex> lock(m_waitAllMutex);
        m_waitAllCV.notify_all();
    }

    return true;
}
221+
222+
// Entry point of each worker thread.
//
// While the system is running, the worker executes jobs back to back. When no
// job can be found it sleeps on the wake condition variable for at most one
// millisecond, or until shutdown is requested or the pending-job counter is
// non-zero. After the running flag is cleared, the worker drains whatever
// jobs it can still find before exiting.
//
// NOTE(review): the pending-job counter is only decremented after a job has
// finished executing, so the wake predicate is already true while any job is
// still in flight on another worker; an idle worker then re-polls the queues
// immediately instead of sleeping.
void JobSystem::workerMain(u32 workerIndex) {
    const auto shouldWake = [this]() {
        return !m_running.load(std::memory_order_acquire) ||
               m_pendingJobs.load(std::memory_order_acquire) > 0;
    };

    while (m_running.load(std::memory_order_acquire)) {
        if (tryExecuteOne(workerIndex)) {
            continue;
        }

        std::unique_lock<std::mutex> guard(m_wakeMutex);
        m_wakeCV.wait_for(guard, std::chrono::milliseconds(1), shouldWake);
    }

    // Drain remaining jobs on shutdown
    while (tryExecuteOne(workerIndex)) {
    }
}
236+
237+
void JobSystem::waitAll() {
238+
if (m_pendingJobs.load(std::memory_order_acquire) == 0) return;
239+
std::unique_lock<std::mutex> lock(m_waitAllMutex);
240+
m_waitAllCV.wait(lock, [this]() {
241+
return m_pendingJobs.load(std::memory_order_acquire) == 0;
242+
});
243+
}
244+
245+
// Collects a snapshot of the job system's counters.
//
// - activeWorkers / completedJobsTotal: relaxed reads of the atomic counters.
// - pendingJobs: sum of the sizes of all global and local queues, so jobs
//   that are currently executing are not included.
// - avgJobMs: accumulated execution time divided by the completed-job count,
//   in milliseconds; 0.0 when no job has completed yet.
//
// The values are read one after another, so they are not guaranteed to be
// mutually consistent while workers are running.
JobSystem::Stats JobSystem::stats() const {
    Stats snapshot;
    snapshot.activeWorkers = m_activeWorkers.load(std::memory_order_relaxed);
    snapshot.completedJobsTotal = m_completedTotal.load(std::memory_order_relaxed);

    u32 queued = 0;
    for (u32 p = 0; p < PRIORITY_COUNT; ++p) {
        queued += m_globalQueues[p].size();
    }

    // Local queues are stored flat as worker * PRIORITY_COUNT + priority, so a
    // single linear pass visits every (worker, priority) pair.
    const usize localQueueCount = static_cast<usize>(m_workerCount) * PRIORITY_COUNT;
    for (usize q = 0; q < localQueueCount; ++q) {
        queued += m_localQueues[q]->size();
    }
    snapshot.pendingJobs = queued;

    snapshot.avgJobMs = 0.0;
    const u64 finished = snapshot.completedJobsTotal;
    if (finished > 0) {
        const u64 totalNs = m_totalJobTimeNs.load(std::memory_order_relaxed);
        snapshot.avgJobMs = static_cast<f64>(totalNs) / static_cast<f64>(finished) / 1e6;
    }

    return snapshot;
}
271+
272+
} // namespace Caffeine::Threading

0 commit comments

Comments
 (0)