Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: split large files #67

Merged
merged 5 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions cmd/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"runtime"
"time"

"github.com/0glabs/0g-storage-client/common"
Expand All @@ -20,8 +21,11 @@ type downloadArgument struct {
nodes []string

root string
roots []string
proof bool

routines int

timeout time.Duration
}

Expand All @@ -34,9 +38,14 @@ func bindDownloadFlags(cmd *cobra.Command, args *downloadArgument) {
cmd.MarkFlagsOneRequired("indexer", "node")

cmd.Flags().StringVar(&args.root, "root", "", "Merkle root to download file")
cmd.MarkFlagRequired("root")
cmd.Flags().StringSliceVar(&args.roots, "roots", []string{}, "Merkle roots to download fragments")
cmd.MarkFlagsOneRequired("root", "roots")
cmd.MarkFlagsMutuallyExclusive("root", "roots")

cmd.Flags().BoolVar(&args.proof, "proof", false, "Whether to download with merkle proof for validation")

cmd.Flags().IntVar(&args.routines, "routines", runtime.GOMAXPROCS(0), "number of go routines for downloading simutanously")

cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")
}

Expand Down Expand Up @@ -70,8 +79,14 @@ func download(*cobra.Command, []string) {
}
defer closer()

if err := downloader.Download(ctx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
if downloadArgs.root != "" {
if err := downloader.Download(ctx, downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
}
} else {
if err := downloader.DownloadFragments(ctx, downloadArgs.roots, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
}
}
}

Expand Down Expand Up @@ -100,6 +115,7 @@ func newDownloader(args downloadArgument) (transfer.IDownloader, func(), error)
closer()
return nil, nil, err
}
downloader.WithRoutines(downloadArgs.routines)

return downloader, closer, nil
}
22 changes: 21 additions & 1 deletion cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"context"
"math/big"
"runtime"
"strings"
"time"

