Skip to content

Commit

Permalink
stellar#889: update ingestion pipeline example to use cdp
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Oct 11, 2024
1 parent 44ea9ff commit 4054bc0
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 88 deletions.
174 changes: 89 additions & 85 deletions docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ title: Ingestion Pipeline Sample Code
sidebar_position: 30
---

Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion SDK](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber.
Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingest SDK](overview.mdx#the-ingestion-sdk-packages) paired with new [CDP SDK](https://github.com/stellar/go/tree/master/ingest/cdp) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber.

This example uses the ZeroMQ [goczmq](https://github.com/zeromq/goczmq) Go wrapper SDK, which requires a few o/s [dependent libraries to also be installed on the host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies).

Put these files in a directory, compile and run with `go build -o pipeline ./.; ./pipeline`
Steps:
#1 - Put these files in a directory
#2 - compile and run with `go mod tidy;go build -o pipeline ./.; ./pipeline`
#3 - in separate terminal, run `python distributed_payment_subsciber.py`, this will perform distributed pipeline topology, as it receives messages with payment info from the pipeline process and does additional processing(printing it to console).

### `go.mod`

Expand All @@ -21,8 +24,8 @@ go 1.22
toolchain go1.22.1
require (
github.com/stellar/go v0.0.0-20240614234001-3a31ed780c58
github.com/zeromq/goczmq v4.1.0+incompatible
github.com/stellar/go v0.0.0-20241008214914-7950d4254e6a
github.com/zeromq/goczmq v4.1.0+incompatible
)
```

Expand All @@ -33,20 +36,24 @@ require (
<CodeExample>

```go

package main

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"os/signal"

"github.com/pelletier/go-toml"
"github.com/pkg/errors"
"github.com/stellar/go/amount"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/cdp"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/datastore"
Expand All @@ -56,7 +63,7 @@ import (
"github.com/zeromq/goczmq"
)

// Application specifics
// Application payment model
type AppPayment struct {
Timestamp uint
BuyerAccountId string
Expand All @@ -65,9 +72,9 @@ type AppPayment struct {
Amount string
}

// General stream topology
// application data pipeline
type Message struct {
Payload []byte
Payload interface{}
}

type Processor interface {
Expand All @@ -84,7 +91,7 @@ type ZeroMQOutboundAdapter struct {
}

func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
_, err := adapter.Publisher.Write(msg.Payload)
_, err := adapter.Publisher.Write(msg.Payload.([]byte))
return err
}

Expand All @@ -98,12 +105,7 @@ func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
}

func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
ledgerCloseMeta := xdr.LedgerCloseMeta{}
err := ledgerCloseMeta.UnmarshalBinary(msg.Payload)
if err != nil {
return errors.Wrapf(err, "failed to unmarshal message payload to LedgerCloseMeta")
}

ledgerCloseMeta := msg.Payload.(xdr.LedgerCloseMeta)
ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
Expand Down Expand Up @@ -142,119 +144,121 @@ func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Messa
return nil
}

type CaptiveCoreInboundAdapter struct {
TomlParams ledgerbackend.CaptiveCoreTomlParams
type LedgerMetadataInboundAdapter struct {
processors []Processor
historyArchiveURLs []string
coreConfigNetworkTemplate []byte
dataStoreConfig datastore.DataStoreConfig
}

func (adapter *CaptiveCoreInboundAdapter) Subscribe(receiver Processor) {
func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
adapter.processors = append(adapter.processors, receiver)
}

func (adapter *CaptiveCoreInboundAdapter) Run(ctx context.Context) error {
// Setup captive core config to use the Pubnet network
captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromData(adapter.coreConfigNetworkTemplate, adapter.TomlParams)
if err != nil {
return errors.Wrap(err, "error creating captive core toml")
}

captiveConfig := ledgerbackend.CaptiveCoreConfig{
BinaryPath: adapter.TomlParams.CoreBinaryPath,
HistoryArchiveURLs: adapter.TomlParams.HistoryArchiveURLs,
Context: ctx,
Toml: captiveCoreToml,
}

// Create a new captive core backend, the origin of ingestion pipeline
captiveBackend, err := ledgerbackend.NewCaptive(captiveConfig)
if err != nil {
return errors.Wrap(err, "error creating captive core instance")
}
func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {

// Create a client to the network's history archives
historyAchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
/////////////////////////////////////////////////////////////////
// TODO - make this easier for callers, move latest ledger fetch into cdp.ApplyLedgerMetadata()
// so caller doesn't have to this to start streaming from latest use case
// i.e. allow unbounded range with from=0 which can signal the same.
historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: "my_app",
UserAgent: "payment_demo",
Context: ctx,
},
})

if err != nil {
return errors.Wrap(err, "error creating history archive client")
}
// Acquire the most recent ledger on network to begin streaming from
latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
/////////////////////////////////////////////////////////////////////

// Acquire the most recent ledger on network
latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(historyAchive)
if err != nil {
return errors.Wrap(err, "error getting latest ledger")
}

// Tell the captive core instance to emit LedgerCloseMeta starting at
// latest network ledger and continuing indefintely, streaming.
if err := captiveBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(latestNetworkLedger)); err != nil {
return errors.Wrap(err, "error preparing captive core ledger range")
}

