diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 0ca1cc51008..4c7f5f511e3 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -49,6 +49,7 @@ import ( "github.com/onflow/flow-go/engine/execution/checker" "github.com/onflow/flow-go/engine/execution/computation" "github.com/onflow/flow-go/engine/execution/computation/committer" + txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics" "github.com/onflow/flow-go/engine/execution/ingestion" "github.com/onflow/flow-go/engine/execution/ingestion/fetcher" "github.com/onflow/flow-go/engine/execution/ingestion/loader" @@ -127,7 +128,7 @@ type ExecutionNode struct { ingestionUnit *engine.Unit - collector module.ExecutionMetrics + collector *metrics.ExecutionCollector executionState state.ExecutionState followerState protocol.FollowerState committee hotstuff.DynamicCommittee @@ -160,6 +161,7 @@ type ExecutionNode struct { executionDataTracker tracker.Storage blobService network.BlobService blobserviceDependable *module.ProxiedReadyDoneAware + metricsProvider txmetrics.TransactionExecutionMetricsProvider } func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() { @@ -228,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() { Component("block data upload manager", exeNode.LoadBlockUploaderManager). Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader). Component("S3 block data uploader", exeNode.LoadS3BlockDataUploader). + Component("transaction execution metrics", exeNode.LoadTransactionExecutionMetrics). Component("provider engine", exeNode.LoadProviderEngine). Component("checker engine", exeNode.LoadCheckerEngine). Component("ingestion engine", exeNode.LoadIngestionEngine). @@ -544,10 +547,27 @@ func (exeNode *ExecutionNode) LoadProviderEngine( vmCtx := fvm.NewContext(opts...) + var collector module.ExecutionMetrics + collector = exeNode.collector + if exeNode.exeConf.transactionExecutionMetricsEnabled { + // inject the transaction execution metrics + collector = exeNode.collector.WithTransactionCallback( + func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) { + exeNode.metricsProvider.Collect( + info.BlockID, + info.BlockHeight, + txmetrics.TransactionExecutionMetrics{ + TransactionID: info.TransactionID, + ExecutionTime: dur, + ExecutionEffortWeights: stats.ComputationIntensities, + }) + }) + } + ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer) manager, err := computation.New( node.Logger, - exeNode.collector, + collector, node.Tracer, node.Me, node.State, @@ -1130,6 +1150,24 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD return exeNode.scriptsEng, nil } +func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics( + node *NodeConfig, +) (module.ReadyDoneAware, error) { + lastFinalizedHeader := node.LastFinalizedHeader + + metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider( + node.Logger, + exeNode.executionState, + node.Storage.Headers, + lastFinalizedHeader.Height, + exeNode.exeConf.transactionExecutionMetricsBufferSize, + ) + + node.ProtocolEvents.AddConsumer(metricsProvider) + exeNode.metricsProvider = metricsProvider + return metricsProvider, nil +} + func (exeNode *ExecutionNode) LoadConsensusCommittee( node *NodeConfig, ) ( @@ -1331,6 +1369,7 @@ func (exeNode *ExecutionNode) LoadGrpcServer( exeNode.results, exeNode.txResults, node.Storage.Commits, + exeNode.metricsProvider, node.RootChainID, signature.NewBlockSignerDecoder(exeNode.committee), exeNode.exeConf.apiRatelimits, diff --git a/cmd/execution_config.go b/cmd/execution_config.go index 4a65850c789..c8ba7092c32 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -25,35 +25,37 @@ import ( // ExecutionConfig contains the configs for starting up execution nodes type ExecutionConfig struct { - rpcConf rpc.Config - triedir string - executionDataDir string - registerDir string - mTrieCacheSize uint32 - transactionResultsCacheSize uint - checkpointDistance uint - checkpointsToKeep uint - chunkDataPackDir string - chunkDataPackCacheSize uint - chunkDataPackRequestsCacheSize uint32 - requestInterval time.Duration - extensiveLog bool - pauseExecution bool - chunkDataPackQueryTimeout time.Duration - chunkDataPackDeliveryTimeout time.Duration - enableBlockDataUpload bool - gcpBucketName string - s3BucketName string - apiRatelimits map[string]int - apiBurstlimits map[string]int - executionDataAllowedPeers string - executionDataPrunerHeightRangeTarget uint64 - executionDataPrunerThreshold uint64 - blobstoreRateLimit int - blobstoreBurstLimit int - chunkDataPackRequestWorkers uint - maxGracefulStopDuration time.Duration - importCheckpointWorkerCount int + rpcConf rpc.Config + triedir string + executionDataDir string + registerDir string + mTrieCacheSize uint32 + transactionResultsCacheSize uint + checkpointDistance uint + checkpointsToKeep uint + chunkDataPackDir string + chunkDataPackCacheSize uint + chunkDataPackRequestsCacheSize uint32 + requestInterval time.Duration + extensiveLog bool + pauseExecution bool + chunkDataPackQueryTimeout time.Duration + chunkDataPackDeliveryTimeout time.Duration + enableBlockDataUpload bool + gcpBucketName string + s3BucketName string + apiRatelimits map[string]int + apiBurstlimits map[string]int + executionDataAllowedPeers string + executionDataPrunerHeightRangeTarget uint64 + executionDataPrunerThreshold uint64 + blobstoreRateLimit int + blobstoreBurstLimit int + chunkDataPackRequestWorkers uint + maxGracefulStopDuration time.Duration + importCheckpointWorkerCount int + transactionExecutionMetricsEnabled bool + transactionExecutionMetricsBufferSize uint // evm tracing configuration evmTracingEnabled bool @@ -122,6 +124,8 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.IntVar(&exeConf.blobstoreBurstLimit, "blobstore-burst-limit", 0, "outgoing burst limit for Execution Data blobstore") flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing") flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap") + flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics") + flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. The buffer size is the number of blocks that are kept in memory by the metrics provider engine") flags.BoolVar(&exeConf.evmTracingEnabled, "evm-tracing-enabled", false, "enable EVM tracing, when set it will generate traces and upload them to the GCP bucket provided by the --evm-traces-gcp-bucket. Warning: this might affect speed of execution") flags.StringVar(&exeConf.evmTracesGCPBucket, "evm-traces-gcp-bucket", "", "define GCP bucket name used for uploading EVM traces, must be used in combination with --evm-tracing-enabled. if left empty the upload step is skipped") diff --git a/engine/access/mock/execution_api_client.go b/engine/access/mock/execution_api_client.go index fd5fc20c718..7aa04d143c7 100644 --- a/engine/access/mock/execution_api_client.go +++ b/engine/access/mock/execution_api_client.go @@ -349,6 +349,43 @@ func (_m *ExecutionAPIClient) GetTransactionErrorMessagesByBlockID(ctx context.C return r0, r1 } +// GetTransactionExecutionMetricsAfter provides a mock function with given fields: ctx, in, opts +func (_m *ExecutionAPIClient) GetTransactionExecutionMetricsAfter(ctx context.Context, in *execution.GetTransactionExecutionMetricsAfterRequest, opts ...grpc.CallOption) (*execution.GetTransactionExecutionMetricsAfterResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetTransactionExecutionMetricsAfter") + } + + var r0 *execution.GetTransactionExecutionMetricsAfterResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) (*execution.GetTransactionExecutionMetricsAfterResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) *execution.GetTransactionExecutionMetricsAfterResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*execution.GetTransactionExecutionMetricsAfterResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTransactionResult provides a mock function with given fields: ctx, in, opts func (_m *ExecutionAPIClient) GetTransactionResult(ctx context.Context, in *execution.GetTransactionResultRequest, opts ...grpc.CallOption) (*execution.GetTransactionResultResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/engine/access/mock/execution_api_server.go b/engine/access/mock/execution_api_server.go index e61517cb617..2c3f1f3b5a7 100644 --- a/engine/access/mock/execution_api_server.go +++ b/engine/access/mock/execution_api_server.go @@ -284,6 +284,36 @@ func (_m *ExecutionAPIServer) GetTransactionErrorMessagesByBlockID(_a0 context.C return r0, r1 } +// GetTransactionExecutionMetricsAfter provides a mock function with given fields: _a0, _a1 +func (_m *ExecutionAPIServer) GetTransactionExecutionMetricsAfter(_a0 context.Context, _a1 *execution.GetTransactionExecutionMetricsAfterRequest) (*execution.GetTransactionExecutionMetricsAfterResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for GetTransactionExecutionMetricsAfter") + } + + var r0 *execution.GetTransactionExecutionMetricsAfterResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) (*execution.GetTransactionExecutionMetricsAfterResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) *execution.GetTransactionExecutionMetricsAfterResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*execution.GetTransactionExecutionMetricsAfterResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTransactionResult provides a mock function with given fields: _a0, _a1 func (_m *ExecutionAPIServer) GetTransactionResult(_a0 context.Context, _a1 *execution.GetTransactionResultRequest) (*execution.GetTransactionResultResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/engine/execution/computation/computer/computer.go b/engine/execution/computation/computer/computer.go index c3e77b9be7f..e863a4d23d1 100644 --- a/engine/execution/computation/computer/computer.go +++ b/engine/execution/computation/computer/computer.go @@ -256,7 +256,6 @@ func (e *blockComputer) queueTransactionRequests( i == len(collection.Transactions)-1) txnIndex += 1 } - } systemCtx := fvm.NewContextFromParent( diff --git a/engine/execution/computation/metrics/collector.go b/engine/execution/computation/metrics/collector.go new file mode 100644 index 00000000000..8f3438d4658 --- /dev/null +++ b/engine/execution/computation/metrics/collector.go @@ -0,0 +1,130 @@ +package metrics + +import ( + "sync" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" +) + +type collector struct { + log zerolog.Logger + + collection chan metrics + + mu sync.Mutex + + lowestAvailableHeight uint64 + blocksAtHeight map[uint64]map[flow.Identifier]struct{} + metrics map[flow.Identifier][]TransactionExecutionMetrics +} + +func newCollector( + log zerolog.Logger, + lowestAvailableHeight uint64, +) *collector { + return &collector{ + log: log, + lowestAvailableHeight: lowestAvailableHeight, + + collection: make(chan metrics, 1000), + blocksAtHeight: make(map[uint64]map[flow.Identifier]struct{}), + metrics: make(map[flow.Identifier][]TransactionExecutionMetrics), + } +} + +// Collect should never block because it's called from the execution +func (c *collector) Collect( + blockId flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, +) { + select { + case c.collection <- metrics{ + TransactionExecutionMetrics: t, + blockHeight: blockHeight, + blockId: blockId, + }: + default: + c.log.Warn(). + Uint64("height", blockHeight). + Msg("dropping metrics because the collection channel is full") + } +} + +func (c *collector) metricsCollectorWorker( + ctx irrecoverable.SignalerContext, + ready component.ReadyFunc, +) { + ready() + + for { + select { + case <-ctx.Done(): + return + case m := <-c.collection: + c.collect(m.blockId, m.blockHeight, m.TransactionExecutionMetrics) + } + } +} + +func (c *collector) collect( + blockId flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, +) { + c.mu.Lock() + defer c.mu.Unlock() + + if blockHeight <= c.lowestAvailableHeight { + c.log.Warn(). + Uint64("height", blockHeight). + Uint64("lowestAvailableHeight", c.lowestAvailableHeight). + Msg("received metrics for a block that is older or equal than the most recent block") + return + } + + if _, ok := c.blocksAtHeight[blockHeight]; !ok { + c.blocksAtHeight[blockHeight] = make(map[flow.Identifier]struct{}) + } + c.blocksAtHeight[blockHeight][blockId] = struct{}{} + c.metrics[blockId] = append(c.metrics[blockId], t) +} + +// Pop returns the metrics for the given finalized block at the given height +// and clears all data up to the given height. +func (c *collector) Pop(height uint64, finalizedBlockId flow.Identifier) []TransactionExecutionMetrics { + c.mu.Lock() + defer c.mu.Unlock() + + if height <= c.lowestAvailableHeight { + c.log.Warn(). + Uint64("height", height). + Stringer("finalizedBlockId", finalizedBlockId). + Msg("requested metrics for a finalizedBlockId that is older or equal than the most recent finalizedBlockId") + return nil + } + + // only return metrics for finalized block + metrics := c.metrics[finalizedBlockId] + + c.advanceTo(height) + + return metrics +} + +// advanceTo moves the latest height to the given height +// all data at lower heights will be deleted +func (c *collector) advanceTo(height uint64) { + for c.lowestAvailableHeight < height { + blocks := c.blocksAtHeight[c.lowestAvailableHeight] + for block := range blocks { + delete(c.metrics, block) + } + delete(c.blocksAtHeight, c.lowestAvailableHeight) + c.lowestAvailableHeight++ + } +} diff --git a/engine/execution/computation/metrics/collector_test.go b/engine/execution/computation/metrics/collector_test.go new file mode 100644 index 00000000000..14882c5f1c0 --- /dev/null +++ b/engine/execution/computation/metrics/collector_test.go @@ -0,0 +1,98 @@ +package metrics + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" +) + +func Test_CollectorPopOnEmpty(t *testing.T) { + t.Parallel() + + log := zerolog.New(zerolog.NewTestWriter(t)) + latestHeight := uint64(100) + + collector := newCollector(log, latestHeight) + + data := collector.Pop(latestHeight, flow.ZeroID) + require.Nil(t, data) +} + +func Test_CollectorCollection(t *testing.T) { + log := zerolog.New(zerolog.NewTestWriter(t)) + startHeight := uint64(100) + + collector := newCollector(log, startHeight) + + ctx := context.Background() + go func() { + ictx := irrecoverable.NewMockSignalerContext(t, ctx) + collector.metricsCollectorWorker(ictx, func() {}) + }() + + wg := sync.WaitGroup{} + + wg.Add(16 * 16 * 16) + for height := 0; height < 16; height++ { + // for each height we add multiple blocks. Only one block will be popped per height + for block := 0; block < 16; block++ { + // for each block we add multiple transactions + for transaction := 0; transaction < 16; transaction++ { + go func(h, b, t int) { + defer wg.Done() + + block := flow.Identifier{} + block[0] = byte(h) + block[1] = byte(b) + + collector.Collect( + block, + startHeight+1+uint64(h), + TransactionExecutionMetrics{ + ExecutionTime: time.Duration(b + t), + }, + ) + }(height, block, transaction) + } + // wait a bit for the collector to process the data + <-time.After(1 * time.Millisecond) + } + } + + wg.Wait() + // wait a bit for the collector to process the data + <-time.After(10 * time.Millisecond) + + // there should be no data at the start height + data := collector.Pop(startHeight, flow.ZeroID) + require.Nil(t, data) + + for height := 0; height < 16; height++ { + block := flow.Identifier{} + block[0] = byte(height) + // always pop the first block each height + block[1] = byte(0) + + data := collector.Pop(startHeight+1+uint64(height), block) + + require.Len(t, data, 16) + } + + block := flow.Identifier{} + block[0] = byte(15) + block[1] = byte(1) + // height 16 was already popped so there should be no more data for any blocks + data = collector.Pop(startHeight+16, block) + require.Nil(t, data) + + // there should be no data past the last collected height + data = collector.Pop(startHeight+17, flow.ZeroID) + require.Nil(t, data) +} diff --git a/engine/execution/computation/metrics/provider.go b/engine/execution/computation/metrics/provider.go new file mode 100644 index 00000000000..c23a426141d --- /dev/null +++ b/engine/execution/computation/metrics/provider.go @@ -0,0 +1,124 @@ +package metrics + +import ( + "sync" + + "github.com/rs/zerolog" +) + +// provider is responsible for providing the metrics for the rpc endpoint +// it has a circular buffer of metrics for the last N finalized and executed blocks. +type provider struct { + log zerolog.Logger + + mu sync.RWMutex + + bufferSize uint + bufferIndex uint + blockHeightAtBufferIndex uint64 + + buffer [][]TransactionExecutionMetrics +} + +func newProvider( + log zerolog.Logger, + bufferSize uint, + blockHeightAtBufferIndex uint64, +) *provider { + if bufferSize == 0 { + panic("buffer size must be greater than zero") + } + + return &provider{ + log: log, + bufferSize: bufferSize, + blockHeightAtBufferIndex: blockHeightAtBufferIndex, + bufferIndex: 0, + buffer: make([][]TransactionExecutionMetrics, bufferSize), + } +} + +// Push buffers the metrics for the given height. +// The call should ensure height are called in strictly increasing order, otherwise +// metrics for the skipped height will not buffered. +func (p *provider) Push( + height uint64, + data []TransactionExecutionMetrics, +) { + p.mu.Lock() + defer p.mu.Unlock() + + if height <= p.blockHeightAtBufferIndex { + p.log.Warn(). + Uint64("height", height). + Uint64("blockHeightAtBufferIndex", p.blockHeightAtBufferIndex). + Msg("received metrics for a block that is older or equal than the most recent block") + return + } + if height > p.blockHeightAtBufferIndex+1 { + p.log.Warn(). + Uint64("height", height). + Uint64("blockHeightAtBufferIndex", p.blockHeightAtBufferIndex). + Msg("received metrics for a block that is not the next block") + + // Fill in the gap with nil + for i := p.blockHeightAtBufferIndex; i < height-1; i++ { + p.pushData(nil) + } + } + + p.pushData(data) +} + +func (p *provider) pushData(data []TransactionExecutionMetrics) { + p.bufferIndex = (p.bufferIndex + 1) % p.bufferSize + p.blockHeightAtBufferIndex++ + p.buffer[p.bufferIndex] = data +} + +func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransactionExecutionMetricsAfterResponse, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + data := make(map[uint64][]TransactionExecutionMetrics) + + if height+1 > p.blockHeightAtBufferIndex { + return data, nil + } + + // start index is the lowest block height that is in the buffer + // missing heights are handled below + startHeight := uint64(0) + // assign startHeight with the lowest buffered height + if p.blockHeightAtBufferIndex > uint64(p.bufferSize) { + startHeight = p.blockHeightAtBufferIndex - uint64(p.bufferSize) + } + + // if the starting index is lower than the height we only need to return the data for + // the blocks that are later than the given height + if height+1 > startHeight { + startHeight = height + 1 + } + + for h := startHeight; h <= p.blockHeightAtBufferIndex; h++ { + // 0 <= diff; because of the bufferSize check above + diff := uint(p.blockHeightAtBufferIndex - h) + + // 0 <= diff < bufferSize; because of the bufferSize check above + // we are about to do a modulo operation with p.bufferSize on p.bufferIndex - diff, but diff could + // be larger than p.bufferIndex, which would result in a negative intermediate value. + // To avoid this, we add p.bufferSize to diff, which will guarantee that (p.bufferSize + p.bufferIndex - diff) + // is always positive, but the modulo operation will still return the same index. + intermediateIndex := p.bufferIndex + p.bufferSize - diff + index := intermediateIndex % p.bufferSize + + d := p.buffer[index] + if len(d) == 0 { + continue + } + + data[h] = p.buffer[index] + } + + return data, nil +} diff --git a/engine/execution/computation/metrics/provider_test.go b/engine/execution/computation/metrics/provider_test.go new file mode 100644 index 00000000000..152c9b45326 --- /dev/null +++ b/engine/execution/computation/metrics/provider_test.go @@ -0,0 +1,109 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func Test_ProviderGetOnEmpty(t *testing.T) { + t.Parallel() + + height := uint64(100) + bufferSize := uint(10) + log := zerolog.New(zerolog.NewTestWriter(t)) + + provider := newProvider(log, bufferSize, height) + + for i := 0; uint(i) < bufferSize; i++ { + data, err := provider.GetTransactionExecutionMetricsAfter(height - uint64(i)) + require.NoError(t, err) + require.Len(t, data, 0) + } +} + +func Test_ProviderGetOutOfBounds(t *testing.T) { + t.Parallel() + + height := uint64(100) + bufferSize := uint(10) + log := zerolog.New(zerolog.NewTestWriter(t)) + + provider := newProvider(log, bufferSize, height) + + res, err := provider.GetTransactionExecutionMetricsAfter(height + 1) + require.NoError(t, err) + require.Len(t, res, 0) +} + +func Test_ProviderPushSequential(t *testing.T) { + t.Parallel() + + height := uint64(100) + bufferSize := uint(10) + log := zerolog.New(zerolog.NewTestWriter(t)) + + provider := newProvider(log, bufferSize, height) + + for i := 0; uint(i) < bufferSize; i++ { + data := []TransactionExecutionMetrics{ + { + // Execution time is our label + ExecutionTime: time.Duration(i), + }, + } + + provider.Push(height+uint64(i)+1, data) + } + + data, err := provider.GetTransactionExecutionMetricsAfter(height) + require.Nil(t, err) + for i := 0; uint(i) < bufferSize; i++ { + require.Equal(t, time.Duration(uint(i)), data[height+uint64(i)+1][0].ExecutionTime) + } +} + +func Test_ProviderPushOutOfSequence(t *testing.T) { + t.Parallel() + + height := uint64(100) + bufferSize := uint(10) + log := zerolog.New(zerolog.NewTestWriter(t)) + + provider := newProvider(log, bufferSize, height) + + for i := 0; uint(i) < bufferSize; i++ { + data := []TransactionExecutionMetrics{ + { + ExecutionTime: time.Duration(i), + }, + } + + provider.Push(height+uint64(i)+1, data) + } + + newHeight := height + uint64(bufferSize) + + // Push out of sequence + data := []TransactionExecutionMetrics{ + { + ExecutionTime: time.Duration(newHeight + 2), + }, + } + + // no-op + provider.Push(newHeight, data) + + // skip 1 + provider.Push(newHeight+2, data) + + res, err := provider.GetTransactionExecutionMetricsAfter(height) + require.NoError(t, err) + + require.Len(t, res, int(bufferSize)) + + require.Nil(t, res[newHeight+1]) + require.Equal(t, time.Duration(newHeight+2), res[newHeight+2][0].ExecutionTime) +} diff --git a/engine/execution/computation/metrics/transaction_execution_metrics.go b/engine/execution/computation/metrics/transaction_execution_metrics.go new file mode 100644 index 00000000000..1c082049327 --- /dev/null +++ b/engine/execution/computation/metrics/transaction_execution_metrics.go @@ -0,0 +1,168 @@ +package metrics + +import ( + "time" + + "github.com/onflow/flow-go/engine" + + cadenceCommon "github.com/onflow/cadence/runtime/common" + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/engine/execution/state" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/state/protocol" + psEvents "github.com/onflow/flow-go/state/protocol/events" + "github.com/onflow/flow-go/storage" +) + +type TransactionExecutionMetricsProvider interface { + component.Component + protocol.Consumer + + // GetTransactionExecutionMetricsAfter returns the transaction metrics for all blocks higher than the given height + // It returns a map of block height to a list of transaction execution metrics + // Blocks that are out of scope (only a limited number blocks are kept in memory) are not returned + GetTransactionExecutionMetricsAfter(height uint64) (GetTransactionExecutionMetricsAfterResponse, error) + + // Collect the transaction metrics for the given block + // Collect does not block, it returns immediately + Collect( + blockId flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, + ) +} + +// GetTransactionExecutionMetricsAfterResponse is the response type for GetTransactionExecutionMetricsAfter +// It is a map of block height to a list of transaction execution metrics +type GetTransactionExecutionMetricsAfterResponse = map[uint64][]TransactionExecutionMetrics + +type TransactionExecutionMetrics struct { + TransactionID flow.Identifier + ExecutionTime time.Duration + ExecutionEffortWeights map[cadenceCommon.ComputationKind]uint +} + +type metrics struct { + TransactionExecutionMetrics + blockHeight uint64 + blockId flow.Identifier +} + +// transactionExecutionMetricsProvider is responsible for providing the metrics for the rpc endpoint. +// It has a circular buffer of metrics for the last N finalized and executed blocks. +// The metrics are not guaranteed to be available for all blocks. If the node is just starting up or catching up +// to the latest finalized block, some blocks may not have metrics available. +// The metrics are intended to be used for monitoring and analytics purposes. +type transactionExecutionMetricsProvider struct { + // collector is responsible for collecting the metrics + // the collector collects the metrics from the execution during block execution + // on a finalized and executed block, the metrics are moved to the provider, + // all non-finalized metrics for that height are discarded + *collector + + // provider is responsible for providing the metrics for the rpc endpoint + // it has a circular buffer of metrics for the last N finalized and executed blocks. + *provider + + component.Component + // transactionExecutionMetricsProvider needs to consume BlockFinalized events. + psEvents.Noop + + log zerolog.Logger + + executionState state.FinalizedExecutionState + headers storage.Headers + blockFinalizedNotifier engine.Notifier + + latestFinalizedAndExecutedHeight uint64 +} + +var _ TransactionExecutionMetricsProvider = (*transactionExecutionMetricsProvider)(nil) + +func NewTransactionExecutionMetricsProvider( + log zerolog.Logger, + executionState state.FinalizedExecutionState, + headers storage.Headers, + latestFinalizedAndExecutedHeight uint64, + bufferSize uint, +) TransactionExecutionMetricsProvider { + log = log.With().Str("component", "transaction_execution_metrics_provider").Logger() + + collector := newCollector(log, latestFinalizedAndExecutedHeight) + provider := newProvider(log, bufferSize, latestFinalizedAndExecutedHeight) + + p := &transactionExecutionMetricsProvider{ + collector: collector, + provider: provider, + log: log, + executionState: executionState, + headers: headers, + blockFinalizedNotifier: engine.NewNotifier(), + latestFinalizedAndExecutedHeight: latestFinalizedAndExecutedHeight, + } + + cm := component.NewComponentManagerBuilder() + cm.AddWorker(collector.metricsCollectorWorker) + cm.AddWorker(p.blockFinalizedWorker) + + p.Component = cm.Build() + + return p +} + +func (p *transactionExecutionMetricsProvider) BlockFinalized(*flow.Header) { + p.blockFinalizedNotifier.Notify() +} + +// move data from the collector to the provider +func (p *transactionExecutionMetricsProvider) onBlockExecutedAndFinalized(block flow.Identifier, height uint64) { + data := p.collector.Pop(height, block) + p.provider.Push(height, data) +} + +func (p *transactionExecutionMetricsProvider) blockFinalizedWorker( + ctx irrecoverable.SignalerContext, + ready component.ReadyFunc, +) { + ready() + + for { + select { + case <-ctx.Done(): + return + case <-p.blockFinalizedNotifier.Channel(): + p.onExecutedAndFinalized() + } + } +} + +func (p *transactionExecutionMetricsProvider) onExecutedAndFinalized() { + latestFinalizedAndExecutedHeight, err := p.executionState.GetHighestFinalizedExecuted() + + if err != nil { + p.log.Warn().Err(err).Msg("could not get highest finalized executed") + return + } + + // the latest finalized and executed block could be more than one block further than the last one handled + // step through all blocks between the last one handled and the latest finalized and executed + for height := p.latestFinalizedAndExecutedHeight + 1; height <= latestFinalizedAndExecutedHeight; height++ { + blockID, err := p.headers.BlockIDByHeight(height) + if err != nil { + p.log.Warn(). + Err(err). + Uint64("height", height). + Msg("could not get header by height") + return + } + + p.onBlockExecutedAndFinalized(blockID, height) + + if height == latestFinalizedAndExecutedHeight { + p.latestFinalizedAndExecutedHeight = height + } + } +} diff --git a/engine/execution/rpc/engine.go b/engine/execution/rpc/engine.go index 260495f1bf1..9d7a9f87fc4 100644 --- a/engine/execution/rpc/engine.go +++ b/engine/execution/rpc/engine.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net" + "sort" "strings" "unicode/utf8" @@ -26,6 +27,7 @@ import ( "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" exeEng "github.com/onflow/flow-go/engine/execution" + "github.com/onflow/flow-go/engine/execution/computation/metrics" "github.com/onflow/flow-go/engine/execution/state" fvmerrors "github.com/onflow/flow-go/fvm/errors" "github.com/onflow/flow-go/model/flow" @@ -62,6 +64,7 @@ func New( exeResults storage.ExecutionResults, txResults storage.TransactionResults, commits storage.Commits, + transactionMetrics metrics.TransactionExecutionMetricsProvider, chainID flow.ChainID, signerIndicesDecoder hotstuff.BlockSignerDecoder, apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the gRPC API e.g. Ping->100, ExecuteScriptAtBlockID->300 @@ -105,6 +108,7 @@ func New( exeResults: exeResults, transactionResults: txResults, commits: commits, + transactionMetrics: transactionMetrics, log: log, maxBlockRange: DefaultMaxBlockRange, }, @@ -166,6 +170,7 @@ type handler struct { transactionResults storage.TransactionResults log zerolog.Logger commits storage.Commits + transactionMetrics metrics.TransactionExecutionMetricsProvider maxBlockRange int } @@ -802,6 +807,58 @@ func (h *handler) blockHeaderResponse(header *flow.Header) (*execution.BlockHead }, nil } +// GetTransactionExecutionMetricsAfter gets the execution metrics for a transaction after a given block. +func (h *handler) GetTransactionExecutionMetricsAfter( + _ context.Context, + req *execution.GetTransactionExecutionMetricsAfterRequest, +) (*execution.GetTransactionExecutionMetricsAfterResponse, error) { + height := req.GetBlockHeight() + + metrics, err := h.transactionMetrics.GetTransactionExecutionMetricsAfter(height) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get metrics after block height %v: %v", height, err) + } + + response := &execution.GetTransactionExecutionMetricsAfterResponse{ + Results: make([]*execution.GetTransactionExecutionMetricsAfterResponse_Result, 0, len(metrics)), + } + + for blockHeight, blockMetrics := range metrics { + blockResponse := &execution.GetTransactionExecutionMetricsAfterResponse_Result{ + BlockHeight: blockHeight, + Transactions: make([]*execution.GetTransactionExecutionMetricsAfterResponse_Transaction, len(blockMetrics)), + } + + for i, transactionMetrics := range blockMetrics { + transactionMetricsResponse := &execution.GetTransactionExecutionMetricsAfterResponse_Transaction{ + TransactionId: transactionMetrics.TransactionID[:], + ExecutionTime: uint64(transactionMetrics.ExecutionTime.Nanoseconds()), + ExecutionEffortWeights: make([]*execution.GetTransactionExecutionMetricsAfterResponse_ExecutionEffortWeight, 0, len(transactionMetrics.ExecutionEffortWeights)), + } + + for kind, weight := range transactionMetrics.ExecutionEffortWeights { + transactionMetricsResponse.ExecutionEffortWeights = append( + transactionMetricsResponse.ExecutionEffortWeights, + &execution.GetTransactionExecutionMetricsAfterResponse_ExecutionEffortWeight{ + Kind: uint64(kind), + Weight: uint64(weight), + }, + ) + } + + blockResponse.Transactions[i] = transactionMetricsResponse + } + response.Results = append(response.Results, blockResponse) + } + + // sort the response by block height in descending order + sort.Slice(response.Results, func(i, j int) bool { + return response.Results[i].BlockHeight > response.Results[j].BlockHeight + }) + + return response, nil +} + // additional check that when there is no event in the block, double check if the execution // result has no events as well, otherwise return an error. // we check the execution result has no event by checking if each chunk's EventCollection is diff --git a/go.mod b/go.mod index e557998ca60..d5a140eda9a 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1 github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1 github.com/onflow/flow-go-sdk v1.0.0-preview.55 - github.com/onflow/flow/protobuf/go/flow v0.4.6 + github.com/onflow/flow/protobuf/go/flow v0.4.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index e3cd8146665..ac8e956948f 100644 --- a/go.sum +++ b/go.sum @@ -2194,8 +2194,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc= diff --git a/insecure/go.mod b/insecure/go.mod index 0086182c027..8493273c41d 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -211,7 +211,7 @@ require ( github.com/onflow/flow-go-sdk v1.0.0-preview.55 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.2.1 // indirect github.com/onflow/flow-nft/lib/go/templates v1.2.0 // indirect - github.com/onflow/flow/protobuf/go/flow v0.4.6 // indirect + github.com/onflow/flow/protobuf/go/flow v0.4.7 // indirect github.com/onflow/go-ethereum v1.14.7 // indirect github.com/onflow/sdks v0.6.0-preview.1 // indirect github.com/onflow/wal v1.0.2 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index ab6a5936284..1777ec88fd9 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -2180,8 +2180,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= diff --git a/integration/go.mod b/integration/go.mod index 8723aa74250..ef58e5c61f6 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -28,7 +28,7 @@ require ( github.com/onflow/flow-go v0.37.7-0.20240826193109-e211841b59f5 github.com/onflow/flow-go-sdk v1.0.0-preview.55 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 - github.com/onflow/flow/protobuf/go/flow v0.4.6 + github.com/onflow/flow/protobuf/go/flow v0.4.7 github.com/onflow/go-ethereum v1.14.7 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 diff --git a/integration/go.sum b/integration/go.sum index 17fec9e6715..5b6f4c5ccf9 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -2167,8 +2167,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= diff --git a/integration/tests/execution/transaction_metrics_test.go b/integration/tests/execution/transaction_metrics_test.go new file mode 100644 index 00000000000..2e0ba03df5e --- /dev/null +++ b/integration/tests/execution/transaction_metrics_test.go @@ -0,0 +1,116 @@ +package execution + +import ( + "bytes" + "context" + "testing" + + "github.com/onflow/flow/protobuf/go/flow/execution" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/onflow/flow-go/integration/testnet" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + sdk "github.com/onflow/flow-go-sdk" + + "github.com/onflow/flow-go/integration/tests/lib" +) + +func TestTransactionMetrics(t *testing.T) { + suite.Run(t, new(TransactionMetricsSuite)) +} + +type TransactionMetricsSuite struct { + Suite +} + +func (s *TransactionMetricsSuite) TestTransactionMetrics() { + accessClient := s.AccessClient() + + // wait for next height finalized (potentially first height), called blockA + currentFinalized := s.BlockState.HighestFinalizedHeight() + blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized) + s.T().Logf("got blockA height %v ID %v\n", blockA.Header.Height, blockA.Header.ID()) + + // send transaction + tx, err := accessClient.DeployContract(context.Background(), sdk.Identifier(s.net.Root().ID()), lib.CounterContract) + require.NoError(s.T(), err, "could not deploy counter") + + txres, err := accessClient.WaitForExecuted(context.Background(), tx.ID()) + require.NoError(s.T(), err, "could not wait for tx to be executed") + require.NoError(s.T(), txres.Error) + + client, closeClient := s.getClient() + defer func() { + _ = closeClient() + }() + + result, err := client.GetTransactionExecutionMetricsAfter( + context.Background(), + &execution.GetTransactionExecutionMetricsAfterRequest{ + BlockHeight: 0, + }, + ) + + require.NoError(s.T(), err, "could not get transaction execution metrics") + require.NotNil(s.T(), result.Results) + // there should be at least some results, due to each block having at least 1 transaction + require.Greater(s.T(), len(result.Results), 10) + + latestBlockResult := uint64(0) + for _, result := range result.Results { + if result.BlockHeight > latestBlockResult { + latestBlockResult = result.BlockHeight + } + } + + // send another transaction + tx, err = accessClient.UpdateContract(context.Background(), sdk.Identifier(s.net.Root().ID()), lib.CounterContract) + require.NoError(s.T(), err, "could not deploy counter") + + txres, err = accessClient.WaitForExecuted(context.Background(), tx.ID()) + require.NoError(s.T(), err, "could not wait for tx to be executed") + require.NoError(s.T(), txres.Error) + + result, err = client.GetTransactionExecutionMetricsAfter( + context.Background(), + &execution.GetTransactionExecutionMetricsAfterRequest{ + BlockHeight: latestBlockResult, + }, + ) + + require.NoError(s.T(), err, "could not get transaction execution metrics") + // there could be only 1 block since the last time + require.Greater(s.T(), len(result.Results), 0) + + transactionExists := false + for _, result := range result.Results { + for _, transaction := range result.Transactions { + if bytes.Equal(transaction.TransactionId, tx.ID().Bytes()) { + transactionExists = true + + // check that the transaction metrics are not 0 + require.Greater(s.T(), transaction.ExecutionTime, uint64(0)) + require.Greater(s.T(), len(transaction.ExecutionEffortWeights), 0) + } + } + require.Less(s.T(), latestBlockResult, result.BlockHeight) + + } + require.True(s.T(), transactionExists) +} + +func (s *TransactionMetricsSuite) getClient() (execution.ExecutionAPIClient, func() error) { + + exe1ID := s.net.ContainerByID(s.exe1ID) + addr := exe1ID.Addr(testnet.GRPCPort) + + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(s.T(), err, "could not create execution client") + + grpcClient := execution.NewExecutionAPIClient(conn) + return grpcClient, conn.Close +} diff --git a/module/metrics/execution.go b/module/metrics/execution.go index 7419ec3014a..37d113061b7 100644 --- a/module/metrics/execution.go +++ b/module/metrics/execution.go @@ -849,6 +849,7 @@ func (ec *ExecutionCollector) ExecutionTransactionExecuted( if stats.Failed { ec.totalFailedTransactionsCounter.Inc() } + } // ExecutionChunkDataPackGenerated reports stats on chunk data pack generation @@ -1081,3 +1082,34 @@ func (ec *ExecutionCollector) ExecutionComputationResultUploaded() { func (ec *ExecutionCollector) ExecutionComputationResultUploadRetried() { ec.computationResultUploadRetriedCount.Inc() } + +type ExecutionCollectorWithTransactionCallback struct { + *ExecutionCollector + TransactionCallback func( + dur time.Duration, + stats module.TransactionExecutionResultStats, + info module.TransactionExecutionResultInfo, + ) +} + +func (ec *ExecutionCollector) WithTransactionCallback( + callback func( + time.Duration, + module.TransactionExecutionResultStats, + module.TransactionExecutionResultInfo, + ), +) *ExecutionCollectorWithTransactionCallback { + return &ExecutionCollectorWithTransactionCallback{ + ExecutionCollector: ec, + TransactionCallback: callback, + } +} + +func (ec *ExecutionCollectorWithTransactionCallback) ExecutionTransactionExecuted( + dur time.Duration, + stats module.TransactionExecutionResultStats, + info module.TransactionExecutionResultInfo, +) { + ec.ExecutionCollector.ExecutionTransactionExecuted(dur, stats, info) + ec.TransactionCallback(dur, stats, info) +}