Skip to content

Commit

Permalink
feat: implement post-state-root chunk production (#9537)
Browse files Browse the repository at this point in the history
This PR adds chunk production with post-state-root. This means applying
transactions and receipts before producing the chunk and including the
necessary fields in the header.

The code is not used anywhere yet, that will be implemented as a
separate PR. The issues marked with `TODO(post-state-root)` will be
resolved separately.

Part of #9450.
  • Loading branch information
pugachAG authored Oct 3, 2023
1 parent 5d4a7f3 commit a27075f
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 31 deletions.
70 changes: 70 additions & 0 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use near_primitives::state_sync::{
use near_primitives::static_clock::StaticClock;
use near_primitives::transaction::{ExecutionOutcomeWithIdAndProof, SignedTransaction};
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::validator_stake::ValidatorStakeIter;
use near_primitives::types::{
AccountId, Balance, BlockExtra, BlockHeight, BlockHeightDelta, EpochId, Gas, MerkleHash,
NumBlocks, NumShards, ShardId, StateChangesForSplitStates, StateRoot,
Expand Down Expand Up @@ -831,6 +832,75 @@ impl Chain {
Ok(())
}

pub fn apply_chunk_for_post_state_root(
&self,
shard_id: ShardId,
prev_state_root: StateRoot,
block_height: BlockHeight,
prev_block: &Block,
transactions: &[SignedTransaction],
last_validator_proposals: ValidatorStakeIter,
gas_limit: Gas,
last_chunk_height_included: BlockHeight,
) -> Result<ApplyTransactionResult, Error> {
let prev_block_hash = prev_block.hash();
let is_first_block_with_chunk_of_version = check_if_block_is_first_with_chunk_of_version(
self.store(),
self.epoch_manager.as_ref(),
prev_block_hash,
shard_id,
)?;
// TODO(post-state-root):
// This misses outgoing receipts from the last non-post-state-root block B.
// Before post-state-root incoming receipts store receipts that are supposed to be applied
// in this block, which corresponds to the outgoing receipts from the previous block.
// After post-state-root incoming receipts store receipts that are the result of executing
// that block, which corresponds to the outgoing receipts from the current block.
// So considering which outgoing receipts correspond to the incoming receipts for the blocks:
// * ...
// * pre-state-root block B-1: outgoing B-2 -> incoming B-1
// * pre-state-root block B: outgoing B-1 -> incoming B
// * post-state-root block B+1: outgoing B+1 -> incoming B+1
// * post-state-root block B+2: outgoing B+2 -> incoming B+2
// * ...
// We can see that outgoing receipts of block B are not stored anywhere in the incoming receipts.
// These receipts can be obtained from the db using get_outgoing_receipts_for_shard since we
// currently track all shard. This will be implemented later along with an intergation test
// to reproduce the issue.
let receipts =
collect_receipts_from_response(&self.store.get_incoming_receipts_for_shard(
self.epoch_manager.as_ref(),
shard_id,
*prev_block_hash,
last_chunk_height_included,
)?);
// TODO(post-state-root): block-level fields, take values from the previous block for now
let block_timestamp = prev_block.header().raw_timestamp();
let block_hash = prev_block_hash;
let random_seed = *prev_block.header().random_value();
let gas_price = prev_block.header().gas_price();

self.runtime_adapter.apply_transactions(
shard_id,
&prev_state_root,
block_height,
block_timestamp,
prev_block_hash,
&block_hash,
&receipts,
transactions,
last_validator_proposals,
gas_price,
gas_limit,
&vec![],
random_seed,
true,
is_first_block_with_chunk_of_version,
Default::default(),
true,
)
}

pub fn save_orphan(
&mut self,
block: MaybeValidated<Block>,
Expand Down
245 changes: 214 additions & 31 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use near_chain::flat_storage_creator::FlatStorageCreator;
use near_chain::resharding::StateSplitRequest;
use near_chain::state_snapshot_actor::MakeSnapshotCallback;
use near_chain::test_utils::format_hash;
use near_chain::types::ApplyTransactionResult;
use near_chain::types::RuntimeAdapter;
use near_chain::types::{ChainConfig, LatestKnown};
use near_chain::{
Expand Down Expand Up @@ -55,14 +56,21 @@ use near_primitives::hash::CryptoHash;
use near_primitives::merkle::{merklize, MerklePath, PartialMerkleTree};
use near_primitives::network::PeerId;
use near_primitives::receipt::Receipt;
use near_primitives::sharding::shard_chunk_header_inner::ShardChunkHeaderInnerV3;
use near_primitives::sharding::EncodedShardChunkBody;
use near_primitives::sharding::EncodedShardChunkV2;
use near_primitives::sharding::ShardChunkHeaderInner;
use near_primitives::sharding::ShardChunkHeaderV3;
use near_primitives::sharding::StateSyncInfo;
use near_primitives::sharding::{
ChunkHash, EncodedShardChunk, PartialEncodedChunk, ReedSolomonWrapper, ShardChunk,
ShardChunkHeader, ShardInfo,
};
use near_primitives::static_clock::StaticClock;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::validator_stake::ValidatorStakeIter;
use near_primitives::types::Gas;
use near_primitives::types::StateRoot;
use near_primitives::types::{AccountId, ApprovalStake, BlockHeight, EpochId, NumBlocks, ShardId};
use near_primitives::unwrap_or_return;
use near_primitives::utils::MaybeValidated;
Expand Down Expand Up @@ -801,15 +809,48 @@ impl Client {
validator_signer.validator_id()
);

let ret = self.produce_pre_state_root_chunk(
validator_signer.as_ref(),
prev_block_hash,
epoch_id,
last_header,
next_height,
shard_id,
)?;

metrics::CHUNK_PRODUCED_TOTAL.inc();
self.chunk_production_info.put(
(next_height, shard_id),
ChunkProduction {
chunk_production_time: Some(StaticClock::utc()),
chunk_production_duration_millis: Some(timer.elapsed().as_millis() as u64),
},
);
Ok(Some(ret))
}

fn produce_pre_state_root_chunk(
&mut self,
validator_signer: &dyn ValidatorSigner,
prev_block_hash: CryptoHash,
epoch_id: &EpochId,
last_header: ShardChunkHeader,
next_height: BlockHeight,
shard_id: ShardId,
) -> Result<(EncodedShardChunk, Vec<MerklePath>, Vec<Receipt>), Error> {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
let chunk_extra = self
.chain
.get_chunk_extra(&prev_block_hash, &shard_uid)
.map_err(|err| Error::ChunkProducer(format!("No chunk extra available: {}", err)))?;

let prev_block_header = self.chain.get_block_header(&prev_block_hash)?;
let transactions =
self.prepare_transactions(shard_uid, &chunk_extra, &prev_block_header)?;
let transactions = self.prepare_transactions(
shard_uid,
chunk_extra.gas_limit(),
*chunk_extra.state_root(),
&prev_block_header,
)?;
let transactions = transactions;
#[cfg(feature = "test_features")]
let transactions = Self::maybe_insert_invalid_transaction(
Expand All @@ -825,23 +866,7 @@ impl Client {
last_header.height_included(),
)?;

// Receipts proofs root is calculating here
//
// For each subset of incoming_receipts_into_shard_i_from_the_current_one
// we calculate hash here and save it
// and then hash all of them into a single receipts root
//
// We check validity in two ways:
// 1. someone who cares about shard will download all the receipts
// and checks that receipts_root equals to all receipts hashed
// 2. anyone who just asks for one's incoming receipts
// will receive a piece of incoming receipts only
// with merkle receipts proofs which can be checked locally
let shard_layout = self.epoch_manager.get_shard_layout(epoch_id)?;
let outgoing_receipts_hashes =
Chain::build_receipts_hashes(&outgoing_receipts, &shard_layout);
let (outgoing_receipts_root, _) = merklize(&outgoing_receipts_hashes);

let outgoing_receipts_root = self.calculate_receipts_root(epoch_id, &outgoing_receipts)?;
let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?;
let gas_used = chunk_extra.gas_used();
#[cfg(feature = "test_features")]
Expand Down Expand Up @@ -875,15 +900,172 @@ impl Client {
outgoing_receipts.len(),
);

metrics::CHUNK_PRODUCED_TOTAL.inc();
self.chunk_production_info.put(
(next_height, shard_id),
ChunkProduction {
chunk_production_time: Some(StaticClock::utc()),
chunk_production_duration_millis: Some(timer.elapsed().as_millis() as u64),
},
Ok((encoded_chunk, merkle_paths, outgoing_receipts))
}

#[allow(dead_code)]
fn produce_post_state_root_chunk(
&mut self,
validator_signer: &dyn ValidatorSigner,
prev_block_hash: CryptoHash,
epoch_id: &EpochId,
last_header: ShardChunkHeader,
next_height: BlockHeight,
shard_id: ShardId,
) -> Result<(EncodedShardChunk, Vec<MerklePath>, Vec<Receipt>), Error> {
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
let prev_block = self.chain.get_block(&prev_block_hash)?;
let prev_block_header = prev_block.header();
let gas_limit;
let prev_gas_used;
let prev_state_root;
let prev_validator_proposals;
let prev_outcome_root;
let prev_balance_burnt;
let prev_outgoing_receipts_root;
match &last_header {
ShardChunkHeader::V3(ShardChunkHeaderV3 {
inner: ShardChunkHeaderInner::V3(last_header_inner),
..
}) => {
gas_limit = last_header_inner.next_gas_limit;
prev_gas_used = last_header_inner.gas_used;
prev_state_root = last_header_inner.post_state_root;
prev_validator_proposals =
ValidatorStakeIter::new(&last_header_inner.validator_proposals)
.collect::<Vec<_>>();
prev_outcome_root = last_header_inner.outcome_root;
prev_balance_burnt = last_header_inner.balance_burnt;
prev_outgoing_receipts_root = last_header_inner.outgoing_receipts_root;
}
_ => {
let chunk_extra =
self.chain.get_chunk_extra(&prev_block_hash, &shard_uid).map_err(|err| {
Error::ChunkProducer(format!("No chunk extra available: {}", err))
})?;
gas_limit = chunk_extra.gas_limit();
prev_gas_used = chunk_extra.gas_used();
prev_state_root = *chunk_extra.state_root();
prev_validator_proposals = chunk_extra.validator_proposals().collect();
prev_outcome_root = *chunk_extra.outcome_root();
prev_balance_burnt = chunk_extra.balance_burnt();
let prev_outgoing_receipts = self.chain.get_outgoing_receipts_for_shard(
prev_block_hash,
shard_id,
last_header.height_included(),
)?;
prev_outgoing_receipts_root =
self.calculate_receipts_root(epoch_id, &prev_outgoing_receipts)?;
}
}
#[cfg(feature = "test_features")]
let prev_gas_used =
if self.produce_invalid_chunks { prev_gas_used + 1 } else { prev_gas_used };

let transactions =
self.prepare_transactions(shard_uid, gas_limit, prev_state_root, prev_block_header)?;
#[cfg(feature = "test_features")]
let transactions = Self::maybe_insert_invalid_transaction(
transactions,
prev_block_hash,
self.produce_invalid_tx_in_chunks,
);
Ok(Some((encoded_chunk, merkle_paths, outgoing_receipts)))
let num_filtered_transactions = transactions.len();
let (tx_root, _) = merklize(&transactions);

// TODO(post-state-root): applying the chunk can be time consuming, so probably
// we should not block the client thread here.
let apply_result = self.chain.apply_chunk_for_post_state_root(
shard_id,
prev_state_root,
// TODO(post-state-root): block-level field, need to double check if using next_height is correct here
next_height,
&prev_block,
&transactions,
ValidatorStakeIter::new(&prev_validator_proposals),
gas_limit,
last_header.height_included(),
)?;

let (transaction_receipts_parts, encoded_length) =
EncodedShardChunk::encode_transaction_receipts(
&mut self.rs_for_chunk_production,
transactions,
&apply_result.outgoing_receipts,
)
.map_err(|err| Error::Chunk(err.into()))?;
let mut content = EncodedShardChunkBody { parts: transaction_receipts_parts };
content.reconstruct(&mut self.rs_for_chunk_production).unwrap();
let (encoded_merkle_root, merkle_paths) = content.get_merkle_hash_and_paths();

let (outcome_root, _) =
ApplyTransactionResult::compute_outcomes_proof(&apply_result.outcomes);
let header_inner = ShardChunkHeaderInnerV3 {
prev_block_hash,
prev_state_root,
prev_outcome_root,
encoded_merkle_root,
encoded_length,
height_created: next_height,
shard_id,
prev_gas_used,
gas_limit,
prev_balance_burnt,
prev_outgoing_receipts_root,
tx_root,
prev_validator_proposals,
post_state_root: apply_result.new_root,
// Currently we don't change gas limit, also with pre-state-root
next_gas_limit: gas_limit,
gas_used: apply_result.total_gas_burnt,
validator_proposals: apply_result.validator_proposals,
outcome_root,
balance_burnt: apply_result.total_balance_burnt,
outgoing_receipts_root: self
.calculate_receipts_root(epoch_id, &apply_result.outgoing_receipts)?,
};
let header = ShardChunkHeaderV3::from_inner(
ShardChunkHeaderInner::V3(header_inner),
validator_signer,
);
let encoded_chunk = EncodedShardChunk::V2(EncodedShardChunkV2 {
header: ShardChunkHeader::V3(header),
content,
});

debug!(
target: "client",
me=%validator_signer.validator_id(),
chunk_hash=%encoded_chunk.chunk_hash().0,
%prev_block_hash,
"Produced post-state-root chunk with {} txs and {} receipts",
num_filtered_transactions,
apply_result.outgoing_receipts.len(),
);

Ok((encoded_chunk, merkle_paths, apply_result.outgoing_receipts))
}

/// Calculates the root of receipt proofs.
/// All receipts are groupped by receiver_id and hash is calculated
/// for each such group. Then we merkalize these hashes to calculate
/// the receipts root.
///
/// Receipts root is used in the following ways:
/// 1. Someone who cares about shard will download all the receipts
/// and checks if those correspond to receipts_root.
/// 2. Anyone who asks for one's incoming receipts will receive a piece
/// of incoming receipts only with merkle receipts proofs which can
/// be checked locally.
fn calculate_receipts_root(
&self,
epoch_id: &EpochId,
receipts: &[Receipt],
) -> Result<CryptoHash, Error> {
let shard_layout = self.epoch_manager.get_shard_layout(epoch_id)?;
let receipts_hashes = Chain::build_receipts_hashes(&receipts, &shard_layout);
let (receipts_root, _) = merklize(&receipts_hashes);
Ok(receipts_root)
}

#[cfg(feature = "test_features")]
Expand Down Expand Up @@ -911,7 +1093,8 @@ impl Client {
fn prepare_transactions(
&mut self,
shard_uid: ShardUId,
chunk_extra: &ChunkExtra,
gas_limit: Gas,
state_root: StateRoot,
prev_block_header: &BlockHeader,
) -> Result<Vec<SignedTransaction>, Error> {
let Self { chain, sharded_tx_pool, epoch_manager, runtime_adapter: runtime, .. } = self;
Expand All @@ -924,10 +1107,10 @@ impl Client {
let transaction_validity_period = chain.transaction_validity_period;
runtime.prepare_transactions(
prev_block_header.gas_price(),
chunk_extra.gas_limit(),
gas_limit,
&next_epoch_id,
shard_id,
*chunk_extra.state_root(),
state_root,
// while the height of the next block that includes the chunk might not be prev_height + 1,
// passing it will result in a more conservative check and will not accidentally allow
// invalid transactions to be included.
Expand Down
Loading

0 comments on commit a27075f

Please sign in to comment.