From 14a35187acb25bbcb2240ebe72b4ee3b61e52f89 Mon Sep 17 00:00:00 2001
From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com>
Date: Fri, 27 Dec 2024 11:19:39 -0600
Subject: [PATCH] Incremental view maintenance (#3)

* port MV dependency code
* improved documentation
* fix spelling mistake
* fix typo
* don't forget license header
* readme
* explain what UDTF means
* fix typo in readme
---
 Cargo.toml                         |    1 +
 README.md                          |   55 +-
 src/lib.rs                         |   19 +-
 src/materialized.rs                |   53 +-
 src/materialized/dependencies.rs   | 1511 ++++++++++++++++++++++++++++
 src/materialized/file_metadata.rs  |   78 +-
 src/materialized/hive_partition.rs |    2 +-
 src/materialized/row_metadata.rs   |  128 ++-
 src/materialized/util.rs           |   41 +
 9 files changed, 1795 insertions(+), 93 deletions(-)
 create mode 100644 src/materialized/util.rs

diff --git a/Cargo.toml b/Cargo.toml
index ff06e56..00ad412 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,7 @@ datafusion-common = "43"
 datafusion-expr = "43"
 datafusion-functions = "43"
 datafusion-functions-aggregate = "43"
+datafusion-optimizer = "43"
 datafusion-physical-expr = "43"
 datafusion-physical-plan = "43"
 datafusion-sql = "43"
diff --git a/README.md b/README.md
index 50caf96..aaf4512 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,54 @@
-# datafusion-materialized-views
\ No newline at end of file
+# datafusion-materialized-views
+
+An implementation of incremental view maintenance & query rewriting for materialized views in DataFusion.
+
+A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads, since at least a large fragment of a user-provided query can be answered from the pre-computed result. Furthermore, by implementing a _view matching_ algorithm, we can build an optimizer that automatically rewrites queries to make use of materialized views where possible and beneficial, a concept known as *query rewriting*.
+
+Efficiently keeping a materialized view up to date is a problem known as *incremental view maintenance*. It is a hard problem in general, but we make some simplifying assumptions:
+
+* Data is stored as Hive-partitioned files in object storage.
+* The smallest unit of data that can be updated is a single file.
+
+This is a typical pattern with DataFusion, as files in object storage are usually immutable (especially if they are Parquet) and can only be replaced, not appended to or modified. However, it does mean that our implementation of incremental view maintenance only works for Hive-partitioned materialized views in object storage. (Future work may generalize this to alternate storage sources, but the requirement of logically partitioned tables remains.) In contrast, the view matching problem does not depend on the underlying physical representation of the tables.
+
+## Example
+
+Here we walk through a hypothetical example of setting up a materialized view, to illustrate
+what this library offers. The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function),
+called `file_dependencies`, that outputs a build graph for a materialized view. This gives users the information they need to determine
+when partitions of the materialized view need to be recomputed.
+
+```sql
+-- Create a base table
+CREATE EXTERNAL TABLE t1 (column0 TEXT, date DATE)
+STORED AS PARQUET
+PARTITIONED BY (date)
+LOCATION 's3://t1/';
+
+INSERT INTO t1 VALUES
+('a', '2021-01-01'),
+('b', '2022-02-02'),
+('c', '2022-02-03'), -- Two values in the year 2022
+('d', '2023-03-03');
+
+-- Pretend we can create materialized views in SQL
+-- The TableProvider implementation will need to implement the Materialized trait.
+CREATE MATERIALIZED VIEW m1 AS SELECT
+    COUNT(*) AS count,
+    date_part('YEAR', date) AS year
+FROM t1
+GROUP BY year
+PARTITIONED BY (year)
+LOCATION 's3://m1/';
+
+-- Show the dependency graph for m1 using the file_dependencies UDTF
+SELECT * FROM file_dependencies('m1');
+
++--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+| target             | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |
++--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+| s3://m1/year=2021/ | datafusion           | public              | t1                | s3://t1/date=2021-01-01/data.parquet | 2023-07-11T16:29:26  |
+| s3://m1/year=2022/ | datafusion           | public              | t1                | s3://t1/date=2022-02-02/data.parquet | 2023-07-11T16:45:22  |
+| s3://m1/year=2022/ | datafusion           | public              | t1                | s3://t1/date=2022-02-03/data.parquet | 2023-07-11T16:45:44  |
+| s3://m1/year=2023/ | datafusion           | public              | t1                | s3://t1/date=2023-03-03/data.parquet | 2023-07-11T16:45:44  |
++--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+```
+
diff --git a/src/lib.rs b/src/lib.rs
index 766c5ef..e8f2c77 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,11 +25,20 @@
 /// By analyzing the fragment of the materialized view query pertaining to the partition columns,
 /// we can derive a build graph that relates the files of a materialized views and the files of the tables it depends on.
 ///
-/// A central trait is defined for Hive-partitioned tables, [`ListingTableLike`](materialized::ListingTableLike). Note that
-/// all implementations of [`ListingTableLike`](materialized::ListingTableLike) must be registered using the
-/// [`register_listing_table`](materialized::register_listing_table) function, otherwise the tables may not be detected by
-/// the incremental view maintenance code, including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata)
-/// or [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry).
+/// Two central traits are defined:
+///
+/// * [`ListingTableLike`](materialized::ListingTableLike): a trait that abstracts Hive-partitioned tables in object storage;
+/// * [`Materialized`](materialized::Materialized): a materialized `ListingTableLike` defined by a user-provided query.
+///
+/// Note that all implementations of `ListingTableLike` and `Materialized` must be registered using the
+/// [`register_listing_table`](materialized::register_listing_table) and
+/// [`register_materialized`](materialized::register_materialized) functions respectively,
+/// otherwise the tables may not be detected by the incremental view maintenance code,
+/// including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata),
+/// [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry), or the
+/// [`file_dependencies`](materialized::dependencies::file_dependencies) UDTF.
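+///
+/// For example, a custom materialized view type might be registered roughly like so
+/// (a sketch; `MyMaterializedView` is a hypothetical type implementing both traits):
+///
+/// ```ignore
+/// // Registering a `Materialized` implementation also registers it as a `ListingTableLike`.
+/// register_materialized::<MyMaterializedView>();
+/// ```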
+///
+/// By default, `ListingTableLike` is implemented for [`ListingTable`](datafusion::datasource::listing::ListingTable).
 pub mod materialized;
 
 /// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
diff --git a/src/materialized.rs b/src/materialized.rs
index 0151920..10c44d5 100644
--- a/src/materialized.rs
+++ b/src/materialized.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 /// Track dependencies of materialized data in object storage
-mod dependencies;
+pub mod dependencies;
 
 /// Pluggable metadata sources for incremental view maintenance
 pub mod row_metadata;
@@ -27,6 +27,9 @@ pub mod file_metadata;
 /// A UDF that parses Hive partition elements from object storage paths.
 mod hive_partition;
 
+/// Some private utility functions
+mod util;
+
 use std::{
     any::{type_name, Any, TypeId},
     fmt::Debug,
@@ -78,7 +81,7 @@ impl ListingTableLike for ListingTable {
 
 /// Register a [`ListingTableLike`] implementation in this registry.
 /// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
-/// into a [`ListingTableLike`] where possible.
+/// into a `ListingTableLike` where possible.
 pub fn register_listing_table<T: ListingTableLike>() {
     TABLE_TYPE_REGISTRY.register_listing_table::<T>();
 }
@@ -95,15 +98,31 @@ pub trait Materialized: ListingTableLike {
     fn query(&self) -> LogicalPlan;
 }
 
+/// Register a [`Materialized`] implementation in this registry.
+/// This allows `cast_to_materialized` to easily downcast a [`TableProvider`]
+/// into a `Materialized` where possible.
+///
+/// Note that this will also register `T` as a [`ListingTableLike`].
+pub fn register_materialized<T: Materialized>() {
+    TABLE_TYPE_REGISTRY.register_materialized::<T>();
+}
+
+/// Attempt to cast the given TableProvider into a [`Materialized`].
+/// If the table's type has not been registered using [`register_materialized`], this will return `None`.
+pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
+    TABLE_TYPE_REGISTRY.cast_to_materialized(table)
+}
+
 type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
 
-/// A registry for implementations of [`ListingTableLike`], used for downcasting
-/// arbitrary TableProviders into `dyn ListingTableLike` where possible.
+/// A registry for implementations of library-defined traits, used for downcasting
+/// arbitrary TableProviders into `ListingTableLike` and `Materialized` trait objects where possible.
 ///
-/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike`.
-/// By default, [`ListingTable`] is registered.
+/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike` and `Materialized`.
+/// By default, [`ListingTable`] is registered as a `ListingTableLike`.
 struct TableTypeRegistry {
     listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
+    materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
 }
 
 impl Debug for TableTypeRegistry {
@@ -125,6 +144,7 @@ impl Default for TableTypeRegistry {
     fn default() -> Self {
         let new = Self {
             listing_table_accessors: DashMap::new(),
+            materialized_accessors: DashMap::new(),
         };
 
         new.register_listing_table::<ListingTable>();
@@ -143,6 +163,18 @@ impl TableTypeRegistry {
         );
     }
 
+    fn register_materialized<T: Materialized>(&self) {
+        self.materialized_accessors.insert(
+            TypeId::of::<T>(),
+            (
+                type_name::<T>(),
+                Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
+            ),
+        );
+
+        self.register_listing_table::<T>();
+    }
+
     fn cast_to_listing_table<'a>(
         &'a self,
         table: &'a dyn TableProvider,
@@ -151,4 +183,13 @@ impl TableTypeRegistry {
             .get(&table.as_any().type_id())
             .and_then(|r| r.value().1(table.as_any()))
     }
+
+    fn cast_to_materialized<'a>(
+        &'a self,
+        table: &'a dyn TableProvider,
+    ) -> Option<&'a dyn Materialized> {
+        self.materialized_accessors
+            .get(&table.as_any().type_id())
+            .and_then(|r| r.value().1(table.as_any()))
+    }
 }
diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs
index b248758..e0baebd 100644
--- a/src/materialized/dependencies.rs
+++ b/src/materialized/dependencies.rs
@@ -14,3 +14,1514 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
+use datafusion::{
+    catalog::CatalogProviderList,
+    config::{CatalogOptions, ConfigOptions},
+    datasource::{function::TableFunctionImpl, TableProvider, ViewTable},
+    prelude::{flatten, get_field, make_array},
+};
+use datafusion_common::{
+    alias::AliasGenerator,
+    internal_err,
+    tree_node::{Transformed, TreeNode},
+    DFSchema, DataFusionError, Result, ScalarValue,
+};
+use datafusion_expr::{col, lit, utils::split_conjunction, Expr, LogicalPlan, TableScan};
+use datafusion_functions::string::expr_fn::{concat, concat_ws};
+use datafusion_optimizer::{analyzer::expand_wildcard_rule::ExpandWildcardRule, AnalyzerRule};
+use datafusion_sql::TableReference;
+use itertools::{Either, Itertools};
+use std::{collections::HashSet, sync::Arc};
+
+use crate::materialized::META_COLUMN;
+
+use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Materialized};
+
+/// A table function that shows build targets and dependencies for a materialized view:
+///
+/// ```ignore
+/// fn file_dependencies(table_ref: Utf8) -> Table
+/// ```
+///
+/// `table_ref` should point to a table provider registered for the current session
+/// that implements [`Materialized`]. Otherwise the function will throw an error.
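+///
+/// The UDTF might be registered against a `SessionContext` roughly as follows (a sketch;
+/// `ctx` and `row_metadata_registry` are assumed to have been initialized by the caller,
+/// mirroring the test setup further below):
+///
+/// ```ignore
+/// ctx.register_udtf(
+///     "file_dependencies",
+///     file_dependencies(
+///         ctx.state().catalog_list().clone(),
+///         row_metadata_registry.clone(),
+///         ctx.copied_config().options(),
+///     ),
+/// );
+/// ```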
+///
+/// # Example
+///
+/// ```sql
+/// SELECT * FROM file_dependencies('datafusion.public.m1');
+///
+/// +--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+/// | target                         | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |
+/// +--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+/// | s3://m1/partition_column=2021/ | datafusion           | public              | t1                | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26  |
+/// | s3://m1/partition_column=2022/ | datafusion           | public              | t1                | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22  |
+/// | s3://m1/partition_column=2023/ | datafusion           | public              | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |
+/// +--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+/// ```
+pub fn file_dependencies(
+    catalog_list: Arc<dyn CatalogProviderList>,
+    row_metadata_registry: Arc<RowMetadataRegistry>,
+    options: &ConfigOptions,
+) -> Arc<dyn TableFunctionImpl> {
+    Arc::new(FileDependenciesUdtf::new(
+        catalog_list,
+        row_metadata_registry,
+        options,
+    ))
+}
+
+#[derive(Debug)]
+struct FileDependenciesUdtf {
+    catalog_list: Arc<dyn CatalogProviderList>,
+    config_options: ConfigOptions,
+    row_metadata_registry: Arc<RowMetadataRegistry>,
+}
+
+impl FileDependenciesUdtf {
+    fn new(
+        catalog_list: Arc<dyn CatalogProviderList>,
+        row_metadata_registry: Arc<RowMetadataRegistry>,
+        options: &ConfigOptions,
+    ) -> Self {
+        Self {
+            catalog_list,
+            config_options: options.clone(),
+            row_metadata_registry,
+        }
+    }
+}
+
+impl TableFunctionImpl for FileDependenciesUdtf {
+    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        let table_name = get_table_name(args)?;
+
+        let table_ref = TableReference::from(table_name).resolve(
+            &self.config_options.catalog.default_catalog,
+            &self.config_options.catalog.default_schema,
+        );
+
+        let table = util::get_table(self.catalog_list.as_ref(), &table_ref)
+            .map_err(|e| DataFusionError::Plan(e.to_string()))?;
+
+        let mv = cast_to_materialized(table.as_ref()).ok_or_else(|| DataFusionError::Plan(format!(
+            "file_dependencies: table '{table_name}' is not a materialized view. (Materialized TableProviders must be registered using register_materialized)"
+        )))?;
+
+        Ok(Arc::new(ViewTable::try_new(
+            file_dependencies_plan(mv, self.row_metadata_registry.clone(), &self.config_options)?,
+            None,
+        )?))
+    }
+}
+
+/// Extract table name from args passed to TableFunctionImpl::call()
+fn get_table_name(args: &[Expr]) -> Result<&String> {
+    match &args[0] {
+        Expr::Literal(ScalarValue::Utf8(Some(table_name))) => Ok(table_name),
+        _ => Err(DataFusionError::Plan(
+            "expected a single string literal argument to file_dependencies".to_string(),
+        )),
+    }
+}
+
+/// Returns a logical plan that, when executed, lists expected build targets
+/// for this materialized view, together with the dependencies for each target.
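+///
+/// The plan produces one row per (build target, source file) pair; as projected below, its
+/// output columns are:
+///
+/// ```ignore
+/// target, source_table_catalog, source_table_schema, source_table_name, source_uri, source_last_modified
+/// ```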
+pub fn file_dependencies_plan( + materialized_view: &dyn Materialized, + row_metadata_registry: Arc, + config_options: &ConfigOptions, +) -> Result { + use datafusion_expr::logical_plan::*; + + let plan = materialized_view.query().clone(); + + let partition_cols = materialized_view.partition_columns(); + let partition_col_indices = plan + .schema() + .fields() + .iter() + .enumerate() + .filter_map(|(i, f)| partition_cols.contains(f.name()).then_some(i)) + .collect(); + + // First expand all wildcards + let plan = ExpandWildcardRule {}.analyze(plan, config_options)?; + + // Prune non-partition columns from all table scans + let pruned_plan = pushdown_projection_inexact(plan, &partition_col_indices)?; + + // Now bubble up file metadata to the top of the plan + let pruned_plan_with_source_files = + push_up_file_metadata(pruned_plan, &config_options.catalog, row_metadata_registry)?; + + // We now have data in the following form: + // (partition_col0, partition_col1, ..., __meta) + // The last column is a list of structs containing the row metadata + // We need to unnest it + + // Find the single column with the name '__meta' + let files = pruned_plan_with_source_files + .schema() + .columns() + .into_iter() + .find(|c| c.name.starts_with(META_COLUMN)) + .ok_or_else(|| DataFusionError::Plan(format!("Plan contains no {META_COLUMN} column")))?; + let files_col = Expr::Column(files.clone()); + + LogicalPlanBuilder::from(pruned_plan_with_source_files) + .unnest_column(files)? + .project(vec![ + construct_target_path_from_partition_columns(materialized_view).alias("target"), + get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"), + get_field(files_col.clone(), "table_schema").alias("source_table_schema"), + get_field(files_col.clone(), "table_name").alias("source_table_name"), + get_field(files_col.clone(), "source_uri").alias("source_uri"), + get_field(files_col.clone(), "last_modified").alias("source_last_modified"), + ])? + .distinct()? + .build() +} + +fn construct_target_path_from_partition_columns(materialized_view: &dyn Materialized) -> Expr { + let table_path = lit(materialized_view.table_paths()[0] + .as_str() + // Trim the / (we'll add it back later if we need it) + .trim_end_matches("/")); + // Construct the paths for the build targets + let mut hive_column_path_elements = materialized_view + .partition_columns() + .iter() + .map(|column_name| concat([lit(column_name.as_str()), lit("="), col(column_name)].to_vec())) + .collect::>(); + hive_column_path_elements.insert(0, table_path); + + concat(vec![ + // concat_ws doesn't work if there are < 2 elements to concat + if hive_column_path_elements.len() == 1 { + hive_column_path_elements.pop().unwrap() + } else { + concat_ws(lit("/"), hive_column_path_elements) + }, + // Always need a trailing slash on directory paths + lit("/"), + ]) +} + +/// An implementation of "inexact" projection pushdown that eliminates aggregations, windows, sorts, & limits. +/// Does not preserve order or row multiplicity and may return rows outside of the original projection. +/// However, it has the following property: +/// Let P be a projection operator. +/// If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'. +/// +/// The purpose is to be as aggressive as possible with projection pushdown at the sacrifice of exactness. 
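+///
+/// For instance (mirroring the "simple projection" test case below), pruning
+/// `SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1`
+/// down to just `partition_column` yields roughly:
+///
+/// ```ignore
+/// Projection: t1.column1 AS partition_column
+///   TableScan: t1 projection=[column1]
+/// ```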
+fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet) -> Result { + use datafusion_expr::logical_plan::*; + + let plan_formatted = format!("{}", plan.display()); + match plan { + LogicalPlan::Projection(Projection { expr, input, .. }) => { + let new_exprs = expr + .into_iter() + .enumerate() + .filter_map(|(i, expr)| indices.contains(&i).then_some(expr)) + .collect_vec(); + + let child_indices = new_exprs + .iter() + .flat_map(|e| e.column_refs().into_iter()) + .map(|c| input.schema().index_of_column(c).unwrap()) + .collect::>(); + + Projection::try_new( + new_exprs, + pushdown_projection_inexact(Arc::unwrap_or_clone(input), &child_indices) + .map(Arc::new)?, + ) + .map(LogicalPlan::Projection) + } + LogicalPlan::Filter(ref filter) => { + let mut indices = indices.clone(); + + let new_filter = widen_filter(&filter.predicate, &mut indices, &plan)?; + + let filter = match plan { + LogicalPlan::Filter(filter) => filter, + _ => unreachable!(), + }; + + Filter::try_new( + new_filter, + pushdown_projection_inexact(Arc::unwrap_or_clone(filter.input), &indices) + .map(Arc::new)?, + ) + .map(LogicalPlan::Filter) + } + LogicalPlan::Window(Window { + input, + window_expr: _, + .. + }) => { + // Window nodes take their input and append window expressions to the end. + // If our projection doesn't include window expressions, we can just turn + // the window into a regular projection. + let num_non_window_cols = input.schema().fields().len(); + if indices.iter().any(|&i| i >= num_non_window_cols) { + return internal_err!("Can't push down projection through window functions"); + } + + pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices) + } + LogicalPlan::Aggregate(Aggregate { + input, group_expr, .. + }) => { + // Aggregate node schemas are the GROUP BY expressions followed by the aggregate expressions. + let num_group_exprs = group_expr.len(); + if indices.iter().any(|&i| i >= num_group_exprs) { + return internal_err!("Can't push down projection through aggregate functions"); + } + + let new_exprs = group_expr + .into_iter() + .enumerate() + .filter_map(|(i, expr)| indices.contains(&i).then_some(expr)) + .collect_vec(); + + let child_indices = new_exprs + .iter() + .flat_map(|e| e.column_refs().into_iter()) + .map(|c| input.schema().index_of_column(c).unwrap()) + .collect::>(); + + Projection::try_new( + new_exprs, + pushdown_projection_inexact(Arc::unwrap_or_clone(input), &child_indices) + .map(Arc::new)?, + ) + .map(LogicalPlan::Projection) + } + LogicalPlan::Join(ref join) => { + let join_type = join.join_type; + match join_type { + JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {} + _ => { + return Err(DataFusionError::Internal(format!( + "unsupported join type: {join_type}" + ))) + } + }; + + let mut indices = indices.clone(); + + // Relax the filter so that it can be computed from the + // "pruned" children + let filter = join + .filter + .as_ref() + .map(|f| widen_filter(f, &mut indices, &plan)) + .transpose()?; + + let (mut left_child_indices, mut right_child_indices) = + indices.iter().partition_map(|&i| { + if i < join.left.schema().fields().len() { + Either::Left(i) + } else { + Either::Right(i - join.left.schema().fields().len()) + } + }); + + let on = join.on.iter().try_fold(vec![], |mut v, (lexpr, rexpr)| { + // The ON clause includes filters like `lexpr = rexpr` + // If either side is considered 'relevant', we include it. + // See documentation for [`expr_is_relevant`]. + if expr_is_relevant(lexpr, &left_child_indices, &join.left)? 
+ || expr_is_relevant(rexpr, &right_child_indices, &join.right)? + { + add_all_columns_to_indices(lexpr, &mut left_child_indices, &join.left)?; + add_all_columns_to_indices(rexpr, &mut right_child_indices, &join.right)?; + v.push((lexpr.clone(), rexpr.clone())) + } + + Ok::<_, DataFusionError>(v) + })?; + + let join = match plan { + LogicalPlan::Join(join) => join, + _ => unreachable!(), + }; + + let left = + pushdown_projection_inexact(Arc::unwrap_or_clone(join.left), &left_child_indices) + .map(Arc::new)?; + let right = + pushdown_projection_inexact(Arc::unwrap_or_clone(join.right), &right_child_indices) + .map(Arc::new)?; + + let schema = project_dfschema(join.schema.as_ref(), &indices).map(Arc::new)?; + + Ok(LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type, + schema, + ..join + })) + } + LogicalPlan::Union(Union { inputs, schema, .. }) => { + let inputs = inputs + .into_iter() + .map(Arc::unwrap_or_clone) + .map(|plan| pushdown_projection_inexact(plan, indices)) + .map_ok(Arc::new) + .collect::>>()?; + + Ok(LogicalPlan::Union(Union { + inputs, + schema: project_dfschema(schema.as_ref(), indices).map(Arc::new)?, + })) + } + LogicalPlan::TableScan(ref scan) => { + let mut indices = indices.clone(); + let filters = scan + .filters + .iter() + .map(|f| widen_filter(f, &mut indices, &plan)) + .collect::>>()?; + + let new_projection = scan + .projection + .clone() + .unwrap_or((0..scan.source.schema().fields().len()).collect()) + .into_iter() + .enumerate() + .filter_map(|(i, j)| indices.contains(&i).then_some(j)) + .collect_vec(); + + let scan = match plan { + LogicalPlan::TableScan(scan) => scan, + _ => unreachable!(), + }; + + TableScan::try_new( + scan.table_name, + scan.source, + Some(new_projection), + filters, + None, + ) + .map(LogicalPlan::TableScan) + } + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row, + schema, + }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row, + schema: project_dfschema(schema.as_ref(), indices).map(Arc::new)?, + })), + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => SubqueryAlias::try_new( + pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices).map(Arc::new)?, + alias, + ) + .map(LogicalPlan::SubqueryAlias), + LogicalPlan::Limit(Limit { input, .. }) | LogicalPlan::Sort(Sort { input, .. }) => { + // Ignore sorts/limits entirely and remove them from the plan + pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices) + } + LogicalPlan::Values(Values { schema, values }) => { + let schema = project_dfschema(&schema, indices).map(Arc::new)?; + let values = values + .into_iter() + .map(|row| { + row.into_iter() + .enumerate() + .filter_map(|(i, v)| indices.contains(&i).then_some(v)) + .collect_vec() + }) + .collect_vec(); + + Ok(LogicalPlan::Values(Values { schema, values })) + } + LogicalPlan::Distinct(Distinct::All(input)) => { + pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices) + .map(Arc::new) + .map(Distinct::All) + .map(LogicalPlan::Distinct) + } + LogicalPlan::Unnest(unnest) => { + // Map parent indices to child indices. + // The columns of an unnest node have a many-to-one relation + // to the columns of the input. 
+ let child_indices = indices + .iter() + .map(|&i| unnest.dependency_indices[i]) + .collect::>(); + + let input_using_columns = unnest.input.using_columns()?; + let input_schema = unnest.input.schema(); + let columns_to_unnest = + unnest + .exec_columns + .into_iter() + .try_fold(vec![], |mut v, c| { + let c = c.normalize_with_schemas_and_ambiguity_check( + &[&[input_schema.as_ref()]], + &input_using_columns, + )?; + let idx = input_schema.index_of_column(&c)?; + if child_indices.contains(&idx) { + v.push(c); + } + + Ok::<_, DataFusionError>(v) + })?; + + let columns_to_project = unnest + .schema + .columns() + .into_iter() + .enumerate() + .filter_map(|(i, c)| indices.contains(&i).then_some(c)) + .map(Expr::Column) + .collect_vec(); + + LogicalPlanBuilder::from(pushdown_projection_inexact( + Arc::unwrap_or_clone(unnest.input), + &child_indices, + )?) + .unnest_columns_with_options(columns_to_unnest, unnest.options)? + .project(columns_to_project)? + .build() + } + + _ => internal_err!("Unsupported logical plan node: {}", plan.display()), + } + .map_err(|e| e.context(format!("plan: \n{plan_formatted}"))) +} + +/// 'Widen' a filter, i.e. given a predicate P, +/// compute P' such that P' is true whenever P is. +/// In particular, P' should be computed using columns whose indices are in `indices`. +/// +/// # Mutating `indices` +/// +/// Currently under some conditions this function will add new entries to `indices`. +/// This is particularly important in some cases involving joins. For example, +/// consider the following plan: +/// +/// ```ignore +/// Projection: t2.year, t2.month, t2.day, t2.feed, t2.column2, t3.column1 +/// Inner Join: Using t2.year = t3.year +/// TableScan: t2 +/// TableScan: t3 +/// ``` +/// +/// If we want to prune all parts of the plan not related to t2.year, we'd get something like this: +/// +/// ```ignore +/// Projection: t2.year +/// Inner Join: Using +/// TableScan: t2 projection=[year] +/// TableScan: t3 projection=[] +/// ``` +/// +/// Notice that the filter in the inner join is gone. This is because `t3.year` is not obviously referenced in the definition of `t2.year`; +/// it is only implicitly used in the join filter. +/// +/// To get around this, we look at filter expressions, and if they contain a _single_ column in the index set, +/// we add the rest of the columns from the filter to the index set, to ensure all of the filter's inputs +/// will be present. +fn widen_filter( + predicate: &Expr, + indices: &mut HashSet, + parent: &LogicalPlan, +) -> Result { + let conjunctions = split_conjunction(predicate); + + conjunctions.into_iter().try_fold(lit(true), |a, b| { + Ok(if expr_is_relevant(b, indices, parent)? { + add_all_columns_to_indices(b, indices, parent)?; + a.and(b.clone()) + } else { + a + }) + }) +} + +/// An expression is considered 'relevant' if a single column is inside our index set. +fn expr_is_relevant(expr: &Expr, indices: &HashSet, parent: &LogicalPlan) -> Result { + let schemas = parent + .inputs() + .iter() + .map(|input| input.schema().as_ref()) + .collect_vec(); + let using_columns = parent.using_columns()?; + + for c in expr.column_refs() { + let normalized_column = c + .clone() + .normalize_with_schemas_and_ambiguity_check(&[&schemas], &using_columns)?; + let column_idx = parent.schema().index_of_column(&normalized_column)?; + + if indices.contains(&column_idx) { + return Ok(true); + } + } + + Ok(false) +} + +/// Get all referenced columns in the expression, +/// and add them to the index set. 
+fn add_all_columns_to_indices( + expr: &Expr, + indices: &mut HashSet, + parent: &LogicalPlan, +) -> Result<()> { + let schemas = parent + .inputs() + .iter() + .map(|input| input.schema().as_ref()) + .collect_vec(); + let using_columns = parent.using_columns()?; + + for c in expr.column_refs() { + let normalized_column = c + .clone() + .normalize_with_schemas_and_ambiguity_check(&[&schemas], &using_columns)?; + let column_idx = parent.schema().index_of_column(&normalized_column)?; + + indices.insert(column_idx); + } + + Ok(()) +} + +fn project_dfschema(schema: &DFSchema, indices: &HashSet) -> Result { + let qualified_fields = (0..schema.fields().len()) + .filter_map(|i| { + indices.contains(&i).then_some({ + let (reference, field) = schema.qualified_field(i); + (reference.cloned(), Arc::new(field.clone())) + }) + }) + .collect_vec(); + + // todo: handle functional dependencies + DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone()) +} + +/// Rewrite TableScans on top of the file metadata table, +/// assuming the query only uses the S3 partition columns. +/// Then push up the file metadata to the output of this plan. +/// +/// The result will have a single new column with an autogenerated name "__meta_" +/// which contains the source file metadata for a given row in the output. +fn push_up_file_metadata( + plan: LogicalPlan, + catalog_options: &CatalogOptions, + row_metadata_registry: Arc, +) -> Result { + let alias_generator = AliasGenerator::new(); + plan.transform_up(|plan| { + match plan { + LogicalPlan::TableScan(scan) => { + scan_columns_from_row_metadata(scan, catalog_options, row_metadata_registry.clone()) + } + plan => project_row_metadata_from_input(plan, &alias_generator), + } + .and_then(LogicalPlan::recompute_schema) + .map(Transformed::yes) + }) + .map(|t| t.data) +} + +/// Assuming the input has any columns of the form "__meta_", +/// push up the file columns through the output of this LogicalPlan node. +/// The output will have a single new column of the form "__meta_". +fn project_row_metadata_from_input( + plan: LogicalPlan, + alias_generator: &AliasGenerator, +) -> Result { + use datafusion_expr::logical_plan::*; + + // find all file metadata columns and collapse them into one concatenated list + match plan { + LogicalPlan::Projection(Projection { expr, input, .. }) => { + let file_md_columns = input + .schema() + .columns() + .into_iter() + .filter_map(|c| c.name.starts_with(META_COLUMN).then_some(Expr::Column(c))) + .collect_vec(); + Projection::try_new( + expr.into_iter() + .chain(Some( + flatten(make_array(file_md_columns)) + .alias(alias_generator.next(META_COLUMN)), + )) + .collect_vec(), + input, + ) + .map(LogicalPlan::Projection) + } + _ => { + let plan = plan.recompute_schema()?; + let (file_md_columns, original_columns) = plan + .schema() + .columns() + .into_iter() + .partition::, _>(|c| c.name.starts_with(META_COLUMN)); + + Projection::try_new( + original_columns + .into_iter() + .map(Expr::Column) + .chain(Some( + flatten(make_array( + file_md_columns.into_iter().map(Expr::Column).collect_vec(), + )) + .alias(alias_generator.next(META_COLUMN)), + )) + .collect_vec(), + Arc::new(plan), + ) + .map(LogicalPlan::Projection) + } + } +} + +/// Turn a TableScan into an equivalent scan on the row metadata source, +/// assuming that every column in the table scan is a partition column; +/// also adds a new column to the TableScan, "__meta" +/// which is a List of Struct column including the row metadata. 
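+///
+/// Conceptually (a rough sketch of the shape, not the exact plan), a scan such as
+/// `TableScan: t2 projection=[year]` becomes something like:
+///
+/// ```ignore
+/// Filter: <original pushed-down filters>
+///   SubqueryAlias: t2
+///     Projection: year, make_array(__meta) AS __meta
+///       <row metadata source for t2>
+/// ```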
+fn scan_columns_from_row_metadata( + scan: TableScan, + catalog_options: &CatalogOptions, + row_metadata_registry: Arc, +) -> Result { + let table_ref = scan.table_name.clone().resolve( + &catalog_options.default_catalog, + &catalog_options.default_schema, + ); + + let source = row_metadata_registry.get_source(&table_ref)?; + + // [`RowMetadataSource`] returns a Struct, + // but the MV algorithm expects a list of structs at each node in the plan. + let mut exprs = scan + .projected_schema + .fields() + .iter() + .map(|f| col((None, f))) + .collect_vec(); + exprs.push(make_array(vec![col(META_COLUMN)]).alias(META_COLUMN)); + + source + .row_metadata(table_ref, &scan)? + .project(exprs)? + .alias(scan.table_name.clone())? + .filter( + scan.filters + .clone() + .into_iter() + .fold(lit(true), |a, b| a.and(b)), + )? + .build() +} + +#[cfg(test)] +mod test { + use std::{any::Any, collections::HashSet, sync::Arc}; + + use arrow::util::pretty::pretty_format_batches; + use arrow_schema::SchemaRef; + use datafusion::{ + assert_batches_eq, assert_batches_sorted_eq, + catalog::{Session, TableProvider}, + datasource::listing::ListingTableUrl, + execution::session_state::SessionStateBuilder, + prelude::{DataFrame, SessionConfig, SessionContext}, + }; + use datafusion_common::{Column, Result, ScalarValue}; + use datafusion_expr::{Expr, JoinType, LogicalPlan, TableType}; + use datafusion_physical_plan::ExecutionPlan; + use datafusion_sql::TableReference; + use itertools::Itertools; + + use crate::materialized::{ + dependencies::pushdown_projection_inexact, + register_materialized, + row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry, RowMetadataSource}, + ListingTableLike, Materialized, + }; + + use super::file_dependencies; + + /// A mock materialized view. 
+ #[derive(Debug)] + struct MockMaterializedView { + table_path: ListingTableUrl, + partition_columns: Vec, + query: LogicalPlan, + file_ext: &'static str, + } + + #[async_trait::async_trait] + impl TableProvider for MockMaterializedView { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::new(self.query.schema().as_arrow().clone()) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + unimplemented!() + } + } + + impl ListingTableLike for MockMaterializedView { + fn table_paths(&self) -> Vec { + vec![self.table_path.clone()] + } + + fn partition_columns(&self) -> Vec { + self.partition_columns.clone() + } + + fn file_ext(&self) -> String { + self.file_ext.to_string() + } + } + + impl Materialized for MockMaterializedView { + fn query(&self) -> LogicalPlan { + self.query.clone() + } + } + + async fn setup() -> Result { + let _ = env_logger::builder().is_test(true).try_init(); + + register_materialized::(); + + let state = SessionStateBuilder::new() + .with_default_features() + .with_config( + SessionConfig::new() + .with_default_catalog_and_schema("datafusion", "test") + .set( + "datafusion.explain.logical_plan_only", + &ScalarValue::Boolean(Some(true)), + ) + .set( + "datafusion.sql_parser.dialect", + &ScalarValue::Utf8(Some("duckdb".into())), + ) + .set( + // See discussion in this issue: + // https://github.com/apache/datafusion/issues/13065 + "datafusion.execution.skip_physical_aggregate_schema_check", + &ScalarValue::Boolean(Some(true)), + ), + ) + .build(); + + let ctx = SessionContext::new_with_state(state); + + ctx.sql( + "CREATE TABLE t1 AS VALUES + ('2021', 3, 'A'), + ('2022', 4, 'B'), + ('2023', 5, 'C')", + ) + .await? + .collect() + .await?; + + ctx.sql( + "CREATE TABLE t2 ( + year STRING, + month STRING, + day STRING, + feed CHAR, + column2 INTEGER + ) AS VALUES + ('2023', '01', '01', 'A', 1), + ('2023', '01', '02', 'B', 2), + ('2023', '01', '03', 'C', 3), + ('2024', '12', '04', 'X', 4), + ('2024', '12', '05', 'Y', 5), + ('2024', '12', '06', 'Z', 6)", + ) + .await? + .collect() + .await?; + + ctx.sql( + "CREATE TABLE t3 ( + year STRING, + column1 INTEGER + ) AS VALUES + (2023, 1), + (2024, 2)", + ) + .await? 
+ .collect() + .await?; + + ctx.sql( + // create a fake file metadata table to use as a mock + "CREATE TABLE file_metadata ( + table_catalog STRING, + table_schema STRING, + table_name STRING, + file_path STRING, + last_modified TIMESTAMP, + size BIGINT UNSIGNED + ) AS VALUES + ('datafusion', 'test', 't1', 's3://t1/column1=2021/data.01.parquet', '2023-07-11T16:29:26Z', 0), + ('datafusion', 'test', 't1', 's3://t1/column1=2022/data.01.parquet', '2023-07-11T16:45:22Z', 0), + ('datafusion', 'test', 't1', 's3://t1/column1=2023/data.01.parquet', '2023-07-11T16:45:44Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet', '2023-07-11T16:29:26Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet', '2023-07-11T16:45:22Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet', '2023-07-11T16:45:44Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet', '2023-07-11T16:29:26Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet', '2023-07-11T16:45:22Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet', '2023-07-11T16:45:44Z', 0), + ('datafusion', 'test', 't3', 's3://t3/year=2023/data.01.parquet', '2023-07-11T16:45:44Z', 0), + ('datafusion', 'test', 't3', 's3://t3/year=2024/data.01.parquet', '2023-07-11T16:45:44Z', 0) + " + ) + .await? + .collect() + .await?; + + let row_metadata_registry = Arc::new(RowMetadataRegistry::default()); + let t1_ref = TableReference::parse_str("t1").resolve( + &ctx.state().config_options().catalog.default_catalog, + &ctx.state().config_options().catalog.default_schema, + ); + let t2_ref = TableReference::parse_str("t2").resolve( + &ctx.state().config_options().catalog.default_catalog, + &ctx.state().config_options().catalog.default_schema, + ); + let t3_ref = TableReference::parse_str("t3").resolve( + &ctx.state().config_options().catalog.default_catalog, + &ctx.state().config_options().catalog.default_schema, + ); + + let metadata_table = ctx.table_provider("file_metadata").await?; + let object_store_metadata_source = Arc::new( + ObjectStoreRowMetadataSource::with_file_metadata(metadata_table), + ); + + for r in [t1_ref, t2_ref, t3_ref] { + row_metadata_registry.register_source( + &r, + Arc::clone(&object_store_metadata_source) as Arc, + ); + } + + ctx.register_udtf( + "file_dependencies", + file_dependencies( + ctx.state().catalog_list().clone(), + row_metadata_registry.clone(), + ctx.copied_config().options(), + ), + ); + + Ok(ctx) + } + + #[tokio::test] + async fn test_deps() { + struct TestCase { + name: &'static str, + query_to_analyze: &'static str, + table_name: &'static str, + table_path: ListingTableUrl, + partition_cols: Vec<&'static str>, + file_extension: &'static str, + expected_output: Vec<&'static str>, + file_metadata: &'static str, + } + + let cases = &[ + TestCase { + name: "un-transformed partition column", + query_to_analyze: + "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", + table_name: "m1", + table_path: ListingTableUrl::parse("s3://m1/").unwrap(), + partition_cols: vec!["partition_column"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| target | source_table_catalog | 
source_table_schema | source_table_name | source_uri | source_last_modified |", + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| s3://m1/partition_column=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m1/partition_column=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m1/partition_column=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + ], + // second file is old + file_metadata: " + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/data.01.parquet', '2023-07-10T16:00:00Z', 0), + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + TestCase { + name: "transform year/month/day partition into timestamp partition", + query_to_analyze: " + SELECT DISTINCT + to_timestamp_nanos(concat_ws('-', year, month, day)) AS timestamp, + feed + FROM t2", + table_name: "m2", + table_path: ListingTableUrl::parse("s3://m2/").unwrap(), + partition_cols: vec!["timestamp", "feed"], + file_extension: ".parquet", + expected_output: vec![ + "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m2/timestamp=2023-01-01T00:00:00/feed=A/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m2/timestamp=2023-01-02T00:00:00/feed=B/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m2/timestamp=2023-01-03T00:00:00/feed=C/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m2/timestamp=2024-12-04T00:00:00/feed=X/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m2/timestamp=2024-12-05T00:00:00/feed=Y/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-01T00:00:00/feed=A/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-02T00:00:00/feed=B/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 
's3://m2/timestamp=2023-01-03T00:00:00/feed=C/data.01.parquet', '2023-07-10T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-04T00:00:00/feed=X/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-05T00:00:00/feed=Y/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-06T00:00:00/feed=Z/data.01.parquet', '2023-07-10T16:00:00Z', 0) + ", + }, + TestCase { + name: "materialized view has no partitions", + query_to_analyze: "SELECT column1 AS output FROM t3", + table_name: "m3", + table_path: ListingTableUrl::parse("s3://m3/").unwrap(), + partition_cols: vec![], + file_extension: ".parquet", + expected_output: vec![ + "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+", + "| s3://m3/ | datafusion | test | t3 | s3://t3/year=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m3/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm3', 's3://m3/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + TestCase { + name: "simple equijoin on year", + query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)", + table_name: "m4", + table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + partition_cols: vec!["year"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2023/ | datafusion | test | t3 | s3://t3/year=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + 
"+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + TestCase { + name: "triangular join on year", + query_to_analyze: " + SELECT + t2.*, + t3.* EXCLUDE(year), + t3.year AS \"t3.year\" + FROM t2 + INNER JOIN t3 + ON (t2.year <= t3.year)", + table_name: "m4", + table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + partition_cols: vec!["year"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2023/ | datafusion | test | t3 | s3://t3/year=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2023/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + TestCase { + name: "triangular left join, strict <", + query_to_analyze: " + SELECT + t2.*, + t3.* EXCLUDE(year), + t3.year AS \"t3.year\" + FROM t2 + LEFT JOIN t3 + ON (t2.year < t3.year)", + table_name: "m4", + table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + partition_cols: vec!["year"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + 
"+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2023/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + ]; + + async fn run_test(case: &TestCase) -> Result<()> { + let context = setup().await.unwrap(); + + let plan = context + .sql(case.query_to_analyze) + .await? + .into_optimized_plan()?; + + println!("original plan: \n{}", plan.display_indent()); + + let partition_col_indices = plan + .schema() + .columns() + .into_iter() + .enumerate() + .filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i)) + .collect(); + println!("indices: {:?}", partition_col_indices); + let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; + println!( + "inexact projection pushdown:\n{}", + analyzed.display_indent() + ); + + context + .register_table( + case.table_name, + Arc::new(MockMaterializedView { + table_path: case.table_path.clone(), + partition_columns: case + .partition_cols + .iter() + .map(|s| s.to_string()) + .collect(), + query: plan, + file_ext: case.file_extension, + }), + ) + .expect("couldn't register materialized view"); + + context + .sql(&format!( + "INSERT INTO file_metadata VALUES {}", + case.file_metadata, + )) + .await? 
+ .collect() + .await?; + + let df = context + .sql(&format!( + "SELECT * FROM file_dependencies('{}', 'v2')", + case.table_name, + )) + .await + .map_err(|e| e.context("get file dependencies"))?; + df.clone().explain(false, false)?.show().await?; + df.clone().show().await?; + + assert_batches_sorted_eq!(case.expected_output, &df.collect().await?); + + Ok(()) + } + + for case in cases { + run_test(case) + .await + .unwrap_or_else(|e| panic!("{} failed: {e}", case.name)); + } + } + + #[tokio::test] + async fn test_projection_pushdown_inexact() -> Result<()> { + struct TestCase { + name: &'static str, + query_to_analyze: &'static str, + projection: &'static [&'static str], + expected_plan: Vec<&'static str>, + expected_output: Vec<&'static str>, + } + + let cases = &[ + TestCase { + name: "simple projection", + query_to_analyze: + "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", + projection: &["partition_column"], + expected_plan: vec![ + "+--------------+--------------------------------------------+", + "| plan_type | plan |", + "+--------------+--------------------------------------------+", + "| logical_plan | Projection: t1.column1 AS partition_column |", + "| | TableScan: t1 projection=[column1] |", + "+--------------+--------------------------------------------+", + ], + expected_output: vec![ + "+------------------+", + "| partition_column |", + "+------------------+", + "| 2021 |", + "| 2022 |", + "| 2023 |", + "+------------------+", + ], + }, + TestCase { + name: "compound expressions", + query_to_analyze: " + SELECT DISTINCT + to_timestamp_nanos(concat_ws('-', year, month, day)) AS timestamp, + feed + FROM t2", + projection: &["timestamp", "feed"], + expected_plan: vec![ + "+--------------+-------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+--------------+-------------------------------------------------------------------------------------------------------+", + "| logical_plan | Projection: to_timestamp_nanos(concat_ws(Utf8(\"-\"), t2.year, t2.month, t2.day)) AS timestamp, t2.feed |", + "| | TableScan: t2 projection=[year, month, day, feed] |", + "+--------------+-------------------------------------------------------------------------------------------------------+", + ] + , + expected_output: vec![ + "+---------------------+------+", + "| timestamp | feed |", + "+---------------------+------+", + "| 2023-01-01T00:00:00 | A |", + "| 2023-01-02T00:00:00 | B |", + "| 2023-01-03T00:00:00 | C |", + "| 2024-12-04T00:00:00 | X |", + "| 2024-12-05T00:00:00 | Y |", + "| 2024-12-06T00:00:00 | Z |", + "+---------------------+------+", + ], + }, + TestCase { + name: "empty projection", + query_to_analyze: "SELECT column1 AS output FROM t3", + projection: &[], + expected_plan: vec![ + "+--------------+-----------------------------+", + "| plan_type | plan |", + "+--------------+-----------------------------+", + "| logical_plan | TableScan: t3 projection=[] |", + "+--------------+-----------------------------+", + ], + expected_output: vec![ + "++", + "++", + "++", + ], + }, + TestCase { + name: "simple equijoin on year", + query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)", + projection: &["year"], + expected_plan: vec![ + "+--------------+-------------------------------------+", + "| plan_type | plan |", + "+--------------+-------------------------------------+", + "| logical_plan | Projection: t2.year |", + "| | Inner Join: t2.year = t3.year |", + "| | 
TableScan: t2 projection=[year] |",
+                    "| | TableScan: t3 projection=[year] |",
+                    "+--------------+-------------------------------------+",
+                ],
+                expected_output: vec![
+                    "+------+",
+                    "| year |",
+                    "+------+",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2024 |",
+                    "| 2024 |",
+                    "| 2024 |",
+                    "+------+",
+                ],
+            },
+            TestCase {
+                name: "triangular join on year",
+                query_to_analyze: "
+                    SELECT
+                        t2.*,
+                        t3.* EXCLUDE(year),
+                        t3.year AS \"t3.year\"
+                    FROM t2
+                    INNER JOIN t3
+                    ON (t2.year <= t3.year)",
+                projection: &["year"],
+                expected_plan: vec![
+                    "+--------------+-------------------------------------------+",
+                    "| plan_type | plan |",
+                    "+--------------+-------------------------------------------+",
+                    "| logical_plan | Projection: t2.year |",
+                    "| | Inner Join: Filter: t2.year <= t3.year |",
+                    "| | TableScan: t2 projection=[year] |",
+                    "| | TableScan: t3 projection=[year] |",
+                    "+--------------+-------------------------------------------+",
+                ],
+                expected_output: vec![
+                    "+------+",
+                    "| year |",
+                    "+------+",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2024 |",
+                    "| 2024 |",
+                    "| 2024 |",
+                    "+------+",
+                ],
+            },
+            TestCase {
+                name: "window & unnest",
+                query_to_analyze: "
+                    SELECT
+                        \"unnest_placeholder(date).year\" AS year,
+                        \"unnest_placeholder(date).month\" AS month,
+                        \"unnest_placeholder(date).day\" AS day,
+                        arr
+                    FROM (
+                        SELECT
+                            unnest(date),
+                            unnest(arr) AS arr
+                        FROM (
+                            SELECT
+                                named_struct('year', year, 'month', month, 'day', day) AS date,
+                                array_agg(column2)
+                                    OVER (ORDER BY year, month, day)
+                                    AS arr
+                            FROM t2
+                        )
+                    )",
+                projection: &["year", "month", "day"],
+                expected_plan: vec![
+                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------+",
+                    "| plan_type | plan |",
+                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------+",
+                    "| logical_plan | Projection: unnest_placeholder(date).year AS year, unnest_placeholder(date).month AS month, unnest_placeholder(date).day AS day |",
+                    "| | Unnest: lists[] structs[unnest_placeholder(date)] |",
+                    "| | Projection: named_struct(Utf8(\"year\"), t2.year, Utf8(\"month\"), t2.month, Utf8(\"day\"), t2.day) AS unnest_placeholder(date) |",
+                    "| | TableScan: t2 projection=[year, month, day] |",
+                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------+",
+                ],
+                expected_output: vec![
+                    "+------+-------+-----+",
+                    "| year | month | day |",
+                    "+------+-------+-----+",
+                    "| 2023 | 01 | 01 |",
+                    "| 2023 | 01 | 02 |",
+                    "| 2023 | 01 | 03 |",
+                    "| 2024 | 12 | 04 |",
+                    "| 2024 | 12 | 05 |",
+                    "| 2024 | 12 | 06 |",
+                    "+------+-------+-----+",
+                ],
+            },
+            TestCase {
+                name: "outer join + union",
+                query_to_analyze: "
+                    SELECT
+                        COALESCE(t1.year, t2.year) AS year,
+                        t1.column2
+                    FROM (SELECT column1 AS year, column2 FROM t1) t1
+                    FULL OUTER JOIN (SELECT year, column2 FROM t2) t2
+                    USING (year)
+                    UNION ALL
+                    SELECT year, column1 AS column2 FROM t3
+                    ",
+                projection: &["year"],
+                expected_plan: vec![
+                    "+--------------+--------------------------------------------------+",
+                    "| plan_type | plan |",
+                    "+--------------+--------------------------------------------------+",
+                    "| logical_plan | Union |",
+                    "| | Projection: coalesce(t1.year, t2.year) AS year |",
+                    "| | Full Join: Using t1.year = t2.year |",
+                    "| | SubqueryAlias: t1 |",
+                    "| | Projection: t1.column1 AS year |",
+                    "| | TableScan: t1 projection=[column1] |",
+                    "| | SubqueryAlias: t2 |",
+                    "| | TableScan: t2 projection=[year] |",
+                    "| | TableScan: t3 projection=[year] |",
+                    "+--------------+--------------------------------------------------+",
+                ],
+                expected_output: vec![
+                    "+------+",
+                    "| year |",
+                    "+------+",
+                    "| 2021 |",
+                    "| 2022 |",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2023 |",
+                    "| 2024 |",
+                    "| 2024 |",
+                    "| 2024 |",
+                    "| 2024 |",
+                    "+------+",
+                ],
+            }
+        ];
+
+        async fn run_test(case: &TestCase) -> Result<()> {
+            let context = setup().await?;
+
+            let df = context.sql(case.query_to_analyze).await?;
+            df.clone().explain(false, false)?.show().await?;
+
+            let plan = df.clone().into_optimized_plan()?;
+
+            let indices = case
+                .projection
+                .iter()
+                .map(|&name| {
+                    plan.schema()
+                        .index_of_column(&Column::new_unqualified(name))
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let analyzed = DataFrame::new(
+                context.state(),
+                pushdown_projection_inexact(plan.clone(), &indices)?,
+            );
+            analyzed.clone().explain(false, false)?.show().await?;
+
+            // Check the following property of pushdown_projection_inexact:
+            // if A' = pushdown_projection_inexact(A, P), where P is the projection,
+            // then PA ⊆ A'.
+            if !case.projection.is_empty() {
+                let select_original = df
+                    .clone()
+                    .select(
+                        case.projection
+                            .iter()
+                            .map(|&name| Expr::Column(Column::new_unqualified(name)))
+                            .collect_vec(),
+                    )
+                    .map_err(|e| e.context("select projection from original plan"))?
+                    .distinct()?;
+
+                let excess = analyzed
+                    .clone()
+                    .distinct()?
+                    .join(
+                        select_original.clone(),
+                        JoinType::RightAnti,
+                        case.projection,
+                        case.projection,
+                        None,
+                    )
+                    .map_err(|e| e.context("join in subset inclusion test"))?;
+
+                assert_eq!(
+                    excess
+                        .clone()
+                        .count()
+                        .await
+                        .map_err(|e| e.context("execute subset inclusion test"))?,
+                    0,
+                    "unexpected extra rows in transformed query:\n{}
+                    original:\n{}
+                    inexact pushdown:\n{}
+                    ",
+                    pretty_format_batches(&excess.collect().await?)?,
+                    pretty_format_batches(&select_original.collect().await?)?,
+                    pretty_format_batches(&analyzed.clone().distinct()?.collect().await?)?
+                );
+            }
+
+            assert_batches_eq!(
+                case.expected_plan,
+                &analyzed.clone().explain(false, false)?.collect().await?
+            );
+            assert_batches_sorted_eq!(case.expected_output, &analyzed.collect().await?);
+
+            Ok(())
+        }
+
+        for case in cases {
+            run_test(case)
+                .await
+                .unwrap_or_else(|e| panic!("{} failed: {e}", case.name));
+        }
+
+        Ok(())
+    }
+}
diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs
index 0e79e5d..fd57cc0 100644
--- a/src/materialized/file_metadata.rs
+++ b/src/materialized/file_metadata.rs
@@ -22,7 +22,7 @@ use async_trait::async_trait;
 use datafusion::catalog::SchemaProvider;
 use datafusion::catalog::{CatalogProvider, Session};
 use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::{provider_as_source, TableProvider};
+use datafusion::datasource::TableProvider;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties};
 use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
@@ -47,9 +47,6 @@ use std::any::Any;
 use std::sync::Arc;
 
 use crate::materialized::cast_to_listing_table;
-use crate::materialized::{hive_partition::hive_partition, META_COLUMN};
-
-use super::row_metadata::RowMetadataSource;
 
 /// A virtual file metadata table, inspired by the information schema column table.
 #[derive(Debug, Clone)]
@@ -126,79 +123,6 @@ impl TableProvider for FileMetadata {
     }
 }
 
-impl RowMetadataSource for FileMetadata {
-    fn name(&self) -> &str {
-        "FileMetadata"
-    }
-
-    /// Scan for partition column values using object store metadata.
-    /// This allows us to efficiently scan for distinct partition column values without
-    /// ever reading from a table directly, which is useful for low-overhead
-    /// incremental view maintenance.
-    fn row_metadata(
-        self: Arc<Self>,
-        table: datafusion_sql::ResolvedTableReference,
-        scan: &datafusion_expr::TableScan,
-    ) -> Result<LogicalPlanBuilder> {
-        use datafusion::datasource::source_as_provider;
-        use datafusion::functions::core::expr_fn::named_struct;
-        use datafusion::prelude::*;
-
-        // Check that the remaining columns in the source table scans are indeed partition columns
-        let partition_cols = cast_to_listing_table(source_as_provider(&scan.source)?.as_ref())
-            .ok_or_else(|| {
-                DataFusionError::Internal(format!(
-                    "Table '{}' was not registered in TableTypeRegistry",
-                    scan.table_name
-                ))
-            })?
-            .partition_columns();
-
-        for column in scan.projected_schema.columns() {
-            if !partition_cols.contains(&column.name) {
-                return Err(DataFusionError::Internal(format!("Row metadata not available on non-partition column from source table '{table}': {}", column.name)));
-            }
-        }
-
-        let fields = scan.projected_schema.fields();
-
-        let row_metadata_expr = make_array(vec![named_struct(vec![
-            lit("table_catalog"),
-            col("table_catalog"),
-            lit("table_schema"),
-            col("table_schema"),
-            lit("table_name"),
-            col("table_name"),
-            lit("source_uri"), // Map file_path to source_uri
-            col("file_path"),
-            lit("last_modified"),
-            col("last_modified"),
-        ])])
-        .alias(META_COLUMN);
-
-        datafusion_expr::LogicalPlanBuilder::scan("file_metadata", provider_as_source(self), None)?
-            .filter(
-                col("table_catalog")
-                    .eq(lit(table.catalog.as_ref()))
-                    .and(col("table_schema").eq(lit(table.schema.as_ref())))
-                    .and(col("table_name").eq(lit(table.table.as_ref()))),
-            )?
-            .project(
-                fields
-                    .iter()
-                    .map(|field| {
-                        // CAST(hive_partition(file_path, 'field_name', true) AS field_data_type) AS field_name
-                        cast(
-                            hive_partition(vec![col("file_path"), lit(field.name()), lit(true)]),
-                            field.data_type().clone(),
-                        )
-                        .alias(field.name())
-                    })
-                    .chain(Some(row_metadata_expr)),
-            )
-    }
-}
-
 /// An [`ExecutionPlan`] that scans object store metadata.
 pub struct FileMetadataExec {
     table_schema: SchemaRef,
diff --git a/src/materialized/hive_partition.rs b/src/materialized/hive_partition.rs
index 34bb7ae..075750e 100644
--- a/src/materialized/hive_partition.rs
+++ b/src/materialized/hive_partition.rs
@@ -53,7 +53,7 @@ pub static HIVE_PARTITION_UDF_NAME: &str = "hive_partition";
 /// SELECT
 ///     column_name,
 ///     hive_partition(
-///         's3://atlas/sip/trades/year=2006/month=01/day=02/trades-2006-01-02.parquet',
+///         's3://sip/trades/year=2006/month=01/day=02/trades-2006-01-02.parquet',
 ///         column_name
 ///     ) AS partition_value
 /// FROM (VALUES ('year'), ('month'), ('day')) AS partition_columns (column_name);
diff --git a/src/materialized/row_metadata.rs b/src/materialized/row_metadata.rs
index d0df6e0..add8df3 100644
--- a/src/materialized/row_metadata.rs
+++ b/src/materialized/row_metadata.rs
@@ -16,10 +16,13 @@
 // under the License.
 
 use dashmap::DashMap;
+use datafusion::catalog::TableProvider;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{LogicalPlanBuilder, TableScan};
 use datafusion_sql::ResolvedTableReference;
-use std::sync::Arc;
+use std::{collections::BTreeMap, sync::Arc};
+
+use super::{file_metadata::FileMetadata, hive_partition::hive_partition, META_COLUMN};
 
 /// Registry that manages metadata sources for different tables.
 /// Provides a centralized way to register and retrieve metadata sources
@@ -29,6 +32,21 @@ pub struct RowMetadataRegistry {
     metadata_sources: DashMap<String, Arc<dyn RowMetadataSource>>,
 }
 
+impl std::fmt::Debug for RowMetadataRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RowMetadataRegistry")
+            .field(
+                "metadata_sources",
+                &self
+                    .metadata_sources
+                    .iter()
+                    .map(|r| (r.key().clone(), r.value().name().to_string()))
+                    .collect::<BTreeMap<_, _>>(),
+            )
+            .finish()
+    }
+}
+
 impl RowMetadataRegistry {
     /// Registers a metadata source for a specific table.
     /// Returns the previously registered source for this table, if any
@@ -52,7 +70,7 @@
 /// A source for "row metadata", that associates rows from a table with
 /// metadata used for incremental view maintenance.
 ///
-/// Most use cases should default to using [`FileMetadata`](super::file_metadata::FileMetadata) for their [`RowMetadataSource`],
+/// Most use cases should default to using [`FileMetadata`] for their [`RowMetadataSource`],
 /// which uses object store metadata to perform incremental view maintenance on Hive-partitioned tables.
 /// However, in some use cases it is necessary to track metadata at a more granular level than Hive partitions.
 /// In such cases, users may implement a custom [`RowMetadataSource`] containing this metadata.
@@ -81,8 +99,112 @@ pub trait RowMetadataSource: Send + Sync {
     /// That is, for each row in the original table scan, the [`RowMetadataSource`] should contain at least
     /// one row (but potentially more) with the same values, plus the `__meta` column.
     fn row_metadata(
-        self: Arc<Self>,
+        &self,
         table: ResolvedTableReference,
         scan: &TableScan,
     ) -> Result<LogicalPlanBuilder>;
 }
+
+/// A [`RowMetadataSource`] that uses an object storage API to retrieve
+/// partition columns and timestamp metadata.
+///
+/// Object store metadata by default comes from [`FileMetadata`], but
+/// may be overridden with a custom [`TableProvider`] using
+/// [`Self::with_file_metadata`].
+#[derive(Debug, Clone)]
+pub struct ObjectStoreRowMetadataSource {
+    file_metadata: Arc<dyn TableProvider>,
+}
+
+impl ObjectStoreRowMetadataSource {
+    /// Create a new [`ObjectStoreRowMetadataSource`] from the [`FileMetadata`] table
+    pub fn new(file_metadata: Arc<FileMetadata>) -> Self {
+        Self::with_file_metadata(file_metadata)
+    }
+
+    /// Create a new [`ObjectStoreRowMetadataSource`] using a custom file metadata source
+    pub fn with_file_metadata(file_metadata: Arc<dyn TableProvider>) -> Self {
+        Self { file_metadata }
+    }
+}
+
+impl RowMetadataSource for ObjectStoreRowMetadataSource {
+    fn name(&self) -> &str {
+        "ObjectStoreRowMetadataSource"
+    }
+
+    /// Scan for partition column values using object store metadata.
+    /// This allows us to efficiently scan for distinct partition column values without
+    /// ever reading from a table directly, which is useful for low-overhead
+    /// incremental view maintenance.
+    fn row_metadata(
+        &self,
+        table: datafusion_sql::ResolvedTableReference,
+        scan: &datafusion_expr::TableScan,
+    ) -> Result<LogicalPlanBuilder> {
+        use datafusion::{datasource::provider_as_source, prelude::*};
+
+        // Disable this check in tests
+        #[cfg(not(test))]
+        {
+            // Check that the remaining columns in the source table scans are indeed partition columns
+            let partition_cols = super::cast_to_listing_table(
+                datafusion::datasource::source_as_provider(&scan.source)?.as_ref(),
+            )
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Table '{}' was not registered in TableTypeRegistry",
+                    scan.table_name
+                ))
+            })?
+            .partition_columns();
+
+            for column in scan.projected_schema.columns() {
+                if !partition_cols.contains(&column.name) {
+                    return Err(DataFusionError::Internal(format!("Row metadata not available on non-partition column from source table '{table}': {}", column.name)));
+                }
+            }
+        }
+
+        let fields = scan.projected_schema.fields();
+
+        let row_metadata_expr = named_struct(vec![
+            lit("table_catalog"),
+            col("table_catalog"),
+            lit("table_schema"),
+            col("table_schema"),
+            lit("table_name"),
+            col("table_name"),
+            lit("source_uri"), // Map file_path to source_uri
+            col("file_path"),
+            lit("last_modified"),
+            col("last_modified"),
+        ])
+        .alias(META_COLUMN);
+
+        datafusion_expr::LogicalPlanBuilder::scan(
+            "file_metadata",
+            provider_as_source(Arc::clone(&self.file_metadata)),
+            None,
+        )?
+        .filter(
+            col("table_catalog")
+                .eq(lit(table.catalog.as_ref()))
+                .and(col("table_schema").eq(lit(table.schema.as_ref())))
+                .and(col("table_name").eq(lit(table.table.as_ref()))),
+        )?
+        .project(
+            fields
+                .iter()
+                .map(|field| {
+                    // CAST(hive_partition(file_path, 'field_name', true) AS field_data_type) AS field_name
+                    cast(
+                        hive_partition(vec![col("file_path"), lit(field.name()), lit(true)]),
+                        field.data_type().clone(),
+                    )
+                    .alias(field.name())
+                })
+                .chain(Some(row_metadata_expr)),
+        )
+    }
+}
diff --git a/src/materialized/util.rs b/src/materialized/util.rs
new file mode 100644
index 0000000..7977f8d
--- /dev/null
+++ b/src/materialized/util.rs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::catalog::{CatalogProviderList, TableProvider};
+use datafusion_common::{DataFusionError, Result};
+use datafusion_sql::ResolvedTableReference;
+
+pub fn get_table(
+    catalog_list: &dyn CatalogProviderList,
+    table_ref: &ResolvedTableReference,
+) -> Result<Arc<dyn TableProvider>> {
+    let catalog = catalog_list
+        .catalog(table_ref.catalog.as_ref())
+        .ok_or_else(|| DataFusionError::Plan(format!("no such catalog {}", table_ref.catalog)))?;
+
+    let schema = catalog
+        .schema(table_ref.schema.as_ref())
+        .ok_or_else(|| DataFusionError::Plan(format!("no such schema {}", table_ref.schema)))?;
+
+    // NOTE: this is bad, we are calling async code in a sync context.
+    // We should file an issue about async in UDTFs.
+    futures::executor::block_on(schema.table(table_ref.table.as_ref()))
+        .map_err(|e| e.context(format!("couldn't get table '{}'", table_ref.table)))?
+        .ok_or_else(|| DataFusionError::Plan(format!("no such table {}", table_ref.table)))
+}
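
The `RowMetadataSource` docs above note that users may implement a custom source when metadata must be tracked at a finer grain than Hive partitions. Below is a minimal sketch of such an implementation against the trait as it stands after this patch. It is not part of the patch: `MetadataTableSource`, its `metadata_table` field, the `"row_metadata"` scan name, and the crate import path are all hypothetical, and the `__meta` struct simply mirrors the columns emitted by `ObjectStoreRowMetadataSource`.

```rust
use std::sync::Arc;

use datafusion::catalog::TableProvider;
use datafusion::datasource::provider_as_source;
use datafusion::functions::core::expr_fn::named_struct;
use datafusion::prelude::*;
use datafusion_common::Result;
use datafusion_expr::{LogicalPlanBuilder, TableScan};
use datafusion_sql::ResolvedTableReference;

// Import path assumed; adjust to however the crate is consumed downstream.
use datafusion_materialized_views::materialized::row_metadata::RowMetadataSource;

/// Hypothetical: a companion table with one row per source row, carrying
/// `source_uri` and `last_modified` columns alongside the source table's columns.
struct MetadataTableSource {
    metadata_table: Arc<dyn TableProvider>,
}

impl RowMetadataSource for MetadataTableSource {
    fn name(&self) -> &str {
        "MetadataTableSource"
    }

    fn row_metadata(
        &self,
        table: ResolvedTableReference,
        scan: &TableScan,
    ) -> Result<LogicalPlanBuilder> {
        // Build a `__meta` struct in the same shape as ObjectStoreRowMetadataSource:
        // the table coordinates are constant for this scan, while `source_uri` and
        // `last_modified` come from the companion table.
        let row_metadata_expr = named_struct(vec![
            lit("table_catalog"),
            lit(table.catalog.as_ref()),
            lit("table_schema"),
            lit(table.schema.as_ref()),
            lit("table_name"),
            lit(table.table.as_ref()),
            lit("source_uri"),
            col("source_uri"),
            lit("last_modified"),
            col("last_modified"),
        ])
        .alias("__meta"); // META_COLUMN

        // Re-project the columns requested by the original scan, plus `__meta`.
        LogicalPlanBuilder::scan(
            "row_metadata",
            provider_as_source(Arc::clone(&self.metadata_table)),
            None,
        )?
        .project(
            scan.projected_schema
                .columns()
                .into_iter()
                .map(|c| col(c.name.as_str()))
                .chain(Some(row_metadata_expr)),
        )
    }
}
```

Per the trait docs, the resulting plan must yield at least one row for every row of the original scan, with the same column values plus the `__meta` struct.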