Skip to content

Commit

Permalink
Merge pull request #98 from ethpandaops/feat/mimicry-remove-dupe-cache
Browse files Browse the repository at this point in the history
feat(mimicry): remove dupe cache
  • Loading branch information
Savid authored Mar 9, 2023
2 parents 3f2e6d5 + 0c5e9b4 commit b73f580
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 64 deletions.
27 changes: 0 additions & 27 deletions pkg/mimicry/p2p/execution/cache/duplicate.go

This file was deleted.

25 changes: 3 additions & 22 deletions pkg/mimicry/p2p/execution/event_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package execution

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/mitchellh/hashstructure/v2"
"github.com/savid/ttlcache/v3"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -19,22 +17,6 @@ import (
func (p *Peer) handleTransaction(ctx context.Context, eventTime time.Time, event *types.Transaction) (*xatu.DecoratedEvent, error) {
p.log.Debug("Transaction received")

hash, err := hashstructure.Hash(event.Hash().String(), hashstructure.FormatV2, nil)
if err != nil {
return nil, err
}

item, retrieved := p.duplicateCache.Transaction.GetOrSet(fmt.Sprint(hash), time.Now(), ttlcache.DefaultTTL)
if retrieved {
p.log.WithFields(logrus.Fields{
"hash": hash,
"time_since_first_item": time.Since(item.Value()),
"transaction_hash": event.Hash().String(),
}).Debug("Duplicate transaction event received")
// TODO(savid): add metrics
return nil, errors.New("duplicate transaction event received")
}

meta, err := p.createNewClientMeta(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -151,10 +133,9 @@ func (p *Peer) ExportTransactions(ctx context.Context, items []*TransactionHashI

if txs != nil {
for _, tx := range txs.PooledTransactionsPacket {
exists := p.sharedCache.Transaction.Get(tx.Hash().String())
if exists == nil {
p.sharedCache.Transaction.Set(tx.Hash().String(), true, ttlcache.DefaultTTL)

_, retrieved := p.sharedCache.Transaction.GetOrSet(tx.Hash().String(), true, ttlcache.DefaultTTL)
// transaction was just set in shared cache, so we need to handle it
if !retrieved {
seen := seenMap[tx.Hash()]
if seen.IsZero() {
p.log.WithField("hash", tx.Hash().String()).Error("Failed to find seen time for transaction")
Expand Down
20 changes: 5 additions & 15 deletions pkg/mimicry/p2p/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethpandaops/ethcore/pkg/execution/mimicry"
coordCache "github.com/ethpandaops/xatu/pkg/mimicry/coordinator/cache"
"github.com/ethpandaops/xatu/pkg/mimicry/p2p/execution/cache"
"github.com/ethpandaops/xatu/pkg/mimicry/p2p/handler"
"github.com/ethpandaops/xatu/pkg/networks"
"github.com/ethpandaops/xatu/pkg/processor"
Expand All @@ -35,8 +34,6 @@ type Peer struct {

client *mimicry.Client

// don't send duplicate events from the same client
duplicateCache *cache.DuplicateCache
// shared cache between clients
sharedCache *coordCache.SharedCache

Expand All @@ -63,15 +60,12 @@ func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handler
return nil, err
}

duplicateCache := cache.NewDuplicateCache()

return &Peer{
log: log.WithField("node_record", nodeRecord),
nodeRecord: nodeRecord,
handlers: handlers,
client: client,
duplicateCache: duplicateCache,
sharedCache: sharedCache,
log: log.WithField("node_record", nodeRecord),
nodeRecord: nodeRecord,
handlers: handlers,
client: client,
sharedCache: sharedCache,
network: &networks.Network{
Name: networks.NetworkNameNone,
},
Expand Down Expand Up @@ -124,8 +118,6 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) {
processor.WithMaxExportBatchSize(50000),
)

p.duplicateCache.Start()

p.client.OnHello(ctx, func(ctx context.Context, hello *mimicry.Hello) error {
// setup client implementation and version info
split := strings.SplitN(hello.Name, "/", 2)
Expand Down Expand Up @@ -333,8 +325,6 @@ func (p *Peer) processTransaction(ctx context.Context, now time.Time, hash commo
}

func (p *Peer) Stop(ctx context.Context) error {
p.duplicateCache.Stop()

if err := p.txProc.Shutdown(ctx); err != nil {
return err
}
Expand Down

0 comments on commit b73f580

Please sign in to comment.