feat(fetch): improve back off feature for unresponsive agents (#60)
* feat: implement basic fetch queue type

* refactor: move trait to api & clean up

* feat: add fetch task skeleton

* feat: add batching to fetch task

* docs: update fetch queue description

* style: fix lints

* refactor: factorize fetch queue

* refactor: add config and builder

* feat: add request loop logic & test

* refactor: index by op id instead of agent id

* feat: cancel pause when ops are added

* refactor: fetch ops individually with a simple queue

* refactor: rename FetchQueue to Fetch

* refactor: rename package to fetch

* docs: update fetch specs

* test: add test for parallel request count

* test: add multi op multi agent test

* fix lints

* refactor: move fetch into core crate

* refactor(fetch): reinstate loop waking mechanism

* fix(fetch): pause before re-adding op to give other requests a chance to be processed

* test: fix flaky tests

* style: fix lints

* style: fix docs

* test: increase timeout for slow windows

* test: increase timeout - that clearly is too long even for slow windows

* feat: add cool down list for unresponsive agents

* fix: shorten check intervals in tests

* test: fix race cond & add unhappy fetch test

* style: lint fix

* feat(fetch): add debug to fetch trait

* refactor(fetch): use channel as fetch queue

* refactor(fetch): replace index map by simple hash set

* feat: re-insert fetch requests into the channel & add queue test

* test: update multi agent test

* refactor: split fetch task out into separate fn

* build: move rand to workspace deps

* refactor: manage cool-down list

* refactor: remove config from fetch struct

* refactor: inner struct

* style: fix lints

* refactor: inner struct some more

* refactor: combine state objects in one mutex

* Update crates/core/src/factories/core_fetch.rs

Co-authored-by: ThetaSinner <[email protected]>

* refactor(fetch): default to 120 secs as cool down interval

* refactor: improve op adding efficiency

* refactor: try_send instead of send to fetch queue to prevent dead-lock

* rename task spawner

* refactor: convert eprintln to tracing

* refactor: encapsulate cool down list into its own struct

* refactor: use actual transport interface

* refactor: increase fetch queue buffer to 16_384

* refactor: move task spawning back into constructor

* fix: use actual mod name

* feat: add drop impl to inner to abort fetch tasks

* refactor: replace tokio mutexes by std ones

* refactor: add space id & peer store to factory creator

* style: clippy

* feat: add peer store to fetch factory

* test: add agents to peer store

* refactor: move agent builder to test utils

* refactor: order tests some more

* refactor: merge Inner into CoreFetch

* refactor: release lock on ops after adding elements

* refactor: remove op/agent from ops to fetch set when re-inserting into queue fails

* refactor: simplify instant comparison

* refactor: move peer url getter to separate fn

* doc: update text

* refactor: replace try_send by send when adding ops to fetch queue

* feat(fetch): add proto definition for ops to fetch

* test: adapt tests to op message

* refactor: re-insert ops into queue only when peer is responsive

* refactor: add helpers for ser/de op ids

* refactor: simplify op id from impls

* style: append line break to fetch.proto

* style: add empty line to separate test sections

* test: de-flakify fetch queue test

* test: test that ops are removed from set when agent is not in peer store

* refactor: rename AgentBuild to AgentBuilder

* refactor: simplify enc/dec test

* test: fix typos

* refactor: put unresponsive agents on exponential back off

* refactor: check for max back off at the end

* refactor: set vec capacity for fetch tasks

* refactor: simplify request retention

* refactor: rename fn is_agent_backing_off to is_agent_on_back_off

* fix: remove request when max back off interval expired

* refactor: change back_off_interval to 20 s

* test: increase timeout due to flakiness

* wip

* fix: remove all requests from set when agent has been backed off max

* fix: the lints

* refactor: move back off into separate module

---------

Co-authored-by: Jost Schulte <[email protected]>
Co-authored-by: ThetaSinner <[email protected]>
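
The back-off behaviour described in the commit message (unresponsive agents are put on exponential back off, and their requests are dropped once the maximum back off has expired) can be illustrated with a minimal sketch. This is not the code from this commit: the commit adds the `backon` crate for this purpose, whereas the sketch below uses only the standard library. The names `BackOffList`, `record_failure`, `record_success`, `interval_for` and `max_attempts` are hypothetical, and treating the 20 s value as the first interval of a doubling schedule is an assumption.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical back-off state for one agent.
struct Entry {
    attempts: u32,
    retry_at: Instant,
}

/// Hypothetical back-off list keyed by agent id (a plain string here).
struct BackOffList {
    first_interval: Duration,
    max_attempts: u32,
    entries: HashMap<String, Entry>,
}

impl BackOffList {
    fn new(first_interval: Duration, max_attempts: u32) -> Self {
        Self { first_interval, max_attempts, entries: HashMap::new() }
    }

    /// Wait time after `attempts` failures: first_interval * 2^(attempts - 1).
    fn interval_for(&self, attempts: u32) -> Duration {
        let factor = 2u32.saturating_pow(attempts.saturating_sub(1));
        self.first_interval.saturating_mul(factor)
    }

    /// Record a failed request. Returns `false` once the agent has used up
    /// all attempts, signalling that its pending requests should be dropped.
    fn record_failure(&mut self, agent: &str, now: Instant) -> bool {
        let attempts = self.entries.get(agent).map_or(0, |e| e.attempts) + 1;
        let retry_at = now + self.interval_for(attempts);
        self.entries.insert(agent.to_string(), Entry { attempts, retry_at });
        attempts < self.max_attempts
    }

    /// True while the agent's current back-off interval has not yet elapsed.
    fn is_on_back_off(&self, agent: &str, now: Instant) -> bool {
        self.entries.get(agent).map_or(false, |e| now < e.retry_at)
    }

    /// A responsive agent is removed from the list entirely.
    fn record_success(&mut self, agent: &str) {
        self.entries.remove(agent);
    }
}

fn main() {
    // Assumed values: 20 s first interval, at most 3 attempts.
    let mut list = BackOffList::new(Duration::from_secs(20), 3);
    let now = Instant::now();

    let keep_trying = list.record_failure("agent-1", now);
    println!("keep trying: {keep_trying}");
    println!("on back off: {}", list.is_on_back_off("agent-1", now));

    list.record_success("agent-1");
    println!("on back off after success: {}", list.is_on_back_off("agent-1", now));
}
```

Passing `now` in explicitly, rather than calling `Instant::now()` inside the methods, keeps the sketch deterministic to test; the actual module may handle time differently.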
3 people authored Jan 3, 2025
1 parent d236dfb commit d9b7132
Showing 6 changed files with 542 additions and 299 deletions.
96 changes: 95 additions & 1 deletion Cargo.lock

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -20,6 +20,8 @@ kitsune2_api = { version = "0.0.1-alpha", path = "crates/api" }
async-channel = "2.3.1"
# this is used by bootstrap_srv as the http server implementation.
axum = { version = "0.7.9", default-features = false }
# used by fetch module to back off requests to unresponsive agents
backon = "1.3"
# debugging is far easier when you can see short byte arrays
# as base64 instead of decimal u8s.
base64 = "0.22.1"
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -11,6 +11,7 @@ categories = ["network-programming"]
edition = "2021"

[dependencies]
backon = { workspace = true }
bytes = { workspace = true }
ed25519-dalek = { workspace = true }
futures = { workspace = true }