Skip to content

Commit

Permalink
feat(core): add response queue to fetch module
Browse files Browse the repository at this point in the history
  • Loading branch information
jost-s committed Jan 3, 2025
1 parent 3942008 commit edbf162
Show file tree
Hide file tree
Showing 7 changed files with 990 additions and 647 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ axum = { workspace = true, default-features = false, features = [
"tokio",
] }
ed25519-dalek = { workspace = true, features = ["rand_core"] }
kitsune2_memory = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true, features = ["full"] }
164 changes: 134 additions & 30 deletions crates/core/src/factories/core_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
//!
//! ### Fetch tasks
//!
//! A channel acts as the queue structure for the fetch tasks. Requests to send are passed
//! #### Requests
//!
//! A channel acts as the queue structure for the fetch request tasks. Ops to fetch are sent
//! one by one through the channel to the receiving tasks running in parallel. The flow
//! of sending fetch requests is as follows:
//!
Expand All @@ -36,24 +38,38 @@
//! from the set of requests if it is received in the meantime, and thus prevent a redundant
//! fetch request.
//!
//! #### Responses
//!
//! Similarly to fetch requests, a channel serves as a queue for responses to fetch requests. The queue
//! has the following properties:
//! - Simple queue which processes items in the order of the incoming requests.
//! - Requests consist of a list of requested op ids and the requesting agent id.
//! - Looking up the requested ops in the data store and sending the response is attempted only once; there are no retries.
//! - Requests for data that the remote doesn't hold should be logged.
//! - If none of the requested ops could be read from the store, no response is sent.
//! - If sending or reception fails, it's the caller's responsibility to request again.
//!
//! ### Incoming op task
//!
//! - Incoming op is written to the data store.
//! - Once persisted successfully, op is removed from the set of ops to fetch.
use kitsune2_api::{
builder,
fetch::{
serialize_op_ids, serialize_ops, DynFetch, DynFetchFactory, Fetch,
FetchFactory,
},
op_store, peer_store,
transport::DynTransport,
AgentId, BoxFut, K2Result, OpId, SpaceId, Url,
};
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};

use back_off::BackOffList;
use kitsune2_api::{
builder,
fetch::{serialize_op_ids, DynFetch, DynFetchFactory, Fetch, FetchFactory},
peer_store,
transport::DynTransport,
AgentId, BoxFut, K2Result, OpId, SpaceId, Url,
};
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task::JoinHandle,
Expand Down Expand Up @@ -129,6 +145,7 @@ impl FetchFactory for CoreFetchFactory {
builder: Arc<builder::Builder>,
space_id: SpaceId,
peer_store: peer_store::DynPeerStore,
op_store: op_store::DynOpStore,
transport: DynTransport,
) -> BoxFut<'static, K2Result<DynFetch>> {
Box::pin(async move {
Expand All @@ -138,6 +155,7 @@ impl FetchFactory for CoreFetchFactory {
config.core_fetch,
space_id,
peer_store,
op_store,
transport,
));
Ok(out)
Expand All @@ -146,6 +164,7 @@ impl FetchFactory for CoreFetchFactory {
}

/// An item of the fetch request queue: the id of a single op to fetch and
/// the agent it should be requested from.
type FetchRequest = (OpId, AgentId);
/// An item of the fetch response queue: the ids of the ops that were
/// requested and the agent the response is addressed to.
type FetchResponse = (Vec<OpId>, AgentId);

#[derive(Debug)]
struct State {
Expand All @@ -156,18 +175,22 @@ struct State {
#[derive(Debug)]
struct CoreFetch {
state: Arc<Mutex<State>>,
fetch_queue_tx: Sender<FetchRequest>,
fetch_tasks: Vec<JoinHandle<()>>,
request_tx: Sender<FetchRequest>,
response_tx: Sender<FetchResponse>,
tasks: Vec<JoinHandle<()>>,
}

impl CoreFetch {
fn new(
config: CoreFetchConfig,
space_id: SpaceId,
peer_store: peer_store::DynPeerStore,
op_store: op_store::DynOpStore,
transport: DynTransport,
) -> Self {
Self::spawn_fetch_tasks(config, space_id, peer_store, transport)
Self::spawn_fetch_tasks(
config, space_id, peer_store, op_store, transport,
)
}
}

Expand All @@ -192,29 +215,49 @@ impl Fetch for CoreFetch {
// Pass requests to fetch tasks.
for op_id in op_list {
if let Err(err) =
self.fetch_queue_tx.send((op_id, source.clone())).await
self.request_tx.send((op_id, source.clone())).await
{
tracing::warn!(
"could not pass fetch request to fetch task: {err}"
"could not pass fetch request to request task: {err}"
);
}
}

Ok(())
})
}

/// Queue an incoming fetch request so that the response task can answer it.
///
/// The requested op ids and the requesting agent are sent as a single item
/// through the response channel. A failure to enqueue is only logged as a
/// warning and never propagated, so this method always resolves to `Ok(())`.
fn respond_with_ops(
    &self,
    ops: Vec<OpId>,
    to: AgentId,
) -> BoxFut<'_, K2Result<()>> {
    Box::pin(async move {
        let queued = self.response_tx.send((ops, to)).await;
        match queued {
            Ok(()) => {}
            Err(err) => {
                tracing::warn!(
                    "could not pass fetch request to response task: {err}"
                );
            }
        }
        Ok(())
    })
}
}

