Skip to content

Commit

Permalink
feat: update Read & Write for tokio-uring impl (#35)
Browse files Browse the repository at this point in the history
* feat: update Read & Write for tokio-uring impl

* fix: default `Read::read_to_end` panic on seeked
  • Loading branch information
KKould authored Sep 30, 2024
1 parent 5682847 commit a054ed9
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 49 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,43 @@ jobs:
command: test
args: --package fusio --features "monoio, futures"

tokio_uring_check:
name: Rust project check on tokio_uring
runs-on: ${{ matrix.os }}
strategy:
matrix:
os:
- ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install latest
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt, clippy

# `cargo check` command here will use installed `nightly`
# as it is set as an "override" for current directory

- name: Run cargo clippy on tokio-uring
uses: actions-rs/cargo@v1
with:
command: check
args: --package fusio --features "tokio-uring, futures"

- name: Run cargo build on tokio-uring
uses: actions-rs/cargo@v1
with:
command: build
args: --package fusio --features "tokio-uring, futures"

- name: Run cargo test on tokio-uring
uses: actions-rs/cargo@v1
with:
command: test
args: --package fusio --features "tokio-uring, futures"

# 2
fmt:
name: Rust fmt
Expand Down
4 changes: 4 additions & 0 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl<'read> Read for Box<dyn DynFile + 'read> {
Ok(unsafe { B::recover_from_buf_mut(buf) })
}

async fn read_to_end(&mut self, buf: Vec<u8>) -> Result<Vec<u8>, Error> {
DynRead::read_to_end(self.as_mut(), buf).await
}

async fn size(&self) -> Result<u64, Error> {
DynRead::size(self.as_ref()).await
}
Expand Down
62 changes: 47 additions & 15 deletions fusio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,8 @@ pub trait Read: MaybeSend + MaybeSync {

fn read_to_end(
&mut self,
mut buf: Vec<u8>,
) -> impl Future<Output = Result<Vec<u8>, Error>> + MaybeSend {
async move {
buf.resize(self.size().await? as usize, 0);
let buf = self.read_exact(buf).await?;
Ok(buf)
}
}
buf: Vec<u8>,
) -> impl Future<Output = Result<Vec<u8>, Error>> + MaybeSend;

fn size(&self) -> impl Future<Output = Result<u64, Error>> + MaybeSend;
}
Expand All @@ -93,6 +87,11 @@ where
Ok(buf)
}

async fn read_to_end(&mut self, mut buf: Vec<u8>) -> Result<Vec<u8>, Error> {
let _ = std::io::Read::read_to_end(self, &mut buf)?;
Ok(buf)
}

async fn size(&self) -> Result<u64, Error> {
Ok(self.get_ref().as_ref().len() as u64)
}
Expand Down Expand Up @@ -143,6 +142,13 @@ impl<R: Read> Read for &mut R {
R::read_exact(self, buf)
}

fn read_to_end(
&mut self,
buf: Vec<u8>,
) -> impl Future<Output = Result<Vec<u8>, Error>> + MaybeSend {
R::read_to_end(self, buf)
}

fn size(&self) -> impl Future<Output = Result<u64, Error>> + MaybeSend {
R::size(self)
}
Expand Down Expand Up @@ -171,7 +177,9 @@ impl<W: Write> Write for &mut W {

#[cfg(test)]
mod tests {
use super::{Read, Write};
use std::future::Future;

use super::{MaybeSend, Read, Write};
use crate::{buf::IoBufMut, Error, IoBuf, Seek};

#[allow(unused)]
Expand Down Expand Up @@ -233,6 +241,13 @@ mod tests {
.inspect(|buf| self.cnt += buf.bytes_init())
}

async fn read_to_end(&mut self, buf: Vec<u8>) -> Result<Vec<u8>, Error> {
self.r
.read_to_end(buf)
.await
.inspect(|buf| self.cnt += buf.bytes_init())
}

async fn size(&self) -> Result<u64, Error> {
self.r.size().await
}
Expand All @@ -259,13 +274,24 @@ mod tests {
writer.sync_data().await.unwrap();

let mut reader = CountRead::new(read);
reader.seek(0).await.unwrap();
{
reader.seek(0).await.unwrap();

let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();

let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();
assert_eq!(buf.bytes_init(), 4);
assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
}
{
reader.seek(2).await.unwrap();

let mut buf = vec![];
buf = reader.read_to_end(buf).await.unwrap();

assert_eq!(buf.bytes_init(), 4);
assert_eq!(buf.as_slice(), &[2, 0, 2, 4]);
assert_eq!(buf.bytes_init(), 2);
assert_eq!(buf.as_slice(), &[2, 4]);
}
}

#[cfg(feature = "futures")]
Expand Down Expand Up @@ -418,11 +444,17 @@ mod tests {
use tempfile::tempfile;
use tokio_uring::fs::File;

use crate::local::tokio_uring::TokioUringFile;

tokio_uring::start(async {
let read = tempfile().unwrap();
let write = read.try_clone().unwrap();

write_and_read(File::from_std(write), File::from_std(read)).await;
write_and_read(
TokioUringFile::from(File::from_std(write)),
TokioUringFile::from(File::from_std(read)),
)
.await;
});
}
}
4 changes: 3 additions & 1 deletion fusio/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub(crate) mod monoio;
#[cfg(feature = "tokio")]
pub(crate) mod tokio;
#[cfg(all(feature = "tokio-uring", target_os = "linux"))]
mod tokio_uring;
pub(crate) mod tokio_uring;

#[cfg(all(feature = "monoio", feature = "fs"))]
#[allow(unused)]
Expand All @@ -21,5 +21,7 @@ cfg_if::cfg_if! {
pub type LocalFs = TokioFs;
} else if #[cfg(feature = "monoio")] {
pub type LocalFs = MonoIoFs;
} else if #[cfg(feature = "tokio-uring")] {
pub type LocalFs = TokioUringFs;
}
}
10 changes: 9 additions & 1 deletion fusio/src/local/monoio/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#[cfg(feature = "fs")]
pub mod fs;

