Skip to content

Commit

Permalink
Opt indexer perf with parallel rpc execution (#54)
Browse files Browse the repository at this point in the history
* Refactor parallel serial option

* Add util to call rpc in parallel

* go mod tidy

* opt indexer to query shard configs in parallel
  • Loading branch information
boqiu authored Sep 10, 2024
1 parent a50330b commit fc25cd9
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 77 deletions.
92 changes: 92 additions & 0 deletions common/parallel/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package parallel

import (
"context"
"time"

"github.com/0glabs/0g-storage-client/node"
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
"github.com/sirupsen/logrus"
)

// RpcOption configures a parallel RPC query such as QueryZgsRpc.
type RpcOption struct {
// Parallel is forwarded to Serial and controls how the per-node calls are scheduled.
Parallel SerialOption
// Provider is handed to the client constructor for every queried node.
Provider providers.Option
// ReportInterval is the minimum time between two progress log entries.
// A value <= 0 disables progress logging.
ReportInterval time.Duration
}

// RpcResult is the outcome of a single RPC call against one node.
type RpcResult[T any] struct {
// Data is the value returned by the RPC function.
Data T
// Err is the error from creating the client or from the RPC function, if any.
Err error
// Latency is the time spent inside the RPC function; it stays zero when the
// client could not be created.
Latency time.Duration
}

// closable is the constraint for RPC clients used by rpcExecutor: the executor
// closes each client once its call has finished.
type closable interface {
Close()
}

// rpcExecutor is used for RPC execution in parallel.
//
// It provides the ParallelDo/ParallelCollect methods that Serial drives: each
// task index maps to one entry of nodes.
type rpcExecutor[CLIENT closable, T any] struct {
// option holds the caller-supplied settings (only ReportInterval is read by the executor itself).
option RpcOption
// nodes lists the node URLs; the task index passed to ParallelDo indexes into it.
nodes []string
// clientFactory creates a client for the given node URL.
clientFactory func(string) (CLIENT, error)
// rpcFunc performs the actual RPC call on a freshly created client.
rpcFunc func(CLIENT, context.Context) (T, error)
// node2Results collects one result per node URL, written by ParallelCollect.
node2Results map[string]*RpcResult[T]
// lastReportTime is the time of the most recent progress log entry.
lastReportTime time.Time
}

// QueryZgsRpc runs rpcFunc against every node in nodes and returns the
// per-node outcome keyed by node URL.
//
// A dedicated zgs client is created for each node using the Provider option.
// Only the first element of option is honoured; when none is given the zero
// value of RpcOption is used. Failures are never returned directly: they are
// recorded in the Err field of the corresponding RpcResult.
func QueryZgsRpc[T any](ctx context.Context, nodes []string, rpcFunc func(*node.ZgsClient, context.Context) (T, error), option ...RpcOption) map[string]*RpcResult[T] {
	var opt RpcOption
	if len(option) > 0 {
		opt = option[0]
	}

	// Every task builds its own client from the shared provider settings.
	newClient := func(url string) (*node.ZgsClient, error) {
		return node.NewZgsClient(url, opt.Provider)
	}

	executor := &rpcExecutor[*node.ZgsClient, T]{
		option:         opt,
		nodes:          nodes,
		clientFactory:  newClient,
		rpcFunc:        rpcFunc,
		node2Results:   make(map[string]*RpcResult[T]),
		lastReportTime: time.Now(),
	}

	// The executor reports failures through RpcResult.Err and never returns an
	// error itself, so the error from Serial is intentionally discarded.
	_ = Serial(ctx, executor, len(nodes), opt.Parallel)

	return executor.node2Results
}

// ParallelDo executes the RPC for the node at index task and returns the
// outcome as a *RpcResult[T].
//
// The returned error is always nil: both client-creation and RPC failures are
// stored in RpcResult.Err so that one failing node does not abort the others.
// The routine argument is unused.
func (executor *rpcExecutor[CLIENT, T]) ParallelDo(ctx context.Context, routine, task int) (interface{}, error) {
	rpcClient, dialErr := executor.clientFactory(executor.nodes[task])
	if dialErr != nil {
		return &RpcResult[T]{Err: dialErr}, nil
	}
	defer rpcClient.Close()

	// Latency covers the RPC call only, not the client creation above.
	begin := time.Now()
	data, callErr := executor.rpcFunc(rpcClient, ctx)
	elapsed := time.Since(begin)

	return &RpcResult[T]{
		Data:    data,
		Err:     callErr,
		Latency: elapsed,
	}, nil
}

// ParallelCollect stores the finished result under its node URL and, when
// progress reporting is enabled, logs a progress entry at most once per
// ReportInterval. It always returns nil.
func (executor *rpcExecutor[CLIENT, T]) ParallelCollect(result *Result) error {
	url := executor.nodes[result.Task]
	executor.node2Results[url] = result.Value.(*RpcResult[T])

	// Reporting is off (interval <= 0) or the last report is still recent.
	interval := executor.option.ReportInterval
	if interval <= 0 || time.Since(executor.lastReportTime) <= interval {
		return nil
	}

	// NOTE(review): "completed" carries the index of the task being collected,
	// not a count of finished tasks — confirm this is the intended figure.
	progress := logrus.Fields{
		"total":     len(executor.nodes),
		"completed": result.Task,
	}
	logrus.WithFields(progress).Info("Progress update")

	executor.lastReportTime = time.Now()

	return nil
}
55 changes: 40 additions & 15 deletions common/parallel/serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,22 @@ package parallel

import (
"context"
"runtime"
"sync"
)

func Serial(ctx context.Context, parallelizable Interface, tasks, routines, window int) error {
if tasks == 0 {
func Serial(ctx context.Context, parallelizable Interface, tasks int, option ...SerialOption) error {
if tasks <= 0 {
return nil
}

if routines == 0 {
routines = 1
}

if routines > tasks {
routines = tasks
}

channelLen := max(routines, window)
if window > 0 && window < routines {
window = routines
var opt SerialOption
if len(option) > 0 {
opt = option[0]
}
opt.Normalize(tasks)

channelLen := max(opt.Routines, opt.Window)
taskCh := make(chan int, channelLen)
defer close(taskCh)
resultCh := make(chan *Result, channelLen)
Expand All @@ -32,12 +27,12 @@ func Serial(ctx context.Context, parallelizable Interface, tasks, routines, wind
ctx, cancel := context.WithCancel(ctx)

// start routines to do tasks
for i := 0; i < routines; i++ {
for i := 0; i < opt.Routines; i++ {
wg.Add(1)
go work(ctx, i, parallelizable, taskCh, resultCh, &wg)
}

err := collect(parallelizable, taskCh, resultCh, tasks, channelLen, window > 0)
err := collect(parallelizable, taskCh, resultCh, tasks, channelLen, opt.Window > 0)

// notify all routines to terminate
cancel()
Expand Down Expand Up @@ -112,3 +107,33 @@ func collect(parallelizable Interface, taskCh chan<- int, resultCh <-chan *Resul

return nil
}

// SerialOption configures how Serial schedules its tasks.
type SerialOption struct {
	// Routines is the number of worker goroutines. A value <= 0 selects
	// runtime.GOMAXPROCS(0) during normalization.
	Routines int
	// Window, when positive, enables the window in Serial and takes part in
	// sizing its channels. A value <= 0 disables the window.
	Window int
}

// Normalize clamps the option in place for a run of the given number of tasks.
//
// After the call, 0 < Routines <= tasks holds, and Window is either 0
// (disabled) or satisfies Routines <= Window <= tasks. Non-positive values are
// treated as "unset": a negative Routines falls back to the default instead of
// leaving Serial without workers, and a negative Window is reset to 0 instead
// of silently enabling the window.
//
// tasks is expected to be positive; Serial returns early before normalizing
// when it is not.
func (opt *SerialOption) Normalize(tasks int) {
	// 0 < routines <= tasks
	if opt.Routines <= 0 {
		opt.Routines = runtime.GOMAXPROCS(0)
	}

	if opt.Routines > tasks {
		opt.Routines = tasks
	}

	// window disabled
	if opt.Window <= 0 {
		opt.Window = 0
		return
	}

	// routines <= window <= tasks
	if opt.Window < opt.Routines {
		opt.Window = opt.Routines
	}

	if opt.Window > tasks {
		opt.Window = tasks
	}
}
2 changes: 1 addition & 1 deletion common/parallel/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestSerial(t *testing.T) {

tasks := 100

err := Serial(context.Background(), &f, tasks, 4, 16)
err := Serial(context.Background(), &f, tasks, SerialOption{4, 16})
assert.Nil(t, err)
assert.Equal(t, tasks, len(f.result))

Expand Down
3 changes: 1 addition & 2 deletions core/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package core
import (
"context"
"errors"
"runtime"

"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/core/merkle"
Expand Down Expand Up @@ -47,7 +46,7 @@ func MerkleTree(data IterableData) (*merkle.Tree, error) {
builder: &builder,
}

err := parallel.Serial(context.Background(), initializer, NumSegmentsPadded(data), runtime.GOMAXPROCS(0), 0)
err := parallel.Serial(context.Background(), initializer, NumSegmentsPadded(data))
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions core/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"math"
"math/big"
"runtime"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/parallel"
Expand Down Expand Up @@ -115,7 +114,7 @@ func (flow *Flow) createSegmentNode(offset, batch, size int64) (*contract.Submis
builder: &builder,
}

err := parallel.Serial(context.Background(), initializer, int((size-1)/batch+1), runtime.GOMAXPROCS(0), 0)
err := parallel.Serial(context.Background(), initializer, int((size-1)/batch+1))
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ require (
github.com/go-playground/validator/v10 v10.22.0
github.com/google/btree v1.1.2
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/mcuadros/go-defaults v1.2.0
github.com/openweb3/go-rpc-provider v0.3.3
github.com/openweb3/web3go v0.2.9
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
gotest.tools v2.2.0+incompatible
)

Expand Down Expand Up @@ -85,7 +87,6 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mcuadros/go-defaults v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/mitchellh/pointerstructure v1.2.0 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
Expand Down Expand Up @@ -121,7 +122,6 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
Expand Down
Loading

0 comments on commit fc25cd9

Please sign in to comment.