diff --git a/cmd/download.go b/cmd/download.go index 3ddc0bb..daba154 100644 --- a/cmd/download.go +++ b/cmd/download.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "runtime" "time" "github.com/0glabs/0g-storage-client/common" @@ -20,8 +21,11 @@ type downloadArgument struct { nodes []string root string + roots []string proof bool + routines int + timeout time.Duration } @@ -34,9 +38,14 @@ func bindDownloadFlags(cmd *cobra.Command, args *downloadArgument) { cmd.MarkFlagsOneRequired("indexer", "node") cmd.Flags().StringVar(&args.root, "root", "", "Merkle root to download file") - cmd.MarkFlagRequired("root") + cmd.Flags().StringSliceVar(&args.roots, "roots", []string{}, "Merkle roots to download fragments") + cmd.MarkFlagsOneRequired("root", "roots") + cmd.MarkFlagsMutuallyExclusive("root", "roots") + cmd.Flags().BoolVar(&args.proof, "proof", false, "Whether to download with merkle proof for validation") + cmd.Flags().IntVar(&args.routines, "routines", runtime.GOMAXPROCS(0), "number of go routines for downloading simultaneously") + cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout") } @@ -70,8 +79,14 @@ func download(*cobra.Command, []string) { } defer closer() - if err := downloader.Download(ctx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil { - logrus.WithError(err).Fatal("Failed to download file") + if downloadArgs.root != "" { + if err := downloader.Download(ctx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil { + logrus.WithError(err).Fatal("Failed to download file") + } + } else { + if err := downloader.DownloadFragments(ctx, downloadArgs.roots, downloadArgs.file, downloadArgs.proof); err != nil { + logrus.WithError(err).Fatal("Failed to download file") + } } } @@ -100,6 +115,7 @@ func newDownloader(args downloadArgument) (transfer.IDownloader, func(), error) closer() return nil, nil, err } + downloader.WithRoutines(downloadArgs.routines) return downloader, closer, nil } 
diff --git a/cmd/upload.go b/cmd/upload.go index 91657a1..737fb2f 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -3,6 +3,8 @@ package cmd import ( "context" "math/big" + "runtime" + "strings" "time" zg_common "github.com/0glabs/0g-storage-client/common" @@ -51,6 +53,9 @@ type uploadArgument struct { skipTx bool finalityRequired bool taskSize uint + routines int + + fragmentSize int64 timeout time.Duration } @@ -71,6 +76,10 @@ func bindUploadFlags(cmd *cobra.Command, args *uploadArgument) { cmd.Flags().BoolVar(&args.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload") cmd.Flags().UintVar(&args.taskSize, "task-size", 10, "Number of segments to upload in single rpc request") + cmd.Flags().Int64Var(&args.fragmentSize, "fragment-size", 1024*1024*1024*4, "the size of fragment to split into when file is too large") + + cmd.Flags().IntVar(&args.routines, "routines", runtime.GOMAXPROCS(0), "number of go routines for uploading simultaneously") + cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout") } @@ -136,10 +145,21 @@ func upload(*cobra.Command, []string) { logrus.WithError(err).Fatal("Failed to initialize uploader") } defer closer() + uploader.WithRoutines(uploadArgs.routines) - if _, err := uploader.Upload(ctx, file, opt); err != nil { + _, roots, err := uploader.SplitableUpload(ctx, file, uploadArgs.fragmentSize, opt) + if err != nil { logrus.WithError(err).Fatal("Failed to upload file") } + if len(roots) == 1 { + logrus.Infof("file uploaded, root = %v", roots[0]) + } else { + s := make([]string, len(roots)) + for i, root := range roots { + s[i] = root.String() + } + logrus.Infof("file uploaded in %v fragments, roots = %v", len(roots), strings.Join(s, ",")) + } } func newUploader(ctx context.Context, segNum uint64, args uploadArgument, w3client *web3go.Client, opt transfer.UploadOption) (*transfer.Uploader, func(), error) { diff --git a/cmd/upload_dir.go b/cmd/upload_dir.go index 
5eda54c..9263fea 100644 --- a/cmd/upload_dir.go +++ b/cmd/upload_dir.go @@ -58,6 +58,7 @@ func uploadDir(*cobra.Command, []string) { logrus.WithError(err).Fatal("Failed to initialize uploader") } defer closer() + uploader.WithRoutines(uploadArgs.routines) txnHash, rootHash, err := uploader.UploadDir(ctx, uploadDirArgs.file, opt) if err != nil { diff --git a/common/shard/types.go b/common/shard/types.go index 5e5ecbd..2ad9a4a 100644 --- a/common/shard/types.go +++ b/common/shard/types.go @@ -2,9 +2,8 @@ package shard import ( "sort" - "time" - "golang.org/x/exp/rand" + "github.com/0glabs/0g-storage-client/common/util" ) type ShardConfig struct { @@ -130,12 +129,7 @@ func CheckReplica(shardConfigs []*ShardConfig, expectedReplica uint) bool { // Helper function to pre-process (sort or shuffle) the nodes before selection func prepareSelectionNodes(nodes []*ShardedNode, random bool) []*ShardedNode { if random { - // Shuffle the nodes randomly if needed - rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) - for i := range nodes { - j := rng.Intn(i + 1) - nodes[i], nodes[j] = nodes[j], nodes[i] - } + util.Shuffle(nodes) } else { // Sort nodes based on NumShard and ShardId sort.Slice(nodes, func(i, j int) bool { diff --git a/common/util/shuffle.go b/common/util/shuffle.go new file mode 100644 index 0000000..828b376 --- /dev/null +++ b/common/util/shuffle.go @@ -0,0 +1,15 @@ +package util + +import ( + "time" + + "golang.org/x/exp/rand" +) + +func Shuffle[T any](items []T) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + for i := range items { + j := rng.Intn(i + 1) + items[i], items[j] = items[j], items[i] + } +} diff --git a/core/dataflow.go b/core/dataflow.go index 4321080..5695b2d 100644 --- a/core/dataflow.go +++ b/core/dataflow.go @@ -30,10 +30,11 @@ var ( type IterableData interface { NumChunks() uint64 NumSegments() uint64 + Offset() int64 Size() int64 PaddedSize() uint64 - Iterate(offset int64, batch int64, flowPadding bool) Iterator 
Read(buf []byte, offset int64) (int, error) + Split(fragmentSize int64) []IterableData } // MerkleTree create merkle tree of the data. diff --git a/core/file.go b/core/file.go index 8fe220d..771e549 100644 --- a/core/file.go +++ b/core/file.go @@ -21,12 +21,14 @@ type File struct { os.FileInfo underlying *os.File paddedSize uint64 + offset int64 + size int64 } var _ IterableData = (*File)(nil) func (file *File) Read(buf []byte, offset int64) (int, error) { - n, err := file.underlying.ReadAt(buf, offset) + n, err := file.underlying.ReadAt(buf, file.offset+offset) // unexpected IO error if !errors.Is(err, io.EOF) { return 0, err @@ -72,6 +74,8 @@ func Open(name string) (*File, error) { return &File{ FileInfo: info, underlying: file, + offset: 0, + size: info.Size(), paddedSize: IteratorPaddedSize(info.Size(), true), }, nil } @@ -110,86 +114,26 @@ func (file *File) PaddedSize() uint64 { return file.paddedSize } -func (file *File) Iterate(offset int64, batch int64, flowPadding bool) Iterator { - if batch%DefaultChunkSize > 0 { - panic("batch size should align with chunk size") - } - dataSize := file.Size() - return &FileIterator{ - file: file.underlying, - buf: make([]byte, batch), - offset: offset, - fileSize: dataSize, - paddedSize: IteratorPaddedSize(dataSize, flowPadding), - } -} - -type FileIterator struct { - file *os.File - buf []byte // buffer to read data from file - bufSize int // actual data size in buffer - fileSize int64 - paddedSize uint64 - offset int64 // offset to read data -} - -var _ Iterator = (*FileIterator)(nil) - -func (it *FileIterator) Next() (bool, error) { - // Reject invalid offset - if it.offset < 0 || uint64(it.offset) >= it.paddedSize { - return false, nil - } - - var expectedBufSize int - maxAvailableLength := it.paddedSize - uint64(it.offset) - if maxAvailableLength >= uint64(len(it.buf)) { - expectedBufSize = len(it.buf) - } else { - expectedBufSize = int(maxAvailableLength) - } - - it.clearBuffer() - - if it.offset >= it.fileSize { - 
it.paddingZeros(expectedBufSize) - return true, nil - } - - n, err := it.file.ReadAt(it.buf, it.offset) - it.bufSize = n - it.offset += int64(n) - - // not reach EOF - if n == expectedBufSize { - return true, nil - } - - // unexpected IO error - if !errors.Is(err, io.EOF) { - return false, err - } - - if n > expectedBufSize { - // should never happen - panic("load more data from file than expected") - } - - it.paddingZeros(expectedBufSize - n) - - return true, nil -} - -func (it *FileIterator) clearBuffer() { - it.bufSize = 0 +func (file *File) Size() int64 { + return file.size } -func (it *FileIterator) paddingZeros(length int) { - paddingZeros(it.buf, it.bufSize, length) - it.bufSize += length - it.offset += int64(length) +func (file *File) Offset() int64 { + return file.offset } -func (it *FileIterator) Current() []byte { - return it.buf[:it.bufSize] +func (file *File) Split(fragmentSize int64) []IterableData { + fragments := make([]IterableData, 0) + for offset := file.offset; offset < file.offset+file.size; offset += fragmentSize { + size := min(file.size-offset, fragmentSize) + fragment := &File{ + FileInfo: file.FileInfo, + underlying: file.underlying, + offset: offset, + size: size, + paddedSize: IteratorPaddedSize(size, true), + } + fragments = append(fragments, fragment) + } + return fragments } diff --git a/core/flow.go b/core/flow.go index 4e9f614..7250f60 100644 --- a/core/flow.go +++ b/core/flow.go @@ -43,7 +43,7 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) { return &submission, nil } -func nextPow2(input uint64) uint64 { +func NextPow2(input uint64) uint64 { x := input x -= 1 x |= x >> 32 @@ -57,7 +57,7 @@ func nextPow2(input uint64) uint64 { } func ComputePaddedSize(chunks uint64) (uint64, uint64) { - chunksNextPow2 := nextPow2(chunks) + chunksNextPow2 := NextPow2(chunks) if chunksNextPow2 == chunks { return chunksNextPow2, chunksNextPow2 } diff --git a/core/in_mem.go b/core/in_mem.go index 38e0d79..35b150a 100644 --- 
a/core/in_mem.go +++ b/core/in_mem.go @@ -5,6 +5,8 @@ import "errors" // DataInMemory implement of IterableData, the underlying is memory data type DataInMemory struct { underlying []byte + offset int64 + size int64 paddedSize uint64 } @@ -17,12 +19,14 @@ func NewDataInMemory(data []byte) (*DataInMemory, error) { } return &DataInMemory{ underlying: data, + offset: 0, + size: int64(len(data)), paddedSize: IteratorPaddedSize(int64(len(data)), true), }, nil } func (data *DataInMemory) Read(buf []byte, offset int64) (int, error) { - n := copy(buf, data.underlying[offset:]) + n := copy(buf, data.underlying[data.offset+offset:]) return n, nil } @@ -35,78 +39,28 @@ func (data *DataInMemory) NumSegments() uint64 { } func (data *DataInMemory) Size() int64 { - return int64(len(data.underlying)) + return data.size } -func (data *DataInMemory) PaddedSize() uint64 { - return data.paddedSize -} - -func (data *DataInMemory) Iterate(offset int64, batch int64, flowPadding bool) Iterator { - if batch%DefaultChunkSize > 0 { - panic("batch size should align with chunk size") - } - dataSize := int64(len(data.underlying)) - return &MemoryDataIterator{ - data: data, - buf: make([]byte, batch), - offset: int(offset), - dataSize: int(dataSize), - paddedSize: uint(IteratorPaddedSize(dataSize, flowPadding)), - } +func (data *DataInMemory) Offset() int64 { + return data.offset } -type MemoryDataIterator struct { - data *DataInMemory - buf []byte // buffer to read data from file - bufSize int // actual data size in buffer - dataSize int - paddedSize uint - offset int // offset to read data +func (data *DataInMemory) PaddedSize() uint64 { + return data.paddedSize } -var _ Iterator = (*MemoryDataIterator)(nil) - -func (it *MemoryDataIterator) Next() (bool, error) { - // Reject invalid offset - if it.offset < 0 || uint(it.offset) >= it.paddedSize { - return false, nil +func (data *DataInMemory) Split(fragmentSize int64) []IterableData { + fragments := make([]IterableData, 0) + for offset := 
data.offset; offset < data.offset+data.size; offset += fragmentSize { + size := min(data.size-offset, fragmentSize) + fragment := &DataInMemory{ + underlying: data.underlying, + offset: offset, + size: size, + paddedSize: IteratorPaddedSize(size, true), + } + fragments = append(fragments, fragment) } - - var expectedBufSize int - maxAvailableLength := it.paddedSize - uint(it.offset) - if maxAvailableLength >= uint(len(it.buf)) { - expectedBufSize = len(it.buf) - } else { - expectedBufSize = int(maxAvailableLength) - } - - it.bufSize = 0 - - if it.offset >= it.dataSize { - it.paddingZeros(expectedBufSize) - return true, nil - } - - n := copy(it.buf, it.data.underlying[it.offset:]) - it.offset += int(n) - it.bufSize = n - - if n == expectedBufSize { - return true, nil - } - - it.paddingZeros(expectedBufSize - n) - - return true, nil -} - -func (it *MemoryDataIterator) paddingZeros(length int) { - paddingZeros(it.buf, it.bufSize, length) - it.bufSize += length - it.offset += length -} - -func (it *MemoryDataIterator) Current() []byte { - return it.buf[:it.bufSize] + return fragments } diff --git a/gateway/local_apis.go b/gateway/local_apis.go index a4f730b..91ec596 100644 --- a/gateway/local_apis.go +++ b/gateway/local_apis.go @@ -131,7 +131,7 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) { } defer file.Close() - if _, err := uploader.Upload(context.Background(), file); err != nil { + if _, _, err := uploader.Upload(context.Background(), file); err != nil { return nil, err } diff --git a/indexer/client.go b/indexer/client.go index b43460e..18abf8c 100644 --- a/indexer/client.go +++ b/indexer/client.go @@ -3,6 +3,8 @@ package indexer import ( "context" "fmt" + "io" + "os" "slices" "time" @@ -145,7 +147,7 @@ func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core. if err != nil { return eth_common.Hash{}, err } - txHash, err := uploader.Upload(ctx, data, option...) + txHash, _, err := uploader.Upload(ctx, data, option...) 
var rpcError *node.RPCError if errors.As(err, &rpcError) { dropped = append(dropped, rpcError.URL) @@ -157,7 +159,7 @@ func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core. } // BatchUpload submit multiple data to 0g storage contract batchly in single on-chain transaction, then transfer the data to the storage nodes selected from indexer service. -func (c *Client) BatchUpload(ctx context.Context, w3Client *web3go.Client, datas []core.IterableData, waitForLogEntry bool, option ...transfer.BatchUploadOption) (eth_common.Hash, []eth_common.Hash, error) { +func (c *Client) BatchUpload(ctx context.Context, w3Client *web3go.Client, datas []core.IterableData, option ...transfer.BatchUploadOption) (eth_common.Hash, []eth_common.Hash, error) { expectedReplica := uint(1) if len(option) > 0 { for _, opt := range option[0].DataOptions { @@ -174,7 +176,7 @@ func (c *Client) BatchUpload(ctx context.Context, w3Client *web3go.Client, datas if err != nil { return eth_common.Hash{}, nil, err } - hash, roots, err := uploader.BatchUpload(ctx, datas, waitForLogEntry, option...) + hash, roots, err := uploader.BatchUpload(ctx, datas, option...) 
var rpcError *node.RPCError if errors.As(err, &rpcError) { dropped = append(dropped, rpcError.URL) @@ -185,11 +187,10 @@ func (c *Client) BatchUpload(ctx context.Context, w3Client *web3go.Client, datas } } -// Download download file by given data root -func (c *Client) Download(ctx context.Context, root, filename string, withProof bool) error { +func (c *Client) NewDownloaderFromIndexerNodes(ctx context.Context, root string) (*transfer.Downloader, error) { locations, err := c.GetFileLocations(ctx, root) if err != nil { - return errors.WithMessage(err, "failed to get file locations") + return nil, errors.WithMessage(err, "failed to get file locations") } clients := make([]*node.ZgsClient, 0) for _, location := range locations { @@ -206,12 +207,57 @@ func (c *Client) Download(ctx context.Context, root, filename string, withProof clients = append(clients, client) } if len(clients) == 0 { - return fmt.Errorf("no node holding the file found, FindFile triggered, try again later") + return nil, fmt.Errorf("no node holding the file found, FindFile triggered, try again later") } downloader, err := transfer.NewDownloader(clients, c.option.LogOption) if err != nil { - return err + return nil, err } + return downloader, nil +} + +func (c *Client) DownloadFragments(ctx context.Context, roots []string, filename string, withProof bool) error { + outFile, err := os.Create(filename) + if err != nil { + return errors.WithMessage(err, "failed to create output file") + } + defer outFile.Close() + + for _, root := range roots { + tempFile := fmt.Sprintf("%v.temp", root) + downloader, err := c.NewDownloaderFromIndexerNodes(ctx, root) + if err != nil { + return err + } + err = downloader.Download(ctx, root, tempFile, withProof) + if err != nil { + return errors.WithMessage(err, "Failed to download file") + } + inFile, err := os.Open(tempFile) + if err != nil { + return errors.WithMessage(err, fmt.Sprintf("failed to open file %s", tempFile)) + } + _, err = io.Copy(outFile, inFile) + 
inFile.Close() + if err != nil { + return errors.WithMessage(err, fmt.Sprintf("failed to copy content from temp file %s", tempFile)) + } + + err = os.Remove(tempFile) + if err != nil { + return errors.WithMessage(err, fmt.Sprintf("failed to delete temp file %s:", tempFile)) + } + } + + return nil +} + +// Download download file by given data root +func (c *Client) Download(ctx context.Context, root, filename string, withProof bool) error { + downloader, err := c.NewDownloaderFromIndexerNodes(ctx, root) + if err != nil { + return err + } return downloader.Download(ctx, root, filename, withProof) } diff --git a/kv/batcher.go b/kv/batcher.go index 4b191da..97cb0d7 100644 --- a/kv/batcher.go +++ b/kv/batcher.go @@ -61,7 +61,7 @@ func (b *Batcher) Exec(ctx context.Context, option ...transfer.UploadOption) (co opt = option[0] } opt.Tags = b.buildTags() - txHash, err := uploader.Upload(ctx, data, opt) + txHash, _, err := uploader.Upload(ctx, data, opt) if err != nil { return txHash, errors.WithMessagef(err, "Failed to upload data") } diff --git a/tests/cli_skip_tx_test.py b/tests/cli_skip_tx_test.py index 4ee17dd..10a8541 100644 --- a/tests/cli_skip_tx_test.py +++ b/tests/cli_skip_tx_test.py @@ -54,7 +54,7 @@ def run_test(self): self.nodes[node_idx].rpc_url, None, file_to_upload, - True + skip_tx=True ) wait_until(lambda: self.contract.num_submissions() == 1) assert_equal(w3.eth.get_transaction_count(GENESIS_ACCOUNT.address), nonce) diff --git a/tests/client_test_framework/test_framework.py b/tests/client_test_framework/test_framework.py index 531bf94..3ab1f61 100644 --- a/tests/client_test_framework/test_framework.py +++ b/tests/client_test_framework/test_framework.py @@ -105,6 +105,7 @@ def _upload_file_use_cli( node_rpc_url, indexer_url, file_to_upload, + fragment_size = None, skip_tx = True, ): upload_args = [ @@ -126,6 +127,10 @@ def _upload_file_use_cli( elif indexer_url is not None: upload_args.append("--indexer") upload_args.append(indexer_url) + if fragment_size 
is not None: + upload_args.append("--fragment-size") + upload_args.append(str(fragment_size)) + upload_args.append("--file") self.log.info("upload file with cli: {}".format(upload_args)) @@ -148,11 +153,10 @@ def _upload_file_use_cli( for line in lines: line = line.decode("utf-8") self.log.debug("line: %s", line) - if "root" in line: - filtered_line = re.sub(r'\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?', '', line) - index = filtered_line.find("root=") - if index > 0: - root = filtered_line[index + 5 : index + 5 + 66] + if "root = " in line: + root = line.strip().split("root = ")[1] + if "roots = " in line: + root = line.strip().split("roots = ")[1] except Exception as ex: self.log.error("Failed to upload file via CLI tool, output: %s", output_name) raise ex @@ -167,21 +171,30 @@ def _download_file_use_cli( self, node_rpc_url, indexer_url, - root, + root = None, + roots = None, + file_to_download = None, with_proof = True, + remove = True, ): - file_to_download = os.path.join(self.root_dir, "download_{}_{}".format(root, time.time())) + if file_to_download is None: + file_to_download = os.path.join(self.root_dir, "download_{}_{}".format(root, time.time())) download_args = [ self.cli_binary, "download", "--file", file_to_download, - "--root", - root, "--proof=" + str(with_proof), "--log-level", "debug", ] + if root is not None: + download_args.append("--root") + download_args.append(root) + elif roots is not None: + download_args.append("--roots") + download_args.append(roots) + if node_rpc_url is not None: download_args.append("--node") download_args.append(node_rpc_url) @@ -213,7 +226,8 @@ def _download_file_use_cli( assert return_code == 0, "%s download file failed, output: %s, log: %s" % (self.cli_binary, output_name, lines) - os.remove(file_to_download) + if remove: + os.remove(file_to_download) return diff --git a/tests/go_tests/batch_upload_test/main.go b/tests/go_tests/batch_upload_test/main.go index 16988e6..0d54ed6 100644 --- 
a/tests/go_tests/batch_upload_test/main.go +++ b/tests/go_tests/batch_upload_test/main.go @@ -46,7 +46,7 @@ func runTest() error { if err != nil { return errors.WithMessage(err, "failed to initialize indexer client") } - _, roots, err := indexerClient.BatchUpload(ctx, w3client, datas, true, transfer.BatchUploadOption{ + _, roots, err := indexerClient.BatchUpload(ctx, w3client, datas, transfer.BatchUploadOption{ TaskSize: 5, DataOptions: opts, }) diff --git a/tests/go_tests/indexer_test/main.go b/tests/go_tests/indexer_test/main.go index 5fad394..0f1aded 100644 --- a/tests/go_tests/indexer_test/main.go +++ b/tests/go_tests/indexer_test/main.go @@ -75,7 +75,7 @@ func runTest() error { return errors.WithMessage(err, "failed to initialize uploader") } - if _, err := uploader.Upload(context.Background(), data, transfer.UploadOption{ + if _, _, err := uploader.Upload(context.Background(), data, transfer.UploadOption{ FinalityRequired: transfer.FileFinalized, }); err != nil { return errors.WithMessage(err, "failed to upload file") diff --git a/tests/splitable_upload_test.py b/tests/splitable_upload_test.py new file mode 100644 index 0000000..200e6fa --- /dev/null +++ b/tests/splitable_upload_test.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 + +import random +import tempfile +import os + +from config.node_config import GENESIS_ACCOUNT +from utility.utils import ( + wait_until, +) +from client_test_framework.test_framework import ClientTestFramework + +def files_are_equal(file1, file2): + if os.path.getsize(file1) != os.path.getsize(file2): + return False + + with open(file1, 'rb') as f1, open(file2, 'rb') as f2: + while True: + chunk1 = f1.read(4096) + chunk2 = f2.read(4096) + + if chunk1 != chunk2: + return False + + if not chunk1: + break + + return True + + +class FileUploadDownloadTest(ClientTestFramework): + def setup_params(self): + self.num_blockchain_nodes = 1 + self.num_nodes = 4 + self.zgs_node_configs[0] = { + "db_max_num_sectors": 2 ** 30, + "shard_position": 
"0/4" + } + self.zgs_node_configs[1] = { + "db_max_num_sectors": 2 ** 30, + "shard_position": "1/4" + } + self.zgs_node_configs[2] = { + "db_max_num_sectors": 2 ** 30, + "shard_position": "2/4" + } + self.zgs_node_configs[3] = { + "db_max_num_sectors": 2 ** 30, + "shard_position": "3/4" + } + + def run_test(self): + size = 50 * 1024 * 1024 # 50M + file_to_upload = tempfile.NamedTemporaryFile(dir=self.root_dir, delete=False) + data = random.randbytes(size) + + file_to_upload.write(data) + file_to_upload.close() + + roots = self._upload_file_use_cli( + self.blockchain_nodes[0].rpc_url, + GENESIS_ACCOUNT.key, + ','.join([x.rpc_url for x in self.nodes]), + None, + file_to_upload, + fragment_size=1024*1024*3, # 3M, will aligned to 4M + ) + + self.log.info("roots: %s", roots) + wait_until(lambda: self.contract.num_submissions() == 13) + + file_to_download = os.path.join(self.root_dir, "downloaded") + self._download_file_use_cli(','.join([x.rpc_url for x in self.nodes]), None, roots=roots, with_proof=True, file_to_download=file_to_download, remove=False) + assert(files_are_equal(file_to_upload.name, file_to_download)) + +if __name__ == "__main__": + FileUploadDownloadTest().main() diff --git a/transfer/download_parallel.go b/transfer/download_parallel.go index b0639aa..c0d914c 100644 --- a/transfer/download_parallel.go +++ b/transfer/download_parallel.go @@ -29,19 +29,21 @@ type segmentDownloader struct { numChunks uint64 + routines int + logger *logrus.Logger } var _ parallel.Interface = (*segmentDownloader)(nil) -func newSegmentDownloader(clients []*node.ZgsClient, info *node.FileInfo, shardConfigs []*shard.ShardConfig, file *download.DownloadingFile, withProof bool, logger *logrus.Logger) (*segmentDownloader, error) { +func newSegmentDownloader(downloader *Downloader, info *node.FileInfo, shardConfigs []*shard.ShardConfig, file *download.DownloadingFile, withProof bool) (*segmentDownloader, error) { startSegmentIndex := info.Tx.StartEntryIndex / 
core.DefaultSegmentMaxChunks endSegmentIndex := (info.Tx.StartEntryIndex + core.NumSplits(int64(info.Tx.Size), core.DefaultChunkSize) - 1) / core.DefaultSegmentMaxChunks offset := file.Metadata().Offset / core.DefaultSegmentSize return &segmentDownloader{ - clients: clients, + clients: downloader.clients, shardConfigs: shardConfigs, file: file, txSeq: info.Tx.Seq, @@ -55,14 +57,19 @@ func newSegmentDownloader(clients []*node.ZgsClient, info *node.FileInfo, shardC numChunks: core.NumSplits(int64(info.Tx.Size), core.DefaultChunkSize), - logger: logger, + routines: downloader.routines, + + logger: downloader.logger, }, nil } // Download downloads segments in parallel. func (downloader *segmentDownloader) Download(ctx context.Context) error { numTasks := downloader.endSegmentIndex - downloader.startSegmentIndex + 1 - downloader.offset - return parallel.Serial(ctx, downloader, int(numTasks)) + option := parallel.SerialOption{ + Routines: downloader.routines, + } + return parallel.Serial(ctx, downloader, int(numTasks), option) } // ParallelDo implements the parallel.Interface interface. 
diff --git a/transfer/downloader.go b/transfer/downloader.go index 51c0158..882575e 100644 --- a/transfer/downloader.go +++ b/transfer/downloader.go @@ -3,7 +3,9 @@ package transfer import ( "context" "fmt" + "io" "os" + "runtime" zg_common "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/core" @@ -22,12 +24,15 @@ var ( type IDownloader interface { Download(ctx context.Context, root, filename string, withProof bool) error + DownloadFragments(ctx context.Context, roots []string, filename string, withProof bool) error } // Downloader downloader to download file to storage nodes type Downloader struct { clients []*node.ZgsClient + routines int + logger *logrus.Logger } @@ -40,9 +45,47 @@ func NewDownloader(clients []*node.ZgsClient, opts ...zg_common.LogOption) (*Dow clients: clients, logger: zg_common.NewLogger(opts...), } + downloader.routines = runtime.GOMAXPROCS(0) return downloader, nil } +func (downloader *Downloader) WithRoutines(routines int) *Downloader { + downloader.routines = routines + return downloader +} + +func (downloader *Downloader) DownloadFragments(ctx context.Context, roots []string, filename string, withProof bool) error { + outFile, err := os.Create(filename) + if err != nil { + return errors.WithMessage(err, "failed to create output file") + } + defer outFile.Close() + + for _, root := range roots { + tempFile := fmt.Sprintf("%v.temp", root) + err := downloader.Download(ctx, root, tempFile, withProof) + if err != nil { + return errors.WithMessage(err, "Failed to download file") + } + inFile, err := os.Open(tempFile) + if err != nil { + return errors.WithMessage(err, fmt.Sprintf("failed to open file %s", tempFile)) + } + _, err = io.Copy(outFile, inFile) + inFile.Close() + if err != nil { + return errors.WithMessage(err, fmt.Sprintf("failed to copy content from temp file %s", tempFile)) + } + + err = os.Remove(tempFile) + if err != nil { + return errors.WithMessage(err, fmt.Sprintf("failed to delete temp file 
%s:", tempFile)) + } + } + + return nil +} + // Download download data from storage nodes. func (downloader *Downloader) Download(ctx context.Context, root, filename string, withProof bool) error { hash := common.HexToHash(root) @@ -127,7 +170,7 @@ func (downloader *Downloader) downloadFile(ctx context.Context, filename string, return err } - sd, err := newSegmentDownloader(downloader.clients, info, shardConfigs, file, withProof, downloader.logger) + sd, err := newSegmentDownloader(downloader, info, shardConfigs, file, withProof) if err != nil { return errors.WithMessage(err, "Failed to create segment downloader") } diff --git a/transfer/uploader.go b/transfer/uploader.go index 1e9ae0c..f9c7159 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -30,6 +30,7 @@ import ( // defaultTaskSize is the default number of data segments to upload in a single upload RPC request const defaultTaskSize = uint(10) +const defaultBatchSize = uint(10) var dataAlreadyExistsError = "Invalid params: root; data: already uploaded and finalized" var segmentAlreadyExistsError = "segment has already been uploaded or is being uploaded" @@ -79,10 +80,11 @@ type BatchUploadOption struct { // Uploader uploader to upload file to 0g storage, send on-chain transactions and transfer data to storage nodes. 
type Uploader struct { - flow *contract.FlowContract // flow contract instance - market *contract.Market // market contract instance - clients []*node.ZgsClient // 0g storage clients - logger *logrus.Logger // logger + flow *contract.FlowContract // flow contract instance + market *contract.Market // market contract instance + clients []*node.ZgsClient // 0g storage clients + routines int // number of go routines for uploading + logger *logrus.Logger // logger } func getShardConfigs(ctx context.Context, clients []*node.ZgsClient) ([]*shard.ShardConfig, error) { @@ -156,9 +158,61 @@ func checkLogExistance(ctx context.Context, clients []*node.ZgsClient, root comm return info, nil } +func (uploader *Uploader) WithRoutines(routines int) *Uploader { + uploader.routines = routines + return uploader +} + +// SplitableUpload submits data to the 0g storage contract; large data will be split into fragments to reduce padding cost. +func (uploader *Uploader) SplitableUpload(ctx context.Context, data core.IterableData, fragmentSize int64, option ...UploadOption) ([]common.Hash, []common.Hash, error) { + if fragmentSize < core.DefaultChunkSize { + fragmentSize = core.DefaultChunkSize + } + // align fragment size up to a power of 2 + fragmentSize = int64(core.NextPow2(uint64(fragmentSize))) + uploader.logger.Infof("fragment size: %v", fragmentSize) + + txHashes := make([]common.Hash, 0) + rootHashes := make([]common.Hash, 0) + if data.Size() <= fragmentSize { + txHash, rootHash, err := uploader.Upload(ctx, data, option...) 
+ if err != nil { + return txHashes, rootHashes, err + } + txHashes = append(txHashes, txHash) + rootHashes = append(rootHashes, rootHash) + } else { + fragments := data.Split(fragmentSize) + uploader.logger.Infof("split origin file into %v fragments, %v bytes each.", len(fragments), fragmentSize) + var opt UploadOption + if len(option) > 0 { + opt = option[0] + } + for l := 0; l < len(fragments); l += int(defaultBatchSize) { + r := min(l+int(defaultBatchSize), len(fragments)) + uploader.logger.Infof("batch submitting fragments %v to %v...", l, r) + opts := BatchUploadOption{ + Fee: nil, + Nonce: nil, + DataOptions: make([]UploadOption, 0), + } + for i := l; i < r; i += 1 { + opts.DataOptions = append(opts.DataOptions, opt) + } + txHash, roots, err := uploader.BatchUpload(ctx, fragments[l:r], opts) + if err != nil { + return txHashes, rootHashes, err + } + txHashes = append(txHashes, txHash) + rootHashes = append(rootHashes, roots...) + } + } + return txHashes, rootHashes, nil +} + // BatchUpload submit multiple data to 0g storage contract batchly in single on-chain transaction, then transfer the data to the storage nodes. // The nonce for upload transaction will be the first non-nil nonce in given upload options, the protocol fee is the sum of fees in upload options. 
-func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.IterableData, waitForLogEntry bool, option ...BatchUploadOption) (common.Hash, []common.Hash, error) { +func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.IterableData, option ...BatchUploadOption) (common.Hash, []common.Hash, error) { stageTimer := time.Now() n := len(datas) @@ -248,7 +302,7 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable var receipt *types.Receipt if len(toSubmitDatas) > 0 { var err error - if txHash, receipt, err = uploader.SubmitLogEntry(ctx, toSubmitDatas, toSubmitTags, waitForLogEntry, opts.Nonce, opts.Fee); err != nil { + if txHash, receipt, err = uploader.SubmitLogEntry(ctx, toSubmitDatas, toSubmitTags, opts.Nonce, opts.Fee); err != nil { return txHash, nil, errors.WithMessage(err, "Failed to submit log entry") } // Wait for storage node to retrieve log entry from blockchain @@ -302,7 +356,7 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable // Upload submit data to 0g storage contract, then transfer the data to the storage nodes. // returns the submission transaction hash and the hash will be zero if transaction is skipped. -func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, option ...UploadOption) (common.Hash, error) { +func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, option ...UploadOption) (common.Hash, common.Hash, error) { stageTimer := time.Now() var opt UploadOption @@ -319,44 +373,44 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op // Calculate file merkle root. 
tree, err := core.MerkleTree(data) if err != nil { - return common.Hash{}, errors.WithMessage(err, "Failed to create data merkle tree") + return common.Hash{}, common.Hash{}, errors.WithMessage(err, "Failed to create data merkle tree") } uploader.logger.WithField("root", tree.Root()).Info("Data merkle root calculated") // Check existance info, err := checkLogExistance(ctx, uploader.clients, tree.Root()) if err != nil { - return common.Hash{}, errors.WithMessage(err, "Failed to check if skipped log entry available on storage node") + return common.Hash{}, tree.Root(), errors.WithMessage(err, "Failed to check if skipped log entry available on storage node") } txHash := common.Hash{} // Append log on blockchain if !opt.SkipTx || info == nil { var receipt *types.Receipt - txHash, receipt, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, [][]byte{opt.Tags}, opt.FinalityRequired <= TransactionPacked, opt.Nonce, opt.Fee) + txHash, receipt, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, [][]byte{opt.Tags}, opt.Nonce, opt.Fee) if err != nil { - return txHash, errors.WithMessage(err, "Failed to submit log entry") + return txHash, tree.Root(), errors.WithMessage(err, "Failed to submit log entry") } // Wait for storage node to retrieve log entry from blockchain info, err = uploader.waitForLogEntry(ctx, tree.Root(), TransactionPacked, receipt) if err != nil { - return txHash, errors.WithMessage(err, "Failed to check if log entry available on storage node") + return txHash, tree.Root(), errors.WithMessage(err, "Failed to check if log entry available on storage node") } } // Upload file to storage node if err := uploader.uploadFile(ctx, info, data, tree, opt.ExpectedReplica, opt.TaskSize); err != nil { - return txHash, errors.WithMessage(err, "Failed to upload file") + return txHash, tree.Root(), errors.WithMessage(err, "Failed to upload file") } // Wait for transaction finality if _, err = uploader.waitForLogEntry(ctx, tree.Root(), 
opt.FinalityRequired, nil); err != nil { - return txHash, errors.WithMessage(err, "Failed to wait for transaction finality on storage node") + return txHash, tree.Root(), errors.WithMessage(err, "Failed to wait for transaction finality on storage node") } uploader.logger.WithField("duration", time.Since(stageTimer)).Info("upload took") - return txHash, nil + return txHash, tree.Root(), nil } func (uploader *Uploader) UploadDir(ctx context.Context, folder string, option ...UploadOption) (txnHash, rootHash common.Hash, _ error) { @@ -394,7 +448,7 @@ func (uploader *Uploader) UploadDir(ctx context.Context, folder string, option . // Upload each file to the storage network. for i := range relPaths { path := filepath.Join(folder, relPaths[i]) - txhash, err := uploader.UploadFile(ctx, path, option...) + txhash, _, err := uploader.UploadFile(ctx, path, option...) if err != nil { return txnHash, rootHash, errors.WithMessagef(err, "failed to upload file %s", path) } @@ -406,7 +460,7 @@ func (uploader *Uploader) UploadDir(ctx context.Context, folder string, option . } // Finally, upload the directory metadata - txnHash, err = uploader.Upload(ctx, iterdata, option...) + txnHash, _, err = uploader.Upload(ctx, iterdata, option...) if err != nil { err = errors.WithMessage(err, "failed to upload directory metadata") } @@ -414,7 +468,7 @@ func (uploader *Uploader) UploadDir(ctx context.Context, folder string, option . return txnHash, rootHash, err } -func (uploader *Uploader) UploadFile(ctx context.Context, path string, option ...UploadOption) (txnHash common.Hash, err error) { +func (uploader *Uploader) UploadFile(ctx context.Context, path string, option ...UploadOption) (txnHash common.Hash, rootHash common.Hash, err error) { file, err := core.Open(path) if err != nil { err = errors.WithMessagef(err, "failed to open file %s", path) @@ -426,7 +480,7 @@ func (uploader *Uploader) UploadFile(ctx context.Context, path string, option .. 
} // SubmitLogEntry submit the data to 0g storage contract by sending a transaction -func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.IterableData, tags [][]byte, waitForReceipt bool, nonce *big.Int, fee *big.Int) (common.Hash, *types.Receipt, error) { +func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.IterableData, tags [][]byte, nonce *big.Int, fee *big.Int) (common.Hash, *types.Receipt, error) { // Construct submission submissions := make([]contract.Submission, len(datas)) for i := 0; i < len(datas); i++ { @@ -498,12 +552,9 @@ func (uploader *Uploader) SubmitLogEntry(ctx context.Context, datas []core.Itera uploader.logger.WithField("hash", tx.Hash().Hex()).Info("Succeeded to send transaction to append log entry") - if waitForReceipt { - // Wait for successful execution - receipt, err := uploader.flow.WaitForReceipt(ctx, tx.Hash(), true) - return tx.Hash(), receipt, err - } - return tx.Hash(), nil, nil + // Wait for successful execution + receipt, err := uploader.flow.WaitForReceipt(ctx, tx.Hash(), true) + return tx.Hash(), receipt, err } // Wait for log entry ready on storage node. @@ -631,7 +682,7 @@ func (uploader *Uploader) uploadFile(ctx context.Context, info *node.FileInfo, d } opt := parallel.SerialOption{ - Routines: min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), + Routines: uploader.routines, } err = parallel.Serial(ctx, segmentUploader, len(segmentUploader.tasks), opt) if err != nil { @@ -768,6 +819,7 @@ func (uploader *FileSegmentUploader) newFileSegmentUploader( uploadTasks = append(uploadTasks, tasks) } } + util.Shuffle(uploadTasks) return &fileSegmentUploader{ FileSegmentsWithProof: fileSeg,