Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bootstrap and Re-Sign Agent Infos Before they Expire #61

Merged
merged 23 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 139 additions & 7 deletions crates/core/src/factories/core_space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,57 @@ use std::sync::{Arc, Mutex, Weak};

mod protocol;

/// CoreSpace configuration types.
pub mod config {
/// Configuration parameters for [CoreSpaceFactory](super::CoreSpaceFactory).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CoreSpaceConfig {
/// The interval in millis at which we check for about to expire
/// local agent infos.
///
/// Default: 60s.
neonphog marked this conversation as resolved.
Show resolved Hide resolved
pub re_sign_freq_ms: u32,

/// The time in millis before an agent info expires, after which we will
/// re sign them.
///
/// Default: 5m.
neonphog marked this conversation as resolved.
Show resolved Hide resolved
pub re_sign_expire_time_ms: u32,
jost-s marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for CoreSpaceConfig {
fn default() -> Self {
Self {
re_sign_freq_ms: 1000 * 60,
re_sign_expire_time_ms: 1000 * 60 * 5,
}
}
}

impl CoreSpaceConfig {
/// Get re_sign_freq as a [std::time::Duration].
pub fn re_sign_freq(&self) -> std::time::Duration {
std::time::Duration::from_millis(self.re_sign_freq_ms as u64)
}

/// Get re_sign_expire_time_ms as a [std::time::Duration].
pub fn re_sign_expire_time_ms(&self) -> std::time::Duration {
std::time::Duration::from_millis(self.re_sign_expire_time_ms as u64)
}
}

/// Module-level configuration for CoreSpace.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CoreSpaceModConfig {
/// CoreSpace configuration.
pub core_space: CoreSpaceConfig,
}
}

use config::*;

/// The core space implementation provided by Kitsune2.
/// You probably will have no reason to use something other than this.
/// This abstraction is mainly here for testing purposes.
Expand All @@ -20,8 +71,8 @@ impl CoreSpaceFactory {
}

