Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signing Agent Infos #55

Merged
merged 23 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ ed25519-dalek = "2.1.1"
num_cpus = "1.16.0"
# api uses this for the kitsune2 wire protocol.
prost = "0.13.3"
# used to generate private cryptography keys.
rand = "0.8.5"
# kitsune types need to be serializable for network transmission.
serde = { version = "1.0.215", features = ["derive"] }
# kitsune2 agent info is serialized as json to improve debugability of
Expand Down Expand Up @@ -77,5 +79,4 @@ kitsune2_bootstrap_srv = { path = "crates/bootstrap_srv" }
kitsune2_memory = { path = "crates/memory" }
kitsune2_test_utils = { path = "crates/test_utils" }

rand = "0.8.5"
tracing-subscriber = "0.3"
97 changes: 86 additions & 11 deletions crates/api/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ use std::sync::Arc;
/// Defines a type capable of cryptographic signatures.
pub trait Signer {
/// Sign the encoded data, returning the resulting detached signature bytes.
fn sign(
&self,
agent_info: &AgentInfo,
message: &[u8],
) -> BoxFut<'_, K2Result<bytes::Bytes>>;
fn sign<'a, 'b: 'a, 'c: 'a>(
neonphog marked this conversation as resolved.
Show resolved Hide resolved
&'a self,
agent_info: &'b AgentInfo,
message: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>>;
}

