refactor(fetch): increment back off times #56
Conversation
```rust
// Re-insert the fetch request into the queue.
if let Err(err) = fetch_request_tx
    .try_send((op_id.clone(), agent_id.clone()))
{
    tracing::warn!("could not re-insert fetch request for op {op_id} to agent {agent_id} into queue: {err}");
    // Remove op id/agent id from set to prevent build-up of state.
    state
        .lock()
        .unwrap()
        .ops
        .remove(&(op_id, agent_id));
}
```
Always re-insert requests back into the queue, not only when the send was successful.
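A minimal sketch of that policy, reusing names from the quoted snippet and running inside the fetch task's loop (`send_op_request` is a hypothetical stand-in for the actual network call):

```rust
// Attempt the send; the outcome only drives back off bookkeeping.
if send_op_request(&op_id, &agent_id).await.is_err() {
    // Failed: record the failure so the agent's back off interval grows.
    state.lock().unwrap().back_off_list.back_off_agent(&agent_id);
}
// Always re-insert, so the request is retried once the back off elapses.
if let Err(err) = fetch_request_tx.try_send((op_id.clone(), agent_id.clone())) {
    tracing::warn!("could not re-insert fetch request for op {op_id} to agent {agent_id} into queue: {err}");
}
```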
```rust
// Agent is unresponsive.
// Remove associated op ids from set to prevent build-up of state.
.back_off_list
.remove_agent(&agent_id);
```
Remove agent from back off list after a successful send (in case they were on the list, otherwise no-op).
```rust
.is_agent_at_max_back_off(&agent_id)
{
    lock.requests
        .remove(&(op_id.clone(), agent_id.clone()));
}
```
If the agent has been backed off max number of times, stop requesting the op from that agent.
I'd expect to see a remove-all for the ops here, and the agent taken off the back off list?
If we're done waiting, then I think what we want is to drop all ops from this source and let the loop filter the incoming channel based on the op set.
That means if the source comes back online, it can resume fetching before the next max timeout elapses. I'm also thinking we'll end up making a connection request for each outstanding op/source pair for this source at the max backoff interval?
Beautiful idea! Yes, now it makes sense to remove all ops of the unresponsive agent.
The agent could be removed from the list explicitly, but any successful request to them will do that anyway.
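A rough sketch of dropping all ops for an agent at max back off, assuming the `requests` set and `back_off_list` from the snippets in this thread (the exact field names are assumptions):

```rust
// Agent has exhausted the maximum number of back offs: treat it as
// unresponsive and drop every outstanding request to it.
if lock.back_off_list.is_agent_at_max_back_off(&agent_id) {
    // Retain only requests that target other agents. Incoming requests are
    // filtered against this set, so a source that comes back online can
    // resume fetching.
    lock.requests.retain(|(_, a)| a != &agent_id);
    // Removing the agent here is optional: any successful request to them
    // removes the entry anyway.
    lock.back_off_list.remove_agent(&agent_id);
}
```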
```rust
pub fn is_agent_at_max_back_off(&self, agent_id: &AgentId) -> bool {
    self.state
        .get(agent_id)
        .map(|v| v.1 == self.max_back_off_exponent)
```
Slightly confusing here too, because we'll actually only wait for `delay * 2^(exponent - 1)` as the last back off. The agent will report being at max back off before they have necessarily waited for the whole period. I think the logic actually works here, but I'd be more confident if this was a check that the final window had expired.
Though, see the comment above about a library that does this?
Fixed the name and also added a check of whether the back off interval has elapsed.
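A sketch of such an elapsed-interval check, assuming the list stores a `(Instant, u32)` start-time/exponent pair per agent as `v.1` in the quoted code suggests (`back_off_interval` is an assumed field name):

```rust
pub fn is_agent_at_max_back_off(&self, agent_id: &AgentId) -> bool {
    self.state
        .get(agent_id)
        .map(|(started_at, exponent)| {
            // Only report max back off once the final window has expired,
            // i.e. the agent has waited out delay * 2^(exponent - 1).
            let current_interval =
                self.back_off_interval * 2u32.pow(exponent.saturating_sub(1));
            *exponent == self.max_back_off_exponent
                && started_at.elapsed() >= current_interval
        })
        .unwrap_or(false)
}
```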
```rust
back_off_list.back_off_agent(&agent_id);
assert!(back_off_list.is_agent_backing_off(&agent_id));

std::thread::sleep(Duration::from_millis(back_off_interval_ms + 1));
```
Can this be done by manipulating the clock with Tokio?
It can't. The function `is_agent_backing_off` would have to use `tokio::time::Instant::now()` too for this to work. `std::time::Instant`s are unaffected by it.
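For reference, this is the pattern that would work if the list used tokio's clock (requires tokio's `test-util` feature; a sketch, not the current code):

```rust
#[tokio::test(start_paused = true)]
async fn back_off_expires() {
    // With the clock paused, advance() moves tokio::time::Instant forward
    // instantly; std::time::Instant keeps following the real clock.
    let start = tokio::time::Instant::now();
    tokio::time::advance(std::time::Duration::from_millis(100)).await;
    assert!(start.elapsed() >= std::time::Duration::from_millis(100));
}
```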
@ThetaSinner Regarding using `backon`, I've started doing that and am happy to change to it. It is not much less custom code, because `backon` just gives us a series of durations. We still need to store which back off interval the agent is currently at, when the interval started, and whether all configured intervals have elapsed. I'll push an alternative branch with it tomorrow.
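For context, the series of durations `backon` hands out looks roughly like this (based on `backon`'s builder API; the concrete configuration values are illustrative):

```rust
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};

fn main() {
    // An iterator of back off durations: 100ms, 200ms, 400ms, 800ms, 1.6s.
    let mut backoff = ExponentialBuilder::default()
        .with_min_delay(Duration::from_millis(100))
        .with_factor(2.0)
        .with_max_times(5)
        .build();

    // backon only yields the next duration; which interval an agent is at,
    // when it started, and whether all intervals have elapsed must still be
    // tracked in our own state.
    while let Some(delay) = backoff.next() {
        println!("next back off: {delay:?}");
    }
}
```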
Question about terminating sources. Otherwise looking good.
If `backon` doesn't work as well as I hoped in this case then that's okay. Up to you whether that's adding value or just dependencies we can do without :)
```diff
@@ -176,66 +176,7 @@ async fn fetch_queue() {
 }

-#[tokio::test(flavor = "multi_thread")]
-async fn happy_multi_op_fetch_from_single_agent() {
```
Deleting this and the other test, because they're both single agent tests and are kinda redundant now that there's a multi agent test.
Superseded by #60
Previously, fetch requests were only re-inserted into the queue when they were successfully sent. That rendered the cool down/back off list ineffective, because unresponsive agents were put on the list but requests to them were never retried.

Requests are now always added back to the queue. The concern about state building up if agents on the back off list never become responsive again is addressed by introducing an incremental back off time with a maximum. When the maximum back off is hit for an agent, the request is removed from the set of fetch requests to make. This effectively prevents the build-up of state and means we eventually give up on completely unresponsive agents.
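A condensed sketch of the resulting back off behavior, with assumed type and field names (the real types live in the fetch module):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Placeholder for the crate's real agent id type.
type AgentId = String;

/// Sketch of an incremental back off list: each failed send doubles the
/// wait, up to a configured maximum number of back offs.
struct BackOffList {
    /// Per agent: when the current interval started and the current exponent.
    state: HashMap<AgentId, (Instant, u32)>,
    first_back_off_interval: Duration,
    max_back_off_exponent: u32,
}

impl BackOffList {
    /// Record a failed send: restart the interval and bump the exponent.
    fn back_off_agent(&mut self, agent_id: &AgentId) {
        let max = self.max_back_off_exponent;
        self.state
            .entry(agent_id.clone())
            .and_modify(|(started_at, exponent)| {
                *started_at = Instant::now();
                *exponent = (*exponent + 1).min(max);
            })
            .or_insert((Instant::now(), 1));
    }

    /// True while the agent's current back off window has not yet elapsed.
    fn is_agent_backing_off(&self, agent_id: &AgentId) -> bool {
        self.state
            .get(agent_id)
            .map(|(started_at, exponent)| {
                let interval =
                    self.first_back_off_interval * 2u32.pow(*exponent - 1);
                started_at.elapsed() < interval
            })
            .unwrap_or(false)
    }

    /// Called after a successful send; a no-op if the agent isn't listed.
    fn remove_agent(&mut self, agent_id: &AgentId) {
        self.state.remove(agent_id);
    }
}
```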