Skip to content

Commit

Permalink
stellar#889: update ingestion example to demonsterate all cdp
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Oct 29, 2024
1 parent 4054bc0 commit 80fa6a9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 17 deletions.
4 changes: 2 additions & 2 deletions docs/build/apps/ingest-sdk/ingestion-pipeline-code.mdx
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
title: Ingestion Pipeline Sample Code
title: CDP Consumer Pipeline Sample Code
sidebar_position: 30
---

Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingest SDK](overview.mdx#the-ingestion-sdk-packages) paired with new [CDP SDK](https://github.com/stellar/go/tree/master/ingest/cdp) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber.
Complete code for a small sample of CDP consumer pipeline of Stellar network ledger metadata using the Stellar Go [Ingest SDK](overview.mdx#the-ingestion-sdk-packages) paired with new [CDP SDK](https://github.com/stellar/go/tree/master/ingest/cdp) to demonstrate data pipeline from ledger metadata to derived data model with event-driven, distributed processing to sample microservice (Python script) as subscriber.

This example uses the ZeroMQ [goczmq](https://github.com/zeromq/goczmq) Go wrapper SDK, which requires a few o/s [dependent libraries to also be installed on the host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies).

Expand Down
58 changes: 43 additions & 15 deletions docs/build/apps/ingest-sdk/overview.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ title: Overview
sidebar_position: 10
---

This tutorial walks through how an application can leverage common streaming data patterns to ingest Stellar network transaction data using a few select packages from the Stellar Go Repo [github.com/stellar/go](https://github.com/stellar/go/blob/master/) collectively known as the 'Ingestion' SDK:
This tutorial walks through how an application can leverage [CDP architecture](https://stellar.org/blog/developers/composable-data-platform) to create fast, lightweight Stellar Ledger Metada data pipelines using a few select packages from the Stellar Go Repo [github.com/stellar/go](https://github.com/stellar/go/blob/master/) collectively known as the 'Ingestion' SDK:

## The Ingestion SDK packages

- `github.com/stellar/go/amount` utility package to convert prices from network transaction operations to string
- `github.com/stellar/go/historyarchive` `github.com/stellar/go/support/datastore` `github.com/stellar/go/support/storage` utility package with convenient wrappers for accessing history archives, and avoid low-level http aspects
- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger data, converts to more developer-centric `LedgerTransaction` model
- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger metadata, converts to more developer-centric `LedgerTransaction` model
- `github.com/stellar/go/cdp` provides the CDP ledger metadata producer function for streaming pipeline integration.
- `github.com/stellar/go/network` provides convenient pre-configured settings for Testnet and Mainnet networks
- `github.com/stellar/go/xdr` a complete Golang binding to the Stellar network data model
Expand All @@ -18,16 +18,18 @@ This tutorial walks through how an application can leverage common streaming dat

### Project requirements

To build an example streaming network ingestion pipeline from live Stellar network transaction data, you'll need:
To use this example CDP pipeline for live Stellar network transaction data, you'll need:

- A developer workstation with [Go](https://go.dev/learn/) programming language runtime installed
- An IDE to edit Go code, [VSCode](https://code.visualstudio.com/download) is good if one is needed
- A newly initialized, empty Go project folder. `mkdir pipeline; cd pipeline; go mod init example/pipeline`
- `stellar-core` must be [installed](../../../validators/admin-guide/installation.mdx) on your workstation and available on your o/s PATH
- Some familiarity to the [Stellar Ledger Metadata model](../../../learn/fundamentals/stellar-data-structures/README.mdx). It is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr).
- Docker
- Google Cloud Platform account:
- a bucket created in Google Cloud Storage(GCS)
- GCP [credentials in workstation environment](https://github.com/stellar/go/blob/master/services/galexie/README.md#set-up-gcp-credentials)

The [Stellar network data model](../../../learn/fundamentals/stellar-data-structures/README.mdx) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr).

Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines its own derived data model internally:
Our example application is only interested in a small subset of the overall network data model related to asset transfers triggered by Payment operation and defines its own derived data model as the goal of exercise:

<CodeExample>

Expand All @@ -43,22 +45,48 @@ Our example application is only interested in a small subset of the overall tran

</CodeExample>

The example application will run a [network ingestion pipeline](https://github.com/stellar/go/blob/master/ingest/doc.go) to derive a smaller `ApplicationPayment` model from the [Stellar network transaction data model](../../../learn/fundamentals/stellar-data-structures/README.mdx) as 'source of origin' and thus enable the application to avoid large compute resources that would have been required for maintaining storage of the full Stellar network data model.
The example application will perform both of CDP pipelines. A minimum of two pipelines are required for a complete end to end CDP architecture.

![](/assets/cdp_pipelines.png)

### Ledger Metadata Export Pipeline

This pipeline needs to be initiated first, it is responsible for exporting Stellar Ledger Metadata as files to a [CDP Datastore](https://github.com/stellar/go/blob/master/support/datastore/datastore.go#L17).

#### Determine the Datastore

The Datastore in CDP is an interface, allowing for multiple implementations which represent different physical storage layers that can be 'plugged in' to export and consumer pipelines. Stellar provides the [GCS Datastore] as the first Datastore implementation, and this example chooses to use this existing implementation.

There will be open source contributions for implementations on other storage layers to choose from as CDP grows. If you can't find an implementation for a storage layer you would like to use, it is also possible to develop your own [ Datastore](https://github.com/stellar/go/blob/master/support/datastore/datastore.go#L17) implementation, which is beyond scope of this example, as it entails a separate learning exercise of its own, coming soon!

#### Exporting network metadata to Datastore

Use Galexie, a new CDP command line program for exporting network metadata to datastores.

- Follow the Galexie setup steps in [Galexie User Guide](https://github.com/stellar/go/blob/master/services/galexie/README.md#setup), to configure specifics of GCS bucket and target network.

- Follow the [Galexie docker runtime instructions](https://github.com/stellar/go/blob/master/services/galexie/README.md#running-galexie) to start the export.
- For one time export of historical bounded range of ledgers, use `append --start <from_ledger> --end <to_ledger>`
- For a continuous export of prior ledgers and all new ledgers generated on network, use `append --start <from_ledger>`.

### Ledger Metadata Consumer Pipeline

A consumer pipeline retrieves files from the GCS bucket and uses them as the origin of Ledger Metadata in a data processing pipeline. There can be many separate consumer pipelines all accessing the same Datastore at stame time. Each consumer pipeline will typically perform three distinct stream processor roles:

The ingestion pipeline will perform three distinct stream processor roles:
#### Inbound Adapter

### Inbound Adapter
The 'source of origin' for the ledger metadata in a pipeline. This processor retrieves [Ledger Metadata](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) files from the GCS Datastore, extracts the `LedgerCloseMeta` for each Ledger and publishes it onto the messaging pipeline.

Acts as the 'source of origin' for the ledger metadata in a pipeline. Retrieves [Ledger Metadata](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) which is the top-level aggregate structure in the Stellar data model of which all [Stellar network transaction data](../../../learn/fundamentals/stellar-data-structures/README.mdx) is nested within. The example leverages [CDP architecture](https://stellar.org/blog/developers/composable-data-platform), files are exported to a datastore containing the Ledger Metadata as it's captured from a Stellar network. The inbound adpater retrieves these files extracts the `LedgerCloseMeta` and publishes onto the message pipeline.
The go sdk provides consumer helper function [ApplyLedgerMetadata](https://github.com/stellar/go/blob/master/ingest/cdp/producer.go#L89) for automated, performant, buffered retrieval of files from the remote datastore, application code can leverage this to acquire pure `LedgerCloseMeta` data from a callback function.

### Transformer
#### Transformer

Subscribes to receive `LedgerCloseMeta` from the pipeline. Uses the Go SDK package [github.com/stellar/go/xdr](https://github.com/stellar/go/tree/master/xdr) to parse the nested network data model for payment operations and convert those into a new instance of application data model `ApplicationPayment` instances. Publishes `ApplicationPayment` to the pipeline.
Subscribes on the pipeline to receive `LedgerCloseMeta`. Uses the Go SDK package [github.com/stellar/go/xdr](https://github.com/stellar/go/tree/master/xdr) to parse the ledger meta data model for payment operations and convert those into a new instance of application data model `AppPayment` instances. Publishes `AppPayment` to the pipeline.

### Outbound Adapter
#### Outbound Adapter

Acts as the termination of the pipeline, it subscribes to receive `ApplicationPayment` and publishes the data off the pipeline and to an external data store, a ZeroMQ Publisher Socket, which is essentially a message broker.

### Summary

Refer to [Ingestion Pipeline Sample Application](./ingestion-pipeline-code.mdx) for complete code demonstrating usage of the 'ingestion' SDK packages to create these adapters and transformers and run a live pipeline against the Stellar network.
Refer to [Ingestion Pipeline Sample Application](./ingestion-pipeline-code.mdx) for complete consumer code example, demonstrating a live, streaming pipeline against the Stellar network, processing each new ledger's metadata as it is closed on the network.
Binary file added static/assets/cdp_pipelines.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 80fa6a9

Please sign in to comment.