From ba11f097d8926183948a0a16cfab21f2069fcf2a Mon Sep 17 00:00:00 2001 From: shawn Date: Thu, 3 Oct 2024 11:03:15 -0700 Subject: [PATCH] ingest/ledgerbackend: Create functional producer for CDP (#5462) --- ingest/CHANGELOG.md | 6 + ingest/cdp/producer.go | 147 +++++++++ ingest/cdp/producer_test.go | 296 ++++++++++++++++++ .../buffered_storage_backend_test.go | 2 +- ingest/ledgerbackend/range.go | 12 + support/datastore/mocks.go | 6 +- 6 files changed, 467 insertions(+), 2 deletions(-) create mode 100644 ingest/cdp/producer.go create mode 100644 ingest/cdp/producer_test.go diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index ed168de74e..3d3835a799 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## Pending + +### New Features +* Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform). +* Add new functional producer, `cdp.ApplyLedgerMetadata`. A new function which enables a private instance of `BufferedStorageBackend` to perform the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `ApplyLedgerMetadata` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462). + ### Stellar Core Protocol 21 Configuration Update: * BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated. 
* A new mandatory parameter, `DEPRECATED_SQL_LEDGER_STATE`, has been added with a default value of false which equivalent to `EXPERIMENTAL_BUCKETLIST_DB` being set to true. diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go new file mode 100644 index 0000000000..0c5ff6de49 --- /dev/null +++ b/ingest/cdp/producer.go @@ -0,0 +1,147 @@ +package cdp + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/ordered" + "github.com/stellar/go/xdr" +) + +// provide testing hooks to inject mocks of these +var datastoreFactory = datastore.NewDataStore + +// Generate a default buffered storage config with values +// set to optimize buffered performance to some degree based +// on number of ledgers per file expected in the underlying +// datastore used by an instance of BufferedStorageBackend. +// +// these numbers were derived empirically from benchmarking analysis: +// https://github.com/stellar/go/issues/5390 +// +// ledgersPerFile - number of ledgers per file from remote datastore schema. 
+// return - preconfigured instance of BufferedStorageBackendConfig +func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.BufferedStorageBackendConfig { + + config := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + } + + switch { + case ledgersPerFile < 2: + config.BufferSize = 500 + config.NumWorkers = 5 + return config + case ledgersPerFile < 101: + config.BufferSize = 10 + config.NumWorkers = 5 + return config + default: + config.BufferSize = 10 + config.NumWorkers = 2 + return config + } +} + +type PublisherConfig struct { + // Registry, optional, include to capture buffered storage backend metrics + Registry *prometheus.Registry + // RegistryNamespace, optional, include to emit buffered storage backend + // under this namespace + RegistryNamespace string + // BufferedStorageConfig, required + BufferedStorageConfig ledgerbackend.BufferedStorageBackendConfig + //DataStoreConfig, required + DataStoreConfig datastore.DataStoreConfig + // Log, optional, if nil uses go default logger + Log *log.Entry +} + +// ApplyLedgerMetadata - creates an internal instance +// of BufferedStorageBackend using provided config and emits +// ledger metadata for the requested range by invoking the provided callback +// once per ledger. +// +// The function is blocking, it will only return when a bounded range +// is completed, the ctx is canceled, or an error occurs. +// +// ledgerRange - the requested range, can be bounded or unbounded. +// +// publisherConfig - PublisherConfig. Provide configuration settings for DataStore +// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create +// optimized BufferedStorageBackendConfig. +// +// ctx - the context. Caller uses this to cancel the internal ledger processing, +// when canceled, the function will return asap with that error. +// +// callback - function. Invoked for every LedgerCloseMeta. 
If callback invocation +// returns an error, the processing will stop and return an error asap. +// +// return - error, function only returns if requested range is bounded or an error occurred. +// nil will be returned only if bounded range requested and completed processing with no errors. +// otherwise return will always be an error. +func ApplyLedgerMetadata(ledgerRange ledgerbackend.Range, + publisherConfig PublisherConfig, + ctx context.Context, + callback func(xdr.LedgerCloseMeta) error) error { + + logger := publisherConfig.Log + if logger == nil { + logger = log.DefaultLogger + } + + dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) + if err != nil { + return fmt.Errorf("failed to create datastore: %w", err) + } + + var ledgerBackend ledgerbackend.LedgerBackend + ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) + if err != nil { + return fmt.Errorf("failed to create buffered storage backend: %w", err) + } + + if publisherConfig.Registry != nil { + ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) + } + + if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() { + return fmt.Errorf("invalid end value for bounded range, must be greater than start") + } + + if !ledgerRange.Bounded() && ledgerRange.To() > 0 { + return fmt.Errorf("invalid end value for unbounded range, must be zero") + } + + from := ordered.Max(2, ledgerRange.From()) + ledgerBackend.PrepareRange(ctx, ledgerRange) + + for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ { + var ledgerCloseMeta xdr.LedgerCloseMeta + + logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") + startTime := time.Now() + ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) + + if err != nil { + return fmt.Errorf("error getting ledger, %w", err) + } + + log.WithFields(log.F{ + 
"sequence": ledgerSeq, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") + + err = callback(ledgerCloseMeta) + if err != nil { + return fmt.Errorf("received an error from callback invocation: %w", err) + } + } + return nil +} diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go new file mode 100644 index 0000000000..2c1b39c247 --- /dev/null +++ b/ingest/cdp/producer_test.go @@ -0,0 +1,296 @@ +package cdp + +import ( + "bytes" + "context" + "fmt" + "io" + "math" + "os" + "testing" + "time" + + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/compressxdr" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestDefaultBSBConfigs(t *testing.T) { + smallConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 500, + NumWorkers: 5, + } + + mediumConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 10, + NumWorkers: 5, + } + + largeConfig := ledgerbackend.BufferedStorageBackendConfig{ + RetryLimit: 5, + RetryWait: 30 * time.Second, + BufferSize: 10, + NumWorkers: 2, + } + + assert.Equal(t, DefaultBufferedStorageBackendConfig(1), smallConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(2), mediumConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(100), mediumConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(101), largeConfig) + assert.Equal(t, DefaultBufferedStorageBackendConfig(1000), largeConfig) +} + +func TestBSBProducerFn(t *testing.T) { + startLedger := uint32(2) + endLedger := uint32(3) + ctx := context.Background() + ledgerRange := ledgerbackend.BoundedRange(startLedger, endLedger) + mockDataStore := createMockdataStore(t, startLedger, endLedger, 64000) + dsConfig := 
datastore.DataStoreConfig{} + pubConfig := PublisherConfig{ + DataStoreConfig: dsConfig, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + + // inject the mock datastore using the package private testing factory override + datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { + assert.Equal(t, datastoreConfig, dsConfig) + return mockDataStore, nil + } + + expectedLcmSeqWasPublished := []bool{false, false} + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 2 { + if expectedLcmSeqWasPublished[0] { + assert.Fail(t, "producer fn had multiple callback invocations for same lcm") + } + expectedLcmSeqWasPublished[0] = true + } + if lcm.MustV0().LedgerHeader.Header.LedgerSeq == 3 { + if expectedLcmSeqWasPublished[1] { + assert.Fail(t, "producer fn had multiple callback invocations for same lcm") + } + expectedLcmSeqWasPublished[1] = true + } + return nil + } + + assert.Nil(t, ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback)) + assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") +} + +func TestBSBProducerFnDataStoreError(t *testing.T) { + ctx := context.Background() + ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3)) + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{}, + } + + datastoreFactory = func(ctx context.Context, datastoreConfig datastore.DataStoreConfig) (datastore.DataStore, error) { + return &datastore.MockDataStore{}, errors.New("uhoh") + } + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + assert.ErrorContains(t, + ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback), + "failed to create datastore:") +} + +func TestBSBProducerFnConfigError(t *testing.T) { + ctx := context.Background() + 
ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3)) + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{}, + } + mockDataStore := new(datastore.MockDataStore) + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + assert.ErrorContains(t, + ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback), + "failed to create buffered storage backend") + mockDataStore.AssertExpectations(t) +} + +func TestBSBProducerFnInvalidRange(t *testing.T) { + ctx := context.Background() + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + + assert.ErrorContains(t, + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback), + "invalid end value for bounded range, must be greater than start") + mockDataStore.AssertExpectations(t) +} + +func TestBSBProducerFnGetLedgerError(t *testing.T) { + ctx := context.Background() + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + // we don't want to let buffer do real retries, force the first error to propagate + pubConfig.BufferedStorageConfig.RetryLimit = 0 + mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + 
LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(nil, os.ErrNotExist).Once() + // since buffer is multi-worker async, it may get to this on other worker, but not deterministic, + // don't assert on it + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil).Maybe() + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + assert.ErrorContains(t, + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + "error getting ledger") + + mockDataStore.AssertExpectations(t) +} + +func TestBSBProducerCallerCancelsCtx(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + + // the buffering runs async, test needs to stub datastore methods for potential invocation, + // but is race, since test also cancels the backend context which started the buffer, + // so, not deterministic, no assert on these. 
+ mockDataStore := new(datastore.MockDataStore) + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + }) + + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFD--2.xdr.zstd").Return(makeSingleLCMBatch(2), nil) + mockDataStore.On("GetFile", mock.Anything, "FFFFFFFC--3.xdr.zstd").Return(makeSingleLCMBatch(3), nil) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return nil + } + + cancel() + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + assert.ErrorIs(t, + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + context.Canceled) +} + +func TestBSBProducerFnCallbackError(t *testing.T) { + ctx := context.Background() + pubConfig := PublisherConfig{ + DataStoreConfig: datastore.DataStoreConfig{}, + BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1), + } + mockDataStore := createMockdataStore(t, 2, 3, 64000) + + appCallback := func(lcm xdr.LedgerCloseMeta) error { + return errors.New("uhoh") + } + + datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { + return mockDataStore, nil + } + assert.ErrorContains(t, + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + "received an error from callback invocation") +} + +func createMockdataStore(t *testing.T, start, end, partitionSize uint32) *datastore.MockDataStore { + mockDataStore := new(datastore.MockDataStore) + partition := partitionSize - 1 + for i := start; i <= end; i++ { + objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-i, i) + mockDataStore.On("GetFile", mock.Anything, objectName).Return(makeSingleLCMBatch(i), nil).Once() + } + mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: partitionSize, + }) + + 
t.Cleanup(func() { + mockDataStore.AssertExpectations(t) + }) + + return mockDataStore +} + +func makeSingleLCMBatch(seq uint32) io.ReadCloser { + lcm := xdr.LedgerCloseMetaBatch{ + StartSequence: xdr.Uint32(seq), + EndSequence: xdr.Uint32(seq), + LedgerCloseMetas: []xdr.LedgerCloseMeta{ + createLedgerCloseMeta(seq), + }, + } + encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, lcm) + var buf bytes.Buffer + encoder.WriteTo(&buf) + capturedBuf := buf.Bytes() + reader := bytes.NewReader(capturedBuf) + return io.NopCloser(reader) +} + +func createLedgerCloseMeta(ledgerSeq uint32) xdr.LedgerCloseMeta { + return xdr.LedgerCloseMeta{ + V: int32(0), + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(ledgerSeq), + }, + }, + TxSet: xdr.TransactionSet{}, + TxProcessing: nil, + UpgradesProcessing: nil, + ScpInfo: nil, + }, + V1: nil, + } +} diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index 6c95b465f7..0d461cff07 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -76,7 +76,7 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) readCloser = createLCMBatchReader(i, i, count) objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-i, i) } - mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil) + mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil).Times(1) } mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{ LedgersPerFile: count, diff --git a/ingest/ledgerbackend/range.go b/ingest/ledgerbackend/range.go index f0c80695a1..99b4dfc800 100644 --- a/ingest/ledgerbackend/range.go +++ b/ingest/ledgerbackend/range.go @@ -46,6 +46,18 @@ func (r Range) String() string { return fmt.Sprintf("[%d,latest)", r.from) } +func 
(r Range) Bounded() bool { + return r.bounded +} + +func (r Range) To() uint32 { + return r.to +} + +func (r Range) From() uint32 { + return r.from +} + func (r Range) Contains(other Range) bool { if r.bounded && !other.bounded { return false diff --git a/support/datastore/mocks.go b/support/datastore/mocks.go index 2fa39a4712..a8c10438ab 100644 --- a/support/datastore/mocks.go +++ b/support/datastore/mocks.go @@ -29,7 +29,11 @@ func (m *MockDataStore) GetFileMetadata(ctx context.Context, path string) (map[s func (m *MockDataStore) GetFile(ctx context.Context, path string) (io.ReadCloser, error) { args := m.Called(ctx, path) - return args.Get(0).(io.ReadCloser), args.Error(1) + closer := (io.ReadCloser)(nil) + if args.Get(0) != nil { + closer = args.Get(0).(io.ReadCloser) + } + return closer, args.Error(1) } func (m *MockDataStore) PutFile(ctx context.Context, path string, in io.WriterTo, metadata map[string]string) error {