use std::future::Future;

use monoio::fs::File;

use crate::{buf::IoBufMut, Error, IoBuf, Read, Seek, Write};
use crate::{buf::IoBufMut, Error, IoBuf, MaybeSend, Read, Seek, Write};

#[repr(transparent)]
struct MonoioBuf<B> {
Expand Down Expand Up @@ -94,6 +96,12 @@ impl Read for MonoioFile {
Ok(buf.buf)
}

async fn read_to_end(&mut self, mut buf: Vec<u8>) -> Result<Vec<u8>, Error> {
buf.resize((self.size().await? - self.pos) as usize, 0);

Ok(self.read_exact(buf).await?)
}

async fn size(&self) -> Result<u64, Error> {
let metadata = File::metadata(self.file.as_ref().expect("read file after closed")).await?;
Ok(metadata.len())
Expand Down
5 changes: 5 additions & 0 deletions fusio/src/local/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl Read for File {
Ok(buf)
}

async fn read_to_end(&mut self, mut buf: Vec<u8>) -> Result<Vec<u8>, Error> {
let _ = AsyncReadExt::read_to_end(self, &mut buf).await?;
Ok(buf)
}

async fn size(&self) -> Result<u64, Error> {
Ok(self.metadata().await?.len())
}
Expand Down
55 changes: 42 additions & 13 deletions fusio/src/local/tokio_uring/fs.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,62 @@
use std::{io, path::Path};

use async_stream::stream;
use futures_core::Stream;
use tokio_uring::fs::{remove_file, File};
use tokio_uring::fs::{create_dir_all, remove_file};

use crate::fs::{FileMeta, Fs};
use crate::{
fs::{FileMeta, Fs, OpenOptions, WriteMode},
local::tokio_uring::TokioUringFile,
path::{path_to_local, Path},
Error,
};

pub struct TokioUringFs;

impl Fs for TokioUringFs {
type File = File;
type File = TokioUringFile;

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let local_path = path_to_local(path)?;

let file = tokio_uring::fs::OpenOptions::new()
.read(options.read)
.write(options.write.is_some())
.create(options.create)
.append(options.write == Some(WriteMode::Append))
.truncate(options.write == Some(WriteMode::Overwrite))
.open(&local_path)
.await?;

async fn open(&self, path: impl AsRef<Path>) -> io::Result<Self::File> {
File::open(path).await
Ok(TokioUringFile {
file: Some(file),
pos: 0,
})
}

async fn create_dir_all(path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;
create_dir_all(path).await?;

Ok(())
}

async fn list(
&self,
path: impl AsRef<Path>,
) -> io::Result<impl Stream<Item = io::Result<FileMeta>>> {
let dir = path.as_ref().read_dir()?;
path: &Path,
) -> Result<impl Stream<Item = Result<FileMeta, Error>>, Error> {
let path = path_to_local(path)?;
let dir = path.read_dir()?;

Ok(stream! {
for entry in dir {
yield Ok(crate::fs::FileMeta { path: entry?.path() });
let entry = entry?;
yield Ok(FileMeta { path: Path::from_filesystem_path(entry.path())?, size: entry.metadata()?.len() });
}
})
}

async fn remove(&self, path: impl AsRef<Path>) -> io::Result<()> {
remove_file(path).await
async fn remove(&self, path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;

Ok(remove_file(path).await?)
}
}
Loading

0 comments on commit a054ed9

Please sign in to comment.