Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/sync): Implement state sync strategy #4425

Open
wants to merge 61 commits into
base: diego/warpsync/strategy
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
0dac186
feat(dot/sync): Implement warp sync strategy
dimartiro Oct 22, 2024
d8244fa
Add missing license
dimartiro Oct 22, 2024
3a9b827
Add result method in strategies and switch logic
dimartiro Oct 24, 2024
5b0e502
Fix tests
dimartiro Oct 24, 2024
85ff377
Small fixes and first tests
dimartiro Oct 28, 2024
17aa919
Modify current strategy selection
dimartiro Oct 28, 2024
11aac02
Share peer view set between strategies
dimartiro Oct 28, 2024
2357b24
Add block announce tests
dimartiro Oct 28, 2024
f0a4f15
Add block announce handshake tests
dimartiro Oct 28, 2024
d701d32
Change comment
dimartiro Oct 28, 2024
6f336d6
Fix lint
dimartiro Oct 29, 2024
67bb2f8
Add CLI flag for sync mode
dimartiro Oct 29, 2024
2c3220c
Lint
dimartiro Oct 29, 2024
3d164da
Lint
dimartiro Oct 29, 2024
aafd47a
Use right procotol id for sync
dimartiro Oct 31, 2024
5ce24b3
Fix comment
dimartiro Oct 31, 2024
b41343b
Add warp sync message decode test
dimartiro Oct 31, 2024
46a565b
Add missing license
dimartiro Oct 31, 2024
e52181c
Remove import
dimartiro Oct 31, 2024
eed6979
Use right justification number for generic
dimartiro Nov 1, 2024
8d2d60c
Disable linting lll
dimartiro Nov 1, 2024
fa76dd1
Fix types
dimartiro Nov 1, 2024
f7c4516
Small fixes
dimartiro Nov 1, 2024
66d4722
Create warp sync provider interface
dimartiro Nov 1, 2024
1453a66
Remove unnecesary logs
dimartiro Nov 1, 2024
09c9a4a
Clean up, remove logs and local tests
dimartiro Nov 1, 2024
0289d61
Lint
dimartiro Nov 1, 2024
9839f0a
Lint
dimartiro Nov 1, 2024
47901df
Fix Test_PeerSupportsProtocol test
dimartiro Nov 4, 2024
65e4086
Remove TODO
dimartiro Nov 4, 2024
31beb8f
Fix findScheduledChange
dimartiro Nov 4, 2024
d8fc337
Improve logs and remove tests
dimartiro Nov 4, 2024
a2a557b
Improve logs
dimartiro Nov 4, 2024
9e336bd
Fix get target block
dimartiro Nov 5, 2024
e0cfc62
Update comment
dimartiro Nov 5, 2024
9ae044a
Remove comment
dimartiro Nov 5, 2024
306c84c
Refactor
dimartiro Nov 5, 2024
7a8d910
Update comment
dimartiro Nov 5, 2024
f89b54b
Add error message in panic
dimartiro Nov 5, 2024
f3b11b2
Fix get target block
dimartiro Nov 9, 2024
d245c46
Add warp sync proof fixture
dimartiro Dec 5, 2024
5cb57cf
Move types to warpsync package
dimartiro Dec 5, 2024
5ce032d
Remove unnused linter comment
dimartiro Dec 16, 2024
2402582
Improve logs
dimartiro Jan 7, 2025
d3313f8
Fix findScheduledChange
dimartiro Nov 4, 2024
6c107c1
Small changes
dimartiro Nov 11, 2024
14c9312
Retrieve state and store it in in memory trie
dimartiro Dec 11, 2024
3628a01
Store state in our db
dimartiro Dec 12, 2024
e39446a
Implement show metrics for state sync
dimartiro Dec 12, 2024
46f7b8a
Implement StateRequestProvider as part of sync package
dimartiro Dec 16, 2024
1d5c8ae
Replace retrieve state script provider with the one in sync pkg
dimartiro Dec 16, 2024
57ab3b8
Create and switch to state sync strategy
dimartiro Dec 19, 2024
ca41978
Fix response type
dimartiro Dec 19, 2024
a3bf542
Remove log
dimartiro Dec 19, 2024
941480d
Add unit tests for StateRequestProvider
dimartiro Dec 19, 2024
af91223
Remove TODO
dimartiro Dec 19, 2024
7e5d219
Improve sync service config and creation
dimartiro Dec 19, 2024
d6443a2
Improve logs
dimartiro Jan 8, 2025
09d6021
Fix lint
dimartiro Jan 8, 2025
b6a35fa
Fix tests
dimartiro Jan 8, 2025
661797c
Fix lint
dimartiro Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/kusama/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.SyncMode = cfg.FullSync

