Skip to content

Commit

Permalink
stellar#5412: converted producer function to sync signature, per revi…
Browse files Browse the repository at this point in the history
…ew feedback on api best practice
  • Loading branch information
sreuland committed Sep 27, 2024
1 parent 52bb0bd commit 8c3f694
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 162 deletions.
117 changes: 52 additions & 65 deletions ingest/cdp/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,98 +63,85 @@ type PublisherConfig struct {
Log *log.Entry
}

// PublishFromBufferedStorageBackend is asynchronous.
// Proceeds to create an internal instance of BufferedStorageBackend
// using provided configs and emit ledgers asynchronously to the provided
// callback fn for all ledgers in the requested range.
// PublishFromBufferedStorageBackend - create an internal instance
// of BufferedStorageBackend using provided config and emit
// ledger metadata for the requested range by invoking the provided callback
// once per ledger.
//
// ledgerRange - the requested range. If bounded range, will close resultCh
// after last ledger is emitted.
// The function is blocking, it will only return when a bounded range
// is completed, the ctx is canceled, or an error occurs.
//
// ledgerRange - the requested range, can be bounded or unbounded.
//
// publisherConfig - PublisherConfig. Provide configuration settings for DataStore
// and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create
// optimized BufferedStorageBackendConfig.
//
// ctx - the context. Caller uses this to cancel the asynchronousledger processing.
// If caller does cancel, can sync on resultCh to receive an error to confirm
// all asynchronous processing stopped.
// ctx - the context. Caller uses this to cancel the internal ledger processing,
// when canceled, the function will return asap with that error.
//
// callback - function. Invoked for every LedgerCloseMeta. If callback invocation
// returns an error, the publishing will shut down and indicate with error on resultCh.
// returns an error, the processing will stop and return an error asap.
//
// return - channel, used to signal to caller when publishing has stopped.
// If stoppage was due to an error, the error will be sent on
// channel and then closed. If no errors and ledgerRange is bounded,
// the channel will be closed when range is completed. If ledgerRange
// is unbounded, then the channel is never closed until an error
// or caller cancels.
// return - error, function only returns if requested range is bounded or an error occured.
// nil will be returned only if bounded range requested and completed processing with no errors.
// otherwise return will always be an error.
func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range,
publisherConfig PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) chan error {
callback func(xdr.LedgerCloseMeta) error) error {

logger := publisherConfig.Log
if logger == nil {
logger = log.DefaultLogger
}
resultCh := make(chan error, 1)

go func() {
defer close(resultCh)
dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig)
if err != nil {
resultCh <- fmt.Errorf("failed to create datastore: %w", err)
return
}
dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig)
if err != nil {
return fmt.Errorf("failed to create datastore: %w", err)
}

var ledgerBackend ledgerbackend.LedgerBackend
ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore)
if err != nil {
resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err)
return
}
var ledgerBackend ledgerbackend.LedgerBackend
ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore)
if err != nil {
return fmt.Errorf("failed to create buffered storage backend: %w", err)
}

if publisherConfig.Registry != nil {
ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace)
}
if publisherConfig.Registry != nil {
ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace)
}

if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() {
resultCh <- fmt.Errorf("invalid end value for bounded range, must be greater than start")
return
}
if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() {
return fmt.Errorf("invalid end value for bounded range, must be greater than start")
}

if !ledgerRange.Bounded() && ledgerRange.To() > 0 {
resultCh <- fmt.Errorf("invalid end value for unbounded range, must be zero")
return
}
if !ledgerRange.Bounded() && ledgerRange.To() > 0 {
return fmt.Errorf("invalid end value for unbounded range, must be zero")
}

from := ordered.Max(2, ledgerRange.From())
ledgerBackend.PrepareRange(ctx, ledgerRange)
from := ordered.Max(2, ledgerRange.From())
ledgerBackend.PrepareRange(ctx, ledgerRange)

for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ {
var ledgerCloseMeta xdr.LedgerCloseMeta
for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...")
startTime := time.Now()
ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq)
logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...")
startTime := time.Now()
ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq)

if err != nil {
resultCh <- fmt.Errorf("error getting ledger, %w", err)
return
}
if err != nil {
return fmt.Errorf("error getting ledger, %w", err)
}

