Skip to content

Commit 000f8bc

Browse files
authored
Merge pull request #86 from orxfun/support-for-concurrent-queue
Support for concurrent queue
2 parents 6f13132 + b7d92a4 commit 000f8bc

10 files changed

Lines changed: 282 additions & 10 deletions

File tree

.github/workflows/ci.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ jobs:
1818
toolchain: ["stable"]
1919

2020
steps:
21-
- uses: actions/checkout@v4
21+
- uses: actions/checkout@v5
2222

2323
- name: Install toolchain
24-
uses: dtolnay/rust-toolchain@master
24+
uses: dtolnay/rust-toolchain@stable
2525
with:
2626
toolchain: ${{ matrix.toolchain }}
2727

28+
- name: Install clippy
29+
run: rustup component add clippy
2830
- name: Install 32bit target
2931
run: rustup target add i686-unknown-linux-musl
3032
- name: Install wasm target

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "orx-split-vec"
3-
version = "3.19.0"
3+
version = "3.20.0"
44
edition = "2024"
55
authors = ["orxfun <orx.ugur.arikan@gmail.com>"]
66
description = "An efficient dynamic capacity vector with pinned element guarantees."
@@ -12,7 +12,7 @@ categories = ["data-structures", "rust-patterns", "no-std"]
1212
[dependencies]
1313
orx-iterable = { version = "1.3.0", default-features = false }
1414
orx-pseudo-default = { version = "2.1.0", default-features = false }
15-
orx-pinned-vec = { version = "3.17.0", default-features = false }
15+
orx-pinned-vec = { version = "3.18.0", default-features = false }
1616
orx-concurrent-iter = { version = "3.1.0", default-features = false }
1717

1818
[[bench]]
@@ -24,4 +24,4 @@ criterion = "0.7.0"
2424
rand = { version = "0.9.2", default-features = false }
2525
rand_chacha = { version = "0.9", default-features = false }
2626
test-case = "3.3.1"
27-
orx-concurrent-bag = "3.0.0"
27+
# orx-concurrent-bag = "3.1.0"

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ Naturally, it has certain specific differences and operations. For instance, we
108108