return config
}
1 change: 1 addition & 0 deletions chain/paseo/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.SyncMode = cfg.FullSync

return config
}
1 change: 1 addition & 0 deletions chain/polkadot/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.SyncMode = cfg.FullSync

return config
}
1 change: 1 addition & 0 deletions chain/westend-dev/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.RPC.UnsafeRPC = true
config.RPC.WSExternal = true
config.RPC.UnsafeWSExternal = true
config.Core.SyncMode = cfg.FullSync

return config
}
1 change: 1 addition & 0 deletions chain/westend-local/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func DefaultConfig() *cfg.Config {
config.RPC.UnsafeRPC = true
config.RPC.WSExternal = true
config.RPC.UnsafeWSExternal = true
config.Core.SyncMode = cfg.FullSync

return config
}
Expand Down
1 change: 1 addition & 0 deletions chain/westend/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.SyncMode = cfg.FullSync

return config
}
11 changes: 11 additions & 0 deletions cmd/gossamer/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var (
role string
// validator when set, the node will be an authority
validator bool
// Sync mode [warp | full]
syncMode string

// Account Config
// key to use for the node
Expand Down Expand Up @@ -102,6 +104,10 @@ Usage:
return fmt.Errorf("failed to parse role: %s", err)
}

if err := parseSyncMode(); err != nil {
return fmt.Errorf("failed to parse sync mode: %s", err)
}

if err := parseTelemetryURL(); err != nil {
return fmt.Errorf("failed to parse telemetry-url: %s", err.Error())
}
Expand Down Expand Up @@ -529,6 +535,11 @@ func addCoreFlags(cmd *cobra.Command) error {
return fmt.Errorf("failed to add --grandpa-interval flag: %s", err)
}

cmd.Flags().StringVar(&syncMode,
"sync",
cfg.FullSync.String(),
"Sync mode. One of 'full' or 'warp'.")

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions cmd/gossamer/commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,23 @@ func parseRole() error {
return nil
}

// parseSyncMode parses the sync mode from the command line flags
func parseSyncMode() error {
var selectedSyncMode cfg.SyncMode
switch syncMode {
case cfg.FullSync.String():
selectedSyncMode = cfg.FullSync
case cfg.WarpSync.String():
selectedSyncMode = cfg.WarpSync
default:
return fmt.Errorf("invalid sync mode: %s", role)
}

config.Core.SyncMode = selectedSyncMode
viper.Set("core.syncMode", config.Core.SyncMode)
return nil
}

