Skip to content

Commit

Permalink
feat: autodetect consensus engine (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
troykessler authored Apr 23, 2024
1 parent e022313 commit 888889c
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 66 deletions.
30 changes: 12 additions & 18 deletions blocksync/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,36 @@ func StartBlockSync(engine types.Engine, chainRest, storageRest string, poolId,
return StartDBExecutor(engine, chainRest, storageRest, poolId, targetHeight, metrics, port, 0, 0, utils.DefaultSnapshotServerPort, false, false, backupCfg)
}

func PerformBlockSyncValidationChecks(engine types.Engine, chainRest string, blockPoolId, targetHeight int64, checkEndHeight, userInput bool) error {
continuationHeight, err := engine.GetContinuationHeight()
func PerformBlockSyncValidationChecks(engine types.Engine, chainRest string, blockPoolId, targetHeight int64, checkEndHeight, userInput bool) (continuationHeight int64, err error) {
continuationHeight, err = engine.GetContinuationHeight()
if err != nil {
return fmt.Errorf("failed to get continuation height from engine: %w", err)
return continuationHeight, fmt.Errorf("failed to get continuation height from engine: %w", err)
}

logger.Info().Msg(fmt.Sprintf("loaded current block height of node: %d", continuationHeight-1))

// perform boundary checks
_, startHeight, endHeight, err := helpers.GetBlockBoundaries(chainRest, blockPoolId)
if err != nil {
return fmt.Errorf("failed to get block boundaries: %w", err)
return continuationHeight, fmt.Errorf("failed to get block boundaries: %w", err)
}

logger.Info().Msg(fmt.Sprintf("retrieved block boundaries, earliest block height = %d, latest block height %d", startHeight, endHeight))

if continuationHeight < startHeight {
return fmt.Errorf("app is currently at height %d but first available block on pool is %d", continuationHeight, startHeight)
return continuationHeight, fmt.Errorf("app is currently at height %d but first available block on pool is %d", continuationHeight, startHeight)
}

if continuationHeight > endHeight {
return fmt.Errorf("app is currently at height %d but last available block on pool is %d", continuationHeight, endHeight)
return continuationHeight, fmt.Errorf("app is currently at height %d but last available block on pool is %d", continuationHeight, endHeight)
}

if targetHeight > 0 && continuationHeight > targetHeight {
return fmt.Errorf("requested target height is %d but app is already at block height %d", targetHeight, continuationHeight)
return continuationHeight, fmt.Errorf("requested target height is %d but app is already at block height %d", targetHeight, continuationHeight)
}

if checkEndHeight && targetHeight > 0 && targetHeight > endHeight {
return fmt.Errorf("requested target height is %d but last available block on pool is %d", targetHeight, endHeight)
return continuationHeight, fmt.Errorf("requested target height is %d but last available block on pool is %d", targetHeight, endHeight)
}

if targetHeight == 0 {
Expand All @@ -66,28 +66,22 @@ func PerformBlockSyncValidationChecks(engine types.Engine, chainRest string, blo
}

if _, err := fmt.Scan(&answer); err != nil {
return fmt.Errorf("failed to read in user input: %s", err)
return continuationHeight, fmt.Errorf("failed to read in user input: %s", err)
}

if strings.ToLower(answer) != "y" {
return errors.New("aborted block-sync")
return continuationHeight, errors.New("aborted block-sync")
}
}

return nil
return
}

func StartBlockSyncWithBinary(engine types.Engine, binaryPath, homePath, chainId, chainRest, storageRest string, blockPoolId, targetHeight int64, metrics bool, port int64, backupCfg *types.BackupConfig, skipCrisisInvariants, optOut, debug, userInput bool) {
func StartBlockSyncWithBinary(engine types.Engine, binaryPath, homePath, chainId, chainRest, storageRest string, blockPoolId, targetHeight int64, metrics bool, port int64, backupCfg *types.BackupConfig, skipCrisisInvariants, optOut, debug bool) {
logger.Info().Msg("starting block-sync")

utils.TrackSyncStartEvent(engine, utils.BLOCK_SYNC, chainId, chainRest, storageRest, targetHeight, optOut)

// perform validation checks before booting state-sync process
if err := PerformBlockSyncValidationChecks(engine, chainRest, blockPoolId, targetHeight, true, userInput); err != nil {
logger.Error().Msg(fmt.Sprintf("block-sync validation checks failed: %s", err))
os.Exit(1)
}

if err := bootstrap.StartBootstrapWithBinary(engine, binaryPath, homePath, chainRest, storageRest, blockPoolId, skipCrisisInvariants, debug); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to bootstrap node: %s", err))
os.Exit(1)
Expand Down
28 changes: 23 additions & 5 deletions cmd/ksync/commands/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func init() {
blockSyncCmd.Flags().StringVarP(&engine, "engine", "e", utils.DefaultEngine, "consensus engine of the binary, list all engines with \"ksync engines\"")
blockSyncCmd.Flags().StringVarP(&engine, "engine", "e", "", fmt.Sprintf("consensus engine of the binary by default %s is used, list all engines with \"ksync engines\"", utils.DefaultEngine))

blockSyncCmd.Flags().StringVarP(&binaryPath, "binary", "b", "", "binary path of node to be synced")
if err := blockSyncCmd.MarkFlagRequired("binary"); err != nil {
Expand Down Expand Up @@ -76,21 +76,39 @@ var blockSyncCmd = &cobra.Command{
return
}

consensusEngine := engines.EngineFactory(engine)

tmEngine := engines.EngineFactory(utils.EngineTendermintV34)
if reset {
if err := consensusEngine.ResetAll(homePath, true); err != nil {
if err := tmEngine.ResetAll(homePath, true); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to reset tendermint application: %s", err))
os.Exit(1)
}
}

if err := tmEngine.OpenDBs(homePath); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs in engine: %s", err))
os.Exit(1)
}

// perform validation checks before booting state-sync process
continuationHeight, err := blocksync.PerformBlockSyncValidationChecks(tmEngine, chainRest, bId, targetHeight, true, !y)
if err != nil {
logger.Error().Msg(fmt.Sprintf("block-sync validation checks failed: %s", err))
os.Exit(1)
}

if err := tmEngine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
os.Exit(1)
}

consensusEngine := engines.EngineSourceFactory(engine, registryUrl, chainId, source, continuationHeight)

if err := consensusEngine.OpenDBs(homePath); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs in engine: %s", err))
os.Exit(1)
}

blocksync.StartBlockSyncWithBinary(consensusEngine, binaryPath, homePath, chainId, chainRest, storageRest, bId, targetHeight, metrics, metricsPort, backupCfg, skipCrisisInvariants, optOut, debug, !y)
blocksync.StartBlockSyncWithBinary(consensusEngine, binaryPath, homePath, chainId, chainRest, storageRest, bId, targetHeight, metrics, metricsPort, backupCfg, skipCrisisInvariants, optOut, debug)

if err := consensusEngine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
Expand Down
39 changes: 34 additions & 5 deletions cmd/ksync/commands/heightsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"fmt"
"github.com/KYVENetwork/ksync/blocksync"
"github.com/KYVENetwork/ksync/engines"
"github.com/KYVENetwork/ksync/heightsync"
"github.com/KYVENetwork/ksync/sources"
Expand All @@ -12,7 +13,7 @@ import (
)

func init() {
heightSyncCmd.Flags().StringVarP(&engine, "engine", "e", utils.DefaultEngine, "consensus engine of the binary, list all engines with \"ksync engines\"")
heightSyncCmd.Flags().StringVarP(&engine, "engine", "e", "", fmt.Sprintf("consensus engine of the binary by default %s is used, list all engines with \"ksync engines\"", utils.DefaultEngine))

heightSyncCmd.Flags().StringVarP(&binaryPath, "binary", "b", "", "binary path of node to be synced")
if err := heightSyncCmd.MarkFlagRequired("binary"); err != nil {
Expand Down Expand Up @@ -60,21 +61,49 @@ var heightSyncCmd = &cobra.Command{
os.Exit(1)
}

consensusEngine := engines.EngineFactory(engine)

tmEngine := engines.EngineFactory(utils.EngineTendermintV34)
if reset {
if err := consensusEngine.ResetAll(homePath, true); err != nil {
if err := tmEngine.ResetAll(homePath, true); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to reset tendermint application: %s", err))
os.Exit(1)
}
}

if err := tmEngine.OpenDBs(homePath); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs in engine: %s", err))
os.Exit(1)
}

// perform validation checks before booting state-sync process
snapshotBundleId, snapshotHeight, err := heightsync.PerformHeightSyncValidationChecks(tmEngine, chainRest, sId, bId, targetHeight, !y)
if err != nil {
logger.Error().Msg(fmt.Sprintf("block-sync validation checks failed: %s", err))
os.Exit(1)
}

continuationHeight := snapshotHeight

if continuationHeight == 0 {
continuationHeight, err = blocksync.PerformBlockSyncValidationChecks(tmEngine, chainRest, bId, targetHeight, true, false)
if err != nil {
logger.Error().Msg(fmt.Sprintf("block-sync validation checks failed: %s", err))
os.Exit(1)
}
}

if err := tmEngine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
os.Exit(1)
}

consensusEngine := engines.EngineSourceFactory(engine, registryUrl, chainId, source, continuationHeight)

if err := consensusEngine.OpenDBs(homePath); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs in engine: %s", err))
os.Exit(1)
}

heightsync.StartHeightSyncWithBinary(consensusEngine, binaryPath, homePath, chainId, chainRest, storageRest, sId, bId, targetHeight, optOut, debug, !y)
heightsync.StartHeightSyncWithBinary(consensusEngine, binaryPath, homePath, chainId, chainRest, storageRest, sId, bId, targetHeight, snapshotBundleId, snapshotHeight, optOut, debug)

if err := consensusEngine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
Expand Down
2 changes: 1 addition & 1 deletion cmd/ksync/commands/resetall.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func init() {
resetCmd.Flags().StringVarP(&engine, "engine", "e", utils.DefaultEngine, "consensus engine of the binary, list all engines with \"ksync engines\"")
resetCmd.Flags().StringVarP(&engine, "engine", "e", "", fmt.Sprintf("consensus engine of the binary by default %s is used, list all engines with \"ksync engines\"", utils.DefaultEngine))

resetCmd.Flags().StringVar(&homePath, "home", "", "home directory")
if err := resetCmd.MarkFlagRequired("home"); err != nil {
Expand Down
39 changes: 34 additions & 5 deletions cmd/ksync/commands/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package commands

import (
"fmt"
"github.com/KYVENetwork/ksync/blocksync"
"github.com/KYVENetwork/ksync/engines"
"github.com/KYVENetwork/ksync/servesnapshots"
"github.com/KYVENetwork/ksync/sources"
Expand All @@ -12,7 +13,7 @@ import (
)

func init() {
serveCmd.Flags().StringVarP(&engine, "engine", "e", utils.DefaultEngine, "consensus engine of the binary, list all engines with \"ksync engines\"")
serveCmd.Flags().StringVarP(&engine, "engine", "e", "", fmt.Sprintf("consensus engine of the binary by default %s is used, list all engines with \"ksync engines\"", utils.DefaultEngine))

serveCmd.Flags().StringVarP(&binaryPath, "binary", "b", "", "binary path of node to be synced")
if err := serveCmd.MarkFlagRequired("binary"); err != nil {
Expand Down Expand Up @@ -71,22 +72,50 @@ var serveCmd = &cobra.Command{
os.Exit(1)
}

consensusEngine := engines.EngineFactory(engine)

tmEngine := engines.EngineFactory(utils.EngineTendermintV34)
if reset {
if err := consensusEngine.ResetAll(homePath, true); err != nil {
if err := tmEngine.ResetAll(homePath, true); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to reset tendermint application: %s", err))
os.Exit(1)
}
}

if err := tmEngine.OpenDBs(homePath); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs in engine: %s", err))
os.Exit(1)
}

// perform validation checks before booting state-sync process
snapshotBundleId, snapshotHeight, err := servesnapshots.PerformServeSnapshotsValidationChecks(tmEngine, chainRest, sId, bId, startHeight, targetHeight)
if err != nil {
logger.Error().Msg(fmt.Sprintf("block-sync validation checks failed: %s", err))
os.Exit(1)
}

continuationHeight := snapshotHeight

if continuationHeight == 0 {
continuationHeight, err = blocksync.PerformBlockSyncValidationChecks(tmEngine, chainRest, bId, targetHeight, false, false)
if err != nil {
logger.Error().Msg(fmt.Sprintf("block-sync validation checks failed: %s", err))
os.Exit(1)
}
}

if err := tmEngine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
os.Exit(1)
}

consensusEngine := engines.EngineSourceFactory(engine, registryUrl, chainId, source, continuationHeight)

if err := consensusEngine.OpenDBs(homePath); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs engine: %s", err))
os.Exit(1)
}

utils.TrackServeSnapshotsEvent(consensusEngine, chainId, chainRest, storageRest, snapshotPort, metrics, metricsPort, startHeight, pruning, keepSnapshots, debug, optOut)
servesnapshots.StartServeSnapshotsWithBinary(consensusEngine, binaryPath, homePath, chainRest, storageRest, bId, metrics, metricsPort, sId, snapshotPort, startHeight, targetHeight, skipCrisisInvariants, pruning, keepSnapshots, skipWaiting, debug)
servesnapshots.StartServeSnapshotsWithBinary(consensusEngine, binaryPath, homePath, chainRest, storageRest, bId, metrics, metricsPort, sId, snapshotPort, targetHeight, snapshotBundleId, snapshotHeight, skipCrisisInvariants, pruning, keepSnapshots, skipWaiting, debug)

if err := consensusEngine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
Expand Down
18 changes: 13 additions & 5 deletions cmd/ksync/commands/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func init() {
stateSyncCmd.Flags().StringVarP(&engine, "engine", "e", utils.DefaultEngine, "consensus engine of the binary, list all engines with \"ksync engines\"")
stateSyncCmd.Flags().StringVarP(&engine, "engine", "e", "", fmt.Sprintf("consensus engine of the binary by default %s is used, list all engines with \"ksync engines\"", utils.DefaultEngine))

stateSyncCmd.Flags().StringVarP(&binaryPath, "binary", "b", "", "binary path of node to be synced")
if err := stateSyncCmd.MarkFlagRequired("binary"); err != nil {
Expand Down Expand Up @@ -59,21 +59,29 @@ var stateSyncCmd = &cobra.Command{
os.Exit(1)
}

consensusEngine := engines.EngineFactory(engine)

tmEngine := engines.EngineFactory(utils.EngineTendermintV34)
if reset {
if err := consensusEngine.ResetAll(homePath, true); err != nil {
if err := tmEngine.ResetAll(homePath, true); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to reset tendermint application: %s", err))
os.Exit(1)
}
}

// perform validation checks before booting state-sync process
snapshotBundleId, snapshotHeight, err := statesync.PerformStateSyncValidationChecks(chainRest, sId, targetHeight, !y)
if err != nil {
logger.Error().Msg(fmt.Sprintf("state-sync validation checks failed: %s", err))
os.Exit(1)
}

consensusEngine := engines.EngineSourceFactory(engine, registryUrl, chainId, source, snapshotHeight)

if err := consensusEngine.OpenDBs(homePath); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to open dbs in engine: %s", err))
os.Exit(1)
}

statesync.StartStateSyncWithBinary(consensusEngine, binaryPath, chainId, chainRest, storageRest, sId, targetHeight, optOut, debug, !y)
statesync.StartStateSyncWithBinary(consensusEngine, binaryPath, chainId, chainRest, storageRest, sId, targetHeight, snapshotBundleId, snapshotHeight, optOut, debug)

if err := consensusEngine.CloseDBs(); err != nil {
logger.Error().Msg(fmt.Sprintf("failed to close dbs in engine: %s", err))
Expand Down
34 changes: 34 additions & 0 deletions engines/engines.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,51 @@ import (
"github.com/KYVENetwork/ksync/engines/cometbft-v37"
"github.com/KYVENetwork/ksync/engines/cometbft-v38"
"github.com/KYVENetwork/ksync/engines/tendermint-v34"
"github.com/KYVENetwork/ksync/sources/helpers"
"github.com/KYVENetwork/ksync/types"
"github.com/KYVENetwork/ksync/utils"
"os"
"strconv"
)

var (
logger = utils.KsyncLogger("engines")
)

func EngineSourceFactory(engine, registryUrl, chainId, source string, continuationHeight int64) types.Engine {
// if the engine was specified by the user or the source is empty we determine the engine by the engine input
if engine != "" || source == "" {
return EngineFactory(engine)
}

entry, err := helpers.GetSourceRegistryEntry(registryUrl, chainId, source)
if err != nil {
logger.Error().Msg(fmt.Sprintf("failed to get source registry entry: %s", err))
os.Exit(1)
}

for _, upgrade := range entry.Codebase.Settings.Upgrades {
height, err := strconv.ParseInt(upgrade.Height, 10, 64)
if err != nil {
logger.Error().Msg(fmt.Sprintf("failed to parse upgrade height %s: %s", upgrade.Height, err))
os.Exit(1)
}

if continuationHeight < height {
break
}

engine = upgrade.Engine
}

logger.Info().Msg(fmt.Sprintf("using \"%s\" as consensus engine", engine))
return EngineFactory(engine)
}

func EngineFactory(engine string) types.Engine {
switch engine {
case "":
return &tendermint_v34.Engine{}
case utils.EngineTendermintV34:
return &tendermint_v34.Engine{}
case utils.EngineCometBFTV37:
Expand Down
Loading

0 comments on commit 888889c

Please sign in to comment.