
Commit

Merge branch 'master' into UlyanaAndrukhiv/6017-pebble-for-tracker-updates
UlyanaAndrukhiv authored Sep 12, 2024
2 parents fba4e0b + 9653906 commit 3362d0b
Showing 23 changed files with 1,671 additions and 81 deletions.
46 changes: 44 additions & 2 deletions cmd/execution_builder.go
@@ -49,6 +49,7 @@ import (
"github.com/onflow/flow-go/engine/execution/checker"
"github.com/onflow/flow-go/engine/execution/computation"
"github.com/onflow/flow-go/engine/execution/computation/committer"
+ txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics"
"github.com/onflow/flow-go/engine/execution/ingestion"
"github.com/onflow/flow-go/engine/execution/ingestion/fetcher"
"github.com/onflow/flow-go/engine/execution/ingestion/loader"
@@ -126,7 +127,7 @@ type ExecutionNode struct {

ingestionUnit *engine.Unit

- collector module.ExecutionMetrics
+ collector *metrics.ExecutionCollector
executionState state.ExecutionState
followerState protocol.FollowerState
committee hotstuff.DynamicCommittee
@@ -159,6 +160,7 @@ type ExecutionNode struct {
executionDataTracker storage.ExecutionDataTracker
blobService network.BlobService
blobserviceDependable *module.ProxiedReadyDoneAware
+ metricsProvider txmetrics.TransactionExecutionMetricsProvider
}

func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
@@ -227,6 +229,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
Component("block data upload manager", exeNode.LoadBlockUploaderManager).
Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader).
Component("S3 block data uploader", exeNode.LoadS3BlockDataUploader).
Component("transaction execution metrics", exeNode.LoadTransactionExecutionMetrics).
Component("provider engine", exeNode.LoadProviderEngine).
Component("checker engine", exeNode.LoadCheckerEngine).
Component("ingestion engine", exeNode.LoadIngestionEngine).
@@ -543,10 +546,27 @@ func (exeNode *ExecutionNode) LoadProviderEngine(

vmCtx := fvm.NewContext(opts...)

+ var collector module.ExecutionMetrics
+ collector = exeNode.collector
+ if exeNode.exeConf.transactionExecutionMetricsEnabled {
+ // inject the transaction execution metrics
+ collector = exeNode.collector.WithTransactionCallback(
+ func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) {
+ exeNode.metricsProvider.Collect(
+ info.BlockID,
+ info.BlockHeight,
+ txmetrics.TransactionExecutionMetrics{
+ TransactionID: info.TransactionID,
+ ExecutionTime: dur,
+ ExecutionEffortWeights: stats.ComputationIntensities,
+ })
+ })
+ }
+
ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer)
manager, err := computation.New(
node.Logger,
- exeNode.collector,
+ collector,
node.Tracer,
node.Me,
node.State,
@@ -1047,6 +1067,9 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
// the consistency of a collection can be verified by checking its hash, and the hash comes from a trusted source (blocks from the consensus follower)
// hence we do not need to check origin
requester.WithValidateStaking(false),
+ // we have observed execution nodes occasionally fail to retrieve collections using this engine, which can cause temporary execution halts
+ // setting a retry maximum of 10s results in a much faster recovery from these faults (default is 2m)
+ requester.WithRetryMaximum(10*time.Second),
)

if err != nil {
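For context on the new WithRetryMaximum option above: a retry maximum caps how far the requester's retry interval can grow, which is why lowering it from the 2m default to 10s gives much faster recovery from missed collections. A generic sketch of a capped, exponentially growing retry delay, illustrative only and not the actual requester implementation:

package main

import (
	"fmt"
	"time"
)

// nextRetryDelay doubles the delay on each attempt but never exceeds maxDelay,
// mirroring the role of a "retry maximum" in a requester-style retry loop.
func nextRetryDelay(current, maxDelay time.Duration) time.Duration {
	next := current * 2
	if next > maxDelay {
		return maxDelay
	}
	return next
}

func main() {
	delay := time.Second
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, delay)
		delay = nextRetryDelay(delay, 10*time.Second) // capped at the retry maximum
	}
}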
@@ -1126,6 +1149,24 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD
return exeNode.scriptsEng, nil
}

+ func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics(
+ node *NodeConfig,
+ ) (module.ReadyDoneAware, error) {
+ lastFinalizedHeader := node.LastFinalizedHeader
+
+ metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider(
+ node.Logger,
+ exeNode.executionState,
+ node.Storage.Headers,
+ lastFinalizedHeader.Height,
+ exeNode.exeConf.transactionExecutionMetricsBufferSize,
+ )
+
+ node.ProtocolEvents.AddConsumer(metricsProvider)
+ exeNode.metricsProvider = metricsProvider
+ return metricsProvider, nil
+ }
+
func (exeNode *ExecutionNode) LoadConsensusCommittee(
node *NodeConfig,
) (
@@ -1327,6 +1368,7 @@ func (exeNode *ExecutionNode) LoadGrpcServer(
exeNode.results,
exeNode.txResults,
node.Storage.Commits,
+ exeNode.metricsProvider,
node.RootChainID,
signature.NewBlockSignerDecoder(exeNode.committee),
exeNode.exeConf.apiRatelimits,
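A note on the LoadProviderEngine change above: when transactionExecutionMetricsEnabled is set, the concrete *metrics.ExecutionCollector is wrapped via WithTransactionCallback, so every executed transaction is still reported to the regular collector and is additionally forwarded to the new metrics provider; when the flag is off, the unwrapped collector is used as before. A minimal, self-contained sketch of that decorator shape follows; every name in it is an illustrative stand-in, not an actual flow-go type:

package main

import (
	"fmt"
	"time"
)

// txMetrics stands in for the metrics-collector interface.
type txMetrics interface {
	TransactionExecuted(dur time.Duration)
}

// baseCollector stands in for the concrete collector.
type baseCollector struct{}

func (baseCollector) TransactionExecuted(dur time.Duration) {
	fmt.Println("base collector recorded:", dur)
}

// withCallback decorates a txMetrics so each event also reaches cb,
// the way WithTransactionCallback forwards results to metricsProvider.Collect.
type withCallback struct {
	inner txMetrics
	cb    func(time.Duration)
}

func (w withCallback) TransactionExecuted(dur time.Duration) {
	w.inner.TransactionExecuted(dur) // keep the original behavior
	w.cb(dur)                        // forward to the extra consumer
}

func main() {
	var collector txMetrics = baseCollector{}
	// Conditionally wrap, as the transactionExecutionMetricsEnabled branch does.
	collector = withCallback{
		inner: collector,
		cb:    func(d time.Duration) { fmt.Println("forwarded to provider:", d) },
	}
	collector.TransactionExecuted(42 * time.Millisecond)
}

The conditional wrap keeps the default path unchanged when the feature flag is disabled.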
62 changes: 33 additions & 29 deletions cmd/execution_config.go
@@ -25,35 +25,37 @@ import (

// ExecutionConfig contains the configs for starting up execution nodes
type ExecutionConfig struct {
- rpcConf rpc.Config
- triedir string
- executionDataDir string
- registerDir string
- mTrieCacheSize uint32
- transactionResultsCacheSize uint
- checkpointDistance uint
- checkpointsToKeep uint
- chunkDataPackDir string
- chunkDataPackCacheSize uint
- chunkDataPackRequestsCacheSize uint32
- requestInterval time.Duration
- extensiveLog bool
- pauseExecution bool
- chunkDataPackQueryTimeout time.Duration
- chunkDataPackDeliveryTimeout time.Duration
- enableBlockDataUpload bool
- gcpBucketName string
- s3BucketName string
- apiRatelimits map[string]int
- apiBurstlimits map[string]int
- executionDataAllowedPeers string
- executionDataPrunerHeightRangeTarget uint64
- executionDataPrunerThreshold uint64
- blobstoreRateLimit int
- blobstoreBurstLimit int
- chunkDataPackRequestWorkers uint
- maxGracefulStopDuration time.Duration
- importCheckpointWorkerCount int
+ rpcConf rpc.Config
+ triedir string
+ executionDataDir string
+ registerDir string
+ mTrieCacheSize uint32
+ transactionResultsCacheSize uint
+ checkpointDistance uint
+ checkpointsToKeep uint
+ chunkDataPackDir string
+ chunkDataPackCacheSize uint
+ chunkDataPackRequestsCacheSize uint32
+ requestInterval time.Duration
+ extensiveLog bool
+ pauseExecution bool
+ chunkDataPackQueryTimeout time.Duration
+ chunkDataPackDeliveryTimeout time.Duration
+ enableBlockDataUpload bool
+ gcpBucketName string
+ s3BucketName string
+ apiRatelimits map[string]int
+ apiBurstlimits map[string]int
+ executionDataAllowedPeers string
+ executionDataPrunerHeightRangeTarget uint64
+ executionDataPrunerThreshold uint64
+ blobstoreRateLimit int
+ blobstoreBurstLimit int
+ chunkDataPackRequestWorkers uint
+ maxGracefulStopDuration time.Duration
+ importCheckpointWorkerCount int
+ transactionExecutionMetricsEnabled bool
+ transactionExecutionMetricsBufferSize uint

// evm tracing configuration
evmTracingEnabled bool
@@ -122,6 +124,8 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.IntVar(&exeConf.blobstoreBurstLimit, "blobstore-burst-limit", 0, "outgoing burst limit for Execution Data blobstore")
flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing")
flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap")
+ flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics")
+ flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. The buffer size is the number of blocks that are kept in memory by the metrics provider engine")
flags.BoolVar(&exeConf.evmTracingEnabled, "evm-tracing-enabled", false, "enable EVM tracing, when set it will generate traces and upload them to the GCP bucket provided by the --evm-traces-gcp-bucket. Warning: this might affect speed of execution")
flags.StringVar(&exeConf.evmTracesGCPBucket, "evm-traces-gcp-bucket", "", "define GCP bucket name used for uploading EVM traces, must be used in combination with --evm-tracing-enabled. if left empty the upload step is skipped")

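Assuming the standard pflag handling above, the two new options are passed like any other execution-node flag; since tx-execution-metrics defaults to true with a 200-block buffer, flags are only needed to override the defaults. A hypothetical invocation (binary name and values illustrative):

./execution-node \
	--tx-execution-metrics=true \
	--tx-execution-metrics-buffer-size=500

Per the flag description, the buffer is measured in blocks: a value of 500 keeps transaction execution metrics for roughly the 500 most recent blocks in memory.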
45 changes: 23 additions & 22 deletions cmd/util/cmd/check-storage/cmd.go
@@ -2,7 +2,6 @@ package check_storage

import (
"context"
"slices"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
@@ -14,6 +13,7 @@ import (
"github.com/onflow/flow-go/cmd/util/ledger/reporters"
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
"github.com/onflow/flow-go/fvm/evm/emulator/state"
"github.com/onflow/flow-go/fvm/systemcontracts"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/model/flow"
@@ -29,6 +29,14 @@
flagNWorker int
)

+ var (
+ evmAccount flow.Address
+ evmStorageIDKeys = []string{
+ state.AccountsStorageIDKey,
+ state.CodesStorageIDKey,
+ }
+ )
+
var Cmd = &cobra.Command{
Use: "check-storage",
Short: "Check storage health",
@@ -102,13 +110,8 @@ func run(*cobra.Command, []string) {
log.Fatal().Msg("--state-commitment must be provided when --state is provided")
}

- // For now, skip EVM storage account since a different decoder is needed for decoding EVM registers.
-
- systemContracts := systemcontracts.SystemContractsForChain(chainID)
-
- acctsToSkip := []string{
- flow.AddressToRegisterOwner(systemContracts.EVMStorage.Address),
- }
+ // Get EVM account by chain
+ evmAccount = systemcontracts.SystemContractsForChain(chainID).EVMStorage.Address

// Create report in JSONL format
rw := reporters.NewReportFileWriterFactoryWithFormat(flagOutputDirectory, log.Logger, reporters.ReportFormatJSONL).
@@ -161,13 +164,13 @@ func run(*cobra.Command, []string) {
len(payloads),
)

- failedAccountAddresses, err := checkStorageHealth(registersByAccount, flagNWorker, rw, acctsToSkip)
+ failedAccountAddresses, err := checkStorageHealth(registersByAccount, flagNWorker, rw)
if err != nil {
log.Fatal().Err(err).Msg("failed to check storage health")
}

if len(failedAccountAddresses) == 0 {
log.Info().Msgf("All %d accounts are health", accountCount)
log.Info().Msgf("All %d accounts are healthy", accountCount)
return
}

@@ -188,7 +191,6 @@ func checkStorageHealth(
registersByAccount *registers.ByAccount,
nWorkers int,
rw reporters.ReportWriter,
- acctsToSkip []string,
) (failedAccountAddresses []string, err error) {

accountCount := registersByAccount.AccountCount()
@@ -211,10 +213,6 @@ func checkStorageHealth(
func(accountRegisters *registers.AccountRegisters) error {
defer logAccount(1)

- if slices.Contains(acctsToSkip, accountRegisters.Owner()) {
- return nil
- }
-
accountStorageIssues := checkAccountStorageHealth(accountRegisters, nWorkers)

if len(accountStorageIssues) > 0 {
@@ -281,9 +279,6 @@ func checkStorageHealth(

err = registersByAccount.ForEachAccount(
func(accountRegisters *registers.AccountRegisters) error {
- if slices.Contains(acctsToSkip, accountRegisters.Owner()) {
- return nil
- }
jobs <- job{accountRegisters: accountRegisters}
return nil
})
@@ -318,6 +313,10 @@ func checkAccountStorageHealth(accountRegisters *registers.AccountRegisters, nWo
}}
}

+ if isEVMAccount(address) {
+ return checkEVMAccountStorageHealth(address, accountRegisters)
+ }
+
var issues []accountStorageIssue

// Check atree storage health
@@ -331,7 +330,7 @@ func checkAccountStorageHealth(accountRegisters *registers.AccountRegisters, nWo
issues,
accountStorageIssue{
Address: address.Hex(),
- Kind: storageErrorKindString[atreeStorageErrorKind],
+ Kind: storageErrorKindString[cadenceAtreeStorageErrorKind],
Msg: err.Error(),
})
}
@@ -345,12 +344,14 @@ type storageErrorKind int

const (
otherErrorKind storageErrorKind = iota
- atreeStorageErrorKind
+ cadenceAtreeStorageErrorKind
+ evmAtreeStorageErrorKind
)

var storageErrorKindString = map[storageErrorKind]string{
- otherErrorKind: "error_check_storage_failed",
- atreeStorageErrorKind: "error_atree_storage",
+ otherErrorKind: "error_check_storage_failed",
+ cadenceAtreeStorageErrorKind: "error_cadence_atree_storage",
+ evmAtreeStorageErrorKind: "error_evm_atree_storage",
}

type accountStorageIssue struct {
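The new dispatch in checkAccountStorageHealth calls isEVMAccount and checkEVMAccountStorageHealth, whose bodies live elsewhere in this change set and are not shown in the excerpt. Given the package-level evmAccount set in run(), the address test is presumably a direct comparison; a sketch under that assumption (the real implementation may differ):

// Sketch only: assumes a direct comparison against the package-level
// evmAccount populated in run().
func isEVMAccount(address flow.Address) bool {
	return address == evmAccount
}

The evmStorageIDKeys variable introduced above (AccountsStorageIDKey, CodesStorageIDKey) is presumably what checkEVMAccountStorageHealth uses to locate the EVM-specific atree registers that previously required skipping the account for lack of a decoder.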
[diffs for the remaining 20 changed files not shown]

