Commit
Add into_par_iter() implementations for Vec<T>, Box<[T]> and [T; N].
gendx committed Jan 2, 2025
1 parent 3f09534 commit 9277db6
Showing 9 changed files with 1,191 additions and 155 deletions.
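The commit adds owned parallel sources, so that a `Vec<T>`, a `Box<[T]>`, or an array `[T; N]` can be consumed by value by a parallel pipeline via `into_par_iter()`. Below is a rough usage sketch, assuming paralight's builder-style pool setup; the builder fields, the `with_thread_pool()` adaptor, and the `sum()` combinator shown here are illustrative assumptions about the crate's public API rather than part of this commit.

```rust
use paralight::iter::{IntoParallelSource, ParallelIteratorExt, ParallelSourceExt};
use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};

fn main() {
    // Illustrative pool setup; the exact builder fields may differ in this revision.
    let mut thread_pool = ThreadPoolBuilder {
        num_threads: ThreadCount::AvailableParallelism,
        range_strategy: RangeStrategy::WorkStealing,
        cpu_pinning: CpuPinningPolicy::No,
    }
    .build();

    let input: Vec<u64> = (1..=10).collect();

    // The vector is consumed by value: each item is moved into the pipeline.
    // Items the pipeline never visits still have to be dropped, which is what
    // the SourceCleanup plumbing added in thread_pool.rs below takes care of.
    let sum = input
        .into_par_iter()
        .with_thread_pool(&mut thread_pool)
        .sum::<u64>();
    assert_eq!(sum, 55);
}
```

Consuming the source by value is what makes the new cleanup hooks in the thread pool necessary, as the diff for src/core/thread_pool.rs shows.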
349 changes: 255 additions & 94 deletions src/core/range.rs

Large diffs are not rendered by default.

101 changes: 83 additions & 18 deletions src/core/thread_pool.rs
@@ -9,11 +9,12 @@
//! A thread pool implementing parallelism at a lightweight cost.
use super::range::{
FixedRangeFactory, Range, RangeFactory, RangeOrchestrator, WorkStealingRangeFactory,
FixedRangeFactory, Range, RangeFactory, RangeOrchestrator, SkipIterator,
WorkStealingRangeFactory,
};
use super::sync::{make_lending_group, Borrower, Lender, WorkerState};
use super::util::LifetimeParameterized;
use crate::iter::Accumulator;
use crate::iter::{Accumulator, SourceCleanup};
use crate::macros::{log_debug, log_error, log_warn};
use crossbeam_utils::CachePadded;
// Platforms that support `libc::sched_setaffinity()`.
@@ -152,9 +153,10 @@ impl ThreadPool {
process_item: impl Fn(Accum, usize) -> ControlFlow<Accum, Accum> + Sync,
finalize: impl Fn(Accum) -> Output + Sync,
reduce: impl Fn(Output, Output) -> Output,
cleanup: impl SourceCleanup + Sync,
) -> Output {
self.inner
.upper_bounded_pipeline(input_len, init, process_item, finalize, reduce)
.upper_bounded_pipeline(input_len, init, process_item, finalize, reduce, cleanup)
}

/// Processes an input of the given length in parallel and returns the
@@ -164,8 +166,9 @@ impl ThreadPool {
input_len: usize,
accum: impl Accumulator<usize, Output> + Sync,
reduce: impl Accumulator<Output, Output>,
cleanup: impl SourceCleanup + Sync,
) -> Output {
self.inner.iter_pipeline(input_len, accum, reduce)
self.inner.iter_pipeline(input_len, accum, reduce, cleanup)
}
}

@@ -221,14 +224,25 @@ impl ThreadPoolEnum {
process_item: impl Fn(Accum, usize) -> ControlFlow<Accum, Accum> + Sync,
finalize: impl Fn(Accum) -> Output + Sync,
reduce: impl Fn(Output, Output) -> Output,
cleanup: impl SourceCleanup + Sync,
) -> Output {
match self {
ThreadPoolEnum::Fixed(inner) => {
inner.upper_bounded_pipeline(input_len, init, process_item, finalize, reduce)
}
ThreadPoolEnum::WorkStealing(inner) => {
inner.upper_bounded_pipeline(input_len, init, process_item, finalize, reduce)
}
ThreadPoolEnum::Fixed(inner) => inner.upper_bounded_pipeline(
input_len,
init,
process_item,
finalize,
reduce,
cleanup,
),
ThreadPoolEnum::WorkStealing(inner) => inner.upper_bounded_pipeline(
input_len,
init,
process_item,
finalize,
reduce,
cleanup,
),
}
}

@@ -239,10 +253,13 @@ impl ThreadPoolEnum {
input_len: usize,
accum: impl Accumulator<usize, Output> + Sync,
reduce: impl Accumulator<Output, Output>,
cleanup: impl SourceCleanup + Sync,
) -> Output {
match self {
ThreadPoolEnum::Fixed(inner) => inner.iter_pipeline(input_len, accum, reduce),
ThreadPoolEnum::WorkStealing(inner) => inner.iter_pipeline(input_len, accum, reduce),
ThreadPoolEnum::Fixed(inner) => inner.iter_pipeline(input_len, accum, reduce, cleanup),
ThreadPoolEnum::WorkStealing(inner) => {
inner.iter_pipeline(input_len, accum, reduce, cleanup)
}
}
}
}
@@ -370,6 +387,7 @@ impl<F: RangeFactory> ThreadPoolImpl<F> {
process_item: impl Fn(Accum, usize) -> ControlFlow<Accum, Accum> + Sync,
finalize: impl Fn(Accum) -> Output + Sync,
reduce: impl Fn(Output, Output) -> Output,
cleanup: impl SourceCleanup + Sync,
) -> Output {
self.range_orchestrator.reset_ranges(input_len);

@@ -385,6 +403,7 @@ impl<F: RangeFactory> ThreadPoolImpl<F> {
init,
process_item,
finalize,
cleanup,
});

outputs
@@ -401,6 +420,7 @@ impl<F: RangeFactory> ThreadPoolImpl<F> {
input_len: usize,
accum: impl Accumulator<usize, Output> + Sync,
reduce: impl Accumulator<Output, Output>,
cleanup: impl SourceCleanup + Sync,
) -> Output {
self.range_orchestrator.reset_ranges(input_len);

@@ -412,6 +432,7 @@ impl<F: RangeFactory> ThreadPoolImpl<F> {
self.pipeline.lend(&IterPipelineImpl {
outputs: outputs.clone(),
accum,
cleanup,
});

reduce.accumulate(
@@ -463,25 +484,32 @@ struct UpperBoundedPipelineImpl<
Init: Fn() -> Accum,
ProcessItem: Fn(Accum, usize) -> ControlFlow<Accum, Accum>,
Finalize: Fn(Accum) -> Output,
Cleanup: SourceCleanup,
> {
bound: CachePadded<AtomicUsize>,
outputs: Arc<[Mutex<Option<Output>>]>,
init: Init,
process_item: ProcessItem,
finalize: Finalize,
cleanup: Cleanup,
}

impl<R, Output, Accum, Init, ProcessItem, Finalize> Pipeline<R>
for UpperBoundedPipelineImpl<Output, Accum, Init, ProcessItem, Finalize>
impl<R, Output, Accum, Init, ProcessItem, Finalize, Cleanup> Pipeline<R>
for UpperBoundedPipelineImpl<Output, Accum, Init, ProcessItem, Finalize, Cleanup>
where
R: Range,
Init: Fn() -> Accum,
ProcessItem: Fn(Accum, usize) -> ControlFlow<Accum, Accum>,
Finalize: Fn(Accum) -> Output,
Cleanup: SourceCleanup,
{
fn run(&self, worker_id: usize, range: &R) {
let mut accumulator = (self.init)();
for i in range.upper_bounded_iter(&self.bound) {
let iter = SkipIteratorWrapper {
iter: range.upper_bounded_iter(&self.bound),
cleanup: &self.cleanup,
};
for i in iter {
let acc = (self.process_item)(accumulator, i);
accumulator = match acc {
ControlFlow::Continue(acc) => acc,
@@ -496,22 +524,59 @@ where
}
}

struct IterPipelineImpl<Output, Accum: Accumulator<usize, Output>> {
struct IterPipelineImpl<Output, Accum: Accumulator<usize, Output>, Cleanup: SourceCleanup> {
outputs: Arc<[Mutex<Option<Output>>]>,
accum: Accum,
cleanup: Cleanup,
}

impl<R, Output, Accum> Pipeline<R> for IterPipelineImpl<Output, Accum>
impl<R, Output, Accum, Cleanup> Pipeline<R> for IterPipelineImpl<Output, Accum, Cleanup>
where
R: Range,
Accum: Accumulator<usize, Output>,
Cleanup: SourceCleanup,
{
fn run(&self, worker_id: usize, range: &R) {
let output = self.accum.accumulate(range.iter());
let iter = SkipIteratorWrapper {
iter: range.iter(),
cleanup: &self.cleanup,
};
let output = self.accum.accumulate(iter);
*self.outputs[worker_id].lock().unwrap() = Some(output);
}
}

struct SkipIteratorWrapper<'a, I: SkipIterator, Cleanup: SourceCleanup> {
iter: I,
cleanup: &'a Cleanup,
}

impl<I: SkipIterator, Cleanup: SourceCleanup> Iterator for SkipIteratorWrapper<'_, I, Cleanup> {
type Item = usize;

fn next(&mut self) -> Option<Self::Item> {
loop {
match self.iter.next() {
(index, None) => return index,
(index, Some(skipped_range)) => {
self.cleanup.cleanup_item_range(skipped_range);
if index.is_some() {
return index;
}
}
}
}
}
}

impl<I: SkipIterator, Cleanup: SourceCleanup> Drop for SkipIteratorWrapper<'_, I, Cleanup> {
fn drop(&mut self) {
if let Some(range) = self.iter.remaining_range() {
self.cleanup.cleanup_item_range(range);
}
}
}

/// Context object owned by a worker thread.
struct ThreadContext<R: Range> {
/// Thread index.
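The cleanup hooks threaded through the pipelines above exist because an owned source hands items out by value: when a pipeline short-circuits, or a worker's iterator is dropped early, the indices it never visited still own items that must be dropped, and the Drop impl on SkipIteratorWrapper is what guarantees the remaining range gets that treatment. Below is a minimal sketch of the idea, using a hypothetical trait that mirrors the `cleanup_item_range()` calls above; the crate's actual `SourceCleanup` definition and the owned_slice implementation are not shown on this page and may differ in shape.

```rust
use std::ops::Range;

// Hypothetical mirror of the SourceCleanup contract used by the wrapper above.
trait SourceCleanup {
    /// Called with a range of item indices that the pipeline will never consume.
    fn cleanup_item_range(&self, range: Range<usize>);
}

// Borrowed sources (e.g. slices) have nothing to drop when items are skipped.
struct NoopCleanup;

impl SourceCleanup for NoopCleanup {
    fn cleanup_item_range(&self, _range: Range<usize>) {}
}

// Owned sources (Vec<T>, Box<[T]>, [T; N]) must still drop skipped items.
// Sketch over a raw pointer to the source's buffer; the allocation itself is
// assumed to be freed elsewhere once the pipeline finishes.
struct OwnedCleanup<T> {
    base: *mut T,
}

// Assumption: workers only ever clean up disjoint index ranges, so sharing
// this across threads is sound in the sketch.
unsafe impl<T: Send> Sync for OwnedCleanup<T> {}

impl<T> SourceCleanup for OwnedCleanup<T> {
    fn cleanup_item_range(&self, range: Range<usize>) {
        for i in range {
            // SAFETY (assumed): each index is cleaned up at most once and is
            // never read by the pipeline afterwards.
            unsafe { std::ptr::drop_in_place(self.base.add(i)) };
        }
    }
}
```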
3 changes: 2 additions & 1 deletion src/iter/mod.rs
@@ -11,12 +11,13 @@
mod source;

use crossbeam_utils::CachePadded;
pub use source::owned_slice::{ArrayParallelSource, VecParallelSource};
pub use source::range::{RangeInclusiveParallelSource, RangeParallelSource};
pub use source::slice::{MutSliceParallelSource, SliceParallelSource};
pub use source::zip::{ZipEq, ZipMax, ZipMin, ZipableSource};
pub use source::{
IntoParallelRefMutSource, IntoParallelRefSource, IntoParallelSource, ParallelSource,
ParallelSourceExt, SourceDescriptor,
ParallelSourceExt, SourceCleanup, SourceDescriptor,
};
use std::cmp::Ordering;
use std::iter::{Product, Sum};
(Diffs for the remaining changed files are not shown.)
