Skip to content

Commit

Permalink
Implement ZipableSource for arrays of parallel sources and bump MSRV …
Browse files Browse the repository at this point in the history
…to 1.77.
  • Loading branch information
gendx committed Jan 9, 2025
1 parent ac4b620 commit cd45976
Show file tree
Hide file tree
Showing 5 changed files with 365 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- stable
- beta
- nightly
- 1.75.0 # MSRV
- 1.77.0 # MSRV
fail-fast: false
runs-on: ${{ matrix.os }}
env:
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ categories = ["concurrency"]
keywords = ["parallelism"]
exclude = [".github/*"]
edition = "2021"
rust-version = "1.75.0"
rust-version = "1.77.0"

[package.metadata.docs.rs]
features = ["log", "log_parallelism"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![Crate](https://img.shields.io/crates/v/paralight.svg?logo=rust)](https://crates.io/crates/paralight)
[![Documentation](https://img.shields.io/docsrs/paralight?logo=rust)](https://docs.rs/paralight)
[![Minimum Rust 1.75.0](https://img.shields.io/badge/rust-1.75.0%2B-orange.svg?logo=rust)](https://releases.rs/docs/1.75.0/)
[![Minimum Rust 1.77.0](https://img.shields.io/badge/rust-1.77.0%2B-orange.svg?logo=rust)](https://releases.rs/docs/1.77.0/)
[![Lines of Code](https://www.aschey.tech/tokei/github/gendx/paralight?category=code&branch=main)](https://github.com/gendx/paralight)
[![Dependencies](https://deps.rs/repo/github/gendx/paralight/status.svg)](https://deps.rs/repo/github/gendx/paralight)
[![License](https://img.shields.io/crates/l/paralight.svg)](https://github.com/gendx/paralight/blob/main/LICENSE)
Expand Down
239 changes: 233 additions & 6 deletions src/iter/source/zip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 Google LLC
// Copyright 2024-2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
Expand All @@ -12,8 +12,38 @@ use super::{ParallelSource, SourceCleanup, SourceDescriptor};
/// single [`ParallelSource`] that produces items grouped from the original
/// sources.
///
/// This trait is automatically implemented for [tuples](tuple) of
/// [`ParallelSource`]s (with up to 12 elements).
/// This trait is automatically implemented for [tuples](tuple) (with up to 12
/// elements) and [arrays](array) of [`ParallelSource`]s.
///
/// ```
/// # use paralight::iter::{
/// # IntoParallelRefSource, ParallelIteratorExt, ParallelSourceExt, ZipableSource,
/// # };
/// # use paralight::{CpuPinningPolicy, RangeStrategy, ThreadCount, ThreadPoolBuilder};
/// use std::array;
///
/// # let mut thread_pool = ThreadPoolBuilder {
/// # num_threads: ThreadCount::AvailableParallelism,
/// # range_strategy: RangeStrategy::WorkStealing,
/// # cpu_pinning: CpuPinningPolicy::No,
/// # }
/// # .build();
/// // arrays[0] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
/// // arrays[1] = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
/// // ...
/// let arrays: [[i32; 10]; 20] = array::from_fn(|i| array::from_fn(|j| (10 * i + j) as i32));
///
/// let sums: [i32; 20] = arrays
/// .each_ref()
/// .map(|a| a.par_iter())
/// .zip_eq()
/// .with_thread_pool(&mut thread_pool)
/// .map(|a| a.map(|x| *x))
/// .reduce(|| [0; 20], |a, b| array::from_fn(|i| a[i] + b[i]));
///
/// // Each sum is: 100 * i + (0 + ... + 9)
/// assert_eq!(sums, array::from_fn(|i| 100 * i as i32 + 45));
/// ```
pub trait ZipableSource: Sized
where
ZipEq<Self>: ParallelSource,
Expand Down Expand Up @@ -45,7 +75,7 @@ where
/// .reduce(|| (0, 0), |(a, b), (c, d)| (a + c, b + d));
///
/// assert_eq!(sum_left, 5 * 11); // 1 + ... + 10
/// assert_eq!(sum_right, 10 * 21 - 5 * 11); // 11 + ... + 20
/// assert_eq!(sum_right, 100 + 5 * 11); // 11 + ... + 20
/// ```
///
/// ```should_panic
Expand Down Expand Up @@ -100,7 +130,7 @@ where
/// .reduce(|| (0, 0), |(a, b), (c, d)| (a + c, b + d));
///
/// assert_eq!(sum_left, 5 * 11); // 1 + ... + 10
/// assert_eq!(sum_right, 15 * 8 - 5 * 11); // 11 + ... + 15
/// assert_eq!(sum_right, 50 + 5 * 3); // 11 + ... + 15
/// ```
fn zip_max(self) -> ZipMax<Self> {
ZipMax(self)
Expand Down Expand Up @@ -131,7 +161,7 @@ where
/// .reduce(|| (0, 0), |(a, b), (c, d)| (a + c, b + d));
///
/// assert_eq!(sum_left, 5 * 3); // 1 + ... + 5
/// assert_eq!(sum_right, 15 * 8 - 5 * 11); // 11 + ... + 15
/// assert_eq!(sum_right, 50 + 5 * 3); // 11 + ... + 15
/// ```
fn zip_min(self) -> ZipMin<Self> {
ZipMin(self)
Expand Down Expand Up @@ -387,6 +417,190 @@ zipable_tuple!(zip10, A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7, I 8, J 9);
zipable_tuple!(zip11, A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7, I 8, J 9, K 10);
zipable_tuple!(zip12, A 0, B 1, C 2, D 3, E 4, F 5, G 6, H 7, I 8, J 9, K 10, L 11);

impl<T, const N: usize> ZipableSource for [T; N] where T: ParallelSource {}

impl<T, const N: usize> ParallelSource for ZipEq<[T; N]>
where
T: ParallelSource,
{
type Item = [T::Item; N];

fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
let array = self.0;
let descriptors = array.map(|source| source.descriptor());
for i in 1..N {
assert_eq!(
descriptors[0].len(),
descriptors[i].len(),
"called zip_eq() with sources of different lengths"
);
}
let len = if N == 0 { 0 } else { descriptors[0].len() };
ZipEqSourceDescriptor { descriptors, len }
}
}

impl<T, const N: usize> ParallelSource for ZipMax<[T; N]>
where
T: ParallelSource,
{
type Item = [Option<T::Item>; N];

fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
let array = self.0;
let descriptors = array.map(|source| source.descriptor());
let len = *descriptors
.each_ref()
.map(|desc| desc.len())
.iter()
.max()
.unwrap_or(&0);
ZipMaxSourceDescriptor { descriptors, len }
}
}

impl<T, const N: usize> ParallelSource for ZipMin<[T; N]>
where
T: ParallelSource,
{
type Item = [T::Item; N];

fn descriptor(self) -> impl SourceDescriptor<Item = Self::Item> + Sync {
let array = self.0;
let descriptors = array.map(|source| source.descriptor());
let len = *descriptors
.each_ref()
.map(|desc| desc.len())
.iter()
.min()
.unwrap_or(&0);
ZipMinArraySourceDescriptor { descriptors, len }
}
}

impl<T, const N: usize> SourceCleanup for ZipEqSourceDescriptor<[T; N]>
where
T: SourceCleanup,
{
const NEEDS_CLEANUP: bool = T::NEEDS_CLEANUP;

fn cleanup_item_range(&self, range: std::ops::Range<usize>) {
if Self::NEEDS_CLEANUP {
self.descriptors
.each_ref()
.map(|desc| desc.cleanup_item_range(range.clone()));
}
}
}

impl<T, const N: usize> SourceDescriptor for ZipEqSourceDescriptor<[T; N]>
where
T: SourceDescriptor,
{
type Item = [T::Item; N];

fn len(&self) -> usize {
self.len
}

fn fetch_item(&self, index: usize) -> Self::Item {
self.descriptors
.each_ref()
.map(|desc| desc.fetch_item(index))
}
}

impl<T, const N: usize> SourceCleanup for ZipMaxSourceDescriptor<[T; N]>
where
T: SourceDescriptor,
{
const NEEDS_CLEANUP: bool = T::NEEDS_CLEANUP;

fn cleanup_item_range(&self, range: std::ops::Range<usize>) {
if Self::NEEDS_CLEANUP {
self.descriptors.each_ref().map(|desc| {
let this_len = desc.len();
let this_range = range.start.min(this_len)..range.end.min(this_len);
desc.cleanup_item_range(this_range);
});
}
}
}

impl<T, const N: usize> SourceDescriptor for ZipMaxSourceDescriptor<[T; N]>
where
T: SourceDescriptor,
{
type Item = [Option<T::Item>; N];

fn len(&self) -> usize {
self.len
}

fn fetch_item(&self, index: usize) -> Self::Item {
self.descriptors.each_ref().map(|desc| {
if index < desc.len() {
Some(desc.fetch_item(index))
} else {
None
}
})
}
}

struct ZipMinArraySourceDescriptor<T, const N: usize>
where
T: SourceDescriptor,
{
descriptors: [T; N],
len: usize,
}

impl<T, const N: usize> SourceCleanup for ZipMinArraySourceDescriptor<T, N>
where
T: SourceDescriptor,
{
const NEEDS_CLEANUP: bool = T::NEEDS_CLEANUP;

fn cleanup_item_range(&self, range: std::ops::Range<usize>) {
if Self::NEEDS_CLEANUP {
self.descriptors.each_ref().map(|desc| {
desc.cleanup_item_range(range.clone());
});
}
}
}

impl<T, const N: usize> SourceDescriptor for ZipMinArraySourceDescriptor<T, N>
where
T: SourceDescriptor,
{
type Item = [T::Item; N];

fn len(&self) -> usize {
self.len
}

fn fetch_item(&self, index: usize) -> Self::Item {
self.descriptors
.each_ref()
.map(|desc| desc.fetch_item(index))
}
}

impl<T, const N: usize> Drop for ZipMinArraySourceDescriptor<T, N>
where
T: SourceDescriptor,
{
fn drop(&mut self) {
if Self::NEEDS_CLEANUP {
self.descriptors.each_ref().map(|desc| {
desc.cleanup_item_range(self.len..desc.len());
});
}
}
}

#[cfg(test)]
mod test {
#[test]
Expand Down Expand Up @@ -420,4 +634,17 @@ mod test {
assert_eq!(min_of!((3, 1, 2), 0, 1, 2), 1);
assert_eq!(min_of!((3, 2, 1), 0, 1, 2), 1);
}

#[test]
fn max_of() {
assert_eq!(max_of!((1, 2), 0, 1), 2);
assert_eq!(max_of!((2, 1), 0, 1), 2);

assert_eq!(max_of!((1, 2, 3), 0, 1, 2), 3);
assert_eq!(max_of!((1, 3, 2), 0, 1, 2), 3);
assert_eq!(max_of!((2, 1, 3), 0, 1, 2), 3);
assert_eq!(max_of!((2, 3, 1), 0, 1, 2), 3);
assert_eq!(max_of!((3, 1, 2), 0, 1, 2), 3);
assert_eq!(max_of!((3, 2, 1), 0, 1, 2), 3);
}
}
Loading

0 comments on commit cd45976

Please sign in to comment.