impl CoreFetch {
pub fn spawn_fetch_tasks(
config: CoreFetchConfig,
space_id: SpaceId,
peer_store: peer_store::DynPeerStore,
op_store: op_store::DynOpStore,
transport: DynTransport,
) -> Self {
// Create a channel to send new requests to fetch to the tasks. This is in effect the fetch queue.
let (fetch_queue_tx, fetch_queue_rx) = channel::<FetchRequest>(16_384);
let fetch_queue_rx = Arc::new(tokio::sync::Mutex::new(fetch_queue_rx));
// Create a channel to send op requests to request tasks. This is the fetch queue.
let (request_tx, request_rx) = channel::<FetchRequest>(16_384);
let request_rx = Arc::new(tokio::sync::Mutex::new(request_rx));

// Create another channel to send incoming requests to the response task. This is the fetch response queue.
let (response_tx, response_rx) = channel::<FetchResponse>(16_384);
let response_rx = Arc::new(tokio::sync::Mutex::new(response_rx));

let state = Arc::new(Mutex::new(State {
requests: HashSet::new(),
Expand All @@ -225,28 +268,41 @@ impl CoreFetch {
),
}));

let mut fetch_tasks =
let mut tasks =
Vec::with_capacity(config.parallel_request_count as usize);
// Spawn request tasks.
for _ in 0..config.parallel_request_count {
let task = tokio::task::spawn(CoreFetch::fetch_task(
state.clone(),
fetch_queue_tx.clone(),
fetch_queue_rx.clone(),
peer_store.clone(),
space_id.clone(),
transport.clone(),
));
fetch_tasks.push(task);
let request_task =
tokio::task::spawn(CoreFetch::spawn_request_task(
state.clone(),
request_tx.clone(),
request_rx.clone(),
peer_store.clone(),
space_id.clone(),
transport.clone(),
));
tasks.push(request_task);
}

// Spawn response task.
let response_task = tokio::task::spawn(CoreFetch::spawn_response_task(
response_rx,
peer_store,
op_store,
transport.clone(),
space_id.clone(),
));
tasks.push(response_task);

Self {
state,
fetch_queue_tx,
fetch_tasks,
request_tx,
response_tx,
tasks,
}
}

async fn fetch_task(
async fn spawn_request_task(
state: Arc<Mutex<State>>,
fetch_request_tx: Sender<FetchRequest>,
fetch_request_rx: Arc<tokio::sync::Mutex<Receiver<FetchRequest>>>,
Expand Down Expand Up @@ -335,6 +391,54 @@ impl CoreFetch {
}
}

/// Body of the response task, which answers queued fetch requests.
///
/// Despite its name this function does not spawn anything itself; it is the
/// future that gets handed to `tokio::task::spawn`.
///
/// Items are taken from `response_rx` one at a time, in the order they were
/// queued, until `recv` yields `None`. For every `(op_ids, agent_id)` item:
///
/// 1. The URL of the requesting agent is looked up in `peer_store`. If none
///    is returned, the request is skipped.
/// 2. The requested ops are read from `op_store`. A read error is logged and
///    the request is skipped.
/// 3. If no op could be read, this is logged and no response is sent.
/// 4. Otherwise the ops are serialized and sent to the requesting agent via
///    `transport`. A send failure is logged as a warning.
///
/// Each step is attempted exactly once; nothing is retried here.
async fn spawn_response_task(
    response_rx: Arc<tokio::sync::Mutex<Receiver<FetchResponse>>>,
    peer_store: peer_store::DynPeerStore,
    op_store: op_store::DynOpStore,
    transport: DynTransport,
    space_id: SpaceId,
) {
    // NOTE(review): the lock guard is a temporary of the `while let`
    // condition and so appears to stay alive for the whole loop body,
    // including the awaits below. Confirm this is acceptable should the
    // receiver ever be shared between several tasks.
    while let Some((op_ids, agent_id)) =
        response_rx.lock().await.recv().await
    {
        tracing::debug!(?op_ids, ?agent_id, "incoming request");
        let peer = match CoreFetch::get_peer_url_from_store(
            &agent_id,
            peer_store.clone(),
        )
        .await
        {
            // Without a URL there is nowhere to send a response to.
            None => continue,
            Some(url) => url,
        };

        // Fetch ops to send from store. The ids are cloned because they are
        // still needed for logging further down.
        let ops = match op_store.retrieve_ops(op_ids.clone()).await {
            Err(err) => {
                tracing::error!("could not read ops from store: {err}");
                continue;
            }
            Ok(ops) => ops,
        };
        if ops.is_empty() {
            // Requests for data that is not held are logged instead of being
            // dropped silently. Do not send a response when no ops could be
            // read.
            tracing::info!(
                ?op_ids,
                ?agent_id,
                "none of the requested ops could be read from store"
            );
            continue;
        }
        let data = serialize_ops(ops);

        if let Err(err) = transport
            .send_module(peer, space_id.clone(), MOD_NAME.to_string(), data)
            .await
        {
            tracing::warn!(
                ?op_ids,
                ?agent_id,
                "could not send ops to requesting agent: {err}"
            );
        }
    }
}

async fn get_peer_url_from_store(
agent_id: &AgentId,
peer_store: peer_store::DynPeerStore,
Expand Down Expand Up @@ -365,7 +469,7 @@ impl CoreFetch {

impl Drop for CoreFetch {
fn drop(&mut self) {
for t in self.fetch_tasks.iter() {
for t in self.tasks.iter() {
t.abort();
}
}
Expand Down
Loading

0 comments on commit edbf162

Please sign in to comment.