Skip to content

Commit

Permalink
Merge pull request #48 from 0glabs/rate
Browse files Browse the repository at this point in the history
refactor ratelimit
  • Loading branch information
csdtowards authored Aug 14, 2024
2 parents 4a2cfc1 + 567b6dc commit 0640906
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 70 deletions.
36 changes: 28 additions & 8 deletions common/ratelimit/limiter_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@ import (
)

const (
BucketSizesFlagName = "bucket-sizes"
BucketMultipliersFlagName = "bucket-multipliers"
CountFailedFlagName = "count-failed"
BucketStoreSizeFlagName = "bucket-store-size"
AllowlistFlagName = "allowlist"
BucketSizesFlagName = "bucket-sizes"
BucketMultipliersFlagName = "bucket-multipliers"
CountFailedFlagName = "count-failed"
BucketStoreSizeFlagName = "bucket-store-size"
AllowlistFlagName = "allowlist"
MaxWriteRequestPerMinuteFlagName = "max-write-request-per-minute"
MaxReadRequestPerMinuteFlagName = "max-read-request-per-minute"
)

type Config struct {
common.GlobalRateParams
BucketStoreSize int
UniformRateParam common.RateParam
Allowlist []string
BucketStoreSize int
UniformRateParam common.RateParam
Allowlist []string
MaxWriteRequestPerMinute int
MaxReadRequestPerMinute int
}

func RatelimiterCLIFlags(envPrefix string, flagPrefix string) []cli.Flag {
Expand Down Expand Up @@ -61,6 +65,20 @@ func RatelimiterCLIFlags(envPrefix string, flagPrefix string) []cli.Flag {
Required: false,
Value: &cli.StringSlice{},
},
cli.IntFlag{
Name: common.PrefixFlag(flagPrefix, MaxWriteRequestPerMinuteFlagName),
Usage: "Max write request per minute",
Value: 60,
EnvVar: common.PrefixEnvVar(envPrefix, "MAX_WRITE_REQUEST"),
Required: false,
},
cli.IntFlag{
Name: common.PrefixFlag(flagPrefix, MaxReadRequestPerMinuteFlagName),
Usage: "Max read request per minute",
Value: 60,
EnvVar: common.PrefixEnvVar(envPrefix, "MAX_READ_REQUEST"),
Required: false,
},
}
}

