Skip to content

Commit

Permalink
refactor: replace all implementations to impls folder
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Sep 30, 2024
1 parent 8e58840 commit f469e8b
Show file tree
Hide file tree
Showing 26 changed files with 289 additions and 153 deletions.
4 changes: 3 additions & 1 deletion fusio-object-store/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ impl<O: ObjectStore> Fs for S3Store<O> {

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
if let Some(WriteMode::Append) = options.write {
return Err(Error::Unsupported);
return Err(Error::Unsupported {
message: "append mode is not supported in Amazon S3".into(),
});
}
Ok(S3File {
inner: self.inner.clone(),
Expand Down
43 changes: 31 additions & 12 deletions fusio-object-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,48 @@ pub struct S3File<O: ObjectStore> {
pos: u64,
}

impl<O: ObjectStore> Read for S3File<O> {
async fn read_exact<B: IoBufMut>(&mut self, mut buf: B) -> Result<B, Error> {
let pos = self.pos as usize;

let mut opts = GetOptions::default();
let range = GetRange::Bounded(Range {
start: pos,
end: pos + buf.bytes_init(),
});
opts.range = Some(range);

impl<O: ObjectStore> S3File<O> {
async fn read_with_range<B: IoBufMut>(
&mut self,
range: GetRange,
mut buf: B,
) -> Result<B, Error> {
let opts = GetOptions {
range: Some(range),
..Default::default()
};
let result = self
.inner
.get_opts(&self.path, opts)
.await
.map_err(BoxedError::from)?;
let bytes = result.bytes().await.map_err(BoxedError::from)?;

self.pos += bytes.len() as u64;
buf.set_init(bytes.len());

buf.as_slice_mut().copy_from_slice(&bytes);
Ok(buf)
}
}

impl<O: ObjectStore> Read for S3File<O> {
async fn read_exact<B: IoBufMut>(&mut self, buf: B) -> Result<B, Error> {
let pos = self.pos as usize;

let range = GetRange::Bounded(Range {
start: pos,
end: pos + buf.bytes_init(),
});

self.read_with_range(range, buf).await
}

async fn read_to_end(&mut self, buf: Vec<u8>) -> Result<Vec<u8>, Error> {
let pos = self.pos as usize;
let range = GetRange::Offset(pos);

self.read_with_range(range, buf).await
}

async fn size(&self) -> Result<u64, Error> {
let options = GetOptions {
Expand Down
2 changes: 1 addition & 1 deletion fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ aws = [
]
bytes = ["dep:bytes"]
completion-based = []
default = ["dyn", "fs"]
default = ["aws", "dyn", "fs", "tokio", "tokio-http"]
dyn = []
fs = ["tokio?/rt"]
http = [
Expand Down
14 changes: 7 additions & 7 deletions fusio/benches/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::{cell::RefCell, io::SeekFrom, rc::Rc, sync::Arc};

use criterion::{criterion_group, criterion_main, Criterion};
use fusio::{
disk::TokioFs,
fs::{Fs, OpenOptions},
local::TokioFs,
path::Path,
IoBuf, IoBufMut, Write,
};
Expand Down Expand Up @@ -39,11 +39,11 @@ fn write(c: &mut Criterion) {
let file = file.clone();

async move {
let (result, _) =
fusio::dynamic::DynWrite::write_all(&mut *(*file).borrow_mut(), unsafe {
(&bytes.as_ref()[..]).to_buf_nocopy()
})
.await;
let file = &mut *(*file).borrow_mut();
let (result, _) = fusio::dynamic::DynWrite::write_all(file, unsafe {
(&bytes.as_ref()[..]).to_buf_nocopy()
})
.await;
result.unwrap();
}
})
Expand Down Expand Up @@ -98,7 +98,7 @@ fn read(c: &mut Criterion) {
let file = file.clone();

async move {
let _ = fusio::dynamic::DynSeek::seek(&mut *(*file).borrow_mut(), 0)
fusio::dynamic::DynSeek::seek(&mut *(*file).borrow_mut(), 0)
.await
.unwrap();
let _ = fusio::dynamic::DynRead::read_exact(&mut *(*file).borrow_mut(), unsafe {
Expand Down
21 changes: 21 additions & 0 deletions fusio/src/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub trait IoBuf: Unpin + Sized + MaybeOwned + MaybeSend {
}

pub trait IoBufMut: IoBuf {
fn set_init(&mut self, init: usize);

fn as_mut_ptr(&mut self) -> *mut u8;

fn as_slice_mut(&mut self) -> &mut [u8] {
Expand Down Expand Up @@ -78,6 +80,10 @@ impl IoBuf for Vec<u8> {
}

impl IoBufMut for Vec<u8> {
fn set_init(&mut self, init: usize) {
self.resize(init, 0);
}

fn as_mut_ptr(&mut self) -> *mut u8 {
Vec::as_mut_ptr(self)
}
Expand Down Expand Up @@ -146,6 +152,8 @@ impl IoBuf for &mut [u8] {

#[cfg(not(feature = "completion-based"))]
impl IoBufMut for &mut [u8] {
fn set_init(&mut self, _init: usize) {}

fn as_mut_ptr(&mut self) -> *mut u8 {
<[u8]>::as_mut_ptr(self)
}
Expand Down Expand Up @@ -248,6 +256,10 @@ impl IoBuf for bytes::BytesMut {

#[cfg(feature = "bytes")]
impl IoBufMut for bytes::BytesMut {
fn set_init(&mut self, init: usize) {
self.resize(init, 0)
}

fn as_mut_ptr(&mut self) -> *mut u8 {
<[u8]>::as_mut_ptr(self)
}
Expand Down Expand Up @@ -399,6 +411,15 @@ impl IoBuf for BufMut {
}

impl IoBufMut for BufMut {
fn set_init(&mut self, init: usize) {
match &mut self.0 {
BufMutInner::Slice { .. } => {}
BufMutInner::Vec(vec) => vec.set_init(init),
#[cfg(feature = "bytes")]
BufMutInner::BytesMut(bytes) => bytes.set_init(init),
}
}

fn as_mut_ptr(&mut self) -> *mut u8 {
match &mut self.0 {
BufMutInner::Slice { ptr, .. } => *ptr,
Expand Down
20 changes: 8 additions & 12 deletions fusio/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@ use thiserror::Error;
#[non_exhaustive]
pub enum Error {
Io(#[from] io::Error),
#[cfg(feature = "http")]
Http(#[from] http::Error),
Path(#[from] crate::path::Error),
#[error("unsupported operation")]
Unsupported,
#[error("invalid url: {0}")]
InvalidUrl(BoxedError),
#[cfg(feature = "http")]
#[error("http request failed, status: {status_code}, body: {body}")]
HttpNotSuccess {
status_code: http::StatusCode,
body: String,
#[cfg(feature = "aws")]
#[error(transparent)]
S3Error(#[from] crate::remotes::aws::S3Error),
#[error(transparent)]
PathError(#[from] crate::path::Error),
#[error("unsupported operation: {message}")]
Unsupported {
message: String,
},
#[error(transparent)]
Other(#[from] BoxedError),
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions fusio/src/impls/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod disk;
pub mod remotes;
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl<'a> AwsAuthorizer<'a> {
/// * Otherwise it is set to the hex encoded SHA256 of the request body
///
/// [AWS SigV4]: https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
pub(crate) async fn authorize<B>(&self, request: &mut Request<B>) -> Result<(), AutohrizeError>
pub(crate) async fn authorize<B>(&self, request: &mut Request<B>) -> Result<(), AuthorizeError>
where
B: Body<Data = Bytes> + Clone + Unpin,
B::Error: std::error::Error + Send + Sync + 'static,
Expand All @@ -157,7 +157,7 @@ impl<'a> AwsAuthorizer<'a> {
let host = request
.uri()
.authority()
.ok_or(AutohrizeError::NoHost)?
.ok_or(AuthorizeError::NoHost)?
.as_str()
.to_string();
request.headers_mut().insert(HOST, host.parse()?);
Expand Down Expand Up @@ -185,7 +185,7 @@ impl<'a> AwsAuthorizer<'a> {
.clone()
.collect()
.await
.map_err(|_| AutohrizeError::BodyNoFrame)?
.map_err(|_| AuthorizeError::BodyNoFrame)?
.to_bytes();
hex_digest(&bytes)
}
Expand Down Expand Up @@ -414,15 +414,15 @@ fn hex_digest(bytes: &[u8]) -> String {
}

#[derive(Debug, Error)]
pub enum AutohrizeError {
pub enum AuthorizeError {
#[error("Invalid header value: {0}")]
InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
#[error("Invalid URL: {0}")]
InvalidUrl(#[from] url::ParseError),
#[error("No host in URL")]
NoHost,
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("Failed to sign request: {0}")]
SignHashFailed(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("Body no frame")]
BodyNoFrame,
}
Expand Down
14 changes: 14 additions & 0 deletions fusio/src/impls/remotes/aws/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use thiserror::Error;

use crate::remotes::aws::credential::AuthorizeError;
use crate::remotes::http::HttpError;

#[derive(Debug, Error)]
pub enum S3Error {
#[error("http error: {0}")]
HttpError(#[from] HttpError),
#[error("authorize error: {0}")]
AuthorizeError(#[from] AuthorizeError),
#[error("xml parse error: {0}")]
XmlParseError(#[from] quick_xml::DeError),
}
47 changes: 30 additions & 17 deletions fusio/src/remotes/aws/fs.rs → fusio/src/impls/remotes/aws/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use http_body_util::{BodyExt, Empty};
use serde::{Deserialize, Serialize};
use url::Url;

use super::S3Error;
use super::{credential::AwsCredential, options::S3Options, S3File};
use crate::remotes::http::HttpError;
use crate::{
fs::{FileMeta, Fs, OpenOptions, WriteMode},
path::Path,
Expand Down Expand Up @@ -99,7 +101,9 @@ impl Fs for AmazonS3 {
options: OpenOptions,
) -> Result<Self::File, crate::Error> {
if let Some(WriteMode::Append) = options.write {
return Err(Error::Unsupported);
return Err(Error::Unsupported {
message: "append mode is not supported in Amazon S3".into(),
});
}

Ok(S3File::new(
Expand All @@ -126,39 +130,39 @@ impl Fs for AmazonS3 {
query.push(("continuation-token", token.as_str()));
}

let mut url = Url::from_str(self.options.endpoint.as_str()).map_err(|e| Error::InvalidUrl(e.into()))?;
let mut url = Url::from_str(self.options.endpoint.as_str()).map_err(|e| S3Error::from(HttpError::from(e)))?;
{
let mut pairs = url.query_pairs_mut();
let serializer = serde_urlencoded::Serializer::new(&mut pairs);
query
.serialize(serializer)
.map_err(|e| Error::InvalidUrl(e.into()))?;
.map_err(|e| S3Error::from(HttpError::from(e)))?;
}

let mut request = Request::builder()
.method(Method::GET)
.uri(url.as_str())
.body(Empty::<Bytes>::new())?;
request.sign(&self.options).await?;
let response = self.client.send_request(request).await?;
.body(Empty::<Bytes>::new()).map_err(|e| S3Error::from(HttpError::from(e)))?;
request.sign(&self.options).await.map_err(S3Error::from)?;
let response = self.client.send_request(request).await.map_err(S3Error::from)?;

if !response.status().is_success() {
yield Err(Error::HttpNotSuccess { status_code: response.status(), body: String::from_utf8_lossy(
yield Err(S3Error::from(HttpError::HttpNotSuccess { status: response.status(), body: String::from_utf8_lossy(
&response
.collect()
.await
.map_err(|e| Error::Other(e.into()))?.to_bytes()).to_string()
});
}).into());
return;
}

let mut response: ListResponse = quick_xml::de::from_reader(
response
.collect()
.await
.map_err(|e| Error::Other(e.into()))?
.map_err(S3Error::from)?
.aggregate().reader()
).map_err(|e| Error::Other(e.into()))?;
).map_err(S3Error::from)?;

next_token = response.next_continuation_token.take();

Expand All @@ -178,19 +182,27 @@ impl Fs for AmazonS3 {

async fn remove(&self, path: &Path) -> Result<(), Error> {
let mut url = Url::from_str(self.options.endpoint.as_str())
.map_err(|e| Error::InvalidUrl(e.into()))?;
.map_err(|e| S3Error::from(HttpError::from(e)))?;
url.set_path(path.as_ref());

let mut request = Request::builder()
.method(Method::DELETE)
.uri(url.as_str())
.body(Empty::<Bytes>::new())?;
request.sign(&self.options).await?;
let response = self.client.send_request(request).await?;
.body(Empty::<Bytes>::new())
.map_err(|e| S3Error::from(HttpError::from(e)))?;
request
.sign(&self.options)
.await
.map_err(S3Error::from)?;
let response = self
.client
.send_request(request)
.await
.map_err(S3Error::from)?;

if !response.status().is_success() {
return Err(Error::HttpNotSuccess {
status_code: response.status(),
return Err(S3Error::from(HttpError::HttpNotSuccess {
status: response.status(),
body: String::from_utf8_lossy(
&response
.collect()
Expand All @@ -199,7 +211,8 @@ impl Fs for AmazonS3 {
.to_bytes(),
)
.to_string(),
});
})
.into());
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod credential;
mod error;
#[cfg(feature = "fs")]
pub mod fs;
pub(crate) mod options;
mod s3;
pub(crate) mod sign;

pub use error::S3Error;
pub use s3::S3File;

const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
Expand Down
File renamed without changes.
Loading

0 comments on commit f469e8b

Please sign in to comment.