Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Dec 13, 2024
1 parent c80b4e8 commit 6d0ed72
Show file tree
Hide file tree
Showing 13 changed files with 270 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions crates/api/src/kitsune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ use std::sync::Arc;

/// Handler for events coming out of Kitsune2.
pub trait KitsuneHandler: 'static + Send + Sync + std::fmt::Debug {
/// A notification that a new listening address has been bound.
/// Peers should now go to this new address to reach this node.
fn new_listening_address(&self, this_url: Url) {
drop(this_url);
}

/// A peer has disconnected from us. If they did so gracefully
/// the reason will be is_some().
fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
drop((peer, reason));
}

/// Gather preflight data to send to a new opening connection.
/// Returning an Err result will close this connection.
///
Expand Down
27 changes: 18 additions & 9 deletions crates/api/src/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@ use std::sync::Arc;

/// Handler for events coming out of Kitsune2 such as messages from peers.
pub trait SpaceHandler: 'static + Send + Sync + std::fmt::Debug {
/// We have received an incomming message from a remote peer.
///
/// If this callback handler returns an `Err` response, the connection
/// will be closed immediately.
/// The sync handler for receiving notifications sent by a remote
/// peer in reference to a particular space. If this callback returns
/// an error, then the connection which sent the message will be closed.
//
// Note: this is the minimal low-level messaging unit. We can decide
// later if we want to handle request/response tracking in
// kitsune itself as a convenience or if users of this lib
// should have to implement that if they want it.
fn incoming_message(
fn recv_notify(
&self,
peer: AgentId,
to_agent: AgentId,
from_agent: AgentId,
space: SpaceId,
data: bytes::Bytes,
) -> K2Result<()>;
) -> K2Result<()> {
drop((to_agent, from_agent, space, data));
Ok(())
}
}

/// Trait-object [SpaceHandler].
Expand Down Expand Up @@ -56,8 +60,12 @@ pub trait Space: 'static + Send + Sync + std::fmt::Debug {
// later if we want to handle request/response tracking in
// kitsune itself as a convenience or if users of this lib
// should have to implement that if they want it.
fn send_message(&self, peer: AgentId, data: bytes::Bytes)
-> BoxFut<'_, ()>;
fn send_notify(
&self,
to_agent: AgentId,
from_agent: AgentId,
data: bytes::Bytes,
) -> BoxFut<'_, K2Result<()>>;
}

/// Trait-object [Space].
Expand All @@ -75,6 +83,7 @@ pub trait SpaceFactory: 'static + Send + Sync + std::fmt::Debug {
builder: Arc<builder::Builder>,
handler: DynSpaceHandler,
space: SpaceId,
tx: transport::DynTransport,
) -> BoxFut<'static, K2Result<DynSpace>>;
}

Expand Down
25 changes: 17 additions & 8 deletions crates/api/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl TxImpHnd {
let space = SpaceId::from(space);
if let Some(h) = self.space_map.lock().unwrap().get(&space)
{
h.recv_space_notify(peer, space, data);
h.recv_space_notify(peer, space, data)?;
}
}
Ok(())
Expand All @@ -104,7 +104,7 @@ impl TxImpHnd {
.unwrap()
.get(&(space.clone(), module.clone()))
{
h.recv_module_msg(peer, space, module, data);
h.recv_module_msg(peer, space, module, data)?;
}
}
Ok(())
Expand Down Expand Up @@ -139,7 +139,7 @@ pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
pub type DynTxImp = Arc<dyn TxImp>;

/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
pub trait Transport {
pub trait Transport: 'static + Send + Sync + std::fmt::Debug {
/// Register a space handler for receiving incoming notifications.
///
/// Panics if you attempt to register a duplicate handler for
Expand Down Expand Up @@ -371,9 +371,16 @@ pub type DynTxHandler = Arc<dyn TxHandler>;
/// Handler for space-related events.
pub trait TxSpaceHandler: TxBaseHandler {
/// The sync handler for receiving notifications sent by a remote
/// peer in reference to a particular space.
fn recv_space_notify(&self, peer: Url, space: SpaceId, data: bytes::Bytes) {
/// peer in reference to a particular space. If this callback returns
/// an error, then the connection which sent the message will be closed.
fn recv_space_notify(
&self,
peer: Url,
space: SpaceId,
data: bytes::Bytes,
) -> K2Result<()> {
drop((peer, space, data));
Ok(())
}
}

Expand All @@ -383,15 +390,17 @@ pub type DynTxSpaceHandler = Arc<dyn TxSpaceHandler>;
/// Handler for module-related events.
pub trait TxModuleHandler: TxBaseHandler {
/// The sync handler for receiving module messages sent by a remote
/// peer in reference to a particular space.
/// peer in reference to a particular space. If this callback returns
/// an error, then the connection which sent the message will be closed.
fn recv_module_msg(
&self,
peer: Url,
space: SpaceId,
module: String,
data: bytes::Bytes,
) {
) -> K2Result<()> {
drop((peer, space, module, data));
Ok(())
}
}

Expand All @@ -408,7 +417,7 @@ pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
fn create(
&self,
builder: Arc<builder::Builder>,
handler: Arc<TxImpHnd>,
handler: DynTxHandler,
) -> BoxFut<'static, K2Result<DynTransport>>;
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes = { workspace = true }
ed25519-dalek = { workspace = true }
futures = { workspace = true }
kitsune2_api = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["sync", "rt"] }
Expand Down
17 changes: 17 additions & 0 deletions crates/core/proto/gen/kitsune2.space.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This file is @generated by prost-build.
/// A Kitsune2 space protocol message.
///
/// There is only a single space-level message type. That is a notify
/// between two agents at that space level. Making this a very simple message.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct K2SpaceProto {
/// The destination agent.
#[prost(bytes = "bytes", tag = "1")]
pub to_agent: ::prost::bytes::Bytes,
/// The source agent.
#[prost(bytes = "bytes", tag = "2")]
pub from_agent: ::prost::bytes::Bytes,
/// The payload or content of this message.
#[prost(bytes = "bytes", tag = "3")]
pub data: ::prost::bytes::Bytes,
}
18 changes: 18 additions & 0 deletions crates/core/proto/space.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
syntax = "proto3";

