Skip to content

Commit

Permalink
refactor: factor out {Sender,Receiver}Core
Browse files Browse the repository at this point in the history
  • Loading branch information
threadexio committed May 8, 2024
1 parent 5535071 commit 4c51ecb
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 122 deletions.
6 changes: 3 additions & 3 deletions channels/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ mod deframer;
mod recv;
mod send;

pub use self::deframer::Deframer;
pub use self::recv::{recv_async, recv_sync};
pub use self::send::{send_async, send_sync, SendPcb};
pub(crate) use self::deframer::Deframer;
pub(crate) use self::recv::ReceiverCore;
pub(crate) use self::send::{SendPcb, SenderCore};
48 changes: 26 additions & 22 deletions channels/src/protocol/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ use crate::statistics::StatIO;

use super::deframer::{DeframeError, DeframeStatus, Deframer};

pub(crate) struct ReceiverCore<R> {
pub(crate) reader: StatIO<R>,
pub(crate) deframer: Deframer,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RecvPayloadError<Io> {
pub enum CoreRecvError<Io> {
ChecksumError,
ExceededMaximumSize,
InvalidHeader,
Expand All @@ -17,10 +22,10 @@ pub enum RecvPayloadError<Io> {
ZeroSizeFragment,
}

impl<Io> From<DeframeError> for RecvPayloadError<Io> {
impl<Io> From<DeframeError> for CoreRecvError<Io> {
fn from(value: DeframeError) -> Self {
use CoreRecvError as B;
use DeframeError as A;
use RecvPayloadError as B;

match value {
A::ChecksumError => B::ChecksumError,
Expand All @@ -33,10 +38,10 @@ impl<Io> From<DeframeError> for RecvPayloadError<Io> {
}
}

impl<Des, Io> From<RecvPayloadError<Io>> for RecvError<Des, Io> {
fn from(value: RecvPayloadError<Io>) -> Self {
impl<Des, Io> From<CoreRecvError<Io>> for RecvError<Des, Io> {
fn from(value: CoreRecvError<Io>) -> Self {
use CoreRecvError as A;
use RecvError as B;
use RecvPayloadError as A;

match value {
A::ChecksumError => B::ChecksumError,
Expand All @@ -58,38 +63,37 @@ channels_macros::replace! {
(await =>)
(recv => recv_sync)
(Read => Read)
(run => run_sync)
]
// Asynchronous version
[
(async => async)
(await => .await)
(recv => recv_async)
(Read => AsyncRead)
(run => run_async)
]
}
code: {

pub async fn recv<R>(
reader: &mut StatIO<R>,
deframer: &mut Deframer,
) -> Result<Vec<u8>, RecvPayloadError<R::Error>>
impl<R> ReceiverCore<R>
where
R: Read,
{
use DeframeStatus::{NotReady, Ready};
pub async fn recv(
&mut self,
) -> Result<Vec<u8>, CoreRecvError<R::Error>> {
use DeframeStatus::{NotReady, Ready};

reader.statistics.inc_ops();
self.reader.statistics.inc_ops();

loop {
match deframer.deframe(&mut reader.statistics) {
Ready(Ok(payload)) => break Ok(payload),
Ready(Err(e)) => break Err(e.into()),
NotReady(r) => {
reader.read(r.buf) await
.map_err(RecvPayloadError::Io)?;
continue;
loop {
match self.deframer.deframe(&mut self.reader.statistics) {
Ready(Ok(payload)) => break Ok(payload),
Ready(Err(e)) => break Err(e.into()),
NotReady(r) => {
self.reader.read(r.buf) await
.map_err(CoreRecvError::Io)?;
continue;
}
}
}
}
Expand Down
86 changes: 45 additions & 41 deletions channels/src/protocol/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,27 @@ use crate::io::{AsyncWrite, Write};
use crate::sender::Config;
use crate::statistics::StatIO;

pub(crate) struct SenderCore<W> {
pub(crate) writer: StatIO<W>,
pub(crate) config: Config,
pub(crate) pcb: SendPcb,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SendPayloadError<Io> {
pub enum CoreSendError<Io> {
Io(Io),
}

impl<Io> From<Io> for SendPayloadError<Io> {
impl<Io> From<Io> for CoreSendError<Io> {
fn from(value: Io) -> Self {
Self::Io(value)
}
}

impl<Ser, Io> From<SendPayloadError<Io>> for SendError<Ser, Io> {
fn from(value: SendPayloadError<Io>) -> Self {
impl<Ser, Io> From<CoreSendError<Io>> for SendError<Ser, Io> {
fn from(value: CoreSendError<Io>) -> Self {
use CoreSendError as A;
use SendError as B;
use SendPayloadError as A;

match value {
A::Io(x) => B::Io(x),
Expand Down Expand Up @@ -142,60 +148,58 @@ channels_macros::replace! {
(await =>)
(send => send_sync)
(Write => Write)
(run => run_sync)
]
// Asynchronous version
[
(async => async)
(await => .await)
(send => send_async)
(Write => AsyncWrite)
(run => run_async)
]
}
code: {

pub async fn send<W>(
config: &Config,
pcb: &mut SendPcb,
writer: &mut StatIO<W>,
payload: &[u8],
) -> Result<(), SendPayloadError<W::Error>>
impl<W> SenderCore<W>
where
W: Write
W: Write,
{
let with_checksum = if config.use_header_checksum() {
WithChecksum::Yes
} else {
WithChecksum::No
};

let mut buf = Vec::new();

for packet in as_packets(pcb, payload) {
let header_bytes = packet.header.to_bytes(with_checksum);

if config.coalesce_writes() {
buf.reserve_exact(packet.header.length.as_usize());
buf.extend_from_slice(&header_bytes);
buf.extend_from_slice(packet.payload);
writer.write(&buf) await?;
buf.clear();
pub async fn send(
&mut self,
data: &[u8],
) -> Result<(), CoreSendError<W::Error>> {
let with_checksum = if self.config.use_header_checksum() {
WithChecksum::Yes
} else {
writer.write(&header_bytes) await?;
writer.write(packet.payload) await?;
WithChecksum::No
};

let mut buf = Vec::new();

for packet in as_packets(&mut self.pcb, data) {
let header_bytes = packet.header.to_bytes(with_checksum);

if self.config.coalesce_writes() {
buf.reserve_exact(packet.header.length.as_usize());
buf.extend_from_slice(&header_bytes);
buf.extend_from_slice(packet.payload);
self.writer.write(&buf) await?;
buf.clear();
} else {
self.writer.write(&header_bytes) await?;
self.writer.write(packet.payload) await?;
}

self.writer.statistics.inc_packets();
}

writer.statistics.inc_packets();
}

if config.flush_on_send() {
writer.flush() await?;
}
if self.config.flush_on_send() {
self.writer.flush() await?;
}

writer.statistics.inc_ops();
self.writer.statistics.inc_ops();

Ok(())
Ok(())
}
}

}
Expand Down
37 changes: 13 additions & 24 deletions channels/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use channels_packet::PacketLength;

use crate::error::RecvError;
use crate::io::{AsyncRead, Container, IntoRead, Read};
use crate::protocol::Deframer;
use crate::protocol::{Deframer, ReceiverCore};
use crate::serdes::Deserializer;

#[allow(unused_imports)]
Expand All @@ -17,9 +17,8 @@ use crate::statistics::{StatIO, Statistics};
/// The receiving-half of the channel.
pub struct Receiver<T, R, D> {
_marker: PhantomData<fn() -> T>,
reader: StatIO<R>,
deserializer: D,
deframer: Deframer,
core: ReceiverCore<R>,
}

impl<T> Receiver<T, (), ()> {
Expand Down Expand Up @@ -138,7 +137,7 @@ impl<T, R, D> Receiver<T, R, D> {
/// ```
#[inline]
pub fn config(&self) -> &Config {
self.deframer.config()
self.core.deframer.config()
}

/// Get an iterator over incoming messages.
Expand Down Expand Up @@ -203,7 +202,7 @@ impl<T, R, D> Receiver<T, R, D> {
#[inline]
#[cfg(feature = "statistics")]
pub fn statistics(&self) -> &Statistics {
&self.reader.statistics
&self.core.reader.statistics
}
}

Expand Down Expand Up @@ -236,7 +235,7 @@ where
/// ```
#[inline]
pub fn get(&self) -> &R::Inner {
self.reader.inner.get_ref()
self.core.reader.inner.get_ref()
}

/// Get a mutable reference to the underlying reader. Directly reading from
Expand Down Expand Up @@ -266,13 +265,13 @@ where
/// ```
#[inline]
pub fn get_mut(&mut self) -> &mut R::Inner {
self.reader.inner.get_mut()
self.core.reader.inner.get_mut()
}

/// Destruct the receiver and get back the underlying reader.
#[inline]
pub fn into_reader(self) -> R::Inner {
self.reader.inner.into_inner()
self.core.reader.inner.into_inner()
}
}

Expand Down Expand Up @@ -322,12 +321,7 @@ where
pub async fn recv(
&mut self,
) -> Result<T, RecvError<D::Error, R::Error>> {
let mut payload = crate::protocol::recv_async(
&mut self.reader,
&mut self.deframer,
)
.await?;

let mut payload = self.core.recv_async().await?;
self.deserialize_t(&mut payload)
}
}
Expand Down Expand Up @@ -361,11 +355,7 @@ where
pub fn recv_blocking(
&mut self,
) -> Result<T, RecvError<D::Error, R::Error>> {
let mut payload = crate::protocol::recv_sync(
&mut self.reader,
&mut self.deframer,
)?;

let mut payload = self.core.recv_sync()?;
self.deserialize_t(&mut payload)
}
}
Expand Down Expand Up @@ -545,16 +535,15 @@ impl<T, R, D> Builder<T, R, D> {
/// ```
#[inline]
pub fn build(self) -> Receiver<T, R, D> {
let reader = StatIO::new(self.reader);
let deserializer = self.deserializer;
let config = self.config.unwrap_or_default();
let deframer = Deframer::new(config);
let deserializer = self.deserializer;
let reader = StatIO::new(self.reader);

Receiver {
_marker: PhantomData,
deserializer,
reader,
deframer,
core: ReceiverCore { reader, deframer },
}
}
}
Expand Down Expand Up @@ -789,7 +778,7 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Receiver")
.field("reader", &self.reader)
.field("reader", &self.core.reader)
.field("deserializer", &self.deserializer)
.field("config", self.config())
.finish_non_exhaustive()
Expand Down
Loading

0 comments on commit 4c51ecb

Please sign in to comment.