impl SpaceFactory for CoreSpaceFactory {
fn default_config(&self, _config: &mut Config) -> K2Result<()> {
Ok(())
fn default_config(&self, config: &mut Config) -> K2Result<()> {
config.set_module_config(&CoreSpaceModConfig::default())
}

fn create(
Expand All @@ -32,7 +83,13 @@ impl SpaceFactory for CoreSpaceFactory {
tx: transport::DynTransport,
) -> BoxFut<'static, K2Result<DynSpace>> {
Box::pin(async move {
let config: CoreSpaceModConfig =
builder.config.get_module_config()?;
let peer_store = builder.peer_store.create(builder.clone()).await?;
let bootstrap = builder
.bootstrap
.create(builder.clone(), peer_store.clone(), space.clone())
.await?;
let inner = Arc::new(Mutex::new(InnerData {
local_agent_map: std::collections::HashMap::new(),
current_url: None,
Expand All @@ -43,7 +100,14 @@ impl SpaceFactory for CoreSpaceFactory {
Arc::new(TxHandlerTranslator(handler, this.clone())),
);
inner.lock().unwrap().current_url = current_url;
CoreSpace::new(space, tx, peer_store, inner)
CoreSpace::new(
config.core_space,
space,
tx,
peer_store,
bootstrap,
inner,
)
});
Ok(out)
})
Expand Down Expand Up @@ -92,7 +156,15 @@ struct CoreSpace {
space: SpaceId,
tx: transport::DynTransport,
peer_store: peer_store::DynPeerStore,
bootstrap: bootstrap::DynBootstrap,
inner: Arc<Mutex<InnerData>>,
task_check_agent_infos: tokio::task::JoinHandle<()>,
}

impl Drop for CoreSpace {
fn drop(&mut self) {
self.task_check_agent_infos.abort();
}
}

impl std::fmt::Debug for CoreSpace {
Expand All @@ -105,16 +177,25 @@ impl std::fmt::Debug for CoreSpace {

impl CoreSpace {
pub fn new(
config: CoreSpaceConfig,
space: SpaceId,
tx: transport::DynTransport,
peer_store: peer_store::DynPeerStore,
bootstrap: bootstrap::DynBootstrap,
inner: Arc<Mutex<InnerData>>,
) -> Self {
let task_check_agent_infos = tokio::task::spawn(check_agent_infos(
config,
peer_store.clone(),
inner.clone(),
));
Self {
space,
tx,
peer_store,
bootstrap,
inner,
task_check_agent_infos,
}
}

Expand Down Expand Up @@ -155,11 +236,13 @@ impl Space for CoreSpace {
let space = self.space.clone();
let local_agent2 = local_agent.clone();
let peer_store = self.peer_store.clone();
let bootstrap = self.bootstrap.clone();
local_agent.register_cb(Arc::new(move || {
let inner = inner.clone();
let space = space.clone();
let local_agent2 = local_agent2.clone();
let peer_store = peer_store.clone();
let bootstrap = bootstrap.clone();
tokio::task::spawn(async move {
// TODO - call an update function on the gossip module.
// TODO - call an update function on the sharding module.
Expand Down Expand Up @@ -198,14 +281,17 @@ impl Space for CoreSpace {
};

// add it to the peer_store.
if let Err(err) = peer_store.insert(vec![info]).await {
if let Err(err) =
peer_store.insert(vec![info.clone()]).await
{
tracing::warn!(
?err,
"failed to add agent info to peer store"
);
}

// TODO - send the new agent info to bootstrap.
// add it to bootstrapping.
bootstrap.put(info);
}
});
}));
Expand Down Expand Up @@ -260,14 +346,17 @@ impl Space for CoreSpace {
Ok(info) => info,
};

if let Err(err) = self.peer_store.insert(vec![info]).await {
if let Err(err) =
self.peer_store.insert(vec![info.clone()]).await
{
tracing::warn!(
?err,
"failed to tombstone agent info in peer store"
);
}

// TODO - send the tombstone info to bootstrap.
// also send the tombstone to the bootstrap server
self.bootstrap.put(info);
}
})
}
Expand Down Expand Up @@ -312,5 +401,48 @@ impl Space for CoreSpace {
}
}

async fn check_agent_infos(
config: CoreSpaceConfig,
peer_store: peer_store::DynPeerStore,
inner: Arc<Mutex<InnerData>>,
) {
loop {
// only check at this rate
tokio::time::sleep(config.re_sign_freq()).await;

// only re-sign if they expire within this time
let cutoff = Timestamp::now() + config.re_sign_expire_time_ms();

// get all the local agents
let agents = inner
.lock()
.unwrap()
.local_agent_map
.values()
.cloned()
.collect::<Vec<_>>();

for agent in agents {
// is this agent going to expire?
let should_re_sign = match peer_store
.get(agent.agent().clone())
.await
{
Ok(Some(info)) => info.expires_at <= cutoff,
Ok(None) => true,
Err(err) => {
tracing::debug!(?err, "error fetching agent in re-signing before expiry logic");
true
}
};

if should_re_sign {
// if so, re-sign it
agent.invoke_cb();
}
}
}
}

#[cfg(test)]
mod test;
91 changes: 91 additions & 0 deletions crates/core/src/factories/core_space/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,94 @@ async fn space_notify_send_recv() {
assert_eq!(TEST_SPACE, s);
assert_eq!("world", String::from_utf8_lossy(&d));
}

// this is a bit of an integration test...
// but the space module is a bit of an integration module...
#[tokio::test(flavor = "multi_thread")]
async fn space_local_agent_periodic_re_sign_and_bootstrap() {
#[derive(Debug)]
struct B(pub Mutex<Vec<Arc<agent::AgentInfoSigned>>>);

impl bootstrap::Bootstrap for B {
fn put(&self, info: Arc<agent::AgentInfoSigned>) {
self.0.lock().unwrap().push(info);
}
}

#[derive(Debug)]
struct BF(pub Arc<B>);

impl bootstrap::BootstrapFactory for BF {
fn default_config(&self, _config: &mut config::Config) -> K2Result<()> {
Ok(())
}

fn create(
&self,
_builder: Arc<builder::Builder>,
_peer_store: peer_store::DynPeerStore,
_space: SpaceId,
) -> BoxFut<'static, K2Result<bootstrap::DynBootstrap>> {
let out: bootstrap::DynBootstrap = self.0.clone();
Box::pin(async move { Ok(out) })
}
}

#[derive(Debug)]
struct S;

impl SpaceHandler for S {}

#[derive(Debug)]
struct K;

impl KitsuneHandler for K {
fn create_space(
&self,
_space: SpaceId,
) -> BoxFut<'_, K2Result<space::DynSpaceHandler>> {
Box::pin(async move {
let s: DynSpaceHandler = Arc::new(S);
Ok(s)
})
}
}

let b = Arc::new(B(Mutex::new(Vec::new())));

let builder = builder::Builder {
verifier: Arc::new(TestVerifier),
bootstrap: Arc::new(BF(b.clone())),
..crate::default_builder()
}
.with_default_config()
.unwrap();

builder
.config
.set_module_config(&super::CoreSpaceModConfig {
core_space: super::CoreSpaceConfig {
// check every 5 millis if we need to re-sign
re_sign_freq_ms: 5,
// set it so we have to re-sign ALL agent infos always
ThetaSinner marked this conversation as resolved.
Show resolved Hide resolved
re_sign_expire_time_ms: 1000 * 60 * 20,
},
})
.unwrap();

let k: DynKitsuneHandler = Arc::new(K);
let k1 = builder.build(k).await.unwrap();

let bob = Arc::new(TestLocalAgent::default()) as agent::DynLocalAgent;

let s1 = k1.space(TEST_SPACE.clone()).await.unwrap();

s1.local_agent_join(bob.clone()).await.unwrap();

iter_check!(1000, {
// see if bootstrap has received at least 5 new updated agent infos
if b.0.lock().unwrap().len() >= 5 {
break;
}
});
}
Loading