Skip to content

Commit

Permalink
feat: maybe_parallel_map util function
Browse files Browse the repository at this point in the history
  • Loading branch information
pakhomov-dfinity committed Sep 6, 2024
1 parent 656d7a6 commit 1fb6a8f
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 118 deletions.
86 changes: 34 additions & 52 deletions rs/state_layout/src/state_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use ic_types::{
CanisterLog, ComputeAllocation, Cycles, ExecutionRound, Height, LongExecutionMode,
MemoryAllocation, NumInstructions, PrincipalId, SnapshotId, Time,
};
use ic_utils::thread::parallel_map;
use ic_utils::thread::maybe_parallel_map;
use ic_wasm_types::{CanisterModule, WasmHash};
use prometheus::{Histogram, IntCounterVec};
use std::collections::{BTreeMap, BTreeSet};
Expand Down Expand Up @@ -2637,25 +2637,17 @@ fn sync_and_mark_files_readonly(
#[allow(unused)] log: &ReplicaLogger,
path: &Path,
#[allow(unused)] metrics: &StateLayoutMetrics,
thread_pool: Option<&mut scoped_threadpool::Pool>,
mut thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> std::io::Result<()> {
let paths = dir_list_recursive(path)?;
if let Some(thread_pool) = thread_pool {
let results = parallel_map(thread_pool, paths.iter(), |p| {
mark_readonly_if_file(p)?;
#[cfg(not(target_os = "linux"))]
sync_path(p)?;
Ok::<(), std::io::Error>(())
});

results.into_iter().try_for_each(identity)?;
} else {
for p in paths {
mark_readonly_if_file(&p)?;
#[cfg(not(target_os = "linux"))]
sync_path(p)?;
}
}
let results = maybe_parallel_map(&mut thread_pool, paths.iter(), |p| {
mark_readonly_if_file(p)?;
#[cfg(not(target_os = "linux"))]
sync_path(p)?;
Ok::<(), std::io::Error>(())
});

results.into_iter().try_for_each(identity)?;
#[cfg(target_os = "linux")]
{
let f = std::fs::File::open(path)?;
Expand Down Expand Up @@ -2694,7 +2686,7 @@ fn copy_recursively<P>(
root_dst: &Path,
fsync: FSync,
file_copy_instruction: P,
thread_pool: Option<&mut scoped_threadpool::Pool>,
mut thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> std::io::Result<()>
where
P: Fn(&Path) -> CopyInstruction,
Expand All @@ -2719,39 +2711,29 @@ where
// directory that is wiped out on replica start, so we don't care much
// about this temporary directory being properly synced.
std::fs::create_dir_all(root_dst)?;
match thread_pool {
Some(thread_pool) => {
let results = parallel_map(thread_pool, copy_plan.create_and_sync_dir.iter(), |op| {
// We keep directories writeable to make sure we can rename
// them or delete the files.
std::fs::create_dir_all(&op.dst)
});
results.into_iter().try_for_each(identity)?;
let results = parallel_map(thread_pool, copy_plan.copy_and_sync_file.iter(), |op| {
copy_checkpoint_file(log, metrics, &op.src, &op.dst, op.dst_permissions, fsync)
});
results.into_iter().try_for_each(identity)?;
if let FSync::Yes = fsync {
let results =
parallel_map(thread_pool, copy_plan.create_and_sync_dir.iter(), |op| {
sync_path(&op.dst)
});
results.into_iter().try_for_each(identity)?;
}
}
None => {
for op in copy_plan.create_and_sync_dir.iter() {
std::fs::create_dir_all(&op.dst)?;
}
for op in copy_plan.copy_and_sync_file.into_iter() {
copy_checkpoint_file(log, metrics, &op.src, &op.dst, op.dst_permissions, fsync)?;
}
if let FSync::Yes = fsync {
for op in copy_plan.create_and_sync_dir.iter() {
sync_path(&op.dst)?;
}
}
}
let results = maybe_parallel_map(
&mut thread_pool,
copy_plan.create_and_sync_dir.iter(),
|op| {
// We keep directories writeable to make sure we can rename
// them or delete the files.
std::fs::create_dir_all(&op.dst)
},
);
results.into_iter().try_for_each(identity)?;
let results = maybe_parallel_map(
&mut thread_pool,
copy_plan.copy_and_sync_file.iter(),
|op| copy_checkpoint_file(log, metrics, &op.src, &op.dst, op.dst_permissions, fsync),
);
results.into_iter().try_for_each(identity)?;
if let FSync::Yes = fsync {
let results = maybe_parallel_map(
&mut thread_pool,
copy_plan.create_and_sync_dir.iter(),
|op| sync_path(&op.dst),
);
results.into_iter().try_for_each(identity)?;
}
Ok(())
}
Expand Down
97 changes: 31 additions & 66 deletions rs/state_manager/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ic_state_layout::{
};
use ic_types::batch::RawQueryStats;
use ic_types::{CanisterTimer, Height, Time};
use ic_utils::thread::parallel_map;
use ic_utils::thread::maybe_parallel_map;
use std::collections::BTreeMap;
use std::convert::TryFrom;
use std::sync::Arc;
Expand Down Expand Up @@ -250,39 +250,20 @@ pub fn load_checkpoint(

let mut canister_states = BTreeMap::new();
let canister_ids = checkpoint_layout.canister_ids()?;
match thread_pool {
Some(ref mut thread_pool) => {
let results = parallel_map(thread_pool, canister_ids.iter(), |canister_id| {
load_canister_state_from_checkpoint(
checkpoint_layout,
canister_id,
Arc::clone(&fd_factory),
metrics,
)
});

for canister_state in results.into_iter() {
let (canister_state, durations) = canister_state?;
canister_states
.insert(canister_state.system_state.canister_id(), canister_state);

durations.apply(metrics);
}
}
None => {
for canister_id in canister_ids.iter() {
let (canister_state, durations) = load_canister_state_from_checkpoint(
checkpoint_layout,
canister_id,
Arc::clone(&fd_factory),
metrics,
)?;
canister_states
.insert(canister_state.system_state.canister_id(), canister_state);

durations.apply(metrics);
}
}
let results = maybe_parallel_map(&mut thread_pool, canister_ids.iter(), |canister_id| {
load_canister_state_from_checkpoint(
checkpoint_layout,
canister_id,
Arc::clone(&fd_factory),
metrics,
)
});

for canister_state in results.into_iter() {
let (canister_state, durations) = canister_state?;
canister_states.insert(canister_state.system_state.canister_id(), canister_state);

durations.apply(metrics);
}

canister_states
Expand All @@ -296,38 +277,22 @@ pub fn load_checkpoint(

let mut canister_snapshots = BTreeMap::new();
let snapshot_ids = checkpoint_layout.snapshot_ids()?;
match thread_pool {
Some(thread_pool) => {
let results = parallel_map(thread_pool, snapshot_ids.iter(), |snapshot_id| {
(
**snapshot_id,
load_snapshot_from_checkpoint(
checkpoint_layout,
snapshot_id,
Arc::clone(&fd_factory),
),
)
});

for (snapshot_id, canister_snapshot) in results.into_iter() {
let (canister_snapshot, durations) = canister_snapshot?;
canister_snapshots.insert(snapshot_id, Arc::new(canister_snapshot));

durations.apply(metrics);
}
}
None => {
for snapshot_id in snapshot_ids.iter() {
let (canister_snapshot, durations) = load_snapshot_from_checkpoint(
checkpoint_layout,
snapshot_id,
Arc::clone(&fd_factory),
)?;
canister_snapshots.insert(*snapshot_id, Arc::new(canister_snapshot));

durations.apply(metrics);
}
}
let results = maybe_parallel_map(&mut thread_pool, snapshot_ids.iter(), |snapshot_id| {
(
**snapshot_id,
load_snapshot_from_checkpoint(
checkpoint_layout,
snapshot_id,
Arc::clone(&fd_factory),
),
)
});

for (snapshot_id, canister_snapshot) in results.into_iter() {
let (canister_snapshot, durations) = canister_snapshot?;
canister_snapshots.insert(snapshot_id, Arc::new(canister_snapshot));

durations.apply(metrics);
}

CanisterSnapshots::new(canister_snapshots)
Expand Down
18 changes: 18 additions & 0 deletions rs/utils/src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ where
.collect()
}

/// Maps `f` over `items`, delegating to `parallel_map` when a thread pool
/// is provided and mapping sequentially on the calling thread otherwise.
///
/// The `&mut Option<&mut Pool>` shape lets callers thread the same optional
/// pool through several consecutive calls without moving it.
pub fn maybe_parallel_map<S, T, I, F>(
    thread_pool: &mut Option<&mut scoped_threadpool::Pool>,
    items: I,
    f: F,
) -> Vec<T>
where
    S: Send,
    T: Send,
    I: Iterator<Item = S>,
    F: Fn(&S) -> T + Send + Copy,
{
    if let Some(pool) = thread_pool.as_deref_mut() {
        parallel_map(pool, items, f)
    } else {
        items.map(|item| f(&item)).collect()
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 1fb6a8f

Please sign in to comment.