Skip to content

Commit

Permalink
feat: skipTx
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniFrenchBread committed Jul 15, 2024
1 parent 58611bd commit 51adf91
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 34 deletions.
8 changes: 5 additions & 3 deletions cmd/kv_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"encoding/json"
"fmt"
"math"

Expand Down Expand Up @@ -30,10 +31,10 @@ var (

func init() {
kvReadCmd.Flags().StringVar(&kvReadArgs.streamId, "stream-id", "0x", "stream to read/write")
kvReadCmd.MarkFlagRequired("streamId")
kvReadCmd.MarkFlagRequired("stream-id")

kvReadCmd.Flags().StringSliceVar(&kvReadArgs.keys, "stream-keys", []string{}, "kv keys")
kvReadCmd.MarkFlagRequired("stream-keys")
kvReadCmd.MarkFlagRequired("kv-keys")

kvReadCmd.Flags().Uint64Var(&kvReadArgs.version, "version", math.MaxUint64, "key version")

Expand All @@ -60,5 +61,6 @@ func kvRead(*cobra.Command, []string) {
}
m[key] = string(val.Data)
}
fmt.Println(m)
bs, _ := json.Marshal(m)
fmt.Println(string(bs))
}
25 changes: 14 additions & 11 deletions cmd/kv_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"os"

zg_common "github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/0glabs/0g-storage-client/contract"
"github.com/0glabs/0g-storage-client/indexer"
Expand Down Expand Up @@ -34,25 +35,26 @@ var (

expectedReplica uint

skipTx bool
finalityRequired bool
taskSize uint
}

kvWriteCmd = &cobra.Command{
Use: "kvWrite",
Use: "kv-write",
Short: "write to kv streams",
Run: kvWrite,
}
)

