diff --git a/include/ylt/coro_io/rate_limiter.hpp b/include/ylt/coro_io/rate_limiter.hpp index 319129ba1..832c984e4 100644 --- a/include/ylt/coro_io/rate_limiter.hpp +++ b/include/ylt/coro_io/rate_limiter.hpp @@ -22,15 +22,16 @@ #include #include #include -#include #include +#include namespace coro_io { class rate_limiter { public: async_simple::coro::Lazy acquire(int permits) { co_await this->lock_.coLock(); - std::chrono::milliseconds wait_mills = reserve_and_get_wait_length(permits, current_time_mills()); + std::chrono::milliseconds wait_mills = + reserve_and_get_wait_length(permits, current_time_mills()); this->lock_.unlock(); co_await coro_io::sleep_for(std::chrono::milliseconds(wait_mills)); co_return wait_mills; @@ -42,18 +43,25 @@ class rate_limiter { } protected: - virtual void do_set_rate(double permitsPerSecond, std::chrono::steady_clock::time_point now_micros) = 0; - virtual std::chrono::steady_clock::time_point reserve_earliest_available(int permits, std::chrono::steady_clock::time_point now_micros) = 0; + virtual void do_set_rate( + double permitsPerSecond, + std::chrono::steady_clock::time_point now_micros) = 0; + virtual std::chrono::steady_clock::time_point reserve_earliest_available( + int permits, std::chrono::steady_clock::time_point now_micros) = 0; std::chrono::steady_clock::time_point current_time_mills() { return std::chrono::steady_clock::now(); - //return std::chrono::duration_cast( - // now.time_since_epoch()); + // return std::chrono::duration_cast( + // now.time_since_epoch()); } private: - std::chrono::milliseconds reserve_and_get_wait_length(int permits, std::chrono::steady_clock::time_point now_micros) { - std::chrono::steady_clock::time_point moment_available = reserve_earliest_available(permits, now_micros); - std::chrono::milliseconds diff_mills = std::chrono::duration_cast(moment_available - now_micros); + std::chrono::milliseconds reserve_and_get_wait_length( + int permits, std::chrono::steady_clock::time_point now_micros) { + std::chrono::steady_clock::time_point moment_available = + reserve_earliest_available(permits, now_micros); + std::chrono::milliseconds diff_mills = + std::chrono::duration_cast(moment_available - + now_micros); return std::max(diff_mills, std::chrono::milliseconds(0)); } @@ -63,37 +71,49 @@ class rate_limiter { class abstract_smooth_rate_limiter : public rate_limiter { protected: virtual void do_set_rate(double permits_per_second, - double stable_internal_micros) = 0; + double stable_internal_micros) = 0; virtual long stored_permits_to_wait_time(double stored_permits, - double permits_to_take) = 0; + double permits_to_take) = 0; virtual double cool_down_internal_micros() = 0; void resync(std::chrono::steady_clock::time_point now_micros) { // if next_free_ticket is in the past, resync to now - ELOG_DEBUG << "now micros: " << std::chrono::duration_cast(now_micros.time_since_epoch()).count() << ", next_free_ticket_micros_: " - << std::chrono::duration_cast(this->next_free_ticket_micros_.time_since_epoch()).count(); + ELOG_DEBUG << "now micros: " + << std::chrono::duration_cast( + now_micros.time_since_epoch()) + .count() + << ", next_free_ticket_micros_: " + << std::chrono::duration_cast( + this->next_free_ticket_micros_.time_since_epoch()) + .count(); if (now_micros > this->next_free_ticket_micros_) { - std::chrono::milliseconds diff_mills = std::chrono::duration_cast(now_micros - this->next_free_ticket_micros_); + std::chrono::milliseconds diff_mills = + std::chrono::duration_cast( + now_micros - this->next_free_ticket_micros_); double newPermits = diff_mills.count() / cool_down_internal_micros(); this->stored_permits_ = std::min(this->max_permits_, this->stored_permits_ + newPermits); this->next_free_ticket_micros_ = now_micros; } } - void do_set_rate(double permits_per_second, std::chrono::steady_clock::time_point now_micros) override { + void do_set_rate(double permits_per_second, + std::chrono::steady_clock::time_point now_micros) override { resync(now_micros); double stable_internal_micros = 1000 / permits_per_second; this->stable_internal_micros_ = stable_internal_micros; do_set_rate(permits_per_second, stable_internal_micros); } - std::chrono::steady_clock::time_point reserve_earliest_available(int required_permits, std::chrono::steady_clock::time_point now_micros) { + std::chrono::steady_clock::time_point reserve_earliest_available( + int required_permits, std::chrono::steady_clock::time_point now_micros) { resync(now_micros); - std::chrono::steady_clock::time_point return_value = this->next_free_ticket_micros_; + std::chrono::steady_clock::time_point return_value = + this->next_free_ticket_micros_; double stored_permits_to_spend = std::min((double)required_permits, this->stored_permits_); double fresh_permits = required_permits - stored_permits_to_spend; - std::chrono::milliseconds wait_micros = std::chrono::milliseconds(stored_permits_to_wait_time(this->stored_permits_, - stored_permits_to_spend) + - (long)(fresh_permits * this->stable_internal_micros_)); + std::chrono::milliseconds wait_micros = std::chrono::milliseconds( + stored_permits_to_wait_time(this->stored_permits_, + stored_permits_to_spend) + + (long)(fresh_permits * this->stable_internal_micros_)); this->next_free_ticket_micros_ += wait_micros; this->stored_permits_ -= stored_permits_to_spend; return return_value; @@ -139,7 +159,8 @@ class smooth_bursty_rate_limiter : public abstract_smooth_rate_limiter { << ", stored_permits_:" << this->stored_permits_; } - long stored_permits_to_wait_time(double stored_permits, double permits_to_take) { + long stored_permits_to_wait_time(double stored_permits, + double permits_to_take) { return 0L; } @@ -147,8 +168,8 @@ class smooth_bursty_rate_limiter : public abstract_smooth_rate_limiter { private: /** - * The work(permits) of how many seconds can be saved up if the rate_limiter is - * unused. + * The work(permits) of how many seconds can be saved up if the rate_limiter + * is unused. */ double max_burst_seconds_ = 0; }; diff --git a/src/coro_io/tests/test_rate_limiter.cpp b/src/coro_io/tests/test_rate_limiter.cpp index 2842014fb..613b4ecfc 100644 --- a/src/coro_io/tests/test_rate_limiter.cpp +++ b/src/coro_io/tests/test_rate_limiter.cpp @@ -14,7 +14,8 @@ long current_time_mills() { TEST_CASE("test smooth_bursty_rate_limiter simple") { coro_io::smooth_bursty_rate_limiter rate_limiter(1); - std::chrono::milliseconds wait_time = async_simple::coro::syncAwait(rate_limiter.acquire(1)); + std::chrono::milliseconds wait_time = + async_simple::coro::syncAwait(rate_limiter.acquire(1)); CHECK_EQ(0, wait_time.count()); } @@ -45,7 +46,8 @@ TEST_CASE("test smooth_bursty_rate_limiter single thread") { long start_mills = current_time_mills(); coro_io::smooth_bursty_rate_limiter rate_limiter(permits_per_second); for (int i = 0; i < permits_to_acquire; i++) { - std::chrono::milliseconds wait_mills = async_simple::coro::syncAwait(rate_limiter.acquire(1)); + std::chrono::milliseconds wait_mills = + async_simple::coro::syncAwait(rate_limiter.acquire(1)); ELOG_INFO << "wait for " << wait_mills.count(); } double cost = (current_time_mills() - start_mills) / 1000.0;