diff --git a/docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx b/docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
index 39cc46ca0..33042cfc3 100644
--- a/docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
+++ b/docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
@@ -3,11 +3,14 @@ title: Ingestion Pipeline Sample Code
sidebar_position: 30
---
-Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion SDK](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber.
+Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingest SDK](overview.mdx#the-ingestion-sdk-packages) paired with new [CDP SDK](https://github.com/stellar/go/tree/master/ingest/cdp) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber.
This example uses the ZeroMQ [goczmq](https://github.com/zeromq/goczmq) Go wrapper SDK, which requires a few o/s [dependent libraries to also be installed on the host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies).
-Put these files in a directory, compile and run with `go build -o pipeline ./.; ./pipeline`
+Steps:
+#1 - Put these files in a directory
+#2 - compile and run with `go mod tidy;go build -o pipeline ./.; ./pipeline`
+#3 - in separate terminal, run `python distributed_payment_subsciber.py`, this will perform distributed pipeline topology, as it receives messages with payment info from the pipeline process and does additional processing(printing it to console).
### `go.mod`
@@ -21,8 +24,8 @@ go 1.22
toolchain go1.22.1
require (
- github.com/stellar/go v0.0.0-20240614234001-3a31ed780c58
- github.com/zeromq/goczmq v4.1.0+incompatible
+ github.com/stellar/go v0.0.0-20241008214914-7950d4254e6a
+ github.com/zeromq/goczmq v4.1.0+incompatible
)
```
@@ -33,20 +36,24 @@ require (
```go
+
package main
import (
"context"
"encoding/json"
+ "fmt"
"io"
"log"
"os"
"os/signal"
+ "github.com/pelletier/go-toml"
"github.com/pkg/errors"
"github.com/stellar/go/amount"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
+ "github.com/stellar/go/ingest/cdp"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/datastore"
@@ -56,7 +63,7 @@ import (
"github.com/zeromq/goczmq"
)
-// Application specifics
+// Application payment model
type AppPayment struct {
Timestamp uint
BuyerAccountId string
@@ -65,9 +72,9 @@ type AppPayment struct {
Amount string
}
-// General stream topology
+// application data pipeline
type Message struct {
- Payload []byte
+ Payload interface{}
}
type Processor interface {
@@ -84,7 +91,7 @@ type ZeroMQOutboundAdapter struct {
}
func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
- _, err := adapter.Publisher.Write(msg.Payload)
+ _, err := adapter.Publisher.Write(msg.Payload.([]byte))
return err
}
@@ -98,12 +105,7 @@ func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
}
func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
- ledgerCloseMeta := xdr.LedgerCloseMeta{}
- err := ledgerCloseMeta.UnmarshalBinary(msg.Payload)
- if err != nil {
- return errors.Wrapf(err, "failed to unmarshal message payload to LedgerCloseMeta")
- }
-
+ ledgerCloseMeta := msg.Payload.(xdr.LedgerCloseMeta)
ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
@@ -142,109 +144,90 @@ func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Messa
return nil
}
-type CaptiveCoreInboundAdapter struct {
- TomlParams ledgerbackend.CaptiveCoreTomlParams
+type LedgerMetadataInboundAdapter struct {
processors []Processor
historyArchiveURLs []string
- coreConfigNetworkTemplate []byte
+ dataStoreConfig datastore.DataStoreConfig
}
-func (adapter *CaptiveCoreInboundAdapter) Subscribe(receiver Processor) {
+func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
adapter.processors = append(adapter.processors, receiver)
}
-func (adapter *CaptiveCoreInboundAdapter) Run(ctx context.Context) error {
- // Setup captive core config to use the Pubnet network
- captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromData(adapter.coreConfigNetworkTemplate, adapter.TomlParams)
- if err != nil {
- return errors.Wrap(err, "error creating captive core toml")
- }
-
- captiveConfig := ledgerbackend.CaptiveCoreConfig{
- BinaryPath: adapter.TomlParams.CoreBinaryPath,
- HistoryArchiveURLs: adapter.TomlParams.HistoryArchiveURLs,
- Context: ctx,
- Toml: captiveCoreToml,
- }
-
- // Create a new captive core backend, the origin of ingestion pipeline
- captiveBackend, err := ledgerbackend.NewCaptive(captiveConfig)
- if err != nil {
- return errors.Wrap(err, "error creating captive core instance")
- }
+func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {
- // Create a client to the network's history archives
- historyAchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
+ /////////////////////////////////////////////////////////////////
+ // TODO - make this easier for callers, move latest ledger fetch into cdp.ApplyLedgerMetadata()
+ // so caller doesn't have to this to start streaming from latest use case
+ // i.e. allow unbounded range with from=0 which can signal the same.
+ historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
- UserAgent: "my_app",
+ UserAgent: "payment_demo",
Context: ctx,
},
})
-
if err != nil {
return errors.Wrap(err, "error creating history archive client")
}
+ // Acquire the most recent ledger on network to begin streaming from
+ latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
+ /////////////////////////////////////////////////////////////////////
- // Acquire the most recent ledger on network
- latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(historyAchive)
if err != nil {
return errors.Wrap(err, "error getting latest ledger")
}
- // Tell the captive core instance to emit LedgerCloseMeta starting at
- // latest network ledger and continuing indefintely, streaming.
- if err := captiveBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(latestNetworkLedger)); err != nil {
- return errors.Wrap(err, "error preparing captive core ledger range")
- }
-
- // Run endless loop that receives LedgerCloseMeta from captive core for each new
- // ledger generated by the network and pushes it to next processors in pipeline
- for nextLedger := latestNetworkLedger; true; nextLedger++ {
- ledgerCloseMeta, err := captiveBackend.GetLedger(ctx, nextLedger)
- if err != nil {
- return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
- }
-
- payload, err := ledgerCloseMeta.MarshalBinary()
- if err != nil {
- return errors.Wrapf(err, "failed to encode ledger %d from xdr to binary", nextLedger)
- }
+ ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)
- log.Printf("Processing Ledger %v", nextLedger)
- for _, processor := range adapter.processors {
- if err := processor.Process(ctx, Message{Payload: payload}); err != nil {
- return errors.Wrapf(err, "failed to process ledger %d", nextLedger)
- }
- }
+ pubConfig := cdp.PublisherConfig{
+ DataStoreConfig: adapter.dataStoreConfig,
+ BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(adapter.dataStoreConfig.Schema.LedgersPerFile),
}
- return nil
+
+ fmt.Printf("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
+ return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
+ func(lcm xdr.LedgerCloseMeta) error {
+ for _, processor := range adapter.processors {
+ if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
}
func main() {
+ // run a data pipeline that transforms Pubnet ledger metadata into payment events
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer stop()
- // create the inbound 'source of origin' adapter,
- // imports network data using this captive core config
- networkInboundAdapter := &CaptiveCoreInboundAdapter{
- historyArchiveURLs: network.TestNetworkhistoryArchiveURLs,
- coreConfigNetworkTemplate: ledgerbackend.TestnetDefaultConfig,
- TomlParams: ledgerbackend.CaptiveCoreTomlParams{
- NetworkPassphrase: network.TestNetworkPassphrase,
- HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs,
- UseDB: true,
- CoreBinaryPath: "stellar-core", // assumes you have installed stellar-core on your o/s PATH
- },
+ cfg, err := toml.LoadFile("config.toml")
+ if err != nil {
+ fmt.Printf("config.toml shoule be accessible in current directdory: %v\n", err)
+ return
+ }
+
+ datastoreConfig := datastore.DataStoreConfig{}
+ // Unmarshal TOML data into the Config struct
+ if err = cfg.Unmarshal(&datastoreConfig); err != nil {
+ fmt.Printf("error unmarshalling TOML config: %v\n", err)
+ return
+ }
+
+ // create the inbound source of pubnet ledger metadata
+ ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
+ historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
+ dataStoreConfig: datastoreConfig,
}
// create the app transformer to convert network data to application data model
- appTransformer := &AppPaymentTransformer{networkPassPhrase: network.TestNetworkPassphrase}
+ appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}
// create the outbound adapter, this is the end point of the pipeline
// publishes application data model as messages to a broker
publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
if err != nil {
- log.Printf("error creating 0MQ publisher: %v", err)
+ log.Printf("error creating 0MQ publisher: %v\n", err)
return
}
defer publisher.Destroy()
@@ -252,9 +235,30 @@ func main() {
// wire up the ingestion pipeline and let it run
appTransformer.Subscribe(outboundAdapter)
- networkInboundAdapter.Subscribe(appTransformer)
- log.Printf("Payment ingestion pipeline ended %v", networkInboundAdapter.Run(ctx))
+ ledgerMetadataInboundAdapter.Subscribe(appTransformer)
+ log.Printf("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))
}
+
+```
+
+
+
+### `config.toml`
+
+The CDP configuration settings, this file defines the data storage which contains the pre-generated Ledger Metadata files. Google Cloud storage is used in this example.
+
+
+
+```
+type = "GCS"
+
+[params]
+destination_bucket_path = "my-gcs-bucketname/prefix1/prefix2"
+
+[schema]
+ledgers_per_file = 1
+files_per_partition = 64000
+
```
@@ -274,11 +278,11 @@ import json
context = zmq.Context()
socket = context.socket(zmq.SUB)
-print("Collecting next 10 payments from pipeline ...")
+print("Collecting payments from pipeline ...")
socket.connect("tcp://127.0.0.1:5555")
socket.subscribe("")
-for request in range(10):
+while True:
message = socket.recv()
json_object = json.loads(message)
diff --git a/docs/build/apps/ingest-sdk/overview.mdx b/docs/build/apps/ingest-sdk/overview.mdx
index bec22ec52..6e1df4831 100644
--- a/docs/build/apps/ingest-sdk/overview.mdx
+++ b/docs/build/apps/ingest-sdk/overview.mdx
@@ -10,7 +10,7 @@ This tutorial walks through how an application can leverage common streaming dat
- `github.com/stellar/go/amount` utility package to convert prices from network transaction operations to string
- `github.com/stellar/go/historyarchive` `github.com/stellar/go/support/datastore` `github.com/stellar/go/support/storage` utility package with convenient wrappers for accessing history archives, and avoid low-level http aspects
- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger data, converts to more developer-centric `LedgerTransaction` model
-- `github.com/stellar/go/ingest/ledgerbackend` provides the captive core backend implementation
+- `github.com/stellar/go/cdp` provides the CDP ledger metadata producer function for streaming pipeline integration.
- `github.com/stellar/go/network` provides convenient pre-configured settings for Testnet and Mainnet networks
- `github.com/stellar/go/xdr` a complete Golang binding to the Stellar network data model
@@ -25,7 +25,9 @@ To build an example streaming network ingestion pipeline from live Stellar netwo
- A newly initialized, empty Go project folder. `mkdir pipeline; cd pipeline; go mod init example/pipeline`
- `stellar-core` must be [installed](../../../validators/admin-guide/installation.mdx) on your workstation and available on your o/s PATH
-The [Stellar network data model](../../../learn/fundamentals/stellar-data-structures/README.mdx) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines its own data model internally:
+The [Stellar network data model](../../../learn/fundamentals/stellar-data-structures/README.mdx) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr).
+
+Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines its own derived data model internally:
@@ -47,7 +49,7 @@ The ingestion pipeline will perform three distinct stream processor roles:
### Inbound Adapter
-Acts as the 'source of origin' for the pipeline. Retrieves [LedgerCloseMeta](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) generated from a Stellar network using captive core. `LedgerCloseMeta` is the top-level aggregate in the Stellar data model of which all [Stellar network transaction data](../../../learn/fundamentals/stellar-data-structures/README.mdx) is nested within. Publishes the `LedgerCloseMeta` onto the pipeline.
+Acts as the 'source of origin' for the ledger metadata in a pipeline. Retrieves [Ledger Metadata](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) which is the top-level aggregate structure in the Stellar data model of which all [Stellar network transaction data](../../../learn/fundamentals/stellar-data-structures/README.mdx) is nested within. The example leverages [CDP architecture](https://stellar.org/blog/developers/composable-data-platform), files are exported to a datastore containing the Ledger Metadata as it's captured from a Stellar network. The inbound adpater retrieves these files extracts the `LedgerCloseMeta` and publishes onto the message pipeline.
### Transformer