diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index cd4c4ae4734..d4d4ee002f4 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -49,6 +49,7 @@ import ( "github.com/onflow/flow-go/engine/execution/checker" "github.com/onflow/flow-go/engine/execution/computation" "github.com/onflow/flow-go/engine/execution/computation/committer" + txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics" "github.com/onflow/flow-go/engine/execution/ingestion" "github.com/onflow/flow-go/engine/execution/ingestion/fetcher" "github.com/onflow/flow-go/engine/execution/ingestion/loader" @@ -126,7 +127,7 @@ type ExecutionNode struct { ingestionUnit *engine.Unit - collector module.ExecutionMetrics + collector *metrics.ExecutionCollector executionState state.ExecutionState followerState protocol.FollowerState committee hotstuff.DynamicCommittee @@ -159,6 +160,7 @@ type ExecutionNode struct { executionDataTracker storage.ExecutionDataTracker blobService network.BlobService blobserviceDependable *module.ProxiedReadyDoneAware + metricsProvider txmetrics.TransactionExecutionMetricsProvider } func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() { @@ -227,6 +229,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() { Component("block data upload manager", exeNode.LoadBlockUploaderManager). Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader). Component("S3 block data uploader", exeNode.LoadS3BlockDataUploader). + Component("transaction execution metrics", exeNode.LoadTransactionExecutionMetrics). Component("provider engine", exeNode.LoadProviderEngine). Component("checker engine", exeNode.LoadCheckerEngine). Component("ingestion engine", exeNode.LoadIngestionEngine). @@ -543,10 +546,27 @@ func (exeNode *ExecutionNode) LoadProviderEngine( vmCtx := fvm.NewContext(opts...) 
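+ // When transaction execution metrics collection is enabled, wrap the node's execution metrics
+ // collector so that per-transaction execution time and computation effort weights are forwarded
+ // to the metrics provider that backs the execution API's metrics endpoint.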
+ var collector module.ExecutionMetrics + collector = exeNode.collector + if exeNode.exeConf.transactionExecutionMetricsEnabled { + // inject the transaction execution metrics + collector = exeNode.collector.WithTransactionCallback( + func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) { + exeNode.metricsProvider.Collect( + info.BlockID, + info.BlockHeight, + txmetrics.TransactionExecutionMetrics{ + TransactionID: info.TransactionID, + ExecutionTime: dur, + ExecutionEffortWeights: stats.ComputationIntensities, + }) + }) + } + ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer) manager, err := computation.New( node.Logger, - exeNode.collector, + collector, node.Tracer, node.Me, node.State, @@ -1047,6 +1067,9 @@ func (exeNode *ExecutionNode) LoadIngestionEngine( // consistency of collection can be checked by checking hash, and hash comes from trusted source (blocks from consensus follower) // hence we not need to check origin requester.WithValidateStaking(false), + // we have observed execution nodes occasionally fail to retrieve collections using this engine, which can cause temporary execution halts + // setting a retry maximum of 10s results in a much faster recovery from these faults (default is 2m) + requester.WithRetryMaximum(10*time.Second), ) if err != nil { @@ -1126,6 +1149,24 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD return exeNode.scriptsEng, nil } +func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics( + node *NodeConfig, +) (module.ReadyDoneAware, error) { + lastFinalizedHeader := node.LastFinalizedHeader + + metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider( + node.Logger, + exeNode.executionState, + node.Storage.Headers, + lastFinalizedHeader.Height, + exeNode.exeConf.transactionExecutionMetricsBufferSize, + ) + + node.ProtocolEvents.AddConsumer(metricsProvider) + exeNode.metricsProvider = metricsProvider + return metricsProvider, nil +} + func (exeNode *ExecutionNode) LoadConsensusCommittee( node *NodeConfig, ) ( @@ -1327,6 +1368,7 @@ func (exeNode *ExecutionNode) LoadGrpcServer( exeNode.results, exeNode.txResults, node.Storage.Commits, + exeNode.metricsProvider, node.RootChainID, signature.NewBlockSignerDecoder(exeNode.committee), exeNode.exeConf.apiRatelimits, diff --git a/cmd/execution_config.go b/cmd/execution_config.go index 4a65850c789..c8ba7092c32 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -25,35 +25,37 @@ import ( // ExecutionConfig contains the configs for starting up execution nodes type ExecutionConfig struct { - rpcConf rpc.Config - triedir string - executionDataDir string - registerDir string - mTrieCacheSize uint32 - transactionResultsCacheSize uint - checkpointDistance uint - checkpointsToKeep uint - chunkDataPackDir string - chunkDataPackCacheSize uint - chunkDataPackRequestsCacheSize uint32 - requestInterval time.Duration - extensiveLog bool - pauseExecution bool - chunkDataPackQueryTimeout time.Duration - chunkDataPackDeliveryTimeout time.Duration - enableBlockDataUpload bool - gcpBucketName string - s3BucketName string - apiRatelimits map[string]int - apiBurstlimits map[string]int - executionDataAllowedPeers string - executionDataPrunerHeightRangeTarget uint64 - executionDataPrunerThreshold uint64 - blobstoreRateLimit int - blobstoreBurstLimit int - chunkDataPackRequestWorkers uint - maxGracefulStopDuration time.Duration - importCheckpointWorkerCount int + rpcConf 
rpc.Config + triedir string + executionDataDir string + registerDir string + mTrieCacheSize uint32 + transactionResultsCacheSize uint + checkpointDistance uint + checkpointsToKeep uint + chunkDataPackDir string + chunkDataPackCacheSize uint + chunkDataPackRequestsCacheSize uint32 + requestInterval time.Duration + extensiveLog bool + pauseExecution bool + chunkDataPackQueryTimeout time.Duration + chunkDataPackDeliveryTimeout time.Duration + enableBlockDataUpload bool + gcpBucketName string + s3BucketName string + apiRatelimits map[string]int + apiBurstlimits map[string]int + executionDataAllowedPeers string + executionDataPrunerHeightRangeTarget uint64 + executionDataPrunerThreshold uint64 + blobstoreRateLimit int + blobstoreBurstLimit int + chunkDataPackRequestWorkers uint + maxGracefulStopDuration time.Duration + importCheckpointWorkerCount int + transactionExecutionMetricsEnabled bool + transactionExecutionMetricsBufferSize uint // evm tracing configuration evmTracingEnabled bool @@ -122,6 +124,8 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.IntVar(&exeConf.blobstoreBurstLimit, "blobstore-burst-limit", 0, "outgoing burst limit for Execution Data blobstore") flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing") flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap") + flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics") + flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. The buffer size is the number of blocks that are kept in memory by the metrics provider engine") flags.BoolVar(&exeConf.evmTracingEnabled, "evm-tracing-enabled", false, "enable EVM tracing, when set it will generate traces and upload them to the GCP bucket provided by the --evm-traces-gcp-bucket. Warning: this might affect speed of execution") flags.StringVar(&exeConf.evmTracesGCPBucket, "evm-traces-gcp-bucket", "", "define GCP bucket name used for uploading EVM traces, must be used in combination with --evm-tracing-enabled. 
if left empty the upload step is skipped") diff --git a/cmd/util/cmd/check-storage/cmd.go b/cmd/util/cmd/check-storage/cmd.go index 5727d7a51d5..94d739d2768 100644 --- a/cmd/util/cmd/check-storage/cmd.go +++ b/cmd/util/cmd/check-storage/cmd.go @@ -2,7 +2,6 @@ package check_storage import ( "context" - "slices" "github.com/rs/zerolog/log" "github.com/spf13/cobra" @@ -14,6 +13,7 @@ import ( "github.com/onflow/flow-go/cmd/util/ledger/reporters" "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/cmd/util/ledger/util/registers" + "github.com/onflow/flow-go/fvm/evm/emulator/state" "github.com/onflow/flow-go/fvm/systemcontracts" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" @@ -29,6 +29,14 @@ var ( flagNWorker int ) +var ( + evmAccount flow.Address + evmStorageIDKeys = []string{ + state.AccountsStorageIDKey, + state.CodesStorageIDKey, + } +) + var Cmd = &cobra.Command{ Use: "check-storage", Short: "Check storage health", @@ -102,13 +110,8 @@ func run(*cobra.Command, []string) { log.Fatal().Msg("--state-commitment must be provided when --state is provided") } - // For now, skip EVM storage account since a different decoder is needed for decoding EVM registers. - - systemContracts := systemcontracts.SystemContractsForChain(chainID) - - acctsToSkip := []string{ - flow.AddressToRegisterOwner(systemContracts.EVMStorage.Address), - } + // Get EVM account by chain + evmAccount = systemcontracts.SystemContractsForChain(chainID).EVMStorage.Address // Create report in JSONL format rw := reporters.NewReportFileWriterFactoryWithFormat(flagOutputDirectory, log.Logger, reporters.ReportFormatJSONL). @@ -161,13 +164,13 @@ func run(*cobra.Command, []string) { len(payloads), ) - failedAccountAddresses, err := checkStorageHealth(registersByAccount, flagNWorker, rw, acctsToSkip) + failedAccountAddresses, err := checkStorageHealth(registersByAccount, flagNWorker, rw) if err != nil { log.Fatal().Err(err).Msg("failed to check storage health") } if len(failedAccountAddresses) == 0 { - log.Info().Msgf("All %d accounts are health", accountCount) + log.Info().Msgf("All %d accounts are healthy", accountCount) return } @@ -188,7 +191,6 @@ func checkStorageHealth( registersByAccount *registers.ByAccount, nWorkers int, rw reporters.ReportWriter, - acctsToSkip []string, ) (failedAccountAddresses []string, err error) { accountCount := registersByAccount.AccountCount() @@ -211,10 +213,6 @@ func checkStorageHealth( func(accountRegisters *registers.AccountRegisters) error { defer logAccount(1) - if slices.Contains(acctsToSkip, accountRegisters.Owner()) { - return nil - } - accountStorageIssues := checkAccountStorageHealth(accountRegisters, nWorkers) if len(accountStorageIssues) > 0 { @@ -281,9 +279,6 @@ func checkStorageHealth( err = registersByAccount.ForEachAccount( func(accountRegisters *registers.AccountRegisters) error { - if slices.Contains(acctsToSkip, accountRegisters.Owner()) { - return nil - } jobs <- job{accountRegisters: accountRegisters} return nil }) @@ -318,6 +313,10 @@ func checkAccountStorageHealth(accountRegisters *registers.AccountRegisters, nWo }} } + if isEVMAccount(address) { + return checkEVMAccountStorageHealth(address, accountRegisters) + } + var issues []accountStorageIssue // Check atree storage health @@ -331,7 +330,7 @@ func checkAccountStorageHealth(accountRegisters *registers.AccountRegisters, nWo issues, accountStorageIssue{ Address: address.Hex(), - Kind: storageErrorKindString[atreeStorageErrorKind], + Kind: 
storageErrorKindString[cadenceAtreeStorageErrorKind], Msg: err.Error(), }) } @@ -345,12 +344,14 @@ type storageErrorKind int const ( otherErrorKind storageErrorKind = iota - atreeStorageErrorKind + cadenceAtreeStorageErrorKind + evmAtreeStorageErrorKind ) var storageErrorKindString = map[storageErrorKind]string{ - otherErrorKind: "error_check_storage_failed", - atreeStorageErrorKind: "error_atree_storage", + otherErrorKind: "error_check_storage_failed", + cadenceAtreeStorageErrorKind: "error_cadence_atree_storage", + evmAtreeStorageErrorKind: "error_evm_atree_storage", } type accountStorageIssue struct { diff --git a/cmd/util/cmd/check-storage/evm_account_storage_health.go b/cmd/util/cmd/check-storage/evm_account_storage_health.go new file mode 100644 index 00000000000..bc5608e93cc --- /dev/null +++ b/cmd/util/cmd/check-storage/evm_account_storage_health.go @@ -0,0 +1,498 @@ +package check_storage + +import ( + "bytes" + "cmp" + "fmt" + "slices" + + "golang.org/x/exp/maps" + + "github.com/onflow/atree" + + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/cmd/util/ledger/util/registers" + "github.com/onflow/flow-go/fvm/evm/emulator/state" + "github.com/onflow/flow-go/model/flow" +) + +var ( + compareSlabID = func(a, b atree.SlabID) int { + return a.Compare(b) + } + + equalSlabID = func(a, b atree.SlabID) bool { + return a.Compare(b) == 0 + } +) + +// checkEVMAccountStorageHealth checks storage health of cadence-atree +// registers and evm-atree registers in evm account. +func checkEVMAccountStorageHealth( + address common.Address, + accountRegisters *registers.AccountRegisters, +) []accountStorageIssue { + var issues []accountStorageIssue + + ledger := NewReadOnlyLedgerWithAtreeRegisterReadSet(accountRegisters) + + // Check health of cadence-atree registers. + issues = append( + issues, + checkCadenceAtreeRegistersInEVMAccount(address, ledger)..., + ) + + // Check health of evm-atree registers. + issues = append( + issues, + checkEVMAtreeRegistersInEVMAccount(address, ledger)..., + ) + + // Check unreferenced atree registers. + // If any atree registers are not accessed during health check of + // cadence-atree and evm-atree registers, these atree registers are + // unreferenced. + issues = append( + issues, + checkUnreferencedAtreeRegisters(address, ledger, accountRegisters)..., + ) + + return issues +} + +// checkCadenceAtreeRegistersInEVMAccount checks health of cadence-atree registers. +func checkCadenceAtreeRegistersInEVMAccount( + address common.Address, + ledger atree.Ledger, +) []accountStorageIssue { + var issues []accountStorageIssue + + storage := runtime.NewStorage(ledger, nil) + + // Load Cadence domains storage map, so atree slab iterator can traverse connected slabs from loaded root slab. + // NOTE: don't preload all atree slabs in evm account because evm-atree registers require evm-atree decoder. + + for _, domain := range util.StorageMapDomains { + _ = storage.GetStorageMap(address, domain, false) + } + + err := storage.CheckHealth() + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[cadenceAtreeStorageErrorKind], + Msg: fmt.Sprintf("cadence-atree registers health check failed in evm account: %s", err), + }) + } + + return issues +} + +// checkEVMAtreeRegistersInEVMAccount checks health of evm-atree registers. 
+func checkEVMAtreeRegistersInEVMAccount( + address common.Address, + ledger atree.Ledger, +) []accountStorageIssue { + var issues []accountStorageIssue + + baseStorage := atree.NewLedgerBaseStorage(ledger) + + storage, err := state.NewPersistentSlabStorage(baseStorage) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to create atree.PersistentSlabStorage for evm registers: %s", err), + }) + return issues + } + + domainSlabIDs := make(map[string]atree.SlabID) + + // Load evm domain root slabs. + for _, domain := range evmStorageIDKeys { + rawDomainSlabID, err := ledger.GetValue(address[:], []byte(domain)) + if err != nil { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to get evm domain %s raw slab ID: %s", domain, err), + }) + continue + } + + if len(rawDomainSlabID) == 0 { + continue + } + + domainSlabID, err := atree.NewSlabIDFromRawBytes(rawDomainSlabID) + if err != nil { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to convert evm domain %s raw slab ID %x to atree slab ID: %s", domain, rawDomainSlabID, err), + }) + continue + } + + // Retrieve evm domain storage register so slab iterator can traverse connected slabs from root slab. + + _, found, err := storage.Retrieve(domainSlabID) + if err != nil { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to retrieve evm domain %s root slab %s: %s", domain, domainSlabID, err), + }) + continue + } + if !found { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to find evm domain %s root slab %s", domain, domainSlabID), + }) + continue + } + + domainSlabIDs[domain] = domainSlabID + } + + if len(domainSlabIDs) == 0 { + return issues + } + + // Get evm storage slot slab IDs. + storageSlotSlabIDs, storageSlotIssues := getStorageSlotRootSlabIDs( + address, + domainSlabIDs[state.AccountsStorageIDKey], + storage) + + issues = append(issues, storageSlotIssues...) + + // Load evm storage slot slabs so slab iterator can traverse connected slabs in storage health check. + for _, id := range storageSlotSlabIDs { + _, found, err := storage.Retrieve(id) + if err != nil { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to retrieve evm storage slot %s: %s", id, err), + }) + } + if !found { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to find evm storage slot %s", id), + }) + } + } + + // Expected root slabs include domain root slabs and storage slot root slabs. + + expectedRootSlabIDs := make([]atree.SlabID, 0, len(domainSlabIDs)+len(storageSlotSlabIDs)) + expectedRootSlabIDs = append(expectedRootSlabIDs, maps.Values(domainSlabIDs)...) + expectedRootSlabIDs = append(expectedRootSlabIDs, storageSlotSlabIDs...) 
+
+ issues = append(
+ issues,
+ // Check storage health of evm-atree registers
+ checkHealthWithExpectedRootSlabIDs(address, storage, expectedRootSlabIDs)...,
+ )
+
+ return issues
+}
+
+// getStorageSlotRootSlabIDs returns evm storage slot root slab IDs.
+func getStorageSlotRootSlabIDs(
+ address common.Address,
+ accountStorageRootSlabID atree.SlabID,
+ storage *atree.PersistentSlabStorage,
+) ([]atree.SlabID, []accountStorageIssue) {
+
+ if accountStorageRootSlabID == atree.SlabIDUndefined {
+ return nil, nil
+ }
+
+ var issues []accountStorageIssue
+
+ // Load account storage map
+ m, err := atree.NewMapWithRootID(storage, accountStorageRootSlabID, atree.NewDefaultDigesterBuilder())
+ if err != nil {
+ issues = append(
+ issues,
+ accountStorageIssue{
+ Address: address.Hex(),
+ Kind: storageErrorKindString[evmAtreeStorageErrorKind],
+ Msg: fmt.Sprintf("failed to load evm storage slot %s: %s", accountStorageRootSlabID, err),
+ })
+ return nil, issues
+ }
+
+ storageSlotRootSlabIDs := make(map[atree.SlabID]struct{})
+
+ // Iterate accounts in account storage map to get storage slot collection ID.
+ err = m.IterateReadOnly(func(key, value atree.Value) (bool, error) {
+ // Check address type
+ acctAddress, ok := key.(state.ByteStringValue)
+ if !ok {
+ issues = append(
+ issues,
+ accountStorageIssue{
+ Address: address.Hex(),
+ Kind: storageErrorKindString[evmAtreeStorageErrorKind],
+ Msg: fmt.Sprintf("expected evm account address as ByteStringValue, got %T", key),
+ })
+ return true, nil
+ }
+
+ // Check encoded account type
+ encodedAccount, ok := value.(state.ByteStringValue)
+ if !ok {
+ issues = append(
+ issues,
+ accountStorageIssue{
+ Address: address.Hex(),
+ Kind: storageErrorKindString[evmAtreeStorageErrorKind],
+ Msg: fmt.Sprintf("expected evm account as ByteStringValue, got %T", value),
+ })
+ return true, nil
+ }
+
+ // Decode account
+ acct, err := state.DecodeAccount(encodedAccount.Bytes())
+ if err != nil {
+ issues = append(
+ issues,
+ accountStorageIssue{
+ Address: address.Hex(),
+ Kind: storageErrorKindString[evmAtreeStorageErrorKind],
+ Msg: fmt.Sprintf("failed to decode account %x in evm account storage: %s", acctAddress.Bytes(), err),
+ })
+ return true, nil
+ }
+
+ storageSlotCollectionID := acct.CollectionID
+
+ if len(storageSlotCollectionID) == 0 {
+ return true, nil
+ }
+
+ storageSlotSlabID, err := atree.NewSlabIDFromRawBytes(storageSlotCollectionID)
+ if err != nil {
+ issues = append(
+ issues,
+ accountStorageIssue{
+ Address: address.Hex(),
+ Kind: storageErrorKindString[evmAtreeStorageErrorKind],
+ Msg: fmt.Sprintf("failed to convert storage slot collection ID %x to atree slab ID: %s", storageSlotCollectionID, err),
+ })
+ return true, nil
+ }
+
+ // Check storage slot is not double referenced.
+ if _, ok := storageSlotRootSlabIDs[storageSlotSlabID]; ok { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("found storage slot collection %x referenced by multiple accounts", storageSlotCollectionID), + }) + } + + storageSlotRootSlabIDs[storageSlotSlabID] = struct{}{} + + return true, nil + }) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to iterate EVM account storage map: %s", err), + }) + } + + return maps.Keys(storageSlotRootSlabIDs), nil +} + +// checkHealthWithExpectedRootSlabIDs checks atree registers in storage (loaded and connected registers). +func checkHealthWithExpectedRootSlabIDs( + address common.Address, + storage *atree.PersistentSlabStorage, + expectedRootSlabIDs []atree.SlabID, +) []accountStorageIssue { + var issues []accountStorageIssue + + // Check atree storage health + rootSlabIDSet, err := atree.CheckStorageHealth(storage, -1) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("evm atree storage check failed: %s", err), + }) + return issues + } + + // Check if returned root slab IDs match expected root slab IDs. + + rootSlabIDs := maps.Keys(rootSlabIDSet) + slices.SortFunc(rootSlabIDs, compareSlabID) + + slices.SortFunc(expectedRootSlabIDs, compareSlabID) + + if !slices.EqualFunc(expectedRootSlabIDs, rootSlabIDs, equalSlabID) { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("root slabs %v from storage health check != expected root slabs %v", rootSlabIDs, expectedRootSlabIDs), + }) + } + + return issues +} + +// checkUnreferencedAtreeRegisters checks if all atree registers in account has been read through ledger. +func checkUnreferencedAtreeRegisters( + address common.Address, + ledger *ReadOnlyLedgerWithAtreeRegisterReadSet, + accountRegisters *registers.AccountRegisters, +) []accountStorageIssue { + var issues []accountStorageIssue + + allAtreeRegisterIDs, err := getAtreeRegisterIDsFromRegisters(accountRegisters) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[otherErrorKind], + Msg: fmt.Sprintf("failed to get atree register IDs from account registers: %s", err), + }) + return issues + } + + // Check for unreferenced atree slabs by verifing all atree slabs in accountRegisters are read + // during storage health check for evm-atree and cadence-atree registers. 
+ + if ledger.GetAtreeRegisterReadCount() == len(allAtreeRegisterIDs) { + return issues + } + + if ledger.GetAtreeRegisterReadCount() > len(allAtreeRegisterIDs) { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[otherErrorKind], + Msg: fmt.Sprintf("%d atree registers was read > %d atree registers in evm account", + ledger.GetAtreeRegisterReadCount(), + len(allAtreeRegisterIDs)), + }) + return issues + } + + unreferencedAtreeRegisterIDs := make([]flow.RegisterID, 0, len(allAtreeRegisterIDs)-ledger.GetAtreeRegisterReadCount()) + + for _, id := range allAtreeRegisterIDs { + if !ledger.IsAtreeRegisterRead(id) { + unreferencedAtreeRegisterIDs = append(unreferencedAtreeRegisterIDs, id) + } + } + + slices.SortFunc(unreferencedAtreeRegisterIDs, func(a, b flow.RegisterID) int { + return cmp.Compare(a.Key, b.Key) + }) + + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf( + "number of read atree slabs %d != number of atree slabs in storage %d: unreferenced atree registers %v", + ledger.GetAtreeRegisterReadCount(), + len(allAtreeRegisterIDs), + unreferencedAtreeRegisterIDs, + ), + }) + + return issues +} + +func getAtreeRegisterIDsFromRegisters(registers registers.Registers) ([]flow.RegisterID, error) { + registerIDs := make([]flow.RegisterID, 0, registers.Count()) + + err := registers.ForEach(func(owner string, key string, _ []byte) error { + if !flow.IsSlabIndexKey(key) { + return nil + } + + registerIDs = append( + registerIDs, + flow.NewRegisterID(flow.BytesToAddress([]byte(owner)), key), + ) + + return nil + }) + if err != nil { + return nil, err + } + + return registerIDs, nil +} + +func isEVMAccount(owner common.Address) bool { + return bytes.Equal(owner[:], evmAccount[:]) +} + +type ReadOnlyLedgerWithAtreeRegisterReadSet struct { + *registers.ReadOnlyLedger + atreeRegistersReadSet map[flow.RegisterID]struct{} +} + +var _ atree.Ledger = &ReadOnlyLedgerWithAtreeRegisterReadSet{} + +func NewReadOnlyLedgerWithAtreeRegisterReadSet( + accountRegisters *registers.AccountRegisters, +) *ReadOnlyLedgerWithAtreeRegisterReadSet { + return &ReadOnlyLedgerWithAtreeRegisterReadSet{ + ReadOnlyLedger: ®isters.ReadOnlyLedger{Registers: accountRegisters}, + atreeRegistersReadSet: make(map[flow.RegisterID]struct{}), + } +} + +func (l *ReadOnlyLedgerWithAtreeRegisterReadSet) GetValue(address, key []byte) (value []byte, err error) { + value, err = l.ReadOnlyLedger.GetValue(address, key) + if err != nil { + return nil, err + } + + if flow.IsSlabIndexKey(string(key)) { + registerID := flow.NewRegisterID(flow.BytesToAddress(address), string(key)) + l.atreeRegistersReadSet[registerID] = struct{}{} + } + return value, nil +} + +func (l *ReadOnlyLedgerWithAtreeRegisterReadSet) GetAtreeRegisterReadCount() int { + return len(l.atreeRegistersReadSet) +} + +func (l *ReadOnlyLedgerWithAtreeRegisterReadSet) IsAtreeRegisterRead(id flow.RegisterID) bool { + _, ok := l.atreeRegistersReadSet[id] + return ok +} diff --git a/cmd/util/cmd/check-storage/evm_account_storage_health_test.go b/cmd/util/cmd/check-storage/evm_account_storage_health_test.go new file mode 100644 index 00000000000..babb96ea796 --- /dev/null +++ b/cmd/util/cmd/check-storage/evm_account_storage_health_test.go @@ -0,0 +1,153 @@ +package check_storage + +import ( + "strconv" + "testing" + + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + + 
"github.com/onflow/atree" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/onflow/cadence/runtime/interpreter" + gethCommon "github.com/onflow/go-ethereum/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/cmd/util/ledger/util/registers" + "github.com/onflow/flow-go/fvm/evm/emulator/state" + "github.com/onflow/flow-go/fvm/evm/testutils" + "github.com/onflow/flow-go/fvm/evm/types" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" + "github.com/onflow/flow-go/model/flow" +) + +func TestEVMAccountStorageHealth(t *testing.T) { + address := common.Address{1} + + t.Run("has storage slot", func(t *testing.T) { + led := createPayloadLedger() + + createEVMStorage(t, led, address) + + createCadenceStorage(t, led, address) + + payloads := maps.Values(led.Payloads) + + accountRegisters, err := registers.NewAccountRegistersFromPayloads(string(address[:]), payloads) + require.NoError(t, err) + + issues := checkEVMAccountStorageHealth( + address, + accountRegisters, + ) + require.Equal(t, 0, len(issues)) + }) + + t.Run("unreferenced slabs", func(t *testing.T) { + led := createPayloadLedger() + + createEVMStorage(t, led, address) + + createCadenceStorage(t, led, address) + + payloads := maps.Values(led.Payloads) + + // Add unreferenced slabs + slabIndex, err := led.AllocateSlabIndexFunc(address[:]) + require.NoError(t, err) + + registerID := flow.NewRegisterID( + flow.BytesToAddress(address[:]), + string(atree.SlabIndexToLedgerKey(slabIndex))) + + unreferencedPayload := ledger.NewPayload( + convert.RegisterIDToLedgerKey(registerID), + ledger.Value([]byte{1})) + + payloads = append(payloads, unreferencedPayload) + + accountRegisters, err := registers.NewAccountRegistersFromPayloads(string(address[:]), payloads) + require.NoError(t, err) + + issues := checkEVMAccountStorageHealth( + address, + accountRegisters, + ) + require.Equal(t, 1, len(issues)) + require.Equal(t, storageErrorKindString[evmAtreeStorageErrorKind], issues[0].Kind) + require.Contains(t, issues[0].Msg, "unreferenced atree registers") + }) +} + +func createEVMStorage(t *testing.T, ledger atree.Ledger, address common.Address) { + view, err := state.NewBaseView(ledger, flow.BytesToAddress(address[:])) + require.NoError(t, err) + + // Create an account without storage slot + addr1 := testutils.RandomCommonAddress(t) + + err = view.CreateAccount(addr1, uint256.NewInt(1), 2, []byte("ABC"), gethCommon.Hash{3, 4, 5}) + require.NoError(t, err) + + // Create an account with storage slot + addr2 := testutils.RandomCommonAddress(t) + + err = view.CreateAccount(addr2, uint256.NewInt(6), 7, []byte("DEF"), gethCommon.Hash{8, 9, 19}) + require.NoError(t, err) + + slot := types.SlotAddress{ + Address: addr2, + Key: testutils.RandomCommonHash(t), + } + + err = view.UpdateSlot(slot, testutils.RandomCommonHash(t)) + require.NoError(t, err) + + err = view.Commit() + require.NoError(t, err) +} + +func createCadenceStorage(t *testing.T, ledger atree.Ledger, address common.Address) { + storage := runtime.NewStorage(ledger, nil) + + inter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: storage, + }, + ) + require.NoError(t, err) + + // Create storage and public domains + for _, domain := range []string{"storage", "public"} { + storageDomain := storage.GetStorageMap(address, domain, true) + + // Create large domain map so there are more than one atree registers under the hood. 
+ for i := 0; i < 100; i++ { + key := interpreter.StringStorageMapKey(domain + "_key_" + strconv.Itoa(i)) + value := interpreter.NewUnmeteredStringValue(domain + "_value_" + strconv.Itoa(i)) + storageDomain.SetValue(inter, key, value) + } + } + + // Commit domain data + err = storage.Commit(inter, false) + require.NoError(t, err) +} + +func createPayloadLedger() *util.PayloadsLedger { + nextSlabIndex := atree.SlabIndex{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1} + + return &util.PayloadsLedger{ + Payloads: make(map[flow.RegisterID]*ledger.Payload), + AllocateSlabIndexFunc: func([]byte) (atree.SlabIndex, error) { + var slabIndex atree.SlabIndex + slabIndex, nextSlabIndex = nextSlabIndex, nextSlabIndex.Next() + return slabIndex, nil + }, + } +} diff --git a/cmd/util/cmd/common/clusters.go b/cmd/util/cmd/common/clusters.go index 6c77db7fbb3..6451f1494e7 100644 --- a/cmd/util/cmd/common/clusters.go +++ b/cmd/util/cmd/common/clusters.go @@ -38,9 +38,9 @@ import ( // - error: if any error occurs. Any error returned from this function is irrecoverable. func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes flow.IdentityList, numCollectionClusters int) (flow.AssignmentList, flow.ClusterList, error) { - partners := partnerNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)) - internals := internalNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)) - nCollectors := len(partners) + len(internals) + partnerCollectors := partnerNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)) + internalCollectors := internalNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)) + nCollectors := len(partnerCollectors) + len(internalCollectors) // ensure we have at least as many collection nodes as clusters if nCollectors < int(numCollectionClusters) { @@ -49,32 +49,24 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes } // shuffle both collector lists based on a non-deterministic algorithm - partners, err := partners.Shuffle() + partnerCollectors, err := partnerCollectors.Shuffle() if err != nil { log.Fatal().Err(err).Msg("could not shuffle partners") } - internals, err = internals.Shuffle() + internalCollectors, err = internalCollectors.Shuffle() if err != nil { log.Fatal().Err(err).Msg("could not shuffle internals") } - // The following is a heuristic for distributing the internal collector nodes (private staking key available - // to generate QC for cluster root block) and partner nodes (private staking unknown). We need internal nodes - // to control strictly more than 2/3 of the cluster's total weight. - // The following is a heuristic that distributes collectors round-robbin across the specified number of clusters. - // This heuristic only works when all collectors have equal weight! 
The following sanity check enforces this: - if len(partnerNodes) > 0 && len(partnerNodes) > 2*len(internalNodes) { - return nil, nil, fmt.Errorf("requiring at least x>0 number of partner nodes and y > 2x number of internal nodes, but got x,y=%d,%d", len(partnerNodes), len(internalNodes)) - } - // sanity check ^ enforces that there is at least one internal node, hence `internalNodes[0].InitialWeight` is always a valid reference weight - refWeight := internalNodes[0].InitialWeight + // capture first reference weight to validate that all collectors have equal weight + refWeight := internalCollectors[0].InitialWeight identifierLists := make([]flow.IdentifierList, numCollectionClusters) // array to track the 2/3 internal-nodes constraint (internal_nodes > 2 * partner_nodes) constraint := make([]int, numCollectionClusters) // first, round-robin internal nodes into each cluster - for i, node := range internals { + for i, node := range internalCollectors { if node.InitialWeight != refWeight { return nil, nil, fmt.Errorf("current implementation requires all collectors (partner & interal nodes) to have equal weight") } @@ -84,7 +76,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes } // next, round-robin partner nodes into each cluster - for i, node := range partners { + for i, node := range partnerCollectors { if node.InitialWeight != refWeight { return nil, nil, fmt.Errorf("current implementation requires all collectors (partner & interal nodes) to have equal weight") } @@ -102,7 +94,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes assignments := assignment.FromIdentifierLists(identifierLists) - collectors := append(partners, internals...) + collectors := append(partnerCollectors, internalCollectors...) clusters, err := factory.NewClusterList(assignments, collectors.ToSkeleton()) if err != nil { log.Fatal().Err(err).Msg("could not create cluster list") diff --git a/engine/access/mock/execution_api_client.go b/engine/access/mock/execution_api_client.go index fd5fc20c718..7aa04d143c7 100644 --- a/engine/access/mock/execution_api_client.go +++ b/engine/access/mock/execution_api_client.go @@ -349,6 +349,43 @@ func (_m *ExecutionAPIClient) GetTransactionErrorMessagesByBlockID(ctx context.C return r0, r1 } +// GetTransactionExecutionMetricsAfter provides a mock function with given fields: ctx, in, opts +func (_m *ExecutionAPIClient) GetTransactionExecutionMetricsAfter(ctx context.Context, in *execution.GetTransactionExecutionMetricsAfterRequest, opts ...grpc.CallOption) (*execution.GetTransactionExecutionMetricsAfterResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetTransactionExecutionMetricsAfter") + } + + var r0 *execution.GetTransactionExecutionMetricsAfterResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) (*execution.GetTransactionExecutionMetricsAfterResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) *execution.GetTransactionExecutionMetricsAfterResponse); ok { + r0 = rf(ctx, in, opts...) 
+ } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*execution.GetTransactionExecutionMetricsAfterResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTransactionResult provides a mock function with given fields: ctx, in, opts func (_m *ExecutionAPIClient) GetTransactionResult(ctx context.Context, in *execution.GetTransactionResultRequest, opts ...grpc.CallOption) (*execution.GetTransactionResultResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/engine/access/mock/execution_api_server.go b/engine/access/mock/execution_api_server.go index e61517cb617..2c3f1f3b5a7 100644 --- a/engine/access/mock/execution_api_server.go +++ b/engine/access/mock/execution_api_server.go @@ -284,6 +284,36 @@ func (_m *ExecutionAPIServer) GetTransactionErrorMessagesByBlockID(_a0 context.C return r0, r1 } +// GetTransactionExecutionMetricsAfter provides a mock function with given fields: _a0, _a1 +func (_m *ExecutionAPIServer) GetTransactionExecutionMetricsAfter(_a0 context.Context, _a1 *execution.GetTransactionExecutionMetricsAfterRequest) (*execution.GetTransactionExecutionMetricsAfterResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for GetTransactionExecutionMetricsAfter") + } + + var r0 *execution.GetTransactionExecutionMetricsAfterResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) (*execution.GetTransactionExecutionMetricsAfterResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) *execution.GetTransactionExecutionMetricsAfterResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*execution.GetTransactionExecutionMetricsAfterResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTransactionResult provides a mock function with given fields: _a0, _a1 func (_m *ExecutionAPIServer) GetTransactionResult(_a0 context.Context, _a1 *execution.GetTransactionResultRequest) (*execution.GetTransactionResultResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/engine/execution/computation/computer/computer.go b/engine/execution/computation/computer/computer.go index c3e77b9be7f..e863a4d23d1 100644 --- a/engine/execution/computation/computer/computer.go +++ b/engine/execution/computation/computer/computer.go @@ -256,7 +256,6 @@ func (e *blockComputer) queueTransactionRequests( i == len(collection.Transactions)-1) txnIndex += 1 } - } systemCtx := fvm.NewContextFromParent( diff --git a/engine/execution/computation/metrics/collector.go b/engine/execution/computation/metrics/collector.go new file mode 100644 index 00000000000..8f3438d4658 --- /dev/null +++ b/engine/execution/computation/metrics/collector.go @@ -0,0 +1,130 @@ +package metrics + +import ( + "sync" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" +) + +type collector struct { + log zerolog.Logger + + collection chan metrics + + mu sync.Mutex + + lowestAvailableHeight uint64 + blocksAtHeight 
map[uint64]map[flow.Identifier]struct{} + metrics map[flow.Identifier][]TransactionExecutionMetrics +} + +func newCollector( + log zerolog.Logger, + lowestAvailableHeight uint64, +) *collector { + return &collector{ + log: log, + lowestAvailableHeight: lowestAvailableHeight, + + collection: make(chan metrics, 1000), + blocksAtHeight: make(map[uint64]map[flow.Identifier]struct{}), + metrics: make(map[flow.Identifier][]TransactionExecutionMetrics), + } +} + +// Collect should never block because it's called from the execution +func (c *collector) Collect( + blockId flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, +) { + select { + case c.collection <- metrics{ + TransactionExecutionMetrics: t, + blockHeight: blockHeight, + blockId: blockId, + }: + default: + c.log.Warn(). + Uint64("height", blockHeight). + Msg("dropping metrics because the collection channel is full") + } +} + +func (c *collector) metricsCollectorWorker( + ctx irrecoverable.SignalerContext, + ready component.ReadyFunc, +) { + ready() + + for { + select { + case <-ctx.Done(): + return + case m := <-c.collection: + c.collect(m.blockId, m.blockHeight, m.TransactionExecutionMetrics) + } + } +} + +func (c *collector) collect( + blockId flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, +) { + c.mu.Lock() + defer c.mu.Unlock() + + if blockHeight <= c.lowestAvailableHeight { + c.log.Warn(). + Uint64("height", blockHeight). + Uint64("lowestAvailableHeight", c.lowestAvailableHeight). + Msg("received metrics for a block that is older or equal than the most recent block") + return + } + + if _, ok := c.blocksAtHeight[blockHeight]; !ok { + c.blocksAtHeight[blockHeight] = make(map[flow.Identifier]struct{}) + } + c.blocksAtHeight[blockHeight][blockId] = struct{}{} + c.metrics[blockId] = append(c.metrics[blockId], t) +} + +// Pop returns the metrics for the given finalized block at the given height +// and clears all data up to the given height. +func (c *collector) Pop(height uint64, finalizedBlockId flow.Identifier) []TransactionExecutionMetrics { + c.mu.Lock() + defer c.mu.Unlock() + + if height <= c.lowestAvailableHeight { + c.log.Warn(). + Uint64("height", height). + Stringer("finalizedBlockId", finalizedBlockId). 
+ Msg("requested metrics for a finalizedBlockId that is older or equal than the most recent finalizedBlockId") + return nil + } + + // only return metrics for finalized block + metrics := c.metrics[finalizedBlockId] + + c.advanceTo(height) + + return metrics +} + +// advanceTo moves the latest height to the given height +// all data at lower heights will be deleted +func (c *collector) advanceTo(height uint64) { + for c.lowestAvailableHeight < height { + blocks := c.blocksAtHeight[c.lowestAvailableHeight] + for block := range blocks { + delete(c.metrics, block) + } + delete(c.blocksAtHeight, c.lowestAvailableHeight) + c.lowestAvailableHeight++ + } +} diff --git a/engine/execution/computation/metrics/collector_test.go b/engine/execution/computation/metrics/collector_test.go new file mode 100644 index 00000000000..14882c5f1c0 --- /dev/null +++ b/engine/execution/computation/metrics/collector_test.go @@ -0,0 +1,98 @@ +package metrics + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" +) + +func Test_CollectorPopOnEmpty(t *testing.T) { + t.Parallel() + + log := zerolog.New(zerolog.NewTestWriter(t)) + latestHeight := uint64(100) + + collector := newCollector(log, latestHeight) + + data := collector.Pop(latestHeight, flow.ZeroID) + require.Nil(t, data) +} + +func Test_CollectorCollection(t *testing.T) { + log := zerolog.New(zerolog.NewTestWriter(t)) + startHeight := uint64(100) + + collector := newCollector(log, startHeight) + + ctx := context.Background() + go func() { + ictx := irrecoverable.NewMockSignalerContext(t, ctx) + collector.metricsCollectorWorker(ictx, func() {}) + }() + + wg := sync.WaitGroup{} + + wg.Add(16 * 16 * 16) + for height := 0; height < 16; height++ { + // for each height we add multiple blocks. 
Only one block will be popped per height
+ for block := 0; block < 16; block++ {
+ // for each block we add multiple transactions
+ for transaction := 0; transaction < 16; transaction++ {
+ go func(h, b, t int) {
+ defer wg.Done()
+
+ block := flow.Identifier{}
+ block[0] = byte(h)
+ block[1] = byte(b)
+
+ collector.Collect(
+ block,
+ startHeight+1+uint64(h),
+ TransactionExecutionMetrics{
+ ExecutionTime: time.Duration(b + t),
+ },
+ )
+ }(height, block, transaction)
+ }
+ // wait a bit for the collector to process the data
+ <-time.After(1 * time.Millisecond)
+ }
+ }
+
+ wg.Wait()
+ // wait a bit for the collector to process the data
+ <-time.After(10 * time.Millisecond)
+
+ // there should be no data at the start height
+ data := collector.Pop(startHeight, flow.ZeroID)
+ require.Nil(t, data)
+
+ for height := 0; height < 16; height++ {
+ block := flow.Identifier{}
+ block[0] = byte(height)
+ // always pop the first block each height
+ block[1] = byte(0)
+
+ data := collector.Pop(startHeight+1+uint64(height), block)
+
+ require.Len(t, data, 16)
+ }
+
+ block := flow.Identifier{}
+ block[0] = byte(15)
+ block[1] = byte(1)
+ // height 16 was already popped so there should be no more data for any blocks
+ data = collector.Pop(startHeight+16, block)
+ require.Nil(t, data)
+
+ // there should be no data past the last collected height
+ data = collector.Pop(startHeight+17, flow.ZeroID)
+ require.Nil(t, data)
+}
diff --git a/engine/execution/computation/metrics/provider.go b/engine/execution/computation/metrics/provider.go
new file mode 100644
index 00000000000..c23a426141d
--- /dev/null
+++ b/engine/execution/computation/metrics/provider.go
@@ -0,0 +1,124 @@
+package metrics
+
+import (
+ "sync"
+
+ "github.com/rs/zerolog"
+)
+
+// provider is responsible for providing the metrics for the rpc endpoint.
+// It has a circular buffer of metrics for the last N finalized and executed blocks.
+type provider struct {
+ log zerolog.Logger
+
+ mu sync.RWMutex
+
+ bufferSize uint
+ bufferIndex uint
+ blockHeightAtBufferIndex uint64
+
+ buffer [][]TransactionExecutionMetrics
+}
+
+func newProvider(
+ log zerolog.Logger,
+ bufferSize uint,
+ blockHeightAtBufferIndex uint64,
+) *provider {
+ if bufferSize == 0 {
+ panic("buffer size must be greater than zero")
+ }
+
+ return &provider{
+ log: log,
+ bufferSize: bufferSize,
+ blockHeightAtBufferIndex: blockHeightAtBufferIndex,
+ bufferIndex: 0,
+ buffer: make([][]TransactionExecutionMetrics, bufferSize),
+ }
+}
+
+// Push buffers the metrics for the given height.
+// The caller should ensure heights are pushed in strictly increasing order, otherwise
+// metrics for the skipped heights will not be buffered.
+func (p *provider) Push(
+ height uint64,
+ data []TransactionExecutionMetrics,
+) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if height <= p.blockHeightAtBufferIndex {
+ p.log.Warn().
+ Uint64("height", height).
+ Uint64("blockHeightAtBufferIndex", p.blockHeightAtBufferIndex).
+ Msg("received metrics for a block that is older than or equal to the most recent block")
+ return
+ }
+ if height > p.blockHeightAtBufferIndex+1 {
+ p.log.Warn().
+ Uint64("height", height).
+ Uint64("blockHeightAtBufferIndex", p.blockHeightAtBufferIndex).
+ Msg("received metrics for a block that is not the next block") + + // Fill in the gap with nil + for i := p.blockHeightAtBufferIndex; i < height-1; i++ { + p.pushData(nil) + } + } + + p.pushData(data) +} + +func (p *provider) pushData(data []TransactionExecutionMetrics) { + p.bufferIndex = (p.bufferIndex + 1) % p.bufferSize + p.blockHeightAtBufferIndex++ + p.buffer[p.bufferIndex] = data +} + +func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransactionExecutionMetricsAfterResponse, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + data := make(map[uint64][]TransactionExecutionMetrics) + + if height+1 > p.blockHeightAtBufferIndex { + return data, nil + } + + // start index is the lowest block height that is in the buffer + // missing heights are handled below + startHeight := uint64(0) + // assign startHeight with the lowest buffered height + if p.blockHeightAtBufferIndex > uint64(p.bufferSize) { + startHeight = p.blockHeightAtBufferIndex - uint64(p.bufferSize) + } + + // if the starting index is lower than the height we only need to return the data for + // the blocks that are later than the given height + if height+1 > startHeight { + startHeight = height + 1 + } + + for h := startHeight; h <= p.blockHeightAtBufferIndex; h++ { + // 0 <= diff; because of the bufferSize check above + diff := uint(p.blockHeightAtBufferIndex - h) + + // 0 <= diff < bufferSize; because of the bufferSize check above + // we are about to do a modulo operation with p.bufferSize on p.bufferIndex - diff, but diff could + // be larger than p.bufferIndex, which would result in a negative intermediate value. + // To avoid this, we add p.bufferSize to diff, which will guarantee that (p.bufferSize + p.bufferIndex - diff) + // is always positive, but the modulo operation will still return the same index. 
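+ // For example (illustrative numbers): with bufferSize = 10, bufferIndex = 2 and diff = 5,
+ // (2 + 10 - 5) % 10 = 7, i.e. walking 5 slots backwards from index 2 in a ring of 10 lands on
+ // slot 7, which is the same slot a signed (bufferIndex - diff) modulo would select.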
+ intermediateIndex := p.bufferIndex + p.bufferSize - diff + index := intermediateIndex % p.bufferSize + + d := p.buffer[index] + if len(d) == 0 { + continue + } + + data[h] = p.buffer[index] + } + + return data, nil +} diff --git a/engine/execution/computation/metrics/provider_test.go b/engine/execution/computation/metrics/provider_test.go new file mode 100644 index 00000000000..152c9b45326 --- /dev/null +++ b/engine/execution/computation/metrics/provider_test.go @@ -0,0 +1,109 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func Test_ProviderGetOnEmpty(t *testing.T) { + t.Parallel() + + height := uint64(100) + bufferSize := uint(10) + log := zerolog.New(zerolog.NewTestWriter(t)) + + provider := newProvider(log, bufferSize, height) + + for i := 0; uint(i) < bufferSize; i++ { + data, err := provider.GetTransactionExecutionMetricsAfter(height - uint64(i)) + require.NoError(t, err) + require.Len(t, data, 0) + } +} + +func Test_ProviderGetOutOfBounds(t *testing.T) { + t.Parallel() + + height := uint64(100) + bufferSize := uint(10) + log := zerolog.New(zerolog.NewTestWriter(t)) + + provider := newProvider(log, bufferSize, height) + + res, err := provider.GetTransactionExecutionMetricsAfter(height + 1) + require.NoError(t, err) + require.Len(t, res, 0) +} + +func Test_ProviderPushSequential(t *testing.T) { + t.Parallel() + + height := uint64(100) + bufferSize := uint(10) + log := zerolog.New(zerolog.NewTestWriter(t)) + + provider := newProvider(log, bufferSize, height) + + for i := 0; uint(i) < bufferSize; i++ { + data := []TransactionExecutionMetrics{ + { + // Execution time is our label + ExecutionTime: time.Duration(i), + }, + } + + provider.Push(height+uint64(i)+1, data) + } + + data, err := provider.GetTransactionExecutionMetricsAfter(height) + require.Nil(t, err) + for i := 0; uint(i) < bufferSize; i++ { + require.Equal(t, time.Duration(uint(i)), data[height+uint64(i)+1][0].ExecutionTime) + } +} + +func Test_ProviderPushOutOfSequence(t *testing.T) { + t.Parallel() + + height := uint64(100) + bufferSize := uint(10) + log := zerolog.New(zerolog.NewTestWriter(t)) + + provider := newProvider(log, bufferSize, height) + + for i := 0; uint(i) < bufferSize; i++ { + data := []TransactionExecutionMetrics{ + { + ExecutionTime: time.Duration(i), + }, + } + + provider.Push(height+uint64(i)+1, data) + } + + newHeight := height + uint64(bufferSize) + + // Push out of sequence + data := []TransactionExecutionMetrics{ + { + ExecutionTime: time.Duration(newHeight + 2), + }, + } + + // no-op + provider.Push(newHeight, data) + + // skip 1 + provider.Push(newHeight+2, data) + + res, err := provider.GetTransactionExecutionMetricsAfter(height) + require.NoError(t, err) + + require.Len(t, res, int(bufferSize)) + + require.Nil(t, res[newHeight+1]) + require.Equal(t, time.Duration(newHeight+2), res[newHeight+2][0].ExecutionTime) +} diff --git a/engine/execution/computation/metrics/transaction_execution_metrics.go b/engine/execution/computation/metrics/transaction_execution_metrics.go new file mode 100644 index 00000000000..1c082049327 --- /dev/null +++ b/engine/execution/computation/metrics/transaction_execution_metrics.go @@ -0,0 +1,168 @@ +package metrics + +import ( + "time" + + "github.com/onflow/flow-go/engine" + + cadenceCommon "github.com/onflow/cadence/runtime/common" + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/engine/execution/state" + "github.com/onflow/flow-go/model/flow" + 
"github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/state/protocol" + psEvents "github.com/onflow/flow-go/state/protocol/events" + "github.com/onflow/flow-go/storage" +) + +type TransactionExecutionMetricsProvider interface { + component.Component + protocol.Consumer + + // GetTransactionExecutionMetricsAfter returns the transaction metrics for all blocks higher than the given height + // It returns a map of block height to a list of transaction execution metrics + // Blocks that are out of scope (only a limited number blocks are kept in memory) are not returned + GetTransactionExecutionMetricsAfter(height uint64) (GetTransactionExecutionMetricsAfterResponse, error) + + // Collect the transaction metrics for the given block + // Collect does not block, it returns immediately + Collect( + blockId flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, + ) +} + +// GetTransactionExecutionMetricsAfterResponse is the response type for GetTransactionExecutionMetricsAfter +// It is a map of block height to a list of transaction execution metrics +type GetTransactionExecutionMetricsAfterResponse = map[uint64][]TransactionExecutionMetrics + +type TransactionExecutionMetrics struct { + TransactionID flow.Identifier + ExecutionTime time.Duration + ExecutionEffortWeights map[cadenceCommon.ComputationKind]uint +} + +type metrics struct { + TransactionExecutionMetrics + blockHeight uint64 + blockId flow.Identifier +} + +// transactionExecutionMetricsProvider is responsible for providing the metrics for the rpc endpoint. +// It has a circular buffer of metrics for the last N finalized and executed blocks. +// The metrics are not guaranteed to be available for all blocks. If the node is just starting up or catching up +// to the latest finalized block, some blocks may not have metrics available. +// The metrics are intended to be used for monitoring and analytics purposes. +type transactionExecutionMetricsProvider struct { + // collector is responsible for collecting the metrics + // the collector collects the metrics from the execution during block execution + // on a finalized and executed block, the metrics are moved to the provider, + // all non-finalized metrics for that height are discarded + *collector + + // provider is responsible for providing the metrics for the rpc endpoint + // it has a circular buffer of metrics for the last N finalized and executed blocks. + *provider + + component.Component + // transactionExecutionMetricsProvider needs to consume BlockFinalized events. 
+ psEvents.Noop + + log zerolog.Logger + + executionState state.FinalizedExecutionState + headers storage.Headers + blockFinalizedNotifier engine.Notifier + + latestFinalizedAndExecutedHeight uint64 +} + +var _ TransactionExecutionMetricsProvider = (*transactionExecutionMetricsProvider)(nil) + +func NewTransactionExecutionMetricsProvider( + log zerolog.Logger, + executionState state.FinalizedExecutionState, + headers storage.Headers, + latestFinalizedAndExecutedHeight uint64, + bufferSize uint, +) TransactionExecutionMetricsProvider { + log = log.With().Str("component", "transaction_execution_metrics_provider").Logger() + + collector := newCollector(log, latestFinalizedAndExecutedHeight) + provider := newProvider(log, bufferSize, latestFinalizedAndExecutedHeight) + + p := &transactionExecutionMetricsProvider{ + collector: collector, + provider: provider, + log: log, + executionState: executionState, + headers: headers, + blockFinalizedNotifier: engine.NewNotifier(), + latestFinalizedAndExecutedHeight: latestFinalizedAndExecutedHeight, + } + + cm := component.NewComponentManagerBuilder() + cm.AddWorker(collector.metricsCollectorWorker) + cm.AddWorker(p.blockFinalizedWorker) + + p.Component = cm.Build() + + return p +} + +func (p *transactionExecutionMetricsProvider) BlockFinalized(*flow.Header) { + p.blockFinalizedNotifier.Notify() +} + +// move data from the collector to the provider +func (p *transactionExecutionMetricsProvider) onBlockExecutedAndFinalized(block flow.Identifier, height uint64) { + data := p.collector.Pop(height, block) + p.provider.Push(height, data) +} + +func (p *transactionExecutionMetricsProvider) blockFinalizedWorker( + ctx irrecoverable.SignalerContext, + ready component.ReadyFunc, +) { + ready() + + for { + select { + case <-ctx.Done(): + return + case <-p.blockFinalizedNotifier.Channel(): + p.onExecutedAndFinalized() + } + } +} + +func (p *transactionExecutionMetricsProvider) onExecutedAndFinalized() { + latestFinalizedAndExecutedHeight, err := p.executionState.GetHighestFinalizedExecuted() + + if err != nil { + p.log.Warn().Err(err).Msg("could not get highest finalized executed") + return + } + + // the latest finalized and executed block could be more than one block further than the last one handled + // step through all blocks between the last one handled and the latest finalized and executed + for height := p.latestFinalizedAndExecutedHeight + 1; height <= latestFinalizedAndExecutedHeight; height++ { + blockID, err := p.headers.BlockIDByHeight(height) + if err != nil { + p.log.Warn(). + Err(err). + Uint64("height", height). 
+ Msg("could not get header by height") + return + } + + p.onBlockExecutedAndFinalized(blockID, height) + + if height == latestFinalizedAndExecutedHeight { + p.latestFinalizedAndExecutedHeight = height + } + } +} diff --git a/engine/execution/rpc/engine.go b/engine/execution/rpc/engine.go index 260495f1bf1..9d7a9f87fc4 100644 --- a/engine/execution/rpc/engine.go +++ b/engine/execution/rpc/engine.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net" + "sort" "strings" "unicode/utf8" @@ -26,6 +27,7 @@ import ( "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" exeEng "github.com/onflow/flow-go/engine/execution" + "github.com/onflow/flow-go/engine/execution/computation/metrics" "github.com/onflow/flow-go/engine/execution/state" fvmerrors "github.com/onflow/flow-go/fvm/errors" "github.com/onflow/flow-go/model/flow" @@ -62,6 +64,7 @@ func New( exeResults storage.ExecutionResults, txResults storage.TransactionResults, commits storage.Commits, + transactionMetrics metrics.TransactionExecutionMetricsProvider, chainID flow.ChainID, signerIndicesDecoder hotstuff.BlockSignerDecoder, apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the gRPC API e.g. Ping->100, ExecuteScriptAtBlockID->300 @@ -105,6 +108,7 @@ func New( exeResults: exeResults, transactionResults: txResults, commits: commits, + transactionMetrics: transactionMetrics, log: log, maxBlockRange: DefaultMaxBlockRange, }, @@ -166,6 +170,7 @@ type handler struct { transactionResults storage.TransactionResults log zerolog.Logger commits storage.Commits + transactionMetrics metrics.TransactionExecutionMetricsProvider maxBlockRange int } @@ -802,6 +807,58 @@ func (h *handler) blockHeaderResponse(header *flow.Header) (*execution.BlockHead }, nil } +// GetTransactionExecutionMetricsAfter gets the execution metrics for a transaction after a given block. 
+func (h *handler) GetTransactionExecutionMetricsAfter( + _ context.Context, + req *execution.GetTransactionExecutionMetricsAfterRequest, +) (*execution.GetTransactionExecutionMetricsAfterResponse, error) { + height := req.GetBlockHeight() + + metrics, err := h.transactionMetrics.GetTransactionExecutionMetricsAfter(height) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get metrics after block height %v: %v", height, err) + } + + response := &execution.GetTransactionExecutionMetricsAfterResponse{ + Results: make([]*execution.GetTransactionExecutionMetricsAfterResponse_Result, 0, len(metrics)), + } + + for blockHeight, blockMetrics := range metrics { + blockResponse := &execution.GetTransactionExecutionMetricsAfterResponse_Result{ + BlockHeight: blockHeight, + Transactions: make([]*execution.GetTransactionExecutionMetricsAfterResponse_Transaction, len(blockMetrics)), + } + + for i, transactionMetrics := range blockMetrics { + transactionMetricsResponse := &execution.GetTransactionExecutionMetricsAfterResponse_Transaction{ + TransactionId: transactionMetrics.TransactionID[:], + ExecutionTime: uint64(transactionMetrics.ExecutionTime.Nanoseconds()), + ExecutionEffortWeights: make([]*execution.GetTransactionExecutionMetricsAfterResponse_ExecutionEffortWeight, 0, len(transactionMetrics.ExecutionEffortWeights)), + } + + for kind, weight := range transactionMetrics.ExecutionEffortWeights { + transactionMetricsResponse.ExecutionEffortWeights = append( + transactionMetricsResponse.ExecutionEffortWeights, + &execution.GetTransactionExecutionMetricsAfterResponse_ExecutionEffortWeight{ + Kind: uint64(kind), + Weight: uint64(weight), + }, + ) + } + + blockResponse.Transactions[i] = transactionMetricsResponse + } + response.Results = append(response.Results, blockResponse) + } + + // sort the response by block height in descending order + sort.Slice(response.Results, func(i, j int) bool { + return response.Results[i].BlockHeight > response.Results[j].BlockHeight + }) + + return response, nil +} + // additional check that when there is no event in the block, double check if the execution // result has no events as well, otherwise return an error. 
// we check the execution result has no event by checking if each chunk's EventCollection is diff --git a/go.mod b/go.mod index e557998ca60..d5a140eda9a 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1 github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1 github.com/onflow/flow-go-sdk v1.0.0-preview.55 - github.com/onflow/flow/protobuf/go/flow v0.4.6 + github.com/onflow/flow/protobuf/go/flow v0.4.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index e3cd8146665..ac8e956948f 100644 --- a/go.sum +++ b/go.sum @@ -2194,8 +2194,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc= diff --git a/insecure/go.mod b/insecure/go.mod index 0086182c027..8493273c41d 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -211,7 +211,7 @@ require ( github.com/onflow/flow-go-sdk v1.0.0-preview.55 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.2.1 // indirect github.com/onflow/flow-nft/lib/go/templates v1.2.0 // indirect - github.com/onflow/flow/protobuf/go/flow v0.4.6 // indirect + github.com/onflow/flow/protobuf/go/flow v0.4.7 // indirect github.com/onflow/go-ethereum v1.14.7 // indirect github.com/onflow/sdks v0.6.0-preview.1 // indirect github.com/onflow/wal v1.0.2 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index ab6a5936284..1777ec88fd9 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -2180,8 +2180,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod 
h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= diff --git a/integration/go.mod b/integration/go.mod index 8723aa74250..ef58e5c61f6 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -28,7 +28,7 @@ require ( github.com/onflow/flow-go v0.37.7-0.20240826193109-e211841b59f5 github.com/onflow/flow-go-sdk v1.0.0-preview.55 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 - github.com/onflow/flow/protobuf/go/flow v0.4.6 + github.com/onflow/flow/protobuf/go/flow v0.4.7 github.com/onflow/go-ethereum v1.14.7 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 diff --git a/integration/go.sum b/integration/go.sum index 17fec9e6715..5b6f4c5ccf9 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -2167,8 +2167,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= diff --git a/integration/tests/execution/transaction_metrics_test.go b/integration/tests/execution/transaction_metrics_test.go new file mode 100644 index 00000000000..2e0ba03df5e --- /dev/null +++ b/integration/tests/execution/transaction_metrics_test.go @@ -0,0 +1,116 @@ +package execution + +import ( + "bytes" + "context" + "testing" + + "github.com/onflow/flow/protobuf/go/flow/execution" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/onflow/flow-go/integration/testnet" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + sdk "github.com/onflow/flow-go-sdk" + + "github.com/onflow/flow-go/integration/tests/lib" +) + +func TestTransactionMetrics(t *testing.T) { + suite.Run(t, new(TransactionMetricsSuite)) +} + +type TransactionMetricsSuite struct { + Suite +} + +func (s *TransactionMetricsSuite) TestTransactionMetrics() { + accessClient := s.AccessClient() + + // wait for next height finalized (potentially first height), called blockA + currentFinalized := s.BlockState.HighestFinalizedHeight() + blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized) + s.T().Logf("got blockA height %v ID %v\n", blockA.Header.Height, blockA.Header.ID()) + + // send transaction + tx, err := accessClient.DeployContract(context.Background(), sdk.Identifier(s.net.Root().ID()), lib.CounterContract) + require.NoError(s.T(), err, "could not deploy counter") + + txres, err := accessClient.WaitForExecuted(context.Background(), tx.ID()) + 
require.NoError(s.T(), err, "could not wait for tx to be executed") + require.NoError(s.T(), txres.Error) + + client, closeClient := s.getClient() + defer func() { + _ = closeClient() + }() + + result, err := client.GetTransactionExecutionMetricsAfter( + context.Background(), + &execution.GetTransactionExecutionMetricsAfterRequest{ + BlockHeight: 0, + }, + ) + + require.NoError(s.T(), err, "could not get transaction execution metrics") + require.NotNil(s.T(), result.Results) + // there should be at least some results, due to each block having at least 1 transaction + require.Greater(s.T(), len(result.Results), 10) + + latestBlockResult := uint64(0) + for _, result := range result.Results { + if result.BlockHeight > latestBlockResult { + latestBlockResult = result.BlockHeight + } + } + + // send another transaction + tx, err = accessClient.UpdateContract(context.Background(), sdk.Identifier(s.net.Root().ID()), lib.CounterContract) + require.NoError(s.T(), err, "could not update counter") + + txres, err = accessClient.WaitForExecuted(context.Background(), tx.ID()) + require.NoError(s.T(), err, "could not wait for tx to be executed") + require.NoError(s.T(), txres.Error) + + result, err = client.GetTransactionExecutionMetricsAfter( + context.Background(), + &execution.GetTransactionExecutionMetricsAfterRequest{ + BlockHeight: latestBlockResult, + }, + ) + + require.NoError(s.T(), err, "could not get transaction execution metrics") + // there could be as few as one new block since the last request + require.Greater(s.T(), len(result.Results), 0) + + transactionExists := false + for _, result := range result.Results { + for _, transaction := range result.Transactions { + if bytes.Equal(transaction.TransactionId, tx.ID().Bytes()) { + transactionExists = true + + // check that the transaction metrics are not 0 + require.Greater(s.T(), transaction.ExecutionTime, uint64(0)) + require.Greater(s.T(), len(transaction.ExecutionEffortWeights), 0) + } + } + require.Less(s.T(), latestBlockResult, result.BlockHeight) + + } + require.True(s.T(), transactionExists) +} + +func (s *TransactionMetricsSuite) getClient() (execution.ExecutionAPIClient, func() error) { + + exe1ID := s.net.ContainerByID(s.exe1ID) + addr := exe1ID.Addr(testnet.GRPCPort) + + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(s.T(), err, "could not create execution client") + + grpcClient := execution.NewExecutionAPIClient(conn) + return grpcClient, conn.Close +} diff --git a/module/metrics/execution.go b/module/metrics/execution.go index 7419ec3014a..37d113061b7 100644 --- a/module/metrics/execution.go +++ b/module/metrics/execution.go @@ -849,6 +849,7 @@ func (ec *ExecutionCollector) ExecutionTransactionExecuted( if stats.Failed { ec.totalFailedTransactionsCounter.Inc() } + } // ExecutionChunkDataPackGenerated reports stats on chunk data pack generation @@ -1081,3 +1082,34 @@ func (ec *ExecutionCollector) ExecutionComputationResultUploaded() { func (ec *ExecutionCollector) ExecutionComputationResultUploadRetried() { ec.computationResultUploadRetriedCount.Inc() } + +type ExecutionCollectorWithTransactionCallback struct { + *ExecutionCollector + TransactionCallback func( + dur time.Duration, + stats module.TransactionExecutionResultStats, + info module.TransactionExecutionResultInfo, + ) +} + +func (ec *ExecutionCollector) WithTransactionCallback( + callback func( + time.Duration, + module.TransactionExecutionResultStats, + module.TransactionExecutionResultInfo, + ), +)
*ExecutionCollectorWithTransactionCallback { + return &ExecutionCollectorWithTransactionCallback{ + ExecutionCollector: ec, + TransactionCallback: callback, + } +} + +func (ec *ExecutionCollectorWithTransactionCallback) ExecutionTransactionExecuted( + dur time.Duration, + stats module.TransactionExecutionResultStats, + info module.TransactionExecutionResultInfo, +) { + ec.ExecutionCollector.ExecutionTransactionExecuted(dur, stats, info) + ec.TransactionCallback(dur, stats, info) +}
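As a usage illustration (not part of the diff): a minimal sketch of a monitoring client polling the new GetTransactionExecutionMetricsAfter endpoint over gRPC. It assumes an execution node exposes its gRPC API on localhost:9000; the address and the 10-second polling interval are placeholders, not part of this change.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/onflow/flow/protobuf/go/flow/execution"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Address of an execution node gRPC endpoint (placeholder).
	conn, err := grpc.Dial("localhost:9000", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("could not connect: %v", err)
	}
	defer conn.Close()

	client := execution.NewExecutionAPIClient(conn)

	// Only ask for blocks above the highest height seen so far.
	lastSeen := uint64(0)

	for {
		resp, err := client.GetTransactionExecutionMetricsAfter(
			context.Background(),
			&execution.GetTransactionExecutionMetricsAfterRequest{BlockHeight: lastSeen},
		)
		if err != nil {
			log.Printf("could not get transaction execution metrics: %v", err)
			time.Sleep(10 * time.Second)
			continue
		}

		// Results are sorted by block height in descending order.
		for _, blockResult := range resp.Results {
			if blockResult.BlockHeight > lastSeen {
				lastSeen = blockResult.BlockHeight
			}
			for _, tx := range blockResult.Transactions {
				// ExecutionTime is reported in nanoseconds.
				fmt.Printf("block %d: tx %x took %s (%d effort weights)\n",
					blockResult.BlockHeight,
					tx.TransactionId,
					time.Duration(tx.ExecutionTime),
					len(tx.ExecutionEffortWeights),
				)
			}
		}

		time.Sleep(10 * time.Second)
	}
}

Because only a limited number of finalized and executed blocks are buffered in memory, a consumer that polls too infrequently may miss metrics for older blocks.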