Skip to content

Commit

Permalink
Merge branch 'main' into go-go-gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
ThetaSinner committed Jan 14, 2025
2 parents a38617e + afbf978 commit 3e0d930
Show file tree
Hide file tree
Showing 17 changed files with 382 additions and 114 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/api/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub struct Builder {
/// [op_store::OpStore] instances.
pub op_store: op_store::DynOpStoreFactory,

/// The [peer_meta_store::PeerMetaStoreFactory] to be used for creating
/// [peer_meta_store::PeerMetaStore] instances.
pub meta_store: peer_meta_store::DynPeerMetaStoreFactory,

/// The [gossip::GossipFactory] to be used for creating
/// [gossip::Gossip] instances.
pub gossip: gossip::DynGossipFactory,
Expand All @@ -64,6 +68,7 @@ impl Builder {
fetch,
transport,
op_store,
meta_store,
gossip,
} = &mut self;

Expand All @@ -74,6 +79,7 @@ impl Builder {
fetch.default_config(config)?;
transport.default_config(config)?;
op_store.default_config(config)?;
meta_store.default_config(config)?;
gossip.default_config(config)?;

config.mark_defaults_set();
Expand Down
7 changes: 7 additions & 0 deletions crates/api/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::sync::Arc;

use bytes::Bytes;
use prost::Message;

use crate::{
Expand All @@ -25,6 +26,12 @@ impl From<OpIds> for Vec<OpId> {
}
}

impl From<Ops> for Vec<Bytes> {
fn from(value: Ops) -> Self {
value.op_list.into_iter().map(|op| op.data).collect()
}
}

impl From<Vec<MetaOp>> for Ops {
fn from(value: Vec<MetaOp>) -> Self {
Self {
Expand Down
3 changes: 3 additions & 0 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ pub use timestamp::*;

pub mod fetch;

mod peer_meta_store;
pub use peer_meta_store::*;

mod gossip;
pub use gossip::*;

Expand Down
9 changes: 8 additions & 1 deletion crates/api/src/op_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::{
builder, config, BoxFut, DhtArc, K2Result, OpId, SpaceId, Timestamp,
};
use bytes::Bytes;
use futures::future::BoxFuture;
use std::cmp::Ordering;
use std::sync::Arc;
Expand All @@ -29,6 +30,12 @@ impl From<MetaOp> for Op {
}
}

impl From<MetaOp> for Bytes {
fn from(value: MetaOp) -> Self {
value.op_data.into()
}
}

/// An op that has been stored by the Kitsune host.
///
/// This is the basic unit of data that the host is expected to store. Whether that storage is
Expand Down Expand Up @@ -68,7 +75,7 @@ pub trait OpStore: 'static + Send + Sync + std::fmt::Debug {
/// if it is able to process them.
fn process_incoming_ops(
&self,
op_list: Vec<MetaOp>,
op_list: Vec<Bytes>,
) -> BoxFuture<'_, K2Result<()>>;

/// Retrieve a batch of ops from the host by time range.
Expand Down
55 changes: 55 additions & 0 deletions crates/api/src/peer_meta_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::{builder, config, AgentId, BoxFut, K2Result, SpaceId, Timestamp};
use futures::future::BoxFuture;
use std::sync::Arc;

/// A store for peer metadata.
///
/// This is expected to be backed by a key-value store that keys by space, agent_id and key.
pub trait PeerMetaStore: 'static + Send + Sync + std::fmt::Debug {
/// Store a key-value pair for a given space and agent.
fn put(
&self,
space: SpaceId,
agent: AgentId,
key: String,
value: bytes::Bytes,
expiry: Option<Timestamp>,
) -> BoxFuture<'_, K2Result<()>>;

/// Get a value by key for a given space and agent.
fn get(
&self,
space: SpaceId,
agent: AgentId,
key: String,
) -> BoxFuture<'_, K2Result<Option<bytes::Bytes>>>;

/// Delete a key-value pair for a given space and agent.
fn delete(
&self,
space: SpaceId,
agent: AgentId,
key: String,
) -> BoxFuture<'_, K2Result<()>>;
}

/// Trait-object version of kitsune2 [PeerMetaStore].
pub type DynPeerMetaStore = Arc<dyn PeerMetaStore>;

/// A factory for constructing [PeerMetaStore] instances.
pub trait PeerMetaStoreFactory:
'static + Send + Sync + std::fmt::Debug
{
/// Help the builder construct a default config from the chosen
/// module factories.
fn default_config(&self, config: &mut config::Config) -> K2Result<()>;

/// Construct a meta store instance.
fn create(
&self,
builder: Arc<builder::Builder>,
) -> BoxFut<'static, K2Result<DynPeerMetaStore>>;
}

/// Trait-object [PeerMetaStoreFactory].
pub type DynPeerMetaStoreFactory = Arc<dyn PeerMetaStoreFactory>;
3 changes: 3 additions & 0 deletions crates/core/src/factories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub use mem_bootstrap::MemBootstrapFactory;
pub mod core_bootstrap;
pub use core_bootstrap::CoreBootstrapFactory;

mod mem_peer_meta_store;
pub use mem_peer_meta_store::*;

pub mod core_fetch;
pub use core_fetch::CoreFetchFactory;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/factories/core_fetch/test/request_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async fn fetch_queue() {
let mut num_requests_sent = requests_sent.lock().unwrap().len();

// Wait for tasks to settle all requests.
tokio::time::timeout(Duration::from_millis(20), async {
tokio::time::timeout(Duration::from_millis(30), async {
loop {
tokio::time::sleep(Duration::from_millis(1)).await;
let current_num_requests_sent = requests_sent.lock().unwrap().len();
Expand Down
51 changes: 18 additions & 33 deletions crates/core/src/factories/core_fetch/test/response_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
Kitsune2MemoryOp, MemOpStoreFactory,
},
};
use bytes::Bytes;
use kitsune2_api::{
fetch::{serialize_op_ids, Ops},
id::Id,
Expand All @@ -19,9 +20,9 @@ use std::{
time::Duration,
};

type ResponsesSent = Vec<(Vec<MetaOp>, Url)>;
type ResponsesSent = Vec<(Vec<Bytes>, Url)>;

const SPACE_ID: SpaceId = SpaceId(Id(bytes::Bytes::from_static(b"space_id")));
const SPACE_ID: SpaceId = SpaceId(Id(Bytes::from_static(b"space_id")));

fn hash_op(input: &bytes::Bytes) -> OpId {
use sha2::{Digest, Sha256};
Expand All @@ -32,17 +33,9 @@ fn hash_op(input: &bytes::Bytes) -> OpId {
hash_bytes.into()
}

fn make_op(data: Vec<u8>) -> MetaOp {
fn make_op(data: Vec<u8>) -> Kitsune2MemoryOp {
let op_id = hash_op(&data.clone().into());
MetaOp {
op_id: op_id.clone(),
op_data: serde_json::to_vec(&Kitsune2MemoryOp::new(
op_id,
Timestamp::now(),
data,
))
.unwrap(),
}
Kitsune2MemoryOp::new(op_id, Timestamp::now(), data)
}

fn make_mock_transport(
Expand All @@ -57,21 +50,12 @@ fn make_mock_transport(
move |peer, space, module, data| {
assert_eq!(space, SPACE_ID);
assert_eq!(module, crate::factories::core_fetch::MOD_NAME);
let ops = Ops::decode(data).unwrap();
let ops = ops
let ops = Ops::decode(data)
.unwrap()
.op_list
.into_iter()
.map(|op| {
let op_data =
serde_json::from_slice::<Kitsune2MemoryOp>(&op.data)
.unwrap();
let op_id = hash_op(&bytes::Bytes::from(op_data.payload));
MetaOp {
op_id,
op_data: op.data.into(),
}
})
.collect();
.map(|op| op.data)
.collect::<Vec<_>>();
responses_sent.lock().unwrap().push((ops, peer));
Box::pin(async move { Ok(()) })
}
Expand Down Expand Up @@ -101,11 +85,12 @@ async fn respond_to_multiple_requests() {
let op_2 = make_op(vec![2; 128]);
let op_3 = make_op(vec![3; 128]);
// Insert op 1, 2, 3 into op store. Op 4 will not be returned in the response.
let stored_op = vec![op_1.clone(), op_2.clone(), op_3.clone()];
op_store
.process_incoming_ops(stored_op.clone())
.await
.unwrap();
let stored_ops = vec![
op_1.clone().into(),
op_2.clone().into(),
op_3.clone().into(),
];
op_store.process_incoming_ops(stored_ops).await.unwrap();

let fetch = CoreFetch::new(
config.clone(),
Expand Down Expand Up @@ -152,12 +137,12 @@ async fn respond_to_multiple_requests() {
assert!(responses_sent
.lock()
.unwrap()
.contains(&(vec![op_1, op_2], agent_url_1)));
.contains(&(vec![op_1.into(), op_2.into()], agent_url_1)));
// Only op 3 is in op store.
assert!(responses_sent
.lock()
.unwrap()
.contains(&(vec![op_3], agent_url_2)));
.contains(&(vec![op_3.into()], agent_url_2)));
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -252,7 +237,7 @@ async fn fail_to_respond_once_then_succeed() {

let op = make_op(vec![1; 128]);
op_store
.process_incoming_ops(vec![op.clone()])
.process_incoming_ops(vec![op.clone().into()])
.await
.unwrap();
let agent_url = Url::from_str("wss://127.0.0.1:1").unwrap();
Expand Down
32 changes: 12 additions & 20 deletions crates/core/src/factories/mem_op_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! The mem op store implementation provided by Kitsune2.
use crate::factories::mem_op_store::time_slice_hash_store::TimeSliceHashStore;
use bytes::Bytes;
use futures::future::BoxFuture;
use kitsune2_api::builder::Builder;
use kitsune2_api::config::Config;
Expand Down Expand Up @@ -77,27 +78,18 @@ impl From<Kitsune2MemoryOp> for StoredOp {
}
}

impl TryFrom<MetaOp> for Kitsune2MemoryOp {
type Error = serde_json::Error;

fn try_from(value: MetaOp) -> serde_json::Result<Self> {
let op = serde_json::from_slice(value.op_data.as_slice())?;

Ok(op)
impl From<Bytes> for Kitsune2MemoryOp {
fn from(value: Bytes) -> Self {
serde_json::from_slice(&value)
.expect("failed to deserialize Kitsune2MemoryOp from Op")
}
}

impl TryFrom<Kitsune2MemoryOp> for MetaOp {
type Error = K2Error;

fn try_from(value: Kitsune2MemoryOp) -> K2Result<Self> {
let op_data = serde_json::to_vec(&value).map_err(|e| {
K2Error::other_src("Failed to serialize Kitsune2MemoryOp", e)
})?;
Ok(MetaOp {
op_id: value.op_id,
op_data,
})
impl From<Kitsune2MemoryOp> for Bytes {
fn from(value: Kitsune2MemoryOp) -> Self {
serde_json::to_vec(&value)
.expect("failed to serialize Op to Kitsune2MemoryOp")
.into()
}
}

Expand Down Expand Up @@ -133,13 +125,13 @@ struct Kitsune2MemoryOpStoreInner {
impl OpStore for Kitsune2MemoryOpStore {
fn process_incoming_ops(
&self,
op_list: Vec<MetaOp>,
op_list: Vec<Bytes>,
) -> BoxFuture<'_, K2Result<()>> {
Box::pin(async move {
let ops_to_add = op_list
.iter()
.map(|op| -> serde_json::Result<(OpId, Kitsune2MemoryOp)> {
let op = Kitsune2MemoryOp::try_from(op.clone())?;
let op = Kitsune2MemoryOp::from(op.clone());
Ok((op.op_id.clone(), op))
})
.collect::<Result<Vec<_>, _>>().map_err(|e| {
Expand Down
Loading

0 comments on commit 3e0d930

Please sign in to comment.