feat: support parquet lru reader (#215)
* feat: support parquet lru reader

* refactor: use parquet lru reader as trait object

* rename
ethe authored Nov 19, 2024
1 parent d26bd9f commit 4844cb8
Showing 14 changed files with 450 additions and 49 deletions.
9 changes: 5 additions & 4 deletions Cargo.toml
@@ -1,4 +1,4 @@
-workspace = { members = ["tonbo_macros"] }
+workspace = { members = ["parquet-lru", "tonbo_macros"] }

[package]
description = "An embedded persistent KV database in Rust."
@@ -83,7 +83,7 @@ fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "80389936
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
lockable = "0.0.8"
lockable = "0.1.1"
once_cell = "1"
parquet = { version = "53", default-features = false, features = [
"async",
@@ -93,14 +93,15 @@ parquet = { version = "53", default-features = false, features = [
"lz4",
"snap",
] }
parquet-lru = { version = "0.2.0", path = "parquet-lru" }
pin-project-lite = "0.2"
regex = "1"
thiserror = "1"
thiserror = "2.0.3"
tokio = { version = "1", features = ["io-util"], default-features = false }
tokio-util = { version = "0.7" }
tonbo_macros = { version = "0.2.0", path = "tonbo_macros" }
tracing = "0.1"
ulid = "1"
ulid = { version = "1", features = ["serde"] }

# Only used for benchmarks
log = "0.4.22"
22 changes: 22 additions & 0 deletions parquet-lru/Cargo.toml
@@ -0,0 +1,22 @@
[package]
description = "Implement LRU cache reader for parquet::arrow::async_reader::AsyncFileReader."
documentation = "https://docs.rs/parquet-lru"
edition = "2021"
license = "Apache-2.0"
name = "parquet-lru"
version = "0.2.0"

[package.metadata.docs.rs]
all-features = true

[features]
default = []
foyer = ["dep:foyer", "dep:serde"]

[dependencies]
bytes = { version = "1.8.0", features = ["serde"] }
foyer = { version = "0.12.2", optional = true }
futures-core = "0.3.31"
futures-util = "0.3.31"
parquet = { version = "53.2.0", features = ["async"] }
serde = { version = "1.0.214", optional = true }
49 changes: 49 additions & 0 deletions parquet-lru/src/dyn.rs
@@ -0,0 +1,49 @@
use std::{ops::Range, sync::Arc};

use bytes::Bytes;
use futures_core::future::BoxFuture;
use parquet::{
arrow::async_reader::AsyncFileReader, errors::Result, file::metadata::ParquetMetaData,
};

use crate::LruCache;

pub struct BoxedFileReader {
inner: Box<dyn AsyncFileReader>,
}

impl BoxedFileReader {
pub fn new<T: AsyncFileReader + 'static>(inner: T) -> Self {
Self {
inner: Box::new(inner),
}
}
}

impl AsyncFileReader for BoxedFileReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
self.inner.get_bytes(range)
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
self.inner.get_metadata()
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
self.inner.get_byte_ranges(ranges)
}
}

pub trait DynLruCache<K> {
fn get_reader(&self, key: K, reader: BoxedFileReader) -> BoxFuture<'_, BoxedFileReader>;
}

impl<K, C> DynLruCache<K> for C
where
K: 'static + Send,
C: LruCache<K> + Sized + Send + Sync,
{
fn get_reader(&self, key: K, reader: BoxedFileReader) -> BoxFuture<'_, BoxedFileReader> {
Box::pin(async move { BoxedFileReader::new(self.get_reader(key, reader).await) })
}
}
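
The blanket impl above is what lets any LruCache be used behind an object-safe interface: both the cache and the reader are type-erased, so the caching policy can be chosen at runtime. A minimal sketch of the caller's side, assuming a plain integer key and the NoCache default (the FileKey alias and the helper functions below are illustrative, not part of this commit):

use std::sync::Arc;

use parquet::arrow::async_reader::AsyncFileReader;
use parquet_lru::{BoxedFileReader, DynLruCache, NoCache};

// Hypothetical key type; any `'static + Send` key satisfies the blanket impl.
type FileKey = u64;

// Any concrete `LruCache<FileKey>` coerces to the object-safe `DynLruCache`.
fn erased_cache() -> Arc<dyn DynLruCache<FileKey> + Send + Sync> {
    Arc::new(NoCache::default())
}

async fn open_cached_reader(
    cache: &Arc<dyn DynLruCache<FileKey> + Send + Sync>,
    key: FileKey,
    file: impl AsyncFileReader + 'static,
) -> BoxedFileReader {
    // The concrete reader is boxed on the way in and comes back boxed as well,
    // wrapped by whatever caching layer the trait object supplies.
    cache.get_reader(key, BoxedFileReader::new(file)).await
}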
145 changes: 145 additions & 0 deletions parquet-lru/src/foyer.rs
@@ -0,0 +1,145 @@
use std::{hash::Hash, ops::Range, sync::Arc};

use bytes::Bytes;
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use parquet::{
arrow::async_reader::AsyncFileReader,
errors::{ParquetError, Result},
file::metadata::ParquetMetaData,
};
use serde::{Deserialize, Serialize};

use crate::LruCache;

#[derive(Clone)]
pub struct FoyerCache<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
{
inner: Arc<FoyerCacheInner<K>>,
}

pub struct FoyerCacheInner<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
{
meta: foyer::Cache<K, Arc<ParquetMetaData>>,
data: foyer::HybridCache<(K, Range<usize>), Bytes>,
}

impl<K> LruCache<K> for FoyerCache<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
{
type LruReader<R>
= FoyerReader<K, R>
where
R: AsyncFileReader + 'static;

async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R>
where
R: AsyncFileReader,
{
FoyerReader::new(self.clone(), key, reader)
}
}

pub struct FoyerReader<K, R>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
{
cache: FoyerCache<K>,
key: K,
reader: R,
}

impl<K, R> FoyerReader<K, R>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
R: AsyncFileReader,
{
fn new(cache: FoyerCache<K>, key: K, reader: R) -> Self {
Self { cache, key, reader }
}
}

impl<K, R> AsyncFileReader for FoyerReader<K, R>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
R: AsyncFileReader,
{
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
async move {
if let Some(data) = self
.cache
.inner
.data
.get(&(self.key.clone(), range.clone()))
.await
.map_err(|e| ParquetError::External(e.into()))?
{
Ok(data.value().clone())
} else {
let data = self.reader.get_bytes(range.clone()).await?;
self.cache
.inner
.data
.insert((self.key.clone(), range), data.clone());
Ok(data)
}
}
.boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
async move {
if let Some(meta) = self.cache.inner.meta.get(&self.key) {
Ok(meta.value().clone())
} else {
let meta = self.reader.get_metadata().await?;
self.cache.inner.meta.insert(self.key.clone(), meta.clone());
Ok(meta)
}
}
.boxed()
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
async move {
let mut missed = Vec::with_capacity(ranges.len());
let mut results = Vec::with_capacity(ranges.len());
for (id, range) in ranges.iter().enumerate() {
if let Some(data) = self
.cache
.inner
.data
.get(&(self.key.clone(), range.clone()))
.await
.map_err(|e| ParquetError::External(e.into()))?
{
results.push((id, data.value().clone()));
} else {
missed.push((id, range));
}
}
if !missed.is_empty() {
let data = self
.reader
.get_byte_ranges(missed.iter().map(|&(_, r)| r.clone()).collect())
.await?;
// Index into the fetched bytes by miss position; `id` is the original range index.
for (i, (id, range)) in missed.into_iter().enumerate() {
let data = data[i].clone();
self.cache
.inner
.data
.insert((self.key.clone(), range.clone()), data.clone());
results.push((id, data));
}
}
results.sort_by_key(|(id, _)| *id);
Ok(results.into_iter().map(|(_, data)| data).collect())
}
.boxed()
}
}
57 changes: 57 additions & 0 deletions parquet-lru/src/lib.rs
@@ -0,0 +1,57 @@
mod r#dyn;
#[cfg(feature = "foyer")]
pub mod foyer;

use std::{future::Future, marker::PhantomData};

use parquet::arrow::async_reader::AsyncFileReader;

pub use crate::r#dyn::*;

pub trait LruCache<K>
where
K: 'static,
{
type LruReader<R>: AsyncFileReader + 'static
where
R: AsyncFileReader + 'static;

fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
where
R: AsyncFileReader + 'static;
}

#[derive(Default)]
pub struct NoCache<K> {
_phantom: PhantomData<K>,
}

impl<K> Clone for NoCache<K> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData,
}
}
}

unsafe impl<K> Send for NoCache<K> {}

unsafe impl<K> Sync for NoCache<K> {}

impl<K> LruCache<K> for NoCache<K>
where
K: 'static,
{
type LruReader<R>
= R
where
R: AsyncFileReader + 'static;

#[allow(clippy::manual_async_fn)]
fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
where
R: AsyncFileReader,
{
async move { reader }
}
}
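
Whichever implementation backs LruCache, the reader it hands back is still an ordinary AsyncFileReader, so it can be fed straight into parquet's async Arrow reader. A small sketch that is generic over the cache (the u64 key, the helper name, and the row-counting loop are illustrative assumptions):

use futures_util::TryStreamExt;
use parquet::arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder};
use parquet_lru::{BoxedFileReader, LruCache};

// Hypothetical helper: scan a parquet file through whichever cache is plugged in.
// With a real cache, byte ranges and metadata are served from the cache on a hit
// and inserted after a miss; with `NoCache`, every call falls through to `file`.
async fn cached_row_count<C, R>(cache: &C, key: u64, file: R) -> parquet::errors::Result<usize>
where
    C: LruCache<u64>,
    R: AsyncFileReader + 'static,
{
    // Box the cached reader so the stream below gets a `Send + Unpin` reader.
    let reader = BoxedFileReader::new(cache.get_reader(key, file).await);
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;

    let mut rows = 0;
    while let Some(batch) = stream.try_next().await? {
        rows += batch.num_rows();
    }
    Ok(rows)
}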
16 changes: 13 additions & 3 deletions src/compaction/mod.rs
@@ -22,7 +22,7 @@ use crate::{
version::{
edit::VersionEdit, set::VersionSet, TransactionTs, Version, VersionError, MAX_LEVEL,
},
-DbOption, Schema,
+DbOption, ParquetLru, Schema,
};

#[derive(Debug)]
@@ -59,7 +59,10 @@ where
}
}

-pub(crate) async fn check_then_compaction(&mut self) -> Result<(), CompactionError<R>> {
+pub(crate) async fn check_then_compaction(
+    &mut self,
+    parquet_lru: ParquetLru,
+) -> Result<(), CompactionError<R>> {
let mut guard = self.schema.write().await;

guard.trigger.reset();
@@ -107,6 +110,7 @@ where
&mut delete_gens,
&guard.record_instance,
&self.manager,
parquet_lru,
)
.await?;
}
@@ -194,6 +198,7 @@ where
delete_gens: &mut Vec<(FileId, usize)>,
instance: &RecordInstance,
manager: &StoreManager,
parquet_lru: ParquetLru,
) -> Result<(), CompactionError<R>> {
let mut level = 0;

@@ -219,7 +224,7 @@
.await?;

streams.push(ScanStream::SsTable {
-inner: SsTable::open(file)
+inner: SsTable::open(parquet_lru.clone(), scope.gen, file)
.await?
.scan(
(Bound::Unbounded, Bound::Unbounded),
@@ -242,6 +247,7 @@
None,
ProjectionMask::all(),
level_fs.clone(),
parquet_lru.clone(),
)
.ok_or(CompactionError::EmptyLevel)?;

@@ -262,6 +268,7 @@
None,
ProjectionMask::all(),
level_fs.clone(),
parquet_lru.clone(),
)
.ok_or(CompactionError::EmptyLevel)?;

@@ -512,6 +519,7 @@ pub(crate) mod tests {
use fusio_dispatch::FsOptions;
use fusio_parquet::writer::AsyncWriter;
use parquet::arrow::AsyncArrowWriter;
use parquet_lru::NoCache;
use tempfile::TempDir;

use crate::{
@@ -809,6 +817,7 @@
&mut vec![],
&RecordInstance::Normal,
&manager,
Arc::new(NoCache::default()),
)
.await
.unwrap();
@@ -1200,6 +1209,7 @@
&mut vec![],
&RecordInstance::Normal,
&manager,
Arc::new(NoCache::default()),
)
.await
.unwrap();
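
The compaction changes above only thread ParquetLru through; the alias itself is defined elsewhere in this commit. Judging from how it is cloned per SsTable::open and seeded with Arc::new(NoCache::default()) in the tests, it is presumably a shared trait object along these lines (a sketch under that assumption, not the exact definition):

use std::sync::Arc;

use parquet_lru::DynLruCache;

// Assumption: SSTable file ids are ULIDs, which would also explain the `serde`
// feature newly enabled on `ulid` in Cargo.toml above (foyer hybrid-cache keys
// must implement Serialize/Deserialize).
type FileId = ulid::Ulid;

// Assumed shape of the alias: one shared, type-erased cache that every
// `SsTable::open` call can consult, with `NoCache` as the no-op default.
pub type ParquetLru = Arc<dyn DynLruCache<FileId> + Send + Sync>;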