Skip to content

Commit

Permalink
Add sources over VecDeque references (mutable or not).
Browse files Browse the repository at this point in the history
  • Loading branch information
gendx committed Jan 9, 2025
1 parent cd45976 commit 76917fb
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crossbeam_utils::CachePadded;
pub use source::range::{RangeInclusiveParallelSource, RangeParallelSource};
pub use source::slice::{MutSliceParallelSource, SliceParallelSource};
pub use source::vec::VecParallelSource;
pub use source::vec_deque::{VecDequeRefMutParallelSource, VecDequeRefParallelSource};
pub use source::zip::{ZipEq, ZipMax, ZipMin, ZipableSource};
pub use source::{
IntoParallelRefMutSource, IntoParallelRefSource, IntoParallelSource, ParallelSource,
Expand Down
1 change: 1 addition & 0 deletions src/iter/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
pub mod range;
pub mod slice;
pub mod vec;
pub mod vec_deque;
pub mod zip;

use super::{Accumulator, ParallelIterator};
Expand Down
123 changes: 123 additions & 0 deletions src/iter/source/vec_deque.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use super::{
IntoParallelRefMutSource, IntoParallelRefSource, ParallelSource, ParallelSourceExt,
SourceDescriptor,
};
use std::collections::VecDeque;

/// A parallel source over a reference to a [`VecDeque`]. This struct is created
/// by the [`par_iter()`](IntoParallelRefSource::par_iter) method on
/// [`IntoParallelRefSource`].
///
/// You most likely won't need to interact with this struct directly, as it
/// implements the [`ParallelSource`] and [`ParallelSourceExt`] traits, but it
/// is nonetheless public because of the `must_use` annotation.
///
/// See also [`VecDequeRefMutParallelSource`].
///
/// ```
/// # use paralight::iter::{
/// # IntoParallelRefSource, ParallelIteratorExt, ParallelSourceExt, VecDequeRefParallelSource,
/// # };
/// # use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};
/// # use std::collections::VecDeque;
/// # let mut thread_pool = ThreadPoolBuilder {
/// # num_threads: ThreadCount::AvailableParallelism,
/// # range_strategy: RangeStrategy::WorkStealing,
/// # cpu_pinning: CpuPinningPolicy::No,
/// # }
/// # .build();
/// let input: VecDeque<_> = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].into_iter().collect();
/// let iter: VecDequeRefParallelSource<_> = input.par_iter();
/// let sum = iter.with_thread_pool(&mut thread_pool).sum::<i32>();
/// assert_eq!(sum, 5 * 11);
/// ```
#[must_use = "iterator adaptors are lazy"]
pub struct VecDequeRefParallelSource<'data, T> {
vec_deque: &'data VecDeque<T>,
}

impl<'data, T: Sync + 'data> IntoParallelRefSource<'data> for VecDeque<T> {
type Item = &'data T;
type Source = VecDequeRefParallelSource<'data, T>;

fn par_iter(&'data self) -> Self::Source {
VecDequeRefParallelSource { vec_deque: self }
}
}

impl<'data, T: Sync> ParallelSource for VecDequeRefParallelSource<'data, T> {
type Item = &'data T;

fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
let (first, second) = self.vec_deque.as_slices();
first.par_iter().chain(second.par_iter()).descriptor()
}
}

/// A parallel source over a mutable reference to a [`VecDeque`]. This struct is
/// created by the [`par_iter_mut()`](IntoParallelRefMutSource::par_iter_mut)
/// method on [`IntoParallelRefMutSource`].
///
/// You most likely won't need to interact with this struct directly, as it
/// implements the [`ParallelSource`] and [`ParallelSourceExt`] traits, but it
/// is nonetheless public because of the `must_use` annotation.
///
/// See also [`VecDequeRefParallelSource`].
///
/// ```
/// # use paralight::iter::{
/// # IntoParallelRefMutSource, ParallelIteratorExt, ParallelSourceExt,
/// # VecDequeRefMutParallelSource,
/// # };
/// # use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};
/// # use std::collections::VecDeque;
/// # let mut thread_pool = ThreadPoolBuilder {
/// # num_threads: ThreadCount::AvailableParallelism,
/// # range_strategy: RangeStrategy::WorkStealing,
/// # cpu_pinning: CpuPinningPolicy::No,
/// # }
/// # .build();
/// let mut values: VecDeque<_> = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].into_iter().collect();
/// let iter: VecDequeRefMutParallelSource<_> = values.par_iter_mut();
/// iter.with_thread_pool(&mut thread_pool)
/// .for_each(|x| *x *= 2);
/// assert_eq!(
/// values,
/// [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
/// .into_iter()
/// .collect::<VecDeque<_>>()
/// );
/// ```
#[must_use = "iterator adaptors are lazy"]
pub struct VecDequeRefMutParallelSource<'data, T> {
vec_deque: &'data mut VecDeque<T>,
}

