Skip to content
This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

Commit

Permalink
feat: fetch and validate from s3
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrohba1 committed Feb 27, 2024
1 parent c0de6bd commit b607899
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ env_logger = "0.11.2"
header_accumulator = { git = "ssh://[email protected]/semiotic-ai/header_accumulator.git", branch = "refactor" }
trin-validation = { git = "https://github.com/ethereum/trin.git", version = "0.1.0" }
tree_hash = "0.5.2"
object_store = { version = "0.9.0", features = ["gcp", "http"] }
object_store = { version = "0.9.0", features = ["gcp", "http", "aws"] }
log = "0.4.20"
dotenv = "0.15.0"
95 changes: 95 additions & 0 deletions src/bin/fetch-s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use dotenv::dotenv;
use header_accumulator::era_validator::era_validate;
use std::env;

use clap::Parser;
use decoder::{handle_buf, sf::ethereum::r#type::v2::Block};
use flat_head::{era_verifier::MAX_EPOCH_SIZE, utils::gen_dbin_filenames};
use object_store::{aws::AmazonS3Builder, path::Path, ObjectStore};

/// This program is intended for fetching
/// flat files from an FTP server and verifying them. It skips fetching files
/// that were already verified or are already present
/// Command-line arguments for the S3 fetch-and-verify binary.
///
/// The original help text described an "FTP server", but the program
/// builds an `AmazonS3Builder` client — the description now matches.
#[derive(Parser, Debug)]
#[command(version, about = "a flat files S3 fetch and verify", long_about = None)]
struct Args {
    /// epoch to start fetching flat files
    #[arg(short, long)]
    start_epoch: u64,

    /// epoch where flat files end
    #[arg(short, long)]
    end_epoch: u64,

    /// directly set an endpoint such as http://localhost:9000
    /// for local development or another S3-compatible API
    #[arg(short = 'p', long)]
    endpoint: Option<String>,
}

/// Reads the required environment variable `var_name` and returns its value.
///
/// On failure (variable unset or not valid unicode) the error is reported
/// and the process exits with a non-zero status — these variables carry
/// credentials/configuration with no sensible default.
fn handle_var(var_name: &str) -> String {
    match env::var(var_name) {
        Ok(value) => value,
        Err(e) => {
            // Report on stderr, not stdout, so the failure is not mixed
            // into (or lost from) the program's normal output stream.
            eprintln!("Error reading environment variable {}: {}", var_name, e);
            std::process::exit(1);
        }
    }
}

/// Entry point: streams flat files from an S3(-compatible) bucket and
/// validates them in epoch-sized batches.
///
/// Required environment variables (a `.env` file is honored via dotenv):
/// `AWS_REGION`, `BUCKET_NAME`, `ACCESS_KEY_ID`, `SECRET_KEY`.
#[tokio::main]
async fn main() {
    // Load a `.env` file if present; silently proceed if there isn't one.
    dotenv().ok();
    let args = Args::parse();

    // All four variables are mandatory — handle_var exits the process
    // if any is missing.
    let aws_region = handle_var("AWS_REGION");
    let bucket_name = handle_var("BUCKET_NAME");
    let access_key_id = handle_var("ACCESS_KEY_ID");
    let secret_key = handle_var("SECRET_KEY");

    // allow_http permits plain-HTTP endpoints (e.g. a local MinIO
    // instance); only relevant when a custom --endpoint is supplied.
    let mut builder = AmazonS3Builder::new()
        .with_region(aws_region)
        .with_bucket_name(bucket_name)
        .with_access_key_id(access_key_id)
        .with_secret_access_key(secret_key)
        .with_allow_http(true);

    // Optional override for local development or any S3-compatible API.
    if let Some(endpoint) = args.endpoint {
        builder = builder.with_endpoint(endpoint);
    }

    let s3 = builder.build().unwrap();

    // Names of the .dbin flat files covering [start_epoch, end_epoch].
    let file_names = gen_dbin_filenames(args.start_epoch, args.end_epoch);

    let mut blocks: Vec<Block> = Vec::new();
    for file_name in file_names {
        let path_string = format!("/{}", file_name);
        let path = Path::from(path_string);
        // NOTE(review): unwrap panics on any fetch error (missing object,
        // network failure) — consider retry/skip handling for robustness.
        let result = s3.get(&path).await.unwrap();

        let bytes = result.bytes().await.unwrap();

        // Use `as_ref` to get a &[u8] from `bytes` and pass it to `handle_buf`
        match handle_buf(bytes.as_ref()) {
            Ok(new_blocks) => {
                blocks.extend(new_blocks);
                // Handle the successfully decoded blocks
            }
            Err(e) => {
                // Decoding failure is logged and the file is skipped; the
                // accumulated `blocks` buffer is left untouched.
                log::error!("error: {:?}", e);
                // Handle the decoding error
            }
        }
        // NOTE(review): 8192 is presumably the same value as MAX_EPOCH_SIZE
        // (drained below) — if so, use the constant here too; verify.
        if blocks.len() >= 8192 {
            // Drain exactly one epoch's worth of blocks off the front.
            let epoch_blocks: Vec<Block> = blocks.drain(0..MAX_EPOCH_SIZE).collect();
            // NOTE(review): the same args.start_epoch/end_epoch are passed on
            // every drained batch — if era_validate expects the epoch number
            // of THIS batch, later batches would be validated against the
            // wrong epoch. Confirm against era_validate's contract.
            let valid_blocks = era_validate(
                epoch_blocks,
                None,
                args.start_epoch as usize,
                Some(args.end_epoch as usize),
            );
            println!("{:?} valid epochs", valid_blocks);
        }
    }
}

0 comments on commit b607899

Please sign in to comment.