From fc25cd9ac9e10bb5091fabaa03d3025584a391ff Mon Sep 17 00:00:00 2001 From: Bo QIU <35757521+boqiu@users.noreply.github.com> Date: Tue, 10 Sep 2024 11:04:31 +0800 Subject: [PATCH] Opt indexer perf with parallel rpc execution (#54) * Refactor parallel serial option * Add util to call rpc in parallel * go mod tidy * opt indexer to query shard configs in parallel --- common/parallel/rpc.go | 92 +++++++++++++++++++++++++ common/parallel/serial.go | 55 ++++++++++----- common/parallel/serial_test.go | 2 +- core/dataflow.go | 3 +- core/flow.go | 3 +- go.mod | 4 +- indexer/node_manager.go | 120 +++++++++++++++++++-------------- transfer/download_parallel.go | 3 +- transfer/uploader.go | 5 +- 9 files changed, 210 insertions(+), 77 deletions(-) create mode 100644 common/parallel/rpc.go diff --git a/common/parallel/rpc.go b/common/parallel/rpc.go new file mode 100644 index 0000000..8a9f602 --- /dev/null +++ b/common/parallel/rpc.go @@ -0,0 +1,92 @@ +package parallel + +import ( + "context" + "time" + + "github.com/0glabs/0g-storage-client/node" + providers "github.com/openweb3/go-rpc-provider/provider_wrapper" + "github.com/sirupsen/logrus" +) + +type RpcOption struct { + Parallel SerialOption + Provider providers.Option + ReportInterval time.Duration +} + +type RpcResult[T any] struct { + Data T + Err error + Latency time.Duration +} + +type closable interface { + Close() +} + +// rpcExecutor is used for RPC execution in parallel. +type rpcExecutor[CLIENT closable, T any] struct { + option RpcOption + nodes []string + clientFactory func(string) (CLIENT, error) + rpcFunc func(CLIENT, context.Context) (T, error) + node2Results map[string]*RpcResult[T] + lastReportTime time.Time +} + +// QueryZgsRpc calls zgs RPC with given nodes in parallel. +func QueryZgsRpc[T any](ctx context.Context, nodes []string, rpcFunc func(*node.ZgsClient, context.Context) (T, error), option ...RpcOption) map[string]*RpcResult[T] { + var opt RpcOption + if len(option) > 0 { + opt = option[0] + } + + executor := rpcExecutor[*node.ZgsClient, T]{ + option: opt, + nodes: nodes, + clientFactory: func(url string) (*node.ZgsClient, error) { + return node.NewZgsClient(url, opt.Provider) + }, + rpcFunc: rpcFunc, + node2Results: make(map[string]*RpcResult[T]), + lastReportTime: time.Now(), + } + + // should not return err + Serial(ctx, &executor, len(nodes), opt.Parallel) + + return executor.node2Results +} + +func (executor *rpcExecutor[CLIENT, T]) ParallelDo(ctx context.Context, routine, task int) (interface{}, error) { + url := executor.nodes[task] + client, err := executor.clientFactory(url) + if err != nil { + return &RpcResult[T]{Err: err}, nil + } + defer client.Close() + + var result RpcResult[T] + start := time.Now() + result.Data, result.Err = executor.rpcFunc(client, ctx) + result.Latency = time.Since(start) + + return &result, nil +} + +func (executor *rpcExecutor[CLIENT, T]) ParallelCollect(result *Result) error { + node := executor.nodes[result.Task] + executor.node2Results[node] = result.Value.(*RpcResult[T]) + + if executor.option.ReportInterval > 0 && time.Since(executor.lastReportTime) > executor.option.ReportInterval { + logrus.WithFields(logrus.Fields{ + "total": len(executor.nodes), + "completed": result.Task, + }).Info("Progress update") + + executor.lastReportTime = time.Now() + } + + return nil +} diff --git a/common/parallel/serial.go b/common/parallel/serial.go index 8ff1f64..dab0044 100644 --- a/common/parallel/serial.go +++ b/common/parallel/serial.go @@ -2,27 +2,22 @@ package parallel import ( "context" + "runtime" "sync" ) -func Serial(ctx context.Context, parallelizable Interface, tasks, routines, window int) error { - if tasks == 0 { +func Serial(ctx context.Context, parallelizable Interface, tasks int, option ...SerialOption) error { + if tasks <= 0 { return nil } - if routines == 0 { - routines = 1 - } - - if routines > tasks { - routines = tasks - } - - channelLen := max(routines, window) - if window > 0 && window < routines { - window = routines + var opt SerialOption + if len(option) > 0 { + opt = option[0] } + opt.Normalize(tasks) + channelLen := max(opt.Routines, opt.Window) taskCh := make(chan int, channelLen) defer close(taskCh) resultCh := make(chan *Result, channelLen) @@ -32,12 +27,12 @@ func Serial(ctx context.Context, parallelizable Interface, tasks, routines, wind ctx, cancel := context.WithCancel(ctx) // start routines to do tasks - for i := 0; i < routines; i++ { + for i := 0; i < opt.Routines; i++ { wg.Add(1) go work(ctx, i, parallelizable, taskCh, resultCh, &wg) } - err := collect(parallelizable, taskCh, resultCh, tasks, channelLen, window > 0) + err := collect(parallelizable, taskCh, resultCh, tasks, channelLen, opt.Window > 0) // notify all routines to terminate cancel() @@ -112,3 +107,33 @@ func collect(parallelizable Interface, taskCh chan<- int, resultCh <-chan *Resul return nil } + +type SerialOption struct { + Routines int + Window int +} + +func (opt *SerialOption) Normalize(tasks int) { + // 0 < routines <= tasks + if opt.Routines == 0 { + opt.Routines = runtime.GOMAXPROCS(0) + } + + if opt.Routines > tasks { + opt.Routines = tasks + } + + // window disabled + if opt.Window == 0 { + return + } + + // routines <= window <= tasks + if opt.Window < opt.Routines { + opt.Window = opt.Routines + } + + if opt.Window > tasks { + opt.Window = tasks + } +} diff --git a/common/parallel/serial_test.go b/common/parallel/serial_test.go index 7365a96..96637fe 100644 --- a/common/parallel/serial_test.go +++ b/common/parallel/serial_test.go @@ -31,7 +31,7 @@ func TestSerial(t *testing.T) { tasks := 100 - err := Serial(context.Background(), &f, tasks, 4, 16) + err := Serial(context.Background(), &f, tasks, SerialOption{4, 16}) assert.Nil(t, err) assert.Equal(t, tasks, len(f.result)) diff --git a/core/dataflow.go b/core/dataflow.go index 81ce2ef..e1ce33c 100644 --- a/core/dataflow.go +++ b/core/dataflow.go @@ -3,7 +3,6 @@ package core import ( "context" "errors" - "runtime" "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/core/merkle" @@ -47,7 +46,7 @@ func MerkleTree(data IterableData) (*merkle.Tree, error) { builder: &builder, } - err := parallel.Serial(context.Background(), initializer, NumSegmentsPadded(data), runtime.GOMAXPROCS(0), 0) + err := parallel.Serial(context.Background(), initializer, NumSegmentsPadded(data)) if err != nil { return nil, err } diff --git a/core/flow.go b/core/flow.go index b81a77d..4e9f614 100644 --- a/core/flow.go +++ b/core/flow.go @@ -4,7 +4,6 @@ import ( "context" "math" "math/big" - "runtime" "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/common/parallel" @@ -115,7 +114,7 @@ func (flow *Flow) createSegmentNode(offset, batch, size int64) (*contract.Submis builder: &builder, } - err := parallel.Serial(context.Background(), initializer, int((size-1)/batch+1), runtime.GOMAXPROCS(0), 0) + err := parallel.Serial(context.Background(), initializer, int((size-1)/batch+1)) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index 106534f..1a1d2d5 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,14 @@ require ( github.com/go-playground/validator/v10 v10.22.0 github.com/google/btree v1.1.2 github.com/hashicorp/golang-lru/v2 v2.0.7 + github.com/mcuadros/go-defaults v1.2.0 github.com/openweb3/go-rpc-provider v0.3.3 github.com/openweb3/web3go v0.2.9 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 gotest.tools v2.2.0+incompatible ) @@ -85,7 +87,6 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect - github.com/mcuadros/go-defaults v1.2.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/mitchellh/pointerstructure v1.2.0 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect @@ -121,7 +122,6 @@ require ( github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.25.0 // indirect - golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect diff --git a/indexer/node_manager.go b/indexer/node_manager.go index 72b1117..3e70613 100644 --- a/indexer/node_manager.go +++ b/indexer/node_manager.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/common/shard" "github.com/0glabs/0g-storage-client/common/util" "github.com/0glabs/0g-storage-client/node" @@ -20,6 +21,13 @@ var ( RequestTimeout: 3 * time.Second, } + defaultRpcOpt = parallel.RpcOption{ + Parallel: parallel.SerialOption{ + Routines: 500, + }, + Provider: defaultZgsClientOpt, + } + defaultNodeManager = NodeManager{} ) @@ -176,6 +184,7 @@ func (nm *NodeManager) discover() error { }).Debug("Succeeded to retrieve peers from storage node") var numNew int + var newPeers []string for _, v := range peers { // public ip address required @@ -200,65 +209,39 @@ func (nm *NodeManager) discover() error { continue } - // add new storage node - node, err := nm.updateNode(url) - if err != nil { - logrus.WithError(err).WithField("url", url).Debug("Failed to add new peer") - } else { - logrus.WithFields(logrus.Fields{ - "url": url, - "shard": node.Config, - "latency": node.Latency, - }).Debug("New peer discovered") - } - - numNew++ + newPeers = append(newPeers, url) break } } - if numNew > 0 { - logrus.WithField("count", numNew).Info("New peers discovered") - } - - return nil -} - -// updateNode updates the shard config of specified storage node by `url`. -func (nm *NodeManager) updateNode(url string) (*shard.ShardedNode, error) { - // query ip location at first - ip := parseIP(url) - if _, err := defaultIPLocationManager.Query(ip); err != nil { - logrus.WithError(err).WithField("ip", ip).Warn("Failed to query IP location") - } - - zgsClient, err := node.NewZgsClient(url, defaultZgsClientOpt) - if err != nil { - return nil, errors.WithMessage(err, "Failed to create zgs client") - } - defer zgsClient.Close() + result := queryShardConfigs(newPeers) + for url, rpcResult := range result { + if rpcResult.Err != nil { + logrus.WithError(rpcResult.Err).WithField("url", url).Debug("Failed to add new peer") + continue + } - start := time.Now() + nm.discovered.Store(url, &shard.ShardedNode{ + URL: url, + Config: rpcResult.Data, + Latency: rpcResult.Latency.Milliseconds(), + Since: time.Now().Unix(), + }) - config, err := zgsClient.GetShardConfig(context.Background()) - if err != nil { - return nil, errors.WithMessage(err, "Failed to retrieve shard config from storage node") - } + numNew++ - if !config.IsValid() { - return nil, errors.Errorf("Invalid shard config retrieved %v", config) + logrus.WithFields(logrus.Fields{ + "url": url, + "shard": rpcResult.Data, + "latency": rpcResult.Latency.Milliseconds(), + }).Debug("New peer discovered") } - node := &shard.ShardedNode{ - URL: url, - Config: config, - Latency: time.Since(start).Milliseconds(), - Since: time.Now().Unix(), + if numNew > 0 { + logrus.WithField("count", numNew).Info("New peers discovered") } - nm.discovered.Store(url, node) - - return node, nil + return nil } // update updates shard configs of all storage nodes. @@ -277,10 +260,18 @@ func (nm *NodeManager) update() error { start := time.Now() - for _, v := range urls { - if _, err := nm.updateNode(v); err != nil { - logrus.WithError(err).WithField("url", v).Debug("Failed to update shard config, remove from cache") - nm.discovered.Delete(v) + result := queryShardConfigs(urls) + for url, rpcResult := range result { + if rpcResult.Err == nil { + nm.discovered.Store(url, &shard.ShardedNode{ + URL: url, + Config: rpcResult.Data, + Latency: rpcResult.Latency.Milliseconds(), + Since: time.Now().Unix(), + }) + } else { + logrus.WithError(rpcResult.Err).WithField("url", url).Debug("Failed to update shard config, remove from cache") + nm.discovered.Delete(url) } } @@ -291,3 +282,28 @@ func (nm *NodeManager) update() error { return nil } + +func queryShardConfigs(nodes []string) map[string]*parallel.RpcResult[shard.ShardConfig] { + // update IP if absent + for _, v := range nodes { + ip := parseIP(v) + if _, err := defaultIPLocationManager.Query(v); err != nil { + logrus.WithError(err).WithField("ip", ip).Warn("Failed to query IP location") + } + } + + rpcFunc := func(client *node.ZgsClient, ctx context.Context) (shard.ShardConfig, error) { + config, err := client.GetShardConfig(ctx) + if err != nil { + return shard.ShardConfig{}, err + } + + if !config.IsValid() { + return shard.ShardConfig{}, errors.Errorf("Invalid shard config retrieved %v", config) + } + + return config, nil + } + + return parallel.QueryZgsRpc(context.Background(), nodes, rpcFunc, defaultRpcOpt) +} diff --git a/transfer/download_parallel.go b/transfer/download_parallel.go index 14adee0..6aa1de1 100644 --- a/transfer/download_parallel.go +++ b/transfer/download_parallel.go @@ -3,7 +3,6 @@ package transfer import ( "context" "fmt" - "runtime" "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/common/shard" @@ -58,7 +57,7 @@ func newSegmentDownloader(clients []*node.ZgsClient, shardConfigs []*shard.Shard func (downloader *segmentDownloader) Download(ctx context.Context) error { numTasks := downloader.numSegments - downloader.segmentOffset - return parallel.Serial(ctx, downloader, int(numTasks), runtime.GOMAXPROCS(0), 0) + return parallel.Serial(ctx, downloader, int(numTasks)) } // ParallelDo implements the parallel.Interface interface. diff --git a/transfer/uploader.go b/transfer/uploader.go index 577a963..b70e97a 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -598,7 +598,10 @@ func (uploader *Uploader) uploadFile(ctx context.Context, data core.IterableData return err } - err = parallel.Serial(ctx, segmentUploader, len(segmentUploader.tasks), min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0) + opt := parallel.SerialOption{ + Routines: min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), + } + err = parallel.Serial(ctx, segmentUploader, len(segmentUploader.tasks), opt) if err != nil { return err }