diff --git a/crates/api/src/transport.rs b/crates/api/src/transport.rs index e19fe282..9f416380 100644 --- a/crates/api/src/transport.rs +++ b/crates/api/src/transport.rs @@ -4,9 +4,10 @@ use crate::{protocol::*, *}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -/// This is the low-level backend transport handler. +/// This is the low-level backend transport handler designed to work +/// with [DefaultTransport]. /// Construct using ([TxImpHnd::new]), with a high-level [DynTxHandler], -/// then call [TxImpHnd::gen_transport] to return the high-level handler +/// then call [DefaultTransport::create] to return the high-level handler /// from the [TransportFactory]. pub struct TxImpHnd { handler: DynTxHandler, @@ -26,17 +27,6 @@ impl TxImpHnd { }) } - /// When constructing a [Transport] from a [TransportFactory], - /// this function does the actual wrapping of your implemementation - /// to produce the [Transport] struct. - pub fn gen_transport(&self, imp: DynTxImp) -> Transport { - Transport { - imp, - space_map: self.space_map.clone(), - mod_map: self.mod_map.clone(), - } - } - /// Call this when you receive or bind a new address at which /// this local node can be reached by peers pub fn new_listening_address(&self, this_url: Url) { @@ -148,20 +138,89 @@ pub trait TxImp: 'static + Send + Sync + std::fmt::Debug { /// Trait-object [TxImp]. pub type DynTxImp = Arc; +/// A high-level wrapper around a low-level [DynTxImp] transport implementation. +pub trait Transport { + /// Register a space handler for receiving incoming notifications. + /// + /// Panics if you attempt to register a duplicate handler for + /// a space. + fn register_space_handler( + &self, + space: SpaceId, + handler: DynTxSpaceHandler, + ); + + /// Register a module handler for receiving incoming module messages. + /// + /// Panics if you attempt to register a duplicate handler for the + /// same (space, module). + fn register_module_handler( + &self, + space: SpaceId, + module: String, + handler: DynTxModuleHandler, + ); + + /// Make a best effort to notify a peer that we are disconnecting and why. + /// After a short time out, the connection will be closed even if the + /// disconnect reason message is still pending. + fn disconnect(&self, peer: Url, reason: Option) -> BoxFut<'_, ()>; + + /// Notify a remote peer within a space. This is a fire-and-forget + /// type message. The future this call returns will indicate any errors + /// that occur up to the point where the message is handed off to + /// the transport backend. After that, the future will return `Ok(())` + /// but the remote peer may or may not actually receive the message. + fn send_space_notify( + &self, + peer: Url, + space: SpaceId, + data: bytes::Bytes, + ) -> BoxFut<'_, K2Result<()>>; + + /// Notify a remote peer module within a space. This is a fire-and-forget + /// type message. The future this call returns will indicate any errors + /// that occur up to the point where the message is handed off to + /// the transport backend. After that, the future will return `Ok(())` + /// but the remote peer may or may not actually receive the message. + fn send_module( + &self, + peer: Url, + space: SpaceId, + module: String, + data: bytes::Bytes, + ) -> BoxFut<'_, K2Result<()>>; +} + +/// Trait-object [Transport]. +pub type DynTransport = Arc; + /// A high-level wrapper around a low-level [DynTxImp] transport implementation. #[derive(Clone, Debug)] -pub struct Transport { +pub struct DefaultTransport { imp: DynTxImp, space_map: Arc>>, mod_map: Arc>>, } -impl Transport { - /// Register a space handler for receiving incoming notifications. +impl DefaultTransport { + /// When constructing a [Transport] from a [TransportFactory], + /// this function does the actual wrapping of your implemementation + /// to produce the [Transport] struct. /// - /// Panics if you attempt to register a duplicate handler for - /// a space. - pub fn register_space_handler( + /// [DefaultTransport] is built to be used with the provided [TxImpHnd]. + pub fn create(hnd: &TxImpHnd, imp: DynTxImp) -> DynTransport { + let out: DynTransport = Arc::new(DefaultTransport { + imp, + space_map: hnd.space_map.clone(), + mod_map: hnd.mod_map.clone(), + }); + out + } +} + +impl Transport for DefaultTransport { + fn register_space_handler( &self, space: SpaceId, handler: DynTxSpaceHandler, @@ -177,11 +236,7 @@ impl Transport { } } - /// Register a module handler for receiving incoming module messages. - /// - /// Panics if you attempt to register a duplicate handler for the - /// same (space, module). - pub fn register_module_handler( + fn register_module_handler( &self, space: SpaceId, module: String, @@ -198,69 +253,62 @@ impl Transport { } } - /// Make a best effort to notify a peer that we are disconnecting and why. - /// After a short time out, the connection will be closed even if the - /// disconnect reason message is still pending. - pub async fn disconnect(&self, peer: Url, reason: Option) { - let payload = match reason { - None => None, - Some(reason) => match (K2Proto { - ty: k2_proto::Ty::Disconnect as i32, - data: bytes::Bytes::copy_from_slice(reason.as_bytes()), - space: None, - module: None, - }) - .encode() - { - Ok(payload) => Some((reason, payload)), - Err(_) => None, - }, - }; - - self.imp.disconnect(peer, payload).await; + fn disconnect(&self, peer: Url, reason: Option) -> BoxFut<'_, ()> { + Box::pin(async move { + let payload = match reason { + None => None, + Some(reason) => match (K2Proto { + ty: k2_proto::Ty::Disconnect as i32, + data: bytes::Bytes::copy_from_slice(reason.as_bytes()), + space: None, + module: None, + }) + .encode() + { + Ok(payload) => Some((reason, payload)), + Err(_) => None, + }, + }; + + self.imp.disconnect(peer, payload).await; + }) } - /// Notify a remote peer within a space. This is a fire-and-forget - /// type message. The future this call returns will indicate any errors - /// that occur up to the point where the message is handed off to - /// the transport backend. After that, the future will return `Ok(())` - /// but the remote peer may or may not actually receive the message. - pub async fn send_space_notify( + fn send_space_notify( &self, peer: Url, space: SpaceId, data: bytes::Bytes, - ) -> K2Result<()> { - let enc = (K2Proto { - ty: k2_proto::Ty::Notify as i32, - data, - space: Some(space.into()), - module: None, + ) -> BoxFut<'_, K2Result<()>> { + Box::pin(async move { + let enc = (K2Proto { + ty: k2_proto::Ty::Notify as i32, + data, + space: Some(space.into()), + module: None, + }) + .encode()?; + self.imp.send(peer, enc).await }) - .encode()?; - self.imp.send(peer, enc).await } - /// Notify a remote peer module within a space. This is a fire-and-forget - /// type message. The future this call returns will indicate any errors - /// that occur up to the point where the message is handed off to - /// the transport backend. After that, the future will return `Ok(())` - /// but the remote peer may or may not actually receive the message. - pub async fn send_module( + fn send_module( &self, peer: Url, space: SpaceId, module: String, data: bytes::Bytes, - ) -> K2Result<()> { - let enc = (K2Proto { - ty: k2_proto::Ty::Module as i32, - data, - space: Some(space.into()), - module: Some(module), + ) -> BoxFut<'_, K2Result<()>> { + Box::pin(async move { + let enc = (K2Proto { + ty: k2_proto::Ty::Module as i32, + data, + space: Some(space.into()), + module: Some(module), + }) + .encode()?; + self.imp.send(peer, enc).await }) - .encode()?; - self.imp.send(peer, enc).await } } @@ -361,7 +409,7 @@ pub trait TransportFactory: 'static + Send + Sync + std::fmt::Debug { &self, builder: Arc, handler: Arc, - ) -> BoxFut<'static, K2Result>; + ) -> BoxFut<'static, K2Result>; } /// Trait-object [TransportFactory]. diff --git a/crates/core/src/factories/mem_transport.rs b/crates/core/src/factories/mem_transport.rs index 57d78b8f..381499d9 100644 --- a/crates/core/src/factories/mem_transport.rs +++ b/crates/core/src/factories/mem_transport.rs @@ -38,13 +38,13 @@ impl TransportFactory for MemTransportFactory { &self, builder: Arc, handler: Arc, - ) -> BoxFut<'static, K2Result> { + ) -> BoxFut<'static, K2Result> { Box::pin(async move { let config = builder .config .get_module_config::(MOD_NAME)?; let imp = MemTransport::create(config, handler.clone()).await; - Ok(handler.gen_transport(imp)) + Ok(DefaultTransport::create(&handler, imp)) }) } } diff --git a/crates/core/src/factories/mem_transport/test.rs b/crates/core/src/factories/mem_transport/test.rs index 8cd1e481..d7051bc0 100644 --- a/crates/core/src/factories/mem_transport/test.rs +++ b/crates/core/src/factories/mem_transport/test.rs @@ -192,7 +192,7 @@ impl TrackHnd { } } -async fn gen_tx(hnd: DynTxHandler) -> Transport { +async fn gen_tx(hnd: DynTxHandler) -> DynTransport { let builder = Arc::new(crate::default_builder()); let hnd = TxImpHnd::new(hnd); builder