// Run endless loop that receives LedgerCloseMeta from captive core for each new
// ledger generated by the network and pushes it to next processors in pipeline
for nextLedger := latestNetworkLedger; true; nextLedger++ {
ledgerCloseMeta, err := captiveBackend.GetLedger(ctx, nextLedger)
if err != nil {
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
}

payload, err := ledgerCloseMeta.MarshalBinary()
if err != nil {
return errors.Wrapf(err, "failed to encode ledger %d from xdr to binary", nextLedger)
}
ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)

log.Printf("Processing Ledger %v", nextLedger)
for _, processor := range adapter.processors {
if err := processor.Process(ctx, Message{Payload: payload}); err != nil {
return errors.Wrapf(err, "failed to process ledger %d", nextLedger)
}
}
pubConfig := cdp.PublisherConfig{
DataStoreConfig: adapter.dataStoreConfig,
BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(adapter.dataStoreConfig.Schema.LedgersPerFile),
}
return nil

fmt.Printf("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
func(lcm xdr.LedgerCloseMeta) error {
for _, processor := range adapter.processors {
if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
return err
}
}
return nil
})
}

func main() {
// run a data pipeline that transforms Pubnet ledger metadata into payment events
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer stop()

// create the inbound 'source of origin' adapter,
// imports network data using this captive core config
networkInboundAdapter := &CaptiveCoreInboundAdapter{
historyArchiveURLs: network.TestNetworkhistoryArchiveURLs,
coreConfigNetworkTemplate: ledgerbackend.TestnetDefaultConfig,
TomlParams: ledgerbackend.CaptiveCoreTomlParams{
NetworkPassphrase: network.TestNetworkPassphrase,
HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs,
UseDB: true,
CoreBinaryPath: "stellar-core", // assumes you have installed stellar-core on your o/s PATH
},
cfg, err := toml.LoadFile("config.toml")
if err != nil {
fmt.Printf("config.toml shoule be accessible in current directdory: %v\n", err)
return
}

datastoreConfig := datastore.DataStoreConfig{}
// Unmarshal TOML data into the Config struct
if err = cfg.Unmarshal(&datastoreConfig); err != nil {
fmt.Printf("error unmarshalling TOML config: %v\n", err)
return
}

// create the inbound source of pubnet ledger metadata
ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
dataStoreConfig: datastoreConfig,
}

// create the app transformer to convert network data to application data model
appTransformer := &AppPaymentTransformer{networkPassPhrase: network.TestNetworkPassphrase}
appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}

