diff --git a/.github/workflows/horizon-master.yml b/.github/workflows/horizon-master.yml deleted file mode 100644 index e2487a0d64..0000000000 --- a/.github/workflows/horizon-master.yml +++ /dev/null @@ -1,28 +0,0 @@ -name: Horizon master - -on: - push: - branches: [master] - -jobs: - - push-state-diff-image: - name: Push stellar/ledger-state-diff:{sha,latest} to DockerHub - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v3 - - - name: Login to DockerHub - uses: docker/login-action@bb984efc561711aaa26e433c32c3521176eae55b - with: - username: ${{ secrets.DOCKERHUB_USERNAME }} - password: ${{ secrets.DOCKERHUB_TOKEN }} - - - name: Build and push to DockerHub - uses: docker/build-push-action@7f9d37fa544684fb73bfe4835ed7214c255ce02b - with: - push: true - tags: stellar/ledger-state-diff:${{ github.sha }},stellar/ledger-state-diff:latest - file: exp/tools/dump-ledger-state/Dockerfile - build-args: GITCOMMIT=${{ github.sha }} - no-cache: true diff --git a/.github/workflows/horizon.yml b/.github/workflows/horizon.yml index f5e77c6c6b..117e11bb6f 100644 --- a/.github/workflows/horizon.yml +++ b/.github/workflows/horizon.yml @@ -148,32 +148,3 @@ jobs: - if: github.ref == 'refs/heads/master' name: Push to DockerHub run: docker push stellar/horizon-verify-range:latest - - ledger-exporter: - name: Test and push the Ledger Exporter images - runs-on: ubuntu-latest - env: - STELLAR_CORE_VERSION: 21.0.0-1872.c6f474133.focal - LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true" - steps: - - uses: actions/checkout@v3 - with: - # For pull requests, build and test the PR head not a merge of the PR with the destination. - ref: ${{ github.event.pull_request.head.sha || github.ref }} - - name: Build Ledger Exporter docker - run: make -C exp/services/ledgerexporter docker-build - - - name: Run Ledger Exporter test - run: go test -v -race ./exp/services/ledgerexporter/... 
- - # Push images - - if: github.ref == 'refs/heads/master' - name: Login to DockerHub - uses: docker/login-action@bb984efc561711aaa26e433c32c3521176eae55b - with: - username: ${{ secrets.DOCKERHUB_USERNAME }} - password: ${{ secrets.DOCKERHUB_TOKEN }} - - - if: github.ref == 'refs/heads/master' - name: Push to DockerHub - run: make -C exp/services/ledgerexporter docker-push diff --git a/.github/workflows/ledgerexporter-release.yml b/.github/workflows/ledgerexporter-release.yml new file mode 100644 index 0000000000..e25ea92382 --- /dev/null +++ b/.github/workflows/ledgerexporter-release.yml @@ -0,0 +1,49 @@ +name: Ledger Exporter release + +on: + push: + tags: ['ledgerexporter-v*'] + +jobs: + + publish-docker: + name: Test and push the Ledger Exporter images + runs-on: ubuntu-latest + env: + LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true" + LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core + LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: stellar/quickstart:testing + STELLAR_CORE_VERSION: 21.1.0-1921.b3aeb14cc.focal + VERSION: ${GITHUB_REF_NAME#ledgerexporter-v} + steps: + - uses: actions/checkout@v3 + with: + ref: github.sha + - name: Install captive core + run: | + # Workaround for https://github.com/actions/virtual-environments/issues/5245, + # libc++1-8 won't be installed if another version is installed (but apt won't give you a helpul + # message about why the installation fails) + sudo apt list --installed | grep libc++ + sudo apt-get remove -y libc++1-* libc++abi1-* || true + + sudo wget -qO - https://apt.stellar.org/SDF.asc | APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=true sudo apt-key add - + sudo bash -c 'echo "deb https://apt.stellar.org focal unstable" > /etc/apt/sources.list.d/SDF-unstable.list' + sudo apt-get update && sudo apt-get install -y stellar-core="$STELLAR_CORE_VERSION" + echo "Using stellar core version $(stellar-core version)" + + - name: Run Ledger Exporter test + run: go test -v -race -run TestLedgerExporterTestSuite ./exp/services/ledgerexporter/... 
+ + - name: Build Ledger Exporter docker + run: make -C exp/services/ledgerexporter docker-build + + # Push images + - name: Login to DockerHub + uses: docker/login-action@bb984efc561711aaa26e433c32c3521176eae55b + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Push to DockerHub + run: make -C exp/services/ledgerexporter docker-push diff --git a/.github/workflows/ledgerexporter.yml b/.github/workflows/ledgerexporter.yml new file mode 100644 index 0000000000..1c25a5c8b3 --- /dev/null +++ b/.github/workflows/ledgerexporter.yml @@ -0,0 +1,36 @@ +name: LedgerExporter + +on: + push: + branches: [master] + pull_request: + +jobs: + ledger-exporter: + name: Test Ledger Exporter + runs-on: ubuntu-latest + env: + CAPTIVE_CORE_DEBIAN_PKG_VERSION: 21.1.0-1921.b3aeb14cc.focal + LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true" + LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core + LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: stellar/quickstart:testing + steps: + - name: Install captive core + run: | + # Workaround for https://github.com/actions/virtual-environments/issues/5245, + # libc++1-8 won't be installed if another version is installed (but apt won't give you a helpful + # message about why the installation fails) + sudo apt list --installed | grep libc++ + sudo apt-get remove -y libc++1-* libc++abi1-* || true + + sudo wget -qO - https://apt.stellar.org/SDF.asc | APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=true sudo apt-key add - + sudo bash -c 'echo "deb https://apt.stellar.org focal unstable" > /etc/apt/sources.list.d/SDF-unstable.list' + sudo apt-get update && sudo apt-get install -y stellar-core="$CAPTIVE_CORE_DEBIAN_PKG_VERSION" + echo "Using stellar core version $(stellar-core version)" + + - uses: actions/checkout@v3 + with: + # For pull requests, build and test the PR head not a merge of the PR with the destination. + ref: ${{ github.event.pull_request.head.sha || github.ref }} + - name: Run Ledger Exporter test + run: go test -v -race -run TestLedgerExporterTestSuite ./exp/services/ledgerexporter/... diff --git a/exp/services/ledgerexporter/DEVELOPER_GUIDE.md b/exp/services/ledgerexporter/DEVELOPER_GUIDE.md new file mode 100644 index 0000000000..ef81553e37 --- /dev/null +++ b/exp/services/ledgerexporter/DEVELOPER_GUIDE.md @@ -0,0 +1,65 @@ + +# Ledger Exporter Developer Guide +The ledger exporter is a tool to export Stellar network transaction data to cloud storage in a way that is easy to access. + +## Prerequisites +This document assumes that you have installed and can run the ledger exporter, and that you have familiarity with its CLI and configuration. If not, please refer to the [Installation Guide](./README.md). + +## Goal +The goal of the ledger exporter is to build an easy-to-use tool to export Stellar network ledger data to a configurable remote data store, such as cloud blob storage. + - Use cloud storage optimally + - Minimize network usage to export + - Make it easy and fast to search for a specific ledger or ledger range + +## Architecture +To achieve its goals, the ledger exporter uses the following architecture, which consists of three main components: +- Captive-core, which extracts raw transaction metadata from the Stellar Network. +- The export manager, which bundles and organizes the ledgers to get them ready for export. +- The cloud storage plugin, which writes to the cloud storage. This is specific to the type of cloud storage (GCS in this case). 
+ + +![ledgerexporter-architecture](./architecture.png) + + +## Data Format +- Ledger exporter uses [XDR](https://developers.stellar.org/docs/learn/encyclopedia/data-format/xdr) (External Data Representation), a compact and efficient binary format. A Stellar Captive Core instance emits data in this format, and the data structure is referred to as `LedgerCloseMeta`. The exporter bundles multiple `LedgerCloseMeta`s into a single object using a custom XDR structure called `LedgerCloseMetaBatch`, which is defined in [Stellar-exporter.x](https://github.com/stellar/go/blob/master/xdr/Stellar-exporter.x). + +- The metadata for the same batch is also stored alongside each exported object. Supported metadata is defined in [metadata.go](https://github.com/stellar/go/blob/master/support/datastore/metadata.go). + +- Objects are compressed before uploading using the [zstd](http://facebook.github.io/zstd/) (zstandard) compression algorithm to optimize network usage and storage needs. + +## Data Storage +- An example implementation of `DataStore` for GCS (Google Cloud Storage) is located in the [support](https://github.com/stellar/go/tree/master/support/datastore) package. +- The ledger exporter currently implements the interface only for Google Cloud Storage (GCS). The [GCS plugin](https://github.com/stellar/go/blob/master/support/datastore/gcs_datastore.go) uses GCS-specific behaviors like conditional puts, automatic retry, metadata, and CRC checksum. + +## Build, Run and Test using Docker +The Dockerfile contains all the necessary dependencies (e.g., Stellar-core) required to run the ledger exporter. + +- Build: To build the Docker container, use the provided [Makefile](./Makefile). Simply run `make docker-build` to build a new container after making any changes. + +- Run: For instructions on running the Docker container, refer to the [Installation Guide](./README.md). + +- Test: To test the Docker container, refer to the [docker-test](./Makefile) command for an example of how to use the [GCS emulator](https://github.com/fsouza/fake-gcs-server) for local testing. + +## Adding support for a new storage type +Support for different data storage types is encapsulated as 'plugins', which are implementations of the `DataStore` interface in a Go package. To add a data storage plugin based on a new storage type (e.g. AWS S3), follow these steps (a schematic sketch follows the design notes below): + +- A data storage plugin must implement the [DataStore](https://github.com/stellar/go/blob/master/support/datastore/datastore.go) interface. +- Add support for new datastore-specific features. Implement any datastore-specific custom logic. Different datastores have different ways of handling + - race conditions + - automatic retries + - metadata storage, etc. +- Add the new datastore to the factory function [NewDataStore](https://github.com/stellar/go/blob/master/support/datastore/datastore.go). +- Add a [config](./config.example.toml) section for the new storage type. This may include configuration such as the destination, authentication information, etc. +- An emulator such as [fake-gcs-server](https://github.com/fsouza/fake-gcs-server) (a GCS emulator) can be used for testing without connecting to real cloud storage. + +### Design DOs and DON'Ts +- Multiple exporters should be able to run in parallel without the need for explicit locking or synchronization. +- When restarted, exporters have no memory of prior operation and rely on the already exported data as much as possible to decide where to resume. 
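To make the plugin steps above concrete, here is a minimal sketch of a new storage plugin. The interface shown is an illustrative subset with assumed method names and signatures; the authoritative definition lives in [datastore.go](https://github.com/stellar/go/blob/master/support/datastore/datastore.go), and the `S3DataStore` type, its field, and its behavior are hypothetical.

```go
package datastore

import (
	"context"
	"errors"
	"io"
)

// DataStore here is an illustrative subset of the real interface in
// support/datastore/datastore.go; method names and signatures are
// assumptions made for this sketch, not the actual API.
type DataStore interface {
	GetFile(ctx context.Context, path string) (io.ReadCloser, error)
	PutFileIfNotExists(ctx context.Context, path string, in io.Reader) (bool, error)
	Exists(ctx context.Context, path string) (bool, error)
	Close() error
}

// S3DataStore is a hypothetical plugin for an S3-compatible store. A real
// implementation would wrap the vendor SDK and add datastore-specific
// handling for race conditions (e.g. conditional puts), automatic retries,
// and object metadata, as listed in the steps above.
type S3DataStore struct {
	bucket string
}

// Compile-time check that the sketch satisfies the (illustrative) interface.
var _ DataStore = S3DataStore{}

func (s S3DataStore) GetFile(ctx context.Context, path string) (io.ReadCloser, error) {
	return nil, errors.New("not implemented")
}

func (s S3DataStore) PutFileIfNotExists(ctx context.Context, path string, in io.Reader) (bool, error) {
	return false, errors.New("not implemented")
}

func (s S3DataStore) Exists(ctx context.Context, path string) (bool, error) {
	return false, errors.New("not implemented")
}

func (s S3DataStore) Close() error { return nil }
```

The new type would then be wired into the `NewDataStore` factory and selected via the `type` field under `[datastore_config]` in config.toml.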
+ +## Using exported data +The exported data in storage can be used in the ETL pipeline to gather analytics and reporting. To write a tool that consumes exported data you can use Stellar ingestion library's `ledgerbackend` package. This package includes a ledger backend called [BufferedStorageBackend](https://github.com/stellar/go/blob/master/ingest/ledgerbackend/buffered_storage_backend.go), +which imports data from the storage and validates it. For more details, refer to the ledgerbackend [documentation](https://github.com/stellar/go/tree/master/ingest/ledgerbackend). + +## Contributing +For information on how to contribute, please refer to our [Contribution Guidelines](https://github.com/stellar/go/blob/master/CONTRIBUTING.md). diff --git a/exp/services/ledgerexporter/Makefile b/exp/services/ledgerexporter/Makefile index 3890a56fcf..b8b45aff89 100644 --- a/exp/services/ledgerexporter/Makefile +++ b/exp/services/ledgerexporter/Makefile @@ -2,7 +2,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # https://github.com/opencontainers/image-spec/blob/master/annotations.md BUILD_DATE := $(shell date -u +%FT%TZ) -VERSION ?= 1.0.0-$(shell git rev-parse --short HEAD) +VERSION ?= $(shell git rev-parse --short HEAD) DOCKER_IMAGE := stellar/ledger-exporter docker-build: diff --git a/exp/services/ledgerexporter/README.md b/exp/services/ledgerexporter/README.md index 2299b0f45f..97f6eb3aa7 100644 --- a/exp/services/ledgerexporter/README.md +++ b/exp/services/ledgerexporter/README.md @@ -1,103 +1,132 @@ -# Ledger Exporter (Work in Progress) +## Ledger Exporter: Installation and Usage Guide -The Ledger Exporter is a tool designed to export ledger data from a Stellar network and upload it to a specified destination. It supports both bounded and unbounded modes, allowing users to export a specific range of ledgers or continuously export new ledgers as they arrive on the network. +This guide provides step-by-step instructions on installing and using the Ledger Exporter, a tool that exports Stellar network ledger data to a Google Cloud Storage (GCS) bucket for efficient analysis and storage. -Ledger Exporter currently uses captive-core as the ledger backend and GCS as the destination data store. +* [Prerequisites](#prerequisites) +* [Setup](#setup) + * [Set Up GCP Credentials](#set-up-gcp-credentials) + * [Create a GCS Bucket for Storage](#create-a-gcs-bucket-for-storage) +* [Running the Ledger Exporter](#running-the-ledger-exporter) + * [Pull the Docker Image](#1-pull-the-docker-image) + * [Configure the Exporter](#2-configure-the-exporter-configtoml) + * [Run the Exporter](#3-run-the-exporter) +* [Command Line Interface (CLI)](#command-line-interface-cli) + 1. [scan-and-fill: Fill Data Gaps](#1-scan-and-fill-fill-data-gaps) + 2. [append: Continuously Export New Data](#2-append-continuously-export-new-data) -# Exported Data Format -The tool allows for the export of multiple ledgers in a single exported file. The exported data is in XDR format and is compressed using zstd before being uploaded. +## Prerequisites -```go -type LedgerCloseMetaBatch struct { - StartSequence uint32 - EndSequence uint32 - LedgerCloseMetas []LedgerCloseMeta -} -``` +* **Google Cloud Platform (GCP) Account:** You will need a GCP account to create a GCS bucket for storing the exported data. +* **Docker:** Allows you to run the Ledger Exporter in a self-contained environment. 
The official Docker installation guide: [https://docs.docker.com/engine/install/](https://docs.docker.com/engine/install/) + +## Setup + +### Set Up GCP Credentials + +Create application default credentials for your Google Cloud Platform (GCP) project by following these steps: +1. Download the [SDK](https://cloud.google.com/sdk/docs/install). +2. Install and initialize the [gcloud CLI](https://cloud.google.com/sdk/docs/initializing). +3. Create [application authentication credentials](https://cloud.google.com/docs/authentication/provide-credentials-adc#google-idp) and store it in a secure location on your system, such as $HOME/.config/gcloud/application_default_credentials.json. + +For detailed instructions, refer to the [Providing Credentials for Application Default Credentials (ADC) guide.](https://cloud.google.com/docs/authentication/provide-credentials-adc) + +### Create a GCS Bucket for Storage -## Getting Started +1. Go to the GCP Console's Storage section ([https://console.cloud.google.com/storage](https://console.cloud.google.com/storage)) and create a new bucket. +2. Choose a descriptive name for the bucket, such as `stellar-ledger-data`. Refer to [Google Cloud Storage Bucket Naming Guideline](https://cloud.google.com/storage/docs/buckets#naming) for more information. +3. **Note down the bucket name** as you'll need it later in the configuration process. -### Installation (coming soon) -### Command Line Options +## Running the Ledger Exporter + +### 1. Pull the Docker Image + +Open a terminal window and download the Stellar Ledger Exporter Docker image using the following command: -#### Scan and Fill Mode: -Exports a specific range of ledgers, defined by --start and --end. Will only export to remote datastore if data is absent. ```bash -ledgerexporter scan-and-fill --start --end --config-file +docker pull stellar/ledger-exporter ``` -#### Append Mode: -Exports ledgers initially searching from --start, looking for the next absent ledger sequence number proceeding --start on the data store. If abscence is detected, the export range is narrowed to `--start `. -This feature requires ledgers to be present on the remote data store for some (possibly empty) prefix of the requested range and then absent for the (possibly empty) remainder. +### 2. Configure the Exporter (config.toml) +The Ledger Exporter relies on a configuration file (config.toml) to connect to your specific environment. This file defines details like: +- Your Google Cloud Storage (GCS) bucket where exported ledger data will be stored. +- Stellar network settings, such as the network you're using (testnet or pubnet). +- Datastore schema to control data organization. -In this mode, the --end ledger can be provided to stop the process once export has reached that ledger, or if absent or 0 it will result in continous exporting of new ledgers emitted from the network. +A sample configuration file [config.example.toml](config.example.toml) is provided. Copy and rename it to config.toml for customization. Edit the copied file (config.toml) to replace placeholders with your specific details. + +### 3. Run the Exporter + +The following command demonstrates how to run the Ledger Exporter: - It’s guaranteed that ledgers exported during `append` mode from `start` and up to the last logged ledger file `Uploaded {ledger file name}` were contiguous, meaning all ledgers within that range were exported to the data lake with no gaps or missing ledgers in between. 
```bash -ledgerexporter append --start --config-file +docker run --platform linux/amd64 \ + -v "$HOME/.config/gcloud/application_default_credentials.json":/.config/gcp/credentials.json:ro \ + -e GOOGLE_APPLICATION_CREDENTIALS=/.config/gcp/credentials.json \ + -v ${PWD}/config.toml:/config.toml \ + stellar/ledger-exporter [options] ``` -### Configuration (toml): -The `stellar_core_config` supports two ways for configuring captive core: - - use prebuilt captive core config toml, archive urls, and passphrase based on `stellar_core_config.network = testnet|pubnet`. - - manually set the the captive core confg by supplying these core parameters which will override any defaults when `stellar_core_config.network` is present also: - `stellar_core_config.captive_core_toml_path` - `stellar_core_config.history_archive_urls` - `stellar_core_config.network_passphrase` +**Explanation:** -Ensure you have stellar-core installed and set `stellar_core_config.stellar_core_binary_path` to it's path on o/s. +* `--platform linux/amd64`: Specifies the platform architecture (adjust if needed for your system). +* `-v`: Mounts volumes to map your local GCP credentials and config.toml file to the container: + * `$HOME/.config/gcloud/application_default_credentials.json`: Your local GCP credentials file. + * `${PWD}/config.toml`: Your local configuration file. +* `-e GOOGLE_APPLICATION_CREDENTIALS=/.config/gcp/credentials.json`: Sets the environment variable for credentials within the container. +* `stellar/ledger-exporter`: The Docker image name. +* ``: The Stellar Ledger Exporter command: [append](#1-append-continuously-export-new-data), [scan-and-fill](#2-scan-and-fill-fill-data-gaps)) -Enable web service that will be bound to localhost post and publishes metrics by including `admin_port = {port}` +## Command Line Interface (CLI) -An example config, demonstrating preconfigured captive core settings and gcs data store config. -```toml -admin_port = 6061 +The Ledger Exporter offers two mode of operation for exporting ledger data: -[datastore_config] -type = "GCS" +### 1. append: Continuously Export New Data -[datastore_config.params] -destination_bucket_path = "your-bucket-name///" -[datastore_config.schema] -ledgers_per_file = 64 -files_per_partition = 10 +Exports ledgers initially searching from --start, looking for the next absent ledger sequence number proceeding --start on the data store. If abscence is detected, the export range is narrowed to `--start `. +This feature requires ledgers to be present on the remote data store for some (possibly empty) prefix of the requested range and then absent for the (possibly empty) remainder. -[stellar_core_config] - network = "testnet" - stellar_core_binary_path = "/my/path/to/stellar-core" - captive_core_toml_path = "my-captive-core.cfg" - history_archive_urls = ["http://testarchiveurl1", "http://testarchiveurl2"] - network_passphrase = "test" -``` +In this mode, the --end ledger can be provided to stop the process once export has reached that ledger, or if absent or 0 it will result in continous exporting of new ledgers emitted from the network. -### Exported Files +It’s guaranteed that ledgers exported during `append` mode from `start` and up to the last logged ledger file `Uploaded {ledger file name}` were contiguous, meaning all ledgers within that range were exported to the data lake with no gaps or missing ledgers in between. -#### File Organization: -- Ledgers are grouped into files, with the number of ledgers per file set by `ledgers_per_file`. 
-- Files are further organized into partitions, with the number of files per partition set by `files_per_partition`. -### Filename Structure: -- Filenames indicate the ledger range they contain, e.g., `0-63.xdr.zstd` holds ledgers 0 to 63. -- Partition directories group files, e.g., `/0-639/` holds files for ledgers 0 to 639. +**Usage:** -#### Example: -with `ledgers_per_file = 64` and `files_per_partition = 10`: -- Partition names: `/0-639`, `/640-1279`, ... -- Filenames: `/0-639/0-63.xdr.zstd`, `/0-639/64-127.xdr.zstd`, ... +```bash +docker run --platform linux/amd64 -d \ + -v "$HOME/.config/gcloud/application_default_credentials.json":/.config/gcp/credentials.json:ro \ + -e GOOGLE_APPLICATION_CREDENTIALS=/.config/gcp/credentials.json \ + -v ${PWD}/config.toml:/config.toml \ + stellar/ledger-exporter \ + append --start [--end ] [--config-file ] +``` + +Arguments: +- `--start ` (required): The starting ledger sequence number for the export process. +- `--end ` (optional): The ending ledger sequence number. If omitted or set to 0, the exporter will continuously export new ledgers as they appear on the network. +- `--config-file ` (optional): The path to your configuration file, containing details like GCS bucket information. If not provided, the exporter will look for config.toml in the directory where you run the command. + +### 2. scan-and-fill: Fill Data Gaps -#### Special Cases: +Scans the datastore (GCS bucket) for the specified ledger range and exports any missing ledgers to the datastore. This mode avoids unnecessary exports if the data is already present. The range is specified using the --start and --end options. -- If `ledgers_per_file` is set to 1, filenames will only contain the ledger number. -- If `files_per_partition` is set to 1, filenames will not contain the partition. +**Usage:** -#### Note: -- Avoid changing `ledgers_per_file` and `files_per_partition` after configuration for consistency. +```bash +docker run --platform linux/amd64 -d \ + -v "$HOME/.config/gcloud/application_default_credentials.json":/.config/gcp/credentials.json:ro \ + -e GOOGLE_APPLICATION_CREDENTIALS=/.config/gcp/credentials.json \ + -v ${PWD}/config.toml:/config.toml \ + stellar/ledger-exporter \ + scan-and-fill --start --end [--config-file ] +``` -#### Retrieving Data: -- To locate a specific ledger sequence, calculate the partition name and ledger file name using `files_per_partition` and `ledgers_per_file`. -- The `GetObjectKeyFromSequenceNumber` function automates this calculation. +Arguments: +- `--start ` (required): The starting ledger sequence number in the range to export. +- `--end ` (required): The ending ledger sequence number in the range. +- `--config-file ` (optional): The path to your configuration file, containing details like GCS bucket information. If not provided, the exporter will look for config.toml in the directory where you run the command. 
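As a rough illustration of how the `ledgers_per_file` and `files_per_partition` settings in `[datastore_config.schema]` determine where a given ledger lands in the bucket (files named by their ledger range, grouped into partition directories), here is a small sketch in Go. The helper name is made up for this example; the exporter's own key-generation code in the `support/datastore` package is authoritative.

```go
package main

import "fmt"

// objectKey mirrors the layout described above: files named
// "<first>-<last>.xdr.zstd", grouped into partition directories named
// "<first>-<last>". Illustrative only; the exporter's own key-generation
// logic in support/datastore is the source of truth.
func objectKey(seq, ledgersPerFile, filesPerPartition uint32) string {
	fileStart := (seq / ledgersPerFile) * ledgersPerFile
	fileEnd := fileStart + ledgersPerFile - 1

	fileName := fmt.Sprintf("%d-%d.xdr.zstd", fileStart, fileEnd)
	if ledgersPerFile == 1 {
		// Special case: single-ledger files are named by ledger number alone.
		fileName = fmt.Sprintf("%d.xdr.zstd", seq)
	}

	if filesPerPartition <= 1 {
		// Special case: no partition directories when there is one file per partition.
		return fileName
	}

	partitionSize := ledgersPerFile * filesPerPartition
	partitionStart := (seq / partitionSize) * partitionSize
	partitionEnd := partitionStart + partitionSize - 1
	return fmt.Sprintf("%d-%d/%s", partitionStart, partitionEnd, fileName)
}

func main() {
	// With ledgers_per_file = 64 and files_per_partition = 10,
	// ledger 100 lands in 0-639/64-127.xdr.zstd.
	fmt.Println(objectKey(100, 64, 10))
}
```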
#### Running Integration Tests: from top directory of stellar/go repo, run go test to launch ledger exporter integration diff --git a/exp/services/ledgerexporter/architecture.png b/exp/services/ledgerexporter/architecture.png new file mode 100644 index 0000000000..85bd6d8b31 Binary files /dev/null and b/exp/services/ledgerexporter/architecture.png differ diff --git a/exp/services/ledgerexporter/config.example.toml b/exp/services/ledgerexporter/config.example.toml new file mode 100644 index 0000000000..db72169311 --- /dev/null +++ b/exp/services/ledgerexporter/config.example.toml @@ -0,0 +1,42 @@ + +# Sample TOML Configuration + +# Admin port configuration +# Specifies the port number for hosting the web service locally to publish metrics. +admin_port = 6061 + +# Datastore Configuration +[datastore_config] +# Specifies the type of datastore. Currently, only Google Cloud Storage (GCS) is supported. +type = "GCS" + +[datastore_config.params] +# The Google Cloud Storage bucket path for storing data, with optional subpaths for organization. +destination_bucket_path = "your-bucket-name///" + +[datastore_config.schema] +# Configuration for data organization +ledgers_per_file = 64 # Number of ledgers stored in each file. +files_per_partition = 10 # Number of files per partition/directory. + +# Stellar-core Configuration +[stellar_core_config] +# Use default captive-core config based on network +# Options are "testnet" for the test network or "pubnet" for the public network. +network = "testnet" + +# Alternatively, you can manually configure captive-core parameters (overrides defaults if 'network' is set). + +# Path to the captive-core configuration file. +#captive_core_config_path = "my-captive-core.cfg" + +# URLs for Stellar history archives, with multiple URLs allowed. +#history_archive_urls = ["http://testarchiveurl1", "http://testarchiveurl2"] + +# Network passphrase for the Stellar network. +#network_passphrase = "Test SDF Network ; September 2015" + +# Path to stellar-core binary +# Not required when running in a Docker container as it has the stellar-core installed and path is set. +# When running outside of Docker, it will look for stellar-core in the OS path if it exists. 
+#stellar_core_binary_path = "/my/path/to/stellar-core diff --git a/exp/services/ledgerexporter/config.toml b/exp/services/ledgerexporter/config.toml deleted file mode 100644 index c41d9376ac..0000000000 --- a/exp/services/ledgerexporter/config.toml +++ /dev/null @@ -1,14 +0,0 @@ -[datastore_config] -type = "GCS" - -[datastore_config.params] -destination_bucket_path = "exporter-test/ledgers/testnet" - -[datastore_config.schema] -ledgers_per_file = 1 -files_per_partition = 64000 - -[stellar_core_config] - network = "testnet" - stellar_core_binary_path = "/usr/local/bin/stellar-core" - diff --git a/exp/services/ledgerexporter/internal/config.go b/exp/services/ledgerexporter/internal/config.go index 9b0225549e..754fe5512b 100644 --- a/exp/services/ledgerexporter/internal/config.go +++ b/exp/services/ledgerexporter/internal/config.go @@ -124,8 +124,8 @@ func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive his return errors.New("invalid end value, must be greater than start") } - latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(archive) - latestNetworkLedger = latestNetworkLedger + (datastore.GetHistoryArchivesCheckPointFrequency() * 2) + latestNetworkLedger, err := archive.GetLatestLedgerSequence() + latestNetworkLedger = latestNetworkLedger + (archive.GetCheckpointManager().GetCheckpointFrequency() * 2) if err != nil { return errors.Wrap(err, "Failed to retrieve the latest ledger sequence from history archives.") @@ -189,7 +189,7 @@ func (config *Config) GenerateCaptiveCoreConfig(coreBinFromPath string) (ledgerb BinaryPath: config.StellarCoreConfig.StellarCoreBinaryPath, NetworkPassphrase: params.NetworkPassphrase, HistoryArchiveURLs: params.HistoryArchiveURLs, - CheckpointFrequency: datastore.GetHistoryArchivesCheckPointFrequency(), + CheckpointFrequency: historyarchive.DefaultCheckpointFrequency, Log: logger.WithField("subservice", "stellar-core"), Toml: captiveCoreToml, UserAgent: "ledger-exporter", diff --git a/exp/services/ledgerexporter/internal/config_test.go b/exp/services/ledgerexporter/internal/config_test.go index f782de5ea4..d1c24cb198 100644 --- a/exp/services/ledgerexporter/internal/config_test.go +++ b/exp/services/ledgerexporter/internal/config_test.go @@ -5,19 +5,20 @@ import ( "fmt" "testing" + "github.com/stellar/go/historyarchive" "github.com/stellar/go/network" - "github.com/stellar/go/support/datastore" "github.com/stretchr/testify/require" - - "github.com/stellar/go/historyarchive" ) func TestNewConfig(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 5}, nil).Once() + mockArchive.On("GetLatestLedgerSequence").Return(uint32(5), nil).Once() + mockArchive.On("GetCheckpointManager"). 
+ Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)).Once() config, err := NewConfig( RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/test.toml", Mode: Append}, nil) @@ -198,7 +199,7 @@ func TestInvalidCaptiveCoreTomlPath(t *testing.T) { func TestValidateStartAndEndLedger(t *testing.T) { latestNetworkLedger := uint32(20000) - latestNetworkLedgerPadding := datastore.GetHistoryArchivesCheckPointFrequency() * 2 + latestNetworkLedgerPadding := historyarchive.DefaultCheckpointFrequency * 2 tests := []struct { name string @@ -282,7 +283,10 @@ func TestValidateStartAndEndLedger(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: latestNetworkLedger}, nil) + mockArchive.On("GetLatestLedgerSequence").Return(latestNetworkLedger, nil) + mockArchive.On("GetCheckpointManager"). + Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)) mockedHasCtr := 0 for _, tt := range tests { @@ -302,7 +306,7 @@ func TestValidateStartAndEndLedger(t *testing.T) { } }) } - mockArchive.AssertNumberOfCalls(t, "GetRootHAS", mockedHasCtr) + mockArchive.AssertExpectations(t) } func TestAdjustedLedgerRangeBoundedMode(t *testing.T) { @@ -358,7 +362,10 @@ func TestAdjustedLedgerRangeBoundedMode(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 500}, nil).Times(len(tests)) + mockArchive.On("GetLatestLedgerSequence").Return(uint32(500), nil) + mockArchive.On("GetCheckpointManager"). + Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -421,7 +428,10 @@ func TestAdjustedLedgerRangeUnBoundedMode(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 500}, nil).Times(len(tests)) + mockArchive.On("GetLatestLedgerSequence").Return(uint32(500), nil) + mockArchive.On("GetCheckpointManager"). + Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/exp/tools/dump-ledger-state/Dockerfile b/exp/tools/dump-ledger-state/Dockerfile deleted file mode 100644 index 5ffcb9c0a2..0000000000 --- a/exp/tools/dump-ledger-state/Dockerfile +++ /dev/null @@ -1,45 +0,0 @@ -FROM ubuntu:22.04 - -ENV STELLAR_CORE_VERSION=21.0.0-1872.c6f474133.focal -ENV DEBIAN_FRONTEND=noninteractive - -RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl wget gnupg apt-utils -RUN wget -qO - https://apt.stellar.org/SDF.asc | APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=true apt-key add - -RUN echo "deb https://apt.stellar.org focal stable" >/etc/apt/sources.list.d/SDF.list -# RUN echo "deb https://apt.stellar.org focal unstable" >/etc/apt/sources.list.d/SDF-unstable.list -RUN apt-get update -y - -RUN apt-get install -y stellar-core=${STELLAR_CORE_VERSION} jq -RUN apt-get clean -RUN echo "deb http://apt.postgresql.org/pub/repos/apt/ $(env -i bash -c '. 
/etc/os-release; echo $VERSION_CODENAME')-pgdg main" | tee /etc/apt/sources.list.d/pgdg.list && \ - wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - && \ - apt-get update && \ - DEBIAN_FRONTEND="noninteractive" apt-get install -y postgresql-9.6 postgresql-contrib-9.6 postgresql-client-9.6 - -# Create a PostgreSQL role named `circleci` and then create a database `core` owned by the `circleci` role. -RUN su - postgres -c "/etc/init.d/postgresql start && psql --command \"CREATE USER circleci WITH SUPERUSER;\" && createdb -O circleci core" - -# Adjust PostgreSQL configuration so that remote connections to the -# database are possible. -RUN echo "host all all all trust" > /etc/postgresql/9.6/main/pg_hba.conf - -# And add `listen_addresses` to `/etc/postgresql/9.6/main/postgresql.conf` -RUN echo "listen_addresses='*'" >> /etc/postgresql/9.6/main/postgresql.conf - -COPY --from=golang:1.22-bullseye /usr/local/go/ /usr/local/go/ -RUN ln -s /usr/local/go/bin/go /usr/local/bin/go -WORKDIR /go/src/github.com/stellar/go -COPY go.mod go.sum ./ -RUN go mod download -COPY . ./ - -ENV PGPORT=5432 -ENV PGUSER=circleci -ENV PGHOST=localhost - -WORKDIR /go/src/github.com/stellar/go/exp/tools/dump-ledger-state - -ARG GITCOMMIT -ENV GITCOMMIT=${GITCOMMIT} - -ENTRYPOINT ["./docker-entrypoint.sh"] diff --git a/exp/tools/dump-ledger-state/README.md b/exp/tools/dump-ledger-state/README.md deleted file mode 100644 index 17376bd17d..0000000000 --- a/exp/tools/dump-ledger-state/README.md +++ /dev/null @@ -1,14 +0,0 @@ -# dump-ledger-state - -This tool dumps the state from history archive buckets to 4 separate files: -* accounts.csv -* accountdata.csv -* offers.csv -* trustlines.csv -* claimablebalances.csv - -It's primary use is to test `SingleLedgerStateReader`. To run the test (`run_test.sh`) it: -1. Runs `dump-ledger-state`. -2. Syncs stellar-core to the same checkpoint: `stellar-core catchup [ledger]/1`. -3. Dumps stellar-core DB by using `dump_core_db.sh` script. -4. Diffs results by using `diff_test.sh` script. diff --git a/exp/tools/dump-ledger-state/diff_test.sh b/exp/tools/dump-ledger-state/diff_test.sh deleted file mode 100755 index 69295b2a82..0000000000 --- a/exp/tools/dump-ledger-state/diff_test.sh +++ /dev/null @@ -1,36 +0,0 @@ -ENTRIES=(accounts accountdata offers trustlines claimablebalances pools) - -echo "Sorting dump-ledger-state output files..." -for i in "${ENTRIES[@]}" -do - if test -f "${i}_sorted.csv"; then - echo "Skipping, ${i}_sorted.csv exists (remove if out of date to sort again)" - continue - fi - wc -l ${i}.csv - sort -S 500M -o ${i}_sorted.csv ${i}.csv -done - -echo "Sorting stellar-core output files..." -for i in "${ENTRIES[@]}" -do - if test -f "${i}_core_sorted.csv"; then - echo "Skipping, ${i}_core_sorted.csv exists (remove if out of date to sort again)" - continue - fi - wc -l ${i}_core.csv - sort -S 500M -o ${i}_core_sorted.csv ${i}_core.csv -done - -echo "Checking diffs..." -for type in "${ENTRIES[@]}" -do - diff -q ${type}_core_sorted.csv ${type}_sorted.csv - if [ "$?" -ne "0" ] - then - echo "ERROR: $type does NOT match"; - exit -1 - else - echo "$type OK"; - fi -done diff --git a/exp/tools/dump-ledger-state/docker-entrypoint.sh b/exp/tools/dump-ledger-state/docker-entrypoint.sh deleted file mode 100755 index f1451c2ad5..0000000000 --- a/exp/tools/dump-ledger-state/docker-entrypoint.sh +++ /dev/null @@ -1,39 +0,0 @@ -#! /bin/bash -set -e - -/etc/init.d/postgresql start - -while ! 
psql -U circleci -d core -h localhost -p 5432 -c 'select 1' >/dev/null 2>&1; do - echo "Waiting for postgres to be available..." - sleep 1 -done - -echo "using version $(stellar-core version)" - -if [ -z ${TESTNET+x} ]; then - stellar-core --conf ./stellar-core.cfg new-db -else - stellar-core --conf ./stellar-core-testnet.cfg new-db -fi - -if [ -z ${LATEST_LEDGER+x} ]; then - # Get latest ledger - echo "Getting latest checkpoint ledger..." - if [ -z ${TESTNET+x} ]; then - export LATEST_LEDGER=`curl -s http://history.stellar.org/prd/core-live/core_live_001/.well-known/stellar-history.json | jq -r '.currentLedger'` - else - export LATEST_LEDGER=`curl -s http://history.stellar.org/prd/core-testnet/core_testnet_001/.well-known/stellar-history.json | jq -r '.currentLedger'` - fi -fi - -if [[ -z "${LATEST_LEDGER}" ]]; then - echo "could not obtain latest ledger" - exit 1 -fi - -echo "Latest ledger: $LATEST_LEDGER" - -if ! ./run_test.sh; then - echo "ingestion dump (git commit \`$GITCOMMIT\`) of ledger \`$LATEST_LEDGER\` does not match stellar core db." - exit 1 -fi \ No newline at end of file diff --git a/exp/tools/dump-ledger-state/dump_core_db.sh b/exp/tools/dump-ledger-state/dump_core_db.sh deleted file mode 100755 index ebd8871a47..0000000000 --- a/exp/tools/dump-ledger-state/dump_core_db.sh +++ /dev/null @@ -1,27 +0,0 @@ -# Get state from stellar-core DB, colums match CSV printer -# FETCH_COUNT is there for circleci to use cursor-based method of getting rows (less RAM usage): -# https://dba.stackexchange.com/a/101510 - -echo "Fetching accounts from stellar-core DB..." -psql -d core -t -A -F"," --variable="FETCH_COUNT=10000" -c "select accountid, balance, seqnum, numsubentries, inflationdest, homedomain, thresholds, flags, COALESCE(extension, 'AAAAAA=='), signers, ledgerext from accounts" > accounts_core.csv -rm accounts_core_sorted.csv || true # Remove if exist in case original files are rebuilt - -echo "Fetching accountdata from stellar-core DB..." -psql -d core -t -A -F"," --variable="FETCH_COUNT=10000" -c "select accountid, dataname, datavalue, COALESCE(extension, 'AAAAAA=='), ledgerext from accountdata" > accountdata_core.csv -rm accountdata_core_sorted.csv || true # Remove if exist in case original files are rebuilt - -echo "Fetching offers from stellar-core DB..." -psql -d core -t -A -F"," --variable="FETCH_COUNT=10000" -c "select sellerid, offerid, sellingasset, buyingasset, amount, pricen, priced, flags, COALESCE(extension, 'AAAAAA=='), ledgerext from offers" > offers_core.csv -rm offers_core_sorted.csv || true # Remove if exist in case original files are rebuilt - -echo "Fetching trustlines from stellar-core DB..." -psql -d core -t -A -F"," --variable="FETCH_COUNT=10000" -c "select ledgerentry from trustlines" > trustlines_core.csv -rm trustlines_core_sorted.csv || true # Remove if exist in case original files are rebuilt - -echo "Fetching claimable balances from stellar-core DB..." -psql -d core -t -A -F"," --variable="FETCH_COUNT=10000" -c "select balanceid, ledgerentry from claimablebalance" > claimablebalances_core.csv -rm claimablebalances_core_sorted.csv || true # Remove if exist in case original files are rebuilt - -echo "Fetching liquidity pools from stellar-core DB..." 
-psql -d core -t -A -F"," --variable="FETCH_COUNT=10000" -c "select ledgerentry from liquiditypool" > pools_core.csv -rm pools_core_sorted.csv || true # Remove if exist in case original files are rebuilt \ No newline at end of file diff --git a/exp/tools/dump-ledger-state/main.go b/exp/tools/dump-ledger-state/main.go deleted file mode 100644 index 26f59348a7..0000000000 --- a/exp/tools/dump-ledger-state/main.go +++ /dev/null @@ -1,366 +0,0 @@ -package main - -import ( - "context" - "encoding/base64" - "encoding/csv" - "flag" - "io" - "os" - "runtime" - "strconv" - "time" - - "github.com/stellar/go/historyarchive" - "github.com/stellar/go/ingest" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/log" - "github.com/stellar/go/support/storage" - "github.com/stellar/go/xdr" -) - -// csvMap maintains a mapping from ledger entry type to csv file -type csvMap struct { - files map[xdr.LedgerEntryType]*os.File - writers map[xdr.LedgerEntryType]*csv.Writer -} - -// newCSVMap constructs an empty csvMap instance -func newCSVMap() csvMap { - return csvMap{ - files: map[xdr.LedgerEntryType]*os.File{}, - writers: map[xdr.LedgerEntryType]*csv.Writer{}, - } -} - -// put creates a new file with the given file name and links that file to the -// given ledger entry type -func (c csvMap) put(entryType xdr.LedgerEntryType, fileName string) error { - if _, ok := c.files[entryType]; ok { - return errors.Errorf("entry type %s is already present in the file set", fileName) - } - - file, err := os.Create(fileName) - if err != nil { - return errors.Wrapf(err, "could not open file %s", fileName) - } - - c.files[entryType] = file - c.writers[entryType] = csv.NewWriter(file) - - return nil -} - -// get returns a csv writer for the given ledger entry type if it exists in the mapping -func (c csvMap) get(entryType xdr.LedgerEntryType) (*csv.Writer, bool) { - writer, ok := c.writers[entryType] - return writer, ok -} - -// close will close all files contained in the mapping -func (c csvMap) close() { - for entryType, file := range c.files { - if err := file.Close(); err != nil { - log.WithField("type", entryType.String()).Warn("could not close csv file") - } - delete(c.files, entryType) - delete(c.writers, entryType) - } -} - -type csvProcessor struct { - files csvMap - changeStats *ingest.StatsChangeProcessor -} - -func (processor csvProcessor) ProcessChange(change ingest.Change) error { - csvWriter, ok := processor.files.get(change.Type) - if !ok { - return nil - } - if err := processor.changeStats.ProcessChange(context.Background(), change); err != nil { - return err - } - - legerExt, err := xdr.MarshalBase64(change.Post.Ext) - if err != nil { - return err - } - - switch change.Type { - case xdr.LedgerEntryTypeAccount: - account := change.Post.Data.MustAccount() - - inflationDest := "" - if account.InflationDest != nil { - inflationDest = account.InflationDest.Address() - } - - var signers string - if len(account.Signers) > 0 { - var err error - signers, err = xdr.MarshalBase64(account.Signers) - if err != nil { - return err - } - } - - accountExt, err := xdr.MarshalBase64(account.Ext) - if err != nil { - return err - } - - csvWriter.Write([]string{ - account.AccountId.Address(), - strconv.FormatInt(int64(account.Balance), 10), - strconv.FormatInt(int64(account.SeqNum), 10), - strconv.FormatInt(int64(account.NumSubEntries), 10), - inflationDest, - base64.StdEncoding.EncodeToString([]byte(account.HomeDomain)), - base64.StdEncoding.EncodeToString(account.Thresholds[:]), - 
strconv.FormatInt(int64(account.Flags), 10), - accountExt, - signers, - legerExt, - }) - case xdr.LedgerEntryTypeTrustline: - ledgerEntry, err := xdr.MarshalBase64(change.Post) - if err != nil { - return err - } - csvWriter.Write([]string{ - ledgerEntry, - }) - case xdr.LedgerEntryTypeOffer: - offer := change.Post.Data.MustOffer() - - selling, err := xdr.MarshalBase64(offer.Selling) - if err != nil { - return err - } - - buying, err := xdr.MarshalBase64(offer.Buying) - if err != nil { - return err - } - - offerExt, err := xdr.MarshalBase64(offer.Ext) - if err != nil { - return err - } - - csvWriter.Write([]string{ - offer.SellerId.Address(), - strconv.FormatInt(int64(offer.OfferId), 10), - selling, - buying, - strconv.FormatInt(int64(offer.Amount), 10), - strconv.FormatInt(int64(offer.Price.N), 10), - strconv.FormatInt(int64(offer.Price.D), 10), - strconv.FormatInt(int64(offer.Flags), 10), - offerExt, - legerExt, - }) - case xdr.LedgerEntryTypeData: - accountData := change.Post.Data.MustData() - accountDataExt, err := xdr.MarshalBase64(accountData.Ext) - if err != nil { - return err - } - - csvWriter.Write([]string{ - accountData.AccountId.Address(), - base64.StdEncoding.EncodeToString([]byte(accountData.DataName)), - base64.StdEncoding.EncodeToString(accountData.DataValue), - accountDataExt, - legerExt, - }) - case xdr.LedgerEntryTypeClaimableBalance: - claimableBalance := change.Post.Data.MustClaimableBalance() - - ledgerEntry, err := xdr.MarshalBase64(change.Post) - if err != nil { - return err - } - - balanceID, err := xdr.MarshalBase64(claimableBalance.BalanceId) - if err != nil { - return err - } - - csvWriter.Write([]string{ - balanceID, - ledgerEntry, - }) - case xdr.LedgerEntryTypeLiquidityPool: - ledgerEntry, err := xdr.MarshalBase64(change.Post) - if err != nil { - return err - } - csvWriter.Write([]string{ - ledgerEntry, - }) - default: - return errors.Errorf("Invalid LedgerEntryType: %d", change.Type) - } - - if err := csvWriter.Error(); err != nil { - return errors.Wrap(err, "Error during csv.Writer.Write") - } - - csvWriter.Flush() - - if err := csvWriter.Error(); err != nil { - return errors.Wrap(err, "Error during csv.Writer.Flush") - } - return nil -} - -func main() { - testnet := flag.Bool("testnet", false, "connect to the Stellar test network") - flag.Parse() - - archive, err := archive(*testnet) - if err != nil { - panic(err) - } - log.SetLevel(log.InfoLevel) - - files := newCSVMap() - defer files.close() - - for entryType, fileName := range map[xdr.LedgerEntryType]string{ - xdr.LedgerEntryTypeAccount: "./accounts.csv", - xdr.LedgerEntryTypeData: "./accountdata.csv", - xdr.LedgerEntryTypeOffer: "./offers.csv", - xdr.LedgerEntryTypeTrustline: "./trustlines.csv", - xdr.LedgerEntryTypeClaimableBalance: "./claimablebalances.csv", - xdr.LedgerEntryTypeLiquidityPool: "./pools.csv", - } { - if err = files.put(entryType, fileName); err != nil { - log.WithField("err", err). - WithField("file", fileName). - Fatal("cannot create csv file") - } - } - - ledgerSequenceString := os.Getenv("LATEST_LEDGER") - ledgerSequence, err := strconv.Atoi(ledgerSequenceString) - if err != nil { - log.WithField("ledger", ledgerSequenceString). - WithField("err", err). - Fatal("cannot parse latest ledger") - } - log.WithField("ledger", ledgerSequence). 
- Info("Processing entries from History Archive Snapshot") - - changeReader, err := ingest.NewCheckpointChangeReader( - context.Background(), - archive, - uint32(ledgerSequence), - ) - if err != nil { - log.WithField("err", err).Fatal("cannot construct change reader") - } - defer changeReader.Close() - - changeStats := &ingest.StatsChangeProcessor{} - doneStats := printPipelineStats(changeStats) - changeProcessor := csvProcessor{files: files, changeStats: changeStats} - logFatalError := func(err error) { - log.WithField("err", err).Fatal("could not process all changes from HAS") - } - for { - change, err := changeReader.Read() - if err == io.EOF { - break - } - if err != nil { - logFatalError(errors.Wrap(err, "could not read transaction")) - } - - if err = changeProcessor.ProcessChange(change); err != nil { - logFatalError(errors.Wrap(err, "could not process change")) - } - } - - // Remove sorted files - sortedFiles := []string{ - "./accounts_sorted.csv", - "./accountdata_sorted.csv", - "./offers_sorted.csv", - "./trustlines_sorted.csv", - "./claimablebalances_sort.csv", - } - for _, file := range sortedFiles { - err := os.Remove(file) - // Ignore not exist errors - if err != nil && !os.IsNotExist(err) { - panic(err) - } - } - - doneStats <- true -} - -func archive(testnet bool) (*historyarchive.Archive, error) { - if testnet { - return historyarchive.Connect( - "https://history.stellar.org/prd/core-testnet/core_testnet_001", - historyarchive.ArchiveOptions{ - ConnectOptions: storage.ConnectOptions{ - UserAgent: "dump-ledger-state", - }, - }, - ) - } - - return historyarchive.Connect( - "https://history.stellar.org/prd/core-live/core_live_001/", - historyarchive.ArchiveOptions{ - ConnectOptions: storage.ConnectOptions{ - UserAgent: "dump-ledger-state", - }, - }, - ) -} - -func printPipelineStats(reporter *ingest.StatsChangeProcessor) chan<- bool { - startTime := time.Now() - done := make(chan bool) - ticker := time.NewTicker(10 * time.Second) - - go func() { - defer ticker.Stop() - - for { - var m runtime.MemStats - runtime.ReadMemStats(&m) - results := reporter.GetResults() - stats := log.F(results.Map()) - stats["Alloc"] = bToMb(m.Alloc) - stats["HeapAlloc"] = bToMb(m.HeapAlloc) - stats["Sys"] = bToMb(m.Sys) - stats["NumGC"] = m.NumGC - stats["Goroutines"] = runtime.NumGoroutine() - stats["NumCPU"] = runtime.NumCPU() - stats["Duration"] = time.Since(startTime) - - log.WithFields(stats).Info("Current Job Status") - - select { - case <-ticker.C: - continue - case <-done: - // Pipeline done - return - } - } - }() - - return done -} - -func bToMb(b uint64) uint64 { - return b / 1024 / 1024 -} diff --git a/exp/tools/dump-ledger-state/run_test.sh b/exp/tools/dump-ledger-state/run_test.sh deleted file mode 100755 index ef2b56356c..0000000000 --- a/exp/tools/dump-ledger-state/run_test.sh +++ /dev/null @@ -1,39 +0,0 @@ -#! /bin/bash -set -e - -if [ -z ${LATEST_LEDGER+x} ]; then - # Get latest ledger - echo "Getting latest checkpoint ledger..." - if [ -z ${TESTNET+x} ]; then - export LATEST_LEDGER=`curl -s http://history.stellar.org/prd/core-live/core_live_001/.well-known/stellar-history.json | jq -r '.currentLedger'` - else - export LATEST_LEDGER=`curl -s http://history.stellar.org/prd/core-testnet/core_testnet_001/.well-known/stellar-history.json | jq -r '.currentLedger'` - fi - echo "Latest ledger: $LATEST_LEDGER" -fi - -# Dump state using Golang -if [ -z ${TESTNET+x} ]; then - echo "Dumping pubnet state using ingest..." - go run ./main.go -else - echo "Dumping testnet state using ingest..." 
- go run ./main.go --testnet -fi -echo "State dumped..." - -# Catchup core -if [ -z ${TESTNET+x} ]; then - echo "Catch up from pubnet" - stellar-core --conf ./stellar-core.cfg catchup $LATEST_LEDGER/1 -else - echo "Catch up from testnet" - stellar-core --conf ./stellar-core-testnet.cfg catchup $LATEST_LEDGER/1 -fi - -echo "Dumping state from stellar-core..." -./dump_core_db.sh -echo "State dumped..." - -echo "Comparing state dumps..." -./diff_test.sh diff --git a/exp/tools/dump-ledger-state/stellar-core-testnet.cfg b/exp/tools/dump-ledger-state/stellar-core-testnet.cfg deleted file mode 100644 index a02221e795..0000000000 --- a/exp/tools/dump-ledger-state/stellar-core-testnet.cfg +++ /dev/null @@ -1,39 +0,0 @@ -HTTP_PORT=11626 -PUBLIC_HTTP_PORT=true -LOG_FILE_PATH="" - -DATABASE="postgresql://dbname=core host=localhost user=circleci" -NETWORK_PASSPHRASE="Test SDF Network ; September 2015" -UNSAFE_QUORUM=true -FAILURE_SAFETY=1 -CATCHUP_RECENT=8640 - -EXPERIMENTAL_BUCKETLIST_DB=true - -[HISTORY.cache] -get="cp /opt/stellar/history-cache/{0} {1}" - -[[HOME_DOMAINS]] -HOME_DOMAIN="testnet.stellar.org" -QUALITY="HIGH" - -[[VALIDATORS]] -NAME="sdf_testnet_1" -HOME_DOMAIN="testnet.stellar.org" -PUBLIC_KEY="GDKXE2OZMJIPOSLNA6N6F2BVCI3O777I2OOC4BV7VOYUEHYX7RTRYA7Y" -ADDRESS="core-testnet1.stellar.org" -HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_001/{0} -o {1}" - -[[VALIDATORS]] -NAME="sdf_testnet_2" -HOME_DOMAIN="testnet.stellar.org" -PUBLIC_KEY="GCUCJTIYXSOXKBSNFGNFWW5MUQ54HKRPGJUTQFJ5RQXZXNOLNXYDHRAP" -ADDRESS="core-testnet2.stellar.org" -HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_002/{0} -o {1}" - -[[VALIDATORS]] -NAME="sdf_testnet_3" -HOME_DOMAIN="testnet.stellar.org" -PUBLIC_KEY="GC2V2EFSXN6SQTWVYA5EPJPBWWIMSD2XQNKUOHGEKB535AQE2I6IXV2Z" -ADDRESS="core-testnet3.stellar.org" -HISTORY="curl -sf http://history.stellar.org/prd/core-testnet/core_testnet_003/{0} -o {1}" \ No newline at end of file diff --git a/exp/tools/dump-ledger-state/stellar-core.cfg b/exp/tools/dump-ledger-state/stellar-core.cfg deleted file mode 100644 index 0d97346ce6..0000000000 --- a/exp/tools/dump-ledger-state/stellar-core.cfg +++ /dev/null @@ -1,201 +0,0 @@ -HTTP_PORT=11626 -PUBLIC_HTTP_PORT=true -LOG_FILE_PATH="" - -DATABASE="postgresql://dbname=core host=localhost user=circleci" -NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015" -CATCHUP_RECENT=1 - -EXPERIMENTAL_BUCKETLIST_DB=true - -[HISTORY.cache] -get="cp /opt/stellar/history-cache/{0} {1}" - -[[HOME_DOMAINS]] -HOME_DOMAIN="publicnode.org" -QUALITY="HIGH" - -[[HOME_DOMAINS]] -HOME_DOMAIN="lobstr.co" -QUALITY="HIGH" - -[[HOME_DOMAINS]] -HOME_DOMAIN="www.franklintempleton.com" -QUALITY="HIGH" - -[[HOME_DOMAINS]] -HOME_DOMAIN="satoshipay.io" -QUALITY="HIGH" - -[[HOME_DOMAINS]] -HOME_DOMAIN="whalestack.com" -QUALITY="HIGH" - -[[HOME_DOMAINS]] -HOME_DOMAIN="www.stellar.org" -QUALITY="HIGH" - -[[HOME_DOMAINS]] -HOME_DOMAIN="stellar.blockdaemon.com" -QUALITY="HIGH" - -[[VALIDATORS]] -NAME="Boötes" -PUBLIC_KEY="GCVJ4Z6TI6Z2SOGENSPXDQ2U4RKH3CNQKYUHNSSPYFPNWTLGS6EBH7I2" -ADDRESS="bootes.publicnode.org:11625" -HISTORY="curl -sf https://bootes-history.publicnode.org/{0} -o {1}" -HOME_DOMAIN="publicnode.org" - -[[VALIDATORS]] -NAME="Lyra by BP Ventures" -PUBLIC_KEY="GCIXVKNFPKWVMKJKVK2V4NK7D4TC6W3BUMXSIJ365QUAXWBRPPJXIR2Z" -ADDRESS="lyra.publicnode.org:11625" -HISTORY="curl -sf https://lyra-history.publicnode.org/{0} -o {1}" -HOME_DOMAIN="publicnode.org" - -[[VALIDATORS]] -NAME="Hercules by OG 
Technologies" -PUBLIC_KEY="GBLJNN3AVZZPG2FYAYTYQKECNWTQYYUUY2KVFN2OUKZKBULXIXBZ4FCT" -ADDRESS="hercules.publicnode.org:11625" -HISTORY="curl -sf https://hercules-history.publicnode.org/{0} -o {1}" -HOME_DOMAIN="publicnode.org" - -[[VALIDATORS]] -NAME="LOBSTR 3 (North America)" -PUBLIC_KEY="GD5QWEVV4GZZTQP46BRXV5CUMMMLP4JTGFD7FWYJJWRL54CELY6JGQ63" -ADDRESS="v3.stellar.lobstr.co:11625" -HISTORY="curl -sf https://archive.v3.stellar.lobstr.co/{0} -o {1}" -HOME_DOMAIN="lobstr.co" - -[[VALIDATORS]] -NAME="LOBSTR 1 (Europe)" -PUBLIC_KEY="GCFONE23AB7Y6C5YZOMKUKGETPIAJA4QOYLS5VNS4JHBGKRZCPYHDLW7" -ADDRESS="v1.stellar.lobstr.co:11625" -HISTORY="curl -sf https://archive.v1.stellar.lobstr.co/{0} -o {1}" -HOME_DOMAIN="lobstr.co" - -[[VALIDATORS]] -NAME="LOBSTR 2 (Europe)" -PUBLIC_KEY="GCB2VSADESRV2DDTIVTFLBDI562K6KE3KMKILBHUHUWFXCUBHGQDI7VL" -ADDRESS="v2.stellar.lobstr.co:11625" -HISTORY="curl -sf https://archive.v2.stellar.lobstr.co/{0} -o {1}" -HOME_DOMAIN="lobstr.co" - -[[VALIDATORS]] -NAME="LOBSTR 4 (Asia)" -PUBLIC_KEY="GA7TEPCBDQKI7JQLQ34ZURRMK44DVYCIGVXQQWNSWAEQR6KB4FMCBT7J" -ADDRESS="v4.stellar.lobstr.co:11625" -HISTORY="curl -sf https://archive.v4.stellar.lobstr.co/{0} -o {1}" -HOME_DOMAIN="lobstr.co" - -[[VALIDATORS]] -NAME="LOBSTR 5 (India)" -PUBLIC_KEY="GA5STBMV6QDXFDGD62MEHLLHZTPDI77U3PFOD2SELU5RJDHQWBR5NNK7" -ADDRESS="v5.stellar.lobstr.co:11625" -HISTORY="curl -sf https://archive.v5.stellar.lobstr.co/{0} -o {1}" -HOME_DOMAIN="lobstr.co" - -[[VALIDATORS]] -NAME="FT SCV 2" -PUBLIC_KEY="GCMSM2VFZGRPTZKPH5OABHGH4F3AVS6XTNJXDGCZ3MKCOSUBH3FL6DOB" -ADDRESS="stellar2.franklintempleton.com:11625" -HISTORY="curl -sf https://stellar-history-usc.franklintempleton.com/azuscshf401/{0} -o {1}" -HOME_DOMAIN="www.franklintempleton.com" - -[[VALIDATORS]] -NAME="FT SCV 3" -PUBLIC_KEY="GA7DV63PBUUWNUFAF4GAZVXU2OZMYRATDLKTC7VTCG7AU4XUPN5VRX4A" -ADDRESS="stellar3.franklintempleton.com:11625" -HISTORY="curl -sf https://stellar-history-ins.franklintempleton.com/azinsshf401/{0} -o {1}" -HOME_DOMAIN="www.franklintempleton.com" - -[[VALIDATORS]] -NAME="FT SCV 1" -PUBLIC_KEY="GARYGQ5F2IJEBCZJCBNPWNWVDOFK7IBOHLJKKSG2TMHDQKEEC6P4PE4V" -ADDRESS="stellar1.franklintempleton.com:11625" -HISTORY="curl -sf https://stellar-history-usw.franklintempleton.com/azuswshf401/{0} -o {1}" -HOME_DOMAIN="www.franklintempleton.com" - -[[VALIDATORS]] -NAME="SatoshiPay Frankfurt" -PUBLIC_KEY="GC5SXLNAM3C4NMGK2PXK4R34B5GNZ47FYQ24ZIBFDFOCU6D4KBN4POAE" -ADDRESS="stellar-de-fra.satoshipay.io:11625" -HISTORY="curl -sf https://stellar-history-de-fra.satoshipay.io/{0} -o {1}" -HOME_DOMAIN="satoshipay.io" - -[[VALIDATORS]] -NAME="SatoshiPay Singapore" -PUBLIC_KEY="GBJQUIXUO4XSNPAUT6ODLZUJRV2NPXYASKUBY4G5MYP3M47PCVI55MNT" -ADDRESS="stellar-sg-sin.satoshipay.io:11625" -HISTORY="curl -sf https://stellar-history-sg-sin.satoshipay.io/{0} -o {1}" -HOME_DOMAIN="satoshipay.io" - -[[VALIDATORS]] -NAME="SatoshiPay Iowa" -PUBLIC_KEY="GAK6Z5UVGUVSEK6PEOCAYJISTT5EJBB34PN3NOLEQG2SUKXRVV2F6HZY" -ADDRESS="stellar-us-iowa.satoshipay.io:11625" -HISTORY="curl -sf https://stellar-history-us-iowa.satoshipay.io/{0} -o {1}" -HOME_DOMAIN="satoshipay.io" - -[[VALIDATORS]] -NAME="Whalestack (Germany)" -PUBLIC_KEY="GD6SZQV3WEJUH352NTVLKEV2JM2RH266VPEM7EH5QLLI7ZZAALMLNUVN" -ADDRESS="germany.stellar.whalestack.com:11625" -HISTORY="curl -sf https://germany.stellar.whalestack.com/history/{0} -o {1}" -HOME_DOMAIN="whalestack.com" - -[[VALIDATORS]] -NAME="Whalestack (Hong Kong)" -PUBLIC_KEY="GAZ437J46SCFPZEDLVGDMKZPLFO77XJ4QVAURSJVRZK2T5S7XUFHXI2Z" 
-ADDRESS="hongkong.stellar.whalestack.com:11625" -HISTORY="curl -sf https://hongkong.stellar.whalestack.com/history/{0} -o {1}" -HOME_DOMAIN="whalestack.com" - -[[VALIDATORS]] -NAME="Whalestack (Finland)" -PUBLIC_KEY="GADLA6BJK6VK33EM2IDQM37L5KGVCY5MSHSHVJA4SCNGNUIEOTCR6J5T" -ADDRESS="finland.stellar.whalestack.com:11625" -HISTORY="curl -sf https://finland.stellar.whalestack.com/history/{0} -o {1}" -HOME_DOMAIN="whalestack.com" - -[[VALIDATORS]] -NAME="SDF 2" -PUBLIC_KEY="GCM6QMP3DLRPTAZW2UZPCPX2LF3SXWXKPMP3GKFZBDSF3QZGV2G5QSTK" -ADDRESS="core-live-b.stellar.org:11625" -HISTORY="curl -sf http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}" -HOME_DOMAIN="www.stellar.org" - -[[VALIDATORS]] -NAME="SDF 1" -PUBLIC_KEY="GCGB2S2KGYARPVIA37HYZXVRM2YZUEXA6S33ZU5BUDC6THSB62LZSTYH" -ADDRESS="core-live-a.stellar.org:11625" -HISTORY="curl -sf http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}" -HOME_DOMAIN="www.stellar.org" - -[[VALIDATORS]] -NAME="SDF 3" -PUBLIC_KEY="GABMKJM6I25XI4K7U6XWMULOUQIQ27BCTMLS6BYYSOWKTBUXVRJSXHYQ" -ADDRESS="core-live-c.stellar.org:11625" -HISTORY="curl -sf http://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}" -HOME_DOMAIN="www.stellar.org" - -[[VALIDATORS]] -NAME="Blockdaemon Validator 3" -PUBLIC_KEY="GAYXZ4PZ7P6QOX7EBHPIZXNWY4KCOBYWJCA4WKWRKC7XIUS3UJPT6EZ4" -ADDRESS="stellar-full-validator3.bdnodes.net:11625" -HISTORY="curl -sf https://stellar-full-history3.bdnodes.net/{0} -o {1}" -HOME_DOMAIN="stellar.blockdaemon.com" - -[[VALIDATORS]] -NAME="Blockdaemon Validator 2" -PUBLIC_KEY="GAVXB7SBJRYHSG6KSQHY74N7JAFRL4PFVZCNWW2ARI6ZEKNBJSMSKW7C" -ADDRESS="stellar-full-validator2.bdnodes.net:11625" -HISTORY="curl -sf https://stellar-full-history2.bdnodes.net/{0} -o {1}" -HOME_DOMAIN="stellar.blockdaemon.com" - -[[VALIDATORS]] -NAME="Blockdaemon Validator 1" -PUBLIC_KEY="GAAV2GCVFLNN522ORUYFV33E76VPC22E72S75AQ6MBR5V45Z5DWVPWEU" -ADDRESS="stellar-full-validator1.bdnodes.net:11625" -HISTORY="curl -sf https://stellar-full-history1.bdnodes.net/{0} -o {1}" -HOME_DOMAIN="stellar.blockdaemon.com" \ No newline at end of file diff --git a/historyarchive/archive.go b/historyarchive/archive.go index 4f9e14380f..d97471b42f 100644 --- a/historyarchive/archive.go +++ b/historyarchive/archive.go @@ -71,6 +71,7 @@ type ArchiveInterface interface { GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) GetRootHAS() (HistoryArchiveState, error) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) + GetLatestLedgerSequence() (uint32, error) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error @@ -176,6 +177,16 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command return a.backend.PutFile(path, io.NopCloser(bytes.NewReader(buf))) } +func (a *Archive) GetLatestLedgerSequence() (uint32, error) { + has, err := a.GetRootHAS() + if err != nil { + log.Error("Error getting root HAS from archive", err) + return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from history archive") + } + + return has.CurrentLedger, nil +} + func (a *Archive) BucketExists(bucket Hash) (bool, error) { return a.cachedExists(BucketPath(bucket)) } diff --git a/historyarchive/archive_pool.go b/historyarchive/archive_pool.go index 48178ade26..28967d8aa6 100644 --- a/historyarchive/archive_pool.go +++ b/historyarchive/archive_pool.go @@ -204,6 +204,16 @@ func (pa *ArchivePool) 
GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) }) } +func (pa *ArchivePool) GetLatestLedgerSequence() (uint32, error) { + has, err := pa.GetRootHAS() + if err != nil { + log.Error("Error getting root HAS from archive", err) + return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from history archive") + } + + return has.CurrentLedger, nil +} + func (pa *ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error { return pa.runRoundRobin(func(ai ArchiveInterface) error { return ai.PutCheckpointHAS(chk, has, opts) diff --git a/historyarchive/mocks.go b/historyarchive/mocks.go index fa5716e5de..efe333cd33 100644 --- a/historyarchive/mocks.go +++ b/historyarchive/mocks.go @@ -10,6 +10,11 @@ type MockArchive struct { mock.Mock } +func (m *MockArchive) GetLatestLedgerSequence() (uint32, error) { + a := m.Called() + return a.Get(0).(uint32), a.Error(1) +} + func (m *MockArchive) GetCheckpointManager() CheckpointManager { a := m.Called() return a.Get(0).(CheckpointManager) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 9d1f7f7fc0..c8f28974f5 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -349,11 +349,11 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error ) } - c.stellarCoreRunner = c.stellarCoreRunnerFactory() - err = c.stellarCoreRunner.catchup(from, to) - if err != nil { + stellarCoreRunner := c.stellarCoreRunnerFactory() + if err = stellarCoreRunner.catchup(from, to); err != nil { return errors.Wrap(err, "error running stellar-core") } + c.stellarCoreRunner = stellarCoreRunner // The next ledger should be the first ledger of the checkpoint containing // the requested ledger @@ -375,11 +375,11 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro return errors.Wrap(err, "error calculating ledger and hash for stellar-core run") } - c.stellarCoreRunner = c.stellarCoreRunnerFactory() - err = c.stellarCoreRunner.runFrom(runFrom, ledgerHash) - if err != nil { + stellarCoreRunner := c.stellarCoreRunnerFactory() + if err = stellarCoreRunner.runFrom(runFrom, ledgerHash); err != nil { return errors.Wrap(err, "error running stellar-core") } + c.stellarCoreRunner = stellarCoreRunner // In the online mode we update nextLedger after streaming the first ledger. // This is to support versions before and after/including v17.1.0 that @@ -556,7 +556,7 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool { return false } - if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited { + if _, exited := c.stellarCoreRunner.getProcessExitError(); exited { return false } @@ -627,9 +627,6 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd if c.stellarCoreRunner == nil { return xdr.LedgerCloseMeta{}, errors.New("stellar-core cannot be nil, call PrepareRange first") } - if c.closed { - return xdr.LedgerCloseMeta{}, errors.New("stellar-core has an error, call PrepareRange first") - } if sequence < c.nextExpectedSequence() { return xdr.LedgerCloseMeta{}, errors.Errorf( @@ -647,12 +644,17 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd ) } + ch, ok := c.stellarCoreRunner.getMetaPipe() + if !ok { + return xdr.LedgerCloseMeta{}, errors.New("stellar-core is not running, call PrepareRange first") + } + // Now loop along the range until we find the ledger we want. 
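Reviewer aside, not part of the patch: the new ArchiveInterface.GetLatestLedgerSequence method added above simply surfaces CurrentLedger from the root HAS, so any implementation (Archive, ArchivePool, or MockArchive in tests) can report how far the archive goes without extra plumbing. A minimal usage sketch under that assumption; the package and helper name are hypothetical.

package archiveexample

import (
	"fmt"

	"github.com/stellar/go/historyarchive"
)

// logLatestCheckpoint reports the most recent ledger known to a history
// archive, relying only on the GetLatestLedgerSequence method introduced in
// this change.
func logLatestCheckpoint(archive historyarchive.ArchiveInterface) error {
	seq, err := archive.GetLatestLedgerSequence()
	if err != nil {
		return err
	}
	fmt.Printf("history archive is at ledger %d\n", seq)
	return nil
}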
for { select { case <-ctx.Done(): return xdr.LedgerCloseMeta{}, ctx.Err() - case result, ok := <-c.stellarCoreRunner.getMetaPipe(): + case result, ok := <-ch: found, ledger, err := c.handleMetaPipeResult(sequence, result, ok) if found || err != nil { return ledger, err @@ -732,7 +734,7 @@ func (c *CaptiveStellarCore) checkMetaPipeResult(result metaResult, ok bool) err return err } if !ok || result.err != nil { - exited, err := c.stellarCoreRunner.getProcessExitError() + err, exited := c.stellarCoreRunner.getProcessExitError() if exited && err != nil { // Case 2 - The stellar core process exited unexpectedly with an error message return errors.Wrap(err, "stellar core exited unexpectedly") @@ -775,12 +777,12 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3 if c.stellarCoreRunner == nil { return 0, errors.New("stellar-core cannot be nil, call PrepareRange first") } - if c.closed { - return 0, errors.New("stellar-core is closed, call PrepareRange first") - + ch, ok := c.stellarCoreRunner.getMetaPipe() + if !ok { + return 0, errors.New("stellar-core is not running, call PrepareRange first") } if c.lastLedger == nil { - return c.nextExpectedSequence() - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil + return c.nextExpectedSequence() - 1 + uint32(len(ch)), nil } return *c.lastLedger, nil } diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 76319c2f77..f8161aec25 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -42,14 +42,14 @@ func (m *stellarCoreRunnerMock) runFrom(from uint32, hash string) error { return a.Error(0) } -func (m *stellarCoreRunnerMock) getMetaPipe() <-chan metaResult { +func (m *stellarCoreRunnerMock) getMetaPipe() (<-chan metaResult, bool) { a := m.Called() - return a.Get(0).(<-chan metaResult) + return a.Get(0).(<-chan metaResult), a.Bool(1) } -func (m *stellarCoreRunnerMock) getProcessExitError() (bool, error) { +func (m *stellarCoreRunnerMock) getProcessExitError() (error, bool) { a := m.Called() - return a.Bool(0), a.Error(1) + return a.Error(0), a.Bool(1) } func (m *stellarCoreRunnerMock) close() error { @@ -213,7 +213,7 @@ func TestCaptivePrepareRange(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -251,8 +251,8 @@ func TestCaptivePrepareRangeCrash(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() - mockRunner.On("getProcessExitError").Return(true, errors.New("exit code -1")) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getProcessExitError").Return(errors.New("exit code -1"), true) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("close").Return(nil).Once() mockRunner.On("context").Return(ctx) @@ -292,7 +292,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + 
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -328,7 +328,7 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Twice() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -364,7 +364,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("close").Return(fmt.Errorf("transient error")) - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) mockRunner.On("context").Return(ctx) captiveBackend := CaptiveStellarCore{ @@ -440,7 +440,7 @@ func TestCaptivePrepareRange_FromIsAheadOfRootHAS(t *testing.T) { } mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) assert.NoError(t, captiveBackend.PrepareRange(ctx, UnboundedRange(100))) @@ -481,7 +481,7 @@ func TestCaptivePrepareRangeWithDB_FromIsAheadOfRootHAS(t *testing.T) { LedgerCloseMeta: &meta, } mockRunner.On("runFrom", uint32(99), "").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) assert.NoError(t, captiveBackend.PrepareRange(ctx, UnboundedRange(100))) @@ -517,7 +517,6 @@ func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) { func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(192)).Return(errors.New("transient error")).Once() - mockRunner.On("close").Return(nil).Once() mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -552,7 +551,6 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once() - mockRunner.On("close").Return(nil).Once() mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -604,9 +602,9 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) mockArchive := &historyarchive.MockArchive{} mockArchive. 
@@ -653,7 +651,7 @@ func TestGetLatestLedgerSequence(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -699,7 +697,7 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("runFrom", mock.Anything, mock.Anything).Return(nil) @@ -766,9 +764,9 @@ func TestCaptiveGetLedger(t *testing.T) { ctx, cancel := context.WithCancel(ctx) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -857,7 +855,7 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) { defer cancel() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(65), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -919,7 +917,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -965,7 +963,7 @@ func TestCaptiveGetLedger_NextLedger0RangeFromIsSmallerThanLedgerFromBuffer(t *t ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(64), mock.Anything).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil) @@ -1067,13 +1065,13 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) ctx, cancel := context.WithCancel(ctx) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Run(func(args mock.Arguments) { cancel() }).Once() - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) // even if the request to fetch the latest checkpoint succeeds, we should 
fail at creating the subprocess mockArchive := &historyarchive.MockArchive{} @@ -1125,7 +1123,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(fmt.Errorf("transient error")).Once() @@ -1167,7 +1165,7 @@ func TestCaptiveAfterClose(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} ctx, cancel := context.WithCancel(context.Background()) mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Once() @@ -1222,7 +1220,7 @@ func TestGetLedgerBoundsCheck(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(128), uint32(130)).Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockArchive := &historyarchive.MockArchive{} @@ -1346,9 +1344,9 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) { ctx := testCase.ctx mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(64), uint32(100)).Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) - mockRunner.On("getProcessExitError").Return(testCase.processExited, testCase.processExitedError) + mockRunner.On("getProcessExitError").Return(testCase.processExitedError, testCase.processExited) mockRunner.On("close").Return(nil).Once() mockArchive := &historyarchive.MockArchive{} @@ -1514,7 +1512,7 @@ func TestCaptiveRunFromParams(t *testing.T) { func TestCaptiveIsPrepared(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("context").Return(context.Background()).Maybe() - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) // c.prepared == nil captiveBackend := CaptiveStellarCore{ @@ -1578,7 +1576,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} ctx, cancel := context.WithCancel(context.Background()) mockRunner.On("context").Return(ctx).Maybe() - mockRunner.On("getProcessExitError").Return(false, nil) + mockRunner.On("getProcessExitError").Return(nil, false) rang := UnboundedRange(100) captiveBackend := CaptiveStellarCore{ @@ -1630,7 +1628,7 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() - mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Once() diff --git a/ingest/ledgerbackend/catchup.go b/ingest/ledgerbackend/catchup.go new file mode 100644 index 0000000000..2cd12df0f3 --- /dev/null +++ 
b/ingest/ledgerbackend/catchup.go @@ -0,0 +1,76 @@ +package ledgerbackend + +import ( + "context" + "fmt" + + "github.com/stellar/go/support/log" +) + +type catchupStream struct { + dir workingDir + from uint32 + to uint32 + coreCmdFactory coreCmdFactory + log *log.Entry + useDB bool +} + +func newCatchupStream(r *stellarCoreRunner, from, to uint32) catchupStream { + // We want to use ephemeral directories in running the catchup command + // (used for the reingestion use case) because it's possible to run parallel + // reingestion where multiple captive cores are active on the same machine. + // Having ephemeral directories will ensure that each ingestion worker will + // have a separate working directory + dir := newWorkingDir(r, true) + return catchupStream{ + dir: dir, + from: from, + to: to, + coreCmdFactory: newCoreCmdFactory(r, dir), + log: r.log, + useDB: r.useDB, + } +} + +func (s catchupStream) getWorkingDir() workingDir { + return s.dir +} + +func (s catchupStream) start(ctx context.Context) (cmdI, pipe, error) { + var err error + var cmd cmdI + var captiveCorePipe pipe + + rangeArg := fmt.Sprintf("%d/%d", s.to, s.to-s.from+1) + params := []string{"catchup", rangeArg, "--metadata-output-stream", s.coreCmdFactory.getPipeName()} + + // horizon operator has specified to use external storage for captive core ledger state + // instruct captive core invocation to not use memory, and in that case + // cc will look at DATABASE property in cfg toml for the external storage source to use. + // when using external storage of ledgers, use new-db to first set the state of + // remote db storage to genesis to purge any prior state and reset. + if s.useDB { + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOffline, true, "new-db") + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + if err = cmd.Run(); err != nil { + return nil, pipe{}, fmt.Errorf("error initializing core db: %w", err) + } + } else { + params = append(params, "--in-memory") + } + + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOffline, true, params...) 
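Aside for reviewers, not part of the patch: the catchup argument built above is of the form target/count, so rangeArg replays the closed interval [from, to]. A tiny illustrative helper (hypothetical name) that mirrors the arithmetic:

package ledgerbackend

import "fmt"

// catchupRangeArg mirrors the rangeArg computation in catchupStream.start:
// the target ledger followed by the number of ledgers to replay, inclusive
// of the starting ledger.
func catchupRangeArg(from, to uint32) string {
	return fmt.Sprintf("%d/%d", to, to-from+1)
}

For example, catchupRangeArg(100, 192) returns "192/93", i.e. replay ledgers 100 through 192.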
+ if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + + captiveCorePipe, err = s.coreCmdFactory.startCaptiveCore(cmd) + if err != nil { + return nil, pipe{}, fmt.Errorf("error starting `stellar-core run` subprocess: %w", err) + } + + return cmd, captiveCorePipe, nil +} diff --git a/ingest/ledgerbackend/cmd.go b/ingest/ledgerbackend/cmd.go new file mode 100644 index 0000000000..8af729f0a6 --- /dev/null +++ b/ingest/ledgerbackend/cmd.go @@ -0,0 +1,157 @@ +package ledgerbackend + +import ( + "context" + "fmt" + "io/fs" + "io/ioutil" + "math/rand" + "os" + "os/exec" + "time" + + "github.com/stellar/go/support/log" +) + +type isDir interface { + IsDir() bool +} + +type systemCaller interface { + removeAll(path string) error + writeFile(filename string, data []byte, perm fs.FileMode) error + mkdirAll(path string, perm os.FileMode) error + stat(name string) (isDir, error) + command(ctx context.Context, name string, arg ...string) cmdI +} + +type realSystemCaller struct{} + +func (realSystemCaller) removeAll(path string) error { + return os.RemoveAll(path) +} + +func (realSystemCaller) writeFile(filename string, data []byte, perm fs.FileMode) error { + return ioutil.WriteFile(filename, data, perm) +} + +func (realSystemCaller) mkdirAll(path string, perm os.FileMode) error { + return os.MkdirAll(path, perm) +} + +func (realSystemCaller) stat(name string) (isDir, error) { + return os.Stat(name) +} + +func (realSystemCaller) command(ctx context.Context, name string, arg ...string) cmdI { + cmd := exec.CommandContext(ctx, name, arg...) + cmd.Cancel = func() error { + return cmd.Process.Signal(os.Interrupt) + } + cmd.WaitDelay = time.Second * 10 + return &realCmd{Cmd: cmd} +} + +type cmdI interface { + Output() ([]byte, error) + Wait() error + Start() error + Run() error + setDir(dir string) + setLogLineWriter(logWriter *logLineWriter) + setExtraFiles([]*os.File) +} + +type realCmd struct { + *exec.Cmd + logWriter *logLineWriter +} + +func (r *realCmd) setDir(dir string) { + r.Cmd.Dir = dir +} + +func (r *realCmd) setLogLineWriter(logWriter *logLineWriter) { + r.logWriter = logWriter +} + +func (r *realCmd) setExtraFiles(extraFiles []*os.File) { + r.ExtraFiles = extraFiles +} + +func (r *realCmd) Start() error { + if r.logWriter != nil { + r.Cmd.Stdout = r.logWriter + r.Cmd.Stderr = r.logWriter + r.logWriter.Start() + } + err := r.Cmd.Start() + if err != nil && r.logWriter != nil { + r.logWriter.Close() + } + return err +} + +func (r *realCmd) Run() error { + if r.logWriter != nil { + r.Cmd.Stdout = r.logWriter + r.Cmd.Stderr = r.logWriter + r.logWriter.Start() + } + err := r.Cmd.Run() + if r.logWriter != nil { + r.logWriter.Close() + } + return err +} + +func (r *realCmd) Wait() error { + err := r.Cmd.Wait() + if r.logWriter != nil { + r.logWriter.Close() + } + return err +} + +type coreCmdFactory struct { + log *log.Entry + systemCaller systemCaller + executablePath string + dir workingDir + nonce string +} + +func newCoreCmdFactory(r *stellarCoreRunner, dir workingDir) coreCmdFactory { + return coreCmdFactory{ + log: r.log, + systemCaller: r.systemCaller, + executablePath: r.executablePath, + dir: dir, + nonce: fmt.Sprintf( + "captive-stellar-core-%x", + rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(), + ), + } +} + +func (c coreCmdFactory) newCmd(ctx context.Context, mode stellarCoreRunnerMode, redirectOutputToLogs bool, params ...string) (cmdI, error) { + if err := c.dir.createIfNotExists(); err != nil { + return nil, err + } + + if err := 
c.dir.writeConf(mode); err != nil { + return nil, fmt.Errorf("error writing configuration: %w", err) + } + + allParams := []string{"--conf", c.dir.getConfFileName()} + if redirectOutputToLogs { + allParams = append(allParams, "--console") + } + allParams = append(allParams, params...) + cmd := c.systemCaller.command(ctx, c.executablePath, allParams...) + cmd.setDir(c.dir.path) + if redirectOutputToLogs { + cmd.setLogLineWriter(newLogLineWriter(c.log)) + } + return cmd, nil +} diff --git a/ingest/ledgerbackend/stellar_core_runner_posix.go b/ingest/ledgerbackend/cmd_posix.go similarity index 75% rename from ingest/ledgerbackend/stellar_core_runner_posix.go rename to ingest/ledgerbackend/cmd_posix.go index 6f34a49a75..c36dc208ee 100644 --- a/ingest/ledgerbackend/stellar_core_runner_posix.go +++ b/ingest/ledgerbackend/cmd_posix.go @@ -4,26 +4,25 @@ package ledgerbackend import ( + "fmt" "os" - - "github.com/pkg/errors" ) // Posix-specific methods for the StellarCoreRunner type. -func (c *stellarCoreRunner) getPipeName() string { +func (c coreCmdFactory) getPipeName() string { // The exec.Cmd.ExtraFiles field carries *io.File values that are assigned // to child process fds counting from 3, and we'll be passing exactly one // fd: the write end of the anonymous pipe below. return "fd:3" } -func (c *stellarCoreRunner) start(cmd cmdI) (pipe, error) { +func (c coreCmdFactory) startCaptiveCore(cmd cmdI) (pipe, error) { // First make an anonymous pipe. // Note io.File objects close-on-finalization. readFile, writeFile, err := os.Pipe() if err != nil { - return pipe{}, errors.Wrap(err, "error making a pipe") + return pipe{}, fmt.Errorf("error making a pipe: %w", err) } p := pipe{Reader: readFile, File: writeFile} @@ -34,7 +33,7 @@ func (c *stellarCoreRunner) start(cmd cmdI) (pipe, error) { if err != nil { writeFile.Close() readFile.Close() - return pipe{}, errors.Wrap(err, "error starting stellar-core") + return pipe{}, fmt.Errorf("error starting stellar-core: %w", err) } return p, nil diff --git a/ingest/ledgerbackend/stellar_core_runner_windows.go b/ingest/ledgerbackend/cmd_windows.go similarity index 85% rename from ingest/ledgerbackend/stellar_core_runner_windows.go rename to ingest/ledgerbackend/cmd_windows.go index 47368a55b6..6eb6e4d0d2 100644 --- a/ingest/ledgerbackend/stellar_core_runner_windows.go +++ b/ingest/ledgerbackend/cmd_windows.go @@ -11,11 +11,11 @@ import ( // Windows-specific methods for the stellarCoreRunner type. -func (c *stellarCoreRunner) getPipeName() string { +func (c coreCmdFactory) getPipeName() string { return fmt.Sprintf(`\\.\pipe\%s`, c.nonce) } -func (c *stellarCoreRunner) start(cmd cmdI) (pipe, error) { +func (c coreCmdFactory) startCaptiveCore(cmd cmdI) (pipe, error) { // First set up the server pipe. 
listener, err := winio.ListenPipe(c.getPipeName(), nil) if err != nil { diff --git a/ingest/ledgerbackend/core_log.go b/ingest/ledgerbackend/core_log.go new file mode 100644 index 0000000000..438bc136b0 --- /dev/null +++ b/ingest/ledgerbackend/core_log.go @@ -0,0 +1,82 @@ +package ledgerbackend + +import ( + "bufio" + "io" + "regexp" + "strings" + "sync" + + "github.com/stellar/go/support/log" +) + +type logLineWriter struct { + pipeReader *io.PipeReader + pipeWriter *io.PipeWriter + wg sync.WaitGroup + log *log.Entry +} + +func newLogLineWriter(log *log.Entry) *logLineWriter { + rd, wr := io.Pipe() + return &logLineWriter{ + pipeReader: rd, + pipeWriter: wr, + log: log, + } +} + +func (l *logLineWriter) Write(p []byte) (n int, err error) { + return l.pipeWriter.Write(p) +} + +func (l *logLineWriter) Close() error { + err := l.pipeWriter.Close() + l.wg.Wait() + return err +} + +func (l *logLineWriter) Start() { + br := bufio.NewReader(l.pipeReader) + l.wg.Add(1) + go func() { + defer l.wg.Done() + dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) + levelRx := regexp.MustCompile(`\[(\w+) ([A-Z]+)\] (.*)`) + for { + line, err := br.ReadString('\n') + if err != nil { + break + } + line = dateRx.ReplaceAllString(line, "") + line = strings.TrimSpace(line) + + if line == "" { + continue + } + + matches := levelRx.FindStringSubmatch(line) + if len(matches) >= 4 { + // Extract the substrings from the log entry and trim it + category, level := matches[1], matches[2] + line = matches[3] + + levelMapping := map[string]func(string, ...interface{}){ + "FATAL": l.log.Errorf, + "ERROR": l.log.Errorf, + "WARNING": l.log.Warnf, + "INFO": l.log.Infof, + "DEBUG": l.log.Debugf, + } + + writer := l.log.Infof + if f, ok := levelMapping[strings.ToUpper(level)]; ok { + writer = f + } + writer("%s: %s", category, line) + } else { + l.log.Info(line) + } + } + }() +} diff --git a/ingest/ledgerbackend/dir.go b/ingest/ledgerbackend/dir.go new file mode 100644 index 0000000000..d26835936c --- /dev/null +++ b/ingest/ledgerbackend/dir.go @@ -0,0 +1,109 @@ +package ledgerbackend + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/stellar/go/support/log" +) + +type workingDir struct { + ephemeral bool + path string + log *log.Entry + toml *CaptiveCoreToml + systemCaller systemCaller +} + +func newWorkingDir(r *stellarCoreRunner, ephemeral bool) workingDir { + var path string + if ephemeral { + path = filepath.Join(r.storagePath, "captive-core-"+createRandomHexString(8)) + } else { + path = filepath.Join(r.storagePath, "captive-core") + } + return workingDir{ + ephemeral: ephemeral, + path: path, + log: r.log, + toml: r.toml, + systemCaller: r.systemCaller, + } +} + +func (w workingDir) createIfNotExists() error { + info, err := w.systemCaller.stat(w.path) + if os.IsNotExist(err) { + innerErr := w.systemCaller.mkdirAll(w.path, os.FileMode(int(0755))) // rwx|rx|rx + if innerErr != nil { + return fmt.Errorf("failed to create storage directory (%s): %w", w.path, innerErr) + } + } else if !info.IsDir() { + return fmt.Errorf("%s is not a directory", w.path) + } else if err != nil { + return fmt.Errorf("error accessing storage directory (%s): %w", w.path, err) + } + + return nil +} + +func (w workingDir) writeConf(mode stellarCoreRunnerMode) error { + text, err := generateConfig(w.toml, mode) + if err != nil { + return err + } + + w.log.Debugf("captive core config file contents:\n%s", string(text)) + return w.systemCaller.writeFile(w.getConfFileName(), text, 0644) +} + 
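Aside for reviewers, not part of the patch: the workingDir type above is what chooses between a reusable captive-core directory and a per-run ephemeral one. An illustration only, with /var/horizon as an example storagePath:

package main

import (
	"fmt"
	"path/filepath"
)

// Prints the two directory layouts produced by newWorkingDir above, assuming
// storagePath is "/var/horizon". The ephemeral form carries a random hex
// suffix so parallel reingestion workers never share state, and it is removed
// on cleanup.
func main() {
	storagePath := "/var/horizon"
	fmt.Println(filepath.Join(storagePath, "captive-core"))          // persistent, reused across runs
	fmt.Println(filepath.Join(storagePath, "captive-core-1a2b3c4d")) // ephemeral, example random suffix
}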
+func (w workingDir) cleanup(coreExitError error) error { + if w.ephemeral || (coreExitError != nil && !errors.Is(coreExitError, context.Canceled)) { + return w.remove() + } + return nil +} + +func (w workingDir) remove() error { + return w.systemCaller.removeAll(w.path) +} + +func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) { + if mode == stellarCoreRunnerModeOffline { + var err error + captiveCoreToml, err = captiveCoreToml.CatchupToml() + if err != nil { + return nil, fmt.Errorf("could not generate catch up config: %w", err) + } + } + + if !captiveCoreToml.QuorumSetIsConfigured() { + return nil, fmt.Errorf("captive-core config file does not define any quorum set") + } + + text, err := captiveCoreToml.Marshal() + if err != nil { + return nil, fmt.Errorf("could not marshal captive core config: %w", err) + } + return text, nil +} + +func (w workingDir) getConfFileName() string { + joinedPath := filepath.Join(w.path, "stellar-core.conf") + + // Given that `storagePath` can be anything, we need the full, absolute path + // here so that everything Core needs is created under the storagePath + // subdirectory. + // + // If the path *can't* be absolutely resolved (bizarre), we can still try + // recovering by using the path the user specified directly. + path, err := filepath.Abs(joinedPath) + if err != nil { + w.log.Warnf("Failed to resolve %s as an absolute path: %s", joinedPath, err) + return joinedPath + } + return path +} diff --git a/ingest/ledgerbackend/main.go b/ingest/ledgerbackend/main.go deleted file mode 100644 index 6029b8a1f6..0000000000 --- a/ingest/ledgerbackend/main.go +++ /dev/null @@ -1,92 +0,0 @@ -package ledgerbackend - -import ( - "io/fs" - "io/ioutil" - "os" - "os/exec" -) - -type isDir interface { - IsDir() bool -} - -type systemCaller interface { - removeAll(path string) error - writeFile(filename string, data []byte, perm fs.FileMode) error - mkdirAll(path string, perm os.FileMode) error - stat(name string) (isDir, error) - command(name string, arg ...string) cmdI -} - -type realSystemCaller struct{} - -func (realSystemCaller) removeAll(path string) error { - return os.RemoveAll(path) -} - -func (realSystemCaller) writeFile(filename string, data []byte, perm fs.FileMode) error { - return ioutil.WriteFile(filename, data, perm) -} - -func (realSystemCaller) mkdirAll(path string, perm os.FileMode) error { - return os.MkdirAll(path, perm) -} - -func (realSystemCaller) stat(name string) (isDir, error) { - return os.Stat(name) -} - -func (realSystemCaller) command(name string, arg ...string) cmdI { - cmd := exec.Command(name, arg...) 
- return &realCmd{Cmd: cmd} -} - -type cmdI interface { - Output() ([]byte, error) - Wait() error - Start() error - Run() error - setDir(dir string) - setStdout(stdout *logLineWriter) - getStdout() *logLineWriter - setStderr(stderr *logLineWriter) - getStderr() *logLineWriter - getProcess() *os.Process - setExtraFiles([]*os.File) -} - -type realCmd struct { - *exec.Cmd - stdout, stderr *logLineWriter -} - -func (r *realCmd) setDir(dir string) { - r.Cmd.Dir = dir -} - -func (r *realCmd) setStdout(stdout *logLineWriter) { - r.stdout = stdout - r.Cmd.Stdout = stdout -} - -func (r *realCmd) getStdout() *logLineWriter { - return r.stdout -} - -func (r *realCmd) setStderr(stderr *logLineWriter) { - r.stderr = stderr - r.Cmd.Stderr = stderr -} - -func (r *realCmd) getStderr() *logLineWriter { - return r.stderr -} - -func (r *realCmd) getProcess() *os.Process { - return r.Cmd.Process -} - -func (r *realCmd) setExtraFiles(extraFiles []*os.File) { - r.ExtraFiles = extraFiles -} diff --git a/ingest/ledgerbackend/mock_cmd_test.go b/ingest/ledgerbackend/mock_cmd_test.go index bf06a9ae86..8be533e8b1 100644 --- a/ingest/ledgerbackend/mock_cmd_test.go +++ b/ingest/ledgerbackend/mock_cmd_test.go @@ -1,7 +1,6 @@ package ledgerbackend import ( - "io" "os" "github.com/stretchr/testify/mock" @@ -35,27 +34,8 @@ func (m *mockCmd) setDir(dir string) { m.Called(dir) } -func (m *mockCmd) setStdout(stdout *logLineWriter) { - m.Called(stdout) -} - -func (m *mockCmd) getStdout() *logLineWriter { - args := m.Called() - return args.Get(0).(*logLineWriter) -} - -func (m *mockCmd) setStderr(stderr *logLineWriter) { - m.Called(stderr) -} - -func (m *mockCmd) getStderr() *logLineWriter { - args := m.Called() - return args.Get(0).(*logLineWriter) -} - -func (m *mockCmd) getProcess() *os.Process { - args := m.Called() - return args.Get(0).(*os.Process) +func (m *mockCmd) setLogLineWriter(logWriter *logLineWriter) { + m.Called(logWriter) } func (m *mockCmd) setExtraFiles(files []*os.File) { @@ -63,15 +43,9 @@ func (m *mockCmd) setExtraFiles(files []*os.File) { } func simpleCommandMock() *mockCmd { - _, writer := io.Pipe() - llw := logLineWriter{pipeWriter: writer} cmdMock := &mockCmd{} cmdMock.On("setDir", mock.Anything) - cmdMock.On("setStdout", mock.Anything) - cmdMock.On("getStdout").Return(&llw) - cmdMock.On("setStderr", mock.Anything) - cmdMock.On("getStderr").Return(&llw) - cmdMock.On("getProcess").Return(&os.Process{}).Maybe() + cmdMock.On("setLogLineWriter", mock.Anything) cmdMock.On("setExtraFiles", mock.Anything) cmdMock.On("Start").Return(nil) return cmdMock diff --git a/ingest/ledgerbackend/mock_system_caller_test.go b/ingest/ledgerbackend/mock_system_caller_test.go index 99e1faede9..7878e39f34 100644 --- a/ingest/ledgerbackend/mock_system_caller_test.go +++ b/ingest/ledgerbackend/mock_system_caller_test.go @@ -1,6 +1,7 @@ package ledgerbackend import ( + "context" "io/fs" "os" @@ -37,8 +38,8 @@ func (m *mockSystemCaller) stat(name string) (isDir, error) { return args.Get(0).(isDir), args.Error(1) } -func (m *mockSystemCaller) command(name string, arg ...string) cmdI { - a := []interface{}{name} +func (m *mockSystemCaller) command(ctx context.Context, name string, arg ...string) cmdI { + a := []interface{}{ctx, name} for _, ar := range arg { a = append(a, ar) } diff --git a/ingest/ledgerbackend/run_from.go b/ingest/ledgerbackend/run_from.go new file mode 100644 index 0000000000..2d02322519 --- /dev/null +++ b/ingest/ledgerbackend/run_from.go @@ -0,0 +1,140 @@ +package ledgerbackend + +import ( + "context" + 
"encoding/json" + "fmt" + "runtime" + + "github.com/stellar/go/protocols/stellarcore" + "github.com/stellar/go/support/log" +) + +type runFromStream struct { + dir workingDir + from uint32 + hash string + coreCmdFactory coreCmdFactory + log *log.Entry + useDB bool +} + +func newRunFromStream(r *stellarCoreRunner, from uint32, hash string) runFromStream { + // We only use ephemeral directories on windows because there is + // no way to terminate captive core gracefully on windows. + // Having an ephemeral directory ensures that it is wiped out + // whenever we terminate captive core + dir := newWorkingDir(r, runtime.GOOS == "windows") + return runFromStream{ + dir: dir, + from: from, + hash: hash, + coreCmdFactory: newCoreCmdFactory(r, dir), + log: r.log, + useDB: r.useDB, + } +} + +func (s runFromStream) getWorkingDir() workingDir { + return s.dir +} + +func (s runFromStream) offlineInfo(ctx context.Context) (stellarcore.InfoResponse, error) { + cmd, err := s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, false, "offline-info") + if err != nil { + return stellarcore.InfoResponse{}, fmt.Errorf("error creating offline-info cmd: %w", err) + } + output, err := cmd.Output() + if err != nil { + return stellarcore.InfoResponse{}, fmt.Errorf("error executing offline-info cmd: %w", err) + } + var info stellarcore.InfoResponse + err = json.Unmarshal(output, &info) + if err != nil { + return stellarcore.InfoResponse{}, fmt.Errorf("invalid output of offline-info cmd: %w", err) + } + return info, nil +} + +func (s runFromStream) start(ctx context.Context) (cmd cmdI, captiveCorePipe pipe, returnErr error) { + var err error + var createNewDB bool + defer func() { + if returnErr != nil && createNewDB { + // if we could not start captive core remove the new db we created + s.dir.remove() + } + }() + if s.useDB { + // Check if on-disk core DB exists and what's the LCL there. If not what + // we need remove storage dir and start from scratch. + var info stellarcore.InfoResponse + info, err = s.offlineInfo(ctx) + if err != nil { + s.log.Infof("Error running offline-info: %v, removing existing storage-dir contents", err) + createNewDB = true + } else if info.Info.Ledger.Num <= 1 || uint32(info.Info.Ledger.Num) > s.from { + s.log.Infof("Unexpected LCL in Stellar-Core DB: %d (want: %d), removing existing storage-dir contents", info.Info.Ledger.Num, s.from) + createNewDB = true + } + + if createNewDB { + if err = s.dir.remove(); err != nil { + return nil, pipe{}, fmt.Errorf("error removing existing storage-dir contents: %w", err) + } + + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "new-db") + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + + if err = cmd.Run(); err != nil { + return nil, pipe{}, fmt.Errorf("error initializing core db: %w", err) + } + + // Do a quick catch-up to set the LCL in core to be our expected starting + // point. 
+ if s.from > 2 { + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "catchup", fmt.Sprintf("%d/0", s.from-1)) + } else { + cmd, err = s.coreCmdFactory.newCmd(ctx, stellarCoreRunnerModeOnline, true, "catchup", "2/0") + } + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + + if err = cmd.Run(); err != nil { + return nil, pipe{}, fmt.Errorf("error runing stellar-core catchup: %w", err) + } + } + + cmd, err = s.coreCmdFactory.newCmd( + ctx, + stellarCoreRunnerModeOnline, + true, + "run", + "--metadata-output-stream", s.coreCmdFactory.getPipeName(), + ) + } else { + cmd, err = s.coreCmdFactory.newCmd( + ctx, + stellarCoreRunnerModeOnline, + true, + "run", + "--in-memory", + "--start-at-ledger", fmt.Sprintf("%d", s.from), + "--start-at-hash", s.hash, + "--metadata-output-stream", s.coreCmdFactory.getPipeName(), + ) + } + if err != nil { + return nil, pipe{}, fmt.Errorf("error creating command: %w", err) + } + + captiveCorePipe, err = s.coreCmdFactory.startCaptiveCore(cmd) + if err != nil { + return nil, pipe{}, fmt.Errorf("error starting `stellar-core run` subprocess: %w", err) + } + + return cmd, captiveCorePipe, nil +} diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 57e8c1c0f9..5245051dce 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -1,33 +1,21 @@ package ledgerbackend import ( - "bufio" "context" - "encoding/json" "fmt" "io" "math/rand" - "os" - "path" - "path/filepath" - "regexp" - "runtime" - "strings" "sync" - "time" - "github.com/pkg/errors" - - "github.com/stellar/go/protocols/stellarcore" "github.com/stellar/go/support/log" ) type stellarCoreRunnerInterface interface { catchup(from, to uint32) error runFrom(from uint32, hash string) error - getMetaPipe() <-chan metaResult + getMetaPipe() (<-chan metaResult, bool) context() context.Context - getProcessExitError() (bool, error) + getProcessExitError() (error, bool) close() error } @@ -51,29 +39,33 @@ type pipe struct { File io.Closer } +type executionState struct { + cmd cmdI + workingDir workingDir + ledgerBuffer *bufferedLedgerMetaReader + pipe pipe + wg sync.WaitGroup + processExitedLock sync.RWMutex + processExited bool + processExitError error + log *log.Entry +} + type stellarCoreRunner struct { executablePath string - - started bool - cmd cmdI - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc - ledgerBuffer *bufferedLedgerMetaReader - pipe pipe - mode stellarCoreRunnerMode + ctx context.Context + cancel context.CancelFunc systemCaller systemCaller - lock sync.Mutex - closeOnce sync.Once - processExited bool - processExitError error + stateLock sync.Mutex + state *executionState + + closeOnce sync.Once storagePath string toml *CaptiveCoreToml useDB bool - nonce string log *log.Entry } @@ -96,12 +88,8 @@ func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner { cancel: cancel, storagePath: config.StoragePath, useDB: config.UseDB, - nonce: fmt.Sprintf( - "captive-stellar-core-%x", - rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(), - ), - log: config.Log, - toml: config.Toml, + log: config.Log, + toml: config.Toml, systemCaller: realSystemCaller{}, } @@ -109,356 +97,54 @@ func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner { return runner } -func (r *stellarCoreRunner) getFullStoragePath() string { - if runtime.GOOS == "windows" || r.mode == stellarCoreRunnerModeOffline { - // On 
Windows, first we ALWAYS append something to the base storage path, - // because we will delete the directory entirely when Horizon stops. We also - // add a random suffix in order to ensure that there aren't naming - // conflicts. - // This is done because it's impossible to send SIGINT on Windows so - // buckets can become corrupted. - // We also want to use random directories in offline mode (reingestion) - // because it's possible it's running multiple Stellar-Cores on a single - // machine. - return path.Join(r.storagePath, "captive-core-"+createRandomHexString(8)) - } else { - // Use the specified directory to store Captive Core's data: - // https://github.com/stellar/go/issues/3437 - // but be sure to re-use rather than replace it: - // https://github.com/stellar/go/issues/3631 - return path.Join(r.storagePath, "captive-core") - } -} - -func (r *stellarCoreRunner) establishStorageDirectory() error { - info, err := r.systemCaller.stat(r.storagePath) - if os.IsNotExist(err) { - innerErr := r.systemCaller.mkdirAll(r.storagePath, os.FileMode(int(0755))) // rwx|rx|rx - if innerErr != nil { - return errors.Wrap(innerErr, fmt.Sprintf( - "failed to create storage directory (%s)", r.storagePath)) - } - } else if !info.IsDir() { - return errors.New(fmt.Sprintf("%s is not a directory", r.storagePath)) - } else if err != nil { - return errors.Wrap(err, fmt.Sprintf( - "error accessing storage directory (%s)", r.storagePath)) - } - - return nil -} - -func (r *stellarCoreRunner) writeConf() (string, error) { - text, err := generateConfig(r.toml, r.mode) - if err != nil { - return "", err - } - - return string(text), r.systemCaller.writeFile(r.getConfFileName(), text, 0644) -} - -func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) { - if mode == stellarCoreRunnerModeOffline { - var err error - captiveCoreToml, err = captiveCoreToml.CatchupToml() - if err != nil { - return nil, errors.Wrap(err, "could not generate catch up config") - } - } - - if !captiveCoreToml.QuorumSetIsConfigured() { - return nil, errors.New("captive-core config file does not define any quorum set") - } - - text, err := captiveCoreToml.Marshal() - if err != nil { - return nil, errors.Wrap(err, "could not marshal captive core config") - } - return text, nil -} - -func (r *stellarCoreRunner) getConfFileName() string { - joinedPath := filepath.Join(r.storagePath, "stellar-core.conf") - - // Given that `storagePath` can be anything, we need the full, absolute path - // here so that everything Core needs is created under the storagePath - // subdirectory. - // - // If the path *can't* be absolutely resolved (bizarre), we can still try - // recovering by using the path the user specified directly. - path, err := filepath.Abs(joinedPath) - if err != nil { - r.log.Warnf("Failed to resolve %s as an absolute path: %s", joinedPath, err) - return joinedPath - } - return path -} - -type logLineWriter struct { - pipeWriter *io.PipeWriter - wg sync.WaitGroup -} - -func (l *logLineWriter) Write(p []byte) (n int, err error) { - return l.pipeWriter.Write(p) -} - -func (l *logLineWriter) Close() error { - err := l.pipeWriter.Close() - l.wg.Wait() - return err -} - -func (r *stellarCoreRunner) getLogLineWriter() *logLineWriter { - rd, wr := io.Pipe() - br := bufio.NewReader(rd) - result := &logLineWriter{ - pipeWriter: wr, - } - // Strip timestamps from log lines from captive stellar-core. We emit our own. 
- dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) - result.wg.Add(1) - go func() { - defer result.wg.Done() - levelRx := regexp.MustCompile(`\[(\w+) ([A-Z]+)\] (.*)`) - for { - line, err := br.ReadString('\n') - if err != nil { - break - } - line = dateRx.ReplaceAllString(line, "") - line = strings.TrimSpace(line) - - if line == "" { - continue - } - - matches := levelRx.FindStringSubmatch(line) - if len(matches) >= 4 { - // Extract the substrings from the log entry and trim it - category, level := matches[1], matches[2] - line = matches[3] - - levelMapping := map[string]func(string, ...interface{}){ - "FATAL": r.log.Errorf, - "ERROR": r.log.Errorf, - "WARNING": r.log.Warnf, - "INFO": r.log.Infof, - "DEBUG": r.log.Debugf, - } - - writer := r.log.Infof - if f, ok := levelMapping[strings.ToUpper(level)]; ok { - writer = f - } - writer("%s: %s", category, line) - } else { - r.log.Info(line) - } - } - }() - return result -} - -func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) { - allParams := []string{"--conf", r.getConfFileName(), "offline-info"} - cmd := r.systemCaller.command(r.executablePath, allParams...) - cmd.setDir(r.storagePath) - output, err := cmd.Output() - if err != nil { - return stellarcore.InfoResponse{}, errors.Wrap(err, "error executing offline-info cmd") - } - var info stellarcore.InfoResponse - err = json.Unmarshal(output, &info) - if err != nil { - return stellarcore.InfoResponse{}, errors.Wrap(err, "invalid output of offline-info cmd") - } - return info, nil -} - -func (r *stellarCoreRunner) createCmd(params ...string) (cmdI, error) { - err := r.establishStorageDirectory() - if err != nil { - return nil, err - } - - if conf, err := r.writeConf(); err != nil { - return nil, errors.Wrap(err, "error writing configuration") - } else { - r.log.Debugf("captive core config file contents:\n%s", conf) - } - - allParams := append([]string{"--conf", r.getConfFileName(), "--console"}, params...) - cmd := r.systemCaller.command(r.executablePath, allParams...) - cmd.setDir(r.storagePath) - cmd.setStdout(r.getLogLineWriter()) - cmd.setStderr(r.getLogLineWriter()) - return cmd, nil -} - // context returns the context.Context instance associated with the running captive core instance func (r *stellarCoreRunner) context() context.Context { return r.ctx } +// runFrom executes the run command with a starting ledger on the captive core subprocess +func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { + return r.startMetaStream(newRunFromStream(r, from, hash)) +} + // catchup executes the catchup command on the captive core subprocess func (r *stellarCoreRunner) catchup(from, to uint32) error { - r.lock.Lock() - defer r.lock.Unlock() - - // check if we have already been closed - if r.ctx.Err() != nil { - return r.ctx.Err() - } - - if r.started { - return errors.New("runner already started") - } - - r.mode = stellarCoreRunnerModeOffline - r.storagePath = r.getFullStoragePath() - - rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) - params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()} - - // horizon operator has specified to use external storage for captive core ledger state - // instruct captive core invocation to not use memory, and in that case - // cc will look at DATABASE property in cfg toml for the external storage source to use. - // when using external storage of ledgers, use new-db to first set the state of - // remote db storage to genesis to purge any prior state and reset. 
- if r.useDB { - cmd, err := r.createCmd("new-db") - if err != nil { - return errors.Wrap(err, "error creating command") - } - if err := cmd.Run(); err != nil { - return errors.Wrap(err, "error initializing core db") - } - } else { - params = append(params, "--in-memory") - } - - var err error - r.cmd, err = r.createCmd(params...) - if err != nil { - return errors.Wrap(err, "error creating command") - } - - r.pipe, err = r.start(r.cmd) - if err != nil { - r.closeLogLineWriters(r.cmd) - return errors.Wrap(err, "error starting `stellar-core catchup` subprocess") - } - - r.started = true - r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader) - go r.ledgerBuffer.start() - - if binaryWatcher, err := newFileWatcher(r); err != nil { - r.log.Warnf("could not create captive core binary watcher: %v", err) - } else { - go binaryWatcher.loop() - } - - r.wg.Add(1) - go r.handleExit() + return r.startMetaStream(newCatchupStream(r, from, to)) +} - return nil +type metaStream interface { + getWorkingDir() workingDir + start(ctx context.Context) (cmdI, pipe, error) } -// runFrom executes the run command with a starting ledger on the captive core subprocess -func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { - r.lock.Lock() - defer r.lock.Unlock() +func (r *stellarCoreRunner) startMetaStream(stream metaStream) error { + r.stateLock.Lock() + defer r.stateLock.Unlock() // check if we have already been closed if r.ctx.Err() != nil { return r.ctx.Err() } - if r.started { - return errors.New("runner already started") + if r.state != nil { + return fmt.Errorf("runner already started") } - r.mode = stellarCoreRunnerModeOnline - r.storagePath = r.getFullStoragePath() - - var err error - - if r.useDB { - // Check if on-disk core DB exists and what's the LCL there. If not what - // we need remove storage dir and start from scratch. - removeStorageDir := false - var info stellarcore.InfoResponse - info, err = r.offlineInfo() - if err != nil { - r.log.Infof("Error running offline-info: %v, removing existing storage-dir contents", err) - removeStorageDir = true - } else if uint32(info.Info.Ledger.Num) > from { - r.log.Infof("Unexpected LCL in Stellar-Core DB: %d (want: %d), removing existing storage-dir contents", info.Info.Ledger.Num, from) - removeStorageDir = true - } - - if removeStorageDir { - if err = r.systemCaller.removeAll(r.storagePath); err != nil { - return errors.Wrap(err, "error removing existing storage-dir contents") - } - - var cmd cmdI - cmd, err = r.createCmd("new-db") - if err != nil { - return errors.Wrap(err, "error creating command") - } - - if err = cmd.Run(); err != nil { - return errors.Wrap(err, "error initializing core db") - } - - // Do a quick catch-up to set the LCL in core to be our expected starting - // point. 
- if from > 2 { - cmd, err = r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)) - } else { - cmd, err = r.createCmd("catchup", "2/0") - } - - if err != nil { - return errors.Wrap(err, "error creating command") - } - - if err = cmd.Run(); err != nil { - return errors.Wrap(err, "error runing stellar-core catchup") - } - } - - r.cmd, err = r.createCmd( - "run", - "--metadata-output-stream", - r.getPipeName(), - ) - } else { - r.cmd, err = r.createCmd( - "run", - "--in-memory", - "--start-at-ledger", fmt.Sprintf("%d", from), - "--start-at-hash", hash, - "--metadata-output-stream", r.getPipeName(), - ) + state := &executionState{ + workingDir: stream.getWorkingDir(), + log: r.log, } + cmd, p, err := stream.start(r.ctx) if err != nil { - return errors.Wrap(err, "error creating command") + state.workingDir.cleanup(nil) + return err } - r.pipe, err = r.start(r.cmd) - if err != nil { - r.closeLogLineWriters(r.cmd) - return errors.Wrap(err, "error starting `stellar-core run` subprocess") - } - - r.started = true - r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader) - go r.ledgerBuffer.start() + state.cmd = cmd + state.pipe = p + state.ledgerBuffer = newBufferedLedgerMetaReader(state.pipe.Reader) + go state.ledgerBuffer.start() if binaryWatcher, err := newFileWatcher(r); err != nil { r.log.Warnf("could not create captive core binary watcher: %v", err) @@ -466,101 +152,78 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { go binaryWatcher.loop() } - r.wg.Add(1) - go r.handleExit() + state.wg.Add(1) + go state.handleExit() + r.state = state return nil } -func (r *stellarCoreRunner) handleExit() { - defer r.wg.Done() - - // Pattern recommended in: - // https://github.com/golang/go/blob/cacac8bdc5c93e7bc71df71981fdf32dded017bf/src/cmd/go/script_test.go#L1091-L1098 - interrupt := os.Interrupt - if runtime.GOOS == "windows" { - // Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on - // Windows; using it with os.Process.Signal will return an error.” - // Fall back to Kill instead. - interrupt = os.Kill - } +func (r *stellarCoreRunner) getExecutionState() *executionState { + r.stateLock.Lock() + defer r.stateLock.Unlock() + return r.state +} - errc := make(chan error) - go func() { - select { - case errc <- nil: - return - case <-r.ctx.Done(): - } +func (state *executionState) handleExit() { + defer state.wg.Done() - err := r.cmd.getProcess().Signal(interrupt) - if err == nil { - err = r.ctx.Err() // Report ctx.Err() as the reason we interrupted. - } else if err.Error() == "os: process already finished" { - errc <- nil - return - } + waitErr := state.cmd.Wait() - timer := time.NewTimer(10 * time.Second) - select { - // Report ctx.Err() as the reason we interrupted the process... - case errc <- r.ctx.Err(): - timer.Stop() - return - // ...but after killDelay has elapsed, fall back to a stronger signal. - case <-timer.C: - } + // By closing the pipe file we will send an EOF to the pipe reader used by ledgerBuffer. + if err := state.pipe.File.Close(); err != nil { + state.log.WithError(err).Warn("could not close captive core write pipe") + } - // Wait still hasn't returned. - // Kill the process harder to make sure that it exits. - // - // Ignore any error: if cmd.Process has already terminated, we still - // want to send ctx.Err() (or the error from the Interrupt call) - // to properly attribute the signal that may have terminated it. 
- _ = r.cmd.getProcess().Kill() + state.processExitedLock.Lock() + defer state.processExitedLock.Unlock() + state.processExited = true + state.processExitError = waitErr +} - errc <- err - }() +func (state *executionState) getProcessExitError() (error, bool) { + state.processExitedLock.RLock() + defer state.processExitedLock.RUnlock() + return state.processExitError, state.processExited +} - waitErr := r.cmd.Wait() - r.closeLogLineWriters(r.cmd) +func (state *executionState) cleanup() error { + // wait for the stellar core process to terminate + state.wg.Wait() - r.lock.Lock() - defer r.lock.Unlock() + // drain meta pipe channel to make sure the ledger buffer goroutine exits + for range state.ledgerBuffer.getChannel() { - // By closing the pipe file we will send an EOF to the pipe reader used by ledgerBuffer. - // We need to do this operation with the lock to ensure that the processExitError is available - // when the ledgerBuffer channel is closed - if closeErr := r.pipe.File.Close(); closeErr != nil { - r.log.WithError(closeErr).Warn("could not close captive core write pipe") } - r.processExited = true - if interruptErr := <-errc; interruptErr != nil { - r.processExitError = interruptErr - } else { - r.processExitError = waitErr + // now it's safe to close the pipe reader + // because the ledger buffer is no longer reading from it + if err := state.pipe.Reader.Close(); err != nil { + state.log.WithError(err).Warn("could not close captive core read pipe") } -} -// closeLogLineWriters closes the go routines created by getLogLineWriter() -func (r *stellarCoreRunner) closeLogLineWriters(cmd cmdI) { - cmd.getStdout().Close() - cmd.getStderr().Close() + processExitError, _ := state.getProcessExitError() + return state.workingDir.cleanup(processExitError) } // getMetaPipe returns a channel which contains ledgers streamed from the captive core subprocess -func (r *stellarCoreRunner) getMetaPipe() <-chan metaResult { - return r.ledgerBuffer.getChannel() +func (r *stellarCoreRunner) getMetaPipe() (<-chan metaResult, bool) { + state := r.getExecutionState() + if state == nil { + return nil, false + } + return state.ledgerBuffer.getChannel(), true } // getProcessExitError returns an exit error (can be nil) of the process and a bool indicating // if the process has exited yet // getProcessExitError is thread safe -func (r *stellarCoreRunner) getProcessExitError() (bool, error) { - r.lock.Lock() - defer r.lock.Unlock() - return r.processExited, r.processExitError +func (r *stellarCoreRunner) getProcessExitError() (error, bool) { + state := r.getExecutionState() + if state == nil { + return nil, false + } + return state.getProcessExitError() } // close kills the captive core process if it is still running and performs @@ -569,43 +232,11 @@ func (r *stellarCoreRunner) getProcessExitError() (bool, error) { func (r *stellarCoreRunner) close() error { var closeError error r.closeOnce.Do(func() { - r.lock.Lock() - // we cancel the context while holding the lock in order to guarantee that - // this captive core instance cannot start once the lock is released. - // catchup() and runFrom() can only execute while holding the lock and if - // the context is canceled both catchup() and runFrom() will abort early - // without performing any side effects (e.g. state mutations). 
r.cancel() - r.lock.Unlock() - - // only reap captive core sub process and related go routines if we've started - // otherwise, just cleanup the temp dir - if r.started { - // wait for the stellar core process to terminate - r.wg.Wait() - - // drain meta pipe channel to make sure the ledger buffer goroutine exits - for range r.getMetaPipe() { - - } - - // now it's safe to close the pipe reader - // because the ledger buffer is no longer reading from it - r.pipe.Reader.Close() - } - - if r.mode != 0 && (runtime.GOOS == "windows" || - (r.processExitError != nil && r.processExitError != context.Canceled) || - r.mode == stellarCoreRunnerModeOffline) { - // It's impossible to send SIGINT on Windows so buckets can become - // corrupted. If we can't reuse it, then remove it. - // We also remove the storage path if there was an error terminating the - // process (files can be corrupted). - // We remove all files when reingesting to save disk space. - closeError = r.systemCaller.removeAll(r.storagePath) - return + state := r.getExecutionState() + if state != nil { + closeError = state.cleanup() } }) - return closeError } diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go index 00cb29137b..f53cd88328 100644 --- a/ingest/ledgerbackend/stellar_core_runner_test.go +++ b/ingest/ledgerbackend/stellar_core_runner_test.go @@ -37,6 +37,7 @@ func TestCloseOffline(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -78,6 +79,7 @@ func TestCloseOnline(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -121,6 +123,7 @@ func TestCloseOnlineWithError(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -141,7 +144,7 @@ func TestCloseOnlineWithError(t *testing.T) { // Wait with calling close until r.processExitError is set to Wait() error for { - _, err := runner.getProcessExitError() + err, _ := runner.getProcessExitError() if err != nil { break } @@ -175,6 +178,7 @@ func TestCloseConcurrency(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -196,7 +200,7 @@ func TestCloseConcurrency(t *testing.T) { go func() { defer wg.Done() assert.NoError(t, runner.close()) - exited, err := runner.getProcessExitError() + err, exited := runner.getProcessExitError() assert.True(t, exited) assert.Error(t, err) }() @@ -238,12 +242,14 @@ func TestRunFromUseDBLedgersMatch(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, "offline-info", ).Return(offlineInfoCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -299,12 +305,14 @@ func TestRunFromUseDBLedgersBehind(t *testing.T) { 
scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, "offline-info", ).Return(offlineInfoCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -360,12 +368,14 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, "offline-info", ).Return(offlineInfoCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -373,6 +383,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { "new-db", ).Return(newDBCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, @@ -381,6 +392,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) { "99/0", ).Return(catchupCmdMock) scMock.On("command", + runner.ctx, "/usr/bin/stellar-core", "--conf", mock.Anything, diff --git a/services/friendbot/main.go b/services/friendbot/main.go index 22a04b0bed..22e7d4c44d 100644 --- a/services/friendbot/main.go +++ b/services/friendbot/main.go @@ -8,6 +8,7 @@ import ( "github.com/go-chi/chi" "github.com/spf13/cobra" + "github.com/stellar/go/services/friendbot/internal" "github.com/stellar/go/support/app" "github.com/stellar/go/support/config" @@ -29,6 +30,7 @@ type Config struct { BaseFee int64 `toml:"base_fee" valid:"optional"` MinionBatchSize int `toml:"minion_batch_size" valid:"optional"` SubmitTxRetriesAllowed int `toml:"submit_tx_retries_allowed" valid:"optional"` + UseCloudflareIP bool `toml:"use_cloudflare_ip" valid:"optional"` } func main() { @@ -68,7 +70,7 @@ func run(cmd *cobra.Command, args []string) { log.Error(err) os.Exit(1) } - router := initRouter(fb) + router := initRouter(cfg, fb) registerProblems() addr := fmt.Sprintf("0.0.0.0:%d", cfg.Port) @@ -84,8 +86,8 @@ func run(cmd *cobra.Command, args []string) { }) } -func initRouter(fb *internal.Bot) *chi.Mux { - mux := http.NewAPIMux(log.DefaultLogger) +func initRouter(cfg Config, fb *internal.Bot) *chi.Mux { + mux := newMux(cfg) handler := &internal.FriendbotHandler{Friendbot: fb} mux.Get("/", handler.Handle) @@ -97,6 +99,15 @@ func initRouter(fb *internal.Bot) *chi.Mux { return mux } +func newMux(cfg Config) *chi.Mux { + mux := chi.NewRouter() + // first apply XFFMiddleware so we can have the real ip in the subsequent + // middlewares + mux.Use(http.XFFMiddleware(http.XFFMiddlewareConfig{BehindCloudflare: cfg.UseCloudflareIP})) + mux.Use(http.NewAPIMux(log.DefaultLogger).Middlewares()...) 
+ return mux +} + func registerProblems() { problem.RegisterError(sql.ErrNoRows, problem.NotFound) diff --git a/services/friendbot/router_test.go b/services/friendbot/router_test.go new file mode 100644 index 0000000000..292a3253ca --- /dev/null +++ b/services/friendbot/router_test.go @@ -0,0 +1,33 @@ +package main + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/stellar/go/support/log" +) + +func TestIPLogging(t *testing.T) { + done := log.DefaultLogger.StartTest(log.InfoLevel) + + mux := newMux(Config{UseCloudflareIP: true}) + mux.Get("/", func(w http.ResponseWriter, request *http.Request) { + w.WriteHeader(http.StatusOK) + }) + recorder := httptest.NewRecorder() + request := httptest.NewRequest("GET", "/", nil) + ipAddress := "255.128.255.128" + request.Header.Set("CF-Connecting-IP", ipAddress) + mux.ServeHTTP(recorder, request) + require.Equal(t, http.StatusOK, recorder.Code) + + logged := done() + require.Len(t, logged, 2) + require.Equal(t, "starting request", logged[0].Message) + require.Equal(t, ipAddress, logged[0].Data["ip"]) + require.Equal(t, "finished request", logged[1].Message) + require.Equal(t, ipAddress, logged[1].Data["ip"]) +} diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 6f127b9968..501ad51847 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -3,17 +3,31 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). -## Unreleased +## 2.31.0 ### Breaking Changes - Change ingestion filtering logic to store transactions if any filter matches on it. ([5303](https://github.com/stellar/go/pull/5303)) - - The previous behaviour was to store a tx only if both asset and account filters match together. So even if a tx matched an account filter but failed to match an asset filter, it would not be stored by Horizon. - + - The previous behaviour was to store a tx only if both asset and account filters match together. So even if a tx matched an account filter but failed to match an asset filter, it would not be stored by Horizon. - Captive-core configuration parameters updated to align with [stellar-core v21](https://github.com/stellar/stellar-core/issues/3811) ([5333](https://github.com/stellar/go/pull/5333)) - - BucketlistDB is now the default database for stellar-core, deprecating `EXPERIMENTAL_BUCKETLIST_DB`. - - A new mandatory parameter `DEPRECATED_SQL_LEDGER_STATE` (default: false) is required by stellar-core on its configuration toml file. if the toml provided by `CAPTIVE_CORE_CONFIG_PATH` does not have this new setting, Horizon will add it automatically, therefore, no action required. - - If using `EXPERIMENTAL_BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT` or `EXPERIMENTAL_BUCKETLIST_DB_INDEX_CUTOFF`, they have been renamed to `BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT` and `BUCKETLIST_DB_INDEX_CUTOFF` respectively. + - BucketlistDB is now the default database for stellar-core, deprecating the usage of `EXPERIMENTAL_BUCKETLIST_DB` in captive core configuration toml. + - A new mandatory parameter `DEPRECATED_SQL_LEDGER_STATE` (default: false) is required by stellar-core on its captive core configuration toml file. If the toml provided by `CAPTIVE_CORE_CONFIG_PATH` does not have this new setting, Horizon will add it automatically, therefore no action is required.
+ - If using `EXPERIMENTAL_BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT` or `EXPERIMENTAL_BUCKETLIST_DB_INDEX_CUTOFF` in captive core configuration toml, they must be renamed to `BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT` and `BUCKETLIST_DB_INDEX_CUTOFF` respectively. + +### Added + +- Bump XDR definitions ([5289](https://github.com/stellar/go/pull/5289)), ([5330](https://github.com/stellar/go/pull/5330)) +- Add new async transaction submission endpoint ([5188](https://github.com/stellar/go/pull/5188)) +- Add `horizon_ingest_errors_total` metric key ([5302](https://github.com/stellar/go/pull/5302)) +- Add transaction hash to txsub timeout response ([5328](https://github.com/stellar/go/pull/5328)) +- Add new captive-core flags for V1 Meta ([5309](https://github.com/stellar/go/pull/5309)) +- Add version check for protocol 21 ([5346](https://github.com/stellar/go/pull/5346)) +- Improve horizon history reaper ([5331](https://github.com/stellar/go/pull/5331)). New reaper configuration flag `REAP_FREQUENCY`: the frequency, in units of ledgers, at which history is reaped. + +### Fixed + +- Fix the following ingestion error: `error preparing range: error starting prepare range: the previous Stellar-Core instance is still running` ([5307](https://github.com/stellar/go/pull/5307)) + ## 2.30.0 **This release adds support for Protocol 21** diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 3414b1aaea..b0e1ecb9df 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -85,6 +85,8 @@ type Config struct { // If ReapFrequency is set to 2 history is reaped after ingesting every two ledgers. // etc... ReapFrequency uint + // ReapLookupTables enables the reaping of history lookup tables + ReapLookupTables bool // StaleThreshold represents the number of ledgers a history database may be // out-of-date by before horizon begins to respond with an error to history // requests.
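The captive core toml changes described in the changelog hunk above are operator-facing, so a concrete fragment may help. The following is a minimal, illustrative sketch of the affected keys in a captive core configuration toml (the file referenced by `CAPTIVE_CORE_CONFIG_PATH`); the numeric values are placeholders for illustration only, not recommended settings.

```toml
# Illustrative captive core configuration fragment (example values only).

# New mandatory stellar-core v21 setting; per the changelog, Horizon appends it
# automatically if it is missing from the toml supplied via CAPTIVE_CORE_CONFIG_PATH.
DEPRECATED_SQL_LEDGER_STATE = false

# Renamed from EXPERIMENTAL_BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT.
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14

# Renamed from EXPERIMENTAL_BUCKETLIST_DB_INDEX_CUTOFF.
BUCKETLIST_DB_INDEX_CUTOFF = 20
```

Only the keys named in the breaking-changes list are shown here; the sketch does not attempt to cover any other captive core settings.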
diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 7321d07fb2..9930fee69f 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -687,6 +687,14 @@ func Flags() (*Config, support.ConfigOptions) { return nil }, }, + { + Name: "reap-lookup-tables", + ConfigKey: &config.ReapLookupTables, + OptType: types.Bool, + FlagDefault: true, + Usage: "enables the reaping of history lookup tables.", + UsedInCommands: IngestionCommands, + }, &support.ConfigOption{ Name: "history-stale-threshold", ConfigKey: &config.StaleThreshold, diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 283cd3347a..c165faf371 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -537,9 +537,6 @@ func (r resumeState) run(s *system) (transition, error) { return retryResume(r), err } - duration = time.Since(startTime).Seconds() - s.Metrics().LedgerIngestionDuration.Observe(float64(duration)) - // Update stats metrics changeStatsMap := stats.changeStats.Map() r.addLedgerStatsMetricFromMap(s, "change", changeStatsMap) @@ -560,6 +557,13 @@ func (r resumeState) run(s *system) (transition, error) { // roll up and be reported here as part of resumeState transition addHistoryArchiveStatsMetrics(s, s.historyAdapter.GetStats()) + s.maybeVerifyState(ingestLedger, ledgerCloseMeta.BucketListHash()) + s.maybeReapHistory(ingestLedger) + s.maybeReapLookupTables(ingestLedger) + + duration = time.Since(startTime).Seconds() + s.Metrics().LedgerIngestionDuration.Observe(float64(duration)) + localLog := log.WithFields(logpkg.F{ "sequence": ingestLedger, "duration": duration, @@ -577,10 +581,6 @@ func (r resumeState) run(s *system) (transition, error) { localLog.Info("Processed ledger") - s.maybeVerifyState(ingestLedger, ledgerCloseMeta.BucketListHash()) - s.maybeReapHistory(ingestLedger) - s.maybeReapLookupTables(ingestLedger) - return resumeImmediately(ingestLedger), nil } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 5bff414ac3..650a08b426 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -95,7 +95,7 @@ type Config struct { HistoryArchiveCaching bool DisableStateVerification bool - EnableReapLookupTables bool + ReapLookupTables bool EnableExtendedLogLedgerStats bool MaxReingestRetries int @@ -757,7 +757,7 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32, expectedBucketListH } func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { - if !s.config.EnableReapLookupTables { + if !s.config.ReapLookupTables { return } diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index 5167eb195a..feb5e13bb0 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -402,9 +402,9 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { } func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { - s.system.config.EnableReapLookupTables = true + s.system.config.ReapLookupTables = true defer func() { - s.system.config.EnableReapLookupTables = false + s.system.config.ReapLookupTables = false }() s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 
137ff26aff..c245970117 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -99,7 +99,7 @@ func initIngester(app *App) { DisableStateVerification: app.config.IngestDisableStateVerification, StateVerificationCheckpointFrequency: uint32(app.config.IngestStateVerificationCheckpointFrequency), StateVerificationTimeout: app.config.IngestStateVerificationTimeout, - EnableReapLookupTables: app.config.HistoryRetentionCount > 0, + ReapLookupTables: app.config.ReapLookupTables && app.config.HistoryRetentionCount > 0, EnableExtendedLogLedgerStats: app.config.IngestEnableExtendedLogLedgerStats, RoundingSlippageFilter: app.config.RoundingSlippageFilter, SkipTxmeta: app.config.SkipTxmeta, diff --git a/support/datastore/history_archive.go b/support/datastore/history_archive.go deleted file mode 100644 index 9fd291bac7..0000000000 --- a/support/datastore/history_archive.go +++ /dev/null @@ -1,53 +0,0 @@ -package datastore - -import ( - "context" - - log "github.com/sirupsen/logrus" - - "github.com/stellar/go/historyarchive" - "github.com/stellar/go/network" - "github.com/stellar/go/support/errors" - supportlog "github.com/stellar/go/support/log" - "github.com/stellar/go/support/storage" -) - -const ( - Pubnet = "pubnet" - Testnet = "testnet" -) - -func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string, userAgent string, logger *supportlog.Entry) (historyarchive.ArchiveInterface, error) { - var historyArchiveUrls []string - switch networkName { - case Pubnet: - historyArchiveUrls = network.PublicNetworkhistoryArchiveURLs - case Testnet: - historyArchiveUrls = network.TestNetworkhistoryArchiveURLs - default: - return nil, errors.Errorf("Invalid network name %s", networkName) - } - - return historyarchive.NewArchivePool(historyArchiveUrls, historyarchive.ArchiveOptions{ - Logger: logger, - ConnectOptions: storage.ConnectOptions{ - UserAgent: userAgent, - Context: ctx, - }, - }) -} - -func GetLatestLedgerSequenceFromHistoryArchives(archive historyarchive.ArchiveInterface) (uint32, error) { - has, err := archive.GetRootHAS() - if err != nil { - log.Error("Error getting root HAS from archives", err) - return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from any history archive") - } - - return has.CurrentLedger, nil -} - -func GetHistoryArchivesCheckPointFrequency() uint32 { - // this could evolve to use other sources for checkpoint freq - return historyarchive.DefaultCheckpointFrequency -} diff --git a/support/datastore/resumablemanager_test.go b/support/datastore/resumablemanager_test.go index 4616f9e4ae..4fc8738b08 100644 --- a/support/datastore/resumablemanager_test.go +++ b/support/datastore/resumablemanager_test.go @@ -282,8 +282,12 @@ func TestResumability(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockArchive := &historyarchive.MockArchive{} - mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: tt.latestLedger}, tt.archiveError).Once() - + mockArchive.On("GetLatestLedgerSequence").Return(tt.latestLedger, tt.archiveError).Once() + if tt.archiveError == nil { + mockArchive.On("GetCheckpointManager"). 
+ Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)).Once() + } mockDataStore := &MockDataStore{} tt.registerMockCalls(mockDataStore) diff --git a/support/datastore/resumeablemanager.go b/support/datastore/resumeablemanager.go index 35031d73f6..7e6b03df99 100644 --- a/support/datastore/resumeablemanager.go +++ b/support/datastore/resumeablemanager.go @@ -62,12 +62,12 @@ func (rm resumableManagerService) FindStart(ctx context.Context, start, end uint networkLatest := uint32(0) if end < 1 { var latestErr error - networkLatest, latestErr = GetLatestLedgerSequenceFromHistoryArchives(rm.archive) + networkLatest, latestErr = rm.archive.GetLatestLedgerSequence() if latestErr != nil { err := errors.Wrap(latestErr, "Resumability of requested export ledger range, was not able to get latest ledger from network") return 0, false, err } - networkLatest = networkLatest + (GetHistoryArchivesCheckPointFrequency() * 2) + networkLatest = networkLatest + (rm.archive.GetCheckpointManager().GetCheckpointFrequency() * 2) log.Infof("Resumability computed effective latest network ledger including padding of checkpoint frequency to be %d", networkLatest) if start > networkLatest { diff --git a/support/http/logging_middleware.go b/support/http/logging_middleware.go index 2cc957ac68..540dbd1243 100644 --- a/support/http/logging_middleware.go +++ b/support/http/logging_middleware.go @@ -8,6 +8,7 @@ import ( "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" + "github.com/stellar/go/support/http/mutil" "github.com/stellar/go/support/log" ) @@ -136,6 +137,7 @@ func logEndOfRequest( "subsys": "http", "path": r.URL.String(), "method": r.Method, + "ip": r.RemoteAddr, "status": mw.Status(), "bytes": mw.BytesWritten(), "duration": duration, diff --git a/support/http/logging_middleware_test.go b/support/http/logging_middleware_test.go index 0e2eb45bb2..3ba4d651db 100644 --- a/support/http/logging_middleware_test.go +++ b/support/http/logging_middleware_test.go @@ -6,9 +6,10 @@ import ( "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" + "github.com/stretchr/testify/assert" + "github.com/stellar/go/support/http/httptest" "github.com/stellar/go/support/log" - "github.com/stretchr/testify/assert" ) // setXFFMiddleware sets "X-Forwarded-For" header to test LoggingMiddlewareWithOptions. 
@@ -143,7 +144,7 @@ func TestHTTPMiddlewareWithOptions(t *testing.T) { assert.Equal(t, req1, logged[2].Data["req"]) assert.Equal(t, "/path/1234", logged[2].Data["path"]) assert.Equal(t, "/path/{value}", logged[2].Data["route"]) - assert.Equal(t, 9, len(logged[2].Data)) + assert.Equal(t, 10, len(logged[2].Data)) assert.Equal(t, "starting request", logged[3].Message) assert.Equal(t, "http", logged[3].Data["subsys"]) @@ -162,7 +163,7 @@ func TestHTTPMiddlewareWithOptions(t *testing.T) { assert.Equal(t, req2, logged[4].Data["req"]) assert.Equal(t, "/not_found", logged[4].Data["path"]) assert.Equal(t, "/not_found", logged[4].Data["route"]) - assert.Equal(t, 9, len(logged[4].Data)) + assert.Equal(t, 10, len(logged[4].Data)) assert.Equal(t, "starting request", logged[5].Message) assert.Equal(t, "http", logged[5].Data["subsys"]) @@ -181,7 +182,7 @@ func TestHTTPMiddlewareWithOptions(t *testing.T) { assert.Equal(t, req3, logged[6].Data["req"]) assert.Equal(t, "/really_not_found", logged[6].Data["path"]) assert.Equal(t, "", logged[6].Data["route"]) - assert.Equal(t, 9, len(logged[6].Data)) + assert.Equal(t, 10, len(logged[6].Data)) } } diff --git a/support/http/mux.go b/support/http/mux.go index ca041d3797..1d64f99812 100644 --- a/support/http/mux.go +++ b/support/http/mux.go @@ -4,6 +4,7 @@ import ( "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/rs/cors" + "github.com/stellar/go/support/log" )