Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signing Agent Infos #55

Merged
merged 23 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ clap = "4.5.21"
# on shutdown.
ctrlc = { version = "3.4.5", features = ["termination"] }
# bootstrap_srv for signature verification.
ed25519-dalek = "2.1.1"
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
ThetaSinner marked this conversation as resolved.
Show resolved Hide resolved
# bootstrap_srv uses this to determine worker thread count.
num_cpus = "1.16.0"
# api uses this for the kitsune2 wire protocol.
prost = "0.13.3"
# used to generate private cryptography keys.
rand = "0.8.5"
# kitsune types need to be serializable for network transmission.
serde = { version = "1.0.215", features = ["derive"] }
# kitsune2 agent info is serialized as json to improve debugability of
Expand Down Expand Up @@ -75,5 +77,4 @@ kitsune2_bootstrap_srv = { path = "crates/bootstrap_srv" }
kitsune2_memory = { path = "crates/memory" }
kitsune2_test_utils = { path = "crates/test_utils" }

rand = "0.8.5"
tracing-subscriber = "0.3"
97 changes: 86 additions & 11 deletions crates/api/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ use std::sync::Arc;
/// Defines a type capable of cryptographic signatures.
pub trait Signer {
/// Sign the encoded data, returning the resulting detached signature bytes.
fn sign(
&self,
agent_info: &AgentInfo,
message: &[u8],
) -> BoxFut<'_, K2Result<bytes::Bytes>>;
fn sign<'a, 'b: 'a, 'c: 'a>(
neonphog marked this conversation as resolved.
Show resolved Hide resolved
&'a self,
agent_info: &'b AgentInfo,
message: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>>;
}

