Skip to content

Commit

Permalink
feat: add tx hash in return values of upload apis (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniFrenchBread authored Sep 4, 2024
1 parent 2cf72f6 commit 38cc4c7
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 32 deletions.
3 changes: 2 additions & 1 deletion cmd/kv_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func kvWrite(*cobra.Command, []string) {
)
}

if err := batcher.Exec(ctx, opt); err != nil {
_, err := batcher.Exec(ctx, opt)
if err != nil {
logrus.WithError(err).Fatal("fail to execute kv batch")
}
}
4 changes: 2 additions & 2 deletions cmd/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func upload(*cobra.Command, []string) {
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize indexer client")
}
if err := indexerClient.Upload(ctx, w3client, file, opt); err != nil {
if _, err := indexerClient.Upload(ctx, w3client, file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
return
Expand All @@ -135,7 +135,7 @@ func upload(*cobra.Command, []string) {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}

if err := uploader.Upload(ctx, file, opt); err != nil {
if _, err := uploader.Upload(ctx, file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
}
2 changes: 1 addition & 1 deletion example/kv_put/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
[]byte("TESTKEY1"),
[]byte{74, 75, 76, 77, 78},
)
err = batcher.Exec(context.Background())
_, err = batcher.Exec(context.Background())
if err != nil {
fmt.Println(err)
return
Expand Down
2 changes: 1 addition & 1 deletion gateway/local_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) {
}
defer file.Close()

if err := uploader.Upload(context.Background(), file); err != nil {
if _, err := uploader.Upload(context.Background(), file); err != nil {
return nil, err
}

Expand Down
8 changes: 4 additions & 4 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (c *Client) NewUploaderFromIndexerNodes(ctx context.Context, w3Client *web3
}

// Upload submit data to 0g storage contract, then transfer the data to the storage nodes selected from indexer service.
func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core.IterableData, option ...transfer.UploadOption) error {
func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core.IterableData, option ...transfer.UploadOption) (eth_common.Hash, error) {
expectedReplica := uint(1)
if len(option) > 0 {
expectedReplica = max(expectedReplica, option[0].ExpectedReplica)
Expand All @@ -142,15 +142,15 @@ func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core.
for {
uploader, err := c.NewUploaderFromIndexerNodes(ctx, w3Client, expectedReplica, dropped)
if err != nil {
return err
return eth_common.Hash{}, err
}
err = uploader.Upload(ctx, data, option...)
txHash, err := uploader.Upload(ctx, data, option...)
var rpcError *node.RPCError
if errors.As(err, &rpcError) {
dropped = append(dropped, rpcError.URL)
c.logger.Infof("dropped problematic node and retry: %v", rpcError.Error())
} else {
return err
return txHash, err
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions kv/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/ethereum/go-ethereum/common"
"github.com/openweb3/web3go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -34,34 +35,35 @@ func NewBatcher(version uint64, clients []*node.ZgsClient, w3Client *web3go.Clie
// The submission process is the same as uploading a normal file. The batcher should be dropped after execution.
// Note, this may be time consuming operation, e.g. several seconds or even longer.
// When it comes to a time sentitive context, it should be executed in a separate go-routine.
func (b *Batcher) Exec(ctx context.Context, option ...transfer.UploadOption) error {
func (b *Batcher) Exec(ctx context.Context, option ...transfer.UploadOption) (common.Hash, error) {
// build stream data
streamData, err := b.Build()
if err != nil {
return errors.WithMessage(err, "Failed to build stream data")
return common.Hash{}, errors.WithMessage(err, "Failed to build stream data")
}

encoded, err := streamData.Encode()
if err != nil {
return errors.WithMessage(err, "Failed to encode data")
return common.Hash{}, errors.WithMessage(err, "Failed to encode data")
}
data, err := core.NewDataInMemory(encoded)
if err != nil {
return err
return common.Hash{}, err
}

// upload file
uploader, err := transfer.NewUploader(ctx, b.w3Client, b.clients, zg_common.LogOption{Logger: b.logger})
if err != nil {
return err
return common.Hash{}, err
}
var opt transfer.UploadOption
if len(option) > 0 {
opt = option[0]
}
opt.Tags = b.buildTags()
if err = uploader.Upload(ctx, data, opt); err != nil {
return errors.WithMessagef(err, "Failed to upload data")
txHash, err := uploader.Upload(ctx, data, opt)
if err != nil {
return txHash, errors.WithMessagef(err, "Failed to upload data")
}
return nil
return txHash, nil
}
39 changes: 39 additions & 0 deletions node/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package node_test

import (
"fmt"
"testing"

"github.com/pkg/errors"

"github.com/0glabs/0g-storage-client/node"
"gotest.tools/assert"
)

func extractRPCError(err error) *node.RPCError {
var rpcError *node.RPCError
if errors.As(err, &rpcError) {
return rpcError
}
return nil
}

func TestErrorAs(t *testing.T) {
err := fmt.Errorf("123")
assert.Equal(t, extractRPCError(err) == nil, true)
err = &node.RPCError{
Method: "test",
URL: "127.0.0.1:1234",
Message: "test error",
}
assert.DeepEqual(
t,
extractRPCError(errors.WithMessage(err, "failed to upload")),
err,
)
assert.DeepEqual(
t,
extractRPCError(errors.WithMessage(errors.WithMessage(err, "failed to upload segment"), "Failed to upload file")),
err,
)
}
4 changes: 2 additions & 2 deletions tests/go_tests/indexer_test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func runTest() error {
if err != nil {
return errors.WithMessage(err, "failed to initialize indexer client")
}
if err := indexerClient.Upload(ctx, w3client, data, transfer.UploadOption{
if _, err := indexerClient.Upload(ctx, w3client, data, transfer.UploadOption{
FinalityRequired: true,
}); err != nil {
return errors.WithMessage(err, "failed to upload file")
Expand Down Expand Up @@ -75,7 +75,7 @@ func runTest() error {
return errors.WithMessage(err, "failed to initialize uploader")
}

if err := uploader.Upload(context.Background(), data, transfer.UploadOption{
if _, err := uploader.Upload(context.Background(), data, transfer.UploadOption{
FinalityRequired: true,
}); err != nil {
return errors.WithMessage(err, "failed to upload file")
Expand Down
29 changes: 16 additions & 13 deletions transfer/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,12 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable
if len(toSubmitDatas) > 0 {
var err error
if txHash, receipt, err = uploader.SubmitLogEntry(ctx, toSubmitDatas, toSubmitTags, waitForLogEntry, opts.Nonce, opts.Fee); err != nil {
return common.Hash{}, nil, errors.WithMessage(err, "Failed to submit log entry")
return txHash, nil, errors.WithMessage(err, "Failed to submit log entry")
}
if waitForLogEntry {
// Wait for storage node to retrieve log entry from blockchain
if err := uploader.waitForLogEntry(ctx, lastTreeToSubmit.Root(), false, receipt); err != nil {
return common.Hash{}, nil, errors.WithMessage(err, "Failed to check if log entry available on storage node")
return txHash, nil, errors.WithMessage(err, "Failed to check if log entry available on storage node")
}
}
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable
close(errs)
for e := range errs {
if e != nil {
return common.Hash{}, nil, e
return txHash, nil, e
}
}
errs = make(chan error, opts.TaskSize)
Expand All @@ -275,7 +275,8 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable
}

// Upload submit data to 0g storage contract, then transfer the data to the storage nodes.
func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, option ...UploadOption) error {
// returns the submission transaction hash and the hash will be zero if transaction is skipped.
func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, option ...UploadOption) (common.Hash, error) {
stageTimer := time.Now()

var opt UploadOption
Expand All @@ -292,21 +293,23 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op
// Calculate file merkle root.
tree, err := core.MerkleTree(data)
if err != nil {
return errors.WithMessage(err, "Failed to create data merkle tree")
return common.Hash{}, errors.WithMessage(err, "Failed to create data merkle tree")
}
uploader.logger.WithField("root", tree.Root()).Info("Data merkle root calculated")

// Check existance
exist, err := uploader.checkLogExistance(ctx, tree.Root())
if err != nil {
return errors.WithMessage(err, "Failed to check if skipped log entry available on storage node")
return common.Hash{}, errors.WithMessage(err, "Failed to check if skipped log entry available on storage node")
}
txHash := common.Hash{}
// Append log on blockchain
if !opt.SkipTx || !exist {
var receipt *types.Receipt

if _, receipt, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, [][]byte{opt.Tags}, true, opt.Nonce, opt.Fee); err != nil {
return errors.WithMessage(err, "Failed to submit log entry")
txHash, receipt, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, [][]byte{opt.Tags}, true, opt.Nonce, opt.Fee)
if err != nil {
return txHash, errors.WithMessage(err, "Failed to submit log entry")
}

// For small data, could upload file to storage node immediately.
Expand All @@ -317,24 +320,24 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op
} else {
// Wait for storage node to retrieve log entry from blockchain
if err = uploader.waitForLogEntry(ctx, tree.Root(), false, receipt); err != nil {
return errors.WithMessage(err, "Failed to check if log entry available on storage node")
return txHash, errors.WithMessage(err, "Failed to check if log entry available on storage node")
}
}
}

// Upload file to storage node
if err = uploader.uploadFile(ctx, data, tree, opt.ExpectedReplica, opt.TaskSize); err != nil {
return errors.WithMessage(err, "Failed to upload file")
if err := uploader.uploadFile(ctx, data, tree, opt.ExpectedReplica, opt.TaskSize); err != nil {
return txHash, errors.WithMessage(err, "Failed to upload file")
}

// Wait for transaction finality
if err = uploader.waitForLogEntry(ctx, tree.Root(), opt.FinalityRequired, nil); err != nil {
return errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
return txHash, errors.WithMessage(err, "Failed to wait for transaction finality on storage node")
}

uploader.logger.WithField("duration", time.Since(stageTimer)).Info("upload took")

return nil
return txHash, nil
}

// SubmitLogEntry submit the data to 0g storage contract by sending a transaction
Expand Down

0 comments on commit 38cc4c7

Please sign in to comment.