impl<'data, T: Send + 'data> IntoParallelRefMutSource<'data> for VecDeque<T> {
type Item = &'data mut T;
type Source = VecDequeRefMutParallelSource<'data, T>;

fn par_iter_mut(&'data mut self) -> Self::Source {
VecDequeRefMutParallelSource { vec_deque: self }
}
}

impl<'data, T: Send> ParallelSource for VecDequeRefMutParallelSource<'data, T> {
type Item = &'data mut T;

fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
let (first, second) = self.vec_deque.as_mut_slices();
first
.par_iter_mut()
.chain(second.par_iter_mut())
.descriptor()
}
}
76 changes: 75 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod test {
};
use rand::Rng;
use std::cell::Cell;
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
#[cfg(all(not(miri), feature = "log"))]
Expand Down Expand Up @@ -140,6 +140,8 @@ mod test {
test_source_vec_panic => fail("worker thread(s) panicked!"),
test_source_vec_find_any_panic => fail("worker thread(s) panicked!"),
test_source_vec_find_first_panic => fail("worker thread(s) panicked!"),
test_source_vec_deque_ref,
test_source_vec_deque_ref_mut,
test_source_adaptor_chain,
test_source_adaptor_chain_cleanup,
test_source_adaptor_chain_overflow => fail("called chain() with sources that together produce more than usize::MAX items"),
Expand Down Expand Up @@ -1224,6 +1226,72 @@ mod test {
});
}

fn test_source_vec_deque_ref(range_strategy: RangeStrategy) {
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy,
cpu_pinning: CpuPinningPolicy::No,
}
.build();

// Simple contiguous VecDeque.
let input = (0..=INPUT_LEN).collect::<VecDeque<u64>>();
assert!(vec_deque_is_contiguous(&input));

let sum = input
.par_iter()
.with_thread_pool(&mut thread_pool)
.sum::<u64>();
assert_eq!(sum, INPUT_LEN * (INPUT_LEN + 1) / 2);

// VecDeque split in 2 parts.
let mut input = (1..=INPUT_LEN).collect::<VecDeque<u64>>();
input.push_front(0);
assert!(!vec_deque_is_contiguous(&input));

let sum = input
.par_iter()
.with_thread_pool(&mut thread_pool)
.sum::<u64>();
assert_eq!(sum, INPUT_LEN * (INPUT_LEN + 1) / 2);
}

fn test_source_vec_deque_ref_mut(range_strategy: RangeStrategy) {
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy,
cpu_pinning: CpuPinningPolicy::No,
}
.build();

// Simple contiguous VecDeque.
let mut values = (0..=INPUT_LEN).collect::<VecDeque<u64>>();
assert!(vec_deque_is_contiguous(&values));

values
.par_iter_mut()
.with_thread_pool(&mut thread_pool)
.for_each(|x| *x *= 2);
assert_eq!(
values,
(0..=INPUT_LEN).map(|x| x * 2).collect::<VecDeque<_>>()
);

// VecDeque split in 2 parts.
let mut values = (1..=INPUT_LEN).collect::<VecDeque<u64>>();
values.push_front(0);
assert!(!vec_deque_is_contiguous(&values));

values
.par_iter_mut()
.with_thread_pool(&mut thread_pool)
.for_each(|x| *x *= 2);
assert_eq!(
values,
(0..=INPUT_LEN).map(|x| x * 2).collect::<VecDeque<_>>()
);
}

fn test_source_adaptor_chain(range_strategy: RangeStrategy) {
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
Expand Down Expand Up @@ -3560,6 +3628,7 @@ mod test {
assert!(result.is_none());
}

/* Helper functions */
const fn expected_sum_lengths(max: u64) -> u64 {
if max < 10 {
max + 1
Expand All @@ -3586,6 +3655,11 @@ mod test {
}
}

fn vec_deque_is_contiguous<T>(v: &VecDeque<T>) -> bool {
let (left, right) = v.as_slices();
left.is_empty() || right.is_empty()
}

#[test]
fn test_expected_sum_lengths() {
assert_eq!(expected_sum_lengths(0), 1);
Expand Down

0 comments on commit 76917fb

Please sign in to comment.