From 31336090fb0b210f0b1ecab856236a0fd9592326 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 19 Jun 2024 15:59:57 -0400 Subject: [PATCH] limit the amount of unprocessed batch keys at a time --- db/db.go | 8 ++++---- db/db_test.go | 5 +++-- mocks/db.generated.go | 29 +++++++++++++++-------------- synchronizer/batches_test.go | 8 ++++---- synchronizer/init.go | 5 +++-- synchronizer/store.go | 2 +- synchronizer/store_test.go | 4 ++-- 7 files changed, 32 insertions(+), 29 deletions(-) diff --git a/db/db.go b/db/db.go index ec4786cd..fe6e07d3 100644 --- a/db/db.go +++ b/db/db.go @@ -24,7 +24,7 @@ type DB interface { GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error - GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) + GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error Exists(ctx context.Context, key common.Hash) bool @@ -113,10 +113,10 @@ func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchK } // GetUnresolvedBatchKeys returns the unresolved batch keys from the database -func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) { - const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches;" +func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { + const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;" - rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL) + rows, err := db.pg.QueryxContext(ctx, getUnresolvedBatchKeysSQL, limit) if err != nil { return nil, err } diff --git a/db/db_test.go b/db/db_test.go index b1177973..8a8e9834 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -244,7 +244,8 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { // Seed data seedUnresolvedBatchKeys(t, wdb, mock, tt.bks) - expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.unresolved_batches`) + var limit = uint(10) + expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.unresolved_batches LIMIT \$1\;`).WithArgs(limit) if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) @@ -256,7 +257,7 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { dbPG := New(wdb) - data, err := dbPG.GetUnresolvedBatchKeys(context.Background()) + data, err := dbPG.GetUnresolvedBatchKeys(context.Background(), limit) if tt.returnErr != nil { require.ErrorIs(t, err, tt.returnErr) } else { diff --git a/mocks/db.generated.go b/mocks/db.generated.go index 837b2f8b..2a34abc4 100644 --- a/mocks/db.generated.go +++ b/mocks/db.generated.go @@ -355,9 +355,9 @@ func (_c *DB_GetOffChainData_Call) RunAndReturn(run func(context.Context, common return _c } -// GetUnresolvedBatchKeys provides a mock function with given fields: ctx -func (_m *DB) GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) { - ret := _m.Called(ctx) +// GetUnresolvedBatchKeys provides a mock function with given fields: ctx, limit +func (_m *DB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { + ret := _m.Called(ctx, limit) if len(ret) == 0 { panic("no return value specified for GetUnresolvedBatchKeys") @@ -365,19 +365,19 @@ func (_m *DB) GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, err var r0 []types.BatchKey var r1 error - if rf, ok := ret.Get(0).(func(context.Context) ([]types.BatchKey, error)); ok { - return rf(ctx) + if rf, ok := ret.Get(0).(func(context.Context, uint) ([]types.BatchKey, error)); ok { + return rf(ctx, limit) } - if rf, ok := ret.Get(0).(func(context.Context) []types.BatchKey); ok { - r0 = rf(ctx) + if rf, ok := ret.Get(0).(func(context.Context, uint) []types.BatchKey); ok { + r0 = rf(ctx, limit) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]types.BatchKey) } } - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) + if rf, ok := ret.Get(1).(func(context.Context, uint) error); ok { + r1 = rf(ctx, limit) } else { r1 = ret.Error(1) } @@ -392,13 +392,14 @@ type DB_GetUnresolvedBatchKeys_Call struct { // GetUnresolvedBatchKeys is a helper method to define mock.On call // - ctx context.Context -func (_e *DB_Expecter) GetUnresolvedBatchKeys(ctx interface{}) *DB_GetUnresolvedBatchKeys_Call { - return &DB_GetUnresolvedBatchKeys_Call{Call: _e.mock.On("GetUnresolvedBatchKeys", ctx)} +// - limit uint +func (_e *DB_Expecter) GetUnresolvedBatchKeys(ctx interface{}, limit interface{}) *DB_GetUnresolvedBatchKeys_Call { + return &DB_GetUnresolvedBatchKeys_Call{Call: _e.mock.On("GetUnresolvedBatchKeys", ctx, limit)} } -func (_c *DB_GetUnresolvedBatchKeys_Call) Run(run func(ctx context.Context)) *DB_GetUnresolvedBatchKeys_Call { +func (_c *DB_GetUnresolvedBatchKeys_Call) Run(run func(ctx context.Context, limit uint)) *DB_GetUnresolvedBatchKeys_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) + run(args[0].(context.Context), args[1].(uint)) }) return _c } @@ -408,7 +409,7 @@ func (_c *DB_GetUnresolvedBatchKeys_Call) Return(_a0 []types.BatchKey, _a1 error return _c } -func (_c *DB_GetUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Context) ([]types.BatchKey, error)) *DB_GetUnresolvedBatchKeys_Call { +func (_c *DB_GetUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Context, uint) ([]types.BatchKey, error)) *DB_GetUnresolvedBatchKeys_Call { _c.Call.Return(run) return _c } diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index 4ab4716c..07ba618c 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -642,7 +642,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything}, + getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, getUnresolvedBatchKeysReturns: []interface{}{nil, errors.New("error")}, isErrorExpected: true, }) @@ -652,7 +652,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything}, + getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, getUnresolvedBatchKeysReturns: []interface{}{nil, nil}, isErrorExpected: false, }) @@ -662,7 +662,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything}, + getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, getUnresolvedBatchKeysReturns: []interface{}{ []types.BatchKey{{ Number: 10, @@ -690,7 +690,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything}, + getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, getUnresolvedBatchKeysReturns: []interface{}{ []types.BatchKey{{ Number: 10, diff --git a/synchronizer/init.go b/synchronizer/init.go index 71df83c4..dc159937 100644 --- a/synchronizer/init.go +++ b/synchronizer/init.go @@ -12,8 +12,9 @@ import ( ) const ( - initBlockTimeout = 15 * time.Second - minCodeLen = 2 + initBlockTimeout = 15 * time.Second + minCodeLen = 2 + maxUnprocessedBatch = 100 ) // InitStartBlock initializes the L1 sync task by finding the inception block for the CDKValidium contract diff --git a/synchronizer/store.go b/synchronizer/store.go index ea10b701..f628d8d8 100644 --- a/synchronizer/store.go +++ b/synchronizer/store.go @@ -89,7 +89,7 @@ func getUnresolvedBatchKeys(db dbTypes.DB) ([]types.BatchKey, error) { ctx, cancel := context.WithTimeout(context.Background(), dbTimeout) defer cancel() - return db.GetUnresolvedBatchKeys(ctx) + return db.GetUnresolvedBatchKeys(ctx, maxUnprocessedBatch) } func deleteUnresolvedBatchKeys(db dbTypes.DB, keys []types.BatchKey) error { diff --git a/synchronizer/store_test.go b/synchronizer/store_test.go index 10abbbbd..252b52b0 100644 --- a/synchronizer/store_test.go +++ b/synchronizer/store_test.go @@ -310,7 +310,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { db: func(t *testing.T) db.DB { mockDB := mocks.NewDB(t) - mockDB.On("GetUnresolvedBatchKeys", mock.Anything). + mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)). Return(nil, testError) return mockDB @@ -322,7 +322,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { db: func(t *testing.T) db.DB { mockDB := mocks.NewDB(t) - mockDB.On("GetUnresolvedBatchKeys", mock.Anything).Return(testData, nil) + mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)).Return(testData, nil) return mockDB },