Skip to content

Commit

Permalink
feat: [WIP] adopt DaveConsensus contract
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenctw committed Sep 7, 2024
1 parent ab101d7 commit ed2f1a8
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 106 deletions.
2 changes: 2 additions & 0 deletions cartesi-rollups/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ repository = "https://github.com/cartesi/dave"

[workspace.dependencies]
rollups-blockchain-reader = { version = "0.1", path = "blockchain-reader" }
rollups-dave-runner = { version = "0.1", path = "dave-runner" }
rollups-epoch-manager = { version = "0.1", path = "epoch-manager" }
rollups-machine-runner = { version = "0.1", path = "machine-runner" }
rollups-state-manager = { version = "0.1", path = "state-manager" }
Expand All @@ -33,6 +34,7 @@ cartesi-dave-arithmetic = { path = "../../common-rs/arithmetic" }
cartesi-dave-contracts = { path = "../contract-bindings" }
cartesi-dave-merkle = { path = "../../common-rs/merkle" }
cartesi-prt-core = { path = "../../prt/prt-rs/core" }
cartesi-prt-compute = { path = "../../prt/prt-rs/compute" }

alloy = { version = "0.3.1", features = ["sol-types", "contract", "network", "reqwest", "signers", "signer-local"] }
anyhow = "1.0"
Expand Down
133 changes: 63 additions & 70 deletions cartesi-rollups/node/blockchain-reader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,18 @@ use alloy_rpc_types_eth::Topic;
use async_recursion::async_recursion;
use clap::Parser;
use error::BlockchainReaderError;
use num_traits::cast::ToPrimitive;
use std::{
marker::{Send, Sync},
str::FromStr,
sync::Arc,
time::Duration,
};

use cartesi_dave_contracts::daveconsensus::DaveConsensus::EpochSealed;
use cartesi_rollups_contracts::inputbox::InputBox::InputAdded;
use rollups_state_manager::{Epoch, Input, InputId, StateManager};

/// this is a placeholder for the non-existing epoch event
// #[derive(EthEvent)]
// #[ethevent(name = "EpochSealed", abi = "EpochSealed(uint256,uint256)")]
// pub struct EpochSealedFilter {
// #[ethevent(indexed)]
// pub epoch_index: ::ethers::core::types::U256,
// #[ethevent(indexed)]
// pub input_count: ::ethers::core::types::U256,
// }

#[derive(Debug, Clone, Parser)]
#[command(name = "cartesi_rollups_config")]
#[command(about = "Addresses of Cartesi Rollups")]
Expand All @@ -58,7 +50,7 @@ pub struct BlockchainReader<SM: StateManager> {
prev_block: u64,
provider: PartitionProvider,
input_reader: EventReader<InputAdded>,
epoch_reader: EventReader<InputAdded>,
epoch_reader: EventReader<EpochSealed>,
sleep_duration: Duration,
}

Expand All @@ -85,7 +77,7 @@ where
prev_block,
provider: partition_provider,
input_reader: EventReader::<InputAdded>::new(),
epoch_reader: EventReader::<InputAdded>::new(),
epoch_reader: EventReader::<EpochSealed>::new(),
sleep_duration: Duration::from_secs(sleep_duration),
})
}
Expand Down Expand Up @@ -119,32 +111,21 @@ where
prev_block: u64,
current_block: u64,
) -> Result<(Vec<Input>, Vec<Epoch>), SM> {
let prev_sealed_epoch = self
.state_manager
.epoch_count()
.map_err(|e| BlockchainReaderError::StateManagerError(e))?;

// read epochs from blockchain
let epochs: Vec<Epoch> = self
// read sealed epochs from blockchain
let sealed_epochs: Vec<Epoch> = self
.collect_sealed_epochs(prev_block, current_block)
.await?;

let latest_sealed_epoch = match epochs.last() {
Some(e) => e.epoch_number,
None => prev_sealed_epoch,
};

// read inputs from blockchain
let inputs = self
.collect_inputs(
prev_block,
current_block,
prev_sealed_epoch,
latest_sealed_epoch,
sealed_epochs.iter().collect::<Vec<&Epoch>>().into_iter(),
)
.await?;

Ok((inputs, epochs))
Ok((inputs, sealed_epochs))
}

