Skip to content

Commit

Permalink
Incremental view maintenance (datafusion-contrib#3)
Browse files Browse the repository at this point in the history
* port MV dependency code

* improved documentation

* fix spelling mistake

* fix typo

* don't forget license header

* readme

* explain what UDTF means

* fix typo in readme
  • Loading branch information
suremarc authored Dec 27, 2024
1 parent b7974e8 commit 14a3518
Show file tree
Hide file tree
Showing 9 changed files with 1,795 additions and 93 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ datafusion-common = "43"
datafusion-expr = "43"
datafusion-functions = "43"
datafusion-functions-aggregate = "43"
datafusion-optimizer = "43"
datafusion-physical-expr = "43"
datafusion-physical-plan = "43"
datafusion-sql = "43"
Expand Down
55 changes: 54 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,54 @@
# datafusion-materialized-views
# datafusion-materialized-views

An implementation of incremental view maintenance & query rewriting for materialized views in DataFusion.

A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads by pre-computing at least a large fragment of a user-provided query. Furthermore, by implementing a _view matching_ algorithm, we can implement an optimizer that rewrites queries to automatically make use of materialized views where possible and beneficial, a concept known as *query rewriting*.

Efficiently maintaining the up-to-dateness of a materialized view is a problem known as *incremental view maintenance*. It is a hard problem in general, but we make some simplifying assumptions:

* Data is stored as Hive-partitioned files in object storage.
* The smallest unit of data that can be updated is a single file.

This is a typical pattern with DataFusion, as files in object storage usually are immutable (especially if they are Parquet) and can only be replaced, not appended to or modified. However, it does mean that our implementation of incremental view maintenance only works for Hive-partitioned materialized views in object storage. (Future work may generalize this to alternate storage sources, but the requirement of logically partitioned tables remains.) In contrast, the view matching problem does not depend on the underlying physical representation of the tables.

## Example

Here we walk through a hypothetical example of setting up a materialized view, to illustrate
what this library offers. The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function),
called `file_dependencies`, that outputs a build graph for a materialized view. This gives users the information they need to determine
when partitions of the materialized view need to be recomputed.

```sql
-- Create a base table
CREATE EXTERNAL TABLE t1 (column0 TEXT, date DATE)
STORED AS PARQUET
PARTITIONED BY (date)
LOCATION 's3://t1/';

INSERT INTO t1 VALUES
('a', '2021-01-01'),
('b', '2022-02-02'),
('c', '2022-02-03'), -- Two values in the year 2022
('d', '2023-03-03');

-- Pretend we can create materialized views in SQL
-- The TableProvider implementation will need to implement the Materialized trait.
CREATE MATERIALIZED VIEW m1 AS SELECT
COUNT(*) AS count,
date_part('YEAR', date) AS year
PARTITIONED BY (year)
LOCATION 's3://m1/';

-- Show the dependency graph for m1 using the file_dependencies UDTF
SELECT * FROM file_dependencies('m1');

+--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |
+--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
| s3://m1/year=2021/ | datafusion | public | t1 | s3://t1/date=2021-01-01/data.parquet | 2023-07-11T16:29:26 |
| s3://m1/year=2022/ | datafusion | public | t1 | s3://t1/date=2022-02-02/data.parquet | 2023-07-11T16:45:22 |
| s3://m1/year=2022/ | datafusion | public | t1 | s3://t1/date=2022-02-03/data.parquet | 2023-07-11T16:45:44 |
| s3://m1/year=2023/ | datafusion | public | t1 | s3://t1/date=2023-03-03/data.parquet | 2023-07-11T16:45:44 |
+--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
```

