Skip to content

Commit

Permalink
test(papyrus_p2p_sync): use run_test in transaction test
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Jan 12, 2025
1 parent 0c9e730 commit d133537
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 153 deletions.
2 changes: 1 addition & 1 deletion crates/blockifier/cairo_native
Submodule cairo_native updated 109 files
1 change: 0 additions & 1 deletion crates/papyrus_p2p_sync/src/client/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ pub enum Action {
SendStateDiff(DataOrFin<StateDiffChunk>),
/// Send a transaction as a response to a query we got from ReceiveQuery. Will panic if didn't
/// call ReceiveQuery with DataType::Transaction before.
#[allow(dead_code)]
SendTransaction(DataOrFin<FullTransaction>),
/// Send a class as a response to a query we got from ReceiveQuery. Will panic if didn't
/// call ReceiveQuery with DataType::Class before.
Expand Down
270 changes: 119 additions & 151 deletions crates/papyrus_p2p_sync/src/client/transaction_test.rs
Original file line number Diff line number Diff line change
@@ -1,181 +1,149 @@
use std::cmp::min;
use std::collections::HashMap;

use futures::{FutureExt, StreamExt};
use papyrus_protobuf::sync::{
BlockHashOrNumber,
DataOrFin,
Direction,
Query,
SignedBlockHeader,
TransactionQuery,
};
use futures::FutureExt;
use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query};
use papyrus_storage::body::BodyStorageReader;
use papyrus_test_utils::get_test_body;
use starknet_api::block::{BlockBody, BlockHeader, BlockHeaderWithoutHash, BlockNumber};
use starknet_api::transaction::FullTransaction;
use papyrus_test_utils::{get_rng, get_test_body};
use starknet_api::block::{BlockBody, BlockNumber};
use starknet_api::transaction::{FullTransaction, TransactionHash};

use super::test_utils::{
create_block_hashes_and_signatures,
setup,
TestArgs,
HEADER_QUERY_LENGTH,
random_header,
run_test,
wait_for_marker,
Action,
DataType,
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TRANSACTION_QUERY_LENGTH,
WAIT_PERIOD_FOR_NEW_DATA,
TIMEOUT_FOR_TEST,
};
use crate::client::test_utils::{wait_for_marker, DataType, TIMEOUT_FOR_TEST};

