From ee3d2ed03a0fd76f147e0b54354beaaaaecca668 Mon Sep 17 00:00:00 2001 From: Guillaume Endignoux Date: Fri, 8 Nov 2024 12:42:51 +0100 Subject: [PATCH] Implement parallel sources over ranges. --- Cargo.toml | 1 + README.md | 32 ++++++--- src/iter/source/mod.rs | 1 + src/iter/source/range.rs | 151 +++++++++++++++++++++++++++++++++++++++ src/lib.rs | 93 +++++++++++++++++++++--- 5 files changed, 257 insertions(+), 21 deletions(-) create mode 100644 src/iter/source/range.rs diff --git a/Cargo.toml b/Cargo.toml index 9aba4ea..44e4c57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ rust-version = "1.75.0" default = [] log = ["dep:log"] log_parallelism = ["log"] +nightly = [] nightly_tests = [] [dependencies] diff --git a/README.md b/README.md index df4cb02..05dfd03 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,10 @@ [![Build Status](https://github.com/gendx/paralight/actions/workflows/build.yml/badge.svg?branch=main)](https://github.com/gendx/paralight/actions/workflows/build.yml) [![Test Status](https://github.com/gendx/paralight/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/gendx/paralight/actions/workflows/tests.yml) -This library allows you to distribute computation over slices among multiple -threads. Each thread processes a subset of the items, and a final step reduces -the outputs from all threads into a single result. +This library allows you to distribute computation over slices (and other +*indexed* sources) among multiple threads. Each thread processes a subset of the +items, and a final step reduces the outputs from all threads into a single +result. ```rust use paralight::iter::{ @@ -51,9 +52,10 @@ let mut output = [0; 10]; assert_eq!(output, [12, 14, 16, 18, 20, 22, 24, 26, 28, 30]); ``` -Note: In principle, Paralight could be extended to support other inputs than -slices as long as they are *indexed*, but for now only slices are supported. -Come back to check when future versions are published! 
+Paralight currently supports inputs that are a combination of slices and ranges, +but can be extended to support other sources as long as they are *indexed*. This +is done via the [`ParallelSource`](iter::ParallelSource) and +[`IntoParallelSource`](iter::IntoParallelSource) traits. ## Thread pool configuration @@ -240,7 +242,7 @@ With the [`WorkStealing`](RangeStrategy::WorkStealing) strategy, inputs with more than [`u32::MAX`](u32::MAX) elements are currently not supported. ```rust,should_panic -use paralight::iter::{IntoParallelRefSource, ParallelIteratorExt, WithThreadPool}; +use paralight::iter::{IntoParallelSource, ParallelIteratorExt, WithThreadPool}; use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder}; let mut thread_pool = ThreadPoolBuilder { @@ -250,11 +252,9 @@ let mut thread_pool = ThreadPoolBuilder { } .build(); -let input = vec![0u8; 5_000_000_000]; -let _sum = input - .par_iter() +let _sum = (0..5_000_000_000_usize) + .into_par_iter() .with_thread_pool(&mut thread_pool) - .copied() .reduce(|| 0, |x, y| x + y); ``` @@ -275,6 +275,16 @@ If you're concerned that these indices leak too much information about your data, you need to make sure that you depend on Paralight with the `log` and `log_parallelism` features disabled. +## Experimental nightly APIs + +Some experimental APIs are available under the `nightly` Cargo feature, for +users who compile with a +[nightly](https://rust-lang.github.io/rustup/concepts/channels.html#working-with-nightly-rust) +Rust toolchain. As the underlying implementation is based on +[experimental features](https://doc.rust-lang.org/unstable-book/) of the Rust +language, these APIs are provided without guarantee and may break at any time +when a new nightly toolchain is released. + ## Disclaimer This is not an officially supported Google product. 
diff --git a/src/iter/source/mod.rs b/src/iter/source/mod.rs index df2976e..0824a48 100644 --- a/src/iter/source/mod.rs +++ b/src/iter/source/mod.rs @@ -6,6 +6,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +pub mod range; pub mod slice; pub mod zip; diff --git a/src/iter/source/range.rs b/src/iter/source/range.rs new file mode 100644 index 0000000..d349334 --- /dev/null +++ b/src/iter/source/range.rs @@ -0,0 +1,151 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use super::{IntoParallelSource, ParallelSource, SourceDescriptor}; +#[cfg(feature = "nightly")] +use std::iter::Step; +use std::ops::{Range, RangeInclusive}; + +/// A parallel source over a [`Range`]. This struct is created by the +/// [`into_par_iter()`](IntoParallelSource::into_par_iter) method on +/// [`IntoParallelSource`]. 
+#[must_use = "iterator adaptors are lazy"] +pub struct RangeParallelSource { + range: Range, +} + +#[cfg(feature = "nightly")] +impl IntoParallelSource for Range { + type Item = T; + type Source = RangeParallelSource; + + fn into_par_iter(self) -> Self::Source { + RangeParallelSource { range: self } + } +} + +#[cfg(feature = "nightly")] +impl ParallelSource for RangeParallelSource { + type Item = T; + + fn descriptor(self) -> SourceDescriptor Self::Item + Sync> { + let range = self.range; + let len = T::steps_between(&range.start, &range.end).unwrap_or_else(|| { + panic!( + "cannot iterate over a range with more than usize::MAX ({}) items", + usize::MAX + ); + }); + SourceDescriptor { + len, + fetch_item: move |index| T::forward(range.start, index), + } + } +} + +#[cfg(not(feature = "nightly"))] +impl IntoParallelSource for Range { + type Item = usize; + type Source = RangeParallelSource; + + fn into_par_iter(self) -> Self::Source { + RangeParallelSource { range: self } + } +} + +#[cfg(not(feature = "nightly"))] +impl ParallelSource for RangeParallelSource { + type Item = usize; + + fn descriptor(self) -> SourceDescriptor Self::Item + Sync> { + let range = self.range; + let len = range + .end + .checked_sub(range.start) + .expect("cannot iterate over a backward range"); + SourceDescriptor { + len, + fetch_item: move |index| range.start + index, + } + } +} + +/// A parallel source over a [`RangeInclusive`]. This struct is created by the +/// [`into_par_iter()`](IntoParallelSource::into_par_iter) method on +/// [`IntoParallelSource`]. 
+#[must_use = "iterator adaptors are lazy"] +pub struct RangeInclusiveParallelSource { + range: RangeInclusive, +} + +#[cfg(feature = "nightly")] +impl IntoParallelSource for RangeInclusive { + type Item = T; + type Source = RangeInclusiveParallelSource; + + fn into_par_iter(self) -> Self::Source { + RangeInclusiveParallelSource { range: self } + } +} + +#[cfg(feature = "nightly")] +impl ParallelSource for RangeInclusiveParallelSource { + type Item = T; + + fn descriptor(self) -> SourceDescriptor Self::Item + Sync> { + let (start, end) = self.range.into_inner(); + let len = T::steps_between(&start, &end).unwrap_or_else(|| { + panic!( + "cannot iterate over a range with more than usize::MAX ({}) items", + usize::MAX + ); + }); + let len = len.checked_add(1).unwrap_or_else(|| { + panic!( + "cannot iterate over a range with more than usize::MAX ({}) items", + usize::MAX + ); + }); + SourceDescriptor { + len, + fetch_item: move |index| T::forward(start, index), + } + } +} + +#[cfg(not(feature = "nightly"))] +impl IntoParallelSource for RangeInclusive { + type Item = usize; + type Source = RangeInclusiveParallelSource; + + fn into_par_iter(self) -> Self::Source { + RangeInclusiveParallelSource { range: self } + } +} + +#[cfg(not(feature = "nightly"))] +impl ParallelSource for RangeInclusiveParallelSource { + type Item = usize; + + fn descriptor(self) -> SourceDescriptor Self::Item + Sync> { + let (start, end) = self.range.into_inner(); + let len = end + .checked_sub(start) + .expect("cannot iterate over a backward range"); + let len = len.checked_add(1).unwrap_or_else(|| { + panic!( + "cannot iterate over a range with more than usize::MAX ({}) items", + usize::MAX + ); + }); + SourceDescriptor { + len, + fetch_item: move |index| start + index, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 8882e80..a2d0f3c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ )] #![cfg_attr(not(test), forbid(clippy::undocumented_unsafe_blocks))] #![cfg_attr(all(test, feature 
= "nightly_tests"), feature(negative_impls))] +#![cfg_attr(feature = "nightly", feature(step_trait))] mod core; pub mod iter; @@ -26,8 +27,8 @@ pub use core::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPool, ThreadP mod test { use super::*; use crate::iter::{ - IntoParallelRefMutSource, IntoParallelRefSource, ParallelIterator, ParallelIteratorExt, - ParallelSourceExt, WithThreadPool, ZipableSource, + IntoParallelRefMutSource, IntoParallelRefSource, IntoParallelSource, ParallelIterator, + ParallelIteratorExt, ParallelSourceExt, WithThreadPool, ZipableSource, }; use std::cell::Cell; use std::collections::HashSet; @@ -108,11 +109,17 @@ mod test { test_pipeline_local_lifetime_input, test_pipeline_local_lifetime_output, test_pipeline_local_lifetime_accumulator, - test_source_par_iter, + test_source_slice, #[cfg(feature = "nightly_tests")] - test_source_par_iter_not_send, - test_source_par_iter_mut, - test_source_par_iter_mut_not_sync, + test_source_slice_not_send, + test_source_slice_mut, + test_source_slice_mut_not_sync, + test_source_range, + #[cfg(feature = "nightly")] + test_source_range_u64, + test_source_range_inclusive, + #[cfg(feature = "nightly")] + test_source_range_inclusive_u64, test_source_adaptor_chain, test_source_adaptor_enumerate, test_source_adaptor_rev, @@ -789,7 +796,7 @@ mod test { assert_eq!(sum, INPUT_LEN * (INPUT_LEN + 1) / 2); } - fn test_source_par_iter(range_strategy: RangeStrategy) { + fn test_source_slice(range_strategy: RangeStrategy) { let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy, @@ -807,7 +814,7 @@ mod test { } #[cfg(feature = "nightly_tests")] - fn test_source_par_iter_not_send(range_strategy: RangeStrategy) { + fn test_source_slice_not_send(range_strategy: RangeStrategy) { let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy, @@ -824,7 +831,7 @@ mod test { assert_eq!(sum, INPUT_LEN * (INPUT_LEN + 1) / 2); } - fn 
test_source_par_iter_mut(range_strategy: RangeStrategy) { + fn test_source_slice_mut(range_strategy: RangeStrategy) { let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy, @@ -841,7 +848,7 @@ mod test { assert_eq!(values, (0..=INPUT_LEN).map(|x| x * 2).collect::>()); } - fn test_source_par_iter_mut_not_sync(range_strategy: RangeStrategy) { + fn test_source_slice_mut_not_sync(range_strategy: RangeStrategy) { let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism, range_strategy, @@ -863,6 +870,72 @@ mod test { ); } + fn test_source_range(range_strategy: RangeStrategy) { + let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + let sum = (0..INPUT_LEN as usize) + .into_par_iter() + .with_thread_pool(&mut thread_pool) + .reduce(|| 0, |x, y| x + y); + + assert_eq!(sum, (INPUT_LEN as usize - 1) * INPUT_LEN as usize / 2); + } + + #[cfg(feature = "nightly")] + fn test_source_range_u64(range_strategy: RangeStrategy) { + let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + let sum = (0..INPUT_LEN) + .into_par_iter() + .with_thread_pool(&mut thread_pool) + .reduce(|| 0, |x, y| x + y); + + assert_eq!(sum, (INPUT_LEN - 1) * INPUT_LEN / 2); + } + + fn test_source_range_inclusive(range_strategy: RangeStrategy) { + let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + let sum = (0..=INPUT_LEN as usize) + .into_par_iter() + .with_thread_pool(&mut thread_pool) + .reduce(|| 0, |x, y| x + y); + + assert_eq!(sum, INPUT_LEN as usize * (INPUT_LEN as usize + 1) / 2); + } + + #[cfg(feature = "nightly")] + fn test_source_range_inclusive_u64(range_strategy: RangeStrategy) { 
+ let mut thread_pool = ThreadPoolBuilder { + num_threads: ThreadCount::AvailableParallelism, + range_strategy, + cpu_pinning: CpuPinningPolicy::No, + } + .build(); + + let sum = (0..=INPUT_LEN) + .into_par_iter() + .with_thread_pool(&mut thread_pool) + .reduce(|| 0, |x, y| x + y); + + assert_eq!(sum, INPUT_LEN * (INPUT_LEN + 1) / 2); + } + fn test_source_adaptor_chain(range_strategy: RangeStrategy) { let mut thread_pool = ThreadPoolBuilder { num_threads: ThreadCount::AvailableParallelism,