Skip to content

Commit

Permalink
Transport trait (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog authored Dec 12, 2024
1 parent bafe69b commit f04ef8a
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 76 deletions.
194 changes: 121 additions & 73 deletions crates/api/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use crate::{protocol::*, *};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// This is the low-level backend transport handler.
/// This is the low-level backend transport handler designed to work
/// with [DefaultTransport].
/// Construct using ([TxImpHnd::new]), with a high-level [DynTxHandler],
/// then call [TxImpHnd::gen_transport] to return the high-level handler
/// then call [DefaultTransport::create] to return the high-level handler
/// from the [TransportFactory].
pub struct TxImpHnd {
handler: DynTxHandler,
Expand All @@ -26,17 +27,6 @@ impl TxImpHnd {
})
}

/// When constructing a [Transport] from a [TransportFactory],
/// this function does the actual wrapping of your implemementation
/// to produce the [Transport] struct.
pub fn gen_transport(&self, imp: DynTxImp) -> Transport {
Transport {
imp,
space_map: self.space_map.clone(),
mod_map: self.mod_map.clone(),
}
}

/// Call this when you receive or bind a new address at which
/// this local node can be reached by peers
pub fn new_listening_address(&self, this_url: Url) {
Expand Down Expand Up @@ -148,20 +138,89 @@ pub trait TxImp: 'static + Send + Sync + std::fmt::Debug {
/// Trait-object [TxImp].
pub type DynTxImp = Arc<dyn TxImp>;

/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
pub trait Transport {
/// Register a space handler for receiving incoming notifications.
///
/// Panics if you attempt to register a duplicate handler for
/// a space.
fn register_space_handler(
&self,
space: SpaceId,
handler: DynTxSpaceHandler,
);

/// Register a module handler for receiving incoming module messages.
///
/// Panics if you attempt to register a duplicate handler for the
/// same (space, module).
fn register_module_handler(
&self,
space: SpaceId,
module: String,
handler: DynTxModuleHandler,
);

/// Make a best effort to notify a peer that we are disconnecting and why.
/// After a short time out, the connection will be closed even if the
/// disconnect reason message is still pending.
fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()>;

/// Notify a remote peer within a space. This is a fire-and-forget
/// type message. The future this call returns will indicate any errors
/// that occur up to the point where the message is handed off to
/// the transport backend. After that, the future will return `Ok(())`
/// but the remote peer may or may not actually receive the message.
fn send_space_notify(
&self,
peer: Url,
space: SpaceId,
data: bytes::Bytes,
) -> BoxFut<'_, K2Result<()>>;

/// Notify a remote peer module within a space. This is a fire-and-forget
/// type message. The future this call returns will indicate any errors
/// that occur up to the point where the message is handed off to
/// the transport backend. After that, the future will return `Ok(())`
/// but the remote peer may or may not actually receive the message.
fn send_module(
&self,
peer: Url,
space: SpaceId,
module: String,
data: bytes::Bytes,
) -> BoxFut<'_, K2Result<()>>;
}

/// Trait-object [Transport].
pub type DynTransport = Arc<dyn Transport>;

/// A high-level wrapper around a low-level [DynTxImp] transport implementation.
#[derive(Clone, Debug)]
pub struct Transport {
pub struct DefaultTransport {
imp: DynTxImp,
space_map: Arc<Mutex<HashMap<SpaceId, DynTxSpaceHandler>>>,
mod_map: Arc<Mutex<HashMap<(SpaceId, String), DynTxModuleHandler>>>,
}

impl Transport {
/// Register a space handler for receiving incoming notifications.
impl DefaultTransport {
/// When constructing a [Transport] from a [TransportFactory],
/// this function does the actual wrapping of your implemementation
/// to produce the [Transport] struct.
///
/// Panics if you attempt to register a duplicate handler for
/// a space.
pub fn register_space_handler(
/// [DefaultTransport] is built to be used with the provided [TxImpHnd].
pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport {
let out: DynTransport = Arc::new(DefaultTransport {
imp,
space_map: hnd.space_map.clone(),
mod_map: hnd.mod_map.clone(),
});
out
}
}

impl Transport for DefaultTransport {
fn register_space_handler(
&self,
space: SpaceId,
handler: DynTxSpaceHandler,
Expand All @@ -177,11 +236,7 @@ impl Transport {
}
}

/// Register a module handler for receiving incoming module messages.
///
/// Panics if you attempt to register a duplicate handler for the
/// same (space, module).
pub fn register_module_handler(
fn register_module_handler(
&self,
space: SpaceId,
module: String,
Expand All @@ -198,69 +253,62 @@ impl Transport {
}
}

/// Make a best effort to notify a peer that we are disconnecting and why.
/// After a short time out, the connection will be closed even if the
/// disconnect reason message is still pending.
pub async fn disconnect(&self, peer: Url, reason: Option<String>) {
let payload = match reason {
None => None,
Some(reason) => match (K2Proto {
ty: k2_proto::Ty::Disconnect as i32,
data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
space: None,
module: None,
})
.encode()
{
Ok(payload) => Some((reason, payload)),
Err(_) => None,
},
};

self.imp.disconnect(peer, payload).await;
fn disconnect(&self, peer: Url, reason: Option<String>) -> BoxFut<'_, ()> {
Box::pin(async move {
let payload = match reason {
None => None,
Some(reason) => match (K2Proto {
ty: k2_proto::Ty::Disconnect as i32,
data: bytes::Bytes::copy_from_slice(reason.as_bytes()),
space: None,
module: None,
})
.encode()
{
Ok(payload) => Some((reason, payload)),
Err(_) => None,
},
};

self.imp.disconnect(peer, payload).await;
})
}

/// Notify a remote peer within a space. This is a fire-and-forget
/// type message. The future this call returns will indicate any errors
/// that occur up to the point where the message is handed off to
/// the transport backend. After that, the future will return `Ok(())`
/// but the remote peer may or may not actually receive the message.
pub async fn send_space_notify(
fn send_space_notify(
&self,
peer: Url,
space: SpaceId,
data: bytes::Bytes,
) -> K2Result<()> {
let enc = (K2Proto {
ty: k2_proto::Ty::Notify as i32,
data,
space: Some(space.into()),
module: None,
) -> BoxFut<'_, K2Result<()>> {
Box::pin(async move {
let enc = (K2Proto {
ty: k2_proto::Ty::Notify as i32,
data,
space: Some(space.into()),
module: None,
})
.encode()?;
self.imp.send(peer, enc).await
})
.encode()?;
self.imp.send(peer, enc).await
}

/// Notify a remote peer module within a space. This is a fire-and-forget
/// type message. The future this call returns will indicate any errors
/// that occur up to the point where the message is handed off to
/// the transport backend. After that, the future will return `Ok(())`
/// but the remote peer may or may not actually receive the message.
pub async fn send_module(
fn send_module(
&self,
peer: Url,
space: SpaceId,
module: String,
data: bytes::Bytes,
) -> K2Result<()> {
let enc = (K2Proto {
ty: k2_proto::Ty::Module as i32,
data,
space: Some(space.into()),
module: Some(module),
) -> BoxFut<'_, K2Result<()>> {
Box::pin(async move {
let enc = (K2Proto {
ty: k2_proto::Ty::Module as i32,
data,
space: Some(space.into()),
module: Some(module),
})
.encode()?;
self.imp.send(peer, enc).await
})
.encode()?;
self.imp.send(peer, enc).await
}
}

Expand Down Expand Up @@ -361,7 +409,7 @@ pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug {
&self,
builder: Arc<builder::Builder>,
handler: Arc<TxImpHnd>,
) -> BoxFut<'static, K2Result<Transport>>;
) -> BoxFut<'static, K2Result<DynTransport>>;
}

/// Trait-object [TransportFactory].
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/factories/mem_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ impl TransportFactory for MemTransportFactory {
&self,
builder: Arc<builder::Builder>,
handler: Arc<TxImpHnd>,
) -> BoxFut<'static, K2Result<Transport>> {
) -> BoxFut<'static, K2Result<DynTransport>> {
Box::pin(async move {
let config = builder
.config
.get_module_config::<MemTransportConfig>(MOD_NAME)?;
let imp = MemTransport::create(config, handler.clone()).await;
Ok(handler.gen_transport(imp))
Ok(DefaultTransport::create(&handler, imp))
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/factories/mem_transport/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl TrackHnd {
}
}

async fn gen_tx(hnd: DynTxHandler) -> Transport {
async fn gen_tx(hnd: DynTxHandler) -> DynTransport {
let builder = Arc::new(crate::default_builder());
let hnd = TxImpHnd::new(hnd);
builder
Expand Down

0 comments on commit f04ef8a

Please sign in to comment.