package kitsune2.space;

// A Kitsune2 space protocol message.
//
// There is only a single space-level message type. That is a notify
// between two agents at that space level. Making this a very simple message.
message K2SpaceProto {
// The destination agent.
bytes to_agent = 1;

// The source agent.
bytes from_agent = 2;

// The payload or content of this message.
bytes data = 3;
}
57 changes: 52 additions & 5 deletions crates/core/src/factories/core_kitsune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,54 @@ impl KitsuneFactory for CoreKitsuneFactory {
let config = builder
.config
.get_module_config::<CoreKitsuneConfig>(MOD_NAME)?;
let out: DynKitsune =
Arc::new(CoreKitsune::new(builder.clone(), config, handler));
let tx = builder
.transport
.create(
builder.clone(),
Arc::new(TxHandlerTranslator(handler.clone())),
)
.await?;
let out: DynKitsune = Arc::new(CoreKitsune::new(
builder.clone(),
config,
handler,
tx,
));
Ok(out)
})
}
}

#[derive(Debug)]
struct TxHandlerTranslator(DynKitsuneHandler);

impl transport::TxBaseHandler for TxHandlerTranslator {
fn new_listening_address(&self, this_url: Url) {
self.0.new_listening_address(this_url);
}

fn peer_disconnect(&self, peer: Url, reason: Option<String>) {
self.0.peer_disconnect(peer, reason);
}
}

impl transport::TxHandler for TxHandlerTranslator {
fn preflight_gather_outgoing(
&self,
peer_url: Url,
) -> K2Result<bytes::Bytes> {
self.0.preflight_gather_outgoing(peer_url)
}

fn preflight_validate_incoming(
&self,
peer_url: Url,
data: bytes::Bytes,
) -> K2Result<()> {
self.0.preflight_validate_incoming(peer_url, data)
}
}

type SpaceFut =
futures::future::Shared<BoxFut<'static, K2Result<space::DynSpace>>>;
type Map = HashMap<SpaceId, SpaceFut>;
Expand All @@ -59,18 +100,21 @@ struct CoreKitsune {
builder: Arc<builder::Builder>,
handler: DynKitsuneHandler,
map: std::sync::Mutex<Map>,
tx: transport::DynTransport,
}

impl CoreKitsune {
pub fn new(
builder: Arc<builder::Builder>,
_config: CoreKitsuneConfig,
handler: DynKitsuneHandler,
tx: transport::DynTransport,
) -> Self {
Self {
builder,
handler,
map: std::sync::Mutex::new(HashMap::new()),
tx,
}
}
}
Expand All @@ -88,13 +132,14 @@ impl Kitsune for CoreKitsune {
Entry::Vacant(e) => {
let builder = self.builder.clone();
let handler = self.handler.clone();
let tx = self.tx.clone();
e.insert(futures::future::FutureExt::shared(Box::pin(
async move {
let sh =
handler.create_space(space.clone()).await?;
let s = builder
.space
.create(builder.clone(), sh, space)
.create(builder.clone(), sh, space, tx)
.await?;
Ok(s)
},
Expand All @@ -119,9 +164,11 @@ mod test {
struct S;

impl SpaceHandler for S {
fn incoming_message(
fn recv_notify(
&self,
_peer: AgentId,
_to_agent: AgentId,
_from_agent: AgentId,
_space: SpaceId,
_data: bytes::Bytes,
) -> K2Result<()> {
// this test is a bit of a stub for now until we have the
Expand Down
Loading

0 comments on commit 6d0ed72

Please sign in to comment.