Skip to content

Commit

Permalink
[DVT-819][DVT-903] Use p2p.Server and refactor (#93)
Browse files Browse the repository at this point in the history
* add key file flag

* fix shadowed error

* fix database nil

* add ip and port flags

* add server

* fetch latest block

* update message codes

* update params

* add nat

* remove ip flag

* fix ci

* move logic to protocol.go

* use any nat

* behold!

* remove port for p2p.Listen

* fix lint and docs

* fix lint

* update nodeset

* fix lint

* make gen-doc

* remove truncNow

* address comments

* keep track of head block and improve logging

* update docs

* update when the nodes file gets written

* update docs

* fix protocol msg codes

* add quick start flag

* update docs

* add trusted peers flag

* update docs

* set head block differently
  • Loading branch information
minhd-vu authored Aug 16, 2023
1 parent 9e13a01 commit cdf7659
Show file tree
Hide file tree
Showing 17 changed files with 1,054 additions and 814 deletions.
24 changes: 12 additions & 12 deletions cmd/p2p/crawl/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type (
NodesFile string
Database string
RevalidationInterval string

revalidationInterval time.Duration
}
)
Expand All @@ -36,7 +37,7 @@ var (
var CrawlCmd = &cobra.Command{
Use: "crawl [nodes file]",
Short: "Crawl a network on the devp2p layer and generate a nodes JSON file.",
Long: "If no nodes.json file exists, run `echo \"{}\" >> nodes.json` to get started.",
Long: "If no nodes.json file exists, it will be created.",
Args: cobra.MinimumNArgs(1),
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
inputCrawlParams.NodesFile = args[0]
Expand All @@ -54,18 +55,17 @@ var CrawlCmd = &cobra.Command{
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
inputSet, err := p2p.LoadNodesJSON(inputCrawlParams.NodesFile)
nodes, err := p2p.ReadNodeSet(inputCrawlParams.NodesFile)
if err != nil {
return err
log.Warn().Err(err).Msgf("Creating nodes file %v because it does not exist", inputCrawlParams.NodesFile)
}

var cfg discover.Config
cfg.PrivateKey, _ = crypto.GenerateKey()
bn, err := p2p.ParseBootnodes(inputCrawlParams.Bootnodes)
cfg.Bootnodes, err = p2p.ParseBootnodes(inputCrawlParams.Bootnodes)
if err != nil {
return fmt.Errorf("unable to parse bootnodes: %w", err)
}
cfg.Bootnodes = bn

db, err := enode.OpenDB(inputCrawlParams.Database)
if err != nil {
Expand All @@ -84,13 +84,13 @@ var CrawlCmd = &cobra.Command{
}
defer disc.Close()

c := newCrawler(inputSet, disc, disc.RandomNodes())
c := newCrawler(nodes, disc, disc.RandomNodes())
c.revalidateInterval = inputCrawlParams.revalidationInterval

log.Info().Msg("Starting crawl")

output := c.run(inputCrawlParams.timeout, inputCrawlParams.Threads)
return p2p.WriteNodesJSON(inputCrawlParams.NodesFile, output)
return p2p.WriteNodeSet(inputCrawlParams.NodesFile, output)
},
}

Expand All @@ -101,9 +101,9 @@ required, so other nodes in the network can discover each other.`)
if err := CrawlCmd.MarkPersistentFlagRequired("bootnodes"); err != nil {
log.Error().Err(err).Msg("Failed to mark bootnodes as required persistent flag")
}
CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Timeout, "timeout", "t", "30m0s", "Time limit for the crawl.")
CrawlCmd.PersistentFlags().IntVarP(&inputCrawlParams.Threads, "parallel", "p", 16, "How many parallel discoveries to attempt.")
CrawlCmd.PersistentFlags().Uint64VarP(&inputCrawlParams.NetworkID, "network-id", "n", 0, "Filter discovered nodes by this network id.")
CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Database, "database", "d", "", "Node database for updating and storing client information.")
CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.RevalidationInterval, "revalidation-interval", "r", "10m", "The amount of time it takes to retry connecting to a failed peer.")
CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Timeout, "timeout", "t", "30m0s", "Time limit for the crawl")
CrawlCmd.PersistentFlags().IntVarP(&inputCrawlParams.Threads, "parallel", "p", 16, "How many parallel discoveries to attempt")
CrawlCmd.PersistentFlags().Uint64VarP(&inputCrawlParams.NetworkID, "network-id", "n", 0, "Filter discovered nodes by this network id")
CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Database, "database", "d", "", "Node database for updating and storing client information")
CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.RevalidationInterval, "revalidation-interval", "r", "10m", "Time before retrying to connect to a failed peer")
}
66 changes: 14 additions & 52 deletions cmd/p2p/crawl/crawl_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type crawler struct {
input p2p.NodeSet
input []*enode.Node
output p2p.NodeSet
disc resolver
iters []enode.Iterator
Expand All @@ -37,21 +37,21 @@ type resolver interface {
RequestENR(*enode.Node) (*enode.Node, error)
}

func newCrawler(input p2p.NodeSet, disc resolver, iters ...enode.Iterator) *crawler {
func newCrawler(input []*enode.Node, disc resolver, iters ...enode.Iterator) *crawler {
c := &crawler{
input: input,
output: make(p2p.NodeSet, len(input)),
disc: disc,
iters: iters,
inputIter: enode.IterNodes(input.Nodes()),
inputIter: enode.IterNodes(input),
ch: make(chan *enode.Node),
closed: make(chan struct{}),
}
c.iters = append(c.iters, c.inputIter)
// Copy input to output initially. Any nodes that fail validation
// will be dropped from output during the run.
for id, n := range input {
c.output[id] = n
for _, n := range input {
c.output[n.ID()] = n.URLv4()
}
return c
}
Expand All @@ -74,10 +74,8 @@ func (c *crawler) run(timeout time.Duration, nthreads int) p2p.NodeSet {
}
var (
added uint64
updated uint64
skipped uint64
recent uint64
removed uint64
wg sync.WaitGroup
)
wg.Add(nthreads)
Expand All @@ -92,12 +90,8 @@ func (c *crawler) run(timeout time.Duration, nthreads int) p2p.NodeSet {
atomic.AddUint64(&skipped, 1)
case nodeSkipRecent:
atomic.AddUint64(&recent, 1)
case nodeRemoved:
atomic.AddUint64(&removed, 1)
case nodeAdded:
atomic.AddUint64(&added, 1)
default:
atomic.AddUint64(&updated, 1)
}
case <-c.closed:
return
Expand Down Expand Up @@ -125,9 +119,7 @@ loop:
case <-statusTicker.C:
log.Info().
Uint64("added", atomic.LoadUint64(&added)).
Uint64("updated", atomic.LoadUint64(&updated)).
Uint64("removed", atomic.LoadUint64(&removed)).
Uint64("ignored(recent)", atomic.LoadUint64(&removed)).
Uint64("ignored(recent)", atomic.LoadUint64(&recent)).
Uint64("ignored(incompatible)", atomic.LoadUint64(&skipped)).
Msg("Crawling in progress")
}
Expand Down Expand Up @@ -184,12 +176,10 @@ func shouldSkipNode(n *enode.Node) bool {
// what changed.
func (c *crawler) updateNode(n *enode.Node) int {
c.mu.RLock()
node, ok := c.output[n.ID()]
_, ok := c.output[n.ID()]
c.mu.RUnlock()

// Skip validation of recently-seen nodes.
if ok && time.Since(node.LastCheck) < c.revalidateInterval {
log.Debug().Str("id", n.ID().String()).Msg("Skipping node")
if ok {
return nodeSkipRecent
}

Expand All @@ -198,43 +188,15 @@ func (c *crawler) updateNode(n *enode.Node) int {
return nodeSkipIncompat
}

// Request the node record.
status := nodeUpdated
node.LastCheck = truncNow()

if nn, err := c.disc.RequestENR(n); err != nil {
if node.Score == 0 {
// Node doesn't implement EIP-868.
log.Debug().Str("id", n.ID().String()).Msg("Skipping node")
return nodeSkipIncompat
}
node.Score /= 2
} else {
node.N = nn
node.Seq = nn.Seq()
node.Score++
if node.FirstResponse.IsZero() {
node.FirstResponse = node.LastCheck
status = nodeAdded
}
node.LastResponse = node.LastCheck
nn, err := c.disc.RequestENR(n)
if err != nil {
return nodeSkipIncompat
}

// Store/update node in output set.
c.mu.Lock()
defer c.mu.Unlock()

if node.Score <= 0 {
log.Debug().Str("id", n.ID().String()).Msg("Removing node")
delete(c.output, n.ID())
return nodeRemoved
}

log.Debug().Str("id", n.ID().String()).Uint64("seq", n.Seq()).Int("score", node.Score).Msg("Updating node")
c.output[n.ID()] = node
return status
}
c.output[nn.ID()] = nn.URLv4()
c.mu.Unlock()

func truncNow() time.Time {
return time.Now().UTC().Truncate(1 * time.Second)
return nodeAdded
}
36 changes: 25 additions & 11 deletions cmd/p2p/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ var PingCmd = &cobra.Command{
Long: `Ping nodes by either giving a single enode/enr or an entire nodes file.
This command will establish a handshake and status exchange to get the Hello and
Status messages and output JSON. If providing a enode/enr rather than a node file,
then the connection will remain open by default (--listen=true), and you can see
other messages the peer sends (e.g. blocks, transactions, etc.).`,
Status messages and output JSON. If providing a enode/enr rather than a nodes
file, then the connection will remain open by default (--listen=true), and you
can see other messages the peer sends (e.g. blocks, transactions, etc.).`,
Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
nodes := []*enode.Node{}
if inputSet, err := p2p.LoadNodesJSON(args[0]); err == nil {
nodes = inputSet.Nodes()
inputPingParams.Listen = false
if input, err := p2p.ReadNodeSet(args[0]); err == nil {
nodes = input
} else if node, err := p2p.ParseNode(args[0]); err == nil {
nodes = append(nodes, node)
} else {
Expand All @@ -65,7 +64,17 @@ other messages the peer sends (e.g. blocks, transactions, etc.).`,
sem := make(chan bool, inputPingParams.Threads)

count := &p2p.MessageCount{}
go p2p.LogMessageCount(count, time.NewTicker(time.Second))
go func() {
ticker := time.NewTicker(2 * time.Second)
for {
<-ticker.C
c := count.Load()
if !c.IsEmpty() {
log.Info().Interface("counts", c).Send()
count.Clear()
}
}
}()

// Ping each node in the slice.
for _, n := range nodes {
Expand Down Expand Up @@ -98,14 +107,19 @@ other messages the peer sends (e.g. blocks, transactions, etc.).`,
errStr = err.Error()
} else if inputPingParams.Listen {
// If the dial and peering were successful, listen to the peer for messages.
if err := conn.ReadAndServe(nil, count); err != nil {
if err := conn.ReadAndServe(count); err != nil {
log.Error().Err(err).Msg("Received error")
}
}

// Save the results to the output map.
mutex.Lock()
output[node.ID()] = pingNodeJSON{node, hello, status, errStr}
output[node.ID()] = pingNodeJSON{
Record: node,
Hello: hello,
Status: status,
Error: errStr,
}
mutex.Unlock()
}(n)
}
Expand All @@ -128,8 +142,8 @@ other messages the peer sends (e.g. blocks, transactions, etc.).`,
}

func init() {
PingCmd.PersistentFlags().StringVarP(&inputPingParams.OutputFile, "output", "o", "", "Write ping results to output file. (default stdout)")
PingCmd.PersistentFlags().IntVarP(&inputPingParams.Threads, "parallel", "p", 16, "How many parallel pings to attempt.")
PingCmd.PersistentFlags().StringVarP(&inputPingParams.OutputFile, "output", "o", "", "Write ping results to output file (default stdout)")
PingCmd.PersistentFlags().IntVarP(&inputPingParams.Threads, "parallel", "p", 16, "How many parallel pings to attempt")
PingCmd.PersistentFlags().BoolVarP(&inputPingParams.Listen, "listen", "l", true,
`Keep the connection open and listen to the peer. This only works if the first
argument is an enode/enr, not a nodes file.`)
Expand Down
Loading

0 comments on commit cdf7659

Please sign in to comment.