From c69cbf2c5bb3031b9bf0777c344326f85c2dba33 Mon Sep 17 00:00:00 2001 From: joeylichang Date: Thu, 28 Mar 2024 12:23:54 +0800 Subject: [PATCH] feat: add metrics and do validate and commit concurrently chore: block tx metrics feat: do BlockBody verification concurrently feat: do the calculation of intermediate root concurrently. feat: commit the MPTs concurrently feat: async update difflayer feat: do state verification concurrently chore: change validate time metrics chore: change execute time metrices fix: add flush mutex for nodebufferlist fix: miner set expected root hash test: set expected root test: change set expected root --- common/gopool/pool.go | 54 +++++ core/block_validator.go | 185 ++++++++++------ core/blockchain.go | 92 +++++--- core/blockchain_insert.go | 3 +- core/state/state_object.go | 109 ++++++---- core/state/statedb.go | 371 +++++++++++++++++++++++---------- go.mod | 1 + go.sum | 3 + miner/worker.go | 2 + trie/triedb/pathdb/database.go | 19 +- 10 files changed, 583 insertions(+), 256 deletions(-) create mode 100644 common/gopool/pool.go diff --git a/common/gopool/pool.go b/common/gopool/pool.go new file mode 100644 index 0000000000..ad5c80e36a --- /dev/null +++ b/common/gopool/pool.go @@ -0,0 +1,54 @@ +package gopool + +import ( + "runtime" + "time" + + "github.com/panjf2000/ants/v2" +) + +var ( + // Init a instance pool when importing ants. + defaultPool, _ = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithExpiryDuration(10*time.Second)) + minNumberPerTask = 5 +) + +// Submit submits a task to pool. +func Submit(task func()) error { + return defaultPool.Submit(task) +} + +// Running returns the number of the currently running goroutines. +func Running() int { + return defaultPool.Running() +} + +// Cap returns the capacity of this default pool. +func Cap() int { + return defaultPool.Cap() +} + +// Free returns the available goroutines to work. +func Free() int { + return defaultPool.Free() +} + +// Release Closes the default pool. +func Release() { + defaultPool.Release() +} + +// Reboot reboots the default pool. +func Reboot() { + defaultPool.Reboot() +} + +func Threads(tasks int) int { + threads := tasks / minNumberPerTask + if threads > runtime.NumCPU() { + threads = runtime.NumCPU() + } else if threads == 0 { + threads = 1 + } + return threads +} diff --git a/core/block_validator.go b/core/block_validator.go index f3d65cea25..be72f1e165 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -19,10 +19,13 @@ package core import ( "errors" "fmt" + "time" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" ) @@ -65,57 +68,81 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash { return fmt.Errorf("uncle root hash mismatch (header value %x, calculated %x)", header.UncleHash, hash) } - if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash { - return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash) - } + validateFuns := []func() error{ + func() error { + if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash { + return fmt.Errorf("transaction root hash mismatch (header value %x, calculated %x)", header.TxHash, hash) + } + return nil + }, + func() error { + // Withdrawals are present after the Shanghai fork. + if header.WithdrawalsHash != nil { + // Withdrawals list must be present in body after Shanghai. + if block.Withdrawals() == nil { + return errors.New("missing withdrawals in block body") + } + if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash { + return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash) + } + } else if block.Withdrawals() != nil { + // Withdrawals are not allowed prior to Shanghai fork + return errors.New("withdrawals present in block body") + } + return nil + }, + func() error { + // Blob transactions may be present after the Cancun fork. + var blobs int + for i, tx := range block.Transactions() { + // Count the number of blobs to validate against the header's blobGasUsed + blobs += len(tx.BlobHashes()) - // Withdrawals are present after the Shanghai fork. - if header.WithdrawalsHash != nil { - // Withdrawals list must be present in body after Shanghai. - if block.Withdrawals() == nil { - return errors.New("missing withdrawals in block body") - } - if hash := types.DeriveSha(block.Withdrawals(), trie.NewStackTrie(nil)); hash != *header.WithdrawalsHash { - return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash) - } - } else if block.Withdrawals() != nil { - // Withdrawals are not allowed prior to Shanghai fork - return errors.New("withdrawals present in block body") - } + // If the tx is a blob tx, it must NOT have a sidecar attached to be valid in a block. + if tx.BlobTxSidecar() != nil { + return fmt.Errorf("unexpected blob sidecar in transaction at index %d", i) + } - // Blob transactions may be present after the Cancun fork. - var blobs int - for i, tx := range block.Transactions() { - // Count the number of blobs to validate against the header's blobGasUsed - blobs += len(tx.BlobHashes()) + // The individual checks for blob validity (version-check + not empty) + // happens in StateTransition. + } - // If the tx is a blob tx, it must NOT have a sidecar attached to be valid in a block. - if tx.BlobTxSidecar() != nil { - return fmt.Errorf("unexpected blob sidecar in transaction at index %d", i) - } - - // The individual checks for blob validity (version-check + not empty) - // happens in StateTransition. - } - - // Check blob gas usage. - if header.BlobGasUsed != nil { - if want := *header.BlobGasUsed / params.BlobTxBlobGasPerBlob; uint64(blobs) != want { // div because the header is surely good vs the body might be bloated - return fmt.Errorf("blob gas used mismatch (header %v, calculated %v)", *header.BlobGasUsed, blobs*params.BlobTxBlobGasPerBlob) - } - } else { - if blobs > 0 { - return errors.New("data blobs present in block body") + // Check blob gas usage. + if header.BlobGasUsed != nil { + if want := *header.BlobGasUsed / params.BlobTxBlobGasPerBlob; uint64(blobs) != want { // div because the header is surely good vs the body might be bloated + return fmt.Errorf("blob gas used mismatch (header %v, calculated %v)", *header.BlobGasUsed, blobs*params.BlobTxBlobGasPerBlob) + } + } else { + if blobs > 0 { + return errors.New("data blobs present in block body") + } + } + return nil + }, + func() error { + // Ancestor block must be known. + if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { + if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) { + return consensus.ErrUnknownAncestor + } + return consensus.ErrPrunedAncestor + } + return nil + }, + } + validateRes := make(chan error, len(validateFuns)) + for _, f := range validateFuns { + tmpFunc := f + gopool.Submit(func() { + validateRes <- tmpFunc() + }) + } + for i := 0; i < len(validateFuns); i++ { + err := <-validateRes + if err != nil { + return err } } - - // Ancestor block must be known. - if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { - if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) { - return consensus.ErrUnknownAncestor - } - return consensus.ErrPrunedAncestor - } return nil } @@ -126,23 +153,59 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD if block.GasUsed() != usedGas { return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas) } - // Validate the received block's bloom with the one derived from the generated receipts. - // For valid blocks this should always validate to true. - rbloom := types.CreateBloom(receipts) - if rbloom != header.Bloom { - return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom) - } - // Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, Rn]])) - receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil)) - if receiptSha != header.ReceiptHash { - return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha) + + validateFuns := []func() error{ + func() error { + if metrics.EnabledExpensive { + defer func(start time.Time) { ReceiptBloomValidateTimer.Update(time.Since(start)) }(time.Now()) + } + // Validate the received block's bloom with the one derived from the generated receipts. + // For valid blocks this should always validate to true. + rbloom := types.CreateBloom(receipts) + if rbloom != header.Bloom { + return fmt.Errorf("invalid bloom (remote: %x local: %x)", header.Bloom, rbloom) + } + return nil + }, + func() error { + if metrics.EnabledExpensive { + defer func(start time.Time) { ReceiptHashValidateTimer.Update(time.Since(start)) }(time.Now()) + } + // Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, Rn]])) + receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil)) + if receiptSha != header.ReceiptHash { + return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha) + } + return nil + }, + func() error { + if metrics.EnabledExpensive { + defer func(start time.Time) { RootHashValidateTimer.Update(time.Since(start)) }(time.Now()) + } + // Validate the state root against the received state root and throw + // an error if they don't match. + if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { + return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error()) + } + return nil + }, + } + validateRes := make(chan error, len(validateFuns)) + for _, f := range validateFuns { + tmpFunc := f + go func() { + validateRes <- tmpFunc() + }() } - // Validate the state root against the received state root and throw - // an error if they don't match. - if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { - return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error()) + + var err error + for i := 0; i < len(validateFuns); i++ { + r := <-validateRes + if r != nil && err == nil { + err = r + } } - return nil + return err } // CalcGasLimit computes the gas limit of the next block after parent. It aims diff --git a/core/blockchain.go b/core/blockchain.go index 1dd7490ee3..501007ab7b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -78,18 +78,25 @@ var ( triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil) + trieAllCommitTimer = metrics.NewRegisteredTimer("chain/triedball/commits", nil) + CodeCommitTimer = metrics.NewRegisteredTimer("chain/code/commits", nil) + MetaCommitTimer = metrics.NewRegisteredTimer("chain/meta/commits", nil) + + ReceiptBloomValidateTimer = metrics.NewRegisteredTimer("chain/validate/receipt/bloom", nil) + ReceiptHashValidateTimer = metrics.NewRegisteredTimer("chain/validate/receipt/hash", nil) + RootHashValidateTimer = metrics.NewRegisteredTimer("chain/validate/roothash", nil) + blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil) blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil) blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) - blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil) - stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil) + blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil) + stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil) triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil) - innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil) + innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil) blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil) - mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil) blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil) blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) @@ -98,6 +105,9 @@ var ( blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + mgaspsGauge = metrics.NewRegisteredGauge("chain/execution/mgasps", nil) + blockTxGuage = metrics.NewRegisteredGauge("chain/block/tx", nil) + errInsertionInterrupted = errors.New("insertion is interrupted") errChainStopped = errors.New("blockchain is stopped") errInvalidOldChain = errors.New("invalid old chain") @@ -1445,24 +1455,31 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Make sure no inconsistent state is leaked during insertion externTd := new(big.Int).Add(block.Difficulty(), ptd) + state.SetExpectedStateRoot(block.Root()) // Irrelevant of the canonical status, write the block itself to the database. // // Note all the components of block(td, hash->number map, header, body, receipts) // should be written atomically. BlockBatch is used for containing all components. - start := time.Now() - blockBatch := bc.db.NewBatch() - rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) - rawdb.WriteBlock(blockBatch, block) - rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) - rawdb.WritePreimages(blockBatch, state.Preimages()) - if err := blockBatch.Write(); err != nil { - log.Crit("Failed to write block into disk", "err", err) - } - blockWriteExternalTimer.UpdateSince(start) - log.Debug("blockWriteExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash()) - + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + if metrics.EnabledExpensive { + defer func(start time.Time) { MetaCommitTimer.Update(time.Since(start)) }(time.Now()) + } + blockBatch := bc.db.NewBatch() + rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) + rawdb.WriteBlock(blockBatch, block) + rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) + rawdb.WritePreimages(blockBatch, state.Preimages()) + if err := blockBatch.Write(); err != nil { + log.Crit("Failed to write block into disk", "err", err) + } + wg.Done() + }() + // Commit all cached state changes into underlying memory database. - start = time.Now() + start := time.Now() root, err := state.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number())) if err != nil { return err @@ -1473,16 +1490,19 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // If node is running in path mode, skip explicit gc operation // which is unnecessary in this mode. if bc.triedb.Scheme() == rawdb.PathScheme { + wg.Wait() return nil } // If we're running an archive node, always flush start = time.Now() - defer func () { + defer func() { triedbCommitExternalTimer.UpdateSince(start) log.Debug("triedbCommitExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash()) - } () + }() if bc.cacheConfig.TrieDirtyDisabled { - return bc.triedb.Commit(root, false) + err = bc.triedb.Commit(root, false) + wg.Wait() + return err } // Full but not archive node, do proper garbage collection bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive @@ -1491,6 +1511,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Flush limits are not considered for the first TriesInMemory blocks. current := block.NumberU64() if current <= TriesInMemory { + wg.Wait() return nil } // If we exceeded our memory allowance, flush matured singleton nodes to disk @@ -1532,6 +1553,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } bc.triedb.Dereference(root) } + wg.Wait() return nil } @@ -1785,7 +1807,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } }() - defer func () { + defer func() { DebugInnerExecutionDuration = 0 }() for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() { @@ -1871,7 +1893,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. - var followupInterrupt atomic.Bool + //var followupInterrupt atomic.Bool if !bc.cacheConfig.TrieCleanNoPrefetch { if followup, err := it.peek(); followup != nil && err == nil { throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) @@ -1887,6 +1909,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } } + statedb.SetExpectedStateRoot(block.Root()) // Process block using the parent state as reference point pstart = time.Now() receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig) @@ -1908,16 +1931,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) proctime := time.Since(start) // processing + validation // Update the metrics touched during block processing and validation - accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) - storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing) - snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing) - snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing) - accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation) - storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) - accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) - storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation) - blockExecutionTimer.Update(ptime) // The time spent on block execution - blockValidationTimer.Update(vtime) // The time spent on block validation + accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) + storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing) + snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing) + snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing) + accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation) + storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) + accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) + storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation) + blockExecutionTimer.Update(ptime) // The time spent on block execution + blockValidationTimer.Update(vtime) // The time spent on block validation innerExecutionTimer.Update(DebugInnerExecutionDuration) @@ -1943,9 +1966,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them + trieAllCommitTimer.Update(statedb.TrieAllCommits) + CodeCommitTimer.Update(statedb.CodeCommits) blockWriteTimer.UpdateSince(wstart) blockInsertTimer.UpdateSince(start) + blockTxGuage.Update(int64(block.Transactions().Len())) log.Debug("New payload db write metrics", "hash", block.Hash(), "insert", common.PrettyDuration(time.Since(start)), "writeDB", common.PrettyDuration(time.Since(wstart)), "writeBlock", common.PrettyDuration(time.Since(wstart)), "accountCommit", common.PrettyDuration(statedb.AccountCommits), "storageCommit", common.PrettyDuration(statedb.StorageCommits), "snapshotCommits", common.PrettyDuration(statedb.SnapshotCommits), "triedbCommit", common.PrettyDuration(statedb.TrieDBCommits)) @@ -1959,7 +1985,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) } trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size() stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead) - blockGasUsedGauge.Update(int64(block.GasUsed())/1000000) + blockGasUsedGauge.Update(int64(block.GasUsed()) / 1000000) if !setHead { // After merge we expect few side chains. Simply count diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 82a480d0be..9dc6017e42 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -60,7 +60,8 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), } - mgaspsGauge.Update(int64(st.usedGas)*1000/int64(elapsed)) + + mgaspsGauge.Update(int64(float64(st.usedGas) * 1000 / float64(elapsed))) if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute { context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) } diff --git a/core/state/state_object.go b/core/state/state_object.go index 11fcb01871..f710c57aac 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -19,11 +19,13 @@ package state import ( "bytes" "fmt" - "github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler" "io" "math/big" + "sync" "time" + "github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -287,6 +289,7 @@ func (s *stateObject) updateTrie() (Trie, error) { var ( storage map[common.Hash][]byte origin map[common.Hash][]byte + hasher = crypto.NewKeccakState() ) tr, err := s.getTrie() if err != nil { @@ -295,61 +298,83 @@ func (s *stateObject) updateTrie() (Trie, error) { } // Insert all the pending storage updates into the trie usedStorage := make([][]byte, 0, len(s.pendingStorage)) + dirtyStorage := make(map[common.Hash][]byte) for key, value := range s.pendingStorage { // Skip noop changes, persist actual changes if value == s.originStorage[key] { continue } - prev := s.originStorage[key] - s.originStorage[key] = value - - var encoded []byte // rlp-encoded value to be used by the snapshot - if (value == common.Hash{}) { - if err := tr.DeleteStorage(s.address, key[:]); err != nil { - s.db.setError(err) - return nil, err - } - s.db.StorageDeleted += 1 - } else { - // Encoding []byte cannot fail, ok to ignore the error. - trimmed := common.TrimLeftZeroes(value[:]) - encoded, _ = rlp.EncodeToBytes(trimmed) - if err := tr.UpdateStorage(s.address, key[:], trimmed); err != nil { - s.db.setError(err) - return nil, err + var v []byte + if value != (common.Hash{}) { + value := value + v = common.TrimLeftZeroes(value[:]) + } + dirtyStorage[key] = v + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for key, value := range dirtyStorage { + if len(value) == 0 { + if err := tr.DeleteStorage(s.address, key[:]); err != nil { + s.db.setError(err) + } + s.db.StorageDeleted += 1 + } else { + if err := tr.UpdateStorage(s.address, key[:], value); err != nil { + s.db.setError(err) + } + s.db.StorageUpdated += 1 } - s.db.StorageUpdated += 1 + // Cache the items for preloading + usedStorage = append(usedStorage, common.CopyBytes(key[:])) } - // Cache the mutated storage slots until commit + }() + // If state snapshotting is active, cache the data til commit + wg.Add(1) + go func() { + defer wg.Done() + s.db.StorageMux.Lock() + // The snapshot storage map for the object + storage = s.db.storages[s.addrHash] if storage == nil { - if storage = s.db.storages[s.addrHash]; storage == nil { - storage = make(map[common.Hash][]byte) - s.db.storages[s.addrHash] = storage - } + storage = make(map[common.Hash][]byte, len(dirtyStorage)) + s.db.storages[s.addrHash] = storage } - khash := crypto.HashData(s.db.hasher, key[:]) - storage[khash] = encoded // encoded will be nil if it's deleted - // Cache the original value of mutated storage slots + origin = s.db.storagesOrigin[s.address] if origin == nil { - if origin = s.db.storagesOrigin[s.address]; origin == nil { - origin = make(map[common.Hash][]byte) - s.db.storagesOrigin[s.address] = origin - } + origin = make(map[common.Hash][]byte) + s.db.storagesOrigin[s.address] = origin } - // Track the original value of slot only if it's mutated first time - if _, ok := origin[khash]; !ok { - if prev == (common.Hash{}) { - origin[khash] = nil // nil if it was not present previously - } else { - // Encoding []byte cannot fail, ok to ignore the error. - b, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(prev[:])) - origin[khash] = b + s.db.StorageMux.Unlock() + for key, value := range dirtyStorage { + khash := crypto.HashData(hasher, key[:]) + + // rlp-encoded value to be used by the snapshot + var encoded []byte + if len(value) != 0 { + encoded, _ = rlp.EncodeToBytes(value) + } + storage[khash] = encoded // encoded will be nil if it's deleted + + // Track the original value of slot only if it's mutated first time + prev := s.originStorage[key] + s.originStorage[key] = common.BytesToHash(value) // fill back left zeroes by BytesToHash + if _, ok := origin[khash]; !ok { + if prev == (common.Hash{}) { + origin[khash] = nil // nil if it was not present previously + } else { + // Encoding []byte cannot fail, ok to ignore the error. + b, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(prev[:])) + origin[khash] = b + } } } - // Cache the items for preloading - usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure - } + }() + wg.Wait() + if s.db.prefetcher != nil { s.db.prefetcher.used(s.addrHash, s.data.Root, usedStorage) } diff --git a/core/state/statedb.go b/core/state/statedb.go index d28cd29b30..f98df2da19 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -20,14 +20,18 @@ package state import ( "fmt" "math/big" + "runtime" "sort" + "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -69,9 +73,13 @@ type StateDB struct { // originalRoot is the pre-state root, before any changes were made. // It will be updated when the Commit is called. originalRoot common.Hash + expectedRoot common.Hash // The state root in the block header + stateRoot common.Hash // The calculation result of IntermediateRoot // These maps hold the state changes (including the corresponding // original value) that occurred in this **block**. + AccountMux sync.Mutex // Mutex for accounts access + StorageMux sync.Mutex // Mutex for storages access accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding storages map[common.Hash]map[common.Hash][]byte // The mutated slots in prefix-zero trimmed rlp format accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding @@ -130,6 +138,8 @@ type StateDB struct { SnapshotStorageReads time.Duration SnapshotCommits time.Duration TrieDBCommits time.Duration + TrieAllCommits time.Duration + CodeCommits time.Duration AccountUpdated int StorageUpdated int @@ -875,7 +885,55 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // Finalise all the dirty storage states and write them into the tries s.Finalise(deleteEmptyObjects) + s.AccountsIntermediateRoot() + return s.StateIntermediateRoot() +} + +func (s *StateDB) AccountsIntermediateRoot() { + tasks := make(chan func()) + finishCh := make(chan struct{}) + defer close(finishCh) + wg := sync.WaitGroup{} + for i := 0; i < runtime.NumCPU(); i++ { + go func() { + for { + select { + case task := <-tasks: + task() + case <-finishCh: + return + } + } + }() + } + + // Although naively it makes sense to retrieve the account trie and then do + // the contract storage and account updates sequentially, that short circuits + // the account prefetcher. Instead, let's process all the storage updates + // first, giving the account prefetches just a few more milliseconds of time + // to pull useful data from disk. + for addr := range s.stateObjectsPending { + if obj := s.stateObjects[addr]; !obj.deleted { + wg.Add(1) + tasks <- func() { + obj.updateRoot() + + // Cache the data until commit. Note, this update mechanism is not symmetric + // to the deletion, because whereas it is enough to track account updates + // at commit time, deletions need tracking at transaction boundary level to + // ensure we capture state clearing. + s.AccountMux.Lock() + s.accounts[obj.addrHash] = types.SlimAccountRLP(obj.data) + s.AccountMux.Unlock() + + wg.Done() + } + } + } + wg.Wait() +} +func (s *StateDB) StateIntermediateRoot() common.Hash { // If there was a trie prefetcher operating, it gets aborted and irrevocably // modified after we start retrieving tries. Remove it from the statedb after // this round of use. @@ -890,16 +948,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { s.prefetcher = nil }() } - // Although naively it makes sense to retrieve the account trie and then do - // the contract storage and account updates sequentially, that short circuits - // the account prefetcher. Instead, let's process all the storage updates - // first, giving the account prefetches just a few more milliseconds of time - // to pull useful data from disk. - for addr := range s.stateObjectsPending { - if obj := s.stateObjects[addr]; !obj.deleted { - obj.updateRoot() - } - } + // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. @@ -908,14 +957,20 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { s.trie = trie } } + if s.trie == nil { + tr, err := s.db.OpenTrie(s.originalRoot) + if err != nil { + panic(fmt.Sprintf("failed to open trie tree %s", s.originalRoot)) + } + s.trie = tr + } + usedAddrs := make([][]byte, 0, len(s.stateObjectsPending)) for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; obj.deleted { s.deleteStateObject(obj) - s.AccountDeleted += 1 } else { s.updateStateObject(obj) - s.AccountUpdated += 1 } usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure } @@ -1154,6 +1209,11 @@ func (s *StateDB) handleDestruction(nodes *trienode.MergedNodeSet) (map[common.A return incomplete, nil } +// Mark that the block is processed by diff layer +func (s *StateDB) SetExpectedStateRoot(root common.Hash) { + s.expectedRoot = root +} + // Commit writes the state to the underlying in-memory trie database. // Once the state is committed, tries cached in stateDB (including account // trie, storage tries) will no longer be functional. A new state instance @@ -1165,130 +1225,215 @@ func (s *StateDB) handleDestruction(nodes *trienode.MergedNodeSet) (map[common.A func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, error) { // Short circuit in case any database failure occurred earlier. if s.dbErr != nil { + s.StopPrefetcher() return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr) } - // Finalize any pending changes and merge everything into the tries - s.IntermediateRoot(deleteEmptyObjects) - // Commit objects to the trie, measuring the elapsed time var ( - accountTrieNodesUpdated int - accountTrieNodesDeleted int - storageTrieNodesUpdated int - storageTrieNodesDeleted int - nodes = trienode.NewMergedNodeSet() - codeWriter = s.db.DiskDB().NewBatch() + nodes = trienode.NewMergedNodeSet() + incomplete map[common.Address]struct{} ) - // Handle all state deletions first - incomplete, err := s.handleDestruction(nodes) - if err != nil { - return common.Hash{}, err - } - // Handle all state updates afterwards - for addr := range s.stateObjectsDirty { - obj := s.stateObjects[addr] - if obj.deleted { - continue + //if s.stateRoot = s.StateIntermediateRoot(); s.expectedRoot != s.stateRoot { + // s.expectedRoot = s.stateRoot + // //log.Error("Invalid merkle root", "remote", s.expectedRoot, "local", s.stateRoot) + // //return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot) + //} + commmitTrie := func() error { + if metrics.EnabledExpensive { + defer func(start time.Time) { s.TrieAllCommits += time.Since(start) }(time.Now()) + } + if s.stateRoot = s.StateIntermediateRoot(); s.expectedRoot != s.stateRoot { + log.Error("Invalid merkle root", "remote", s.expectedRoot, "local", s.stateRoot) + return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot) + } + + var err error + // Handle all state deletions first + incomplete, err = s.handleDestruction(nodes) + if err != nil { + return err + } + + tasks := make(chan func()) + type taskResult struct { + err error + nodeSet *trienode.NodeSet + } + taskResults := make(chan taskResult, len(s.stateObjectsDirty)) + tasksNum := 0 + finishCh := make(chan struct{}) + + threads := gopool.Threads(len(s.stateObjectsDirty)) + wg := sync.WaitGroup{} + for i := 0; i < threads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case task := <-tasks: + task() + case <-finishCh: + return + } + } + }() } - // Write any contract code associated with the state object - if obj.code != nil && obj.dirtyCode { - rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) - obj.dirtyCode = false + + for addr := range s.stateObjectsDirty { + if obj := s.stateObjects[addr]; !obj.deleted { + tasks <- func() { + // Write any storage changes in the state object to its storage trie + if set, err := obj.commit(); err != nil { + taskResults <- taskResult{err, nil} + return + } else { + taskResults <- taskResult{nil, set} + } + + } + tasksNum++ + } + } + + for i := 0; i < tasksNum; i++ { + res := <-taskResults + if res.err != nil { + close(finishCh) + return res.err + } + // Merge the dirty nodes of storage trie into global set. It is possible + // that the account was destructed and then resurrected in the same block. + // In this case, the node set is shared by both accounts. + if res.nodeSet != nil { + if err := nodes.Merge(res.nodeSet); err != nil { + return err + } + } } - // Write any storage changes in the state object to its storage trie - set, err := obj.commit() + close(finishCh) + + var start time.Time + if metrics.EnabledExpensive { + start = time.Now() + } + root, set, err := s.trie.Commit(true) if err != nil { - return common.Hash{}, err + return err } - // Merge the dirty nodes of storage trie into global set. It is possible - // that the account was destructed and then resurrected in the same block. - // In this case, the node set is shared by both accounts. + // Merge the dirty nodes of account trie into global set if set != nil { if err := nodes.Merge(set); err != nil { - return common.Hash{}, err + return err } - updates, deleted := set.Size() - storageTrieNodesUpdated += updates - storageTrieNodesDeleted += deleted } - } - if codeWriter.ValueSize() > 0 { - if err := codeWriter.Write(); err != nil { - log.Crit("Failed to commit dirty codes", "error", err) + if metrics.EnabledExpensive { + s.AccountCommits += time.Since(start) } - } - // Write the account trie changes, measuring the amount of wasted time - var start time.Time - if metrics.EnabledExpensive { - start = time.Now() - } - root, set, err := s.trie.Commit(true) - if err != nil { - return common.Hash{}, err - } - // Merge the dirty nodes of account trie into global set - if set != nil { - if err := nodes.Merge(set); err != nil { - return common.Hash{}, err + + origin := s.originalRoot + if origin == (common.Hash{}) { + origin = types.EmptyRootHash } - accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size() - } - if metrics.EnabledExpensive { - s.AccountCommits += time.Since(start) - - accountUpdatedMeter.Mark(int64(s.AccountUpdated)) - storageUpdatedMeter.Mark(int64(s.StorageUpdated)) - accountDeletedMeter.Mark(int64(s.AccountDeleted)) - storageDeletedMeter.Mark(int64(s.StorageDeleted)) - accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated)) - accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted)) - storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated)) - storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted)) - s.AccountUpdated, s.AccountDeleted = 0, 0 - s.StorageUpdated, s.StorageDeleted = 0, 0 - } - // If snapshotting is enabled, update the snapshot tree with this new version - if s.snap != nil { - start := time.Now() - // Only update if there's a state transition (skip empty Clique blocks) - if parent := s.snap.Root(); parent != root { - if err := s.snaps.Update(root, parent, s.convertAccountSet(s.stateObjectsDestruct), s.accounts, s.storages); err != nil { - log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) + + if root != origin { + start := time.Now() + set := triestate.New(s.accountsOrigin, s.storagesOrigin, incomplete) + if err := s.db.TrieDB().Update(root, origin, block, nodes, set); err != nil { + return err + } + s.originalRoot = root + if metrics.EnabledExpensive { + s.TrieDBCommits += time.Since(start) } - // Keep 128 diff layers in the memory, persistent layer is 129th. - // - head layer is paired with HEAD state - // - head-1 layer is paired with HEAD-1 state - // - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state - if err := s.snaps.Cap(root, 128); err != nil { - log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err) + if s.onCommit != nil { + s.onCommit(set) } } - if metrics.EnabledExpensive { - s.SnapshotCommits += time.Since(start) + + wg.Wait() + return nil + + } + commitFuncs := []func() error{ + func() error { + if metrics.EnabledExpensive { + defer func(start time.Time) { s.CodeCommits += time.Since(start) }(time.Now()) + } + codeWriter := s.db.DiskDB().NewBatch() + for addr := range s.stateObjectsDirty { + if obj := s.stateObjects[addr]; !obj.deleted { + // Write any contract code associated with the state object + if obj.code != nil && obj.dirtyCode { + rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) + obj.dirtyCode = false + if codeWriter.ValueSize() > ethdb.IdealBatchSize { + if err := codeWriter.Write(); err != nil { + return err + } + codeWriter.Reset() + } + } + } + } + if codeWriter.ValueSize() > 0 { + if err := codeWriter.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) + return err + } + } + return nil + }, + func() error { + // If snapshotting is enabled, update the snapshot tree with this new version + if s.snap != nil { + if metrics.EnabledExpensive { + defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now()) + } + // Only update if there's a state transition (skip empty Clique blocks) + if parent := s.snap.Root(); parent != s.expectedRoot { + err := s.snaps.Update(s.expectedRoot, parent, s.convertAccountSet(s.stateObjectsDestruct), s.accounts, s.storages) + + if err != nil { + log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err) + } + + // Keep n diff layers in the memory + // - head layer is paired with HEAD state + // - head-1 layer is paired with HEAD-1 state + // - head-(n-1) layer(bottom-most diff layer) is paired with HEAD-(n-1)state + go func() { + if err := s.snaps.Cap(s.expectedRoot, 128); err != nil { + log.Warn("Failed to cap snapshot tree", "root", s.expectedRoot, "layers", 128, "err", err) + } + }() + } + } + return nil + }, + } + defer s.StopPrefetcher() + commitFuncs = append(commitFuncs, commmitTrie) + commitRes := make(chan error, len(commitFuncs)) + for _, f := range commitFuncs { + // commitFuncs[0] and commitFuncs[1] both read map `stateObjects`, but no conflicts + tmpFunc := f + go func() { + commitRes <- tmpFunc() + }() + } + for i := 0; i < len(commitFuncs); i++ { + r := <-commitRes + if r != nil { + return common.Hash{}, r } - s.snap = nil } + + root := s.stateRoot + s.snap = nil if root == (common.Hash{}) { root = types.EmptyRootHash } - origin := s.originalRoot - if origin == (common.Hash{}) { - origin = types.EmptyRootHash - } - if root != origin { - start := time.Now() - set := triestate.New(s.accountsOrigin, s.storagesOrigin, incomplete) - if err := s.db.TrieDB().Update(root, origin, block, nodes, set); err != nil { - return common.Hash{}, err - } - s.originalRoot = root - if metrics.EnabledExpensive { - s.TrieDBCommits += time.Since(start) - } - if s.onCommit != nil { - s.onCommit(set) - } - } // Clear all internal flags at the end of commit operation. s.accounts = make(map[common.Hash][]byte) s.storages = make(map[common.Hash]map[common.Hash][]byte) diff --git a/go.mod b/go.mod index 1ce804bfa4..faae69af08 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/mattn/go-isatty v0.0.20 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/olekukonko/tablewriter v0.0.5 + github.com/panjf2000/ants/v2 v2.4.5 github.com/peterh/liner v1.2.0 github.com/protolambda/bls12-381-util v0.0.0-20220416220906-d8552aa452c7 github.com/prysmaticlabs/prysm/v4 v4.2.0 diff --git a/go.sum b/go.sum index bb9fdca4ef..79d757a81e 100644 --- a/go.sum +++ b/go.sum @@ -1169,6 +1169,8 @@ github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJ github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= +github.com/panjf2000/ants/v2 v2.4.5 h1:kcGvjXB7ea0MrzzszpnlVFthhYKoFxLi75nRbsq01HY= +github.com/panjf2000/ants/v2 v2.4.5/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= @@ -2078,6 +2080,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/miner/worker.go b/miner/worker.go index 8b03825cbf..823fa81ad1 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -732,6 +732,8 @@ func (w *worker) resultLoop() { logs = append(logs, receipt.Logs...) } // Commit block and state to database. + task.state.SetExpectedStateRoot(block.Root()) + log.Error("miner pre write block", "root", block.Root()) _, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true) if err != nil { log.Error("Failed writing block to chain", "err", err) diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index 1d22189068..b982e9bcbd 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -24,6 +24,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -233,12 +234,18 @@ func (db *Database) Update(root common.Hash, parentRoot common.Hash, block uint6 if err := db.tree.add(root, parentRoot, block, nodes, states); err != nil { return err } - // Keep 128 diff layers in the memory, persistent layer is 129th. - // - head layer is paired with HEAD state - // - head-1 layer is paired with HEAD-1 state - // - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state - // - head-128 layer(disk layer) is paired with HEAD-128 state - return db.tree.cap(root, maxDiffLayers) + gopool.Submit(func() { + // Keep 128 diff layers in the memory, persistent layer is 129th. + // - head layer is paired with HEAD state + // - head-1 layer is paired with HEAD-1 state + // - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state + // - head-128 layer(disk layer) is paired with HEAD-128 state + err := db.tree.cap(root, maxDiffLayers) + if err != nil { + log.Crit("failed to cap layer tree", "error", err) + } + }) + return nil } // Commit traverses downwards the layer tree from a specified layer with the