Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor rpc client #59

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cmd
import (
"time"

"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/common/rpc"
"github.com/0glabs/0g-storage-client/indexer"
"github.com/0glabs/0g-storage-client/indexer/gateway"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,7 +79,7 @@ func startIndexer(*cobra.Command, []string) {
Endpoint: indexerArgs.endpoint,
Nodes: indexerArgs.nodes.TrustedNodes,
MaxDownloadFileSize: indexerArgs.maxDownloadFileSize,
RPCHandler: util.MustNewRPCHandler(map[string]interface{}{
RPCHandler: rpc.MustNewHandler(map[string]interface{}{
api.Namespace: api,
}),
})
Expand Down
31 changes: 31 additions & 0 deletions common/rpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package rpc

import (
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
)

// Client is a base class of any RPC client.
type Client struct {
*providers.MiddlewarableProvider
url string
}

// NewClient creates a new client instance.
func NewClient(url string, option ...providers.Option) (*Client, error) {
var opt providers.Option
if len(option) > 0 {
opt = option[0]
}

provider, err := providers.NewProviderWithOption(url, opt)
if err != nil {
return nil, err
}

return &Client{provider, url}, nil
}

// URL Get the RPC server URL the client connected to.
func (c *Client) URL() string {
return c.url
}
18 changes: 9 additions & 9 deletions common/util/rpc.go → common/rpc/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package rpc

import (
"net/http"
Expand All @@ -8,8 +8,8 @@ import (
"github.com/sirupsen/logrus"
)

// MustNewRPCHandler creates a http.Handler for the specified RPC apis.
func MustNewRPCHandler(apis map[string]interface{}) http.Handler {
// MustNewHandler creates a http.Handler for the specified RPC apis.
func MustNewHandler(apis map[string]interface{}) http.Handler {
handler := rpc.NewServer()

for namespace, impl := range apis {
Expand All @@ -22,8 +22,8 @@ func MustNewRPCHandler(apis map[string]interface{}) http.Handler {
return node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}, []byte{})
}

// MustServe starts a HTTP service util shutdown.
func MustServe(endpoint string, handler http.Handler) {
// Start starts a HTTP service util shutdown.
func Start(endpoint string, handler http.Handler) {
server := http.Server{
Addr: endpoint,
Handler: handler,
Expand All @@ -32,8 +32,8 @@ func MustServe(endpoint string, handler http.Handler) {
server.ListenAndServe()
}

// MustServeRPC starts RPC service until shutdown.
func MustServeRPC(endpoint string, apis map[string]interface{}) {
rpcHandler := MustNewRPCHandler(apis)
MustServe(endpoint, rpcHandler)
// MustServe starts RPC service until shutdown.
func MustServe(endpoint string, apis map[string]interface{}) {
rpcHandler := MustNewHandler(apis)
Start(endpoint, rpcHandler)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/google/btree v1.1.2
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/mcuadros/go-defaults v1.2.0
github.com/openweb3/go-rpc-provider v0.3.3
github.com/openweb3/go-rpc-provider v0.3.4
github.com/openweb3/web3go v0.2.9
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/openweb3/go-ethereum-hdwallet v0.1.0 h1:q1W82vIw5QVrotnzgowu63AqcO/ERD7LMr9UxEoFJIs=
github.com/openweb3/go-ethereum-hdwallet v0.1.0/go.mod h1:ISDWwl+xpbvGbAfsZKfvW+LjHGjPzmdJQXbwi/ckzUE=
github.com/openweb3/go-rpc-provider v0.3.3 h1:aNelA69cJ9pk9lo7Z8ukYz/qPyZUGNU3IF1600A8riI=
github.com/openweb3/go-rpc-provider v0.3.3/go.mod h1:jUDuGZDP5eoN3iS7ZG3qg/aRAzL9ncUMkJ/R85X4s+w=
github.com/openweb3/go-rpc-provider v0.3.4 h1:0ub6PJkTxKlgvU/yODLMTVbaSS4fMXtpXhcxDgBvwuQ=
github.com/openweb3/go-rpc-provider v0.3.4/go.mod h1:jUDuGZDP5eoN3iS7ZG3qg/aRAzL9ncUMkJ/R85X4s+w=
github.com/openweb3/go-sdk-common v0.0.0-20240627072707-f78f0155ab34 h1:qLelmviLGRleOB6A8ssljatvs6K6n1BMd3PNeozNq/E=
github.com/openweb3/go-sdk-common v0.0.0-20240627072707-f78f0155ab34/go.mod h1:YMfzbYeq1G7s6nRjcFAgYSA/Uqy5+Aa1UvL0Rbnc290=
github.com/openweb3/web3go v0.2.9 h1:RjCYVr3oBUeuPtX3Cwzz3pjg6qn91Ut3VuO/Li/XOeg=
Expand Down
31 changes: 14 additions & 17 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"time"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/rpc"
"github.com/0glabs/0g-storage-client/common/shard"
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/openweb3/go-rpc-provider/interfaces"
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
"github.com/openweb3/web3go"
"github.com/pkg/errors"
Expand All @@ -28,7 +28,7 @@ var (

// Client indexer client
type Client struct {
interfaces.Provider
*rpc.Client
option IndexerClientOption
logger *logrus.Logger
}
Expand All @@ -46,34 +46,31 @@ func NewClient(url string, option ...IndexerClientOption) (*Client, error) {
opt = option[0]
}

provider, err := providers.NewProviderWithOption(url, opt.ProviderOption)
client, err := rpc.NewClient(url, opt.ProviderOption)
if err != nil {
return nil, err
}

return &Client{
Provider: provider,
option: opt,
logger: common.NewLogger(opt.LogOption),
Client: client,
option: opt,
logger: common.NewLogger(opt.LogOption),
}, nil
}

// GetNodes get node list from indexer service
func (c *Client) GetShardedNodes(ctx context.Context) (nodes ShardedNodes, err error) {
err = c.Provider.CallContext(ctx, &nodes, "indexer_getShardedNodes")
return
// GetShardedNodes get node list from indexer service
func (c *Client) GetShardedNodes(ctx context.Context) (ShardedNodes, error) {
return providers.CallContext[ShardedNodes](c, ctx, "indexer_getShardedNodes")
}

// GetNodes return storage nodes with IP location information.
func (c *Client) GetNodeLocations(ctx context.Context) (locations map[string]*IPLocation, err error) {
err = c.Provider.CallContext(ctx, &locations, "indexer_getNodeLocations")
return
// GetNodeLocations return storage nodes with IP location information.
func (c *Client) GetNodeLocations(ctx context.Context) (map[string]*IPLocation, error) {
return providers.CallContext[map[string]*IPLocation](c, ctx, "indexer_getNodeLocations")
}

// GetFileLocations return locations info of given file.
func (c *Client) GetFileLocations(ctx context.Context, root string) (locations []*shard.ShardedNode, err error) {
err = c.Provider.CallContext(ctx, &locations, "indexer_getFileLocations", root)
return
func (c *Client) GetFileLocations(ctx context.Context, root string) ([]*shard.ShardedNode, error) {
return providers.CallContext[[]*shard.ShardedNode](c, ctx, "indexer_getFileLocations", root)
}

// SelectNodes get node list from indexer service and select a subset of it, which is sufficient to store expected number of replications.
Expand Down
4 changes: 2 additions & 2 deletions indexer/gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package gateway
import (
"net/http"

"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/common/rpc"
"github.com/0glabs/0g-storage-client/node"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -34,7 +34,7 @@ func MustServeWithRPC(config Config) {
router.POST("/", gin.WrapH(config.RPCHandler))
}

util.MustServe(config.Endpoint, router)
rpc.Start(config.Endpoint, router)
}

func newRouter() *gin.Engine {
Expand Down
55 changes: 22 additions & 33 deletions node/client_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,66 +33,55 @@ func NewAdminClient(url string, option ...providers.Option) (*AdminClient, error
}

// FindFile Call find_file to update file location cache
func (c *AdminClient) FindFile(ctx context.Context, txSeq uint64) (ret int, err error) {
err = c.wrapError(c.CallContext(ctx, &ret, "admin_findFile", txSeq), "admin_findFile")
return
func (c *AdminClient) FindFile(ctx context.Context, txSeq uint64) (int, error) {
return providers.CallContext[int](c, ctx, "admin_findFile", txSeq)
}

// Shutdown Call admin_shutdown to shutdown the node.
func (c *AdminClient) Shutdown(ctx context.Context) (ret int, err error) {
err = c.wrapError(c.CallContext(ctx, &ret, "admin_shutdown"), "admin_shutdown")
return
func (c *AdminClient) Shutdown(ctx context.Context) (int, error) {
return providers.CallContext[int](c, ctx, "admin_shutdown")
}

// StartSyncFile Call admin_startSyncFile to request synchronization of a file.
func (c *AdminClient) StartSyncFile(ctx context.Context, txSeq uint64) (ret int, err error) {
err = c.wrapError(c.CallContext(ctx, &ret, "admin_startSyncFile", txSeq), "admin_startSyncFile")
return
func (c *AdminClient) StartSyncFile(ctx context.Context, txSeq uint64) (int, error) {
return providers.CallContext[int](c, ctx, "admin_startSyncFile", txSeq)
}

// StartSyncChunks Call admin_startSyncChunks to request synchronization of specified chunks.
func (c *AdminClient) StartSyncChunks(ctx context.Context, txSeq, startIndex, endIndex uint64) (ret int, err error) {
err = c.wrapError(c.CallContext(ctx, &ret, "admin_startSyncChunks", txSeq, startIndex, endIndex), "admin_startSyncChunks")
return
func (c *AdminClient) StartSyncChunks(ctx context.Context, txSeq, startIndex, endIndex uint64) (int, error) {
return providers.CallContext[int](c, ctx, "admin_startSyncChunks", txSeq, startIndex, endIndex)
}

// TerminateSync Call admin_terminateSync to terminate a file sync.
func (c *AdminClient) TerminateSync(ctx context.Context, txSeq uint64) (terminated bool, err error) {
err = c.wrapError(c.CallContext(ctx, &terminated, "admin_terminateSync", txSeq), "admin_terminateSync")
return
func (c *AdminClient) TerminateSync(ctx context.Context, txSeq uint64) (bool, error) {
return providers.CallContext[bool](c, ctx, "admin_terminateSync", txSeq)
}

// GetSyncStatus Call admin_getSyncStatus to retrieve the sync status of specified file.
func (c *AdminClient) GetSyncStatus(ctx context.Context, txSeq uint64) (status string, err error) {
err = c.wrapError(c.CallContext(ctx, &status, "admin_getSyncStatus", txSeq), "admin_getSyncStatus")
return
func (c *AdminClient) GetSyncStatus(ctx context.Context, txSeq uint64) (string, error) {
return providers.CallContext[string](c, ctx, "admin_getSyncStatus", txSeq)
}

// GetSyncInfo Call admin_getSyncInfo to retrieve the sync status of specified file or all files.
func (c *AdminClient) GetSyncInfo(ctx context.Context, tx_seq ...uint64) (files map[uint64]FileSyncInfo, err error) {
func (c *AdminClient) GetSyncInfo(ctx context.Context, tx_seq ...uint64) (map[uint64]FileSyncInfo, error) {
if len(tx_seq) > 0 {
err = c.wrapError(c.CallContext(ctx, &files, "admin_getSyncInfo", tx_seq[0]), "admin_getSyncInfo")
} else {
err = c.wrapError(c.CallContext(ctx, &files, "admin_getSyncInfo"), "admin_getSyncInfo")
return providers.CallContext[map[uint64]FileSyncInfo](c, ctx, "admin_getSyncInfo", tx_seq[0])
}

return
return providers.CallContext[map[uint64]FileSyncInfo](c, ctx, "admin_getSyncInfo")
}

// GetNetworkInfo Call admin_getNetworkInfo to retrieve the network information.
func (c *AdminClient) GetNetworkInfo(ctx context.Context) (info NetworkInfo, err error) {
err = c.wrapError(c.CallContext(ctx, &info, "admin_getNetworkInfo"), "admin_getNetworkInfo")
return
func (c *AdminClient) GetNetworkInfo(ctx context.Context) (NetworkInfo, error) {
return providers.CallContext[NetworkInfo](c, ctx, "admin_getNetworkInfo")
}

// GetPeers Call admin_getPeers to retrieve all discovered network peers.
func (c *AdminClient) GetPeers(ctx context.Context) (peers map[string]*PeerInfo, err error) {
err = c.wrapError(c.CallContext(ctx, &peers, "admin_getPeers"), "admin_getPeers")
return
func (c *AdminClient) GetPeers(ctx context.Context) (map[string]*PeerInfo, error) {
return providers.CallContext[map[string]*PeerInfo](c, ctx, "admin_getPeers")
}

// getFileLocation Get file location
func (c *AdminClient) GetFileLocation(ctx context.Context, txSeq uint64, allShards bool) (locations []LocationInfo, err error) {
err = c.wrapError(c.CallContext(ctx, &locations, "admin_getFileLocation", txSeq, allShards), "admin_getFileLocation")
return
// GetFileLocation Get file location
func (c *AdminClient) GetFileLocation(ctx context.Context, txSeq uint64, allShards bool) ([]LocationInfo, error) {
return providers.CallContext[[]LocationInfo](c, ctx, "admin_getFileLocation", txSeq, allShards)
}
Loading
Loading