diff --git a/core/include/join/function.hpp b/core/include/join/function.hpp index b131aec4..dc777ff0 100644 --- a/core/include/join/function.hpp +++ b/core/include/join/function.hpp @@ -109,11 +109,11 @@ namespace join * @brief construct with a callable object. * @param callable callable object. */ - template < - typename Func, typename DecayedFunc = std::decay_t, - typename = std::enable_if_t::value && - (std::is_void::value || - std::is_convertible, Return>::value)>> + template , + typename = std::enable_if_t< + !std::is_same::value && + (std::is_void::value || + std::is_convertible::type, Return>::value)>> Function (Func&& callable) { static_assert (sizeof (DecayedFunc) <= Capacity, "Callable size exceeds Function capacity."); @@ -152,7 +152,7 @@ namespace join * @return result of the invocation. * @throw std::bad_function_call if no callable is stored. */ - Return operator() (Args... args) + Return operator() (Args&&... args) { if (!_invoker) { diff --git a/core/include/join/queue.hpp b/core/include/join/queue.hpp index 09fd4b62..2b900995 100644 --- a/core/include/join/queue.hpp +++ b/core/include/join/queue.hpp @@ -247,7 +247,7 @@ namespace join { if (JOIN_UNLIKELY (lastError != Errc::TemporaryError)) { - return -1; + return -1; // LCOV_EXCL_LINE } backoff (); @@ -277,7 +277,7 @@ namespace join return -1; } - backoff (); + backoff (); // LCOV_EXCL_LINE } else { @@ -322,7 +322,7 @@ namespace join { if (JOIN_UNLIKELY (lastError != Errc::TemporaryError)) { - return -1; + return -1; // LCOV_EXCL_LINE } backoff (); @@ -766,6 +766,11 @@ namespace join for (uint64_t i = 0; i < toWrite; ++i) { auto* slot = &segment->_elements[(head + i) & sync._mask]; + Backoff slotBackoff; + while (slot->_seq.load (std::memory_order_acquire) != head + i) + { + slotBackoff (); // LCOV_EXCL_LINE + } slot->data = elements[i]; slot->_seq.store (head + i + 1, std::memory_order_release); } @@ -773,7 +778,7 @@ namespace join return static_cast (toWrite); } - backoff (); + backoff (); // LCOV_EXCL_LINE } } @@ -921,10 +926,11 @@ namespace join if (seq == (tail + 1)) { + Type local = slot->data; if (JOIN_LIKELY (sync._tail.compare_exchange_weak (tail, tail + 1, std::memory_order_acquire, std::memory_order_relaxed))) { - element = slot->data; + element = local; slot->_seq.store (tail + sync._capacity, std::memory_order_release); return 0; } @@ -976,11 +982,12 @@ namespace join uint64_t ready = 0; for (; ready < toRead; ++ready) { - if (JOIN_UNLIKELY (segment->_elements[(tail + ready) & sync._mask]._seq.load ( - std::memory_order_acquire) != tail + ready + 1)) + auto* slot = &segment->_elements[(tail + ready) & sync._mask]; + if (JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + ready + 1)) { break; } + elements[ready] = slot->data; } if (JOIN_UNLIKELY (ready == 0)) @@ -994,15 +1001,14 @@ namespace join { for (uint64_t i = 0; i < ready; ++i) { - auto* slot = &segment->_elements[(tail + i) & sync._mask]; - elements[i] = slot->data; - slot->_seq.store (tail + i + sync._capacity, std::memory_order_release); + segment->_elements[(tail + i) & sync._mask]._seq.store (tail + i + sync._capacity, + std::memory_order_release); } return static_cast (ready); } - backoff (); + backoff (); // LCOV_EXCL_LINE } } };