Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[factory] implement versioned stateDB to support archive mode #4520

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ func (builder *Builder) createFactory(forTest bool) (factory.Factory, error) {
factory.RegistryStateDBOption(builder.cs.registry),
factory.DefaultPatchOption(),
}
if builder.cfg.Chain.EnableStateDBCaching {
if builder.cfg.Chain.EnableArchiveMode {
dao, err = db.CreateKVStoreVersioned(factoryDBCfg, builder.cfg.Chain.TrieDBPath, factory.VersionedNamespaces)
} else if builder.cfg.Chain.EnableStateDBCaching {
dao, err = db.CreateKVStoreWithCache(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.StateDBCacheSize)
} else {
dao, err = db.CreateKVStore(factoryDBCfg, builder.cfg.Chain.TrieDBPath)
Expand Down
10 changes: 0 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ var (
// Validates is the collection config validation functions
Validates = []Validate{
ValidateRollDPoS,
ValidateArchiveMode,
ValidateDispatcher,
ValidateAPI,
ValidateActPool,
Expand Down Expand Up @@ -254,15 +253,6 @@ func ValidateRollDPoS(cfg Config) error {
return nil
}

// ValidateArchiveMode validates the state factory setting
func ValidateArchiveMode(cfg Config) error {
if !cfg.Chain.EnableArchiveMode || !cfg.Chain.EnableTrielessStateDB {
return nil
}

return errors.Wrap(ErrInvalidCfg, "Archive mode is incompatible with trieless state DB")
}

// ValidateAPI validates the api configs
func ValidateAPI(cfg Config) error {
if cfg.API.TpsWindow <= 0 {
Expand Down
17 changes: 0 additions & 17 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,23 +253,6 @@ func TestValidateRollDPoS(t *testing.T) {
)
}

func TestValidateArchiveMode(t *testing.T) {
cfg := Default
cfg.Chain.EnableArchiveMode = true
cfg.Chain.EnableTrielessStateDB = true
require.Error(t, ErrInvalidCfg, errors.Cause(ValidateArchiveMode(cfg)))
require.EqualError(t, ValidateArchiveMode(cfg), "Archive mode is incompatible with trieless state DB: invalid config value")
cfg.Chain.EnableArchiveMode = false
cfg.Chain.EnableTrielessStateDB = true
require.NoError(t, errors.Cause(ValidateArchiveMode(cfg)))
cfg.Chain.EnableArchiveMode = true
cfg.Chain.EnableTrielessStateDB = false
require.NoError(t, errors.Cause(ValidateArchiveMode(cfg)))
cfg.Chain.EnableArchiveMode = false
cfg.Chain.EnableTrielessStateDB = false
require.NoError(t, errors.Cause(ValidateArchiveMode(cfg)))
}

func TestValidateActPool(t *testing.T) {
cfg := Default
cfg.ActPool.MaxNumActsPerAcct = 0
Expand Down
19 changes: 19 additions & 0 deletions db/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import "github.com/pkg/errors"
var (
// ErrEmptyDBPath is the error when db path is empty
ErrEmptyDBPath = errors.New("empty db path")
// ErrEmptyVersionedNamespace is the error of empty versioned namespace
ErrEmptyVersionedNamespace = errors.New("cannot create versioned KVStore with empty versioned namespace")
)

// CreateKVStore creates db from config and db path
Expand Down Expand Up @@ -32,3 +34,20 @@ func CreateKVStoreWithCache(cfg Config, dbPath string, cacheSize int) (KVStore,

return NewKvStoreWithCache(dao, cacheSize), nil
}

// CreateKVStoreVersioned creates versioned db from config and db path
func CreateKVStoreVersioned(cfg Config, dbPath string, vns []string) (KVStore, error) {
if len(dbPath) == 0 {
return nil, ErrEmptyDBPath
}
if len(vns) == 0 {
return nil, ErrEmptyVersionedNamespace
}
for i := range vns {
if len(vns[i]) == 0 {
return nil, ErrEmptyVersionedNamespace
}
}
cfg.DbPath = dbPath
return NewKVStoreWithVersion(cfg, VersionedNamespaceOption(vns...)), nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix ci

}
25 changes: 25 additions & 0 deletions db/kvstore_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,35 @@ type (
KvVersioned interface {
lifecycle.StartStopper

KVStore

// Base returns the underlying KVStore
Base() KVStore

// Version returns the key's most recent version
Version(string, []byte) (uint64, error)

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
SetVersion(uint64) KVStore
}

// KvWithVersion wraps the versioned DB implementation with a certain version
KvWithVersion struct {
// code in PR4518
}
)

type Option func(*KvWithVersion)

func VersionedNamespaceOption(ns ...string) Option {
return func(k *KvWithVersion) {
// code in PR4518
}
}

// NewKVStoreWithVersion implements a KVStore that can handle both versioned
// and non-versioned namespace
func NewKVStoreWithVersion(cfg Config, opts ...Option) KvVersioned {
// code in PR4518
return nil
}
27 changes: 27 additions & 0 deletions state/factory/daoretrofitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm"
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

Expand Down Expand Up @@ -47,3 +50,27 @@ func (rtf *daoRTF) getHeight() (uint64, error) {
func (rtf *daoRTF) putHeight(h uint64) error {
return rtf.dao.Put(AccountKVNamespace, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(h))
}

func (rtf *daoRTF) metadataNS() string {
return AccountKVNamespace
}

func (rtf *daoRTF) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
opts := []db.KVStoreFlusherOption{
db.SerializeOption(func(wi *batch.WriteInfo) []byte {
if preEaster {
return wi.SerializeWithoutWriteType()
}
return wi.Serialize()
}),
}
if !preEaster {
return opts
}
return append(
opts,
db.SerializeFilterOption(func(wi *batch.WriteInfo) bool {
return wi.Namespace() == evm.CodeKVNameSpace || wi.Namespace() == staking.CandsMapNS
}),
)
}
88 changes: 88 additions & 0 deletions state/factory/daoretrofitter_archive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2024 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package factory

import (
"context"

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm"
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

type daoRTFArchive struct {
daoVersioned db.KvVersioned
metaNS string // namespace for metadata
}

func newDaoRetrofitterArchive(dao db.KvVersioned, mns string) *daoRTFArchive {
return &daoRTFArchive{
daoVersioned: dao,
metaNS: mns,
}
}

func (rtf *daoRTFArchive) Start(ctx context.Context) error {
return rtf.daoVersioned.Start(ctx)
}

func (rtf *daoRTFArchive) Stop(ctx context.Context) error {
return rtf.daoVersioned.Stop(ctx)
}

func (rtf *daoRTFArchive) atHeight(h uint64) db.KVStore {
return rtf.daoVersioned.SetVersion(h)
}

func (rtf *daoRTFArchive) getHeight() (uint64, error) {
height, err := rtf.daoVersioned.Base().Get(rtf.metaNS, []byte(CurrentHeightKey))
if err != nil {
return 0, errors.Wrap(err, "failed to get factory's height from underlying DB")
}
return byteutil.BytesToUint64(height), nil
}

func (rtf *daoRTFArchive) putHeight(h uint64) error {
return rtf.daoVersioned.Base().Put(rtf.metaNS, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(h))
}

func (rtf *daoRTFArchive) metadataNS() string {
return rtf.metaNS
}

func (rtf *daoRTFArchive) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
opts := []db.KVStoreFlusherOption{
db.SerializeOption(func(wi *batch.WriteInfo) []byte {
// current height is moved to the metadata namespace
// transform it back for the purpose of calculating digest
wi = rtf.transCurrentHeight(wi)
if preEaster {
return wi.SerializeWithoutWriteType()
}
return wi.Serialize()
}),
}
if !preEaster {
return opts
}
return append(
opts,
db.SerializeFilterOption(func(wi *batch.WriteInfo) bool {
return wi.Namespace() == evm.CodeKVNameSpace || wi.Namespace() == staking.CandsMapNS
}),
)
}

func (rtf *daoRTFArchive) transCurrentHeight(wi *batch.WriteInfo) *batch.WriteInfo {
if wi.Namespace() == rtf.metaNS && string(wi.Key()) == CurrentHeightKey {
return batch.NewWriteInfo(wi.WriteType(), AccountKVNamespace, wi.Key(), wi.Value(), wi.Error())
}
return wi
}
48 changes: 21 additions & 27 deletions state/factory/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/action/protocol"
"github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm"
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
"github.com/iotexproject/iotex-core/v2/actpool"
"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/blockchain/genesis"
Expand All @@ -33,13 +32,22 @@ import (
"github.com/iotexproject/iotex-core/v2/state"
)

var (
// VersionedMetadata is the metadata namespace for versioned stateDB
VersionedMetadata = "Meta"
// VersionedNamespaces are the versioned namespaces for versioned stateDB
VersionedNamespaces = []string{AccountKVNamespace, evm.ContractKVNameSpace}
)

type (
// daoRetrofitter represents the DAO-related methods to accommodate archive-mode
daoRetrofitter interface {
lifecycle.StartStopper
atHeight(uint64) db.KVStore
getHeight() (uint64, error)
putHeight(uint64) error
metadataNS() string
flusherOptions(bool) []db.KVStoreFlusherOption
}
// stateDB implements StateFactory interface, tracks changes to account/contract and batch-commits to DB
stateDB struct {
Expand Down Expand Up @@ -106,7 +114,15 @@ func NewStateDB(cfg Config, dao db.KVStore, opts ...StateDBOption) (Factory, err
return nil, err
}
}
sdb.dao = newDaoRetrofitter(dao)
if cfg.Chain.EnableArchiveMode {
daoVersioned, ok := dao.(db.KvVersioned)
if !ok {
return nil, errors.Wrap(ErrNotSupported, "cannot enable archive mode StateDB with non-versioned DB")
}
sdb.dao = newDaoRetrofitterArchive(daoVersioned, VersionedMetadata)
} else {
sdb.dao = newDaoRetrofitter(dao)
}
Copy link
Member Author

@dustinxie dustinxie Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we can add a new struct/file to implement statedb for archive-mode, like

struct stateDBArchive {
    *stateDB
    xxx
}

but after checking, that would entail duplicating almost all the codes, and in the future, any changes would need to go to 2 files. Let me know if you have good suggestion.

timerFactory, err := prometheustimer.New(
"iotex_statefactory_perf",
"Performance of state factory module",
Expand Down Expand Up @@ -181,7 +197,7 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS
flusher, err := db.NewKVStoreFlusher(
sdb.dao.atHeight(height),
batch.NewCachedBatch(),
sdb.flusherOptions(!g.IsEaster(height))...,
sdb.dao.flusherOptions(!g.IsEaster(height))...,
)
if err != nil {
return nil, err
Expand All @@ -193,11 +209,10 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS
flusher.KVStoreWithBuffer().MustPut(p.Namespace, p.Key, p.Value)
}
}
store := newStateDBWorkingSetStore(sdb.protocolView, flusher, g.IsNewfoundland(height))
store := newStateDBWorkingSetStore(sdb.protocolView, flusher, g.IsNewfoundland(height), sdb.dao.metadataNS())
if err := store.Start(ctx); err != nil {
return nil, err
}

return newWorkingSet(height, store), nil
}

Expand Down Expand Up @@ -271,7 +286,6 @@ func (sdb *stateDB) WorkingSet(ctx context.Context) (protocol.StateManager, erro
}

func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
// TODO: implement archive mode
return sdb.newWorkingSet(ctx, height)
}

Expand Down Expand Up @@ -368,29 +382,9 @@ func (sdb *stateDB) ReadView(name string) (interface{}, error) {
}

//======================================
// private trie constructor functions
// private statedb functions
//======================================

func (sdb *stateDB) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
opts := []db.KVStoreFlusherOption{
db.SerializeOption(func(wi *batch.WriteInfo) []byte {
if preEaster {
return wi.SerializeWithoutWriteType()
}
return wi.Serialize()
}),
}
if !preEaster {
return opts
}
return append(
opts,
db.SerializeFilterOption(func(wi *batch.WriteInfo) bool {
return wi.Namespace() == evm.CodeKVNameSpace || wi.Namespace() == staking.CandsMapNS
}),
)
}

func (sdb *stateDB) state(h uint64, ns string, addr []byte, s interface{}) error {
data, err := sdb.dao.atHeight(h).Get(ns, addr)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions state/factory/workingsetstore_statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ import (
type stateDBWorkingSetStore struct {
*workingSetStoreCommon
readBuffer bool
metaNS string // metadata namespace, only meaningful when archive-mode enabled
}

func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, readBuffer bool) workingSetStore {
func newStateDBWorkingSetStore(view protocol.View, flusher db.KVStoreFlusher, readBuffer bool, ns string) workingSetStore {
return &stateDBWorkingSetStore{
workingSetStoreCommon: &workingSetStoreCommon{
flusher: flusher,
view: view,
},
readBuffer: readBuffer,
metaNS: ns,
}
}

Expand Down Expand Up @@ -61,7 +63,7 @@ func (store *stateDBWorkingSetStore) States(ns string, keys [][]byte) ([][]byte,
func (store *stateDBWorkingSetStore) Finalize(height uint64) error {
// Persist current chain Height
store.flusher.KVStoreWithBuffer().MustPut(
AccountKVNamespace,
store.metaNS,
[]byte(CurrentHeightKey),
byteutil.Uint64ToBytes(height),
)
Expand Down
Loading
Loading