109109
```rust
110110
use orx_split_vec::*;
111+
use orx_pseudo_default::PseudoDefault;
111112

112113
#[derive(Clone)]
113114
struct MyCustomGrowth;
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod con_iter;
2-
mod into;
1+
// mod con_iter;
2+
// mod into;
33
mod par;
44
mod transformations;
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod con_iter;
2-
mod into;
1+
// mod con_iter;
2+
// mod into;
33
mod par;
44
mod transformations;
Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use crate::{
22
Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
33
common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef},
4+
concurrent_pinned_vec::iter_ptr::IterPtrOfCon,
45
fragment::transformations::{fragment_from_raw, fragment_into_raw},
56
};
67
use alloc::vec::Vec;
7-
use core::cell::UnsafeCell;
88
use core::ops::RangeBounds;
99
use core::sync::atomic::{AtomicUsize, Ordering};
10+
use core::{cell::UnsafeCell, ops::Range};
1011
use orx_pinned_vec::ConcurrentPinnedVec;
1112

1213
struct FragmentData {
@@ -189,6 +190,11 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
189190
where
190191
Self: 'a;
191192

193+
type PtrIter<'a>
194+
= IterPtrOfCon<'a, T, G>
195+
where
196+
Self: 'a;
197+
192198
unsafe fn into_inner(mut self, len: usize) -> Self::P {
193199
let mut fragments = Vec::with_capacity(self.max_num_fragments);
194200
let mut take_fragment = |fragment| fragments.push(fragment);
@@ -424,4 +430,8 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
424430
self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
425431
self.pinned_vec_len = 0;
426432
}
433+
434+
unsafe fn ptr_iter_unchecked(&self, range: Range<usize>) -> Self::PtrIter<'_> {
435+
IterPtrOfCon::new(self.capacity(), &self.data, self.growth.clone(), range)
436+
}
427437
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use crate::{
2+
GrowthWithConstantTimeAccess, concurrent_pinned_vec::iter_ptr_slices::IterPtrOfConSlices,
3+
};
4+
use core::{cell::UnsafeCell, ops::Range};
5+
6+
pub struct IterPtrOfCon<'a, T, G>
7+
where
8+
G: GrowthWithConstantTimeAccess,
9+
{
10+
slices: IterPtrOfConSlices<'a, T, G>,
11+
len_of_remaining_slices: usize,
12+
current_ptr: *const T,
13+
current_last: *const T,
14+
}
15+
16+
impl<'a, T, G> IterPtrOfCon<'a, T, G>
17+
where
18+
G: GrowthWithConstantTimeAccess,
19+
{
20+
pub fn new(
21+
capacity: usize,
22+
fragments: &'a [UnsafeCell<*mut T>],
23+
growth: G,
24+
range: Range<usize>,
25+
) -> Self {
26+
let len_of_remaining_slices = range.len();
27+
let slices = IterPtrOfConSlices::new(capacity, fragments, growth, range);
28+
Self {
29+
slices,
30+
len_of_remaining_slices,
31+
current_ptr: core::ptr::null(),
32+
current_last: core::ptr::null(),
33+
}
34+
}
35+
36+
fn remaining(&self) -> usize {
37+
let remaining_current = match self.current_ptr.is_null() {
38+
true => 0,
39+
// SAFETY: whenever current_ptr is not null, we know that current_last is also not
40+
// null which is >= current_ptr.
41+
false => unsafe { self.current_last.offset_from(self.current_ptr) as usize + 1 },
42+
};
43+
44+
self.len_of_remaining_slices + remaining_current
45+
}
46+
47+
fn next_slice(&mut self) -> Option<*mut T> {
48+
self.slices.next().and_then(|(ptr, len)| {
49+
debug_assert!(len > 0);
50+
self.len_of_remaining_slices -= len;
51+
// SAFETY: pointers are not null since slice is not empty
52+
self.current_ptr = ptr;
53+
self.current_last = unsafe { ptr.add(len - 1) };
54+
self.next()
55+
})
56+
}
57+
}
58+
59+
impl<'a, T, G> Iterator for IterPtrOfCon<'a, T, G>
60+
where
61+
G: GrowthWithConstantTimeAccess,
62+
{
63+
type Item = *mut T;
64+
65+
fn next(&mut self) -> Option<Self::Item> {
66+
match self.current_ptr {
67+
ptr if ptr.is_null() => self.next_slice(),
68+
ptr if ptr == self.current_last => {
69+
self.current_ptr = core::ptr::null_mut();
70+
Some(ptr as *mut T)
71+
}
72+
ptr => {
73+
// SAFETY: current_ptr is not the last element, hance current_ptr+1 is in bounds
74+
self.current_ptr = unsafe { self.current_ptr.add(1) };
75+
76+
// SAFETY: ptr is valid and its value can be taken.
77+
// Drop will skip this position which is now uninitialized.
78+
Some(ptr as *mut T)
79+
}
80+
}
81+
}
82+
83+
fn size_hint(&self) -> (usize, Option<usize>) {
84+
let len = self.remaining();
85+
(len, Some(len))
86+
}
87+
}
88+
89+
impl<'a, T, G> ExactSizeIterator for IterPtrOfCon<'a, T, G>
90+
where
91+
G: GrowthWithConstantTimeAccess,
92+
{
93+
fn len(&self) -> usize {
94+
self.remaining()
95+
}
96+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use crate::{
2+
GrowthWithConstantTimeAccess,
3+
range_helpers::{range_end, range_start},
4+
};
5+
use core::cmp::min;
6+
use core::{cell::UnsafeCell, iter::FusedIterator, ops::Range};
7+
8+
pub struct IterPtrOfConSlices<'a, T, G>
9+
where
10+
G: GrowthWithConstantTimeAccess,
11+
{
12+
fragments: &'a [UnsafeCell<*mut T>],
13+
growth: G,
14+
sf: usize,
15+
si: usize,
16+
si_end: usize,
17+
ef: usize,
18+
ei: usize,
19+
f: usize,
20+
}
21+
22+
impl<'a, T, G> IterPtrOfConSlices<'a, T, G>
23+
where
24+
G: GrowthWithConstantTimeAccess,
25+
{
26+
fn empty() -> Self {
27+
Self {
28+
fragments: &[],
29+
growth: G::pseudo_default(),
30+
sf: 0,
31+
si: 0,
32+
si_end: 0,
33+
ef: 0,
34+
ei: 0,
35+
f: 1,
36+
}
37+
}
38+
39+
fn single_slice(
40+
fragments: &'a [UnsafeCell<*mut T>],
41+
growth: G,
42+
f: usize,
43+
begin: usize,
44+
end: usize,
45+
) -> Self {
46+
Self {
47+
fragments,
48+
growth,
49+
sf: f,
50+
si: begin,
51+
si_end: end,
52+
ef: f,
53+
ei: 0,
54+
f,
55+
}
56+
}
57+
58+
pub fn new(
59+
capacity: usize,
60+
fragments: &'a [UnsafeCell<*mut T>],
61+
growth: G,
62+
range: Range<usize>,
63+
) -> Self {
64+
let fragment_and_inner_indices = |i| growth.get_fragment_and_inner_indices_unchecked(i);
65+
66+
let a = range_start(&range);
67+
let b = min(capacity, range_end(&range, capacity));
68+
69+
match b.saturating_sub(a) {
70+
0 => Self::empty(),
71+
_ => {
72+
let (sf, si) = fragment_and_inner_indices(a);
73+
let (ef, ei) = fragment_and_inner_indices(b - 1);
74+
75+
match sf == ef {
76+
true => Self::single_slice(fragments, growth, sf, si, ei + 1),
77+
false => {
78+
let si_end = growth.fragment_capacity_of(sf);
79+
Self {
80+
fragments,
81+
growth,
82+
sf,
83+
si,
84+
si_end,
85+
ef,
86+
ei,
87+
f: sf,
88+
}
89+
}
90+
}
91+
}
92+
}
93+
}
94+
95+
#[inline(always)]
96+
fn remaining_len(&self) -> usize {
97+
(1 + self.ef).saturating_sub(self.f)
98+
}
99+
100+
#[inline(always)]
101+
fn get_ptr_fi(&self, f: usize, i: usize) -> *mut T {
102+
let p = unsafe { *self.fragments[f].get() };
103+
unsafe { p.add(i) }
104+
}
105+
106+
#[inline(always)]
107+
fn capacity_of(&self, f: usize) -> usize {
108+
self.growth.fragment_capacity_of(f)
109+
}
110+
}
111+
112+
impl<'a, T, G> Iterator for IterPtrOfConSlices<'a, T, G>
113+
where
114+
G: GrowthWithConstantTimeAccess,
115+
{
116+
type Item = (*mut T, usize);
117+
118+
fn next(&mut self) -> Option<Self::Item> {
119+
match self.f {
120+
f if f == self.sf => {
121+
self.f += 1;
122+
let len = self.si_end - self.si;
123+
let p = self.get_ptr_fi(self.sf, self.si);
124+
Some((p, len))
125+
}
126+
f if f < self.ef => {
127+
self.f += 1;
128+
let len = self.capacity_of(f);
129+
let p = self.get_ptr_fi(f, 0);
130+
Some((p, len))
131+
}
132+
f if f == self.ef => {
133+
self.f += 1;
134+
let len = self.ei + 1;
135+
let p = self.get_ptr_fi(self.ef, 0);
136+
Some((p, len))
137+
}
138+
_ => None,
139+
}
140+
}
141+
142+
fn size_hint(&self) -> (usize, Option<usize>) {
143+
let len = self.remaining_len();
144+
(len, Some(len))
145+
}
146+
}
147+
148+
impl<'a, T, G> FusedIterator for IterPtrOfConSlices<'a, T, G> where G: GrowthWithConstantTimeAccess {}
149+
150+
impl<'a, T, G> ExactSizeIterator for IterPtrOfConSlices<'a, T, G>
151+
where
152+
G: GrowthWithConstantTimeAccess,
153+
{
154+
fn len(&self) -> usize {
155+
self.remaining_len()
156+
}
157+
}

src/concurrent_pinned_vec/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod con_pinvec;
2+
mod iter_ptr;
3+
mod iter_ptr_slices;
4+
5+
pub use con_pinvec::ConcurrentSplitVec;

src/new_split_vec/new.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ where
3232
///
3333
/// ```
3434
/// use orx_split_vec::*;
35+
/// use orx_pseudo_default::PseudoDefault;
3536
///
3637
/// #[derive(Clone)]
3738
/// pub struct DoubleEverySecondFragment(usize); // any custom growth strategy

0 commit comments

Comments
 (0)