Skip to content

Commit

Permalink
refactor: visibility, add comments (#22)
Browse files Browse the repository at this point in the history
* refactor: shard, check shard coverage on upload

* feat: indexer upload api

* feat: expectedReplica

* refactor: cli

* feat: skipTx

* fix: cli

* doc: readme

* fix: cli argument description

* fix: remove unused params

* refactor: visibility, add comments
  • Loading branch information
MiniFrenchBread authored Jul 16, 2024
1 parent 004a858 commit 04fb59a
Show file tree
Hide file tree
Showing 24 changed files with 265 additions and 152 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ Go implementation for client to interact with storage nodes in 0G Storage networ

# SDK

Application could use a `node/Client` instance to interact with storage node via JSON RPC. Especially, use `Client.KV()` for **KV** operations.
The following packages can help applications integrate with the 0g storage network:

**[transfer](transfer)** : defines data structures and functions for transferring data between local and 0g storage.
**[kv]**: defines structures to interact with 0g storage kv.
**[indexer]**: selects storage nodes for data upload from the indexer, which maintains a trusted node list.
**[node]**: defines RPC client structures to facilitate RPC interactions with 0g storage nodes and 0g key-value (KV) nodes.

# CLI
Run `go build` under the root folder to compile the executable binary.
Expand Down
5 changes: 5 additions & 0 deletions core/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
EmptyChunkHash = crypto.Keccak256Hash(EmptyChunk)
)

