Skip to content

Commit

Permalink
Split rpc client (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
boqiu authored Jul 16, 2024
1 parent 04fb59a commit 29afc07
Show file tree
Hide file tree
Showing 26 changed files with 370 additions and 234 deletions.
2 changes: 1 addition & 1 deletion cmd/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func init() {
}

func download(*cobra.Command, []string) {
nodes := node.MustNewClients(downloadArgs.nodes)
nodes := node.MustNewZgsClients(downloadArgs.nodes)

downloader, err := transfer.NewDownloader(nodes, common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ func init() {
}

func startGateway(*cobra.Command, []string) {
nodes := node.MustNewClients(gatewayArgs.nodes)
nodes := node.MustNewZgsClients(gatewayArgs.nodes)
gateway.MustServeLocal(nodes)
}
4 changes: 2 additions & 2 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func init() {
}

func startIndexer(*cobra.Command, []string) {
var clients []*node.Client
var clients []*node.ZgsClient

for _, v := range nodes {
client, err := node.NewClient(v)
client, err := node.NewZgsClient(v)
if err != nil {
logrus.WithError(err).WithField("node", v).Fatal("Failed to dail storage node")
}
Expand Down
6 changes: 2 additions & 4 deletions cmd/kv_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ func init() {

func kvRead(*cobra.Command, []string) {
ctx := context.Background()
client, err := node.NewClient(kvReadArgs.node)
if err != nil {
logrus.WithError(err).Fatal("failed to initialize kv client")
}
client := node.MustNewKvClient(kvReadArgs.node)
defer client.Close()
kvClient := kv.NewClient(client)
streamId := common.HexToHash(kvReadArgs.streamId)

Expand Down
4 changes: 2 additions & 2 deletions cmd/kv_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func kvWrite(*cobra.Command, []string) {
SkipTx: kvWriteArgs.skipTx,
}

var clients []*node.Client
var clients []*node.ZgsClient
if kvWriteArgs.indexer != "" {
indexerClient, err := indexer.NewClient(kvWriteArgs.indexer, indexer.IndexerClientOption{LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()}})
if err != nil {
Expand All @@ -105,7 +105,7 @@ func kvWrite(*cobra.Command, []string) {
if len(kvWriteArgs.node) == 0 {
logrus.Fatal("At least one of --node and --indexer should not be empty")
}
clients = node.MustNewClients(kvWriteArgs.node)
clients = node.MustNewZgsClients(kvWriteArgs.node)
for _, client := range clients {
defer client.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func upload(*cobra.Command, []string) {
logrus.Fatal("At least one of --node and --indexer should not be empty")
}

clients := node.MustNewClients(uploadArgs.node)
clients := node.MustNewZgsClients(uploadArgs.node)
for _, client := range clients {
defer client.Close()
}
Expand Down
8 changes: 3 additions & 5 deletions example/kv_iterator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ const KvClientAddr = "http://127.0.0.1:6789"

func main() {
ctx := context.Background()
client, err := node.NewClient(KvClientAddr)
if err != nil {
fmt.Println(err)
return
}
client := node.MustNewKvClient(KvClientAddr)
defer client.Close()

streamId := ethCommon.HexToHash("0x000000000000000000000000000000000000000000000000000000000000f2bd")
key0 := []byte("TESTKEY")
key1 := []byte("TESTKEY2")
Expand Down
4 changes: 2 additions & 2 deletions example/kv_put/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const PrivKey = ""
const FlowContractAddr = ""

func main() {
zgsClient, err := node.NewClient(ZgsClientAddr)
zgsClient, err := node.NewZgsClient(ZgsClientAddr)
if err != nil {
fmt.Println(err)
return
Expand All @@ -31,7 +31,7 @@ func main() {
fmt.Println(err)
return
}
batcher := kv.NewBatcher(math.MaxUint64, []*node.Client{zgsClient}, flow)
batcher := kv.NewBatcher(math.MaxUint64, []*node.ZgsClient{zgsClient}, flow)
batcher.Set(common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000f2bd"),
[]byte("TESTKEY0"),
[]byte{69, 70, 71, 72, 73},
Expand Down
7 changes: 2 additions & 5 deletions example/kv_query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ const KvClientAddr = "http://127.0.0.1:6789"

func main() {
ctx := context.Background()
client, err := node.NewClient(KvClientAddr)
if err != nil {
fmt.Println(err)
return
}
client := node.MustNewKvClient(KvClientAddr)
defer client.Close()
streamId := ethCommon.HexToHash("0x000000000000000000000000000000000000000000000000000000000000f2bd")
key := []byte("TESTKEY0")
key1 := []byte("TESTKEY1")
Expand Down
6 changes: 3 additions & 3 deletions gateway/local_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func getFileStatus(c *gin.Context) (interface{}, error) {
var notFinalized bool

for _, client := range allClients {
info, err := client.ZeroGStorage().GetFileInfo(context.Background(), root)
info, err := client.GetFileInfo(context.Background(), root)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) {
return nil, ErrValidation.WithData("node index out of bound")
}

uploader, err := transfer.NewUploader(nil, []*node.Client{allClients[input.Node]}, zg_common.LogOption{Logger: logrus.StandardLogger()})
uploader, err := transfer.NewUploader(nil, []*node.ZgsClient{allClients[input.Node]}, zg_common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
return nil, ErrValidation.WithData(err)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func downloadFileLocal(c *gin.Context) (interface{}, error) {
return nil, ErrValidation.WithData("node index out of bound")
}

downloader, err := transfer.NewDownloader([]*node.Client{allClients[input.Node]}, zg_common.LogOption{Logger: logrus.StandardLogger()})
downloader, err := transfer.NewDownloader([]*node.ZgsClient{allClients[input.Node]}, zg_common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

const httpStatusInternalError = 600

var allClients []*node.Client
var allClients []*node.ZgsClient

func MustServeLocal(nodes []*node.Client) {
func MustServeLocal(nodes []*node.ZgsClient) {
if len(nodes) == 0 {
logrus.Fatal("storage nodes not configured")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.5
gotest.tools v2.2.0+incompatible
)

require (
Expand Down Expand Up @@ -69,5 +70,4 @@ require (
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools v2.2.0+incompatible // indirect
)
6 changes: 3 additions & 3 deletions indexer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ var _ Interface = (*IndexerApi)(nil)
// IndexerApi indexer service configuration
type IndexerApi struct {
Namespace string
nodes []*node.Client
nodes []*node.ZgsClient
}

// NewIndexerApi creates indexer service configuration
func NewIndexerApi(nodes []*node.Client) *IndexerApi {
func NewIndexerApi(nodes []*node.ZgsClient) *IndexerApi {
return &IndexerApi{"indexer", nodes}
}

Expand All @@ -27,7 +27,7 @@ func (api *IndexerApi) GetNodes(ctx context.Context) ([]shard.ShardedNode, error
var result []shard.ShardedNode

for _, v := range api.nodes {
config, err := v.ZeroGStorage().GetShardConfig(ctx)
config, err := v.GetShardConfig(ctx)
if err != nil {
return nil, errors.WithMessage(err, "Failed to query shard config from storage node")
}
Expand Down
6 changes: 3 additions & 3 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *Client) GetNodes(ctx context.Context) (nodes []shard.ShardedNode, err e
}

// SelectNodes get node list from indexer service and select a subset of it, which is sufficient to store expected number of replications.
func (c *Client) SelectNodes(ctx context.Context, expectedReplica uint) ([]*node.Client, error) {
func (c *Client) SelectNodes(ctx context.Context, expectedReplica uint) ([]*node.ZgsClient, error) {
nodes, err := c.GetNodes(ctx)
if err != nil {
return nil, err
Expand All @@ -65,9 +65,9 @@ func (c *Client) SelectNodes(ctx context.Context, expectedReplica uint) ([]*node
if !ok {
return nil, fmt.Errorf("cannot select a subset from the returned nodes that meets the replication requirement")
}
clients := make([]*node.Client, len(nodes))
clients := make([]*node.ZgsClient, len(nodes))
for i, shardedNode := range nodes {
clients[i], err = node.NewClient(shardedNode.URL, c.option.ProviderOption)
clients[i], err = node.NewZgsClient(shardedNode.URL, c.option.ProviderOption)
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("failed to initialize storage node client with %v", shardedNode.URL))
}
Expand Down
4 changes: 2 additions & 2 deletions kv/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
// Batcher struct to cache and execute KV write and access control operations.
type Batcher struct {
*streamDataBuilder
clients []*node.Client
clients []*node.ZgsClient
flow *contract.FlowContract
logger *logrus.Logger
}

// NewBatcher Initialize a new batcher. Version denotes the expected version of keys to read or write when the cached KV operations is settled on chain.
func NewBatcher(version uint64, clients []*node.Client, flow *contract.FlowContract, opts ...zg_common.LogOption) *Batcher {
func NewBatcher(version uint64, clients []*node.ZgsClient, flow *contract.FlowContract, opts ...zg_common.LogOption) *Batcher {
return &Batcher{
streamDataBuilder: newStreamDataBuilder(version),
clients: clients,
Expand Down
30 changes: 15 additions & 15 deletions kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (

// Client client to query data from 0g kv node.
type Client struct {
node *node.Client
node *node.KvClient
}

// NewClient creates a new client for kv queries.
func NewClient(node *node.Client) *Client {
func NewClient(node *node.KvClient) *Client {
return &Client{
node: node,
}
Expand Down Expand Up @@ -49,7 +49,7 @@ func (c *Client) GetValue(ctx context.Context, streamId common.Hash, key []byte,
}
for {
var seg *node.Value
seg, err = c.node.KV().GetValue(ctx, streamId, key, uint64(len(val.Data)), maxQuerySize, val.Version)
seg, err = c.node.GetValue(ctx, streamId, key, uint64(len(val.Data)), maxQuerySize, val.Version)
if err != nil {
return
}
Expand All @@ -69,60 +69,60 @@ func (c *Client) GetValue(ctx context.Context, streamId common.Hash, key []byte,

// Get returns paginated value for the specified stream key.
func (c *Client) Get(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, version ...uint64) (val *node.Value, err error) {
return c.node.KV().GetValue(ctx, streamId, key, startIndex, length, version...)
return c.node.GetValue(ctx, streamId, key, startIndex, length, version...)
}

// GetNext returns paginated key-value of the next key of the specified stream key.
func (c *Client) GetNext(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *node.KeyValue, err error) {
return c.node.KV().GetNext(ctx, streamId, key, startIndex, length, inclusive, version...)
return c.node.GetNext(ctx, streamId, key, startIndex, length, inclusive, version...)
}

// GetPrev returns paginated key-value of the prev key of the specified stream key.
func (c *Client) GetPrev(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *node.KeyValue, err error) {
return c.node.KV().GetPrev(ctx, streamId, key, startIndex, length, inclusive, version...)
return c.node.GetPrev(ctx, streamId, key, startIndex, length, inclusive, version...)
}

// GetFirst returns paginated key-value of the first key of the specified stream.
func (c *Client) GetFirst(ctx context.Context, streamId common.Hash, startIndex, length uint64, version ...uint64) (val *node.KeyValue, err error) {
return c.node.KV().GetFirst(ctx, streamId, startIndex, length, version...)
return c.node.GetFirst(ctx, streamId, startIndex, length, version...)
}

// GetLast returns paginated key-value of the first key of the specified stream.
func (c *Client) GetLast(ctx context.Context, streamId common.Hash, startIndex, length uint64, version ...uint64) (val *node.KeyValue, err error) {
return c.node.KV().GetLast(ctx, streamId, startIndex, length, version...)
return c.node.GetLast(ctx, streamId, startIndex, length, version...)
}

// GetTransactionResult query the kv replay status of a given data by sequence id.
func (c *Client) GetTransactionResult(ctx context.Context, txSeq uint64) (result string, err error) {
return c.node.KV().GetTransactionResult(ctx, txSeq)
return c.node.GetTransactionResult(ctx, txSeq)
}

// GetHoldingStreamIds query the stream ids monitered by the kv node.
func (c *Client) GetHoldingStreamIds(ctx context.Context) (streamIds []common.Hash, err error) {
return c.node.KV().GetHoldingStreamIds(ctx)
return c.node.GetHoldingStreamIds(ctx)
}

// HasWritePermission check if the account is able to write the stream.
func (c *Client) HasWritePermission(ctx context.Context, account common.Address, streamId common.Hash, key []byte, version ...uint64) (hasPermission bool, err error) {
return c.node.KV().HasWritePermission(ctx, account, streamId, key, version...)
return c.node.HasWritePermission(ctx, account, streamId, key, version...)
}

// IsAdmin check if the account is the admin of the stream.
func (c *Client) IsAdmin(ctx context.Context, account common.Address, streamId common.Hash, version ...uint64) (isAdmin bool, err error) {
return c.node.KV().IsAdmin(ctx, account, streamId, version...)
return c.node.IsAdmin(ctx, account, streamId, version...)
}

// IsSpecialKey check if the key has unique access control.
func (c *Client) IsSpecialKey(ctx context.Context, streamId common.Hash, key []byte, version ...uint64) (isSpecialKey bool, err error) {
return c.node.KV().IsSpecialKey(ctx, streamId, key, version...)
return c.node.IsSpecialKey(ctx, streamId, key, version...)
}

// IsWriterOfKey check if the account can write the special key.
func (c *Client) IsWriterOfKey(ctx context.Context, account common.Address, streamId common.Hash, key []byte, version ...uint64) (isWriter bool, err error) {
return c.node.KV().IsWriterOfKey(ctx, account, streamId, key, version...)
return c.node.IsWriterOfKey(ctx, account, streamId, key, version...)
}

// IsWriterOfStream check if the account is the writer of the stream.
func (c *Client) IsWriterOfStream(ctx context.Context, account common.Address, streamId common.Hash, version ...uint64) (isWriter bool, err error) {
return c.node.KV().IsWriterOfStream(ctx, account, streamId, version...)
return c.node.IsWriterOfStream(ctx, account, streamId, version...)
}
Loading

0 comments on commit 29afc07

Please sign in to comment.