test(papyrus_p2p_sync): use run_test in transaction test #3260

Open
wants to merge 1 commit into base: main
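This PR rewrites transaction_basic_flow to build a list of Action values and hand them to the shared run_test helper from test_utils.rs, instead of driving the mock response managers and a tokio::select! loop by hand. A condensed sketch of the resulting pattern, drawn from the diff below (the helpers come from test_utils.rs; treating run_test's map argument as the per-data-type query length is an assumption, not something the diff states):

// Minimal sketch of the action-based flow; see the full diff below for the real test.
let mut actions = vec![
    Action::RunP2pSync,
    // Header query content is validated in other tests, so accept any header query here.
    Action::ReceiveQuery(Box::new(|_query| ()), DataType::Header),
];
// ...push SendHeader, ReceiveQuery, SendTransaction and CheckStorage actions per block...
actions.push(Action::SendHeader(DataOrFin(None)));
run_test(
    // Assumed to configure the query length per data type (see test_utils.rs).
    HashMap::from([
        (DataType::Header, NUM_BLOCKS),
        (DataType::Transaction, TRANSACTION_QUERY_LENGTH),
    ]),
    actions,
)
.await;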
2 changes: 1 addition & 1 deletion crates/blockifier/cairo_native
Submodule cairo_native updated 109 files
1 change: 0 additions & 1 deletion crates/papyrus_p2p_sync/src/client/test_utils.rs
@@ -150,7 +150,6 @@ pub enum Action {
SendStateDiff(DataOrFin<StateDiffChunk>),
/// Send a transaction as a response to a query we got from ReceiveQuery. Will panic if
/// ReceiveQuery wasn't called with DataType::Transaction beforehand.
#[allow(dead_code)]
SendTransaction(DataOrFin<FullTransaction>),
/// Send a class as a response to a query we got from ReceiveQuery. Will panic if
/// ReceiveQuery wasn't called with DataType::Class beforehand.
267 changes: 116 additions & 151 deletions crates/papyrus_p2p_sync/src/client/transaction_test.rs
@@ -1,181 +1,146 @@
use std::cmp::min;
use std::collections::HashMap;

use futures::{FutureExt, StreamExt};
use papyrus_protobuf::sync::{
BlockHashOrNumber,
DataOrFin,
Direction,
Query,
SignedBlockHeader,
TransactionQuery,
};
use futures::FutureExt;
use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query};
use papyrus_storage::body::BodyStorageReader;
use papyrus_test_utils::get_test_body;
use starknet_api::block::{BlockBody, BlockHeader, BlockHeaderWithoutHash, BlockNumber};
use starknet_api::transaction::FullTransaction;
use papyrus_test_utils::{get_rng, get_test_body};
use starknet_api::block::{BlockBody, BlockNumber};
use starknet_api::transaction::{FullTransaction, TransactionHash};

use super::test_utils::{
create_block_hashes_and_signatures,
setup,
TestArgs,
HEADER_QUERY_LENGTH,
random_header,
run_test,
wait_for_marker,
Action,
DataType,
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TRANSACTION_QUERY_LENGTH,
WAIT_PERIOD_FOR_NEW_DATA,
TIMEOUT_FOR_TEST,
};
use crate::client::test_utils::{wait_for_marker, DataType, TIMEOUT_FOR_TEST};

#[tokio::test]
async fn transaction_basic_flow() {
let TestArgs {
p2p_sync,
storage_reader,
mut mock_header_response_manager,
mut mock_transaction_response_manager,
// The test will fail if we drop these
mock_state_diff_response_manager: _mock_state_diff_response_manager,
mock_class_response_manager: _mock_class_responses_manager,
..
} = setup();

const NUM_TRANSACTIONS_PER_BLOCK: u64 = 6;
let block_hashes_and_signatures =
create_block_hashes_and_signatures(HEADER_QUERY_LENGTH.try_into().unwrap());
let BlockBody { transactions, transaction_outputs, transaction_hashes } = get_test_body(
(NUM_TRANSACTIONS_PER_BLOCK * HEADER_QUERY_LENGTH).try_into().unwrap(),
None,
None,
None,
);

// Create a future that will receive queries, send responses and validate the results.
let parse_queries_future = async move {
// We wait for the state diff sync to see that there are no headers and start sleeping
tokio::time::sleep(SLEEP_DURATION_TO_LET_SYNC_ADVANCE).await;

// Check that before we send headers there is no transaction query.
assert!(mock_transaction_response_manager.next().now_or_never().is_none());
let mut mock_header_responses_manager = mock_header_response_manager.next().await.unwrap();

// Send headers for entire query.
for (i, (block_hash, block_signature)) in block_hashes_and_signatures.iter().enumerate() {
// Send responses
mock_header_responses_manager
.send_response(DataOrFin(Some(SignedBlockHeader {
block_header: BlockHeader {
block_hash: *block_hash,
block_header_without_hash: BlockHeaderWithoutHash {
block_number: BlockNumber(i.try_into().unwrap()),
..Default::default()
},
n_transactions: NUM_TRANSACTIONS_PER_BLOCK.try_into().unwrap(),
state_diff_length: Some(0),
..Default::default()
},
signatures: vec![*block_signature],
})))
.await
.unwrap();
const NUM_BLOCKS: u64 = 5;
const TRANSACTION_QUERY_LENGTH: u64 = 2;

let mut rng = get_rng();

let block_bodies = (0..NUM_BLOCKS)
// TODO(shahak): remove Some(0) once we separate events from transactions correctly.
.map(|i| {
let mut body = get_test_body(i.try_into().unwrap(), Some(0), None, None);
// get_test_body returns transaction hashes in the range 0..num_transactions. Offset them so
// that hashes don't collide across blocks.
for transaction_hash in &mut body.transaction_hashes {
*transaction_hash = TransactionHash(transaction_hash.0 + NUM_BLOCKS * i);
}
body
})
.collect::<Vec<_>>();
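// block_bodies now holds NUM_BLOCKS bodies; block i gets i transactions (get_test_body's first
// argument), so the test also covers an empty body for block 0.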

let mut actions = vec![
Action::RunP2pSync,
// We already validate the header query content in other tests.
Action::ReceiveQuery(Box::new(|_query| ()), DataType::Header),
];
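// run_test replays these actions in order: the headers for all blocks are sent first, then each
// block's transactions, with storage checks interleaved between them.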

// Send a header for each block, with the block's corresponding transaction count.
for (i, block_body) in block_bodies.iter().enumerate() {
actions.push(Action::SendHeader(DataOrFin(Some(random_header(
&mut rng,
BlockNumber(i.try_into().unwrap()),
None,
Some(block_body.transactions.len()),
)))));
}
actions.push(Action::SendHeader(DataOrFin(None)));
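// DataOrFin(None) is the Fin message that terminates the header query.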

// Send transactions for each block and then validate they were written
for (i, BlockBody { transactions, transaction_outputs, transaction_hashes }) in
block_bodies.into_iter().enumerate()
{
let i = u64::try_from(i).unwrap();
// If this block starts a new transaction query, receive the new query.
if i % TRANSACTION_QUERY_LENGTH == 0 {
let limit = min(TRANSACTION_QUERY_LENGTH, NUM_BLOCKS - i);
actions.push(Action::ReceiveQuery(
Box::new(move |query| {
assert_eq!(
query,
Query {
start_block: BlockHashOrNumber::Number(BlockNumber(i)),
direction: Direction::Forward,
limit,
step: 1,
}
)
}),
DataType::Transaction,
));
}

wait_for_marker(
DataType::Header,
&storage_reader,
BlockNumber(HEADER_QUERY_LENGTH),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TIMEOUT_FOR_TEST,
)
.await;

// Simulate that time has passed so that the transaction sync resends its query after waiting
// for new headers.
tokio::time::pause();
tokio::time::advance(WAIT_PERIOD_FOR_NEW_DATA).await;
tokio::time::resume();

for start_block_number in
(0..HEADER_QUERY_LENGTH).step_by(TRANSACTION_QUERY_LENGTH.try_into().unwrap())
for (transaction, (transaction_output, transaction_hash)) in transactions
.iter()
.cloned()
.zip(transaction_outputs.iter().cloned().zip(transaction_hashes.iter().cloned()))
{
let num_blocks_in_query =
min(TRANSACTION_QUERY_LENGTH, HEADER_QUERY_LENGTH - start_block_number);

// Receive query and validate it.
let mut mock_transaction_responses_manager =
mock_transaction_response_manager.next().await.unwrap();
assert_eq!(
*mock_transaction_responses_manager.query(),
Ok(TransactionQuery(Query {
start_block: BlockHashOrNumber::Number(BlockNumber(start_block_number)),
direction: Direction::Forward,
limit: num_blocks_in_query,
step: 1,
})),
"If the limit of the query is too low, try to increase \
SLEEP_DURATION_TO_LET_SYNC_ADVANCE",
);

for block_number in start_block_number..(start_block_number + num_blocks_in_query) {
let start_transaction_number = block_number * NUM_TRANSACTIONS_PER_BLOCK;
for transaction_number in start_transaction_number
..(start_transaction_number + NUM_TRANSACTIONS_PER_BLOCK)
{
let transaction_idx = usize::try_from(transaction_number).unwrap();
let transaction = transactions[transaction_idx].clone();
let transaction_output = transaction_outputs[transaction_idx].clone();
let transaction_hash = transaction_hashes[transaction_idx];

mock_transaction_responses_manager
.send_response(DataOrFin(Some(FullTransaction {
transaction,
transaction_output,
transaction_hash,
})))
.await
.unwrap();
// Check that the block's transactions aren't written before its last transaction is sent.
actions.push(Action::CheckStorage(Box::new(move |reader| {
async move {
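// The body marker is the first block whose body is not yet stored, so it should still equal i here.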
assert_eq!(i, reader.begin_ro_txn().unwrap().get_body_marker().unwrap().0);
}
.boxed()
})));

actions.push(Action::SendTransaction(DataOrFin(Some(FullTransaction {
transaction,
transaction_output,
transaction_hash,
}))));
}

// Check that the responses were written to the storage. This makes sure that the sync writes
// each response it receives to the storage, rather than waiting for all of the query's
// responses to arrive.
let block_number = BlockNumber(block_number);
// Check that a block's transactions are written before the entire query has finished.
actions.push(Action::CheckStorage(Box::new(move |reader| {
async move {
let block_number = BlockNumber(i);
wait_for_marker(
DataType::Transaction,
&storage_reader,
&reader,
block_number.unchecked_next(),
SLEEP_DURATION_TO_LET_SYNC_ADVANCE,
TIMEOUT_FOR_TEST,
)
.await;

let txn = storage_reader.begin_ro_txn().unwrap();

// TODO: Verify that the transaction outputs are equal as well. Currently this is buggy.
let storage_transactions =
let txn = reader.begin_ro_txn().unwrap();
let actual_transactions =
txn.get_block_transactions(block_number).unwrap().unwrap();
let storage_transaction_hashes =
// TODO(alonl): Uncomment this once we fix protobuf conversion for receipt
// builtins.
// let actual_transaction_outputs =
// txn.get_block_transaction_outputs(block_number).unwrap().unwrap();
let actual_transaction_hashes =
txn.get_block_transaction_hashes(block_number).unwrap().unwrap();
for i in 0..NUM_TRANSACTIONS_PER_BLOCK {
let idx: usize = usize::try_from(i + start_transaction_number).unwrap();
assert_eq!(
storage_transactions[usize::try_from(i).unwrap()],
transactions[idx]
);
assert_eq!(
storage_transaction_hashes[usize::try_from(i).unwrap()],
transaction_hashes[idx]
);
}
assert_eq!(actual_transactions, transactions);
// TODO(alonl): Uncomment this once we fix protobuf conversion for receipt
// builtins.
// assert_eq!(actual_transaction_outputs, transaction_outputs);
assert_eq!(actual_transaction_hashes, transaction_hashes);
}
.boxed()
})));

mock_transaction_responses_manager.send_response(DataOrFin(None)).await.unwrap();
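// Close the current transaction query with a Fin once its last block (or the last block overall)
// has been sent.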
if (i + 1) % TRANSACTION_QUERY_LENGTH == 0 || i + 1 == NUM_BLOCKS {
actions.push(Action::SendTransaction(DataOrFin(None)));
}
};

tokio::select! {
sync_result = p2p_sync.run() => {
sync_result.unwrap();
panic!("P2p sync aborted with no failure.");
}
_ = parse_queries_future => {}
}

run_test(
HashMap::from([
(DataType::Header, NUM_BLOCKS),
(DataType::Transaction, TRANSACTION_QUERY_LENGTH),
]),
actions,
)
.await;
}