From 07f40d490c1a1be17e4806e48451a4154ff600c4 Mon Sep 17 00:00:00 2001 From: VM Date: Sat, 6 Jul 2024 10:57:57 +0800 Subject: [PATCH] feat: add recover node buffer list for pathdb --- cmd/geth/chaincmd.go | 38 ---- cmd/geth/dbcmd.go | 23 --- cmd/geth/main.go | 1 - core/blockchain.go | 4 +- core/rawdb/accessors_state.go | 33 +++- core/rawdb/ancient_scheme.go | 22 ++- eth/backend.go | 29 +-- trie/database.go | 18 -- trie/testutil/utils.go | 12 ++ trie/triedb/pathdb/asyncnodebuffer.go | 8 +- trie/triedb/pathdb/database.go | 265 +++----------------------- trie/triedb/pathdb/difflayer_test.go | 3 +- trie/triedb/pathdb/disklayer.go | 54 ++---- trie/triedb/pathdb/history.go | 98 ++++++++-- trie/triedb/pathdb/history_test.go | 41 ++-- trie/triedb/pathdb/journal.go | 96 +++++++--- trie/triedb/pathdb/journal_test.go | 37 ++++ trie/triedb/pathdb/metrics.go | 4 + trie/triedb/pathdb/nodebuffer.go | 10 +- trie/triedb/pathdb/nodebufferlist.go | 211 ++++++++++++++++++-- trie/trienode/node.go | 5 - trie/triestate/state.go | 166 ---------------- 22 files changed, 544 insertions(+), 634 deletions(-) create mode 100644 trie/triedb/pathdb/journal_test.go diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 21e884ca53..0733a29392 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -39,9 +39,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/trie" - "github.com/ethereum/go-ethereum/trie/triedb/pathdb" - "github.com/olekukonko/tablewriter" "github.com/urfave/cli/v2" ) @@ -159,18 +156,6 @@ It's deprecated, please use "geth db import" instead. This command dumps out the state for a given block (or latest, if none provided). 
`, } - - dumpRootHashCommand = &cli.Command{ - Action: dumpAllRootHashInPath, - Name: "dump-roothash", - Usage: "Dump all available state root hash in path mode", - Flags: flags.Merge([]cli.Flag{}, utils.DatabaseFlags), - Description: ` -The dump-roothash command dump all available state root hash in path mode. -If you use "dump" command in path mode, please note that it only keeps at most 129 blocks which belongs to diffLayer or diskLayer. -Therefore, you must specify the blockNumber or blockHash that locates in diffLayer or diskLayer. -"geth" will print all available blockNumber and related block state root hash, and you can query block hash by block number. -`} ) // initGenesis will initialise the given JSON format genesis file and writes it as @@ -498,26 +483,3 @@ func hashish(x string) bool { _, err := strconv.Atoi(x) return err != nil } - -func dumpAllRootHashInPath(ctx *cli.Context) error { - stack, _ := makeConfigNode(ctx) - defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) - defer db.Close() - triedb := trie.NewDatabase(db, &trie.Config{PathDB: pathdb.ReadOnly}) - defer triedb.Close() - - scheme, err := rawdb.ParseStateScheme(ctx.String(utils.StateSchemeFlag.Name), db) - if err != nil { - return err - } - if scheme == rawdb.HashScheme { - return errors.New("incorrect state scheme, you should use it in path mode") - } - - table := tablewriter.NewWriter(os.Stdout) - table.SetHeader([]string{"Block Number", "Block State Root Hash"}) - table.AppendBulk(triedb.GetAllRooHash()) - table.Render() - return nil -} diff --git a/cmd/geth/dbcmd.go b/cmd/geth/dbcmd.go index 0f872b6278..7835cc5048 100644 --- a/cmd/geth/dbcmd.go +++ b/cmd/geth/dbcmd.go @@ -76,7 +76,6 @@ Remove blockchain and state databases`, dbPruneHashTrieCmd, dbTrieGetCmd, dbTrieDeleteCmd, - ancientToDiffLayerCmd, }, } dbInspectCmd = &cli.Command{ @@ -260,13 +259,6 @@ WARNING: This is a low-level operation which may cause database corruption!`, }, utils.NetworkFlags, 
utils.DatabaseFlags), Description: "Shows metadata about the chain status.", } - ancientToDiffLayerCmd = &cli.Command{ - Name: "ancient-to-dl", - Usage: "Convert the data in ancientDB into diffLayer", - Description: "A convenient test tool to for path db diffLayer converting", - Action: ancientToDiffLayer, - Flags: flags.Merge(utils.DatabaseFlags), - } ) func removeDB(ctx *cli.Context) error { @@ -1098,21 +1090,6 @@ func hbss2pbss(ctx *cli.Context) error { return nil } -func ancientToDiffLayer(ctx *cli.Context) error { - stack, _ := makeConfigNode(ctx) - defer stack.Close() - db := utils.MakeChainDatabase(ctx, stack, true) - defer db.Close() - triedb := utils.MakeTrieDatabase(ctx, stack, db, false, true, false) - // triedb := trie.NewDatabase(db, &trie.Config{PathDB: nil}) - defer triedb.Close() - - if err := triedb.DiffLayerConvertTool(); err != nil { - log.Error("Failed to get diff layer from ancient db", "error", err) - } - return nil -} - func pruneHashTrie(ctx *cli.Context) error { if ctx.NArg() != 0 { return fmt.Errorf("required none argument") diff --git a/cmd/geth/main.go b/cmd/geth/main.go index f4b960b1fa..23761964c4 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -238,7 +238,6 @@ func init() { removedbCommand, dumpCommand, dumpGenesisCommand, - dumpRootHashCommand, // See accountcmd.go: accountCommand, walletCommand, diff --git a/core/blockchain.go b/core/blockchain.go index 2250ccd6c6..16da82b6b8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -165,8 +165,8 @@ type CacheConfig struct { KeepProofBlockSpan uint64 // Block span of keep proof SnapshotNoBuild bool // Whether the background generation is allowed SnapshotWait bool // Wait for snapshot construction on startup. 
TODO(karalabe): This is a dirty hack for testing, nuke it - JournalFilePath string - JournalFile bool + JournalFilePath string // The file path to journal pathdb diff layers + JournalFile bool // Whether to enable journal file TrieCommitInterval uint64 // Define a block height interval, commit trie every TrieCommitInterval block height. } diff --git a/core/rawdb/accessors_state.go b/core/rawdb/accessors_state.go index 9ce58e7d27..85029d858a 100644 --- a/core/rawdb/accessors_state.go +++ b/core/rawdb/accessors_state.go @@ -224,43 +224,60 @@ func ReadStateStorageHistory(db ethdb.AncientReaderOp, id uint64) []byte { return blob } +// ReadStateTrieNodesHistory retrieves the trie nodes corresponding to the specified +// state history. Compute the position of state history in freezer by minus one +// since the id of first state history starts from one(zero for initial state). +func ReadStateTrieNodesHistory(db ethdb.AncientReaderOp, id uint64) []byte { + blob, err := db.Ancient(stateHistoryTrieNodesData, id-1) + if err != nil { + return nil + } + return blob +} + // ReadStateHistory retrieves the state history from database with provided id. // Compute the position of state history in freezer by minus one since the id // of first state history starts from one(zero for initial state). 
-func ReadStateHistory(db ethdb.AncientReaderOp, id uint64) ([]byte, []byte, []byte, []byte, []byte, error) { +func ReadStateHistory(db ethdb.AncientReaderOp, id uint64) ([]byte, []byte, []byte, []byte, []byte, []byte, error) { meta, err := db.Ancient(stateHistoryMeta, id-1) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } accountIndex, err := db.Ancient(stateHistoryAccountIndex, id-1) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } storageIndex, err := db.Ancient(stateHistoryStorageIndex, id-1) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } accountData, err := db.Ancient(stateHistoryAccountData, id-1) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } storageData, err := db.Ancient(stateHistoryStorageData, id-1) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err + } + trieNodesData, err := db.Ancient(stateHistoryTrieNodesData, id-1) + if err != nil { + return nil, nil, nil, nil, nil, nil, err } - return meta, accountIndex, storageIndex, accountData, storageData, nil + return meta, accountIndex, storageIndex, accountData, storageData, trieNodesData, nil } // WriteStateHistory writes the provided state history to database. Compute the // position of state history in freezer by minus one since the id of first state // history starts from one(zero for initial state). 
-func WriteStateHistory(db ethdb.AncientWriter, id uint64, meta []byte, accountIndex []byte, storageIndex []byte, accounts []byte, storages []byte) { +func WriteStateHistory(db ethdb.AncientWriter, id uint64, meta []byte, accountIndex []byte, storageIndex []byte, + accounts []byte, storages []byte, trieNodes []byte) { db.ModifyAncients(func(op ethdb.AncientWriteOp) error { op.AppendRaw(stateHistoryMeta, id-1, meta) op.AppendRaw(stateHistoryAccountIndex, id-1, accountIndex) op.AppendRaw(stateHistoryStorageIndex, id-1, storageIndex) op.AppendRaw(stateHistoryAccountData, id-1, accounts) op.AppendRaw(stateHistoryStorageData, id-1, storages) + op.AppendRaw(stateHistoryTrieNodesData, id-1, trieNodes) return nil }) } diff --git a/core/rawdb/ancient_scheme.go b/core/rawdb/ancient_scheme.go index d6b3f1b76d..181d1c413e 100644 --- a/core/rawdb/ancient_scheme.go +++ b/core/rawdb/ancient_scheme.go @@ -51,19 +51,21 @@ const ( stateHistoryTableSize = 2 * 1000 * 1000 * 1000 // stateHistoryAccountIndex indicates the name of the freezer state history table. 
- stateHistoryMeta = "history.meta" - stateHistoryAccountIndex = "account.index" - stateHistoryStorageIndex = "storage.index" - stateHistoryAccountData = "account.data" - stateHistoryStorageData = "storage.data" + stateHistoryMeta = "history.meta" + stateHistoryAccountIndex = "account.index" + stateHistoryStorageIndex = "storage.index" + stateHistoryAccountData = "account.data" + stateHistoryStorageData = "storage.data" + stateHistoryTrieNodesData = "trienodes.data" ) var stateFreezerNoSnappy = map[string]bool{ - stateHistoryMeta: true, - stateHistoryAccountIndex: false, - stateHistoryStorageIndex: false, - stateHistoryAccountData: false, - stateHistoryStorageData: false, + stateHistoryMeta: true, + stateHistoryAccountIndex: false, + stateHistoryStorageIndex: false, + stateHistoryAccountData: false, + stateHistoryStorageData: false, + stateHistoryTrieNodesData: false, } const ( diff --git a/eth/backend.go b/eth/backend.go index f2d02fca67..36d5931480 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -132,7 +132,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice) config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice) } - if config.NoPruning && config.TrieDirtyCache > 0 { + + // Assemble the Ethereum object + chainDb, err := stack.OpenDatabaseWithFreezer(ChainData, config.DatabaseCache, config.DatabaseHandles, + config.DatabaseFreezer, ChainDBNamespace, false) + if err != nil { + return nil, err + } + config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb) + if err != nil { + return nil, err + } + + if config.StateScheme == rawdb.HashScheme && config.NoPruning && config.TrieDirtyCache > 0 { if config.SnapshotCache > 0 { config.TrieCleanCache += config.TrieDirtyCache * 3 / 5 config.SnapshotCache += config.TrieDirtyCache * 2 / 5 @@ -156,18 +168,9 @@ func New(stack 
*node.Node, config *ethconfig.Config) (*Ethereum, error) { "trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024, "trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024, "snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024) - // Assemble the Ethereum object - chainDb, err := stack.OpenDatabaseWithFreezer(ChainData, config.DatabaseCache, config.DatabaseHandles, - config.DatabaseFreezer, ChainDBNamespace, false) - if err != nil { - return nil, err - } - scheme, err := rawdb.ParseStateScheme(config.StateScheme, chainDb) - if err != nil { - return nil, err - } + // Try to recover offline state pruning only in hash-based. - if scheme == rawdb.HashScheme { + if config.StateScheme == rawdb.HashScheme { if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil { log.Error("Failed to recover state", "error", err) } @@ -235,7 +238,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { Preimages: config.Preimages, NoTries: config.NoTries, StateHistory: config.StateHistory, - StateScheme: scheme, + StateScheme: config.StateScheme, TrieCommitInterval: config.TrieCommitInterval, PathNodeBuffer: config.PathNodeBuffer, ProposeBlockInterval: config.ProposeBlockInterval, diff --git a/trie/database.go b/trie/database.go index fdad14c386..a49049c61b 100644 --- a/trie/database.go +++ b/trie/database.go @@ -363,21 +363,3 @@ func (db *Database) IsVerkle() bool { func (db *Database) Config() *Config { return db.config } - -// DiffLayerConvertTool -func (db *Database) DiffLayerConvertTool() error { - pdb, ok := db.backend.(*pathdb.Database) - if !ok { - return errors.New("not supported") - } - return pdb.ConvertTool1(&trieLoader{db: db}) -} - -func (db *Database) GetAllRooHash() [][]string { - pdb, ok := db.backend.(*pathdb.Database) - if !ok { - log.Error("Not supported") - return nil - } - return pdb.GetAllRooHash() -} diff --git a/trie/testutil/utils.go b/trie/testutil/utils.go index 
a75d0431b0..7339cffb7f 100644 --- a/trie/testutil/utils.go +++ b/trie/testutil/utils.go @@ -20,6 +20,7 @@ import ( crand "crypto/rand" "encoding/binary" mrand "math/rand" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -59,3 +60,14 @@ func RandomNode() *trienode.Node { val := RandBytes(100) return trienode.New(crypto.Keccak256Hash(val), val) } + +// RandomString generates a random 10-character lowercase alphanumeric string. +func RandomString() string { + r := mrand.New(mrand.NewSource(time.Now().UnixNano())) + const chars = "0123456789abcdefghijklmnopqrstuvwxyz" + characters := make([]byte, 10) + for i := range characters { + characters[i] = chars[r.Intn(len(chars))] + } + return string(characters) +} diff --git a/trie/triedb/pathdb/asyncnodebuffer.go b/trie/triedb/pathdb/asyncnodebuffer.go index 4e41e8173c..efa75911c8 100644 --- a/trie/triedb/pathdb/asyncnodebuffer.go +++ b/trie/triedb/pathdb/asyncnodebuffer.go @@ -30,7 +30,7 @@ type asyncnodebuffer struct { } // newAsyncNodeBuffer initializes the async node buffer with the provided nodes. -func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *asyncnodebuffer { +func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) (*asyncnodebuffer, error) { if nodes == nil { nodes = make(map[common.Hash]map[string]*trienode.Node) } @@ -45,7 +45,7 @@ func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.No return &asyncnodebuffer{ current: newNodeCache(uint64(limit), size, nodes, layers), background: newNodeCache(uint64(limit), 0, make(map[common.Hash]map[string]*trienode.Node), 0), - } + }, nil } // node retrieves the trie node with given node info. 
@@ -213,6 +213,10 @@ func (a *asyncnodebuffer) proposedBlockReader(blockRoot common.Hash) (layer, err return nil, errors.New("async node buffer not support to get proposed block reader") } +func (a *asyncnodebuffer) getLatestStatus() (common.Hash, uint64, error) { + return common.Hash{}, 0, errors.New("unsupported method for async node buffer") +} + type nodecache struct { layers uint64 // The number of diff layers aggregated inside size uint64 // The size of aggregated writes diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index d3c7f8380e..82632d7c61 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -21,9 +21,6 @@ import ( "fmt" "io" "os" - "sort" - "strconv" - "strings" "sync" "time" @@ -162,7 +159,6 @@ type Database struct { freezer *rawdb.ResettableFreezer // Freezer for storing trie histories, nil possible in tests lock sync.RWMutex // Lock to prevent mutations from happening at the same time capLock sync.Mutex - tree2 *layerTree } // New attempts to load an already existing layer from a persistent key-value @@ -180,23 +176,28 @@ func New(diskdb ethdb.Database, config *Config) *Database { config: config, diskdb: diskdb, } - // Construct the layer tree by resolving the in-disk singleton state - // and in-memory layer journal. - db.tree = newLayerTree(db.loadLayers()) // Open the freezer for state history if the passed database contains an // ancient store. Otherwise, all the relevant functionalities are disabled. - // - // Because the freezer can only be opened once at the same time, this - // mechanism also ensures that at most one **non-readOnly** database - // is opened at the same time to prevent accidental mutation. 
if ancient, err := diskdb.AncientDatadir(); err == nil && ancient != "" && !db.readOnly { freezer, err := rawdb.NewStateFreezer(ancient, false) if err != nil { log.Crit("Failed to open state history freezer", "err", err) } db.freezer = freezer + } + // Construct the layer tree by resolving the in-disk singleton state + // and in-memory layer journal. + db.tree = newLayerTree(db.loadLayers()) + + // Open the freezer for state history if the passed database contains an + // ancient store. Otherwise, all the relevant functionalities are disabled. + // + // Because the freezer can only be opened once at the same time, this + // mechanism also ensures that at most one **non-readOnly** database + // is opened at the same time to prevent accidental mutation. + if db.freezer != nil && !db.readOnly { diskLayerID := db.tree.bottom().stateID() if diskLayerID == 0 { // Reset the entire state histories in case the trie database is @@ -215,7 +216,7 @@ func New(diskdb ethdb.Database, config *Config) *Database { } else { // Truncate the extra state histories above in freezer in case // it's not aligned with the disk layer. - pruned, err := truncateFromHead(db.diskdb, freezer, diskLayerID) + pruned, err := truncateFromHead(db.diskdb, db.freezer, diskLayerID) if err != nil { log.Crit("Failed to truncate extra state histories", "err", err) } @@ -361,7 +362,12 @@ func (db *Database) Enable(root common.Hash) error { } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. 
- nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.NotifyKeep) + nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, + db.config.NotifyKeep, nil, false) + if err != nil { + log.Error("Failed to new trie node buffer", "error", err) + return err + } dl := newDiskLayer(root, 0, db, nil, nb) nb.setClean(dl.cleans) db.tree.reset(dl) @@ -413,12 +419,13 @@ func (db *Database) Recover(root common.Hash, loader triestate.TrieLoader) error // disk layer won't be accessible from outside. db.tree.reset(dl) } - db.DeleteTrieJournal(db.diskdb) - _, err := truncateFromHead(db.diskdb, db.freezer, dl.stateID()) + _ = db.DeleteTrieJournal(db.diskdb) + truncatedNumber, err := truncateFromHead(db.diskdb, db.freezer, dl.stateID()) if err != nil { return err } - log.Debug("Recovered state", "root", root, "elapsed", common.PrettyDuration(time.Since(start))) + log.Debug("Recovered state", "root", root, "elapsed", common.PrettyDuration(time.Since(start)), + "truncate number", truncatedNumber) return nil } @@ -584,229 +591,3 @@ func (db *Database) DeleteTrieJournal(writer ethdb.KeyValueWriter) error { } return nil } - -func (db *Database) ConvertTool1(loader triestate.TrieLoader) error { - dl := db.tree.bottom() - stateID := rawdb.ReadStateID(db.diskdb, dl.rootHash()) - persistentStateID := rawdb.ReadPersistentStateID(db.diskdb) - - ancient, err := db.diskdb.AncientDatadir() - if err != nil { - log.Error("Failed to get ancient datadir", "error", err) - return err - } - freezer, err := rawdb.NewStateFreezer(ancient, true) - if err != nil { - log.Error("Failed to new state freezer", "error", err) - return err - } - db.freezer = freezer - - freezerLength, err := db.freezer.Ancients() - if err != nil { - log.Error("Failed to get freezer ancients", "error", err) - return err - } - tail, err := db.freezer.Tail() - if err != nil { - 
log.Error("Failed to get freezer tail", "error", err) - return err - } - log.Info("Print ancient db meta", "state id", *stateID, "persistent state id", persistentStateID, - "freezer length", freezerLength, "freezer tail", tail, "config", db.config.StateHistory, - "bottom stateID", dl.stateID(), "bottom root", dl.rootHash().String()) - - db.tree2 = newLayerTree(dl) - waitingRecoverNum := freezerLength - tail - start := time.Now() - historySize := 0 - diffSize := uint64(0) - for i := uint64(0); i < waitingRecoverNum; i++ { - h, err := readHistory(db.freezer, *stateID-i) - if err != nil { - if checkError(err) { - log.Info("There are no more states in disk db", "state id", *stateID-i) - continue - } - log.Error("Failed to read history from freezer db", "error", err) - return err - } - historySize += h.Size() - log.Info("print history size", "size", common.StorageSize(h.Size()), "history root", h.meta.root.String(), - "history parent root", h.meta.parent.String(), "current state id", *stateID-i) - - incomplete := make(map[common.Address]struct{}) - for _, addr := range h.meta.incomplete { - incomplete[addr] = struct{}{} - } - states := triestate.New(h.accounts, h.storages, incomplete) - - size, err := db.addDiffLayer(h.meta.root, h.meta.parent, *stateID-i, h.meta.block, nil, states) - if err != nil { - log.Error("Failed to add diff layer", "error", err) - return err - } - diffSize += size - } - layerTreeSize := uint64(db.tree2.len() * 32) - log.Info("Succeed to add diff layer", "elapsed", common.PrettyDuration(time.Since(start)), - "waitingRecoverNum", waitingRecoverNum, "total history size", common.StorageSize(historySize), - "total diff size", common.StorageSize(diffSize), "layer tree size", common.StorageSize(layerTreeSize)) - - return nil -} - -func (db *Database) ConvertTool(loader triestate.TrieLoader) error { - dl := db.tree.bottom() - stateID := rawdb.ReadStateID(db.diskdb, dl.rootHash()) - persistentStateID := rawdb.ReadPersistentStateID(db.diskdb) - - ancient, 
err := db.diskdb.AncientDatadir() - if err != nil { - log.Error("Failed to get ancient datadir", "error", err) - return err - } - freezer, err := rawdb.NewStateFreezer(ancient, true) - if err != nil { - log.Error("Failed to new state freezer", "error", err) - return err - } - db.freezer = freezer - - freezerLength, err := db.freezer.Ancients() - if err != nil { - log.Error("Failed to get freezer ancients", "error", err) - return err - } - tail, err := db.freezer.Tail() - if err != nil { - log.Error("Failed to get freezer tail", "error", err) - return err - } - log.Info("Print ancient db meta", "state id", *stateID, "persistent state id", persistentStateID, - "freezer length", freezerLength, "freezer tail", tail, "config", db.config.StateHistory, - "bottom stateID", dl.stateID(), "bottom root", dl.rootHash().String()) - - db.tree2 = newLayerTree(dl) - waitingRecoverNum := freezerLength - persistentStateID - start := time.Now() - var ( - nodes *trienode.MergedNodeSet - count = uint64(2) - ) - for { - h, err := readHistory(db.freezer, *stateID+count) - if err != nil { - if checkError(err) { - log.Info("There are no more states in disk db", "count", count) - break - } - log.Error("Failed to read history from freezer db", "error", err) - return err - } - log.Info("print history size", "size", h.Size(), "history root", h.meta.root.String(), - "history parent root", h.meta.parent.String(), "current state id", *stateID+count) - - if count > 2 { - break - } - dl, nodes, err = dl.apply(dl.rootHash(), h, loader) - if err != nil { - log.Error("Failed to revert", "error", err) - return err - } - db.tree.reset(dl) - - incomplete := make(map[common.Address]struct{}) - for _, addr := range h.meta.incomplete { - incomplete[addr] = struct{}{} - } - states := triestate.New(h.accounts, h.storages, incomplete) - - if _, err = db.addDiffLayer(h.meta.root, h.meta.parent, *stateID+count, h.meta.block, nodes, states); err != nil { - log.Error("Failed to add diff layer", "error", err) - 
return err - } - count++ - } - // for i := uint64(0); i < 2; i++ { - // h, err := readHistory(db.freezer, *stateID-i) - // if err != nil { - // log.Error("Failed to read history from freezer db", "error", err) - // return err - // } - // log.Info("print history size", "size", h.Size(), "history root", h.meta.root.String(), - // "history parent root", h.meta.parent.String(), "current state id", *stateID-i) - // - // dl, nodes, err = dl.revert1(h, loader) - // if err != nil { - // log.Error("Failed to revert", "error", err) - // return err - // } - // db.tree.reset(dl) - // - // incomplete := make(map[common.Address]struct{}) - // for _, addr := range h.meta.incomplete { - // incomplete[addr] = struct{}{} - // } - // states := triestate.New(h.accounts, h.storages, incomplete) - // - // if err = db.addDiffLayer(h.meta.root, h.meta.parent, *stateID-i, h.meta.block, nodes, states); err != nil { - // log.Error("Failed to add diff layer", "error", err) - // return err - // } - // } - log.Info("Succeed to add diff layer", "elapsed", common.PrettyDuration(time.Since(start)), - "waitingRecoverNum", waitingRecoverNum) - - return nil -} - -func (db *Database) addDiffLayer(root common.Hash, parentRoot common.Hash, stateID uint64, block uint64, - nodes *trienode.MergedNodeSet, states *triestate.Set) (uint64, error) { - // Hold the lock to prevent concurrent mutations. 
- db.lock.Lock() - defer db.lock.Unlock() - - root, parentRoot = types.TrieRootHash(root), types.TrieRootHash(parentRoot) - if root == parentRoot { - return 0, errors.New("layer cycle") - } - // TODO: parent now is nil - l := newDiffLayer(nil, root, stateID, block, nil, states) - - // TODO: no need to use lock now - // db.tree2.lock.Lock() - db.tree2.layers[l.rootHash()] = l - // db.tree2.lock.Unlock() - - log.Info("done", "layer tree length", db.tree2.len(), "size", common.StorageSize(l.memory)) - return l.memory, nil -} - -func checkError(err error) bool { - if strings.Contains(err.Error(), "state history not found") { - return true - } - return false -} - -func (db *Database) GetAllRooHash() [][]string { - db.lock.Lock() - defer db.lock.Unlock() - - data := make([][]string, 0, len(db.tree.layers)) - for _, v := range db.tree.layers { - if dl, ok := v.(*diffLayer); ok { - data = append(data, []string{fmt.Sprintf("%d", dl.block), dl.rootHash().String()}) - } - } - sort.Slice(data, func(i, j int) bool { - block1, _ := strconv.Atoi(data[i][0]) - block2, _ := strconv.Atoi(data[j][0]) - return block1 > block2 - }) - - data = append(data, []string{"-1", db.tree.bottom().rootHash().String()}) - return data -} diff --git a/trie/triedb/pathdb/difflayer_test.go b/trie/triedb/pathdb/difflayer_test.go index 7c3d97ddfb..05f70e9b40 100644 --- a/trie/triedb/pathdb/difflayer_test.go +++ b/trie/triedb/pathdb/difflayer_test.go @@ -27,9 +27,10 @@ import ( ) func emptyLayer() *diskLayer { + buffer, _ := newNodeBuffer(DefaultBufferSize, nil, 0) return &diskLayer{ db: New(rawdb.NewMemoryDatabase(), nil), - buffer: newNodeBuffer(DefaultBufferSize, nil, 0), + buffer: buffer, } } diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index d1bcfade6a..26f49d471c 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -81,6 +81,9 @@ type trienodebuffer interface { // proposedBlockReader return the world state Reader of block that is 
proposed to L1. proposedBlockReader(blockRoot common.Hash) (layer, error) + + // getLatestStatus returns latest status for disk layer + getLatestStatus() (common.Hash, uint64, error) } type NodeBufferType int32 @@ -120,11 +123,13 @@ func NewTrieNodeBuffer( nodes map[common.Hash]map[string]*trienode.Node, layers, proposeBlockInterval uint64, keepFunc NotifyKeepFunc, -) trienodebuffer { + freezer *rawdb.ResettableFreezer, + recovery bool, +) (trienodebuffer, error) { log.Info("init trie node buffer", "type", nodeBufferTypeToString[trieNodeBufferType]) switch trieNodeBufferType { case NodeBufferList: - return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepFunc) + return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepFunc, freezer, recovery) case AsyncNodeBuffer: return newAsyncNodeBuffer(limit, nodes, layers) case SyncNodeBuffer: @@ -343,7 +348,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { return ndl, nil } oldest = targetOldest - log.Info("Forcing prune ancient under nodebufferlist", "disk_persistent_state_id", + log.Debug("Forcing prune ancient under nodebufferlist", "disk_persistent_state_id", persistentID, "truncate_tail", oldest) } @@ -404,48 +409,15 @@ func (dl *diskLayer) revert(h *history, loader triestate.TrieLoader) (*diskLayer if err := batch.Write(); err != nil { log.Crit("Failed to write states", "err", err) } + if nl, ok := dl.buffer.(*nodebufferlist); ok { + if nl.persistID != 0 { + nl.persistID-- + } + } } return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil } -func (dl *diskLayer) apply(prevRoot common.Hash, h *history, loader triestate.TrieLoader) (*diskLayer, *trienode.MergedNodeSet, error) { - // if h.meta.parent != dl.rootHash() { - // return nil, nil, errUnexpectedHistory - // } - // Reject if the provided state history is incomplete. 
It's due to - // a large construct SELF-DESTRUCT which can't be handled because - // of memory limitation. - if len(h.meta.incomplete) > 0 { - return nil, nil, errors.New("incomplete state history") - } - if dl.id == 0 { - return nil, nil, fmt.Errorf("%w: zero state id", errStateUnrecoverable) - } - // Apply the reverse state changes upon the current state. This must - // be done before holding the lock in order to access state in "this" - // layer. - set, err := triestate.ApplyForDiff(prevRoot, h.meta.parent, h.accounts, h.storages, loader) - if err != nil { - log.Error("Failed to apply state diffs", "error", err) - return nil, nil, err - } - // Mark the diskLayer as stale before applying any mutations on top. - dl.lock.Lock() - defer dl.lock.Unlock() - - dl.stale = true - - // nodes := set.Flatten() - // batch := dl.db.diskdb.NewBatch() - // writeNodes(batch, nodes, dl.cleans) - // rawdb.WritePersistentStateID(batch, dl.id+1) - // if err = batch.Write(); err != nil { - // log.Crit("Failed to write states", "err", err) - // } - - return newDiskLayer(h.meta.parent, dl.id+1, dl.db, dl.cleans, dl.buffer), set, nil -} - // setBufferSize sets the node buffer size to the provided value. func (dl *diskLayer) setBufferSize(size int) error { dl.lock.RLock() diff --git a/trie/triedb/pathdb/history.go b/trie/triedb/pathdb/history.go index be58de0811..f7a212d99c 100644 --- a/trie/triedb/pathdb/history.go +++ b/trie/triedb/pathdb/history.go @@ -23,12 +23,15 @@ import ( "fmt" "time" + "golang.org/x/exp/slices" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/triestate" - "golang.org/x/exp/slices" ) // State history records the state changes involved in executing a block. 
The @@ -250,10 +253,11 @@ type history struct { accountList []common.Address // Sorted account hash list storages map[common.Address]map[common.Hash][]byte // Storage data keyed by its address hash and slot hash storageList map[common.Address][]common.Hash // Sorted slot hash list + nodes []journalNodes // Changed trie nodes } // newHistory constructs the state history object with provided state change set. -func newHistory(root common.Hash, parent common.Hash, block uint64, states *triestate.Set) *history { +func newHistory(root common.Hash, parent common.Hash, block uint64, states *triestate.Set, nodes map[common.Hash]map[string]*trienode.Node) *history { var ( accountList []common.Address storageList = make(map[common.Address][]common.Hash) @@ -289,12 +293,13 @@ func newHistory(root common.Hash, parent common.Hash, block uint64, states *trie accountList: accountList, storages: states.Storages, storageList: storageList, + nodes: compressTrieNodes(nodes), } } // encode serializes the state history and returns four byte streams represent // concatenated account/storage data, account/storage indexes respectively. -func (h *history) encode() ([]byte, []byte, []byte, []byte) { +func (h *history) encode() ([]byte, []byte, []byte, []byte, []byte) { var ( slotNumber uint32 // the number of processed slots accountData []byte // the buffer for concatenated account data @@ -328,7 +333,13 @@ func (h *history) encode() ([]byte, []byte, []byte, []byte) { accountData = append(accountData, h.accounts[addr]...) accountIndexes = append(accountIndexes, accIndex.encode()...) } - return accountData, storageData, accountIndexes, storageIndexes + + nodesBytes, err := rlp.EncodeToBytes(h.nodes) + if err != nil { + log.Error("Failed to encode trie nodes", "error", err) + } + + return accountData, storageData, accountIndexes, storageIndexes, nodesBytes } // decoder wraps the byte streams for decoding with extra meta fields. 
@@ -446,12 +457,13 @@ func (r *decoder) readStorage(accIndex accountIndex) ([]common.Hash, map[common. } // decode deserializes the account and storage data from the provided byte stream. -func (h *history) decode(accountData, storageData, accountIndexes, storageIndexes []byte) error { +func (h *history) decode(accountData, storageData, accountIndexes, storageIndexes, trieNodes []byte) error { var ( - accounts = make(map[common.Address][]byte) - storages = make(map[common.Address]map[common.Hash][]byte) - accountList []common.Address - storageList = make(map[common.Address][]common.Hash) + accounts = make(map[common.Address][]byte) + storages = make(map[common.Address]map[common.Hash][]byte) + accountList []common.Address + storageList = make(map[common.Address][]common.Hash) + decodedTrieNodes []journalNodes r = &decoder{ accountData: accountData, @@ -482,10 +494,17 @@ func (h *history) decode(accountData, storageData, accountIndexes, storageIndexe storages[accIndex.address] = slotData } } + + if err := rlp.DecodeBytes(trieNodes, &decodedTrieNodes); err != nil { + log.Error("Failed to decode state history trie nodes", "error", err) + return err + } + h.accounts = accounts h.accountList = accountList h.storages = storages h.storageList = storageList + h.nodes = decodedTrieNodes return nil } @@ -519,10 +538,60 @@ func (h *history) Size() int { for _, slots := range h.storageList { size += len(slots) * common.HashLength } + // calculate size of journalNodes + for _, jn := range h.nodes { + size += common.AddressLength + for _, n := range jn.Nodes { + size += len(n.Path) + size += len(n.Blob) + } + } + + return size +} +func (h *history) trieNodesSize() int { + size := 0 + for _, jn := range h.nodes { + size += common.AddressLength + for _, n := range jn.Nodes { + size += len(n.Path) + size += len(n.Blob) + } + } return size } +// readBlockNumber reads and decodes the state history meta and returns block number. 
+func readBlockNumber(freezer *rawdb.ResettableFreezer, stateID uint64) (uint64, error) { + blob := rawdb.ReadStateHistoryMeta(freezer, stateID) + if len(blob) == 0 { + return 0, fmt.Errorf("state history not found %d", stateID) + } + var m meta + if err := m.decode(blob); err != nil { + return 0, err + } + return m.block, nil +} + +// readAllBlockNumbers returns all block number to stateID map. +func readAllBlockNumbers(freezer *rawdb.ResettableFreezer, startStateID, endStateID uint64) (map[uint64]uint64, error) { + blockMap := make(map[uint64]uint64) + for i := startStateID; i <= endStateID; i++ { + blob := rawdb.ReadStateHistoryMeta(freezer, i) + if len(blob) == 0 { + return nil, fmt.Errorf("state history not found %d", i) + } + var m meta + if err := m.decode(blob); err != nil { + return nil, err + } + blockMap[m.block] = i + } + return blockMap, nil +} + // readHistory reads and decodes the state history object by the given id. func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error) { blob := rawdb.ReadStateHistoryMeta(freezer, id) @@ -539,8 +608,9 @@ func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error) storageData = rawdb.ReadStateStorageHistory(freezer, id) accountIndexes = rawdb.ReadStateAccountIndex(freezer, id) storageIndexes = rawdb.ReadStateStorageIndex(freezer, id) + trieNodes = rawdb.ReadStateTrieNodesHistory(freezer, id) ) - if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil { + if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes, trieNodes); err != nil { return nil, err } return &dec, nil @@ -554,18 +624,20 @@ func writeHistory(freezer *rawdb.ResettableFreezer, dl *diffLayer) error { } var ( start = time.Now() - history = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states) + history = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states, dl.nodes) ) - accountData, storageData, 
accountIndex, storageIndex := history.encode() + accountData, storageData, accountIndex, storageIndex, trieNodes := history.encode() dataSize := common.StorageSize(len(accountData) + len(storageData)) indexSize := common.StorageSize(len(accountIndex) + len(storageIndex)) // Write history data into five freezer table respectively. - rawdb.WriteStateHistory(freezer, dl.stateID(), history.meta.encode(), accountIndex, storageIndex, accountData, storageData) + rawdb.WriteStateHistory(freezer, dl.stateID(), history.meta.encode(), accountIndex, storageIndex, accountData, storageData, trieNodes) historyDataBytesMeter.Mark(int64(dataSize)) historyIndexBytesMeter.Mark(int64(indexSize)) historyBuildTimeMeter.UpdateSince(start) + historyTotalSizeMeter1.Mark(int64(history.Size())) + historyTrieNodesSizeMeter.Mark(int64(history.trieNodesSize())) log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "elapsed", common.PrettyDuration(time.Since(start))) return nil diff --git a/trie/triedb/pathdb/history_test.go b/trie/triedb/pathdb/history_test.go index fee8ecdb1e..30679711d9 100644 --- a/trie/triedb/pathdb/history_test.go +++ b/trie/triedb/pathdb/history_test.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie/testutil" + "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/triestate" ) @@ -50,8 +51,24 @@ func randomStateSet(n int) *triestate.Set { return triestate.New(accounts, storages, nil) } +func randomTrieNodes(n int) map[common.Hash]map[string]*trienode.Node { + trieNodes := make(map[common.Hash]map[string]*trienode.Node) + addr := testutil.RandomHash() + next := make(map[string]*trienode.Node) + trieNodes[addr] = next + for i := 0; i < n; i++ { + nodes := &trienode.Node{ + Hash: testutil.RandomHash(), + Blob: testutil.RandBytes(3), + } + path := testutil.RandomString() + trieNodes[addr][path] 
= nodes + } + return trieNodes +} + func makeHistory() *history { - return newHistory(testutil.RandomHash(), types.EmptyRootHash, 0, randomStateSet(3)) + return newHistory(testutil.RandomHash(), types.EmptyRootHash, 0, randomStateSet(3), randomTrieNodes(3)) } func makeHistories(n int) []*history { @@ -61,7 +78,7 @@ func makeHistories(n int) []*history { ) for i := 0; i < n; i++ { root := testutil.RandomHash() - h := newHistory(root, parent, uint64(i), randomStateSet(3)) + h := newHistory(root, parent, uint64(i), randomStateSet(3), randomTrieNodes(3)) parent = root result = append(result, h) } @@ -84,8 +101,8 @@ func TestEncodeDecodeHistory(t *testing.T) { } // check if account/storage data can be correctly encode/decode - accountData, storageData, accountIndexes, storageIndexes := obj.encode() - if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil { + accountData, storageData, accountIndexes, storageIndexes, trieNodes := obj.encode() + if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes, trieNodes); err != nil { t.Fatalf("Failed to decode, err: %v", err) } if !compareSet(dec.accounts, obj.accounts) { @@ -134,8 +151,8 @@ func TestTruncateHeadHistory(t *testing.T) { defer freezer.Close() for i := 0; i < len(hs); i++ { - accountData, storageData, accountIndex, storageIndex := hs[i].encode() - rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData) + accountData, storageData, accountIndex, storageIndex, trieNodes := hs[i].encode() + rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData, trieNodes) rawdb.WriteStateID(db, hs[i].meta.root, uint64(i+1)) roots = append(roots, hs[i].meta.root) } @@ -162,8 +179,8 @@ func TestTruncateTailHistory(t *testing.T) { defer freezer.Close() for i := 0; i < len(hs); i++ { - accountData, storageData, accountIndex, storageIndex := hs[i].encode() 
- rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData) + accountData, storageData, accountIndex, storageIndex, trieNodes := hs[i].encode() + rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData, trieNodes) rawdb.WriteStateID(db, hs[i].meta.root, uint64(i+1)) roots = append(roots, hs[i].meta.root) } @@ -205,8 +222,8 @@ func TestTruncateTailHistories(t *testing.T) { defer freezer.Close() for i := 0; i < len(hs); i++ { - accountData, storageData, accountIndex, storageIndex := hs[i].encode() - rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData) + accountData, storageData, accountIndex, storageIndex, trieNodes := hs[i].encode() + rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData, trieNodes) rawdb.WriteStateID(db, hs[i].meta.root, uint64(i+1)) roots = append(roots, hs[i].meta.root) } @@ -233,8 +250,8 @@ func TestTruncateOutOfRange(t *testing.T) { defer freezer.Close() for i := 0; i < len(hs); i++ { - accountData, storageData, accountIndex, storageIndex := hs[i].encode() - rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData) + accountData, storageData, accountIndex, storageIndex, trieNodes := hs[i].encode() + rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData, trieNodes) rawdb.WriteStateID(db, hs[i].meta.root, uint64(i+1)) } truncateFromTail(db, freezer, uint64(len(hs)/2)) diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go index c6358f6808..b6608fe42b 100644 --- a/trie/triedb/pathdb/journal.go +++ b/trie/triedb/pathdb/journal.go @@ -260,9 +260,40 @@ func (db *Database) loadLayers() layer { if !(root == types.EmptyRootHash && errors.Is(err, 
errMissJournal)) { log.Info("Failed to load journal, discard it", "err", err) } - // Return single layer with persistent state. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.NotifyKeep) - dl := newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nb) + + var ( + nb trienodebuffer + dl *diskLayer + stateID = rawdb.ReadPersistentStateID(db.diskdb) + ) + if (errors.Is(err, errMissJournal) || errors.Is(err, errUnmatchedJournal)) && db.config.TrieNodeBufferType == NodeBufferList { + start := time.Now() + if db.freezer == nil { + log.Crit("Use unopened freezer db to recover node buffer list") + } + log.Info("Recover node buffer list from ancient db") + + nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, + db.config.ProposeBlockInterval, db.config.NotifyKeep, db.freezer, true) + if err != nil { + log.Error("Failed to new trie node buffer for recovery", "error", err) + } else { + root, stateID, _ = nb.getLatestStatus() + log.Info("Finish recovering node buffer list", "elapsed", common.PrettyDuration(time.Since(start)), + "latest root hash", root.String(), "latest state_id", stateID) + } + } + if nb == nil || err != nil { + // Return single layer with persistent state. + nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, + db.config.ProposeBlockInterval, db.config.NotifyKeep, nil, false) + if err != nil { + log.Crit("Failed to new trie node buffer", "error", err) + return nil + } + } + + dl = newDiskLayer(root, stateID, db, nil, nb) nb.setClean(dl.cleans) return dl } @@ -330,7 +361,12 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp } // Calculate the internal state transitions by id difference. 
- nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.NotifyKeep) + nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, + db.config.NotifyKeep, nil, false) + if err != nil { + log.Error("Failed to new trie node buffer", "error", err) + return nil, err + } base := newDiskLayer(root, id, db, nil, nb) nb.setClean(base.cleans) return base, nil @@ -374,18 +410,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream, journalTypeForRea if err := journalBuf.Decode(&encoded); err != nil { return nil, fmt.Errorf("failed to load diff nodes: %v", err) } - nodes := make(map[common.Hash]map[string]*trienode.Node) - for _, entry := range encoded { - subset := make(map[string]*trienode.Node) - for _, n := range entry.Nodes { - if len(n.Blob) > 0 { - subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob) - } else { - subset[string(n.Path)] = trienode.NewDeleted() - } - } - nodes[entry.Owner] = subset - } + nodes := flattenTrieNodes(encoded) // Read state changes from journal var ( jaccounts journalAccounts @@ -509,14 +534,7 @@ func (dl *diffLayer) journal(w io.Writer, journalType JournalType) error { return err } // Write the accumulated trie nodes into buffer - nodes := make([]journalNodes, 0, len(dl.nodes)) - for owner, subset := range dl.nodes { - entry := journalNodes{Owner: owner} - for path, node := range subset { - entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob}) - } - nodes = append(nodes, entry) - } + nodes := compressTrieNodes(dl.nodes) if err := rlp.Encode(journalBuf, nodes); err != nil { return err } @@ -623,3 +641,33 @@ func (db *Database) Journal(root common.Hash) error { log.Info("Persisted dirty state to disk", "size", common.StorageSize(journalSize), "elapsed", common.PrettyDuration(time.Since(start))) return nil } + +// 
compressTrieNodes returns a compressed journal nodes slice. +func compressTrieNodes(nodes map[common.Hash]map[string]*trienode.Node) []journalNodes { + jn := make([]journalNodes, 0, len(nodes)) + for owner, subset := range nodes { + entry := journalNodes{Owner: owner} + for path, node := range subset { + entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob}) + } + jn = append(jn, entry) + } + return jn +} + +// flattenTrieNodes returns a two-dimensional map for internal nodes. +func flattenTrieNodes(jn []journalNodes) map[common.Hash]map[string]*trienode.Node { + nodes := make(map[common.Hash]map[string]*trienode.Node) + for _, entry := range jn { + subset := make(map[string]*trienode.Node) + for _, n := range entry.Nodes { + if len(n.Blob) > 0 { + subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob) + } else { + subset[string(n.Path)] = trienode.NewDeleted() + } + } + nodes[entry.Owner] = subset + } + return nodes +} diff --git a/trie/triedb/pathdb/journal_test.go b/trie/triedb/pathdb/journal_test.go new file mode 100644 index 0000000000..ff976938e2 --- /dev/null +++ b/trie/triedb/pathdb/journal_test.go @@ -0,0 +1,37 @@ +package pathdb + +import ( + "testing" + + "github.com/ethereum/go-ethereum/trie/testutil" + "github.com/stretchr/testify/assert" +) + +func randomJournalNodes(n int) []journalNodes { + jns := make([]journalNodes, 0, n) + for i := 0; i < n; i++ { + jn := make([]journalNode, 0, n) + for j := 0; j < n; j++ { + jn = append(jn, journalNode{ + Path: testutil.RandBytes(n), + Blob: testutil.RandBytes(n), + }) + } + jns = append(jns, journalNodes{ + Owner: testutil.RandomHash(), + Nodes: jn, + }) + } + return jns +} + +func TestCompressTrieNodes(t *testing.T) { + trieNodes := randomTrieNodes(3) + jn := compressTrieNodes(trieNodes) + assert.Equal(t, 1, len(jn)) +} + +func TestFlattenTrieNodes(t *testing.T) { + jn := flattenTrieNodes(randomJournalNodes(3)) + assert.Equal(t, 3, len(jn)) +} diff --git 
a/trie/triedb/pathdb/metrics.go b/trie/triedb/pathdb/metrics.go index 59191d5d92..d5b3117d4d 100644 --- a/trie/triedb/pathdb/metrics.go +++ b/trie/triedb/pathdb/metrics.go @@ -68,4 +68,8 @@ var ( diffHashCacheMissMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/miss", nil) diffHashCacheSlowPathMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/slowpath", nil) diffHashCacheLengthGauge = metrics.NewRegisteredGauge("pathdb/difflayer/hashcache/size", nil) + + // temp metrics + historyTotalSizeMeter1 = metrics.NewRegisteredMeter("pathdb/history/total", nil) + historyTrieNodesSizeMeter = metrics.NewRegisteredMeter("pathdb/history/trienodes", nil) ) diff --git a/trie/triedb/pathdb/nodebuffer.go b/trie/triedb/pathdb/nodebuffer.go index 4ecd83b4fb..6e72eb36ad 100644 --- a/trie/triedb/pathdb/nodebuffer.go +++ b/trie/triedb/pathdb/nodebuffer.go @@ -43,7 +43,7 @@ type nodebuffer struct { } // newNodeBuffer initializes the node buffer with the provided nodes. -func newNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *nodebuffer { +func newNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) (*nodebuffer, error) { if nodes == nil { nodes = make(map[common.Hash]map[string]*trienode.Node) } @@ -60,7 +60,7 @@ func newNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, l nodes: nodes, size: size, limit: uint64(limit), - } + }, nil } // node retrieves the trie node with given node info. @@ -305,5 +305,9 @@ func (b *nodebuffer) setClean(clean *fastcache.Cache) { // proposedBlockReader return the world state Reader of block that is proposed to L1. 
func (b *nodebuffer) proposedBlockReader(blockRoot common.Hash) (layer, error) { - return nil, errors.New("anode buffer not support to get proposed block reader") + return nil, errors.New("node buffer not support to get proposed block reader") +} + +func (b *nodebuffer) getLatestStatus() (common.Hash, uint64, error) { + return common.Hash{}, 0, errors.New("unsupported method for node buffer") } diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go index 12d055b06d..d7c81aebee 100644 --- a/trie/triedb/pathdb/nodebufferlist.go +++ b/trie/triedb/pathdb/nodebufferlist.go @@ -4,11 +4,14 @@ import ( "errors" "fmt" "io" + "sort" "sync" "sync/atomic" "time" "github.com/VictoriaMetrics/fastcache" + "golang.org/x/sync/errgroup" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" @@ -82,7 +85,10 @@ func newNodeBufferList( nodes map[common.Hash]map[string]*trienode.Node, layers uint64, proposeBlockInterval uint64, - keepFunc NotifyKeepFunc) *nodebufferlist { + keepFunc NotifyKeepFunc, + freezer *rawdb.ResettableFreezer, + recovery bool, +) (*nodebufferlist, error) { var ( rsevMdNum uint64 dlInMd uint64 @@ -110,31 +116,199 @@ func newNodeBufferList( } } base := newMultiDifflayer(limit, size, common.Hash{}, nodes, layers) - ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) - nf := &nodebufferlist{ + + var ( + nf *nodebufferlist + err error + ) + if recovery { + nf, err = recoverNodeBufferList(db, freezer, base, limit, wpBlocks, rsevMdNum, dlInMd) + if err != nil { + log.Error("Failed to recover node buffer list", "error", err) + return nil, err + } + } else { + ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) + nf = &nodebufferlist{ + db: db, + wpBlocks: wpBlocks, + rsevMdNum: rsevMdNum, + dlInMd: dlInMd, + limit: limit, + base: base, + head: ele, + tail: ele, + 
count: 1, + persistID: rawdb.ReadPersistentStateID(db), + stopCh: make(chan struct{}), + waitStopCh: make(chan struct{}), + forceKeepCh: make(chan struct{}), + waitForceKeepCh: make(chan struct{}), + keepFunc: keepFunc, + } + } + + go nf.loop() + + log.Info("new node buffer list", "proposed block interval", nf.wpBlocks, + "reserve multi difflayers", nf.rsevMdNum, "difflayers in multidifflayer", nf.dlInMd, + "limit", common.StorageSize(limit), "layers", layers, "persist id", nf.persistID, "base_size", size) + return nf, nil +} + +// recoverNodeBufferList recovers node buffer list +func recoverNodeBufferList(db ethdb.Database, freezer *rawdb.ResettableFreezer, base *multiDifflayer, + limit, wpBlocks, rsevMdNum, dlInMd uint64) (*nodebufferlist, error) { + nbl := &nodebufferlist{ db: db, wpBlocks: wpBlocks, rsevMdNum: rsevMdNum, dlInMd: dlInMd, limit: limit, base: base, - head: ele, - tail: ele, - count: 1, persistID: rawdb.ReadPersistentStateID(db), stopCh: make(chan struct{}), waitStopCh: make(chan struct{}), forceKeepCh: make(chan struct{}), waitForceKeepCh: make(chan struct{}), - keepFunc: keepFunc, } + head, err := freezer.Ancients() + if err != nil { + log.Error("Failed to get freezer ancients", "error", err) + return nil, err + } + tail, err := freezer.Tail() + if err != nil { + log.Error("Failed to get freezer tail", "error", err) + return nil, err + } + log.Info("Ancient db meta info", "persistent_state_id", nbl.persistID, "head_state_id", head, + "tail_state_id", tail, "waiting_recover_num", head-nbl.persistID) - go nf.loop() + startStateID := nbl.persistID + 1 + startBlock, err := readBlockNumber(freezer, startStateID) + if err != nil { + log.Error("Failed to read start block number", "error", err, "tail_state_id", startStateID) + return nil, err + } + endBlock, err := readBlockNumber(freezer, head) + if err != nil { + log.Error("Failed to read end block number", "error", err, "head_state_id", head) + return nil, err + } + blockIntervals := 
nbl.createBlockInterval(startBlock, endBlock) + stateIntervals, err := nbl.createStateInterval(freezer, startStateID, head, blockIntervals) + if err != nil { + return nil, err + } + log.Info("block intervals info", "blockIntervals", blockIntervals, "stateIntervals", stateIntervals, + "startBlock", startBlock, "endBlock", endBlock) + + var eg errgroup.Group + nbl.linkMultiDiffLayers(len(blockIntervals)) + for current, i := nbl.head, 0; current != nil; current, i = current.next, i+1 { + index := i + mdl := current + eg.Go(func() error { + for j := stateIntervals[index][0]; j <= stateIntervals[index][1]; j++ { + h, err := nbl.readStateHistory(freezer, j) + if err != nil { + log.Error("Failed to read state history", "error", err) + return err + } + if err = mdl.commit(h.meta.root, j, h.meta.block, 1, flattenTrieNodes(h.nodes)); err != nil { + log.Error("Failed to commit trie nodes to multi diff layer", "error", err) + return err + } + } + return nil + }) + } + if err = eg.Wait(); err != nil { + return nil, err + } - log.Info("new node buffer list", "proposed block interval", nf.wpBlocks, - "reserve multi difflayers", nf.rsevMdNum, "difflayers in multidifflayer", nf.dlInMd, - "limit", common.StorageSize(limit), "layers", layers, "persist id", nf.persistID, "base_size", size) - return nf + for current, i := nbl.head, 0; current != nil; current, i = current.next, i+1 { + nbl.size += current.size + nbl.layers += current.layers + } + nbl.diffToBase() + + log.Info("Succeed to add diff layer", "base_size", nbl.base.size, "tail_state_id", nbl.tail.id, + "head_state_id", nbl.head.id, "nbl_layers", nbl.layers, "base_layers", nbl.base.layers) + return nbl, nil +} + +// linkMultiDiffLayers links specified amount of multiDiffLayers for recovering +func (nf *nodebufferlist) linkMultiDiffLayers(blockIntervalLength int) { + for i := 0; i < blockIntervalLength; i++ { + mdl := newMultiDifflayer(nf.limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) + 
nf.pushFront(mdl) + } + nf.count = uint64(blockIntervalLength) +} + +func (nf *nodebufferlist) readStateHistory(freezer *rawdb.ResettableFreezer, stateID uint64) (*history, error) { + h, err := readHistory(freezer, stateID) + if err != nil { + log.Error("Failed to read history from freezer db", "error", err) + return nil, err + } + return h, nil +} + +func (nf *nodebufferlist) createBlockInterval(startBlock, endBlock uint64) [][]uint64 { + var intervalBoundaries [][]uint64 + firstIntervalEnd := startBlock + nf.dlInMd - (startBlock % nf.dlInMd) + if endBlock < firstIntervalEnd { + firstIntervalEnd = endBlock + } + intervalBoundaries = append(intervalBoundaries, []uint64{startBlock, firstIntervalEnd}) + + for start := firstIntervalEnd + 1; start <= endBlock; start += nf.dlInMd { + end := start + nf.dlInMd - 1 + if end > endBlock { + end = endBlock + } + intervalBoundaries = append(intervalBoundaries, []uint64{start, end}) + } + + sort.Slice(intervalBoundaries, func(i, j int) bool { + return intervalBoundaries[i][0] > intervalBoundaries[j][0] + }) + return intervalBoundaries +} + +func (nf *nodebufferlist) createStateInterval(freezer *rawdb.ResettableFreezer, startStateID, endStateID uint64, + blockIntervals [][]uint64) ([][]uint64, error) { + blockMap, err := readAllBlockNumbers(freezer, startStateID, endStateID) + if err != nil { + log.Crit("Failed to read all history meta", "error", err) + } + + var stateIntervals [][]uint64 + for _, blockList := range blockIntervals { + firstStateID, ok := blockMap[blockList[0]] + if !ok { + log.Error("Corresponding state id is not found", "block", blockList[0]) + return nil, fmt.Errorf("block %d is not found", blockList[0]) + } + + secondStateID, ok := blockMap[blockList[1]] + if !ok { + log.Error("Corresponding state id is not found", "block", blockList[1]) + return nil, fmt.Errorf("block %d is not found", blockList[1]) + } + stateIntervals = append(stateIntervals, []uint64{firstStateID, secondStateID}) + } + return 
stateIntervals, nil +} + +func (nf *nodebufferlist) getLatestStatus() (common.Hash, uint64, error) { + head := nf.head + log.Info("last head multi diff layer info", "root", head.root, "id", head.id, "block", head.block, + "layer", head.layers, "size", head.size) + return head.root, head.id, nil } // node retrieves the trie node with given node info. @@ -351,10 +525,23 @@ func (nf *nodebufferlist) getAllNodes() map[common.Hash]map[string]*trienode.Nod // getLayers return the size of cached difflayers. func (nf *nodebufferlist) getLayers() uint64 { + for { + if nf.isFlushing.Swap(true) { + time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second) + log.Info("waiting base node buffer to be flushed to disk") + continue + } else { + break + } + } + defer nf.isFlushing.Store(false) + nf.mux.RLock() nf.baseMux.RLock() + nf.flushMux.Lock() defer nf.mux.RUnlock() defer nf.baseMux.RUnlock() + defer nf.flushMux.Unlock() return nf.layers + nf.base.layers } diff --git a/trie/trienode/node.go b/trie/trienode/node.go index 3c58d40fc1..95315c2e9a 100644 --- a/trie/trienode/node.go +++ b/trie/trienode/node.go @@ -197,8 +197,3 @@ func (set *MergedNodeSet) Flatten() map[common.Hash]map[string]*Node { } return nodes } - -// // ReverseFlatten reverts flattened two-dimensional map to merge node set -// func ReverseFlatten(nodes map[common.Hash]map[string]*Node) MergedNodeSet { -// -// } diff --git a/trie/triestate/state.go b/trie/triestate/state.go index 7c73c8acaf..4c47e9c397 100644 --- a/trie/triestate/state.go +++ b/trie/triestate/state.go @@ -24,7 +24,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie/trienode" "golang.org/x/crypto/sha3" @@ -143,47 +142,6 @@ func Apply(prevRoot common.Hash, postRoot common.Hash, accounts map[common.Addre return ctx.nodes.Flatten(), 
nil } -func ApplyForDiff(prevRoot common.Hash, postRoot common.Hash, accounts map[common.Address][]byte, storages map[common.Address]map[common.Hash][]byte, - loader TrieLoader) (*trienode.MergedNodeSet, error) { - tr, err := loader.OpenTrie(prevRoot) - if err != nil { - log.Error("Failed to open trie", "error", err) - return nil, err - } - ctx := &context{ - prevRoot: prevRoot, - postRoot: postRoot, - accounts: accounts, - storages: storages, - accountTrie: tr, - nodes: trienode.NewMergedNodeSet(), - } - for addr, account := range accounts { - var err error - if len(account) == 0 { - err = deleteAccountForRecovering(ctx, loader, addr) - } else { - err = updateAccountForRecovering(ctx, loader, addr) - } - // err = updateAccountForRecovering(ctx, loader, addr) - if err != nil { - return nil, fmt.Errorf("failed to apply state, err: %w", err) - } - } - root, result, err := tr.Commit(false) - - if err != nil { - return nil, err - } - if root != postRoot { - return nil, fmt.Errorf("failed to revert state, want %#x, got %#x", postRoot, root) - } - if err := ctx.nodes.Merge(result); err != nil { - return nil, err - } - return ctx.nodes, nil -} - // updateAccount the account was present in prev-state, and may or may not // existent in post-state. Apply the reverse diff and verify if the storage // root matches the one in prev-state account. 
@@ -259,7 +217,6 @@ func deleteAccount(ctx *context, loader TrieLoader, addr common.Address) error { addrHash := h.hash(addr.Bytes()) blob, err := ctx.accountTrie.Get(addrHash.Bytes()) if err != nil { - log.Error("9") return err } if len(blob) == 0 { @@ -283,7 +240,6 @@ func deleteAccount(ctx *context, loader TrieLoader, addr common.Address) error { } root, result, err := st.Commit(false) if err != nil { - log.Error("13") return err } if root != types.EmptyRootHash { @@ -300,128 +256,6 @@ func deleteAccount(ctx *context, loader TrieLoader, addr common.Address) error { return ctx.accountTrie.Delete(addrHash.Bytes()) } -// updateAccountForRecovering the account is present in post-state, and may or may not -// be existent in prev-state. Apply the diff and verify if the storage root matches the -// one in post-state account. -func updateAccountForRecovering(ctx *context, loader TrieLoader, addr common.Address) error { - // The account was present in post-state, decode it from the - // 'slim-rlp' format bytes. - h := newHasher() - defer h.release() - - addrHash := h.hash(addr.Bytes()) - post, err := types.FullAccount(ctx.accounts[addr]) - if err != nil { - log.Error("Failed to full account for updating", "error", err, "addr", addr.String()) - return err - } - // The account may or may not be existent in prev-state, try to - // load it and decode if it's found. 
- blob, err := ctx.accountTrie.Get(addrHash.Bytes()) - if err != nil { - log.Error("Failed to get for updating", "error", err) - return err - } - prev := types.NewEmptyStateAccount() - if len(blob) != 0 { - if err = rlp.DecodeBytes(blob, &prev); err != nil { - log.Error("Failed to decode bytes for updating", "error", err) - return err - } - } - // Apply all storage changes into the prev-state storage trie - st, err := loader.OpenStorageTrie(ctx.prevRoot, addrHash, prev.Root) - if err != nil { - log.Error("Failed to open storage trie for updating", "error", err) - return err - } - for k, v := range ctx.storages[addr] { - if len(v) == 0 { - err = st.Delete(k.Bytes()) - } else { - err = st.Update(k.Bytes(), v) - } - if err != nil { - log.Error("Failed to delete or update", "error", err) - return err - } - } - root, result, err := st.Commit(false) - if err != nil { - log.Error("Failed to commit for updating", "error", err) - return err - } - if root != post.Root { - return errors.New("failed to reset storage trie") - } - if result != nil { - if err = ctx.nodes.Merge(result); err != nil { - log.Error("Failed to merge for updating", "error", err) - return err - } - } - // Write the post-state account into the main trie - full, err := rlp.EncodeToBytes(post) - if err != nil { - log.Error("Failed to encode bytes", "error", err) - return err - } - return ctx.accountTrie.Update(addrHash.Bytes(), full) -} - -// deleteAccountForRecovering the account is not present in post-state, and was expected -// to be existent in prev-state. Apply the diff and verify if the account and storage -// is wiped out correctly. 
-func deleteAccountForRecovering(ctx *context, loader TrieLoader, addr common.Address) error { - // The account must be existent in prev-state, load the account - h := newHasher() - defer h.release() - - addrHash := h.hash(addr.Bytes()) - blob, err := ctx.accountTrie.Get(addrHash.Bytes()) - if err != nil { - return err - } - if len(blob) == 0 { - return fmt.Errorf("account is nonexistent %#x", addrHash) - } - var prev types.StateAccount - if err = rlp.DecodeBytes(blob, prev); err != nil { - log.Error("Failed to decode bytes for deleting accounts", "error", err) - return err - } - st, err := loader.OpenStorageTrie(ctx.prevRoot, addrHash, prev.Root) - if err != nil { - log.Error("Failed to open storage trie for del", "error", err) - return err - } - for k, v := range ctx.storages[addr] { - if len(v) != 0 { - return errors.New("expect storage deletion") - } - if err = st.Delete(k.Bytes()); err != nil { - log.Error("Failed to delete for del", "error", err) - return err - } - } - root, result, err := st.Commit(false) - if err != nil { - log.Error("Failed to commit for del", "error", err) - return err - } - if root != types.EmptyRootHash { - return errors.New("failed to clear storage trie") - } - if result != nil { - if err = ctx.nodes.Merge(result); err != nil { - log.Error("Failed to merge for del", "error", err) - return err - } - } - // Delete the prev-state account from the main trie. - return ctx.accountTrie.Delete(addrHash.Bytes()) -} - // hasher is used to compute the sha256 hash of the provided data. type hasher struct{ sha crypto.KeccakState }