Skip to content

Commit

Permalink
stellar#5412: moved PublisherConfig to cdp package
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Sep 25, 2024
1 parent 3414d24 commit 68f7f43
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 23 deletions.
17 changes: 16 additions & 1 deletion ingest/cdp/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/errors"
Expand Down Expand Up @@ -50,6 +51,20 @@ func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.Bu
}
}

type PublisherConfig struct {
// Registry, optional, include to capture buffered storage backend metrics
Registry *prometheus.Registry
// RegistryNamespace, optional, include to emit buffered storage backend
// under this namespace
RegistryNamespace string
// BufferedStorageConfig, required
BufferedStorageConfig ledgerbackend.BufferedStorageBackendConfig
//DataStoreConfig, required
DataStoreConfig datastore.DataStoreConfig
// Log, optional, if nil uses go default logger
Log *log.Entry
}

// PublishFromBufferedStorageBackend is asynchronous.
// Proceeds to create an internal instance of BufferedStorageBackend
// using provided configs and emit ledgers asynchronously to the provided
Expand All @@ -76,7 +91,7 @@ func DefaultBufferedStorageBackendConfig(ledgersPerFile uint32) ledgerbackend.Bu
// is unbounded, then the channel is never closed until an error
// or caller cancels.
func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range,
publisherConfig ledgerbackend.PublisherConfig,
publisherConfig PublisherConfig,
ctx context.Context,
callback func(xdr.LedgerCloseMeta) error) chan error {

Expand Down
12 changes: 6 additions & 6 deletions ingest/cdp/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestBSBProducerFn(t *testing.T) {
ledgerRange := ledgerbackend.BoundedRange(startLedger, endLedger)
mockDataStore := createMockdataStore(t, startLedger, endLedger, 64000)
dsConfig := datastore.DataStoreConfig{}
pubConfig := ledgerbackend.PublisherConfig{
pubConfig := PublisherConfig{
DataStoreConfig: dsConfig,
BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1),
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestBSBProducerFn(t *testing.T) {
func TestBSBProducerFnDataStoreError(t *testing.T) {
ctx := context.Background()
ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3))
pubConfig := ledgerbackend.PublisherConfig{
pubConfig := PublisherConfig{
DataStoreConfig: datastore.DataStoreConfig{},
BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{},
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestBSBProducerFnDataStoreError(t *testing.T) {
func TestBSBProducerFnConfigError(t *testing.T) {
ctx := context.Background()
ledgerRange := ledgerbackend.BoundedRange(uint32(2), uint32(3))
pubConfig := ledgerbackend.PublisherConfig{
pubConfig := PublisherConfig{
DataStoreConfig: datastore.DataStoreConfig{},
BufferedStorageConfig: ledgerbackend.BufferedStorageBackendConfig{},
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestBSBProducerFnConfigError(t *testing.T) {

func TestBSBProducerFnInvalidRange(t *testing.T) {
ctx := context.Background()
pubConfig := ledgerbackend.PublisherConfig{
pubConfig := PublisherConfig{
DataStoreConfig: datastore.DataStoreConfig{},
BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1),
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestBSBProducerFnInvalidRange(t *testing.T) {

func TestBSBProducerFnGetLedgerError(t *testing.T) {
ctx := context.Background()
pubConfig := ledgerbackend.PublisherConfig{
pubConfig := PublisherConfig{
DataStoreConfig: datastore.DataStoreConfig{},
BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1),
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestBSBProducerFnGetLedgerError(t *testing.T) {

func TestBSBProducerFnCallbackError(t *testing.T) {
ctx := context.Background()
pubConfig := ledgerbackend.PublisherConfig{
pubConfig := PublisherConfig{
DataStoreConfig: datastore.DataStoreConfig{},
BufferedStorageConfig: DefaultBufferedStorageBackendConfig(1),
}
Expand Down
16 changes: 0 additions & 16 deletions ingest/ledgerbackend/buffered_storage_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -66,20 +64,6 @@ func NewBufferedStorageBackend(config BufferedStorageBackendConfig, dataStore da
return bsBackend, nil
}

type PublisherConfig struct {
// Registry, optional, include to capture buffered storage backend metrics
Registry *prometheus.Registry
// RegistryNamespace, optional, include to emit buffered storage backend
// under this namespace
RegistryNamespace string
// BufferedStorageConfig, required
BufferedStorageConfig BufferedStorageBackendConfig
//DataStoreConfig, required
DataStoreConfig datastore.DataStoreConfig
// Log, optional, if nil uses go default logger
Log *log.Entry
}

// GetLatestLedgerSequence returns the most recent ledger sequence number available in the buffer.
func (bsb *BufferedStorageBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
bsb.bsBackendLock.RLock()
Expand Down

0 comments on commit 68f7f43

Please sign in to comment.