From 38cc4c7b62ead3732c19442035f36852c10d1562 Mon Sep 17 00:00:00 2001 From: MiniFrenchBread <103425574+MiniFrenchBread@users.noreply.github.com> Date: Wed, 4 Sep 2024 13:54:18 +0800 Subject: [PATCH] feat: add tx hash in return values of upload apis (#52) --- cmd/kv_write.go | 3 ++- cmd/upload.go | 4 +-- example/kv_put/main.go | 2 +- gateway/local_apis.go | 2 +- indexer/client.go | 8 +++--- kv/batcher.go | 18 +++++++------ node/errors_test.go | 39 +++++++++++++++++++++++++++++ tests/go_tests/indexer_test/main.go | 4 +-- transfer/uploader.go | 29 +++++++++++---------- 9 files changed, 77 insertions(+), 32 deletions(-) create mode 100644 node/errors_test.go diff --git a/cmd/kv_write.go b/cmd/kv_write.go index 0539c7f..f25f8a7 100644 --- a/cmd/kv_write.go +++ b/cmd/kv_write.go @@ -151,7 +151,8 @@ func kvWrite(*cobra.Command, []string) { ) } - if err := batcher.Exec(ctx, opt); err != nil { + _, err := batcher.Exec(ctx, opt) + if err != nil { logrus.WithError(err).Fatal("fail to execute kv batch") } } diff --git a/cmd/upload.go b/cmd/upload.go index b5dbeb6..ca6845a 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -119,7 +119,7 @@ func upload(*cobra.Command, []string) { if err != nil { logrus.WithError(err).Fatal("Failed to initialize indexer client") } - if err := indexerClient.Upload(ctx, w3client, file, opt); err != nil { + if _, err := indexerClient.Upload(ctx, w3client, file, opt); err != nil { logrus.WithError(err).Fatal("Failed to upload file") } return @@ -135,7 +135,7 @@ func upload(*cobra.Command, []string) { logrus.WithError(err).Fatal("Failed to initialize uploader") } - if err := uploader.Upload(ctx, file, opt); err != nil { + if _, err := uploader.Upload(ctx, file, opt); err != nil { logrus.WithError(err).Fatal("Failed to upload file") } } diff --git a/example/kv_put/main.go b/example/kv_put/main.go index 97d0266..635adc4 100644 --- a/example/kv_put/main.go +++ b/example/kv_put/main.go @@ -34,7 +34,7 @@ func main() { []byte("TESTKEY1"), []byte{74, 75, 76, 77, 78}, ) - err = batcher.Exec(context.Background()) + _, err = batcher.Exec(context.Background()) if err != nil { fmt.Println(err) return diff --git a/gateway/local_apis.go b/gateway/local_apis.go index aeb02b3..a4f730b 100644 --- a/gateway/local_apis.go +++ b/gateway/local_apis.go @@ -131,7 +131,7 @@ func uploadLocalFile(c *gin.Context) (interface{}, error) { } defer file.Close() - if err := uploader.Upload(context.Background(), file); err != nil { + if _, err := uploader.Upload(context.Background(), file); err != nil { return nil, err } diff --git a/indexer/client.go b/indexer/client.go index 597b646..8d7e7c0 100644 --- a/indexer/client.go +++ b/indexer/client.go @@ -133,7 +133,7 @@ func (c *Client) NewUploaderFromIndexerNodes(ctx context.Context, w3Client *web3 } // Upload submit data to 0g storage contract, then transfer the data to the storage nodes selected from indexer service. -func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core.IterableData, option ...transfer.UploadOption) error { +func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core.IterableData, option ...transfer.UploadOption) (eth_common.Hash, error) { expectedReplica := uint(1) if len(option) > 0 { expectedReplica = max(expectedReplica, option[0].ExpectedReplica) @@ -142,15 +142,15 @@ func (c *Client) Upload(ctx context.Context, w3Client *web3go.Client, data core. for { uploader, err := c.NewUploaderFromIndexerNodes(ctx, w3Client, expectedReplica, dropped) if err != nil { - return err + return eth_common.Hash{}, err } - err = uploader.Upload(ctx, data, option...) + txHash, err := uploader.Upload(ctx, data, option...) var rpcError *node.RPCError if errors.As(err, &rpcError) { dropped = append(dropped, rpcError.URL) c.logger.Infof("dropped problematic node and retry: %v", rpcError.Error()) } else { - return err + return txHash, err } } } diff --git a/kv/batcher.go b/kv/batcher.go index 12bbacd..4b191da 100644 --- a/kv/batcher.go +++ b/kv/batcher.go @@ -7,6 +7,7 @@ import ( "github.com/0glabs/0g-storage-client/core" "github.com/0glabs/0g-storage-client/node" "github.com/0glabs/0g-storage-client/transfer" + "github.com/ethereum/go-ethereum/common" "github.com/openweb3/web3go" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -34,34 +35,35 @@ func NewBatcher(version uint64, clients []*node.ZgsClient, w3Client *web3go.Clie // The submission process is the same as uploading a normal file. The batcher should be dropped after execution. // Note, this may be time consuming operation, e.g. several seconds or even longer. // When it comes to a time sentitive context, it should be executed in a separate go-routine. -func (b *Batcher) Exec(ctx context.Context, option ...transfer.UploadOption) error { +func (b *Batcher) Exec(ctx context.Context, option ...transfer.UploadOption) (common.Hash, error) { // build stream data streamData, err := b.Build() if err != nil { - return errors.WithMessage(err, "Failed to build stream data") + return common.Hash{}, errors.WithMessage(err, "Failed to build stream data") } encoded, err := streamData.Encode() if err != nil { - return errors.WithMessage(err, "Failed to encode data") + return common.Hash{}, errors.WithMessage(err, "Failed to encode data") } data, err := core.NewDataInMemory(encoded) if err != nil { - return err + return common.Hash{}, err } // upload file uploader, err := transfer.NewUploader(ctx, b.w3Client, b.clients, zg_common.LogOption{Logger: b.logger}) if err != nil { - return err + return common.Hash{}, err } var opt transfer.UploadOption if len(option) > 0 { opt = option[0] } opt.Tags = b.buildTags() - if err = uploader.Upload(ctx, data, opt); err != nil { - return errors.WithMessagef(err, "Failed to upload data") + txHash, err := uploader.Upload(ctx, data, opt) + if err != nil { + return txHash, errors.WithMessagef(err, "Failed to upload data") } - return nil + return txHash, nil } diff --git a/node/errors_test.go b/node/errors_test.go new file mode 100644 index 0000000..1c545a1 --- /dev/null +++ b/node/errors_test.go @@ -0,0 +1,39 @@ +package node_test + +import ( + "fmt" + "testing" + + "github.com/pkg/errors" + + "github.com/0glabs/0g-storage-client/node" + "gotest.tools/assert" +) + +func extractRPCError(err error) *node.RPCError { + var rpcError *node.RPCError + if errors.As(err, &rpcError) { + return rpcError + } + return nil +} + +func TestErrorAs(t *testing.T) { + err := fmt.Errorf("123") + assert.Equal(t, extractRPCError(err) == nil, true) + err = &node.RPCError{ + Method: "test", + URL: "127.0.0.1:1234", + Message: "test error", + } + assert.DeepEqual( + t, + extractRPCError(errors.WithMessage(err, "failed to upload")), + err, + ) + assert.DeepEqual( + t, + extractRPCError(errors.WithMessage(errors.WithMessage(err, "failed to upload segment"), "Failed to upload file")), + err, + ) +} diff --git a/tests/go_tests/indexer_test/main.go b/tests/go_tests/indexer_test/main.go index a527b2b..1599dcf 100644 --- a/tests/go_tests/indexer_test/main.go +++ b/tests/go_tests/indexer_test/main.go @@ -39,7 +39,7 @@ func runTest() error { if err != nil { return errors.WithMessage(err, "failed to initialize indexer client") } - if err := indexerClient.Upload(ctx, w3client, data, transfer.UploadOption{ + if _, err := indexerClient.Upload(ctx, w3client, data, transfer.UploadOption{ FinalityRequired: true, }); err != nil { return errors.WithMessage(err, "failed to upload file") @@ -75,7 +75,7 @@ func runTest() error { return errors.WithMessage(err, "failed to initialize uploader") } - if err := uploader.Upload(context.Background(), data, transfer.UploadOption{ + if _, err := uploader.Upload(context.Background(), data, transfer.UploadOption{ FinalityRequired: true, }); err != nil { return errors.WithMessage(err, "failed to upload file") diff --git a/transfer/uploader.go b/transfer/uploader.go index 2c5a62e..7dec5a9 100644 --- a/transfer/uploader.go +++ b/transfer/uploader.go @@ -228,12 +228,12 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable if len(toSubmitDatas) > 0 { var err error if txHash, receipt, err = uploader.SubmitLogEntry(ctx, toSubmitDatas, toSubmitTags, waitForLogEntry, opts.Nonce, opts.Fee); err != nil { - return common.Hash{}, nil, errors.WithMessage(err, "Failed to submit log entry") + return txHash, nil, errors.WithMessage(err, "Failed to submit log entry") } if waitForLogEntry { // Wait for storage node to retrieve log entry from blockchain if err := uploader.waitForLogEntry(ctx, lastTreeToSubmit.Root(), false, receipt); err != nil { - return common.Hash{}, nil, errors.WithMessage(err, "Failed to check if log entry available on storage node") + return txHash, nil, errors.WithMessage(err, "Failed to check if log entry available on storage node") } } } @@ -262,7 +262,7 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable close(errs) for e := range errs { if e != nil { - return common.Hash{}, nil, e + return txHash, nil, e } } errs = make(chan error, opts.TaskSize) @@ -275,7 +275,8 @@ func (uploader *Uploader) BatchUpload(ctx context.Context, datas []core.Iterable } // Upload submit data to 0g storage contract, then transfer the data to the storage nodes. -func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, option ...UploadOption) error { +// returns the submission transaction hash and the hash will be zero if transaction is skipped. +func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, option ...UploadOption) (common.Hash, error) { stageTimer := time.Now() var opt UploadOption @@ -292,21 +293,23 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op // Calculate file merkle root. tree, err := core.MerkleTree(data) if err != nil { - return errors.WithMessage(err, "Failed to create data merkle tree") + return common.Hash{}, errors.WithMessage(err, "Failed to create data merkle tree") } uploader.logger.WithField("root", tree.Root()).Info("Data merkle root calculated") // Check existance exist, err := uploader.checkLogExistance(ctx, tree.Root()) if err != nil { - return errors.WithMessage(err, "Failed to check if skipped log entry available on storage node") + return common.Hash{}, errors.WithMessage(err, "Failed to check if skipped log entry available on storage node") } + txHash := common.Hash{} // Append log on blockchain if !opt.SkipTx || !exist { var receipt *types.Receipt - if _, receipt, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, [][]byte{opt.Tags}, true, opt.Nonce, opt.Fee); err != nil { - return errors.WithMessage(err, "Failed to submit log entry") + txHash, receipt, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, [][]byte{opt.Tags}, true, opt.Nonce, opt.Fee) + if err != nil { + return txHash, errors.WithMessage(err, "Failed to submit log entry") } // For small data, could upload file to storage node immediately. @@ -317,24 +320,24 @@ func (uploader *Uploader) Upload(ctx context.Context, data core.IterableData, op } else { // Wait for storage node to retrieve log entry from blockchain if err = uploader.waitForLogEntry(ctx, tree.Root(), false, receipt); err != nil { - return errors.WithMessage(err, "Failed to check if log entry available on storage node") + return txHash, errors.WithMessage(err, "Failed to check if log entry available on storage node") } } } // Upload file to storage node - if err = uploader.uploadFile(ctx, data, tree, opt.ExpectedReplica, opt.TaskSize); err != nil { - return errors.WithMessage(err, "Failed to upload file") + if err := uploader.uploadFile(ctx, data, tree, opt.ExpectedReplica, opt.TaskSize); err != nil { + return txHash, errors.WithMessage(err, "Failed to upload file") } // Wait for transaction finality if err = uploader.waitForLogEntry(ctx, tree.Root(), opt.FinalityRequired, nil); err != nil { - return errors.WithMessage(err, "Failed to wait for transaction finality on storage node") + return txHash, errors.WithMessage(err, "Failed to wait for transaction finality on storage node") } uploader.logger.WithField("duration", time.Since(stageTimer)).Info("upload took") - return nil + return txHash, nil } // SubmitLogEntry submit the data to 0g storage contract by sending a transaction