Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add logger, context; fix: download failure #16

Merged
merged 5 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/download.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cmd

import (
"context"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -37,12 +40,12 @@ func init() {
func download(*cobra.Command, []string) {
nodes := node.MustNewClients(downloadArgs.nodes)

downloader, err := transfer.NewDownloader(nodes)
downloader, err := transfer.NewDownloader(nodes, common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize downloader")
}

if err := downloader.Download(downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
if err := downloader.Download(context.Background(), downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
}
}
7 changes: 5 additions & 2 deletions cmd/upload.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cmd

import (
"context"

zg_common "github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/0glabs/0g-storage-client/contract"
"github.com/0glabs/0g-storage-client/core"
Expand Down Expand Up @@ -68,7 +71,7 @@ func upload(*cobra.Command, []string) {
defer client.Close()
}

uploader, err := transfer.NewUploader(flow, clients)
uploader, err := transfer.NewUploader(flow, clients, zg_common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
Expand All @@ -84,7 +87,7 @@ func upload(*cobra.Command, []string) {
}
defer file.Close()

if err := uploader.Upload(file, opt); err != nil {
if err := uploader.Upload(context.Background(), file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
}
3 changes: 0 additions & 3 deletions common/blockchain/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/openweb3/web3go"
"github.com/openweb3/web3go/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var CustomGasPrice uint64
Expand Down Expand Up @@ -50,8 +49,6 @@ func Deploy(clientWithSigner *web3go.Client, dataOrFile string) (common.Address,
return common.Address{}, errors.WithMessage(err, "Failed to send transaction")
}

logrus.WithField("hash", txHash).Info("Transaction sent to blockchain")

receipt, err := WaitForReceipt(clientWithSigner, txHash, true)
if err != nil {
return common.Address{}, errors.WithMessage(err, "Failed to wait for receipt")
Expand Down
7 changes: 4 additions & 3 deletions common/blockchain/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var Web3LogEnabled bool
type RetryOption struct {
Rounds uint
Interval time.Duration
logger *logrus.Logger
}

func MustNewWeb3(url, key string) *web3go.Client {
Expand Down Expand Up @@ -63,13 +64,13 @@ func WaitForReceipt(client *web3go.Client, txHash common.Hash, successRequired b
if len(opts) > 0 {
opt = opts[0]
} else {
// default infinite wait
opt.Rounds = 0
// default 10 rounds
opt.Rounds = 10
opt.Interval = time.Second * 3
}

var tries uint
reminder := util.NewReminder(logrus.TraceLevel, time.Minute)
reminder := util.NewReminder(opt.logger, time.Minute)
for receipt == nil {
if tries > opt.Rounds+1 && opt.Rounds != 0 {
return nil, errors.New("no receipt after max retries")
Expand Down
25 changes: 25 additions & 0 deletions common/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package common

import (
"io"

"github.com/sirupsen/logrus"
)

type LogOption struct {
LogLevel logrus.Level
Logger *logrus.Logger
}

func NewLogger(opt ...LogOption) *logrus.Logger {
logger := logrus.New()
if len(opt) == 0 {
logger.Out = io.Discard
return logger
}
if opt[0].Logger != nil {
return opt[0].Logger
}
logger.SetLevel(opt[0].LogLevel)
return logger
}
4 changes: 3 additions & 1 deletion common/parallel/interface.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package parallel

import "context"

type Result struct {
Routine int
Task int
Expand All @@ -8,6 +10,6 @@ type Result struct {
}

type Interface interface {
ParallelDo(routine, task int) (interface{}, error)
ParallelDo(ctx context.Context, routine, task int) (interface{}, error)
ParallelCollect(result *Result) error
}
6 changes: 3 additions & 3 deletions common/parallel/serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
)

func Serial(parallelizable Interface, tasks, routines, window int) error {
func Serial(ctx context.Context, parallelizable Interface, tasks, routines, window int) error {
if tasks == 0 {
return nil
}
Expand All @@ -29,7 +29,7 @@ func Serial(parallelizable Interface, tasks, routines, window int) error {
defer close(resultCh)

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)

// start routines to do tasks
for i := 0; i < routines; i++ {
Expand All @@ -56,7 +56,7 @@ func work(ctx context.Context, routine int, parallelizable Interface, taskCh <-c
case <-ctx.Done():
return
case task := <-taskCh:
val, err := parallelizable.ParallelDo(routine, task)
val, err := parallelizable.ParallelDo(ctx, routine, task)
resultCh <- &Result{routine, task, val, err}
if err != nil {
return
Expand Down
5 changes: 3 additions & 2 deletions common/parallel/serial_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package parallel

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -11,7 +12,7 @@ type foo struct {
result []int
}

func (f *foo) ParallelDo(routine, task int) (interface{}, error) {
func (f *foo) ParallelDo(ctx context.Context, routine, task int) (interface{}, error) {
return task * task, nil
}

Expand All @@ -30,7 +31,7 @@ func TestSerial(t *testing.T) {

tasks := 100

err := Serial(&f, tasks, 4, 16)
err := Serial(context.Background(), &f, tasks, 4, 16)
assert.Nil(t, err)
assert.Equal(t, tasks, len(f.result))

Expand Down
24 changes: 13 additions & 11 deletions common/util/reminder.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
package util

import (
"io"
"time"

"github.com/sirupsen/logrus"
)

// Reminder is used for time consuming operations to remind user about progress.
type Reminder struct {
start time.Time // start time since last warn
interval time.Duration // interval to warn once
level logrus.Level // log level to remind in general
start time.Time // start time since last warn
interval time.Duration // interval to warn once
logger *logrus.Logger // log level to remind in general
}

// NewReminder returns a new Reminder instance.
//
// `level`: log level to remind in general.
//
// `interval`: interval to remind in warning level.
func NewReminder(level logrus.Level, interval time.Duration) *Reminder {
if level < logrus.InfoLevel {
panic("invalid log level to remind in general")
func NewReminder(logger *logrus.Logger, interval time.Duration) *Reminder {
if logger == nil {
logger = logrus.New()
logger.Out = io.Discard
}

return &Reminder{
start: time.Now(),
interval: interval,
level: level,
logger: logger,
}
}

Expand All @@ -40,15 +42,15 @@ func (reminder *Reminder) Remind(message string, fields ...logrus.Fields) {
if time.Since(reminder.start) > reminder.interval {
reminder.remind(logrus.WarnLevel, message, fields...)
reminder.start = time.Now()
} else if logrus.IsLevelEnabled(reminder.level) {
reminder.remind(reminder.level, message, fields...)
} else {
reminder.remind(reminder.logger.Level, message, fields...)
}
}

func (reminder *Reminder) remind(level logrus.Level, message string, fields ...logrus.Fields) {
if len(fields) > 0 {
logrus.WithFields(fields[0]).Log(level, message)
reminder.logger.WithFields(fields[0]).Log(level, message)
} else {
logrus.StandardLogger().Log(level, message)
reminder.logger.Log(level, message)
}
}
8 changes: 2 additions & 6 deletions core/dataflow.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package core

import (
"context"
"errors"
"runtime"
"time"

"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/core/merkle"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -38,7 +37,6 @@ type IterableData interface {
}

func MerkleTree(data IterableData) (*merkle.Tree, error) {
stageTimer := time.Now()
var builder merkle.TreeBuilder
initializer := &TreeBuilderInitializer{
data: data,
Expand All @@ -47,13 +45,11 @@ func MerkleTree(data IterableData) (*merkle.Tree, error) {
builder: &builder,
}

err := parallel.Serial(initializer, NumSegmentsPadded(data), runtime.GOMAXPROCS(0), 0)
err := parallel.Serial(context.Background(), initializer, NumSegmentsPadded(data), runtime.GOMAXPROCS(0), 0)
if err != nil {
return nil, err
}

logrus.WithField("duration", time.Since(stageTimer)).Info("create segment root took")

return builder.Build(), nil
}

Expand Down
15 changes: 8 additions & 7 deletions core/flow.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package core

import (
"context"
"math"
"math/big"
"runtime"
"time"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/contract"
"github.com/0glabs/0g-storage-client/core/merkle"
Expand All @@ -15,10 +16,12 @@ import (
type Flow struct {
data IterableData
tags []byte

logger *logrus.Logger
}

func NewFlow(data IterableData, tags []byte) *Flow {
return &Flow{data, tags}
func NewFlow(data IterableData, tags []byte, opts ...common.LogOption) *Flow {
return &Flow{data: data, tags: tags, logger: common.NewLogger(opts...)}
}

func (flow *Flow) CreateSubmission() (*contract.Submission, error) {
Expand All @@ -28,7 +31,6 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) {
Tags: flow.tags,
}

stageTimer := time.Now()
var offset int64
for _, chunks := range flow.splitNodes() {
node, err := flow.createNode(offset, chunks)
Expand All @@ -38,7 +40,6 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) {
submission.Nodes = append(submission.Nodes, *node)
offset += chunks * DefaultChunkSize
}
logrus.WithField("duration", time.Since(stageTimer)).Info("create submission nodes took")

return &submission, nil
}
Expand Down Expand Up @@ -88,7 +89,7 @@ func (flow *Flow) splitNodes() []int64 {
}
nextChunkSize /= 2
}
logrus.WithFields(logrus.Fields{
flow.logger.WithFields(logrus.Fields{
"chunks": chunks,
"nodeSize": nodes,
}).Debug("SplitNodes")
Expand All @@ -114,7 +115,7 @@ func (flow *Flow) createSegmentNode(offset, batch, size int64) (*contract.Submis
builder: &builder,
}

err := parallel.Serial(initializer, int((size-1)/batch+1), runtime.GOMAXPROCS(0), 0)
err := parallel.Serial(context.Background(), initializer, int((size-1)/batch+1), runtime.GOMAXPROCS(0), 0)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion core/tree_builder_initializer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package core

import (
"context"

"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/core/merkle"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,7 +24,7 @@ func (t *TreeBuilderInitializer) ParallelCollect(result *parallel.Result) error
}

// ParallelDo implements parallel.Interface.
func (t *TreeBuilderInitializer) ParallelDo(routine int, task int) (interface{}, error) {
func (t *TreeBuilderInitializer) ParallelDo(ctx context.Context, routine int, task int) (interface{}, error) {
offset := t.offset + int64(task)*t.batch
buf, err := ReadAt(t.data, int(t.batch), offset, t.data.PaddedSize())
if err != nil {
Expand Down
Loading
Loading