Skip to content

Commit

Permalink
feat(BOUN-1333, BOUN-1334): ratelimit: add scaling, change allow rule…
Browse files Browse the repository at this point in the history
…, change shedding latencies (#3379)

* Adds optional autoscaling of rules to the ratelimiter (CLI arg
`--rate-limit-generic-autoscale`). If enabled it will divide the limit
by the number of API BNs in the registry and recalculate whenever the
number changes (when the new snapshot is published)
* Remove the implicit allow rule for the ratelimit canister; instead, just
bypass the ratelimiting for requests that originate from localhost.
This allows creating ratelimit rules for the canister itself
* Change HTTP code from `429` to `403` when rejecting requests by the
rule that has `limit: block`
* Bump load shedding latencies in `ic-gateway` to fix the false
positives that we're getting occasionally

---------

Co-authored-by: IDX GitLab Automation <[email protected]>
  • Loading branch information
blind-oracle and IDX GitLab Automation authored Jan 10, 2025
1 parent 19e3c68 commit 8df1883
Show file tree
Hide file tree
Showing 9 changed files with 681 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ SHED_SYSTEM_CPU="0.95"
SHED_SYSTEM_MEMORY="0.95"
SHED_SHARDED_EWMA="0.6"
SHED_SHARDED_PASSTHROUGH="20000"
SHED_SHARDED_LATENCY="query:1s,call:1s,sync_call:13s,read_state:1s,read_state_subnet:1s,status:100ms,health:100ms,registrations:5s,http:5s"
SHED_SHARDED_LATENCY="query:2s,call:2s,sync_call:13s,read_state:2s,read_state_subnet:2s,status:100ms,health:100ms,registrations:5s,http:5s"
EOF

if [ ! -z "${DENYLIST_URL:-}" ]; then
Expand Down
1 change: 1 addition & 0 deletions rs/boundary_node/ic_boundary/src/check/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub fn generate_custom_registry_snapshot(
nns_public_key: vec![],
subnets,
nodes: nodes_hash,
api_bns: vec![],
}
}

Expand Down
7 changes: 7 additions & 0 deletions rs/boundary_node/ic_boundary/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ pub struct RateLimiting {
/// Maximum number of shards that we store (per rule)
#[clap(env, long, default_value = "30000")]
pub rate_limit_generic_max_shards: u64,

/// Whether to use the number of API BNs from the registry to scale the rate limit rules.
/// E.g. if a ratelimit action is set to "500/1h" and the number of API BNs is 5 then the
/// rule would be adjusted to "100/1h" so that the total ratelimit of all API BNs would be "500/1h".
/// Important: if after the division the numerator would be less than 1 then it would be rounded to 1.
#[clap(env, long)]
pub rate_limit_generic_autoscale: bool,
}

