Skip to content

Commit

Permalink
perf: support new api engine_opSealPayload and optimize overheads of …
Browse files Browse the repository at this point in the history
…block mining (bnb-chain#193)
  • Loading branch information
bnoieh authored Dec 16, 2024
1 parent 723f40f commit 84232cb
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 21 deletions.
12 changes: 12 additions & 0 deletions beacon/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
// building of the payload to commence.
type PayloadVersion byte

const (
GetPayloadStage = "getPayload"
NewPayloadStage = "newPayload"
ForkchoiceUpdatedStage = "forkchoiceUpdated"
)

var (
PayloadV1 PayloadVersion = 0x1
PayloadV2 PayloadVersion = 0x2
Expand Down Expand Up @@ -181,6 +187,12 @@ type ForkchoiceStateV1 struct {
FinalizedBlockHash common.Hash `json:"finalizedBlockHash"`
}

type OpSealPayloadResponse struct {
ErrStage string `json:"errStage"`
PayloadStatus PayloadStatusV1 `json:"payloadStatus"`
Payload *ExecutionPayloadEnvelope `json:"payload"`
}

func encodeTransactions(txs []*types.Transaction) [][]byte {
var enc = make([][]byte, len(txs))
for i, tx := range txs {
Expand Down
12 changes: 9 additions & 3 deletions consensus/beacon/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,17 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
// Finalize and assemble the block.
beacon.Finalize(chain, header, state, txs, uncles, withdrawals)

// Assign the final state root to header.
header.Root = state.IntermediateRoot(true)
rootCh := make(chan common.Hash)
go func() {
rootCh <- state.IntermediateRoot(true)
}()

block := types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil))
headerWithRoot := block.Header()
headerWithRoot.Root = <-rootCh

// Assemble and return the final block.
return types.NewBlockWithWithdrawals(header, txs, uncles, receipts, withdrawals, trie.NewStackTrie(nil)), nil
return block.WithSeal(headerWithRoot), nil
}

// Seal generates a new sealing request for the given input block and pushes
Expand Down
8 changes: 5 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {

// ValidateState validates the various changes that happen after a state transition,
// such as amount of used gas, the receipt roots and the state root itself.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error {
header := block.Header()
if block.GasUsed() != usedGas {
return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas)
Expand All @@ -186,14 +186,16 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
}
return nil
},
func() error {
}
if !skipRoot {
validateFuns = append(validateFuns, func() error {
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
return nil
},
})
}
validateRes := make(chan error, len(validateFuns))
for _, f := range validateFuns {
Expand Down
74 changes: 66 additions & 8 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,13 @@ func (bc *BlockChain) procFutureBlocks() {
}
}

// CacheBlock cache block in memory
func (bc *BlockChain) CacheBlock(hash common.Hash, block *types.Block) {
bc.hc.numberCache.Add(hash, block.NumberU64())
bc.hc.headerCache.Add(hash, block.Header())
bc.blockCache.Add(hash, block)
}

// CacheMiningReceipts cache receipts in memory
func (bc *BlockChain) CacheMiningReceipts(hash common.Hash, receipts types.Receipts) {
bc.miningReceiptsCache.Add(hash, receipts)
Expand Down Expand Up @@ -1736,6 +1743,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return 0, nil
}

minerMode := false
if len(chain) == 1 {
block := chain[0]
_, receiptExist := bc.miningReceiptsCache.Get(block.Hash())
_, logExist := bc.miningTxLogsCache.Get(block.Hash())
_, stateExist := bc.miningStateCache.Get(block.Hash())
minerMode = receiptExist && logExist && stateExist
}

// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number(), chain[0].Time()), chain)

Expand All @@ -1759,7 +1775,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// Peek the error for the first block to decide the directing import logic
it := newInsertIterator(chain, results, bc.validator)
block, err := it.next()
var block *types.Block
var err error
if minerMode {
block = chain[0]
it.index = 0
} else {
block, err = it.next()
}

