Skip to content

Commit

Permalink
Fixed unhandled Errors in consumeEvents loop (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman authored Jun 21, 2024
1 parent 0f850ac commit d98a596
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 93 deletions.
45 changes: 28 additions & 17 deletions .github/workflows/regression-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,37 @@ on:
jobs:
deploy_devnet:
runs-on: ubuntu-latest

steps:
- name: Checkout
- name: Checkout cdk-data-availability
uses: actions/checkout@v4
with:
path: cdk-data-availability

- name: Checkout kurtosis-cdk
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
ref: feat/cdk-erigon-zkevm
path: kurtosis-cdk

- name: Build image (with proposed code changes) locally
id: build
- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk

- name: Build docker image
working-directory: ./cdk-data-availability
run: docker build -t cdk-data-availability:local --file Dockerfile .

- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
run: |
if [ "$GITHUB_EVENT_NAME" == "pull_request" ]; then
GITHUB_SHA_SHORT=$(jq -r .pull_request.head.sha "$GITHUB_EVENT_PATH" | cut -c 1-7)
echo $GITHUB_SHA_SHORT
echo "::set-output name=GITHUB_SHA_SHORT::$GITHUB_SHA_SHORT"
fi
yq -Y --in-place '.args.data_availability_mode = "cdk-validium"' params.yml
yq -Y --in-place '.args.zkevm_da_image = "cdk-data-availability:local"' params.yml
- name: Set up Docker
uses: docker/setup-buildx-action@v1
- name: Deploy Kurtosis CDK package
working-directory: ./kurtosis-cdk
run: kurtosis run --enclave cdk-v1 --args-file params.yml --image-download always .

- name: Run regression tests against JIT container image
uses: 0xPolygon/[email protected]
with:
zkevm_dac: ${{ steps.build.outputs.GITHUB_SHA_SHORT }}
kurtosis_cli: 0.89.3
kurtosis_cdk: v0.2.0
- name: Monitor verified batches
working-directory: ./kurtosis-cdk
shell: bash
run: .github/actions/monitor-cdk-verified-batches/batch_verification_monitor.sh 19 600
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func start(cliCtx *cli.Context) error {

// ensure synchro/reorg start block is set
err = synchronizer.InitStartBlock(
cliCtx.Context,
storage,
etm,
c.L1.GenesisBlock,
Expand Down Expand Up @@ -141,7 +142,7 @@ func start(cliCtx *cli.Context) error {
if err != nil {
log.Fatal(err)
}
go batchSynchronizer.Start()
go batchSynchronizer.Start(cliCtx.Context)
cancelFuncs = append(cancelFuncs, batchSynchronizer.Stop)

// Register services
Expand Down
95 changes: 48 additions & 47 deletions synchronizer/batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -40,7 +41,6 @@ type BatchSynchronizer struct {
committee map[common.Address]etherman.DataCommitteeMember
lock sync.Mutex
reorgs <-chan BlockReorg
events chan *polygonvalidium.PolygonvalidiumSequenceBatches
sequencer SequencerTracker
rpcClientFactory client.Factory
}
Expand Down Expand Up @@ -68,7 +68,6 @@ func NewBatchSynchronizer(
self: self,
db: db,
reorgs: reorgs,
events: make(chan *polygonvalidium.PolygonvalidiumSequenceBatches),
sequencer: sequencer,
rpcClientFactory: rpcClientFactory,
}
Expand All @@ -94,36 +93,35 @@ func (bs *BatchSynchronizer) resolveCommittee() error {
}

// Start starts the synchronizer
func (bs *BatchSynchronizer) Start() {
func (bs *BatchSynchronizer) Start(ctx context.Context) {
log.Infof("starting batch synchronizer, DAC addr: %v", bs.self)
go bs.startUnresolvedBatchesProcessor()
go bs.consumeEvents()
go bs.produceEvents()
go bs.handleReorgs()
go bs.startUnresolvedBatchesProcessor(ctx)
go bs.produceEvents(ctx)
go bs.handleReorgs(ctx)
}

// Stop stops the synchronizer
func (bs *BatchSynchronizer) Stop() {
close(bs.events)
close(bs.stop)
}

func (bs *BatchSynchronizer) handleReorgs() {
func (bs *BatchSynchronizer) handleReorgs(ctx context.Context) {
log.Info("starting reorgs handler")
for {
select {
case r := <-bs.reorgs:
latest, err := getStartBlock(bs.db)
latest, err := getStartBlock(ctx, bs.db)
if err != nil {
log.Errorf("could not determine latest processed block: %v", err)
continue
}

if latest < r.Number {
// only reset start block if necessary
continue
}
err = setStartBlock(bs.db, r.Number)
if err != nil {

if err = setStartBlock(ctx, bs.db, r.Number); err != nil {
log.Errorf("failed to store new start block to %d: %v", r.Number, err)
}
case <-bs.stop:
Expand All @@ -132,13 +130,13 @@ func (bs *BatchSynchronizer) handleReorgs() {
}
}

func (bs *BatchSynchronizer) produceEvents() {
func (bs *BatchSynchronizer) produceEvents(ctx context.Context) {
log.Info("starting event producer")
for {
delay := time.NewTimer(bs.retry)
select {
case <-delay.C:
if err := bs.filterEvents(); err != nil {
if err := bs.filterEvents(ctx); err != nil {
log.Errorf("error filtering events: %v", err)
}
case <-bs.stop:
Expand All @@ -148,74 +146,77 @@ func (bs *BatchSynchronizer) produceEvents() {
}

// Start an iterator from last block processed, picking off SequenceBatches events
func (bs *BatchSynchronizer) filterEvents() error {
start, err := getStartBlock(bs.db)
func (bs *BatchSynchronizer) filterEvents(ctx context.Context) error {
start, err := getStartBlock(ctx, bs.db)
if err != nil {
return err
}

end := start + uint64(bs.blockBatchSize)

// get the latest block number
header, err := bs.client.HeaderByNumber(context.TODO(), nil)
header, err := bs.client.HeaderByNumber(ctx, nil)
if err != nil {
log.Errorf("failed to determine latest block number: %v", err)
return err
}

// we don't want to scan beyond latest block
if end > header.Number.Uint64() {
end = header.Number.Uint64()
}

iter, err := bs.client.FilterSequenceBatches(
&bind.FilterOpts{
Context: ctx,
Start: start,
End: &end,
Context: context.TODO(),
}, nil)
if err != nil {
log.Errorf("failed to create SequenceBatches event iterator: %v", err)
return err
}

// Collect events into the slice
var events []*polygonvalidium.PolygonvalidiumSequenceBatches
for iter.Next() {
if iter.Error() != nil {
return iter.Error()
}
bs.events <- iter.Event

events = append(events, iter.Event)
}

// advance start block
err = setStartBlock(bs.db, end)
if err != nil {
return err
if err = iter.Close(); err != nil {
log.Errorf("failed to close SequenceBatches event iterator: %v", err)
}
return nil
}

func (bs *BatchSynchronizer) consumeEvents() {
log.Info("starting event consumer")
for {
select {
case sb := <-bs.events:
if err := bs.handleEvent(sb); err != nil {
log.Errorf("failed to handle event: %v", err)
}
case <-bs.stop:
return
// Sort events by block number ascending
sort.Slice(events, func(i, j int) bool {
return events[i].Raw.BlockNumber < events[j].Raw.BlockNumber
})

// Handle events
for _, event := range events {
if err = bs.handleEvent(ctx, event); err != nil {
log.Errorf("failed to handle event: %v", err)
return setStartBlock(ctx, bs.db, event.Raw.BlockNumber-1)
}
}

return setStartBlock(ctx, bs.db, end)
}

func (bs *BatchSynchronizer) handleEvent(event *polygonvalidium.PolygonvalidiumSequenceBatches) error {
ctx, cancel := context.WithTimeout(context.Background(), bs.rpcTimeout)
func (bs *BatchSynchronizer) handleEvent(parentCtx context.Context, event *polygonvalidium.PolygonvalidiumSequenceBatches) error {
ctx, cancel := context.WithTimeout(parentCtx, bs.rpcTimeout)
defer cancel()

tx, _, err := bs.client.GetTx(ctx, event.Raw.TxHash)
if err != nil {
return err
}

txData := tx.Data()
keys, err := UnpackTxData(txData)
keys, err := UnpackTxData(tx.Data())
if err != nil {
return err
}
Expand All @@ -231,16 +232,16 @@ func (bs *BatchSynchronizer) handleEvent(event *polygonvalidium.PolygonvalidiumS
}

// Store batch keys. Already handled batch keys are going to be ignored based on the DB logic.
return storeUnresolvedBatchKeys(bs.db, batchKeys)
return storeUnresolvedBatchKeys(ctx, bs.db, batchKeys)
}

func (bs *BatchSynchronizer) startUnresolvedBatchesProcessor() {
func (bs *BatchSynchronizer) startUnresolvedBatchesProcessor(ctx context.Context) {
log.Info("starting handling unresolved batches")
for {
delay := time.NewTimer(bs.retry)
select {
case <-delay.C:
if err := bs.handleUnresolvedBatches(); err != nil {
if err := bs.handleUnresolvedBatches(ctx); err != nil {
log.Error(err)
}
case <-bs.stop:
Expand All @@ -250,9 +251,9 @@ func (bs *BatchSynchronizer) startUnresolvedBatchesProcessor() {
}

// handleUnresolvedBatches handles unresolved batches that were collected by the event consumer
func (bs *BatchSynchronizer) handleUnresolvedBatches() error {
func (bs *BatchSynchronizer) handleUnresolvedBatches(ctx context.Context) error {
// Get unresolved batches
batchKeys, err := getUnresolvedBatchKeys(bs.db)
batchKeys, err := getUnresolvedBatchKeys(ctx, bs.db)
if err != nil {
return fmt.Errorf("failed to get unresolved batch keys: %v", err)
}
Expand All @@ -265,7 +266,7 @@ func (bs *BatchSynchronizer) handleUnresolvedBatches() error {
var data []types.OffChainData
var resolved []types.BatchKey
for _, key := range batchKeys {
if exists(bs.db, key.Hash) {
if exists(ctx, bs.db, key.Hash) {
resolved = append(resolved, key)
} else {
var value *types.OffChainData
Expand All @@ -281,14 +282,14 @@ func (bs *BatchSynchronizer) handleUnresolvedBatches() error {

// Store data of the batches to the DB
if len(data) > 0 {
if err = storeOffchainData(bs.db, data); err != nil {
if err = storeOffchainData(ctx, bs.db, data); err != nil {
return fmt.Errorf("failed to store offchain data: %v", err)
}
}

// Mark batches as resolved
if len(resolved) > 0 {
if err = deleteUnresolvedBatchKeys(bs.db, resolved); err != nil {
if err = deleteUnresolvedBatchKeys(ctx, bs.db, resolved); err != nil {
return fmt.Errorf("failed to delete successfully resolved batch keys: %v", err)
}
}
Expand Down
7 changes: 4 additions & 3 deletions synchronizer/batches_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package synchronizer

import (
"context"
"errors"
"math/big"
"strings"
Expand Down Expand Up @@ -356,7 +357,7 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) {
client: ethermanMock,
}

err := batchSynronizer.handleEvent(event)
err := batchSynronizer.handleEvent(context.Background(), event)
if config.isErrorExpected {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -624,7 +625,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) {
sequencer: sequencerMock,
}

err := batchSynronizer.handleUnresolvedBatches()
err := batchSynronizer.handleUnresolvedBatches(context.Background())
if config.isErrorExpected {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -868,7 +869,7 @@ func TestBatchSyncronizer_HandleReorgs(t *testing.T) {
reorgs: reorgChan,
}

go batchSynchronizer.handleReorgs()
go batchSynchronizer.handleReorgs(context.Background())

reorgChan <- config.reorg

Expand Down
8 changes: 4 additions & 4 deletions synchronizer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ const (
)

// InitStartBlock initializes the L1 sync task by finding the inception block for the CDKValidium contract
func InitStartBlock(db db.DB, em etherman.Etherman, genesisBlock uint64, validiumAddr common.Address) error {
ctx, cancel := context.WithTimeout(context.Background(), initBlockTimeout)
func InitStartBlock(parentCtx context.Context, db db.DB, em etherman.Etherman, genesisBlock uint64, validiumAddr common.Address) error {
ctx, cancel := context.WithTimeout(parentCtx, initBlockTimeout)
defer cancel()

current, err := getStartBlock(db)
current, err := getStartBlock(ctx, db)
if err != nil {
return err
}
Expand All @@ -41,7 +41,7 @@ func InitStartBlock(db db.DB, em etherman.Etherman, genesisBlock uint64, validiu
}
}

return setStartBlock(db, startBlock.Uint64())
return setStartBlock(ctx, db, startBlock.Uint64())
}

func findContractDeploymentBlock(ctx context.Context, em etherman.Etherman, contract common.Address) (*big.Int, error) {
Expand Down
2 changes: 2 additions & 0 deletions synchronizer/init_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package synchronizer

import (
"context"
"errors"
"math/big"
"testing"
Expand Down Expand Up @@ -90,6 +91,7 @@ func Test_InitStartBlock(t *testing.T) {
}

err := InitStartBlock(
context.Background(),
dbMock,
emMock,
l1Config.GenesisBlock,
Expand Down
Loading

0 comments on commit d98a596

Please sign in to comment.