Skip to content

Commit

Permalink
fix: remove all requests from set when agent has been backed off max
Browse files Browse the repository at this point in the history
  • Loading branch information
jost-s committed Dec 19, 2024
1 parent 8595a5b commit bc62cad
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 163 deletions.
52 changes: 32 additions & 20 deletions crates/core/src/factories/core_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,20 @@ pub mod config {
pub struct CoreFetchConfig {
/// How many parallel op fetch requests can be made at once. Default: 2.
pub parallel_request_count: u8,
// NOTE(review): `cool_down_interval_ms` and `back_off_interval_ms` carry
// near-identical descriptions; this looks like the before/after of a rename
// shown side by side — confirm which field is current.
/// Duration in ms to keep an unresponsive agent on the cool-down list. Default: 120_000.
pub cool_down_interval_ms: u64,
/// Duration in ms to keep an unresponsive agent on the back off list. Default: 20_000.
pub back_off_interval_ms: u64,
/// Maximum exponent for the back off interval. The back off duration is calculated as
/// `back_off_interval * 2^back_off_exponent`. Default: 4.
pub max_back_off_exponent: u32,
}

impl Default for CoreFetchConfig {
// NOTE(review): the comment here originally read "Maximum back off is 11:40 min."
// With the values below, the longest single interval is
// 20_000 ms * 2^4 = 320_000 ms (5:20 min), and the intervals for exponents
// 0..=4 sum to 620_000 ms (10:20 min) — confirm which figure is intended.
fn default() -> Self {
Self {
parallel_request_count: 2,
cool_down_interval_ms: 120_000,
back_off_interval_ms: 20_000,
max_back_off_exponent: 4,
}
}
}
Expand Down Expand Up @@ -213,7 +218,8 @@ impl CoreFetch {
),
}));

let mut fetch_tasks = Vec::new();
let mut fetch_tasks =
Vec::with_capacity(config.parallel_request_count as usize);
for _ in 0..config.parallel_request_count {
let task = tokio::task::spawn(CoreFetch::fetch_task(
state.clone(),
Expand Down Expand Up @@ -252,7 +258,7 @@ impl CoreFetch {
continue;
}

lock.back_off_list.is_agent_backing_off(&agent_id)
lock.back_off_list.is_agent_on_back_off(&agent_id)
};

// Send request if agent is not on back off list.
Expand All @@ -265,14 +271,11 @@ impl CoreFetch {
{
Some(url) => url,
None => {
// Agent not in peer store. Remove all associated op ids.
let mut lock = state.lock().unwrap();
lock.requests = lock
state
.lock()
.unwrap()
.requests
.clone()
.into_iter()
.filter(|(_, a)| *a != agent_id)
.collect();
.retain(|(_, a)| *a != agent_id);
continue;
}
};
Expand All @@ -290,6 +293,7 @@ impl CoreFetch {
.await
{
Ok(()) => {
// If agent was on back off list, remove them.
state
.lock()
.unwrap()
Expand All @@ -301,12 +305,13 @@ impl CoreFetch {
let mut lock = state.lock().unwrap();
lock.back_off_list.back_off_agent(&agent_id);

// If max back off interval has expired for the agent,
// give up on requesting ops from them.
if lock
.back_off_list
.is_agent_at_max_back_off(&agent_id)
.has_max_back_off_expired(&agent_id)
{
lock.requests
.remove(&(op_id.clone(), agent_id.clone()));
lock.requests.retain(|(_, a)| *a != agent_id);
}
}
}
Expand Down Expand Up @@ -378,16 +383,18 @@ impl BackOffList {
/// Puts `agent_id` on the back off list, or escalates its existing entry.
///
/// Each entry is a tuple of `(Instant, exponent)`: field `.0` is the time the
/// entry was last (re)started and field `.1` is the current back off exponent.
pub fn back_off_agent(&mut self, agent_id: &AgentId) {
match self.state.entry(agent_id.clone()) {
Entry::Occupied(mut o) => {
// NOTE(review): the two unconditional assignments and the guarded `if`
// block below update the same two tuple fields, so as written the
// exponent can be bumped twice in one call. This looks like the pre- and
// post-change versions shown together — confirm which one is live.
//
// Variant 1: always restart the timer and raise the exponent by one,
// clamped to `max_back_off_exponent`.
o.get_mut().0 = Instant::now();
o.get_mut().1 = self.max_back_off_exponent.min(o.get().1 + 1);
// Variant 2: only restart the timer and raise the exponent while the
// maximum has not been reached; at the maximum the entry is left untouched.
if o.get().1 != self.max_back_off_exponent {
o.get_mut().0 = Instant::now();
o.get_mut().1 += 1;
}
}
Entry::Vacant(v) => {
// First back off for this agent: start the timer at exponent 0.
v.insert((Instant::now(), 0));
}
}
}

pub fn is_agent_backing_off(&mut self, agent_id: &AgentId) -> bool {
pub fn is_agent_on_back_off(&mut self, agent_id: &AgentId) -> bool {
match self.state.get(agent_id) {
Some((instant, exponent)) => {
instant.elapsed().as_millis()
Expand All @@ -397,10 +404,15 @@ impl BackOffList {
}
}

pub fn is_agent_at_max_back_off(&self, agent_id: &AgentId) -> bool {
pub fn has_max_back_off_expired(&self, agent_id: &AgentId) -> bool {
self.state
.get(agent_id)
.map(|v| v.1 == self.max_back_off_exponent)
.map(|(instant, exponent)| {
*exponent == self.max_back_off_exponent
&& instant.elapsed().as_millis()
> (self.back_off_interval * 2_u64.pow(*exponent))
as u128
})
.unwrap_or(false)
}

Expand Down
Loading

0 comments on commit bc62cad

Please sign in to comment.