/// Defines a type capable of cryptographic verification.
Expand Down Expand Up @@ -121,11 +121,83 @@ impl Verifier for DynVerifier {
pub trait LocalAgent: Signer + 'static + Send + Sync + std::fmt::Debug {
/// The [AgentId] of this local agent.
fn agent(&self) -> &AgentId;

/// Register a callback to be invoked when [Self::invoke_cb] is called.
/// Implementations need only track a single cb. If this is called again,
/// use only the new one.
fn register_cb(&self, cb: Arc<dyn Fn() + 'static + Send + Sync>);

/// Invoke the registered cb if one has been set.
/// This can be ignored if [Self::register_cb] has not yet been called.
neonphog marked this conversation as resolved.
Show resolved Hide resolved
fn invoke_cb(&self);

/// Access the current storage arc for this local agent.
/// This will be used by the space module to construct [AgentInfoSigned].
neonphog marked this conversation as resolved.
Show resolved Hide resolved
fn get_cur_storage_arc(&self) -> DhtArc;

/// Set the current storage arc for this local agent.
/// This will be initially set to zero on space join,
ThetaSinner marked this conversation as resolved.
Show resolved Hide resolved
/// then will be updated by the gossip module as best effort to
/// collect data for the range is made.
neonphog marked this conversation as resolved.
Show resolved Hide resolved
fn set_cur_storage_arc(&self, arc: DhtArc);

/// This is a chance for the implementor to influence how large
/// a storage arc should be for this agent. The gossip module will
/// attempt to collect enough data for claiming storage authority
/// over this range.
fn get_tgt_storage_arc(&self) -> DhtArc;

/// The sharding module will attempt to determine an ideal target
/// arc for this agent. An implementation is free to use or discard
/// this information when returning the arc in [Self::get_tgt_storage_arc].
/// This will initially be set to zero on join, but the sharding module
ThetaSinner marked this conversation as resolved.
Show resolved Hide resolved
/// may later update this to FULL or a true target value.
fn set_tgt_storage_arc_hint(&self, arc: DhtArc);
}

/// Trait-object [LocalAgent].
pub type DynLocalAgent = Arc<dyn LocalAgent>;

impl LocalAgent for DynLocalAgent {
fn agent(&self) -> &AgentId {
(**self).agent()
}

fn register_cb(&self, cb: Arc<dyn Fn() + 'static + Send + Sync>) {
(**self).register_cb(cb);
}

fn invoke_cb(&self) {
(**self).invoke_cb();
}

fn get_cur_storage_arc(&self) -> DhtArc {
(**self).get_cur_storage_arc()
}

fn set_cur_storage_arc(&self, arc: DhtArc) {
(**self).set_cur_storage_arc(arc);
}

fn get_tgt_storage_arc(&self) -> DhtArc {
(**self).get_tgt_storage_arc()
}

fn set_tgt_storage_arc_hint(&self, arc: DhtArc) {
(**self).set_tgt_storage_arc_hint(arc);
}
}

impl Signer for DynLocalAgent {
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
agent_info: &'b AgentInfo,
message: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>> {
(**self).sign(agent_info, message)
}
}

mod serde_string_timestamp {
pub fn serialize<S>(
t: &crate::Timestamp,
Expand All @@ -150,7 +222,9 @@ mod serde_string_timestamp {
}

/// AgentInfo stores metadata related to agents.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(
Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash,
)]
#[serde(rename_all = "camelCase")]
pub struct AgentInfo {
/// The agent id.
Expand Down Expand Up @@ -181,6 +255,7 @@ pub struct AgentInfo {
}

/// Signed agent information.
#[derive(PartialEq, Eq, Hash)]
pub struct AgentInfoSigned {
/// The decoded information associated with this agent.
agent_info: AgentInfo,
Expand Down Expand Up @@ -324,11 +399,11 @@ mod test {
struct TestCrypto;

impl Signer for TestCrypto {
fn sign(
&self,
_agent_info: &AgentInfo,
_encoded: &[u8],
) -> BoxFut<'_, K2Result<bytes::Bytes>> {
fn sign<'a, 'b: 'a, 'c: 'a>(
&'a self,
_agent_info: &'b AgentInfo,
_encoded: &'c [u8],
) -> BoxFut<'a, K2Result<bytes::Bytes>> {
Box::pin(async move { Ok(bytes::Bytes::from_static(SIG)) })
}
}
Expand Down
12 changes: 12 additions & 0 deletions crates/api/src/kitsune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ use std::sync::Arc;

/// Handler for events coming out of Kitsune2.
pub trait KitsuneHandler: 'static + Send + Sync + std::fmt::Debug {
/// A notification that a new listening address has been bound.
/// Peers should now go to this new address to reach this node.
fn new_listening_address(&self, this_url: Url) {
drop(this_url);
}

/// A peer has disconnected from us. If they did so gracefully
/// the reason will be is_some().
fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
drop((peer, reason));
}

/// Gather preflight data to send to a new opening connection.
/// Returning an Err result will close this connection.
///
Expand Down
34 changes: 25 additions & 9 deletions crates/api/src/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,37 @@ use std::sync::Arc;

/// Handler for events coming out of Kitsune2 such as messages from peers.
pub trait SpaceHandler: 'static + Send + Sync + std::fmt::Debug {
/// We have received an incomming message from a remote peer.
///
/// If this callback handler returns an `Err` response, the connection
/// will be closed immediately.
/// The sync handler for receiving notifications sent by a remote
/// peer in reference to a particular space. If this callback returns
/// an error, then the connection which sent the message will be closed.
//
// Note: this is the minimal low-level messaging unit. We can decide
// later if we want to handle request/response tracking in
// kitsune itself as a convenience or if users of this lib
// should have to implement that if they want it.
fn incoming_message(
fn recv_notify(
&self,
peer: AgentId,
to_agent: AgentId,
from_agent: AgentId,
space: SpaceId,
data: bytes::Bytes,
) -> K2Result<()>;
) -> K2Result<()> {
drop((to_agent, from_agent, space, data));
Ok(())
}
}

/// Trait-object [SpaceHandler].
pub type DynSpaceHandler = Arc<dyn SpaceHandler>;

/// Represents a unique dht space within which to communicate with peers.
///
/// A space in Kitsune2 is largely just responsible for hooking up
/// modules within that space. However, it also has a couple responsibilities:
///
/// - The space provides the space-level notification send/recv ability.
/// - The space manages the generation / publishing of agent infos
/// for joined local agents.
pub trait Space: 'static + Send + Sync + std::fmt::Debug {
/// Get a reference to the peer store being used by this space.
/// This could allow you to inject peer info from some source other
Expand Down Expand Up @@ -56,8 +67,12 @@ pub trait Space: 'static + Send + Sync + std::fmt::Debug {
// later if we want to handle request/response tracking in
// kitsune itself as a convenience or if users of this lib
// should have to implement that if they want it.
fn send_message(&self, peer: AgentId, data: bytes::Bytes)
-> BoxFut<'_, ()>;
fn send_notify(
&self,
to_agent: AgentId,
from_agent: AgentId,
data: bytes::Bytes,
) -> BoxFut<'_, K2Result<()>>;
}

/// Trait-object [Space].
Expand All @@ -75,6 +90,7 @@ pub trait SpaceFactory: 'static + Send + Sync + std::fmt::Debug {
builder: Arc<builder::Builder>,
handler: DynSpaceHandler,
space: SpaceId,
tx: transport::DynTransport,
) -> BoxFut<'static, K2Result<DynSpace>>;
}

Expand Down
Loading