// parseTelemetryURL parses the telemetry-url from the command line flag
func parseTelemetryURL() error {
if telemetryURLs == "" {
Expand Down
20 changes: 20 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (
DefaultSystemName = "Gossamer"
// DefaultSystemVersion is the default system version
DefaultSystemVersion = "0.0.0"

// DefaultSyncMode is the default block sync mode
DefaultSyncMode = FullSync
)

// DefaultRPCModules the default RPC modules
Expand Down Expand Up @@ -188,6 +191,7 @@ type CoreConfig struct {
GrandpaAuthority bool `mapstructure:"grandpa-authority"`
WasmInterpreter string `mapstructure:"wasm-interpreter,omitempty"`
GrandpaInterval time.Duration `mapstructure:"grandpa-interval,omitempty"`
SyncMode SyncMode `mapstructure:"sync,omitempty"`
}

// StateConfig contains the configuration for the state.
Expand Down Expand Up @@ -363,6 +367,7 @@ func DefaultConfig() *Config {
GrandpaAuthority: true,
WasmInterpreter: DefaultWasmInterpreter,
GrandpaInterval: DefaultDiscoveryInterval,
SyncMode: DefaultSyncMode,
},
Network: &NetworkConfig{
Port: DefaultNetworkPort,
Expand Down Expand Up @@ -444,6 +449,7 @@ func DefaultConfigFromSpec(nodeSpec *genesis.Genesis) *Config {
GrandpaAuthority: true,
WasmInterpreter: DefaultWasmInterpreter,
GrandpaInterval: DefaultDiscoveryInterval,
SyncMode: DefaultSyncMode,
},
Network: &NetworkConfig{
Port: DefaultNetworkPort,
Expand Down Expand Up @@ -525,6 +531,7 @@ func Copy(c *Config) Config {
GrandpaAuthority: c.Core.GrandpaAuthority,
WasmInterpreter: c.Core.WasmInterpreter,
GrandpaInterval: c.Core.GrandpaInterval,
SyncMode: c.Core.SyncMode,
},
Network: &NetworkConfig{
Port: c.Network.Port,
Expand Down Expand Up @@ -604,6 +611,19 @@ func (c Chain) String() string {
return string(c)
}

// SyncMode is a string representing a sync mode
type SyncMode string

const (
FullSync SyncMode = "full"
WarpSync SyncMode = "warp"
StateSync SyncMode = "state"
)

func (n SyncMode) String() string {
return string(n)
}

// NetworkRole is a string representing a network role
type NetworkRole string

Expand Down
2 changes: 1 addition & 1 deletion dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (d *discovery) advertise() {

ttl, err = d.rd.Advertise(d.ctx, string(d.pid))
if err != nil {
logger.Warnf("failed to advertise in the DHT: %s", err)
logger.Debugf("failed to advertise in the DHT: %s", err)
ttl = tryAdvertiseTimeout
}
}
Expand Down
12 changes: 11 additions & 1 deletion dot/network/host_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package network

import (
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -396,12 +397,21 @@ func Test_PeerSupportsProtocol(t *testing.T) {
}
require.NoError(t, err)

genesisHash := nodeA.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
fullSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, SyncID)
warpSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, WarpSyncID)

tests := []struct {
protocol protocol.ID
expect bool
}{
{
protocol: protocol.ID("/gossamer/test/0/sync/2"),
protocol: protocol.ID(fullSyncProtocolId),
expect: true,
},
{
protocol: protocol.ID(warpSyncProtocolId),
expect: true,
},
{
Expand Down
23 changes: 21 additions & 2 deletions dot/network/messages/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@ type StateRequest struct {
NoProof bool
}

func NewStateRequest(block common.Hash, start [][]byte, noProof bool) *StateRequest {
return &StateRequest{
Block: block,
Start: start,
NoProof: noProof,
}
}

func (s *StateRequest) String() string {
return fmt.Sprintf("StateRequest Block=%s Start=[0x%x, 0x%x] NoProof=%v",
return fmt.Sprintf("StateRequest Block=%s Start=[%v] NoProof=%v",
s.Block.String(),
s.Start[0], s.Start[1],
s.Start,
s.NoProof,
)
}
Expand Down Expand Up @@ -98,3 +106,14 @@ func (s *StateResponse) Decode(in []byte) error {

return nil
}

func (s *StateResponse) Encode() ([]byte, error) {
panic("not implemented")
}

func (s *StateResponse) String() string {
return fmt.Sprintf("StateResponse Entries=[%v] Proof=[%v]",
s.Entries,
s.Proof,
)
}
6 changes: 6 additions & 0 deletions dot/network/messages/warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ type WarpProofRequest struct {
Begin common.Hash
}

func NewWarpProofRequest(from common.Hash) *WarpProofRequest {
return &WarpProofRequest{
Begin: from,
}
}

// Decode decodes the message into a WarpProofRequest
func (wpr *WarpProofRequest) Decode(in []byte) error {
return scale.Unmarshal(in, wpr)
Expand Down
5 changes: 3 additions & 2 deletions dot/network/mock_warp_sync_provider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
// we've completed the handshake with the peer, send message directly
logger.Tracef("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg)
if err := s.host.writeToStream(stream, msg); err != nil {
logger.Errorf("failed to send message to peer %s: %s", peer, err)
logger.Debugf("failed to send message to peer %s: %s", peer, err)

// the stream was closed or reset, close it on our end and delete it from our peer's data
if errors.Is(err, io.EOF) || errors.Is(err, network.ErrReset) {
Expand Down
13 changes: 13 additions & 0 deletions dot/network/request_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ type RequestResponseProtocol struct {
responseBuf []byte
}

func NewRequestResponseProtocol(ctx context.Context, host *host, protocolID protocol.ID,
requestTimeout time.Duration, maxResponseSize uint64) *RequestResponseProtocol {
return &RequestResponseProtocol{
ctx: ctx,
host: host,
requestTimeout: requestTimeout,
maxResponseSize: maxResponseSize,
protocolID: protocolID,
responseBuf: make([]byte, maxResponseSize),
responseBufMu: sync.Mutex{},
}
}

func (rrp *RequestResponseProtocol) Do(to peer.ID, req, res messages.P2PMessage) error {
rrp.host.p2pHost.ConnManager().Protect(to, "")
defer rrp.host.p2pHost.ConnManager().Unprotect(to, "")
Expand Down
27 changes: 13 additions & 14 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
// the following are sub-protocols used by the node
SyncID = "/sync/2"
WarpSyncID = "/sync/warp"
StateSyncID = "/state/2"
lightID = "/light/2"
blockAnnounceID = "/block-announces/1"
transactionsID = "/transactions/1"
Expand Down Expand Up @@ -259,11 +260,14 @@ func (s *Service) Start() error {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

genesisHashProtocolId := protocol.ID(s.cfg.BlockState.GenesisHash().String())
genesisHash := s.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
fullSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, SyncID)
warpSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, WarpSyncID)

s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream)
s.host.registerStreamHandler(protocol.ID(fullSyncProtocolId), s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)
s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream)
s.host.registerStreamHandler(protocol.ID(warpSyncProtocolId), s.handleWarpSyncStream)

// register block announce protocol
err := s.RegisterNotificationsProtocol(
Expand Down Expand Up @@ -622,16 +626,11 @@ func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *RequestResponseProtocol {

protocolID := s.host.protocolID + protocol.ID(subprotocol)
return &RequestResponseProtocol{
ctx: s.ctx,
host: s.host,
requestTimeout: requestTimeout,
maxResponseSize: maxResponseSize,
protocolID: protocolID,
responseBuf: make([]byte, maxResponseSize),
responseBufMu: sync.Mutex{},
}
genesisHash := s.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
protocolId := fmt.Sprintf("/%s%s", genesisHash, subprotocol)

return NewRequestResponseProtocol(s.ctx, s.host, protocol.ID(protocolId), requestTimeout, maxResponseSize)
}

// Health returns information about host needed for the rpc server
Expand Down Expand Up @@ -759,7 +758,7 @@ func (s *Service) processMessage(msg peerset.Message) {
err := s.host.connect(addrInfo)
if err != nil {
// TODO: if error happens here outgoing (?) slot is occupied but no peer is really connected
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
logger.Debugf("failed to open connection for peer %s: %s", peerID, err)
return
}
logger.Debugf("connection successful with peer %s", peerID)
Expand Down
11 changes: 2 additions & 9 deletions dot/network/warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,16 @@ import (
"fmt"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/primitives/consensus/grandpa"
primitives "github.com/ChainSafe/gossamer/internal/primitives/consensus/grandpa"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/grandpa/warpsync"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

const MaxAllowedSameRequestPerPeer = 5

type WarpSyncVerificationResult struct {
SetId grandpa.SetID
AuthorityList primitives.AuthorityList
Header types.Header
Completed bool
}

// WarpSyncProvider is an interface for generating warp sync proofs
type WarpSyncProvider interface {
// Generate proof starting at given block hash. The proof is accumulated until maximum proof
Expand All @@ -34,7 +27,7 @@ type WarpSyncProvider interface {
encodedProof []byte,
setId grandpa.SetID,
authorities primitives.AuthorityList,
) (*WarpSyncVerificationResult, error)
) (*warpsync.WarpSyncVerificationResult, error)
}

func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) {
Expand Down
Loading
Loading