func init() {
kvWriteCmd.Flags().StringVar(&kvWriteArgs.streamId, "stream-id", "0x", "stream to read/write")
kvWriteCmd.MarkFlagRequired("streamId")
kvWriteCmd.MarkFlagRequired("stream-id")

kvWriteCmd.Flags().StringSliceVar(&kvWriteArgs.keys, "stream-keys", []string{}, "kv keys")
kvWriteCmd.MarkFlagRequired("stream-keys")
kvWriteCmd.MarkFlagRequired("kv-keys")
kvWriteCmd.Flags().StringSliceVar(&kvWriteArgs.values, "stream-values", []string{}, "kv values")
kvWriteCmd.MarkFlagRequired("stream-values")
kvWriteCmd.MarkFlagRequired("kv-values")

kvWriteCmd.Flags().Uint64Var(&kvWriteArgs.version, "version", math.MaxUint64, "key version")

Expand All @@ -68,6 +70,7 @@ func init() {

kvWriteCmd.Flags().UintVar(&kvWriteArgs.expectedReplica, "expected-replica", 1, "expected number of replications to kvWrite")

kvWriteCmd.Flags().BoolVar(&kvWriteArgs.skipTx, "skip-tx", false, "Skip sending the transaction on chain")
kvWriteCmd.Flags().BoolVar(&kvWriteArgs.finalityRequired, "finality-required", false, "Wait for file finality on nodes to kvWrite")
kvWriteCmd.Flags().UintVar(&kvWriteArgs.taskSize, "task-size", 10, "Number of segments to kvWrite in single rpc request")

Expand All @@ -87,6 +90,7 @@ func kvWrite(*cobra.Command, []string) {
FinalityRequired: kvWriteArgs.finalityRequired,
TaskSize: kvWriteArgs.taskSize,
ExpectedReplica: kvWriteArgs.expectedReplica,
SkipTx: kvWriteArgs.skipTx,
}

var clients []*node.Client
Expand All @@ -103,21 +107,20 @@ func kvWrite(*cobra.Command, []string) {
if len(kvWriteArgs.node) == 0 {
logrus.Fatal("At least one of --node and --indexer should not be empty")
}
clients = node.MustNewClients(kvWriteArgs.node)
for _, client := range clients {
defer client.Close()
}
}

clients = node.MustNewClients(uploadArgs.node)
for _, client := range clients {
defer client.Close()
}

batcher := kv.NewBatcher(kvWriteArgs.version, clients, flow)
batcher := kv.NewBatcher(kvWriteArgs.version, clients, flow, zg_common.LogOption{Logger: logrus.StandardLogger()})
if len(kvWriteArgs.keys) != len(kvWriteArgs.values) {
logrus.Fatal("keys and values length mismatch")
}
if len(kvWriteArgs.keys) == 0 {
logrus.Fatal("no keys to write")
}
streamId := common.HexToHash(kvReadArgs.streamId)
streamId := common.HexToHash(kvWriteArgs.streamId)

for i := range kvWriteArgs.keys {
batcher.Set(streamId,
Expand Down
3 changes: 3 additions & 0 deletions cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (

expectedReplica uint

skipTx bool
finalityRequired bool
taskSize uint
}
Expand Down Expand Up @@ -60,6 +61,7 @@ func init() {

uploadCmd.Flags().UintVar(&uploadArgs.expectedReplica, "expected-replica", 1, "expected number of replications to upload")

uploadCmd.Flags().BoolVar(&uploadArgs.skipTx, "skip-tx", false, "Skip sending the transaction on chain")
uploadCmd.Flags().BoolVar(&uploadArgs.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload")
uploadCmd.Flags().UintVar(&uploadArgs.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

Expand All @@ -80,6 +82,7 @@ func upload(*cobra.Command, []string) {
FinalityRequired: uploadArgs.finalityRequired,
TaskSize: uploadArgs.taskSize,
ExpectedReplica: uploadArgs.expectedReplica,
SkipTx: uploadArgs.skipTx,
}

file, err := core.Open(uploadArgs.file)
Expand Down
76 changes: 56 additions & 20 deletions transfer/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type UploadOption struct {
FinalityRequired bool // wait for file finalized on uploaded nodes or not
TaskSize uint // number of segment to upload in single rpc request
ExpectedReplica uint // expected number of replications
SkipTx bool // skip sending transaction on chain
}

type Uploader struct {
Expand Down Expand Up @@ -73,6 +74,20 @@ func NewUploader(flow *contract.FlowContract, clients []*node.Client, opts ...zg
return uploader, nil
}

// checkLogExistance reports whether the log entry for the given merkle root
// is already available on every storage node this uploader talks to.
// It returns (false, nil) as soon as one node has no file info yet, and a
// non-nil error if querying any node fails. Used on the skip-tx path, where
// no on-chain transaction is sent and the entry must already exist.
//
// NOTE(review): the receiver name and the "existance" spelling are kept as-is
// because sibling code in this file calls the method by this exact name.
func (uploader *Uploader) checkLogExistance(ctx context.Context, root common.Hash) (bool, error) {
	for _, client := range uploader.clients {
		info, err := client.ZeroGStorage().GetFileInfo(ctx, root)
		if err != nil {
			// WithMessagef avoids the redundant fmt.Sprintf wrapping.
			return false, errors.WithMessagef(err, "Failed to get file info from storage node %v", client.URL())
		}
		// Log entry not yet retrievable from this node.
		if info == nil {
			return false, nil
		}
	}
	return true, nil
}

// upload data(batchly in 1 blockchain transaction if there are more than one files)
func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.IterableData, waitForLogEntry bool, option ...[]UploadOption) (common.Hash, []common.Hash, error) {
stageTimer := time.Now()
Expand Down Expand Up @@ -118,8 +133,19 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable
trees[i] = tree
dataRoots[i] = trees[i].Root()

toSubmitDatas = append(toSubmitDatas, data)
toSubmitTags = append(toSubmitTags, opt.Tags)
if !opts[i].SkipTx {
toSubmitDatas = append(toSubmitDatas, data)
toSubmitTags = append(toSubmitTags, opt.Tags)
} else {
// Check existance
exist, err := uploader.checkLogExistance(ctx, trees[i].Root())
if err != nil {
return common.Hash{}, nil, errors.WithMessage(err, "Failed to check if skipped log entry available on storage node")
}
if !exist {
return common.Hash{}, nil, fmt.Errorf("data #%v log entry is not exist on given nodes", i)
}
}
lastTreeToSubmit = trees[i]
}

Expand All @@ -141,7 +167,7 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable

for i := 0; i < n; i++ {
// Upload file to storage node
if err := uploader.UploadFile(ctx, datas[i], trees[i], opts[i].ExpectedReplica, opts[i].TaskSize); err != nil {
if err := uploader.uploadFile(ctx, datas[i], trees[i], opts[i].ExpectedReplica, opts[i].TaskSize); err != nil {
return common.Hash{}, nil, errors.WithMessage(err, "Failed to upload file")
}

Expand Down Expand Up @@ -180,26 +206,37 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op
uploader.logger.WithField("root", tree.Root()).Info("Data merkle root calculated")

// Append log on blockchain
var receipt *types.Receipt
if !opt.SkipTx {
var receipt *types.Receipt

if _, receipt, err = uploader.SubmitLogEntry([]core.IterableData{data}, [][]byte{opt.Tags}, true); err != nil {
return errors.WithMessage(err, "Failed to submit log entry")
}
if _, receipt, err = uploader.SubmitLogEntry([]core.IterableData{data}, [][]byte{opt.Tags}, true); err != nil {
return errors.WithMessage(err, "Failed to submit log entry")
}

// For small data, could upload file to storage node immediately.
// Otherwise, need to wait for log entry available on storage node,
// which requires transaction confirmed on blockchain.
if data.Size() <= smallFileSizeThreshold {
uploader.logger.Info("Upload small data immediately")
// For small data, could upload file to storage node immediately.
// Otherwise, need to wait for log entry available on storage node,
// which requires transaction confirmed on blockchain.
if data.Size() <= smallFileSizeThreshold {
uploader.logger.Info("Upload small data immediately")
} else {
// Wait for storage node to retrieve log entry from blockchain
if err = uploader.waitForLogEntry(ctx, tree.Root(), false, receipt); err != nil {
return errors.WithMessage(err, "Failed to check if log entry available on storage node")
}
}
} else {
// Wait for storage node to retrieve log entry from blockchain
if err = uploader.waitForLogEntry(ctx, tree.Root(), false, receipt); err != nil {
return errors.WithMessage(err, "Failed to check if log entry available on storage node")
// Check existance
exist, err := uploader.checkLogExistance(ctx, tree.Root())
if err != nil {
return errors.WithMessage(err, "Failed to check if skipped log entry available on storage node")
}
if !exist {
return fmt.Errorf("data log entry is not exist on given nodes")
}
}

// Upload file to storage node
if err = uploader.UploadFile(ctx, data, tree, opt.ExpectedReplica, opt.TaskSize); err != nil {
if err = uploader.uploadFile(ctx, data, tree, opt.ExpectedReplica, opt.TaskSize); err != nil {
return errors.WithMessage(err, "Failed to upload file")
}

Expand Down Expand Up @@ -307,7 +344,7 @@ func (uploader *Uploader) waitForLogEntry(ctx context.Context, root common.Hash,
return nil
}

func (uploader *Uploader) NewSegmentUploader(ctx context.Context, data core.IterableData, tree *merkle.Tree, expectedReplica uint, taskSize uint) (*SegmentUploader, error) {
func (uploader *Uploader) newSegmentUploader(ctx context.Context, data core.IterableData, tree *merkle.Tree, expectedReplica uint, taskSize uint) (*SegmentUploader, error) {
numSegments := data.NumSegments()
shardConfigs, err := getShardConfigs(ctx, uploader.clients)
if err != nil {
Expand Down Expand Up @@ -357,8 +394,7 @@ func (uploader *Uploader) NewSegmentUploader(ctx context.Context, data core.Iter
}, nil
}

// TODO error tolerance
func (uploader *Uploader) UploadFile(ctx context.Context, data core.IterableData, tree *merkle.Tree, expectedReplica uint, taskSize uint) error {
func (uploader *Uploader) uploadFile(ctx context.Context, data core.IterableData, tree *merkle.Tree, expectedReplica uint, taskSize uint) error {
stageTimer := time.Now()

if taskSize == 0 {
Expand All @@ -370,7 +406,7 @@ func (uploader *Uploader) UploadFile(ctx context.Context, data core.IterableData
"nodeNum": len(uploader.clients),
}).Info("Begin to upload file")

segmentUploader, err := uploader.NewSegmentUploader(ctx, data, tree, expectedReplica, taskSize)
segmentUploader, err := uploader.newSegmentUploader(ctx, data, tree, expectedReplica, taskSize)
if err != nil {
return err
}
Expand Down

0 comments on commit 51adf91

Please sign in to comment.