Skip to content

Commit

Permalink
feat: more options (#44)
Browse files Browse the repository at this point in the history
* feat: add options for rpc retry/timeout and tx nonce

* test: batch upload

* fix: nonce
  • Loading branch information
MiniFrenchBread authored Aug 17, 2024
1 parent e96480e commit 369dfb5
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 21 deletions.
7 changes: 5 additions & 2 deletions cmd/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func download(*cobra.Command, []string) {
}

if downloadArgs.indexer != "" {
indexerClient, err := indexer.NewClient(downloadArgs.indexer, indexer.IndexerClientOption{LogOption: common.LogOption{Logger: logrus.StandardLogger()}})
indexerClient, err := indexer.NewClient(downloadArgs.indexer, indexer.IndexerClientOption{
ProviderOption: providerOption,
LogOption: common.LogOption{Logger: logrus.StandardLogger()},
})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize indexer client")
}
Expand All @@ -68,7 +71,7 @@ func download(*cobra.Command, []string) {
return
}

nodes := node.MustNewZgsClients(downloadArgs.nodes)
nodes := node.MustNewZgsClients(downloadArgs.nodes, providerOption)

downloader, err := transfer.NewDownloader(nodes, common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/kv_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func kvRead(*cobra.Command, []string) {
defer cancel()
}

client := node.MustNewKvClient(kvReadArgs.node)
client := node.MustNewKvClient(kvReadArgs.node, providerOption)
defer client.Close()
kvClient := kv.NewClient(client)
streamId := common.HexToHash(kvReadArgs.streamId)
Expand Down
18 changes: 14 additions & 4 deletions cmd/kv_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ var (
finalityRequired bool
taskSize uint

fee float64
fee float64
nonce uint

timeout time.Duration
}
Expand Down Expand Up @@ -81,6 +82,7 @@ func init() {
kvWriteCmd.Flags().DurationVar(&kvWriteArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")

kvWriteCmd.Flags().Float64Var(&kvWriteArgs.fee, "fee", 0, "fee paid in a0gi")
kvWriteCmd.Flags().UintVar(&kvWriteArgs.nonce, "nonce", 0, "nonce of upload transaction")

rootCmd.AddCommand(kvWriteCmd)
}
Expand All @@ -93,7 +95,7 @@ func kvWrite(*cobra.Command, []string) {
defer cancel()
}

w3client := blockchain.MustNewWeb3(kvWriteArgs.url, kvWriteArgs.key)
w3client := blockchain.MustNewWeb3(kvWriteArgs.url, kvWriteArgs.key, providerOption)
defer w3client.Close()
contractAddr := common.HexToAddress(kvWriteArgs.contract)
flow, err := contract.NewFlowContract(contractAddr, w3client)
Expand All @@ -106,17 +108,25 @@ func kvWrite(*cobra.Command, []string) {
feeInA0GI := big.NewFloat(kvWriteArgs.fee)
fee, _ = feeInA0GI.Mul(feeInA0GI, big.NewFloat(1e18)).Int(nil)
}
var nonce *big.Int
if kvWriteArgs.nonce > 0 {
nonce = big.NewInt(int64(kvWriteArgs.nonce))
}
opt := transfer.UploadOption{
FinalityRequired: kvWriteArgs.finalityRequired,
TaskSize: kvWriteArgs.taskSize,
ExpectedReplica: kvWriteArgs.expectedReplica,
SkipTx: kvWriteArgs.skipTx,
Fee: fee,
Nonce: nonce,
}

var clients []*node.ZgsClient
if kvWriteArgs.indexer != "" {
indexerClient, err := indexer.NewClient(kvWriteArgs.indexer, indexer.IndexerClientOption{LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()}})
indexerClient, err := indexer.NewClient(kvWriteArgs.indexer, indexer.IndexerClientOption{
ProviderOption: providerOption,
LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()},
})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize indexer client")
}
Expand All @@ -128,7 +138,7 @@ func kvWrite(*cobra.Command, []string) {
if len(kvWriteArgs.node) == 0 {
logrus.Fatal("At least one of --node and --indexer should not be empty")
}
clients = node.MustNewZgsClients(kvWriteArgs.node)
clients = node.MustNewZgsClients(kvWriteArgs.node, providerOption)
for _, client := range clients {
defer client.Close()
}
Expand Down
9 changes: 9 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package cmd
import (
"fmt"
"os"
"time"

"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/mcuadros/go-defaults"
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
Expand All @@ -13,11 +16,14 @@ var (
logLevel string
logColorDisabled bool

providerOption providers.Option

rootCmd = &cobra.Command{
Use: "0g-storage-client",
Short: "ZeroGStorage client to interact with ZeroGStorage network",
PersistentPreRun: func(*cobra.Command, []string) {
initLog()
defaults.SetDefaults(&providerOption)
},
RunE: func(cmd *cobra.Command, args []string) error {
return cmd.Help()
Expand All @@ -31,6 +37,9 @@ func init() {
rootCmd.PersistentFlags().Uint64Var(&blockchain.CustomGasPrice, "gas-price", 0, "Custom gas price to send transaction")
rootCmd.PersistentFlags().Uint64Var(&blockchain.CustomGasLimit, "gas-limit", 0, "Custom gas limit to send transaction")
rootCmd.PersistentFlags().BoolVar(&blockchain.Web3LogEnabled, "web3-log-enabled", false, "Enable log for web3 RPC")
rootCmd.PersistentFlags().IntVar(&providerOption.RetryCount, "rpc-retry-count", 5, "Retry count for rpc request")
rootCmd.PersistentFlags().DurationVar(&providerOption.RetryInterval, "rpc-retry-interval", 5*time.Second, "Retry interval for rpc request")
rootCmd.PersistentFlags().DurationVar(&providerOption.RequestTimeout, "rpc-timeout", 30*time.Second, "Timeout for single rpc request")
}

func initLog() {
Expand Down
18 changes: 14 additions & 4 deletions cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ var (
finalityRequired bool
taskSize uint

fee float64
fee float64
nonce uint

timeout time.Duration
}
Expand Down Expand Up @@ -73,6 +74,7 @@ func init() {
uploadCmd.Flags().DurationVar(&uploadArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")

uploadCmd.Flags().Float64Var(&uploadArgs.fee, "fee", 0, "fee paid in a0gi")
uploadCmd.Flags().UintVar(&uploadArgs.nonce, "nonce", 0, "nonce of upload transaction")

rootCmd.AddCommand(uploadCmd)
}
Expand All @@ -85,7 +87,7 @@ func upload(*cobra.Command, []string) {
defer cancel()
}

w3client := blockchain.MustNewWeb3(uploadArgs.url, uploadArgs.key)
w3client := blockchain.MustNewWeb3(uploadArgs.url, uploadArgs.key, providerOption)
defer w3client.Close()
contractAddr := common.HexToAddress(uploadArgs.contract)
flow, err := contract.NewFlowContract(contractAddr, w3client)
Expand All @@ -97,13 +99,18 @@ func upload(*cobra.Command, []string) {
feeInA0GI := big.NewFloat(uploadArgs.fee)
fee, _ = feeInA0GI.Mul(feeInA0GI, big.NewFloat(1e18)).Int(nil)
}
var nonce *big.Int
if uploadArgs.nonce > 0 {
nonce = big.NewInt(int64(uploadArgs.nonce))
}
opt := transfer.UploadOption{
Tags: hexutil.MustDecode(uploadArgs.tags),
FinalityRequired: uploadArgs.finalityRequired,
TaskSize: uploadArgs.taskSize,
ExpectedReplica: uploadArgs.expectedReplica,
SkipTx: uploadArgs.skipTx,
Fee: fee,
Nonce: nonce,
}

file, err := core.Open(uploadArgs.file)
Expand All @@ -113,7 +120,10 @@ func upload(*cobra.Command, []string) {
defer file.Close()

if uploadArgs.indexer != "" {
indexerClient, err := indexer.NewClient(uploadArgs.indexer, indexer.IndexerClientOption{LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()}})
indexerClient, err := indexer.NewClient(uploadArgs.indexer, indexer.IndexerClientOption{
ProviderOption: providerOption,
LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()},
})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize indexer client")
}
Expand All @@ -123,7 +133,7 @@ func upload(*cobra.Command, []string) {
return
}

clients := node.MustNewZgsClients(uploadArgs.node)
clients := node.MustNewZgsClients(uploadArgs.node, providerOption)
for _, client := range clients {
defer client.Close()
}
Expand Down
17 changes: 10 additions & 7 deletions common/blockchain/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/0glabs/0g-storage-client/common/util"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/mcuadros/go-defaults"
providers "github.com/openweb3/go-rpc-provider/provider_wrapper"
"github.com/openweb3/web3go"
"github.com/openweb3/web3go/interfaces"
Expand All @@ -23,22 +24,24 @@ type RetryOption struct {
logger *logrus.Logger
}

func MustNewWeb3(url, key string) *web3go.Client {
client, err := NewWeb3(url, key)
func MustNewWeb3(url, key string, opt ...providers.Option) *web3go.Client {
client, err := NewWeb3(url, key, opt...)
if err != nil {
logrus.WithError(err).WithField("url", url).Fatal("Failed to connect to fullnode")
}

return client
}

func NewWeb3(url, key string) (*web3go.Client, error) {
func NewWeb3(url, key string, opt ...providers.Option) (*web3go.Client, error) {
sm := signers.MustNewSignerManagerByPrivateKeyStrings([]string{key})

option := new(web3go.ClientOption).
WithRetry(3, time.Second).
WithTimout(5 * time.Second).
WithSignerManager(sm)
option := new(web3go.ClientOption)
if len(opt) > 0 {
option.Option = opt[0]
}
defaults.SetDefaults(&option.Option)
option.WithSignerManager(sm)

if Web3LogEnabled {
option = option.WithLooger(logrus.StandardLogger().Out)
Expand Down
30 changes: 30 additions & 0 deletions tests/batch_upload_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os

from test_framework.test_framework import TestFramework
from config.node_config import GENESIS_PRIV_KEY
from utility.run_go_test import run_go_test

class BatchUploadTest(TestFramework):
def setup_params(self):
self.num_blockchain_nodes = 1
self.num_nodes = 1

def run_test(self):
ports = ",".join([x.rpc_url.split(":")[-1] for x in self.nodes])
self.setup_indexer(self.nodes[0].rpc_url, self.nodes[0].rpc_url, ports)
test_args = [
"go",
"run",
os.path.join(os.path.dirname(__file__), "go_tests", "batch_upload_test", "main.go"),
# arguments passed to go
GENESIS_PRIV_KEY,
self.contract.address(),
self.blockchain_nodes[0].rpc_url,
",".join([x.rpc_url for x in self.nodes]),
self.indexer_rpc_url
]
run_go_test(self.root_dir, test_args)


if __name__ == "__main__":
BatchUploadTest().main()
78 changes: 78 additions & 0 deletions tests/go_tests/batch_upload_test/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/contract"
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/indexer"
"github.com/0glabs/0g-storage-client/transfer"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func runTest() error {
ctx := context.Background()
// load args
args := os.Args[1:]
key := args[0]
contractAddr := eth_common.HexToAddress(args[1])
chainUrl := args[2]
zgsNodeUrls := strings.Split(args[3], ",")
indexerUrl := args[4]

w3client := blockchain.MustNewWeb3(chainUrl, key)
defer w3client.Close()
flow, err := contract.NewFlowContract(contractAddr, w3client)
if err != nil {
return fmt.Errorf("failed to create flow contract")
}
// batch upload
datas := make([]core.IterableData, 10)
opts := make([]transfer.UploadOption, 10)
for i := 0; i < 10; i += 1 {
datas[i], err = core.NewDataInMemory([]byte(fmt.Sprintf("indexer_test_data_%v", i)))
if err != nil {
return errors.WithMessage(err, "failed to initialize data")
}
opts[i] = transfer.UploadOption{
FinalityRequired: true,
}
}
indexerClient, err := indexer.NewClient(indexerUrl, indexer.IndexerClientOption{LogOption: common.LogOption{Logger: logrus.StandardLogger()}})
if err != nil {
return errors.WithMessage(err, "failed to initialize indexer client")
}
_, roots, err := indexerClient.BatchUpload(ctx, flow, datas, true, opts)
if err != nil {
return errors.WithMessage(err, "failed to upload file")
}
// check file location
for _, root := range roots {
locations, err := indexerClient.GetFileLocations(ctx, root.Hex())
if err != nil {
return errors.WithMessage(err, "failed to get file locations")
}
if len(locations) != 1 {
return fmt.Errorf("unexpected file location length: %v", len(locations))
}
if locations[0].URL != zgsNodeUrls[0] {
return fmt.Errorf("unexpected file location: %v", locations[0].URL)
}
}
return nil
}

func main() {
if err := util.WaitUntil(runTest, time.Minute*3); err != nil {
logrus.WithError(err).Fatalf("batch upload test failed")
}
}
Loading

0 comments on commit 369dfb5

Please sign in to comment.