Skip to content

Commit

Permalink
fix: Buffered transactions dont finish correctly
Browse files Browse the repository at this point in the history
A Buffered transaction will hold any data written to it in an internal
`Vec<u8>`. When it is `finish()`ed, it will write that buffer to the
underlying writer. The `finish` and `poll_finish` implementations would
write any such data but without remembering how much data they had
written. This is not a problem for the synchronous `finish()` as that
writes the buffer all in one go and as such does not need to remember
how much data it wrote. However, in the asynchronous implementation, if
the transaction happenned to also want a flush after writing the data
and `poll_flush()` returned with `Poll::Pending`, then the entire
`poll_finish()` would return with `Poll::Pending`. The next time the
`Finish` future would be polled, the `poll_future()` will re-write the
entire buffer back to the underlying writer because it wouldn't know if
any of it had been written previously. This commit changes the internal
buffer from a `&mut Vec<u8>` to a `Cursor<&mut Vec<u8>>` which holds a
position in the buffer eliminating the bug.
  • Loading branch information
threadexio committed Jul 11, 2024
1 parent 339e82d commit b4957ff
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions channels-io/src/transaction/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::task::{ready, Context, Poll};

use alloc::vec::Vec;

use crate::buf::Cursor;
use crate::transaction::{AsyncWriteTransaction, WriteTransaction};
use crate::{AsyncWrite, AsyncWriteExt, Write, WriteExt};

Expand All @@ -15,7 +16,7 @@ use crate::{AsyncWrite, AsyncWriteExt, Write, WriteExt};
#[derive(Debug)]
pub struct Buffered<'a, W> {
writer: W,
buf: &'a mut Vec<u8>,
buf: Cursor<&'a mut Vec<u8>>,
wants_flush: bool,
}

Expand All @@ -29,7 +30,7 @@ impl<'a, W> Buffered<'a, W> {
/// If `buf` is not empty.
pub fn new(writer: W, buf: &'a mut Vec<u8>) -> Self {
assert!(buf.is_empty(), "buf should be empty");
Self { writer, buf, wants_flush: false }
Self { writer, buf: Cursor::new(buf), wants_flush: false }
}

/// Get a reference to the underlying writer.
Expand All @@ -53,7 +54,7 @@ where
&mut self,
buf: &[u8],
) -> Result<usize, Self::Error> {
self.buf.extend_from_slice(buf);
self.buf.get_mut().extend_from_slice(buf);
Ok(buf.len())
}

Expand All @@ -70,7 +71,7 @@ where
fn finish(self) -> Result<(), Self::Error> {
let Self { buf, wants_flush, mut writer } = self;

writer.write(buf.as_slice())?;
writer.write(buf)?;

if wants_flush {
writer.flush()?;
Expand All @@ -91,7 +92,7 @@ where
_: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
self.buf.extend_from_slice(buf);
self.buf.get_mut().extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}

Expand All @@ -113,9 +114,8 @@ where
cx: &mut Context,
) -> Poll<Result<(), Self::Error>> {
let Self { ref mut buf, wants_flush, ref mut writer } = *self;
let buf = &mut **buf;

ready!(Pin::new(&mut *writer).poll_write(cx, buf.as_slice()))?;
ready!(Pin::new(&mut *writer).poll_write(cx, buf))?;

if wants_flush {
ready!(Pin::new(&mut *writer).poll_flush(cx))?;
Expand Down

0 comments on commit b4957ff

Please sign in to comment.