From 52767aa34c3905d8d5872a4fd2e822a59f80b0c0 Mon Sep 17 00:00:00 2001 From: mrabine Date: Mon, 25 May 2026 11:01:20 +0200 Subject: [PATCH 1/2] Fix race in MPSC queue --- core/include/join/function.hpp | 12 ++++++------ core/include/join/queue.hpp | 18 ++++++++++++------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/core/include/join/function.hpp b/core/include/join/function.hpp index b131aec4..dc777ff0 100644 --- a/core/include/join/function.hpp +++ b/core/include/join/function.hpp @@ -109,11 +109,11 @@ namespace join * @brief construct with a callable object. * @param callable callable object. */ - template < - typename Func, typename DecayedFunc = std::decay_t, - typename = std::enable_if_t::value && - (std::is_void::value || - std::is_convertible, Return>::value)>> + template , + typename = std::enable_if_t< + !std::is_same::value && + (std::is_void::value || + std::is_convertible::type, Return>::value)>> Function (Func&& callable) { static_assert (sizeof (DecayedFunc) <= Capacity, "Callable size exceeds Function capacity."); @@ -152,7 +152,7 @@ namespace join * @return result of the invocation. * @throw std::bad_function_call if no callable is stored. */ - Return operator() (Args... args) + Return operator() (Args&&... args) { if (!_invoker) { diff --git a/core/include/join/queue.hpp b/core/include/join/queue.hpp index 09fd4b62..e05c75db 100644 --- a/core/include/join/queue.hpp +++ b/core/include/join/queue.hpp @@ -766,6 +766,11 @@ namespace join for (uint64_t i = 0; i < toWrite; ++i) { auto* slot = &segment->_elements[(head + i) & sync._mask]; + Backoff slotBackoff; + while (slot->_seq.load (std::memory_order_acquire) != head + i) + { + slotBackoff (); + } slot->data = elements[i]; slot->_seq.store (head + i + 1, std::memory_order_release); } @@ -921,10 +926,11 @@ namespace join if (seq == (tail + 1)) { + Type local = slot->data; if (JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire, std::memory_order_relaxed))) { - element = slot->data; + element = local; slot->_seq.store (tail + sync._capacity, std::memory_order_release); return 0; } @@ -976,11 +982,12 @@ namespace join uint64_t ready = 0; for (; ready < toRead; ++ready) { - if (JOIN_UNLIKELY (segment->_elements[(tail + ready) & sync._mask]._seq.load ( - std::memory_order_acquire) != tail + ready + 1)) + auto* slot = &segment->_elements[(tail + ready) & sync._mask]; + if (JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + ready + 1)) { break; } + elements[ready] = slot->data; } if (JOIN_UNLIKELY (ready == 0)) @@ -994,9 +1001,8 @@ namespace join { for (uint64_t i = 0; i < ready; ++i) { - auto* slot = &segment->_elements[(tail + i) & sync._mask]; - elements[i] = slot->data; - slot->_seq.store (tail + i + sync._capacity, std::memory_order_release); + segment->_elements[(tail + i) & sync._mask]._seq.store (tail + i + sync._capacity, + std::memory_order_release); } return static_cast (ready); From fb74c3ada821862d7598951e7eec478cfa98ce5d Mon Sep 17 00:00:00 2001 From: mrabine Date: Mon, 25 May 2026 11:25:52 +0200 Subject: [PATCH 2/2] coverage --- core/include/join/queue.hpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/include/join/queue.hpp b/core/include/join/queue.hpp index e05c75db..2b900995 100644 --- a/core/include/join/queue.hpp +++ b/core/include/join/queue.hpp @@ -247,7 +247,7 @@ namespace join { if (JOIN_UNLIKELY (lastError != Errc::TemporaryError)) { - return -1; + return -1; // LCOV_EXCL_LINE } backoff (); @@ -277,7 +277,7 @@ namespace join return -1; } - backoff (); + backoff (); // LCOV_EXCL_LINE } else { @@ -322,7 +322,7 @@ namespace join { if (JOIN_UNLIKELY (lastError != Errc::TemporaryError)) { - return -1; + return -1; // LCOV_EXCL_LINE } backoff (); @@ -769,7 +769,7 @@ namespace join Backoff slotBackoff; while (slot->_seq.load (std::memory_order_acquire) != head + i) { - slotBackoff (); + slotBackoff (); // LCOV_EXCL_LINE } slot->data = elements[i]; slot->_seq.store (head + i + 1, std::memory_order_release); @@ -778,7 +778,7 @@ namespace join return static_cast (toWrite); } - backoff (); + backoff (); // LCOV_EXCL_LINE } } @@ -1008,7 +1008,7 @@ namespace join return static_cast (ready); } - backoff (); + backoff (); // LCOV_EXCL_LINE } } };