Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Nov 19, 2024
1 parent 11e4cfd commit fc41760
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ parquet = { version = "53", default-features = false, features = [
"lz4",
"snap",
] }
parquet-lru = { version = "0.1.0", path = "parquet-lru" }
parquet-lru = { version = "0.2.0", path = "parquet-lru" }
pin-project-lite = "0.2"
regex = "1"
thiserror = "2.0.3"
Expand Down
7 changes: 5 additions & 2 deletions parquet-lru/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
[package]
description = "Implement LRU cache for parquet::arrow::async_reader::AsyncFileReader."
description = "Implement LRU cache reader for parquet::arrow::async_reader::AsyncFileReader."
documentation = "https://docs.rs/parquet-lru"
edition = "2021"
license = "Apache-2.0"
name = "parquet-lru"
version = "0.1.0"
version = "0.2.0"

[package.metadata.docs.rs]
all-features = true

[features]
default = []
Expand Down
12 changes: 6 additions & 6 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where

pub(crate) async fn check_then_compaction(
&mut self,
parquet_lru_cache: ParquetLru,
parquet_lru: ParquetLru,
) -> Result<(), CompactionError<R>> {
let mut guard = self.schema.write().await;

Expand Down Expand Up @@ -110,7 +110,7 @@ where
&mut delete_gens,
&guard.record_instance,
&self.manager,
parquet_lru_cache,
parquet_lru,
)
.await?;
}
Expand Down Expand Up @@ -198,7 +198,7 @@ where
delete_gens: &mut Vec<(FileId, usize)>,
instance: &RecordInstance,
manager: &StoreManager,
parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
) -> Result<(), CompactionError<R>> {
let mut level = 0;

Expand All @@ -224,7 +224,7 @@ where
.await?;

streams.push(ScanStream::SsTable {
inner: SsTable::open(parquet_cache.clone(), scope.gen, file)
inner: SsTable::open(parquet_lru.clone(), scope.gen, file)
.await?
.scan(
(Bound::Unbounded, Bound::Unbounded),
Expand All @@ -247,7 +247,7 @@ where
None,
ProjectionMask::all(),
level_fs.clone(),
parquet_cache.clone(),
parquet_lru.clone(),
)
.ok_or(CompactionError::EmptyLevel)?;

Expand All @@ -268,7 +268,7 @@ where
None,
ProjectionMask::all(),
level_fs.clone(),
parquet_cache.clone(),
parquet_lru.clone(),
)
.ok_or(CompactionError::EmptyLevel)?;

Expand Down
26 changes: 13 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ where
version_set: VersionSet<R>,
lock_map: LockMap<R::Key>,
manager: Arc<StoreManager>,
parquet_lru_cache: ParquetLru,
parquet_lru: ParquetLru,
_p: PhantomData<E>,
}

Expand Down Expand Up @@ -309,7 +309,7 @@ where
version_set,
lock_map: Arc::new(Default::default()),
manager,
parquet_lru_cache: lru_cache,
parquet_lru: lru_cache,
_p: Default::default(),
})
}
Expand All @@ -324,7 +324,7 @@ where
self.schema.read().await,
self.version_set.current().await,
self.manager.clone(),
self.parquet_lru_cache.clone(),
self.parquet_lru.clone(),
)
}

Expand Down Expand Up @@ -381,7 +381,7 @@ where
key,
self.version_set.load_ts(),
Projection::All,
self.parquet_lru_cache.clone(),
self.parquet_lru.clone(),
)
.await?
.and_then(|entry| {
Expand Down Expand Up @@ -409,7 +409,7 @@ where
self.version_set.load_ts(),
&*current,
Box::new(|_| None),
self.parquet_lru_cache.clone(),
self.parquet_lru.clone(),
).take().await?;

while let Some(record) = scan.next().await {
Expand Down Expand Up @@ -596,7 +596,7 @@ where
key: &'get R::Key,
ts: Timestamp,
projection: Projection,
parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
) -> Result<Option<Entry<'get, R>>, DbError<R>> {
if let Some(entry) = self.mutable.get(key, ts) {
return Ok(Some(Entry::Mutable(entry)));
Expand Down Expand Up @@ -630,7 +630,7 @@ where
manager,
TimestampedRef::new(key, ts),
projection,
parquet_cache,
parquet_lru,
)
.await?
.map(|entry| Entry::RecordBatch(entry)))
Expand Down Expand Up @@ -671,7 +671,7 @@ where
projection_indices: Option<Vec<usize>>,
projection: ProjectionMask,

parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
}

impl<'scan, 'range, R> Scan<'scan, 'range, R>
Expand All @@ -687,7 +687,7 @@ where
fn_pre_stream: Box<
dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan,
>,
parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
) -> Self {
Self {
schema,
Expand All @@ -700,7 +700,7 @@ where
limit: None,
projection_indices: None,
projection: ProjectionMask::all(),
parquet_cache,
parquet_lru,
}
}

