Skip to content

Commit

Permalink
feat: add metrics and do validate and commit concurrently
Browse files Browse the repository at this point in the history
chore: block tx metrics

feat: do BlockBody verification concurrently

feat: do the calculation of intermediate root concurrently.

feat: commit the MPTs concurrently

feat: async update difflayer

feat: do state verification concurrently

chore: change validate time metrics

chore: change execute time metrices

fix: add flush mutex for nodebufferlist

fix: miner set expected root hash

test: set expected root

test: change set expected root
  • Loading branch information
joeylichang committed Apr 17, 2024
1 parent 0b1f3ed commit c69cbf2
Show file tree
Hide file tree
Showing 10 changed files with 583 additions and 256 deletions.
54 changes: 54 additions & 0 deletions common/gopool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package gopool

import (
"runtime"
"time"

"github.com/panjf2000/ants/v2"
)

var (
// Init a instance pool when importing ants.
defaultPool, _ = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithExpiryDuration(10*time.Second))
minNumberPerTask = 5
)

// Submit submits a task to pool.
func Submit(task func()) error {
return defaultPool.Submit(task)
}

// Running returns the number of the currently running goroutines.
func Running() int {
return defaultPool.Running()
}

// Cap returns the capacity of this default pool.
func Cap() int {
return defaultPool.Cap()
}

// Free returns the available goroutines to work.
func Free() int {
return defaultPool.Free()
}

// Release Closes the default pool.
func Release() {
defaultPool.Release()
}

// Reboot reboots the default pool.
func Reboot() {
defaultPool.Reboot()
}

func Threads(tasks int) int {
threads := tasks / minNumberPerTask
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
} else if threads == 0 {
threads = 1
}
return threads
}
185 changes: 124 additions & 61 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package core
import (
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)
Expand Down Expand Up @@ -65,57 +68,81 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash {
return fmt.Errorf("uncle root hash mismatch (header value %x, calculated %x)", header.UncleHash, hash)
}
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash {
return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash)
}
validateFuns := []func() error{
func() error {
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash {
return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash)
}
return nil
},
func() error {
// Withdrawals are present after the Shanghai fork.
if header.WithdrawalsHash != nil {
// Withdrawals list must be present in body after Shanghai.
if block.Withdrawals() == nil {
return errors.New("missing withdrawals in block body")
}
if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash {
return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash)
}
} else if block.Withdrawals() != nil {
// Withdrawals are not allowed prior to Shanghai fork
return errors.New("withdrawals present in block body")
}
return nil
},
func() error {
// Blob transactions may be present after the Cancun fork.
var blobs int
for i, tx := range block.Transactions() {
// Count the number of blobs to validate against the header's blobGasUsed
blobs += len(tx.BlobHashes())

// Withdrawals are present after the Shanghai fork.
if header.WithdrawalsHash != nil {
// Withdrawals list must be present in body after Shanghai.
if block.Withdrawals() == nil {
return errors.New("missing withdrawals in block body")
}
if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash {
return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash)
}
} else if block.Withdrawals() != nil {
// Withdrawals are not allowed prior to Shanghai fork
return errors.New("withdrawals present in block body")
}
// If the tx is a blob tx, it must NOT have a sidecar attached to be valid in a block.
if tx.BlobTxSidecar() != nil {
return fmt.Errorf("unexpected blob sidecar in transaction at index %d", i)
}

// Blob transactions may be present after the Cancun fork.
var blobs int
for i, tx := range block.Transactions() {
// Count the number of blobs to validate against the header's blobGasUsed
blobs += len(tx.BlobHashes())
// The individual checks for blob validity (version-check + not empty)
// happens in StateTransition.
}

// If the tx is a blob tx, it must NOT have a sidecar attached to be valid in a block.
if tx.BlobTxSidecar() != nil {
return fmt.Errorf("unexpected blob sidecar in transaction at index %d", i)
}

// The individual checks for blob validity (version-check + not empty)
// happens in StateTransition.
}

// Check blob gas usage.
if header.BlobGasUsed != nil {
if want := *header.BlobGasUsed / params.BlobTxBlobGasPerBlob; uint64(blobs) != want { // div because the header is surely good vs the body might be bloated
return fmt.Errorf("blob gas used mismatch (header %v, calculated %v)", *header.BlobGasUsed, blobs*params.BlobTxBlobGasPerBlob)
}
} else {
if blobs > 0 {
return errors.New("data blobs present in block body")
// Check blob gas usage.
if header.BlobGasUsed != nil {
if want := *header.BlobGasUsed / params.BlobTxBlobGasPerBlob; uint64(blobs) != want { // div because the header is surely good vs the body might be bloated
return fmt.Errorf("blob gas used mismatch (header %v, calculated %v)", *header.BlobGasUsed, blobs*params.BlobTxBlobGasPerBlob)
}
} else {
if blobs > 0 {
return errors.New("data blobs present in block body")
}
}
return nil
},
func() error {
// Ancestor block must be known.
if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
}
return consensus.ErrPrunedAncestor
}
return nil
},
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
tmpFunc := f
gopool.Submit(func() {
validateRes <- tmpFunc()
})
}
for i := 0; i < len(validateFuns); i++ {
err := <-validateRes
if err != nil {
return err
}
}

// Ancestor block must be known.
if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
}
return consensus.ErrPrunedAncestor
}
return nil
}

Expand All @@ -126,23 +153,59 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
}
// Validate the received block's bloom with the one derived from the generated receipts.
// For valid blocks this should always validate to true.
rbloom := types.CreateBloom(receipts)
if rbloom != header.Bloom {
return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom)
}
// Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, Rn]]))
receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil))
if receiptSha != header.ReceiptHash {
return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha)

validateFuns := []func() error{
func() error {
if metrics.EnabledExpensive {
defer func(start time.Time) { ReceiptBloomValidateTimer.Update(time.Since(start)) }(time.Now())
}
// Validate the received block's bloom with the one derived from the generated receipts.
// For valid blocks this should always validate to true.
rbloom := types.CreateBloom(receipts)
if rbloom != header.Bloom {
return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom)
}
return nil
},
func() error {
if metrics.EnabledExpensive {
defer func(start time.Time) { ReceiptHashValidateTimer.Update(time.Since(start)) }(time.Now())
}
// Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, Rn]]))
receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil))
if receiptSha != header.ReceiptHash {
return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha)
}
return nil
},
func() error {
if metrics.EnabledExpensive {
defer func(start time.Time) { RootHashValidateTimer.Update(time.Since(start)) }(time.Now())
}
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
return nil
},
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
tmpFunc := f
go func() {
validateRes <- tmpFunc()
}()
}
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())

var err error
for i := 0; i < len(validateFuns); i++ {
r := <-validateRes
if r != nil && err == nil {
err = r
}
}
return nil
return err
}

// CalcGasLimit computes the gas limit of the next block after parent. It aims
Expand Down
Loading

0 comments on commit c69cbf2

Please sign in to comment.