// Left-trim all the known blocks that don't need to build snapshot
if bc.skipBlock(err, it) {
Expand Down Expand Up @@ -1975,11 +1998,28 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
ptime := time.Since(pstart)

vstart := time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
// Async validate if minerMode
asyncValidateStateCh := make(chan error, 1)
if minerMode {
header := block.Header()
// Can not validate root concurrently
if root := statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)); header.Root != root {
err := fmt.Errorf("self mined block(hash: %x number %v) verify root err(mined: %x expected: %x) dberr: %w", block.Hash(), block.NumberU64(), header.Root, root, statedb.Error())
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
}
go func() {
asyncValidateStateCh <- bc.validator.ValidateState(block, statedb, receipts, usedGas, true)
}()
} else {
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, err
}
}

vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

Expand Down Expand Up @@ -2015,6 +2055,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if err != nil {
return it.index, err
}
if minerMode {
if err := <-asyncValidateStateCh; err != nil {
panic(fmt.Errorf("self mined block(hash: %x number %v) async verify state err: %w", block.Hash(), block.NumberU64(), err))
}
}
bc.CacheBlock(block.Hash(), block)

// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
Expand All @@ -2033,10 +2080,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
stats.usedGas += usedGas

var snapDiffItems, snapBufItems common.StorageSize
if bc.snaps != nil {
if bc.snaps != nil && !minerMode {
snapDiffItems, snapBufItems = bc.snaps.Size()
}
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()

var trieDiffNodes, trieBufNodes, trieImmutableBufNodes common.StorageSize
if !minerMode {
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ = bc.triedb.Size()
}
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)
blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000)

Expand Down Expand Up @@ -2536,10 +2587,17 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
return common.Hash{}, err
}
}
bc.writeHeadBlock(head)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
bc.writeHeadBlock(head)
}()
// Emit events
logs := bc.collectLogs(head, false)
wg.Wait()

bc.chainFeed.Send(ChainEvent{Block: head, Hash: head.Hash(), Logs: logs})
if len(logs) > 0 {
bc.logsFeed.Send(logs)
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
blockchain.reportBlock(block, receipts, err)
return err
}
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas)
err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, false)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand Down
1 change: 0 additions & 1 deletion core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,6 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er

if metrics.EnabledExpensive {
defer func(start time.Time) {
s.AccountCommits += time.Since(start)
accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
accountDeletedMeter.Mark(int64(s.AccountDeleted))
Expand Down
2 changes: 1 addition & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Validator interface {

// ValidateState validates the given statedb and optionally the receipts and
// gas used.
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error
ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipRoot bool) error
}

// Prefetcher is an interface for pre-caching transaction signatures and state.
Expand Down
75 changes: 71 additions & 4 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
sealPayloadTimer = metrics.NewRegisteredTimer("api/engine/seal/payload", nil)
)

// Register adds the engine API to the full node.
Expand Down Expand Up @@ -99,6 +100,8 @@ var caps = []string{
"engine_getPayloadBodiesByHashV1",
"engine_getPayloadBodiesByRangeV1",
"engine_getClientVersionV1",
"engine_opSealPayloadV2",
"engine_opSealPayloadV3",
}

type ConsensusAPI struct {
Expand Down Expand Up @@ -602,11 +605,17 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
defer api.newPayloadLock.Unlock()

log.Trace("Engine API request received", "method", "NewPayload", "number", params.Number, "hash", params.BlockHash)
block, err := engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot)
if err != nil {
log.Warn("Invalid NewPayload params", "params", params, "error", err)
return api.invalid(err, nil), nil

block := api.localBlocks.getBlockByHash(params.BlockHash)
if block == nil {
var err error
block, err = engine.ExecutableDataToBlock(params, versionedHashes, beaconRoot)
if err != nil {
log.Warn("Invalid NewPayload params", "params", params, "error", err)
return api.invalid(err, nil), nil
}
}

// Stash away the last update to warn the user if the beacon client goes offline
api.lastNewPayloadLock.Lock()
api.lastNewPayloadUpdate = time.Now()
Expand Down Expand Up @@ -691,6 +700,64 @@ func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashe
return engine.PayloadStatusV1{Status: engine.VALID, LatestValidHash: &hash}, nil
}

// OpSealPayloadV2 is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated.
func (api *ConsensusAPI) OpSealPayloadV2(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) {
return api.opSealPayload(payloadID, update, needPayload, "V2")
}

// OpSealPayloadV3 is combination API of payload sealing: getPayload, newPayload, forkchoiceUpdated.
func (api *ConsensusAPI) OpSealPayloadV3(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool) (engine.OpSealPayloadResponse, error) {
return api.opSealPayload(payloadID, update, needPayload, "V3")
}

