diff --git a/channels/src/protocol/mod.rs b/channels/src/protocol/mod.rs index 00ba3c2..630e07e 100644 --- a/channels/src/protocol/mod.rs +++ b/channels/src/protocol/mod.rs @@ -2,6 +2,6 @@ mod deframer; mod recv; mod send; -pub use self::deframer::Deframer; -pub use self::recv::{recv_async, recv_sync}; -pub use self::send::{send_async, send_sync, SendPcb}; +pub(crate) use self::deframer::Deframer; +pub(crate) use self::recv::ReceiverCore; +pub(crate) use self::send::{SendPcb, SenderCore}; diff --git a/channels/src/protocol/recv.rs b/channels/src/protocol/recv.rs index e453961..60c52b6 100644 --- a/channels/src/protocol/recv.rs +++ b/channels/src/protocol/recv.rs @@ -6,8 +6,13 @@ use crate::statistics::StatIO; use super::deframer::{DeframeError, DeframeStatus, Deframer}; +pub(crate) struct ReceiverCore { + pub(crate) reader: StatIO, + pub(crate) deframer: Deframer, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum RecvPayloadError { +pub enum CoreRecvError { ChecksumError, ExceededMaximumSize, InvalidHeader, @@ -17,10 +22,10 @@ pub enum RecvPayloadError { ZeroSizeFragment, } -impl From for RecvPayloadError { +impl From for CoreRecvError { fn from(value: DeframeError) -> Self { + use CoreRecvError as B; use DeframeError as A; - use RecvPayloadError as B; match value { A::ChecksumError => B::ChecksumError, @@ -33,10 +38,10 @@ impl From for RecvPayloadError { } } -impl From> for RecvError { - fn from(value: RecvPayloadError) -> Self { +impl From> for RecvError { + fn from(value: CoreRecvError) -> Self { + use CoreRecvError as A; use RecvError as B; - use RecvPayloadError as A; match value { A::ChecksumError => B::ChecksumError, @@ -58,7 +63,6 @@ channels_macros::replace! { (await =>) (recv => recv_sync) (Read => Read) - (run => run_sync) ] // Asynchronous version [ @@ -66,30 +70,30 @@ channels_macros::replace! { (await => .await) (recv => recv_async) (Read => AsyncRead) - (run => run_async) ] } code: { -pub async fn recv( - reader: &mut StatIO, - deframer: &mut Deframer, -) -> Result, RecvPayloadError> +impl ReceiverCore where R: Read, { - use DeframeStatus::{NotReady, Ready}; + pub async fn recv( + &mut self, + ) -> Result, CoreRecvError> { + use DeframeStatus::{NotReady, Ready}; - reader.statistics.inc_ops(); + self.reader.statistics.inc_ops(); - loop { - match deframer.deframe(&mut reader.statistics) { - Ready(Ok(payload)) => break Ok(payload), - Ready(Err(e)) => break Err(e.into()), - NotReady(r) => { - reader.read(r.buf) await - .map_err(RecvPayloadError::Io)?; - continue; + loop { + match self.deframer.deframe(&mut self.reader.statistics) { + Ready(Ok(payload)) => break Ok(payload), + Ready(Err(e)) => break Err(e.into()), + NotReady(r) => { + self.reader.read(r.buf) await + .map_err(CoreRecvError::Io)?; + continue; + } } } } diff --git a/channels/src/protocol/send.rs b/channels/src/protocol/send.rs index c2d5298..a11be19 100644 --- a/channels/src/protocol/send.rs +++ b/channels/src/protocol/send.rs @@ -7,21 +7,27 @@ use crate::io::{AsyncWrite, Write}; use crate::sender::Config; use crate::statistics::StatIO; +pub(crate) struct SenderCore { + pub(crate) writer: StatIO, + pub(crate) config: Config, + pub(crate) pcb: SendPcb, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum SendPayloadError { +pub enum CoreSendError { Io(Io), } -impl From for SendPayloadError { +impl From for CoreSendError { fn from(value: Io) -> Self { Self::Io(value) } } -impl From> for SendError { - fn from(value: SendPayloadError) -> Self { +impl From> for SendError { + fn from(value: CoreSendError) -> Self { + use CoreSendError as A; use SendError as B; - use SendPayloadError as A; match value { A::Io(x) => B::Io(x), @@ -142,7 +148,6 @@ channels_macros::replace! { (await =>) (send => send_sync) (Write => Write) - (run => run_sync) ] // Asynchronous version [ @@ -150,52 +155,51 @@ channels_macros::replace! { (await => .await) (send => send_async) (Write => AsyncWrite) - (run => run_async) ] } code: { -pub async fn send( - config: &Config, - pcb: &mut SendPcb, - writer: &mut StatIO, - payload: &[u8], -) -> Result<(), SendPayloadError> +impl SenderCore where - W: Write + W: Write, { - let with_checksum = if config.use_header_checksum() { - WithChecksum::Yes - } else { - WithChecksum::No - }; - - let mut buf = Vec::new(); - - for packet in as_packets(pcb, payload) { - let header_bytes = packet.header.to_bytes(with_checksum); - - if config.coalesce_writes() { - buf.reserve_exact(packet.header.length.as_usize()); - buf.extend_from_slice(&header_bytes); - buf.extend_from_slice(packet.payload); - writer.write(&buf) await?; - buf.clear(); + pub async fn send( + &mut self, + data: &[u8], + ) -> Result<(), CoreSendError> { + let with_checksum = if self.config.use_header_checksum() { + WithChecksum::Yes } else { - writer.write(&header_bytes) await?; - writer.write(packet.payload) await?; + WithChecksum::No + }; + + let mut buf = Vec::new(); + + for packet in as_packets(&mut self.pcb, data) { + let header_bytes = packet.header.to_bytes(with_checksum); + + if self.config.coalesce_writes() { + buf.reserve_exact(packet.header.length.as_usize()); + buf.extend_from_slice(&header_bytes); + buf.extend_from_slice(packet.payload); + self.writer.write(&buf) await?; + buf.clear(); + } else { + self.writer.write(&header_bytes) await?; + self.writer.write(packet.payload) await?; + } + + self.writer.statistics.inc_packets(); } - writer.statistics.inc_packets(); - } - - if config.flush_on_send() { - writer.flush() await?; - } + if self.config.flush_on_send() { + self.writer.flush() await?; + } - writer.statistics.inc_ops(); + self.writer.statistics.inc_ops(); - Ok(()) + Ok(()) + } } } diff --git a/channels/src/receiver.rs b/channels/src/receiver.rs index aeda4f5..0b66e47 100644 --- a/channels/src/receiver.rs +++ b/channels/src/receiver.rs @@ -8,7 +8,7 @@ use channels_packet::PacketLength; use crate::error::RecvError; use crate::io::{AsyncRead, Container, IntoRead, Read}; -use crate::protocol::Deframer; +use crate::protocol::{Deframer, ReceiverCore}; use crate::serdes::Deserializer; #[allow(unused_imports)] @@ -17,9 +17,8 @@ use crate::statistics::{StatIO, Statistics}; /// The receiving-half of the channel. pub struct Receiver { _marker: PhantomData T>, - reader: StatIO, deserializer: D, - deframer: Deframer, + core: ReceiverCore, } impl Receiver { @@ -138,7 +137,7 @@ impl Receiver { /// ``` #[inline] pub fn config(&self) -> &Config { - self.deframer.config() + self.core.deframer.config() } /// Get an iterator over incoming messages. @@ -203,7 +202,7 @@ impl Receiver { #[inline] #[cfg(feature = "statistics")] pub fn statistics(&self) -> &Statistics { - &self.reader.statistics + &self.core.reader.statistics } } @@ -236,7 +235,7 @@ where /// ``` #[inline] pub fn get(&self) -> &R::Inner { - self.reader.inner.get_ref() + self.core.reader.inner.get_ref() } /// Get a mutable reference to the underlying reader. Directly reading from @@ -266,13 +265,13 @@ where /// ``` #[inline] pub fn get_mut(&mut self) -> &mut R::Inner { - self.reader.inner.get_mut() + self.core.reader.inner.get_mut() } /// Destruct the receiver and get back the underlying reader. #[inline] pub fn into_reader(self) -> R::Inner { - self.reader.inner.into_inner() + self.core.reader.inner.into_inner() } } @@ -322,12 +321,7 @@ where pub async fn recv( &mut self, ) -> Result> { - let mut payload = crate::protocol::recv_async( - &mut self.reader, - &mut self.deframer, - ) - .await?; - + let mut payload = self.core.recv_async().await?; self.deserialize_t(&mut payload) } } @@ -361,11 +355,7 @@ where pub fn recv_blocking( &mut self, ) -> Result> { - let mut payload = crate::protocol::recv_sync( - &mut self.reader, - &mut self.deframer, - )?; - + let mut payload = self.core.recv_sync()?; self.deserialize_t(&mut payload) } } @@ -545,16 +535,15 @@ impl Builder { /// ``` #[inline] pub fn build(self) -> Receiver { - let reader = StatIO::new(self.reader); - let deserializer = self.deserializer; let config = self.config.unwrap_or_default(); let deframer = Deframer::new(config); + let deserializer = self.deserializer; + let reader = StatIO::new(self.reader); Receiver { _marker: PhantomData, deserializer, - reader, - deframer, + core: ReceiverCore { reader, deframer }, } } } @@ -789,7 +778,7 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Receiver") - .field("reader", &self.reader) + .field("reader", &self.core.reader) .field("deserializer", &self.deserializer) .field("config", self.config()) .finish_non_exhaustive() diff --git a/channels/src/sender.rs b/channels/src/sender.rs index 55ab61b..42a6088 100644 --- a/channels/src/sender.rs +++ b/channels/src/sender.rs @@ -8,7 +8,7 @@ use alloc::vec::Vec; use crate::error::SendError; use crate::io::{AsyncWrite, Container, IntoWrite, Write}; -use crate::protocol::SendPcb; +use crate::protocol::{SendPcb, SenderCore}; use crate::serdes::Serializer; #[allow(unused_imports)] @@ -17,10 +17,8 @@ use crate::statistics::{StatIO, Statistics}; /// The sending-half of the channel. pub struct Sender { _marker: PhantomData T>, - writer: StatIO, serializer: S, - pcb: SendPcb, - config: Config, + core: SenderCore, } impl Sender { @@ -139,7 +137,7 @@ impl Sender { /// ``` #[inline] pub fn config(&self) -> &Config { - &self.config + &self.core.config } /// Get statistics on this sender. @@ -158,7 +156,7 @@ impl Sender { #[inline] #[cfg(feature = "statistics")] pub fn statistics(&self) -> &Statistics { - &self.writer.statistics + &self.core.writer.statistics } } @@ -195,7 +193,7 @@ where /// ``` #[inline] pub fn get(&self) -> &W::Inner { - self.writer.inner.get_ref() + self.core.writer.inner.get_ref() } /// Get a mutable reference to the underlying writer. Directly writing to @@ -229,13 +227,13 @@ where /// ``` #[inline] pub fn get_mut(&mut self) -> &mut W::Inner { - self.writer.inner.get_mut() + self.core.writer.inner.get_mut() } /// Destruct the sender and get back the underlying writer. #[inline] pub fn into_writer(self) -> W::Inner { - self.writer.inner.into_inner() + self.core.writer.inner.into_inner() } } @@ -297,15 +295,7 @@ where data: &T, ) -> Result<(), SendError> { let payload = self.serialize_t(data)?; - - crate::protocol::send_async( - &self.config, - &mut self.pcb, - &mut self.writer, - &payload, - ) - .await?; - + self.core.send_async(&payload).await?; Ok(()) } } @@ -354,14 +344,7 @@ where data: &T, ) -> Result<(), SendError> { let payload = self.serialize_t(data)?; - - crate::protocol::send_sync( - &self.config, - &mut self.pcb, - &mut self.writer, - &payload, - )?; - + self.core.send_sync(&payload)?; Ok(()) } } @@ -496,12 +479,15 @@ impl Builder { /// ``` #[inline] pub fn build(self) -> Sender { + let config = self.config.unwrap_or_default(); + let pcb = SendPcb::default(); + let serializer = self.serializer; + let writer = StatIO::new(self.writer); + Sender { _marker: PhantomData, - writer: StatIO::new(self.writer), - serializer: self.serializer, - pcb: SendPcb::default(), - config: self.config.unwrap_or_default(), + serializer, + core: SenderCore { writer, config, pcb }, } } } @@ -704,9 +690,9 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Sender") - .field("writer", &self.writer) + .field("writer", &self.core.writer) .field("serializer", &self.serializer) - .field("config", &self.config) + .field("config", &self.config()) .finish_non_exhaustive() } }