Expand Down Expand Up @@ -776,7 +776,7 @@ where
self.ts,
self.limit,
self.projection,
self.parquet_cache,
self.parquet_lru,
)
.await?;

Expand Down Expand Up @@ -829,7 +829,7 @@ where
self.ts,
self.limit,
self.projection,
self.parquet_cache,
self.parquet_lru,
)
.await?;
let merge_stream = MergeStream::from_vec(streams, self.ts).await?;
Expand Down Expand Up @@ -1350,7 +1350,7 @@ pub(crate) mod tests {
version_set,
lock_map: Arc::new(Default::default()),
manager,
parquet_lru_cache: Arc::new(NoCache::default()),
parquet_lru: Arc::new(NoCache::default()),
_p: Default::default(),
})
}
Expand Down
12 changes: 6 additions & 6 deletions src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ where
share: RwLockReadGuard<'s, Schema<R>>,
version: VersionRef<R>,
manager: Arc<StoreManager>,
parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
}

impl<'s, R> Snapshot<'s, R>
Expand All @@ -41,7 +41,7 @@ where
key,
self.ts,
projection,
self.parquet_cache.clone(),
self.parquet_lru.clone(),
)
.await?
.and_then(|entry| {
Expand All @@ -64,22 +64,22 @@ where
self.ts,
&self.version,
Box::new(move |_: Option<ProjectionMask>| None),
self.parquet_cache.clone(),
self.parquet_lru.clone(),
)
}

pub(crate) fn new(
share: RwLockReadGuard<'s, Schema<R>>,
version: VersionRef<R>,
manager: Arc<StoreManager>,
parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
) -> Self {
Self {
ts: version.load_ts(),
share,
version,
manager,
parquet_cache,
parquet_lru,
}
}

Expand Down Expand Up @@ -109,7 +109,7 @@ where
self.ts,
&self.version,
fn_pre_stream,
self.parquet_cache.clone(),
self.parquet_lru.clone(),
)
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/stream/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
status: FutureStatus<'level, R>,
fs: Arc<dyn DynFs>,
path: Option<Path>,
parquet_cache: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
parquet_lru: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
}

impl<'level, R> LevelStream<'level, R>
Expand All @@ -77,7 +77,7 @@ where
limit: Option<usize>,
projection_mask: ProjectionMask,
fs: Arc<dyn DynFs>,
parquet_cache: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
parquet_lru: Arc<dyn DynLruCache<Ulid> + Send + Sync>,
) -> Option<Self> {
let (lower, upper) = range;
let mut gens: VecDeque<FileId> = version.level_slice[level][start..end + 1]
Expand All @@ -99,7 +99,7 @@ where
status,
fs,
path: None,
parquet_cache,
parquet_lru,
})
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ where
Poll::Ready(Ok(file)) => {
let id = *id;
self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(
self.parquet_cache.clone(),
self.parquet_lru.clone(),
id,
file,
)));
Expand Down
16 changes: 8 additions & 8 deletions src/version/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ where
manager: &StoreManager,
key: &TimestampedRef<R::Key>,
projection_mask: ProjectionMask,
parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
let level_0_path = self
.option
Expand All @@ -139,7 +139,7 @@ where
0,
scope.gen,
projection_mask.clone(),
parquet_cache.clone(),
parquet_lru.clone(),
)
.await?
{
Expand Down Expand Up @@ -167,7 +167,7 @@ where
leve,
sort_runs[index].gen,
projection_mask.clone(),
parquet_cache.clone(),
parquet_lru.clone(),
)
.await?
{
Expand All @@ -185,7 +185,7 @@ where
level: usize,
gen: FileId,
projection_mask: ProjectionMask,
parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
let file = store
.open_options(
Expand All @@ -194,7 +194,7 @@ where
)
.await
.map_err(VersionError::Fusio)?;
SsTable::<R>::open(parquet_cache, gen, file)
SsTable::<R>::open(parquet_lru, gen, file)
.await?
.get(key, projection_mask)
.await
Expand All @@ -220,7 +220,7 @@ where
ts: Timestamp,
limit: Option<usize>,
projection_mask: ProjectionMask,
parquet_cache: ParquetLru,
parquet_lru: ParquetLru,
) -> Result<(), VersionError<R>> {
let level_0_path = self
.option
Expand All @@ -238,7 +238,7 @@ where
)
.await
.map_err(VersionError::Fusio)?;
let table = SsTable::open(parquet_cache.clone(), scope.gen, file).await?;
let table = SsTable::open(parquet_lru.clone(), scope.gen, file).await?;

streams.push(ScanStream::SsTable {
inner: table
Expand Down Expand Up @@ -283,7 +283,7 @@ where
limit,
projection_mask.clone(),
level_fs.clone(),
parquet_cache.clone(),
parquet_lru.clone(),
)
.unwrap(),
});
Expand Down

0 comments on commit fc41760

Please sign in to comment.