// IterableData defines the interface for data to be uploaded to the 0g storage network.
type IterableData interface {
NumChunks() uint64
NumSegments() uint64
Expand All @@ -36,6 +37,7 @@ type IterableData interface {
Read(buf []byte, offset int64) (int, error)
}

// MerkleTree creates the merkle tree of the data.
func MerkleTree(data IterableData) (*merkle.Tree, error) {
var builder merkle.TreeBuilder
initializer := &TreeBuilderInitializer{
Expand All @@ -57,10 +59,12 @@ func NumSplits(total int64, unit int) uint64 {
return uint64((total-1)/int64(unit) + 1)
}

// NumSegmentsPadded returns the number of segments occupied by the data after padding.
func NumSegmentsPadded(data IterableData) int {
	// Ceiling division: a partial trailing segment still counts as a whole segment.
	paddedSize := data.PaddedSize()
	return int((paddedSize-1)/DefaultSegmentSize + 1)
}

// SegmentRoot returns the merkle root of the given chunks.
func SegmentRoot(chunks []byte, emptyChunksPadded ...uint64) common.Hash {
var builder merkle.TreeBuilder

Expand Down Expand Up @@ -90,6 +94,7 @@ func paddingZeros(buf []byte, startOffset int, length int) {
}
}

// ReadAt reads readSize bytes of data at the specified offset; paddedSize is the size of the data after padding.
func ReadAt(data IterableData, readSize int, offset int64, paddedSize uint64) ([]byte, error) {
// Reject invalid offset
if offset < 0 || uint64(offset) >= paddedSize {
Expand Down
2 changes: 2 additions & 0 deletions core/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package core defines structures to hold data and related functions.
package core
2 changes: 2 additions & 0 deletions core/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
ErrFileEmpty = errors.New("file is empty")
)

// File is an implementation of IterableData backed by a file on disk.
type File struct {
os.FileInfo
underlying *os.File
Expand Down Expand Up @@ -46,6 +47,7 @@ func Exists(name string) (bool, error) {
return true, nil
}

// Open creates a File from a file on disk.
func Open(name string) (*File, error) {
file, err := os.Open(name)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions core/in_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package core

import "errors"

// DataInMemory is an implementation of IterableData backed by an in-memory byte slice.
type DataInMemory struct {
underlying []byte // the raw data
paddedSize uint64 // size of the data after padding — presumably what PaddedSize() reports; confirm against the full file
}

var _ IterableData = (*DataInMemory)(nil)

// NewDataInMemory creates a DataInMemory from the given data.
func NewDataInMemory(data []byte) (*DataInMemory, error) {
if len(data) == 0 {
return nil, errors.New("data is empty")
Expand Down
3 changes: 3 additions & 0 deletions indexer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ import (
// Requires `indexerApi` implements the `Interface` interface.
var _ Interface = (*IndexerApi)(nil)

// IndexerApi implements the indexer service API, backed by a set of storage node clients.
type IndexerApi struct {
Namespace string // RPC namespace exposed by the service
nodes []*node.Client // trusted storage nodes served by this indexer
}

// NewIndexerApi creates the indexer service API over the given storage node clients.
func NewIndexerApi(nodes []*node.Client) *IndexerApi {
	api := &IndexerApi{
		Namespace: "indexer",
		nodes:     nodes,
	}
	return api
}

// GetNodes returns the storage node list.
func (api *IndexerApi) GetNodes(ctx context.Context) ([]shard.ShardedNode, error) {
var result []shard.ShardedNode

Expand Down
10 changes: 9 additions & 1 deletion indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ import (
// Requires `Client` implements the `Interface` interface.
var _ Interface = (*Client)(nil)

// Client is an indexer RPC client.
type Client struct {
interfaces.Provider // underlying RPC provider used to call the indexer service
option IndexerClientOption
}

// IndexerClientOption defines the options for an indexer client.
type IndexerClientOption struct {
ProviderOption providers.Option
LogOption common.LogOption
LogOption common.LogOption // log option when uploading data
}

// NewClient creates a new indexer client; url is the indexer service URL.
func NewClient(url string, option ...IndexerClientOption) (*Client, error) {
var opt IndexerClientOption
if len(option) > 0 {
Expand All @@ -46,11 +49,13 @@ func NewClient(url string, option ...IndexerClientOption) (*Client, error) {
}, nil
}

// GetNodes fetches the storage node list from the indexer service via RPC.
func (c *Client) GetNodes(ctx context.Context) (nodes []shard.ShardedNode, err error) {
	err = c.Provider.CallContext(ctx, &nodes, "indexer_getNodes")
	return nodes, err
}

// SelectNodes gets the node list from the indexer service and selects a subset of it that is sufficient to store the expected number of replicas.
func (c *Client) SelectNodes(ctx context.Context, expectedReplica uint) ([]*node.Client, error) {
nodes, err := c.GetNodes(ctx)
if err != nil {
Expand All @@ -70,6 +75,7 @@ func (c *Client) SelectNodes(ctx context.Context, expectedReplica uint) ([]*node
return clients, nil
}

// NewUploaderFromIndexerNodes returns an uploader with storage nodes selected from the indexer service.
func (c *Client) NewUploaderFromIndexerNodes(ctx context.Context, flow *contract.FlowContract, expectedReplica uint) (*transfer.Uploader, error) {
clients, err := c.SelectNodes(ctx, expectedReplica)
if err != nil {
Expand All @@ -78,6 +84,7 @@ func (c *Client) NewUploaderFromIndexerNodes(ctx context.Context, flow *contract
return transfer.NewUploader(flow, clients, c.option.LogOption)
}

// Upload submits data to the 0g storage contract, then transfers the data to the storage nodes selected from the indexer service.
func (c *Client) Upload(ctx context.Context, flow *contract.FlowContract, data core.IterableData, option ...transfer.UploadOption) error {
expectedReplica := uint(1)
if len(option) > 0 {
Expand All @@ -90,6 +97,7 @@ func (c *Client) Upload(ctx context.Context, flow *contract.FlowContract, data c
return uploader.Upload(ctx, data, option...)
}

// BatchUpload submits multiple data entries to the 0g storage contract in a single on-chain transaction, then transfers the data to the storage nodes selected from the indexer service.
func (c *Client) BatchUpload(ctx context.Context, flow *contract.FlowContract, datas []core.IterableData, waitForLogEntry bool, option ...[]transfer.UploadOption) (eth_common.Hash, []eth_common.Hash, error) {
expectedReplica := uint(1)
if len(option) > 0 {
Expand Down
3 changes: 3 additions & 0 deletions indexer/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package indexer defines the indexer server and client.
// The indexer server is a service that maintains a trusted storage node list; clients can fetch the list from the server and select nodes from it to upload data.
package indexer
9 changes: 9 additions & 0 deletions kv/Iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ import (

var errIteratorInvalid = errors.New("iterator is invalid")

// Iterator iterates over the key-value pairs of a kv stream at a fixed version.
type Iterator struct {
client *Client // kv client used to fetch entries
streamId common.Hash // stream being iterated
version uint64 // version of the stream to read
currentPair *node.KeyValue // entry at the current position; nil when the iterator is invalid
}

// Valid reports whether the iterator is positioned on an existing key-value pair.
func (iter *Iterator) Valid() bool {
return iter.currentPair != nil
}

// KeyValue returns the key-value pair at the current position, or nil if the iterator is invalid.
func (iter *Iterator) KeyValue() *node.KeyValue {
return iter.currentPair
}
Expand All @@ -43,6 +46,7 @@ func (iter *Iterator) move(ctx context.Context, kv *node.KeyValue) error {
return nil
}

// SeekBefore seeks to the position before the given key (inclusive).
func (iter *Iterator) SeekBefore(ctx context.Context, key []byte) error {
kv, err := iter.client.GetPrev(ctx, iter.streamId, key, 0, 0, true, iter.version)
if err != nil {
Expand All @@ -51,6 +55,7 @@ func (iter *Iterator) SeekBefore(ctx context.Context, key []byte) error {
return iter.move(ctx, kv)
}

// SeekAfter seeks to the position after the given key (inclusive).
func (iter *Iterator) SeekAfter(ctx context.Context, key []byte) error {
kv, err := iter.client.GetNext(ctx, iter.streamId, key, 0, 0, true, iter.version)
if err != nil {
Expand All @@ -59,6 +64,7 @@ func (iter *Iterator) SeekAfter(ctx context.Context, key []byte) error {
return iter.move(ctx, kv)
}

// SeekToFirst seeks to the first position.
func (iter *Iterator) SeekToFirst(ctx context.Context) error {
kv, err := iter.client.GetFirst(ctx, iter.streamId, 0, 0, iter.version)
if err != nil {
Expand All @@ -67,6 +73,7 @@ func (iter *Iterator) SeekToFirst(ctx context.Context) error {
return iter.move(ctx, kv)
}

// SeekToLast seeks to the last position.
func (iter *Iterator) SeekToLast(ctx context.Context) error {
kv, err := iter.client.GetLast(ctx, iter.streamId, 0, 0, iter.version)
if err != nil {
Expand All @@ -75,6 +82,7 @@ func (iter *Iterator) SeekToLast(ctx context.Context) error {
return iter.move(ctx, kv)
}

// Next moves to the next position.
func (iter *Iterator) Next(ctx context.Context) error {
if !iter.Valid() {
return errIteratorInvalid
Expand All @@ -86,6 +94,7 @@ func (iter *Iterator) Next(ctx context.Context) error {
return iter.move(ctx, kv)
}

// Prev moves to the previous position.
func (iter *Iterator) Prev(ctx context.Context) error {
if !iter.Valid() {
return errIteratorInvalid
Expand Down
14 changes: 8 additions & 6 deletions kv/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,27 @@ import (
"github.com/sirupsen/logrus"
)

// Batcher caches KV write and access control operations and executes them in batch.
type Batcher struct {
*StreamDataBuilder
*streamDataBuilder
clients []*node.Client
flow *contract.FlowContract
logger *logrus.Logger
}

// NewBatcher initializes a new batcher. version denotes the expected version of keys to read or write when the cached KV operations are settled on chain.
func NewBatcher(version uint64, clients []*node.Client, flow *contract.FlowContract, opts ...zg_common.LogOption) *Batcher {
return &Batcher{
StreamDataBuilder: NewStreamDataBuilder(version),
streamDataBuilder: newStreamDataBuilder(version),
clients: clients,
flow: flow,
logger: zg_common.NewLogger(opts...),
}
}

// Exec submit the kv operations to ZeroGStorage network in batch.
//
// Note, this is a time consuming operation, e.g. several seconds or even longer.
// Exec serializes the cached KV operations in the Batcher, then submits the serialized data to the 0g storage network.
// The submission process is the same as uploading a normal file. The batcher should be dropped after execution.
// Note, this may be a time-consuming operation, e.g. several seconds or even longer.
// When it comes to a time-sensitive context, it should be executed in a separate go-routine.
func (b *Batcher) Exec(ctx context.Context, option ...transfer.UploadOption) error {
// build stream data
Expand All @@ -57,7 +59,7 @@ func (b *Batcher) Exec(ctx context.Context, option ...transfer.UploadOption) err
if len(option) > 0 {
opt = option[0]
}
opt.Tags = b.BuildTags()
opt.Tags = b.buildTags()
if err = uploader.Upload(ctx, data, opt); err != nil {
return errors.WithMessagef(err, "Failed to upload data")
}
Expand Down
Loading

0 comments on commit 04fb59a

Please sign in to comment.