Skip to content

Commit

Permalink
Implement parallel sources over ranges.
Browse files Browse the repository at this point in the history
  • Loading branch information
gendx committed Nov 8, 2024
1 parent 3589515 commit ee3d2ed
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rust-version = "1.75.0"
default = []
log = ["dep:log"]
log_parallelism = ["log"]
nightly = []
nightly_tests = []

[dependencies]
Expand Down
32 changes: 21 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
[![Build Status](https://github.com/gendx/paralight/actions/workflows/build.yml/badge.svg?branch=main)](https://github.com/gendx/paralight/actions/workflows/build.yml)
[![Test Status](https://github.com/gendx/paralight/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/gendx/paralight/actions/workflows/tests.yml)

This library allows you to distribute computation over slices among multiple
threads. Each thread processes a subset of the items, and a final step reduces
the outputs from all threads into a single result.
This library allows you to distribute computation over slices (and other
*indexed* sources) among multiple threads. Each thread processes a subset of the
items, and a final step reduces the outputs from all threads into a single
result.

```rust
use paralight::iter::{
Expand Down Expand Up @@ -51,9 +52,10 @@ let mut output = [0; 10];
assert_eq!(output, [12, 14, 16, 18, 20, 22, 24, 26, 28, 30]);
```

Note: In principle, Paralight could be extended to support other inputs than
slices as long as they are *indexed*, but for now only slices are supported.
Come back to check when future versions are published!
Paralight currently supports inputs that are a combination of slices and ranges,
but can be extended to support other sources as long as they are *indexed*. This
is done via the [`ParallelSource`](iter::ParallelSource) and
[`IntoParallelSource`](iter::IntoParallelSource) traits.

## Thread pool configuration

Expand Down Expand Up @@ -240,7 +242,7 @@ With the [`WorkStealing`](RangeStrategy::WorkStealing) strategy, inputs with
more than [`u32::MAX`](u32::MAX) elements are currently not supported.

```rust,should_panic
use paralight::iter::{IntoParallelRefSource, ParallelIteratorExt, WithThreadPool};
use paralight::iter::{IntoParallelSource, ParallelIteratorExt, WithThreadPool};
use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};
let mut thread_pool = ThreadPoolBuilder {
Expand All @@ -250,11 +252,9 @@ let mut thread_pool = ThreadPoolBuilder {
}
.build();
let input = vec![0u8; 5_000_000_000];
let _sum = input
.par_iter()
let _sum = (0..5_000_000_000_usize)
.into_par_iter()
.with_thread_pool(&mut thread_pool)
.copied()
.reduce(|| 0, |x, y| x + y);
```

Expand All @@ -275,6 +275,16 @@ If you're concerned that these indices leak too much information about your
data, you need to make sure that you depend on Paralight with the `log` and
`log_parallelism` features disabled.

## Experimental nightly APIs

Some experimental APIs are available under the `nightly` Cargo feature, for
users who compile with a
[nightly](https://rust-lang.github.io/rustup/concepts/channels.html#working-with-nightly-rust)
Rust toolchain. As the underlying implementation is based on
[experimental features](https://doc.rust-lang.org/unstable-book/) of the Rust
language, these APIs are provided without any stability guarantee and may break
at any time when a new nightly toolchain is released.

## Disclaimer

This is not an officially supported Google product.
Expand Down
1 change: 1 addition & 0 deletions src/iter/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

pub mod range;
pub mod slice;
pub mod zip;

Expand Down
151 changes: 151 additions & 0 deletions src/iter/source/range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use super::{IntoParallelSource, ParallelSource, SourceDescriptor};
#[cfg(feature = "nightly")]
use std::iter::Step;
use std::ops::{Range, RangeInclusive};

/// A parallel source over a [`Range`]. This struct is created by the
/// [`into_par_iter()`](IntoParallelSource::into_par_iter) method on
/// [`IntoParallelSource`].
///
/// The wrapped range is stored unchanged: its length and the mapping from an
/// index to an item are only computed when
/// [`descriptor()`](ParallelSource::descriptor) is called.
///
/// Without the `nightly` Cargo feature, only `Range<usize>` can be converted
/// into this source. With the feature enabled, any `Range<T>` is accepted as
/// long as `T` implements `std::iter::Step + Copy + Send + Sync`.
#[must_use = "iterator adaptors are lazy"]
pub struct RangeParallelSource<T> {
// The half-open range to iterate over, exactly as provided by the caller.
range: Range<T>,
}

/// With the `nightly` feature, any half-open range whose element type is
/// steppable, copyable and thread-safe can be turned into a parallel source.
#[cfg(feature = "nightly")]
impl<T> IntoParallelSource for Range<T>
where
    T: Step + Copy + Send + Sync,
{
    type Item = T;
    type Source = RangeParallelSource<T>;

    /// Wraps this range into a [`RangeParallelSource`] without inspecting its
    /// bounds.
    fn into_par_iter(self) -> Self::Source {
        let range = self;
        RangeParallelSource { range }
    }
}

/// Parallel source implementation for half-open ranges over any steppable
/// type, available with the `nightly` feature.
#[cfg(feature = "nightly")]
impl<T: Step + Copy + Send + Sync> ParallelSource for RangeParallelSource<T> {
    type Item = T;

    /// Describes the range as an indexed source: `len` is the number of steps
    /// from the start to the end of the range, and `fetch_item` maps an index
    /// to the item obtained by stepping forward that many times from the
    /// start.
    ///
    /// # Panics
    ///
    /// - If the range is backward, i.e. its start is greater than its end.
    /// - If the range contains more than [`usize::MAX`] items.
    fn descriptor(self) -> SourceDescriptor<Self::Item, impl Fn(usize) -> Self::Item + Sync> {
        let Range { start, end } = self.range;
        // `Step::steps_between()` reports `None` both for a backward range and
        // for a length that doesn't fit in a `usize`. Check the ordering first
        // so that each failure gets an accurate message, consistent with the
        // implementation used when the `nightly` feature is disabled.
        assert!(start <= end, "cannot iterate over a backward range");
        let len = T::steps_between(&start, &end).unwrap_or_else(|| {
            panic!(
                "cannot iterate over a range with more than usize::MAX ({}) items",
                usize::MAX
            );
        });
        SourceDescriptor {
            len,
            // Only the start is needed to compute the item at a given index.
            fetch_item: move |index| T::forward(start, index),
        }
    }
}

/// On stable toolchains (no `nightly` feature), only half-open ranges of
/// `usize` can be turned into a parallel source.
#[cfg(not(feature = "nightly"))]
impl IntoParallelSource for Range<usize> {
    type Item = usize;
    type Source = RangeParallelSource<usize>;

    /// Wraps this range into a [`RangeParallelSource`] without inspecting its
    /// bounds.
    fn into_par_iter(self) -> Self::Source {
        let range = self;
        RangeParallelSource { range }
    }
}

#[cfg(not(feature = "nightly"))]
impl ParallelSource for RangeParallelSource<usize> {
type Item = usize;

fn descriptor(self) -> SourceDescriptor<Self::Item, impl Fn(usize) -> Self::Item + Sync> {
let range = self.range;
let len = range
.end
.checked_sub(range.start)
.expect("cannot iterate over a backward range");
SourceDescriptor {
len,
fetch_item: move |index| range.start + index,
}
}
}

/// A parallel source over a [`RangeInclusive`]. This struct is created by the
/// [`into_par_iter()`](IntoParallelSource::into_par_iter) method on
/// [`IntoParallelSource`].
///
/// The wrapped range is stored unchanged: its length (which counts both
/// bounds) and the mapping from an index to an item are only computed when
/// [`descriptor()`](ParallelSource::descriptor) is called.
///
/// Without the `nightly` Cargo feature, only `RangeInclusive<usize>` can be
/// converted into this source. With the feature enabled, any
/// `RangeInclusive<T>` is accepted as long as `T` implements
/// `std::iter::Step + Copy + Send + Sync`.
#[must_use = "iterator adaptors are lazy"]
pub struct RangeInclusiveParallelSource<T> {
// The inclusive range to iterate over, exactly as provided by the caller.
range: RangeInclusive<T>,
}

/// With the `nightly` feature, any inclusive range whose element type is
/// steppable, copyable and thread-safe can be turned into a parallel source.
#[cfg(feature = "nightly")]
impl<T> IntoParallelSource for RangeInclusive<T>
where
    T: Step + Copy + Send + Sync,
{
    type Item = T;
    type Source = RangeInclusiveParallelSource<T>;

    /// Wraps this range into a [`RangeInclusiveParallelSource`] without
    /// inspecting its bounds.
    fn into_par_iter(self) -> Self::Source {
        let range = self;
        RangeInclusiveParallelSource { range }
    }
}

/// Parallel source implementation for inclusive ranges over any steppable
/// type, available with the `nightly` feature.
#[cfg(feature = "nightly")]
impl<T: Step + Copy + Send + Sync> ParallelSource for RangeInclusiveParallelSource<T> {
    type Item = T;

    /// Describes the range as an indexed source: `len` is the number of steps
    /// from the start to the end of the range plus one (as the end is
    /// included), and `fetch_item` maps an index to the item obtained by
    /// stepping forward that many times from the start.
    ///
    /// # Panics
    ///
    /// - If the range is backward, i.e. its start is greater than its end.
    /// - If the range contains more than [`usize::MAX`] items.
    fn descriptor(self) -> SourceDescriptor<Self::Item, impl Fn(usize) -> Self::Item + Sync> {
        let (start, end) = self.range.into_inner();
        // `Step::steps_between()` reports `None` both for a backward range and
        // for a length that doesn't fit in a `usize`. Check the ordering first
        // so that each failure gets an accurate message, consistent with the
        // implementation used when the `nightly` feature is disabled.
        assert!(start <= end, "cannot iterate over a backward range");
        // Both the distance between the bounds and the extra item for the
        // inclusive end must fit in a `usize`.
        let len = T::steps_between(&start, &end)
            .and_then(|steps| steps.checked_add(1))
            .unwrap_or_else(|| {
                panic!(
                    "cannot iterate over a range with more than usize::MAX ({}) items",
                    usize::MAX
                );
            });
        SourceDescriptor {
            len,
            fetch_item: move |index| T::forward(start, index),
        }
    }
}

/// On stable toolchains (no `nightly` feature), only inclusive ranges of
/// `usize` can be turned into a parallel source.
#[cfg(not(feature = "nightly"))]
impl IntoParallelSource for RangeInclusive<usize> {
    type Item = usize;
    type Source = RangeInclusiveParallelSource<usize>;

    /// Wraps this range into a [`RangeInclusiveParallelSource`] without
    /// inspecting its bounds.
    fn into_par_iter(self) -> Self::Source {
        let range = self;
        RangeInclusiveParallelSource { range }
    }
}

/// Parallel source implementation for inclusive `usize` ranges, used when the
/// `nightly` feature is disabled.
#[cfg(not(feature = "nightly"))]
impl ParallelSource for RangeInclusiveParallelSource<usize> {
    type Item = usize;

    /// Describes the range as an indexed source: `len` is `end - start + 1`
    /// (as the end is included), and `fetch_item` maps an index to
    /// `start + index`.
    ///
    /// # Panics
    ///
    /// - If the range is backward, i.e. its start is greater than its end.
    /// - If the range contains more than [`usize::MAX`] items.
    fn descriptor(self) -> SourceDescriptor<Self::Item, impl Fn(usize) -> Self::Item + Sync> {
        let (first, last) = self.range.into_inner();
        // A failed subtraction means that the end is before the start.
        let distance = usize::checked_sub(last, first);
        let distance = distance.expect("cannot iterate over a backward range");
        // The inclusive end adds one more item, which may not fit in a `usize`.
        let Some(len) = distance.checked_add(1) else {
            panic!(
                "cannot iterate over a range with more than usize::MAX ({}) items",
                usize::MAX
            );
        };
        SourceDescriptor {
            len,
            fetch_item: move |offset| first + offset,
        }
    }
}
93 changes: 83 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)]
#![cfg_attr(not(test), forbid(clippy::undocumented_unsafe_blocks))]
#![cfg_attr(all(test, feature = "nightly_tests"), feature(negative_impls))]
#![cfg_attr(feature = "nightly", feature(step_trait))]

mod core;
pub mod iter;
Expand All @@ -26,8 +27,8 @@ pub use core::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPool, ThreadP
mod test {
use super::*;
use crate::iter::{
IntoParallelRefMutSource, IntoParallelRefSource, ParallelIterator, ParallelIteratorExt,
ParallelSourceExt, WithThreadPool, ZipableSource,
IntoParallelRefMutSource, IntoParallelRefSource, IntoParallelSource, ParallelIterator,
ParallelIteratorExt, ParallelSourceExt, WithThreadPool, ZipableSource,
};
use std::cell::Cell;
use std::collections::HashSet;
Expand Down Expand Up @@ -108,11 +109,17 @@ mod test {
test_pipeline_local_lifetime_input,
test_pipeline_local_lifetime_output,
test_pipeline_local_lifetime_accumulator,
test_source_par_iter,
test_source_slice,
#[cfg(feature = "nightly_tests")]
test_source_par_iter_not_send,
test_source_par_iter_mut,
test_source_par_iter_mut_not_sync,
test_source_slice_not_send,
test_source_slice_mut,
test_source_slice_mut_not_sync,
test_source_range,
#[cfg(feature = "nightly")]
test_source_range_u64,
test_source_range_inclusive,
#[cfg(feature = "nightly")]
test_source_range_inclusive_u64,
test_source_adaptor_chain,
test_source_adaptor_enumerate,
test_source_adaptor_rev,
Expand Down Expand Up @@ -789,7 +796,7 @@ mod test {
assert_eq!(sum, INPUT_LEN * (INPUT_LEN + 1) / 2);
}

fn test_source_par_iter(range_strategy: RangeStrategy) {
fn test_source_slice(range_strategy: RangeStrategy) {
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy,
Expand All @@ -807,7 +814,7 @@ mod test {
}

#[cfg(feature = "nightly_tests")]
fn test_source_par_iter_not_send(range_strategy: RangeStrategy) {
fn test_source_slice_not_send(range_strategy: RangeStrategy) {
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy,
Expand All @@ -824,7 +831,7 @@ mod test {
assert_eq!(sum, INPUT_LEN * (INPUT_LEN + 1) / 2);
}

fn test_source_par_iter_mut(range_strategy: RangeStrategy) {
fn test_source_slice_mut(range_strategy: RangeStrategy) {
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy,
Expand All @@ -841,7 +848,7 @@ mod test {
assert_eq!(values, (0..=INPUT_LEN).map(|x| x * 2).collect::<Vec<_>>());
}

fn test_source_par_iter_mut_not_sync(range_strategy: RangeStrategy) {
fn test_source_slice_mut_not_sync(range_strategy: RangeStrategy) {
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
range_strategy,
Expand All @@ -863,6 +870,72 @@ mod test {
);
}

/// Sums all the items of the half-open range `0..INPUT_LEN` (as `usize`) in
/// parallel, and compares the result with the closed-form sum.
fn test_source_range(range_strategy: RangeStrategy) {
    let mut pool = ThreadPoolBuilder {
        num_threads: ThreadCount::AvailableParallelism,
        range_strategy,
        cpu_pinning: CpuPinningPolicy::No,
    }
    .build();

    let upper = INPUT_LEN as usize;
    let total = (0..upper)
        .into_par_iter()
        .with_thread_pool(&mut pool)
        .reduce(|| 0, |acc, item| acc + item);

    // 0 + 1 + ... + (upper - 1)
    assert_eq!(total, (upper - 1) * upper / 2);
}

/// Same check as `test_source_range`, but without casting `INPUT_LEN`, so that
/// the range uses the constant's own integer type (`u64` according to the test
/// name). Only available with the `nightly` feature.
#[cfg(feature = "nightly")]
fn test_source_range_u64(range_strategy: RangeStrategy) {
    let mut pool = ThreadPoolBuilder {
        num_threads: ThreadCount::AvailableParallelism,
        range_strategy,
        cpu_pinning: CpuPinningPolicy::No,
    }
    .build();

    let total = (0..INPUT_LEN)
        .into_par_iter()
        .with_thread_pool(&mut pool)
        .reduce(|| 0, |acc, item| acc + item);

    // 0 + 1 + ... + (INPUT_LEN - 1)
    assert_eq!(total, (INPUT_LEN - 1) * INPUT_LEN / 2);
}

/// Sums all the items of the inclusive range `0..=INPUT_LEN` (as `usize`) in
/// parallel, and compares the result with the closed-form sum.
fn test_source_range_inclusive(range_strategy: RangeStrategy) {
    let mut pool = ThreadPoolBuilder {
        num_threads: ThreadCount::AvailableParallelism,
        range_strategy,
        cpu_pinning: CpuPinningPolicy::No,
    }
    .build();

    let upper = INPUT_LEN as usize;
    let total = (0..=upper)
        .into_par_iter()
        .with_thread_pool(&mut pool)
        .reduce(|| 0, |acc, item| acc + item);

    // 0 + 1 + ... + upper
    assert_eq!(total, upper * (upper + 1) / 2);
}

/// Same check as `test_source_range_inclusive`, but without casting
/// `INPUT_LEN`, so that the range uses the constant's own integer type (`u64`
/// according to the test name). Only available with the `nightly` feature.
#[cfg(feature = "nightly")]
fn test_source_range_inclusive_u64(range_strategy: RangeStrategy) {
    let mut pool = ThreadPoolBuilder {
        num_threads: ThreadCount::AvailableParallelism,
        range_strategy,
        cpu_pinning: CpuPinningPolicy::No,
    }
    .build();

    let total = (0..=INPUT_LEN)
        .into_par_iter()
        .with_thread_pool(&mut pool)
        .reduce(|| 0, |acc, item| acc + item);

    // 0 + 1 + ... + INPUT_LEN
    assert_eq!(total, INPUT_LEN * (INPUT_LEN + 1) / 2);
}

fn test_source_adaptor_chain(range_strategy: RangeStrategy) {
let mut thread_pool = ThreadPoolBuilder {
num_threads: ThreadCount::AvailableParallelism,
Expand Down

0 comments on commit ee3d2ed

Please sign in to comment.