Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: finality check, KV client #18

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ var (

node []string

force bool
taskSize uint
finalityRequired bool
taskSize uint
}

uploadCmd = &cobra.Command{
Expand All @@ -52,7 +52,7 @@ func init() {
uploadCmd.Flags().StringSliceVar(&uploadArgs.node, "node", []string{}, "ZeroGStorage storage node URL")
uploadCmd.MarkFlagRequired("node")

uploadCmd.Flags().BoolVar(&uploadArgs.force, "force", false, "Force to upload file even already exists")
uploadCmd.Flags().BoolVar(&uploadArgs.finalityRequired, "finality-required", false, "Wait for file finality on nodes to upload")
uploadCmd.Flags().UintVar(&uploadArgs.taskSize, "task-size", 10, "Number of segments to upload in single rpc request")

rootCmd.AddCommand(uploadCmd)
Expand All @@ -76,9 +76,9 @@ func upload(*cobra.Command, []string) {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
opt := transfer.UploadOption{
Tags: hexutil.MustDecode(uploadArgs.tags),
Force: uploadArgs.force,
TaskSize: uploadArgs.taskSize,
Tags: hexutil.MustDecode(uploadArgs.tags),
FinalityRequired: uploadArgs.finalityRequired,
TaskSize: uploadArgs.taskSize,
}

file, err := core.Open(uploadArgs.file)
Expand Down
6 changes: 3 additions & 3 deletions example/kv_put/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"math"

"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/0glabs/0g-storage-client/contract"
Expand All @@ -25,13 +26,12 @@ func main() {
blockchainClient := blockchain.MustNewWeb3(BlockchainClientAddr, PrivKey)
defer blockchainClient.Close()
blockchain.CustomGasLimit = 1000000
zgs, err := contract.NewFlowContract(common.HexToAddress(FlowContractAddr), blockchainClient)
flow, err := contract.NewFlowContract(common.HexToAddress(FlowContractAddr), blockchainClient)
if err != nil {
fmt.Println(err)
return
}
kvClient := kv.NewClient(zgsClient, zgs)
batcher := kvClient.Batcher()
batcher := kv.NewBatcher(math.MaxUint64, []*node.Client{zgsClient}, flow)
batcher.Set(common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000f2bd"),
[]byte("TESTKEY0"),
[]byte{69, 70, 71, 72, 73},
Expand Down
63 changes: 63 additions & 0 deletions kv/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package kv

import (
"context"

zg_common "github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/contract"
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

type Batcher struct {
*StreamDataBuilder
clients []*node.Client
flow *contract.FlowContract
logger *logrus.Logger
}

func NewBatcher(version uint64, clients []*node.Client, flow *contract.FlowContract, opts ...zg_common.LogOption) *Batcher {
return &Batcher{
StreamDataBuilder: NewStreamDataBuilder(version),
clients: clients,
flow: flow,
logger: zg_common.NewLogger(opts...),
}
}

// Exec submit the kv operations to ZeroGStorage network in batch.
//
// Note, this is a time consuming operation, e.g. several seconds or even longer.
// When it comes to a time sentitive context, it should be executed in a separate go-routine.
func (b *Batcher) Exec(ctx context.Context) error {
// build stream data
streamData, err := b.Build()
if err != nil {
return errors.WithMessage(err, "Failed to build stream data")
}

encoded, err := streamData.Encode()
if err != nil {
return errors.WithMessage(err, "Failed to encode data")
}
data, err := core.NewDataInMemory(encoded)
if err != nil {
return err
}

// upload file
uploader, err := transfer.NewUploader(b.flow, b.clients, zg_common.LogOption{Logger: b.logger})
if err != nil {
return err
}
opt := transfer.UploadOption{
Tags: b.BuildTags(),
}
if err = uploader.Upload(ctx, data, opt); err != nil {
return errors.WithMessagef(err, "Failed to upload data")
}
return nil
}
64 changes: 1 addition & 63 deletions kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@ import (
"context"
"math"

zg_common "github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/contract"
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// Client is used for users to communicate with server for kv operations.
Expand All @@ -20,10 +15,7 @@ type Client struct {
flow *contract.FlowContract
}

// NewClient creates a new client for kv operations.
//
// Generally, you could refer to the `upload` function in `cmd/upload.go` file
// for how to create storage node client and flow contract client.
// NewClient creates a new client for kv queries.
func NewClient(node *node.Client, flow *contract.FlowContract) *Client {
return &Client{
node: node,
Expand Down Expand Up @@ -124,57 +116,3 @@ func (c *Client) IsWriterOfKey(ctx context.Context, account common.Address, stre
func (c *Client) IsWriterOfStream(ctx context.Context, account common.Address, streamId common.Hash, version ...uint64) (isWriter bool, err error) {
return c.node.KV().IsWriterOfStream(ctx, account, streamId, version...)
}

// Batcher returns a Batcher instance for kv operations in batch.
func (c *Client) Batcher(opts ...zg_common.LogOption) *Batcher {
return newBatcher(math.MaxUint64, c, opts...)
}

type Batcher struct {
*StreamDataBuilder
client *Client
logger *logrus.Logger
}

func newBatcher(version uint64, client *Client, opts ...zg_common.LogOption) *Batcher {
return &Batcher{
StreamDataBuilder: NewStreamDataBuilder(version),
client: client,
logger: zg_common.NewLogger(opts...),
}
}

// Exec submit the kv operations to ZeroGStorage network in batch.
//
// Note, this is a time consuming operation, e.g. several seconds or even longer.
// When it comes to a time sentitive context, it should be executed in a separate go-routine.
func (b *Batcher) Exec(ctx context.Context) error {
// build stream data
streamData, err := b.Build()
if err != nil {
return errors.WithMessage(err, "Failed to build stream data")
}

encoded, err := streamData.Encode()
if err != nil {
return errors.WithMessage(err, "Failed to encode data")
}
data, err := core.NewDataInMemory(encoded)
if err != nil {
return err
}

// upload file
uploader, err := transfer.NewUploader(b.client.flow, []*node.Client{b.client.node}, zg_common.LogOption{Logger: b.logger})
if err != nil {
return err
}
opt := transfer.UploadOption{
Tags: b.BuildTags(),
Force: true,
}
if err = uploader.Upload(ctx, data, opt); err != nil {
return errors.WithMessagef(err, "Failed to upload data")
}
return nil
}
Loading
Loading