Skip to content

Commit

Permalink
feat: cli timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniFrenchBread committed Jul 26, 2024
1 parent a0854ca commit 9cb572f
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 23 deletions.
15 changes: 14 additions & 1 deletion cmd/deploy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cmd

import (
"context"
"time"

"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -11,6 +14,7 @@ var (
url string
key string
bytecodeOrFile string
timeout time.Duration
}

deployCmd = &cobra.Command{
Expand All @@ -28,13 +32,22 @@ func init() {
deployCmd.Flags().StringVar(&deployArgs.bytecodeOrFile, "bytecode", "", "ZeroGStorage smart contract bytecode")
deployCmd.MarkFlagRequired("bytecode")

deployCmd.Flags().DurationVar(&deployArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")

rootCmd.AddCommand(deployCmd)
}

func deploy(*cobra.Command, []string) {
ctx := context.Background()
var cancel context.CancelFunc
if deployArgs.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, deployArgs.timeout)
defer cancel()
}

client := blockchain.MustNewWeb3(deployArgs.url, deployArgs.key)

contract, err := blockchain.Deploy(cliCtx, client, deployArgs.bytecodeOrFile)
contract, err := blockchain.Deploy(ctx, client, deployArgs.bytecodeOrFile)
if err != nil {
logrus.WithError(err).Fatal("Failed to deploy smart contract")
}
Expand Down
18 changes: 16 additions & 2 deletions cmd/download.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cmd

import (
"context"
"time"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/indexer"
"github.com/0glabs/0g-storage-client/node"
Expand All @@ -18,6 +21,8 @@ var (

root string
proof bool

timeout time.Duration
}

downloadCmd = &cobra.Command{
Expand All @@ -39,16 +44,25 @@ func init() {
downloadCmd.MarkFlagRequired("root")
downloadCmd.Flags().BoolVar(&downloadArgs.proof, "proof", false, "Whether to download with merkle proof for validation")

downloadCmd.Flags().DurationVar(&downloadArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")

rootCmd.AddCommand(downloadCmd)
}

func download(*cobra.Command, []string) {
ctx := context.Background()
var cancel context.CancelFunc
if downloadArgs.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, downloadArgs.timeout)
defer cancel()
}

if downloadArgs.indexer != "" {
indexerClient, err := indexer.NewClient(downloadArgs.indexer, indexer.IndexerClientOption{LogOption: common.LogOption{Logger: logrus.StandardLogger()}})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize indexer client")
}
if err := indexerClient.Download(cliCtx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
if err := indexerClient.Download(ctx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file from indexer")
}
return
Expand All @@ -61,7 +75,7 @@ func download(*cobra.Command, []string) {
logrus.WithError(err).Fatal("Failed to initialize downloader")
}

if err := downloader.Download(cliCtx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
if err := downloader.Download(ctx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
}
}
15 changes: 14 additions & 1 deletion cmd/kv_read.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cmd

import (
"context"
"encoding/json"
"fmt"
"math"
"time"

"github.com/0glabs/0g-storage-client/kv"
"github.com/0glabs/0g-storage-client/node"
Expand All @@ -19,6 +21,8 @@ var (
version uint64

node string

timeout time.Duration
}

kvReadCmd = &cobra.Command{
Expand All @@ -40,18 +44,27 @@ func init() {
kvReadCmd.Flags().StringVar(&kvReadArgs.node, "node", "", "kv node url")
kvReadCmd.MarkFlagRequired("node")

kvReadCmd.Flags().DurationVar(&kvReadArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")

rootCmd.AddCommand(kvReadCmd)
}

func kvRead(*cobra.Command, []string) {
ctx := context.Background()
var cancel context.CancelFunc
if kvReadArgs.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, kvReadArgs.timeout)
defer cancel()
}

client := node.MustNewKvClient(kvReadArgs.node)
defer client.Close()
kvClient := kv.NewClient(client)
streamId := common.HexToHash(kvReadArgs.streamId)

m := make(map[string]string)
for _, key := range kvReadArgs.keys {
val, err := kvClient.GetValue(cliCtx, streamId, []byte(key))
val, err := kvClient.GetValue(ctx, streamId, []byte(key))
if err != nil {
logrus.WithError(err).Fatalf("failed to read key %v", key)
}
Expand Down
21 changes: 16 additions & 5 deletions cmd/kv_write.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package cmd

import (
"fmt"
"context"
"math"
"time"

zg_common "github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/blockchain"
Expand Down Expand Up @@ -35,6 +36,8 @@ var (
skipTx bool
finalityRequired bool
taskSize uint

timeout time.Duration
}

kvWriteCmd = &cobra.Command{
Expand Down Expand Up @@ -72,10 +75,19 @@ func init() {
kvWriteCmd.Flags().BoolVar(&kvWriteArgs.finalityRequired, "finality-required", false, "Wait for file finality on nodes to kvWrite")
kvWriteCmd.Flags().UintVar(&kvWriteArgs.taskSize, "task-size", 10, "Number of segments to kvWrite in single rpc request")

kvWriteCmd.Flags().DurationVar(&kvWriteArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")

rootCmd.AddCommand(kvWriteCmd)
}

func kvWrite(*cobra.Command, []string) {
ctx := context.Background()
var cancel context.CancelFunc
if kvWriteArgs.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, kvWriteArgs.timeout)
defer cancel()
}

w3client := blockchain.MustNewWeb3(kvWriteArgs.url, kvWriteArgs.key)
defer w3client.Close()
contractAddr := common.HexToAddress(kvWriteArgs.contract)
Expand All @@ -97,7 +109,7 @@ func kvWrite(*cobra.Command, []string) {
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize indexer client")
}
if clients, err = indexerClient.SelectNodes(cliCtx, max(1, opt.ExpectedReplica)); err != nil {
if clients, err = indexerClient.SelectNodes(ctx, max(1, opt.ExpectedReplica)); err != nil {
logrus.WithError(err).Fatal("failed to select nodes from indexer")
}
}
Expand Down Expand Up @@ -127,9 +139,8 @@ func kvWrite(*cobra.Command, []string) {
)
}

err = batcher.Exec(cliCtx, opt)
err = batcher.Exec(ctx, opt)
if err != nil {
fmt.Println(err)
return
logrus.WithError(err).Fatal("fail to execute kv batch")
}
}
9 changes: 0 additions & 9 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package cmd

import (
"context"
"fmt"
"os"
"time"

"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/sirupsen/logrus"
Expand All @@ -14,18 +12,12 @@ import (
var (
logLevel string
logColorDisabled bool
cliTimeout time.Duration
cliCtx context.Context

rootCmd = &cobra.Command{
Use: "0g-storage-client",
Short: "ZeroGStorage client to interact with ZeroGStorage network",
PersistentPreRun: func(*cobra.Command, []string) {
initLog()
cliCtx := context.Background()
if cliTimeout > 0 {
cliCtx, _ = context.WithTimeout(cliCtx, cliTimeout)
}
},
RunE: func(cmd *cobra.Command, args []string) error {
return cmd.Help()
Expand All @@ -39,7 +31,6 @@ func init() {
rootCmd.PersistentFlags().Uint64Var(&blockchain.CustomGasPrice, "gas-price", 0, "Custom gas price to send transaction")
rootCmd.PersistentFlags().Uint64Var(&blockchain.CustomGasLimit, "gas-limit", 0, "Custom gas limit to send transaction")
rootCmd.PersistentFlags().BoolVar(&blockchain.Web3LogEnabled, "web3-log-enabled", false, "Enable log for web3 RPC")
rootCmd.PersistentFlags().DurationVar(&cliTimeout, "timeout", 0, "cli task timeout, 0 for no timeout")
}

func initLog() {
Expand Down
18 changes: 16 additions & 2 deletions cmd/upload.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cmd

import (
"context"
"time"

zg_common "github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/0glabs/0g-storage-client/contract"
Expand Down Expand Up @@ -31,6 +34,8 @@ var (
skipTx bool
finalityRequired bool
taskSize uint

timeout time.Duration
}

uploadCmd = &cobra.Command{
Expand Down Expand Up @@ -62,10 +67,19 @@ func init() {
uploadCmd.Flags().BoolVar(&uploadArgs.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload")
uploadCmd.Flags().UintVar(&uploadArgs.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

uploadCmd.Flags().DurationVar(&uploadArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")

rootCmd.AddCommand(uploadCmd)
}

func upload(*cobra.Command, []string) {
ctx := context.Background()
var cancel context.CancelFunc
if uploadArgs.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, uploadArgs.timeout)
defer cancel()
}

w3client := blockchain.MustNewWeb3(uploadArgs.url, uploadArgs.key)
defer w3client.Close()
contractAddr := common.HexToAddress(uploadArgs.contract)
Expand Down Expand Up @@ -93,7 +107,7 @@ func upload(*cobra.Command, []string) {
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize indexer client")
}
if err := indexerClient.Upload(cliCtx, flow, file, opt); err != nil {
if err := indexerClient.Upload(ctx, flow, file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
return
Expand All @@ -109,7 +123,7 @@ func upload(*cobra.Command, []string) {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}

if err := uploader.Upload(cliCtx, file, opt); err != nil {
if err := uploader.Upload(ctx, file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
}
2 changes: 0 additions & 2 deletions common/blockchain/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@ func WaitForReceipt(ctx context.Context, client *web3go.Client, txHash common.Ha
opt.Interval = time.Second * 3
}

var tries uint
reminder := util.NewReminder(opt.logger, time.Minute)
for receipt == nil {
if receipt, err = client.WithContext(ctx).Eth.TransactionReceipt(txHash); err != nil {
return nil, err
}
tries++

// remind
if receipt == nil {
Expand Down
2 changes: 1 addition & 1 deletion transfer/download_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (downloader *segmentDownloader) ParallelDo(ctx context.Context, routine, ta
}
return segment, nil
}
return nil, fmt.Errorf("no storage node holds segment with index %v", segmentIndex)
return nil, fmt.Errorf("failed to download segment %v", segmentIndex)
}

// ParallelCollect implements the parallel.Interface interface.
Expand Down

0 comments on commit 9cb572f

Please sign in to comment.