Skip to content

Commit

Permalink
stellar#5412: renamed the producer fn to ApplyLedgerMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Oct 3, 2024
1 parent 33ba777 commit 1b1244b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
2 changes: 1 addition & 1 deletion ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ All notable changes to this project will be documented in this file. This projec

### New Features
* Create new package `ingest/cdp` for new components which will assist towards writing data transformation pipelines as part of [Composable Data Platform](https://stellar.org/blog/developers/composable-data-platform).
* Add new functional producer, `cdp.PublishFromBufferedStorageBackend`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `PublishFromBufferedStorageBackend` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462).
* Add new functional producer, `cdp.ApplyLedgerMetadata`. A new function which enables a private instance of `BufferedStorageBackend` to perfrom the role of a producer operator in streaming pipeline designs. It will emit pre-computed `LedgerCloseMeta` from a chosen `DataStore`. The stream can use `ApplyLedgerMetadata` as the origin of `LedgerCloseMeta`, providing a callback function which acts as the next operator in the stream, receiving the `LedgerCloseMeta`. [5462](https://github.com/stellar/go/pull/5462).

### Stellar Core Protocol 21 Configuration Update:
* BucketlistDB is now the default database for stellar-core, replacing the experimental option. As a result, the `EXPERIMENTAL_BUCKETLIST_DB` configuration parameter has been deprecated.
Expand Down
6 changes: 3 additions & 3 deletions ingest/cdp/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ type PublisherConfig struct {
Log *log.Entry
}

// PublishFromBufferedStorageBackend - create an internal instance
// of BufferedStorageBackend using provided config and emit
// ApplyLedgerMetadata - creates an internal instance
// of BufferedStorageBackend using provided config and emits
// ledger metadata for the requested range by invoking the provided callback
// once per ledger.
//
Expand All @@ -86,7 +86,7 @@ type PublisherConfig struct {
// return - error, function only returns if requested range is bounded or an error occured.
// nil will be returned only if bounded range requested and completed processing with no errors.
// otherwise return will always be an error.
func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range,
func ApplyLedgerMetadata(ledgerRange ledgerbackend.Range,
publisherConfig PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) error {
Expand Down
14 changes: 7 additions & 7 deletions ingest/cdp/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestBSBProducerFn(t *testing.T) {
return nil
}

assert.Nil(t, PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback))
assert.Nil(t, ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback))
assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm")
}

Expand All @@ -105,7 +105,7 @@ func TestBSBProducerFnDataStoreError(t *testing.T) {
}

assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback),
ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback),
"failed to create datastore:")
}

Expand All @@ -125,7 +125,7 @@ func TestBSBProducerFnConfigError(t *testing.T) {
return mockDataStore, nil
}
assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback),
ApplyLedgerMetadata(ledgerRange, pubConfig, ctx, appCallback),
"failed to create buffered storage backend")
mockDataStore.AssertExpectations(t)
}
Expand All @@ -151,7 +151,7 @@ func TestBSBProducerFnInvalidRange(t *testing.T) {
}

assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback),
ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback),
"invalid end value for bounded range, must be greater than start")
mockDataStore.AssertExpectations(t)
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) {
return mockDataStore, nil
}
assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback),
ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback),
"error getting ledger")

mockDataStore.AssertExpectations(t)
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestBSBProducerCallerCancelsCtx(t *testing.T) {
return mockDataStore, nil
}
assert.ErrorIs(t,
PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback),
ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback),
context.Canceled)
}

Expand All @@ -238,7 +238,7 @@ func TestBSBProducerFnCallbackError(t *testing.T) {
return mockDataStore, nil
}
assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback),
ApplyLedgerMetadata(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback),
"received an error from callback invocation")
}

Expand Down

0 comments on commit 1b1244b

Please sign in to comment.