From 0c017b9c5ac924a8e56c0ecaab2dd1237508653b Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Tue, 26 Nov 2024 17:33:35 +0530 Subject: [PATCH 01/19] wip --- .mockery.yaml | 6 -- cmd/main.go | 2 +- db/db.go | 87 +++---------------- db/db_test.go | 144 ++++++------------------------- mocks/db.generated.go | 58 ------------- mocks/gaps_detector.generated.go | 79 ----------------- services/status/status.go | 27 ++---- services/status/status_test.go | 7 +- services/sync/sync_test.go | 20 ++--- synchronizer/batches.go | 114 +++++------------------- synchronizer/batches_test.go | 121 +------------------------- synchronizer/store.go | 7 -- synchronizer/store_test.go | 58 ------------- types/types.go | 14 ++- 14 files changed, 91 insertions(+), 653 deletions(-) delete mode 100644 mocks/gaps_detector.generated.go diff --git a/.mockery.yaml b/.mockery.yaml index bb55d443..0256cef9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -36,9 +36,3 @@ packages: SequencerTracker: config: filename: sequencer_tracker.generated.go - github.com/0xPolygon/cdk-data-availability/services/status: - config: - interfaces: - GapsDetector: - config: - filename: gaps_detector.generated.go diff --git a/cmd/main.go b/cmd/main.go index 409c2fd2..38560c9c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -154,7 +154,7 @@ func start(cliCtx *cli.Context) error { []rpc.Service{ { Name: status.APISTATUS, - Service: status.NewEndpoints(storage, batchSynchronizer), + Service: status.NewEndpoints(storage), }, { Name: sync.APISYNC, diff --git a/db/db.go b/db/db.go index c4e4bed8..196039c3 100644 --- a/db/db.go +++ b/db/db.go @@ -28,39 +28,20 @@ const ( // getOffchainDataSQL is a query that returns the offchain data for a given key getOffchainDataSQL = ` - SELECT key, value, batch_num + SELECT key, value FROM data_node.offchain_data WHERE key = $1 LIMIT 1; ` // listOffchainDataSQL is a query that returns the offchain data for a given list of keys listOffchainDataSQL = ` - SELECT key, value, batch_num + SELECT key, value FROM data_node.offchain_data WHERE key IN (?); ` // countOffchainDataSQL is a query that returns the count of rows in the offchain_data table countOffchainDataSQL = "SELECT COUNT(*) FROM data_node.offchain_data;" - - // selectOffchainDataGapsSQL is a query that returns the gaps in the offchain_data table - selectOffchainDataGapsSQL = ` - WITH numbered_batches AS ( - SELECT - batch_num, - ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number - FROM data_node.offchain_data - ) - SELECT - nb1.batch_num AS current_batch_num, - nb2.batch_num AS next_batch_num - FROM - numbered_batches nb1 - LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1 - WHERE - nb1.batch_num IS NOT NULL - AND nb2.batch_num IS NOT NULL - AND nb1.batch_num + 1 <> nb2.batch_num;` ) var ( @@ -81,7 +62,6 @@ type DB interface { ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error) StoreOffChainData(ctx context.Context, od []types.OffChainData) error CountOffchainData(ctx context.Context) (uint64, error) - DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) } // DB is the database layer of the data node @@ -93,7 +73,6 @@ type pgDB struct { getUnresolvedBatchKeysStmt *sqlx.Stmt getOffChainDataStmt *sqlx.Stmt countOffChainDataStmt *sqlx.Stmt - detectOffChainDataGapsStmt *sqlx.Stmt } // New instantiates a DB @@ -123,11 +102,6 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) { return nil, fmt.Errorf("failed to prepare the count offchain data statement: %w", err) } - detectOffChainDataGapsStmt, err := pg.PreparexContext(ctx, selectOffchainDataGapsSQL) - if err != nil { - return nil, fmt.Errorf("failed to prepare the detect offchain data gaps statement: %w", err) - } - return &pgDB{ pg: pg, storeLastProcessedBlockStmt: storeLastProcessedBlockStmt, @@ -135,7 +109,6 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) { getUnresolvedBatchKeysStmt: getUnresolvedBatchKeysStmt, getOffChainDataStmt: getOffChainDataStmt, countOffChainDataStmt: countOffChainDataStmt, - detectOffChainDataGapsStmt: detectOffChainDataGapsStmt, }, nil } @@ -245,9 +218,8 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, ods []types.OffChainData) // GetOffChainData returns the value identified by the key func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) { data := struct { - Key string `db:"key"` - Value string `db:"value"` - BatchNum uint64 `db:"batch_num"` + Key string `db:"key"` + Value string `db:"value"` }{} if err := db.getOffChainDataStmt.QueryRowxContext(ctx, key.Hex()).StructScan(&data); err != nil { @@ -259,9 +231,8 @@ func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.Of } return &types.OffChainData{ - Key: common.HexToHash(data.Key), - Value: common.FromHex(data.Value), - BatchNum: data.BatchNum, + Key: common.HexToHash(data.Key), + Value: common.FromHex(data.Value), }, nil } @@ -292,9 +263,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ defer rows.Close() type row struct { - Key string `db:"key"` - Value string `db:"value"` - BatchNum uint64 `db:"batch_num"` + Key string `db:"key"` + Value string `db:"value"` } list := make([]types.OffChainData, 0, len(keys)) @@ -305,9 +275,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ } list = append(list, types.OffChainData{ - Key: common.HexToHash(data.Key), - Value: common.FromHex(data.Value), - BatchNum: data.BatchNum, + Key: common.HexToHash(data.Key), + Value: common.FromHex(data.Value), }) } @@ -324,33 +293,6 @@ func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) { return count, nil } -// DetectOffchainDataGaps returns the number of gaps in the offchain_data table -func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) { - rows, err := db.detectOffChainDataGapsStmt.QueryxContext(ctx) - if err != nil { - return nil, err - } - - defer rows.Close() - - type row struct { - CurrentBatchNum uint64 `db:"current_batch_num"` - NextBatchNum uint64 `db:"next_batch_num"` - } - - gaps := make(map[uint64]uint64) - for rows.Next() { - var data row - if err = rows.StructScan(&data); err != nil { - return nil, err - } - - gaps[data.CurrentBatchNum] = data.NextBatchNum - } - - return gaps, nil -} - // buildBatchKeysInsertQuery builds the query to insert unresolved batch keys func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) { const columnsAffected = 2 @@ -372,7 +314,7 @@ func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) { // buildOffchainDataInsertQuery builds the query to insert offchain data func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface{}) { - const columnsAffected = 3 + const columnsAffected = 2 // Remove duplicates from the given offchain data ods = types.RemoveDuplicateOffChainData(ods) @@ -380,16 +322,15 @@ func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface args := make([]interface{}, len(ods)*columnsAffected) values := make([]string, len(ods)) for i, od := range ods { - values[i] = fmt.Sprintf("($%d, $%d, $%d)", i*columnsAffected+1, i*columnsAffected+2, i*columnsAffected+3) //nolint:mnd + values[i] = fmt.Sprintf("($%d, $%d)", i*columnsAffected+1, i*columnsAffected+2) //nolint:mnd args[i*columnsAffected] = od.Key.Hex() args[i*columnsAffected+1] = common.Bytes2Hex(od.Value) - args[i*columnsAffected+2] = od.BatchNum } return fmt.Sprintf(` - INSERT INTO data_node.offchain_data (key, value, batch_num) + INSERT INTO data_node.offchain_data (key, value) VALUES %s ON CONFLICT (key) DO UPDATE - SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num; + SET value = EXCLUDED.value; `, strings.Join(values, ",")), args } diff --git a/db/db_test.go b/db/db_test.go index 4f268884..ca05f6d6 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -217,7 +217,6 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { mock.ExpectPrepare(regexp.QuoteMeta(getUnresolvedBatchKeysSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getOffchainDataSQL)) mock.ExpectPrepare(regexp.QuoteMeta(countOffchainDataSQL)) - mock.ExpectPrepare(regexp.QuoteMeta(selectOffchainDataGapsSQL)) dbPG, err := New(context.Background(), wdb) require.NoError(t, err) @@ -419,7 +418,7 @@ func Test_DB_StoreOffChainData(t *testing.T) { Key: common.BytesToHash([]byte("key1")), Value: []byte("value1"), }}, - expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`, + expectedQuery: `INSERT INTO data_node.offchain_data (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`, }, { name: "several values inserted", @@ -430,7 +429,7 @@ func Test_DB_StoreOffChainData(t *testing.T) { Key: common.BytesToHash([]byte("key2")), Value: []byte("value2"), }}, - expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3),($4, $5, $6) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`, + expectedQuery: `INSERT INTO data_node.offchain_data (key, value) VALUES ($1, $2),($3, $4) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`, }, { name: "error returned", @@ -438,7 +437,7 @@ func Test_DB_StoreOffChainData(t *testing.T) { Key: common.BytesToHash([]byte("key1")), Value: []byte("value1"), }}, - expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`, + expectedQuery: `INSERT INTO data_node.offchain_data (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`, returnErr: errors.New("test error"), }, } @@ -463,7 +462,7 @@ func Test_DB_StoreOffChainData(t *testing.T) { if tt.expectedQuery != "" { args := make([]driver.Value, 0, len(tt.ods)*3) for _, od := range tt.ods { - args = append(args, od.Key.Hex(), common.Bytes2Hex(od.Value), od.BatchNum) + args = append(args, od.Key.Hex(), common.Bytes2Hex(od.Value)) } expected := mock.ExpectExec(regexp.QuoteMeta(tt.expectedQuery)).WithArgs(args...) @@ -499,15 +498,13 @@ func Test_DB_GetOffChainData(t *testing.T) { { name: "successfully selected value", od: []types.OffChainData{{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }}, key: common.BytesToHash([]byte("key1")), expected: &types.OffChainData{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }, }, { @@ -556,8 +553,8 @@ func Test_DB_GetOffChainData(t *testing.T) { if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) } else { - expected.WillReturnRows(sqlmock.NewRows([]string{"key", "value", "batch_num"}). - AddRow(tt.expected.Key.Hex(), common.Bytes2Hex(tt.expected.Value), tt.expected.BatchNum)) + expected.WillReturnRows(sqlmock.NewRows([]string{"key", "value"}). + AddRow(tt.expected.Key.Hex(), common.Bytes2Hex(tt.expected.Value))) } data, err := dbPG.GetOffChainData(context.Background(), tt.key) @@ -595,23 +592,20 @@ func Test_DB_ListOffChainData(t *testing.T) { }, expected: []types.OffChainData{ { - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 0, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }, }, - sql: `SELECT key, value, batch_num FROM data_node\.offchain_data WHERE key IN \(\$1\)`, + sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`, }, { name: "successfully selected two values", od: []types.OffChainData{{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }, { - Key: common.BytesToHash([]byte("key2")), - Value: []byte("value2"), - BatchNum: 2, + Key: common.BytesToHash([]byte("key2")), + Value: []byte("value2"), }}, keys: []common.Hash{ common.BytesToHash([]byte("key1")), @@ -619,17 +613,15 @@ func Test_DB_ListOffChainData(t *testing.T) { }, expected: []types.OffChainData{ { - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }, { - Key: common.BytesToHash([]byte("key2")), - Value: []byte("value2"), - BatchNum: 2, + Key: common.BytesToHash([]byte("key2")), + Value: []byte("value2"), }, }, - sql: `SELECT key, value, batch_num FROM data_node\.offchain_data WHERE key IN \(\$1\, \$2\)`, + sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\, \$2\)`, }, { name: "error returned", @@ -640,7 +632,7 @@ func Test_DB_ListOffChainData(t *testing.T) { keys: []common.Hash{ common.BytesToHash([]byte("key1")), }, - sql: `SELECT key, value, batch_num FROM data_node\.offchain_data WHERE key IN \(\$1\)`, + sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`, returnErr: errors.New("test error"), }, { @@ -652,7 +644,7 @@ func Test_DB_ListOffChainData(t *testing.T) { keys: []common.Hash{ common.BytesToHash([]byte("undefined")), }, - sql: `SELECT key, value, batch_num FROM data_node\.offchain_data WHERE key IN \(\$1\)`, + sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`, returnErr: ErrStateNotSynchronized, }, } @@ -688,10 +680,10 @@ func Test_DB_ListOffChainData(t *testing.T) { if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) } else { - returnData := sqlmock.NewRows([]string{"key", "value", "batch_num"}) + returnData := sqlmock.NewRows([]string{"key", "value"}) for _, data := range tt.expected { - returnData = returnData.AddRow(data.Key.Hex(), common.Bytes2Hex(data.Value), data.BatchNum) + returnData = returnData.AddRow(data.Key.Hex(), common.Bytes2Hex(data.Value)) } expected.WillReturnRows(returnData) @@ -785,96 +777,12 @@ func Test_DB_CountOffchainData(t *testing.T) { } } -func Test_DB_DetectOffchainDataGaps(t *testing.T) { - t.Parallel() - - testTable := []struct { - name string - seed []types.OffChainData - gaps map[uint64]uint64 - returnErr error - }{ - { - name: "one gap found", - seed: []types.OffChainData{{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, - }, { - Key: common.BytesToHash([]byte("key2")), - Value: []byte("value2"), - BatchNum: 2, - }, { - Key: common.HexToHash("key4"), - Value: []byte("value4"), - BatchNum: 4, - }}, - gaps: map[uint64]uint64{ - 2: 4, - }, - }, - { - name: "error returned", - seed: []types.OffChainData{{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - }}, - returnErr: errors.New("test error"), - }, - } - - for _, tt := range testTable { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - db, mock, err := sqlmock.New() - require.NoError(t, err) - - defer db.Close() - - constructorExpect(mock) - - wdb := sqlx.NewDb(db, "postgres") - dbPG, err := New(context.Background(), wdb) - require.NoError(t, err) - - // Seed data - seedOffchainData(t, dbPG, mock, tt.seed) - - expected := mock.ExpectQuery(regexp.QuoteMeta(selectOffchainDataGapsSQL)) - - if tt.returnErr != nil { - expected.WillReturnError(tt.returnErr) - } else { - rows := sqlmock.NewRows([]string{"current_batch_num", "next_batch_num"}) - for k, v := range tt.gaps { - rows.AddRow(k, v) - } - expected.WillReturnRows(rows) - } - - actual, err := dbPG.DetectOffchainDataGaps(context.Background()) - if tt.returnErr != nil { - require.ErrorIs(t, err, tt.returnErr) - } else { - require.NoError(t, err) - require.Equal(t, tt.gaps, actual) - } - - require.NoError(t, mock.ExpectationsWereMet()) - }) - } -} - func constructorExpect(mock sqlmock.Sqlmock) { mock.ExpectPrepare(regexp.QuoteMeta(storeLastProcessedBlockSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getLastProcessedBlockSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getUnresolvedBatchKeysSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getOffchainDataSQL)) mock.ExpectPrepare(regexp.QuoteMeta(countOffchainDataSQL)) - mock.ExpectPrepare(regexp.QuoteMeta(selectOffchainDataGapsSQL)) } func seedOffchainData(t *testing.T, db DB, mock sqlmock.Sqlmock, ods []types.OffChainData) { diff --git a/mocks/db.generated.go b/mocks/db.generated.go index d21533c9..cf708af2 100644 --- a/mocks/db.generated.go +++ b/mocks/db.generated.go @@ -128,64 +128,6 @@ func (_c *DB_DeleteUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Conte return _c } -// DetectOffchainDataGaps provides a mock function with given fields: ctx -func (_m *DB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for DetectOffchainDataGaps") - } - - var r0 map[uint64]uint64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (map[uint64]uint64, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) map[uint64]uint64); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[uint64]uint64) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// DB_DetectOffchainDataGaps_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetectOffchainDataGaps' -type DB_DetectOffchainDataGaps_Call struct { - *mock.Call -} - -// DetectOffchainDataGaps is a helper method to define mock.On call -// - ctx context.Context -func (_e *DB_Expecter) DetectOffchainDataGaps(ctx interface{}) *DB_DetectOffchainDataGaps_Call { - return &DB_DetectOffchainDataGaps_Call{Call: _e.mock.On("DetectOffchainDataGaps", ctx)} -} - -func (_c *DB_DetectOffchainDataGaps_Call) Run(run func(ctx context.Context)) *DB_DetectOffchainDataGaps_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *DB_DetectOffchainDataGaps_Call) Return(_a0 map[uint64]uint64, _a1 error) *DB_DetectOffchainDataGaps_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *DB_DetectOffchainDataGaps_Call) RunAndReturn(run func(context.Context) (map[uint64]uint64, error)) *DB_DetectOffchainDataGaps_Call { - _c.Call.Return(run) - return _c -} - // GetLastProcessedBlock provides a mock function with given fields: ctx, task func (_m *DB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) { ret := _m.Called(ctx, task) diff --git a/mocks/gaps_detector.generated.go b/mocks/gaps_detector.generated.go deleted file mode 100644 index a3f3f982..00000000 --- a/mocks/gaps_detector.generated.go +++ /dev/null @@ -1,79 +0,0 @@ -// Code generated by mockery v2.40.1. DO NOT EDIT. - -package mocks - -import mock "github.com/stretchr/testify/mock" - -// GapsDetector is an autogenerated mock type for the GapsDetector type -type GapsDetector struct { - mock.Mock -} - -type GapsDetector_Expecter struct { - mock *mock.Mock -} - -func (_m *GapsDetector) EXPECT() *GapsDetector_Expecter { - return &GapsDetector_Expecter{mock: &_m.Mock} -} - -// Gaps provides a mock function with given fields: -func (_m *GapsDetector) Gaps() map[uint64]uint64 { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Gaps") - } - - var r0 map[uint64]uint64 - if rf, ok := ret.Get(0).(func() map[uint64]uint64); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[uint64]uint64) - } - } - - return r0 -} - -// GapsDetector_Gaps_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Gaps' -type GapsDetector_Gaps_Call struct { - *mock.Call -} - -// Gaps is a helper method to define mock.On call -func (_e *GapsDetector_Expecter) Gaps() *GapsDetector_Gaps_Call { - return &GapsDetector_Gaps_Call{Call: _e.mock.On("Gaps")} -} - -func (_c *GapsDetector_Gaps_Call) Run(run func()) *GapsDetector_Gaps_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *GapsDetector_Gaps_Call) Return(_a0 map[uint64]uint64) *GapsDetector_Gaps_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *GapsDetector_Gaps_Call) RunAndReturn(run func() map[uint64]uint64) *GapsDetector_Gaps_Call { - _c.Call.Return(run) - return _c -} - -// NewGapsDetector creates a new instance of GapsDetector. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewGapsDetector(t interface { - mock.TestingT - Cleanup(func()) -}) *GapsDetector { - mock := &GapsDetector{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/services/status/status.go b/services/status/status.go index 6b3d2a2a..664e5c7c 100644 --- a/services/status/status.go +++ b/services/status/status.go @@ -15,25 +15,17 @@ import ( // APISTATUS is the namespace of the status service const APISTATUS = "status" -// GapsDetector is an interface for detecting gaps in the offchain data -type GapsDetector interface { - // Gaps returns a map of gaps in the offchain data - Gaps() map[uint64]uint64 -} - // Endpoints contains implementations for the "status" RPC endpoints type Endpoints struct { - db db.DB - startTime time.Time - gapsDetector GapsDetector + db db.DB + startTime time.Time } // NewEndpoints returns Endpoints -func NewEndpoints(db db.DB, gapsDetector GapsDetector) *Endpoints { +func NewEndpoints(db db.DB) *Endpoints { return &Endpoints{ - db: db, - startTime: time.Now(), - gapsDetector: gapsDetector, + db: db, + startTime: time.Now(), } } @@ -53,10 +45,9 @@ func (s *Endpoints) GetStatus() (interface{}, rpc.Error) { } return types.DACStatus{ - Version: dataavailability.Version, - Uptime: uptime, - KeyCount: rowCount, - BackfillProgress: backfillProgress, - OffchainDataGapsExist: len(s.gapsDetector.Gaps()) > 0, + Version: dataavailability.Version, + Uptime: uptime, + KeyCount: rowCount, + BackfillProgress: backfillProgress, }, nil } diff --git a/services/status/status_test.go b/services/status/status_test.go index cc9b737f..d6ccbfd1 100644 --- a/services/status/status_test.go +++ b/services/status/status_test.go @@ -52,11 +52,7 @@ func TestEndpoints_GetStatus(t *testing.T) { dbMock.On("GetLastProcessedBlock", mock.Anything, mock.Anything). Return(tt.getLastProcessedBlock, tt.getLastProcessedBlockErr) - gapDetectorMock := mocks.NewGapsDetector(t) - - gapDetectorMock.On("Gaps").Return(map[uint64]uint64{1: 1}) - - statusEndpoints := NewEndpoints(dbMock, gapDetectorMock) + statusEndpoints := NewEndpoints(dbMock) actual, err := statusEndpoints.GetStatus() @@ -73,7 +69,6 @@ func TestEndpoints_GetStatus(t *testing.T) { require.Equal(t, "v0.1.0", dacStatus.Version) require.Equal(t, tt.countOffchainData, dacStatus.KeyCount) require.Equal(t, tt.getLastProcessedBlock, dacStatus.BackfillProgress) - require.True(t, dacStatus.OffchainDataGapsExist) } }) } diff --git a/services/sync/sync_test.go b/services/sync/sync_test.go index 0aa3ac9a..162df4ac 100644 --- a/services/sync/sync_test.go +++ b/services/sync/sync_test.go @@ -27,18 +27,16 @@ func TestEndpoints_GetOffChainData(t *testing.T) { name: "successfully got offchain data", hash: types.ArgHash{}, data: &types.OffChainData{ - Key: common.Hash{}, - Value: types.ArgBytes("offchaindata"), - BatchNum: 0, + Key: common.Hash{}, + Value: types.ArgBytes("offchaindata"), }, }, { name: "db returns error", hash: types.ArgHash{}, data: &types.OffChainData{ - Key: common.Hash{}, - Value: types.ArgBytes("offchaindata"), - BatchNum: 0, + Key: common.Hash{}, + Value: types.ArgBytes("offchaindata"), }, dbErr: errors.New("test error"), err: errors.New("failed to get the requested data"), @@ -85,18 +83,16 @@ func TestSyncEndpoints_ListOffChainData(t *testing.T) { name: "successfully got offchain data", hashes: generateRandomHashes(t, 1), data: []types.OffChainData{{ - Key: common.BytesToHash(nil), - Value: types.ArgBytes("offchaindata"), - BatchNum: 0, + Key: common.BytesToHash(nil), + Value: types.ArgBytes("offchaindata"), }}, }, { name: "db returns error", hashes: []types.ArgHash{}, data: []types.OffChainData{{ - Key: common.BytesToHash(nil), - Value: types.ArgBytes("offchaindata"), - BatchNum: 0, + Key: common.BytesToHash(nil), + Value: types.ArgBytes("offchaindata"), }}, dbErr: errors.New("test error"), err: errors.New("failed to list the requested data"), diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 85e17388..d393de0f 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -1,7 +1,6 @@ package synchronizer import ( - "bytes" "context" "fmt" "math/rand" @@ -33,21 +32,19 @@ type SequencerTracker interface { // BatchSynchronizer watches for number events, checks if they are // "locally" stored, then retrieves and stores missing data type BatchSynchronizer struct { - client etherman.Etherman - stop chan struct{} - retry time.Duration - rpcTimeout time.Duration - blockBatchSize uint - self common.Address - db db.DB - committee *CommitteeMapSafe - syncLock sync.Mutex - reorgs <-chan BlockReorg - events chan *polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches - sequencer SequencerTracker - rpcClientFactory client.Factory - offchainDataGaps map[uint64]uint64 - offchainDataGapsLock sync.RWMutex + client etherman.Etherman + stop chan struct{} + retry time.Duration + rpcTimeout time.Duration + blockBatchSize uint + self common.Address + db db.DB + committee *CommitteeMapSafe + syncLock sync.Mutex + reorgs <-chan BlockReorg + events chan *polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches + sequencer SequencerTracker + rpcClientFactory client.Factory } // NewBatchSynchronizer creates the BatchSynchronizer @@ -76,7 +73,6 @@ func NewBatchSynchronizer( events: make(chan *polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches), sequencer: sequencer, rpcClientFactory: rpcClientFactory, - offchainDataGaps: make(map[uint64]uint64), } return synchronizer, synchronizer.resolveCommittee() } @@ -105,7 +101,6 @@ func (bs *BatchSynchronizer) Start(ctx context.Context) { go bs.processUnresolvedBatches(ctx) go bs.produceEvents(ctx) go bs.handleReorgs(ctx) - go bs.startOffchainDataGapsDetection(ctx) } // Stop stops the synchronizer @@ -113,17 +108,6 @@ func (bs *BatchSynchronizer) Stop() { close(bs.stop) } -// Gaps returns the offchain data gaps -func (bs *BatchSynchronizer) Gaps() map[uint64]uint64 { - bs.offchainDataGapsLock.RLock() - gaps := make(map[uint64]uint64, len(bs.offchainDataGaps)) - for key, value := range bs.offchainDataGaps { - gaps[key] = value - } - bs.offchainDataGapsLock.RUnlock() - return gaps -} - func (bs *BatchSynchronizer) handleReorgs(ctx context.Context) { log.Info("starting reorgs handler") for { @@ -322,12 +306,6 @@ func (bs *BatchSynchronizer) handleUnresolvedBatches(ctx context.Context) error continue } - // If the batch number is zero, update it - if extData.BatchNum == 0 { - extData.BatchNum = batchKey.Number - data = append(data, extData) - } - // Mark the batch as resolved resolved = append(resolved, batchKey) @@ -393,7 +371,7 @@ func (bs *BatchSynchronizer) resolve(ctx context.Context, batch types.BatchKey) continue // malformed committee, skip what is known to be wrong } - value, err := bs.resolveWithMember(ctx, batch, member) + value, err := bs.resolveWithMember(ctx, batch.Hash, member) if err != nil { log.Warnf("error resolving, continuing: %v", err) bs.committee.Delete(member.Addr) @@ -422,15 +400,14 @@ func (bs *BatchSynchronizer) trySequencer(ctx context.Context, batch types.Batch } return &types.OffChainData{ - Key: batch.Hash, - Value: seqBatch.BatchL2Data, - BatchNum: batch.Number, + Key: batch.Hash, + Value: seqBatch.BatchL2Data, } } func (bs *BatchSynchronizer) resolveWithMember( parentCtx context.Context, - batch types.BatchKey, + batch common.Hash, member etherman.DataCommitteeMember, ) (*types.OffChainData, error) { cm := bs.rpcClientFactory.New(member.URL) @@ -438,67 +415,20 @@ func (bs *BatchSynchronizer) resolveWithMember( ctx, cancel := context.WithTimeout(parentCtx, bs.rpcTimeout) defer cancel() - log.Debugf("trying member %v at %v for key %v", member.Addr.Hex(), member.URL, batch.Hash.Hex()) + log.Debugf("trying member %v at %v for key %v", member.Addr.Hex(), member.URL, batch.Hex()) - bytes, err := cm.GetOffChainData(ctx, batch.Hash) + bytes, err := cm.GetOffChainData(ctx, batch) if err != nil { return nil, err } expectKey := crypto.Keccak256Hash(bytes) - if batch.Hash.Cmp(expectKey) != 0 { + if batch.Cmp(expectKey) != 0 { return nil, fmt.Errorf("unexpected key gotten from member: %v. Key: %v", member.Addr.Hex(), expectKey.Hex()) } return &types.OffChainData{ - Key: batch.Hash, - Value: bytes, - BatchNum: batch.Number, + Key: batch, + Value: bytes, }, nil } - -func (bs *BatchSynchronizer) startOffchainDataGapsDetection(ctx context.Context) { - log.Info("starting handling unresolved batches") - for { - delay := time.NewTimer(time.Minute) - select { - case <-delay.C: - if err := bs.detectOffchainDataGaps(ctx); err != nil { - log.Error(err) - } - case <-bs.stop: - return - } - } -} - -// detectOffchainDataGaps detects offchain data gaps and reports them in logs and the service state. -func (bs *BatchSynchronizer) detectOffchainDataGaps(ctx context.Context) error { - // Detect offchain data gaps - gaps, err := detectOffchainDataGaps(ctx, bs.db) - if err != nil { - return fmt.Errorf("failed to detect offchain data gaps: %v", err) - } - - // No gaps found, all good - if len(gaps) == 0 { - return nil - } - - // Log the detected gaps and store the detected gaps in the service state - gapsRaw := new(bytes.Buffer) - bs.offchainDataGapsLock.Lock() - bs.offchainDataGaps = make(map[uint64]uint64, len(gaps)) - for key, value := range gaps { - bs.offchainDataGaps[key] = value - - if _, err = fmt.Fprintf(gapsRaw, "%d=>%d\n", key, value); err != nil { - log.Errorf("failed to write offchain data gaps: %v", err) - } - } - bs.offchainDataGapsLock.Unlock() - - log.Warnf("detected offchain data gaps (current batch number => expected batch number): %s", gapsRaw.String()) - - return nil -} diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index bbb905a9..5ef41ee3 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -558,9 +558,8 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { }, listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, listOffchainDataReturns: []interface{}{[]types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - BatchNum: 10, + Key: txHash, + Value: batchL2Data, }}, nil}, deleteUnresolvedBatchKeysArgs: []interface{}{mock.Anything, []types.BatchKey{{ @@ -574,45 +573,6 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { }) }) - t.Run("Unresolved batch key already resolved with no batch number", func(t *testing.T) { - t.Parallel() - - testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{ - []types.BatchKey{{ - Number: 10, - Hash: txHash, - }}, - nil, - }, - listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, - listOffchainDataReturns: []interface{}{[]types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - BatchNum: 0, - }}, nil}, - storeOffChainDataArgs: []interface{}{mock.Anything, - []types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - BatchNum: 10, - }}, - mock.Anything, - }, - storeOffChainDataReturns: []interface{}{nil}, - deleteUnresolvedBatchKeysArgs: []interface{}{mock.Anything, - []types.BatchKey{{ - Number: 10, - Hash: txHash, - }}, - mock.Anything, - }, - deleteUnresolvedBatchKeysReturns: []interface{}{nil}, - isErrorExpected: false, - }) - }) - t.Run("Unresolved batch key found", func(t *testing.T) { t.Parallel() @@ -629,11 +589,9 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { listOffchainDataReturns: []interface{}{nil, nil}, storeOffChainDataArgs: []interface{}{mock.Anything, []types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - BatchNum: 10, + Key: txHash, + Value: batchL2Data, }}, - mock.Anything, }, storeOffChainDataReturns: []interface{}{nil}, deleteUnresolvedBatchKeysArgs: []interface{}{mock.Anything, @@ -804,74 +762,3 @@ func TestBatchSynchronizer_HandleReorgs(t *testing.T) { }) }) } - -func TestBatchSynchronizer_detectOffchainDataGaps(t *testing.T) { - t.Parallel() - - type testConfig struct { - // db mock - detectOffchainDataGapsArgs []interface{} - detectOffchainDataGapsReturns []interface{} - - expectedGaps map[uint64]uint64 - isErrorExpected bool - } - - testFn := func(t *testing.T, config testConfig) { - t.Helper() - - dbMock := mocks.NewDB(t) - - if config.detectOffchainDataGapsArgs != nil && config.detectOffchainDataGapsReturns != nil { - dbMock.On("DetectOffchainDataGaps", config.detectOffchainDataGapsArgs...).Return( - config.detectOffchainDataGapsReturns...).Once() - } - - batchSynronizer := &BatchSynchronizer{ - db: dbMock, - } - - err := batchSynronizer.detectOffchainDataGaps(context.Background()) - if config.isErrorExpected { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Equal(t, config.expectedGaps, batchSynronizer.Gaps()) - } - - dbMock.AssertExpectations(t) - } - - t.Run("no gaps detected", func(t *testing.T) { - t.Parallel() - - testFn(t, testConfig{ - detectOffchainDataGapsArgs: []interface{}{mock.Anything}, - detectOffchainDataGapsReturns: []interface{}{map[uint64]uint64{}, nil}, - expectedGaps: map[uint64]uint64{}, - isErrorExpected: false, - }) - }) - - t.Run("one gap detected", func(t *testing.T) { - t.Parallel() - - testFn(t, testConfig{ - detectOffchainDataGapsArgs: []interface{}{mock.Anything}, - detectOffchainDataGapsReturns: []interface{}{map[uint64]uint64{1: 3}, nil}, - expectedGaps: map[uint64]uint64{1: 3}, - isErrorExpected: false, - }) - }) - - t.Run("failed to detect gaps", func(t *testing.T) { - t.Parallel() - - testFn(t, testConfig{ - detectOffchainDataGapsArgs: []interface{}{mock.Anything}, - detectOffchainDataGapsReturns: []interface{}{nil, errors.New("test error")}, - expectedGaps: map[uint64]uint64{}, - isErrorExpected: true, - }) - }) -} diff --git a/synchronizer/store.go b/synchronizer/store.go index 3ba53734..5e111f26 100644 --- a/synchronizer/store.go +++ b/synchronizer/store.go @@ -77,10 +77,3 @@ func storeOffchainData(parentCtx context.Context, db dbTypes.DB, data []types.Of return db.StoreOffChainData(ctx, data) } - -func detectOffchainDataGaps(parentCtx context.Context, db dbTypes.DB) (map[uint64]uint64, error) { - ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) - defer cancel() - - return db.DetectOffchainDataGaps(ctx) -} diff --git a/synchronizer/store_test.go b/synchronizer/store_test.go index c32091e1..d877b5cf 100644 --- a/synchronizer/store_test.go +++ b/synchronizer/store_test.go @@ -364,61 +364,3 @@ func Test_storeOffchainData(t *testing.T) { }) } } - -func Test_detectOffchainDataGaps(t *testing.T) { - t.Parallel() - - testError := errors.New("test error") - - tests := []struct { - name string - db func(t *testing.T) db.DB - gaps map[uint64]uint64 - wantErr bool - }{ - { - name: "DetectOffchainDataGaps returns error", - db: func(t *testing.T) db.DB { - t.Helper() - - mockDB := mocks.NewDB(t) - - mockDB.On("DetectOffchainDataGaps", mock.Anything).Return(nil, testError) - - return mockDB - }, - gaps: nil, - wantErr: true, - }, - { - name: "all good", - db: func(t *testing.T) db.DB { - t.Helper() - - mockDB := mocks.NewDB(t) - - mockDB.On("DetectOffchainDataGaps", mock.Anything).Return(map[uint64]uint64{1: 3}, nil) - - return mockDB - }, - gaps: map[uint64]uint64{1: 3}, - wantErr: false, - }, - } - for _, tt := range tests { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - testDB := tt.db(t) - - if gaps, err := detectOffchainDataGaps(context.Background(), testDB); tt.wantErr { - require.ErrorIs(t, err, testError) - } else { - require.NoError(t, err) - require.Equal(t, tt.gaps, gaps) - } - }) - } -} diff --git a/types/types.go b/types/types.go index 6d262d7f..1a3d7d06 100644 --- a/types/types.go +++ b/types/types.go @@ -17,11 +17,10 @@ const ( // DACStatus contains DAC status info type DACStatus struct { - Uptime string `json:"uptime"` - Version string `json:"version"` - KeyCount uint64 `json:"key_count"` - BackfillProgress uint64 `json:"backfill_progress"` - OffchainDataGapsExist bool `json:"offchain_data_gaps_exist"` + Uptime string `json:"uptime"` + Version string `json:"version"` + KeyCount uint64 `json:"key_count"` + BackfillProgress uint64 `json:"backfill_progress"` } // BatchKey is the pairing of batch number and data hash of a batch @@ -32,9 +31,8 @@ type BatchKey struct { // OffChainData represents some data that is not stored on chain and should be preserved type OffChainData struct { - Key common.Hash - Value []byte - BatchNum uint64 + Key common.Hash + Value []byte } // RemoveDuplicateOffChainData removes duplicate off chain data From a98b15e6935f9769f82a2e7871fb1d0b0810ca87 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Wed, 27 Nov 2024 17:51:46 +0530 Subject: [PATCH 02/19] add migration file and update event processing logic --- db/db.go | 42 ++++----- db/db_test.go | 36 ++++---- db/migrations/0006.sql | 23 +++++ mocks/db.generated.go | 174 +++++++++++++++++------------------ synchronizer/batches.go | 99 +++++++++----------- synchronizer/batches_test.go | 155 +++++++++++++------------------ synchronizer/store.go | 12 +-- synchronizer/store_test.go | 30 +++--- 8 files changed, 280 insertions(+), 291 deletions(-) create mode 100644 db/migrations/0006.sql diff --git a/db/db.go b/db/db.go index 196039c3..42c6fe0c 100644 --- a/db/db.go +++ b/db/db.go @@ -23,8 +23,8 @@ const ( // getLastProcessedBlockSQL is a query that returns the last processed block for a given task getLastProcessedBlockSQL = `SELECT block FROM data_node.sync_tasks WHERE task = $1;` - // getUnresolvedBatchKeysSQL is a query that returns the unresolved batch keys from the database - getUnresolvedBatchKeysSQL = `SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;` + // getMissingBatchKeysSQL is a query that returns the missing batch keys from the database + getMissingBatchKeysSQL = `SELECT num, hash FROM data_node.missing_batches LIMIT $1;` // getOffchainDataSQL is a query that returns the offchain data for a given key getOffchainDataSQL = ` @@ -54,9 +54,9 @@ type DB interface { StoreLastProcessedBlock(ctx context.Context, block uint64, task string) error GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) - StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error - GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) - DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error + StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error + GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) + DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error) @@ -70,7 +70,7 @@ type pgDB struct { storeLastProcessedBlockStmt *sqlx.Stmt getLastProcessedBlockStmt *sqlx.Stmt - getUnresolvedBatchKeysStmt *sqlx.Stmt + getMissingBatchKeysStmt *sqlx.Stmt getOffChainDataStmt *sqlx.Stmt countOffChainDataStmt *sqlx.Stmt } @@ -87,9 +87,9 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) { return nil, fmt.Errorf("failed to prepare the get last processed block statement: %w", err) } - getUnresolvedBatchKeysStmt, err := pg.PreparexContext(ctx, getUnresolvedBatchKeysSQL) + getMissingBatchKeysStmt, err := pg.PreparexContext(ctx, getMissingBatchKeysSQL) if err != nil { - return nil, fmt.Errorf("failed to prepare the get unresolved batch keys statement: %w", err) + return nil, fmt.Errorf("failed to prepare the get missing batch keys statement: %w", err) } getOffChainDataStmt, err := pg.PreparexContext(ctx, getOffchainDataSQL) @@ -106,7 +106,7 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) { pg: pg, storeLastProcessedBlockStmt: storeLastProcessedBlockStmt, getLastProcessedBlockStmt: getLastProcessedBlockStmt, - getUnresolvedBatchKeysStmt: getUnresolvedBatchKeysStmt, + getMissingBatchKeysStmt: getMissingBatchKeysStmt, getOffChainDataStmt: getOffChainDataStmt, countOffChainDataStmt: countOffChainDataStmt, }, nil @@ -129,8 +129,8 @@ func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, return lastBlock, nil } -// StoreUnresolvedBatchKeys stores unresolved batch keys in the database -func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { +// StoreMissingBatchKeys stores missing batch keys in the database +func (db *pgDB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error { if len(bks) == 0 { return nil } @@ -138,15 +138,15 @@ func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchK query, args := buildBatchKeysInsertQuery(bks) if _, err := db.pg.ExecContext(ctx, query, args...); err != nil { - return fmt.Errorf("failed to store unresolved batches: %w", err) + return fmt.Errorf("failed to store misisng batches: %w", err) } return nil } -// GetUnresolvedBatchKeys returns the unresolved batch keys from the database -func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { - rows, err := db.getUnresolvedBatchKeysStmt.QueryxContext(ctx, limit) +// GetMissingBatchKeys returns the missing batch keys that is not yet present in offchain table +func (db *pgDB) GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { + rows, err := db.getMissingBatchKeysStmt.QueryxContext(ctx, limit) if err != nil { return nil, err } @@ -174,8 +174,8 @@ func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types return bks, nil } -// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database -func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { +// DeleteMissingBatchKeys deletes the missing batch keys from the missing_batch table in the db +func (db *pgDB) DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error { if len(bks) == 0 { return nil } @@ -191,11 +191,11 @@ func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.Batch } query := fmt.Sprintf(` - DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (%s); + DELETE FROM data_node.missing_batches WHERE (num, hash) IN (%s); `, strings.Join(values, ",")) if _, err := db.pg.ExecContext(ctx, query, args...); err != nil { - return fmt.Errorf("failed to delete unresolved batches: %w", err) + return fmt.Errorf("failed to delete missing batches: %w", err) } return nil @@ -293,7 +293,7 @@ func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) { return count, nil } -// buildBatchKeysInsertQuery builds the query to insert unresolved batch keys +// buildBatchKeysInsertQuery builds the query to insert missing batch keys func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) { const columnsAffected = 2 @@ -306,7 +306,7 @@ func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) { } return fmt.Sprintf(` - INSERT INTO data_node.unresolved_batches (num, hash) + INSERT INTO data_node.missing_batches (num, hash) VALUES %s ON CONFLICT (num, hash) DO NOTHING; `, strings.Join(values, ",")), args diff --git a/db/db_test.go b/db/db_test.go index ca05f6d6..8356d5a4 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -159,7 +159,7 @@ func Test_DB_GetLastProcessedBlock(t *testing.T) { } } -func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { +func Test_DB_StoreMissingBatchKeys(t *testing.T) { t.Parallel() testTable := []struct { @@ -177,7 +177,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { Number: 1, Hash: common.BytesToHash([]byte("key1")), }}, - expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`, + expectedQuery: `INSERT INTO data_node.missing_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`, }, { name: "several values inserted", @@ -188,7 +188,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { Number: 2, Hash: common.BytesToHash([]byte("key2")), }}, - expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2),($3, $4) ON CONFLICT (num, hash) DO NOTHING`, + expectedQuery: `INSERT INTO data_node.missing_batches (num, hash) VALUES ($1, $2),($3, $4) ON CONFLICT (num, hash) DO NOTHING`, }, { name: "error returned", @@ -196,7 +196,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { Number: 1, Hash: common.BytesToHash([]byte("key1")), }}, - expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`, + expectedQuery: `INSERT INTO data_node.missing_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`, returnErr: errors.New("test error"), }, } @@ -214,7 +214,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { mock.ExpectPrepare(regexp.QuoteMeta(storeLastProcessedBlockSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getLastProcessedBlockSQL)) - mock.ExpectPrepare(regexp.QuoteMeta(getUnresolvedBatchKeysSQL)) + mock.ExpectPrepare(regexp.QuoteMeta(getMissingBatchKeysSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getOffchainDataSQL)) mock.ExpectPrepare(regexp.QuoteMeta(countOffchainDataSQL)) @@ -237,7 +237,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { } } - err = dbPG.StoreUnresolvedBatchKeys(context.Background(), tt.bk) + err = dbPG.StoreMissingBatchKeys(context.Background(), tt.bk) if tt.returnErr != nil { require.ErrorIs(t, err, tt.returnErr) } else { @@ -249,7 +249,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { } } -func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { +func Test_DB_GetMissingBatchKeys(t *testing.T) { t.Parallel() testTable := []struct { @@ -292,10 +292,10 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { require.NoError(t, err) // Seed data - seedUnresolvedBatchKeys(t, dbPG, mock, tt.bks) + seedMissingBatchKeys(t, dbPG, mock, tt.bks) var limit = uint(10) - expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.unresolved_batches LIMIT \$1\;`).WithArgs(limit) + expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.missing_batches LIMIT \$1\;`).WithArgs(limit) if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) @@ -305,7 +305,7 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { } } - data, err := dbPG.GetUnresolvedBatchKeys(context.Background(), limit) + data, err := dbPG.GetMissingBatchKeys(context.Background(), limit) if tt.returnErr != nil { require.ErrorIs(t, err, tt.returnErr) } else { @@ -318,7 +318,7 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { } } -func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { +func Test_DB_DeleteMissingBatchKeys(t *testing.T) { t.Parallel() testTable := []struct { @@ -333,7 +333,7 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { Number: 1, Hash: common.BytesToHash([]byte("key1")), }}, - expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2))`, + expectedQuery: `DELETE FROM data_node.missing_batches WHERE (num, hash) IN (($1, $2))`, }, { name: "multiple values deleted", @@ -344,7 +344,7 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { Number: 2, Hash: common.BytesToHash([]byte("key2")), }}, - expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2),($3, $4))`, + expectedQuery: `DELETE FROM data_node.missing_batches WHERE (num, hash) IN (($1, $2),($3, $4))`, }, { name: "error returned", @@ -352,7 +352,7 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { Number: 1, Hash: common.BytesToHash([]byte("key1")), }}, - expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2))`, + expectedQuery: `DELETE FROM data_node.missing_batches WHERE (num, hash) IN (($1, $2))`, returnErr: errors.New("test error"), }, } @@ -388,7 +388,7 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { } } - err = dbPG.DeleteUnresolvedBatchKeys(context.Background(), tt.bks) + err = dbPG.DeleteMissingBatchKeys(context.Background(), tt.bks) if tt.returnErr != nil { require.ErrorIs(t, err, tt.returnErr) } else { @@ -780,7 +780,7 @@ func Test_DB_CountOffchainData(t *testing.T) { func constructorExpect(mock sqlmock.Sqlmock) { mock.ExpectPrepare(regexp.QuoteMeta(storeLastProcessedBlockSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getLastProcessedBlockSQL)) - mock.ExpectPrepare(regexp.QuoteMeta(getUnresolvedBatchKeysSQL)) + mock.ExpectPrepare(regexp.QuoteMeta(getMissingBatchKeysSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getOffchainDataSQL)) mock.ExpectPrepare(regexp.QuoteMeta(countOffchainDataSQL)) } @@ -806,7 +806,7 @@ func seedOffchainData(t *testing.T, db DB, mock sqlmock.Sqlmock, ods []types.Off require.NoError(t, err) } -func seedUnresolvedBatchKeys(t *testing.T, db DB, mock sqlmock.Sqlmock, bks []types.BatchKey) { +func seedMissingBatchKeys(t *testing.T, db DB, mock sqlmock.Sqlmock, bks []types.BatchKey) { t.Helper() if len(bks) == 0 { @@ -823,6 +823,6 @@ func seedUnresolvedBatchKeys(t *testing.T, db DB, mock sqlmock.Sqlmock, bks []ty mock.ExpectExec(regexp.QuoteMeta(query)).WithArgs(argValues...). WillReturnResult(sqlmock.NewResult(int64(len(bks)), int64(len(bks)))) - err := db.StoreUnresolvedBatchKeys(context.Background(), bks) + err := db.StoreMissingBatchKeys(context.Background(), bks) require.NoError(t, err) } diff --git a/db/migrations/0006.sql b/db/migrations/0006.sql new file mode 100644 index 00000000..57541e10 --- /dev/null +++ b/db/migrations/0006.sql @@ -0,0 +1,23 @@ +-- +migrate Down +-- Add the 'batch_num' column to 'offchain_data' table +ALTER TABLE data_node.offchain_data + ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0; + +-- Rename the 'missing_batches' table to 'unresolved_batches' +ALTER TABLE data_node.missing_batches RENAME TO data_node.unresolved_batches; + +-- Create an index for the 'batch_num' column for better performance +CREATE INDEX IF NOT EXISTS idx_batch_num ON data_node.offchain_data(batch_num); + +-- Reset the sync task for L1 to trigger resync +UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1'; + +-- +migrate Up +-- Drop the 'batch_num' column from 'offchain_data' table +ALTER TABLE data_node.offchain_data DROP COLUMN batch_num; + +-- Rename the 'unresolved_batches' table back to 'missing_batches' +ALTER TABLE data_node.unresolved_batches RENAME TO data_node.missing_batches; + +-- Drop the index created on 'batch_num' +DROP INDEX IF EXISTS idx_batch_num; \ No newline at end of file diff --git a/mocks/db.generated.go b/mocks/db.generated.go index cf708af2..46a97c4a 100644 --- a/mocks/db.generated.go +++ b/mocks/db.generated.go @@ -81,12 +81,12 @@ func (_c *DB_CountOffchainData_Call) RunAndReturn(run func(context.Context) (uin return _c } -// DeleteUnresolvedBatchKeys provides a mock function with given fields: ctx, bks -func (_m *DB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { +// DeleteMissingBatchKeys provides a mock function with given fields: ctx, bks +func (_m *DB) DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error { ret := _m.Called(ctx, bks) if len(ret) == 0 { - panic("no return value specified for DeleteUnresolvedBatchKeys") + panic("no return value specified for DeleteMissingBatchKeys") } var r0 error @@ -99,31 +99,31 @@ func (_m *DB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKe return r0 } -// DB_DeleteUnresolvedBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteUnresolvedBatchKeys' -type DB_DeleteUnresolvedBatchKeys_Call struct { +// DB_DeleteMissingBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteMissingBatchKeys' +type DB_DeleteMissingBatchKeys_Call struct { *mock.Call } -// DeleteUnresolvedBatchKeys is a helper method to define mock.On call +// DeleteMissingBatchKeys is a helper method to define mock.On call // - ctx context.Context // - bks []types.BatchKey -func (_e *DB_Expecter) DeleteUnresolvedBatchKeys(ctx interface{}, bks interface{}) *DB_DeleteUnresolvedBatchKeys_Call { - return &DB_DeleteUnresolvedBatchKeys_Call{Call: _e.mock.On("DeleteUnresolvedBatchKeys", ctx, bks)} +func (_e *DB_Expecter) DeleteMissingBatchKeys(ctx interface{}, bks interface{}) *DB_DeleteMissingBatchKeys_Call { + return &DB_DeleteMissingBatchKeys_Call{Call: _e.mock.On("DeleteMissingBatchKeys", ctx, bks)} } -func (_c *DB_DeleteUnresolvedBatchKeys_Call) Run(run func(ctx context.Context, bks []types.BatchKey)) *DB_DeleteUnresolvedBatchKeys_Call { +func (_c *DB_DeleteMissingBatchKeys_Call) Run(run func(ctx context.Context, bks []types.BatchKey)) *DB_DeleteMissingBatchKeys_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].([]types.BatchKey)) }) return _c } -func (_c *DB_DeleteUnresolvedBatchKeys_Call) Return(_a0 error) *DB_DeleteUnresolvedBatchKeys_Call { +func (_c *DB_DeleteMissingBatchKeys_Call) Return(_a0 error) *DB_DeleteMissingBatchKeys_Call { _c.Call.Return(_a0) return _c } -func (_c *DB_DeleteUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Context, []types.BatchKey) error) *DB_DeleteUnresolvedBatchKeys_Call { +func (_c *DB_DeleteMissingBatchKeys_Call) RunAndReturn(run func(context.Context, []types.BatchKey) error) *DB_DeleteMissingBatchKeys_Call { _c.Call.Return(run) return _c } @@ -185,29 +185,29 @@ func (_c *DB_GetLastProcessedBlock_Call) RunAndReturn(run func(context.Context, return _c } -// GetOffChainData provides a mock function with given fields: ctx, key -func (_m *DB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) { - ret := _m.Called(ctx, key) +// GetMissingBatchKeys provides a mock function with given fields: ctx, limit +func (_m *DB) GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { + ret := _m.Called(ctx, limit) if len(ret) == 0 { - panic("no return value specified for GetOffChainData") + panic("no return value specified for GetMissingBatchKeys") } - var r0 *types.OffChainData + var r0 []types.BatchKey var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.OffChainData, error)); ok { - return rf(ctx, key) + if rf, ok := ret.Get(0).(func(context.Context, uint) ([]types.BatchKey, error)); ok { + return rf(ctx, limit) } - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.OffChainData); ok { - r0 = rf(ctx, key) + if rf, ok := ret.Get(0).(func(context.Context, uint) []types.BatchKey); ok { + r0 = rf(ctx, limit) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.OffChainData) + r0 = ret.Get(0).([]types.BatchKey) } } - if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { - r1 = rf(ctx, key) + if rf, ok := ret.Get(1).(func(context.Context, uint) error); ok { + r1 = rf(ctx, limit) } else { r1 = ret.Error(1) } @@ -215,58 +215,58 @@ func (_m *DB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffC return r0, r1 } -// DB_GetOffChainData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetOffChainData' -type DB_GetOffChainData_Call struct { +// DB_GetMissingBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMissingBatchKeys' +type DB_GetMissingBatchKeys_Call struct { *mock.Call } -// GetOffChainData is a helper method to define mock.On call +// GetMissingBatchKeys is a helper method to define mock.On call // - ctx context.Context -// - key common.Hash -func (_e *DB_Expecter) GetOffChainData(ctx interface{}, key interface{}) *DB_GetOffChainData_Call { - return &DB_GetOffChainData_Call{Call: _e.mock.On("GetOffChainData", ctx, key)} +// - limit uint +func (_e *DB_Expecter) GetMissingBatchKeys(ctx interface{}, limit interface{}) *DB_GetMissingBatchKeys_Call { + return &DB_GetMissingBatchKeys_Call{Call: _e.mock.On("GetMissingBatchKeys", ctx, limit)} } -func (_c *DB_GetOffChainData_Call) Run(run func(ctx context.Context, key common.Hash)) *DB_GetOffChainData_Call { +func (_c *DB_GetMissingBatchKeys_Call) Run(run func(ctx context.Context, limit uint)) *DB_GetMissingBatchKeys_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(common.Hash)) + run(args[0].(context.Context), args[1].(uint)) }) return _c } -func (_c *DB_GetOffChainData_Call) Return(_a0 *types.OffChainData, _a1 error) *DB_GetOffChainData_Call { +func (_c *DB_GetMissingBatchKeys_Call) Return(_a0 []types.BatchKey, _a1 error) *DB_GetMissingBatchKeys_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *DB_GetOffChainData_Call) RunAndReturn(run func(context.Context, common.Hash) (*types.OffChainData, error)) *DB_GetOffChainData_Call { +func (_c *DB_GetMissingBatchKeys_Call) RunAndReturn(run func(context.Context, uint) ([]types.BatchKey, error)) *DB_GetMissingBatchKeys_Call { _c.Call.Return(run) return _c } -// GetUnresolvedBatchKeys provides a mock function with given fields: ctx, limit -func (_m *DB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { - ret := _m.Called(ctx, limit) +// GetOffChainData provides a mock function with given fields: ctx, key +func (_m *DB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) { + ret := _m.Called(ctx, key) if len(ret) == 0 { - panic("no return value specified for GetUnresolvedBatchKeys") + panic("no return value specified for GetOffChainData") } - var r0 []types.BatchKey + var r0 *types.OffChainData var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint) ([]types.BatchKey, error)); ok { - return rf(ctx, limit) + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.OffChainData, error)); ok { + return rf(ctx, key) } - if rf, ok := ret.Get(0).(func(context.Context, uint) []types.BatchKey); ok { - r0 = rf(ctx, limit) + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.OffChainData); ok { + r0 = rf(ctx, key) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.BatchKey) + r0 = ret.Get(0).(*types.OffChainData) } } - if rf, ok := ret.Get(1).(func(context.Context, uint) error); ok { - r1 = rf(ctx, limit) + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, key) } else { r1 = ret.Error(1) } @@ -274,31 +274,31 @@ func (_m *DB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.B return r0, r1 } -// DB_GetUnresolvedBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetUnresolvedBatchKeys' -type DB_GetUnresolvedBatchKeys_Call struct { +// DB_GetOffChainData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetOffChainData' +type DB_GetOffChainData_Call struct { *mock.Call } -// GetUnresolvedBatchKeys is a helper method to define mock.On call +// GetOffChainData is a helper method to define mock.On call // - ctx context.Context -// - limit uint -func (_e *DB_Expecter) GetUnresolvedBatchKeys(ctx interface{}, limit interface{}) *DB_GetUnresolvedBatchKeys_Call { - return &DB_GetUnresolvedBatchKeys_Call{Call: _e.mock.On("GetUnresolvedBatchKeys", ctx, limit)} +// - key common.Hash +func (_e *DB_Expecter) GetOffChainData(ctx interface{}, key interface{}) *DB_GetOffChainData_Call { + return &DB_GetOffChainData_Call{Call: _e.mock.On("GetOffChainData", ctx, key)} } -func (_c *DB_GetUnresolvedBatchKeys_Call) Run(run func(ctx context.Context, limit uint)) *DB_GetUnresolvedBatchKeys_Call { +func (_c *DB_GetOffChainData_Call) Run(run func(ctx context.Context, key common.Hash)) *DB_GetOffChainData_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint)) + run(args[0].(context.Context), args[1].(common.Hash)) }) return _c } -func (_c *DB_GetUnresolvedBatchKeys_Call) Return(_a0 []types.BatchKey, _a1 error) *DB_GetUnresolvedBatchKeys_Call { +func (_c *DB_GetOffChainData_Call) Return(_a0 *types.OffChainData, _a1 error) *DB_GetOffChainData_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *DB_GetUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Context, uint) ([]types.BatchKey, error)) *DB_GetUnresolvedBatchKeys_Call { +func (_c *DB_GetOffChainData_Call) RunAndReturn(run func(context.Context, common.Hash) (*types.OffChainData, error)) *DB_GetOffChainData_Call { _c.Call.Return(run) return _c } @@ -410,17 +410,17 @@ func (_c *DB_StoreLastProcessedBlock_Call) RunAndReturn(run func(context.Context return _c } -// StoreOffChainData provides a mock function with given fields: ctx, od -func (_m *DB) StoreOffChainData(ctx context.Context, od []types.OffChainData) error { - ret := _m.Called(ctx, od) +// StoreMissingBatchKeys provides a mock function with given fields: ctx, bks +func (_m *DB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error { + ret := _m.Called(ctx, bks) if len(ret) == 0 { - panic("no return value specified for StoreOffChainData") + panic("no return value specified for StoreMissingBatchKeys") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []types.OffChainData) error); ok { - r0 = rf(ctx, od) + if rf, ok := ret.Get(0).(func(context.Context, []types.BatchKey) error); ok { + r0 = rf(ctx, bks) } else { r0 = ret.Error(0) } @@ -428,46 +428,46 @@ func (_m *DB) StoreOffChainData(ctx context.Context, od []types.OffChainData) er return r0 } -// DB_StoreOffChainData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreOffChainData' -type DB_StoreOffChainData_Call struct { +// DB_StoreMissingBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreMissingBatchKeys' +type DB_StoreMissingBatchKeys_Call struct { *mock.Call } -// StoreOffChainData is a helper method to define mock.On call +// StoreMissingBatchKeys is a helper method to define mock.On call // - ctx context.Context -// - od []types.OffChainData -func (_e *DB_Expecter) StoreOffChainData(ctx interface{}, od interface{}) *DB_StoreOffChainData_Call { - return &DB_StoreOffChainData_Call{Call: _e.mock.On("StoreOffChainData", ctx, od)} +// - bks []types.BatchKey +func (_e *DB_Expecter) StoreMissingBatchKeys(ctx interface{}, bks interface{}) *DB_StoreMissingBatchKeys_Call { + return &DB_StoreMissingBatchKeys_Call{Call: _e.mock.On("StoreMissingBatchKeys", ctx, bks)} } -func (_c *DB_StoreOffChainData_Call) Run(run func(ctx context.Context, od []types.OffChainData)) *DB_StoreOffChainData_Call { +func (_c *DB_StoreMissingBatchKeys_Call) Run(run func(ctx context.Context, bks []types.BatchKey)) *DB_StoreMissingBatchKeys_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]types.OffChainData)) + run(args[0].(context.Context), args[1].([]types.BatchKey)) }) return _c } -func (_c *DB_StoreOffChainData_Call) Return(_a0 error) *DB_StoreOffChainData_Call { +func (_c *DB_StoreMissingBatchKeys_Call) Return(_a0 error) *DB_StoreMissingBatchKeys_Call { _c.Call.Return(_a0) return _c } -func (_c *DB_StoreOffChainData_Call) RunAndReturn(run func(context.Context, []types.OffChainData) error) *DB_StoreOffChainData_Call { +func (_c *DB_StoreMissingBatchKeys_Call) RunAndReturn(run func(context.Context, []types.BatchKey) error) *DB_StoreMissingBatchKeys_Call { _c.Call.Return(run) return _c } -// StoreUnresolvedBatchKeys provides a mock function with given fields: ctx, bks -func (_m *DB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { - ret := _m.Called(ctx, bks) +// StoreOffChainData provides a mock function with given fields: ctx, od +func (_m *DB) StoreOffChainData(ctx context.Context, od []types.OffChainData) error { + ret := _m.Called(ctx, od) if len(ret) == 0 { - panic("no return value specified for StoreUnresolvedBatchKeys") + panic("no return value specified for StoreOffChainData") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []types.BatchKey) error); ok { - r0 = rf(ctx, bks) + if rf, ok := ret.Get(0).(func(context.Context, []types.OffChainData) error); ok { + r0 = rf(ctx, od) } else { r0 = ret.Error(0) } @@ -475,31 +475,31 @@ func (_m *DB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey return r0 } -// DB_StoreUnresolvedBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreUnresolvedBatchKeys' -type DB_StoreUnresolvedBatchKeys_Call struct { +// DB_StoreOffChainData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreOffChainData' +type DB_StoreOffChainData_Call struct { *mock.Call } -// StoreUnresolvedBatchKeys is a helper method to define mock.On call +// StoreOffChainData is a helper method to define mock.On call // - ctx context.Context -// - bks []types.BatchKey -func (_e *DB_Expecter) StoreUnresolvedBatchKeys(ctx interface{}, bks interface{}) *DB_StoreUnresolvedBatchKeys_Call { - return &DB_StoreUnresolvedBatchKeys_Call{Call: _e.mock.On("StoreUnresolvedBatchKeys", ctx, bks)} +// - od []types.OffChainData +func (_e *DB_Expecter) StoreOffChainData(ctx interface{}, od interface{}) *DB_StoreOffChainData_Call { + return &DB_StoreOffChainData_Call{Call: _e.mock.On("StoreOffChainData", ctx, od)} } -func (_c *DB_StoreUnresolvedBatchKeys_Call) Run(run func(ctx context.Context, bks []types.BatchKey)) *DB_StoreUnresolvedBatchKeys_Call { +func (_c *DB_StoreOffChainData_Call) Run(run func(ctx context.Context, od []types.OffChainData)) *DB_StoreOffChainData_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]types.BatchKey)) + run(args[0].(context.Context), args[1].([]types.OffChainData)) }) return _c } -func (_c *DB_StoreUnresolvedBatchKeys_Call) Return(_a0 error) *DB_StoreUnresolvedBatchKeys_Call { +func (_c *DB_StoreOffChainData_Call) Return(_a0 error) *DB_StoreOffChainData_Call { _c.Call.Return(_a0) return _c } -func (_c *DB_StoreUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Context, []types.BatchKey) error) *DB_StoreUnresolvedBatchKeys_Call { +func (_c *DB_StoreOffChainData_Call) RunAndReturn(run func(context.Context, []types.OffChainData) error) *DB_StoreOffChainData_Call { _c.Call.Return(run) return _c } diff --git a/synchronizer/batches.go b/synchronizer/batches.go index d393de0f..e66552d3 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -98,7 +98,7 @@ func (bs *BatchSynchronizer) resolveCommittee() error { // Start starts the synchronizer func (bs *BatchSynchronizer) Start(ctx context.Context) { log.Infof("starting batch synchronizer, DAC addr: %v", bs.self) - go bs.processUnresolvedBatches(ctx) + go bs.processMissingBatches(ctx) go bs.produceEvents(ctx) go bs.handleReorgs(ctx) } @@ -247,17 +247,49 @@ func (bs *BatchSynchronizer) handleEvent( }) } - // Store batch keys. Already handled batch keys are going to be ignored based on the DB logic. - return storeUnresolvedBatchKeys(ctx, bs.db, batchKeys) + // Store batch keys in missing_batches table that are not already present offchain_data table + return bs.findMissingBatches(ctx, batchKeys) } -func (bs *BatchSynchronizer) processUnresolvedBatches(ctx context.Context) { - log.Info("starting handling unresolved batches") +func (bs *BatchSynchronizer) findMissingBatches(ctx context.Context, batchKeys []types.BatchKey) error { + keys := make([]common.Hash, len(batchKeys)) + for i, key := range batchKeys { + keys[i] = key.Hash + } + + // Get the existing offchain data by the given list of keys + existingOffchainData, err := listOffchainData(ctx, bs.db, keys) + if err != nil { + return fmt.Errorf("failed to list offchain data: %v", err) + } + + hashToKeys := make(map[common.Hash]struct{}) + for _, extData := range existingOffchainData { + hashToKeys[extData.Key] = struct{}{} + } + + missingData := make([]types.BatchKey, 0) + for _, batchKey := range batchKeys { + _, ok := hashToKeys[batchKey.Hash] + if !ok { + missingData = append(missingData, batchKey) + } + } + + if len(missingData) > 0 { + return storeMissingBatchKeys(ctx, bs.db, missingData) + } + + return nil +} + +func (bs *BatchSynchronizer) processMissingBatches(ctx context.Context) { + log.Info("starting handling missing batches") for { delay := time.NewTimer(bs.retry) select { case <-delay.C: - if err := bs.handleUnresolvedBatches(ctx); err != nil { + if err := bs.handleMissingBatches(ctx); err != nil { log.Error(err) } case <-bs.stop: @@ -266,75 +298,34 @@ func (bs *BatchSynchronizer) processUnresolvedBatches(ctx context.Context) { } } -// handleUnresolvedBatches handles unresolved batches that were collected by the event consumer -func (bs *BatchSynchronizer) handleUnresolvedBatches(ctx context.Context) error { - // Get unresolved batches - batchKeys, err := getUnresolvedBatchKeys(ctx, bs.db) +// handleMissingBatches handles missing batches that were collected by the event consumer +func (bs *BatchSynchronizer) handleMissingBatches(ctx context.Context) error { + // Get missing batches + batchKeys, err := getMissingBatchKeys(ctx, bs.db) if err != nil { - return fmt.Errorf("failed to get unresolved batch keys: %v", err) + return fmt.Errorf("failed to get missing batch keys: %v", err) } if len(batchKeys) == 0 { return nil } - // Collect list of keys - keys := make([]common.Hash, len(batchKeys)) - hashToKeys := make(map[common.Hash]types.BatchKey) - for i, key := range batchKeys { - keys[i] = key.Hash - hashToKeys[key.Hash] = key - } - - // Get the existing offchain data by the given list of keys - existingOffchainData, err := listOffchainData(ctx, bs.db, keys) - if err != nil { - return fmt.Errorf("failed to list offchain data: %v", err) - } - - // Resolve the unresolved data data := make([]types.OffChainData, 0) - resolved := make([]types.BatchKey, 0) - - // Go over existing keys and mark them as resolved if they exist. - // Update the batch number if it is zero. - for _, extData := range existingOffchainData { - batchKey, ok := hashToKeys[extData.Key] - if !ok { - // This should not happen, but log it just in case - log.Errorf("unexpected key %s in the offchain data", extData.Key.Hex()) - continue - } - - // Mark the batch as resolved - resolved = append(resolved, batchKey) - - // Remove the key from the map - delete(hashToKeys, extData.Key) - } - - // Resolve the remaining unresolved data - for _, key := range hashToKeys { + for _, key := range batchKeys { value, err := bs.resolve(ctx, key) if err != nil { log.Errorf("failed to resolve batch %s: %v", key.Hash.Hex(), err) continue } - - resolved = append(resolved, key) data = append(data, *value) } - // Store data of the batches to the DB if len(data) > 0 { if err = storeOffchainData(ctx, bs.db, data); err != nil { return fmt.Errorf("failed to store offchain data: %v", err) } - } - // Mark batches as resolved - if len(resolved) > 0 { - if err = deleteUnresolvedBatchKeys(ctx, bs.db, resolved); err != nil { + if err = deleteMissingBatchKeys(ctx, bs.db, batchKeys); err != nil { return fmt.Errorf("failed to delete successfully resolved batch keys: %v", err) } } diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index 5ef41ee3..7fd26e2c 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -272,8 +272,10 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { getTxArgs []interface{} getTxReturns []interface{} // db mock - storeUnresolvedBatchKeysArgs []interface{} - storeUnresolvedBatchKeysReturns []interface{} + listOffchainDataArgs []interface{} + listOffchainDataReturns []interface{} + storeMissingBatchKeysArgs []interface{} + storeMissingBatchKeysReturns []interface{} isErrorExpected bool } @@ -324,9 +326,14 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { config.getTxReturns...).Once() } - if config.storeUnresolvedBatchKeysArgs != nil && config.storeUnresolvedBatchKeysReturns != nil { - dbMock.On("StoreUnresolvedBatchKeys", config.storeUnresolvedBatchKeysArgs...).Return( - config.storeUnresolvedBatchKeysReturns...).Once() + if config.listOffchainDataArgs != nil && config.listOffchainDataReturns != nil { + dbMock.On("ListOffChainData", config.listOffchainDataArgs...).Return( + config.listOffchainDataReturns...).Once() + } + + if config.storeMissingBatchKeysArgs != nil && config.storeMissingBatchKeysReturns != nil { + dbMock.On("StoreMissingBatchKeys", config.storeMissingBatchKeysArgs...).Return( + config.storeMissingBatchKeysReturns...).Once() } batchSynronizer := &BatchSynchronizer{ @@ -397,9 +404,11 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }) testFn(t, testConfig{ - getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, - getTxReturns: []interface{}{localTx, true, nil}, - storeUnresolvedBatchKeysArgs: []interface{}{ + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{localTx, true, nil}, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{nil, nil}, + storeMissingBatchKeysArgs: []interface{}{ mock.Anything, []types.BatchKey{{ Number: 10, @@ -407,8 +416,8 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{nil}, - isErrorExpected: false, + storeMissingBatchKeysReturns: []interface{}{nil}, + isErrorExpected: false, }) }) @@ -416,9 +425,11 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, - getTxReturns: []interface{}{tx, true, nil}, - storeUnresolvedBatchKeysArgs: []interface{}{ + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{tx, true, nil}, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{nil, nil}, + storeMissingBatchKeysArgs: []interface{}{ mock.Anything, []types.BatchKey{{ Number: 10, @@ -426,8 +437,8 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{nil}, - isErrorExpected: false, + storeMissingBatchKeysReturns: []interface{}{nil}, + isErrorExpected: false, }) }) @@ -435,8 +446,10 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { t.Parallel() testFn(t, testConfig{ - isErrorExpected: true, - storeUnresolvedBatchKeysArgs: []interface{}{ + isErrorExpected: true, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{nil, nil}, + storeMissingBatchKeysArgs: []interface{}{ mock.Anything, []types.BatchKey{{ Number: 10, @@ -444,26 +457,24 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{errors.New("error")}, - getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, - getTxReturns: []interface{}{tx, true, nil}, + storeMissingBatchKeysReturns: []interface{}{errors.New("error")}, + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{tx, true, nil}, }) }) } -func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { +func TestBatchSynchronizer_HandleMissingBatches(t *testing.T) { t.Parallel() type testConfig struct { // db mock - getUnresolvedBatchKeysArgs []interface{} - getUnresolvedBatchKeysReturns []interface{} - listOffchainDataArgs []interface{} - listOffchainDataReturns []interface{} - storeOffChainDataArgs []interface{} - storeOffChainDataReturns []interface{} - deleteUnresolvedBatchKeysArgs []interface{} - deleteUnresolvedBatchKeysReturns []interface{} + getMissingBatchKeysArgs []interface{} + getMissingBatchKeysReturns []interface{} + storeOffChainDataArgs []interface{} + storeOffChainDataReturns []interface{} + deleteMissingBatchKeysArgs []interface{} + deleteMissingBatchKeysReturns []interface{} // sequencer mocks getSequenceBatchArgs []interface{} getSequenceBatchReturns []interface{} @@ -481,14 +492,9 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { ethermanMock := mocks.NewEtherman(t) sequencerMock := mocks.NewSequencerTracker(t) - if config.getUnresolvedBatchKeysArgs != nil && config.getUnresolvedBatchKeysReturns != nil { - dbMock.On("GetUnresolvedBatchKeys", config.getUnresolvedBatchKeysArgs...).Return( - config.getUnresolvedBatchKeysReturns...).Once() - } - - if config.listOffchainDataArgs != nil && config.listOffchainDataReturns != nil { - dbMock.On("ListOffChainData", config.listOffchainDataArgs...).Return( - config.listOffchainDataReturns...).Once() + if config.getMissingBatchKeysArgs != nil && config.getMissingBatchKeysReturns != nil { + dbMock.On("GetMissingBatchKeys", config.getMissingBatchKeysArgs...).Return( + config.getMissingBatchKeysReturns...).Once() } if config.storeOffChainDataArgs != nil && config.storeOffChainDataReturns != nil { @@ -496,9 +502,9 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { config.storeOffChainDataReturns...).Once() } - if config.deleteUnresolvedBatchKeysArgs != nil && config.deleteUnresolvedBatchKeysReturns != nil { - dbMock.On("DeleteUnresolvedBatchKeys", config.deleteUnresolvedBatchKeysArgs...).Return( - config.deleteUnresolvedBatchKeysReturns...).Once() + if config.deleteMissingBatchKeysArgs != nil && config.deleteMissingBatchKeysReturns != nil { + dbMock.On("DeleteMissingBatchKeys", config.deleteMissingBatchKeysArgs...).Return( + config.deleteMissingBatchKeysReturns...).Once() } if config.getSequenceBatchArgs != nil && config.getSequenceBatchReturns != nil { @@ -512,7 +518,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { sequencer: sequencerMock, } - err := batchSynronizer.handleUnresolvedBatches(context.Background()) + err := batchSynronizer.handleMissingBatches(context.Background()) if config.isErrorExpected { require.Error(t, err) } else { @@ -524,69 +530,38 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { sequencerMock.AssertExpectations(t) } - t.Run("Could not get unresolved batch keys", func(t *testing.T) { + t.Run("Could not get missing batch keys", func(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{nil, errors.New("error")}, - isErrorExpected: true, + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{nil, errors.New("error")}, + isErrorExpected: true, }) }) - t.Run("No unresolved batch keys found", func(t *testing.T) { + t.Run("No missing batch keys found", func(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{nil, nil}, - isErrorExpected: false, + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{nil, nil}, + isErrorExpected: false, }) }) - t.Run("Unresolved batch key already resolved", func(t *testing.T) { + t.Run("Missing batch key found", func(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{ + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{ []types.BatchKey{{ Number: 10, Hash: txHash, }}, nil, }, - listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, - listOffchainDataReturns: []interface{}{[]types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - }}, nil}, - deleteUnresolvedBatchKeysArgs: []interface{}{mock.Anything, - []types.BatchKey{{ - Number: 10, - Hash: txHash, - }}, - mock.Anything, - }, - deleteUnresolvedBatchKeysReturns: []interface{}{nil}, - isErrorExpected: false, - }) - }) - - t.Run("Unresolved batch key found", func(t *testing.T) { - t.Parallel() - - testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{ - []types.BatchKey{{ - Number: 10, - Hash: txHash, - }}, - nil, - }, - listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, - listOffchainDataReturns: []interface{}{nil, nil}, storeOffChainDataArgs: []interface{}{mock.Anything, []types.OffChainData{{ Key: txHash, @@ -594,15 +569,15 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { }}, }, storeOffChainDataReturns: []interface{}{nil}, - deleteUnresolvedBatchKeysArgs: []interface{}{mock.Anything, + deleteMissingBatchKeysArgs: []interface{}{mock.Anything, []types.BatchKey{{ Number: 10, Hash: txHash, }}, mock.Anything, }, - deleteUnresolvedBatchKeysReturns: []interface{}{nil}, - getSequenceBatchArgs: []interface{}{context.Background(), uint64(10)}, + deleteMissingBatchKeysReturns: []interface{}{nil}, + getSequenceBatchArgs: []interface{}{context.Background(), uint64(10)}, getSequenceBatchReturns: []interface{}{&sequencer.SeqBatch{ Number: types.ArgUint64(10), BatchL2Data: types.ArgBytes(batchL2Data), @@ -642,14 +617,14 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { BatchL2Data: types.ArgBytes(batchL2Data), }, nil}, beginStateTransactionArgs: []interface{}{mock.Anything}, - storeUnresolvedBatchKeysArgs: []interface{}{mock.Anything, + storeMissingBatchKeysArgs: []interface{}{mock.Anything, []types.OffChainData{{ Key: txHash, Value: batchL2Data, }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{nil}, + storeMissingBatchKeysReturns: []interface{}{nil}, commitReturns: []interface{}{nil}, isErrorExpected: false, }) @@ -660,14 +635,14 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { testFn(testConfig{ isErrorExpected: true, - storeUnresolvedBatchKeysArgs: []interface{}{mock.Anything, + storeMissingBatchKeysArgs: []interface{}{mock.Anything, []types.BatchKey{{ Number: 1, Hash: txHash, }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{errors.New("error")}, + storeMissingBatchKeysReturns: []interface{}{errors.New("error")}, beginStateTransactionArgs: []interface{}{mock.Anything}, rollbackArgs: []interface{}{mock.Anything}, getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, diff --git a/synchronizer/store.go b/synchronizer/store.go index 5e111f26..7bd8b990 100644 --- a/synchronizer/store.go +++ b/synchronizer/store.go @@ -43,25 +43,25 @@ func setStartBlock(parentCtx context.Context, db dbTypes.DB, block uint64, syncT return db.StoreLastProcessedBlock(ctx, block, string(syncTask)) } -func storeUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { +func storeMissingBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() - return db.StoreUnresolvedBatchKeys(ctx, keys) + return db.StoreMissingBatchKeys(ctx, keys) } -func getUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB) ([]types.BatchKey, error) { +func getMissingBatchKeys(parentCtx context.Context, db dbTypes.DB) ([]types.BatchKey, error) { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() - return db.GetUnresolvedBatchKeys(ctx, maxUnprocessedBatch) + return db.GetMissingBatchKeys(ctx, maxUnprocessedBatch) } -func deleteUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { +func deleteMissingBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() - return db.DeleteUnresolvedBatchKeys(ctx, keys) + return db.DeleteMissingBatchKeys(ctx, keys) } func listOffchainData(parentCtx context.Context, db dbTypes.DB, keys []common.Hash) ([]types.OffChainData, error) { diff --git a/synchronizer/store_test.go b/synchronizer/store_test.go index d877b5cf..f005c022 100644 --- a/synchronizer/store_test.go +++ b/synchronizer/store_test.go @@ -125,7 +125,7 @@ func Test_setStartBlock(t *testing.T) { } } -func Test_storeUnresolvedBatchKeys(t *testing.T) { +func Test_storeMissingBatchKeys(t *testing.T) { t.Parallel() testError := errors.New("test error") @@ -143,12 +143,12 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { wantErr bool }{ { - name: "StoreUnresolvedBatchKeys returns error", + name: "StoreMissingBatchKeys returns error", db: func(t *testing.T) db.DB { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("StoreUnresolvedBatchKeys", mock.Anything, testData).Return(testError) + mockDB.On("StoreMissingBatchKeys", mock.Anything, testData).Return(testError) return mockDB }, @@ -161,7 +161,7 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("StoreUnresolvedBatchKeys", mock.Anything, testData).Return(nil) + mockDB.On("StoreMissingBatchKeys", mock.Anything, testData).Return(nil) return mockDB }, @@ -176,7 +176,7 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { testDB := tt.db(t) - if err := storeUnresolvedBatchKeys(context.Background(), testDB, tt.keys); tt.wantErr { + if err := storeMissingBatchKeys(context.Background(), testDB, tt.keys); tt.wantErr { require.ErrorIs(t, err, testError) } else { require.NoError(t, err) @@ -185,7 +185,7 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { } } -func Test_getUnresolvedBatchKeys(t *testing.T) { +func Test_getMissingBatchKeys(t *testing.T) { t.Parallel() testError := errors.New("test error") @@ -203,12 +203,12 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { wantErr bool }{ { - name: "GetUnresolvedBatchKeys returns error", + name: "GetMissingBatchKeys returns error", db: func(t *testing.T) db.DB { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)). + mockDB.On("GetMissingBatchKeys", mock.Anything, uint(100)). Return(nil, testError) return mockDB @@ -221,7 +221,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)).Return(testData, nil) + mockDB.On("GetMissingBatchKeys", mock.Anything, uint(100)).Return(testData, nil) return mockDB }, @@ -236,7 +236,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { testDB := tt.db(t) - if keys, err := getUnresolvedBatchKeys(context.Background(), testDB); tt.wantErr { + if keys, err := getMissingBatchKeys(context.Background(), testDB); tt.wantErr { require.ErrorIs(t, err, testError) } else { require.NoError(t, err) @@ -246,7 +246,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { } } -func Test_deleteUnresolvedBatchKeys(t *testing.T) { +func Test_deleteMissingBatchKeys(t *testing.T) { t.Parallel() testError := errors.New("test error") @@ -263,12 +263,12 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) { wantErr bool }{ { - name: "DeleteUnresolvedBatchKeys returns error", + name: "DeleteMissingBatchKeys returns error", db: func(t *testing.T) db.DB { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("DeleteUnresolvedBatchKeys", mock.Anything, testData). + mockDB.On("DeleteMissingBatchKeys", mock.Anything, testData). Return(testError) return mockDB @@ -281,7 +281,7 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("DeleteUnresolvedBatchKeys", mock.Anything, testData). + mockDB.On("DeleteMissingBatchKeys", mock.Anything, testData). Return(nil) return mockDB @@ -296,7 +296,7 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) { testDB := tt.db(t) - if err := deleteUnresolvedBatchKeys(context.Background(), testDB, testData); tt.wantErr { + if err := deleteMissingBatchKeys(context.Background(), testDB, testData); tt.wantErr { require.ErrorIs(t, err, testError) } else { require.NoError(t, err) From c6c1ec29e91e6f9dbf1b94050d44de38afd93ef9 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Wed, 27 Nov 2024 18:08:06 +0530 Subject: [PATCH 03/19] use update instead of upsert --- db/db.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/db/db.go b/db/db.go index 42c6fe0c..40a81c47 100644 --- a/db/db.go +++ b/db/db.go @@ -15,10 +15,9 @@ import ( const ( // storeLastProcessedBlockSQL is a query that stores the last processed block for a given task storeLastProcessedBlockSQL = ` - INSERT INTO data_node.sync_tasks (task, block) - VALUES ($1, $2) - ON CONFLICT (task) DO UPDATE - SET block = EXCLUDED.block, processed = NOW();` + UPDATE data_node.sync_tasks + SET block = $2, processed = NOW() + WHERE task = $1;` // getLastProcessedBlockSQL is a query that returns the last processed block for a given task getLastProcessedBlockSQL = `SELECT block FROM data_node.sync_tasks WHERE task = $1;` From 53a86ac5b3531c7dcc4eb9ef13f32e87bfbcb964 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Wed, 27 Nov 2024 18:35:02 +0530 Subject: [PATCH 04/19] fix unit test --- db/db_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/db_test.go b/db/db_test.go index 8356d5a4..97ea5dd9 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -65,7 +65,7 @@ func Test_DB_StoreLastProcessedBlock(t *testing.T) { constructorExpect(mock) - expected := mock.ExpectExec(`INSERT INTO data_node\.sync_tasks \(task, block\) VALUES \(\$1, \$2\) ON CONFLICT \(task\) DO UPDATE SET block = EXCLUDED\.block, processed = NOW\(\)`). + expected := mock.ExpectExec(`UPDATE data_node\.sync_tasks SET block = \$2, processed = NOW\(\) WHERE task = \$1;`). WithArgs(tt.task, tt.block) if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) @@ -125,7 +125,7 @@ func Test_DB_GetLastProcessedBlock(t *testing.T) { constructorExpect(mock) - mock.ExpectExec(`INSERT INTO data_node\.sync_tasks \(task, block\) VALUES \(\$1, \$2\) ON CONFLICT \(task\) DO UPDATE SET block = EXCLUDED\.block, processed = NOW\(\)`). + mock.ExpectExec(`UPDATE data_node\.sync_tasks SET block = \$2, processed = NOW\(\) WHERE task = \$1;`). WithArgs(tt.task, tt.block). WillReturnResult(sqlmock.NewResult(1, 1)) From 36d5c70b9d41ad9c977b08a917a4f5cc34619453 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Wed, 27 Nov 2024 19:22:27 +0530 Subject: [PATCH 05/19] test coverage --- synchronizer/batches_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index 7fd26e2c..415ff466 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -381,6 +381,18 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }) }) + t.Run("Error getting offchain data", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{tx, true, nil}, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{nil, errors.New("error")}, + isErrorExpected: true, + }) + }) + t.Run("doesn't have batch in storage - successfully stored (Elderberry fork)", func(t *testing.T) { t.Parallel() @@ -462,6 +474,25 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { getTxReturns: []interface{}{tx, true, nil}, }) }) + + t.Run("have batch in storage already no error", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + isErrorExpected: false, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{ + []types.OffChainData{ + { + Key: txHash, + Value: batchL2Data, + }, + }, nil, + }, + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{tx, true, nil}, + }) + }) } func TestBatchSynchronizer_HandleMissingBatches(t *testing.T) { From 110db4fc6d49a618f978b249e7204d13139b4711 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Wed, 27 Nov 2024 19:34:38 +0530 Subject: [PATCH 06/19] test coverage --- synchronizer/batches_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index 415ff466..81722836 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -617,6 +617,34 @@ func TestBatchSynchronizer_HandleMissingBatches(t *testing.T) { }) }) + t.Run("DB error while storing missing batch", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{ + []types.BatchKey{{ + Number: 10, + Hash: txHash, + }}, + nil, + }, + storeOffChainDataArgs: []interface{}{mock.Anything, + []types.OffChainData{{ + Key: txHash, + Value: batchL2Data, + }}, + }, + storeOffChainDataReturns: []interface{}{errors.New("error")}, + getSequenceBatchArgs: []interface{}{context.Background(), uint64(10)}, + getSequenceBatchReturns: []interface{}{&sequencer.SeqBatch{ + Number: types.ArgUint64(10), + BatchL2Data: types.ArgBytes(batchL2Data), + }, nil}, + isErrorExpected: true, + }) + }) + /*t.Run("Invalid tx data", func(t *testing.T) { t.Parallel() From fe419bf884ef100c5a2fd715955970961240fc45 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Wed, 27 Nov 2024 19:45:49 +0530 Subject: [PATCH 07/19] test coverage --- synchronizer/batches_test.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index 81722836..74e96be4 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -645,6 +645,42 @@ func TestBatchSynchronizer_HandleMissingBatches(t *testing.T) { }) }) + t.Run("DB error while deleting missing batch entries", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{ + []types.BatchKey{{ + Number: 10, + Hash: txHash, + }}, + nil, + }, + storeOffChainDataArgs: []interface{}{mock.Anything, + []types.OffChainData{{ + Key: txHash, + Value: batchL2Data, + }}, + }, + storeOffChainDataReturns: []interface{}{nil}, + deleteMissingBatchKeysArgs: []interface{}{mock.Anything, + []types.BatchKey{{ + Number: 10, + Hash: txHash, + }}, + mock.Anything, + }, + deleteMissingBatchKeysReturns: []interface{}{errors.New("error")}, + getSequenceBatchArgs: []interface{}{context.Background(), uint64(10)}, + getSequenceBatchReturns: []interface{}{&sequencer.SeqBatch{ + Number: types.ArgUint64(10), + BatchL2Data: types.ArgBytes(batchL2Data), + }, nil}, + isErrorExpected: true, + }) + }) + /*t.Run("Invalid tx data", func(t *testing.T) { t.Parallel() From 9a6211e2569fd925cde133873c5950bc499d2209 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Thu, 28 Nov 2024 14:37:11 +0530 Subject: [PATCH 08/19] increase test coverage --- synchronizer/batches_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index 74e96be4..057574d9 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -6,6 +6,7 @@ import ( "math/big" "strings" "testing" + "time" elderberryValidium "github.com/0xPolygon/cdk-contracts-tooling/contracts/elderberry/polygonvalidiumetrog" etrogValidium "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonvalidiumetrog" @@ -495,6 +496,28 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }) } +func TestBatchSynchronizer_ProcessMissingBatches(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dbMock := mocks.NewDB(t) + dbMock.On("GetMissingBatchKeys", mock.Anything, mock.Anything).Return( + []types.BatchKey{}, nil) + + batchSynronizer := &BatchSynchronizer{ + db: dbMock, + retry: time.Millisecond * 100, + stop: make(chan struct{}), + } + go batchSynronizer.processMissingBatches(ctx) + + // Wait for the retry interval and then signal to stop + time.Sleep(time.Millisecond * 200) + batchSynronizer.stop <- struct{}{} + dbMock.AssertExpectations(t) +} + func TestBatchSynchronizer_HandleMissingBatches(t *testing.T) { t.Parallel() From ac9fa260d7c635b17dc620e4d17bc86172b08b32 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Thu, 28 Nov 2024 20:24:02 +0530 Subject: [PATCH 09/19] fix migration file --- db/migrations/0006.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/migrations/0006.sql b/db/migrations/0006.sql index 57541e10..5d681d62 100644 --- a/db/migrations/0006.sql +++ b/db/migrations/0006.sql @@ -4,7 +4,7 @@ ALTER TABLE data_node.offchain_data ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0; -- Rename the 'missing_batches' table to 'unresolved_batches' -ALTER TABLE data_node.missing_batches RENAME TO data_node.unresolved_batches; +ALTER TABLE missing_batches RENAME TO unresolved_batches; -- Create an index for the 'batch_num' column for better performance CREATE INDEX IF NOT EXISTS idx_batch_num ON data_node.offchain_data(batch_num); @@ -17,7 +17,7 @@ UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1'; ALTER TABLE data_node.offchain_data DROP COLUMN batch_num; -- Rename the 'unresolved_batches' table back to 'missing_batches' -ALTER TABLE data_node.unresolved_batches RENAME TO data_node.missing_batches; +ALTER TABLE unresolved_batches RENAME TO missing_batches; -- Drop the index created on 'batch_num' DROP INDEX IF EXISTS idx_batch_num; \ No newline at end of file From 5089d5c490639ac89fff75f8bc0c05c29fac0b77 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Fri, 29 Nov 2024 12:25:23 +0530 Subject: [PATCH 10/19] fix migration file --- db/migrations/0006.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/migrations/0006.sql b/db/migrations/0006.sql index 5d681d62..c6a80c22 100644 --- a/db/migrations/0006.sql +++ b/db/migrations/0006.sql @@ -4,7 +4,7 @@ ALTER TABLE data_node.offchain_data ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0; -- Rename the 'missing_batches' table to 'unresolved_batches' -ALTER TABLE missing_batches RENAME TO unresolved_batches; +ALTER TABLE data_node.missing_batches RENAME TO unresolved_batches; -- Create an index for the 'batch_num' column for better performance CREATE INDEX IF NOT EXISTS idx_batch_num ON data_node.offchain_data(batch_num); @@ -17,7 +17,7 @@ UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1'; ALTER TABLE data_node.offchain_data DROP COLUMN batch_num; -- Rename the 'unresolved_batches' table back to 'missing_batches' -ALTER TABLE unresolved_batches RENAME TO missing_batches; +ALTER TABLE data_node.unresolved_batches RENAME TO missing_batches; -- Drop the index created on 'batch_num' DROP INDEX IF EXISTS idx_batch_num; \ No newline at end of file From 704d1e296e54a3828b5251195413e6f3dd80ed00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20Negovanovi=C4=87?= Date: Mon, 2 Dec 2024 10:25:07 +0100 Subject: [PATCH 11/19] go mod tidy --- go.mod | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go.mod b/go.mod index eb2c6fbe..44ef249d 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,6 @@ module github.com/0xPolygon/cdk-data-availability go 1.22.4 - require ( github.com/0xPolygon/cdk v0.1.0 github.com/0xPolygon/cdk-contracts-tooling v0.0.0-20240826154954-f6182d2b17a2 @@ -11,7 +10,6 @@ require ( github.com/ethereum/go-ethereum v1.14.5 github.com/gorilla/websocket v1.5.1 github.com/hermeznetwork/tracerr v0.3.2 - github.com/iden3/go-iden3-crypto v0.0.16 github.com/invopop/jsonschema v0.12.0 github.com/jmoiron/sqlx v1.2.0 github.com/lib/pq v1.10.7 @@ -50,6 +48,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/hcl v1.0.1-0.20180906183839-65a6292f0157 // indirect github.com/holiman/uint256 v1.2.4 // indirect + github.com/iden3/go-iden3-crypto v0.0.16 // indirect github.com/klauspost/compress v1.17.2 // indirect github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/magiconair/properties v1.8.7 // indirect From 7ca135ea120cc6329843c0cad41fb215a8fc8bef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20Negovanovi=C4=87?= Date: Mon, 2 Dec 2024 11:51:43 +0100 Subject: [PATCH 12/19] Add launch.json for debugging --- .vscode/launch.json | 21 +++++++++++++++++++++ test/config/test.local.toml | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..fa6bcd3a --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,21 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Run DAC", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${workspaceFolder}/cmd", + "args": [ + "run", + "-cfg", + "test/config/test.local.toml", + ], + "cwd": ".", + }, + ] +} \ No newline at end of file diff --git a/test/config/test.local.toml b/test/config/test.local.toml index b19a167b..d27a4203 100644 --- a/test/config/test.local.toml +++ b/test/config/test.local.toml @@ -1,4 +1,4 @@ -PrivateKey = {Path = "config/test-member.keystore", Password = "testonly"} +PrivateKey = {Path = "test/config/test-member.keystore", Password = "testonly"} [L1] RpcURL = "ws://127.0.0.1:8546" From 88e308294f3d2060f39c9744a04fb5d20243a927 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Mon, 2 Dec 2024 18:45:49 +0530 Subject: [PATCH 13/19] apply feedback --- client/client_test.go | 8 ++++---- db/db.go | 6 +++++- db/migrations/0006.sql | 4 ++-- services/status/status.go | 10 +++++----- services/status/status_test.go | 2 +- types/types.go | 8 ++++---- 6 files changed, 21 insertions(+), 17 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 72114843..89dd5938 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -30,10 +30,10 @@ func TestClient_GetStatus(t *testing.T) { name: "successfully got status", result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"backfill_progress":5}}`, status: &types.DACStatus{ - Uptime: "123", - Version: "v1.0.0", - KeyCount: 2, - BackfillProgress: 5, + Uptime: "123", + Version: "v1.0.0", + KeyCount: 2, + LastSynchronizedBlock: 5, }, }, { diff --git a/db/db.go b/db/db.go index 40a81c47..e3d6ecb4 100644 --- a/db/db.go +++ b/db/db.go @@ -137,7 +137,11 @@ func (db *pgDB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) query, args := buildBatchKeysInsertQuery(bks) if _, err := db.pg.ExecContext(ctx, query, args...); err != nil { - return fmt.Errorf("failed to store misisng batches: %w", err) + batchNumbers := make([]string, len(bks)) + for i, bk := range bks { + batchNumbers[i] = fmt.Sprintf("%d", bk.Number) + } + return fmt.Errorf("failed to store missing batches (BatchKey.Number: %s): %w", strings.Join(batchNumbers, ", "), err) } return nil diff --git a/db/migrations/0006.sql b/db/migrations/0006.sql index c6a80c22..57541e10 100644 --- a/db/migrations/0006.sql +++ b/db/migrations/0006.sql @@ -4,7 +4,7 @@ ALTER TABLE data_node.offchain_data ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0; -- Rename the 'missing_batches' table to 'unresolved_batches' -ALTER TABLE data_node.missing_batches RENAME TO unresolved_batches; +ALTER TABLE data_node.missing_batches RENAME TO data_node.unresolved_batches; -- Create an index for the 'batch_num' column for better performance CREATE INDEX IF NOT EXISTS idx_batch_num ON data_node.offchain_data(batch_num); @@ -17,7 +17,7 @@ UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1'; ALTER TABLE data_node.offchain_data DROP COLUMN batch_num; -- Rename the 'unresolved_batches' table back to 'missing_batches' -ALTER TABLE data_node.unresolved_batches RENAME TO missing_batches; +ALTER TABLE data_node.unresolved_batches RENAME TO data_node.missing_batches; -- Drop the index created on 'batch_num' DROP INDEX IF EXISTS idx_batch_num; \ No newline at end of file diff --git a/services/status/status.go b/services/status/status.go index 664e5c7c..20d1c6b7 100644 --- a/services/status/status.go +++ b/services/status/status.go @@ -39,15 +39,15 @@ func (s *Endpoints) GetStatus() (interface{}, rpc.Error) { log.Errorf("failed to get the key count from the offchain_data table: %v", err) } - backfillProgress, err := s.db.GetLastProcessedBlock(ctx, string(synchronizer.L1SyncTask)) + lastSynchronizedBlock, err := s.db.GetLastProcessedBlock(ctx, string(synchronizer.L1SyncTask)) if err != nil { log.Errorf("failed to get last block processed by the synchronizer: %v", err) } return types.DACStatus{ - Version: dataavailability.Version, - Uptime: uptime, - KeyCount: rowCount, - BackfillProgress: backfillProgress, + Version: dataavailability.Version, + Uptime: uptime, + KeyCount: rowCount, + LastSynchronizedBlock: lastSynchronizedBlock, }, nil } diff --git a/services/status/status_test.go b/services/status/status_test.go index d6ccbfd1..bf0e55a2 100644 --- a/services/status/status_test.go +++ b/services/status/status_test.go @@ -68,7 +68,7 @@ func TestEndpoints_GetStatus(t *testing.T) { require.NotEmpty(t, dacStatus.Uptime) require.Equal(t, "v0.1.0", dacStatus.Version) require.Equal(t, tt.countOffchainData, dacStatus.KeyCount) - require.Equal(t, tt.getLastProcessedBlock, dacStatus.BackfillProgress) + require.Equal(t, tt.getLastProcessedBlock, dacStatus.LastSynchronizedBlock) } }) } diff --git a/types/types.go b/types/types.go index 1a3d7d06..97e48135 100644 --- a/types/types.go +++ b/types/types.go @@ -17,10 +17,10 @@ const ( // DACStatus contains DAC status info type DACStatus struct { - Uptime string `json:"uptime"` - Version string `json:"version"` - KeyCount uint64 `json:"key_count"` - BackfillProgress uint64 `json:"backfill_progress"` + Uptime string `json:"uptime"` + Version string `json:"version"` + KeyCount uint64 `json:"key_count"` + LastSynchronizedBlock uint64 `json:"last_synchronized_block"` } // BatchKey is the pairing of batch number and data hash of a batch From 1d91df861f3f6357dfd7f47cc5a46bba090cc7b0 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Mon, 2 Dec 2024 19:12:54 +0530 Subject: [PATCH 14/19] fix migration file --- db/migrations/0006.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/migrations/0006.sql b/db/migrations/0006.sql index 57541e10..c6a80c22 100644 --- a/db/migrations/0006.sql +++ b/db/migrations/0006.sql @@ -4,7 +4,7 @@ ALTER TABLE data_node.offchain_data ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0; -- Rename the 'missing_batches' table to 'unresolved_batches' -ALTER TABLE data_node.missing_batches RENAME TO data_node.unresolved_batches; +ALTER TABLE data_node.missing_batches RENAME TO unresolved_batches; -- Create an index for the 'batch_num' column for better performance CREATE INDEX IF NOT EXISTS idx_batch_num ON data_node.offchain_data(batch_num); @@ -17,7 +17,7 @@ UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1'; ALTER TABLE data_node.offchain_data DROP COLUMN batch_num; -- Rename the 'unresolved_batches' table back to 'missing_batches' -ALTER TABLE data_node.unresolved_batches RENAME TO data_node.missing_batches; +ALTER TABLE data_node.unresolved_batches RENAME TO missing_batches; -- Drop the index created on 'batch_num' DROP INDEX IF EXISTS idx_batch_num; \ No newline at end of file From 713578a12c75db75ae8e1f7c8018bd0b784dce2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20Negovanovi=C4=87?= Date: Mon, 2 Dec 2024 14:49:46 +0100 Subject: [PATCH 15/19] Bump the Kurtosis CDK to v0.2.23 --- .github/workflows/regression-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/regression-tests.yml b/.github/workflows/regression-tests.yml index 4309f85d..9afc1ff9 100644 --- a/.github/workflows/regression-tests.yml +++ b/.github/workflows/regression-tests.yml @@ -30,7 +30,7 @@ jobs: uses: actions/checkout@v4 with: repository: 0xPolygon/kurtosis-cdk - ref: "v0.2.22" + ref: v0.2.23 path: kurtosis-cdk - name: Install Kurtosis CDK tools From 04e667c73350daa97a3ed8663d7d8db5343a2ebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20Negovanovi=C4=87?= Date: Mon, 2 Dec 2024 15:01:50 +0100 Subject: [PATCH 16/19] Fix TestClient_GetStatus UT --- client/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/client_test.go b/client/client_test.go index 89dd5938..2251a472 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -28,7 +28,7 @@ func TestClient_GetStatus(t *testing.T) { }{ { name: "successfully got status", - result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"backfill_progress":5}}`, + result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"last_synchronized_block":5}}`, status: &types.DACStatus{ Uptime: "123", Version: "v1.0.0", From a4fab9f875075c883231cb93b13b291c29f55dc4 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Mon, 2 Dec 2024 19:48:43 +0530 Subject: [PATCH 17/19] apply feedback --- db/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db.go b/db/db.go index e3d6ecb4..c67e534e 100644 --- a/db/db.go +++ b/db/db.go @@ -141,7 +141,7 @@ func (db *pgDB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) for i, bk := range bks { batchNumbers[i] = fmt.Sprintf("%d", bk.Number) } - return fmt.Errorf("failed to store missing batches (BatchKey.Number: %s): %w", strings.Join(batchNumbers, ", "), err) + return fmt.Errorf("failed to store missing batches (batch number: %s): %w", strings.Join(batchNumbers, ", "), err) } return nil From 1957d6daee49dbe7a88e1d72483e0d48f5259da5 Mon Sep 17 00:00:00 2001 From: Rachit Sonthalia Date: Mon, 2 Dec 2024 19:52:26 +0530 Subject: [PATCH 18/19] fix typo --- db/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db.go b/db/db.go index c67e534e..05144fa2 100644 --- a/db/db.go +++ b/db/db.go @@ -141,7 +141,7 @@ func (db *pgDB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) for i, bk := range bks { batchNumbers[i] = fmt.Sprintf("%d", bk.Number) } - return fmt.Errorf("failed to store missing batches (batch number: %s): %w", strings.Join(batchNumbers, ", "), err) + return fmt.Errorf("failed to store missing batches (batch numbers: %s): %w", strings.Join(batchNumbers, ", "), err) } return nil From 6cc4c58403d187e6c2b88829629dece8e8d1fa54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20Negovanovi=C4=87?= Date: Mon, 2 Dec 2024 15:48:32 +0100 Subject: [PATCH 19/19] Bump the Kurtosis CDK version --- .github/workflows/regression-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/regression-tests.yml b/.github/workflows/regression-tests.yml index 9afc1ff9..57024a2e 100644 --- a/.github/workflows/regression-tests.yml +++ b/.github/workflows/regression-tests.yml @@ -30,7 +30,7 @@ jobs: uses: actions/checkout@v4 with: repository: 0xPolygon/kurtosis-cdk - ref: v0.2.23 + ref: v0.2.24 path: kurtosis-cdk - name: Install Kurtosis CDK tools