diff --git a/Cargo.lock b/Cargo.lock index 87e28826..2b025770 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,9 +225,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.3" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27f657647bcff5394bf56c7317665bbf790a137a50eaaa5c6bfbb9e27a518f2d" +checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" dependencies = [ "shlex", ] @@ -326,9 +326,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto-common" @@ -720,9 +720,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" +checksum = "256fb8d4bd6413123cc9d91832d78325c48ff41677595be797d90f42969beae0" dependencies = [ "bytes", "futures-channel", @@ -970,6 +970,7 @@ dependencies = [ "ed25519-dalek", "futures", "kitsune2_api", + "prost", "rand", "serde", "serde_json", @@ -1079,9 +1080,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "a2ef2593ffb6958c941575cee70c8e257438749971869c4ae5acf6f91a168a61" dependencies = [ "adler2", ] @@ -1455,9 +1456,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +checksum = "d2bf47e6ff922db3825eb750c4e2ff784c6ff8fb9e13046ef6a1d1c5401b0b37" [[package]] name = "rustls-webpki" @@ -1682,18 +1683,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.6" +version = "2.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47" +checksum = "93605438cbd668185516ab499d589afb7ee1859ea3d5fc8f6b0755e1c7443767" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.6" +version = "2.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312" +checksum = "e1d8749b4531af2117677a5fcd12b1348a3fe2b81e36e61ffeac5c4aa3273e36" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index dc368c0d..0db4f32e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/test_utils", "crates/tool_proto_build", ] + resolver = "2" [workspace.dependencies] @@ -75,6 +76,4 @@ kitsune2_memory = { path = "crates/memory" } kitsune2_test_utils = { path = "crates/test_utils" } rand = "0.8.5" -# this is also used by the binary bootstrap_srv. But, since this monorepo -# is largely libraries, leaving this in this section. tracing-subscriber = "0.3" diff --git a/crates/api/proto/fetch.proto b/crates/api/proto/fetch.proto new file mode 100644 index 00000000..c0afc583 --- /dev/null +++ b/crates/api/proto/fetch.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package kitsune2.fetch; + +// A list of op ids. +message OpIds { + // Op ids. + repeated bytes data = 1; +} diff --git a/crates/api/proto/gen/kitsune2.fetch.rs b/crates/api/proto/gen/kitsune2.fetch.rs new file mode 100644 index 00000000..63d77927 --- /dev/null +++ b/crates/api/proto/gen/kitsune2.fetch.rs @@ -0,0 +1,8 @@ +// This file is @generated by prost-build. +/// A list of op ids. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OpIds { + /// Op ids. + #[prost(bytes = "bytes", repeated, tag = "1")] + pub data: ::prost::alloc::vec::Vec<::prost::bytes::Bytes>, +} diff --git a/crates/api/src/builder.rs b/crates/api/src/builder.rs index b85e7abe..8b0409c1 100644 --- a/crates/api/src/builder.rs +++ b/crates/api/src/builder.rs @@ -31,6 +31,10 @@ pub struct Builder { /// [bootstrap::Bootstrap] instances for initial WAN discovery. pub bootstrap: bootstrap::DynBootstrapFactory, + /// The [fetch::FetchFactory] to be used for creating + /// [fetch::Fetch] instances. + pub fetch: fetch::DynFetchFactory, + /// The [transport::TransportFactory] to be used for creating /// [transport::Transport] instances. pub transport: transport::DynTransportFactory, @@ -48,6 +52,7 @@ impl Builder { space, peer_store, bootstrap, + fetch, transport, } = self; @@ -55,6 +60,7 @@ impl Builder { space.default_config(config)?; peer_store.default_config(config)?; bootstrap.default_config(config)?; + fetch.default_config(config)?; transport.default_config(config)?; Ok(()) diff --git a/crates/api/src/fetch.rs b/crates/api/src/fetch.rs new file mode 100644 index 00000000..03a80bfc --- /dev/null +++ b/crates/api/src/fetch.rs @@ -0,0 +1,104 @@ +//! Kitsune2 fetch types. + +use std::sync::Arc; + +use prost::Message; + +use crate::{ + builder, config, peer_store::DynPeerStore, transport::DynTransport, + AgentId, BoxFut, K2Error, K2Result, OpId, SpaceId, +}; + +include!("../proto/gen/kitsune2.fetch.rs"); + +impl From> for OpIds { + fn from(value: Vec) -> Self { + Self { + data: value.into_iter().map(Into::into).collect(), + } + } +} + +impl From for Vec { + fn from(value: OpIds) -> Self { + value.data.into_iter().map(Into::into).collect() + } +} + +/// Serialize list of op ids for sending over the wire. +pub fn serialize_op_ids(value: Vec) -> bytes::Bytes { + let bytes = OpIds::from(value).encode_to_vec(); + bytes::Bytes::copy_from_slice(&bytes) +} + +/// Deserialize list of op ids. +pub fn deserialize_op_ids(value: bytes::Bytes) -> K2Result> { + let op_ids = OpIds::decode(value).map_err(K2Error::other)?; + let vec = Vec::from(op_ids); + Ok(vec) +} + +/// Trait for implementing a fetch module to fetch ops from other agents. +pub trait Fetch: 'static + Send + Sync + std::fmt::Debug { + /// Add op ids to be fetched. + fn add_ops( + &self, + op_list: Vec, + source: AgentId, + ) -> BoxFut<'_, K2Result<()>>; +} + +/// Trait object [Fetch]. +pub type DynFetch = Arc; + +/// A factory for creating Fetch instances. +pub trait FetchFactory: 'static + Send + Sync + std::fmt::Debug { + /// Help the builder construct a default config from the chosen + /// module factories. + fn default_config(&self, config: &mut config::Config) -> K2Result<()>; + + /// Construct a Fetch instance. + fn create( + &self, + builder: Arc, + space_id: SpaceId, + peer_store: DynPeerStore, + transport: DynTransport, + ) -> BoxFut<'static, K2Result>; +} + +/// Trait object [FetchFactory]. +pub type DynFetchFactory = Arc; + +#[cfg(test)] +mod test { + use super::*; + use prost::Message; + + #[test] + fn happy_encode_decode() { + let op_id_1 = OpId::from(bytes::Bytes::from_static(b"some_op_id")); + let op_id_2 = OpId::from(bytes::Bytes::from_static(b"another_op_id")); + let op_id_vec = vec![op_id_1, op_id_2]; + let op_ids = OpIds::from(op_id_vec.clone()); + + let op_ids_enc = op_ids.encode_to_vec(); + let op_ids_dec = OpIds::decode(op_ids_enc.as_slice()).unwrap(); + let op_ids_dec_vec = Vec::from(op_ids_dec.clone()); + + assert_eq!(op_ids, op_ids_dec); + assert_eq!(op_id_vec, op_ids_dec_vec); + } + + #[test] + fn bytes_from_op_ids() { + let op_id_1 = OpId::from(bytes::Bytes::from_static(b"some_op_id")); + let op_id_2 = OpId::from(bytes::Bytes::from_static(b"another_op_id")); + let op_id_vec = vec![op_id_1, op_id_2]; + + let bytes = serialize_op_ids(op_id_vec.clone()); + let op_id_vec_deserialized = deserialize_op_ids(bytes.clone()).unwrap(); + + assert_eq!(op_id_vec_deserialized, op_id_vec); + } +} diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index e67318fb..de01277f 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -57,6 +57,8 @@ pub use id::{AgentId, OpId, SpaceId}; mod timestamp; pub use timestamp::*; +pub mod fetch; + pub mod op_store; pub use op_store::*; diff --git a/crates/api/src/transport.rs b/crates/api/src/transport.rs index 9f416380..baf9a1a5 100644 --- a/crates/api/src/transport.rs +++ b/crates/api/src/transport.rs @@ -139,7 +139,7 @@ pub trait TxImp: 'static + Send + Sync + std::fmt::Debug { pub type DynTxImp = Arc; /// A high-level wrapper around a low-level [DynTxImp] transport implementation. -pub trait Transport { +pub trait Transport: Send + Sync { /// Register a space handler for receiving incoming notifications. /// /// Panics if you attempt to register a duplicate handler for diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index b00d387b..e526bdac 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -15,6 +15,7 @@ bytes = { workspace = true } ed25519-dalek = { workspace = true } futures = { workspace = true } kitsune2_api = { workspace = true } +prost = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] } diff --git a/crates/core/src/factories.rs b/crates/core/src/factories.rs index 77ae1278..f17d2d5d 100644 --- a/crates/core/src/factories.rs +++ b/crates/core/src/factories.rs @@ -15,5 +15,8 @@ pub use mem_bootstrap::*; mod core_bootstrap; pub use core_bootstrap::*; +mod core_fetch; +pub use core_fetch::*; + mod mem_transport; pub use mem_transport::*; diff --git a/crates/core/src/factories/core_fetch.rs b/crates/core/src/factories/core_fetch.rs new file mode 100644 index 00000000..b0ae2fef --- /dev/null +++ b/crates/core/src/factories/core_fetch.rs @@ -0,0 +1,388 @@ +//! Fetch is a Kitsune2 module for fetching ops from peers. +//! +//! In particular it tracks which ops need to be fetched from which agents, +//! sends fetch requests and processes incoming responses to these requests. +//! +//! It consists of multiple parts: +//! - State object that tracks op and agent ids in memory +//! - Fetch tasks that request tracked ops from agents +//! - Incoming op task that processes incoming responses to op requests by +//! - persisting ops to the data store +//! - removing op ids from in-memory data object +//! +//! ### State object [CoreFetch] +//! +//! - Exposes public method [CoreFetch::add_ops] that takes a list of op ids and an agent id. +//! - Stores pairs of ([OpId][AgentId]) in a set. +//! - A hash set is used to look up elements by key efficiently. Ops may be added redundantly +//! to the set with different sources to fetch from, so the set is keyed by op and agent id together. +//! +//! ### Fetch tasks +//! +//! A channel acts as the queue structure for the fetch tasks. Ops to fetch are sent +//! one by one through the channel to the receiving tasks running in parallel. The flow +//! of sending fetch requests is as follows: +//! +//! - Await fetch requests for ([OpId], [AgentId]) from the queue. +//! - Check if requested op id/agent id is still on the list of ops to fetch. +//! - In case the op has been received in the meantime and no longer needs to be fetched, +//! do nothing. +//! - Otherwise proceed. +//! - Check if agent is on a cool-down list of unresponsive agents. +//! - Dispatch request for op id from agent to transport module. +//! - If agent is unresponsive, put them on cool-down list. +//! - Re-send requested ([OpId], [AgentId]) to the queue again. It will be removed +//! from the list of ops to fetch if it is received in the meantime, and thus prevent a redundant +//! fetch request. +//! +//! ### Incoming op task +//! +//! - Incoming op is written to the data store. +//! - Once persisted successfully, op is removed from the set of ops to fetch. + +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, Mutex}, + time::Instant, +}; + +use kitsune2_api::{ + builder, + config::ModConfig, + fetch::{serialize_op_ids, DynFetch, DynFetchFactory, Fetch, FetchFactory}, + peer_store, + transport::DynTransport, + AgentId, BoxFut, K2Result, OpId, SpaceId, Url, +}; +use tokio::{ + sync::mpsc::{channel, Receiver, Sender}, + task::JoinHandle, +}; + +const MOD_NAME: &str = "Fetch"; + +/// A production-ready fetch module. +#[derive(Debug)] +pub struct CoreFetchFactory {} + +impl CoreFetchFactory { + /// Construct a new CoreFetchFactory. + pub fn create() -> DynFetchFactory { + Arc::new(Self {}) + } +} + +impl FetchFactory for CoreFetchFactory { + fn default_config( + &self, + config: &mut kitsune2_api::config::Config, + ) -> K2Result<()> { + config.add_default_module_config::( + MOD_NAME.to_string(), + )?; + Ok(()) + } + + fn create( + &self, + builder: Arc, + space_id: SpaceId, + peer_store: peer_store::DynPeerStore, + transport: DynTransport, + ) -> BoxFut<'static, K2Result> { + Box::pin(async move { + let config = builder.config.get_module_config(MOD_NAME)?; + let out: DynFetch = Arc::new(CoreFetch::new( + config, space_id, peer_store, transport, + )); + Ok(out) + }) + } +} + +/// Configuration parameters for [CoreFetchFactory]. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CoreFetchConfig { + /// How many parallel op fetch requests can be made at once. Default: 2. + pub parallel_request_count: u8, + /// Duration in ms to keep an unresponsive agent on the cool-down list. Default: 120_000. + pub cool_down_interval_ms: u64, +} + +impl Default for CoreFetchConfig { + fn default() -> Self { + Self { + parallel_request_count: 2, + cool_down_interval_ms: 120_000, + } + } +} + +impl ModConfig for CoreFetchConfig {} + +type FetchRequest = (OpId, AgentId); + +#[derive(Debug)] +struct State { + ops: HashSet, + cool_down_list: CoolDownList, +} + +#[derive(Debug)] +struct CoreFetch { + state: Arc>, + fetch_queue_tx: Sender, + fetch_tasks: Vec>, +} + +impl CoreFetch { + fn new( + config: CoreFetchConfig, + space_id: SpaceId, + peer_store: peer_store::DynPeerStore, + transport: DynTransport, + ) -> Self { + Self::spawn_fetch_tasks(config, space_id, peer_store, transport) + } +} + +impl Fetch for CoreFetch { + fn add_ops( + &self, + op_list: Vec, + source: AgentId, + ) -> BoxFut<'_, K2Result<()>> { + Box::pin(async move { + // Add ops to set. + { + let ops = &mut self.state.lock().unwrap().ops; + ops.extend( + op_list + .clone() + .into_iter() + .map(|op_id| (op_id.clone(), source.clone())), + ); + } + + // Pass ops to fetch tasks. + for op_id in op_list { + if let Err(err) = + self.fetch_queue_tx.send((op_id, source.clone())).await + { + tracing::warn!( + "could not pass fetch request to fetch task: {err}" + ); + } + } + + Ok(()) + }) + } +} + +impl CoreFetch { + pub fn spawn_fetch_tasks( + config: CoreFetchConfig, + space_id: SpaceId, + peer_store: peer_store::DynPeerStore, + transport: DynTransport, + ) -> Self { + // Create a channel to send new ops to fetch to the tasks. This is in effect the fetch queue. + let (fetch_queue_tx, fetch_queue_rx) = channel::(16_384); + let fetch_queue_rx = Arc::new(tokio::sync::Mutex::new(fetch_queue_rx)); + + let state = Arc::new(Mutex::new(State { + ops: HashSet::new(), + cool_down_list: CoolDownList::new(config.cool_down_interval_ms), + })); + + let mut fetch_tasks = Vec::new(); + for _ in 0..config.parallel_request_count { + let task = tokio::task::spawn(CoreFetch::fetch_task( + state.clone(), + fetch_queue_tx.clone(), + fetch_queue_rx.clone(), + peer_store.clone(), + space_id.clone(), + transport.clone(), + )); + fetch_tasks.push(task); + } + + Self { + state, + fetch_queue_tx, + fetch_tasks, + } + } + + async fn fetch_task( + state: Arc>, + fetch_request_tx: Sender, + fetch_request_rx: Arc>>, + peer_store: peer_store::DynPeerStore, + space_id: SpaceId, + transport: DynTransport, + ) { + while let Some((op_id, agent_id)) = + fetch_request_rx.lock().await.recv().await + { + let is_agent_cooling_down = { + let mut lock = state.lock().unwrap(); + + // Do nothing if op id is no longer in the set of ops to fetch. + if !lock.ops.contains(&(op_id.clone(), agent_id.clone())) { + continue; + } + + lock.cool_down_list.is_agent_cooling_down(&agent_id) + }; + tracing::trace!( + "is agent {agent_id} cooling down {is_agent_cooling_down}" + ); + + // Send request if agent is not on cool-down list. + if !is_agent_cooling_down { + let peer = match CoreFetch::get_peer_url_from_store( + &agent_id, + peer_store.clone(), + ) + .await + { + Some(url) => url, + None => { + let mut lock = state.lock().unwrap(); + lock.ops = lock + .ops + .clone() + .into_iter() + .filter(|(_, a)| *a != agent_id) + .collect(); + continue; + } + }; + + let data = serialize_op_ids(vec![op_id.clone()]); + + // Send fetch request to agent. + match transport + .send_module( + peer, + space_id.clone(), + MOD_NAME.to_string(), + data, + ) + .await + { + Ok(()) => { + // Re-insert the fetch request into the queue. + if let Err(err) = fetch_request_tx + .try_send((op_id.clone(), agent_id.clone())) + { + tracing::warn!("could not re-insert fetch request for op {op_id} to agent {agent_id} into queue: {err}"); + // Remove op id/agent id from set to prevent build-up of state. + state + .lock() + .unwrap() + .ops + .remove(&(op_id, agent_id)); + } + } + Err(err) => { + tracing::warn!("could not send fetch request for op {op_id} to agent {agent_id}: {err}"); + state + .lock() + .unwrap() + .cool_down_list + .add_agent(agent_id.clone()); + // Agent is unresponsive. + // Remove associated op ids from set to prevent build-up of state. + let mut lock = state.lock().unwrap(); + lock.ops = lock + .ops + .clone() + .into_iter() + .filter(|(_, a)| *a != agent_id) + .collect(); + } + } + } + } + } + + async fn get_peer_url_from_store( + agent_id: &AgentId, + peer_store: peer_store::DynPeerStore, + ) -> Option { + match peer_store.get(agent_id.clone()).await { + Ok(Some(peer)) => match &peer.url { + Some(url) => Some(url.clone()), + None => { + tracing::warn!("agent {agent_id} no longer on the network"); + None + } + }, + Ok(None) => { + tracing::warn!( + "could not find agent id {agent_id} in peer store" + ); + None + } + Err(err) => { + tracing::warn!( + "could not get agent id {agent_id} from peer store: {err}" + ); + None + } + } + } +} + +impl Drop for CoreFetch { + fn drop(&mut self) { + for t in self.fetch_tasks.iter() { + t.abort(); + } + } +} + +#[derive(Debug)] +struct CoolDownList { + state: HashMap, + cool_down_interval: u64, +} + +impl CoolDownList { + pub fn new(cool_down_interval: u64) -> Self { + Self { + state: HashMap::new(), + cool_down_interval, + } + } + + pub fn add_agent(&mut self, agent_id: AgentId) { + self.state.insert(agent_id, Instant::now()); + } + + pub fn is_agent_cooling_down(&mut self, agent_id: &AgentId) -> bool { + match self.state.get(agent_id) { + Some(instant) => { + if instant.elapsed().as_millis() + > self.cool_down_interval as u128 + { + // Cool down interval has elapsed. Remove agent from list. + self.state.remove(agent_id); + false + } else { + // Cool down interval has not elapsed, still cooling down. + true + } + } + None => false, + } + } +} + +#[cfg(test)] +mod test; diff --git a/crates/core/src/factories/core_fetch/test.rs b/crates/core/src/factories/core_fetch/test.rs new file mode 100644 index 00000000..351126e4 --- /dev/null +++ b/crates/core/src/factories/core_fetch/test.rs @@ -0,0 +1,600 @@ +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; + +use bytes::Bytes; +use kitsune2_api::{ + fetch::{deserialize_op_ids, Fetch}, + id::Id, + transport::Transport, + AgentId, K2Error, OpId, SpaceId, Url, +}; +use rand::Rng; + +use crate::{default_builder, factories::test_utils::AgentBuilder}; + +use super::{CoreFetch, CoreFetchConfig}; + +#[derive(Debug)] +pub struct MockTransport { + requests_sent: Arc>>, + fail: bool, +} + +impl MockTransport { + fn new(fail: bool) -> Arc { + Arc::new(Self { + requests_sent: Arc::new(Mutex::new(Vec::new())), + fail, + }) + } +} + +impl Transport for MockTransport { + fn send_module( + &self, + peer: kitsune2_api::Url, + _space: kitsune2_api::SpaceId, + _module: String, + data: bytes::Bytes, + ) -> kitsune2_api::BoxFut<'_, kitsune2_api::K2Result<()>> { + Box::pin(async move { + let op_ids = deserialize_op_ids(data).unwrap(); + let mut lock = self.requests_sent.lock().unwrap(); + op_ids.into_iter().for_each(|op_id| { + lock.push((op_id, peer.clone())); + }); + + if self.fail { + Err(K2Error::other("connection timed out")) + } else { + Ok(()) + } + }) + } + + fn disconnect( + &self, + _peer: Url, + _reason: Option, + ) -> kitsune2_api::BoxFut<'_, ()> { + unimplemented!() + } + + fn register_module_handler( + &self, + _space: SpaceId, + _module: String, + _handler: kitsune2_api::transport::DynTxModuleHandler, + ) { + unimplemented!() + } + + fn register_space_handler( + &self, + _space: SpaceId, + _handler: kitsune2_api::transport::DynTxSpaceHandler, + ) { + unimplemented!() + } + + fn send_space_notify( + &self, + _peer: Url, + _space: SpaceId, + _data: bytes::Bytes, + ) -> kitsune2_api::BoxFut<'_, kitsune2_api::K2Result<()>> { + unimplemented!() + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn fetch_queue() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let mock_transport = MockTransport::new(false); + let config = CoreFetchConfig::default(); + + let op_id = random_op_id(); + let op_list = vec![op_id.clone()]; + let agent_id = random_agent_id(); + let agent_info = AgentBuilder { + agent: Some(agent_id.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), + ..Default::default() + } + .build(); + let agent_url = agent_info.url.clone().unwrap(); + peer_store.insert(vec![agent_info.clone()]).await.unwrap(); + + let fetch = CoreFetch::new( + config.clone(), + agent_info.space.clone(), + peer_store.clone(), + mock_transport.clone(), + ); + + let requests_sent = mock_transport.requests_sent.lock().unwrap().clone(); + assert!(requests_sent.is_empty()); + + // Add 1 op. + fetch.add_ops(op_list, agent_id.clone()).await.unwrap(); + + // Let the fetch request be sent multiple times. As only 1 op was added to the queue, + // this proves that it is being re-added to the queue after sending a request for it. + tokio::time::timeout(Duration::from_millis(10), async { + loop { + tokio::task::yield_now().await; + if mock_transport.requests_sent.lock().unwrap().len() >= 3 { + break; + } + } + }) + .await + .unwrap(); + + // Clear set of ops to fetch to stop sending requests. + fetch.state.lock().unwrap().ops.clear(); + + let mut num_requests_sent = + mock_transport.requests_sent.lock().unwrap().len(); + + // Wait for tasks to settle all requests. + tokio::time::timeout(Duration::from_millis(20), async { + loop { + tokio::time::sleep(Duration::from_millis(1)).await; + let current_num_requests_sent = + mock_transport.requests_sent.lock().unwrap().len(); + if current_num_requests_sent == num_requests_sent { + break; + } else { + num_requests_sent = current_num_requests_sent; + } + } + }) + .await + .unwrap(); + + // Check that all requests have been made for the 1 op to the agent. + assert!(mock_transport + .requests_sent + .lock() + .unwrap() + .iter() + .all(|request| request == &(op_id.clone(), agent_url.clone()))); + + // Give time for more requests to be sent, which shouldn't happen now that the set of + // ops to fetch is cleared. + tokio::time::sleep(Duration::from_millis(10)).await; + + // No more requests should have been sent. + // Ideally it were possible to check that no more fetch request have been passed back into + // the internal channel, but that would require a custom wrapper around the channel. + let requests_sent = mock_transport.requests_sent.lock().unwrap().clone(); + assert_eq!(requests_sent.len(), num_requests_sent); +} + +#[tokio::test(flavor = "multi_thread")] +async fn happy_multi_op_fetch_from_single_agent() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let config = CoreFetchConfig::default(); + let mock_transport = MockTransport::new(false); + + let num_ops: usize = 50; + let op_list = create_op_list(num_ops as u16); + let agent_id = random_agent_id(); + let agent_info = AgentBuilder { + agent: Some(agent_id.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), + ..Default::default() + } + .build(); + let agent_url = agent_info.url.clone().unwrap(); + peer_store.insert(vec![agent_info.clone()]).await.unwrap(); + + let fetch = CoreFetch::new( + config.clone(), + agent_info.space.clone(), + peer_store.clone(), + mock_transport.clone(), + ); + + let mut expected_ops = Vec::new(); + op_list + .clone() + .into_iter() + .for_each(|op_id| expected_ops.push((op_id, agent_id.clone()))); + + fetch + .add_ops(op_list.clone(), agent_id.clone()) + .await + .unwrap(); + + // Check that at least one request was sent to the agent for each op. + tokio::time::timeout(Duration::from_millis(100), async { + loop { + tokio::task::yield_now().await; + let requests_sent = + mock_transport.requests_sent.lock().unwrap().clone(); + if requests_sent.len() >= num_ops { + op_list.clone().into_iter().all(|op_id| { + requests_sent.contains(&(op_id, agent_url.clone())) + }); + break; + } + } + }) + .await + .unwrap(); + + // Check that op ids are still part of ops to fetch. + let lock = fetch.state.lock().unwrap(); + assert!(expected_ops.iter().all(|v| lock.ops.contains(v))); +} + +#[tokio::test(flavor = "multi_thread")] +async fn happy_multi_op_fetch_from_multiple_agents() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let config = CoreFetchConfig { + parallel_request_count: 5, + ..Default::default() + }; + let mock_transport = MockTransport::new(false); + let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1")); + + let op_list_1 = create_op_list(10); + let agent_1 = random_agent_id(); + let op_list_2 = create_op_list(20); + let agent_2 = random_agent_id(); + let op_list_3 = create_op_list(30); + let agent_3 = random_agent_id(); + let total_ops = op_list_1.len() + op_list_2.len() + op_list_3.len(); + + let agent_info_1 = AgentBuilder { + agent: Some(agent_1.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), + space: Some(space_id.clone()), + ..Default::default() + } + .build(); + let agent_info_2 = AgentBuilder { + agent: Some(agent_2.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())), + space: Some(space_id.clone()), + ..Default::default() + } + .build(); + let agent_info_3 = AgentBuilder { + agent: Some(agent_3.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:3").unwrap())), + space: Some(space_id.clone()), + ..Default::default() + } + .build(); + let agent_url_1 = agent_info_1.url.clone().unwrap(); + let agent_url_2 = agent_info_2.url.clone().unwrap(); + let agent_url_3 = agent_info_3.url.clone().unwrap(); + peer_store + .insert(vec![agent_info_1, agent_info_2, agent_info_3]) + .await + .unwrap(); + let fetch = CoreFetch::new( + config.clone(), + space_id.clone(), + peer_store.clone(), + mock_transport.clone(), + ); + + let mut expected_requests = Vec::new(); + op_list_1 + .clone() + .into_iter() + .for_each(|op_id| expected_requests.push((op_id, agent_url_1.clone()))); + op_list_2 + .clone() + .into_iter() + .for_each(|op_id| expected_requests.push((op_id, agent_url_2.clone()))); + op_list_3 + .clone() + .into_iter() + .for_each(|op_id| expected_requests.push((op_id, agent_url_3.clone()))); + let mut expected_ops = Vec::new(); + op_list_1 + .clone() + .into_iter() + .for_each(|op_id| expected_ops.push((op_id, agent_1.clone()))); + op_list_2 + .clone() + .into_iter() + .for_each(|op_id| expected_ops.push((op_id, agent_2.clone()))); + op_list_3 + .clone() + .into_iter() + .for_each(|op_id| expected_ops.push((op_id, agent_3.clone()))); + + futures::future::join_all([ + fetch.add_ops(op_list_1.clone(), agent_1.clone()), + fetch.add_ops(op_list_2.clone(), agent_2.clone()), + fetch.add_ops(op_list_3.clone(), agent_3.clone()), + ]) + .await; + + // Check that at least one request was sent for each op. + tokio::time::timeout(Duration::from_millis(10), async { + loop { + tokio::task::yield_now().await; + let requests_sent = + mock_transport.requests_sent.lock().unwrap().clone(); + if requests_sent.len() >= total_ops + && expected_requests + .iter() + .all(|expected_op| requests_sent.contains(expected_op)) + { + break; + } + } + }) + .await + .unwrap(); + + // Check that op ids are still part of ops to fetch. + let lock = fetch.state.lock().unwrap(); + assert!(expected_ops.iter().all(|v| lock.ops.contains(v))); +} + +#[tokio::test(flavor = "multi_thread")] +async fn ops_are_cleared_when_agent_not_in_peer_store() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let config = CoreFetchConfig::default(); + let mock_transport = MockTransport::new(false); + + let op_list = create_op_list(2); + let agent_id = random_agent_id(); + let agent_info = AgentBuilder { + agent: Some(agent_id.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), + ..Default::default() + } + .build(); + + let fetch = CoreFetch::new( + config.clone(), + agent_info.space.clone(), + peer_store.clone(), + mock_transport.clone(), + ); + + fetch.add_ops(op_list, agent_id).await.unwrap(); + + // Wait for agent to be looked up in peer store. + tokio::time::sleep(Duration::from_millis(10)).await; + + // Check that all op ids for agent have been removed from ops set. + assert!(fetch.state.lock().unwrap().ops.is_empty()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn unresponsive_agents_are_put_on_cool_down_list() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let config = CoreFetchConfig::default(); + let mock_transport = MockTransport::new(true); + + let op_list = create_op_list(1); + let agent_id = random_agent_id(); + let agent_info = AgentBuilder { + agent: Some(agent_id.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), + ..Default::default() + } + .build(); + peer_store.insert(vec![agent_info.clone()]).await.unwrap(); + + let fetch = CoreFetch::new( + config.clone(), + agent_info.space.clone(), + peer_store.clone(), + mock_transport.clone(), + ); + + fetch.add_ops(op_list, agent_id.clone()).await.unwrap(); + + tokio::time::timeout(Duration::from_millis(10), async { + loop { + tokio::task::yield_now().await; + if !mock_transport.requests_sent.lock().unwrap().is_empty() + && fetch + .state + .lock() + .unwrap() + .cool_down_list + .is_agent_cooling_down(&agent_id) + { + break; + } + } + }) + .await + .unwrap(); + + // Give time to remove op id from set. + tokio::time::sleep(Duration::from_millis(1)).await; + + // Op should have been removed from ops to fetch. + assert!(fetch.state.lock().unwrap().ops.is_empty()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn add_ops_for_multiple_unresponsive_agents() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let config = CoreFetchConfig::default(); + let mock_transport = MockTransport::new(true); + let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1")); + + let op_list_1 = create_op_list(2); + let agent_1 = random_agent_id(); + let op_list_2 = create_op_list(1); + let agent_2 = random_agent_id(); + let op_list_3 = create_op_list(1); + let agent_3 = random_agent_id(); + + let agent_info_1 = AgentBuilder { + agent: Some(agent_1.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())), + space: Some(space_id.clone()), + ..Default::default() + } + .build(); + let agent_info_2 = AgentBuilder { + agent: Some(agent_2.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())), + space: Some(space_id.clone()), + ..Default::default() + } + .build(); + let agent_info_3 = AgentBuilder { + agent: Some(agent_3.clone()), + url: Some(Some(Url::from_str("wss://127.0.0.1:3").unwrap())), + space: Some(space_id.clone()), + ..Default::default() + } + .build(); + let agent_url_1 = agent_info_1.url.clone().unwrap(); + let agent_url_2 = agent_info_2.url.clone().unwrap(); + let agent_url_3 = agent_info_3.url.clone().unwrap(); + peer_store + .insert(vec![agent_info_1, agent_info_2, agent_info_3]) + .await + .unwrap(); + + let fetch = CoreFetch::new( + config.clone(), + space_id.clone(), + peer_store.clone(), + mock_transport.clone(), + ); + + // Add all ops to the queue. + futures::future::join_all([ + fetch.add_ops(op_list_1.clone(), agent_1.clone()), + fetch.add_ops(op_list_2.clone(), agent_2.clone()), + fetch.add_ops(op_list_3.clone(), agent_3.clone()), + ]) + .await; + + // Wait for one request for each agent. + let expected_agent_url = [agent_url_1, agent_url_2, agent_url_3]; + let expected_agents = [agent_1, agent_2, agent_3]; + tokio::time::timeout(Duration::from_millis(100), async { + loop { + tokio::time::sleep(Duration::from_millis(1)).await; + let requests_sent = + mock_transport.requests_sent.lock().unwrap().clone(); + let request_destinations = requests_sent + .iter() + .map(|(_, agent_id)| agent_id) + .collect::>(); + if expected_agent_url + .iter() + .all(|agent| request_destinations.contains(&agent)) + { + // Check all agents are on cool-down_list. + let cool_down_list = + &mut fetch.state.lock().unwrap().cool_down_list; + if expected_agents + .iter() + .all(|agent| cool_down_list.is_agent_cooling_down(agent)) + { + break; + } + } + } + }) + .await + .unwrap(); + + // Give time to remove op id from set. + tokio::time::sleep(Duration::from_millis(1)).await; + + // Op should have been removed from ops to fetch. + assert!( + fetch.state.lock().unwrap().ops.is_empty(), + "ops are {:?}", + fetch.state.lock().unwrap().ops.len() + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn agent_cooling_down_is_removed_from_list() { + let builder = Arc::new(default_builder()); + let peer_store = builder.peer_store.create(builder.clone()).await.unwrap(); + let config = CoreFetchConfig { + cool_down_interval_ms: 10, + ..Default::default() + }; + let mock_transport = MockTransport::new(false); + let space_id = SpaceId::from(bytes::Bytes::from_static(b"space_1")); + + let fetch = CoreFetch::new( + config.clone(), + space_id, + peer_store, + mock_transport.clone(), + ); + let agent_id = random_agent_id(); + + fetch + .state + .lock() + .unwrap() + .cool_down_list + .add_agent(agent_id.clone()); + + assert!(fetch + .state + .lock() + .unwrap() + .cool_down_list + .is_agent_cooling_down(&agent_id)); + + // Wait for the cool-down interval + 1 ms to avoid flakiness. + tokio::time::sleep(Duration::from_millis(config.cool_down_interval_ms + 1)) + .await; + + assert!(!fetch + .state + .lock() + .unwrap() + .cool_down_list + .is_agent_cooling_down(&agent_id)); +} + +fn random_id() -> Id { + let mut rng = rand::thread_rng(); + let mut bytes = [0u8; 32]; + rng.fill(&mut bytes); + let bytes = Bytes::from(bytes.to_vec()); + Id(bytes) +} + +fn random_op_id() -> OpId { + OpId(random_id()) +} + +fn random_agent_id() -> AgentId { + AgentId(random_id()) +} + +fn create_op_list(num_ops: u16) -> Vec { + let mut ops = Vec::new(); + for _ in 0..num_ops { + let op = random_op_id(); + ops.push(op.clone()); + } + ops +} diff --git a/crates/core/src/factories/mem_peer_store.rs b/crates/core/src/factories/mem_peer_store.rs index 034e20af..22ace581 100644 --- a/crates/core/src/factories/mem_peer_store.rs +++ b/crates/core/src/factories/mem_peer_store.rs @@ -256,3 +256,7 @@ impl Inner { #[cfg(test)] mod test; + +/// Peer store related utilities for testing. +#[cfg(test)] +pub mod test_utils; diff --git a/crates/core/src/factories/mem_peer_store/test.rs b/crates/core/src/factories/mem_peer_store/test.rs index 72408604..6f996529 100644 --- a/crates/core/src/factories/mem_peer_store/test.rs +++ b/crates/core/src/factories/mem_peer_store/test.rs @@ -1,4 +1,4 @@ -use super::*; +use super::{test_utils::AgentBuilder, *}; use kitsune2_api::id::Id; #[inline(always)] @@ -8,7 +8,6 @@ fn create() -> Inner { const AGENT_1: AgentId = AgentId(Id(bytes::Bytes::from_static(b"agent1"))); const AGENT_2: AgentId = AgentId(Id(bytes::Bytes::from_static(b"agent2"))); -const SPACE_1: SpaceId = SpaceId(Id(bytes::Bytes::from_static(b"space1"))); /// Sneak some test-data into the url field (as the peer id) /// this will let us validate store actions when we extract @@ -23,68 +22,6 @@ fn unsneak_url(u: &Url) -> String { u.peer_id().unwrap().into() } -#[derive(Debug, Default)] -struct AgentBuild { - pub agent: Option, - pub space: Option, - pub created_at: Option, - pub expires_at: Option, - pub is_tombstone: Option, - pub url: Option>, - pub storage_arc: Option, -} - -impl AgentBuild { - pub fn build(self) -> Arc { - static NEXT_AGENT: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(1); - let agent = self.agent.unwrap_or_else(|| { - let a = - NEXT_AGENT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let a = a.to_le_bytes(); - AgentId(Id(bytes::Bytes::copy_from_slice(&a))) - }); - let space = self.space.unwrap_or_else(|| SPACE_1.clone()); - let created_at = self.created_at.unwrap_or_else(Timestamp::now); - let expires_at = self.expires_at.unwrap_or_else(|| { - created_at + std::time::Duration::from_secs(60 * 20) - }); - let is_tombstone = self.is_tombstone.unwrap_or(false); - let url = self.url.unwrap_or(None); - let storage_arc = self.storage_arc.unwrap_or(DhtArc::FULL); - let agent_info = serde_json::to_string(&AgentInfo { - agent, - space, - created_at, - expires_at, - is_tombstone, - url, - storage_arc, - }) - .unwrap(); - #[derive(serde::Serialize)] - #[serde(rename_all = "camelCase")] - struct Enc { - agent_info: String, - signature: String, - } - let encoded = serde_json::to_string(&Enc { - agent_info, - signature: "".into(), - }) - .unwrap(); - #[derive(Debug)] - struct V; - impl Verifier for V { - fn verify(&self, _i: &AgentInfo, _m: &[u8], _s: &[u8]) -> bool { - true - } - } - // going through this trouble to use decode because it's sync - AgentInfoSigned::decode(&V, encoded.as_bytes()).unwrap() - } -} - #[test] fn empty_store() { let mut s = create(); @@ -97,12 +34,12 @@ fn prune_prunes_only_expired_agents() { let mut s = create(); s.insert(vec![ - AgentBuild { + AgentBuilder { expires_at: Some(Timestamp::now()), ..Default::default() } .build(), - AgentBuild { + AgentBuilder { expires_at: Some( Timestamp::now() + std::time::Duration::from_secs(10), ), @@ -123,7 +60,7 @@ fn prune_prunes_only_expired_agents() { fn happy_get() { let mut s = create(); - s.insert(vec![AgentBuild { + s.insert(vec![AgentBuilder { agent: Some(AGENT_1), ..Default::default() } @@ -138,12 +75,12 @@ fn happy_get_all() { let mut s = create(); s.insert(vec![ - AgentBuild { + AgentBuilder { agent: Some(AGENT_1), ..Default::default() } .build(), - AgentBuild { + AgentBuilder { agent: Some(AGENT_2), ..Default::default() } @@ -196,7 +133,7 @@ fn fixture_get_by_overlapping_storage_arc() { let mut s = create(); for (arc_name, arc) in arc_list.iter() { - s.insert(vec![AgentBuild { + s.insert(vec![AgentBuilder { storage_arc: Some(*arc), url: Some(Some(sneak_url(arc_name))), ..Default::default() @@ -222,7 +159,7 @@ fn fixture_get_near_location() { for idx in 0..8 { let loc = (u32::MAX / 8) * idx; - s.insert(vec![AgentBuild { + s.insert(vec![AgentBuilder { // for simplicity have agents claim arcs of len 1 storage_arc: Some(DhtArc::Arc(loc, loc + 1)), // set the url to the idx for matching @@ -234,19 +171,19 @@ fn fixture_get_near_location() { // these should not be returned because they are invalid. s.insert(vec![ - AgentBuild { + AgentBuilder { storage_arc: Some(DhtArc::Empty), url: Some(Some(sneak_url("zero-arc"))), ..Default::default() } .build(), - AgentBuild { + AgentBuilder { is_tombstone: Some(true), url: Some(Some(sneak_url("tombstone"))), ..Default::default() } .build(), - AgentBuild { + AgentBuilder { expires_at: Some(Timestamp::from_micros( Timestamp::now().as_micros() - std::time::Duration::from_secs(10).as_micros() as i64, diff --git a/crates/core/src/factories/mem_peer_store/test_utils.rs b/crates/core/src/factories/mem_peer_store/test_utils.rs new file mode 100644 index 00000000..728ba2de --- /dev/null +++ b/crates/core/src/factories/mem_peer_store/test_utils.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; + +use kitsune2_api::{ + agent::{AgentInfo, AgentInfoSigned, Verifier}, + id::Id, + AgentId, DhtArc, SpaceId, Timestamp, Url, +}; + +const SPACE_1: SpaceId = SpaceId(Id(bytes::Bytes::from_static(b"space1"))); + +/// Agent builder for testing. +#[derive(Debug, Default)] +pub struct AgentBuilder { + /// Optional agent id. + pub agent: Option, + /// Optional space id. + pub space: Option, + /// Optional created at timestamp. + pub created_at: Option, + /// Optional expires at timestamp. + pub expires_at: Option, + /// Optional tombstone flag. + pub is_tombstone: Option, + /// Optional peer url. + pub url: Option>, + /// Optional storage arc. + pub storage_arc: Option, +} + +impl AgentBuilder { + /// Build an agent from given values or defaults. + pub fn build(self) -> Arc { + static NEXT_AGENT: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(1); + let agent = self.agent.unwrap_or_else(|| { + let a = + NEXT_AGENT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let a = a.to_le_bytes(); + AgentId(Id(bytes::Bytes::copy_from_slice(&a))) + }); + let space = self.space.unwrap_or_else(|| SPACE_1.clone()); + let created_at = self.created_at.unwrap_or_else(Timestamp::now); + let expires_at = self.expires_at.unwrap_or_else(|| { + created_at + std::time::Duration::from_secs(60 * 20) + }); + let is_tombstone = self.is_tombstone.unwrap_or(false); + let url = self.url.unwrap_or(None); + let storage_arc = self.storage_arc.unwrap_or(DhtArc::FULL); + let agent_info = serde_json::to_string(&AgentInfo { + agent, + space, + created_at, + expires_at, + is_tombstone, + url, + storage_arc, + }) + .unwrap(); + #[derive(serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct Enc { + agent_info: String, + signature: String, + } + let encoded = serde_json::to_string(&Enc { + agent_info, + signature: "".into(), + }) + .unwrap(); + #[derive(Debug)] + struct V; + impl Verifier for V { + fn verify(&self, _i: &AgentInfo, _m: &[u8], _s: &[u8]) -> bool { + true + } + } + // going through this trouble to use decode because it's sync + AgentInfoSigned::decode(&V, encoded.as_bytes()).unwrap() + } +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5e767481..979e5724 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -45,6 +45,7 @@ impl agent::Verifier for Ed25519Verifier { /// - `space` - The default space module is [factories::CoreSpaceFactory]. /// - `peer_store` - The default peer store is [factories::MemPeerStoreFactory]. /// - `bootstrap` - The default bootstrap is [factories::MemBootstrapFactory]. +/// - `fetch` - The default fetch module is [factories::CoreFetchFactory]. /// - `transport` - The default transport is [factories::MemTransportFactory]. pub fn default_builder() -> Builder { Builder { @@ -54,6 +55,7 @@ pub fn default_builder() -> Builder { space: factories::CoreSpaceFactory::create(), peer_store: factories::MemPeerStoreFactory::create(), bootstrap: factories::MemBootstrapFactory::create(), + fetch: factories::CoreFetchFactory::create(), transport: factories::MemTransportFactory::create(), } } diff --git a/crates/tool_proto_build/src/main.rs b/crates/tool_proto_build/src/main.rs index 04c30eea..bbf0ac81 100644 --- a/crates/tool_proto_build/src/main.rs +++ b/crates/tool_proto_build/src/main.rs @@ -2,6 +2,9 @@ fn main() { std::env::set_var("OUT_DIR", "../api/proto/gen"); prost_build::Config::new() .bytes(["."]) - .compile_protos(&["../api/proto/wire.proto"], &["../api/proto/"]) + .compile_protos( + &["../api/proto/wire.proto", "../api/proto/fetch.proto"], + &["../api/proto/"], + ) .expect("Failed to compile protobuf protocol files"); }