func (api *ConsensusAPI) opSealPayload(payloadID engine.PayloadID, update engine.ForkchoiceStateV1, needPayload bool, version string) (engine.OpSealPayloadResponse, error) {
start := time.Now()
defer func() {
sealPayloadTimer.UpdateSince(start)
log.Debug("sealPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "payloadID", payloadID)
}()
var payloadEnvelope *engine.ExecutionPayloadEnvelope
var err error
if version == "V2" {
payloadEnvelope, err = api.GetPayloadV2(payloadID)
} else if version == "V3" {
payloadEnvelope, err = api.GetPayloadV3(payloadID)
} else {
return engine.OpSealPayloadResponse{ErrStage: engine.GetPayloadStage}, engine.UnsupportedFork.With(errors.New("invalid engine api version"))
}
if err != nil {
log.Error("Seal payload error when get payload", "error", err, "payloadID", payloadID)
return engine.OpSealPayloadResponse{ErrStage: engine.GetPayloadStage}, err
}

var payloadStatus engine.PayloadStatusV1
if version == "V2" {
payloadStatus, err = api.NewPayloadV2(*payloadEnvelope.ExecutionPayload)
} else if version == "V3" {
payloadStatus, err = api.NewPayloadV3(*payloadEnvelope.ExecutionPayload, []common.Hash{}, payloadEnvelope.ParentBeaconBlockRoot)
} else {
return engine.OpSealPayloadResponse{ErrStage: engine.NewPayloadStage}, engine.UnsupportedFork.With(errors.New("invalid engine api version"))
}
if err != nil || payloadStatus.Status != engine.VALID {
log.Error("Seal payload error when new payload", "error", err, "payloadStatus", payloadStatus)
return engine.OpSealPayloadResponse{ErrStage: engine.NewPayloadStage, PayloadStatus: payloadStatus}, err
}

update.HeadBlockHash = payloadEnvelope.ExecutionPayload.BlockHash
updateResponse, err := api.ForkchoiceUpdatedV3(update, nil)
if err != nil || updateResponse.PayloadStatus.Status != engine.VALID {
log.Error("Seal payload error when forkchoiceUpdated", "error", err, "payloadStatus", updateResponse.PayloadStatus)
return engine.OpSealPayloadResponse{ErrStage: engine.ForkchoiceUpdatedStage, PayloadStatus: updateResponse.PayloadStatus}, err
}

log.Info("opSealPayload succeed", "hash", payloadEnvelope.ExecutionPayload.BlockHash, "number", payloadEnvelope.ExecutionPayload.Number, "id", payloadID, "payloadStatus", updateResponse.PayloadStatus)
if needPayload {
return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus, Payload: payloadEnvelope}, nil
} else {
return engine.OpSealPayloadResponse{PayloadStatus: updateResponse.PayloadStatus}, nil
}
}

// delayPayloadImport stashes the given block away for import at a later time,
// either via a forkchoice update or a sync extension. This method is meant to
// be called by the newpayload command when the block seems to be ok, but some
Expand Down
17 changes: 17 additions & 0 deletions eth/catalyst/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,23 @@ func (q *payloadQueue) has(id engine.PayloadID) bool {
return false
}

// getBlock retrieves block from a previously stored payload or nil if it does not exist.
func (q *payloadQueue) getBlockByHash(hash common.Hash) *types.Block {
q.lock.RLock()
defer q.lock.RUnlock()

for _, item := range q.payloads {
if item == nil {
return nil
}
block := item.payload.GetBlock()
if block != nil && block.Hash() == hash {
return block
}
}
return nil
}

// headerQueueItem represents an hash->header tuple to store until it's retrieved
// or evicted.
type headerQueueItem struct {
Expand Down
9 changes: 9 additions & 0 deletions miner/payload_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ func (payload *Payload) resolve(onlyFull bool) *engine.ExecutionPayloadEnvelope
return nil
}

func (payload *Payload) GetBlock() *types.Block {
if payload.full != nil {
return payload.full
} else if payload.empty != nil {
return payload.empty
}
return nil
}

// interruptBuilding sets an interrupt for a potentially ongoing
// block building process.
// This will prevent it from adding new transactions to the block, and if it is
Expand Down

0 comments on commit 84232cb

Please sign in to comment.