Skip to content

Commit

Permalink
[factory] implement versioned stateDB to support archive mode
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Dec 13, 2024
1 parent a3309f0 commit 1c93d4f
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 58 deletions.
5 changes: 5 additions & 0 deletions blockchain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type (
EnableStateDBCaching bool `yaml:"enableStateDBCaching"`
// EnableArchiveMode is only meaningful when EnableTrielessStateDB is false
EnableArchiveMode bool `yaml:"enableArchiveMode"`
// VersionedNamespaces specifies the versioned namespaces for versioned state DB
VersionedNamespaces []string `yaml:"versionedNamespaces"`
// VersionedMetadata specifies the metadata namespace for versioned state DB
VersionedMetadata string `yaml:"versionedMetadata"`
// EnableAsyncIndexWrite enables writing the block actions' and receipts' index asynchronously
EnableAsyncIndexWrite bool `yaml:"enableAsyncIndexWrite"`
// deprecated
Expand Down Expand Up @@ -107,6 +111,7 @@ var (
GravityChainAPIs: []string{},
},
EnableTrielessStateDB: true,
VersionedNamespaces: []string{},
EnableStateDBCaching: false,
EnableArchiveMode: false,
EnableAsyncIndexWrite: true,
Expand Down
5 changes: 4 additions & 1 deletion chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ func (builder *Builder) createFactory(forTest bool) (factory.Factory, error) {
factory.RegistryStateDBOption(builder.cs.registry),
factory.DefaultPatchOption(),
}
if builder.cfg.Chain.EnableStateDBCaching {
if builder.cfg.Chain.EnableArchiveMode {
dao, err = db.CreateKVStoreVersioned(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.VersionedNamespaces)
opts = append(opts, factory.MetadataNamespaceOption(builder.cfg.Chain.VersionedMetadata))
} else if builder.cfg.Chain.EnableStateDBCaching {
dao, err = db.CreateKVStoreWithCache(factoryDBCfg, builder.cfg.Chain.TrieDBPath, builder.cfg.Chain.StateDBCacheSize)
} else {
dao, err = db.CreateKVStore(factoryDBCfg, builder.cfg.Chain.TrieDBPath)
Expand Down
17 changes: 13 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,20 @@ func ValidateRollDPoS(cfg Config) error {

// ValidateArchiveMode validates the state factory setting
func ValidateArchiveMode(cfg Config) error {
if !cfg.Chain.EnableArchiveMode || !cfg.Chain.EnableTrielessStateDB {
return nil
if cfg.Chain.EnableArchiveMode && cfg.Chain.EnableTrielessStateDB {
if len(cfg.Chain.VersionedMetadata) == 0 {
return errors.Wrap(ErrInvalidCfg, "State DB archive mode is enabled with empty metadata namespace")
}
if len(cfg.Chain.VersionedNamespaces) == 0 {
return errors.Wrap(ErrInvalidCfg, "State DB archive mode is enabled with empty versioned namespace")
}
for _, v := range cfg.Chain.VersionedNamespaces {
if len(v) == 0 {
return errors.Wrap(ErrInvalidCfg, "State DB archive mode is enabled with empty versioned namespace")
}
}
}

return errors.Wrap(ErrInvalidCfg, "Archive mode is incompatible with trieless state DB")
return nil
}

// ValidateAPI validates the api configs
Expand Down
48 changes: 35 additions & 13 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,20 +254,42 @@ func TestValidateRollDPoS(t *testing.T) {
}

func TestValidateArchiveMode(t *testing.T) {
r := require.New(t)
cfg := Default
cfg.Chain.EnableArchiveMode = true
cfg.Chain.EnableTrielessStateDB = true
require.Error(t, ErrInvalidCfg, errors.Cause(ValidateArchiveMode(cfg)))
require.EqualError(t, ValidateArchiveMode(cfg), "Archive mode is incompatible with trieless state DB: invalid config value")
cfg.Chain.EnableArchiveMode = false
cfg.Chain.EnableTrielessStateDB = true
require.NoError(t, errors.Cause(ValidateArchiveMode(cfg)))
cfg.Chain.EnableArchiveMode = true
cfg.Chain.EnableTrielessStateDB = false
require.NoError(t, errors.Cause(ValidateArchiveMode(cfg)))
cfg.Chain.EnableArchiveMode = false
cfg.Chain.EnableTrielessStateDB = false
require.NoError(t, errors.Cause(ValidateArchiveMode(cfg)))
cfg.Chain.VersionedMetadata = "meta"
for _, v := range [][2]bool{
{false, false},
{false, true},
{true, false},
{true, true},
} {
cfg.Chain.EnableArchiveMode = v[0]
cfg.Chain.EnableTrielessStateDB = v[1]
if !(cfg.Chain.EnableArchiveMode && cfg.Chain.EnableTrielessStateDB) {
r.NoError(ValidateArchiveMode(cfg))
continue
}
for _, v := range []struct {
ns []string
err string
}{
{[]string{"", ""}, "State DB archive mode is enabled with empty versioned namespace"},
{[]string{"Account", ""}, "State DB archive mode is enabled with empty versioned namespace"},
{[]string{"", "Account"}, "State DB archive mode is enabled with empty versioned namespace"},
{[]string{"Account", "Contract"}, ""},
} {
cfg.Chain.VersionedNamespaces = v.ns
if len(v.err) > 0 {
r.ErrorContains(ValidateArchiveMode(cfg), v.err)
} else {
r.NoError(ValidateArchiveMode(cfg))
}
}
cfg.Chain.VersionedMetadata = ""
r.ErrorContains(ValidateArchiveMode(cfg), "State DB archive mode is enabled with empty metadata namespace")
cfg.Chain.VersionedMetadata = "meta"
r.NoError(ValidateArchiveMode(cfg))
}
}

func TestValidateActPool(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions db/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,12 @@ func CreateKVStoreWithCache(cfg Config, dbPath string, cacheSize int) (KVStore,

return NewKvStoreWithCache(dao, cacheSize), nil
}

// CreateKVStoreVersioned creates versioned db from config and db path
func CreateKVStoreVersioned(cfg Config, dbPath string, vns []string) (KVStore, error) {
if len(dbPath) == 0 {
return nil, ErrEmptyDBPath
}
cfg.DbPath = dbPath
return NewKVStoreWithVersion(cfg, VersionedNamespaceOption(vns...)), nil
}
25 changes: 25 additions & 0 deletions db/kvstore_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,35 @@ type (
KvVersioned interface {
lifecycle.StartStopper

KVStore

// Base returns the underlying KVStore
Base() KVStore

// Version returns the key's most recent version
Version(string, []byte) (uint64, error)

// SetVersion sets the version, and returns a KVStore to call Put()/Get()
SetVersion(uint64) KVStore
}

// KvWithVersion wraps the versioned DB implementation with a certain version
KvWithVersion struct {
// code in PR4518
}
)

type Option func(*KvWithVersion)

func VersionedNamespaceOption(ns ...string) Option {
return func(k *KvWithVersion) {
// code in PR4518
}
}

// NewKVStoreWithVersion implements a KVStore that can handle both versioned
// and non-versioned namespace
func NewKVStoreWithVersion(cfg Config, opts ...Option) KvVersioned {
// code in PR4518
return nil
}
27 changes: 27 additions & 0 deletions state/factory/daoretrofitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm"
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

Expand Down Expand Up @@ -47,3 +50,27 @@ func (rtf *daoRTF) getHeight() (uint64, error) {
func (rtf *daoRTF) putHeight(h uint64) error {
return rtf.dao.Put(AccountKVNamespace, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(h))
}

func (rtf *daoRTF) metadataNS() string {
return AccountKVNamespace
}

func (rtf *daoRTF) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
opts := []db.KVStoreFlusherOption{
db.SerializeOption(func(wi *batch.WriteInfo) []byte {
if preEaster {
return wi.SerializeWithoutWriteType()
}
return wi.Serialize()
}),
}
if !preEaster {
return opts
}
return append(
opts,
db.SerializeFilterOption(func(wi *batch.WriteInfo) bool {
return wi.Namespace() == evm.CodeKVNameSpace || wi.Namespace() == staking.CandsMapNS
}),
)
}
88 changes: 88 additions & 0 deletions state/factory/daoretrofitter_archive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2024 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package factory

import (
"context"

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action/protocol/execution/evm"
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

type daoRTFArchive struct {
daoVersioned db.KvVersioned
metaNS string // namespace for metadata
}

func newDaoRetrofitterArchive(dao db.KvVersioned, mns string) *daoRTFArchive {
return &daoRTFArchive{
daoVersioned: dao,
metaNS: mns,
}
}

func (rtf *daoRTFArchive) Start(ctx context.Context) error {
return rtf.daoVersioned.Start(ctx)
}

func (rtf *daoRTFArchive) Stop(ctx context.Context) error {
return rtf.daoVersioned.Stop(ctx)
}

func (rtf *daoRTFArchive) atHeight(h uint64) db.KVStore {
return rtf.daoVersioned.SetVersion(h)
}

func (rtf *daoRTFArchive) getHeight() (uint64, error) {
height, err := rtf.daoVersioned.Base().Get(rtf.metaNS, []byte(CurrentHeightKey))
if err != nil {
return 0, errors.Wrap(err, "failed to get factory's height from underlying DB")
}
return byteutil.BytesToUint64(height), nil
}

func (rtf *daoRTFArchive) putHeight(h uint64) error {
return rtf.daoVersioned.Base().Put(rtf.metaNS, []byte(CurrentHeightKey), byteutil.Uint64ToBytes(h))
}

func (rtf *daoRTFArchive) metadataNS() string {
return rtf.metaNS
}

func (rtf *daoRTFArchive) flusherOptions(preEaster bool) []db.KVStoreFlusherOption {
opts := []db.KVStoreFlusherOption{
db.SerializeOption(func(wi *batch.WriteInfo) []byte {
// current height is moved to the metadata namespace
// transform it back for the purpose of calculating digest
wi = rtf.transCurrentHeight(wi)
if preEaster {
return wi.SerializeWithoutWriteType()
}
return wi.Serialize()
}),
}
if !preEaster {
return opts
}
return append(
opts,
db.SerializeFilterOption(func(wi *batch.WriteInfo) bool {
return wi.Namespace() == evm.CodeKVNameSpace || wi.Namespace() == staking.CandsMapNS
}),
)
}

func (rtf *daoRTFArchive) transCurrentHeight(wi *batch.WriteInfo) *batch.WriteInfo {
if wi.Namespace() == rtf.metaNS && string(wi.Key()) == CurrentHeightKey {
return batch.NewWriteInfo(wi.WriteType(), AccountKVNamespace, wi.Key(), wi.Value(), wi.Error())
}
return wi
}
Loading

0 comments on commit 1c93d4f

Please sign in to comment.