diff --git a/crates/blockifier/cairo_native b/crates/blockifier/cairo_native index 76e83965d3..185e94bce3 160000 --- a/crates/blockifier/cairo_native +++ b/crates/blockifier/cairo_native @@ -1 +1 @@ -Subproject commit 76e83965d3bf1252eb6c68200a3accd5fd1ec004 +Subproject commit 185e94bce3e380db988f98d96b4ddcb3e6c044bc diff --git a/crates/papyrus_p2p_sync/src/client/test_utils.rs b/crates/papyrus_p2p_sync/src/client/test_utils.rs index 1d672b1d56..a594d1b921 100644 --- a/crates/papyrus_p2p_sync/src/client/test_utils.rs +++ b/crates/papyrus_p2p_sync/src/client/test_utils.rs @@ -150,7 +150,6 @@ pub enum Action { SendStateDiff(DataOrFin), /// Send a transaction as a response to a query we got from ReceiveQuery. Will panic if didn't /// call ReceiveQuery with DataType::Transaction before. - #[allow(dead_code)] SendTransaction(DataOrFin), /// Send a class as a response to a query we got from ReceiveQuery. Will panic if didn't /// call ReceiveQuery with DataType::Class before. diff --git a/crates/papyrus_p2p_sync/src/client/transaction_test.rs b/crates/papyrus_p2p_sync/src/client/transaction_test.rs index 443cd841cc..ec265ef8d4 100644 --- a/crates/papyrus_p2p_sync/src/client/transaction_test.rs +++ b/crates/papyrus_p2p_sync/src/client/transaction_test.rs @@ -1,181 +1,146 @@ use std::cmp::min; +use std::collections::HashMap; -use futures::{FutureExt, StreamExt}; -use papyrus_protobuf::sync::{ - BlockHashOrNumber, - DataOrFin, - Direction, - Query, - SignedBlockHeader, - TransactionQuery, -}; +use futures::FutureExt; +use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query}; use papyrus_storage::body::BodyStorageReader; -use papyrus_test_utils::get_test_body; -use starknet_api::block::{BlockBody, BlockHeader, BlockHeaderWithoutHash, BlockNumber}; -use starknet_api::transaction::FullTransaction; +use papyrus_test_utils::{get_rng, get_test_body}; +use starknet_api::block::{BlockBody, BlockNumber}; +use starknet_api::transaction::{FullTransaction, TransactionHash}; use super::test_utils::{ - create_block_hashes_and_signatures, - setup, - TestArgs, - HEADER_QUERY_LENGTH, + random_header, + run_test, + wait_for_marker, + Action, + DataType, SLEEP_DURATION_TO_LET_SYNC_ADVANCE, - TRANSACTION_QUERY_LENGTH, - WAIT_PERIOD_FOR_NEW_DATA, + TIMEOUT_FOR_TEST, }; -use crate::client::test_utils::{wait_for_marker, DataType, TIMEOUT_FOR_TEST}; #[tokio::test] async fn transaction_basic_flow() { - let TestArgs { - p2p_sync, - storage_reader, - mut mock_header_response_manager, - mut mock_transaction_response_manager, - // The test will fail if we drop these - mock_state_diff_response_manager: _mock_state_diff_response_manager, - mock_class_response_manager: _mock_class_responses_manager, - .. - } = setup(); - - const NUM_TRANSACTIONS_PER_BLOCK: u64 = 6; - let block_hashes_and_signatures = - create_block_hashes_and_signatures(HEADER_QUERY_LENGTH.try_into().unwrap()); - let BlockBody { transactions, transaction_outputs, transaction_hashes } = get_test_body( - (NUM_TRANSACTIONS_PER_BLOCK * HEADER_QUERY_LENGTH).try_into().unwrap(), - None, - None, - None, - ); - - // Create a future that will receive queries, send responses and validate the results. - let parse_queries_future = async move { - // We wait for the state diff sync to see that there are no headers and start sleeping - tokio::time::sleep(SLEEP_DURATION_TO_LET_SYNC_ADVANCE).await; - - // Check that before we send headers there is no transaction query. - assert!(mock_transaction_response_manager.next().now_or_never().is_none()); - let mut mock_header_responses_manager = mock_header_response_manager.next().await.unwrap(); - - // Send headers for entire query. - for (i, (block_hash, block_signature)) in block_hashes_and_signatures.iter().enumerate() { - // Send responses - mock_header_responses_manager - .send_response(DataOrFin(Some(SignedBlockHeader { - block_header: BlockHeader { - block_hash: *block_hash, - block_header_without_hash: BlockHeaderWithoutHash { - block_number: BlockNumber(i.try_into().unwrap()), - ..Default::default() - }, - n_transactions: NUM_TRANSACTIONS_PER_BLOCK.try_into().unwrap(), - state_diff_length: Some(0), - ..Default::default() - }, - signatures: vec![*block_signature], - }))) - .await - .unwrap(); + const NUM_BLOCKS: u64 = 5; + const TRANSACTION_QUERY_LENGTH: u64 = 2; + + let mut rng = get_rng(); + + let block_bodies = (0..NUM_BLOCKS) + // TODO(shahak): remove Some(0) once we separate events from transactions correctly. + .map(|i| { + let mut body = get_test_body(i.try_into().unwrap(), Some(0), None, None); + // get_test_body returns transaction hash in the range 0..num_transactions. We want to + // avoid collisions in transaction hash. + for transaction_hash in &mut body.transaction_hashes { + *transaction_hash = TransactionHash(transaction_hash.0 + NUM_BLOCKS * i); + } + body + }) + .collect::>(); + + let mut actions = vec![ + Action::RunP2pSync, + // We already validate the header query content in other tests. + Action::ReceiveQuery(Box::new(|_query| ()), DataType::Header), + ]; + + // Send headers with corresponding transaction length + for (i, block_body) in block_bodies.iter().enumerate() { + actions.push(Action::SendHeader(DataOrFin(Some(random_header( + &mut rng, + BlockNumber(i.try_into().unwrap()), + None, + Some(block_body.transactions.len()), + ))))); + } + actions.push(Action::SendHeader(DataOrFin(None))); + + // Send transactions for each block and then validate they were written + for (i, BlockBody { transactions, transaction_outputs, transaction_hashes }) in + block_bodies.into_iter().enumerate() + { + let i = u64::try_from(i).unwrap(); + // If this block starts a new transaction query, receive the new query. + if i % TRANSACTION_QUERY_LENGTH == 0 { + let limit = min(TRANSACTION_QUERY_LENGTH, NUM_BLOCKS - i); + actions.push(Action::ReceiveQuery( + Box::new(move |query| { + assert_eq!( + query, + Query { + start_block: BlockHashOrNumber::Number(BlockNumber(i)), + direction: Direction::Forward, + limit, + step: 1, + } + ) + }), + DataType::Transaction, + )); } - wait_for_marker( - DataType::Header, - &storage_reader, - BlockNumber(HEADER_QUERY_LENGTH), - SLEEP_DURATION_TO_LET_SYNC_ADVANCE, - TIMEOUT_FOR_TEST, - ) - .await; - - // Simulate time has passed so that transaction sync will resend query after it waited for - // new header - tokio::time::pause(); - tokio::time::advance(WAIT_PERIOD_FOR_NEW_DATA).await; - tokio::time::resume(); - - for start_block_number in - (0..HEADER_QUERY_LENGTH).step_by(TRANSACTION_QUERY_LENGTH.try_into().unwrap()) + for (transaction, (transaction_output, transaction_hash)) in transactions + .iter() + .cloned() + .zip(transaction_outputs.iter().cloned().zip(transaction_hashes.iter().cloned())) { - let num_blocks_in_query = - min(TRANSACTION_QUERY_LENGTH, HEADER_QUERY_LENGTH - start_block_number); - - // Receive query and validate it. - let mut mock_transaction_responses_manager = - mock_transaction_response_manager.next().await.unwrap(); - assert_eq!( - *mock_transaction_responses_manager.query(), - Ok(TransactionQuery(Query { - start_block: BlockHashOrNumber::Number(BlockNumber(start_block_number)), - direction: Direction::Forward, - limit: num_blocks_in_query, - step: 1, - })), - "If the limit of the query is too low, try to increase \ - SLEEP_DURATION_TO_LET_SYNC_ADVANCE", - ); - - for block_number in start_block_number..(start_block_number + num_blocks_in_query) { - let start_transaction_number = block_number * NUM_TRANSACTIONS_PER_BLOCK; - for transaction_number in start_transaction_number - ..(start_transaction_number + NUM_TRANSACTIONS_PER_BLOCK) - { - let transaction_idx = usize::try_from(transaction_number).unwrap(); - let transaction = transactions[transaction_idx].clone(); - let transaction_output = transaction_outputs[transaction_idx].clone(); - let transaction_hash = transaction_hashes[transaction_idx]; - - mock_transaction_responses_manager - .send_response(DataOrFin(Some(FullTransaction { - transaction, - transaction_output, - transaction_hash, - }))) - .await - .unwrap(); + // Check that before the last transaction was sent, the transactions aren't written. + actions.push(Action::CheckStorage(Box::new(move |reader| { + async move { + assert_eq!(i, reader.begin_ro_txn().unwrap().get_body_marker().unwrap().0); } + .boxed() + }))); + + actions.push(Action::SendTransaction(DataOrFin(Some(FullTransaction { + transaction, + transaction_output, + transaction_hash, + })))); + } - // Check responses were written to the storage. This way we make sure that the sync - // writes to the storage each response it receives before all query responses were - // sent. - let block_number = BlockNumber(block_number); + // Check that a block's transactions are written before the entire query finished. + actions.push(Action::CheckStorage(Box::new(move |reader| { + async move { + let block_number = BlockNumber(i); wait_for_marker( DataType::Transaction, - &storage_reader, + &reader, block_number.unchecked_next(), SLEEP_DURATION_TO_LET_SYNC_ADVANCE, TIMEOUT_FOR_TEST, ) .await; - let txn = storage_reader.begin_ro_txn().unwrap(); - - // TODO: Verify that the transaction outputs are equal aswell. currently is buggy. - let storage_transactions = + let txn = reader.begin_ro_txn().unwrap(); + let actual_transactions = txn.get_block_transactions(block_number).unwrap().unwrap(); - let storage_transaction_hashes = + // TODO(alonl): Uncomment this once we fix protobuf conversion for receipt + // builtins. + // let actual_transaction_outputs = + // txn.get_block_transaction_outputs(block_number).unwrap().unwrap(); + let actual_transaction_hashes = txn.get_block_transaction_hashes(block_number).unwrap().unwrap(); - for i in 0..NUM_TRANSACTIONS_PER_BLOCK { - let idx: usize = usize::try_from(i + start_transaction_number).unwrap(); - assert_eq!( - storage_transactions[usize::try_from(i).unwrap()], - transactions[idx] - ); - assert_eq!( - storage_transaction_hashes[usize::try_from(i).unwrap()], - transaction_hashes[idx] - ); - } + assert_eq!(actual_transactions, transactions); + // TODO(alonl): Uncomment this once we fix protobuf conversion for receipt + // builtins. + // assert_eq!(actual_transaction_outputs, transaction_outputs); + assert_eq!(actual_transaction_hashes, transaction_hashes); } + .boxed() + }))); - mock_transaction_responses_manager.send_response(DataOrFin(None)).await.unwrap(); + if (i + 1) % TRANSACTION_QUERY_LENGTH == 0 || i + 1 == NUM_BLOCKS { + actions.push(Action::SendTransaction(DataOrFin(None))); } - }; - - tokio::select! { - sync_result = p2p_sync.run() => { - sync_result.unwrap(); - panic!("P2p sync aborted with no failure."); - } - _ = parse_queries_future => {} } + + run_test( + HashMap::from([ + (DataType::Header, NUM_BLOCKS), + (DataType::Transaction, TRANSACTION_QUERY_LENGTH), + ]), + actions, + ) + .await; }