Skip to content
This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

Commit

Permalink
feat(era_verifier): usage of channels to parallelize epoch validation
Browse files Browse the repository at this point in the history
  • Loading branch information
pedro bufulin committed Apr 1, 2024
1 parent 3c20bc4 commit 90b0c67
Showing 1 changed file with 48 additions and 24 deletions.
72 changes: 48 additions & 24 deletions src/era_verifier.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use futures::stream::{FuturesOrdered, StreamExt};
use header_accumulator::era_validator::era_validate;
use tokio::task;

use header_accumulator::types::ExtHeaderRecord;
use sf_protos::ethereum::r#type::v2::Block;
use tokio::sync::mpsc;

use crate::store;
pub const MAX_EPOCH_SIZE: usize = 8192;
Expand All @@ -18,31 +21,52 @@ pub async fn verify_eras(
decompress: Option<bool>,
) -> Result<Vec<usize>, anyhow::Error> {
let mut validated_epochs = Vec::new();
let (tx, mut rx) = mpsc::unbounded_channel();
for epoch in start_epoch..=end_epoch.unwrap_or(start_epoch + 1) {
let blocks = get_blocks_from_store(epoch, store_url, decompress).await?;
let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
.iter()
.cloned()
.map(|block| ExtHeaderRecord::try_from(&block))
.fold((Vec::new(), Vec::new()), |(mut succ, mut errs), res| {
match res {
Ok(header) => succ.push(header),
Err(e) => {
// Log the error or handle it as needed
eprintln!("Error converting block: {:?}", e);
errs.push(e);
}
};
(succ, errs)
});
let root = era_validate(
successful_headers,
master_acc_file,
epoch,
Some(epoch + 1),
None,
)?;
validated_epochs.extend(root);
let tx = tx.clone();
let store_url = store_url.clone();
let decompress = decompress.clone();
let master_acc_file = Some(master_acc_file.unwrap().clone());

task::spawn(async move {
match get_blocks_from_store(epoch, &store_url, decompress).await {
Ok(blocks) => {
let (successful_headers, _): (Vec<_>, Vec<_>) = blocks
.iter()
.map(ExtHeaderRecord::try_from)
.fold((Vec::new(), Vec::new()), |(mut succ, mut errs), res| {
match res {
Ok(header) => succ.push(header),
Err(e) => {
// Log the error or handle it as needed
eprintln!("Error converting block: {:?}", e);
errs.push(e);
}
};
(succ, errs)
});
let valid_epochs = era_validate(
successful_headers,
master_acc_file.as_ref(),
epoch,
Some(epoch + 1),
None,
)
.unwrap();

let _ = tx.send(valid_epochs);
}
Err(e) => eprintln!("Error fetching blocks for epoch {}: {:?}", epoch, e),
}
});
}

// Drop the original sender to close the channel once all senders are dropped
drop(tx);

// Process blocks as they arrive
while let Some((epochs)) = rx.recv().await {
validated_epochs.extend(epochs);
}

Ok(validated_epochs)
Expand Down

0 comments on commit 90b0c67

Please sign in to comment.