Skip to content

Commit

Permalink
add througput metrics for txpool
Browse files Browse the repository at this point in the history
add unpromoted metric
  • Loading branch information
andyzhang2023 committed Nov 25, 2024
1 parent ec9393c commit 65b4c13
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 6 deletions.
73 changes: 67 additions & 6 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ var (
// latency of accessing state objects
accountSnapReadsTimer = metrics.NewRegisteredTimer("txpool/account/snap/readtime", nil)
accountTrieReadsTimer = metrics.NewRegisteredTimer("txpool/account/trie/readtime", nil)
//throughput metrics
thAddTimer = metrics.NewRegisteredTimer("txpool/throughput/add", nil)
thQueue2PendingTimer = metrics.NewRegisteredTimer("txpool/throughput/queue/to/pending", nil)
thPending2P2PTimer = metrics.NewRegisteredTimer("txpool/throughput/pending/to/p2p", nil)
thDemoteTimer = metrics.NewRegisteredTimer("txpool/throughput/demote", nil)
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -253,6 +258,16 @@ func (config *Config) sanitize() Config {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type LegacyPool struct {
througPuts struct {
add throughput
queueToPending throughput
pendingToP2P throughput
demote throughput
}
metrics struct {
unpromoted int64
}

config Config
chainconfig *params.ChainConfig
chain BlockChain
Expand Down Expand Up @@ -330,6 +345,12 @@ func New(config Config, chain BlockChain) *LegacyPool {
if (!config.NoLocals || config.JournalRemote) && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
}
// reset metrics
pool.througPuts.add.lastReset = time.Now()
pool.througPuts.queueToPending.lastReset = time.Now()
pool.througPuts.pendingToP2P.lastReset = time.Now()
pool.througPuts.demote.lastReset = time.Now()

return pool
}

Expand Down Expand Up @@ -1172,6 +1193,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
addTimer.Update(time.Since(start) / time.Duration(len(txs)))
validateBasicTimer.Update(durationValidate / time.Duration(len(txs)))
requestPromoteTimer.Update(durationPromote / time.Duration(len(txs)))
pool.througPuts.add.mark(time.Since(start), len(txs))
}
}()
// Do not treat as local if local transactions have been disabled
Expand Down Expand Up @@ -1464,14 +1486,46 @@ func (pool *LegacyPool) scheduleReorgLoop() {
}
}

func (pool *LegacyPool) resetMetrics() {
atomic.StoreInt64(&pool.metrics.unpromoted, 0)
}

// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
var promoted []*types.Transaction
var unpromoted int
var demoted int
defer func(t0 time.Time) {
runReorgCost := time.Since(t0)
pool.througPuts.queueToPending.mark(runReorgCost, len(promoted))
pool.througPuts.demote.mark(runReorgCost, demoted)
atomic.AddInt64(&pool.metrics.unpromoted, int64(unpromoted))
reorgDurationTimer.Update(time.Since(t0))
if reset != nil {
reorgresetTimer.UpdateSince(t0)
if reset.newHead != nil {
log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64())
// calculate the avg duration of each throughput
now := time.Now()
addTimer, addDur, addCost, addCount, addTps := pool.througPuts.add.avgAndRest(now)
queue2pendingTimer, q2pDur, q2pCost, q2pCount, q2pTps := pool.througPuts.queueToPending.avgAndRest(now)
pending2P2PTimer, p2ppDur, p2ppCost, p2ppCount, p2ppTps := pool.througPuts.pendingToP2P.avgAndRest(now)
demoteTimer, demDur, demCost, demCount, demTps := pool.througPuts.demote.avgAndRest(now)
// reset metrics
pool.resetMetrics()
unpromoted := atomic.LoadInt64(&pool.metrics.unpromoted)

thAddTimer.Update(addTimer)
thQueue2PendingTimer.Update(queue2pendingTimer)
thPending2P2PTimer.Update(pending2P2PTimer)
thDemoteTimer.Update(demoteTimer)

log.Info("Transaction pool reorged", "from", reset.oldHead.Number.Uint64(), "to", reset.newHead.Number.Uint64(),
"addTimer", addTimer, "addDur", addDur, "addCost", addCost, "addCount", addCount, "addTps", addTps,
"queue2pendingTimer", queue2pendingTimer, "queue2pandingDur", q2pDur, "queue2pendingCost", q2pCost, "queue2pendingCount", q2pCount, "queue2pendingTps", q2pTps,
"pending2P2PTimer", pending2P2PTimer, "pending2P2PDur", p2ppDur, "pending2P2PCost", p2ppCost, "pending2P2PCount", p2ppCount, "pending2P2PTps", p2ppTps,
"demoteTimer", demoteTimer, "demoteDur", demDur, "demoteCost", demCost, "demoteCount", demCount, "demoteTps", demTps,
"unpromoted", unpromoted, "promoted", q2pCount, "demoted", demCount,
)
}
}
}(time.Now())
Expand Down Expand Up @@ -1510,15 +1564,15 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
// Check for pending transactions for every account that sent new ones
t0 = time.Now()
promoted := pool.promoteExecutables(promoteAddrs)
promoted, unpromoted = pool.promoteExecutables(promoteAddrs)
promoteTimer.UpdateSince(t0)

// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
t0 = time.Now()
if reset != nil {
pool.demoteUnexecutables(demoteAddrs)
demoted = pool.demoteUnexecutables(demoteAddrs)
demoteTimer.UpdateSince(t0)
var pendingBaseFee = pool.priced.urgent.baseFee
if reset.newHead != nil {
Expand Down Expand Up @@ -1561,7 +1615,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
for _, set := range events {
txs = append(txs, set.Flatten()...)
}
tfeed := time.Now()
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
pool.througPuts.pendingToP2P.mark(time.Since(tfeed), len(txs))
}
}

Expand Down Expand Up @@ -1726,9 +1782,10 @@ func (pool *LegacyPool) reduceBalanceByL1Cost(list *list, balance *uint256.Int)
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) ([]*types.Transaction, int) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
var unpromoted int

// Iterate over all accounts and promote any executable transactions
gasLimit := txpool.EffectiveGasLimit(pool.chainconfig, pool.currentHead.Load().GasLimit, pool.config.EffectiveGasCeil)
Expand All @@ -1737,6 +1794,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
if list == nil {
continue // Just in case someone calls with a non existing account
}
all := list.Len()
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
Expand Down Expand Up @@ -1781,6 +1839,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
all -= len(drops) + len(drops) + len(caps)
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
Expand All @@ -1792,8 +1851,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
pool.reserve(addr, false)
}
}
unpromoted += all
}
return promoted
return promoted, unpromoted
}

// truncatePending removes transactions from the pending queue if the pool is above the
Expand Down Expand Up @@ -1939,7 +1999,7 @@ func (pool *LegacyPool) truncateQueue() {
// Note: transactions are not marked as removed in the priced list because re-heaping
// is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful
// to trigger a re-heap is this function
func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) int {
if demoteAddrs == nil {
demoteAddrs = make([]common.Address, 0, len(pool.pending))
for addr := range pool.pending {
Expand Down Expand Up @@ -2015,6 +2075,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
removed += len(dropPendingCache)
}
pool.priced.Removed(removed)
return removed
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
Expand Down
46 changes: 46 additions & 0 deletions core/txpool/legacypool/legacypool_throughput.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package legacypool

import (
"sync/atomic"
"time"
)

// throughput is a struct that holds the throughput metrics of the transaction pool.
// it is used at all key points to measure the throughput of the whole path where a transaction comes into the pool.
type throughput struct {
lastReset time.Time
cost int64 // cost in nanoseconds, need to be handled with atomic, so let's use int64
counter int64
}

func (t *throughput) mark(duration time.Duration, count int) {
atomic.AddInt64(&t.cost, int64(duration))
atomic.AddInt64(&t.counter, int64(count))
}

// avgAndReset returns the average nanoseconds of a transaction that it takes to go through the path.
// it's not accurate, but it's good enough to give a rough idea of the throughput.
// metrics data will be reset after this call.
func (t *throughput) avgAndRest(now time.Time) (avgCost time.Duration, duration time.Duration, totalCost time.Duration, count int64, tps int64) {
totalCostI64 := atomic.LoadInt64(&t.cost)
if t.lastReset.IsZero() {
duration = time.Duration(0)
} else {
duration = now.Sub(t.lastReset)
}
count = atomic.LoadInt64(&t.counter)
totalCost = time.Duration(totalCostI64)

atomic.StoreInt64(&t.cost, 0)
atomic.StoreInt64(&t.counter, 0)
t.lastReset = now
if count == 0 {
avgCost = 0
} else {
avgCost = time.Duration(totalCostI64 / count)
}

tpsF64 := float64(time.Second) / float64(totalCostI64) * float64(count)
tps = int64(tpsF64)
return
}
47 changes: 47 additions & 0 deletions core/txpool/legacypool/legacypool_throughput_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package legacypool

import (
"testing"
"time"
)

func TestThroughputAvg(t *testing.T) {
tp := throughput{
lastReset: time.Now(),
cost: 0,
counter: 0,
}
tp.mark(200*time.Millisecond, 4)
tp.mark(50*time.Millisecond, 1)
avg, _, total, count, tps := tp.avgAndRest(tp.lastReset.Add(500 * time.Millisecond))
if avg != 50*time.Millisecond {
t.Errorf("expected avg to be 50ms, got %v", avg)
}
if total != 250*time.Millisecond {
t.Errorf("expected total to be 250ms, got %v", total)
}
if count != 5 {
t.Errorf("expected count to be 5, got %v", count)
}
if tps != 20 {
t.Errorf("expected tps to be 20, got %v", tps)
}

tp = throughput{}
tp.lastReset = time.Now()
tp.mark(200*time.Millisecond, 0)
avg, _, total, count, tps = tp.avgAndRest(tp.lastReset.Add(500 * time.Millisecond))
if avg != 0 {
t.Errorf("expected avg to be 0, got %v", avg)
}
if total != 200*time.Millisecond {
t.Errorf("expected total to be 200ms, got %v", total)
}
if count != 0 {
t.Errorf("expected count to be 0, got %v", count)
}
if tps != 0 {
t.Errorf("expected tps to be 0, got %v", tps)
}

}

0 comments on commit 65b4c13

Please sign in to comment.