From 8854f51ebdf85ea8a410ffb920973614df9bfaca Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Wed, 26 Jun 2024 17:02:27 +0800 Subject: [PATCH] indexer service skeleton --- cmd/indexer.go | 53 ++++++++++++++++++++++++++++++++++++++++++++ common/util/rpc.go | 33 +++++++++++++++++++++++++++ indexer/api.go | 36 ++++++++++++++++++++++++++++++ indexer/client.go | 36 ++++++++++++++++++++++++++++++ indexer/types.go | 43 +++++++++++++++++++++++++++++++++++ node/client.go | 2 +- node/types.go | 4 ++++ transfer/uploader.go | 2 +- 8 files changed, 207 insertions(+), 2 deletions(-) create mode 100644 cmd/indexer.go create mode 100644 common/util/rpc.go create mode 100644 indexer/api.go create mode 100644 indexer/client.go create mode 100644 indexer/types.go diff --git a/cmd/indexer.go b/cmd/indexer.go new file mode 100644 index 0000000..cb27684 --- /dev/null +++ b/cmd/indexer.go @@ -0,0 +1,53 @@ +package cmd + +import ( + "github.com/0glabs/0g-storage-client/common/util" + "github.com/0glabs/0g-storage-client/indexer" + "github.com/0glabs/0g-storage-client/node" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + nodes []string + endpoint string + + indexderCmd = &cobra.Command{ + Use: "indexer", + Short: "Start indexer service", + Run: startIndexer, + } +) + +func init() { + indexderCmd.Flags().StringSliceVar(&nodes, "nodes", nil, "Storage node URLs that separated by comma") + indexderCmd.MarkFlagRequired("nodes") + indexderCmd.Flags().StringVar(&endpoint, "endpoint", ":12345", "Indexer RPC endpoint") + + rootCmd.AddCommand(indexderCmd) +} + +func startIndexer(*cobra.Command, []string) { + var clients []*node.Client + + for _, v := range nodes { + client, err := node.NewClient(v) + if err != nil { + logrus.WithError(err).WithField("node", v).Fatal("Failed to dail storage node") + } + + clients = append(clients, client) + } + + defer func() { + for _, v := range clients { + v.Close() + } + }() + + api := indexer.NewIndexerApi(clients) + + util.MustServeRPC(endpoint, map[string]interface{}{ + api.Namespace: api, + }) +} diff --git a/common/util/rpc.go b/common/util/rpc.go new file mode 100644 index 0000000..a49517a --- /dev/null +++ b/common/util/rpc.go @@ -0,0 +1,33 @@ +package util + +import ( + "net" + "net/http" + + "github.com/openweb3/go-rpc-provider" + "github.com/sirupsen/logrus" +) + +// MustServeRPC starts RPC service until shutdown. +func MustServeRPC(endpoint string, apis map[string]interface{}) { + handler := rpc.NewServer() + + for namespace, impl := range apis { + if err := handler.RegisterName(namespace, impl); err != nil { + logrus.WithError(err).WithField("namespace", namespace).Fatal("Failed to register rpc service") + } + } + + httpServer := http.Server{ + // "github.com/ethereum/go-ethereum/node" + // Handler: node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}), + Handler: handler, + } + + listener, err := net.Listen("tcp", endpoint) + if err != nil { + logrus.WithError(err).WithField("endpoint", endpoint).Fatal("Failed to listen to endpoint") + } + + httpServer.Serve(listener) +} diff --git a/indexer/api.go b/indexer/api.go new file mode 100644 index 0000000..1e6cb3a --- /dev/null +++ b/indexer/api.go @@ -0,0 +1,36 @@ +package indexer + +import ( + "github.com/0glabs/0g-storage-client/node" + "github.com/pkg/errors" +) + +// Requires `indexerApi` implements the `Interface` interface. +var _ Interface = (*IndexerApi)(nil) + +type IndexerApi struct { + Namespace string + nodes []*node.Client +} + +func NewIndexerApi(nodes []*node.Client) *IndexerApi { + return &IndexerApi{"indexer", nodes} +} + +func (api *IndexerApi) GetNodes() ([]ShardedNode, error) { + var result []ShardedNode + + for _, v := range api.nodes { + config, err := v.ZeroGStorage().GetShardConfig() + if err != nil { + return nil, errors.WithMessage(err, "Failed to query shard config from storage node") + } + + result = append(result, ShardedNode{ + URL: v.URL(), + Config: config, + }) + } + + return result, nil +} diff --git a/indexer/client.go b/indexer/client.go new file mode 100644 index 0000000..f12dca8 --- /dev/null +++ b/indexer/client.go @@ -0,0 +1,36 @@ +package indexer + +import ( + "context" + + "github.com/openweb3/go-rpc-provider/interfaces" + providers "github.com/openweb3/go-rpc-provider/provider_wrapper" +) + +// Requires `Client` implements the `Interface` interface. +var _ Interface = (*Client)(nil) + +type Client struct { + interfaces.Provider +} + +func NewClient(url string, option ...providers.Option) (*Client, error) { + var opt providers.Option + if len(option) > 0 { + opt = option[0] + } + + provider, err := providers.NewProviderWithOption(url, opt) + if err != nil { + return nil, err + } + + return &Client{ + Provider: provider, + }, nil +} + +func (c *Client) GetNodes() (nodes []ShardedNode, err error) { + err = c.Provider.CallContext(context.Background(), &nodes, "indexer_getNodes") + return +} diff --git a/indexer/types.go b/indexer/types.go new file mode 100644 index 0000000..a250784 --- /dev/null +++ b/indexer/types.go @@ -0,0 +1,43 @@ +package indexer + +import ( + "math/rand" + + "github.com/0glabs/0g-storage-client/node" +) + +type ShardedNode struct { + URL string + Config node.ShardConfig +} + +type Interface interface { + GetNodes() ([]ShardedNode, error) +} + +func Select(nodes []ShardedNode, segmentIndex uint64, replica int) []ShardedNode { + var matched []ShardedNode + + for _, v := range nodes { + if v.Config.HasSegment(segmentIndex) { + matched = append(matched, v) + } + } + + numMatched := len(matched) + if numMatched == 0 { + return nil + } + + perm := rand.Perm(numMatched) + result := make([]ShardedNode, numMatched) + for i := 0; i < numMatched; i++ { + result[i] = matched[perm[i]] + } + + if replica < numMatched { + result = result[:replica] + } + + return result +} diff --git a/node/client.go b/node/client.go index 69dd174..c8bb42f 100644 --- a/node/client.go +++ b/node/client.go @@ -114,7 +114,7 @@ func (c *ZeroGStorageClient) DownloadSegmentWithProof(root common.Hash, index ui return } -func (c *ZeroGStorageClient) GetShardConfig() (shardConfig *ShardConfig, err error) { +func (c *ZeroGStorageClient) GetShardConfig() (shardConfig ShardConfig, err error) { err = c.provider.CallContext(context.Background(), &shardConfig, "zgs_getShardConfig") return } diff --git a/node/types.go b/node/types.go index 6fa7a89..05190f9 100644 --- a/node/types.go +++ b/node/types.go @@ -53,3 +53,7 @@ type ShardConfig struct { ShardId uint64 `json:"shardId"` NumShard uint64 `json:"numShard"` } + +func (config *ShardConfig) HasSegment(segmentIndex uint64) bool { + return config.NumShard < 2 || segmentIndex%config.NumShard == config.ShardId +} diff --git a/transfer/uploader.go b/transfer/uploader.go index b310607..57e740a 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -51,7 +51,7 @@ func getShardConfigs(clients []*node.Client) ([]*node.ShardConfig, error) { if shardConfig.NumShard == 0 { return nil, errors.New("NumShard is zero") } - shardConfigs = append(shardConfigs, shardConfig) + shardConfigs = append(shardConfigs, &shardConfig) } return shardConfigs, nil }