Skip to content

Commit

Permalink
Merge pull request #72 from ethpandaops/feat/cleanup-mimicry
Browse files Browse the repository at this point in the history
feat(mimicry): cleanup transactions event
  • Loading branch information
Savid authored Feb 21, 2023
2 parents b505b63 + 3d5d217 commit 5164214
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/mimicry/p2p/execution/event_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (p *Peer) handleTransaction(ctx context.Context, eventTime time.Time, event
now = now.Add(time.Duration(meta.ClockDrift) * time.Millisecond)
}

tx, err := event.MarshalBinary()
if err != nil {
return nil, err
}

decoratedEvent := &xatu.DecoratedEvent{
Event: &xatu.Event{
Name: xatu.Event_MEMPOOL_TRANSACTION,
Expand All @@ -54,7 +59,7 @@ func (p *Peer) handleTransaction(ctx context.Context, eventTime time.Time, event
Client: meta,
},
Data: &xatu.DecoratedEvent_MempoolTransaction{
MempoolTransaction: string(event.Data()),
MempoolTransaction: fmt.Sprintf("0x%x", tx),
},
}

Expand Down
43 changes: 43 additions & 0 deletions pkg/mimicry/p2p/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"time"

"github.com/savid/ttlcache/v3"
Expand Down Expand Up @@ -51,6 +52,9 @@ type Peer struct {
version string
forkID *xatu.ForkID
capabilities *[]p2p.Cap

mu *sync.Mutex
ignoreBefore *time.Time
}

func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handlers *handler.Peer, sharedCache *coordCache.SharedCache) (*Peer, error) {
Expand All @@ -71,6 +75,7 @@ func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handler
network: &networks.Network{
Name: networks.NetworkNameNone,
},
mu: &sync.Mutex{},
}, nil
}

Expand Down Expand Up @@ -193,10 +198,18 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) {
"fork_id_next": fmt.Sprintf("%d", status.ForkID.Next),
}).Debug("got client status")

// set the ignore before time to 15 seconds in the future
ignoreBefore := time.Now().Add(15 * time.Second)
p.ignoreBefore = &ignoreBefore

return nil
})

p.client.OnNewPooledTransactionHashes(ctx, func(ctx context.Context, hashes *mimicry.NewPooledTransactionHashes) error {
if !p.shouldGetTransactions() {
return nil
}

now := time.Now()
if p.handlers.DecoratedEvent != nil && hashes != nil {
for _, hash := range *hashes {
Expand All @@ -210,6 +223,10 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) {
})

p.client.OnNewPooledTransactionHashes68(ctx, func(ctx context.Context, hashes *mimicry.NewPooledTransactionHashes68) error {
if !p.shouldGetTransactions() {
return nil
}

now := time.Now()
if p.handlers.DecoratedEvent != nil && hashes != nil {
// TODO: handle eth68+ transaction size/types as well
Expand All @@ -224,6 +241,10 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) {
})

p.client.OnTransactions(ctx, func(ctx context.Context, txs *mimicry.Transactions) error {
if !p.shouldGetTransactions() {
return nil
}

if p.handlers.DecoratedEvent != nil && txs != nil {
now := time.Now()
for _, tx := range *txs {
Expand Down Expand Up @@ -275,6 +296,28 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) {
return response, nil
}

// typically when first connecting to a peer, a dump of their transaction pool is sent.
// not looking to get stale/old transactions, so we can just ignore the first batch.
func (p *Peer) shouldGetTransactions() bool {
p.mu.Lock()
defer p.mu.Unlock()

if p.ignoreBefore == nil {
// no ignore before time set, so we should get transactions
return true
}

if time.Now().Before(*p.ignoreBefore) {
// ignore time set and is still in the future, so we should not get transactions
return false
}

// set ignore time to nil, so we should get transactions
p.ignoreBefore = nil

return true
}

func (p *Peer) processTransaction(ctx context.Context, now time.Time, hash common.Hash) error {
// check if transaction is already in the shared cache, no need to fetch it again
exists := p.sharedCache.Transaction.Get(hash.String())
Expand Down

0 comments on commit 5164214

Please sign in to comment.