Skip to content

Commit

Permalink
format rate_limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenRi committed Oct 31, 2023
1 parent a05da35 commit f731a0e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 25 deletions.
67 changes: 44 additions & 23 deletions include/ylt/coro_io/rate_limiter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
#include <algorithm>
#include <chrono>
#include <iostream>
#include <ylt/easylog.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/easylog.hpp>

namespace coro_io {
class rate_limiter {
public:
async_simple::coro::Lazy<std::chrono::milliseconds> acquire(int permits) {
co_await this->lock_.coLock();
std::chrono::milliseconds wait_mills = reserve_and_get_wait_length(permits, current_time_mills());
std::chrono::milliseconds wait_mills =
reserve_and_get_wait_length(permits, current_time_mills());
this->lock_.unlock();
co_await coro_io::sleep_for(std::chrono::milliseconds(wait_mills));
co_return wait_mills;
Expand All @@ -42,18 +43,25 @@ class rate_limiter {
}

protected:
virtual void do_set_rate(double permitsPerSecond, std::chrono::steady_clock::time_point now_micros) = 0;
virtual std::chrono::steady_clock::time_point reserve_earliest_available(int permits, std::chrono::steady_clock::time_point now_micros) = 0;
virtual void do_set_rate(
double permitsPerSecond,
std::chrono::steady_clock::time_point now_micros) = 0;
virtual std::chrono::steady_clock::time_point reserve_earliest_available(
int permits, std::chrono::steady_clock::time_point now_micros) = 0;
std::chrono::steady_clock::time_point current_time_mills() {
return std::chrono::steady_clock::now();
//return std::chrono::duration_cast<std::chrono::milliseconds>(
// now.time_since_epoch());
// return std::chrono::duration_cast<std::chrono::milliseconds>(
// now.time_since_epoch());
}

private:
std::chrono::milliseconds reserve_and_get_wait_length(int permits, std::chrono::steady_clock::time_point now_micros) {
std::chrono::steady_clock::time_point moment_available = reserve_earliest_available(permits, now_micros);
std::chrono::milliseconds diff_mills = std::chrono::duration_cast<std::chrono::milliseconds>(moment_available - now_micros);
std::chrono::milliseconds reserve_and_get_wait_length(
int permits, std::chrono::steady_clock::time_point now_micros) {
std::chrono::steady_clock::time_point moment_available =
reserve_earliest_available(permits, now_micros);
std::chrono::milliseconds diff_mills =
std::chrono::duration_cast<std::chrono::milliseconds>(moment_available -
now_micros);
return std::max(diff_mills, std::chrono::milliseconds(0));
}

Expand All @@ -63,37 +71,49 @@ class rate_limiter {
class abstract_smooth_rate_limiter : public rate_limiter {
protected:
virtual void do_set_rate(double permits_per_second,
double stable_internal_micros) = 0;
double stable_internal_micros) = 0;
virtual long stored_permits_to_wait_time(double stored_permits,
double permits_to_take) = 0;
double permits_to_take) = 0;
virtual double cool_down_internal_micros() = 0;
void resync(std::chrono::steady_clock::time_point now_micros) {
// if next_free_ticket is in the past, resync to now
ELOG_DEBUG << "now micros: " << std::chrono::duration_cast<std::chrono::milliseconds>(now_micros.time_since_epoch()).count() << ", next_free_ticket_micros_: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(this->next_free_ticket_micros_.time_since_epoch()).count();
ELOG_DEBUG << "now micros: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
now_micros.time_since_epoch())
.count()
<< ", next_free_ticket_micros_: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
this->next_free_ticket_micros_.time_since_epoch())
.count();
if (now_micros > this->next_free_ticket_micros_) {
std::chrono::milliseconds diff_mills = std::chrono::duration_cast<std::chrono::milliseconds>(now_micros - this->next_free_ticket_micros_);
std::chrono::milliseconds diff_mills =
std::chrono::duration_cast<std::chrono::milliseconds>(
now_micros - this->next_free_ticket_micros_);
double newPermits = diff_mills.count() / cool_down_internal_micros();
this->stored_permits_ =
std::min(this->max_permits_, this->stored_permits_ + newPermits);
this->next_free_ticket_micros_ = now_micros;
}
}
void do_set_rate(double permits_per_second, std::chrono::steady_clock::time_point now_micros) override {
void do_set_rate(double permits_per_second,
std::chrono::steady_clock::time_point now_micros) override {
resync(now_micros);
double stable_internal_micros = 1000 / permits_per_second;
this->stable_internal_micros_ = stable_internal_micros;
do_set_rate(permits_per_second, stable_internal_micros);
}
std::chrono::steady_clock::time_point reserve_earliest_available(int required_permits, std::chrono::steady_clock::time_point now_micros) {
std::chrono::steady_clock::time_point reserve_earliest_available(
int required_permits, std::chrono::steady_clock::time_point now_micros) {
resync(now_micros);
std::chrono::steady_clock::time_point return_value = this->next_free_ticket_micros_;
std::chrono::steady_clock::time_point return_value =
this->next_free_ticket_micros_;
double stored_permits_to_spend =
std::min((double)required_permits, this->stored_permits_);
double fresh_permits = required_permits - stored_permits_to_spend;
std::chrono::milliseconds wait_micros = std::chrono::milliseconds(stored_permits_to_wait_time(this->stored_permits_,
stored_permits_to_spend) +
(long)(fresh_permits * this->stable_internal_micros_));
std::chrono::milliseconds wait_micros = std::chrono::milliseconds(
stored_permits_to_wait_time(this->stored_permits_,
stored_permits_to_spend) +
(long)(fresh_permits * this->stable_internal_micros_));
this->next_free_ticket_micros_ += wait_micros;
this->stored_permits_ -= stored_permits_to_spend;
return return_value;
Expand Down Expand Up @@ -139,16 +159,17 @@ class smooth_bursty_rate_limiter : public abstract_smooth_rate_limiter {
<< ", stored_permits_:" << this->stored_permits_;
}

long stored_permits_to_wait_time(double stored_permits, double permits_to_take) {
long stored_permits_to_wait_time(double stored_permits,
double permits_to_take) {
return 0L;
}

double cool_down_internal_micros() { return this->stable_internal_micros_; }

private:
/**
* The work(permits) of how many seconds can be saved up if the rate_limiter is
* unused.
* The work(permits) of how many seconds can be saved up if the rate_limiter
* is unused.
*/
double max_burst_seconds_ = 0;
};
Expand Down
6 changes: 4 additions & 2 deletions src/coro_io/tests/test_rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ long current_time_mills() {

TEST_CASE("test smooth_bursty_rate_limiter simple") {
coro_io::smooth_bursty_rate_limiter rate_limiter(1);
std::chrono::milliseconds wait_time = async_simple::coro::syncAwait(rate_limiter.acquire(1));
std::chrono::milliseconds wait_time =
async_simple::coro::syncAwait(rate_limiter.acquire(1));
CHECK_EQ(0, wait_time.count());
}

Expand Down Expand Up @@ -45,7 +46,8 @@ TEST_CASE("test smooth_bursty_rate_limiter single thread") {
long start_mills = current_time_mills();
coro_io::smooth_bursty_rate_limiter rate_limiter(permits_per_second);
for (int i = 0; i < permits_to_acquire; i++) {
std::chrono::milliseconds wait_mills = async_simple::coro::syncAwait(rate_limiter.acquire(1));
std::chrono::milliseconds wait_mills =
async_simple::coro::syncAwait(rate_limiter.acquire(1));
ELOG_INFO << "wait for " << wait_mills.count();
}
double cost = (current_time_mills() - start_mills) / 1000.0;
Expand Down

0 comments on commit f731a0e

Please sign in to comment.