diff --git a/accounts/abi/bind/util_test.go b/accounts/abi/bind/util_test.go index 592465f2ac..87917d43fa 100644 --- a/accounts/abi/bind/util_test.go +++ b/accounts/abi/bind/util_test.go @@ -83,6 +83,7 @@ func TestWaitDeployed(t *testing.T) { // Send and mine the transaction. backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() select { @@ -117,6 +118,7 @@ func TestWaitDeployedCornerCases(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined backend.Commit() notContractCreation := errors.New("tx is not contract creation") if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() { @@ -135,5 +137,6 @@ func TestWaitDeployedCornerCases(t *testing.T) { }() backend.Client().SendTransaction(ctx, tx) + time.Sleep(500 * time.Millisecond) //wait for the tx to be mined cancel() } diff --git a/core/txpool/legacypool/cache_for_miner.go b/core/txpool/legacypool/cache_for_miner.go index 892d797394..2ce44cd247 100644 --- a/core/txpool/legacypool/cache_for_miner.go +++ b/core/txpool/legacypool/cache_for_miner.go @@ -5,21 +5,26 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" + "github.com/holiman/uint256" ) var ( - pendingCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/pending/cache", nil) - localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil) + pendingCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/pending/cache", nil) + localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil) + _ pendingCache = (*cacheForMiner)(nil) ) type pendingCache interface { add(types.Transactions, types.Signer) del(types.Transactions, types.Signer) - dump() map[common.Address]types.Transactions + dump(filtered bool) map[common.Address][]*txpool.LazyTransaction markLocal(common.Address) flattenLocals() []common.Address + IsLocal(common.Address) bool + sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) } // copy of pending transactions @@ -28,12 +33,18 @@ type cacheForMiner struct { pending map[common.Address]map[*types.Transaction]struct{} locals map[common.Address]bool addrLock sync.Mutex + + allCache map[common.Address][]*txpool.LazyTransaction + filteredCache map[common.Address][]*txpool.LazyTransaction + cacheLock sync.Mutex } func newCacheForMiner() *cacheForMiner { return &cacheForMiner{ - pending: make(map[common.Address]map[*types.Transaction]struct{}), - locals: make(map[common.Address]bool), + pending: make(map[common.Address]map[*types.Transaction]struct{}), + locals: make(map[common.Address]bool), + allCache: make(map[common.Address][]*txpool.LazyTransaction), + filteredCache: make(map[common.Address][]*txpool.LazyTransaction), } } @@ -75,8 +86,9 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) { } } -func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { +func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) { pending := make(map[common.Address]types.Transactions) + pc.txLock.Lock() for addr, txlist := range pc.pending { pending[addr] = make(types.Transactions, 0, len(txlist)) @@ -85,10 +97,46 @@ func (pc *cacheForMiner) dump() map[common.Address]types.Transactions { } } pc.txLock.Unlock() - for _, txs := range pending { + + // convert pending to lazyTransactions + filteredLazy := make(map[common.Address][]*txpool.LazyTransaction) + allLazy := make(map[common.Address][]*txpool.LazyTransaction) + for addr, txs := range pending { // sorted by nonce sort.Sort(types.TxByNonce(txs)) + filterd := filter(txs, addr) + if len(txs) > 0 { + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i, tx := range txs { + lazies[i] = &txpool.LazyTransaction{ + Pool: pool, + Hash: tx.Hash(), + Tx: tx, + Time: tx.Time(), + GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), + GasTipCap: uint256.MustFromBig(tx.GasTipCap()), + Gas: tx.Gas(), + BlobGas: tx.BlobGas(), + } + } + allLazy[addr] = lazies + filteredLazy[addr] = lazies[:len(filterd)] + } + } + + pc.cacheLock.Lock() + pc.filteredCache = filteredLazy + pc.allCache = allLazy + pc.cacheLock.Unlock() +} + +func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction { + pc.cacheLock.Lock() + pending := pc.allCache + if filtered { + pending = pc.filteredCache } + pc.cacheLock.Unlock() return pending } @@ -99,7 +147,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) { pc.locals[addr] = true } -func (pc *cacheForMiner) isLocal(addr common.Address) bool { +func (pc *cacheForMiner) IsLocal(addr common.Address) bool { pc.addrLock.Lock() defer pc.addrLock.Unlock() return pc.locals[addr] diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 700853643f..2bfd08758e 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -309,7 +309,8 @@ type LegacyPool struct { pendingCounter int queueCounter int - pendingCache pendingCache //pending list cache for miner + pendingCache pendingCache //pending list cache for miner + pendingFilter func(txs types.Transactions, addr common.Address) types.Transactions reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet @@ -395,6 +396,10 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A // Set the basic pool parameters pool.gasTip.Store(uint256.NewInt(gasTip)) + // set dumper + pool.pendingFilter = pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee) + pool.pendingCache.sync2cache(pool, pool.pendingFilter) + // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not // fully synced). @@ -429,9 +434,42 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } pool.wg.Add(1) go pool.loop() + go pool.loopOfSync() return nil } +func (pool *LegacyPool) loopOfSync() { + ticker := time.NewTicker(400 * time.Millisecond) + for { + select { + case <-pool.reorgShutdownCh: + return + case <-ticker.C: + gasTip := pool.gasTip.Load() + currHead := pool.currentHead.Load() + if gasTip == nil || currHead == nil { + continue + } + pool.pendingFilter = pool.createFilter(gasTip.ToBig(), currHead.BaseFee) + pool.pendingCache.sync2cache(pool, pool.pendingFilter) + } + } +} + +func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions { + return func(txs types.Transactions, addr common.Address) types.Transactions { + if !pool.pendingCache.IsLocal(addr) { + for i, tx := range txs { + if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 { + txs = txs[:i] + break + } + } + } + return txs + } +} + // loop is the transaction pool's main event loop, waiting for and reacting to // outside blockchain events as well as for various reporting and transaction // eviction events. @@ -676,95 +714,57 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { // If only blob transactions are requested, this pool is unsuitable as it // contains none, don't even bother. - if filter.OnlyBlobTxs { - return nil + empty := txpool.PendingFilter{} + if filter == empty { + // return all pending transactions, no filtering + return pool.filterOutStaled(pool.pendingCache.dump(false), pool.staledTransactions()) } - defer func(t0 time.Time) { - getPendingDurationTimer.Update(time.Since(t0)) - }(time.Now()) // If only blob transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if filter.OnlyBlobTxs { return nil } + defer func(t0 time.Time) { + getPendingDurationTimer.Update(time.Since(t0)) + }(time.Now()) + // It is a bit tricky here, we don't do the filtering here. + return pool.filterOutStaled(pool.pendingCache.dump(true), pool.staledTransactions()) +} - // Convert the new uint256.Int types to the old big.Int ones used by the legacy pool - var ( - minTipBig *big.Int - baseFeeBig *big.Int - - blockNumber uint64 = 0 - blockHash common.Hash = common.Hash{} - nonceTooLowCount = 0 - currBlockDuration time.Duration = 0 - currHeaderDuration time.Duration = 0 - txHashesDuration time.Duration = 0 - staled = make(map[common.Hash]struct{}) - ) - defer func() { - log.Info("perf-trace Pending() nonce too low", "blockNumber", blockNumber, "blockHash", blockHash, "nonceTooLowCount", nonceTooLowCount, "staled", len(staled), "currHeaderDuration", currHeaderDuration, "currBlockDuration", currBlockDuration, "txHashesDuration", txHashesDuration) - }() - if filter.MinTip != nil { - minTipBig = filter.MinTip.ToBig() +func (pool *LegacyPool) filterOutStaled(lazy map[common.Address][]*txpool.LazyTransaction, staled map[common.Hash]struct{}) map[common.Address][]*txpool.LazyTransaction { + if len(staled) == 0 { + return lazy } - if filter.BaseFee != nil { - baseFeeBig = filter.BaseFee.ToBig() + for addr, txs := range lazy { + var nonceTooLow int = -1 + for i, tx := range txs { + if _, ok := staled[tx.Tx.Hash()]; ok { + // staled transaction + nonceTooLow = i + continue + } + // only lower-nonce transaction would be a potential staled transaction, so it means no staled transaction left any more now + break + } + if nonceTooLow != -1 { + // filter out the staled transactions + lazy[addr] = txs[nonceTooLow+1:] + } } - pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) - t0 := time.Now() + return lazy +} + +func (pool *LegacyPool) staledTransactions() map[common.Hash]struct{} { + var staled map[common.Hash]struct{} if currHeader := pool.chain.CurrentBlock(); currHeader != nil { - currHeaderDuration = time.Since(t0) - blockNumber = currHeader.Number.Uint64() - blockHash = currHeader.Hash() + blockHash := currHeader.Hash() currBlock := pool.chain.GetBlock(blockHash, currHeader.Number.Uint64()) - currBlockDuration = time.Since(t0) - currHeaderDuration staled = make(map[common.Hash]struct{}, len(currBlock.Transactions())) for _, tx := range currBlock.Transactions() { staled[tx.Hash()] = struct{}{} } - txHashesDuration = time.Since(t0) - currBlockDuration - currHeaderDuration } - for addr, txs := range pool.pendingCache.dump() { - // remove nonce too low transactions - if len(staled) > 0 { - noncetoolow := -1 - for i, tx := range txs { - if _, hit := staled[tx.Hash()]; !hit { - break - } - noncetoolow = i - } - nonceTooLowCount += noncetoolow + 1 - txs = txs[noncetoolow+1:] - } - - // If the miner requests tip enforcement, cap the lists now - if minTipBig != nil && !pool.locals.contains(addr) { - for i, tx := range txs { - if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { - txs = txs[:i] - break - } - } - } - if len(txs) > 0 { - lazies := make([]*txpool.LazyTransaction, len(txs)) - for i := 0; i < len(txs); i++ { - lazies[i] = &txpool.LazyTransaction{ - Pool: pool, - Hash: txs[i].Hash(), - Tx: txs[i], - Time: txs[i].Time(), - GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()), - GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()), - Gas: txs[i].Gas(), - BlobGas: txs[i].BlobGas(), - } - } - pending[addr] = lazies - } - } - return pending + return staled } // Locals retrieves the accounts currently considered local by the pool. @@ -1610,6 +1610,11 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.priced.SetBaseFee(pendingBaseFee) } } + gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee + go func() { + pool.pendingFilter = pool.createFilter(gasTip.ToBig(), baseFee) + pool.pendingCache.sync2cache(pool, pool.pendingFilter) + }() // Update all accounts to the latest known pending nonce nonces := make(map[common.Address]uint64, len(pool.pending)) for addr, list := range pool.pending { diff --git a/core/txpool/legacypool/nonecache_for_miner.go b/core/txpool/legacypool/nonecache_for_miner.go index dd5ed21836..9ebc3f7b59 100644 --- a/core/txpool/legacypool/nonecache_for_miner.go +++ b/core/txpool/legacypool/nonecache_for_miner.go @@ -1,8 +1,12 @@ package legacypool import ( + "sort" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/holiman/uint256" ) var ( @@ -25,21 +29,55 @@ func (nc *noneCacheForMiner) del(txs types.Transactions, signer types.Signer) { // do nothing } -func (nc *noneCacheForMiner) dump() map[common.Address]types.Transactions { +func (nc *noneCacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction { // dump all pending transactions from the pool nc.pool.mu.RLock() - defer nc.pool.mu.RUnlock() pending := make(map[common.Address]types.Transactions) for addr, txlist := range nc.pool.pending { pending[addr] = txlist.Flatten() } - return pending + nc.pool.mu.RUnlock() + filteredLazy := make(map[common.Address][]*txpool.LazyTransaction) + allLazy := make(map[common.Address][]*txpool.LazyTransaction) + for addr, txs := range pending { + // sorted by nonce + sort.Sort(types.TxByNonce(txs)) + filterd := nc.pool.pendingFilter(txs, addr) + if len(txs) > 0 { + lazies := make([]*txpool.LazyTransaction, len(txs)) + for i, tx := range txs { + lazies[i] = &txpool.LazyTransaction{ + Pool: nc.pool, + Hash: tx.Hash(), + Tx: tx, + Time: tx.Time(), + GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), + GasTipCap: uint256.MustFromBig(tx.GasTipCap()), + Gas: tx.Gas(), + BlobGas: tx.BlobGas(), + } + } + allLazy[addr] = lazies + filteredLazy[addr] = lazies[:len(filterd)] + } + } + if filtered { + return filteredLazy + } else { + return allLazy + } } func (nc *noneCacheForMiner) markLocal(addr common.Address) { // do nothing } +func (nc *noneCacheForMiner) IsLocal(addr common.Address) bool { + nc.pool.mu.RLock() + defer nc.pool.mu.RUnlock() + return nc.pool.locals.contains(addr) +} + func (nc *noneCacheForMiner) flattenLocals() []common.Address { // return a copy of pool.locals nc.pool.mu.RLock() @@ -50,3 +88,7 @@ func (nc *noneCacheForMiner) flattenLocals() []common.Address { } return locals } + +func (nc *noneCacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) { + // do nothing +} diff --git a/ethclient/simulated/backend.go b/ethclient/simulated/backend.go index 1df0a73150..5c137d4079 100644 --- a/ethclient/simulated/backend.go +++ b/ethclient/simulated/backend.go @@ -178,6 +178,8 @@ func (n *Backend) Close() error { // Commit seals a block and moves the chain forward to a new empty block. func (n *Backend) Commit() common.Hash { + // wait for the transactions to be sync into cache + time.Sleep(350 * time.Millisecond) return n.beacon.Commit() } diff --git a/ethclient/simulated/backend_test.go b/ethclient/simulated/backend_test.go index a8fd7913c3..9307e2105a 100644 --- a/ethclient/simulated/backend_test.go +++ b/ethclient/simulated/backend_test.go @@ -214,6 +214,7 @@ func TestForkResendTx(t *testing.T) { t.Fatalf("could not create transaction: %v", err) } client.SendTransaction(ctx, tx) + time.Sleep(1 * time.Second) sim.Commit() // 3.