update ingestion pipeline example to use cdp #1036

Merged · 3 commits · Oct 30, 2024
177 changes: 91 additions & 86 deletions docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
---
title: CDP Consumer Pipeline Sample Code
sidebar_position: 30
---

Complete code for a small sample CDP consumer pipeline that ingests Stellar network ledger metadata using the Stellar Go [Ingest SDK](overview.mdx#the-ingestion-sdk-packages) paired with the new [CDP SDK](https://github.com/stellar/go/tree/master/ingest/cdp). It demonstrates a data pipeline from ledger metadata to a derived data model, with event-driven, distributed processing to a sample microservice (Python script) as subscriber.

This example uses the ZeroMQ [goczmq](https://github.com/zeromq/goczmq) Go wrapper SDK, which requires a few OS-level [libraries to also be installed on the host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies).
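Before building the pipeline, it can help to confirm those native libraries are installed correctly. A minimal, standalone sketch (the endpoint is an arbitrary example): creating any goczmq socket exercises the underlying CZMQ/ZeroMQ libraries, so a failure here points to missing OS dependencies rather than pipeline code.

```go
package main

import (
	"log"

	"github.com/zeromq/goczmq"
)

func main() {
	// Creating a publisher socket links against the native CZMQ/ZeroMQ
	// libraries; an error here usually means they are not installed.
	pub, err := goczmq.NewPub("tcp://127.0.0.1:5556")
	if err != nil {
		log.Fatalf("goczmq native dependencies not available: %v", err)
	}
	defer pub.Destroy()
	log.Println("goczmq and native ZeroMQ libraries are working")
}
```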

Steps:

1. Put these files in a directory.
2. Compile and run: `go mod tidy; go build -o pipeline ./.; ./pipeline`
3. In a separate terminal, run `python distributed_payment_subsciber.py`. This completes the distributed pipeline topology: it receives messages with payment info from the pipeline process and does additional processing (printing them to the console).

### `go.mod`

```
go 1.22

toolchain go1.22.1

require (
	github.com/stellar/go v0.0.0-20241008214914-7950d4254e6a
	github.com/zeromq/goczmq v4.1.0+incompatible
)
```

### `main.go`
<CodeExample>

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"os/signal"

	"github.com/pelletier/go-toml"
	"github.com/pkg/errors"
	"github.com/stellar/go/amount"
	"github.com/stellar/go/historyarchive"
	"github.com/stellar/go/ingest"
	"github.com/stellar/go/ingest/cdp"
	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/network"
	"github.com/stellar/go/support/datastore"
	"github.com/stellar/go/support/storage"
	"github.com/stellar/go/xdr"
	"github.com/zeromq/goczmq"
)

// Application payment model
type AppPayment struct {
	Timestamp       uint
	BuyerAccountId  string
	SellerAccountId string
	AssetCode       string
	Amount          string
}

// Application data pipeline
type Message struct {
	Payload interface{}
}

type Processor interface {
	Process(ctx context.Context, msg Message) error
}

type ZeroMQOutboundAdapter struct {
	Publisher *goczmq.Sock
}

func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
	_, err := adapter.Publisher.Write(msg.Payload.([]byte))
	return err
}

type AppPaymentTransformer struct {
	processors        []Processor
	networkPassPhrase string
}

func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
	transformer.processors = append(transformer.processors, receiver)
}

func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
	ledgerCloseMeta := msg.Payload.(xdr.LedgerCloseMeta)
	ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
	if err != nil {
		return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
	}

	// Scan all transactions in the ledger for payment operations and emit
	// each one to downstream processors as a JSON-serialized AppPayment.
	transaction, err := ledgerTxReader.Read()
	for ; err == nil; transaction, err = ledgerTxReader.Read() {
		for _, op := range transaction.Envelope.Operations() {
			if op.Body.Type != xdr.OperationTypePayment {
				continue
			}
			networkPayment := op.Body.MustPaymentOp()
			myPayment := AppPayment{
				Timestamp:       uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime),
				BuyerAccountId:  networkPayment.Destination.Address(),
				SellerAccountId: transaction.Envelope.SourceAccount().ToAccountId().Address(),
				AssetCode:       networkPayment.Asset.StringCanonical(),
				Amount:          amount.String(networkPayment.Amount),
			}
			jsonBytes, err := json.Marshal(myPayment)
			if err != nil {
				return err
			}
			// send the derived data model to all subscribed processors
			for _, processor := range transformer.processors {
				if err := processor.Process(ctx, Message{Payload: jsonBytes}); err != nil {
					return err
				}
			}
		}
	}
	if err != io.EOF {
		return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence())
	}
	return nil
}

type LedgerMetadataInboundAdapter struct {
	processors         []Processor
	historyArchiveURLs []string
	dataStoreConfig    datastore.DataStoreConfig
}

func (adapter *LedgerMetadataInboundAdapter) Subscribe(receiver Processor) {
	adapter.processors = append(adapter.processors, receiver)
}

func (adapter *LedgerMetadataInboundAdapter) Run(ctx context.Context) error {

	// Create a client to the network's history archives
	/////////////////////////////////////////////////////////////////
	// Note, https://github.com/stellar/go/issues/5495 will deprecate the
	// need for this type of manual retrieval of the latest ledger when the
	// use case is to initiate streaming from the latest point.
	// It proposes a new mechanism to trigger ApplyLedgerMetadata() to
	// perform automatic resolution of 'latest ledger' instead.
	historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
		ConnectOptions: storage.ConnectOptions{
			UserAgent: "payment_demo",
			Context:   ctx,
		},
	})
	if err != nil {
		return errors.Wrap(err, "error creating history archive client")
	}

	// Acquire the most recent ledger on network
	latestNetworkLedger, err := historyArchive.GetLatestLedgerSequence()
	/////////////////////////////////////////////////////////////////////
	if err != nil {
		return errors.Wrap(err, "error getting latest ledger")
	}

	// Stream indefinitely, starting from the latest network ledger
	ledgerRange := ledgerbackend.UnboundedRange(latestNetworkLedger)

	pubConfig := cdp.PublisherConfig{
		DataStoreConfig:       adapter.dataStoreConfig,
		BufferedStorageConfig: cdp.DefaultBufferedStorageBackendConfig(adapter.dataStoreConfig.Schema.LedgersPerFile),
	}

	fmt.Printf("beginning payments stream, starting at ledger %v ...\n", latestNetworkLedger)
	return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
		func(lcm xdr.LedgerCloseMeta) error {
			for _, processor := range adapter.processors {
				if err = processor.Process(ctx, Message{Payload: lcm}); err != nil {
					return err
				}
			}
			return nil
		})
}

func main() {
	// run a data pipeline that transforms Pubnet ledger metadata into payment events
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer stop()

	cfg, err := toml.LoadFile("config.toml")
	if err != nil {
		fmt.Printf("config.toml should be accessible in current directory: %v\n", err)
		return
	}

	// Unmarshal TOML data into the DataStoreConfig struct
	datastoreConfig := datastore.DataStoreConfig{}
	if err = cfg.Unmarshal(&datastoreConfig); err != nil {
		fmt.Printf("error unmarshalling TOML config: %v\n", err)
		return
	}

	// create the inbound source of pubnet ledger metadata
	ledgerMetadataInboundAdapter := &LedgerMetadataInboundAdapter{
		historyArchiveURLs: network.PublicNetworkhistoryArchiveURLs,
		dataStoreConfig:    datastoreConfig,
	}

	// create the app transformer to convert network data to application data model
	appTransformer := &AppPaymentTransformer{networkPassPhrase: network.PublicNetworkPassphrase}

	// create the outbound adapter, this is the end point of the pipeline,
	// it publishes the application data model as messages to a broker
	publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
	if err != nil {
		log.Printf("error creating 0MQ publisher: %v\n", err)
		return
	}
	defer publisher.Destroy()
	outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

	// wire up the ingestion pipeline and let it run
	appTransformer.Subscribe(outboundAdapter)
	ledgerMetadataInboundAdapter.Subscribe(appTransformer)
	log.Printf("Payment ingestion pipeline ended %v\n", ledgerMetadataInboundAdapter.Run(ctx))
}

```

</CodeExample>
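The inbound adapter streams indefinitely from the latest network ledger via `ledgerbackend.UnboundedRange`. To replay a fixed window of already-exported ledgers instead, a bounded range can be used. A minimal sketch, reusing the adapter and publisher config from `main.go` (the sequence numbers are arbitrary examples):

```go
// Sketch: replay a fixed, bounded window of ledger history instead of
// streaming indefinitely. Start/end sequence numbers are arbitrary examples.
func replayWindow(ctx context.Context, adapter *LedgerMetadataInboundAdapter, pubConfig cdp.PublisherConfig) error {
	ledgerRange := ledgerbackend.BoundedRange(1000000, 1000100)
	return cdp.ApplyLedgerMetadata(ledgerRange, pubConfig, ctx,
		func(lcm xdr.LedgerCloseMeta) error {
			for _, processor := range adapter.processors {
				if err := processor.Process(ctx, Message{Payload: lcm}); err != nil {
					return err
				}
			}
			return nil
		})
}
```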

### `config.toml`

The CDP configuration settings. This file defines the data store containing the pre-generated ledger metadata files; Google Cloud Storage is used in this example.

<CodeExample>

```toml
type = "GCS"

[params]
destination_bucket_path = "my-gcs-bucketname/prefix1/prefix2"

[schema]
ledgers_per_file = 1
files_per_partition = 64000
```

</CodeExample>
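The same settings can also be constructed programmatically rather than loaded from `config.toml`. A minimal sketch, assuming the field names of `datastore.DataStoreConfig` and `datastore.DataStoreSchema` from the stellar/go `support/datastore` package:

```go
// Sketch: programmatic equivalent of config.toml above; field names assume
// the stellar/go support/datastore definitions.
var exampleConfig = datastore.DataStoreConfig{
	Type: "GCS",
	Params: map[string]string{
		"destination_bucket_path": "my-gcs-bucketname/prefix1/prefix2",
	},
	Schema: datastore.DataStoreSchema{
		LedgersPerFile:    1,
		FilesPerPartition: 64000,
	},
}
```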
### `distributed_payment_subsciber.py`

A simple Python subscriber that completes the distributed topology: it connects to the pipeline's ZeroMQ socket and prints each payment it receives. Requires the `pyzmq` package.

<CodeExample>

```python
import zmq
import json
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting next 10 payments from pipeline ...")
print("Collecting payments from pipeline ...")
socket.connect("tcp://127.0.0.1:5555")
socket.subscribe("")

while True:
    message = socket.recv()
    json_object = json.loads(message)
    print("received payment:", json_object)
```

</CodeExample>