From 9d07cfaa675cd3d94f7d5f6a608788ee13da2d70 Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Sun, 10 Dec 2023 23:05:52 +0800 Subject: [PATCH] limit tx count of per peer --- app/config/config.go | 25 +++++++++++++++++-- dev/testnet/testnet.sh | 3 ++- .../cmd/tendermint/commands/run_node.go | 5 ++++ libs/tendermint/config/config.go | 10 +++++--- .../config/dynamic_config_okchain.go | 5 ++++ libs/tendermint/mempool/clist_mempool.go | 25 +++++++++++++++++++ 6 files changed, 67 insertions(+), 6 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index 3bdeceb9cb..be3d0abd3c 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -124,6 +124,19 @@ type OecConfig struct { commitGapOffset int64 maxSubscriptionClients int + + maxTxLimitPerPeer uint64 +} + +func (c *OecConfig) SetMaxTxLimitPerPeer(maxTxLimitPerPeer int64) { + if maxTxLimitPerPeer < 0 { + return + } + c.maxTxLimitPerPeer = uint64(maxTxLimitPerPeer) +} + +func (c *OecConfig) GetMaxTxLimitPerPeer() uint64 { + return c.maxTxLimitPerPeer } const ( @@ -161,8 +174,9 @@ const ( FlagEnableHasBlockPartMsg = "enable-blockpart-ack" FlagDebugGcInterval = "debug.gc-interval" FlagCommitGapOffset = "commit-gap-offset" - - FlagMaxSubscriptionClients = "max-subscription-clients" + FlagEnableMempoolSimGuFactor = "enable-mem-sim-gu-factor" + FlagMaxSubscriptionClients = "max-subscription-clients" + FlagMaxTxLimitPerPeer = "mempool.max_tx_limit_per_peer" ) var ( @@ -278,6 +292,7 @@ func (c *OecConfig) loadFromConfig() { c.SetMempoolFlush(viper.GetBool(FlagMempoolFlush)) c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost)) c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock)) + c.SetMaxTxLimitPerPeer(int64(viper.GetUint64(FlagMaxTxLimitPerPeer))) c.SetEnableDeleteMinGPTx(viper.GetBool(FlagMempoolEnableDeleteMinGPTx)) c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock)) c.SetEnablePGU(viper.GetBool(FlagEnablePGU)) @@ -463,6 +478,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) { return } c.SetMaxTxNumPerBlock(r) + case FlagMaxTxLimitPerPeer: + r, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return + } + c.SetMaxTxLimitPerPeer(r) case FlagMempoolEnableDeleteMinGPTx: r, err := strconv.ParseBool(v) if err != nil { diff --git a/dev/testnet/testnet.sh b/dev/testnet/testnet.sh index d049d9494d..2c28b69c43 100755 --- a/dev/testnet/testnet.sh +++ b/dev/testnet/testnet.sh @@ -150,6 +150,7 @@ run() { --enable-wtx=${WRAPPEDTX} \ --mempool.node_key_whitelist ${WHITE_LIST} \ --p2p.pex=false \ + --mempool.max_tx_limit_per_peer=1 \ --p2p.addr_book_strict=false \ $p2p_seed_opt $p2p_seed_arg \ --p2p.laddr tcp://${IP}:${p2pport} \ @@ -158,7 +159,7 @@ run() { --chain-id ${CHAIN_ID} \ --upload-delta=false \ --enable-gid \ - --consensus.timeout_commit 3800ms \ + --consensus.timeout_commit 10000ms \ --enable-blockpart-ack=false \ --append-pid=true \ ${LOG_SERVER} \ diff --git a/libs/tendermint/cmd/tendermint/commands/run_node.go b/libs/tendermint/cmd/tendermint/commands/run_node.go index 0a41d913cf..999f150c72 100644 --- a/libs/tendermint/cmd/tendermint/commands/run_node.go +++ b/libs/tendermint/cmd/tendermint/commands/run_node.go @@ -176,6 +176,11 @@ func AddNodeFlags(cmd *cobra.Command) { config.Mempool.PendingRemoveEvent, "Push event when remove a pending tx", ) + cmd.Flags().Uint64( + "mempool.max_tx_limit_per_peer", + config.Mempool.MaxTxLimitPerPeer, + "Max tx limit per peer", + ) cmd.Flags().String( "mempool.node_key_whitelist", diff --git a/libs/tendermint/config/config.go b/libs/tendermint/config/config.go index ae3f587780..b5559335b3 100644 --- a/libs/tendermint/config/config.go +++ b/libs/tendermint/config/config.go @@ -689,6 +689,7 @@ type MempoolConfig struct { PendingPoolMaxTxPerAddress int `mapstructure:"pending_pool_max_tx_per_address"` NodeKeyWhitelist []string `mapstructure:"node_key_whitelist"` PendingRemoveEvent bool `mapstructure:"pending_remove_event"` + MaxTxLimitPerPeer uint64 `mapstructure:"max_tx_limit_per_peer"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool @@ -715,6 +716,7 @@ func DefaultMempoolConfig() *MempoolConfig { PendingPoolMaxTxPerAddress: 100, NodeKeyWhitelist: []string{}, PendingRemoveEvent: false, + MaxTxLimitPerPeer: 100, } } @@ -953,12 +955,14 @@ func (cfg *ConsensusConfig) ValidateBasic() error { return nil } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // TxIndexConfig // Remember that Event has the following structure: // type: [ -// key: value, -// ... +// +// key: value, +// ... +// // ] // // CompositeKeys are constructed by `type.key` diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index fd4a1b836e..58687743bb 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -34,6 +34,7 @@ type IDynamicConfig interface { GetDynamicGpMaxTxNum() int64 GetDynamicGpMaxGasUsed() int64 GetMaxSubscriptionClients() int + GetMaxTxLimitPerPeer() uint64 } var DynamicConfig IDynamicConfig = MockDynamicConfig{} @@ -198,3 +199,7 @@ func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) { } d.maxSubscriptionClients = value } + +func (c MockDynamicConfig) GetMaxTxLimitPerPeer() uint64 { + return DefaultMempoolConfig().MaxTxLimitPerPeer +} diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index c7e6cff30e..f5359771f1 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -108,6 +108,9 @@ type CListMempool struct { rmPendingTxChan chan types.EventDataRmPendingTx gpo *Oracle + + peersTxCountMtx sync.RWMutex + peersTxCount map[string]uint64 } func (mem *CListMempool) filterCMTx(tx abci.TxEssentials) bool { @@ -166,6 +169,7 @@ func NewCListMempool( simQueue: make(chan *mempoolTx, 100000), gasCache: gasCache, gpo: gpo, + peersTxCount: make(map[string]uint64, 0), } if config.PendingRemoveEvent { @@ -305,6 +309,22 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { // // Safe for concurrent use by multiple goroutines. func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error { + mem.peersTxCountMtx.Lock() + if len(txInfo.SenderP2PID) != 0 { + peerTxCount, ok := mem.peersTxCount[string(txInfo.SenderP2PID)] + if !ok { + peerTxCount = 0 + } + if peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() { + mem.peersTxCountMtx.Unlock() + mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())) + return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()) + } + peerTxCount++ + mem.peersTxCount[string(txInfo.SenderP2PID)] = peerTxCount + } + mem.peersTxCountMtx.Unlock() + timeStart := int64(0) if cfg.DynamicConfig.GetMempoolCheckTxCost() { timeStart = time.Now().UnixMicro() @@ -1010,6 +1030,11 @@ func (mem *CListMempool) Update( preCheck PreCheckFunc, postCheck PostCheckFunc, ) error { + mem.peersTxCountMtx.Lock() + for key := range mem.peersTxCount { + delete(mem.peersTxCount, key) + } + mem.peersTxCountMtx.Unlock() // no need to update when mempool is unavailable if mem.config.Sealed { return mem.updateSealed(height, txs, deliverTxResponses)