Expand Down Expand Up @@ -107,6 +125,8 @@ func ReadCLIConfig(ctx *cli.Context, flagPrefix string) (Config, error) {
cfg.GlobalRateParams.CountFailed = ctx.Bool(common.PrefixFlag(flagPrefix, CountFailedFlagName))
cfg.BucketStoreSize = ctx.Int(common.PrefixFlag(flagPrefix, BucketStoreSizeFlagName))
cfg.Allowlist = ctx.StringSlice(common.PrefixFlag(flagPrefix, AllowlistFlagName))
cfg.MaxWriteRequestPerMinute = ctx.Int(common.PrefixFlag(flagPrefix, MaxWriteRequestPerMinuteFlagName))
cfg.MaxReadRequestPerMinute = ctx.Int(common.PrefixFlag(flagPrefix, MaxReadRequestPerMinuteFlagName))

err := validateConfig(cfg)
if err != nil {
Expand Down
42 changes: 28 additions & 14 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"

pb "github.com/0glabs/0g-da-client/api/grpc/disperser"
"github.com/0glabs/0g-da-client/common"
healthcheck "github.com/0glabs/0g-da-client/common/healthcheck"
"github.com/0glabs/0g-da-client/common/ratelimit"
"github.com/0glabs/0g-da-client/core"
"github.com/0glabs/0g-da-client/disperser"
"github.com/0glabs/0g-da-client/disperser/api/grpc/retriever"
Expand All @@ -32,8 +34,7 @@ type DispersalServer struct {

blobStore disperser.BlobStore

rateConfig RateConfig
ratelimiter common.RateLimiter
rateConfig RateConfig

metrics *disperser.Metrics

Expand All @@ -56,7 +57,8 @@ func NewDispersalServer(
store disperser.BlobStore,
logger common.Logger,
metrics *disperser.Metrics,
ratelimiter common.RateLimiter,
ratelimiterConfig ratelimit.Config,
enableRatelimiter bool,
rateConfig RateConfig,
metadataHashAsBlobKey bool,
kvStore *disperser.Store,
Expand All @@ -68,15 +70,14 @@ func NewDispersalServer(
blobStore: store,
metrics: metrics,
logger: logger,
ratelimiter: ratelimiter,
rateConfig: rateConfig,
mu: &sync.RWMutex{},
metadataHashAsBlobKey: metadataHashAsBlobKey,
kvStore: kvStore,
retrieverAddr: retrieverAddr,

writeRateLimiterManager: NewClientRateLimiterManager(60),
readRateLimiterManager: NewClientRateLimiterManager(60),
writeRateLimiterManager: NewClientRateLimiterManager(enableRatelimiter, ratelimiterConfig.MaxWriteRequestPerMinute, ratelimiterConfig.Allowlist),
readRateLimiterManager: NewClientRateLimiterManager(enableRatelimiter, ratelimiterConfig.MaxReadRequestPerMinute, ratelimiterConfig.Allowlist),
}
}

Expand Down Expand Up @@ -106,7 +107,7 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob
s.logger.Debug("[apiserver] received a new blob request", "origin", origin)

limiter := s.writeRateLimiterManager.GetRateLimiter(origin)
if !limiter.Allow() {
if limiter != nil && !limiter.Allow() {
s.logger.Debug("[apiserver] rate limit exceeded for disperse blob", "client", origin)
return nil, fmt.Errorf("request ratelimited")
}
Expand Down Expand Up @@ -230,7 +231,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
}

limiter := s.readRateLimiterManager.GetRateLimiter(origin)
if !limiter.Allow() {
if limiter != nil && !limiter.Allow() {
s.logger.Debug("[apiserver] rate limit exceeded for retrieve blob", "client", origin)
return nil, fmt.Errorf("request ratelimited")
}
Expand Down Expand Up @@ -368,21 +369,34 @@ func (rl *TinyRateLimiter) Allow() bool {
}

type ClientRateLimiterManager struct {
clients map[string]*TinyRateLimiter // Map of client ID to RateLimiter
maxRequests int
mu sync.Mutex // Mutex for thread safety
clients map[string]*TinyRateLimiter // Map of client ID to RateLimiter
maxRequests int
allowlist []string
EnableRatelimiter bool
mu sync.Mutex // Mutex for thread safety
}

func NewClientRateLimiterManager(maxRequests int) *ClientRateLimiterManager {
func NewClientRateLimiterManager(enableRatelimiter bool, maxRequests int, allowlist []string) *ClientRateLimiterManager {
return &ClientRateLimiterManager{
clients: make(map[string]*TinyRateLimiter),
maxRequests: maxRequests,
clients: make(map[string]*TinyRateLimiter),
maxRequests: maxRequests,
allowlist: allowlist,
EnableRatelimiter: enableRatelimiter,
}
}

func (m *ClientRateLimiterManager) GetRateLimiter(clientID string) *TinyRateLimiter {
m.mu.Lock()
defer m.mu.Unlock()
if !m.EnableRatelimiter {
return nil
}

for _, id := range m.allowlist {
if strings.Contains(clientID, id) {
return nil
}
}

limiter, ok := m.clients[clientID]
if !ok {
Expand Down
25 changes: 1 addition & 24 deletions disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@ import (
"log"
"os"

"github.com/0glabs/0g-da-client/common"
"github.com/0glabs/0g-da-client/disperser/apiserver"
"github.com/0glabs/0g-da-client/disperser/common/blobstore"

"github.com/0glabs/0g-da-client/common/aws/dynamodb"
"github.com/0glabs/0g-da-client/common/aws/s3"
"github.com/0glabs/0g-da-client/common/logging"
"github.com/0glabs/0g-da-client/common/ratelimit"
"github.com/0glabs/0g-da-client/common/store"
"github.com/0glabs/0g-da-client/disperser"
"github.com/0glabs/0g-da-client/disperser/cmd/apiserver/flags"
"github.com/urfave/cli"
Expand Down Expand Up @@ -56,7 +53,6 @@ func RunDisperserServer(ctx *cli.Context) error {
}

var blobStore disperser.BlobStore
var ratelimiter common.RateLimiter

s3Client, err := s3.NewClient(config.AwsClientConfig, logger)
if err != nil {
Expand All @@ -80,29 +76,10 @@ func RunDisperserServer(ctx *cli.Context) error {
return nil
}

if config.EnableRatelimiter {
globalParams := config.RatelimiterConfig.GlobalRateParams

var bucketStore common.KVStore[common.RateBucketParams]
if config.BucketTableName != "" {
dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)
if err != nil {
return err
}
bucketStore = store.NewDynamoParamStore[common.RateBucketParams](dynamoClient, config.BucketTableName)
} else {
bucketStore, err = store.NewLocalParamStore[common.RateBucketParams](config.BucketStoreSize)
if err != nil {
return err
}
}
ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, config.RatelimiterConfig.Allowlist, logger)
}

// TODO: create a separate metrics for batcher
metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger)

server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, logger, metrics, ratelimiter, config.RateConfig, config.BlobstoreConfig.MetadataHashAsBlobKey, kvStore, config.RetrieverAddr)
server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, logger, metrics, config.RatelimiterConfig, config.EnableRatelimiter, config.RateConfig, config.BlobstoreConfig.MetadataHashAsBlobKey, kvStore, config.RetrieverAddr)

// Enable Metrics Block
if config.MetricsConfig.EnableMetrics {
Expand Down
25 changes: 1 addition & 24 deletions disperser/cmd/combined_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"github.com/0glabs/0g-da-client/common/aws/s3"
"github.com/0glabs/0g-da-client/common/geth"
"github.com/0glabs/0g-da-client/common/logging"
"github.com/0glabs/0g-da-client/common/ratelimit"
"github.com/0glabs/0g-da-client/common/store"
"github.com/0glabs/0g-da-client/disperser"
"github.com/0glabs/0g-da-client/disperser/cmd/combined_server/flags"
"github.com/urfave/cli"
Expand Down Expand Up @@ -54,30 +52,9 @@ func main() {
}

func RunDisperserServer(config Config, blobStore disperser.BlobStore, logger common.Logger, kvStore *disperser.Store) error {
var ratelimiter common.RateLimiter
if config.EnableRatelimiter {
globalParams := config.RatelimiterConfig.GlobalRateParams

var bucketStore common.KVStore[common.RateBucketParams]
if config.BucketTableName != "" {
dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)
if err != nil {
return err
}
bucketStore = store.NewDynamoParamStore[common.RateBucketParams](dynamoClient, config.BucketTableName)
} else {
var err error
bucketStore, err = store.NewLocalParamStore[common.RateBucketParams](config.BucketStoreSize)
if err != nil {
return err
}
}
ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, config.RatelimiterConfig.Allowlist, logger)
}

metrics := disperser.NewMetrics(config.MetricsConfig.HTTPPort, logger)

server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, logger, metrics, ratelimiter, config.RateConfig, config.BlobstoreConfig.MetadataHashAsBlobKey, kvStore, config.RetrieverAddr)
server := apiserver.NewDispersalServer(config.ServerConfig, blobStore, logger, metrics, config.RatelimiterConfig, config.EnableRatelimiter, config.RateConfig, config.BlobstoreConfig.MetadataHashAsBlobKey, kvStore, config.RetrieverAddr)

// Enable Metrics Block
if config.MetricsConfig.EnableMetrics {
Expand Down

0 comments on commit 0640906

Please sign in to comment.