diff --git a/cmd/download.go b/cmd/download.go index 699ccc3..77c6244 100644 --- a/cmd/download.go +++ b/cmd/download.go @@ -58,7 +58,10 @@ func download(*cobra.Command, []string) { } if downloadArgs.indexer != "" { - indexerClient, err := indexer.NewClient(downloadArgs.indexer, indexer.IndexerClientOption{LogOption: common.LogOption{Logger: logrus.StandardLogger()}}) + indexerClient, err := indexer.NewClient(downloadArgs.indexer, indexer.IndexerClientOption{ + ProviderOption: providerOption, + LogOption: common.LogOption{Logger: logrus.StandardLogger()}, + }) if err != nil { logrus.WithError(err).Fatal("Failed to initialize indexer client") } @@ -68,7 +71,7 @@ func download(*cobra.Command, []string) { return } - nodes := node.MustNewZgsClients(downloadArgs.nodes) + nodes := node.MustNewZgsClients(downloadArgs.nodes, providerOption) downloader, err := transfer.NewDownloader(nodes, common.LogOption{Logger: logrus.StandardLogger()}) if err != nil { diff --git a/cmd/kv_read.go b/cmd/kv_read.go index b9c27be..df23c20 100644 --- a/cmd/kv_read.go +++ b/cmd/kv_read.go @@ -57,7 +57,7 @@ func kvRead(*cobra.Command, []string) { defer cancel() } - client := node.MustNewKvClient(kvReadArgs.node) + client := node.MustNewKvClient(kvReadArgs.node, providerOption) defer client.Close() kvClient := kv.NewClient(client) streamId := common.HexToHash(kvReadArgs.streamId) diff --git a/cmd/kv_write.go b/cmd/kv_write.go index 1016e1d..d3c43ae 100644 --- a/cmd/kv_write.go +++ b/cmd/kv_write.go @@ -38,7 +38,8 @@ var ( finalityRequired bool taskSize uint - fee float64 + fee float64 + nonce uint timeout time.Duration } @@ -81,6 +82,7 @@ func init() { kvWriteCmd.Flags().DurationVar(&kvWriteArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout") kvWriteCmd.Flags().Float64Var(&kvWriteArgs.fee, "fee", 0, "fee paid in a0gi") + kvWriteCmd.Flags().UintVar(&kvWriteArgs.nonce, "nonce", 0, "nonce of upload transaction") rootCmd.AddCommand(kvWriteCmd) } @@ -93,7 +95,7 @@ func kvWrite(*cobra.Command, []string) { defer cancel() } - w3client := blockchain.MustNewWeb3(kvWriteArgs.url, kvWriteArgs.key) + w3client := blockchain.MustNewWeb3(kvWriteArgs.url, kvWriteArgs.key, providerOption) defer w3client.Close() contractAddr := common.HexToAddress(kvWriteArgs.contract) flow, err := contract.NewFlowContract(contractAddr, w3client) @@ -106,17 +108,25 @@ func kvWrite(*cobra.Command, []string) { feeInA0GI := big.NewFloat(kvWriteArgs.fee) fee, _ = feeInA0GI.Mul(feeInA0GI, big.NewFloat(1e18)).Int(nil) } + var nonce *big.Int + if kvWriteArgs.nonce > 0 { + nonce = big.NewInt(int64(kvWriteArgs.nonce)) + } opt := transfer.UploadOption{ FinalityRequired: kvWriteArgs.finalityRequired, TaskSize: kvWriteArgs.taskSize, ExpectedReplica: kvWriteArgs.expectedReplica, SkipTx: kvWriteArgs.skipTx, Fee: fee, + Nonce: nonce, } var clients []*node.ZgsClient if kvWriteArgs.indexer != "" { - indexerClient, err := indexer.NewClient(kvWriteArgs.indexer, indexer.IndexerClientOption{LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()}}) + indexerClient, err := indexer.NewClient(kvWriteArgs.indexer, indexer.IndexerClientOption{ + ProviderOption: providerOption, + LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()}, + }) if err != nil { logrus.WithError(err).Fatal("Failed to initialize indexer client") } @@ -128,7 +138,7 @@ func kvWrite(*cobra.Command, []string) { if len(kvWriteArgs.node) == 0 { logrus.Fatal("At least one of --node and --indexer should not be empty") } - clients = node.MustNewZgsClients(kvWriteArgs.node) + clients = node.MustNewZgsClients(kvWriteArgs.node, providerOption) for _, client := range clients { defer client.Close() } diff --git a/cmd/root.go b/cmd/root.go index f3c93cc..635262f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,8 +3,11 @@ package cmd import ( "fmt" "os" + "time" "github.com/0glabs/0g-storage-client/common/blockchain" + "github.com/mcuadros/go-defaults" + providers "github.com/openweb3/go-rpc-provider/provider_wrapper" "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -13,11 +16,14 @@ var ( logLevel string logColorDisabled bool + providerOption providers.Option + rootCmd = &cobra.Command{ Use: "0g-storage-client", Short: "ZeroGStorage client to interact with ZeroGStorage network", PersistentPreRun: func(*cobra.Command, []string) { initLog() + defaults.SetDefaults(&providerOption) }, RunE: func(cmd *cobra.Command, args []string) error { return cmd.Help() @@ -31,6 +37,9 @@ func init() { rootCmd.PersistentFlags().Uint64Var(&blockchain.CustomGasPrice, "gas-price", 0, "Custom gas price to send transaction") rootCmd.PersistentFlags().Uint64Var(&blockchain.CustomGasLimit, "gas-limit", 0, "Custom gas limit to send transaction") rootCmd.PersistentFlags().BoolVar(&blockchain.Web3LogEnabled, "web3-log-enabled", false, "Enable log for web3 RPC") + rootCmd.PersistentFlags().IntVar(&providerOption.RetryCount, "rpc-retry-count", 5, "Retry count for rpc request") + rootCmd.PersistentFlags().DurationVar(&providerOption.RetryInterval, "rpc-retry-interval", 5*time.Second, "Retry interval for rpc request") + rootCmd.PersistentFlags().DurationVar(&providerOption.RequestTimeout, "rpc-timeout", 30*time.Second, "Timeout for single rpc request") } func initLog() { diff --git a/cmd/upload.go b/cmd/upload.go index 1cb467b..413d369 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -36,7 +36,8 @@ var ( finalityRequired bool taskSize uint - fee float64 + fee float64 + nonce uint timeout time.Duration } @@ -73,6 +74,7 @@ func init() { uploadCmd.Flags().DurationVar(&uploadArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout") uploadCmd.Flags().Float64Var(&uploadArgs.fee, "fee", 0, "fee paid in a0gi") + uploadCmd.Flags().UintVar(&uploadArgs.nonce, "nonce", 0, "nonce of upload transaction") rootCmd.AddCommand(uploadCmd) } @@ -85,7 +87,7 @@ func upload(*cobra.Command, []string) { defer cancel() } - w3client := blockchain.MustNewWeb3(uploadArgs.url, uploadArgs.key) + w3client := blockchain.MustNewWeb3(uploadArgs.url, uploadArgs.key, providerOption) defer w3client.Close() contractAddr := common.HexToAddress(uploadArgs.contract) flow, err := contract.NewFlowContract(contractAddr, w3client) @@ -97,6 +99,10 @@ func upload(*cobra.Command, []string) { feeInA0GI := big.NewFloat(uploadArgs.fee) fee, _ = feeInA0GI.Mul(feeInA0GI, big.NewFloat(1e18)).Int(nil) } + var nonce *big.Int + if uploadArgs.nonce > 0 { + nonce = big.NewInt(int64(uploadArgs.nonce)) + } opt := transfer.UploadOption{ Tags: hexutil.MustDecode(uploadArgs.tags), FinalityRequired: uploadArgs.finalityRequired, @@ -104,6 +110,7 @@ func upload(*cobra.Command, []string) { ExpectedReplica: uploadArgs.expectedReplica, SkipTx: uploadArgs.skipTx, Fee: fee, + Nonce: nonce, } file, err := core.Open(uploadArgs.file) @@ -113,7 +120,10 @@ func upload(*cobra.Command, []string) { defer file.Close() if uploadArgs.indexer != "" { - indexerClient, err := indexer.NewClient(uploadArgs.indexer, indexer.IndexerClientOption{LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()}}) + indexerClient, err := indexer.NewClient(uploadArgs.indexer, indexer.IndexerClientOption{ + ProviderOption: providerOption, + LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()}, + }) if err != nil { logrus.WithError(err).Fatal("Failed to initialize indexer client") } @@ -123,7 +133,7 @@ func upload(*cobra.Command, []string) { return } - clients := node.MustNewZgsClients(uploadArgs.node) + clients := node.MustNewZgsClients(uploadArgs.node, providerOption) for _, client := range clients { defer client.Close() } diff --git a/common/blockchain/rpc.go b/common/blockchain/rpc.go index f88d0ef..b1f1c10 100644 --- a/common/blockchain/rpc.go +++ b/common/blockchain/rpc.go @@ -7,6 +7,7 @@ import ( "github.com/0glabs/0g-storage-client/common/util" "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/mcuadros/go-defaults" providers "github.com/openweb3/go-rpc-provider/provider_wrapper" "github.com/openweb3/web3go" "github.com/openweb3/web3go/interfaces" @@ -23,8 +24,8 @@ type RetryOption struct { logger *logrus.Logger } -func MustNewWeb3(url, key string) *web3go.Client { - client, err := NewWeb3(url, key) +func MustNewWeb3(url, key string, opt ...providers.Option) *web3go.Client { + client, err := NewWeb3(url, key, opt...) if err != nil { logrus.WithError(err).WithField("url", url).Fatal("Failed to connect to fullnode") } @@ -32,13 +33,15 @@ func MustNewWeb3(url, key string) *web3go.Client { return client } -func NewWeb3(url, key string) (*web3go.Client, error) { +func NewWeb3(url, key string, opt ...providers.Option) (*web3go.Client, error) { sm := signers.MustNewSignerManagerByPrivateKeyStrings([]string{key}) - option := new(web3go.ClientOption). - WithRetry(3, time.Second). - WithTimout(5 * time.Second). - WithSignerManager(sm) + option := new(web3go.ClientOption) + if len(opt) > 0 { + option.Option = opt[0] + } + defaults.SetDefaults(&option.Option) + option.WithSignerManager(sm) if Web3LogEnabled { option = option.WithLooger(logrus.StandardLogger().Out) diff --git a/tests/batch_upload_test.py b/tests/batch_upload_test.py new file mode 100644 index 0000000..814b2c8 --- /dev/null +++ b/tests/batch_upload_test.py @@ -0,0 +1,30 @@ +import os + +from test_framework.test_framework import TestFramework +from config.node_config import GENESIS_PRIV_KEY +from utility.run_go_test import run_go_test + +class BatchUploadTest(TestFramework): + def setup_params(self): + self.num_blockchain_nodes = 1 + self.num_nodes = 1 + + def run_test(self): + ports = ",".join([x.rpc_url.split(":")[-1] for x in self.nodes]) + self.setup_indexer(self.nodes[0].rpc_url, self.nodes[0].rpc_url, ports) + test_args = [ + "go", + "run", + os.path.join(os.path.dirname(__file__), "go_tests", "batch_upload_test", "main.go"), + # arguments passed to go + GENESIS_PRIV_KEY, + self.contract.address(), + self.blockchain_nodes[0].rpc_url, + ",".join([x.rpc_url for x in self.nodes]), + self.indexer_rpc_url + ] + run_go_test(self.root_dir, test_args) + + +if __name__ == "__main__": + BatchUploadTest().main() diff --git a/tests/go_tests/batch_upload_test/main.go b/tests/go_tests/batch_upload_test/main.go new file mode 100644 index 0000000..8d53330 --- /dev/null +++ b/tests/go_tests/batch_upload_test/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/0glabs/0g-storage-client/common" + "github.com/0glabs/0g-storage-client/common/blockchain" + "github.com/0glabs/0g-storage-client/common/util" + "github.com/0glabs/0g-storage-client/contract" + "github.com/0glabs/0g-storage-client/core" + "github.com/0glabs/0g-storage-client/indexer" + "github.com/0glabs/0g-storage-client/transfer" + eth_common "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +func runTest() error { + ctx := context.Background() + // load args + args := os.Args[1:] + key := args[0] + contractAddr := eth_common.HexToAddress(args[1]) + chainUrl := args[2] + zgsNodeUrls := strings.Split(args[3], ",") + indexerUrl := args[4] + + w3client := blockchain.MustNewWeb3(chainUrl, key) + defer w3client.Close() + flow, err := contract.NewFlowContract(contractAddr, w3client) + if err != nil { + return fmt.Errorf("failed to create flow contract") + } + // batch upload + datas := make([]core.IterableData, 10) + opts := make([]transfer.UploadOption, 10) + for i := 0; i < 10; i += 1 { + datas[i], err = core.NewDataInMemory([]byte(fmt.Sprintf("indexer_test_data_%v", i))) + if err != nil { + return errors.WithMessage(err, "failed to initialize data") + } + opts[i] = transfer.UploadOption{ + FinalityRequired: true, + } + } + indexerClient, err := indexer.NewClient(indexerUrl, indexer.IndexerClientOption{LogOption: common.LogOption{Logger: logrus.StandardLogger()}}) + if err != nil { + return errors.WithMessage(err, "failed to initialize indexer client") + } + _, roots, err := indexerClient.BatchUpload(ctx, flow, datas, true, opts) + if err != nil { + return errors.WithMessage(err, "failed to upload file") + } + // check file location + for _, root := range roots { + locations, err := indexerClient.GetFileLocations(ctx, root.Hex()) + if err != nil { + return errors.WithMessage(err, "failed to get file locations") + } + if len(locations) != 1 { + return fmt.Errorf("unexpected file location length: %v", len(locations)) + } + if locations[0].URL != zgsNodeUrls[0] { + return fmt.Errorf("unexpected file location: %v", locations[0].URL) + } + } + return nil +} + +func main() { + if err := util.WaitUntil(runTest, time.Minute*3); err != nil { + logrus.WithError(err).Fatalf("batch upload test failed") + } +} diff --git a/transfer/uploader.go b/transfer/uploader.go index 82a9fbe..ce54573 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -44,6 +44,7 @@ type UploadOption struct { ExpectedReplica uint // expected number of replications SkipTx bool // skip sending transaction on chain, this can set to true only if the data has already settled on chain before Fee *big.Int // fee in neuron + Nonce *big.Int // nonce for transaction } // Uploader uploader to upload file to 0g storage, send on-chain transactions and transfer data to storage nodes. @@ -102,6 +103,7 @@ func (uploader *Uploader) checkLogExistance(ctx context.Context, root common.Has } // BatchUpload submit multiple data to 0g storage contract batchly in single on-chain transaction, then transfer the data to the storage nodes. +// The nonce for upload transaction will be the first non-nil nonce in given upload options, the protocol fee is the sum of fees in upload options. func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.IterableData, waitForLogEntry bool, option ...[]UploadOption) (common.Hash, []common.Hash, error) { stageTimer := time.Now() @@ -126,6 +128,7 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable toSubmitDatas := make([]core.IterableData, 0) toSubmitTags := make([][]byte, 0) dataRoots := make([]common.Hash, n) + var nonce, fee *big.Int var lastTreeToSubmit *merkle.Tree for i := 0; i < n; i++ { data := datas[i] @@ -154,6 +157,16 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable if !opts[i].SkipTx || !exist { toSubmitDatas = append(toSubmitDatas, data) toSubmitTags = append(toSubmitTags, opt.Tags) + if opt.Fee != nil { + if fee == nil { + fee = new(big.Int).Set(opt.Fee) + } else { + fee.Add(fee, opt.Fee) + } + } + if nonce == nil && opts[i].Nonce != nil { + nonce = new(big.Int).Set(opts[i].Nonce) + } } lastTreeToSubmit = trees[i] } @@ -163,7 +176,7 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable var receipt *types.Receipt if len(toSubmitDatas) > 0 { var err error - if txHash, receipt, err = uploader.SubmitLogEntry(ctx, toSubmitDatas, toSubmitTags, waitForLogEntry, nil); err != nil { + if txHash, receipt, err = uploader.SubmitLogEntry(ctx, toSubmitDatas, toSubmitTags, waitForLogEntry, nonce, fee); err != nil { return common.Hash{}, nil, errors.WithMessage(err, "Failed to submit log entry") } if waitForLogEntry { @@ -224,7 +237,7 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op if !opt.SkipTx || !exist { var receipt *types.Receipt - if _, receipt, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, [][]byte{opt.Tags}, true, opt.Fee); err != nil { + if _, receipt, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, [][]byte{opt.Tags}, true, opt.Nonce, opt.Fee); err != nil { return errors.WithMessage(err, "Failed to submit log entry") } @@ -257,7 +270,7 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op } // SubmitLogEntry submit the data to 0g storage contract by sending a transaction -func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.IterableData, tags [][]byte, waitForReceipt bool, fee *big.Int) (common.Hash, *types.Receipt, error) { +func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.IterableData, tags [][]byte, waitForReceipt bool, nonce *big.Int, fee *big.Int) (common.Hash, *types.Receipt, error) { // Construct submission submissions := make([]contract.Submission, len(datas)) for i := 0; i < len(datas); i++ { @@ -274,6 +287,9 @@ func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.Itera if err != nil { return common.Hash{}, nil, errors.WithMessage(err, "Failed to create opts to send transaction") } + if nonce != nil { + opts.Nonce = nonce + } var tx *types.Transaction pricePerSector, err := uploader.market.PricePerSector(&bind.CallOpts{Context: ctx})