19 changes: 14 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@
/// By analyzing the fragment of the materialized view query pertaining to the partition columns,
/// we can derive a build graph that relates the files of a materialized views and the files of the tables it depends on.
///
/// A central trait is defined for Hive-partitioned tables, [`ListingTableLike`](materialized::ListingTableLike). Note that
/// all implementations of [`ListingTableLike`](materialized::ListingTableLike) must be registered using the
/// [`register_listing_table`](materialized::register_listing_table) function, otherwise the tables may not be detected by
/// the incremental view maintenance code, including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata)
/// or [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry).
/// Two central traits are defined:
///
/// * [`ListingTableLike`](materialized::ListingTableLike): a trait that abstracts Hive-partitioned tables in object storage;
/// * [`Materialized`](materialized::Materialized): a materialized `ListingTableLike` defined by a user-provided query.
///
/// Note that all implementations of `ListingTableLike` and `Materialized` must be registered using the
/// [`register_listing_table`](materialized::register_listing_table) and
/// [`register_materialized`](materialized::register_materialized) functions respectively,
/// otherwise the tables may not be detected by the incremental view maintenance code,
/// including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata),
/// [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry), or the
/// [`file_dependencies`](materialized::dependencies::file_dependencies) UDTF.
///
/// By default, `ListingTableLike` is implemented for [`ListingTable`](datafusion::datasource::listing::ListingTable),
pub mod materialized;

/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
Expand Down
53 changes: 47 additions & 6 deletions src/materialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

/// Track dependencies of materialized data in object storage
mod dependencies;
pub mod dependencies;

/// Pluggable metadata sources for incremental view maintenance
pub mod row_metadata;
Expand All @@ -27,6 +27,9 @@ pub mod file_metadata;
/// A UDF that parses Hive partition elements from object storage paths.
mod hive_partition;

/// Some private utility functions
mod util;

use std::{
any::{type_name, Any, TypeId},
fmt::Debug,
Expand Down Expand Up @@ -78,7 +81,7 @@ impl ListingTableLike for ListingTable {

/// Register a [`ListingTableLike`] implementation in this registry.
/// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
/// into a [`ListingTableLike`] where possible.
/// into a `ListingTableLike` where possible.
pub fn register_listing_table<T: ListingTableLike>() {
TABLE_TYPE_REGISTRY.register_listing_table::<T>();
}
Expand All @@ -95,15 +98,31 @@ pub trait Materialized: ListingTableLike {
fn query(&self) -> LogicalPlan;
}

/// Register a [`Materialized`] implementation in this registry.
/// This allows `cast_to_materialized` to easily downcast a [`TableProvider`]
/// into a `Materialized` where possible.
///
/// Note that this will also register `T` as a [`ListingTableLike`].
pub fn register_materialized<T: Materialized>() {
TABLE_TYPE_REGISTRY.register_materialized::<T>();
}

/// Attempt to cast the given TableProvider into a [`Materialized`].
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
TABLE_TYPE_REGISTRY.cast_to_materialized(table)
}

type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;

/// A registry for implementations of [`ListingTableLike`], used for downcasting
/// arbitrary TableProviders into `dyn ListingTableLike` where possible.
/// A registry for implementations of library-defined traits, used for downcasting
/// arbitrary TableProviders into `ListingTableLike` and `Materialized` trait objects where possible.
///
/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike`.
/// By default, [`ListingTable`] is registered.
/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike` and `Materialized`.
/// By default, [`ListingTable`] is registered as a `ListingTableLike`.
struct TableTypeRegistry {
listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
}

impl Debug for TableTypeRegistry {
Expand All @@ -125,6 +144,7 @@ impl Default for TableTypeRegistry {
fn default() -> Self {
let new = Self {
listing_table_accessors: DashMap::new(),
materialized_accessors: DashMap::new(),
};
new.register_listing_table::<ListingTable>();

Expand All @@ -143,6 +163,18 @@ impl TableTypeRegistry {
);
}

fn register_materialized<T: Materialized>(&self) {
self.materialized_accessors.insert(
TypeId::of::<T>(),
(
type_name::<T>(),
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
),
);

self.register_listing_table::<T>();
}

fn cast_to_listing_table<'a>(
&'a self,
table: &'a dyn TableProvider,
Expand All @@ -151,4 +183,13 @@ impl TableTypeRegistry {
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}

fn cast_to_materialized<'a>(
&'a self,
table: &'a dyn TableProvider,
) -> Option<&'a dyn Materialized> {
self.materialized_accessors
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}
}
Loading

0 comments on commit 14a3518

Please sign in to comment.