From 0349db6ed397883e345fb6671dabf603b66129c9 Mon Sep 17 00:00:00 2001
From: MiniFrenchBread <103425574+MiniFrenchBread@users.noreply.github.com>
Date: Thu, 11 Jul 2024 18:09:09 +0800
Subject: [PATCH 1/2] refactor: finalized, kv

---
 cmd/upload.go                 |  12 +--
 example/kv_put/main.go        |   6 +-
 kv/batcher.go                 |  63 ++++++++++++++
 kv/client.go                  |  64 +-------------
 transfer/uploader.go          | 151 ++++++++++++++--------------------
 transfer/uploader_dup_file.go |  70 ----------------
 6 files changed, 134 insertions(+), 232 deletions(-)
 create mode 100644 kv/batcher.go
 delete mode 100644 transfer/uploader_dup_file.go

diff --git a/cmd/upload.go b/cmd/upload.go
index b06a139..94724e2 100644
--- a/cmd/upload.go
+++ b/cmd/upload.go
@@ -26,8 +26,8 @@ var (
 
 		node []string
 
-		force    bool
-		taskSize uint
+		finalityRequired bool
+		taskSize         uint
 	}
 
 	uploadCmd = &cobra.Command{
@@ -52,7 +52,7 @@ func init() {
 	uploadCmd.Flags().StringSliceVar(&uploadArgs.node, "node", []string{}, "ZeroGStorage storage node URL")
 	uploadCmd.MarkFlagRequired("node")
 
-	uploadCmd.Flags().BoolVar(&uploadArgs.force, "force", false, "Force to upload file even already exists")
+	uploadCmd.Flags().BoolVar(&uploadArgs.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload")
 	uploadCmd.Flags().UintVar(&uploadArgs.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")
 
 	rootCmd.AddCommand(uploadCmd)
@@ -76,9 +76,9 @@ func upload(*cobra.Command, []string) {
 		logrus.WithError(err).Fatal("Failed to initialize uploader")
 	}
 	opt := transfer.UploadOption{
-		Tags:     hexutil.MustDecode(uploadArgs.tags),
-		Force:    uploadArgs.force,
-		TaskSize: uploadArgs.taskSize,
+		Tags:             hexutil.MustDecode(uploadArgs.tags),
+		FinalityRequired: uploadArgs.finalityRequired,
+		TaskSize:         uploadArgs.taskSize,
 	}
 
 	file, err := core.Open(uploadArgs.file)
diff --git a/example/kv_put/main.go b/example/kv_put/main.go
index 346ab72..454edbd 100644
--- a/example/kv_put/main.go
+++ b/example/kv_put/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"fmt"
+	"math"
 
 	"github.com/0glabs/0g-storage-client/common/blockchain"
 	"github.com/0glabs/0g-storage-client/contract"
@@ -25,13 +26,12 @@ func main() {
 	blockchainClient := blockchain.MustNewWeb3(BlockchainClientAddr, PrivKey)
 	defer blockchainClient.Close()
 	blockchain.CustomGasLimit = 1000000
-	zgs, err := contract.NewFlowContract(common.HexToAddress(FlowContractAddr), blockchainClient)
+	flow, err := contract.NewFlowContract(common.HexToAddress(FlowContractAddr), blockchainClient)
 	if err != nil {
 		fmt.Println(err)
 		return
 	}
-	kvClient := kv.NewClient(zgsClient, zgs)
-	batcher := kvClient.Batcher()
+	batcher := kv.NewBatcher(math.MaxUint64, []*node.Client{zgsClient}, flow)
 	batcher.Set(common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000f2bd"),
 		[]byte("TESTKEY0"),
 		[]byte{69, 70, 71, 72, 73},
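The hunk above stops where the diff context ends, just after the new kv.NewBatcher construction and the start of the Set call. A minimal sketch of how such a batch is then submitted with the new API, reusing the batcher value and the context and fmt imports already present in the example:

	// Submit every queued kv operation in one batch; Exec blocks until the
	// underlying upload finishes.
	if err := batcher.Exec(context.Background()); err != nil {
		fmt.Println("batch submission failed:", err)
		return
	}
	fmt.Println("kv batch submitted")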
diff --git a/kv/batcher.go b/kv/batcher.go
new file mode 100644
index 0000000..4162c64
--- /dev/null
+++ b/kv/batcher.go
@@ -0,0 +1,63 @@
+package kv
+
+import (
+	"context"
+
+	zg_common "github.com/0glabs/0g-storage-client/common"
+	"github.com/0glabs/0g-storage-client/contract"
+	"github.com/0glabs/0g-storage-client/core"
+	"github.com/0glabs/0g-storage-client/node"
+	"github.com/0glabs/0g-storage-client/transfer"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+type Batcher struct {
+	*StreamDataBuilder
+	clients []*node.Client
+	flow    *contract.FlowContract
+	logger  *logrus.Logger
+}
+
+func NewBatcher(version uint64, clients []*node.Client, flow *contract.FlowContract, opts ...zg_common.LogOption) *Batcher {
+	return &Batcher{
+		StreamDataBuilder: NewStreamDataBuilder(version),
+		clients:           clients,
+		flow:              flow,
+		logger:            zg_common.NewLogger(opts...),
+	}
+}
+
+// Exec submits the kv operations to the ZeroGStorage network in batch.
+//
+// Note, this is a time consuming operation, e.g. several seconds or even longer.
+// When it comes to a time sensitive context, it should be executed in a separate go-routine.
+func (b *Batcher) Exec(ctx context.Context) error {
+	// build stream data
+	streamData, err := b.Build()
+	if err != nil {
+		return errors.WithMessage(err, "Failed to build stream data")
+	}
+
+	encoded, err := streamData.Encode()
+	if err != nil {
+		return errors.WithMessage(err, "Failed to encode data")
+	}
+	data, err := core.NewDataInMemory(encoded)
+	if err != nil {
+		return err
+	}
+
+	// upload file
+	uploader, err := transfer.NewUploader(b.flow, b.clients, zg_common.LogOption{Logger: b.logger})
+	if err != nil {
+		return err
+	}
+	opt := transfer.UploadOption{
+		Tags: b.BuildTags(),
+	}
+	if err = uploader.Upload(ctx, data, opt); err != nil {
+		return errors.WithMessagef(err, "Failed to upload data")
+	}
+	return nil
+}
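As the doc comment on Exec warns, the call blocks for the whole build-encode-upload cycle. A small sketch of running it off the hot path, assuming a batcher built with kv.NewBatcher as above and the standard time package plus logrus imported; the one-minute timeout is an arbitrary illustrative choice:

	// Submit the batch off the hot path, bounded by a timeout so a slow
	// upload cannot block the caller forever.
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		defer cancel()

		if err := batcher.Exec(ctx); err != nil {
			logrus.WithError(err).Error("kv batch submission failed")
		}
	}()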
diff --git a/kv/client.go b/kv/client.go
index 5e2e78c..e3ab646 100644
--- a/kv/client.go
+++ b/kv/client.go
@@ -4,14 +4,9 @@ import (
 	"context"
 	"math"
 
-	zg_common "github.com/0glabs/0g-storage-client/common"
 	"github.com/0glabs/0g-storage-client/contract"
-	"github.com/0glabs/0g-storage-client/core"
 	"github.com/0glabs/0g-storage-client/node"
-	"github.com/0glabs/0g-storage-client/transfer"
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/pkg/errors"
-	"github.com/sirupsen/logrus"
 )
 
 // Client is used for users to communicate with server for kv operations.
@@ -20,10 +15,7 @@ type Client struct {
 	flow *contract.FlowContract
 }
 
-// NewClient creates a new client for kv operations.
-//
-// Generally, you could refer to the `upload` function in `cmd/upload.go` file
-// for how to create storage node client and flow contract client.
+// NewClient creates a new client for kv queries.
 func NewClient(node *node.Client, flow *contract.FlowContract) *Client {
 	return &Client{
 		node: node,
@@ -124,57 +116,3 @@ func (c *Client) IsWriterOfKey(ctx context.Context, account common.Address, stre
 func (c *Client) IsWriterOfStream(ctx context.Context, account common.Address, streamId common.Hash, version ...uint64) (isWriter bool, err error) {
 	return c.node.KV().IsWriterOfStream(ctx, account, streamId, version...)
 }
-
-// Batcher returns a Batcher instance for kv operations in batch.
-func (c *Client) Batcher(opts ...zg_common.LogOption) *Batcher {
-	return newBatcher(math.MaxUint64, c, opts...)
-}
-
-type Batcher struct {
-	*StreamDataBuilder
-	client *Client
-	logger *logrus.Logger
-}
-
-func newBatcher(version uint64, client *Client, opts ...zg_common.LogOption) *Batcher {
-	return &Batcher{
-		StreamDataBuilder: NewStreamDataBuilder(version),
-		client:            client,
-		logger:            zg_common.NewLogger(opts...),
-	}
-}
-
-// Exec submit the kv operations to ZeroGStorage network in batch.
-//
-// Note, this is a time consuming operation, e.g. several seconds or even longer.
-// When it comes to a time sentitive context, it should be executed in a separate go-routine.
-func (b *Batcher) Exec(ctx context.Context) error {
-	// build stream data
-	streamData, err := b.Build()
-	if err != nil {
-		return errors.WithMessage(err, "Failed to build stream data")
-	}
-
-	encoded, err := streamData.Encode()
-	if err != nil {
-		return errors.WithMessage(err, "Failed to encode data")
-	}
-	data, err := core.NewDataInMemory(encoded)
-	if err != nil {
-		return err
-	}
-
-	// upload file
-	uploader, err := transfer.NewUploader(b.client.flow, []*node.Client{b.client.node}, zg_common.LogOption{Logger: b.logger})
-	if err != nil {
-		return err
-	}
-	opt := transfer.UploadOption{
-		Tags:  b.BuildTags(),
-		Force: true,
-	}
-	if err = uploader.Upload(ctx, data, opt); err != nil {
-		return errors.WithMessagef(err, "Failed to upload data")
-	}
-	return nil
-}
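With the batcher moved to kv/batcher.go, kv.Client keeps only the query side, and batched writes are created with kv.NewBatcher rather than the removed client.Batcher(). A sketch of the remaining query usage, assuming zgsClient and flow are built as in the example and account is some common.Address to check:

	// Read-only kv client against a single storage node.
	kvClient := kv.NewClient(zgsClient, flow)

	streamId := common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000f2bd")
	isWriter, err := kvClient.IsWriterOfStream(context.Background(), account, streamId)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("is writer of stream:", isWriter)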
diff --git a/transfer/uploader.go b/transfer/uploader.go
index 3576fd7..f918d24 100644
--- a/transfer/uploader.go
+++ b/transfer/uploader.go
@@ -2,6 +2,7 @@ package transfer
 
 import (
 	"context"
+	"fmt"
 	"math/big"
 	"runtime"
 	"sort"
@@ -32,9 +33,9 @@ func isDuplicateError(msg string) bool {
 }
 
 type UploadOption struct {
-	Tags     []byte // for kv operations
-	Force    bool   // for kv to upload same file
-	TaskSize uint   // number of segment to upload in single rpc request
+	Tags             []byte // transaction tags
+	FinalityRequired bool   // whether to wait for the file to be finalized on the uploaded nodes
+	TaskSize         uint   // number of segments to upload in a single rpc request
 }
 
 type Uploader struct {
@@ -138,13 +139,13 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable
 	for i := 0; i < n; i++ {
 		// Upload file to storage node
-		if err := uploader.UploadFile(ctx, datas[i], trees[i], 0, opts[i].TaskSize); err != nil {
+		if err := uploader.UploadFile(ctx, datas[i], trees[i], opts[i].TaskSize); err != nil {
 			return common.Hash{}, nil, errors.WithMessage(err, "Failed to upload file")
 		}
 
 		if waitForLogEntry {
 			// Wait for transaction finality
-			if err := uploader.waitForLogEntry(ctx, trees[i].Root(), false, receipt); err != nil {
+			if err := uploader.waitForLogEntry(ctx, trees[i].Root(), opts[i].FinalityRequired, receipt); err != nil {
 				return common.Hash{}, nil, errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
 			}
 		}
 	}
@@ -176,67 +177,32 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op
 	}
 	uploader.logger.WithField("root", tree.Root()).Info("Data merkle root calculated")
 
-	info, err := uploader.clients[0].ZeroGStorage().GetFileInfo(ctx, tree.Root())
-	if err != nil {
-		return errors.WithMessage(err, "Failed to get data info from storage node")
-	}
-
-	uploader.logger.WithField("info", info).Debug("Log entry retrieved from storage node")
-
-	// In case that user interact with blockchain via Metamask
-	if uploader.flow == nil && info == nil {
-		return errors.New("log entry not available on storage node")
-	}
-
-	// already finalized
-	if info != nil && info.Finalized {
-		if !opt.Force {
-			return errors.New("Data already exists on ZeroGStorage network")
-		}
-
-		// Allow to upload duplicated file for KV scenario
-		if err = uploader.uploadDuplicatedFile(ctx, data, opt.Tags, tree.Root()); err != nil {
-			return errors.WithMessage(err, "Failed to upload duplicated data")
-		}
+	// Append log on blockchain
+	var receipt *types.Receipt
 
-		return nil
+	if _, receipt, err = uploader.SubmitLogEntry([]core.IterableData{data}, [][]byte{opt.Tags}, true); err != nil {
+		return errors.WithMessage(err, "Failed to submit log entry")
 	}
 
-	// Log entry unavailable on storage node yet.
-	segNum := uint64(0)
-	if info == nil {
-		// Append log on blockchain
-		var receipt *types.Receipt
-
-		if _, receipt, err = uploader.SubmitLogEntry([]core.IterableData{data}, [][]byte{opt.Tags}, true); err != nil {
-			return errors.WithMessage(err, "Failed to submit log entry")
-		}
-
-		// For small data, could upload file to storage node immediately.
-		// Otherwise, need to wait for log entry available on storage node,
-		// which requires transaction confirmed on blockchain.
-		if data.Size() <= smallFileSizeThreshold {
-			uploader.logger.Info("Upload small data immediately")
-		} else {
-			// Wait for storage node to retrieve log entry from blockchain
-			if err = uploader.waitForLogEntry(ctx, tree.Root(), false, receipt); err != nil {
-				return errors.WithMessage(err, "Failed to check if log entry available on storage node")
-			}
-			info, err = uploader.clients[0].ZeroGStorage().GetFileInfo(ctx, tree.Root())
-			if err != nil {
-				return errors.WithMessage(err, "Failed to get file info from storage node after waitForLogEntry.")
-			}
-			segNum = info.UploadedSegNum
+	// For small data, could upload file to storage node immediately.
+	// Otherwise, need to wait for log entry available on storage node,
+	// which requires transaction confirmed on blockchain.
+	if data.Size() <= smallFileSizeThreshold {
+		uploader.logger.Info("Upload small data immediately")
+	} else {
+		// Wait for storage node to retrieve log entry from blockchain
+		if err = uploader.waitForLogEntry(ctx, tree.Root(), false, receipt); err != nil {
+			return errors.WithMessage(err, "Failed to check if log entry available on storage node")
 		}
 	}
 
 	// Upload file to storage node
-	if err = uploader.UploadFile(ctx, data, tree, segNum, opt.TaskSize); err != nil {
+	if err = uploader.UploadFile(ctx, data, tree, opt.TaskSize); err != nil {
 		return errors.WithMessage(err, "Failed to upload file")
 	}
 
 	// Wait for transaction finality
-	if err = uploader.waitForLogEntry(ctx, tree.Root(), false, nil); err != nil {
+	if err = uploader.waitForLogEntry(ctx, tree.Root(), opt.FinalityRequired, nil); err != nil {
 		return errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
 	}
 
@@ -300,41 +266,46 @@ func (uploader *Uploader) waitForLogEntry(ctx context.Context, root common.Hash,
 	for {
 		time.Sleep(time.Second)
 
-		info, err := uploader.clients[0].ZeroGStorage().GetFileInfo(ctx, root)
-		if err != nil {
-			return errors.WithMessage(err, "Failed to get file info from storage node")
-		}
-
-		// log entry unavailable yet
-		if info == nil {
-			fields := logrus.Fields{}
-			if receipt != nil {
-				if status, err := uploader.clients[0].ZeroGStorage().GetStatus(ctx); err == nil {
-					fields["txBlockNumber"] = receipt.BlockNumber
-					fields["zgsNodeSyncHeight"] = status.LogSyncHeight
-				}
+		ok := true
+		for _, client := range uploader.clients {
+			info, err := client.ZeroGStorage().GetFileInfo(ctx, root)
+			if err != nil {
+				return errors.WithMessage(err, fmt.Sprintf("Failed to get file info from storage node %v", client.URL()))
 			}
+			// log entry unavailable yet
+			if info == nil {
+				fields := logrus.Fields{}
+				if receipt != nil {
+					if status, err := client.ZeroGStorage().GetStatus(ctx); err == nil {
+						fields["txBlockNumber"] = receipt.BlockNumber
+						fields["zgsNodeSyncHeight"] = status.LogSyncHeight
+					}
+				}
 
-			reminder.Remind("Log entry is unavailable yet", fields)
+				reminder.Remind("Log entry is unavailable yet", fields)
+				ok = false
+				break
+			}
 
-			continue
+			if finalityRequired && !info.Finalized {
+				reminder.Remind("Log entry is available, but not finalized yet", logrus.Fields{
+					"cached":           info.IsCached,
+					"uploadedSegments": info.UploadedSegNum,
+				})
+				ok = false
+				break
+			}
 		}
 
-		if finalityRequired && !info.Finalized {
-			reminder.Remind("Log entry is available, but not finalized yet", logrus.Fields{
-				"cached":           info.IsCached,
-				"uploadedSegments": info.UploadedSegNum,
-			})
-			continue
+		if ok {
+			break
 		}
-
-		break
 	}
 
 	return nil
 }
 
-func (uploader *Uploader) NewSegmentUploader(ctx context.Context, data core.IterableData, tree *merkle.Tree, startSegIndex uint64, taskSize uint) (*SegmentUploader, error) {
+func (uploader *Uploader) NewSegmentUploader(ctx context.Context, data core.IterableData, tree *merkle.Tree, taskSize uint) (*SegmentUploader, error) {
 	numSegments := data.NumSegments()
 	shardConfigs, err := getShardConfigs(ctx, uploader.clients)
 	if err != nil {
@@ -342,13 +313,13 @@ func (uploader *Uploader) NewSegmentUploader(ctx context.Context, data core.Iter
 	}
 	clientTasks := make([][]*UploadTask, 0)
 	for clientIndex, shardConfig := range shardConfigs {
-		var segIndex uint64
-		r := startSegIndex % shardConfig.NumShard
-		if r <= shardConfig.ShardId {
-			segIndex = startSegIndex + shardConfig.ShardId - r
-		} else {
-			segIndex = startSegIndex - r + shardConfig.ShardId + shardConfig.NumShard
+		// skip finalized nodes
+		info, _ := uploader.clients[clientIndex].ZeroGStorage().GetFileInfo(ctx, tree.Root())
+		if info != nil && info.Finalized {
+			continue
 		}
+		// create upload tasks
+		segIndex := shardConfig.ShardId
 		tasks := make([]*UploadTask, 0)
 		for ; segIndex < numSegments; segIndex += shardConfig.NumShard * uint64(taskSize) {
 			tasks = append(tasks, &UploadTask{
@@ -380,7 +351,7 @@ func (uploader *Uploader) NewSegmentUploader(ctx context.Context, data core.Iter
 	}
 
 // TODO error tolerance
-func (uploader *Uploader) UploadFile(ctx context.Context, data core.IterableData, tree *merkle.Tree, segIndex uint64, taskSize uint) error {
+func (uploader *Uploader) UploadFile(ctx context.Context, data core.IterableData, tree *merkle.Tree, taskSize uint) error {
 	stageTimer := time.Now()
 
 	if taskSize == 0 {
@@ -388,11 +359,11 @@ func (uploader *Uploader) UploadFile(ctx context.Context, data core.IterableData
 
 	uploader.logger.WithFields(logrus.Fields{
-		"segIndex": segIndex,
-		"nodeNum":  len(uploader.clients),
+		"segNum":  data.NumSegments(),
+		"nodeNum": len(uploader.clients),
 	}).Info("Begin to upload file")
 
-	segmentUploader, err := uploader.NewSegmentUploader(ctx, data, tree, segIndex, taskSize)
+	segmentUploader, err := uploader.NewSegmentUploader(ctx, data, tree, taskSize)
 	if err != nil {
 		return err
 	}
 
@@ -404,7 +375,7 @@ func (uploader *Uploader) UploadFile(ctx context.Context, data core.IterableData
 
 	uploader.logger.WithFields(logrus.Fields{
 		"duration": time.Since(stageTimer),
-		"segNum":   data.NumSegments() - segIndex,
+		"segNum":   data.NumSegments(),
 	}).Info("Completed to upload file")
 
 	return nil
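In the reworked NewSegmentUploader above, a node is skipped when it already reports the file as finalized; otherwise its tasks start at the node's ShardId and advance by NumShard * taskSize, with no startSegIndex offset any more. A self-contained sketch of that stride with made-up shard values; the UploadTask fields are not shown in the diff, so only the task start indices are computed here:

	package main

	import "fmt"

	func main() {
		const (
			numShard    = uint64(4)  // shardConfig.NumShard (assumed value)
			shardId     = uint64(2)  // shardConfig.ShardId (assumed value)
			taskSize    = uint64(10) // segments per rpc request, as in --task-size
			numSegments = uint64(100)
		)

		// Mirrors the loop in NewSegmentUploader: one task per stride of
		// NumShard*taskSize, starting at the node's own ShardId.
		for segIndex := shardId; segIndex < numSegments; segIndex += numShard * taskSize {
			fmt.Println("task starts at segment", segIndex) // prints 2, 42, 82
		}
	}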
diff --git a/transfer/uploader_dup_file.go b/transfer/uploader_dup_file.go
deleted file mode 100644
index 88fb1a1..0000000
--- a/transfer/uploader_dup_file.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package transfer
-
-import (
-	"context"
-	"time"
-
-	"github.com/0glabs/0g-storage-client/common/blockchain"
-	"github.com/0glabs/0g-storage-client/contract"
-	"github.com/0glabs/0g-storage-client/core"
-	"github.com/0glabs/0g-storage-client/node"
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/pkg/errors"
-)
-
-const SubmitEventHash = "0x167ce04d2aa1981994d3a31695da0d785373335b1078cec239a1a3a2c7675555"
-
-// uploadDuplicatedFile uploads file to storage node that already exists by root.
-// In this case, user only need to submit transaction on blockchain, and wait for
-// file finality on storage node.
-func (uploader *Uploader) uploadDuplicatedFile(ctx context.Context, data core.IterableData, tags []byte, root common.Hash) error {
-	// submit transaction on blockchain
-	_, receipt, err := uploader.SubmitLogEntry([]core.IterableData{data}, [][]byte{tags}, true)
-	if err != nil {
-		return errors.WithMessage(err, "Failed to submit log entry")
-	}
-
-	// parse submission from event log
-	var submission *contract.FlowSubmit
-	for _, v := range receipt.Logs {
-		if v.Topics[0] == common.HexToHash(SubmitEventHash) {
-			log := blockchain.ConvertToGethLog(v)
-
-			if submission, err = uploader.flow.ParseSubmit(*log); err != nil {
-				return err
-			}
-
-			break
-		}
-	}
-
-	// wait for finality from storage node
-	txSeq := submission.SubmissionIndex.Uint64()
-	info, err := uploader.waitForFileFinalityByTxSeq(ctx, txSeq)
-	if err != nil {
-		return errors.WithMessagef(err, "Failed to wait for finality for tx %v", txSeq)
-	}
-
-	if info.Tx.DataMerkleRoot != root {
-		return errors.New("Merkle root mismatch, maybe transaction reverted")
-	}
-
-	return nil
-}
-
-func (uploader *Uploader) waitForFileFinalityByTxSeq(ctx context.Context, txSeq uint64) (*node.FileInfo, error) {
-	uploader.logger.WithField("txSeq", txSeq).Info("Wait for finality on storage node")
-
-	for {
-		time.Sleep(time.Second)
-
-		info, err := uploader.clients[0].ZeroGStorage().GetFileInfoByTxSeq(ctx, txSeq)
-		if err != nil {
-			return nil, errors.WithMessage(err, "Failed to get file info from storage node")
-		}
-
-		if info != nil && info.Finalized {
-			return info, nil
-		}
-	}
-}
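Taken together, patch 1 replaces the old Force/duplicate-file path with a single submit-then-upload flow and an optional wait for finality. A sketch of an upload helper against the refactored API, mirroring the calls made in kv/batcher.go's Exec; the imports are assumed to match that file, and uploadWithFinality and payload are hypothetical names:

	// uploadWithFinality submits the log entry, uploads the segments, and waits
	// until every node reports the file as finalized (sketch only).
	func uploadWithFinality(ctx context.Context, flow *contract.FlowContract, clients []*node.Client, payload []byte) error {
		data, err := core.NewDataInMemory(payload)
		if err != nil {
			return err
		}

		uploader, err := transfer.NewUploader(flow, clients, zg_common.LogOption{Logger: logrus.StandardLogger()})
		if err != nil {
			return err
		}

		opt := transfer.UploadOption{
			FinalityRequired: true, // wait in waitForLogEntry until nodes report Finalized
			TaskSize:         10,
		}
		return uploader.Upload(ctx, data, opt)
	}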
From b9ae7b147665f79e1c8367771cacedd01703307e Mon Sep 17 00:00:00 2001
From: MiniFrenchBread <103425574+MiniFrenchBread@users.noreply.github.com>
Date: Thu, 11 Jul 2024 18:59:47 +0800
Subject: [PATCH 2/2] fix: task length check

---
 transfer/uploader.go | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/transfer/uploader.go b/transfer/uploader.go
index f918d24..16e1863 100644
--- a/transfer/uploader.go
+++ b/transfer/uploader.go
@@ -334,9 +334,11 @@ func (uploader *Uploader) NewSegmentUploader(ctx context.Context, data core.Iter
 		return len(clientTasks[i]) > len(clientTasks[j])
 	})
 	tasks := make([]*UploadTask, 0)
-	for taskIndex := 0; taskIndex < len(clientTasks[0]); taskIndex += 1 {
-		for i := 0; i < len(clientTasks) && taskIndex < len(clientTasks[i]); i += 1 {
-			tasks = append(tasks, clientTasks[i][taskIndex])
+	if len(clientTasks) > 0 {
+		for taskIndex := 0; taskIndex < len(clientTasks[0]); taskIndex += 1 {
+			for i := 0; i < len(clientTasks) && taskIndex < len(clientTasks[i]); i += 1 {
+				tasks = append(tasks, clientTasks[i][taskIndex])
+			}
 		}
 	}
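The guard added in patch 2 matters because, after patch 1 started skipping nodes that already hold the finalized file, clientTasks can be empty and indexing clientTasks[0] would panic. A self-contained sketch of the guarded round-robin interleaving over plain int slices; the real code sorts the per-client lists by descending length first, so index 0 is the longest list:

	package main

	import "fmt"

	// interleave flattens per-client task lists in round-robin order, tolerating
	// an empty outer slice and ragged inner lengths, like the patched loop.
	func interleave(clientTasks [][]int) []int {
		tasks := make([]int, 0)
		if len(clientTasks) > 0 {
			for taskIndex := 0; taskIndex < len(clientTasks[0]); taskIndex++ {
				for i := 0; i < len(clientTasks) && taskIndex < len(clientTasks[i]); i++ {
					tasks = append(tasks, clientTasks[i][taskIndex])
				}
			}
		}
		return tasks
	}

	func main() {
		fmt.Println(interleave([][]int{{1, 2, 3}, {4, 5}})) // [1 4 2 5 3]
		fmt.Println(interleave([][]int{}))                  // [] instead of a panic
	}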