zg_common "github.com/0glabs/0g-storage-client/common"
Expand Down Expand Up @@ -51,6 +53,9 @@ type uploadArgument struct {
skipTx bool
finalityRequired bool
taskSize uint
routines int

fragmentSize int64

timeout time.Duration
}
Expand All @@ -71,6 +76,10 @@ func bindUploadFlags(cmd *cobra.Command, args *uploadArgument) {
cmd.Flags().BoolVar(&args.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload")
cmd.Flags().UintVar(&args.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

cmd.Flags().Int64Var(&args.fragmentSize, "fragment-size", 1024*1024*1024*4, "the size of fragment to split into when file is too large")

cmd.Flags().IntVar(&args.routines, "routines", runtime.GOMAXPROCS(0), "number of go routines for uploading simutanously")

cmd.Flags().DurationVar(&args.timeout, "timeout", 0, "cli task timeout, 0 for no timeout")
}

Expand Down Expand Up @@ -136,10 +145,21 @@ func upload(*cobra.Command, []string) {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
defer closer()
uploader.WithRoutines(uploadArgs.routines)

if _, err := uploader.Upload(ctx, file, opt); err != nil {
_, roots, err := uploader.SplitableUpload(ctx, file, uploadArgs.fragmentSize, opt)
if err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
if len(roots) == 1 {
logrus.Infof("file uploaded, root = %v", roots[0])
} else {
s := make([]string, len(roots))
for i, root := range roots {
s[i] = root.String()
}
logrus.Infof("file uploaded in %v fragments, roots = %v", len(roots), strings.Join(s, ","))
}
}

func newUploader(ctx context.Context, segNum uint64, args uploadArgument, w3client *web3go.Client, opt transfer.UploadOption) (*transfer.Uploader, func(), error) {
Expand Down
1 change: 1 addition & 0 deletions cmd/upload_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func uploadDir(*cobra.Command, []string) {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
defer closer()
uploader.WithRoutines(uploadArgs.routines)

txnHash, rootHash, err := uploader.UploadDir(ctx, uploadDirArgs.file, opt)
if err != nil {
Expand Down
10 changes: 2 additions & 8 deletions common/shard/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package shard

import (
"sort"
"time"

"golang.org/x/exp/rand"
"github.com/0glabs/0g-storage-client/common/util"
)

type ShardConfig struct {
Expand Down Expand Up @@ -130,12 +129,7 @@ func CheckReplica(shardConfigs []*ShardConfig, expectedReplica uint) bool {
// Helper function to pre-process (sort or shuffle) the nodes before selection
func prepareSelectionNodes(nodes []*ShardedNode, random bool) []*ShardedNode {
if random {
// Shuffle the nodes randomly if needed
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
for i := range nodes {
j := rng.Intn(i + 1)
nodes[i], nodes[j] = nodes[j], nodes[i]
}
util.Shuffle(nodes)
} else {
// Sort nodes based on NumShard and ShardId
sort.Slice(nodes, func(i, j int) bool {
Expand Down
15 changes: 15 additions & 0 deletions common/util/shuffle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package util

import (
"time"

"golang.org/x/exp/rand"
)

// Shuffle permutes items in place uniformly at random, seeding a fresh
// pseudo-random source from the current wall-clock time on every call.
func Shuffle[T any](items []T) {
	rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
	// Classic (downward) Fisher-Yates: swap each position with a
	// uniformly chosen index at or below it.
	for i := len(items) - 1; i > 0; i-- {
		j := rng.Intn(i + 1)
		items[i], items[j] = items[j], items[i]
	}
}
3 changes: 2 additions & 1 deletion core/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ var (
type IterableData interface {
NumChunks() uint64
NumSegments() uint64
Offset() int64
Size() int64
PaddedSize() uint64
Iterate(offset int64, batch int64, flowPadding bool) Iterator
Read(buf []byte, offset int64) (int, error)
Split(fragmentSize int64) []IterableData
}

// MerkleTree create merkle tree of the data.
Expand Down
102 changes: 23 additions & 79 deletions core/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type File struct {
os.FileInfo
underlying *os.File
paddedSize uint64
offset int64
size int64
}

var _ IterableData = (*File)(nil)

func (file *File) Read(buf []byte, offset int64) (int, error) {
n, err := file.underlying.ReadAt(buf, offset)
n, err := file.underlying.ReadAt(buf, file.offset+offset)
// unexpected IO error
if !errors.Is(err, io.EOF) {
return 0, err
Expand Down Expand Up @@ -72,6 +74,8 @@ func Open(name string) (*File, error) {
return &File{
FileInfo: info,
underlying: file,
offset: 0,
size: info.Size(),
paddedSize: IteratorPaddedSize(info.Size(), true),
}, nil
}
Expand Down Expand Up @@ -110,86 +114,26 @@ func (file *File) PaddedSize() uint64 {
return file.paddedSize
}

func (file *File) Iterate(offset int64, batch int64, flowPadding bool) Iterator {
if batch%DefaultChunkSize > 0 {
panic("batch size should align with chunk size")
}
dataSize := file.Size()
return &FileIterator{
file: file.underlying,
buf: make([]byte, batch),
offset: offset,
fileSize: dataSize,
paddedSize: IteratorPaddedSize(dataSize, flowPadding),
}
}

type FileIterator struct {
file *os.File
buf []byte // buffer to read data from file
bufSize int // actual data size in buffer
fileSize int64
paddedSize uint64
offset int64 // offset to read data
}

var _ Iterator = (*FileIterator)(nil)

func (it *FileIterator) Next() (bool, error) {
// Reject invalid offset
if it.offset < 0 || uint64(it.offset) >= it.paddedSize {
return false, nil
}

var expectedBufSize int
maxAvailableLength := it.paddedSize - uint64(it.offset)
if maxAvailableLength >= uint64(len(it.buf)) {
expectedBufSize = len(it.buf)
} else {
expectedBufSize = int(maxAvailableLength)
}

it.clearBuffer()

if it.offset >= it.fileSize {
it.paddingZeros(expectedBufSize)
return true, nil
}

n, err := it.file.ReadAt(it.buf, it.offset)
it.bufSize = n
it.offset += int64(n)

// not reach EOF
if n == expectedBufSize {
return true, nil
}

// unexpected IO error
if !errors.Is(err, io.EOF) {
return false, err
}

if n > expectedBufSize {
// should never happen
panic("load more data from file than expected")
}

it.paddingZeros(expectedBufSize - n)

return true, nil
}

func (it *FileIterator) clearBuffer() {
it.bufSize = 0
// Size returns the number of bytes covered by this file view. For a
// fragment produced by Split this is the fragment length, not the size
// of the whole underlying file.
func (file *File) Size() int64 {
return file.size
}

func (it *FileIterator) paddingZeros(length int) {
paddingZeros(it.buf, it.bufSize, length)
it.bufSize += length
it.offset += int64(length)
// Offset returns the absolute position within the underlying file at
// which this view starts (0 for a freshly opened file; nonzero for
// fragments produced by Split). Read adds this offset to its argument.
func (file *File) Offset() int64 {
return file.offset
}

func (it *FileIterator) Current() []byte {
return it.buf[:it.bufSize]
// Split partitions the file view into consecutive fragments of at most
// fragmentSize bytes each; the final fragment may be shorter. Fragments
// share the underlying *os.File and FileInfo, and each carries its own
// absolute offset and independently computed padded size.
func (file *File) Split(fragmentSize int64) []IterableData {
	// end is the exclusive absolute end of this view in the underlying file.
	end := file.offset + file.size
	fragments := make([]IterableData, 0)
	for offset := file.offset; offset < end; offset += fragmentSize {
		// Remaining bytes must be measured from the view's end, not as
		// `file.size - offset`: that expression is wrong (and can go
		// negative) whenever file.offset > 0, e.g. when re-splitting a
		// fragment whose view does not start at the file's beginning.
		size := min(end-offset, fragmentSize)
		fragment := &File{
			FileInfo:   file.FileInfo,
			underlying: file.underlying,
			offset:     offset,
			size:       size,
			paddedSize: IteratorPaddedSize(size, true),
		}
		fragments = append(fragments, fragment)
	}
	return fragments
}
4 changes: 2 additions & 2 deletions core/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) {
return &submission, nil
}

func nextPow2(input uint64) uint64 {
func NextPow2(input uint64) uint64 {
x := input
x -= 1
x |= x >> 32
Expand All @@ -57,7 +57,7 @@ func nextPow2(input uint64) uint64 {
}

func ComputePaddedSize(chunks uint64) (uint64, uint64) {
chunksNextPow2 := nextPow2(chunks)
chunksNextPow2 := NextPow2(chunks)
if chunksNextPow2 == chunks {
return chunksNextPow2, chunksNextPow2
}
Expand Down
Loading
Loading