diff --git a/pkg/mimicry/p2p/execution/cache/duplicate.go b/pkg/mimicry/p2p/execution/cache/duplicate.go deleted file mode 100644 index a3980382..00000000 --- a/pkg/mimicry/p2p/execution/cache/duplicate.go +++ /dev/null @@ -1,27 +0,0 @@ -package cache - -import ( - "time" - - "github.com/savid/ttlcache/v3" -) - -type DuplicateCache struct { - Transaction *ttlcache.Cache[string, time.Time] -} - -func NewDuplicateCache() *DuplicateCache { - return &DuplicateCache{ - Transaction: ttlcache.New( - ttlcache.WithTTL[string, time.Time](30 * time.Minute), - ), - } -} - -func (d *DuplicateCache) Start() { - go d.Transaction.Start() -} - -func (d *DuplicateCache) Stop() { - d.Transaction.Stop() -} diff --git a/pkg/mimicry/p2p/execution/event_transaction.go b/pkg/mimicry/p2p/execution/event_transaction.go index 0326c2f1..2a3aa75e 100644 --- a/pkg/mimicry/p2p/execution/event_transaction.go +++ b/pkg/mimicry/p2p/execution/event_transaction.go @@ -2,7 +2,6 @@ package execution import ( "context" - "errors" "fmt" "strconv" "time" @@ -10,7 +9,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethpandaops/xatu/pkg/proto/xatu" - "github.com/mitchellh/hashstructure/v2" "github.com/savid/ttlcache/v3" "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/timestamppb" @@ -19,22 +17,6 @@ import ( func (p *Peer) handleTransaction(ctx context.Context, eventTime time.Time, event *types.Transaction) (*xatu.DecoratedEvent, error) { p.log.Debug("Transaction received") - hash, err := hashstructure.Hash(event.Hash().String(), hashstructure.FormatV2, nil) - if err != nil { - return nil, err - } - - item, retrieved := p.duplicateCache.Transaction.GetOrSet(fmt.Sprint(hash), time.Now(), ttlcache.DefaultTTL) - if retrieved { - p.log.WithFields(logrus.Fields{ - "hash": hash, - "time_since_first_item": time.Since(item.Value()), - "transaction_hash": event.Hash().String(), - }).Debug("Duplicate transaction event received") - // TODO(savid): add metrics - return nil, errors.New("duplicate transaction event received") - } - meta, err := p.createNewClientMeta(ctx) if err != nil { return nil, err @@ -151,10 +133,9 @@ func (p *Peer) ExportTransactions(ctx context.Context, items []*TransactionHashI if txs != nil { for _, tx := range txs.PooledTransactionsPacket { - exists := p.sharedCache.Transaction.Get(tx.Hash().String()) - if exists == nil { - p.sharedCache.Transaction.Set(tx.Hash().String(), true, ttlcache.DefaultTTL) - + _, retrieved := p.sharedCache.Transaction.GetOrSet(tx.Hash().String(), true, ttlcache.DefaultTTL) + // transaction was just set in shared cache, so we need to handle it + if !retrieved { seen := seenMap[tx.Hash()] if seen.IsZero() { p.log.WithField("hash", tx.Hash().String()).Error("Failed to find seen time for transaction") diff --git a/pkg/mimicry/p2p/execution/execution.go b/pkg/mimicry/p2p/execution/execution.go index 20b3fb12..1c52f48d 100644 --- a/pkg/mimicry/p2p/execution/execution.go +++ b/pkg/mimicry/p2p/execution/execution.go @@ -18,7 +18,6 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethpandaops/ethcore/pkg/execution/mimicry" coordCache "github.com/ethpandaops/xatu/pkg/mimicry/coordinator/cache" - "github.com/ethpandaops/xatu/pkg/mimicry/p2p/execution/cache" "github.com/ethpandaops/xatu/pkg/mimicry/p2p/handler" "github.com/ethpandaops/xatu/pkg/networks" "github.com/ethpandaops/xatu/pkg/processor" @@ -35,8 +34,6 @@ type Peer struct { client *mimicry.Client - // don't send duplicate events from the same client - duplicateCache *cache.DuplicateCache // shared cache between clients sharedCache *coordCache.SharedCache @@ -63,15 +60,12 @@ func New(ctx context.Context, log logrus.FieldLogger, nodeRecord string, handler return nil, err } - duplicateCache := cache.NewDuplicateCache() - return &Peer{ - log: log.WithField("node_record", nodeRecord), - nodeRecord: nodeRecord, - handlers: handlers, - client: client, - duplicateCache: duplicateCache, - sharedCache: sharedCache, + log: log.WithField("node_record", nodeRecord), + nodeRecord: nodeRecord, + handlers: handlers, + client: client, + sharedCache: sharedCache, network: &networks.Network{ Name: networks.NetworkNameNone, }, @@ -124,8 +118,6 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) { processor.WithMaxExportBatchSize(50000), ) - p.duplicateCache.Start() - p.client.OnHello(ctx, func(ctx context.Context, hello *mimicry.Hello) error { // setup client implementation and version info split := strings.SplitN(hello.Name, "/", 2) @@ -333,8 +325,6 @@ func (p *Peer) processTransaction(ctx context.Context, now time.Time, hash commo } func (p *Peer) Stop(ctx context.Context) error { - p.duplicateCache.Stop() - if err := p.txProc.Shutdown(ctx); err != nil { return err }