From 580a3289cba128343bc7107efad85b0408fdb230 Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Wed, 1 May 2024 14:10:38 -0700 Subject: [PATCH] refactor: flush logic (#89) * Refactor flush logic * Update README * Check bounds --- README.md | 39 +++++++++++++++++++++++---------------- ethereum/listener.go | 36 +++++++++++++++++++++++++++++------- noble/listener.go | 35 +++++++++++++++++++++++++++++------ 3 files changed, 81 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index c768046..94bae60 100644 --- a/README.md +++ b/README.md @@ -20,21 +20,28 @@ Sample configs can be found in [config](config). ### Flush Interval -Using the `--flush-interval` flag will run a flush on all paths every `duration`; ex `--flush-interval 5m` +Using the `--flush-interval` flag will run a flush on all chains every `duration`; ex `--flush-interval 5m` -The relayer will keep track of the latest flushed block. The first time the flush is run, the flush will start at the chains latest height - lookback period and flush up until height of the chain when the flush started. It will then store the height the flush ended on. +The first time the flush is run per chain, the flush will start at the chain's `latest height - (2 * lookback period)`. The flush will always finish at the `latest chain height - lookback period`. This allows the flush to lag behind the chain so that the flush does not compete for transactions that are actively being processed. For subsequent flushes, each chain will reference its last flushed block, start from there and flush to the `latest chain height - lookback period` again. The flushing process will continue as long as the relayer is running. -After that, it will flush from the last stored height - lookback period up until the latest height of the chain. +For best results and coverage, the lookback period in blocks should correspond to the flush interval. 
If a chain produces 1 block a second and the flush interval is set to 30 minutes (1800 seconds), the lookback period should be at least 1800 blocks. When in doubt, round up and add a small buffer. + +#### Examples + +Consider a 30 minute flush interval (1800 seconds) +- Ethereum: 12 second blocks = (1800 / 12) = `150 blocks` +- Polygon: 2 second blocks = (1800 / 2) = `900 blocks` +- Arbitrum: 0.26 second blocks = (1800 / 0.26) = `~6950 blocks` ### Prometheus Metrics By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag. -| **Exported Metric** | **Description** | **Type** | -|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|----------| -| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.

Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge | -| cctp_relayer_chain_latest_height | Current height of the chain. | Gauge | -| cctp_relayer_broadcast_errors_total | The total number of failed broadcasts. Note: this is AFTER it retries `broadcast-retries` (config setting) number of times. | Counter | +| **Exported Metric** | **Description** | **Type** | +| ----------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ | -------- | +| cctp_relayer_wallet_balance | Current balance of a relayer wallet in Wei.

Noble balances are not currently exported b/c `MsgReceiveMessage` is free to submit on Noble. | Gauge | +| cctp_relayer_chain_latest_height | Current height of the chain. | Gauge | +| cctp_relayer_broadcast_errors_total | The total number of failed broadcasts. Note: this is AFTER it retries `broadcast-retries` (config setting) number of times. | Counter | ### Minter Private Keys Minter private keys are required on a per chain basis to broadcast transactions to the target chain. These private keys can either be set in the `config.yaml` or via environment variables. @@ -66,14 +73,14 @@ localhost:8000/tx/?domain=0 ### State -| IrisLookupId | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated | -|:-------------|:---------|:-------------|:-----------|:--------------|:-----------|:-------------|:--------|:--------| -| 0x123 | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | -| 0x123 | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| IrisLookupId | Status | SourceDomain | DestDomain | SourceTxHash | DestTxHash | MsgSentBytes | Created | Updated | +| :----------- | :------- | :----------- | :--------- | :----------- | :--------- | :----------- | :------ | :------ | +| 0x123 | Created | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Pending | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Attested | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Complete | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Failed | 0 | 4 | 0x123 | ABC123 | bytes... | date | date | +| 0x123 | Filtered | 0 | 4 | 0x123 | ABC123 | bytes... 
| date | date | ### Generating Go ABI bindings diff --git a/ethereum/listener.go b/ethereum/listener.go index c60b709..346f3a8 100644 --- a/ethereum/listener.go +++ b/ethereum/listener.go @@ -261,6 +261,16 @@ func (e *Ethereum) consumeStream( } } +// flushMechanism looks back over the chain history every specified flushInterval. +// +// Each chain is configured with a lookback period which signifies how many blocks to look back +// at each interval. The flush mechanism will start from the last flushed block and will rescan +// the lookback period and consume all messages in that range. The flush mechanism will not flush +// all the way to the chain's latest block to avoid consuming messages that are still in the queue. +// There will be a minimum gap of the lookback period between the last flushed block and the latest block. +// +// Note: The first time the flush mechanism is run, it will set the lastFlushedBlock to the latest block +// minus twice the lookback period. func (e *Ethereum) flushMechanism( ctx context.Context, logger log.Logger, @@ -281,19 +291,31 @@ func (e *Ethereum) flushMechanism( // initialize first lastFlushedBlock if not set if e.lastFlushedBlock == 0 { - e.lastFlushedBlock = latestBlock - e.lookbackPeriod + e.lastFlushedBlock = latestBlock - 2*e.lookbackPeriod + + if latestBlock < e.lookbackPeriod { + e.lastFlushedBlock = 0 + } } - // start from lastFlushedBlock - start := e.lastFlushedBlock + // start from the last block it flushed + startBlock := e.lastFlushedBlock + + // set finish block to be latestBlock - lookbackPeriod + finishBlock := latestBlock - e.lookbackPeriod + + if startBlock >= finishBlock { + logger.Debug("No new blocks to flush") + continue + } - logger.Info(fmt.Sprintf("Flush started from %d to %d", start, latestBlock)) + logger.Info(fmt.Sprintf("Flush started from %d to %d (current height: %d, lookback period: %d)", startBlock, finishBlock, latestBlock, e.lookbackPeriod)) - // consume from lastFlushedBlock to the latestBlock 
- e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, start, latestBlock) + // consume from lastFlushedBlock to the finishBlock + e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startBlock, finishBlock) // update lastFlushedBlock to the last block it flushed - e.lastFlushedBlock = latestBlock + e.lastFlushedBlock = finishBlock logger.Info("Flush complete") diff --git a/noble/listener.go b/noble/listener.go index 516f913..7c4830c 100644 --- a/noble/listener.go +++ b/noble/listener.go @@ -121,6 +121,16 @@ func (n *Noble) StartListener( <-ctx.Done() } +// flushMechanism looks back over the chain history every specified flushInterval. +// +// Each chain is configured with a lookback period which signifies how many blocks to look back +// at each interval. The flush mechanism will start from the last flushed block and will rescan +// the lookback period and consume all messages in that range. The flush mechanism will not flush +// all the way to the chain's latest block to avoid consuming messages that are still in the queue. +// There will be a minimum gap of the lookback period between the last flushed block and the latest block. +// +// Note: The first time the flush mechanism is run, it will set the lastFlushedBlock to the latest block +// minus twice the lookback period. 
func (n *Noble) flushMechanism( ctx context.Context, logger log.Logger, @@ -145,19 +155,32 @@ func (n *Noble) flushMechanism( continue } + // initialize first lastFlushedBlock if not set if n.lastFlushedBlock == 0 { - n.lastFlushedBlock = latestBlock + n.lastFlushedBlock = latestBlock - (2 * n.lookbackPeriod) + + if latestBlock < n.lookbackPeriod { + n.lastFlushedBlock = 0 + } } - lastFlushedBlock := n.lastFlushedBlock - flushStart := lastFlushedBlock - n.lookbackPeriod + // start from the last block it flushed + startBlock := n.lastFlushedBlock + + // set finish block to be latestBlock - lookbackPeriod + finishBlock := latestBlock - n.lookbackPeriod + + if startBlock >= finishBlock { + logger.Debug("No new blocks to flush") + continue + } - logger.Info(fmt.Sprintf("Flush started from: %d to: %d", flushStart, latestBlock)) + logger.Info(fmt.Sprintf("Flush started from %d to %d (current height: %d, lookback period: %d)", startBlock, finishBlock, latestBlock, n.lookbackPeriod)) - for i := flushStart; i <= latestBlock; i++ { + for i := startBlock; i <= finishBlock; i++ { blockQueue <- i } - n.lastFlushedBlock = latestBlock + n.lastFlushedBlock = finishBlock logger.Info("Flush complete")