Skip to content

Commit

Permalink
Signing Agent Infos (#55)
Browse files Browse the repository at this point in the history
* wip

* test

* test

* signing agent infos

* cleanup testing

* more test cleanup

* wip

* wip

* Apply suggestions from code review

Co-authored-by: ThetaSinner <[email protected]>

* review comment

* review comment

* review comment

* Update crates/core/src/factories/core_space.rs

Co-authored-by: Jost <[email protected]>

* review comment

* flaky test

* fmt

---------

Co-authored-by: ThetaSinner <[email protected]>
Co-authored-by: Jost <[email protected]>
  • Loading branch information
3 people authored Jan 6, 2025
1 parent 448c23a commit 9485a26
Show file tree
Hide file tree
Showing 21 changed files with 762 additions and 246 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ ed25519-dalek = "2.1.1"
num_cpus = "1.16.0"
# api uses this for the kitsune2 wire protocol.
prost = "0.13.3"
# used to generate private cryptography keys.
rand = "0.8.5"
# kitsune types need to be serializable for network transmission.
serde = { version = "1.0.215", features = ["derive"] }
# kitsune2 agent info is serialized as json to improve debugability of
Expand Down Expand Up @@ -77,5 +79,4 @@ kitsune2_bootstrap_srv = { path = "crates/bootstrap_srv" }
kitsune2_memory = { path = "crates/memory" }
kitsune2_test_utils = { path = "crates/test_utils" }

rand = "0.8.5"
tracing-subscriber = "0.3"
97 changes: 86 additions & 11 deletions crates/api/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ use std::sync::Arc;
/// Defines a type capable of cryptographic signatures.
pub trait Signer {
/// Sign the encoded data, returning the resulting detached signature bytes.
fn sign(
&self,
agent_info: &AgentInfo,
message: &[u8],
) -> BoxFut<'_, K2Result<bytes::Bytes>>;
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
agent_info: &'b AgentInfo,
message: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>>;
}

/// Defines a type capable of cryptographic verification.
Expand Down Expand Up @@ -121,11 +121,83 @@ impl Verifier for DynVerifier {
pub trait LocalAgent: Signer + 'static + Send + Sync + std::fmt::Debug {
/// The [AgentId] of this local agent.
fn agent(&self) -> &AgentId;

/// Register a callback to be invoked when [Self::invoke_cb] is called.
/// Implementations need only track a single cb. If this is called again,
/// use only the new one.
fn register_cb(&self, cb: Arc<dyn Fn() + 'static + Send + Sync>);

/// Invoke the registered cb if one has been set.
/// This can be treated as a no-op rather than an error if [Self::register_cb] has not yet been called.
fn invoke_cb(&self);

/// Access the current storage arc for this local agent.
///
/// This will be used by the space module to construct [AgentInfoSigned].
fn get_cur_storage_arc(&self) -> DhtArc;

/// Set the current storage arc for this local agent.
/// This will be initially set to zero on space join.
/// The gossip module will update this as data is collected.
fn set_cur_storage_arc(&self, arc: DhtArc);

/// This is a chance for the implementor to influence how large
/// a storage arc should be for this agent. The gossip module will
/// attempt to collect enough data for claiming storage authority
/// over this range.
fn get_tgt_storage_arc(&self) -> DhtArc;

/// The sharding module will attempt to determine an ideal target
/// arc for this agent. An implementation is free to use or discard
/// this information when returning the arc in [Self::get_tgt_storage_arc].
/// This will initially be set to zero on join, but the sharding module
/// may later update this to FULL or a true target value.
fn set_tgt_storage_arc_hint(&self, arc: DhtArc);
}

/// Trait-object [LocalAgent].
pub type DynLocalAgent = Arc<dyn LocalAgent>;

impl LocalAgent for DynLocalAgent {
fn agent(&self) -> &AgentId {
(**self).agent()
}

fn register_cb(&self, cb: Arc<dyn Fn() + 'static + Send + Sync>) {
(**self).register_cb(cb);
}

fn invoke_cb(&self) {
(**self).invoke_cb();
}

fn get_cur_storage_arc(&self) -> DhtArc {
(**self).get_cur_storage_arc()
}

fn set_cur_storage_arc(&self, arc: DhtArc) {
(**self).set_cur_storage_arc(arc);
}

fn get_tgt_storage_arc(&self) -> DhtArc {
(**self).get_tgt_storage_arc()
}

fn set_tgt_storage_arc_hint(&self, arc: DhtArc) {
(**self).set_tgt_storage_arc_hint(arc);
}
}

impl Signer for DynLocalAgent {
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
agent_info: &'b AgentInfo,
message: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>> {
(**self).sign(agent_info, message)
}
}

mod serde_string_timestamp {
pub fn serialize<S>(
t: &crate::Timestamp,
Expand All @@ -150,7 +222,9 @@ mod serde_string_timestamp {
}

/// AgentInfo stores metadata related to agents.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(
Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash,
)]
#[serde(rename_all = "camelCase")]
pub struct AgentInfo {
/// The agent id.
Expand Down Expand Up @@ -181,6 +255,7 @@ pub struct AgentInfo {
}

/// Signed agent information.
#[derive(PartialEq, Eq, Hash)]
pub struct AgentInfoSigned {
/// The decoded information associated with this agent.
agent_info: AgentInfo,
Expand Down Expand Up @@ -324,11 +399,11 @@ mod test {
struct TestCrypto;

impl Signer for TestCrypto {
fn sign(
&self,
_agent_info: &AgentInfo,
_encoded: &[u8],
) -> BoxFut<'_, K2Result<bytes::Bytes>> {
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
_agent_info: &'b AgentInfo,
_encoded: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>> {
Box::pin(async move { Ok(bytes::Bytes::from_static(SIG)) })
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/api/src/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ pub trait SpaceHandler: 'static + Send + Sync + std::fmt::Debug {
pub type DynSpaceHandler = Arc<dyn SpaceHandler>;

/// Represents a unique dht space within which to communicate with peers.
///
/// A space in Kitsune2 is largely just responsible for hooking up
/// modules within that space. However, it also has a couple responsibilities:
///
/// - The space provides the space-level notification send/recv ability.
/// - The space manages the generation / publishing of agent infos
/// for joined local agents.
pub trait Space: 'static + Send + Sync + std::fmt::Debug {
/// Get a reference to the peer store being used by this space.
/// This could allow you to inject peer info from some source other
Expand Down
20 changes: 11 additions & 9 deletions crates/api/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ impl TxImpHnd {

/// A low-level transport implementation.
pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
/// Get the current url if any.
fn url(&self) -> Option<Url>;

/// Indicates that the implementation should close any open connections to
/// the given peer. If a payload is provided, the implementation can
/// make a best effort to send it to the remote first on a short timeout.
Expand All @@ -144,11 +147,13 @@ pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
///
/// Panics if you attempt to register a duplicate handler for
/// a space.
///
/// Returns the current url if any.
fn register_space_handler(
&self,
space: SpaceId,
handler: DynTxSpaceHandler,
);
) -> Option<Url>;

/// Register a module handler for receiving incoming module messages.
///
Expand Down Expand Up @@ -224,16 +229,13 @@ impl Transport for DefaultTransport {
&self,
space: SpaceId,
handler: DynTxSpaceHandler,
) {
if self
.space_map
.lock()
.unwrap()
.insert(space.clone(), handler)
.is_some()
{
) -> Option<Url> {
let mut lock = self.space_map.lock().unwrap();
if lock.insert(space.clone(), handler).is_some() {
panic!("Attempted to register duplicate space handler! {space}");
}
// keep the lock locked while we fetch the url for atomicity.
self.imp.url()
}

fn register_module_handler(
Expand Down
4 changes: 3 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ edition = "2021"
[dependencies]
backon = { workspace = true }
bytes = { workspace = true }
ed25519-dalek = { workspace = true }
ed25519-dalek = { workspace = true, features = ["rand_core"] }
futures = { workspace = true }
kitsune2_api = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
Expand All @@ -29,5 +30,6 @@ axum = { workspace = true, default-features = false, features = [
"tokio",
] }
ed25519-dalek = { workspace = true, features = ["rand_core"] }
kitsune2_test_utils = { workspace = true }
rand = { workspace = true }
tokio = { workspace = true, features = ["full"] }
3 changes: 0 additions & 3 deletions crates/core/src/factories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ pub use core_space::*;
pub mod mem_peer_store;
pub use mem_peer_store::MemPeerStoreFactory;

#[cfg(test)]
pub(crate) use mem_peer_store::test_utils;

pub mod mem_bootstrap;
pub use mem_bootstrap::MemBootstrapFactory;

Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/factories/core_bootstrap/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::sync::{Arc, Mutex};
struct TestCrypto;

impl agent::Signer for TestCrypto {
fn sign(
&self,
agent_info: &agent::AgentInfo,
encoded: &[u8],
) -> BoxFut<'_, K2Result<bytes::Bytes>> {
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
agent_info: &'b agent::AgentInfo,
encoded: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>> {
use ed25519_dalek::*;

let s1: AgentId = serde_json::from_str(&format!(
Expand Down
28 changes: 14 additions & 14 deletions crates/core/src/factories/core_fetch/back_off.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,17 @@ impl BackOff {

#[cfg(test)]
mod test {
use std::{sync::Arc, time::Duration};

use kitsune2_api::{fetch::Fetch, SpaceId, Url};

use crate::{
default_builder,
factories::{
core_fetch::{
back_off::BackOffList,
test::{create_op_list, random_agent_id, MockTransport},
CoreFetch, CoreFetchConfig,
},
test_utils::AgentBuilder,
factories::core_fetch::{
back_off::BackOffList,
test::{create_op_list, random_agent_id, MockTransport},
CoreFetch, CoreFetchConfig,
},
};
use kitsune2_api::{fetch::Fetch, SpaceId, Url};
use kitsune2_test_utils::agent::*;
use std::{sync::Arc, time::Duration};

#[test]
fn back_off() {
Expand Down Expand Up @@ -177,7 +173,7 @@ mod test {
url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
..Default::default()
}
.build();
.build(TestLocalAgent::default());
peer_store.insert(vec![agent_info.clone()]).await.unwrap();

let fetch = CoreFetch::new(
Expand Down Expand Up @@ -215,6 +211,10 @@ mod test {
}

#[tokio::test(flavor = "multi_thread")]
#[cfg_attr(
target_os = "windows",
ignore = "flaky: back_off.rs:333:10: called `Result::unwrap()` on an `Err` value: Elapsed(())"
)]
async fn requests_are_dropped_when_max_back_off_expired() {
let builder =
Arc::new(default_builder().with_default_config().unwrap());
Expand All @@ -235,7 +235,7 @@ mod test {
url: Some(Some(Url::from_str("wss://127.0.0.1:1").unwrap())),
..Default::default()
}
.build();
.build(TestLocalAgent::default());
let agent_url_1 = agent_info_1.url.clone().unwrap();
peer_store.insert(vec![agent_info_1.clone()]).await.unwrap();

Expand All @@ -247,7 +247,7 @@ mod test {
url: Some(Some(Url::from_str("wss://127.0.0.1:2").unwrap())),
..Default::default()
}
.build();
.build(TestLocalAgent::default());
peer_store.insert(vec![agent_info_2.clone()]).await.unwrap();

let fetch = CoreFetch::new(
Expand Down
Loading

0 comments on commit 9485a26

Please sign in to comment.