From b4957ff408cf3d604d6c2028ce4291e587bf268d Mon Sep 17 00:00:00 2001 From: threadexio Date: Thu, 11 Jul 2024 13:06:54 +0300 Subject: [PATCH] fix: Buffered transactions dont finish correctly A Buffered transaction will hold any data written to it in an internal `Vec`. When it is `finish()`ed, it will write that buffer to the underlying writer. The `finish` and `poll_finish` implementations would write any such data but without remembering how much data they had written. This is not a problem for the synchronous `finish()` as that writes the buffer all in one go and as such does not need to remember how much data it wrote. However, in the asynchronous implementation, if the transaction happenned to also want a flush after writing the data and `poll_flush()` returned with `Poll::Pending`, then the entire `poll_finish()` would return with `Poll::Pending`. The next time the `Finish` future would be polled, the `poll_future()` will re-write the entire buffer back to the underlying writer because it wouldn't know if any of it had been written previously. This commit changes the internal buffer from a `&mut Vec` to a `Cursor<&mut Vec>` which holds a position in the buffer eliminating the bug. --- channels-io/src/transaction/buffered.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/channels-io/src/transaction/buffered.rs b/channels-io/src/transaction/buffered.rs index eaf433d..4690187 100644 --- a/channels-io/src/transaction/buffered.rs +++ b/channels-io/src/transaction/buffered.rs @@ -3,6 +3,7 @@ use core::task::{ready, Context, Poll}; use alloc::vec::Vec; +use crate::buf::Cursor; use crate::transaction::{AsyncWriteTransaction, WriteTransaction}; use crate::{AsyncWrite, AsyncWriteExt, Write, WriteExt}; @@ -15,7 +16,7 @@ use crate::{AsyncWrite, AsyncWriteExt, Write, WriteExt}; #[derive(Debug)] pub struct Buffered<'a, W> { writer: W, - buf: &'a mut Vec, + buf: Cursor<&'a mut Vec>, wants_flush: bool, } @@ -29,7 +30,7 @@ impl<'a, W> Buffered<'a, W> { /// If `buf` is not empty. pub fn new(writer: W, buf: &'a mut Vec) -> Self { assert!(buf.is_empty(), "buf should be empty"); - Self { writer, buf, wants_flush: false } + Self { writer, buf: Cursor::new(buf), wants_flush: false } } /// Get a reference to the underlying writer. @@ -53,7 +54,7 @@ where &mut self, buf: &[u8], ) -> Result { - self.buf.extend_from_slice(buf); + self.buf.get_mut().extend_from_slice(buf); Ok(buf.len()) } @@ -70,7 +71,7 @@ where fn finish(self) -> Result<(), Self::Error> { let Self { buf, wants_flush, mut writer } = self; - writer.write(buf.as_slice())?; + writer.write(buf)?; if wants_flush { writer.flush()?; @@ -91,7 +92,7 @@ where _: &mut Context, buf: &[u8], ) -> Poll> { - self.buf.extend_from_slice(buf); + self.buf.get_mut().extend_from_slice(buf); Poll::Ready(Ok(buf.len())) } @@ -113,9 +114,8 @@ where cx: &mut Context, ) -> Poll> { let Self { ref mut buf, wants_flush, ref mut writer } = *self; - let buf = &mut **buf; - ready!(Pin::new(&mut *writer).poll_write(cx, buf.as_slice()))?; + ready!(Pin::new(&mut *writer).poll_write(cx, buf))?; if wants_flush { ready!(Pin::new(&mut *writer).poll_flush(cx))?;