refactor: replace dependency on Block with BlockHeader and Chunks on apply chunk #12746

Open
wants to merge 2 commits into base: master
145 changes: 90 additions & 55 deletions chain/chain/src/chain.rs
@@ -55,7 +55,7 @@ use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageEr
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::bandwidth_scheduler::BandwidthRequests;
use near_primitives::block::{genesis_chunks, Block, BlockValidityError, Tip};
use near_primitives::block::{genesis_chunks, Block, BlockValidityError, Chunks, Tip};
use near_primitives::block_header::BlockHeader;
use near_primitives::challenge::{
BlockDoubleSign, Challenge, ChallengeBody, ChallengesResult, ChunkProofs, ChunkState,
@@ -3192,12 +3192,13 @@ impl Chain {
/// Doesn't require state.
fn validate_chunk_transactions(
&self,
block: &Block,
block_metadata: &BlockHeader,
chunks: &Chunks,
prev_block_header: &BlockHeader,
chunk: &ShardChunk,
) -> Result<Vec<bool>, Error> {
let protocol_version =
self.epoch_manager.get_epoch_protocol_version(block.header().epoch_id())?;
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_header.hash())?;
let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;
let relaxed_chunk_validation = checked_feature!(
"protocol_feature_relaxed_chunk_validation",
RelaxedChunkValidation,
@@ -3206,15 +3207,13 @@

if !relaxed_chunk_validation {
if !validate_transactions_order(chunk.transactions()) {
let merkle_paths =
Block::compute_chunk_headers_root(block.chunks().iter_deprecated()).1;
let epoch_id = block.header().epoch_id();
let merkle_paths = Block::compute_chunk_headers_root(chunks.iter_deprecated()).1;
let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?;
let shard_id = chunk.shard_id();
let shard_index = shard_layout.get_shard_index(shard_id)?;

let chunk_proof = ChunkProofs {
block_header: borsh::to_vec(&block.header()).expect("Failed to serialize"),
block_header: borsh::to_vec(&block_metadata).expect("Failed to serialize"),
merkle_proof: merkle_paths[shard_index].clone(),
chunk: MaybeEncodedShardChunk::Decoded(chunk.clone()).into(),
};
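One note on the hunk above: the epoch id is now recovered from the previous block's header instead of being read off the current block. A minimal sketch of the equivalence this assumes (illustrative only; `block` stands for the full `Block` the old signature received):

// Assumed invariant behind the change: a block's epoch id is fully
// determined by its previous block, so the previous BlockHeader suffices.
let epoch_id =
    epoch_manager.get_epoch_id_from_prev_block(prev_block_header.hash())?;
// Illustrative check only, not part of the PR.
debug_assert_eq!(&epoch_id, block.header().epoch_id());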
Expand All @@ -3238,34 +3237,55 @@ impl Chain {

/// For a given previous block header and current block, return information
/// about block necessary for processing shard update.
pub fn get_apply_chunk_block_context(
epoch_manager: &dyn EpochManagerAdapter,
block: &Block,
pub fn get_apply_chunk_block_context_from_block_metadata(
block_metadata: &BlockHeader,
chunks: &Chunks,
prev_block_header: &BlockHeader,
is_new_chunk: bool,
protocol_version: ProtocolVersion,
) -> Result<ApplyChunkBlockContext, Error> {
let block_header = &block.header();
let epoch_id = block_header.epoch_id();
let protocol_version = epoch_manager.get_epoch_protocol_version(epoch_id)?;
// Before `FixApplyChunks` feature, gas price was taken from current
// block by mistake. Preserve it for backwards compatibility.
let gas_price = if !is_new_chunk
&& protocol_version < ProtocolFeature::FixApplyChunks.protocol_version()
let gas_price = if protocol_version >= ProtocolFeature::FixApplyChunks.protocol_version()
Review comment (Contributor):
Suggested change
let gas_price = if protocol_version >= ProtocolFeature::FixApplyChunks.protocol_version()
let gas_price = if ProtocolFeature::FixApplyChunks.enabled(protocol_version)

|| is_new_chunk
{
block_header.next_gas_price()
} else {
prev_block_header.next_gas_price()
} else {
// TODO(#10584): next_gas_price should be Some() if derived from
// Block and None if derived from OptimisticBlock. Attempt to take
// next_gas_price since OptimisticBlock enabled must fail.
block_metadata.next_gas_price()
};
let congestion_info = block.block_congestion_info();

let congestion_info = chunks.block_congestion_info();
let bandwidth_requests = chunks.block_bandwidth_requests();

// TODO(#10584): add method from_metadata.
Ok(ApplyChunkBlockContext::from_header(
block_header,
block_metadata,
gas_price,
congestion_info,
block.block_bandwidth_requests(),
bandwidth_requests,
))
}
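Read on its own, the gas price selection above amounts to the following sketch. The helper name is hypothetical, the return type is assumed to be Balance, and the `enabled` form follows the reviewer suggestion above (assumed equivalent to the explicit protocol_version comparison):

// Hypothetical helper, not part of this PR: after FixApplyChunks, or for a
// new chunk, the gas price comes from the previous block; reading it from
// the current block is kept only for missing chunks on pre-fix versions.
fn gas_price_for_apply(
    is_new_chunk: bool,
    protocol_version: ProtocolVersion,
    block_metadata: &BlockHeader,
    prev_block_header: &BlockHeader,
) -> Balance {
    if ProtocolFeature::FixApplyChunks.enabled(protocol_version) || is_new_chunk {
        prev_block_header.next_gas_price()
    } else {
        block_metadata.next_gas_price()
    }
}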

pub fn get_apply_chunk_block_context(
epoch_manager: &dyn EpochManagerAdapter,
block: &Block,
prev_block_header: &BlockHeader,
is_new_chunk: bool,
) -> Result<ApplyChunkBlockContext, Error> {
let epoch_id = block.header().epoch_id();
let protocol_version = epoch_manager.get_epoch_protocol_version(epoch_id)?;
Self::get_apply_chunk_block_context_from_block_metadata(
block.header(),
&block.chunks(),
prev_block_header,
is_new_chunk,
protocol_version,
)
}

fn block_catch_up_postprocess(
&mut self,
me: &Option<AccountId>,
@@ -3525,16 +3545,17 @@ impl Chain {
pub fn create_chunk_state_challenge(
&self,
prev_block: &Block,
block: &Block,
block_metadata: &BlockHeader,
chunks: &Chunks,
chunk_header: &ShardChunkHeader,
) -> Result<ChunkState, Error> {
let epoch_id = block.header().epoch_id();
let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?;
let shard_layout =
self.epoch_manager.get_shard_layout_from_prev_block(prev_block.hash())?;
let shard_id = chunk_header.shard_id();
let shard_index = shard_layout.get_shard_index(shard_id)?;
let prev_merkle_proofs =
Block::compute_chunk_headers_root(prev_block.chunks().iter_deprecated()).1;
let merkle_proofs = Block::compute_chunk_headers_root(block.chunks().iter_deprecated()).1;
let merkle_proofs = Block::compute_chunk_headers_root(chunks.iter_deprecated()).1;
let prev_chunk =
self.get_chunk_clone_from_header(&prev_block.chunks()[shard_index].clone()).unwrap();

@@ -3586,7 +3607,7 @@
// let partial_state = apply_result.proof.unwrap().nodes;
Ok(ChunkState {
prev_block_header: borsh::to_vec(&prev_block.header())?,
block_header: borsh::to_vec(&block.header())?,
block_header: borsh::to_vec(&block_metadata)?,
prev_merkle_proof: prev_merkle_proofs[shard_index].clone(),
merkle_proof: merkle_proofs[shard_index].clone(),
prev_chunk,
@@ -3646,23 +3667,23 @@
let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?;

let mut maybe_jobs = vec![];
for (shard_index, (chunk_header, prev_chunk_header)) in
block.chunks().iter_deprecated().zip(prev_chunk_headers.iter()).enumerate()
{
let chunk_headers = &block.chunks();
for (shard_index, prev_chunk_header) in prev_chunk_headers.iter().enumerate() {
// XXX: This is a bit questionable -- sandbox state patching works
// only for a single shard. This so far has been enough.
let state_patch = state_patch.take();
let shard_id = shard_layout.get_shard_id(shard_index)?;

let incoming_receipts = incoming_receipts.get(&shard_id);
let storage_context =
StorageContext { storage_data_source: StorageDataSource::Db, state_patch };

let stateful_job = self.get_update_shard_job(
me,
block,
block.header(),
chunk_headers,
shard_index,
prev_block,
chunk_header,
prev_chunk_header,
shard_id,
mode,
incoming_receipts,
storage_context,
@@ -3699,12 +3720,11 @@
fn get_shard_context(
&self,
me: &Option<AccountId>,
block_header: &BlockHeader,
prev_hash: &CryptoHash,
epoch_id: &EpochId,
shard_id: ShardId,
mode: ApplyChunksMode,
) -> Result<ShardContext, Error> {
let prev_hash = block_header.prev_hash();
let epoch_id = block_header.epoch_id();
let cares_about_shard_this_epoch =
self.shard_tracker.care_about_shard(me.as_ref(), prev_hash, shard_id, true);
let cares_about_shard_next_epoch =
@@ -3722,30 +3742,38 @@
fn get_update_shard_job(
&self,
me: &Option<AccountId>,
block: &Block,
// TODO(#10584): introduce separate structure which can be derived from
Review comment (Contributor):
This can be an enum that either has Block or OptimisticBlock.
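A minimal sketch of the shape that comment suggests; the enum name and the OptimisticBlock accessor are hypothetical and not part of this PR:

// Hypothetical only. OptimisticBlock is the type referenced by TODO(#10584).
enum ApplyChunkBlock<'a> {
    Block(&'a Block),
    Optimistic(&'a OptimisticBlock),
}

impl ApplyChunkBlock<'_> {
    fn height(&self) -> BlockHeight {
        match self {
            Self::Block(block) => block.header().height(),
            Self::Optimistic(block) => block.height(), // assumed accessor
        }
    }
}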

// both Block and OptimisticBlock.
block_metadata: &BlockHeader,
chunk_headers: &Chunks,
shard_index: ShardIndex,
prev_block: &Block,
chunk_header: &ShardChunkHeader,
prev_chunk_header: &ShardChunkHeader,
shard_id: ShardId,
mode: ApplyChunksMode,
incoming_receipts: &HashMap<ShardId, Vec<ReceiptProof>>,
incoming_receipts: Option<&Vec<ReceiptProof>>,
storage_context: StorageContext,
) -> Result<Option<UpdateShardJob>, Error> {
let _span = tracing::debug_span!(target: "chain", "get_update_shard_job").entered();
let prev_hash = block.header().prev_hash();
let shard_context = self.get_shard_context(me, block.header(), shard_id, mode)?;
let prev_hash = prev_block.hash();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_hash)?;
let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?;
let shard_id = shard_layout.get_shard_id(shard_index)?;
let chunk_header = &chunk_headers[shard_index];
Review comment (Contributor):
In the Chunks struct, the Index trait is documented as deprecated and .get() is recommended instead. Just to align with the new resharding implementation.

Suggested change
let chunk_header = &chunk_headers[shard_index];
let chunk_header = &chunk_headers.get(shard_index);
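One caveat, assuming Chunks::get mirrors slice::get and returns an Option: the call site would then need to unwrap or propagate the missing-index case, for example:

// Sketch only; the exact error variant is an assumption, not from this PR.
let chunk_header = chunk_headers
    .get(shard_index)
    .ok_or(Error::InvalidShardId(shard_id))?;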

let shard_context = self.get_shard_context(me, prev_hash, &epoch_id, shard_id, mode)?;
let block_height = block_metadata.height();
let is_new_chunk = chunk_header.is_new_chunk(block_height);

let is_new_chunk = chunk_header.is_new_chunk(block.header().height());
let shard_update_reason = if shard_context.should_apply_chunk {
let block_context = Self::get_apply_chunk_block_context(
self.epoch_manager.as_ref(),
&block,
let protocol_version = self.epoch_manager.get_epoch_protocol_version(&epoch_id)?;
let block_context = Self::get_apply_chunk_block_context_from_block_metadata(
block_metadata,
&chunk_headers,
prev_block.header(),
is_new_chunk,
protocol_version,
)?;
if is_new_chunk {
// Validate new chunk and collect incoming receipts for it.

let prev_chunk_extra = self.get_chunk_extra(prev_hash, &shard_context.shard_uid)?;
let chunk = self.get_chunk_clone_from_header(&chunk_header)?;
let prev_chunk_height_included = prev_chunk_header.height_included();
@@ -3766,28 +3794,35 @@
target: "chain",
?err,
prev_block_hash=?prev_hash,
block_hash=?block.header().hash(),
block_height,
?shard_id,
prev_chunk_height_included,
?prev_chunk_extra,
?chunk_header,
"Failed to validate chunk extra"
);
byzantine_assert!(false);
match self.create_chunk_state_challenge(prev_block, block, &chunk_header) {
match self.create_chunk_state_challenge(
prev_block,
block_metadata,
chunk_headers,
&chunk_header,
) {
Ok(chunk_state) => Error::InvalidChunkState(Box::new(chunk_state)),
Err(err) => err,
}
})?;

let tx_valid_list =
self.validate_chunk_transactions(&block, prev_block.header(), &chunk)?;
let tx_valid_list = self.validate_chunk_transactions(
&block_metadata,
&chunk_headers,
prev_block.header(),
&chunk,
)?;

// we can't use hash from the current block here yet because the incoming receipts
// for this block is not stored yet
let new_receipts = collect_receipts(incoming_receipts.get(&shard_id).unwrap());
let shard_layout =
self.epoch_manager.get_shard_layout(&block.header().epoch_id())?;
let new_receipts = collect_receipts(incoming_receipts.unwrap());
let old_receipts = &self.chain_store().get_incoming_receipts_for_shard(
self.epoch_manager.as_ref(),
shard_id,
Expand All @@ -3807,7 +3842,7 @@ impl Chain {
check_if_block_is_first_with_chunk_of_version(
self.chain_store(),
self.epoch_manager.as_ref(),
block.header().prev_hash(),
prev_hash,
shard_id,
)?;

82 changes: 45 additions & 37 deletions core/primitives/src/block.rs
@@ -660,46 +660,11 @@ impl Block {
}

pub fn block_congestion_info(&self) -> BlockCongestionInfo {
let mut result = BTreeMap::new();

for chunk in self.chunks().iter_deprecated() {
let shard_id = chunk.shard_id();

if let Some(congestion_info) = chunk.congestion_info() {
let height_included = chunk.height_included();
let height_current = self.header().height();
let missed_chunks_count = height_current.checked_sub(height_included);
let missed_chunks_count = missed_chunks_count
.expect("The chunk height included must be less or equal than block height!");

let extended_congestion_info =
ExtendedCongestionInfo::new(congestion_info, missed_chunks_count);
result.insert(shard_id, extended_congestion_info);
}
}
BlockCongestionInfo::new(result)
self.chunks().block_congestion_info()
}

pub fn block_bandwidth_requests(&self) -> BlockBandwidthRequests {
let mut result = BTreeMap::new();

for chunk in self.chunks().iter() {
// It's okay to take bandwidth requests from a missing chunk,
// the chunk was missing so it didn't send anything and still
// wants to send out the same receipts.
let chunk = match chunk {
MaybeNew::New(new_chunk) => new_chunk,
MaybeNew::Old(missing_chunk) => missing_chunk,
};

let shard_id = chunk.shard_id();

if let Some(bandwidth_requests) = chunk.bandwidth_requests() {
result.insert(shard_id, bandwidth_requests.clone());
}
}

BlockBandwidthRequests { shards_bandwidth_requests: result }
self.chunks().block_bandwidth_requests()
}

pub fn hash(&self) -> &CryptoHash {
@@ -862,6 +827,49 @@ impl<'a> Chunks<'a> {
ChunksCollection::V2(chunks) => chunks.get(index),
}
}

pub fn block_congestion_info(&self) -> BlockCongestionInfo {
let mut result = BTreeMap::new();

for chunk in self.iter_deprecated() {
let shard_id = chunk.shard_id();

if let Some(congestion_info) = chunk.congestion_info() {
let height_included = chunk.height_included();
let height_current = self.block_height;
let missed_chunks_count = height_current.checked_sub(height_included);
let missed_chunks_count = missed_chunks_count
.expect("The chunk height included must be less or equal than block height!");

let extended_congestion_info =
ExtendedCongestionInfo::new(congestion_info, missed_chunks_count);
result.insert(shard_id, extended_congestion_info);
}
}
BlockCongestionInfo::new(result)
}

pub fn block_bandwidth_requests(&self) -> BlockBandwidthRequests {
let mut result = BTreeMap::new();

for chunk in self.iter() {
// It's okay to take bandwidth requests from a missing chunk,
// the chunk was missing so it didn't send anything and still
// wants to send out the same receipts.
let chunk = match chunk {
MaybeNew::New(new_chunk) => new_chunk,
MaybeNew::Old(missing_chunk) => missing_chunk,
};

let shard_id = chunk.shard_id();

if let Some(bandwidth_requests) = chunk.bandwidth_requests() {
result.insert(shard_id, bandwidth_requests.clone());
}
}

BlockBandwidthRequests { shards_bandwidth_requests: result }
}
}
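Note that the moved block_congestion_info reads the height from the collection itself (self.block_height) rather than from a header, so a Chunks value is self-sufficient for both summaries. A small usage sketch, assuming Block::chunks() returns a Chunks carrying the block's height:

// Sketch: both per-shard summaries can be derived from Chunks alone.
let chunks = block.chunks();
let congestion_info = chunks.block_congestion_info();
let bandwidth_requests = chunks.block_bandwidth_requests();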

/// The tip of a fork. A handle to the fork ancestry from its leaf in the