async fn collect_sealed_epochs(
Expand All @@ -164,10 +145,17 @@ where
.await?
.iter()
.map(|e| Epoch {
epoch_number: 0,
input_count: 0,
// epoch_number: e.epoch_index.as_u64(),
// input_count: e.input_count.as_u64(),
epoch_number: e
.0
.epochNumber
.to_u64()
.expect("fail to convert epoch number"),
epoch_boundary: e
.0
.blockNumberUpperBound
.to_u64()
.expect("fail to convert epoch boundary"),
root_tournament: e.0.tournament.to_string(),
})
.collect())
}
Expand All @@ -176,11 +164,9 @@ where
&self,
prev_block: u64,
current_block: u64,
prev_sealed_epoch: u64,
latest_sealed_epoch: u64,
sealed_epochs_iter: impl Iterator<Item = &Epoch>,
) -> Result<Vec<Input>, SM> {
// read new inputs from blockchain
// collected inputs should belong to `prev_sealed_epoch` + 1 and/or later epochs
let input_events = self
.input_reader
.next(
Expand All @@ -192,45 +178,43 @@ where
)
.await?;

let last_input = self
.state_manager
.last_input()
.map_err(|e| BlockchainReaderError::StateManagerError(e))?;

let (mut next_input_index_in_epoch, mut last_epoch_number) = {
match last_input {
// continue inserting inputs from where last were left
Some(input) => (input.input_index_in_epoch + 1, input.epoch_number),
// first ever input for the application
None => (0, 0),
}
};

let mut inputs = vec![];
let input_events_len = input_events.len();
let mut input_events_iter = input_events.into_iter();

// all inputs from `prev_sealed_epoch` should be in database already because it's sealed in previous tick
for epoch_number in prev_sealed_epoch + 1..latest_sealed_epoch + 1 {
// iterate through newly sealed epochs
// get total input count submitted to the sealed epoch
let total_input_count_of_epoch = self
.state_manager
.epoch(epoch_number)
.map_err(|e| BlockchainReaderError::StateManagerError(e))?
.unwrap()
.input_count;
// get input count of epoch that currently exist in database
let current_input_count_of_epoch = self
.state_manager
.input_count(epoch_number)
.map_err(|e| BlockchainReaderError::StateManagerError(e))?;

// fill in the inputs of the sealed epoch
let mut input_events_iter = input_events.iter();
for epoch in sealed_epochs_iter {
// iterate through newly sealed epochs, fill in the inputs accordingly
let inputs_of_epoch = self
.construct_input_ids(
epoch_number,
current_input_count_of_epoch,
total_input_count_of_epoch,
epoch.epoch_number,
epoch.epoch_boundary,
&mut next_input_index_in_epoch,
&mut input_events_iter,
)
.await;

inputs.extend(inputs_of_epoch);
last_epoch_number = epoch.epoch_number + 1;
}

// all remaining inputs belong to an epoch that's not sealed yet
let inputs_of_epoch = self
.construct_input_ids(
latest_sealed_epoch + 1,
0,
(input_events_len - inputs.len()).try_into().unwrap(),
last_epoch_number,
u64::MAX,
&mut next_input_index_in_epoch,
&mut input_events_iter,
)
.await;
Expand All @@ -243,23 +227,32 @@ where
async fn construct_input_ids(
&self,
epoch_number: u64,
input_index_in_epoch_start: u64,
input_index_in_epoch_end: u64,
input_events_iter: &mut impl Iterator<Item = InputAdded>,
epoch_boundary: u64,
next_input_index_in_epoch: &mut u64,
input_events_iter: &mut impl Iterator<Item = (InputAdded, u64)>,
) -> Vec<Input> {
let mut inputs = vec![];

for input_index_in_epoch in input_index_in_epoch_start..input_index_in_epoch_end {
while input_events_iter
.peekable()
.peek()
.expect("fail to get peek next input")
.1
< epoch_boundary
{
let input = Input {
id: InputId {
epoch_number,
input_index_in_epoch,
input_index_in_epoch: *next_input_index_in_epoch,
},
data: input_events_iter.next().unwrap().input.to_vec(),
data: input_events_iter.next().unwrap().0.input.to_vec(),
};

*next_input_index_in_epoch += 1;
inputs.push(input);
}
// input index in epoch should be reset when a new epoch starts
*next_input_index_in_epoch = 0;

inputs
}
Expand All @@ -283,7 +276,7 @@ impl<E: SolEvent + Send + Sync> EventReader<E> {
prev_finalized: u64,
current_finalized: u64,
provider: &PartitionProvider,
) -> std::result::Result<Vec<E>, ProviderErrors> {
) -> std::result::Result<Vec<(E, Option<u64>)>, ProviderErrors> {
assert!(current_finalized > prev_finalized);

let logs = provider
Expand Down Expand Up @@ -320,7 +313,7 @@ impl PartitionProvider {
read_from: &Address,
start_block: u64,
end_block: u64,
) -> std::result::Result<Vec<E>, Vec<Error>> {
) -> std::result::Result<Vec<(E, Option<u64>)>, Vec<Error>> {
self.get_events_rec(topic1, read_from, start_block, end_block)
.await
}
Expand All @@ -332,7 +325,7 @@ impl PartitionProvider {
read_from: &Address,
start_block: u64,
end_block: u64,
) -> std::result::Result<Vec<E>, Vec<Error>> {
) -> std::result::Result<Vec<(E, Option<u64>)>, Vec<Error>> {
// TODO: partition log queries if range too large
let event = {
let mut e = Event::new_sol(&self.inner, read_from)
Expand All @@ -349,7 +342,7 @@ impl PartitionProvider {

match event.query().await {
Ok(l) => {
let logs = l.into_iter().map(|x| x.0).collect();
let logs = l.into_iter().map(|x| (x.0, x.1.block_number)).collect();

Ok(logs)
}
Expand Down
2 changes: 2 additions & 0 deletions cartesi-rollups/node/dave-rollups/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ repository = { workspace = true }

[dependencies]
rollups-blockchain-reader = { workspace = true }
rollups-dave-runner = { workspace = true }
rollups-epoch-manager = { workspace = true }
rollups-machine-runner = { workspace = true }
rollups-state-manager = { workspace = true }

cartesi-rollups-contracts = { workspace = true }
cartesi-prt-core = { workspace = true }
cartesi-prt-compute = { workspace = true }

alloy = { workspace = true }
anyhow = { workspace = true }
Expand Down
28 changes: 28 additions & 0 deletions cartesi-rollups/node/dave-rollups/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use alloy::primitives::Address;
use cartesi_prt_core::arena::BlockchainConfig;

use clap::Parser;
use dave_runner::DaveRunner;
use log::error;
use rollups_blockchain_reader::{AddressBook, BlockchainReader};
use rollups_dave_runner::DaveRunner;
use rollups_epoch_manager::EpochManager;
use rollups_machine_runner::MachineRunner;
use rollups_state_manager::persistent_state_access::PersistentStateAccess;
use rusqlite::config;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::task::{spawn, spawn_blocking};
Expand Down Expand Up @@ -54,6 +59,29 @@ pub fn create_blockchain_reader_task(
})
}

pub fn create_dave_runner_task(
state_manager: Arc<PersistentStateAccess>,
parameters: &DaveParameters,
) -> JoinHandle<()> {
let params = parameters.clone();

spawn_blocking(move || {
let mut dave_runner = DaveRunner::new(
&parameters.blockchain_config,
state_manager,
params.sleep_duration,
)
.inspect_err(|e| error!("{e}"))
.unwrap();

dave_runner
.start()
.await
.inspect_err(|e| error!("{e}"))
.unwrap();
})
}

pub fn create_epoch_manager_task(
state_manager: Arc<PersistentStateAccess>,
parameters: &DaveParameters,
Expand Down
6 changes: 4 additions & 2 deletions cartesi-rollups/node/dave-rollups/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ async fn main() -> Result<()> {
let blockchain_reader_task = create_blockchain_reader_task(state_manager.clone(), &parameters);
let epoch_manager_task = create_epoch_manager_task(state_manager.clone(), &parameters);
let machine_runner_task = create_machine_runner_task(state_manager.clone(), &parameters);
let dave_runner_task = create_dave_runner_task(state_manager.clone(), &parameters);

let (_blockchain_reader_res, _epoch_manager_res, _machine_runner_res) = futures::join!(
let (_blockchain_reader_res, _epoch_manager_res, _machine_runner_res, _dave_runner_res) = futures::join!(
blockchain_reader_task,
epoch_manager_task,
machine_runner_task
machine_runner_task,
dave_runner_task
);

Ok(())
Expand Down
14 changes: 14 additions & 0 deletions cartesi-rollups/node/dave-runner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "rollups-dave-runner"
version.workspace = true
authors.workspace = true
description.workspace = true
edition.workspace = true
homepage.workspace = true
license-file.workspace = true
readme.workspace = true
repository.workspace = true

[dependencies]
cartesi-prt-core = { workspace = true }
rollups-state-manager = { workspace = true }
Loading

0 comments on commit ed2f1a8

Please sign in to comment.