/// Defines a type capable of cryptographic verification.
Expand Down Expand Up @@ -121,11 +121,83 @@ impl Verifier for DynVerifier {
pub trait LocalAgent: Signer + 'static + Send + Sync + std::fmt::Debug {
/// The [AgentId] of this local agent.
fn agent(&self) -> &AgentId;

/// Register a callback to be invoked when [Self::invoke_cb] is called.
/// Implementations need only track a single cb. If this is called again,
/// use only the new one.
fn register_cb(&self, cb: Arc<dyn Fn() + 'static + Send + Sync>);

/// Invoke the registered cb if one has been set.
/// This can be treated as a no-op rather than an error if [Self::register_cb] has not yet been called.
fn invoke_cb(&self);

/// Access the current storage arc for this local agent.
///
/// This will be used by the space module to construct [AgentInfoSigned].
neonphog marked this conversation as resolved.
Show resolved Hide resolved
fn get_cur_storage_arc(&self) -> DhtArc;

/// Set the current storage arc for this local agent.
/// This will be initially set to zero on space join.
/// The gossip module will update this as data is collected.
fn set_cur_storage_arc(&self, arc: DhtArc);

/// This is a chance for the implementor to influence how large
/// a storage arc should be for this agent. The gossip module will
/// attempt to collect enough data for claiming storage authority
/// over this range.
fn get_tgt_storage_arc(&self) -> DhtArc;

/// The sharding module will attempt to determine an ideal target
/// arc for this agent. An implementation is free to use or discard
/// this information when returning the arc in [Self::get_tgt_storage_arc].
/// This will initially be set to zero on join, but the sharding module
ThetaSinner marked this conversation as resolved.
Show resolved Hide resolved
/// may later update this to FULL or a true target value.
fn set_tgt_storage_arc_hint(&self, arc: DhtArc);
}

/// Trait-object [LocalAgent].
pub type DynLocalAgent = Arc<dyn LocalAgent>;

impl LocalAgent for DynLocalAgent {
fn agent(&self) -> &AgentId {
(**self).agent()
}

fn register_cb(&self, cb: Arc<dyn Fn() + 'static + Send + Sync>) {
(**self).register_cb(cb);
}

fn invoke_cb(&self) {
(**self).invoke_cb();
}

fn get_cur_storage_arc(&self) -> DhtArc {
(**self).get_cur_storage_arc()
}

fn set_cur_storage_arc(&self, arc: DhtArc) {
(**self).set_cur_storage_arc(arc);
}

fn get_tgt_storage_arc(&self) -> DhtArc {
(**self).get_tgt_storage_arc()
}

fn set_tgt_storage_arc_hint(&self, arc: DhtArc) {
(**self).set_tgt_storage_arc_hint(arc);
}
}

impl Signer for DynLocalAgent {
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
agent_info: &'b AgentInfo,
message: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>> {
(**self).sign(agent_info, message)
}
}

mod serde_string_timestamp {
pub fn serialize<S>(
t: &crate::Timestamp,
Expand All @@ -150,7 +222,9 @@ mod serde_string_timestamp {
}

/// AgentInfo stores metadata related to agents.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(
Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash,
)]
#[serde(rename_all = "camelCase")]
pub struct AgentInfo {
/// The agent id.
Expand Down Expand Up @@ -181,6 +255,7 @@ pub struct AgentInfo {
}

/// Signed agent information.
#[derive(PartialEq, Eq, Hash)]
pub struct AgentInfoSigned {
/// The decoded information associated with this agent.
agent_info: AgentInfo,
Expand Down Expand Up @@ -324,11 +399,11 @@ mod test {
struct TestCrypto;

impl Signer for TestCrypto {
fn sign(
&self,
_agent_info: &AgentInfo,
_encoded: &[u8],
) -> BoxFut<'_, K2Result<bytes::Bytes>> {
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
_agent_info: &'b AgentInfo,
_encoded: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>> {
Box::pin(async move { Ok(bytes::Bytes::from_static(SIG)) })
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/api/src/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ pub trait SpaceHandler: 'static + Send + Sync + std::fmt::Debug {
pub type DynSpaceHandler = Arc<dyn SpaceHandler>;

/// Represents a unique dht space within which to communicate with peers.
///
/// A space in Kitsune2 is largely just responsible for hooking up
/// modules within that space. However, it also has a couple responsibilities:
///
/// - The space provides the space-level notification send/recv ability.
/// - The space manages the generation / publishing of agent infos
/// for joined local agents.
pub trait Space: 'static + Send + Sync + std::fmt::Debug {
/// Get a reference to the peer store being used by this space.
/// This could allow you to inject peer info from some source other
Expand Down
20 changes: 11 additions & 9 deletions crates/api/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ impl TxImpHnd {

/// A low-level transport implementation.
pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
/// Get the current url if any.
fn url(&self) -> Option<Url>;

/// Indicates that the implementation should close any open connections to
/// the given peer. If a payload is provided, the implementation can
/// make a best effort to send it to the remote first on a short timeout.
Expand All @@ -144,11 +147,13 @@ pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
///
/// Panics if you attempt to register a duplicate handler for
/// a space.
///
/// Returns the current url if any.
fn register_space_handler(
&self,
space: SpaceId,
handler: DynTxSpaceHandler,
);
) -> Option<Url>;

/// Register a module handler for receiving incoming module messages.
///
Expand Down Expand Up @@ -224,16 +229,13 @@ impl Transport for DefaultTransport {
&self,
space: SpaceId,
handler: DynTxSpaceHandler,
) {
if self
.space_map
.lock()
.unwrap()
.insert(space.clone(), handler)
.is_some()
{
) -> Option<Url> {
let mut lock = self.space_map.lock().unwrap();
if lock.insert(space.clone(), handler).is_some() {
panic!("Attempted to register duplicate space handler! {space}");
}
// keep the lock locked while we fetch the url for atomicity.
self.imp.url()
}

fn register_module_handler(
Expand Down
4 changes: 3 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ edition = "2021"
[dependencies]
backon = { workspace = true }
bytes = { workspace = true }
ed25519-dalek = { workspace = true }
ed25519-dalek = { workspace = true, features = ["rand_core"] }
futures = { workspace = true }
kitsune2_api = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
Expand All @@ -29,5 +30,6 @@ axum = { workspace = true, default-features = false, features = [
"tokio",
] }
ed25519-dalek = { workspace = true, features = ["rand_core"] }
kitsune2_test_utils = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true, features = ["full"] }
3 changes: 0 additions & 3 deletions crates/core/src/factories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ pub use core_space::*;
pub mod mem_peer_store;
pub use mem_peer_store::MemPeerStoreFactory;

#[cfg(test)]
pub(crate) use mem_peer_store::test_utils;

pub mod mem_bootstrap;
pub use mem_bootstrap::MemBootstrapFactory;

Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/factories/core_bootstrap/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::sync::{Arc, Mutex};
struct TestCrypto;

impl agent::Signer for TestCrypto {
fn sign(
&self,
agent_info: &agent::AgentInfo,
encoded: &[u8],
) -> BoxFut<'_, K2Result<bytes::Bytes>> {
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
agent_info: &'b agent::AgentInfo,
encoded: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>> {
use ed25519_dalek::*;

let s1: AgentId = serde_json::from_str(&format!(
Expand Down
28 changes: 14 additions & 14 deletions crates/core/src/factories/core_fetch/back_off.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,17 @@ impl BackOff {

#[cfg(test)]
mod test {
use std::{sync::Arc, time::Duration};

use kitsune2_api::{fetch::Fetch, SpaceId, Url};

use crate::{
default_builder,
factories::{
core_fetch::{
back_off::BackOffList,
test::{create_op_list, random_agent_id, MockTransport},
CoreFetch, CoreFetchConfig,
},
test_utils::AgentBuilder,
factories::core_fetch::{
back_off::BackOffList,
test::{create_op_list, random_agent_id, MockTransport},
CoreFetch, CoreFetchConfig,
},
};
use kitsune2_api::{fetch::Fetch, SpaceId, Url};
use kitsune2_test_utils::agent::*;
use std::{sync::Arc, time::Duration};

#[test]
fn back_off() {
Expand Down Expand Up @@ -177,7 +173,7 @@ mod test {
url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
..Default::default()
}
.build();
.build(TestLocalAgent::default());
peer_store.insert(vec![agent_info.clone()]).await.unwrap();

let fetch = CoreFetch::new(
Expand Down Expand Up @@ -215,6 +211,10 @@ mod test {
}

#[tokio::test(flavor = "multi_thread")]
#[cfg_attr(
target_os = "windows",
ignore = "flaky: back_off.rs:333:10: called `Result::unwrap()` on an `Err` value: Elapsed(())"
)]
async fn requests_are_dropped_when_max_back_off_expired() {
let builder =
Arc::new(default_builder().with_default_config().unwrap());
Expand All @@ -235,7 +235,7 @@ mod test {
url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
..Default::default()
}
.build();
.build(TestLocalAgent::default());
let agent_url_1 = agent_info_1.url.clone().unwrap();
peer_store.insert(vec![agent_info_1.clone()]).await.unwrap();

Expand All @@ -247,7 +247,7 @@ mod test {
url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
..Default::default()
}
.build();
.build(TestLocalAgent::default());
peer_store.insert(vec![agent_info_2.clone()]).await.unwrap();

let fetch = CoreFetch::new(
Expand Down
Loading
Loading