From eab95a6a985815fd69c0cd37794b7bc776a78873 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 3 Jul 2024 17:32:39 +0200 Subject: [PATCH 01/18] Exposed transaction metrics to grpc endpoint --- cmd/execution_builder.go | 40 ++++- .../computation/computer/computer.go | 1 - .../computation/metrics/collector.go | 130 ++++++++++++++ .../computation/metrics/collector_test.go | 82 +++++++++ .../execution/computation/metrics/provider.go | 120 +++++++++++++ .../computation/metrics/provider_test.go | 108 +++++++++++ .../metrics/transaction_execution_metrics.go | 170 ++++++++++++++++++ module/metrics/execution.go | 42 +++++ 8 files changed, 690 insertions(+), 3 deletions(-) create mode 100644 engine/execution/computation/metrics/collector.go create mode 100644 engine/execution/computation/metrics/collector_test.go create mode 100644 engine/execution/computation/metrics/provider.go create mode 100644 engine/execution/computation/metrics/provider_test.go create mode 100644 engine/execution/computation/metrics/transaction_execution_metrics.go diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 6b276c4ac61..d03334b6ada 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -49,6 +49,7 @@ import ( "github.com/onflow/flow-go/engine/execution/checker" "github.com/onflow/flow-go/engine/execution/computation" "github.com/onflow/flow-go/engine/execution/computation/committer" + txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics" "github.com/onflow/flow-go/engine/execution/ingestion" "github.com/onflow/flow-go/engine/execution/ingestion/fetcher" "github.com/onflow/flow-go/engine/execution/ingestion/loader" @@ -127,7 +128,7 @@ type ExecutionNode struct { ingestionUnit *engine.Unit - collector module.ExecutionMetrics + collector *metrics.ExecutionCollector executionState state.ExecutionState followerState protocol.FollowerState committee hotstuff.DynamicCommittee @@ -160,6 +161,7 @@ type ExecutionNode struct { 
executionDataTracker tracker.Storage blobService network.BlobService blobserviceDependable *module.ProxiedReadyDoneAware + metricsProvider txmetrics.TransactionExecutionMetricsProvider } func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() { @@ -228,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() { Component("block data upload manager", exeNode.LoadBlockUploaderManager). Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader). Component("S3 block data uploader", exeNode.LoadS3BlockDataUploader). + Component("transaction execution metrics", exeNode.LoadTransactionExecutionMetrics). Component("provider engine", exeNode.LoadProviderEngine). Component("checker engine", exeNode.LoadCheckerEngine). Component("ingestion engine", exeNode.LoadIngestionEngine). @@ -544,10 +547,23 @@ func (exeNode *ExecutionNode) LoadProviderEngine( vmCtx := fvm.NewContext(opts...) + collector := exeNode.collector.WithTransactionCallback( + func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) { + exeNode.metricsProvider.Collect( + info.BlockID, + info.BlockHeight, + txmetrics.TransactionExecutionMetrics{ + TransactionID: info.TransactionID, + ExecutionTime: dur, + ExecutionEffortWeights: stats.ComputationIntensities, + }) + }) + ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer) manager, err := computation.New( node.Logger, - exeNode.collector, + // todo inject metrics for computation intensities + collector, node.Tracer, node.Me, node.State, @@ -1127,6 +1143,26 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD return exeNode.scriptsEng, nil } +func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics( + node *NodeConfig, +) (module.ReadyDoneAware, error) { + latestFinalizedBlock, err := node.State.Final().Head() + if err != nil { + return nil, fmt.Errorf("could not get latest finalized block: %w", 
err) + } + + metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider( + node.Logger, + exeNode.executionState, + node.Storage.Headers, + latestFinalizedBlock, + 1000, + ) + node.ProtocolEvents.AddConsumer(metricsProvider) + exeNode.metricsProvider = metricsProvider + return metricsProvider, nil +} + func (exeNode *ExecutionNode) LoadConsensusCommittee( node *NodeConfig, ) ( diff --git a/engine/execution/computation/computer/computer.go b/engine/execution/computation/computer/computer.go index 0d73645f6a7..a6bcc074014 100644 --- a/engine/execution/computation/computer/computer.go +++ b/engine/execution/computation/computer/computer.go @@ -255,7 +255,6 @@ func (e *blockComputer) queueTransactionRequests( i == len(collection.Transactions)-1) txnIndex += 1 } - } systemCtx := fvm.NewContextFromParent( diff --git a/engine/execution/computation/metrics/collector.go b/engine/execution/computation/metrics/collector.go new file mode 100644 index 00000000000..b6a022bde5d --- /dev/null +++ b/engine/execution/computation/metrics/collector.go @@ -0,0 +1,130 @@ +package metrics + +import ( + "sync" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" +) + +type collector struct { + log zerolog.Logger + + collection chan metrics + + mu *sync.Mutex + + latestHeight uint64 + blocksAtHeight map[uint64]map[flow.Identifier]struct{} + metrics map[flow.Identifier][]TransactionExecutionMetrics +} + +func newCollector( + log zerolog.Logger, + latestHeight uint64, +) *collector { + return &collector{ + log: log, + latestHeight: latestHeight, + + collection: make(chan metrics, 1000), + mu: &sync.Mutex{}, + blocksAtHeight: make(map[uint64]map[flow.Identifier]struct{}), + metrics: make(map[flow.Identifier][]TransactionExecutionMetrics), + } +} + +// Collect should never block because it's called from the execution +func (c *collector) Collect( + blockId 
flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, +) { + select { + case c.collection <- metrics{ + TransactionExecutionMetrics: t, + blockHeight: blockHeight, + blockId: blockId, + }: + default: + c.log.Warn(). + Uint64("height", blockHeight). + Msg("dropping metrics because the collection channel is full") + } +} + +func (c *collector) metricsCollectorWorker( + ctx irrecoverable.SignalerContext, + ready component.ReadyFunc, +) { + ready() + + for { + select { + case <-ctx.Done(): + return + case m := <-c.collection: + c.collect(m.blockId, m.blockHeight, m.TransactionExecutionMetrics) + } + } +} + +func (c *collector) collect( + blockId flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, +) { + c.mu.Lock() + defer c.mu.Unlock() + + if blockHeight <= c.latestHeight { + c.log.Warn(). + Uint64("height", blockHeight). + Uint64("latestHeight", c.latestHeight). + Msg("received metrics for a block that is older or equal than the most recent block") + return + } + + if _, ok := c.blocksAtHeight[blockHeight]; !ok { + c.blocksAtHeight[blockHeight] = make(map[flow.Identifier]struct{}) + } + c.blocksAtHeight[blockHeight][blockId] = struct{}{} + c.metrics[blockId] = append(c.metrics[blockId], t) +} + +// Pop returns the metrics for the given block at the given height +// and clears all data up to the given height. +func (c *collector) Pop(height uint64, block flow.Identifier) []TransactionExecutionMetrics { + c.mu.Lock() + defer c.mu.Unlock() + + if height <= c.latestHeight && c.latestHeight != 0 { + c.log.Warn(). + Uint64("height", height). + Stringer("block", block). 
+ Msg("requested metrics for a block that is older or equal than the most recent block") + return nil + } + + metrics := c.metrics[block] + + c.advanceTo(height) + + return metrics +} + +// advanceTo moves the latest height to the given height +// all data at lower heights will be deleted +func (c *collector) advanceTo(height uint64) { + for c.latestHeight < height { + c.latestHeight++ + blocks := c.blocksAtHeight[c.latestHeight] + for block := range blocks { + delete(c.metrics, block) + } + delete(c.blocksAtHeight, c.latestHeight) + } +} diff --git a/engine/execution/computation/metrics/collector_test.go b/engine/execution/computation/metrics/collector_test.go new file mode 100644 index 00000000000..3f103c9638a --- /dev/null +++ b/engine/execution/computation/metrics/collector_test.go @@ -0,0 +1,82 @@ +package metrics + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" +) + +func Test_CollectorPopOnEmpty(t *testing.T) { + t.Parallel() + + log := zerolog.New(zerolog.NewTestWriter(t)) + latestHeight := uint64(100) + + collector := newCollector(log, latestHeight) + + data := collector.Pop(latestHeight, flow.ZeroID) + require.Nil(t, data) +} + +func Test_CollectorCollection(t *testing.T) { + log := zerolog.New(zerolog.NewTestWriter(t)) + latestHeight := uint64(100) + + collector := newCollector(log, latestHeight) + + ctx := context.Background() + go func() { + ictx := irrecoverable.NewMockSignalerContext(t, ctx) + collector.metricsCollectorWorker(ictx, func() {}) + }() + + wg := sync.WaitGroup{} + + wg.Add(16 * 16 * 16) + for h := 0; h < 16; h++ { + for b := 0; b < 16; b++ { + for t := 0; t < 16; t++ { + go func(h, b, t int) { + defer wg.Done() + + block := flow.Identifier{} + // 4 different blocks per height + block[0] = byte(h) + block[1] = byte(b) + + collector.Collect(block, latestHeight+1+uint64(h), 
TransactionExecutionMetrics{ + ExecutionTime: time.Duration(b + t), + }) + }(h, b, t) + } + // wait a bit for the collector to process the data + <-time.After(1 * time.Millisecond) + } + } + + wg.Wait() + // wait a bit for the collector to process the data + <-time.After(10 * time.Millisecond) + + for h := 0; h < 16; h++ { + block := flow.Identifier{} + block[0] = byte(h) + + data := collector.Pop(latestHeight+1+uint64(h), block) + + require.Len(t, data, 16) + } + + data := collector.Pop(latestHeight, flow.ZeroID) + require.Nil(t, data) + + data = collector.Pop(latestHeight+17, flow.ZeroID) + require.Nil(t, data) +} diff --git a/engine/execution/computation/metrics/provider.go b/engine/execution/computation/metrics/provider.go new file mode 100644 index 00000000000..57253f18771 --- /dev/null +++ b/engine/execution/computation/metrics/provider.go @@ -0,0 +1,120 @@ +package metrics + +import ( + "fmt" + "sync" + + "github.com/rs/zerolog" +) + +// provider is responsible for providing the metrics for the rpc endpoint +// it has a circular buffer of metrics for the last N finalized and executed blocks. +type provider struct { + log zerolog.Logger + + mu *sync.RWMutex + + bufferSize uint + bufferIndex uint + blockHeightAtBufferIndex uint64 + + buffer [][]TransactionExecutionMetrics +} + +func newProvider( + log zerolog.Logger, + bufferSize uint, + blockHeightAtBufferIndex uint64, +) *provider { + if bufferSize == 0 { + panic("buffer size must be greater than zero") + } + + return &provider{ + mu: &sync.RWMutex{}, + log: log, + bufferSize: bufferSize, + blockHeightAtBufferIndex: blockHeightAtBufferIndex, + bufferIndex: 0, + buffer: make([][]TransactionExecutionMetrics, bufferSize), + } +} + +func (p *provider) Push( + height uint64, + data []TransactionExecutionMetrics, +) { + p.mu.Lock() + defer p.mu.Unlock() + + if height <= p.blockHeightAtBufferIndex { + p.log.Warn(). + Uint64("height", height). + Uint64("latestHeight", p.blockHeightAtBufferIndex). 
+			Msg("received metrics for a block that is older or equal than the most recent block")
+		return
+	}
+	if height > p.blockHeightAtBufferIndex+1 {
+		p.log.Warn().
+			Uint64("height", height).
+			Uint64("latestHeight", p.blockHeightAtBufferIndex).
+			Msg("received metrics for a block that is not the next block")
+
+		// Fill in the gap with nil
+		for i := p.blockHeightAtBufferIndex; i < height-1; i++ {
+			p.pushData(nil)
+		}
+	}
+
+	p.pushData(data)
+}
+
+func (p *provider) pushData(data []TransactionExecutionMetrics) {
+	p.bufferIndex = (p.bufferIndex + 1) % p.bufferSize
+	p.blockHeightAtBufferIndex++
+	p.buffer[p.bufferIndex] = data
+}
+
+// GetTransactionExecutionMetricsAfter returns the buffered metrics of all blocks above height,
+// keyed by block height. Heights without metrics or already evicted from the buffer are omitted.
+func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransactionExecutionMetricsAfterResponse, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	data := make(map[uint64][]TransactionExecutionMetrics)
+
+	if height+1 > p.blockHeightAtBufferIndex {
+		return data, nil
+	}
+
+	// startIndex is the lowest height still in the buffer: it holds at most bufferSize heights up to blockHeightAtBufferIndex
+	startIndex := uint64(0)
+	if p.blockHeightAtBufferIndex >= uint64(p.bufferSize) {
+		startIndex = p.blockHeightAtBufferIndex - uint64(p.bufferSize) + 1
+	}
+
+	// if the starting index is lower than the height we only need to return the data for
+	// the blocks that are later than the given height
+	if height+1 > startIndex {
+		startIndex = height + 1
+	}
+
+	for i := startIndex; i <= p.blockHeightAtBufferIndex; i++ {
+		// 0 <= diff
+		diff := uint(p.blockHeightAtBufferIndex - i)
+
+		// 0 <= diff < bufferSize, so each height maps to its own slot
+		// we add bufferSize to avoid negative values
+		index := (p.bufferIndex + (p.bufferSize - diff)) % p.bufferSize
+		d := p.buffer[index]
+		if len(d) == 0 {
+			continue
+		}
+
+		data[i] = d
+	}
+
+	return data, nil
+}
+
+var NoTransactionExecutionMetricsError = fmt.Errorf("no transaction execution metrics available")
diff --git a/engine/execution/computation/metrics/provider_test.go b/engine/execution/computation/metrics/provider_test.go
new file mode 100644
index 00000000000..931aa42dd8d
--- /dev/null
+++ b/engine/execution/computation/metrics/provider_test.go
@@ -0,0 +1,108 @@
+package metrics
+
+import (
+	"testing"
+	"time"
+
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/require"
+)
+
+func Test_ProviderGetOnEmpty(t *testing.T) {
+	t.Parallel()
+
+	height := uint64(100)
+	bufferSize := uint(10)
+	log := zerolog.New(zerolog.NewTestWriter(t))
+
+	provider := newProvider(log, bufferSize, height)
+
+	for i := 0; uint(i) < bufferSize; i++ {
+		data, err := provider.GetTransactionExecutionMetricsAfter(height - uint64(i))
+		require.NoError(t, err)
+		require.Len(t, data, 0)
+	}
+}
+
+func Test_ProviderGetOutOfBounds(t *testing.T) {
+	t.Parallel()
+
+	height := uint64(100)
+	bufferSize := uint(10)
+	log := zerolog.New(zerolog.NewTestWriter(t))
+
+	provider := newProvider(log, bufferSize, height)
+
+	res, err := provider.GetTransactionExecutionMetricsAfter(height + 1)
+	require.NoError(t, err)
+	require.Len(t, res, 0)
+}
+
+func Test_ProviderPushSequential(t *testing.T) {
+	t.Parallel()
+
+	height := uint64(100)
+	bufferSize := uint(10)
+	log := zerolog.New(zerolog.NewTestWriter(t))
+
+	provider := newProvider(log, bufferSize, height)
+
+	for i := 0; uint(i) < bufferSize; i++ {
+		data := []TransactionExecutionMetrics{
+			{
+				// Execution time is our label
+				ExecutionTime: time.Duration(i),
+			},
+		}
+
+		provider.Push(height+uint64(i)+1, data)
+	}
+
+	data, err := provider.GetTransactionExecutionMetricsAfter(height)
+	require.Nil(t, err)
+	for i := 0; uint(i) < bufferSize; i++ {
+		require.Equal(t, time.Duration(uint(i)), data[height+uint64(i)+1][0].ExecutionTime)
+	}
+}
+
+func Test_ProviderPushOutOfSequence(t *testing.T) {
+	t.Parallel()
+
+	height := uint64(100)
+	bufferSize := uint(10)
+	log := zerolog.New(zerolog.NewTestWriter(t))
+
+	provider := newProvider(log, bufferSize, height)
+
+	for i := 0; uint(i) < bufferSize; i++ {
+		data := []TransactionExecutionMetrics{
+			{
+				ExecutionTime: time.Duration(i),
+			},
+		}
+
+		provider.Push(height+uint64(i)+1, data)
+	}
+
+	newHeight := height + uint64(bufferSize)
+
+	// Push out of sequence
+	data := []TransactionExecutionMetrics{
+		{
+			ExecutionTime: time.Duration(newHeight + 2),
+		},
+	}
+
+	// no-op
+	provider.Push(newHeight, data)
+
+	// skip 1
+	provider.Push(newHeight+2, data)
+
+	res, err := provider.GetTransactionExecutionMetricsAfter(height)
+	require.NoError(t, err)
+	// the buffer now holds heights 103..112 and 111 is a nil gap, so bufferSize-1 blocks have data
+	require.Len(t, res, int(bufferSize)-1)
+	require.NotContains(t, res, height+2)
+	require.Equal(t, time.Duration(newHeight+2), res[newHeight+2][0].ExecutionTime)
+}
diff --git a/engine/execution/computation/metrics/transaction_execution_metrics.go b/engine/execution/computation/metrics/transaction_execution_metrics.go
new file mode 100644
index 00000000000..7e5fe207bad
--- /dev/null
+++ b/engine/execution/computation/metrics/transaction_execution_metrics.go
@@ -0,0 +1,170 @@
+package metrics
+
+import (
+	"time"
+
+	cadenceCommon "github.com/onflow/cadence/runtime/common"
+	"github.com/rs/zerolog"
+
+	"github.com/onflow/flow-go/engine/execution/state"
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module/component"
+	"github.com/onflow/flow-go/module/irrecoverable"
+	"github.com/onflow/flow-go/state/protocol"
+	psEvents "github.com/onflow/flow-go/state/protocol/events"
+	"github.com/onflow/flow-go/storage"
+)
+
+type TransactionExecutionMetricsProvider interface {
+	component.Component
+	protocol.Consumer
+
+	// GetTransactionExecutionMetricsAfter returns the transaction metrics for all blocks higher than the given height
+	// It returns a map of block height to a list of transaction execution metrics
+	// Blocks that are out of scope (only a limited number of blocks are kept in memory) are not returned
+	GetTransactionExecutionMetricsAfter(height uint64) (GetTransactionExecutionMetricsAfterResponse, error)
+
+	// Collect the transaction metrics for the given block
+	// Collect does not block, it returns immediately
+	Collect(
+		blockId
flow.Identifier, + blockHeight uint64, + t TransactionExecutionMetrics, + ) +} + +// GetTransactionExecutionMetricsAfterResponse is the response type for GetTransactionExecutionMetricsAfter +// It is a map of block height to a list of transaction execution metrics +type GetTransactionExecutionMetricsAfterResponse = map[uint64][]TransactionExecutionMetrics + +type TransactionExecutionMetrics struct { + TransactionID flow.Identifier + ExecutionTime time.Duration + ExecutionEffortWeights map[cadenceCommon.ComputationKind]uint +} + +type metrics struct { + TransactionExecutionMetrics + blockHeight uint64 + blockId flow.Identifier +} + +// transactionExecutionMetricsProvider is responsible for providing the metrics for the rpc endpoint. +// It has a circular buffer of metrics for the last N finalized and executed blocks. +// The metrics are not guaranteed to be available for all blocks. If the node is just starting up or catching up +// to the latest finalized block, some blocks may not have metrics available. +// The metrics are intended to be used for monitoring and analytics purposes. +type transactionExecutionMetricsProvider struct { + // collector is responsible for collecting the metrics + // the collector collects the metrics from the execution during block execution + // on a finalized and executed block, the metrics are moved to the provider, + // all non-finalized metrics for that height are discarded + *collector + + // provider is responsible for providing the metrics for the rpc endpoint + // it has a circular buffer of metrics for the last N finalized and executed blocks. + *provider + + component.Component + // transactionExecutionMetricsProvider needs to consume BlockFinalized events. 
+ psEvents.Noop + + log zerolog.Logger + + executionState state.FinalizedExecutionState + headers storage.Headers + blockFinalized chan struct{} + + latestFinalizedAndExecuted *flow.Header +} + +var _ TransactionExecutionMetricsProvider = (*transactionExecutionMetricsProvider)(nil) + +func NewTransactionExecutionMetricsProvider( + log zerolog.Logger, + executionState state.FinalizedExecutionState, + headers storage.Headers, + latestFinalizedBlock *flow.Header, + bufferSize uint, +) TransactionExecutionMetricsProvider { + log = log.With().Str("component", "transaction_execution_metrics_provider").Logger() + + collector := newCollector(log, latestFinalizedBlock.Height) + provider := newProvider(log, bufferSize, latestFinalizedBlock.Height) + + p := &transactionExecutionMetricsProvider{ + collector: collector, + provider: provider, + log: log, + executionState: executionState, + headers: headers, + blockFinalized: make(chan struct{}), + latestFinalizedAndExecuted: latestFinalizedBlock, + } + + cm := component.NewComponentManagerBuilder() + cm.AddWorker(collector.metricsCollectorWorker) + cm.AddWorker(p.blockFinalizedWorker) + + p.Component = cm.Build() + + return p +} + +func (p *transactionExecutionMetricsProvider) BlockFinalized(*flow.Header) { + // only handle a single finalized event at a time + select { + case p.blockFinalized <- struct{}{}: + default: + } +} + +// move data from the collector to the provider +func (p *transactionExecutionMetricsProvider) onBlockExecutedAndFinalized(block flow.Identifier, height uint64) { + data := p.collector.Pop(height, block) + p.provider.Push(height, data) +} + +func (p *transactionExecutionMetricsProvider) blockFinalizedWorker( + ctx irrecoverable.SignalerContext, + ready component.ReadyFunc, +) { + ready() + + for { + select { + case <-ctx.Done(): + return + case <-p.blockFinalized: + p.onExecutedAndFinalized() + } + } +} + +func (p *transactionExecutionMetricsProvider) onExecutedAndFinalized() { + 
latestFinalizedAndExecutedHeight, err := p.executionState.GetHighestFinalizedExecuted() + + if err != nil { + p.log.Warn().Err(err).Msg("could not get highest finalized executed") + return + } + + // the latest finalized and executed block could be more than one block further than the last one handled + // step through all blocks between the last one handled and the latest finalized and executed + for height := p.latestFinalizedAndExecuted.Height + 1; height <= latestFinalizedAndExecutedHeight; height++ { + header, err := p.headers.ByHeight(height) + if err != nil { + p.log.Warn(). + Err(err). + Uint64("height", height). + Msg("could not get header by height") + return + } + + p.onBlockExecutedAndFinalized(header.ID(), height) + + if header.Height == latestFinalizedAndExecutedHeight { + p.latestFinalizedAndExecuted = header + } + } +} diff --git a/module/metrics/execution.go b/module/metrics/execution.go index 7419ec3014a..cf110a0918d 100644 --- a/module/metrics/execution.go +++ b/module/metrics/execution.go @@ -849,6 +849,7 @@ func (ec *ExecutionCollector) ExecutionTransactionExecuted( if stats.Failed { ec.totalFailedTransactionsCounter.Inc() } + } // ExecutionChunkDataPackGenerated reports stats on chunk data pack generation @@ -1081,3 +1082,44 @@ func (ec *ExecutionCollector) ExecutionComputationResultUploaded() { func (ec *ExecutionCollector) ExecutionComputationResultUploadRetried() { ec.computationResultUploadRetriedCount.Inc() } + +type ExecutionCollectorWithTransactionCallback struct { + *ExecutionCollector + TransactionCallback func( + dur time.Duration, + stats module.TransactionExecutionResultStats, + info module.TransactionExecutionResultInfo, + ) +} + +func (ec *ExecutionCollector) WithTransactionCallback( + callback func( + time.Duration, + module.TransactionExecutionResultStats, + module.TransactionExecutionResultInfo, + ), +) *ExecutionCollectorWithTransactionCallback { + // if callback is nil, use a no-op callback + if callback == nil { + callback 
= func( + time.Duration, + module.TransactionExecutionResultStats, + module.TransactionExecutionResultInfo, + ) { + } + } + + return &ExecutionCollectorWithTransactionCallback{ + ExecutionCollector: ec, + TransactionCallback: callback, + } +} + +func (ec *ExecutionCollectorWithTransactionCallback) ExecutionTransactionExecuted( + dur time.Duration, + stats module.TransactionExecutionResultStats, + info module.TransactionExecutionResultInfo, +) { + ec.ExecutionCollector.ExecutionTransactionExecuted(dur, stats, info) + ec.TransactionCallback(dur, stats, info) +} From 5f0dfa90bb4c8b8b05beb48a27e7c83207e87eae Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Tue, 23 Jul 2024 15:34:16 +0200 Subject: [PATCH 02/18] add grpc endpoint for transaction execution metrics --- cmd/execution_builder.go | 1 + engine/access/mock/execution_api_client.go | 37 ++++++++++++++++ engine/access/mock/execution_api_server.go | 30 +++++++++++++ engine/execution/rpc/engine.go | 51 ++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- insecure/go.mod | 2 +- insecure/go.sum | 4 +- integration/go.mod | 2 +- integration/go.sum | 4 +- 10 files changed, 128 insertions(+), 9 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index d03334b6ada..e73ccd4edd5 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -1364,6 +1364,7 @@ func (exeNode *ExecutionNode) LoadGrpcServer( exeNode.results, exeNode.txResults, node.Storage.Commits, + exeNode.metricsProvider, node.RootChainID, signature.NewBlockSignerDecoder(exeNode.committee), exeNode.exeConf.apiRatelimits, diff --git a/engine/access/mock/execution_api_client.go b/engine/access/mock/execution_api_client.go index fd5fc20c718..7aa04d143c7 100644 --- a/engine/access/mock/execution_api_client.go +++ b/engine/access/mock/execution_api_client.go @@ -349,6 +349,43 @@ func (_m *ExecutionAPIClient) GetTransactionErrorMessagesByBlockID(ctx context.C return r0, r1 } +// GetTransactionExecutionMetricsAfter provides a 
mock function with given fields: ctx, in, opts +func (_m *ExecutionAPIClient) GetTransactionExecutionMetricsAfter(ctx context.Context, in *execution.GetTransactionExecutionMetricsAfterRequest, opts ...grpc.CallOption) (*execution.GetTransactionExecutionMetricsAfterResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetTransactionExecutionMetricsAfter") + } + + var r0 *execution.GetTransactionExecutionMetricsAfterResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) (*execution.GetTransactionExecutionMetricsAfterResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) *execution.GetTransactionExecutionMetricsAfterResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*execution.GetTransactionExecutionMetricsAfterResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) 
+ } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTransactionResult provides a mock function with given fields: ctx, in, opts func (_m *ExecutionAPIClient) GetTransactionResult(ctx context.Context, in *execution.GetTransactionResultRequest, opts ...grpc.CallOption) (*execution.GetTransactionResultResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/engine/access/mock/execution_api_server.go b/engine/access/mock/execution_api_server.go index e61517cb617..2c3f1f3b5a7 100644 --- a/engine/access/mock/execution_api_server.go +++ b/engine/access/mock/execution_api_server.go @@ -284,6 +284,36 @@ func (_m *ExecutionAPIServer) GetTransactionErrorMessagesByBlockID(_a0 context.C return r0, r1 } +// GetTransactionExecutionMetricsAfter provides a mock function with given fields: _a0, _a1 +func (_m *ExecutionAPIServer) GetTransactionExecutionMetricsAfter(_a0 context.Context, _a1 *execution.GetTransactionExecutionMetricsAfterRequest) (*execution.GetTransactionExecutionMetricsAfterResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for GetTransactionExecutionMetricsAfter") + } + + var r0 *execution.GetTransactionExecutionMetricsAfterResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) (*execution.GetTransactionExecutionMetricsAfterResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) *execution.GetTransactionExecutionMetricsAfterResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*execution.GetTransactionExecutionMetricsAfterResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *execution.GetTransactionExecutionMetricsAfterRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTransactionResult provides a mock function 
with given fields: _a0, _a1 func (_m *ExecutionAPIServer) GetTransactionResult(_a0 context.Context, _a1 *execution.GetTransactionResultRequest) (*execution.GetTransactionResultResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/engine/execution/rpc/engine.go b/engine/execution/rpc/engine.go index 260495f1bf1..67b0d50febe 100644 --- a/engine/execution/rpc/engine.go +++ b/engine/execution/rpc/engine.go @@ -26,6 +26,7 @@ import ( "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" exeEng "github.com/onflow/flow-go/engine/execution" + "github.com/onflow/flow-go/engine/execution/computation/metrics" "github.com/onflow/flow-go/engine/execution/state" fvmerrors "github.com/onflow/flow-go/fvm/errors" "github.com/onflow/flow-go/model/flow" @@ -62,6 +63,7 @@ func New( exeResults storage.ExecutionResults, txResults storage.TransactionResults, commits storage.Commits, + transactionMetrics metrics.TransactionExecutionMetricsProvider, chainID flow.ChainID, signerIndicesDecoder hotstuff.BlockSignerDecoder, apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the gRPC API e.g. Ping->100, ExecuteScriptAtBlockID->300 @@ -105,6 +107,7 @@ func New( exeResults: exeResults, transactionResults: txResults, commits: commits, + transactionMetrics: transactionMetrics, log: log, maxBlockRange: DefaultMaxBlockRange, }, @@ -166,6 +169,7 @@ type handler struct { transactionResults storage.TransactionResults log zerolog.Logger commits storage.Commits + transactionMetrics metrics.TransactionExecutionMetricsProvider maxBlockRange int } @@ -802,6 +806,53 @@ func (h *handler) blockHeaderResponse(header *flow.Header) (*execution.BlockHead }, nil } +// GetTransactionExecutionMetricsAfter gets the execution metrics for a transaction after a given block. 
+func (h *handler) GetTransactionExecutionMetricsAfter(
+	_ context.Context,
+	req *execution.GetTransactionExecutionMetricsAfterRequest,
+) (*execution.GetTransactionExecutionMetricsAfterResponse, error) {
+	height := req.GetBlockHeight()
+
+	metrics, err := h.transactionMetrics.GetTransactionExecutionMetricsAfter(height)
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to get metrics after block height %v: %v", height, err)
+	}
+
+	response := &execution.GetTransactionExecutionMetricsAfterResponse{
+		Results: make([]*execution.GetTransactionExecutionMetricsAfterResponse_Result, 0, len(metrics)),
+	}
+	// metrics is a map keyed by block height, so the results are appended in no particular order
+	for blockHeight, blockMetrics := range metrics {
+		blockResponse := &execution.GetTransactionExecutionMetricsAfterResponse_Result{
+			BlockHeight:  blockHeight,
+			Transactions: make([]*execution.GetTransactionExecutionMetricsAfterResponse_Transaction, len(blockMetrics)),
+		}
+		// Transactions is allocated at full length above so that it can be filled by index below
+		for i, transactionMetrics := range blockMetrics {
+			transactionMetricsResponse := &execution.GetTransactionExecutionMetricsAfterResponse_Transaction{
+				TransactionId:          transactionMetrics.TransactionID[:],
+				ExecutionTime:          uint64(transactionMetrics.ExecutionTime.Nanoseconds()),
+				ExecutionEffortWeights: make([]*execution.GetTransactionExecutionMetricsAfterResponse_ExecutionEffortWeight, 0, len(transactionMetrics.ExecutionEffortWeights)),
+			}
+
+			for kind, weight := range transactionMetrics.ExecutionEffortWeights {
+				transactionMetricsResponse.ExecutionEffortWeights = append(
+					transactionMetricsResponse.ExecutionEffortWeights,
+					&execution.GetTransactionExecutionMetricsAfterResponse_ExecutionEffortWeight{
+						Kind:   uint64(kind),
+						Weight: uint64(weight),
+					},
+				)
+			}
+
+			blockResponse.Transactions[i] = transactionMetricsResponse
+		}
+		response.Results = append(response.Results, blockResponse)
+	}
+
+	return response, nil
+}
+
 // additional check that when there is no event in the block, double check if the execution
 // result has no events as well, otherwise return an
error. // we check the execution result has no event by checking if each chunk's EventCollection is diff --git a/go.mod b/go.mod index 50230de0cf0..b162fd90e1d 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1 github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1 github.com/onflow/flow-go-sdk v1.0.0-preview.41 - github.com/onflow/flow/protobuf/go/flow v0.4.5 + github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index ecd4ff47c65..50adf0625ab 100644 --- a/go.sum +++ b/go.sum @@ -2190,8 +2190,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= -github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4 h1:IjlMTmEAfaC5gIQoXSICbD9OO16/rdVpgOrkm0BELRw= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc= diff --git a/insecure/go.mod b/insecure/go.mod index 
5343d191829..9254a2c02f7 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -209,7 +209,7 @@ require ( github.com/onflow/flow-go-sdk v1.0.0-preview.41 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.2.1 // indirect github.com/onflow/flow-nft/lib/go/templates v1.2.0 // indirect - github.com/onflow/flow/protobuf/go/flow v0.4.5 // indirect + github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4 // indirect github.com/onflow/go-ethereum v1.14.7 // indirect github.com/onflow/sdks v0.6.0-preview.1 // indirect github.com/onflow/wal v1.0.2 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index 8c233a62bf0..b14d2866edd 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -2178,8 +2178,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= -github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4 h1:IjlMTmEAfaC5gIQoXSICbD9OO16/rdVpgOrkm0BELRw= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= diff --git a/integration/go.mod b/integration/go.mod index e44e89f7816..88632817f2b 100644 
--- a/integration/go.mod +++ b/integration/go.mod @@ -27,7 +27,7 @@ require ( github.com/onflow/flow-go v0.36.2-0.20240717214129-9ea6faeee3e7 github.com/onflow/flow-go-sdk v1.0.0-preview.41 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 - github.com/onflow/flow/protobuf/go/flow v0.4.5 + github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4 github.com/onflow/go-ethereum v1.14.7 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 diff --git a/integration/go.sum b/integration/go.sum index 41c47576042..dbafea7bc50 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -2164,8 +2164,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= -github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4 h1:IjlMTmEAfaC5gIQoXSICbD9OO16/rdVpgOrkm0BELRw= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= From a698e5ccd5e085aeb1fa35334276f9e9c8da9c49 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Tue, 23 Jul 2024 21:12:59 +0200 Subject: [PATCH 03/18] 
add integration test --- cmd/execution_builder.go | 10 +- engine/execution/rpc/engine.go | 2 +- .../execution/transaction_metrics_test.go | 116 ++++++++++++++++++ 3 files changed, 125 insertions(+), 3 deletions(-) create mode 100644 integration/tests/execution/transaction_metrics_test.go diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index e73ccd4edd5..4be0400b894 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -547,6 +547,7 @@ func (exeNode *ExecutionNode) LoadProviderEngine( vmCtx := fvm.NewContext(opts...) + // inject the transaction execution metrics collector := exeNode.collector.WithTransactionCallback( func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) { exeNode.metricsProvider.Collect( @@ -562,7 +563,6 @@ func (exeNode *ExecutionNode) LoadProviderEngine( ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer) manager, err := computation.New( node.Logger, - // todo inject metrics for computation intensities collector, node.Tracer, node.Me, @@ -1151,13 +1151,19 @@ func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics( return nil, fmt.Errorf("could not get latest finalized block: %w", err) } + // buffer size is the number of blocks that are kept in memory by the metrics provider + // If the size is to small the clients might not have the opportunity to get the metrics for all blocks + // If the size is too large the memory usage will increase + bufferSize := uint(200) + metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider( node.Logger, exeNode.executionState, node.Storage.Headers, latestFinalizedBlock, - 1000, + bufferSize, ) + node.ProtocolEvents.AddConsumer(metricsProvider) exeNode.metricsProvider = metricsProvider return metricsProvider, nil diff --git a/engine/execution/rpc/engine.go b/engine/execution/rpc/engine.go index 67b0d50febe..39e1e3b5957 100644 --- a/engine/execution/rpc/engine.go +++ 
b/engine/execution/rpc/engine.go @@ -825,7 +825,7 @@ func (h *handler) GetTransactionExecutionMetricsAfter( for blockHeight, blockMetrics := range metrics { blockResponse := &execution.GetTransactionExecutionMetricsAfterResponse_Result{ BlockHeight: blockHeight, - Transactions: make([]*execution.GetTransactionExecutionMetricsAfterResponse_Transaction, 0, len(blockMetrics)), + Transactions: make([]*execution.GetTransactionExecutionMetricsAfterResponse_Transaction, len(blockMetrics)), } for i, transactionMetrics := range blockMetrics { diff --git a/integration/tests/execution/transaction_metrics_test.go b/integration/tests/execution/transaction_metrics_test.go new file mode 100644 index 00000000000..2e0ba03df5e --- /dev/null +++ b/integration/tests/execution/transaction_metrics_test.go @@ -0,0 +1,116 @@ +package execution + +import ( + "bytes" + "context" + "testing" + + "github.com/onflow/flow/protobuf/go/flow/execution" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/onflow/flow-go/integration/testnet" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + sdk "github.com/onflow/flow-go-sdk" + + "github.com/onflow/flow-go/integration/tests/lib" +) + +func TestTransactionMetrics(t *testing.T) { + suite.Run(t, new(TransactionMetricsSuite)) +} + +type TransactionMetricsSuite struct { + Suite +} + +func (s *TransactionMetricsSuite) TestTransactionMetrics() { + accessClient := s.AccessClient() + + // wait for next height finalized (potentially first height), called blockA + currentFinalized := s.BlockState.HighestFinalizedHeight() + blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized) + s.T().Logf("got blockA height %v ID %v\n", blockA.Header.Height, blockA.Header.ID()) + + // send transaction + tx, err := accessClient.DeployContract(context.Background(), sdk.Identifier(s.net.Root().ID()), lib.CounterContract) + require.NoError(s.T(), err, "could not deploy counter") + + 
txres, err := accessClient.WaitForExecuted(context.Background(), tx.ID()) + require.NoError(s.T(), err, "could not wait for tx to be executed") + require.NoError(s.T(), txres.Error) + + client, closeClient := s.getClient() + defer func() { + _ = closeClient() + }() + + result, err := client.GetTransactionExecutionMetricsAfter( + context.Background(), + &execution.GetTransactionExecutionMetricsAfterRequest{ + BlockHeight: 0, + }, + ) + + require.NoError(s.T(), err, "could not get transaction execution metrics") + require.NotNil(s.T(), result.Results) + // there should be at least some results, due to each block having at least 1 transaction + require.Greater(s.T(), len(result.Results), 10) + + latestBlockResult := uint64(0) + for _, result := range result.Results { + if result.BlockHeight > latestBlockResult { + latestBlockResult = result.BlockHeight + } + } + + // send another transaction + tx, err = accessClient.UpdateContract(context.Background(), sdk.Identifier(s.net.Root().ID()), lib.CounterContract) + require.NoError(s.T(), err, "could not deploy counter") + + txres, err = accessClient.WaitForExecuted(context.Background(), tx.ID()) + require.NoError(s.T(), err, "could not wait for tx to be executed") + require.NoError(s.T(), txres.Error) + + result, err = client.GetTransactionExecutionMetricsAfter( + context.Background(), + &execution.GetTransactionExecutionMetricsAfterRequest{ + BlockHeight: latestBlockResult, + }, + ) + + require.NoError(s.T(), err, "could not get transaction execution metrics") + // there could be only 1 block since the last time + require.Greater(s.T(), len(result.Results), 0) + + transactionExists := false + for _, result := range result.Results { + for _, transaction := range result.Transactions { + if bytes.Equal(transaction.TransactionId, tx.ID().Bytes()) { + transactionExists = true + + // check that the transaction metrics are not 0 + require.Greater(s.T(), transaction.ExecutionTime, uint64(0)) + require.Greater(s.T(), 
len(transaction.ExecutionEffortWeights), 0) + } + } + require.Less(s.T(), latestBlockResult, result.BlockHeight) + + } + require.True(s.T(), transactionExists) +} + +func (s *TransactionMetricsSuite) getClient() (execution.ExecutionAPIClient, func() error) { + + exe1ID := s.net.ContainerByID(s.exe1ID) + addr := exe1ID.Addr(testnet.GRPCPort) + + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(s.T(), err, "could not create execution client") + + grpcClient := execution.NewExecutionAPIClient(conn) + return grpcClient, conn.Close +} From e559f66fe873e5fbf9028a470f444fa5c220aff2 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Fri, 26 Jul 2024 12:26:50 +0200 Subject: [PATCH 04/18] cleanup and add a way to disable tx metric collection --- cmd/execution_builder.go | 28 ++++++----- cmd/execution_config.go | 2 + .../computation/metrics/collector.go | 8 ++-- .../computation/metrics/collector_test.go | 46 +++++++++++++------ .../execution/computation/metrics/provider.go | 24 +++++----- .../metrics/transaction_execution_metrics.go | 18 ++++---- 6 files changed, 73 insertions(+), 53 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 4be0400b894..999524296a9 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -547,18 +547,22 @@ func (exeNode *ExecutionNode) LoadProviderEngine( vmCtx := fvm.NewContext(opts...) 
- // inject the transaction execution metrics - collector := exeNode.collector.WithTransactionCallback( - func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) { - exeNode.metricsProvider.Collect( - info.BlockID, - info.BlockHeight, - txmetrics.TransactionExecutionMetrics{ - TransactionID: info.TransactionID, - ExecutionTime: dur, - ExecutionEffortWeights: stats.ComputationIntensities, - }) - }) + var collector module.ExecutionMetrics + collector = exeNode.collector + if exeNode.exeConf.transactionExecutionMetricsEnabled { + // inject the transaction execution metrics + collector = exeNode.collector.WithTransactionCallback( + func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) { + exeNode.metricsProvider.Collect( + info.BlockID, + info.BlockHeight, + txmetrics.TransactionExecutionMetrics{ + TransactionID: info.TransactionID, + ExecutionTime: dur, + ExecutionEffortWeights: stats.ComputationIntensities, + }) + }) + } ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer) manager, err := computation.New( diff --git a/cmd/execution_config.go b/cmd/execution_config.go index 63f2b893c01..b979e46902b 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -54,6 +54,7 @@ type ExecutionConfig struct { chunkDataPackRequestWorkers uint maxGracefulStopDuration time.Duration importCheckpointWorkerCount int + transactionExecutionMetricsEnabled bool // evm tracing configuration evmTracingEnabled bool @@ -122,6 +123,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.IntVar(&exeConf.blobstoreBurstLimit, "blobstore-burst-limit", 0, "outgoing burst limit for Execution Data blobstore") flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully 
shutdown before crashing") flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap") + flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics") flags.BoolVar(&exeConf.evmTracingEnabled, "evm-tracing-enabled", false, "enable EVM tracing, when set it will generate traces and upload them to the GCP bucket provided by the --evm-traces-gcp-bucket. Warning: this might affect speed of execution") flags.StringVar(&exeConf.evmTracesGCPBucket, "evm-traces-gcp-bucket", "", "define GCP bucket name used for uploading EVM traces, must be used in combination with --evm-tracing-enabled.") diff --git a/engine/execution/computation/metrics/collector.go b/engine/execution/computation/metrics/collector.go index b6a022bde5d..fa726aea464 100644 --- a/engine/execution/computation/metrics/collector.go +++ b/engine/execution/computation/metrics/collector.go @@ -97,19 +97,19 @@ func (c *collector) collect( // Pop returns the metrics for the given block at the given height // and clears all data up to the given height. -func (c *collector) Pop(height uint64, block flow.Identifier) []TransactionExecutionMetrics { +func (c *collector) Pop(height uint64, blockID flow.Identifier) []TransactionExecutionMetrics { c.mu.Lock() defer c.mu.Unlock() if height <= c.latestHeight && c.latestHeight != 0 { c.log.Warn(). Uint64("height", height). - Stringer("block", block). - Msg("requested metrics for a block that is older or equal than the most recent block") + Stringer("blockID", blockID). 
+ Msg("requested metrics for a blockID that is older or equal than the most recent blockID") return nil } - metrics := c.metrics[block] + metrics := c.metrics[blockID] c.advanceTo(height) diff --git a/engine/execution/computation/metrics/collector_test.go b/engine/execution/computation/metrics/collector_test.go index 3f103c9638a..14882c5f1c0 100644 --- a/engine/execution/computation/metrics/collector_test.go +++ b/engine/execution/computation/metrics/collector_test.go @@ -27,9 +27,9 @@ func Test_CollectorPopOnEmpty(t *testing.T) { func Test_CollectorCollection(t *testing.T) { log := zerolog.New(zerolog.NewTestWriter(t)) - latestHeight := uint64(100) + startHeight := uint64(100) - collector := newCollector(log, latestHeight) + collector := newCollector(log, startHeight) ctx := context.Background() go func() { @@ -40,21 +40,26 @@ func Test_CollectorCollection(t *testing.T) { wg := sync.WaitGroup{} wg.Add(16 * 16 * 16) - for h := 0; h < 16; h++ { - for b := 0; b < 16; b++ { - for t := 0; t < 16; t++ { + for height := 0; height < 16; height++ { + // for each height we add multiple blocks. 
Only one block will be popped per height + for block := 0; block < 16; block++ { + // for each block we add multiple transactions + for transaction := 0; transaction < 16; transaction++ { go func(h, b, t int) { defer wg.Done() block := flow.Identifier{} - // 4 different blocks per height block[0] = byte(h) block[1] = byte(b) - collector.Collect(block, latestHeight+1+uint64(h), TransactionExecutionMetrics{ - ExecutionTime: time.Duration(b + t), - }) - }(h, b, t) + collector.Collect( + block, + startHeight+1+uint64(h), + TransactionExecutionMetrics{ + ExecutionTime: time.Duration(b + t), + }, + ) + }(height, block, transaction) } // wait a bit for the collector to process the data <-time.After(1 * time.Millisecond) @@ -65,18 +70,29 @@ func Test_CollectorCollection(t *testing.T) { // wait a bit for the collector to process the data <-time.After(10 * time.Millisecond) - for h := 0; h < 16; h++ { + // there should be no data at the start height + data := collector.Pop(startHeight, flow.ZeroID) + require.Nil(t, data) + + for height := 0; height < 16; height++ { block := flow.Identifier{} - block[0] = byte(h) + block[0] = byte(height) + // always pop the first block each height + block[1] = byte(0) - data := collector.Pop(latestHeight+1+uint64(h), block) + data := collector.Pop(startHeight+1+uint64(height), block) require.Len(t, data, 16) } - data := collector.Pop(latestHeight, flow.ZeroID) + block := flow.Identifier{} + block[0] = byte(15) + block[1] = byte(1) + // height 16 was already popped so there should be no more data for any blocks + data = collector.Pop(startHeight+16, block) require.Nil(t, data) - data = collector.Pop(latestHeight+17, flow.ZeroID) + // there should be no data past the last collected height + data = collector.Pop(startHeight+17, flow.ZeroID) require.Nil(t, data) } diff --git a/engine/execution/computation/metrics/provider.go b/engine/execution/computation/metrics/provider.go index 57253f18771..94feaf29020 100644 --- 
a/engine/execution/computation/metrics/provider.go +++ b/engine/execution/computation/metrics/provider.go @@ -1,7 +1,6 @@ package metrics import ( - "fmt" "sync" "github.com/rs/zerolog" @@ -12,7 +11,7 @@ import ( type provider struct { log zerolog.Logger - mu *sync.RWMutex + mu sync.RWMutex bufferSize uint bufferIndex uint @@ -31,7 +30,6 @@ func newProvider( } return &provider{ - mu: &sync.RWMutex{}, log: log, bufferSize: bufferSize, blockHeightAtBufferIndex: blockHeightAtBufferIndex, @@ -86,10 +84,9 @@ func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransa } // start index is the lowest block height that is in the buffer + // missing heights are handled below startIndex := uint64(0) - if p.blockHeightAtBufferIndex < uint64(p.bufferSize) { - startIndex = 0 - } else { + if p.blockHeightAtBufferIndex > uint64(p.bufferSize) { startIndex = p.blockHeightAtBufferIndex - uint64(p.bufferSize) } @@ -100,12 +97,17 @@ func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransa } for i := startIndex; i <= p.blockHeightAtBufferIndex; i++ { - // 0 <= diff + // 0 <= diff; because of the bufferSize check above diff := uint(p.blockHeightAtBufferIndex - i) - // 0 <= diff < bufferSize - // we add bufferSize to avoid negative values - index := (p.bufferIndex + (p.bufferSize - diff)) % p.bufferSize + // 0 <= diff < bufferSize; because of the bufferSize check above + // we are about to do a modulo operation with p.bufferSize on p.bufferIndex - diff, but diff could + // be larger than p.bufferIndex, which would result in a negative intermediate value. + // To avoid this, we add p.bufferSize to diff, which will guarantee that (p.bufferSize + p.bufferIndex - diff) + // is always positive, but the modulo operation will still return the same index. 
+ intermediateIndex := p.bufferIndex + p.bufferSize - diff + index := intermediateIndex % p.bufferSize + d := p.buffer[index] if len(d) == 0 { continue @@ -116,5 +118,3 @@ func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransa return data, nil } - -var NoTransactionExecutionMetricsError = fmt.Errorf("no transaction execution metrics available") diff --git a/engine/execution/computation/metrics/transaction_execution_metrics.go b/engine/execution/computation/metrics/transaction_execution_metrics.go index 7e5fe207bad..a4500829a62 100644 --- a/engine/execution/computation/metrics/transaction_execution_metrics.go +++ b/engine/execution/computation/metrics/transaction_execution_metrics.go @@ -3,6 +3,8 @@ package metrics import ( "time" + "github.com/onflow/flow-go/engine" + cadenceCommon "github.com/onflow/cadence/runtime/common" "github.com/rs/zerolog" @@ -71,9 +73,9 @@ type transactionExecutionMetricsProvider struct { log zerolog.Logger - executionState state.FinalizedExecutionState - headers storage.Headers - blockFinalized chan struct{} + executionState state.FinalizedExecutionState + headers storage.Headers + blockFinalizedNotifier engine.Notifier latestFinalizedAndExecuted *flow.Header } @@ -98,7 +100,7 @@ func NewTransactionExecutionMetricsProvider( log: log, executionState: executionState, headers: headers, - blockFinalized: make(chan struct{}), + blockFinalizedNotifier: engine.NewNotifier(), latestFinalizedAndExecuted: latestFinalizedBlock, } @@ -112,11 +114,7 @@ func NewTransactionExecutionMetricsProvider( } func (p *transactionExecutionMetricsProvider) BlockFinalized(*flow.Header) { - // only handle a single finalized event at a time - select { - case p.blockFinalized <- struct{}{}: - default: - } + p.blockFinalizedNotifier.Notify() } // move data from the collector to the provider @@ -135,7 +133,7 @@ func (p *transactionExecutionMetricsProvider) blockFinalizedWorker( select { case <-ctx.Done(): return - case <-p.blockFinalized: + 
case <-p.blockFinalizedNotifier.Channel(): p.onExecutedAndFinalized() } } From fcd3e4619a89094b7e302b5c4a6fe8e710400bf4 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 7 Aug 2024 20:11:54 +0200 Subject: [PATCH 05/18] add sorting to transaction metrics response --- engine/execution/rpc/engine.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/engine/execution/rpc/engine.go b/engine/execution/rpc/engine.go index 39e1e3b5957..9d7a9f87fc4 100644 --- a/engine/execution/rpc/engine.go +++ b/engine/execution/rpc/engine.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net" + "sort" "strings" "unicode/utf8" @@ -850,6 +851,11 @@ func (h *handler) GetTransactionExecutionMetricsAfter( response.Results = append(response.Results, blockResponse) } + // sort the response by block height in descending order + sort.Slice(response.Results, func(i, j int) bool { + return response.Results[i].BlockHeight > response.Results[j].BlockHeight + }) + return response, nil } From c31b6b58132f2f1576e20c2513170abb3b78bb04 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 7 Aug 2024 20:32:03 +0200 Subject: [PATCH 06/18] update dependencies --- go.mod | 2 +- go.sum | 4 ++-- insecure/go.mod | 2 +- insecure/go.sum | 4 ++-- integration/go.mod | 2 +- integration/go.sum | 4 ++-- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 77af77773ae..211e596d52b 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1 github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1 github.com/onflow/flow-go-sdk v1.0.0-preview.45 - github.com/onflow/flow/protobuf/go/flow v0.4.5 + github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 1ec823ee51f..dcde49eff08 100644 --- a/go.sum +++ b/go.sum @@ -2190,8 +2190,8 @@ 
github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= -github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae h1:pUq9wUoVF8jSkIiLLPoTnaU+VBOypc+22QKfeoFkK00= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c h1:T0jDCm7k7uqDo26JiiujQ5oryl30itPnlmZQywTu9ng= github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c/go.mod h1:XYnWtulwJvHVOr2B0WVA/UC3dvRgFevjp8Pn9a3E1xo= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= diff --git a/insecure/go.mod b/insecure/go.mod index 49176c4d828..eca925d6a5f 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -210,7 +210,7 @@ require ( github.com/onflow/flow-go-sdk v1.0.0-preview.45 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.2.1 // indirect github.com/onflow/flow-nft/lib/go/templates v1.2.0 // indirect - github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4 // indirect + github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae // indirect github.com/onflow/go-ethereum v1.14.7 // indirect github.com/onflow/sdks v0.6.0-preview.1 // indirect github.com/onflow/wal v1.0.2 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index be84a925211..d33a38d0730 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ 
-2178,8 +2178,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= -github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae h1:pUq9wUoVF8jSkIiLLPoTnaU+VBOypc+22QKfeoFkK00= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c h1:T0jDCm7k7uqDo26JiiujQ5oryl30itPnlmZQywTu9ng= github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c/go.mod h1:XYnWtulwJvHVOr2B0WVA/UC3dvRgFevjp8Pn9a3E1xo= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= diff --git a/integration/go.mod b/integration/go.mod index 26f6c409038..5c1d9ab9af1 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -28,7 +28,7 @@ require ( github.com/onflow/flow-go v0.36.8-0.20240729193633-433a32eeb0cd github.com/onflow/flow-go-sdk v1.0.0-preview.45 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 - github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240722184235-2bd609e53ea4 + github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae github.com/onflow/go-ethereum v1.14.7 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 diff --git a/integration/go.sum b/integration/go.sum index e73a0221eb5..b6b7280c596 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ 
-2164,8 +2164,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= -github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae h1:pUq9wUoVF8jSkIiLLPoTnaU+VBOypc+22QKfeoFkK00= +github.com/onflow/flow/protobuf/go/flow v0.4.6-0.20240807181630-cbebf11253ae/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c h1:T0jDCm7k7uqDo26JiiujQ5oryl30itPnlmZQywTu9ng= github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c/go.mod h1:XYnWtulwJvHVOr2B0WVA/UC3dvRgFevjp8Pn9a3E1xo= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= From e598efd8e6e4b70fc94523c89307c1edd2c28778 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Mon, 12 Aug 2024 18:54:46 +0200 Subject: [PATCH 07/18] address review comments --- cmd/execution_builder.go | 14 +---- cmd/execution_config.go | 62 ++++++++++--------- .../computation/metrics/collector.go | 29 +++++---- .../execution/computation/metrics/provider.go | 4 +- .../computation/metrics/provider_test.go | 1 + module/metrics/execution.go | 10 --- 6 files changed, 52 insertions(+), 68 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 21575602029..007f1c5a99c 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -1150,22 +1150,14 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD func 
(exeNode *ExecutionNode) LoadTransactionExecutionMetrics( node *NodeConfig, ) (module.ReadyDoneAware, error) { - latestFinalizedBlock, err := node.State.Final().Head() - if err != nil { - return nil, fmt.Errorf("could not get latest finalized block: %w", err) - } - - // buffer size is the number of blocks that are kept in memory by the metrics provider - // If the size is to small the clients might not have the opportunity to get the metrics for all blocks - // If the size is too large the memory usage will increase - bufferSize := uint(200) + lastFinalizedHeader := node.LastFinalizedHeader metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider( node.Logger, exeNode.executionState, node.Storage.Headers, - latestFinalizedBlock, - bufferSize, + lastFinalizedHeader, + exeNode.exeConf.transactionExecutionMetricsBufferSize, ) node.ProtocolEvents.AddConsumer(metricsProvider) diff --git a/cmd/execution_config.go b/cmd/execution_config.go index b979e46902b..707028343ed 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -25,36 +25,37 @@ import ( // ExecutionConfig contains the configs for starting up execution nodes type ExecutionConfig struct { - rpcConf rpc.Config - triedir string - executionDataDir string - registerDir string - mTrieCacheSize uint32 - transactionResultsCacheSize uint - checkpointDistance uint - checkpointsToKeep uint - chunkDataPackDir string - chunkDataPackCacheSize uint - chunkDataPackRequestsCacheSize uint32 - requestInterval time.Duration - extensiveLog bool - pauseExecution bool - chunkDataPackQueryTimeout time.Duration - chunkDataPackDeliveryTimeout time.Duration - enableBlockDataUpload bool - gcpBucketName string - s3BucketName string - apiRatelimits map[string]int - apiBurstlimits map[string]int - executionDataAllowedPeers string - executionDataPrunerHeightRangeTarget uint64 - executionDataPrunerThreshold uint64 - blobstoreRateLimit int - blobstoreBurstLimit int - chunkDataPackRequestWorkers uint - 
maxGracefulStopDuration time.Duration - importCheckpointWorkerCount int - transactionExecutionMetricsEnabled bool + rpcConf rpc.Config + triedir string + executionDataDir string + registerDir string + mTrieCacheSize uint32 + transactionResultsCacheSize uint + checkpointDistance uint + checkpointsToKeep uint + chunkDataPackDir string + chunkDataPackCacheSize uint + chunkDataPackRequestsCacheSize uint32 + requestInterval time.Duration + extensiveLog bool + pauseExecution bool + chunkDataPackQueryTimeout time.Duration + chunkDataPackDeliveryTimeout time.Duration + enableBlockDataUpload bool + gcpBucketName string + s3BucketName string + apiRatelimits map[string]int + apiBurstlimits map[string]int + executionDataAllowedPeers string + executionDataPrunerHeightRangeTarget uint64 + executionDataPrunerThreshold uint64 + blobstoreRateLimit int + blobstoreBurstLimit int + chunkDataPackRequestWorkers uint + maxGracefulStopDuration time.Duration + importCheckpointWorkerCount int + transactionExecutionMetricsEnabled bool + transactionExecutionMetricsBufferSize uint // evm tracing configuration evmTracingEnabled bool @@ -124,6 +125,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing") flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap") flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics") + flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. 
The buffer size is the number of blocks that are kept in memory by the metrics provider engine") flags.BoolVar(&exeConf.evmTracingEnabled, "evm-tracing-enabled", false, "enable EVM tracing, when set it will generate traces and upload them to the GCP bucket provided by the --evm-traces-gcp-bucket. Warning: this might affect speed of execution") flags.StringVar(&exeConf.evmTracesGCPBucket, "evm-traces-gcp-bucket", "", "define GCP bucket name used for uploading EVM traces, must be used in combination with --evm-tracing-enabled.") diff --git a/engine/execution/computation/metrics/collector.go b/engine/execution/computation/metrics/collector.go index fa726aea464..5d4b6b6c0cd 100644 --- a/engine/execution/computation/metrics/collector.go +++ b/engine/execution/computation/metrics/collector.go @@ -15,23 +15,22 @@ type collector struct { collection chan metrics - mu *sync.Mutex + mu sync.Mutex - latestHeight uint64 - blocksAtHeight map[uint64]map[flow.Identifier]struct{} - metrics map[flow.Identifier][]TransactionExecutionMetrics + lowestAvailableHeight uint64 + blocksAtHeight map[uint64]map[flow.Identifier]struct{} + metrics map[flow.Identifier][]TransactionExecutionMetrics } func newCollector( log zerolog.Logger, - latestHeight uint64, + lowestAvailableHeight uint64, ) *collector { return &collector{ - log: log, - latestHeight: latestHeight, + log: log, + lowestAvailableHeight: lowestAvailableHeight, collection: make(chan metrics, 1000), - mu: &sync.Mutex{}, blocksAtHeight: make(map[uint64]map[flow.Identifier]struct{}), metrics: make(map[flow.Identifier][]TransactionExecutionMetrics), } @@ -80,10 +79,10 @@ func (c *collector) collect( c.mu.Lock() defer c.mu.Unlock() - if blockHeight <= c.latestHeight { + if blockHeight <= c.lowestAvailableHeight { c.log.Warn(). Uint64("height", blockHeight). - Uint64("latestHeight", c.latestHeight). + Uint64("lowestAvailableHeight", c.lowestAvailableHeight). 
Msg("received metrics for a block that is older or equal than the most recent block") return } @@ -101,7 +100,7 @@ func (c *collector) Pop(height uint64, blockID flow.Identifier) []TransactionExe c.mu.Lock() defer c.mu.Unlock() - if height <= c.latestHeight && c.latestHeight != 0 { + if height <= c.lowestAvailableHeight && c.lowestAvailableHeight != 0 { c.log.Warn(). Uint64("height", height). Stringer("blockID", blockID). @@ -119,12 +118,12 @@ func (c *collector) Pop(height uint64, blockID flow.Identifier) []TransactionExe // advanceTo moves the latest height to the given height // all data at lower heights will be deleted func (c *collector) advanceTo(height uint64) { - for c.latestHeight < height { - c.latestHeight++ - blocks := c.blocksAtHeight[c.latestHeight] + for c.lowestAvailableHeight < height { + c.lowestAvailableHeight++ + blocks := c.blocksAtHeight[c.lowestAvailableHeight] for block := range blocks { delete(c.metrics, block) } - delete(c.blocksAtHeight, c.latestHeight) + delete(c.blocksAtHeight, c.lowestAvailableHeight) } } diff --git a/engine/execution/computation/metrics/provider.go b/engine/execution/computation/metrics/provider.go index 94feaf29020..27aabe0376c 100644 --- a/engine/execution/computation/metrics/provider.go +++ b/engine/execution/computation/metrics/provider.go @@ -48,14 +48,14 @@ func (p *provider) Push( if height <= p.blockHeightAtBufferIndex { p.log.Warn(). Uint64("height", height). - Uint64("latestHeight", p.blockHeightAtBufferIndex). + Uint64("lowestAvailableHeight", p.blockHeightAtBufferIndex). Msg("received metrics for a block that is older or equal than the most recent block") return } if height > p.blockHeightAtBufferIndex+1 { p.log.Warn(). Uint64("height", height). - Uint64("latestHeight", p.blockHeightAtBufferIndex). + Uint64("lowestAvailableHeight", p.blockHeightAtBufferIndex). 
Msg("received metrics for a block that is not the next block") // Fill in the gap with nil diff --git a/engine/execution/computation/metrics/provider_test.go b/engine/execution/computation/metrics/provider_test.go index 931aa42dd8d..152c9b45326 100644 --- a/engine/execution/computation/metrics/provider_test.go +++ b/engine/execution/computation/metrics/provider_test.go @@ -104,5 +104,6 @@ func Test_ProviderPushOutOfSequence(t *testing.T) { require.Len(t, res, int(bufferSize)) + require.Nil(t, res[newHeight+1]) require.Equal(t, time.Duration(newHeight+2), res[newHeight+2][0].ExecutionTime) } diff --git a/module/metrics/execution.go b/module/metrics/execution.go index cf110a0918d..37d113061b7 100644 --- a/module/metrics/execution.go +++ b/module/metrics/execution.go @@ -1099,16 +1099,6 @@ func (ec *ExecutionCollector) WithTransactionCallback( module.TransactionExecutionResultInfo, ), ) *ExecutionCollectorWithTransactionCallback { - // if callback is nil, use a no-op callback - if callback == nil { - callback = func( - time.Duration, - module.TransactionExecutionResultStats, - module.TransactionExecutionResultInfo, - ) { - } - } - return &ExecutionCollectorWithTransactionCallback{ ExecutionCollector: ec, TransactionCallback: callback, From 1f94baf04722dd9acd763520ce0b2e98888ebcc6 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Mon, 26 Aug 2024 17:45:02 +0200 Subject: [PATCH 08/18] address review comments --- cmd/execution_builder.go | 2 +- .../computation/metrics/collector.go | 15 +++++---- .../execution/computation/metrics/provider.go | 19 +++++------ .../metrics/transaction_execution_metrics.go | 32 +++++++++---------- 4 files changed, 35 insertions(+), 33 deletions(-) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 007f1c5a99c..297471b5536 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -1156,7 +1156,7 @@ func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics( node.Logger, exeNode.executionState, 
node.Storage.Headers, - lastFinalizedHeader, + lastFinalizedHeader.Height, exeNode.exeConf.transactionExecutionMetricsBufferSize, ) diff --git a/engine/execution/computation/metrics/collector.go b/engine/execution/computation/metrics/collector.go index 5d4b6b6c0cd..8f3438d4658 100644 --- a/engine/execution/computation/metrics/collector.go +++ b/engine/execution/computation/metrics/collector.go @@ -94,21 +94,22 @@ func (c *collector) collect( c.metrics[blockId] = append(c.metrics[blockId], t) } -// Pop returns the metrics for the given block at the given height +// Pop returns the metrics for the given finalized block at the given height // and clears all data up to the given height. -func (c *collector) Pop(height uint64, blockID flow.Identifier) []TransactionExecutionMetrics { +func (c *collector) Pop(height uint64, finalizedBlockId flow.Identifier) []TransactionExecutionMetrics { c.mu.Lock() defer c.mu.Unlock() - if height <= c.lowestAvailableHeight && c.lowestAvailableHeight != 0 { + if height <= c.lowestAvailableHeight { c.log.Warn(). Uint64("height", height). - Stringer("blockID", blockID). - Msg("requested metrics for a blockID that is older or equal than the most recent blockID") + Stringer("finalizedBlockId", finalizedBlockId). 
+ Msg("requested metrics for a finalizedBlockId that is older or equal than the most recent finalizedBlockId") return nil } - metrics := c.metrics[blockID] + // only return metrics for finalized block + metrics := c.metrics[finalizedBlockId] c.advanceTo(height) @@ -119,11 +120,11 @@ func (c *collector) Pop(height uint64, blockID flow.Identifier) []TransactionExe // all data at lower heights will be deleted func (c *collector) advanceTo(height uint64) { for c.lowestAvailableHeight < height { - c.lowestAvailableHeight++ blocks := c.blocksAtHeight[c.lowestAvailableHeight] for block := range blocks { delete(c.metrics, block) } delete(c.blocksAtHeight, c.lowestAvailableHeight) + c.lowestAvailableHeight++ } } diff --git a/engine/execution/computation/metrics/provider.go b/engine/execution/computation/metrics/provider.go index 27aabe0376c..39b483fc531 100644 --- a/engine/execution/computation/metrics/provider.go +++ b/engine/execution/computation/metrics/provider.go @@ -48,14 +48,14 @@ func (p *provider) Push( if height <= p.blockHeightAtBufferIndex { p.log.Warn(). Uint64("height", height). - Uint64("lowestAvailableHeight", p.blockHeightAtBufferIndex). + Uint64("blockHeightAtBufferIndex", p.blockHeightAtBufferIndex). Msg("received metrics for a block that is older or equal than the most recent block") return } if height > p.blockHeightAtBufferIndex+1 { p.log.Warn(). Uint64("height", height). - Uint64("lowestAvailableHeight", p.blockHeightAtBufferIndex). + Uint64("blockHeightAtBufferIndex", p.blockHeightAtBufferIndex). 
Msg("received metrics for a block that is not the next block") // Fill in the gap with nil @@ -85,20 +85,21 @@ func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransa // start index is the lowest block height that is in the buffer // missing heights are handled below - startIndex := uint64(0) + startHeight := uint64(0) + // assign startHeight with the lowest buffered height if p.blockHeightAtBufferIndex > uint64(p.bufferSize) { - startIndex = p.blockHeightAtBufferIndex - uint64(p.bufferSize) + startHeight = p.blockHeightAtBufferIndex - uint64(p.bufferSize) } // if the starting index is lower than the height we only need to return the data for // the blocks that are later than the given height - if height+1 > startIndex { - startIndex = height + 1 + if height+1 > startHeight { + startHeight = height + 1 } - for i := startIndex; i <= p.blockHeightAtBufferIndex; i++ { + for h := startHeight; h <= p.blockHeightAtBufferIndex; h++ { // 0 <= diff; because of the bufferSize check above - diff := uint(p.blockHeightAtBufferIndex - i) + diff := uint(p.blockHeightAtBufferIndex - h) // 0 <= diff < bufferSize; because of the bufferSize check above // we are about to do a modulo operation with p.bufferSize on p.bufferIndex - diff, but diff could @@ -113,7 +114,7 @@ func (p *provider) GetTransactionExecutionMetricsAfter(height uint64) (GetTransa continue } - data[i] = p.buffer[index] + data[h] = p.buffer[index] } return data, nil diff --git a/engine/execution/computation/metrics/transaction_execution_metrics.go b/engine/execution/computation/metrics/transaction_execution_metrics.go index a4500829a62..1c082049327 100644 --- a/engine/execution/computation/metrics/transaction_execution_metrics.go +++ b/engine/execution/computation/metrics/transaction_execution_metrics.go @@ -77,7 +77,7 @@ type transactionExecutionMetricsProvider struct { headers storage.Headers blockFinalizedNotifier engine.Notifier - latestFinalizedAndExecuted *flow.Header + 
latestFinalizedAndExecutedHeight uint64 } var _ TransactionExecutionMetricsProvider = (*transactionExecutionMetricsProvider)(nil) @@ -86,22 +86,22 @@ func NewTransactionExecutionMetricsProvider( log zerolog.Logger, executionState state.FinalizedExecutionState, headers storage.Headers, - latestFinalizedBlock *flow.Header, + latestFinalizedAndExecutedHeight uint64, bufferSize uint, ) TransactionExecutionMetricsProvider { log = log.With().Str("component", "transaction_execution_metrics_provider").Logger() - collector := newCollector(log, latestFinalizedBlock.Height) - provider := newProvider(log, bufferSize, latestFinalizedBlock.Height) + collector := newCollector(log, latestFinalizedAndExecutedHeight) + provider := newProvider(log, bufferSize, latestFinalizedAndExecutedHeight) p := &transactionExecutionMetricsProvider{ - collector: collector, - provider: provider, - log: log, - executionState: executionState, - headers: headers, - blockFinalizedNotifier: engine.NewNotifier(), - latestFinalizedAndExecuted: latestFinalizedBlock, + collector: collector, + provider: provider, + log: log, + executionState: executionState, + headers: headers, + blockFinalizedNotifier: engine.NewNotifier(), + latestFinalizedAndExecutedHeight: latestFinalizedAndExecutedHeight, } cm := component.NewComponentManagerBuilder() @@ -149,8 +149,8 @@ func (p *transactionExecutionMetricsProvider) onExecutedAndFinalized() { // the latest finalized and executed block could be more than one block further than the last one handled // step through all blocks between the last one handled and the latest finalized and executed - for height := p.latestFinalizedAndExecuted.Height + 1; height <= latestFinalizedAndExecutedHeight; height++ { - header, err := p.headers.ByHeight(height) + for height := p.latestFinalizedAndExecutedHeight + 1; height <= latestFinalizedAndExecutedHeight; height++ { + blockID, err := p.headers.BlockIDByHeight(height) if err != nil { p.log.Warn(). Err(err). 
@@ -159,10 +159,10 @@ func (p *transactionExecutionMetricsProvider) onExecutedAndFinalized() { return } - p.onBlockExecutedAndFinalized(header.ID(), height) + p.onBlockExecutedAndFinalized(blockID, height) - if header.Height == latestFinalizedAndExecutedHeight { - p.latestFinalizedAndExecuted = header + if height == latestFinalizedAndExecutedHeight { + p.latestFinalizedAndExecutedHeight = height } } } From 0b7bdcf11c6b5dd01b889f40205767ef8b0edf86 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 2 Sep 2024 16:29:52 -0500 Subject: [PATCH 09/18] Support EVM registers in check-storage --- cmd/util/cmd/check-storage/cmd.go | 45 +- .../evm_account_storage_health.go | 498 ++++++++++++++++++ .../evm_account_storage_health_test.go | 153 ++++++ 3 files changed, 674 insertions(+), 22 deletions(-) create mode 100644 cmd/util/cmd/check-storage/evm_account_storage_health.go create mode 100644 cmd/util/cmd/check-storage/evm_account_storage_health_test.go diff --git a/cmd/util/cmd/check-storage/cmd.go b/cmd/util/cmd/check-storage/cmd.go index 5727d7a51d5..94d739d2768 100644 --- a/cmd/util/cmd/check-storage/cmd.go +++ b/cmd/util/cmd/check-storage/cmd.go @@ -2,7 +2,6 @@ package check_storage import ( "context" - "slices" "github.com/rs/zerolog/log" "github.com/spf13/cobra" @@ -14,6 +13,7 @@ import ( "github.com/onflow/flow-go/cmd/util/ledger/reporters" "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/cmd/util/ledger/util/registers" + "github.com/onflow/flow-go/fvm/evm/emulator/state" "github.com/onflow/flow-go/fvm/systemcontracts" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" @@ -29,6 +29,14 @@ var ( flagNWorker int ) +var ( + evmAccount flow.Address + evmStorageIDKeys = []string{ + state.AccountsStorageIDKey, + state.CodesStorageIDKey, + } +) + var Cmd = &cobra.Command{ Use: "check-storage", Short: "Check storage health", @@ -102,13 +110,8 @@ func run(*cobra.Command, []string) 
{ log.Fatal().Msg("--state-commitment must be provided when --state is provided") } - // For now, skip EVM storage account since a different decoder is needed for decoding EVM registers. - - systemContracts := systemcontracts.SystemContractsForChain(chainID) - - acctsToSkip := []string{ - flow.AddressToRegisterOwner(systemContracts.EVMStorage.Address), - } + // Get EVM account by chain + evmAccount = systemcontracts.SystemContractsForChain(chainID).EVMStorage.Address // Create report in JSONL format rw := reporters.NewReportFileWriterFactoryWithFormat(flagOutputDirectory, log.Logger, reporters.ReportFormatJSONL). @@ -161,13 +164,13 @@ func run(*cobra.Command, []string) { len(payloads), ) - failedAccountAddresses, err := checkStorageHealth(registersByAccount, flagNWorker, rw, acctsToSkip) + failedAccountAddresses, err := checkStorageHealth(registersByAccount, flagNWorker, rw) if err != nil { log.Fatal().Err(err).Msg("failed to check storage health") } if len(failedAccountAddresses) == 0 { - log.Info().Msgf("All %d accounts are health", accountCount) + log.Info().Msgf("All %d accounts are healthy", accountCount) return } @@ -188,7 +191,6 @@ func checkStorageHealth( registersByAccount *registers.ByAccount, nWorkers int, rw reporters.ReportWriter, - acctsToSkip []string, ) (failedAccountAddresses []string, err error) { accountCount := registersByAccount.AccountCount() @@ -211,10 +213,6 @@ func checkStorageHealth( func(accountRegisters *registers.AccountRegisters) error { defer logAccount(1) - if slices.Contains(acctsToSkip, accountRegisters.Owner()) { - return nil - } - accountStorageIssues := checkAccountStorageHealth(accountRegisters, nWorkers) if len(accountStorageIssues) > 0 { @@ -281,9 +279,6 @@ func checkStorageHealth( err = registersByAccount.ForEachAccount( func(accountRegisters *registers.AccountRegisters) error { - if slices.Contains(acctsToSkip, accountRegisters.Owner()) { - return nil - } jobs <- job{accountRegisters: accountRegisters} return nil }) @@ 
-318,6 +313,10 @@ func checkAccountStorageHealth(accountRegisters *registers.AccountRegisters, nWo }} } + if isEVMAccount(address) { + return checkEVMAccountStorageHealth(address, accountRegisters) + } + var issues []accountStorageIssue // Check atree storage health @@ -331,7 +330,7 @@ func checkAccountStorageHealth(accountRegisters *registers.AccountRegisters, nWo issues, accountStorageIssue{ Address: address.Hex(), - Kind: storageErrorKindString[atreeStorageErrorKind], + Kind: storageErrorKindString[cadenceAtreeStorageErrorKind], Msg: err.Error(), }) } @@ -345,12 +344,14 @@ type storageErrorKind int const ( otherErrorKind storageErrorKind = iota - atreeStorageErrorKind + cadenceAtreeStorageErrorKind + evmAtreeStorageErrorKind ) var storageErrorKindString = map[storageErrorKind]string{ - otherErrorKind: "error_check_storage_failed", - atreeStorageErrorKind: "error_atree_storage", + otherErrorKind: "error_check_storage_failed", + cadenceAtreeStorageErrorKind: "error_cadence_atree_storage", + evmAtreeStorageErrorKind: "error_evm_atree_storage", } type accountStorageIssue struct { diff --git a/cmd/util/cmd/check-storage/evm_account_storage_health.go b/cmd/util/cmd/check-storage/evm_account_storage_health.go new file mode 100644 index 00000000000..bc5608e93cc --- /dev/null +++ b/cmd/util/cmd/check-storage/evm_account_storage_health.go @@ -0,0 +1,498 @@ +package check_storage + +import ( + "bytes" + "cmp" + "fmt" + "slices" + + "golang.org/x/exp/maps" + + "github.com/onflow/atree" + + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/cmd/util/ledger/util/registers" + "github.com/onflow/flow-go/fvm/evm/emulator/state" + "github.com/onflow/flow-go/model/flow" +) + +var ( + compareSlabID = func(a, b atree.SlabID) int { + return a.Compare(b) + } + + equalSlabID = func(a, b atree.SlabID) bool { + return a.Compare(b) == 0 + } +) + +// checkEVMAccountStorageHealth 
checks storage health of cadence-atree +// registers and evm-atree registers in evm account. +func checkEVMAccountStorageHealth( + address common.Address, + accountRegisters *registers.AccountRegisters, +) []accountStorageIssue { + var issues []accountStorageIssue + + ledger := NewReadOnlyLedgerWithAtreeRegisterReadSet(accountRegisters) + + // Check health of cadence-atree registers. + issues = append( + issues, + checkCadenceAtreeRegistersInEVMAccount(address, ledger)..., + ) + + // Check health of evm-atree registers. + issues = append( + issues, + checkEVMAtreeRegistersInEVMAccount(address, ledger)..., + ) + + // Check unreferenced atree registers. + // If any atree registers are not accessed during health check of + // cadence-atree and evm-atree registers, these atree registers are + // unreferenced. + issues = append( + issues, + checkUnreferencedAtreeRegisters(address, ledger, accountRegisters)..., + ) + + return issues +} + +// checkCadenceAtreeRegistersInEVMAccount checks health of cadence-atree registers. +func checkCadenceAtreeRegistersInEVMAccount( + address common.Address, + ledger atree.Ledger, +) []accountStorageIssue { + var issues []accountStorageIssue + + storage := runtime.NewStorage(ledger, nil) + + // Load Cadence domains storage map, so atree slab iterator can traverse connected slabs from loaded root slab. + // NOTE: don't preload all atree slabs in evm account because evm-atree registers require evm-atree decoder. + + for _, domain := range util.StorageMapDomains { + _ = storage.GetStorageMap(address, domain, false) + } + + err := storage.CheckHealth() + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[cadenceAtreeStorageErrorKind], + Msg: fmt.Sprintf("cadence-atree registers health check failed in evm account: %s", err), + }) + } + + return issues +} + +// checkEVMAtreeRegistersInEVMAccount checks health of evm-atree registers. 
+func checkEVMAtreeRegistersInEVMAccount( + address common.Address, + ledger atree.Ledger, +) []accountStorageIssue { + var issues []accountStorageIssue + + baseStorage := atree.NewLedgerBaseStorage(ledger) + + storage, err := state.NewPersistentSlabStorage(baseStorage) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to create atree.PersistentSlabStorage for evm registers: %s", err), + }) + return issues + } + + domainSlabIDs := make(map[string]atree.SlabID) + + // Load evm domain root slabs. + for _, domain := range evmStorageIDKeys { + rawDomainSlabID, err := ledger.GetValue(address[:], []byte(domain)) + if err != nil { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to get evm domain %s raw slab ID: %s", domain, err), + }) + continue + } + + if len(rawDomainSlabID) == 0 { + continue + } + + domainSlabID, err := atree.NewSlabIDFromRawBytes(rawDomainSlabID) + if err != nil { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to convert evm domain %s raw slab ID %x to atree slab ID: %s", domain, rawDomainSlabID, err), + }) + continue + } + + // Retrieve evm domain storage register so slab iterator can traverse connected slabs from root slab. 
+ + _, found, err := storage.Retrieve(domainSlabID) + if err != nil { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to retrieve evm domain %s root slab %s: %s", domain, domainSlabID, err), + }) + continue + } + if !found { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to find evm domain %s root slab %s", domain, domainSlabID), + }) + continue + } + + domainSlabIDs[domain] = domainSlabID + } + + if len(domainSlabIDs) == 0 { + return issues + } + + // Get evm storage slot slab IDs. + storageSlotSlabIDs, storageSlotIssues := getStorageSlotRootSlabIDs( + address, + domainSlabIDs[state.AccountsStorageIDKey], + storage) + + issues = append(issues, storageSlotIssues...) + + // Load evm storage slot slabs so slab iterator can traverse connected slabs in storage health check. + for _, id := range storageSlotSlabIDs { + _, found, err := storage.Retrieve(id) + if err != nil { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to retrieve evm storage slot %s: %s", id, err), + }) + } + if !found { + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to find evm storage slot %s", id), + }) + } + } + + // Expected root slabs include domain root slabs and storage slot root slabs. + + expectedRootSlabIDs := make([]atree.SlabID, 0, len(domainSlabIDs)+len(storageSlotSlabIDs)) + expectedRootSlabIDs = append(expectedRootSlabIDs, maps.Values(domainSlabIDs)...) + expectedRootSlabIDs = append(expectedRootSlabIDs, storageSlotSlabIDs...) 
+ + issues = append( + issues, + // Check storage health of evm-atree registers + checkHealthWithExpectedRootSlabIDs(address, storage, expectedRootSlabIDs)..., + ) + + return issues +} + +// getStorageSlotRootSlabIDs returns evm storage slot root slab IDs. +func getStorageSlotRootSlabIDs( + address common.Address, + accountStorageRootSlabID atree.SlabID, + storage *atree.PersistentSlabStorage, +) ([]atree.SlabID, []accountStorageIssue) { + + if accountStorageRootSlabID == atree.SlabIDUndefined { + return nil, nil + } + + var issues []accountStorageIssue + + // Load account storage map + m, err := atree.NewMapWithRootID(storage, accountStorageRootSlabID, atree.NewDefaultDigesterBuilder()) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to load evm storage slot %s: %s", accountStorageRootSlabID, err), + }) + return nil, issues + } + + storageSlotRootSlabIDs := make(map[atree.SlabID]struct{}) + + // Iterate accounts in account storage map to get storage slot collection ID. 
+ err = m.IterateReadOnly(func(key, value atree.Value) (bool, error) { + // Check address type + acctAddress, ok := key.(state.ByteStringValue) + if !ok { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("expect evm account address as ByteStringValue, got %T", key), + }) + return true, nil + } + + // Check encoded account type + encodedAccount, ok := value.(state.ByteStringValue) + if !ok { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("expect evm account as ByteStringValue, got %T", key), + }) + return true, nil + } + + // Decode account + acct, err := state.DecodeAccount(encodedAccount.Bytes()) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to decode account %x in evm account storage: %s", acctAddress.Bytes(), err), + }) + return true, nil + } + + storageSlotCollectionID := acct.CollectionID + + if len(storageSlotCollectionID) == 0 { + return true, nil + } + + storageSlotSlabID, err := atree.NewSlabIDFromRawBytes(storageSlotCollectionID) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to convert storage slot collection ID %x to atree slab ID: %s", storageSlotCollectionID, err), + }) + return true, nil + } + + // Check storage slot is not double referenced. 
+ if _, ok := storageSlotRootSlabIDs[storageSlotSlabID]; ok { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("found storage slot collection %x referenced by multiple accounts", storageSlotCollectionID), + }) + } + + storageSlotRootSlabIDs[storageSlotSlabID] = struct{}{} + + return true, nil + }) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("failed to iterate EVM account storage map: %s", err), + }) + } + + return maps.Keys(storageSlotRootSlabIDs), nil +} + +// checkHealthWithExpectedRootSlabIDs checks atree registers in storage (loaded and connected registers). +func checkHealthWithExpectedRootSlabIDs( + address common.Address, + storage *atree.PersistentSlabStorage, + expectedRootSlabIDs []atree.SlabID, +) []accountStorageIssue { + var issues []accountStorageIssue + + // Check atree storage health + rootSlabIDSet, err := atree.CheckStorageHealth(storage, -1) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("evm atree storage check failed: %s", err), + }) + return issues + } + + // Check if returned root slab IDs match expected root slab IDs. 
+ + rootSlabIDs := maps.Keys(rootSlabIDSet) + slices.SortFunc(rootSlabIDs, compareSlabID) + + slices.SortFunc(expectedRootSlabIDs, compareSlabID) + + if !slices.EqualFunc(expectedRootSlabIDs, rootSlabIDs, equalSlabID) { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf("root slabs %v from storage health check != expected root slabs %v", rootSlabIDs, expectedRootSlabIDs), + }) + } + + return issues +} + +// checkUnreferencedAtreeRegisters checks if all atree registers in account has been read through ledger. +func checkUnreferencedAtreeRegisters( + address common.Address, + ledger *ReadOnlyLedgerWithAtreeRegisterReadSet, + accountRegisters *registers.AccountRegisters, +) []accountStorageIssue { + var issues []accountStorageIssue + + allAtreeRegisterIDs, err := getAtreeRegisterIDsFromRegisters(accountRegisters) + if err != nil { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[otherErrorKind], + Msg: fmt.Sprintf("failed to get atree register IDs from account registers: %s", err), + }) + return issues + } + + // Check for unreferenced atree slabs by verifing all atree slabs in accountRegisters are read + // during storage health check for evm-atree and cadence-atree registers. 
+ + if ledger.GetAtreeRegisterReadCount() == len(allAtreeRegisterIDs) { + return issues + } + + if ledger.GetAtreeRegisterReadCount() > len(allAtreeRegisterIDs) { + issues = append( + issues, + accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[otherErrorKind], + Msg: fmt.Sprintf("%d atree registers was read > %d atree registers in evm account", + ledger.GetAtreeRegisterReadCount(), + len(allAtreeRegisterIDs)), + }) + return issues + } + + unreferencedAtreeRegisterIDs := make([]flow.RegisterID, 0, len(allAtreeRegisterIDs)-ledger.GetAtreeRegisterReadCount()) + + for _, id := range allAtreeRegisterIDs { + if !ledger.IsAtreeRegisterRead(id) { + unreferencedAtreeRegisterIDs = append(unreferencedAtreeRegisterIDs, id) + } + } + + slices.SortFunc(unreferencedAtreeRegisterIDs, func(a, b flow.RegisterID) int { + return cmp.Compare(a.Key, b.Key) + }) + + issues = append(issues, accountStorageIssue{ + Address: address.Hex(), + Kind: storageErrorKindString[evmAtreeStorageErrorKind], + Msg: fmt.Sprintf( + "number of read atree slabs %d != number of atree slabs in storage %d: unreferenced atree registers %v", + ledger.GetAtreeRegisterReadCount(), + len(allAtreeRegisterIDs), + unreferencedAtreeRegisterIDs, + ), + }) + + return issues +} + +func getAtreeRegisterIDsFromRegisters(registers registers.Registers) ([]flow.RegisterID, error) { + registerIDs := make([]flow.RegisterID, 0, registers.Count()) + + err := registers.ForEach(func(owner string, key string, _ []byte) error { + if !flow.IsSlabIndexKey(key) { + return nil + } + + registerIDs = append( + registerIDs, + flow.NewRegisterID(flow.BytesToAddress([]byte(owner)), key), + ) + + return nil + }) + if err != nil { + return nil, err + } + + return registerIDs, nil +} + +func isEVMAccount(owner common.Address) bool { + return bytes.Equal(owner[:], evmAccount[:]) +} + +type ReadOnlyLedgerWithAtreeRegisterReadSet struct { + *registers.ReadOnlyLedger + atreeRegistersReadSet map[flow.RegisterID]struct{} +} 
+ +var _ atree.Ledger = &ReadOnlyLedgerWithAtreeRegisterReadSet{} + +func NewReadOnlyLedgerWithAtreeRegisterReadSet( + accountRegisters *registers.AccountRegisters, +) *ReadOnlyLedgerWithAtreeRegisterReadSet { + return &ReadOnlyLedgerWithAtreeRegisterReadSet{ + ReadOnlyLedger: &registers.ReadOnlyLedger{Registers: accountRegisters}, + atreeRegistersReadSet: make(map[flow.RegisterID]struct{}), + } +} + +func (l *ReadOnlyLedgerWithAtreeRegisterReadSet) GetValue(address, key []byte) (value []byte, err error) { + value, err = l.ReadOnlyLedger.GetValue(address, key) + if err != nil { + return nil, err + } + + if flow.IsSlabIndexKey(string(key)) { + registerID := flow.NewRegisterID(flow.BytesToAddress(address), string(key)) + l.atreeRegistersReadSet[registerID] = struct{}{} + } + return value, nil +} + +func (l *ReadOnlyLedgerWithAtreeRegisterReadSet) GetAtreeRegisterReadCount() int { + return len(l.atreeRegistersReadSet) +} + +func (l *ReadOnlyLedgerWithAtreeRegisterReadSet) IsAtreeRegisterRead(id flow.RegisterID) bool { + _, ok := l.atreeRegistersReadSet[id] + return ok +} diff --git a/cmd/util/cmd/check-storage/evm_account_storage_health_test.go b/cmd/util/cmd/check-storage/evm_account_storage_health_test.go new file mode 100644 index 00000000000..babb96ea796 --- /dev/null +++ b/cmd/util/cmd/check-storage/evm_account_storage_health_test.go @@ -0,0 +1,153 @@ +package check_storage + +import ( + "strconv" + "testing" + + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + + "github.com/onflow/atree" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/onflow/cadence/runtime/interpreter" + gethCommon "github.com/onflow/go-ethereum/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/cmd/util/ledger/util/registers" + "github.com/onflow/flow-go/fvm/evm/emulator/state" + "github.com/onflow/flow-go/fvm/evm/testutils" + 
"github.com/onflow/flow-go/fvm/evm/types" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" + "github.com/onflow/flow-go/model/flow" +) + +func TestEVMAccountStorageHealth(t *testing.T) { + address := common.Address{1} + + t.Run("has storage slot", func(t *testing.T) { + led := createPayloadLedger() + + createEVMStorage(t, led, address) + + createCadenceStorage(t, led, address) + + payloads := maps.Values(led.Payloads) + + accountRegisters, err := registers.NewAccountRegistersFromPayloads(string(address[:]), payloads) + require.NoError(t, err) + + issues := checkEVMAccountStorageHealth( + address, + accountRegisters, + ) + require.Equal(t, 0, len(issues)) + }) + + t.Run("unreferenced slabs", func(t *testing.T) { + led := createPayloadLedger() + + createEVMStorage(t, led, address) + + createCadenceStorage(t, led, address) + + payloads := maps.Values(led.Payloads) + + // Add unreferenced slabs + slabIndex, err := led.AllocateSlabIndexFunc(address[:]) + require.NoError(t, err) + + registerID := flow.NewRegisterID( + flow.BytesToAddress(address[:]), + string(atree.SlabIndexToLedgerKey(slabIndex))) + + unreferencedPayload := ledger.NewPayload( + convert.RegisterIDToLedgerKey(registerID), + ledger.Value([]byte{1})) + + payloads = append(payloads, unreferencedPayload) + + accountRegisters, err := registers.NewAccountRegistersFromPayloads(string(address[:]), payloads) + require.NoError(t, err) + + issues := checkEVMAccountStorageHealth( + address, + accountRegisters, + ) + require.Equal(t, 1, len(issues)) + require.Equal(t, storageErrorKindString[evmAtreeStorageErrorKind], issues[0].Kind) + require.Contains(t, issues[0].Msg, "unreferenced atree registers") + }) +} + +func createEVMStorage(t *testing.T, ledger atree.Ledger, address common.Address) { + view, err := state.NewBaseView(ledger, flow.BytesToAddress(address[:])) + require.NoError(t, err) + + // Create an account without storage slot + addr1 := testutils.RandomCommonAddress(t) 
+ + err = view.CreateAccount(addr1, uint256.NewInt(1), 2, []byte("ABC"), gethCommon.Hash{3, 4, 5}) + require.NoError(t, err) + + // Create an account with storage slot + addr2 := testutils.RandomCommonAddress(t) + + err = view.CreateAccount(addr2, uint256.NewInt(6), 7, []byte("DEF"), gethCommon.Hash{8, 9, 19}) + require.NoError(t, err) + + slot := types.SlotAddress{ + Address: addr2, + Key: testutils.RandomCommonHash(t), + } + + err = view.UpdateSlot(slot, testutils.RandomCommonHash(t)) + require.NoError(t, err) + + err = view.Commit() + require.NoError(t, err) +} + +func createCadenceStorage(t *testing.T, ledger atree.Ledger, address common.Address) { + storage := runtime.NewStorage(ledger, nil) + + inter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: storage, + }, + ) + require.NoError(t, err) + + // Create storage and public domains + for _, domain := range []string{"storage", "public"} { + storageDomain := storage.GetStorageMap(address, domain, true) + + // Create large domain map so there are more than one atree registers under the hood. 
+ for i := 0; i < 100; i++ { + key := interpreter.StringStorageMapKey(domain + "_key_" + strconv.Itoa(i)) + value := interpreter.NewUnmeteredStringValue(domain + "_value_" + strconv.Itoa(i)) + storageDomain.SetValue(inter, key, value) + } + } + + // Commit domain data + err = storage.Commit(inter, false) + require.NoError(t, err) +} + +func createPayloadLedger() *util.PayloadsLedger { + nextSlabIndex := atree.SlabIndex{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1} + + return &util.PayloadsLedger{ + Payloads: make(map[flow.RegisterID]*ledger.Payload), + AllocateSlabIndexFunc: func([]byte) (atree.SlabIndex, error) { + var slabIndex atree.SlabIndex + slabIndex, nextSlabIndex = nextSlabIndex, nextSlabIndex.Next() + return slabIndex, nil + }, + } +} From f0886526ca58a9c513a09469de573f476691599e Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Wed, 4 Sep 2024 05:39:34 -0700 Subject: [PATCH 10/18] fix cluster assignment >2/3 internal check During refactoring in 2024-04, this check was modified to incorrectly use the wrong variable (not operating on only Collection Nodes). This commit uses the correct variable, and renames the filtered lists. --- cmd/util/cmd/common/clusters.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/util/cmd/common/clusters.go b/cmd/util/cmd/common/clusters.go index 6c77db7fbb3..5e5d8e894a4 100644 --- a/cmd/util/cmd/common/clusters.go +++ b/cmd/util/cmd/common/clusters.go @@ -38,9 +38,9 @@ import ( // - error: if any error occurs. Any error returned from this function is irrecoverable. 
func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes flow.IdentityList, numCollectionClusters int) (flow.AssignmentList, flow.ClusterList, error) { - partners := partnerNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)) - internals := internalNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)) - nCollectors := len(partners) + len(internals) + partnerCollectors := partnerNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)) + internalCollectors := internalNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection)) + nCollectors := len(partnerCollectors) + len(internalCollectors) // ensure we have at least as many collection nodes as clusters if nCollectors < int(numCollectionClusters) { @@ -49,11 +49,11 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes } // shuffle both collector lists based on a non-deterministic algorithm - partners, err := partners.Shuffle() + partnerCollectors, err := partnerCollectors.Shuffle() if err != nil { log.Fatal().Err(err).Msg("could not shuffle partners") } - internals, err = internals.Shuffle() + internalCollectors, err = internalCollectors.Shuffle() if err != nil { log.Fatal().Err(err).Msg("could not shuffle internals") } @@ -63,7 +63,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes // to control strictly more than 2/3 of the cluster's total weight. // The following is a heuristic that distributes collectors round-robbin across the specified number of clusters. // This heuristic only works when all collectors have equal weight! 
The following sanity check enforces this: - if len(partnerNodes) > 0 && len(partnerNodes) > 2*len(internalNodes) { + if len(partnerCollectors) > 0 && len(partnerCollectors) > 2*len(internalCollectors) { return nil, nil, fmt.Errorf("requiring at least x>0 number of partner nodes and y > 2x number of internal nodes, but got x,y=%d,%d", len(partnerNodes), len(internalNodes)) } // sanity check ^ enforces that there is at least one internal node, hence `internalNodes[0].InitialWeight` is always a valid reference weight @@ -74,7 +74,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes constraint := make([]int, numCollectionClusters) // first, round-robin internal nodes into each cluster - for i, node := range internals { + for i, node := range internalCollectors { if node.InitialWeight != refWeight { return nil, nil, fmt.Errorf("current implementation requires all collectors (partner & interal nodes) to have equal weight") } @@ -84,7 +84,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes } // next, round-robin partner nodes into each cluster - for i, node := range partners { + for i, node := range partnerCollectors { if node.InitialWeight != refWeight { return nil, nil, fmt.Errorf("current implementation requires all collectors (partner & interal nodes) to have equal weight") } @@ -102,7 +102,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes assignments := assignment.FromIdentifierLists(identifierLists) - collectors := append(partners, internals...) + collectors := append(partnerCollectors, internalCollectors...) 
clusters, err := factory.NewClusterList(assignments, collectors.ToSkeleton()) if err != nil { log.Fatal().Err(err).Msg("could not create cluster list") From 466b2e6854d07b23409acc55c2e326d032b2ef67 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Wed, 4 Sep 2024 05:55:01 -0700 Subject: [PATCH 11/18] Update cmd/util/cmd/common/clusters.go --- cmd/util/cmd/common/clusters.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/util/cmd/common/clusters.go b/cmd/util/cmd/common/clusters.go index 5e5d8e894a4..9eecffc0a61 100644 --- a/cmd/util/cmd/common/clusters.go +++ b/cmd/util/cmd/common/clusters.go @@ -64,7 +64,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes // The following is a heuristic that distributes collectors round-robbin across the specified number of clusters. // This heuristic only works when all collectors have equal weight! The following sanity check enforces this: if len(partnerCollectors) > 0 && len(partnerCollectors) > 2*len(internalCollectors) { - return nil, nil, fmt.Errorf("requiring at least x>0 number of partner nodes and y > 2x number of internal nodes, but got x,y=%d,%d", len(partnerNodes), len(internalNodes)) + return nil, nil, fmt.Errorf("requiring at least x>0 number of partner collection nodes and y > 2x number of internal collection nodes, but got x,y=%d,%d", len(partnerNodes), len(internalNodes)) } // sanity check ^ enforces that there is at least one internal node, hence `internalNodes[0].InitialWeight` is always a valid reference weight refWeight := internalNodes[0].InitialWeight From 666fa16b8f895446139b11ffcf6503a3312473b3 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Wed, 4 Sep 2024 05:56:12 -0700 Subject: [PATCH 12/18] Apply suggestions from code review --- cmd/util/cmd/common/clusters.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/util/cmd/common/clusters.go b/cmd/util/cmd/common/clusters.go index 9eecffc0a61..379d1409bc8 100644 --- 
a/cmd/util/cmd/common/clusters.go +++ b/cmd/util/cmd/common/clusters.go @@ -64,10 +64,10 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes // The following is a heuristic that distributes collectors round-robbin across the specified number of clusters. // This heuristic only works when all collectors have equal weight! The following sanity check enforces this: if len(partnerCollectors) > 0 && len(partnerCollectors) > 2*len(internalCollectors) { - return nil, nil, fmt.Errorf("requiring at least x>0 number of partner collection nodes and y > 2x number of internal collection nodes, but got x,y=%d,%d", len(partnerNodes), len(internalNodes)) + return nil, nil, fmt.Errorf("requiring at least x>0 number of partner collection nodes and y > 2x number of internal collection nodes, but got x,y=%d,%d", len(partnerCollectors), len(internalCollectors)) } // sanity check ^ enforces that there is at least one internal node, hence `internalNodes[0].InitialWeight` is always a valid reference weight - refWeight := internalNodes[0].InitialWeight + refWeight := internalCollectors[0].InitialWeight identifierLists := make([]flow.IdentifierList, numCollectionClusters) // array to track the 2/3 internal-nodes constraint (internal_nodes > 2 * partner_nodes) From a82f198cae61315f788873b79fabf640089a3d37 Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Wed, 4 Sep 2024 05:56:35 -0700 Subject: [PATCH 13/18] Update cmd/util/cmd/common/clusters.go --- cmd/util/cmd/common/clusters.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/util/cmd/common/clusters.go b/cmd/util/cmd/common/clusters.go index 379d1409bc8..03aeba29529 100644 --- a/cmd/util/cmd/common/clusters.go +++ b/cmd/util/cmd/common/clusters.go @@ -66,7 +66,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes if len(partnerCollectors) > 0 && len(partnerCollectors) > 2*len(internalCollectors) { return nil, nil, fmt.Errorf("requiring at least x>0 number 
of partner collection nodes and y > 2x number of internal collection nodes, but got x,y=%d,%d", len(partnerCollectors), len(internalCollectors)) } - // sanity check ^ enforces that there is at least one internal node, hence `internalNodes[0].InitialWeight` is always a valid reference weight + // sanity check ^ enforces that there is at least one internal node, hence `internalCollectors[0].InitialWeight` is always a valid reference weight refWeight := internalCollectors[0].InitialWeight identifierLists := make([]flow.IdentifierList, numCollectionClusters) From fd6e94562af7ea522678749bf32f9c909e459d0a Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Wed, 4 Sep 2024 08:50:59 -0700 Subject: [PATCH 14/18] remove incorrect check --- cmd/util/cmd/common/clusters.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/cmd/util/cmd/common/clusters.go b/cmd/util/cmd/common/clusters.go index 03aeba29529..6451f1494e7 100644 --- a/cmd/util/cmd/common/clusters.go +++ b/cmd/util/cmd/common/clusters.go @@ -58,15 +58,7 @@ func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes log.Fatal().Err(err).Msg("could not shuffle internals") } - // The following is a heuristic for distributing the internal collector nodes (private staking key available - // to generate QC for cluster root block) and partner nodes (private staking unknown). We need internal nodes - // to control strictly more than 2/3 of the cluster's total weight. - // The following is a heuristic that distributes collectors round-robbin across the specified number of clusters. - // This heuristic only works when all collectors have equal weight! 
The following sanity check enforces this: - if len(partnerCollectors) > 0 && len(partnerCollectors) > 2*len(internalCollectors) { - return nil, nil, fmt.Errorf("requiring at least x>0 number of partner collection nodes and y > 2x number of internal collection nodes, but got x,y=%d,%d", len(partnerCollectors), len(internalCollectors)) - } - // sanity check ^ enforces that there is at least one internal node, hence `internalCollectors[0].InitialWeight` is always a valid reference weight + // capture first reference weight to validate that all collectors have equal weight refWeight := internalCollectors[0].InitialWeight identifierLists := make([]flow.IdentifierList, numCollectionClusters) From af81d945c9a6c607987141e0c288be2901f5ff65 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 4 Sep 2024 18:37:09 +0200 Subject: [PATCH 15/18] update flow grpc --- go.mod | 2 +- go.sum | 4 ++-- insecure/go.mod | 2 +- insecure/go.sum | 4 ++-- integration/go.mod | 2 +- integration/go.sum | 4 ++-- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index e557998ca60..d5a140eda9a 100644 --- a/go.mod +++ b/go.mod @@ -54,7 +54,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/contracts v1.3.1 github.com/onflow/flow-core-contracts/lib/go/templates v1.3.1 github.com/onflow/flow-go-sdk v1.0.0-preview.55 - github.com/onflow/flow/protobuf/go/flow v0.4.6 + github.com/onflow/flow/protobuf/go/flow v0.4.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index e3cd8146665..ac8e956948f 100644 --- a/go.sum +++ b/go.sum @@ -2194,8 +2194,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= 
github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc= diff --git a/insecure/go.mod b/insecure/go.mod index 0086182c027..8493273c41d 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -211,7 +211,7 @@ require ( github.com/onflow/flow-go-sdk v1.0.0-preview.55 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.2.1 // indirect github.com/onflow/flow-nft/lib/go/templates v1.2.0 // indirect - github.com/onflow/flow/protobuf/go/flow v0.4.6 // indirect + github.com/onflow/flow/protobuf/go/flow v0.4.7 // indirect github.com/onflow/go-ethereum v1.14.7 // indirect github.com/onflow/sdks v0.6.0-preview.1 // indirect github.com/onflow/wal v1.0.2 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index ab6a5936284..1777ec88fd9 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -2180,8 +2180,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= 
-github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= diff --git a/integration/go.mod b/integration/go.mod index 8723aa74250..ef58e5c61f6 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -28,7 +28,7 @@ require ( github.com/onflow/flow-go v0.37.7-0.20240826193109-e211841b59f5 github.com/onflow/flow-go-sdk v1.0.0-preview.55 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 - github.com/onflow/flow/protobuf/go/flow v0.4.6 + github.com/onflow/flow/protobuf/go/flow v0.4.7 github.com/onflow/go-ethereum v1.14.7 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 diff --git a/integration/go.sum b/integration/go.sum index 17fec9e6715..5b6f4c5ccf9 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -2167,8 +2167,8 @@ github.com/onflow/flow-nft/lib/go/contracts v1.2.1/go.mod h1:2gpbza+uzs1k7x31hkp github.com/onflow/flow-nft/lib/go/templates v1.2.0 h1:JSQyh9rg0RC+D1930BiRXN8lrtMs+ubVMK6aQPon6Yc= github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJe5Gqgr2eeH49QB6+s6ze00w0= github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= -github.com/onflow/flow/protobuf/go/flow v0.4.6 h1:KE/CsRVfyG5lGBtm1aNcjojMciQyS5GfPF3ixOWRfi0= -github.com/onflow/flow/protobuf/go/flow v0.4.6/go.mod 
h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc= +github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= From 2464067d2bd6a8c598d6e02ce995c4b6ec10a5e2 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 4 Sep 2024 18:45:57 +0200 Subject: [PATCH 16/18] add comment --- engine/execution/computation/metrics/provider.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/execution/computation/metrics/provider.go b/engine/execution/computation/metrics/provider.go index 39b483fc531..c23a426141d 100644 --- a/engine/execution/computation/metrics/provider.go +++ b/engine/execution/computation/metrics/provider.go @@ -38,6 +38,9 @@ func newProvider( } } +// Push buffers the metrics for the given height. +// The caller should ensure heights are pushed in strictly increasing order, otherwise +// metrics for the skipped heights will not be buffered. 
func (p *provider) Push( height uint64, data []TransactionExecutionMetrics, From 9150daf0e83211e1830860f5364c66a4ef416dbd Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Wed, 4 Sep 2024 16:35:43 -0700 Subject: [PATCH 17/18] set max retry interval for en requester engine to 10s --- cmd/execution_builder.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 86b74a8be6f..3e12a84ffdd 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -1048,6 +1048,7 @@ func (exeNode *ExecutionNode) LoadIngestionEngine( // consistency of collection can be checked by checking hash, and hash comes from trusted source (blocks from consensus follower) // hence we not need to check origin requester.WithValidateStaking(false), + requester.WithRetryMaximum(10*time.Second), ) if err != nil { From 85232ce318a2bc6f6bb0434ba01eb90d3154ae9f Mon Sep 17 00:00:00 2001 From: Jordan Schalm Date: Fri, 6 Sep 2024 09:39:03 -0700 Subject: [PATCH 18/18] add comment explaining settings change --- cmd/execution_builder.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 3e12a84ffdd..0ca1cc51008 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -1048,6 +1048,8 @@ func (exeNode *ExecutionNode) LoadIngestionEngine( // consistency of collection can be checked by checking hash, and hash comes from trusted source (blocks from consensus follower) // hence we not need to check origin requester.WithValidateStaking(false), + // we have observed execution nodes occasionally fail to retrieve collections using this engine, which can cause temporary execution halts + // setting a retry maximum of 10s results in a much faster recovery from these faults (default is 2m) requester.WithRetryMaximum(10*time.Second), )