#[tokio::test]
async fn transaction_basic_flow() {
let TestArgs {
p2p_sync,
storage_reader,
mut mock_header_response_manager,
mut mock_transaction_response_manager,
// The test will fail if we drop these
mock_state_diff_response_manager: _mock_state_diff_response_manager,
mock_class_response_manager: _mock_class_responses_manager,
..
} = setup();

const NUM_TRANSACTIONS_PER_BLOCK: u64 = 6;
let block_hashes_and_signatures =
create_block_hashes_and_signatures(HEADER_QUERY_LENGTH.try_into().unwrap());
let BlockBody { transactions, transaction_outputs, transaction_hashes } = get_test_body(
(NUM_TRANSACTIONS_PER_BLOCK * HEADER_QUERY_LENGTH).try_into().unwrap(),
None,
None,
None,
);

// Create a future that will receive queries, send responses and validate the results.
let parse_queries_future = async move {
// We wait for the state diff sync to see that there are no headers and start sleeping
tokio::time::sleep(SLEEP_DURATION_TO_LET_SYNC_ADVANCE).await;

// Check that before we send headers there is no transaction query.
assert!(mock_transaction_response_manager.next().now_or_never().is_none());
let mut mock_header_responses_manager = mock_header_response_manager.next().await.unwrap();

// Send headers for entire query.
for (i, (block_hash, block_signature)) in block_hashes_and_signatures.iter().enumerate() {
// Send responses
mock_header_responses_manager
.send_response(DataOrFin(Some(SignedBlockHeader {
block_header: BlockHeader {
block_hash: *block_hash,
block_header_without_hash: BlockHeaderWithoutHash {
block_number: BlockNumber(i.try_into().unwrap()),
..Default::default()
},
n_transactions: NUM_TRANSACTIONS_PER_BLOCK.try_into().unwrap(),
state_diff_length: Some(0),
..Default::default()
},
signatures: vec![*block_signature],
})))
.await
.unwrap();
const NUM_BLOCKS: u64 = 5;
const TRANSACTION_QUERY_LENGTH: u64 = 2;

let mut rng = get_rng();

let block_bodies = (0..NUM_BLOCKS)
// TODO(shahak): remove Some(0) once we separate events from transactions correctly.
.map(|i| {
let mut body = get_test_body(i.try_into().unwrap(), Some(0), None, None);
// get_test_body returns transaction hash in the range 0..num_transactions. We want to
// avoid collisions in transaction hash.
for transaction_hash in &mut body.transaction_hashes {
*transaction_hash = TransactionHash(transaction_hash.0 + NUM_BLOCKS * i);
}
body
})
.collect::<Vec<_>>();

let mut actions = vec![
Action::RunP2pSync,
// We already validate the header query content in other tests.
Action::ReceiveQuery(Box::new(|_query| ()), DataType::Header),
];

// Send headers with corresponding transaction length
for (i, block_body) in block_bodies.iter().enumerate() {
actions.push(Action::SendHeader(DataOrFin(Some(random_header(
&mut rng,
BlockNumber(i.try_into().unwrap()),
None,
Some(block_body.transactions.len()),
)))));
}
actions.push(Action::SendHeader(DataOrFin(None)));

// Send transactions for each block and then validate they were written
for (i, BlockBody { transactions, transaction_outputs, transaction_hashes }) in
block_bodies.into_iter().enumerate()
{
let i = u64::try_from(i).unwrap();
// If this block starts a new transaction query, receive the new query.
if i % TRANSACTION_QUERY_LENGTH == 0 {
let limit = min(TRANSACTION_QUERY_LENGTH, NUM_BLOCKS - i).try_into().unwrap();
actions.push(Action::ReceiveQuery(
Box::new(move |query| {
assert_eq!(
query,
Query {
start_block: BlockHashOrNumber::Number(BlockNumber(i)),
direction: Direction::Forward,
limit,
step: 1,
}
)
}),
DataType::Transaction,
));
}

wait_for_marker(
DataType::Header,
&storage_reader,
BlockNumber(HEADER_QUERY_LENGTH),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TIMEOUT_FOR_TEST,
)
.await;

// Simulate time has passed so that transaction sync will resend query after it waited for
// new header
tokio::time::pause();
tokio::time::advance(WAIT_PERIOD_FOR_NEW_DATA).await;
tokio::time::resume();

for start_block_number in
(0..HEADER_QUERY_LENGTH).step_by(TRANSACTION_QUERY_LENGTH.try_into().unwrap())
for (transaction, (transaction_output, transaction_hash)) in transactions
.iter()
.cloned()
.zip(transaction_outputs.iter().cloned().zip(transaction_hashes.iter().cloned()))
{
let num_blocks_in_query =
min(TRANSACTION_QUERY_LENGTH, HEADER_QUERY_LENGTH - start_block_number);

// Receive query and validate it.
let mut mock_transaction_responses_manager =
mock_transaction_response_manager.next().await.unwrap();
assert_eq!(
*mock_transaction_responses_manager.query(),
Ok(TransactionQuery(Query {
start_block: BlockHashOrNumber::Number(BlockNumber(start_block_number)),
direction: Direction::Forward,
limit: num_blocks_in_query,
step: 1,
})),
"If the limit of the query is too low, try to increase \
SLEEP_DURATION_TO_LET_SYNC_ADVANCE",
);

for block_number in start_block_number..(start_block_number + num_blocks_in_query) {
let start_transaction_number = block_number * NUM_TRANSACTIONS_PER_BLOCK;
for transaction_number in start_transaction_number
..(start_transaction_number + NUM_TRANSACTIONS_PER_BLOCK)
{
let transaction_idx = usize::try_from(transaction_number).unwrap();
let transaction = transactions[transaction_idx].clone();
let transaction_output = transaction_outputs[transaction_idx].clone();
let transaction_hash = transaction_hashes[transaction_idx];

mock_transaction_responses_manager
.send_response(DataOrFin(Some(FullTransaction {
transaction,
transaction_output,
transaction_hash,
})))
.await
.unwrap();
// Check that before the last transaction was sent, the transactions aren't written.
actions.push(Action::CheckStorage(Box::new(move |reader| {
async move {
assert_eq!(
u64::try_from(i).unwrap(),
reader.begin_ro_txn().unwrap().get_body_marker().unwrap().0
);
}
.boxed()
})));

actions.push(Action::SendTransaction(DataOrFin(Some(FullTransaction {
transaction,
transaction_output,
transaction_hash,
}))));
}

// Check responses were written to the storage. This way we make sure that the sync
// writes to the storage each response it receives before all query responses were
// sent.
let block_number = BlockNumber(block_number);
// Check that a block's transactions are written before the entire query finished.
actions.push(Action::CheckStorage(Box::new(move |reader| {
async move {
let block_number = BlockNumber(i.try_into().unwrap());
wait_for_marker(
DataType::Transaction,
&storage_reader,
&reader,
block_number.unchecked_next(),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TIMEOUT_FOR_TEST,
)
.await;

let txn = storage_reader.begin_ro_txn().unwrap();

// TODO: Verify that the transaction outputs are equal aswell. currently is buggy.
let storage_transactions =
let txn = reader.begin_ro_txn().unwrap();
let actual_transactions =
txn.get_block_transactions(block_number).unwrap().unwrap();
let storage_transaction_hashes =
// TODO(alonl): Uncomment this once we fix protobuf conversion for receipt
// builtins.
// let actual_transaction_outputs =
// txn.get_block_transaction_outputs(block_number).unwrap().unwrap();
let actual_transaction_hashes =
txn.get_block_transaction_hashes(block_number).unwrap().unwrap();
for i in 0..NUM_TRANSACTIONS_PER_BLOCK {
let idx: usize = usize::try_from(i + start_transaction_number).unwrap();
assert_eq!(
storage_transactions[usize::try_from(i).unwrap()],
transactions[idx]
);
assert_eq!(
storage_transaction_hashes[usize::try_from(i).unwrap()],
transaction_hashes[idx]
);
}
assert_eq!(actual_transactions, transactions);
// TODO(alonl): Uncomment this once we fix protobuf conversion for receipt
// builtins.
// assert_eq!(actual_transaction_outputs, transaction_outputs);
assert_eq!(actual_transaction_hashes, transaction_hashes);
}
.boxed()
})));

mock_transaction_responses_manager.send_response(DataOrFin(None)).await.unwrap();
if (i + 1) % TRANSACTION_QUERY_LENGTH == 0 || i + 1 == NUM_BLOCKS {
actions.push(Action::SendTransaction(DataOrFin(None)));
}
};

tokio::select! {
sync_result = p2p_sync.run() => {
sync_result.unwrap();
panic!("P2P sync aborted with no failure.");
}
_ = parse_queries_future => {}
}

run_test(
HashMap::from([
(DataType::Header, NUM_BLOCKS),
(DataType::Transaction, TRANSACTION_QUERY_LENGTH),
]),
actions,
)
.await;
}

0 comments on commit d133537

Please sign in to comment.