#[derive(Args)]
Expand Down
42 changes: 21 additions & 21 deletions rs/boundary_node/ic_boundary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use ic_types::{crypto::threshold_sig::ThresholdSigPublicKey, messages::MessageId
use nix::unistd::{getpgid, setpgid, Pid};
use prometheus::Registry;
use rand::rngs::OsRng;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tower::{limit::ConcurrencyLimitLayer, util::MapResponseLayer, ServiceBuilder};
use tower_http::{compression::CompressionLayer, request_id::MakeRequestUuid, ServiceBuilderExt};
Expand Down Expand Up @@ -200,6 +201,9 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
// Setup registry-related stuff
let persister = Persister::new(Arc::clone(&routing_table));

// Snapshot update notification channels
let (channel_snapshot_send, channel_snapshot_recv) = tokio::sync::watch::channel(None);

// Registry Client
let (registry_client, registry_replicator, nns_pub_key) =
if let Some(v) = &cli.registry.registry_local_store_path {
Expand All @@ -221,6 +225,8 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
WithMetricsPersist(persister, MetricParamsPersist::new(&metrics_registry)),
http_client_check,
&metrics_registry,
channel_snapshot_send,
channel_snapshot_recv.clone(),
&mut runners,
)?;

Expand Down Expand Up @@ -285,28 +291,25 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
let generic_limiter_opts = generic::Options {
tti: cli.rate_limiting.rate_limit_generic_tti,
max_shards: cli.rate_limiting.rate_limit_generic_max_shards,
poll_interval: cli.rate_limiting.rate_limit_generic_poll_interval,
autoscale: cli.rate_limiting.rate_limit_generic_autoscale,
};
let generic_limiter = if let Some(v) = &cli.rate_limiting.rate_limit_generic_file {
Some(Arc::new(generic::GenericLimiter::new_from_file(
v.clone(),
generic_limiter_opts,
channel_snapshot_recv,
)))
} else if let Some(v) = cli.rate_limiting.rate_limit_generic_canister_id {
Some(Arc::new(generic::GenericLimiter::new_from_canister(
v,
agent.clone().unwrap(),
generic_limiter_opts,
cli.misc.crypto_config.is_some(),
channel_snapshot_recv,
)))
} else {
cli.rate_limiting.rate_limit_generic_canister_id.map(|x| {
Arc::new(if cli.misc.crypto_config.is_some() {
generic::GenericLimiter::new_from_canister_update(
x,
agent.clone().unwrap(),
generic_limiter_opts,
)
} else {
generic::GenericLimiter::new_from_canister_query(
x,
agent.clone().unwrap(),
generic_limiter_opts,
)
})
})
None
};

// HTTP Logs Anonymization
Expand Down Expand Up @@ -438,11 +441,7 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
runners.push(Box::new(metrics_runner));

if let Some(v) = generic_limiter {
let runner = Box::new(WithThrottle(
v,
ThrottleParams::new(cli.rate_limiting.rate_limit_generic_poll_interval),
));
runners.push(runner);
runners.push(Box::new(v));
}

// HTTP Logs Anonymization
Expand Down Expand Up @@ -608,10 +607,11 @@ fn setup_registry(
persister: WithMetricsPersist<Persister>,
http_client_check: Arc<dyn http::Client>,
metrics_registry: &Registry,
channel_snapshot_send: watch::Sender<Option<Arc<RegistrySnapshot>>>,
channel_snapshot_recv: watch::Receiver<Option<Arc<RegistrySnapshot>>>,
runners: &mut Vec<Box<dyn Run>>,
) -> Result<(Option<RegistryReplicator>, Option<ThresholdSigPublicKey>), Error> {
// Snapshots
let (channel_snapshot_send, channel_snapshot_recv) = tokio::sync::watch::channel(None);
let snapshot_runner = WithMetricsSnapshot(
{
let mut snapshotter = Snapshotter::new(
Expand Down
29 changes: 7 additions & 22 deletions rs/boundary_node/ic_boundary/src/rate_limiting/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl FetchesConfig for CanisterConfigFetcherUpdate {
}
}

pub struct CanisterFetcher(pub Arc<dyn FetchesConfig>, pub CanisterId);
pub struct CanisterFetcher(pub Arc<dyn FetchesConfig>);

#[async_trait]
impl FetchesRules for CanisterFetcher {
Expand Down Expand Up @@ -115,14 +115,7 @@ impl FetchesRules for CanisterFetcher {
));
}

// Create an explicit allow rule that excludes the ratelimit canister
// from being affected by any of the following rules.
let mut allowlist = vec![RateLimitRule {
canister_id: Some(self.1.get().0),
..Default::default()
}];

let mut rules = response
let rules = response
.config
.rules
.into_iter()
Expand All @@ -142,14 +135,13 @@ impl FetchesRules for CanisterFetcher {
})
.collect::<Result<Vec<_>, _>>()?;

allowlist.append(&mut rules);
Ok(allowlist)
Ok(rules)
}
}

#[cfg(test)]
mod test {
use std::{str::FromStr, time::Duration};
use std::time::Duration;

use candid::Encode;
use indoc::indoc;
Expand Down Expand Up @@ -256,28 +248,21 @@ mod test {

#[tokio::test]
async fn test_canister_fetcher() {
let canister_id = CanisterId::from_str("pawub-syaaa-aaaam-qb7zq-cai").unwrap();

// Check bad schema
let canister_fetcher = CanisterFetcher(Arc::new(FakeConfigFetcherBadSchema), canister_id);
let canister_fetcher = CanisterFetcher(Arc::new(FakeConfigFetcherBadSchema));
assert!(canister_fetcher.fetch_rules().await.is_err());

// Check missing rule
let canister_fetcher = CanisterFetcher(Arc::new(FakeConfigFetcherNoneRule), canister_id);
let canister_fetcher = CanisterFetcher(Arc::new(FakeConfigFetcherNoneRule));
assert!(canister_fetcher.fetch_rules().await.is_err());

// Check correct rules parsing
let canister_fetcher = CanisterFetcher(Arc::new(FakeConfigFetcherOk), canister_id);
let canister_fetcher = CanisterFetcher(Arc::new(FakeConfigFetcherOk));
let rules = canister_fetcher.fetch_rules().await.unwrap();

assert_eq!(
rules,
vec![
// Make sure there's an explicit allow rule
RateLimitRule {
canister_id: Some(canister_id.get().0),
..Default::default()
},
RateLimitRule {
canister_id: Some(principal!("aaaaa-aa")),
subnet_id: Some(principal!(
Expand Down
Loading

0 comments on commit 8df1883

Please sign in to comment.