From a50330b07211503e867e2f56cf98f525834fbb5f Mon Sep 17 00:00:00 2001 From: Liqun Date: Tue, 10 Sep 2024 10:06:06 +0800 Subject: [PATCH] feat: Add support for uploading folders to 0gStorage (#46) --- cmd/upload.go | 140 ++++++++++++++++++++++++---------------- cmd/upload_dir.go | 71 ++++++++++++++++++++ transfer/dir/fs.go | 52 +++++++++++++++ transfer/dir/fs_test.go | 71 ++++++++++++++++++++ transfer/uploader.go | 68 +++++++++++++++++++ 5 files changed, 348 insertions(+), 54 deletions(-) create mode 100644 cmd/upload_dir.go diff --git a/cmd/upload.go b/cmd/upload.go index 1e58b1b..7afdec7 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -12,32 +12,70 @@ import ( "github.com/0glabs/0g-storage-client/node" "github.com/0glabs/0g-storage-client/transfer" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/openweb3/web3go" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) -var ( - uploadArgs struct { - file string - tags string +// L1 transaction relevant operations, including nonce, fee, and so on. +type transactionArgument struct { + url string + key string - url string - key string + fee float64 + nonce uint +} - node []string - indexer string +func bindTransactionFlags(cmd *cobra.Command, args *transactionArgument) { + cmd.Flags().StringVar(&args.url, "url", "", "Fullnode URL to interact with ZeroGStorage smart contract") + cmd.MarkFlagRequired("url") + cmd.Flags().StringVar(&args.key, "key", "", "Private key to interact with smart contract") + cmd.MarkFlagRequired("key") - expectedReplica uint + cmd.Flags().Float64Var(&args.fee, "fee", 0, "fee paid in a0gi") + cmd.Flags().UintVar(&args.nonce, "nonce", 0, "nonce of upload transaction") +} - skipTx bool - finalityRequired bool - taskSize uint +type uploadArgument struct { + transactionArgument - fee float64 - nonce uint + file string + tags string - timeout time.Duration - } + node []string + indexer string + + expectedReplica uint + + skipTx bool + finalityRequired bool + taskSize uint + + timeout time.Duration +} + +func bindUploadFlags(cmd *cobra.Command, args *uploadArgument) { + cmd.Flags().StringVar(&args.file, "file", "", "File name to upload") + cmd.MarkFlagRequired("file") + cmd.Flags().StringVar(&args.tags, "tags", "0x", "Tags of the file") + + cmd.Flags().StringSliceVar(&args.node, "node", []string{}, "ZeroGStorage storage node URL") + cmd.Flags().StringVar(&args.indexer, "indexer", "", "ZeroGStorage indexer URL") + cmd.MarkFlagsOneRequired("indexer", "node") + cmd.MarkFlagsMutuallyExclusive("indexer", "node") + + cmd.Flags().UintVar(&args.expectedReplica, "expected-replica", 1, "expected number of replications to upload") + + cmd.Flags().BoolVar(&args.skipTx, "skip-tx", true, "Skip sending the transaction on chain if already exists") + cmd.Flags().BoolVar(&args.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload") + cmd.Flags().UintVar(&args.taskSize, "task-size", 10, "Number of segments to upload in single rpc request") + + cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout") +} + +var ( + uploadArgs uploadArgument uploadCmd = &cobra.Command{ Use: "upload", @@ -47,30 +85,8 @@ var ( ) func init() { - uploadCmd.Flags().StringVar(&uploadArgs.file, "file", "", "File name to upload") - uploadCmd.MarkFlagRequired("file") - uploadCmd.Flags().StringVar(&uploadArgs.tags, "tags", "0x", "Tags of the file") - - uploadCmd.Flags().StringVar(&uploadArgs.url, "url", "", "Fullnode URL to interact with ZeroGStorage smart contract") - uploadCmd.MarkFlagRequired("url") - uploadCmd.Flags().StringVar(&uploadArgs.key, "key", "", "Private key to interact with smart contract") - uploadCmd.MarkFlagRequired("key") - - uploadCmd.Flags().StringSliceVar(&uploadArgs.node, "node", []string{}, "ZeroGStorage storage node URL") - uploadCmd.Flags().StringVar(&uploadArgs.indexer, "indexer", "", "ZeroGStorage indexer URL") - uploadCmd.MarkFlagsOneRequired("indexer", "node") - uploadCmd.MarkFlagsMutuallyExclusive("indexer", "node") - - uploadCmd.Flags().UintVar(&uploadArgs.expectedReplica, "expected-replica", 1, "expected number of replications to upload") - - uploadCmd.Flags().BoolVar(&uploadArgs.skipTx, "skip-tx", true, "Skip sending the transaction on chain if already exists") - uploadCmd.Flags().BoolVar(&uploadArgs.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload") - uploadCmd.Flags().UintVar(&uploadArgs.taskSize, "task-size", 10, "Number of segments to upload in single rpc request") - - uploadCmd.Flags().DurationVar(&uploadArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout") - - uploadCmd.Flags().Float64Var(&uploadArgs.fee, "fee", 0, "fee paid in a0gi") - uploadCmd.Flags().UintVar(&uploadArgs.nonce, "nonce", 0, "nonce of upload transaction") + bindUploadFlags(uploadCmd, &uploadArgs) + bindTransactionFlags(uploadCmd, &uploadArgs.transactionArgument) rootCmd.AddCommand(uploadCmd) } @@ -109,37 +125,53 @@ func upload(*cobra.Command, []string) { Nonce: nonce, } + uploader, closer, err := newUploader(ctx, uploadArgs, w3client, opt) + if err != nil { + logrus.WithError(err).Fatal("Failed to initialize uploader") + } + defer closer() + file, err := core.Open(uploadArgs.file) if err != nil { logrus.WithError(err).Fatal("Failed to open file") } defer file.Close() - if uploadArgs.indexer != "" { - indexerClient, err := indexer.NewClient(uploadArgs.indexer, indexer.IndexerClientOption{ + if _, err := uploader.Upload(ctx, file, opt); err != nil { + logrus.WithError(err).Fatal("Failed to upload file") + } +} + +func newUploader(ctx context.Context, args uploadArgument, w3client *web3go.Client, opt transfer.UploadOption) (*transfer.Uploader, func(), error) { + if args.indexer != "" { + indexerClient, err := indexer.NewClient(args.indexer, indexer.IndexerClientOption{ ProviderOption: providerOption, LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()}, }) if err != nil { - logrus.WithError(err).Fatal("Failed to initialize indexer client") + return nil, nil, errors.WithMessage(err, "failed to initialize indexer client") } - if _, err := indexerClient.Upload(ctx, w3client, file, opt); err != nil { - logrus.WithError(err).Fatal("Failed to upload file") + + up, err := indexerClient.NewUploaderFromIndexerNodes(ctx, w3client, opt.ExpectedReplica, nil) + if err != nil { + return nil, nil, err } - return + + return up, indexerClient.Close, nil } - clients := node.MustNewZgsClients(uploadArgs.node, providerOption) - for _, client := range clients { - defer client.Close() + clients := node.MustNewZgsClients(args.node, providerOption) + closer := func() { + for _, client := range clients { + client.Close() + } } - uploader, err := transfer.NewUploader(ctx, w3client, clients, zg_common.LogOption{Logger: logrus.StandardLogger()}) + up, err := transfer.NewUploader(ctx, w3client, clients, zg_common.LogOption{Logger: logrus.StandardLogger()}) if err != nil { - logrus.WithError(err).Fatal("Failed to initialize uploader") + closer() + return nil, nil, err } - if _, err := uploader.Upload(ctx, file, opt); err != nil { - logrus.WithError(err).Fatal("Failed to upload file") - } + return up, closer, nil } diff --git a/cmd/upload_dir.go b/cmd/upload_dir.go new file mode 100644 index 0000000..be99b84 --- /dev/null +++ b/cmd/upload_dir.go @@ -0,0 +1,71 @@ +package cmd + +import ( + "context" + + "github.com/0glabs/0g-storage-client/common/blockchain" + "github.com/0glabs/0g-storage-client/transfer" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + uploadDirArgs uploadArgument + + uploadDirCmd = &cobra.Command{ + Use: "upload-dir", + Short: "Upload directory to ZeroGStorage network", + Run: uploadDir, + } +) + +func init() { + bindUploadFlags(uploadDirCmd, &uploadDirArgs) + uploadDirCmd.Flags().StringVar(&uploadDirArgs.url, "url", "", "Fullnode URL to interact with ZeroGStorage smart contract") + uploadDirCmd.MarkFlagRequired("url") + uploadDirCmd.Flags().StringVar(&uploadDirArgs.key, "key", "", "Private key to interact with smart contract") + uploadDirCmd.MarkFlagRequired("key") + + rootCmd.AddCommand(uploadDirCmd) +} + +func uploadDir(*cobra.Command, []string) { + ctx := context.Background() + var cancel context.CancelFunc + if uploadDirArgs.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, uploadDirArgs.timeout) + defer cancel() + } + + w3client := blockchain.MustNewWeb3(uploadDirArgs.url, uploadDirArgs.key, providerOption) + defer w3client.Close() + + finalityRequired := transfer.TransactionPacked + if uploadDirArgs.finalityRequired { + finalityRequired = transfer.FileFinalized + } + opt := transfer.UploadOption{ + Tags: hexutil.MustDecode(uploadDirArgs.tags), + FinalityRequired: finalityRequired, + TaskSize: uploadDirArgs.taskSize, + ExpectedReplica: uploadDirArgs.expectedReplica, + SkipTx: uploadDirArgs.skipTx, + } + + uploader, closer, err := newUploader(ctx, uploadDirArgs, w3client, opt) + if err != nil { + logrus.WithError(err).Fatal("Failed to initialize uploader") + } + defer closer() + + txnHash, rootHash, err := uploader.UploadDir(ctx, uploadDirArgs.file, opt) + if err != nil { + logrus.WithError(err).Fatal("Failed to upload directory") + } + + logrus.WithFields(logrus.Fields{ + "txnHash": txnHash, + "rootHash": rootHash, + }).Info("Directory uploaded done") +} diff --git a/transfer/dir/fs.go b/transfer/dir/fs.go index 15b18da..60eb913 100644 --- a/transfer/dir/fs.go +++ b/transfer/dir/fs.go @@ -100,6 +100,54 @@ func (node *FsNode) Equal(rhs *FsNode) bool { } } +// Flatten recursively flattens the FsNode tree into a slice of FsNode pointers and a slice of relative paths. +// The filterFunc is applied to each node to determine if it should be included in the result. +func (node *FsNode) Flatten(filterFunc func(*FsNode) bool) (result []*FsNode, relpaths []string) { + node.Traverse(func(n *FsNode, p string) error { + if filterFunc(n) { + result = append(result, n) + relpaths = append(relpaths, p) + } + return nil + }) + return result, relpaths +} + +// Traverse recursively traverses the FsNode tree and applies the provided actionFunc to each node. +// This method only requires the user to handle relative paths. +// +// Parameters: +// +// - actionFunc: A function that defines the action to perform on each node. The function +// takes the current node and its relative path as arguments. This function can perform any necessary +// operations, such as collecting nodes, uploading files, or logging information. +func (node *FsNode) Traverse(actionFunc func(node *FsNode, relativePath string) error) error { + return node.traverse("", actionFunc) +} + +// traverse is a helper function that manages relative paths during the traversal process. +func (node *FsNode) traverse(baseDir string, actionFunc func(node *FsNode, relativePath string) error) error { + relative := filepath.Join(baseDir, node.Name) + + // Apply the action function to the current node + if err := actionFunc(node, relative); err != nil { + return err + } + + if node.Type != FileTypeDirectory { + return nil + } + + // If the node is a directory, recursively traverse its entries + for _, entry := range node.Entries { + if err := entry.traverse(relative, actionFunc); err != nil { + return err + } + } + + return nil +} + // BuildFileTree recursively builds a file tree for the specified directory. func BuildFileTree(path string) (*FsNode, error) { info, err := os.Stat(path) @@ -171,6 +219,10 @@ func buildSymbolicNode(path string, info os.FileInfo) (*FsNode, error) { // buildFileNode creates an FsNode for a regular file, including its Merkle root hash. func buildFileNode(path string, info os.FileInfo) (*FsNode, error) { + if info.Size() == 0 { + return NewFileFsNode(info.Name(), common.Hash{}, 0), nil + } + hash, err := core.MerkleRoot(path) if err != nil { return nil, errors.WithMessagef(err, "failed to calculate merkle root for %s", path) diff --git a/transfer/dir/fs_test.go b/transfer/dir/fs_test.go index d3ca9d4..6b9ecaf 100644 --- a/transfer/dir/fs_test.go +++ b/transfer/dir/fs_test.go @@ -1,6 +1,7 @@ package dir_test import ( + "fmt" "os" "path/filepath" "testing" @@ -207,3 +208,73 @@ func TestBuildFileTree(t *testing.T) { assert.Equal(t, filePath, node.Link) }) } + +func TestTraverse(t *testing.T) { + // Create a mock directory structure + root := &dir.FsNode{ + Name: "root", + Type: dir.FileTypeDirectory, + Entries: []*dir.FsNode{ + { + Name: "file1.txt", + Type: dir.FileTypeFile, + }, + { + Name: "subdir", + Type: dir.FileTypeDirectory, + Entries: []*dir.FsNode{ + { + Name: "file2.txt", + Type: dir.FileTypeFile, + }, + }, + }, + }, + } + + // Define the expected paths + expectedPaths := map[string]bool{ + "root": false, + "root/file1.txt": false, + "root/subdir": false, + "root/subdir/file2.txt": false, + } + + // Define the action function to check the paths + actionFunc := func(node *dir.FsNode, path string) error { + if _, ok := expectedPaths[path]; ok { + expectedPaths[path] = true + return nil + } else { + return fmt.Errorf("Unexpected path: %s", path) + } + } + + // Perform the traversal + err := root.Traverse(actionFunc) + assert.NoError(t, err) + + // Verify all expected paths were visited + for path, visited := range expectedPaths { + assert.True(t, visited, "Path not visited: %s", path) + } + + // Define the expected flatten result + expectedFlattenResult := []struct { + node *dir.FsNode + path string + }{ + {root.Entries[0], "root/file1.txt"}, + {root.Entries[1].Entries[0], "root/subdir/file2.txt"}, + } + + // Test Flatten method + result, paths := root.Flatten(func(n *dir.FsNode) bool { + return n.Type == dir.FileTypeFile + }) + + for i, item := range expectedFlattenResult { + assert.Equal(t, item.node, result[i]) + assert.Equal(t, item.path, paths[i]) + } +} diff --git a/transfer/uploader.go b/transfer/uploader.go index 2b6c8cb..577a963 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "path/filepath" "runtime" "sort" "strings" @@ -18,6 +19,7 @@ import ( "github.com/0glabs/0g-storage-client/core" "github.com/0glabs/0g-storage-client/core/merkle" "github.com/0glabs/0g-storage-client/node" + "github.com/0glabs/0g-storage-client/transfer/dir" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/openweb3/web3go" @@ -348,6 +350,72 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op return txHash, nil } +func (uploader *Uploader) UploadDir(ctx context.Context, folder string, option ...UploadOption) (txnHash, rootHash common.Hash, _ error) { + // Build the file tree representation of the directory. + root, err := dir.BuildFileTree(folder) + if err != nil { + return txnHash, rootHash, errors.WithMessage(err, "failed to build file tree") + } + + tdata, err := root.MarshalBinary() + if err != nil { + return txnHash, rootHash, errors.WithMessage(err, "failed to encode file tree") + } + + // Create an in-memory data object from the encoded file tree. + iterdata, err := core.NewDataInMemory(tdata) + if err != nil { + return txnHash, rootHash, errors.WithMessage(err, "failed to create `IterableData` in memory") + } + + // Generate the Merkle tree from the in-memory data. + mtree, err := core.MerkleTree(iterdata) + if err != nil { + return txnHash, rootHash, errors.WithMessage(err, "failed to create merkle tree") + } + rootHash = mtree.Root() + + // Flattening the file tree to get the list of files and their relative paths. + _, relPaths := root.Flatten(func(n *dir.FsNode) bool { + return n.Type == dir.FileTypeFile && n.Size > 0 + }) + + logrus.Infof("Total %d files to be uploaded", len(relPaths)) + + // Upload each file to the storage network. + for i := range relPaths { + path := filepath.Join(folder, relPaths[i]) + txhash, err := uploader.UploadFile(ctx, path, option...) + if err != nil { + return txnHash, rootHash, errors.WithMessagef(err, "failed to upload file %s", path) + } + + logrus.WithFields(logrus.Fields{ + "txnHash": txhash, + "path": path, + }).Info("File uploaded successfully") + } + + // Finally, upload the directory metadata + txnHash, err = uploader.Upload(ctx, iterdata, option...) + if err != nil { + err = errors.WithMessage(err, "failed to upload directory metadata") + } + + return txnHash, rootHash, err +} + +func (uploader *Uploader) UploadFile(ctx context.Context, path string, option ...UploadOption) (txnHash common.Hash, err error) { + file, err := core.Open(path) + if err != nil { + err = errors.WithMessagef(err, "failed to open file %s", path) + return + } + defer file.Close() + + return uploader.Upload(ctx, file, option...) +} + // SubmitLogEntry submit the data to 0g storage contract by sending a transaction func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.IterableData, tags [][]byte, waitForReceipt bool, nonce *big.Int, fee *big.Int) (common.Hash, *types.Receipt, error) { // Construct submission