Skip to content

Commit

Permalink
Refactor rpc client
Browse files Browse the repository at this point in the history
  • Loading branch information
boqiu committed Sep 19, 2024
1 parent af5b003 commit b361cbb
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 141 deletions.
4 changes: 2 additions & 2 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cmd
import (
"time"

"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/common/rpc"
"github.com/0glabs/0g-storage-client/indexer"
"github.com/0glabs/0g-storage-client/indexer/gateway"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,7 +79,7 @@ func startIndexer(*cobra.Command, []string) {
Endpoint: indexerArgs.endpoint,
Nodes: indexerArgs.nodes.TrustedNodes,
MaxDownloadFileSize: indexerArgs.maxDownloadFileSize,
RPCHandler: util.MustNewRPCHandler(map[string]interface{}{
RPCHandler: rpc.MustNewHandler(map[string]interface{}{
api.Namespace: api,
}),
})
Expand Down
49 changes: 49 additions & 0 deletions common/rpc/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package rpc

import (
"context"

"github.com/openweb3/go-rpc-provider"
"github.com/openweb3/go-rpc-provider/interfaces"
)

type Request struct {
Method string
Args []any
}

type Response[T any] struct {
Data T
Error error
}

// BatchCall is a generic method to call RPC in batch.
func BatchCall[T any](provider interfaces.Provider, requests ...Request) ([]Response[T], error) {
return BatchCallContext[T](provider, context.Background(), requests...)
}

// BatchCallContext is a generic method to call RPC with context in batch.
func BatchCallContext[T any](provider interfaces.Provider, ctx context.Context, requests ...Request) ([]Response[T], error) {
batch := make([]rpc.BatchElem, 0, len(requests))
responses := make([]Response[T], len(requests))

for i, v := range requests {
batch = append(batch, rpc.BatchElem{
Method: v.Method,
Args: v.Args,
Result: &responses[i].Data,
})
}

if err := provider.BatchCallContext(ctx, batch); err != nil {
return nil, err
}

for i, v := range batch {
if v.Error != nil {
responses[i].Error = v.Error
}
}

return responses, nil
}
45 changes: 45 additions & 0 deletions common/rpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package rpc

import (
"context"

"github.com/openweb3/go-rpc-provider/interfaces"
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
)

// Client is a base class of any RPC client.
type Client struct {
*providers.MiddlewarableProvider
url string
}

// NewClient creates a new client instance.
func NewClient(url string, option ...providers.Option) (*Client, error) {
var opt providers.Option
if len(option) > 0 {
opt = option[0]
}

provider, err := providers.NewProviderWithOption(url, opt)
if err != nil {
return nil, err
}

return &Client{providers.NewMiddlewarableProvider(provider), url}, nil
}

// URL Get the RPC server URL the client connected to.
func (c *Client) URL() string {
return c.url
}

// Call is a generic method to call RPC.
func Call[T any](provider interfaces.Provider, method string, args ...any) (result T, err error) {
return CallContext[T](provider, context.Background(), method, args...)
}

// CallContext is a generic method to call RPC with context.
func CallContext[T any](provider interfaces.Provider, ctx context.Context, method string, args ...any) (result T, err error) {
err = provider.CallContext(ctx, &result, method, args...)
return
}
20 changes: 10 additions & 10 deletions common/util/rpc.go → common/rpc/server.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package util
package rpc

import (
"net/http"
"net/rpc"

"github.com/ethereum/go-ethereum/node"
"github.com/openweb3/go-rpc-provider"
"github.com/sirupsen/logrus"
)

// MustNewRPCHandler creates a http.Handler for the specified RPC apis.
func MustNewRPCHandler(apis map[string]interface{}) http.Handler {
// MustNewHandler creates a http.Handler for the specified RPC apis.
func MustNewHandler(apis map[string]interface{}) http.Handler {
handler := rpc.NewServer()

for namespace, impl := range apis {
Expand All @@ -22,8 +22,8 @@ func MustNewRPCHandler(apis map[string]interface{}) http.Handler {
return node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}, []byte{})
}

// MustServe starts a HTTP service util shutdown.
func MustServe(endpoint string, handler http.Handler) {
// Start starts a HTTP service util shutdown.
func Start(endpoint string, handler http.Handler) {
server := http.Server{
Addr: endpoint,
Handler: handler,
Expand All @@ -32,8 +32,8 @@ func MustServe(endpoint string, handler http.Handler) {
server.ListenAndServe()
}

// MustServeRPC starts RPC service until shutdown.
func MustServeRPC(endpoint string, apis map[string]interface{}) {
rpcHandler := MustNewRPCHandler(apis)
MustServe(endpoint, rpcHandler)
// MustServe starts RPC service until shutdown.
func MustServe(endpoint string, apis map[string]interface{}) {
rpcHandler := MustNewHandler(apis)
Start(endpoint, rpcHandler)
}
31 changes: 14 additions & 17 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"time"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/rpc"
"github.com/0glabs/0g-storage-client/common/shard"
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/openweb3/go-rpc-provider/interfaces"
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
"github.com/openweb3/web3go"
"github.com/pkg/errors"
Expand All @@ -28,7 +28,7 @@ var (

// Client indexer client
type Client struct {
interfaces.Provider
*rpc.Client
option IndexerClientOption
logger *logrus.Logger
}
Expand All @@ -46,34 +46,31 @@ func NewClient(url string, option ...IndexerClientOption) (*Client, error) {
opt = option[0]
}

provider, err := providers.NewProviderWithOption(url, opt.ProviderOption)
client, err := rpc.NewClient(url, opt.ProviderOption)
if err != nil {
return nil, err
}

return &Client{
Provider: provider,
option: opt,
logger: common.NewLogger(opt.LogOption),
Client: client,
option: opt,
logger: common.NewLogger(opt.LogOption),
}, nil
}

// GetNodes get node list from indexer service
func (c *Client) GetShardedNodes(ctx context.Context) (nodes ShardedNodes, err error) {
err = c.Provider.CallContext(ctx, &nodes, "indexer_getShardedNodes")
return
// GetShardedNodes get node list from indexer service
func (c *Client) GetShardedNodes(ctx context.Context) (ShardedNodes, error) {
return rpc.CallContext[ShardedNodes](c, ctx, "indexer_getShardedNodes")
}

// GetNodes return storage nodes with IP location information.
func (c *Client) GetNodeLocations(ctx context.Context) (locations map[string]*IPLocation, err error) {
err = c.Provider.CallContext(ctx, &locations, "indexer_getNodeLocations")
return
// GetNodeLocations return storage nodes with IP location information.
func (c *Client) GetNodeLocations(ctx context.Context) (map[string]*IPLocation, error) {
return rpc.CallContext[map[string]*IPLocation](c, ctx, "indexer_getNodeLocations")
}

// GetFileLocations return locations info of given file.
func (c *Client) GetFileLocations(ctx context.Context, root string) (locations []*shard.ShardedNode, err error) {
err = c.Provider.CallContext(ctx, &locations, "indexer_getFileLocations", root)
return
func (c *Client) GetFileLocations(ctx context.Context, root string) ([]*shard.ShardedNode, error) {
return rpc.CallContext[[]*shard.ShardedNode](c, ctx, "indexer_getFileLocations", root)
}

// SelectNodes get node list from indexer service and select a subset of it, which is sufficient to store expected number of replications.
Expand Down
4 changes: 2 additions & 2 deletions indexer/gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gateway
import (
"net/http"

"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/common/rpc"
"github.com/0glabs/0g-storage-client/node"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -34,7 +34,7 @@ func MustServeWithRPC(config Config) {
router.POST("/", gin.WrapH(config.RPCHandler))
}

util.MustServe(config.Endpoint, router)
rpc.Start(config.Endpoint, router)
}

func newRouter() *gin.Engine {
Expand Down
56 changes: 23 additions & 33 deletions node/client_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node
import (
"context"

"github.com/0glabs/0g-storage-client/common/rpc"
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -33,66 +34,55 @@ func NewAdminClient(url string, option ...providers.Option) (*AdminClient, error
}

// FindFile Call find_file to update file location cache
func (c *AdminClient) FindFile(ctx context.Context, txSeq uint64) (ret int, err error) {
err = c.wrapError(c.CallContext(ctx, &ret, "admin_findFile", txSeq), "admin_findFile")
return
func (c *AdminClient) FindFile(ctx context.Context, txSeq uint64) (int, error) {
return rpc.CallContext[int](c, ctx, "admin_findFile", txSeq)
}

// Shutdown Call admin_shutdown to shutdown the node.
func (c *AdminClient) Shutdown(ctx context.Context) (ret int, err error) {
err = c.wrapError(c.CallContext(ctx, &ret, "admin_shutdown"), "admin_shutdown")
return
func (c *AdminClient) Shutdown(ctx context.Context) (int, error) {
return rpc.CallContext[int](c, ctx, "admin_shutdown")
}

// StartSyncFile Call admin_startSyncFile to request synchronization of a file.
func (c *AdminClient) StartSyncFile(ctx context.Context, txSeq uint64) (ret int, err error) {
err = c.wrapError(c.CallContext(ctx, &ret, "admin_startSyncFile", txSeq), "admin_startSyncFile")
return
func (c *AdminClient) StartSyncFile(ctx context.Context, txSeq uint64) (int, error) {
return rpc.CallContext[int](c, ctx, "admin_startSyncFile", txSeq)
}

// StartSyncChunks Call admin_startSyncChunks to request synchronization of specified chunks.
func (c *AdminClient) StartSyncChunks(ctx context.Context, txSeq, startIndex, endIndex uint64) (ret int, err error) {
err = c.wrapError(c.CallContext(ctx, &ret, "admin_startSyncChunks", txSeq, startIndex, endIndex), "admin_startSyncChunks")
return
func (c *AdminClient) StartSyncChunks(ctx context.Context, txSeq, startIndex, endIndex uint64) (int, error) {
return rpc.CallContext[int](c, ctx, "admin_startSyncChunks", txSeq, startIndex, endIndex)
}

// TerminateSync Call admin_terminateSync to terminate a file sync.
func (c *AdminClient) TerminateSync(ctx context.Context, txSeq uint64) (terminated bool, err error) {
err = c.wrapError(c.CallContext(ctx, &terminated, "admin_terminateSync", txSeq), "admin_terminateSync")
return
func (c *AdminClient) TerminateSync(ctx context.Context, txSeq uint64) (bool, error) {
return rpc.CallContext[bool](c, ctx, "admin_terminateSync", txSeq)
}

// GetSyncStatus Call admin_getSyncStatus to retrieve the sync status of specified file.
func (c *AdminClient) GetSyncStatus(ctx context.Context, txSeq uint64) (status string, err error) {
err = c.wrapError(c.CallContext(ctx, &status, "admin_getSyncStatus", txSeq), "admin_getSyncStatus")
return
func (c *AdminClient) GetSyncStatus(ctx context.Context, txSeq uint64) (string, error) {
return rpc.CallContext[string](c, ctx, "admin_getSyncStatus", txSeq)
}

// GetSyncInfo Call admin_getSyncInfo to retrieve the sync status of specified file or all files.
func (c *AdminClient) GetSyncInfo(ctx context.Context, tx_seq ...uint64) (files map[uint64]FileSyncInfo, err error) {
func (c *AdminClient) GetSyncInfo(ctx context.Context, tx_seq ...uint64) (map[uint64]FileSyncInfo, error) {
if len(tx_seq) > 0 {
err = c.wrapError(c.CallContext(ctx, &files, "admin_getSyncInfo", tx_seq[0]), "admin_getSyncInfo")
} else {
err = c.wrapError(c.CallContext(ctx, &files, "admin_getSyncInfo"), "admin_getSyncInfo")
return rpc.CallContext[map[uint64]FileSyncInfo](c, ctx, "admin_getSyncInfo", tx_seq[0])
}

return
return rpc.CallContext[map[uint64]FileSyncInfo](c, ctx, "admin_getSyncInfo")
}

// GetNetworkInfo Call admin_getNetworkInfo to retrieve the network information.
func (c *AdminClient) GetNetworkInfo(ctx context.Context) (info NetworkInfo, err error) {
err = c.wrapError(c.CallContext(ctx, &info, "admin_getNetworkInfo"), "admin_getNetworkInfo")
return
func (c *AdminClient) GetNetworkInfo(ctx context.Context) (NetworkInfo, error) {
return rpc.CallContext[NetworkInfo](c, ctx, "admin_getNetworkInfo")
}

// GetPeers Call admin_getPeers to retrieve all discovered network peers.
func (c *AdminClient) GetPeers(ctx context.Context) (peers map[string]*PeerInfo, err error) {
err = c.wrapError(c.CallContext(ctx, &peers, "admin_getPeers"), "admin_getPeers")
return
func (c *AdminClient) GetPeers(ctx context.Context) (map[string]*PeerInfo, error) {
return rpc.CallContext[map[string]*PeerInfo](c, ctx, "admin_getPeers")
}

// getFileLocation Get file location
func (c *AdminClient) GetFileLocation(ctx context.Context, txSeq uint64, allShards bool) (locations []LocationInfo, err error) {
err = c.wrapError(c.CallContext(ctx, &locations, "admin_getFileLocation", txSeq, allShards), "admin_getFileLocation")
return
// GetFileLocation Get file location
func (c *AdminClient) GetFileLocation(ctx context.Context, txSeq uint64, allShards bool) ([]LocationInfo, error) {
return rpc.CallContext[[]LocationInfo](c, ctx, "admin_getFileLocation", txSeq, allShards)
}
Loading

0 comments on commit b361cbb

Please sign in to comment.