From 8c3f694b3eda98f4116eb17dbbf90b8c686f9ad2 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Fri, 27 Sep 2024 11:35:21 -0700 Subject: [PATCH] #5412: converted producer function to sync signature, per review feedback on api best practice --- ingest/cdp/producer.go | 117 ++++++++++++++++-------------------- ingest/cdp/producer_test.go | 114 ++++++----------------------------- 2 files changed, 69 insertions(+), 162 deletions(-) diff --git a/ingest/cdp/producer.go b/ingest/cdp/producer.go index 44cc7d3a6d..ef863e0a55 100644 --- a/ingest/cdp/producer.go +++ b/ingest/cdp/producer.go @@ -63,98 +63,85 @@ type PublisherConfig struct { Log *log.Entry } -// PublishFromBufferedStorageBackend is asynchronous. -// Proceeds to create an internal instance of BufferedStorageBackend -// using provided configs and emit ledgers asynchronously to the provided -// callback fn for all ledgers in the requested range. +// PublishFromBufferedStorageBackend - create an internal instance +// of BufferedStorageBackend using provided config and emit +// ledger metadata for the requested range by invoking the provided callback +// once per ledger. // -// ledgerRange - the requested range. If bounded range, will close resultCh -// after last ledger is emitted. +// The function is blocking, it will only return when a bounded range +// is completed, the ctx is canceled, or an error occurs. +// +// ledgerRange - the requested range, can be bounded or unbounded. // // publisherConfig - PublisherConfig. Provide configuration settings for DataStore // and BufferedStorageBackend. Use DefaultBufferedStorageBackendConfig() to create // optimized BufferedStorageBackendConfig. // -// ctx - the context. Caller uses this to cancel the asynchronousledger processing. -// If caller does cancel, can sync on resultCh to receive an error to confirm -// all asynchronous processing stopped. +// ctx - the context. Caller uses this to cancel the internal ledger processing, +// when canceled, the function will return asap with that error. // // callback - function. Invoked for every LedgerCloseMeta. If callback invocation -// returns an error, the publishing will shut down and indicate with error on resultCh. +// returns an error, the processing will stop and return an error asap. // -// return - channel, used to signal to caller when publishing has stopped. -// If stoppage was due to an error, the error will be sent on -// channel and then closed. If no errors and ledgerRange is bounded, -// the channel will be closed when range is completed. If ledgerRange -// is unbounded, then the channel is never closed until an error -// or caller cancels. +// return - error, function only returns if requested range is bounded or an error occured. +// nil will be returned only if bounded range requested and completed processing with no errors. +// otherwise return will always be an error. func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range, publisherConfig PublisherConfig, ctx context.Context, - callback func(xdr.LedgerCloseMeta) error) chan error { + callback func(xdr.LedgerCloseMeta) error) error { logger := publisherConfig.Log if logger == nil { logger = log.DefaultLogger } - resultCh := make(chan error, 1) - go func() { - defer close(resultCh) - dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) - if err != nil { - resultCh <- fmt.Errorf("failed to create datastore: %w", err) - return - } + dataStore, err := datastoreFactory(ctx, publisherConfig.DataStoreConfig) + if err != nil { + return fmt.Errorf("failed to create datastore: %w", err) + } - var ledgerBackend ledgerbackend.LedgerBackend - ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) - if err != nil { - resultCh <- fmt.Errorf("failed to create buffered storage backend: %w", err) - return - } + var ledgerBackend ledgerbackend.LedgerBackend + ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(publisherConfig.BufferedStorageConfig, dataStore) + if err != nil { + return fmt.Errorf("failed to create buffered storage backend: %w", err) + } - if publisherConfig.Registry != nil { - ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) - } + if publisherConfig.Registry != nil { + ledgerBackend = ledgerbackend.WithMetrics(ledgerBackend, publisherConfig.Registry, publisherConfig.RegistryNamespace) + } - if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() { - resultCh <- fmt.Errorf("invalid end value for bounded range, must be greater than start") - return - } + if ledgerRange.Bounded() && ledgerRange.To() <= ledgerRange.From() { + return fmt.Errorf("invalid end value for bounded range, must be greater than start") + } - if !ledgerRange.Bounded() && ledgerRange.To() > 0 { - resultCh <- fmt.Errorf("invalid end value for unbounded range, must be zero") - return - } + if !ledgerRange.Bounded() && ledgerRange.To() > 0 { + return fmt.Errorf("invalid end value for unbounded range, must be zero") + } - from := ordered.Max(2, ledgerRange.From()) - ledgerBackend.PrepareRange(ctx, ledgerRange) + from := ordered.Max(2, ledgerRange.From()) + ledgerBackend.PrepareRange(ctx, ledgerRange) - for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ { - var ledgerCloseMeta xdr.LedgerCloseMeta + for ledgerSeq := from; ledgerSeq <= ledgerRange.To() || !ledgerRange.Bounded(); ledgerSeq++ { + var ledgerCloseMeta xdr.LedgerCloseMeta - logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") - startTime := time.Now() - ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) + logger.WithField("sequence", ledgerSeq).Info("Requesting ledger from the backend...") + startTime := time.Now() + ledgerCloseMeta, err = ledgerBackend.GetLedger(ctx, ledgerSeq) - if err != nil { - resultCh <- fmt.Errorf("error getting ledger, %w", err) - return - } + if err != nil { + return fmt.Errorf("error getting ledger, %w", err) + } - log.WithFields(log.F{ - "sequence": ledgerSeq, - "duration": time.Since(startTime).Seconds(), - }).Info("Ledger returned from the backend") + log.WithFields(log.F{ + "sequence": ledgerSeq, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") - err = callback(ledgerCloseMeta) - if err != nil { - resultCh <- fmt.Errorf("received an error from callback invocation: %w", err) - return - } + err = callback(ledgerCloseMeta) + if err != nil { + return fmt.Errorf("received an error from callback invocation: %w", err) } - }() - - return resultCh + } + return nil } diff --git a/ingest/cdp/producer_test.go b/ingest/cdp/producer_test.go index 96d764cb0f..1b0dfa98f4 100644 --- a/ingest/cdp/producer_test.go +++ b/ingest/cdp/producer_test.go @@ -84,24 +84,8 @@ func TestBSBProducerFn(t *testing.T) { return nil } - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.Failf(t, "", "producer fn should not have stopped with error %v", chErr) - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) - + assert.Nil(t, PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback)) assert.Equal(t, expectedLcmSeqWasPublished, []bool{true, true}, "producer fn did not invoke callback for all expected lcm") - } func TestBSBProducerFnDataStoreError(t *testing.T) { @@ -120,22 +104,9 @@ func TestBSBProducerFnDataStoreError(t *testing.T) { return nil } - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "failed to create datastore:") - } else { - assert.Fail(t, "", "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), + "failed to create datastore:") } func TestBSBProducerFnConfigError(t *testing.T) { @@ -153,22 +124,9 @@ func TestBSBProducerFnConfigError(t *testing.T) { datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil } - resultCh := PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "failed to create buffered storage backend") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerRange, pubConfig, ctx, appCallback), + "failed to create buffered storage backend") } func TestBSBProducerFnInvalidRange(t *testing.T) { @@ -190,22 +148,10 @@ func TestBSBProducerFnInvalidRange(t *testing.T) { datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil } - resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "invalid end value for bounded range, must be greater than start") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) + + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(3), uint32(2)), pubConfig, ctx, appCallback), + "invalid end value for bounded range, must be greater than start") } func TestBSBProducerFnGetLedgerError(t *testing.T) { @@ -232,22 +178,9 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) { datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil } - resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "error getting ledger") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3000, - time.Millisecond*50) + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + "error getting ledger") } func TestBSBProducerFnCallbackError(t *testing.T) { @@ -265,22 +198,9 @@ func TestBSBProducerFnCallbackError(t *testing.T) { datastoreFactory = func(_ context.Context, _ datastore.DataStoreConfig) (datastore.DataStore, error) { return mockDataStore, nil } - resultCh := PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback) - assert.Eventually(t, func() bool { - select { - case chErr, ok := <-resultCh: - if ok { - assert.ErrorContains(t, chErr, "received an error from callback invocation") - } else { - assert.Fail(t, "producer fn should not have closed the result ch") - } - return true - default: - } - return false - }, - time.Second*3, - time.Millisecond*50) + assert.ErrorContains(t, + PublishFromBufferedStorageBackend(ledgerbackend.BoundedRange(uint32(2), uint32(3)), pubConfig, ctx, appCallback), + "received an error from callback invocation") } func createMockdataStore(t *testing.T, start, end, partitionSize uint32) *datastore.MockDataStore {