Skip to content

Commit

Permalink
feat: Add support for uploading folders to 0gStorage (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanliqun authored Sep 10, 2024
1 parent 17934bd commit a50330b
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 54 deletions.
140 changes: 86 additions & 54 deletions cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,70 @@ import (
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/openweb3/web3go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
uploadArgs struct {
file string
tags string
// L1 transaction relevant operations, including nonce, fee, and so on.
type transactionArgument struct {
url string
key string

url string
key string
fee float64
nonce uint
}

node []string
indexer string
func bindTransactionFlags(cmd *cobra.Command, args *transactionArgument) {
cmd.Flags().StringVar(&args.url, "url", "", "Fullnode URL to interact with ZeroGStorage smart contract")
cmd.MarkFlagRequired("url")
cmd.Flags().StringVar(&args.key, "key", "", "Private key to interact with smart contract")
cmd.MarkFlagRequired("key")

expectedReplica uint
cmd.Flags().Float64Var(&args.fee, "fee", 0, "fee paid in a0gi")
cmd.Flags().UintVar(&args.nonce, "nonce", 0, "nonce of upload transaction")
}

skipTx bool
finalityRequired bool
taskSize uint
type uploadArgument struct {
transactionArgument

fee float64
nonce uint
file string
tags string

timeout time.Duration
}
node []string
indexer string

expectedReplica uint

skipTx bool
finalityRequired bool
taskSize uint

timeout time.Duration
}

func bindUploadFlags(cmd *cobra.Command, args *uploadArgument) {
cmd.Flags().StringVar(&args.file, "file", "", "File name to upload")
cmd.MarkFlagRequired("file")
cmd.Flags().StringVar(&args.tags, "tags", "0x", "Tags of the file")

cmd.Flags().StringSliceVar(&args.node, "node", []string{}, "ZeroGStorage storage node URL")
cmd.Flags().StringVar(&args.indexer, "indexer", "", "ZeroGStorage indexer URL")
cmd.MarkFlagsOneRequired("indexer", "node")
cmd.MarkFlagsMutuallyExclusive("indexer", "node")

cmd.Flags().UintVar(&args.expectedReplica, "expected-replica", 1, "expected number of replications to upload")

cmd.Flags().BoolVar(&args.skipTx, "skip-tx", true, "Skip sending the transaction on chain if already exists")
cmd.Flags().BoolVar(&args.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload")
cmd.Flags().UintVar(&args.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")
}

var (
uploadArgs uploadArgument

uploadCmd = &cobra.Command{
Use: "upload",
Expand All @@ -47,30 +85,8 @@ var (
)

func init() {
uploadCmd.Flags().StringVar(&uploadArgs.file, "file", "", "File name to upload")
uploadCmd.MarkFlagRequired("file")
uploadCmd.Flags().StringVar(&uploadArgs.tags, "tags", "0x", "Tags of the file")

uploadCmd.Flags().StringVar(&uploadArgs.url, "url", "", "Fullnode URL to interact with ZeroGStorage smart contract")
uploadCmd.MarkFlagRequired("url")
uploadCmd.Flags().StringVar(&uploadArgs.key, "key", "", "Private key to interact with smart contract")
uploadCmd.MarkFlagRequired("key")

uploadCmd.Flags().StringSliceVar(&uploadArgs.node, "node", []string{}, "ZeroGStorage storage node URL")
uploadCmd.Flags().StringVar(&uploadArgs.indexer, "indexer", "", "ZeroGStorage indexer URL")
uploadCmd.MarkFlagsOneRequired("indexer", "node")
uploadCmd.MarkFlagsMutuallyExclusive("indexer", "node")

uploadCmd.Flags().UintVar(&uploadArgs.expectedReplica, "expected-replica", 1, "expected number of replications to upload")

uploadCmd.Flags().BoolVar(&uploadArgs.skipTx, "skip-tx", true, "Skip sending the transaction on chain if already exists")
uploadCmd.Flags().BoolVar(&uploadArgs.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload")
uploadCmd.Flags().UintVar(&uploadArgs.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

uploadCmd.Flags().DurationVar(&uploadArgs.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")

uploadCmd.Flags().Float64Var(&uploadArgs.fee, "fee", 0, "fee paid in a0gi")
uploadCmd.Flags().UintVar(&uploadArgs.nonce, "nonce", 0, "nonce of upload transaction")
bindUploadFlags(uploadCmd, &uploadArgs)
bindTransactionFlags(uploadCmd, &uploadArgs.transactionArgument)

rootCmd.AddCommand(uploadCmd)
}
Expand Down Expand Up @@ -109,37 +125,53 @@ func upload(*cobra.Command, []string) {
Nonce: nonce,
}

uploader, closer, err := newUploader(ctx, uploadArgs, w3client, opt)
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
defer closer()

file, err := core.Open(uploadArgs.file)
if err != nil {
logrus.WithError(err).Fatal("Failed to open file")
}
defer file.Close()

if uploadArgs.indexer != "" {
indexerClient, err := indexer.NewClient(uploadArgs.indexer, indexer.IndexerClientOption{
if _, err := uploader.Upload(ctx, file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
}

func newUploader(ctx context.Context, args uploadArgument, w3client *web3go.Client, opt transfer.UploadOption) (*transfer.Uploader, func(), error) {
if args.indexer != "" {
indexerClient, err := indexer.NewClient(args.indexer, indexer.IndexerClientOption{
ProviderOption: providerOption,
LogOption: zg_common.LogOption{Logger: logrus.StandardLogger()},
})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize indexer client")
return nil, nil, errors.WithMessage(err, "failed to initialize indexer client")
}
if _, err := indexerClient.Upload(ctx, w3client, file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")

up, err := indexerClient.NewUploaderFromIndexerNodes(ctx, w3client, opt.ExpectedReplica, nil)
if err != nil {
return nil, nil, err
}
return

return up, indexerClient.Close, nil
}

clients := node.MustNewZgsClients(uploadArgs.node, providerOption)
for _, client := range clients {
defer client.Close()
clients := node.MustNewZgsClients(args.node, providerOption)
closer := func() {
for _, client := range clients {
client.Close()
}
}

uploader, err := transfer.NewUploader(ctx, w3client, clients, zg_common.LogOption{Logger: logrus.StandardLogger()})
up, err := transfer.NewUploader(ctx, w3client, clients, zg_common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize uploader")
closer()
return nil, nil, err
}

if _, err := uploader.Upload(ctx, file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
return up, closer, nil
}
71 changes: 71 additions & 0 deletions cmd/upload_dir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package cmd

import (
"context"

"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
uploadDirArgs uploadArgument

uploadDirCmd = &cobra.Command{
Use: "upload-dir",
Short: "Upload directory to ZeroGStorage network",
Run: uploadDir,
}
)

func init() {
bindUploadFlags(uploadDirCmd, &uploadDirArgs)
uploadDirCmd.Flags().StringVar(&uploadDirArgs.url, "url", "", "Fullnode URL to interact with ZeroGStorage smart contract")
uploadDirCmd.MarkFlagRequired("url")
uploadDirCmd.Flags().StringVar(&uploadDirArgs.key, "key", "", "Private key to interact with smart contract")
uploadDirCmd.MarkFlagRequired("key")

rootCmd.AddCommand(uploadDirCmd)
}

func uploadDir(*cobra.Command, []string) {
ctx := context.Background()
var cancel context.CancelFunc
if uploadDirArgs.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, uploadDirArgs.timeout)
defer cancel()
}

w3client := blockchain.MustNewWeb3(uploadDirArgs.url, uploadDirArgs.key, providerOption)
defer w3client.Close()

finalityRequired := transfer.TransactionPacked
if uploadDirArgs.finalityRequired {
finalityRequired = transfer.FileFinalized
}
opt := transfer.UploadOption{
Tags: hexutil.MustDecode(uploadDirArgs.tags),
FinalityRequired: finalityRequired,
TaskSize: uploadDirArgs.taskSize,
ExpectedReplica: uploadDirArgs.expectedReplica,
SkipTx: uploadDirArgs.skipTx,
}

uploader, closer, err := newUploader(ctx, uploadDirArgs, w3client, opt)
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
defer closer()

txnHash, rootHash, err := uploader.UploadDir(ctx, uploadDirArgs.file, opt)
if err != nil {
logrus.WithError(err).Fatal("Failed to upload directory")
}

logrus.WithFields(logrus.Fields{
"txnHash": txnHash,
"rootHash": rootHash,
}).Info("Directory uploaded done")
}
52 changes: 52 additions & 0 deletions transfer/dir/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,54 @@ func (node *FsNode) Equal(rhs *FsNode) bool {
}
}

// Flatten recursively flattens the FsNode tree into a slice of FsNode pointers and a slice of relative paths.
// The filterFunc is applied to each node to determine if it should be included in the result.
func (node *FsNode) Flatten(filterFunc func(*FsNode) bool) (result []*FsNode, relpaths []string) {
node.Traverse(func(n *FsNode, p string) error {
if filterFunc(n) {
result = append(result, n)
relpaths = append(relpaths, p)
}
return nil
})
return result, relpaths
}

// Traverse recursively traverses the FsNode tree and applies the provided actionFunc to each node.
// This method only requires the user to handle relative paths.
//
// Parameters:
//
// - actionFunc: A function that defines the action to perform on each node. The function
// takes the current node and its relative path as arguments. This function can perform any necessary
// operations, such as collecting nodes, uploading files, or logging information.
func (node *FsNode) Traverse(actionFunc func(node *FsNode, relativePath string) error) error {
return node.traverse("", actionFunc)
}

// traverse is a helper function that manages relative paths during the traversal process.
func (node *FsNode) traverse(baseDir string, actionFunc func(node *FsNode, relativePath string) error) error {
relative := filepath.Join(baseDir, node.Name)

// Apply the action function to the current node
if err := actionFunc(node, relative); err != nil {
return err
}

if node.Type != FileTypeDirectory {
return nil
}

// If the node is a directory, recursively traverse its entries
for _, entry := range node.Entries {
if err := entry.traverse(relative, actionFunc); err != nil {
return err
}
}

return nil
}

// BuildFileTree recursively builds a file tree for the specified directory.
func BuildFileTree(path string) (*FsNode, error) {
info, err := os.Stat(path)
Expand Down Expand Up @@ -171,6 +219,10 @@ func buildSymbolicNode(path string, info os.FileInfo) (*FsNode, error) {

// buildFileNode creates an FsNode for a regular file, including its Merkle root hash.
func buildFileNode(path string, info os.FileInfo) (*FsNode, error) {
if info.Size() == 0 {
return NewFileFsNode(info.Name(), common.Hash{}, 0), nil
}

hash, err := core.MerkleRoot(path)
if err != nil {
return nil, errors.WithMessagef(err, "failed to calculate merkle root for %s", path)
Expand Down
Loading

0 comments on commit a50330b

Please sign in to comment.