diff --git a/cmd/download.go b/cmd/download.go index 7eaf954..a101ed5 100644 --- a/cmd/download.go +++ b/cmd/download.go @@ -1,6 +1,9 @@ package cmd import ( + "context" + + "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/node" "github.com/0glabs/0g-storage-client/transfer" "github.com/sirupsen/logrus" @@ -37,12 +40,12 @@ func init() { func download(*cobra.Command, []string) { nodes := node.MustNewClients(downloadArgs.nodes) - downloader, err := transfer.NewDownloader(nodes) + downloader, err := transfer.NewDownloader(nodes, common.LogOption{Logger: logrus.StandardLogger()}) if err != nil { logrus.WithError(err).Fatal("Failed to initialize downloader") } - if err := downloader.Download(downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil { + if err := downloader.Download(context.Background(), downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil { logrus.WithError(err).Fatal("Failed to download file") } } diff --git a/cmd/upload.go b/cmd/upload.go index 99869fe..b06a139 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -1,6 +1,9 @@ package cmd import ( + "context" + + zg_common "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/common/blockchain" "github.com/0glabs/0g-storage-client/contract" "github.com/0glabs/0g-storage-client/core" @@ -68,7 +71,7 @@ func upload(*cobra.Command, []string) { defer client.Close() } - uploader, err := transfer.NewUploader(flow, clients) + uploader, err := transfer.NewUploader(flow, clients, zg_common.LogOption{Logger: logrus.StandardLogger()}) if err != nil { logrus.WithError(err).Fatal("Failed to initialize uploader") } @@ -84,7 +87,7 @@ func upload(*cobra.Command, []string) { } defer file.Close() - if err := uploader.Upload(file, opt); err != nil { + if err := uploader.Upload(context.Background(), file, opt); err != nil { logrus.WithError(err).Fatal("Failed to upload file") } } diff --git a/common/blockchain/contract.go b/common/blockchain/contract.go index 6415540..aa25c73 100644 --- a/common/blockchain/contract.go +++ b/common/blockchain/contract.go @@ -12,7 +12,6 @@ import ( "github.com/openweb3/web3go" "github.com/openweb3/web3go/types" "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) var CustomGasPrice uint64 @@ -50,8 +49,6 @@ func Deploy(clientWithSigner *web3go.Client, dataOrFile string) (common.Address, return common.Address{}, errors.WithMessage(err, "Failed to send transaction") } - logrus.WithField("hash", txHash).Info("Transaction sent to blockchain") - receipt, err := WaitForReceipt(clientWithSigner, txHash, true) if err != nil { return common.Address{}, errors.WithMessage(err, "Failed to wait for receipt") diff --git a/common/blockchain/rpc.go b/common/blockchain/rpc.go index bad1d28..88b831e 100644 --- a/common/blockchain/rpc.go +++ b/common/blockchain/rpc.go @@ -20,6 +20,7 @@ var Web3LogEnabled bool type RetryOption struct { Rounds uint Interval time.Duration + logger *logrus.Logger } func MustNewWeb3(url, key string) *web3go.Client { @@ -63,13 +64,13 @@ func WaitForReceipt(client *web3go.Client, txHash common.Hash, successRequired b if len(opts) > 0 { opt = opts[0] } else { - // default infinite wait - opt.Rounds = 0 + // default 10 rounds + opt.Rounds = 10 opt.Interval = time.Second * 3 } var tries uint - reminder := util.NewReminder(logrus.TraceLevel, time.Minute) + reminder := util.NewReminder(opt.logger, time.Minute) for receipt == nil { if tries > opt.Rounds+1 && opt.Rounds != 0 { return nil, errors.New("no receipt after max 
retries") diff --git a/common/options.go b/common/options.go new file mode 100644 index 0000000..3d2c223 --- /dev/null +++ b/common/options.go @@ -0,0 +1,25 @@ +package common + +import ( + "io" + + "github.com/sirupsen/logrus" +) + +type LogOption struct { + LogLevel logrus.Level + Logger *logrus.Logger +} + +func NewLogger(opt ...LogOption) *logrus.Logger { + logger := logrus.New() + if len(opt) == 0 { + logger.Out = io.Discard + return logger + } + if opt[0].Logger != nil { + return opt[0].Logger + } + logger.SetLevel(opt[0].LogLevel) + return logger +} diff --git a/common/parallel/interface.go b/common/parallel/interface.go index f169d18..0e19a83 100644 --- a/common/parallel/interface.go +++ b/common/parallel/interface.go @@ -1,5 +1,7 @@ package parallel +import "context" + type Result struct { Routine int Task int @@ -8,6 +10,6 @@ type Result struct { } type Interface interface { - ParallelDo(routine, task int) (interface{}, error) + ParallelDo(ctx context.Context, routine, task int) (interface{}, error) ParallelCollect(result *Result) error } diff --git a/common/parallel/serial.go b/common/parallel/serial.go index 6f0b1e9..8ff1f64 100644 --- a/common/parallel/serial.go +++ b/common/parallel/serial.go @@ -5,7 +5,7 @@ import ( "sync" ) -func Serial(parallelizable Interface, tasks, routines, window int) error { +func Serial(ctx context.Context, parallelizable Interface, tasks, routines, window int) error { if tasks == 0 { return nil } @@ -29,7 +29,7 @@ func Serial(parallelizable Interface, tasks, routines, window int) error { defer close(resultCh) var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) // start routines to do tasks for i := 0; i < routines; i++ { @@ -56,7 +56,7 @@ func work(ctx context.Context, routine int, parallelizable Interface, taskCh <-c case <-ctx.Done(): return case task := <-taskCh: - val, err := parallelizable.ParallelDo(routine, task) + val, err := parallelizable.ParallelDo(ctx, routine, task) resultCh <- &Result{routine, task, val, err} if err != nil { return diff --git a/common/parallel/serial_test.go b/common/parallel/serial_test.go index 8cb1908..7365a96 100644 --- a/common/parallel/serial_test.go +++ b/common/parallel/serial_test.go @@ -1,6 +1,7 @@ package parallel import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -11,7 +12,7 @@ type foo struct { result []int } -func (f *foo) ParallelDo(routine, task int) (interface{}, error) { +func (f *foo) ParallelDo(ctx context.Context, routine, task int) (interface{}, error) { return task * task, nil } @@ -30,7 +31,7 @@ func TestSerial(t *testing.T) { tasks := 100 - err := Serial(&f, tasks, 4, 16) + err := Serial(context.Background(), &f, tasks, 4, 16) assert.Nil(t, err) assert.Equal(t, tasks, len(f.result)) diff --git a/common/util/reminder.go b/common/util/reminder.go index 8ff4f6f..e52ff04 100644 --- a/common/util/reminder.go +++ b/common/util/reminder.go @@ -1,6 +1,7 @@ package util import ( + "io" "time" "github.com/sirupsen/logrus" @@ -8,9 +9,9 @@ import ( // Reminder is used for time consuming operations to remind user about progress. type Reminder struct { - start time.Time // start time since last warn - interval time.Duration // interval to warn once - level logrus.Level // log level to remind in general + start time.Time // start time since last warn + interval time.Duration // interval to warn once + logger *logrus.Logger // log level to remind in general } // NewReminder returns a new Reminder instance. 
@@ -18,15 +19,16 @@ type Reminder struct { // `level`: log level to remind in general. // // `interval`: interval to remind in warning level. -func NewReminder(level logrus.Level, interval time.Duration) *Reminder { - if level < logrus.InfoLevel { - panic("invalid log level to remind in general") +func NewReminder(logger *logrus.Logger, interval time.Duration) *Reminder { + if logger == nil { + logger = logrus.New() + logger.Out = io.Discard } return &Reminder{ start: time.Now(), interval: interval, - level: level, + logger: logger, } } @@ -40,15 +42,15 @@ func (reminder *Reminder) Remind(message string, fields ...logrus.Fields) { if time.Since(reminder.start) > reminder.interval { reminder.remind(logrus.WarnLevel, message, fields...) reminder.start = time.Now() - } else if logrus.IsLevelEnabled(reminder.level) { - reminder.remind(reminder.level, message, fields...) + } else { + reminder.remind(reminder.logger.Level, message, fields...) } } func (reminder *Reminder) remind(level logrus.Level, message string, fields ...logrus.Fields) { if len(fields) > 0 { - logrus.WithFields(fields[0]).Log(level, message) + reminder.logger.WithFields(fields[0]).Log(level, message) } else { - logrus.StandardLogger().Log(level, message) + reminder.logger.Log(level, message) } } diff --git a/core/dataflow.go b/core/dataflow.go index f11256c..c7e8e18 100644 --- a/core/dataflow.go +++ b/core/dataflow.go @@ -1,15 +1,14 @@ package core import ( + "context" "errors" "runtime" - "time" "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/core/merkle" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/sirupsen/logrus" ) const ( @@ -38,7 +37,6 @@ type IterableData interface { } func MerkleTree(data IterableData) (*merkle.Tree, error) { - stageTimer := time.Now() var builder merkle.TreeBuilder initializer := &TreeBuilderInitializer{ data: data, @@ -47,13 +45,11 @@ func MerkleTree(data IterableData) (*merkle.Tree, error) { builder: &builder, } - err := parallel.Serial(initializer, NumSegmentsPadded(data), runtime.GOMAXPROCS(0), 0) + err := parallel.Serial(context.Background(), initializer, NumSegmentsPadded(data), runtime.GOMAXPROCS(0), 0) if err != nil { return nil, err } - logrus.WithField("duration", time.Since(stageTimer)).Info("create segment root took") - return builder.Build(), nil } diff --git a/core/flow.go b/core/flow.go index 741f397..b81a77d 100644 --- a/core/flow.go +++ b/core/flow.go @@ -1,11 +1,12 @@ package core import ( + "context" "math" "math/big" "runtime" - "time" + "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/contract" "github.com/0glabs/0g-storage-client/core/merkle" @@ -15,10 +16,12 @@ import ( type Flow struct { data IterableData tags []byte + + logger *logrus.Logger } -func NewFlow(data IterableData, tags []byte) *Flow { - return &Flow{data, tags} +func NewFlow(data IterableData, tags []byte, opts ...common.LogOption) *Flow { + return &Flow{data: data, tags: tags, logger: common.NewLogger(opts...)} } func (flow *Flow) CreateSubmission() (*contract.Submission, error) { @@ -28,7 +31,6 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) { Tags: flow.tags, } - stageTimer := time.Now() var offset int64 for _, chunks := range flow.splitNodes() { node, err := flow.createNode(offset, chunks) @@ -38,7 +40,6 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) { submission.Nodes = 
append(submission.Nodes, *node) offset += chunks * DefaultChunkSize } - logrus.WithField("duration", time.Since(stageTimer)).Info("create submission nodes took") return &submission, nil } @@ -88,7 +89,7 @@ func (flow *Flow) splitNodes() []int64 { } nextChunkSize /= 2 } - logrus.WithFields(logrus.Fields{ + flow.logger.WithFields(logrus.Fields{ "chunks": chunks, "nodeSize": nodes, }).Debug("SplitNodes") @@ -114,7 +115,7 @@ func (flow *Flow) createSegmentNode(offset, batch, size int64) (*contract.Submis builder: &builder, } - err := parallel.Serial(initializer, int((size-1)/batch+1), runtime.GOMAXPROCS(0), 0) + err := parallel.Serial(context.Background(), initializer, int((size-1)/batch+1), runtime.GOMAXPROCS(0), 0) if err != nil { return nil, err } diff --git a/core/tree_builder_initializer.go b/core/tree_builder_initializer.go index 5cb8c12..249dd24 100644 --- a/core/tree_builder_initializer.go +++ b/core/tree_builder_initializer.go @@ -1,6 +1,8 @@ package core import ( + "context" + "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/core/merkle" "github.com/ethereum/go-ethereum/common" @@ -22,7 +24,7 @@ func (t *TreeBuilderInitializer) ParallelCollect(result *parallel.Result) error } // ParallelDo implements parallel.Interface. -func (t *TreeBuilderInitializer) ParallelDo(routine int, task int) (interface{}, error) { +func (t *TreeBuilderInitializer) ParallelDo(ctx context.Context, routine int, task int) (interface{}, error) { offset := t.offset + int64(task)*t.batch buf, err := ReadAt(t.data, int(t.batch), offset, t.data.PaddedSize()) if err != nil { diff --git a/example/kv_iterator/main.go b/example/kv_iterator/main.go index e0c57eb..7058663 100644 --- a/example/kv_iterator/main.go +++ b/example/kv_iterator/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "github.com/0glabs/0g-storage-client/kv" @@ -11,6 +12,7 @@ import ( const KvClientAddr = "http://127.0.0.1:6789" func main() { + ctx := context.Background() client, err := node.NewClient(KvClientAddr) if err != nil { fmt.Println(err) @@ -24,28 +26,28 @@ func main() { iter := kvClient.NewIterator(streamId) fmt.Println("begin to end:") - iter.SeekToFirst() + iter.SeekToFirst(ctx) for iter.Valid() { pair := iter.KeyValue() fmt.Printf("%v: %v\n", string(pair.Key), string(pair.Data)) - iter.Next() + iter.Next(ctx) } fmt.Println("end to begin:") - iter.SeekToLast() + iter.SeekToLast(ctx) for iter.Valid() { pair := iter.KeyValue() fmt.Printf("%v: %v\n", string(pair.Key), string(pair.Data)) - iter.Prev() + iter.Prev(ctx) } fmt.Printf("seek before %v\n", string(key1)) - iter.SeekBefore(key1) + iter.SeekBefore(ctx, key1) pair := iter.KeyValue() fmt.Printf("%v: %v\n", string(pair.Key), string(pair.Data)) fmt.Printf("seek after %v\n", string(key0)) - iter.SeekAfter(key0) + iter.SeekAfter(ctx, key0) pair = iter.KeyValue() fmt.Printf("%v: %v\n", string(pair.Key), string(pair.Data)) } diff --git a/example/kv_put/main.go b/example/kv_put/main.go index b9ced21..346ab72 100644 --- a/example/kv_put/main.go +++ b/example/kv_put/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "github.com/0glabs/0g-storage-client/common/blockchain" @@ -39,7 +40,7 @@ func main() { []byte("TESTKEY1"), []byte{74, 75, 76, 77, 78}, ) - err = batcher.Exec() + err = batcher.Exec(context.Background()) if err != nil { fmt.Println(err) return diff --git a/example/kv_query/main.go b/example/kv_query/main.go index 99195c1..404e0e5 100644 --- a/example/kv_query/main.go +++ b/example/kv_query/main.go @@ -1,6 +1,7 @@ 
package main import ( + "context" "fmt" "github.com/0glabs/0g-storage-client/kv" @@ -11,6 +12,7 @@ import ( const KvClientAddr = "http://127.0.0.1:6789" func main() { + ctx := context.Background() client, err := node.NewClient(KvClientAddr) if err != nil { fmt.Println(err) @@ -23,17 +25,17 @@ func main() { account := ethCommon.HexToAddress("0x578dd2bfc41bb66e9f0ae0802c613996440c9597") kvClient := kv.NewClient(client, nil) - val, _ := kvClient.GetValue(streamId, key) + val, _ := kvClient.GetValue(ctx, streamId, key) fmt.Println(string(val.Data)) - val, _ = kvClient.GetValue(streamId, key1) + val, _ = kvClient.GetValue(ctx, streamId, key1) fmt.Println(string(val.Data)) - val, _ = kvClient.GetValue(streamId, key2) + val, _ = kvClient.GetValue(ctx, streamId, key2) fmt.Println(val) - fmt.Println(kvClient.GetTransactionResult(2)) - fmt.Println(kvClient.GetHoldingStreamIds()) - fmt.Println(kvClient.HasWritePermission(account, streamId, key)) - fmt.Println(kvClient.IsAdmin(account, streamId)) - fmt.Println(kvClient.IsSpecialKey(streamId, key)) - fmt.Println(kvClient.IsWriterOfKey(account, streamId, key)) - fmt.Println(kvClient.IsWriterOfStream(account, streamId)) + fmt.Println(kvClient.GetTransactionResult(ctx, 2)) + fmt.Println(kvClient.GetHoldingStreamIds(ctx)) + fmt.Println(kvClient.HasWritePermission(ctx, account, streamId, key)) + fmt.Println(kvClient.IsAdmin(ctx, account, streamId)) + fmt.Println(kvClient.IsSpecialKey(ctx, streamId, key)) + fmt.Println(kvClient.IsWriterOfKey(ctx, account, streamId, key)) + fmt.Println(kvClient.IsWriterOfStream(ctx, account, streamId)) } diff --git a/gateway/local_apis.go b/gateway/local_apis.go index 361cc0c..36ea38b 100644 --- a/gateway/local_apis.go +++ b/gateway/local_apis.go @@ -1,13 +1,16 @@ package gateway import ( + "context" "path/filepath" + zg_common "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/core" "github.com/0glabs/0g-storage-client/node" "github.com/0glabs/0g-storage-client/transfer" "github.com/ethereum/go-ethereum/common" "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" ) var LocalFileRepo string = "." 
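Since every storage-node RPC now takes a `context.Context`, callers can bound or cancel in-flight calls instead of blocking indefinitely. A sketch, assuming a reachable node (the endpoint and root hash are placeholders):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/0glabs/0g-storage-client/node"
	"github.com/ethereum/go-ethereum/common"
)

func main() {
	client, err := node.NewClient("http://127.0.0.1:5678") // placeholder endpoint
	if err != nil {
		fmt.Println(err)
		return
	}

	// Bound the RPC: if the storage node hangs, the call returns ctx's error.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	root := common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") // placeholder root
	info, err := client.ZeroGStorage().GetFileInfo(ctx, root)
	fmt.Println(info, err)
}
```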
@@ -78,7 +81,7 @@ func getFileStatus(c *gin.Context) (interface{}, error) { var notFinalized bool for _, client := range allClients { - info, err := client.ZeroGStorage().GetFileInfo(root) + info, err := client.ZeroGStorage().GetFileInfo(context.Background(), root) if err != nil { return nil, err } @@ -114,7 +117,7 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) { return nil, ErrValidation.WithData("node index out of bound") } - uploader, err := transfer.NewUploaderLight([]*node.Client{allClients[input.Node]}) + uploader, err := transfer.NewUploader(nil, []*node.Client{allClients[input.Node]}, zg_common.LogOption{Logger: logrus.StandardLogger()}) if err != nil { return nil, ErrValidation.WithData(err) } @@ -128,7 +131,7 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) { } defer file.Close() - if err := uploader.Upload(file); err != nil { + if err := uploader.Upload(context.Background(), file); err != nil { return nil, err } @@ -150,14 +153,14 @@ func downloadFileLocal(c *gin.Context) (interface{}, error) { return nil, ErrValidation.WithData("node index out of bound") } - downloader, err := transfer.NewDownloader([]*node.Client{allClients[input.Node]}) + downloader, err := transfer.NewDownloader([]*node.Client{allClients[input.Node]}, zg_common.LogOption{Logger: logrus.StandardLogger()}) if err != nil { return nil, err } filename := getFilePath(input.Path, true) - if err := downloader.Download(input.Root, filename, false); err != nil { + if err := downloader.Download(context.Background(), input.Root, filename, false); err != nil { return nil, err } diff --git a/indexer/api.go b/indexer/api.go index b9e631f..9cb9d20 100644 --- a/indexer/api.go +++ b/indexer/api.go @@ -1,6 +1,8 @@ package indexer import ( + "context" + "github.com/0glabs/0g-storage-client/node" "github.com/pkg/errors" ) @@ -17,11 +19,11 @@ func NewIndexerApi(nodes []*node.Client) *IndexerApi { return &IndexerApi{"indexer", nodes} } -func (api *IndexerApi) GetNodes() ([]ShardedNode, error) { +func (api *IndexerApi) GetNodes(ctx context.Context) ([]ShardedNode, error) { var result []ShardedNode for _, v := range api.nodes { - config, err := v.ZeroGStorage().GetShardConfig() + config, err := v.ZeroGStorage().GetShardConfig(ctx) if err != nil { return nil, errors.WithMessage(err, "Failed to query shard config from storage node") } diff --git a/indexer/client.go b/indexer/client.go index f12dca8..c37b5a0 100644 --- a/indexer/client.go +++ b/indexer/client.go @@ -30,7 +30,7 @@ func NewClient(url string, option ...providers.Option) (*Client, error) { }, nil } -func (c *Client) GetNodes() (nodes []ShardedNode, err error) { - err = c.Provider.CallContext(context.Background(), &nodes, "indexer_getNodes") +func (c *Client) GetNodes(ctx context.Context) (nodes []ShardedNode, err error) { + err = c.Provider.CallContext(ctx, &nodes, "indexer_getNodes") return } diff --git a/indexer/types.go b/indexer/types.go index 27d90e7..b7938bc 100644 --- a/indexer/types.go +++ b/indexer/types.go @@ -1,6 +1,7 @@ package indexer import ( + "context" "sort" "github.com/0glabs/0g-storage-client/node" @@ -12,7 +13,7 @@ type ShardedNode struct { } type Interface interface { - GetNodes() ([]ShardedNode, error) + GetNodes(ctx context.Context) ([]ShardedNode, error) } type shardSegmentTreeNode struct { diff --git a/kv/Iterator.go b/kv/Iterator.go index f3b8840..0c81f48 100644 --- a/kv/Iterator.go +++ b/kv/Iterator.go @@ -1,6 +1,7 @@ package kv import ( + "context" "errors" "github.com/0glabs/0g-storage-client/node" @@ -24,12 +25,12 @@ 
func (iter *Iterator) KeyValue() *node.KeyValue { return iter.currentPair } -func (iter *Iterator) move(kv *node.KeyValue) error { +func (iter *Iterator) move(ctx context.Context, kv *node.KeyValue) error { if kv == nil { iter.currentPair = nil return nil } - value, err := iter.client.GetValue(iter.streamId, kv.Key, iter.version) + value, err := iter.client.GetValue(ctx, iter.streamId, kv.Key, iter.version) if err != nil { return err } @@ -42,56 +43,56 @@ func (iter *Iterator) move(kv *node.KeyValue) error { return nil } -func (iter *Iterator) SeekBefore(key []byte) error { - kv, err := iter.client.GetPrev(iter.streamId, key, 0, 0, true, iter.version) +func (iter *Iterator) SeekBefore(ctx context.Context, key []byte) error { + kv, err := iter.client.GetPrev(ctx, iter.streamId, key, 0, 0, true, iter.version) if err != nil { return err } - return iter.move(kv) + return iter.move(ctx, kv) } -func (iter *Iterator) SeekAfter(key []byte) error { - kv, err := iter.client.GetNext(iter.streamId, key, 0, 0, true, iter.version) +func (iter *Iterator) SeekAfter(ctx context.Context, key []byte) error { + kv, err := iter.client.GetNext(ctx, iter.streamId, key, 0, 0, true, iter.version) if err != nil { return err } - return iter.move(kv) + return iter.move(ctx, kv) } -func (iter *Iterator) SeekToFirst() error { - kv, err := iter.client.GetFirst(iter.streamId, 0, 0, iter.version) +func (iter *Iterator) SeekToFirst(ctx context.Context) error { + kv, err := iter.client.GetFirst(ctx, iter.streamId, 0, 0, iter.version) if err != nil { return err } - return iter.move(kv) + return iter.move(ctx, kv) } -func (iter *Iterator) SeekToLast() error { - kv, err := iter.client.GetLast(iter.streamId, 0, 0, iter.version) +func (iter *Iterator) SeekToLast(ctx context.Context) error { + kv, err := iter.client.GetLast(ctx, iter.streamId, 0, 0, iter.version) if err != nil { return err } - return iter.move(kv) + return iter.move(ctx, kv) } -func (iter *Iterator) Next() error { +func (iter *Iterator) Next(ctx context.Context) error { if !iter.Valid() { return errIteratorInvalid } - kv, err := iter.client.GetNext(iter.streamId, iter.currentPair.Key, 0, 0, false, iter.version) + kv, err := iter.client.GetNext(ctx, iter.streamId, iter.currentPair.Key, 0, 0, false, iter.version) if err != nil { return err } - return iter.move(kv) + return iter.move(ctx, kv) } -func (iter *Iterator) Prev() error { +func (iter *Iterator) Prev(ctx context.Context) error { if !iter.Valid() { return errIteratorInvalid } - kv, err := iter.client.GetPrev(iter.streamId, iter.currentPair.Key, 0, 0, false, iter.version) + kv, err := iter.client.GetPrev(ctx, iter.streamId, iter.currentPair.Key, 0, 0, false, iter.version) if err != nil { return err } - return iter.move(kv) + return iter.move(ctx, kv) } diff --git a/kv/client.go b/kv/client.go index 2e42eca..5e2e78c 100644 --- a/kv/client.go +++ b/kv/client.go @@ -1,14 +1,17 @@ package kv import ( + "context" "math" + zg_common "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/contract" "github.com/0glabs/0g-storage-client/core" "github.com/0glabs/0g-storage-client/node" "github.com/0glabs/0g-storage-client/transfer" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) // Client is used for users to communicate with server for kv operations. 
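The same ctx threading applies to the kv read path changed below; a sketch following example/kv_query (stream id and key are placeholders, and a `nil` flow suffices for read-only usage):

```go
package main

import (
	"context"
	"fmt"

	"github.com/0glabs/0g-storage-client/kv"
	"github.com/0glabs/0g-storage-client/node"
	ethCommon "github.com/ethereum/go-ethereum/common"
)

func main() {
	ctx := context.Background()
	client, err := node.NewClient("http://127.0.0.1:6789")
	if err != nil {
		fmt.Println(err)
		return
	}
	kvClient := kv.NewClient(client, nil) // nil flow: read-only usage

	streamId := ethCommon.HexToHash("0x00000000000000000000000000000000000000000000000000000000000000f2") // placeholder
	val, err := kvClient.GetValue(ctx, streamId, []byte("TESTKEY"))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(val.Data))
}
```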
@@ -42,7 +45,7 @@ func (c *Client) NewIterator(streamId common.Hash, version ...uint64) *Iterator } } -func (c *Client) GetValue(streamId common.Hash, key []byte, version ...uint64) (val *node.Value, err error) { +func (c *Client) GetValue(ctx context.Context, streamId common.Hash, key []byte, version ...uint64) (val *node.Value, err error) { var v uint64 v = math.MaxUint64 if len(version) > 0 { @@ -55,7 +58,7 @@ func (c *Client) GetValue(streamId common.Hash, key []byte, version ...uint64) ( } for { var seg *node.Value - seg, err = c.node.KV().GetValue(streamId, key, uint64(len(val.Data)), maxQuerySize, val.Version) + seg, err = c.node.KV().GetValue(ctx, streamId, key, uint64(len(val.Data)), maxQuerySize, val.Version) if err != nil { return } @@ -74,68 +77,70 @@ func (c *Client) GetValue(streamId common.Hash, key []byte, version ...uint64) ( } // Get returns paginated value for the specified stream key and offset. -func (c *Client) Get(streamId common.Hash, key []byte, startIndex, length uint64, version ...uint64) (val *node.Value, err error) { - return c.node.KV().GetValue(streamId, key, startIndex, length, version...) +func (c *Client) Get(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, version ...uint64) (val *node.Value, err error) { + return c.node.KV().GetValue(ctx, streamId, key, startIndex, length, version...) } -func (c *Client) GetNext(streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *node.KeyValue, err error) { - return c.node.KV().GetNext(streamId, key, startIndex, length, inclusive, version...) +func (c *Client) GetNext(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *node.KeyValue, err error) { + return c.node.KV().GetNext(ctx, streamId, key, startIndex, length, inclusive, version...) } -func (c *Client) GetPrev(streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *node.KeyValue, err error) { - return c.node.KV().GetPrev(streamId, key, startIndex, length, inclusive, version...) +func (c *Client) GetPrev(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *node.KeyValue, err error) { + return c.node.KV().GetPrev(ctx, streamId, key, startIndex, length, inclusive, version...) } -func (c *Client) GetFirst(streamId common.Hash, startIndex, length uint64, version ...uint64) (val *node.KeyValue, err error) { - return c.node.KV().GetFirst(streamId, startIndex, length, version...) +func (c *Client) GetFirst(ctx context.Context, streamId common.Hash, startIndex, length uint64, version ...uint64) (val *node.KeyValue, err error) { + return c.node.KV().GetFirst(ctx, streamId, startIndex, length, version...) } -func (c *Client) GetLast(streamId common.Hash, startIndex, length uint64, version ...uint64) (val *node.KeyValue, err error) { - return c.node.KV().GetLast(streamId, startIndex, length, version...) +func (c *Client) GetLast(ctx context.Context, streamId common.Hash, startIndex, length uint64, version ...uint64) (val *node.KeyValue, err error) { + return c.node.KV().GetLast(ctx, streamId, startIndex, length, version...) 
} -func (c *Client) GetTransactionResult(txSeq uint64) (result string, err error) { - return c.node.KV().GetTransactionResult(txSeq) +func (c *Client) GetTransactionResult(ctx context.Context, txSeq uint64) (result string, err error) { + return c.node.KV().GetTransactionResult(ctx, txSeq) } -func (c *Client) GetHoldingStreamIds() (streamIds []common.Hash, err error) { - return c.node.KV().GetHoldingStreamIds() +func (c *Client) GetHoldingStreamIds(ctx context.Context) (streamIds []common.Hash, err error) { + return c.node.KV().GetHoldingStreamIds(ctx) } -func (c *Client) HasWritePermission(account common.Address, streamId common.Hash, key []byte, version ...uint64) (hasPermission bool, err error) { - return c.node.KV().HasWritePermission(account, streamId, key, version...) +func (c *Client) HasWritePermission(ctx context.Context, account common.Address, streamId common.Hash, key []byte, version ...uint64) (hasPermission bool, err error) { + return c.node.KV().HasWritePermission(ctx, account, streamId, key, version...) } -func (c *Client) IsAdmin(account common.Address, streamId common.Hash, version ...uint64) (isAdmin bool, err error) { - return c.node.KV().IsAdmin(account, streamId, version...) +func (c *Client) IsAdmin(ctx context.Context, account common.Address, streamId common.Hash, version ...uint64) (isAdmin bool, err error) { + return c.node.KV().IsAdmin(ctx, account, streamId, version...) } -func (c *Client) IsSpecialKey(streamId common.Hash, key []byte, version ...uint64) (isSpecialKey bool, err error) { - return c.node.KV().IsSpecialKey(streamId, key, version...) +func (c *Client) IsSpecialKey(ctx context.Context, streamId common.Hash, key []byte, version ...uint64) (isSpecialKey bool, err error) { + return c.node.KV().IsSpecialKey(ctx, streamId, key, version...) } -func (c *Client) IsWriterOfKey(account common.Address, streamId common.Hash, key []byte, version ...uint64) (isWriter bool, err error) { - return c.node.KV().IsWriterOfKey(account, streamId, key, version...) +func (c *Client) IsWriterOfKey(ctx context.Context, account common.Address, streamId common.Hash, key []byte, version ...uint64) (isWriter bool, err error) { + return c.node.KV().IsWriterOfKey(ctx, account, streamId, key, version...) } -func (c *Client) IsWriterOfStream(account common.Address, streamId common.Hash, version ...uint64) (isWriter bool, err error) { - return c.node.KV().IsWriterOfStream(account, streamId, version...) +func (c *Client) IsWriterOfStream(ctx context.Context, account common.Address, streamId common.Hash, version ...uint64) (isWriter bool, err error) { + return c.node.KV().IsWriterOfStream(ctx, account, streamId, version...) } // Batcher returns a Batcher instance for kv operations in batch. -func (c *Client) Batcher() *Batcher { - return newBatcher(math.MaxUint64, c) +func (c *Client) Batcher(opts ...zg_common.LogOption) *Batcher { + return newBatcher(math.MaxUint64, c, opts...) } type Batcher struct { *StreamDataBuilder client *Client + logger *logrus.Logger } -func newBatcher(version uint64, client *Client) *Batcher { +func newBatcher(version uint64, client *Client, opts ...zg_common.LogOption) *Batcher { return &Batcher{ StreamDataBuilder: NewStreamDataBuilder(version), client: client, + logger: zg_common.NewLogger(opts...), } } @@ -143,7 +148,7 @@ func newBatcher(version uint64, client *Client) *Batcher { // // Note, this is a time consuming operation, e.g. several seconds or even longer. // When it comes to a time sentitive context, it should be executed in a separate go-routine. 
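The doc comment above is worth taking literally now that `Exec` accepts a ctx: a sketch of driving a batch off the caller's goroutine with a deadline. Client/flow setup is elided, and `Set` is assumed from the embedded `StreamDataBuilder`:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/0glabs/0g-storage-client/kv"
	ethCommon "github.com/ethereum/go-ethereum/common"
)

// execBatch runs a kv batch with a deadline, off the caller's goroutine,
// as the doc comment on Exec recommends. kvClient wiring (node + flow) is elided.
func execBatch(kvClient *kv.Client, streamId ethCommon.Hash) error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	batcher := kvClient.Batcher() // silent by default; pass a common.LogOption for progress logs
	batcher.Set(streamId, []byte("TESTKEY"), []byte{1, 2, 3}) // Set assumed from StreamDataBuilder

	errCh := make(chan error, 1)
	go func() { errCh <- batcher.Exec(ctx) }()

	select {
	case err := <-errCh:
		return err
	case <-ctx.Done():
		return fmt.Errorf("batch timed out: %w", ctx.Err()) // Exec unwinds via the same ctx
	}
}
```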
-func (b *Batcher) Exec() error { +func (b *Batcher) Exec(ctx context.Context) error { // build stream data streamData, err := b.Build() if err != nil { @@ -160,7 +165,7 @@ func (b *Batcher) Exec() error { } // upload file - uploader, err := transfer.NewUploader(b.client.flow, []*node.Client{b.client.node}) + uploader, err := transfer.NewUploader(b.client.flow, []*node.Client{b.client.node}, zg_common.LogOption{Logger: b.logger}) if err != nil { return err } @@ -168,7 +173,7 @@ func (b *Batcher) Exec() error { Tags: b.BuildTags(), Force: true, } - if err = uploader.Upload(data, opt); err != nil { + if err = uploader.Upload(ctx, data, opt); err != nil { return errors.WithMessagef(err, "Failed to upload data") } return nil diff --git a/node/client.go b/node/client.go index c8bb42f..015ece6 100644 --- a/node/client.go +++ b/node/client.go @@ -79,43 +79,46 @@ type ZeroGStorageClient struct { provider *providers.MiddlewarableProvider } -func (c *ZeroGStorageClient) GetStatus() (status Status, err error) { - err = c.provider.CallContext(context.Background(), &status, "zgs_getStatus") +func (c *ZeroGStorageClient) GetStatus(ctx context.Context) (status Status, err error) { + err = c.provider.CallContext(ctx, &status, "zgs_getStatus") return } -func (c *ZeroGStorageClient) GetFileInfo(root common.Hash) (file *FileInfo, err error) { - err = c.provider.CallContext(context.Background(), &file, "zgs_getFileInfo", root) +func (c *ZeroGStorageClient) GetFileInfo(ctx context.Context, root common.Hash) (file *FileInfo, err error) { + err = c.provider.CallContext(ctx, &file, "zgs_getFileInfo", root) return } -func (c *ZeroGStorageClient) GetFileInfoByTxSeq(txSeq uint64) (file *FileInfo, err error) { - err = c.provider.CallContext(context.Background(), &file, "zgs_getFileInfoByTxSeq", txSeq) +func (c *ZeroGStorageClient) GetFileInfoByTxSeq(ctx context.Context, txSeq uint64) (file *FileInfo, err error) { + err = c.provider.CallContext(ctx, &file, "zgs_getFileInfoByTxSeq", txSeq) return } -func (c *ZeroGStorageClient) UploadSegment(segment SegmentWithProof) (ret int, err error) { - err = c.provider.CallContext(context.Background(), &ret, "zgs_uploadSegment", segment) +func (c *ZeroGStorageClient) UploadSegment(ctx context.Context, segment SegmentWithProof) (ret int, err error) { + err = c.provider.CallContext(ctx, &ret, "zgs_uploadSegment", segment) return } -func (c *ZeroGStorageClient) UploadSegments(segments []SegmentWithProof) (ret int, err error) { - err = c.provider.CallContext(context.Background(), &ret, "zgs_uploadSegments", segments) +func (c *ZeroGStorageClient) UploadSegments(ctx context.Context, segments []SegmentWithProof) (ret int, err error) { + err = c.provider.CallContext(ctx, &ret, "zgs_uploadSegments", segments) return } -func (c *ZeroGStorageClient) DownloadSegment(root common.Hash, startIndex, endIndex uint64) (data []byte, err error) { - err = c.provider.CallContext(context.Background(), &data, "zgs_downloadSegment", root, startIndex, endIndex) +func (c *ZeroGStorageClient) DownloadSegment(ctx context.Context, root common.Hash, startIndex, endIndex uint64) (data []byte, err error) { + err = c.provider.CallContext(ctx, &data, "zgs_downloadSegment", root, startIndex, endIndex) + if len(data) == 0 { + return nil, err + } return } -func (c *ZeroGStorageClient) DownloadSegmentWithProof(root common.Hash, index uint64) (segment *SegmentWithProof, err error) { - err = c.provider.CallContext(context.Background(), &segment, "zgs_downloadSegmentWithProof", root, index) +func (c *ZeroGStorageClient) 
DownloadSegmentWithProof(ctx context.Context, root common.Hash, index uint64) (segment *SegmentWithProof, err error) { + err = c.provider.CallContext(ctx, &segment, "zgs_downloadSegmentWithProof", root, index) return } -func (c *ZeroGStorageClient) GetShardConfig() (shardConfig ShardConfig, err error) { - err = c.provider.CallContext(context.Background(), &shardConfig, "zgs_getShardConfig") +func (c *ZeroGStorageClient) GetShardConfig(ctx context.Context) (shardConfig ShardConfig, err error) { + err = c.provider.CallContext(ctx, &shardConfig, "zgs_getShardConfig") return } @@ -124,17 +127,17 @@ type AdminClient struct { provider *providers.MiddlewarableProvider } -func (c *AdminClient) Shutdown() (ret int, err error) { - err = c.provider.CallContext(context.Background(), &ret, "admin_shutdown") +func (c *AdminClient) Shutdown(ctx context.Context) (ret int, err error) { + err = c.provider.CallContext(ctx, &ret, "admin_shutdown") return } -func (c *AdminClient) StartSyncFile(txSeq uint64) (ret int, err error) { - err = c.provider.CallContext(context.Background(), &ret, "admin_startSyncFile", txSeq) +func (c *AdminClient) StartSyncFile(ctx context.Context, txSeq uint64) (ret int, err error) { + err = c.provider.CallContext(ctx, &ret, "admin_startSyncFile", txSeq) return } -func (c *AdminClient) GetSyncStatus(txSeq uint64) (status string, err error) { - err = c.provider.CallContext(context.Background(), &status, "admin_getSyncStatus", txSeq) +func (c *AdminClient) GetSyncStatus(ctx context.Context, txSeq uint64) (status string, err error) { + err = c.provider.CallContext(ctx, &status, "admin_getSyncStatus", txSeq) return } diff --git a/node/client_kv.go b/node/client_kv.go index 632849f..7ad82c4 100644 --- a/node/client_kv.go +++ b/node/client_kv.go @@ -11,102 +11,102 @@ type KvClient struct { provider *providers.MiddlewarableProvider } -func (c *KvClient) GetValue(streamId common.Hash, key []byte, startIndex, length uint64, version ...uint64) (val *Value, err error) { +func (c *KvClient) GetValue(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, version ...uint64) (val *Value, err error) { args := []interface{}{streamId, key, startIndex, length} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &val, "kv_getValue", args...) + err = c.provider.CallContext(ctx, &val, "kv_getValue", args...) return } -func (c *KvClient) GetNext(streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *KeyValue, err error) { +func (c *KvClient) GetNext(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *KeyValue, err error) { args := []interface{}{streamId, key, startIndex, length, inclusive} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &val, "kv_getNext", args...) + err = c.provider.CallContext(ctx, &val, "kv_getNext", args...) 
return } -func (c *KvClient) GetPrev(streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *KeyValue, err error) { +func (c *KvClient) GetPrev(ctx context.Context, streamId common.Hash, key []byte, startIndex, length uint64, inclusive bool, version ...uint64) (val *KeyValue, err error) { args := []interface{}{streamId, key, startIndex, length, inclusive} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &val, "kv_getPrev", args...) + err = c.provider.CallContext(ctx, &val, "kv_getPrev", args...) return } -func (c *KvClient) GetFirst(streamId common.Hash, startIndex, length uint64, version ...uint64) (val *KeyValue, err error) { +func (c *KvClient) GetFirst(ctx context.Context, streamId common.Hash, startIndex, length uint64, version ...uint64) (val *KeyValue, err error) { args := []interface{}{streamId, startIndex, length} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &val, "kv_getFirst", args...) + err = c.provider.CallContext(ctx, &val, "kv_getFirst", args...) return } -func (c *KvClient) GetLast(streamId common.Hash, startIndex, length uint64, version ...uint64) (val *KeyValue, err error) { +func (c *KvClient) GetLast(ctx context.Context, streamId common.Hash, startIndex, length uint64, version ...uint64) (val *KeyValue, err error) { args := []interface{}{streamId, startIndex, length} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &val, "kv_getLast", args...) + err = c.provider.CallContext(ctx, &val, "kv_getLast", args...) return } -func (c *KvClient) GetTransactionResult(txSeq uint64) (result string, err error) { - err = c.provider.CallContext(context.Background(), &result, "kv_getTransactionResult", txSeq) +func (c *KvClient) GetTransactionResult(ctx context.Context, txSeq uint64) (result string, err error) { + err = c.provider.CallContext(ctx, &result, "kv_getTransactionResult", txSeq) return } -func (c *KvClient) GetHoldingStreamIds() (streamIds []common.Hash, err error) { - err = c.provider.CallContext(context.Background(), &streamIds, "kv_getHoldingStreamIds") +func (c *KvClient) GetHoldingStreamIds(ctx context.Context) (streamIds []common.Hash, err error) { + err = c.provider.CallContext(ctx, &streamIds, "kv_getHoldingStreamIds") return } -func (c *KvClient) HasWritePermission(account common.Address, streamId common.Hash, key []byte, version ...uint64) (hasPermission bool, err error) { +func (c *KvClient) HasWritePermission(ctx context.Context, account common.Address, streamId common.Hash, key []byte, version ...uint64) (hasPermission bool, err error) { args := []interface{}{account, streamId, key} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &hasPermission, "kv_hasWritePermission", args...) + err = c.provider.CallContext(ctx, &hasPermission, "kv_hasWritePermission", args...) return } -func (c *KvClient) IsAdmin(account common.Address, streamId common.Hash, version ...uint64) (isAdmin bool, err error) { +func (c *KvClient) IsAdmin(ctx context.Context, account common.Address, streamId common.Hash, version ...uint64) (isAdmin bool, err error) { args := []interface{}{account, streamId} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &isAdmin, "kv_isAdmin", args...) + err = c.provider.CallContext(ctx, &isAdmin, "kv_isAdmin", args...) 
return } -func (c *KvClient) IsSpecialKey(streamId common.Hash, key []byte, version ...uint64) (isSpecialKey bool, err error) { +func (c *KvClient) IsSpecialKey(ctx context.Context, streamId common.Hash, key []byte, version ...uint64) (isSpecialKey bool, err error) { args := []interface{}{streamId, key} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &isSpecialKey, "kv_isSpecialKey", args...) + err = c.provider.CallContext(ctx, &isSpecialKey, "kv_isSpecialKey", args...) return } -func (c *KvClient) IsWriterOfKey(account common.Address, streamId common.Hash, key []byte, version ...uint64) (isWriter bool, err error) { +func (c *KvClient) IsWriterOfKey(ctx context.Context, account common.Address, streamId common.Hash, key []byte, version ...uint64) (isWriter bool, err error) { args := []interface{}{account, streamId, key} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &isWriter, "kv_isWriterOfKey", args...) + err = c.provider.CallContext(ctx, &isWriter, "kv_isWriterOfKey", args...) return } -func (c *KvClient) IsWriterOfStream(account common.Address, streamId common.Hash, version ...uint64) (isWriter bool, err error) { +func (c *KvClient) IsWriterOfStream(ctx context.Context, account common.Address, streamId common.Hash, version ...uint64) (isWriter bool, err error) { args := []interface{}{account, streamId} if len(version) > 0 { args = append(args, version[0]) } - err = c.provider.CallContext(context.Background(), &isWriter, "kv_isWriterOfStream", args...) + err = c.provider.CallContext(ctx, &isWriter, "kv_isWriterOfStream", args...) return } diff --git a/transfer/download_parallel.go b/transfer/download_parallel.go index ba15d4b..f3b82b6 100644 --- a/transfer/download_parallel.go +++ b/transfer/download_parallel.go @@ -1,6 +1,7 @@ package transfer import ( + "context" "fmt" "runtime" @@ -23,11 +24,13 @@ type SegmentDownloader struct { segmentOffset uint64 numChunks uint64 numSegments uint64 + + logger *logrus.Logger } var _ parallel.Interface = (*SegmentDownloader)(nil) -func NewSegmentDownloader(clients []*node.Client, shardConfigs []*node.ShardConfig, file *download.DownloadingFile, withProof bool) (*SegmentDownloader, error) { +func newSegmentDownloader(clients []*node.Client, shardConfigs []*node.ShardConfig, file *download.DownloadingFile, withProof bool, logger *logrus.Logger) (*SegmentDownloader, error) { offset := file.Metadata().Offset if offset%core.DefaultSegmentSize > 0 { return nil, errors.Errorf("Invalid data offset in downloading file %v", offset) @@ -45,18 +48,20 @@ func NewSegmentDownloader(clients []*node.Client, shardConfigs []*node.ShardConf segmentOffset: uint64(offset / core.DefaultSegmentSize), numChunks: core.NumSplits(fileSize, core.DefaultChunkSize), numSegments: core.NumSplits(fileSize, core.DefaultSegmentSize), + + logger: logger, }, nil } // Download downloads segments in parallel. -func (downloader *SegmentDownloader) Download() error { +func (downloader *SegmentDownloader) Download(ctx context.Context) error { numTasks := downloader.numSegments - downloader.segmentOffset - return parallel.Serial(downloader, int(numTasks), runtime.GOMAXPROCS(0), 0) + return parallel.Serial(ctx, downloader, int(numTasks), runtime.GOMAXPROCS(0), 0) } // ParallelDo implements the parallel.Interface interface. 
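The rework below replaces the old wrap-around search with a bounded scan over all nodes, falling through to the next candidate whenever a download fails or returns bad data. The shard predicate it relies on, extracted as a standalone illustration:

```go
package main

import "fmt"

// Mirrors the predicate in ParallelDo: a node with (NumShard, ShardId)
// stores exactly the segments whose index % NumShard == ShardId.
type shardConfig struct {
	NumShard uint64
	ShardId  uint64
}

// pickNode walks the node list starting at `routine`, like the rewritten loop,
// and returns the first node whose shard covers segmentIndex, or -1.
func pickNode(configs []shardConfig, routine int, segmentIndex uint64) int {
	for i := 0; i < len(configs); i++ {
		nodeIndex := (routine + i) % len(configs)
		if segmentIndex%configs[nodeIndex].NumShard == configs[nodeIndex].ShardId {
			return nodeIndex
		}
	}
	return -1
}

func main() {
	configs := []shardConfig{{NumShard: 2, ShardId: 0}, {NumShard: 2, ShardId: 1}}
	for seg := uint64(0); seg < 4; seg++ {
		fmt.Printf("segment %d -> node %d\n", seg, pickNode(configs, 0, seg))
	}
}
```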
-func (downloader *SegmentDownloader) ParallelDo(routine, task int) (interface{}, error) { +func (downloader *SegmentDownloader) ParallelDo(ctx context.Context, routine, task int) (interface{}, error) { segmentIndex := downloader.segmentOffset + uint64(task) startIndex := segmentIndex * core.DefaultSegmentMaxChunks endIndex := startIndex + core.DefaultSegmentMaxChunks @@ -66,49 +71,66 @@ func (downloader *SegmentDownloader) ParallelDo(routine, task int) (interface{}, root := downloader.file.Metadata().Root - clientIndex := routine % len(downloader.shardConfigs) - for segmentIndex%downloader.shardConfigs[clientIndex].NumShard != downloader.shardConfigs[clientIndex].ShardId { - clientIndex = (clientIndex + 1) % len(downloader.shardConfigs) - if clientIndex == routine%len(downloader.shardConfigs) { - return nil, fmt.Errorf("no storage node holds segment with index %v", segmentIndex) - } - } - var ( segment []byte err error ) - if downloader.withProof { - segment, err = downloader.downloadWithProof(downloader.clients[clientIndex], root, startIndex, endIndex) - } else { - segment, err = downloader.clients[clientIndex].ZeroGStorage().DownloadSegment(root, startIndex, endIndex) - } + for i := 0; i < len(downloader.shardConfigs); i += 1 { + nodeIndex := (routine + i) % len(downloader.shardConfigs) + if segmentIndex%downloader.shardConfigs[nodeIndex].NumShard != downloader.shardConfigs[nodeIndex].ShardId { + continue + } + // try download from current node + if downloader.withProof { + segment, err = downloader.downloadWithProof(ctx, downloader.clients[nodeIndex], root, startIndex, endIndex) + } else { + segment, err = downloader.clients[nodeIndex].ZeroGStorage().DownloadSegment(ctx, root, startIndex, endIndex) + } - if err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "client index": clientIndex, - "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), - "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), - }).Error("Failed to download segment") - } else if logrus.IsLevelEnabled(logrus.TraceLevel) { - logrus.WithFields(logrus.Fields{ - "client index": clientIndex, - "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), - "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), - }).Trace("Succeeded to download segment") - } + if err != nil { + downloader.logger.WithError(err).WithFields(logrus.Fields{ + "node index": nodeIndex, + "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), + "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), + }).Error("Failed to download segment") + continue + } + if segment == nil { + downloader.logger.WithFields(logrus.Fields{ + "node index": nodeIndex, + "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), + "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), + }).Warn("segment not found") + continue + } + if len(segment)%core.DefaultChunkSize != 0 { + downloader.logger.WithFields(logrus.Fields{ + "node index": nodeIndex, + "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), + "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), + }).Warn("invalid segment length") + continue + } + if downloader.logger.IsLevelEnabled(logrus.TraceLevel) { + downloader.logger.WithFields(logrus.Fields{ + "node index": nodeIndex, + "segment": fmt.Sprintf("%v/%v", segmentIndex, downloader.numSegments), + "chunks": fmt.Sprintf("[%v, %v)", startIndex, endIndex), + }).Trace("Succeeded to download segment") + } - // remove paddings for the last chunk - if err == nil && 
segmentIndex == downloader.numSegments-1 { - fileSize := downloader.file.Metadata().Size - if lastChunkSize := fileSize % core.DefaultChunkSize; lastChunkSize > 0 { - paddings := core.DefaultChunkSize - lastChunkSize - segment = segment[0 : len(segment)-int(paddings)] + // remove paddings for the last chunk + if segmentIndex == downloader.numSegments-1 { + fileSize := downloader.file.Metadata().Size + if lastChunkSize := fileSize % core.DefaultChunkSize; lastChunkSize > 0 { + paddings := core.DefaultChunkSize - lastChunkSize + segment = segment[0 : len(segment)-int(paddings)] + } } + return segment, nil } - - return segment, err + return nil, fmt.Errorf("no storage node holds segment with index %v", segmentIndex) } // ParallelCollect implements the parallel.Interface interface. @@ -116,13 +138,16 @@ func (downloader *SegmentDownloader) ParallelCollect(result *parallel.Result) er return downloader.file.Write(result.Value.([]byte)) } -func (downloader *SegmentDownloader) downloadWithProof(client *node.Client, root common.Hash, startIndex, endIndex uint64) ([]byte, error) { +func (downloader *SegmentDownloader) downloadWithProof(ctx context.Context, client *node.Client, root common.Hash, startIndex, endIndex uint64) ([]byte, error) { segmentIndex := startIndex / core.DefaultSegmentMaxChunks - segment, err := client.ZeroGStorage().DownloadSegmentWithProof(root, segmentIndex) + segment, err := client.ZeroGStorage().DownloadSegmentWithProof(ctx, root, segmentIndex) if err != nil { return nil, errors.WithMessage(err, "Failed to download segment with proof from storage node") } + if segment == nil { + return nil, nil + } if expectedDataLen := (endIndex - startIndex) * core.DefaultChunkSize; int(expectedDataLen) != len(segment.Data) { return nil, errors.Errorf("Downloaded data length mismatch, expected = %v, actual = %v", expectedDataLen, len(segment.Data)) diff --git a/transfer/downloader.go b/transfer/downloader.go index a5c3781..797bde5 100644 --- a/transfer/downloader.go +++ b/transfer/downloader.go @@ -1,9 +1,11 @@ package transfer import ( + "context" "fmt" "os" + zg_common "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/core" "github.com/0glabs/0g-storage-client/node" "github.com/0glabs/0g-storage-client/transfer/download" @@ -14,23 +16,26 @@ import ( type Downloader struct { clients []*node.Client + + logger *logrus.Logger } -func NewDownloader(clients []*node.Client) (*Downloader, error) { +func NewDownloader(clients []*node.Client, opts ...zg_common.LogOption) (*Downloader, error) { if len(clients) == 0 { return nil, errors.New("storage node not specified") } downloader := &Downloader{ clients: clients, + logger: zg_common.NewLogger(opts...), } return downloader, nil } -func (downloader *Downloader) Download(root, filename string, withProof bool) error { +func (downloader *Downloader) Download(ctx context.Context, root, filename string, withProof bool) error { hash := common.HexToHash(root) // Query file info from storage node - info, err := downloader.queryFile(hash) + info, err := downloader.queryFile(ctx, hash) if err != nil { return errors.WithMessage(err, "Failed to query file info") } @@ -41,7 +46,7 @@ func (downloader *Downloader) Download(root, filename string, withProof bool) er } // Download segments - if err = downloader.downloadFile(filename, hash, int64(info.Tx.Size), withProof); err != nil { + if err = downloader.downloadFile(ctx, filename, hash, int64(info.Tx.Size), withProof); err != nil { return errors.WithMessage(err, "Failed to download 
file") } @@ -53,10 +58,10 @@ func (downloader *Downloader) Download(root, filename string, withProof bool) er return nil } -func (downloader *Downloader) queryFile(root common.Hash) (info *node.FileInfo, err error) { +func (downloader *Downloader) queryFile(ctx context.Context, root common.Hash) (info *node.FileInfo, err error) { // do not require file finalized for _, v := range downloader.clients { - info, err = v.ZeroGStorage().GetFileInfo(root) + info, err = v.ZeroGStorage().GetFileInfo(ctx, root) if err != nil { return nil, errors.WithMessagef(err, "Failed to get file info on node %v", v.URL()) } @@ -66,7 +71,7 @@ func (downloader *Downloader) queryFile(root common.Hash) (info *node.FileInfo, } } - logrus.WithField("file", info).Debug("File found by root hash") + downloader.logger.WithField("file", info).Debug("File found by root hash") return } @@ -95,26 +100,26 @@ func (downloader *Downloader) checkExistence(filename string, hash common.Hash) return errors.New("File already exists with different hash") } -func (downloader *Downloader) downloadFile(filename string, root common.Hash, size int64, withProof bool) error { +func (downloader *Downloader) downloadFile(ctx context.Context, filename string, root common.Hash, size int64, withProof bool) error { file, err := download.CreateDownloadingFile(filename, root, size) if err != nil { return errors.WithMessage(err, "Failed to create downloading file") } defer file.Close() - logrus.WithField("clients", len(downloader.clients)).Info("Begin to download file from storage node") + downloader.logger.WithField("num nodes", len(downloader.clients)).Info("Begin to download file from storage nodes") - shardConfigs, err := getShardConfigs(downloader.clients) + shardConfigs, err := getShardConfigs(ctx, downloader.clients) if err != nil { return err } - sd, err := NewSegmentDownloader(downloader.clients, shardConfigs, file, withProof) + sd, err := newSegmentDownloader(downloader.clients, shardConfigs, file, withProof, downloader.logger) if err != nil { return errors.WithMessage(err, "Failed to create segment downloader") } - if err = sd.Download(); err != nil { + if err = sd.Download(ctx); err != nil { return errors.WithMessage(err, "Failed to download file") } @@ -122,7 +127,7 @@ func (downloader *Downloader) downloadFile(filename string, root common.Hash, si return errors.WithMessage(err, "Failed to seal downloading file") } - logrus.Info("Completed to download file") + downloader.logger.Info("Completed to download file") return nil } @@ -147,7 +152,7 @@ func (downloader *Downloader) validateDownloadFile(root, filename string, fileSi return errors.Errorf("Merkle root mismatch, downloaded = %v", rootHex) } - logrus.Info("Succeeded to validate the downloaded file") + downloader.logger.Info("Succeeded to validate the downloaded file") return nil } diff --git a/transfer/uploader.go b/transfer/uploader.go index f6eda2e..3576fd7 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -1,11 +1,13 @@ package transfer import ( + "context" "math/big" "runtime" "sort" "time" + zg_common "github.com/0glabs/0g-storage-client/common" "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/common/util" "github.com/0glabs/0g-storage-client/contract" @@ -38,12 +40,13 @@ type UploadOption struct { type Uploader struct { flow *contract.FlowContract clients []*node.Client + logger *logrus.Logger } -func getShardConfigs(clients []*node.Client) ([]*node.ShardConfig, error) { +func getShardConfigs(ctx context.Context, clients 
diff --git a/transfer/uploader.go b/transfer/uploader.go
index f6eda2e..3576fd7 100644
--- a/transfer/uploader.go
+++ b/transfer/uploader.go
@@ -1,11 +1,13 @@
 package transfer
 
 import (
+	"context"
 	"math/big"
 	"runtime"
 	"sort"
 	"time"
 
+	zg_common "github.com/0glabs/0g-storage-client/common"
 	"github.com/0glabs/0g-storage-client/common/parallel"
 	"github.com/0glabs/0g-storage-client/common/util"
 	"github.com/0glabs/0g-storage-client/contract"
@@ -38,12 +40,13 @@ type UploadOption struct {
 type Uploader struct {
 	flow    *contract.FlowContract
 	clients []*node.Client
+	logger  *logrus.Logger
 }
 
-func getShardConfigs(clients []*node.Client) ([]*node.ShardConfig, error) {
+func getShardConfigs(ctx context.Context, clients []*node.Client) ([]*node.ShardConfig, error) {
 	shardConfigs := make([]*node.ShardConfig, 0)
 	for _, client := range clients {
-		shardConfig, err := client.ZeroGStorage().GetShardConfig()
+		shardConfig, err := client.ZeroGStorage().GetShardConfig(ctx)
 		if err != nil {
 			return nil, err
 		}
@@ -55,26 +58,20 @@ func getShardConfigs(clients []*node.Client) ([]*node.ShardConfig, error) {
 	return shardConfigs, nil
 }
 
-func NewUploader(flow *contract.FlowContract, clients []*node.Client) (*Uploader, error) {
-	uploader, err := NewUploaderLight(clients)
-	if err != nil {
-		return nil, err
-	}
-	uploader.flow = flow
-	return uploader, nil
-}
-
-func NewUploaderLight(clients []*node.Client) (*Uploader, error) {
+func NewUploader(flow *contract.FlowContract, clients []*node.Client, opts ...zg_common.LogOption) (*Uploader, error) {
 	if len(clients) == 0 {
-		panic("storage node not specified")
+		return nil, errors.New("storage node not specified")
 	}
-	return &Uploader{
+	uploader := &Uploader{
 		clients: clients,
-	}, nil
+		logger:  zg_common.NewLogger(opts...),
+		flow:    flow,
+	}
+	return uploader, nil
 }
 
 // upload data(batchly in 1 blockchain transaction if there are more than one files)
-func (uploader *Uploader) BatchUpload(datas []core.IterableData, waitForLogEntry bool, option ...[]UploadOption) (common.Hash, []common.Hash, error) {
+func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.IterableData, waitForLogEntry bool, option ...[]UploadOption) (common.Hash, []common.Hash, error) {
 	stageTimer := time.Now()
 
 	n := len(datas)
@@ -90,7 +87,7 @@ func (uploader *Uploader) BatchUpload(datas []core.IterableData, waitForLogEntry
 	if len(opts) != n {
 		return common.Hash{}, nil, errors.New("datas and tags length mismatch")
 	}
-	logrus.WithFields(logrus.Fields{
+	uploader.logger.WithFields(logrus.Fields{
 		"dataNum": n,
 	}).Info("Prepare to upload batchly")
 
@@ -103,7 +100,7 @@ func (uploader *Uploader) BatchUpload(datas []core.IterableData, waitForLogEntry
 		data := datas[i]
 		opt := opts[i]
 
-		logrus.WithFields(logrus.Fields{
+		uploader.logger.WithFields(logrus.Fields{
 			"size":     data.Size(),
 			"chunks":   data.NumChunks(),
 			"segments": data.NumSegments(),
@@ -114,7 +111,7 @@ func (uploader *Uploader) BatchUpload(datas []core.IterableData, waitForLogEntry
 		if err != nil {
 			return common.Hash{}, nil, errors.WithMessage(err, "Failed to create data merkle tree")
 		}
-		logrus.WithField("root", tree.Root()).Info("Data merkle root calculated")
+		uploader.logger.WithField("root", tree.Root()).Info("Data merkle root calculated")
 		trees[i] = tree
 		dataRoots[i] = trees[i].Root()
@@ -133,7 +130,7 @@ func (uploader *Uploader) BatchUpload(datas []core.IterableData, waitForLogEntry
 	}
 	if waitForLogEntry {
 		// Wait for storage node to retrieve log entry from blockchain
-		if err := uploader.waitForLogEntry(lastTreeToSubmit.Root(), false, receipt); err != nil {
+		if err := uploader.waitForLogEntry(ctx, lastTreeToSubmit.Root(), false, receipt); err != nil {
 			return common.Hash{}, nil, errors.WithMessage(err, "Failed to check if log entry available on storage node")
 		}
 	}
@@ -141,24 +138,24 @@ func (uploader *Uploader) BatchUpload(datas []core.IterableData, waitForLogEntry
 	for i := 0; i < n; i++ {
 		// Upload file to storage node
-		if err := uploader.UploadFile(datas[i], trees[i], 0, opts[i].TaskSize); err != nil {
+		if err := uploader.UploadFile(ctx, datas[i], trees[i], 0, opts[i].TaskSize); err != nil {
 			return common.Hash{}, nil, errors.WithMessage(err, "Failed to upload file")
 		}
 
 		if waitForLogEntry {
 			// Wait for transaction finality
-			if err := uploader.waitForLogEntry(trees[i].Root(), false, receipt); err != nil {
+			if err := uploader.waitForLogEntry(ctx, trees[i].Root(), false, receipt); err != nil {
 				return common.Hash{}, nil, errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
 			}
 		}
 	}
 
-	logrus.WithField("duration", time.Since(stageTimer)).Info("batch upload took")
+	uploader.logger.WithField("duration", time.Since(stageTimer)).Info("batch upload took")
 
 	return txHash, dataRoots, nil
 }
 
-func (uploader *Uploader) Upload(data core.IterableData, option ...UploadOption) error {
+func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, option ...UploadOption) error {
 	stageTimer := time.Now()
 
 	var opt UploadOption
@@ -166,7 +163,7 @@ func (uploader *Uploader) Upload(data core.IterableData, option ...UploadOption)
 		opt = option[0]
 	}
 
-	logrus.WithFields(logrus.Fields{
+	uploader.logger.WithFields(logrus.Fields{
 		"size":     data.Size(),
 		"chunks":   data.NumChunks(),
 		"segments": data.NumSegments(),
@@ -177,14 +174,14 @@ func (uploader *Uploader) Upload(data core.IterableData, option ...UploadOption)
 	if err != nil {
 		return errors.WithMessage(err, "Failed to create data merkle tree")
 	}
-	logrus.WithField("root", tree.Root()).Info("Data merkle root calculated")
+	uploader.logger.WithField("root", tree.Root()).Info("Data merkle root calculated")
 
-	info, err := uploader.clients[0].ZeroGStorage().GetFileInfo(tree.Root())
+	info, err := uploader.clients[0].ZeroGStorage().GetFileInfo(ctx, tree.Root())
 	if err != nil {
 		return errors.WithMessage(err, "Failed to get data info from storage node")
 	}
 
-	logrus.WithField("info", info).Debug("Log entry retrieved from storage node")
+	uploader.logger.WithField("info", info).Debug("Log entry retrieved from storage node")
 
 	// In case that user interact with blockchain via Metamask
 	if uploader.flow == nil && info == nil {
@@ -198,7 +195,7 @@ func (uploader *Uploader) Upload(data core.IterableData, option ...UploadOption)
 	}
 
 	// Allow to upload duplicated file for KV scenario
-	if err = uploader.uploadDuplicatedFile(data, opt.Tags, tree.Root()); err != nil {
+	if err = uploader.uploadDuplicatedFile(ctx, data, opt.Tags, tree.Root()); err != nil {
 		return errors.WithMessage(err, "Failed to upload duplicated data")
 	}
 
@@ -219,13 +216,13 @@ func (uploader *Uploader) Upload(data core.IterableData, option ...UploadOption)
 	// Otherwise, need to wait for log entry available on storage node,
 	// which requires transaction confirmed on blockchain.
 	if data.Size() <= smallFileSizeThreshold {
-		logrus.Info("Upload small data immediately")
+		uploader.logger.Info("Upload small data immediately")
 	} else {
 		// Wait for storage node to retrieve log entry from blockchain
-		if err = uploader.waitForLogEntry(tree.Root(), false, receipt); err != nil {
+		if err = uploader.waitForLogEntry(ctx, tree.Root(), false, receipt); err != nil {
 			return errors.WithMessage(err, "Failed to check if log entry available on storage node")
 		}
-		info, err = uploader.clients[0].ZeroGStorage().GetFileInfo(tree.Root())
+		info, err = uploader.clients[0].ZeroGStorage().GetFileInfo(ctx, tree.Root())
 		if err != nil {
 			return errors.WithMessage(err, "Failed to get file info from storage node after waitForLogEntry.")
 		}
@@ -234,16 +231,16 @@ func (uploader *Uploader) Upload(data core.IterableData, option ...UploadOption)
 	}
 
 	// Upload file to storage node
-	if err = uploader.UploadFile(data, tree, segNum, opt.TaskSize); err != nil {
+	if err = uploader.UploadFile(ctx, data, tree, segNum, opt.TaskSize); err != nil {
 		return errors.WithMessage(err, "Failed to upload file")
 	}
 
 	// Wait for transaction finality
-	if err = uploader.waitForLogEntry(tree.Root(), false, nil); err != nil {
+	if err = uploader.waitForLogEntry(ctx, tree.Root(), false, nil); err != nil {
 		return errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
 	}
 
-	logrus.WithField("duration", time.Since(stageTimer)).Info("upload took")
+	uploader.logger.WithField("duration", time.Since(stageTimer)).Info("upload took")
 
 	return nil
 }
 
@@ -281,7 +278,7 @@ func (uploader *Uploader) SubmitLogEntry(datas []core.IterableData, tags [][]byt
 		return common.Hash{}, nil, errors.WithMessage(err, "Failed to send transaction to append log entry")
 	}
 
-	logrus.WithField("hash", tx.Hash().Hex()).Info("Succeeded to send transaction to append log entry")
+	uploader.logger.WithField("hash", tx.Hash().Hex()).Info("Succeeded to send transaction to append log entry")
 
 	if waitForReceipt {
 		// Wait for successful execution
@@ -292,18 +289,18 @@ func (uploader *Uploader) SubmitLogEntry(datas []core.IterableData, tags [][]byt
 }
 
 // Wait for log entry ready on storage node.
-func (uploader *Uploader) waitForLogEntry(root common.Hash, finalityRequired bool, receipt *types.Receipt) error {
-	logrus.WithFields(logrus.Fields{
+func (uploader *Uploader) waitForLogEntry(ctx context.Context, root common.Hash, finalityRequired bool, receipt *types.Receipt) error {
+	uploader.logger.WithFields(logrus.Fields{
 		"root":     root,
 		"finality": finalityRequired,
 	}).Info("Wait for log entry on storage node")
 
-	reminder := util.NewReminder(logrus.TraceLevel, time.Minute)
+	reminder := util.NewReminder(uploader.logger, time.Minute)
 
 	for {
 		time.Sleep(time.Second)
 
-		info, err := uploader.clients[0].ZeroGStorage().GetFileInfo(root)
+		info, err := uploader.clients[0].ZeroGStorage().GetFileInfo(ctx, root)
 		if err != nil {
 			return errors.WithMessage(err, "Failed to get file info from storage node")
 		}
@@ -312,7 +309,7 @@ func (uploader *Uploader) waitForLogEntry(root common.Hash, finalityRequired boo
 		if info == nil {
 			fields := logrus.Fields{}
 			if receipt != nil {
-				if status, err := uploader.clients[0].ZeroGStorage().GetStatus(); err == nil {
+				if status, err := uploader.clients[0].ZeroGStorage().GetStatus(ctx); err == nil {
 					fields["txBlockNumber"] = receipt.BlockNumber
 					fields["zgsNodeSyncHeight"] = status.LogSyncHeight
 				}
@@ -337,9 +334,9 @@ func (uploader *Uploader) waitForLogEntry(root common.Hash, finalityRequired boo
 	return nil
 }
 
-func (uploader *Uploader) NewSegmentUploader(data core.IterableData, tree *merkle.Tree, startSegIndex uint64, taskSize uint) (*SegmentUploader, error) {
+func (uploader *Uploader) NewSegmentUploader(ctx context.Context, data core.IterableData, tree *merkle.Tree, startSegIndex uint64, taskSize uint) (*SegmentUploader, error) {
 	numSegments := data.NumSegments()
-	shardConfigs, err := getShardConfigs(uploader.clients)
+	shardConfigs, err := getShardConfigs(ctx, uploader.clients)
 	if err != nil {
 		return nil, err
 	}
@@ -378,33 +375,34 @@ func (uploader *Uploader) NewSegmentUploader(data core.IterableData, tree *merkl
 		clients:  uploader.clients,
 		tasks:    tasks,
 		taskSize: taskSize,
+		logger:   uploader.logger,
 	}, nil
 }
 
 // TODO error tolerance
-func (uploader *Uploader) UploadFile(data core.IterableData, tree *merkle.Tree, segIndex uint64, taskSize uint) error {
+func (uploader *Uploader) UploadFile(ctx context.Context, data core.IterableData, tree *merkle.Tree, segIndex uint64, taskSize uint) error {
 	stageTimer := time.Now()
 
 	if taskSize == 0 {
 		taskSize = defaultTaskSize
 	}
 
-	logrus.WithFields(logrus.Fields{
+	uploader.logger.WithFields(logrus.Fields{
 		"segIndex": segIndex,
 		"nodeNum":  len(uploader.clients),
 	}).Info("Begin to upload file")
 
-	segmentUploader, err := uploader.NewSegmentUploader(data, tree, segIndex, taskSize)
+	segmentUploader, err := uploader.NewSegmentUploader(ctx, data, tree, segIndex, taskSize)
 	if err != nil {
 		return err
 	}
 
-	err = parallel.Serial(segmentUploader, len(segmentUploader.tasks), min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0)
+	err = parallel.Serial(ctx, segmentUploader, len(segmentUploader.tasks), min(runtime.GOMAXPROCS(0), len(uploader.clients)*5), 0)
 	if err != nil {
 		return err
 	}
 
-	logrus.WithFields(logrus.Fields{
+	uploader.logger.WithFields(logrus.Fields{
 		"duration": time.Since(stageTimer),
 		"segNum":   data.NumSegments() - segIndex,
 	}).Info("Completed to upload file")
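The uploader side pairs with the downloader sketch earlier. Below, uploadOne is a hypothetical wrapper, not part of this change; the flow contract and data source are assumed to be prepared by the caller, and only the NewUploader and Upload signatures come from the diff above.

package example

import (
	"context"

	zg_common "github.com/0glabs/0g-storage-client/common"
	"github.com/0glabs/0g-storage-client/contract"
	"github.com/0glabs/0g-storage-client/core"
	"github.com/0glabs/0g-storage-client/node"
	"github.com/0glabs/0g-storage-client/transfer"
	"github.com/sirupsen/logrus"
)

// uploadOne shows the new call shape end to end.
func uploadOne(ctx context.Context, flow *contract.FlowContract, clients []*node.Client, data core.IterableData) error {
	// NewUploader now reports a missing storage node as an error instead of
	// panicking, and absorbs the former NewUploaderLight constructor.
	uploader, err := transfer.NewUploader(flow, clients, zg_common.LogOption{Logger: logrus.StandardLogger()})
	if err != nil {
		return err
	}

	// The context is threaded through every storage node RPC and the
	// parallel segment upload, so cancelling it aborts the whole pipeline.
	return uploader.Upload(ctx, data, transfer.UploadOption{})
}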
"github.com/0glabs/0g-storage-client/common/blockchain" @@ -9,7 +10,6 @@ import ( "github.com/0glabs/0g-storage-client/node" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" - "github.com/sirupsen/logrus" ) const SubmitEventHash = "0x167ce04d2aa1981994d3a31695da0d785373335b1078cec239a1a3a2c7675555" @@ -17,7 +17,7 @@ const SubmitEventHash = "0x167ce04d2aa1981994d3a31695da0d785373335b1078cec239a1a // uploadDuplicatedFile uploads file to storage node that already exists by root. // In this case, user only need to submit transaction on blockchain, and wait for // file finality on storage node. -func (uploader *Uploader) uploadDuplicatedFile(data core.IterableData, tags []byte, root common.Hash) error { +func (uploader *Uploader) uploadDuplicatedFile(ctx context.Context, data core.IterableData, tags []byte, root common.Hash) error { // submit transaction on blockchain _, receipt, err := uploader.SubmitLogEntry([]core.IterableData{data}, [][]byte{tags}, true) if err != nil { @@ -40,7 +40,7 @@ func (uploader *Uploader) uploadDuplicatedFile(data core.IterableData, tags []by // wait for finality from storage node txSeq := submission.SubmissionIndex.Uint64() - info, err := uploader.waitForFileFinalityByTxSeq(txSeq) + info, err := uploader.waitForFileFinalityByTxSeq(ctx, txSeq) if err != nil { return errors.WithMessagef(err, "Failed to wait for finality for tx %v", txSeq) } @@ -52,13 +52,13 @@ func (uploader *Uploader) uploadDuplicatedFile(data core.IterableData, tags []by return nil } -func (uploader *Uploader) waitForFileFinalityByTxSeq(txSeq uint64) (*node.FileInfo, error) { - logrus.WithField("txSeq", txSeq).Info("Wait for finality on storage node") +func (uploader *Uploader) waitForFileFinalityByTxSeq(ctx context.Context, txSeq uint64) (*node.FileInfo, error) { + uploader.logger.WithField("txSeq", txSeq).Info("Wait for finality on storage node") for { time.Sleep(time.Second) - info, err := uploader.clients[0].ZeroGStorage().GetFileInfoByTxSeq(txSeq) + info, err := uploader.clients[0].ZeroGStorage().GetFileInfoByTxSeq(ctx, txSeq) if err != nil { return nil, errors.WithMessage(err, "Failed to get file info from storage node") } diff --git a/transfer/uploader_parallel.go b/transfer/uploader_parallel.go index 6ab9d6e..05c524a 100644 --- a/transfer/uploader_parallel.go +++ b/transfer/uploader_parallel.go @@ -1,6 +1,8 @@ package transfer import ( + "context" + "github.com/0glabs/0g-storage-client/common/parallel" "github.com/0glabs/0g-storage-client/core" "github.com/0glabs/0g-storage-client/core/merkle" @@ -21,6 +23,7 @@ type SegmentUploader struct { clients []*node.Client tasks []*UploadTask taskSize uint + logger *logrus.Logger } var _ parallel.Interface = (*SegmentUploader)(nil) @@ -31,7 +34,7 @@ func (uploader *SegmentUploader) ParallelCollect(result *parallel.Result) error } // ParallelDo implements parallel.Interface. 
diff --git a/transfer/uploader_parallel.go b/transfer/uploader_parallel.go
index 6ab9d6e..05c524a 100644
--- a/transfer/uploader_parallel.go
+++ b/transfer/uploader_parallel.go
@@ -1,6 +1,8 @@
 package transfer
 
 import (
+	"context"
+
 	"github.com/0glabs/0g-storage-client/common/parallel"
 	"github.com/0glabs/0g-storage-client/core"
 	"github.com/0glabs/0g-storage-client/core/merkle"
@@ -21,6 +23,7 @@ type SegmentUploader struct {
 	clients  []*node.Client
 	tasks    []*UploadTask
 	taskSize uint
+	logger   *logrus.Logger
 }
 
 var _ parallel.Interface = (*SegmentUploader)(nil)
 
@@ -31,7 +34,7 @@ func (uploader *SegmentUploader) ParallelCollect(result *parallel.Result) error
 }
 
 // ParallelDo implements parallel.Interface.
-func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{}, error) {
+func (uploader *SegmentUploader) ParallelDo(ctx context.Context, routine int, task int) (interface{}, error) {
 	numChunks := uploader.data.NumChunks()
 	numSegments := uploader.data.NumSegments()
 	uploadTask := uploader.tasks[task]
@@ -72,12 +75,12 @@ func (uploader *SegmentUploader) ParallelDo(routine int, task int) (interface{},
 		}
 		segIndex += uploadTask.numShard
 	}
-	if _, err := uploader.clients[uploadTask.clientIndex].ZeroGStorage().UploadSegments(segments); err != nil && !isDuplicateError(err.Error()) {
+	if _, err := uploader.clients[uploadTask.clientIndex].ZeroGStorage().UploadSegments(ctx, segments); err != nil && !isDuplicateError(err.Error()) {
		return nil, errors.WithMessage(err, "Failed to upload segment")
 	}
-	if logrus.IsLevelEnabled(logrus.DebugLevel) {
-		logrus.WithFields(logrus.Fields{
+	if uploader.logger.IsLevelEnabled(logrus.DebugLevel) {
+		uploader.logger.WithFields(logrus.Fields{
 			"total":          numSegments,
 			"from_seg_index": startSegIndex,
 			"to_seg_index":   segIndex,
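Since ParallelDo now receives the context, implementations outside this package can cooperate with cancellation as well. Below is a self-contained toy implementation of parallel.Interface under the new signature; the hashing workload and all names are invented for illustration, while the Serial call shape matches the one used by UploadFile above.

package example

import (
	"context"
	"crypto/sha256"
	"runtime"

	"github.com/0glabs/0g-storage-client/common/parallel"
)

// hasher is a toy parallel.Interface implementation: task i hashes inputs[i].
type hasher struct {
	inputs  [][]byte
	digests [][32]byte
}

func (h *hasher) ParallelDo(ctx context.Context, routine, task int) (interface{}, error) {
	// Honor cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	// Each task index is visited exactly once, so writing without a lock is safe.
	h.digests[task] = sha256.Sum256(h.inputs[task])
	return nil, nil
}

func (h *hasher) ParallelCollect(result *parallel.Result) error {
	return nil // results are written in place; nothing to collect
}

// hashAll fans the tasks out across GOMAXPROCS routines with no window limit.
func hashAll(ctx context.Context, inputs [][]byte) ([][32]byte, error) {
	h := &hasher{inputs: inputs, digests: make([][32]byte, len(inputs))}
	if err := parallel.Serial(ctx, h, len(inputs), runtime.GOMAXPROCS(0), 0); err != nil {
		return nil, err
	}
	return h.digests, nil
}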