diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index 04fb9b7bbe..3d3835a799 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -6,7 +6,7 @@ All notable changes to this project will be documented in this file. This projec ### New Features * Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform). -* Add new functional producer, `cdp.PublishFromBufferedStorageBackend`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `PublishFromBufferedStorageBackend` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462). +* Add new functional producer, `cdp.ApplyLedgerMetadata`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `ApplyLedgerMetadata` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462). ### Stellar Core Protocol 21 Configuration Update: * BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated. diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go index ef863e0a55..0c5ff6de49 100644 --- a/ingest/cdp/producer.go +++ b/ingest/cdp/producer.go @@ -63,8 +63,8 @@ type PublisherConfig struct { Log *log.Entry } -// PublishFromBufferedStorageBackend - create an internal instance -// of BufferedStorageBackend using provided config and emit +// ApplyLedgerMetadata - creates an internal instance +// of BufferedStorageBackend using provided config and emits // ledger metadata for the requested range by invoking the provided callback // once per ledger. // @@ -86,7 +86,7 @@ type PublisherConfig struct { // return - error, function only returns if requested range is bounded or an error occured. // nil will be returned only if bounded range requested and completed processing with no errors. // otherwise return will always be an error. -func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, +func ApplyLedgerMetadata(ledgerRange ledgerbackend.Range, publisherConfig PublisherConfig, ctx context.Context, callback func(xdr.LedgerCloseMeta) error) error { diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index 1e93955052..2c1b39c247 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -84,7 +84,7 @@ func TestBSBProducerFn(t *testing.T) { return nil } - assert.Nil(t, PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback)) + assert.Nil(t, ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback)) assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") } @@ -105,7 +105,7 @@ func TestBSBProducerFnDataStoreError(t *testing.T) { } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback), "failed to create datastore:") } @@ -125,7 +125,7 @@ func TestBSBProducerFnConfigError(t *testing.T) { return mockDataStore, nil } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback), "failed to create buffered storage backend") mockDataStore.AssertExpectations(t) } @@ -151,7 +151,7 @@ func TestBSBProducerFnInvalidRange(t *testing.T) { } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback), "invalid end value for bounded range, must be greater than start") mockDataStore.AssertExpectations(t) } @@ -183,7 +183,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { return mockDataStore, nil } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), "error getting ledger") mockDataStore.AssertExpectations(t) @@ -218,7 +218,7 @@ func TestBSBProducerCallerCancelsCtx(t *testing.T) { return mockDataStore, nil } assert.ErrorIs(t, - PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), context.Canceled) } @@ -238,7 +238,7 @@ func TestBSBProducerFnCallbackError(t *testing.T) { return mockDataStore, nil } assert.ErrorContains(t, - PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), "received an error from callback invocation") }