From be6673714adb0fd0fc5b7fbcc58e509f1b0f7a37 Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 2 Jan 2025 14:19:21 -0800 Subject: [PATCH] ingest/ledgerbackend: fix to ensure that the ledger buffer properly queues the last batch of ledgers (LCM) within the specified range, preventing get_ledger from blocking indefinitely. --- .../buffered_storage_backend_test.go | 40 +++++++++---------- ingest/ledgerbackend/ledger_buffer.go | 4 +- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index fe3ca0266c..2ddb0efb8d 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -65,6 +65,14 @@ func createBufferedStorageBackendForTesting() BufferedStorageBackend { func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) *datastore.MockDataStore { mockDataStore := new(datastore.MockDataStore) partition := count*partitionSize - 1 + + schema := datastore.DataStoreSchema{ + LedgersPerFile: count, + FilesPerPartition: partitionSize, + } + + start = schema.GetSequenceNumberStartBoundary(start) + end = schema.GetSequenceNumberEndBoundary(end) for i := start; i <= end; i = i + count { var objectName string var readCloser io.ReadCloser @@ -78,10 +86,7 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) } mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil).Times(1) } - mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ - LedgersPerFile: count, - FilesPerPartition: partitionSize, - }) + mockDataStore.On("GetSchema").Return(schema) t.Cleanup(func() { mockDataStore.AssertExpectations(t) @@ -248,31 +253,24 @@ func TestBSBGetLedger_SingleLedgerPerFile(t *testing.T) { } func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) { - startLedger := uint32(2) - endLedger := uint32(5) + startLedger := uint32(6) + endLedger := uint32(17) lcmArray := createLCMForTesting(startLedger, endLedger) bsb := createBufferedStorageBackendForTesting() ctx := context.Background() ledgerRange := BoundedRange(startLedger, endLedger) - mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2) + mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 4) bsb.dataStore = mockDataStore - mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ - LedgersPerFile: uint32(2), - FilesPerPartition: partitionSize, - }) + assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange)) - assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50) + assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 4 }, time.Second*5, time.Millisecond*50) - lcm, err := bsb.GetLedger(ctx, uint32(2)) - assert.NoError(t, err) - assert.Equal(t, lcmArray[0], lcm) - lcm, err = bsb.GetLedger(ctx, uint32(3)) - assert.NoError(t, err) - assert.Equal(t, lcmArray[1], lcm) - lcm, err = bsb.GetLedger(ctx, uint32(4)) - assert.NoError(t, err) - assert.Equal(t, lcmArray[2], lcm) + for i := 0; i <= int(endLedger-startLedger); i++ { + lcm, err := bsb.GetLedger(ctx, startLedger+uint32(i)) + assert.NoError(t, err) + assert.Equal(t, lcmArray[i], lcm) + } } func TestBSBGetLedger_ErrorPreceedingLedger(t *testing.T) { diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go index 7ee9dda083..0b2742e94d 100644 --- a/ingest/ledgerbackend/ledger_buffer.go +++ b/ingest/ledgerbackend/ledger_buffer.go @@ -96,8 +96,8 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu } func (lb *ledgerBuffer) pushTaskQueue() { - // In bounded mode, don't queue past the end ledger - if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded { + // In bounded mode, don't queue past the end boundary ledger for the specified range. + if lb.ledgerRange.bounded && lb.nextTaskLedger > lb.dataStore.GetSchema().GetSequenceNumberEndBoundary(lb.ledgerRange.to) { return } lb.taskQueue <- lb.nextTaskLedger