log.WithFields(log.F{
"sequence": ledgerSeq,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")
log.WithFields(log.F{
"sequence": ledgerSeq,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

err = callback(ledgerCloseMeta)
if err != nil {
resultCh <- fmt.Errorf("received an error from callback invocation: %w", err)
return
}
err = callback(ledgerCloseMeta)
if err != nil {
return fmt.Errorf("received an error from callback invocation: %w", err)
}
}()

return resultCh
}
return nil
}
114 changes: 17 additions & 97 deletions ingest/cdp/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,8 @@ func TestBSBProducerFn(t *testing.T) {
return nil
}

resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback)

assert.Eventually(t, func() bool {
select {
case chErr, ok := <-resultCh:
if ok {
assert.Failf(t, "", "producer fn should not have stopped with error %v", chErr)
}
return true
default:
}
return false
},
time.Second*3,
time.Millisecond*50)

assert.Nil(t, PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback))
assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm")

}

func TestBSBProducerFnDataStoreError(t *testing.T) {
Expand All @@ -120,22 +104,9 @@ func TestBSBProducerFnDataStoreError(t *testing.T) {
return nil
}

resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback)
assert.Eventually(t, func() bool {
select {
case chErr, ok := <-resultCh:
if ok {
assert.ErrorContains(t, chErr, "failed to create datastore:")
} else {
assert.Fail(t, "", "producer fn should not have closed the result ch")
}
return true
default:
}
return false
},
time.Second*3,
time.Millisecond*50)
assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback),
"failed to create datastore:")
}

func TestBSBProducerFnConfigError(t *testing.T) {
Expand All @@ -153,22 +124,9 @@ func TestBSBProducerFnConfigError(t *testing.T) {
datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) {
return mockDataStore, nil
}
resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback)
assert.Eventually(t, func() bool {
select {
case chErr, ok := <-resultCh:
if ok {
assert.ErrorContains(t, chErr, "failed to create buffered storage backend")
} else {
assert.Fail(t, "producer fn should not have closed the result ch")
}
return true
default:
}
return false
},
time.Second*3,
time.Millisecond*50)
assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback),
"failed to create buffered storage backend")
}

func TestBSBProducerFnInvalidRange(t *testing.T) {
Expand All @@ -190,22 +148,10 @@ func TestBSBProducerFnInvalidRange(t *testing.T) {
datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) {
return mockDataStore, nil
}
resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback)
assert.Eventually(t, func() bool {
select {
case chErr, ok := <-resultCh:
if ok {
assert.ErrorContains(t, chErr, "invalid end value for bounded range, must be greater than start")
} else {
assert.Fail(t, "producer fn should not have closed the result ch")
}
return true
default:
}
return false
},
time.Second*3,
time.Millisecond*50)

assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback),
"invalid end value for bounded range, must be greater than start")
}

func TestBSBProducerFnGetLedgerError(t *testing.T) {
Expand All @@ -232,22 +178,9 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) {
datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) {
return mockDataStore, nil
}
resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback)
assert.Eventually(t, func() bool {
select {
case chErr, ok := <-resultCh:
if ok {
assert.ErrorContains(t, chErr, "error getting ledger")
} else {
assert.Fail(t, "producer fn should not have closed the result ch")
}
return true
default:
}
return false
},
time.Second*3000,
time.Millisecond*50)
assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback),
"error getting ledger")
}

func TestBSBProducerFnCallbackError(t *testing.T) {
Expand All @@ -265,22 +198,9 @@ func TestBSBProducerFnCallbackError(t *testing.T) {
datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) {
return mockDataStore, nil
}
resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback)
assert.Eventually(t, func() bool {
select {
case chErr, ok := <-resultCh:
if ok {
assert.ErrorContains(t, chErr, "received an error from callback invocation")
} else {
assert.Fail(t, "producer fn should not have closed the result ch")
}
return true
default:
}
return false
},
time.Second*3,
time.Millisecond*50)
assert.ErrorContains(t,
PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback),
"received an error from callback invocation")
}

func createMockdataStore(t *testing.T, start, end, partitionSize uint32) *datastore.MockDataStore {
Expand Down

0 comments on commit 8c3f694

Please sign in to comment.