// create the outbound adapter, this is the end point of the pipeline
// publishes application data model as messages to a broker
publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
if err != nil {
log.Printf("error creating 0MQ publisher: %v", err)
log.Printf("error creating 0MQ publisher: %v\n", err)
return
}
defer publisher.Destroy()
outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

// wire up the ingestion pipeline and let it run
appTransformer.Subscribe(outboundAdapter)
networkInboundAdapter.Subscribe(appTransformer)
log.Printf("Payment ingestion pipeline ended %v", networkInboundAdapter.Run(ctx))
ledgerMetadataInboundAdapter.Subscribe(appTransformer)
log.Printf("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))
}

```

</CodeExample>

### `config.toml`

The CDP configuration settings, this file defines the data storage which contains the pre-generated Ledger Metadata files. Google Cloud storage is used in this example.

<CodeExample>

```
type = "GCS"
[params]
destination_bucket_path = "my-gcs-bucketname/prefix1/prefix2"
[schema]
ledgers_per_file = 1
files_per_partition = 64000
```

</CodeExample>
Expand All @@ -274,11 +278,11 @@ import json
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting next 10 payments from pipeline ...")
print("Collecting payments from pipeline ...")
socket.connect("tcp://127.0.0.1:5555")
socket.subscribe("")

for request in range(10):
while True:

message = socket.recv()
json_object = json.loads(message)
Expand Down
8 changes: 5 additions & 3 deletions docs/build/apps/ingest-sdk/overview.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This tutorial walks through how an application can leverage common streaming dat
- `github.com/stellar/go/amount` utility package to convert prices from network transaction operations to string
- `github.com/stellar/go/historyarchive` `github.com/stellar/go/support/datastore` `github.com/stellar/go/support/storage` utility package with convenient wrappers for accessing history archives, and avoid low-level http aspects
- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger data, converts to more developer-centric `LedgerTransaction` model
- `github.com/stellar/go/ingest/ledgerbackend` provides the captive core backend implementation
- `github.com/stellar/go/cdp` provides the CDP ledger metadata producer function for streaming pipeline integration.
- `github.com/stellar/go/network` provides convenient pre-configured settings for Testnet and Mainnet networks
- `github.com/stellar/go/xdr` a complete Golang binding to the Stellar network data model

Expand All @@ -25,7 +25,9 @@ To build an example streaming network ingestion pipeline from live Stellar netwo
- A newly initialized, empty Go project folder. `mkdir pipeline; cd pipeline; go mod init example/pipeline`
- `stellar-core` must be [installed](../../../validators/admin-guide/installation.mdx) on your workstation and available on your o/s PATH

The [Stellar network data model](../../../learn/fundamentals/stellar-data-structures/README.mdx) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines its own data model internally:
The [Stellar network data model](../../../learn/fundamentals/stellar-data-structures/README.mdx) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr).

Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines its own derived data model internally:

<CodeExample>

Expand All @@ -47,7 +49,7 @@ The ingestion pipeline will perform three distinct stream processor roles:

### Inbound Adapter

Acts as the 'source of origin' for the pipeline. Retrieves [LedgerCloseMeta](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) generated from a Stellar network using captive core. `LedgerCloseMeta` is the top-level aggregate in the Stellar data model of which all [Stellar network transaction data](../../../learn/fundamentals/stellar-data-structures/README.mdx) is nested within. Publishes the `LedgerCloseMeta` onto the pipeline.
Acts as the 'source of origin' for the ledger metadata in a pipeline. Retrieves [Ledger Metadata](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) which is the top-level aggregate structure in the Stellar data model of which all [Stellar network transaction data](../../../learn/fundamentals/stellar-data-structures/README.mdx) is nested within. The example leverages [CDP architecture](https://stellar.org/blog/developers/composable-data-platform), files are exported to a datastore containing the Ledger Metadata as it's captured from a Stellar network. The inbound adpater retrieves these files extracts the `LedgerCloseMeta` and publishes onto the message pipeline.

### Transformer

Expand Down

0 comments on commit 4054bc0

Please sign in to comment.