Skip to content

Commit

Permalink
Refactor flush logic
Browse files Browse the repository at this point in the history
  • Loading branch information
joelsmith-2019 committed May 1, 2024
1 parent 7f4149b commit 15ff9db
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 12 deletions.
30 changes: 24 additions & 6 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ func (e *Ethereum) consumeStream(
}
}

// flushMechanism looks back over the chain history every specified flushInterval.
//
// Each chain is configured with a lookback period which signifies how many blocks to look back
// at each interval. The flush mechanism will start from the last flushed block and will rescan
// the lookback period and consume all messages in that range. The flush mechanism will not flush
// all the way to the chain's latest block to avoid consuming messages that are still in the queue.
// There will be a minimum gap of the lookback period between the last flushed block and the latest block.
//
// Note: The first time the flush mechanism is run, it will set the lastFlushedBlock to the latest block
// minus twice the lookback period.
func (e *Ethereum) flushMechanism(
ctx context.Context,
logger log.Logger,
Expand All @@ -281,19 +291,27 @@ func (e *Ethereum) flushMechanism(

// initialize first lastFlushedBlock if not set
if e.lastFlushedBlock == 0 {
e.lastFlushedBlock = latestBlock - e.lookbackPeriod
e.lastFlushedBlock = latestBlock - (2 * e.lookbackPeriod)
}

// start from lastFlushedBlock
start := e.lastFlushedBlock
// start from the last block it flushed
startBlock := e.lastFlushedBlock

// set finish block to be latestBlock - lookbackPeriod
finishBlock := latestBlock - e.lookbackPeriod

if startBlock >= finishBlock {
logger.Debug("No new blocks to flush")
continue
}

logger.Info(fmt.Sprintf("Flush started from %d to %d", start, latestBlock))
logger.Info(fmt.Sprintf("Flush started from %d to %d (current height: %d, lookback period: %d)", startBlock, finishBlock, latestBlock, e.lookbackPeriod))

// consume from lastFlushedBlock to the latestBlock
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, start, latestBlock)
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startBlock, finishBlock)

// update lastFlushedBlock to the last block it flushed
e.lastFlushedBlock = latestBlock
e.lastFlushedBlock = finishBlock

logger.Info("Flush complete")

Expand Down
31 changes: 25 additions & 6 deletions noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func (n *Noble) StartListener(
<-ctx.Done()
}

// flushMechanism looks back over the chain history every specified flushInterval.
//
// Each chain is configured with a lookback period which signifies how many blocks to look back
// at each interval. The flush mechanism will start from the last flushed block and will rescan
// the lookback period and consume all messages in that range. The flush mechanism will not flush
// all the way to the chain's latest block to avoid consuming messages that are still in the queue.
// There will be a minimum gap of the lookback period between the last flushed block and the latest block.
//
// Note: The first time the flush mechanism is run, it will set the lastFlushedBlock to the latest block
// minus twice the lookback period.
func (n *Noble) flushMechanism(
ctx context.Context,
logger log.Logger,
Expand All @@ -145,19 +155,28 @@ func (n *Noble) flushMechanism(
continue
}

// initialize first lastFlushedBlock if not set
if n.lastFlushedBlock == 0 {
n.lastFlushedBlock = latestBlock
n.lastFlushedBlock = latestBlock - (2 * n.lookbackPeriod)
}
lastFlushedBlock := n.lastFlushedBlock

flushStart := lastFlushedBlock - n.lookbackPeriod
// start from the last block it flushed
startBlock := n.lastFlushedBlock

logger.Info(fmt.Sprintf("Flush started from: %d to: %d", flushStart, latestBlock))
// set finish block to be latestBlock - lookbackPeriod
finishBlock := latestBlock - n.lookbackPeriod

for i := flushStart; i <= latestBlock; i++ {
if startBlock >= finishBlock {
logger.Debug("No new blocks to flush")
continue
}

logger.Info(fmt.Sprintf("Flush started from %d to %d (current height: %d, lookback period: %d)", startBlock, finishBlock, latestBlock, n.lookbackPeriod))

for i := startBlock; i <= finishBlock; i++ {
blockQueue <- i
}
n.lastFlushedBlock = latestBlock
n.lastFlushedBlock = finishBlock

logger.Info("Flush complete")

Expand Down

0 comments on commit 15ff9db

Please sign in to comment.