Skip to content

Commit

Permalink
indexer service skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
boqiu committed Jun 26, 2024
1 parent 977d3be commit 8854f51
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 2 deletions.
53 changes: 53 additions & 0 deletions cmd/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package cmd

import (
"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/indexer"
"github.com/0glabs/0g-storage-client/node"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
nodes []string
endpoint string

indexderCmd = &cobra.Command{
Use: "indexer",
Short: "Start indexer service",
Run: startIndexer,
}
)

func init() {
indexderCmd.Flags().StringSliceVar(&nodes, "nodes", nil, "Storage node URLs that separated by comma")
indexderCmd.MarkFlagRequired("nodes")
indexderCmd.Flags().StringVar(&endpoint, "endpoint", ":12345", "Indexer RPC endpoint")

rootCmd.AddCommand(indexderCmd)
}

func startIndexer(*cobra.Command, []string) {
var clients []*node.Client

for _, v := range nodes {
client, err := node.NewClient(v)
if err != nil {
logrus.WithError(err).WithField("node", v).Fatal("Failed to dail storage node")
}

clients = append(clients, client)
}

defer func() {
for _, v := range clients {
v.Close()
}
}()

api := indexer.NewIndexerApi(clients)

util.MustServeRPC(endpoint, map[string]interface{}{
api.Namespace: api,
})
}
33 changes: 33 additions & 0 deletions common/util/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package util

import (
"net"
"net/http"

"github.com/openweb3/go-rpc-provider"
"github.com/sirupsen/logrus"
)

// MustServeRPC starts RPC service until shutdown.
func MustServeRPC(endpoint string, apis map[string]interface{}) {
handler := rpc.NewServer()

for namespace, impl := range apis {
if err := handler.RegisterName(namespace, impl); err != nil {
logrus.WithError(err).WithField("namespace", namespace).Fatal("Failed to register rpc service")
}
}

httpServer := http.Server{
// "github.com/ethereum/go-ethereum/node"
// Handler: node.NewHTTPHandlerStack(handler, []string{"*"}, []string{"*"}),
Handler: handler,
}

listener, err := net.Listen("tcp", endpoint)
if err != nil {
logrus.WithError(err).WithField("endpoint", endpoint).Fatal("Failed to listen to endpoint")
}

httpServer.Serve(listener)
}
36 changes: 36 additions & 0 deletions indexer/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package indexer

import (
"github.com/0glabs/0g-storage-client/node"
"github.com/pkg/errors"
)

// Requires `indexerApi` implements the `Interface` interface.
var _ Interface = (*IndexerApi)(nil)

type IndexerApi struct {
Namespace string
nodes []*node.Client
}

func NewIndexerApi(nodes []*node.Client) *IndexerApi {
return &IndexerApi{"indexer", nodes}
}

func (api *IndexerApi) GetNodes() ([]ShardedNode, error) {
var result []ShardedNode

for _, v := range api.nodes {
config, err := v.ZeroGStorage().GetShardConfig()
if err != nil {
return nil, errors.WithMessage(err, "Failed to query shard config from storage node")
}

result = append(result, ShardedNode{
URL: v.URL(),
Config: config,
})
}

return result, nil
}
36 changes: 36 additions & 0 deletions indexer/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package indexer

import (
"context"

"github.com/openweb3/go-rpc-provider/interfaces"
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
)

// Requires `Client` implements the `Interface` interface.
var _ Interface = (*Client)(nil)

type Client struct {
interfaces.Provider
}

func NewClient(url string, option ...providers.Option) (*Client, error) {
var opt providers.Option
if len(option) > 0 {
opt = option[0]
}

provider, err := providers.NewProviderWithOption(url, opt)
if err != nil {
return nil, err
}

return &Client{
Provider: provider,
}, nil
}

func (c *Client) GetNodes() (nodes []ShardedNode, err error) {
err = c.Provider.CallContext(context.Background(), &nodes, "indexer_getNodes")
return
}
43 changes: 43 additions & 0 deletions indexer/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package indexer

import (
"math/rand"

"github.com/0glabs/0g-storage-client/node"
)

type ShardedNode struct {
URL string
Config node.ShardConfig
}

type Interface interface {
GetNodes() ([]ShardedNode, error)
}

func Select(nodes []ShardedNode, segmentIndex uint64, replica int) []ShardedNode {
var matched []ShardedNode

for _, v := range nodes {
if v.Config.HasSegment(segmentIndex) {
matched = append(matched, v)
}
}

numMatched := len(matched)
if numMatched == 0 {
return nil
}

perm := rand.Perm(numMatched)
result := make([]ShardedNode, numMatched)
for i := 0; i < numMatched; i++ {
result[i] = matched[perm[i]]
}

if replica < numMatched {
result = result[:replica]
}

return result
}
2 changes: 1 addition & 1 deletion node/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *ZeroGStorageClient) DownloadSegmentWithProof(root common.Hash, index ui
return
}

func (c *ZeroGStorageClient) GetShardConfig() (shardConfig *ShardConfig, err error) {
func (c *ZeroGStorageClient) GetShardConfig() (shardConfig ShardConfig, err error) {
err = c.provider.CallContext(context.Background(), &shardConfig, "zgs_getShardConfig")
return
}
Expand Down
4 changes: 4 additions & 0 deletions node/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ type ShardConfig struct {
ShardId uint64 `json:"shardId"`
NumShard uint64 `json:"numShard"`
}

func (config *ShardConfig) HasSegment(segmentIndex uint64) bool {
return config.NumShard < 2 || segmentIndex%config.NumShard == config.ShardId
}
2 changes: 1 addition & 1 deletion transfer/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func getShardConfigs(clients []*node.Client) ([]*node.ShardConfig, error) {
if shardConfig.NumShard == 0 {
return nil, errors.New("NumShard is zero")
}
shardConfigs = append(shardConfigs, shardConfig)
shardConfigs = append(shardConfigs, &shardConfig)
}
return shardConfigs, nil
}
Expand Down

0 comments on commit 8854f51

Please sign in to comment.