Skip to content

Commit

Permalink
Merge branch 'main' into bootstrap-cli
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Dec 17, 2024
2 parents ea9001a + 9a143a5 commit 7d2fe0c
Show file tree
Hide file tree
Showing 17 changed files with 1,239 additions and 92 deletions.
29 changes: 15 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"crates/test_utils",
"crates/tool_proto_build",
]

resolver = "2"

[workspace.dependencies]
Expand Down Expand Up @@ -75,6 +76,4 @@ kitsune2_memory = { path = "crates/memory" }
kitsune2_test_utils = { path = "crates/test_utils" }

rand = "0.8.5"
# this is also used by the binary bootstrap_srv. But, since this monorepo
# is largely libraries, leaving this in this section.
tracing-subscriber = "0.3"
9 changes: 9 additions & 0 deletions crates/api/proto/fetch.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";

package kitsune2.fetch;

// A list of op ids.
message OpIds {
// Op ids.
repeated bytes data = 1;
}
8 changes: 8 additions & 0 deletions crates/api/proto/gen/kitsune2.fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// This file is @generated by prost-build.
/// A list of op ids.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct OpIds {
/// Op ids.
#[prost(bytes = "bytes", repeated, tag = "1")]
pub data: ::prost::alloc::vec::Vec<::prost::bytes::Bytes>,
}
6 changes: 6 additions & 0 deletions crates/api/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub struct Builder {
/// [bootstrap::Bootstrap] instances for initial WAN discovery.
pub bootstrap: bootstrap::DynBootstrapFactory,

/// The [fetch::FetchFactory] to be used for creating
/// [fetch::Fetch] instances.
pub fetch: fetch::DynFetchFactory,

/// The [transport::TransportFactory] to be used for creating
/// [transport::Transport] instances.
pub transport: transport::DynTransportFactory,
Expand All @@ -48,13 +52,15 @@ impl Builder {
space,
peer_store,
bootstrap,
fetch,
transport,
} = self;

kitsune.default_config(config)?;
space.default_config(config)?;
peer_store.default_config(config)?;
bootstrap.default_config(config)?;
fetch.default_config(config)?;
transport.default_config(config)?;

Ok(())
Expand Down
104 changes: 104 additions & 0 deletions crates/api/src/fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Kitsune2 fetch types.
use std::sync::Arc;

use prost::Message;

use crate::{
builder, config, peer_store::DynPeerStore, transport::DynTransport,
AgentId, BoxFut, K2Error, K2Result, OpId, SpaceId,
};

include!("../proto/gen/kitsune2.fetch.rs");

impl From<Vec<OpId>> for OpIds {
fn from(value: Vec<OpId>) -> Self {
Self {
data: value.into_iter().map(Into::into).collect(),
}
}
}

impl From<OpIds> for Vec<OpId> {
fn from(value: OpIds) -> Self {
value.data.into_iter().map(Into::into).collect()
}
}

/// Serialize list of op ids for sending over the wire.
pub fn serialize_op_ids(value: Vec<OpId>) -> bytes::Bytes {
let bytes = OpIds::from(value).encode_to_vec();
bytes::Bytes::copy_from_slice(&bytes)
}

/// Deserialize list of op ids.
pub fn deserialize_op_ids(value: bytes::Bytes) -> K2Result<Vec<OpId>> {
let op_ids = OpIds::decode(value).map_err(K2Error::other)?;
let vec = Vec::from(op_ids);
Ok(vec)
}

/// Trait for implementing a fetch module to fetch ops from other agents.
pub trait Fetch: 'static + Send + Sync + std::fmt::Debug {
/// Add op ids to be fetched.
fn add_ops(
&self,
op_list: Vec<OpId>,
source: AgentId,
) -> BoxFut<'_, K2Result<()>>;
}

/// Trait object [Fetch].
pub type DynFetch = Arc<dyn Fetch>;

/// A factory for creating Fetch instances.
pub trait FetchFactory: 'static + Send + Sync + std::fmt::Debug {
/// Help the builder construct a default config from the chosen
/// module factories.
fn default_config(&self, config: &mut config::Config) -> K2Result<()>;

/// Construct a Fetch instance.
fn create(
&self,
builder: Arc<builder::Builder>,
space_id: SpaceId,
peer_store: DynPeerStore,
transport: DynTransport,
) -> BoxFut<'static, K2Result<DynFetch>>;
}

/// Trait object [FetchFactory].
pub type DynFetchFactory = Arc<dyn FetchFactory>;

#[cfg(test)]
mod test {
use super::*;
use prost::Message;

#[test]
fn happy_encode_decode() {
let op_id_1 = OpId::from(bytes::Bytes::from_static(b"some_op_id"));
let op_id_2 = OpId::from(bytes::Bytes::from_static(b"another_op_id"));
let op_id_vec = vec![op_id_1, op_id_2];
let op_ids = OpIds::from(op_id_vec.clone());

let op_ids_enc = op_ids.encode_to_vec();
let op_ids_dec = OpIds::decode(op_ids_enc.as_slice()).unwrap();
let op_ids_dec_vec = Vec::from(op_ids_dec.clone());

assert_eq!(op_ids, op_ids_dec);
assert_eq!(op_id_vec, op_ids_dec_vec);
}

#[test]
fn bytes_from_op_ids() {
let op_id_1 = OpId::from(bytes::Bytes::from_static(b"some_op_id"));
let op_id_2 = OpId::from(bytes::Bytes::from_static(b"another_op_id"));
let op_id_vec = vec![op_id_1, op_id_2];

let bytes = serialize_op_ids(op_id_vec.clone());
let op_id_vec_deserialized = deserialize_op_ids(bytes.clone()).unwrap();

assert_eq!(op_id_vec_deserialized, op_id_vec);
}
}
2 changes: 2 additions & 0 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub use id::{AgentId, OpId, SpaceId};
mod timestamp;
pub use timestamp::*;

pub mod fetch;

pub mod op_store;
pub use op_store::*;

Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
pub type DynTxImp = Arc<dyn TxImp>;

/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
pub trait Transport {
pub trait Transport: Send + Sync {
/// Register a space handler for receiving incoming notifications.
///
/// Panics if you attempt to register a duplicate handler for
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes = { workspace = true }
ed25519-dalek = { workspace = true }
futures = { workspace = true }
kitsune2_api = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/factories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@ pub use mem_bootstrap::*;
mod core_bootstrap;
pub use core_bootstrap::*;

mod core_fetch;
pub use core_fetch::*;

mod mem_transport;
pub use mem_transport::*;
Loading

0 comments on commit 7d2fe0c

Please sign in to comment.