Skip to content

Commit

Permalink
limit tx count of per peer
Browse files Browse the repository at this point in the history
  • Loading branch information
BananaLF committed Dec 11, 2023
1 parent 2933b9f commit 9d07cfa
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 6 deletions.
25 changes: 23 additions & 2 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ type OecConfig struct {
commitGapOffset int64

maxSubscriptionClients int

maxTxLimitPerPeer uint64
}

func (c *OecConfig) SetMaxTxLimitPerPeer(maxTxLimitPerPeer int64) {
if maxTxLimitPerPeer < 0 {
return
}
c.maxTxLimitPerPeer = uint64(maxTxLimitPerPeer)
}

func (c *OecConfig) GetMaxTxLimitPerPeer() uint64 {
return c.maxTxLimitPerPeer
}

const (
Expand Down Expand Up @@ -161,8 +174,9 @@ const (
FlagEnableHasBlockPartMsg = "enable-blockpart-ack"
FlagDebugGcInterval = "debug.gc-interval"
FlagCommitGapOffset = "commit-gap-offset"

FlagMaxSubscriptionClients = "max-subscription-clients"
FlagEnableMempoolSimGuFactor = "enable-mem-sim-gu-factor"
FlagMaxSubscriptionClients = "max-subscription-clients"
FlagMaxTxLimitPerPeer = "mempool.max_tx_limit_per_peer"
)

var (
Expand Down Expand Up @@ -278,6 +292,7 @@ func (c *OecConfig) loadFromConfig() {
c.SetMempoolFlush(viper.GetBool(FlagMempoolFlush))
c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost))
c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock))
c.SetMaxTxLimitPerPeer(int64(viper.GetUint64(FlagMaxTxLimitPerPeer)))
c.SetEnableDeleteMinGPTx(viper.GetBool(FlagMempoolEnableDeleteMinGPTx))
c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock))
c.SetEnablePGU(viper.GetBool(FlagEnablePGU))
Expand Down Expand Up @@ -463,6 +478,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) {
return
}
c.SetMaxTxNumPerBlock(r)
case FlagMaxTxLimitPerPeer:
r, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return
}
c.SetMaxTxLimitPerPeer(r)
case FlagMempoolEnableDeleteMinGPTx:
r, err := strconv.ParseBool(v)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion dev/testnet/testnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ run() {
--enable-wtx=${WRAPPEDTX} \
--mempool.node_key_whitelist ${WHITE_LIST} \
--p2p.pex=false \
--mempool.max_tx_limit_per_peer=1 \
--p2p.addr_book_strict=false \
$p2p_seed_opt $p2p_seed_arg \
--p2p.laddr tcp://${IP}:${p2pport} \
Expand All @@ -158,7 +159,7 @@ run() {
--chain-id ${CHAIN_ID} \
--upload-delta=false \
--enable-gid \
--consensus.timeout_commit 3800ms \
--consensus.timeout_commit 10000ms \
--enable-blockpart-ack=false \
--append-pid=true \
${LOG_SERVER} \
Expand Down
5 changes: 5 additions & 0 deletions libs/tendermint/cmd/tendermint/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ func AddNodeFlags(cmd *cobra.Command) {
config.Mempool.PendingRemoveEvent,
"Push event when remove a pending tx",
)
cmd.Flags().Uint64(
"mempool.max_tx_limit_per_peer",
config.Mempool.MaxTxLimitPerPeer,
"Max tx limit per peer",
)

cmd.Flags().String(
"mempool.node_key_whitelist",
Expand Down
10 changes: 7 additions & 3 deletions libs/tendermint/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ type MempoolConfig struct {
PendingPoolMaxTxPerAddress int `mapstructure:"pending_pool_max_tx_per_address"`
NodeKeyWhitelist []string `mapstructure:"node_key_whitelist"`
PendingRemoveEvent bool `mapstructure:"pending_remove_event"`
MaxTxLimitPerPeer uint64 `mapstructure:"max_tx_limit_per_peer"`
}

// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
Expand All @@ -715,6 +716,7 @@ func DefaultMempoolConfig() *MempoolConfig {
PendingPoolMaxTxPerAddress: 100,
NodeKeyWhitelist: []string{},
PendingRemoveEvent: false,
MaxTxLimitPerPeer: 100,
}
}

Expand Down Expand Up @@ -953,12 +955,14 @@ func (cfg *ConsensusConfig) ValidateBasic() error {
return nil
}

//-----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// TxIndexConfig
// Remember that Event has the following structure:
// type: [
// key: value,
// ...
//
// key: value,
// ...
//
// ]
//
// CompositeKeys are constructed by `type.key`
Expand Down
5 changes: 5 additions & 0 deletions libs/tendermint/config/dynamic_config_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type IDynamicConfig interface {
GetDynamicGpMaxTxNum() int64
GetDynamicGpMaxGasUsed() int64
GetMaxSubscriptionClients() int
GetMaxTxLimitPerPeer() uint64
}

var DynamicConfig IDynamicConfig = MockDynamicConfig{}
Expand Down Expand Up @@ -198,3 +199,7 @@ func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) {
}
d.maxSubscriptionClients = value
}

func (c MockDynamicConfig) GetMaxTxLimitPerPeer() uint64 {
return DefaultMempoolConfig().MaxTxLimitPerPeer
}
25 changes: 25 additions & 0 deletions libs/tendermint/mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ type CListMempool struct {
rmPendingTxChan chan types.EventDataRmPendingTx

gpo *Oracle

peersTxCountMtx sync.RWMutex
peersTxCount map[string]uint64
}

func (mem *CListMempool) filterCMTx(tx abci.TxEssentials) bool {
Expand Down Expand Up @@ -166,6 +169,7 @@ func NewCListMempool(
simQueue: make(chan *mempoolTx, 100000),
gasCache: gasCache,
gpo: gpo,
peersTxCount: make(map[string]uint64, 0),
}

if config.PendingRemoveEvent {
Expand Down Expand Up @@ -305,6 +309,22 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {
mem.peersTxCountMtx.Lock()
if len(txInfo.SenderP2PID) != 0 {
peerTxCount, ok := mem.peersTxCount[string(txInfo.SenderP2PID)]
if !ok {
peerTxCount = 0
}
if peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() {
mem.peersTxCountMtx.Unlock()
mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()))
return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())
}
peerTxCount++
mem.peersTxCount[string(txInfo.SenderP2PID)] = peerTxCount
}
mem.peersTxCountMtx.Unlock()

timeStart := int64(0)
if cfg.DynamicConfig.GetMempoolCheckTxCost() {
timeStart = time.Now().UnixMicro()
Expand Down Expand Up @@ -1010,6 +1030,11 @@ func (mem *CListMempool) Update(
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
mem.peersTxCountMtx.Lock()
for key := range mem.peersTxCount {
delete(mem.peersTxCount, key)
}
mem.peersTxCountMtx.Unlock()
// no need to update when mempool is unavailable
if mem.config.Sealed {
return mem.updateSealed(height, txs, deliverTxResponses)
Expand Down

0 comments on commit 9d07cfa

Please sign in to comment.