Skip to content

Commit

Permalink
test: integration test (#36)
Browse files Browse the repository at this point in the history
* feat: indexer file location cache

* fix: add check on received node list

* fix: skip-tx

* feat: indexer download cli

* fix: import

* fix: indexer

* fix: existence check

* fix: cli tags

* fix: comment

* test: integration test

* test: test

* fix: cmd

* test: test

* test: skip tx

* chore: github workflow

* fix: build_kv

* test: refactor

* test: go test

* test: fix default port range

* feat: try ports

* test: go tests

* fix: indexer_test
  • Loading branch information
MiniFrenchBread authored Jul 26, 2024
1 parent 918ece1 commit ec3a1db
Show file tree
Hide file tree
Showing 48 changed files with 8,903 additions and 73 deletions.
3 changes: 3 additions & 0 deletions .github/actions-rs/grcov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
branch: false
ignore-not-existing: true
llvm: true
12 changes: 12 additions & 0 deletions .github/actions/setup-rust/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: Setup Rust (cache & toolchain)
runs:
using: composite
steps:
- name: Install toolchain 1.75.0
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.75.0
components: rustfmt, clippy

- uses: Swatinem/rust-cache@v2
69 changes: 69 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
name: functional-test

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

env:
CARGO_TERM_COLOR: always

jobs:
test:

runs-on: ubuntu-latest

steps:
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: true

- name: Checkout sources
uses: actions/checkout@v3
with:
submodules: recursive

- name: Setup Rust (cache & toolchain)
uses: ./.github/actions/setup-rust

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.21'

- name: Build
run: go build

- name: Set up Python 3.9
uses: actions/setup-python@v4
with:
python-version: '3.9'
cache: 'pip'

- name: Install dependencies
run: |
python -m pip install --upgrade pip
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Run tests
run: |
cd tests
uname -a
python test_all.py
- name: Save logs for failures
if: failure()
uses: actions/upload-artifact@v4
with:
name: test_logs
path: /tmp/zg_client_test_*
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@

0g-storage-client
.ip-location-cache.json

# tests
/target
tests/**/__pycache__
tests/tmp/**
3 changes: 3 additions & 0 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func init() {
indexerCmd.Flags().DurationVar(&indexerArgs.nodes.DiscoveryInterval, "discover-interval", 10*time.Minute, "Interval to discover peers in network")
indexerCmd.Flags().DurationVar(&indexerArgs.nodes.UpdateInterval, "update-interval", 10*time.Minute, "Interval to update shard config of discovered peers")

indexerCmd.Flags().IntSliceVar(&indexerArgs.nodes.DiscoveryPorts, "discover-ports", []int{5678}, "Ports to try for discovered nodes")

indexerCmd.Flags().StringVar(&indexerArgs.locations.CacheFile, "ip-location-cache-file", ".ip-location-cache.json", "File name to cache IP locations")
indexerCmd.Flags().DurationVar(&indexerArgs.locations.CacheWriteInterval, "ip-location-cache-interval", 10*time.Minute, "Interval to write ip locations to cache file")
indexerCmd.Flags().StringVar(&indexerArgs.locations.AccessToken, "ip-location-token", "", "Access token to retrieve IP location from ipinfo.io")
Expand All @@ -46,6 +48,7 @@ func init() {

func startIndexer(*cobra.Command, []string) {
indexerArgs.locationCache.DiscoveryNode = indexerArgs.nodes.DiscoveryNode
indexerArgs.locationCache.DiscoveryPorts = indexerArgs.nodes.DiscoveryPorts

indexer.InitDefaultIPLocationManager(indexerArgs.locations)

Expand Down
21 changes: 21 additions & 0 deletions common/util/schedule.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package util

import (
"context"
"fmt"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -26,3 +28,22 @@ func ScheduleNow(action func() error, interval time.Duration, errorMessage strin

Schedule(action, interval, errorMessage)
}

// WaitUntil runs the given function within a time duration
func WaitUntil(fn func() error, timeout time.Duration) error {
ch := make(chan error, 1)

ctxTimeout, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

go func() {
ch <- fn()
}()

select {
case <-ctxTimeout.Done():
return fmt.Errorf("wait until task timeout")
case err := <-ch:
return err
}
}
100 changes: 53 additions & 47 deletions indexer/file_location_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ const defaultDiscoveredURLRetryInterval = time.Minute * 10
const defaultSuccessCallLifetime = time.Minute * 10

type FileLocationCacheConfig struct {
CacheSize int
Expiry time.Duration
DiscoveryNode string
CacheSize int
Expiry time.Duration
DiscoveryNode string
DiscoveryPorts []int
}

type successCall struct {
Expand All @@ -34,6 +35,7 @@ type FileLocationCache struct {
latestFailedCall sync.Map // url -> time.Time
latestSuccessCall sync.Map // url -> successCall
discoverNode *node.AdminClient
discoveryPorts []int
}

var defaultFileLocationCache FileLocationCache
Expand All @@ -45,6 +47,7 @@ func InitFileLocationCache(config FileLocationCacheConfig) (closable func(), err
}
}
defaultFileLocationCache.cache = expirable.NewLRU[uint64, []*shard.ShardedNode](config.CacheSize, nil, config.Expiry)
defaultFileLocationCache.discoveryPorts = config.DiscoveryPorts
return defaultFileLocationCache.close, nil
}

Expand Down Expand Up @@ -93,55 +96,58 @@ func (c *FileLocationCache) GetFileLocations(ctx context.Context, txSeq uint64)
}
logrus.Debugf("find file #%v from location cache, got %v nodes holding the file", txSeq, len(locations))
for _, location := range locations {
url := fmt.Sprintf("http://%v:5678", location.Ip)
if _, ok := selected[url]; ok {
continue
}
if val, ok := c.latestSuccessCall.Load(url); ok {
call := val.(successCall)
if time.Since(call.ts) < defaultSuccessCallLifetime {
nodes = append(nodes, call.node)
for _, port := range c.discoveryPorts {
url := fmt.Sprintf("http://%v:%v", location.Ip, port)
if _, ok := selected[url]; ok {
break
}
if val, ok := c.latestSuccessCall.Load(url); ok {
call := val.(successCall)
if time.Since(call.ts) < defaultSuccessCallLifetime {
nodes = append(nodes, call.node)
break
}
}
if val, ok := c.latestFailedCall.Load(url); ok {
if time.Since(val.(time.Time)) < defaultDiscoveredURLRetryInterval {
continue
}
}
zgsClient, err := node.NewZgsClient(url, defaultZgsClientOpt)
if err != nil {
continue
}
}
if val, ok := c.latestFailedCall.Load(url); ok {
if time.Since(val.(time.Time)) < defaultDiscoveredURLRetryInterval {
defer zgsClient.Close()
fileInfo, err := zgsClient.GetFileInfoByTxSeq(ctx, txSeq)
if err != nil {
c.latestFailedCall.Store(url, time.Now())
continue
}
if !fileInfo.Finalized {
continue
}
start := time.Now()
config, err := zgsClient.GetShardConfig(context.Background())
if err != nil {
c.latestFailedCall.Store(url, time.Now())
continue
}
if !config.IsValid() {
continue
}
call := successCall{
node: &shard.ShardedNode{
URL: url,
Config: config,
Latency: time.Since(start).Milliseconds(),
},
ts: time.Now(),
}
nodes = append(nodes, call.node)
c.latestSuccessCall.Store(url, call)
selected[url] = struct{}{}
break
}
zgsClient, err := node.NewZgsClient(url, defaultZgsClientOpt)
if err != nil {
continue
}
defer zgsClient.Close()
fileInfo, err := zgsClient.GetFileInfoByTxSeq(ctx, txSeq)
if err != nil {
c.latestFailedCall.Store(url, time.Now())
continue
}
if !fileInfo.Finalized {
continue
}
start := time.Now()
config, err := zgsClient.GetShardConfig(context.Background())
if err != nil {
c.latestFailedCall.Store(url, time.Now())
continue
}
if !config.IsValid() {
continue
}
call := successCall{
node: &shard.ShardedNode{
URL: url,
Config: config,
Latency: time.Since(start).Milliseconds(),
},
ts: time.Now(),
}
nodes = append(nodes, call.node)
c.latestSuccessCall.Store(url, call)
selected[url] = struct{}{}
}
if _, covered := shard.Select(nodes, 1); covered {
return nodes, nil
Expand Down
58 changes: 32 additions & 26 deletions indexer/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type NodeManagerConfig struct {

DiscoveryNode string
DiscoveryInterval time.Duration
DiscoveryPorts []int

UpdateInterval time.Duration
}
Expand All @@ -36,8 +37,9 @@ type NodeManagerConfig struct {
type NodeManager struct {
trusted sync.Map // url -> *node.ZgsClient

discoverNode *node.AdminClient
discovered sync.Map // url -> *shard.ShardedNode
discoverNode *node.AdminClient
discoveryPorts []int
discovered sync.Map // url -> *shard.ShardedNode
}

// InitDefaultNodeManager initializes the default `NodeManager`.
Expand All @@ -47,6 +49,7 @@ func InitDefaultNodeManager(config NodeManagerConfig) (closable func(), err erro
return nil, errors.WithMessage(err, "Failed to create admin client to discover peers")
}
}
defaultNodeManager.discoveryPorts = config.DiscoveryPorts

if err = defaultNodeManager.AddTrustedNodes(config.TrustedNodes...); err != nil {
return nil, errors.WithMessage(err, "Failed to add trusted nodes")
Expand Down Expand Up @@ -182,31 +185,34 @@ func (nm *NodeManager) discover() error {
logrus.WithField("seenIPs", v.SeenIps).Warn("More than one seen IPs")
}

url := fmt.Sprintf("http://%v:5678", v.SeenIps[0])

// ignore trusted node
if _, ok := nm.trusted.Load(url); ok {
continue
}

// discovered already
if _, ok := nm.discovered.Load(url); ok {
continue
for _, port := range nm.discoveryPorts {
url := fmt.Sprintf("http://%v:%v", v.SeenIps[0], port)

// ignore trusted node
if _, ok := nm.trusted.Load(url); ok {
continue
}

// discovered already
if _, ok := nm.discovered.Load(url); ok {
continue
}

// add new storage node
node, err := nm.updateNode(url)
if err != nil {
logrus.WithError(err).WithField("url", url).Debug("Failed to add new peer")
} else {
logrus.WithFields(logrus.Fields{
"url": url,
"shard": node.Config,
"latency": node.Latency,
}).Debug("New peer discovered")
}

numNew++
break
}

// add new storage node
node, err := nm.updateNode(url)
if err != nil {
logrus.WithError(err).WithField("url", url).Debug("Failed to add new peer")
} else {
logrus.WithFields(logrus.Fields{
"url": url,
"shard": node.Config,
"latency": node.Latency,
}).Debug("New peer discovered")
}

numNew++
}

if numNew > 0 {
Expand Down
10 changes: 10 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
jsonrpcclient==4.0.3
pyyaml==6.0.1
pysha3==1.0.2
coincurve==18.0.0
eth-utils==3.0.0
py-ecc==7.0.0
web3==6.14.0
eth_tester
cffi==1.16.0
rtoml==0.10.0
Loading

0 comments on commit ec3a1db

Please sign in to comment.