diff --git a/core/include/join/memory.hpp b/core/include/join/memory.hpp index 3da530fd..0801ebb6 100644 --- a/core/include/join/memory.hpp +++ b/core/include/join/memory.hpp @@ -265,11 +265,13 @@ namespace join */ void create () { - _ptr = ::mmap (nullptr, _size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0); + _ptr = ::mmap (nullptr, _size, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0); if ((_ptr == MAP_FAILED) && ((errno == ENOMEM) || (errno == EINVAL))) { // no hugepages available or no support. - _ptr = ::mmap (nullptr, _size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + _ptr = + ::mmap (nullptr, _size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); } if (_ptr == MAP_FAILED) diff --git a/core/include/join/queue.hpp b/core/include/join/queue.hpp index 2b900995..ec037fbc 100644 --- a/core/include/join/queue.hpp +++ b/core/include/join/queue.hpp @@ -57,12 +57,6 @@ namespace join /// read position. alignas (64) std::atomic_uint64_t _tail; - - /// total queue capacity (power of 2). - alignas (64) uint64_t _capacity; - - /// bit mask for fast modulo. - alignas (64) uint64_t _mask; }; /** @@ -148,6 +142,7 @@ namespace join template explicit BasicQueue (uint64_t capacity, Args&&... args) : _capacity (roundPow2 (capacity)) + , _mask (_capacity - 1) , _elementSize (sizeof (Slot)) , _totalSize (sizeof (QueueSync) + (_capacity * _elementSize)) , _backend (_totalSize, std::forward (args)...) @@ -160,8 +155,6 @@ namespace join { _segment->_sync._head.store (0, std::memory_order_relaxed); _segment->_sync._tail.store (0, std::memory_order_relaxed); - _segment->_sync._capacity = _capacity; - _segment->_sync._mask = _capacity - 1; initSlots::value> (); @@ -175,11 +168,6 @@ namespace join backoff (); // LCOV_EXCL_LINE } } - - if (_segment->_sync._capacity != _capacity) - { - throw std::runtime_error ("capacity mismatch"); - } } /** @@ -220,7 +208,7 @@ namespace join */ int tryPush (const Type& element) noexcept { - return SyncPolicy::tryPush (_segment, element, _cachedTail); + return SyncPolicy::tryPush (_segment, element, _cachedTail, _capacity, _mask); } /** @@ -231,7 +219,7 @@ namespace join */ ssize_t tryPush (const Type* elements, size_t size) noexcept { - return SyncPolicy::tryPush (_segment, elements, size, _cachedTail); + return SyncPolicy::tryPush (_segment, elements, size, _cachedTail, _capacity, _mask); } /** @@ -295,7 +283,7 @@ namespace join */ int tryPop (Type& element) noexcept { - return SyncPolicy::tryPop (_segment, element, _cachedHead); + return SyncPolicy::tryPop (_segment, element, _cachedHead, _capacity, _mask); } /** @@ -306,7 +294,7 @@ namespace join */ ssize_t tryPop (Type* elements, size_t size) noexcept { - return SyncPolicy::tryPop (_segment, elements, size, _cachedHead); + return SyncPolicy::tryPop (_segment, elements, size, _cachedHead, _capacity, _mask); } /** @@ -352,7 +340,7 @@ namespace join return -1; } - backoff (); + backoff (); // LCOV_EXCL_LINE } else { @@ -388,7 +376,7 @@ namespace join { return 0; // LCOV_EXCL_LINE } - return _segment->_sync._capacity - pending (); + return _capacity - pending (); } /** @@ -401,7 +389,7 @@ namespace join { return false; // LCOV_EXCL_LINE } - return pending () == _segment->_sync._capacity; + return pending () == _capacity; } /** @@ -484,6 +472,9 @@ namespace join /// memory segment capacity. const uint64_t _capacity = 0; + /// bit mask for fast modulo. + const uint64_t _mask = 0; + /// memory segment element size. const uint64_t _elementSize = 0; @@ -516,10 +507,13 @@ namespace join * @brief try to push element into the ring buffer. * @param segment shared memory segment. * @param element element to push. - * @param cachedTail producer-side cached tail index. + * @param cachedTail producer-side cached index. + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return 0 on success, -1 otherwise. */ - static int tryPush (Segment* segment, const Type& element, uint64_t& cachedTail) noexcept + static int tryPush (Segment* segment, const Type& element, uint64_t& cachedTail, uint64_t capacity, + uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr)) { @@ -532,17 +526,17 @@ namespace join auto& sync = segment->_sync; uint64_t head = sync._head.load (std::memory_order_relaxed); - if (JOIN_UNLIKELY ((head - cachedTail) == sync._capacity)) + if (JOIN_UNLIKELY ((head - cachedTail) == capacity)) { cachedTail = sync._tail.load (std::memory_order_acquire); - if ((head - cachedTail) == sync._capacity) + if ((head - cachedTail) == capacity) { lastError = make_error_code (Errc::TemporaryError); return -1; } } - segment->_elements[head & sync._mask].data = element; + segment->_elements[head & mask].data = element; sync._head.store (head + 1, std::memory_order_release); return 0; @@ -553,10 +547,13 @@ namespace join * @param segment shared memory segment. * @param elements pointer to the first element. * @param size number of elements to push. - * @param cachedTail producer-side cached tail index. + * @param cachedTail producer-side cached index. + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return number of elements successfully pushed, -1 otherwise. */ - static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& cachedTail) noexcept + static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& cachedTail, + uint64_t capacity, uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0)) { @@ -566,12 +563,12 @@ namespace join auto& sync = segment->_sync; uint64_t head = sync._head.load (std::memory_order_relaxed); - uint64_t avail = sync._capacity - (head - cachedTail); + uint64_t avail = capacity - (head - cachedTail); if (JOIN_UNLIKELY (avail == 0)) { cachedTail = sync._tail.load (std::memory_order_acquire); - avail = sync._capacity - (head - cachedTail); + avail = capacity - (head - cachedTail); if (avail == 0) { lastError = make_error_code (Errc::TemporaryError); @@ -583,7 +580,7 @@ namespace join for (uint64_t i = 0; i < toWrite; ++i) { - segment->_elements[(head + i) & sync._mask].data = elements[i]; + segment->_elements[(head + i) & mask].data = elements[i]; } sync._head.store (head + toWrite, std::memory_order_release); @@ -595,10 +592,13 @@ namespace join * @brief try to pop element from the ring buffer. * @param segment shared memory segment. * @param element output element. - * @param cachedHead consumer-side cached head index. + * @param cachedHead consumer-side cached index. + * @param capacity memory segment capacity (not used). + * @param mask bit mask for fast modulo. * @return 0 on success, -1 otherwise. */ - static int tryPop (Segment* segment, Type& element, uint64_t& cachedHead) noexcept + static int tryPop (Segment* segment, Type& element, uint64_t& cachedHead, uint64_t /*capacity*/, + uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr)) { @@ -621,7 +621,7 @@ namespace join } } - element = segment->_elements[tail & sync._mask].data; + element = segment->_elements[tail & mask].data; sync._tail.store (tail + 1, std::memory_order_release); return 0; @@ -632,10 +632,13 @@ namespace join * @param segment shared memory segment. * @param elements pointer to the output buffer. * @param size maximum number of elements to pop. - * @param cachedHead consumer-side cached head index. + * @param cachedHead consumer-side cached index. + * @param capacity memory segment capacity (not used). + * @param mask bit mask for fast modulo. * @return number of elements successfully popped, -1 otherwise. */ - static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& cachedHead) noexcept + static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& cachedHead, + uint64_t /*capacity*/, uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0)) { @@ -662,7 +665,7 @@ namespace join for (uint64_t i = 0; i < toRead; ++i) { - elements[i] = segment->_elements[(tail + i) & sync._mask].data; + elements[i] = segment->_elements[(tail + i) & mask].data; } sync._tail.store (tail + toRead, std::memory_order_release); @@ -684,10 +687,13 @@ namespace join * @brief try to push element into the ring buffer. * @param segment shared memory segment. * @param element element to push. - * @param unused unused cache parameter. + * @param cachedTail producer-side cached index (not used). + * @param capacity memory segment capacity (not used). + * @param mask bit mask for fast modulo. * @return 0 on success, -1 otherwise. */ - static int tryPush (Segment* segment, const Type& element, uint64_t& /*unused*/) noexcept + static int tryPush (Segment* segment, const Type& element, uint64_t& /*cachedTail*/, uint64_t /*capacity*/, + uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr)) { @@ -703,7 +709,7 @@ namespace join for (;;) { - auto* slot = &segment->_elements[head & sync._mask]; + auto* slot = &segment->_elements[head & mask]; uint64_t seq = slot->_seq.load (std::memory_order_acquire); if (seq == head) @@ -734,10 +740,13 @@ namespace join * @param segment shared memory segment. * @param elements pointer to the first element. * @param size number of elements to push. - * @param unused unused cache parameter. + * @param cachedTail producer-side cached index (not used). + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return number of elements successfully pushed, -1 otherwise. */ - static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& /*unused*/) noexcept + static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& /*cachedTail*/, + uint64_t capacity, uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0)) { @@ -752,7 +761,7 @@ namespace join for (;;) { uint64_t tail = sync._tail.load (std::memory_order_acquire); - uint64_t toWrite = std::min (static_cast (size), sync._capacity - (head - tail)); + uint64_t toWrite = std::min (static_cast (size), capacity - (head - tail)); if (JOIN_UNLIKELY (toWrite == 0)) { @@ -765,12 +774,7 @@ namespace join { for (uint64_t i = 0; i < toWrite; ++i) { - auto* slot = &segment->_elements[(head + i) & sync._mask]; - Backoff slotBackoff; - while (slot->_seq.load (std::memory_order_acquire) != head + i) - { - slotBackoff (); // LCOV_EXCL_LINE - } + auto* slot = &segment->_elements[(head + i) & mask]; slot->data = elements[i]; slot->_seq.store (head + i + 1, std::memory_order_release); } @@ -786,10 +790,13 @@ namespace join * @brief try to pop element from the ring buffer. * @param segment shared memory segment. * @param element output element. - * @param unused unused cache parameter. + * @param cachedHead consumer-side cached index (not used). + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return 0 on success, -1 otherwise. */ - static int tryPop (Segment* segment, Type& element, uint64_t& /*unused*/) noexcept + static int tryPop (Segment* segment, Type& element, uint64_t& /*cachedHead*/, uint64_t capacity, + uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr)) { @@ -801,7 +808,7 @@ namespace join auto& sync = segment->_sync; uint64_t tail = sync._tail.load (std::memory_order_relaxed); - auto* slot = &segment->_elements[tail & sync._mask]; + auto* slot = &segment->_elements[tail & mask]; uint64_t seq = slot->_seq.load (std::memory_order_acquire); if (JOIN_UNLIKELY (seq != tail + 1)) @@ -811,7 +818,7 @@ namespace join } element = slot->data; - slot->_seq.store (tail + sync._capacity, std::memory_order_release); + slot->_seq.store (tail + capacity, std::memory_order_release); sync._tail.store (tail + 1, std::memory_order_release); return 0; @@ -822,10 +829,13 @@ namespace join * @param segment shared memory segment. * @param elements pointer to the output buffer. * @param size maximum number of elements to pop. - * @param unused unused cache parameter. + * @param cachedHead consumer-side cached index (not used). + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return number of elements successfully popped, -1 otherwise. */ - static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& /*unused*/) noexcept + static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& /*cachedHead*/, + uint64_t capacity, uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0)) { @@ -841,7 +851,7 @@ namespace join for (uint64_t i = 0; i < toRead; ++i) { - auto* slot = &segment->_elements[(tail + i) & sync._mask]; + auto* slot = &segment->_elements[(tail + i) & mask]; if (JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + i + 1)) { @@ -849,7 +859,7 @@ namespace join } elements[i] = slot->data; - slot->_seq.store (tail + i + sync._capacity, std::memory_order_release); + slot->_seq.store (tail + i + capacity, std::memory_order_release); ++popped; } @@ -877,12 +887,15 @@ namespace join * @brief try to push element into the ring buffer. * @param segment shared memory segment. * @param element element to push. - * @param unused unused cache parameter. + * @param cachedTail producer-side cached index. + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return 0 on success, -1 otherwise. */ - static int tryPush (Segment* segment, const Type& element, uint64_t& unused) noexcept + static int tryPush (Segment* segment, const Type& element, uint64_t& cachedTail, uint64_t capacity, + uint64_t mask) noexcept { - return Mpsc::tryPush (segment, element, unused); + return Mpsc::tryPush (segment, element, cachedTail, capacity, mask); } /** @@ -890,22 +903,68 @@ namespace join * @param segment shared memory segment. * @param elements pointer to the first element. * @param size number of elements to push. - * @param unused unused cache parameter. + * @param cachedTail producer-side cached index (not used). + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return number of elements successfully pushed, -1 otherwise. */ - static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& unused) noexcept + static ssize_t tryPush (Segment* segment, const Type* elements, size_t size, uint64_t& /*cachedTail*/, + uint64_t capacity, uint64_t mask) noexcept { - return Mpsc::tryPush (segment, elements, size, unused); + if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0)) + { + lastError = make_error_code (Errc::InvalidParam); + return -1; + } + + Backoff backoff; + auto& sync = segment->_sync; + uint64_t head = sync._head.load (std::memory_order_relaxed); + + for (;;) + { + uint64_t tail = sync._tail.load (std::memory_order_acquire); + uint64_t toWrite = std::min (static_cast (size), capacity - (head - tail)); + + if (JOIN_UNLIKELY (toWrite == 0)) + { + lastError = make_error_code (Errc::TemporaryError); + return -1; + } + + if (JOIN_LIKELY (sync._head.compare_exchange_weak (head, head + toWrite, std::memory_order_acquire, + std::memory_order_relaxed))) + { + for (uint64_t i = 0; i < toWrite; ++i) + { + auto* slot = &segment->_elements[(head + i) & mask]; + Backoff slotBackoff; + while (slot->_seq.load (std::memory_order_acquire) != head + i) + { + slotBackoff (); // LCOV_EXCL_LINE + } + slot->data = elements[i]; + slot->_seq.store (head + i + 1, std::memory_order_release); + } + + return static_cast (toWrite); + } + + backoff (); // LCOV_EXCL_LINE + } } /** * @brief try to pop element from the ring buffer. * @param segment shared memory segment. * @param element output element. - * @param unused unused cache parameter. + * @param cachedHead consumre-side cached index (not used). + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return 0 on success, -1 otherwise. */ - static int tryPop (Segment* segment, Type& element, uint64_t& /*unused*/) noexcept + static int tryPop (Segment* segment, Type& element, uint64_t& /*cachedHead*/, uint64_t capacity, + uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr)) { @@ -921,7 +980,7 @@ namespace join for (;;) { - auto* slot = &segment->_elements[tail & sync._mask]; + auto* slot = &segment->_elements[tail & mask]; uint64_t seq = slot->_seq.load (std::memory_order_acquire); if (seq == (tail + 1)) @@ -931,7 +990,7 @@ namespace join std::memory_order_relaxed))) { element = local; - slot->_seq.store (tail + sync._capacity, std::memory_order_release); + slot->_seq.store (tail + capacity, std::memory_order_release); return 0; } } @@ -953,10 +1012,13 @@ namespace join * @param segment shared memory segment. * @param elements pointer to the output buffer. * @param size maximum number of elements to pop. - * @param unused unused cache parameter. + * @param cachedHead consumre-side cached index (not used). + * @param capacity memory segment capacity. + * @param mask bit mask for fast modulo. * @return number of elements successfully popped, -1 otherwise. */ - static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& /*unused*/) noexcept + static ssize_t tryPop (Segment* segment, Type* elements, size_t size, uint64_t& /*cachedHead*/, + uint64_t capacity, uint64_t mask) noexcept { if (JOIN_UNLIKELY (segment == nullptr || elements == nullptr || size == 0)) { @@ -982,7 +1044,7 @@ namespace join uint64_t ready = 0; for (; ready < toRead; ++ready) { - auto* slot = &segment->_elements[(tail + ready) & sync._mask]; + auto* slot = &segment->_elements[(tail + ready) & mask]; if (JOIN_UNLIKELY (slot->_seq.load (std::memory_order_acquire) != tail + ready + 1)) { break; @@ -1001,8 +1063,8 @@ namespace join { for (uint64_t i = 0; i < ready; ++i) { - segment->_elements[(tail + i) & sync._mask]._seq.store (tail + i + sync._capacity, - std::memory_order_release); + segment->_elements[(tail + i) & mask]._seq.store (tail + i + capacity, + std::memory_order_release); } return static_cast (ready); diff --git a/core/tests/shmmpmc_test.cpp b/core/tests/shmmpmc_test.cpp index f2b8a06d..f7d2e3de 100644 --- a/core/tests/shmmpmc_test.cpp +++ b/core/tests/shmmpmc_test.cpp @@ -65,15 +65,6 @@ class ShmMpmc : public ::testing::Test const std::string ShmMpmc::_name = "/test_mpmc_shm"; -/** - * @brief test create. - */ -TEST_F (ShmMpmc, create) -{ - ShmMem::Mpmc::Queue prod1 (0, _name); - ASSERT_THROW (ShmMem::Mpmc::Queue (2, _name), std::runtime_error); -} - /** * @brief test tryPush. */ diff --git a/core/tests/shmmpsc_test.cpp b/core/tests/shmmpsc_test.cpp index 26fbdaec..807edcc3 100644 --- a/core/tests/shmmpsc_test.cpp +++ b/core/tests/shmmpsc_test.cpp @@ -65,15 +65,6 @@ class ShmMpsc : public ::testing::Test const std::string ShmMpsc::_name = "/test_mpsc_shm"; -/** - * @brief test create. - */ -TEST_F (ShmMpsc, create) -{ - ShmMem::Mpsc::Queue prod1 (0, _name); - ASSERT_THROW (ShmMem::Mpsc::Queue (2, _name), std::runtime_error); -} - /** * @brief test tryPush. */ diff --git a/core/tests/shmspsc_test.cpp b/core/tests/shmspsc_test.cpp index 1cceae9b..0ec0a69f 100644 --- a/core/tests/shmspsc_test.cpp +++ b/core/tests/shmspsc_test.cpp @@ -66,15 +66,6 @@ class ShmSpsc : public ::testing::Test const std::string ShmSpsc::_name = "/test_spsc_shm"; -/** - * @brief test create. - */ -TEST_F (ShmSpsc, create) -{ - ShmMem::Spsc::Queue prod1 (0, _name); - ASSERT_THROW (ShmMem::Spsc::Queue (2, _name), std::runtime_error); -} - /** * @brief test tryPush. */