[HUDI-8849] Reenable TestExpressionIndex #12624

Merged · 2 commits · Jan 13, 2025

Changes from all commits

@@ -82,7 +82,7 @@ class ExpressionIndexSupport(spark: SparkSession,
       val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
       val expressionIndexRecords = loadExpressionIndexRecords(indexPartition, prunedPartitions, readInMemory)
       loadTransposed(queryReferencedColumns, readInMemory, expressionIndexRecords, expressionIndexQuery) {
-        transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true))
+        transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true, Option.apply(indexDefinition)))
       }
     } else if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
       val prunedPartitionAndFileNames = getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, includeLogFiles = true)
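
Note: Option.apply wraps a possibly-null Java reference into a Scala Option, yielding None for null, which keeps the new argument null-safe at this call site. A minimal standalone illustration (the values here are hypothetical, not from this PR):

    // Option.apply is null-safe: None for null, Some(x) otherwise.
    val present: Option[String] = Option.apply("ts_col") // Some("ts_col")
    val absent: Option[String] = Option.apply(null)      // None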

@@ -21,7 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.keygen.KeyGenUtils
 import org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR

@@ -102,8 +102,13 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
     (prunedPartitions, prunedFiles)
   }

-  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String], isExpressionIndex: Boolean = false): Set[String] = {
-    val indexedCols : Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String],
+                                  isExpressionIndex: Boolean = false, indexDefinitionOpt: Option[HoodieIndexDefinition] = Option.empty): Set[String] = {
+    val indexedCols : Seq[String] = if (indexDefinitionOpt.isDefined) {
+      indexDefinitionOpt.get.getSourceFields.asScala.toSeq
+    } else {
+      metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+    }
     val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex, indexedCols)).reduce(And)
     if (indexFilter.equals(TrueLiteral)) {
       // if there are any non indexed cols or we can't translate source expr, we have to read all files and may not benefit from col stats lookup.
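
The new indexDefinitionOpt parameter lets an expression index supply its own source fields, falling back to the column-stats index definition when absent. The same fallback can be written with map/getOrElse; this sketch is only an equivalent idiomatic form of the if/else branch above, not code from the PR:

    // Equivalent to the branch above: prefer the provided definition's
    // source fields, else fall back to the column-stats index definition.
    val indexedCols: Seq[String] = indexDefinitionOpt
      .map(_.getSourceFields.asScala.toSeq)
      .getOrElse(metaClient.getIndexMetadata.get()
        .getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS)
        .getSourceFields.asScala.toSeq)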

@@ -57,15 +57,14 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, SaveMode, functions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
-import org.scalatest.Ignore

 import java.util.stream.Collectors
 import scala.collection.JavaConverters

-@Ignore
 class TestExpressionIndex extends HoodieSparkSqlTestBase {

   override protected def beforeAll(): Unit = {
-    spark.sql("set hoodie.metadata.index.column.stats.enable=false")
+    initQueryIndexConf()
   }

@@ -777,6 +776,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         |location '$basePath'
         |""".stripMargin)

+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")

@@ -856,6 +856,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         |location '$basePath'
         |""".stripMargin)

+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")

@@ -1471,6 +1472,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         |location '$basePath'
         |""".stripMargin)

+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       if (HoodieSparkUtils.gteqSpark3_4) {
         spark.sql("set spark.sql.defaultColumn.enabled=false")

@@ -1618,6 +1620,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         |location '$basePath'
         |""".stripMargin)

+      setCompactionConfigs(tableType)
       spark.sql("set hoodie.parquet.small.file.limit=0")
       spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
       if (HoodieSparkUtils.gteqSpark3_4) {

@@ -1942,7 +1945,9 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
       if (shouldRollback) {
         // rollback the operation
         val lastCompletedInstant = metaClient.reloadActiveTimeline().getCommitsTimeline.filterCompletedInstants().lastInstant()
-        val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), getWriteConfig(Map.empty, metaClient.getBasePath.toString))
+        val writeConfig = getWriteConfig(Map.empty, metaClient.getBasePath.toString)
+        writeConfig.setValue("hoodie.metadata.index.column.stats.enable", "false")
+        val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), writeConfig)
         writeClient.rollback(lastCompletedInstant.get().requestedTime)
         // validate the expression index
         checkAnswer(metadataSql)(
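
Here the column-stats index is disabled on the write config before the rollback client is built, presumably so the rollback skips column-stats metadata the test never created. A hedged alternative, assuming the test helper getWriteConfig's Map parameter carries extra write configs (the PR instead mutates the built config with setValue):

    // Hypothetical: pass the override through the helper's options map.
    val writeConfig = getWriteConfig(
      Map("hoodie.metadata.index.column.stats.enable" -> "false"),
      metaClient.getBasePath.toString)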

@@ -1968,7 +1973,8 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
       HoodieWriteConfig.TBL_NAME.key -> tableName,
       RECORDKEY_FIELD.key -> "c1",
       PRECOMBINE_FIELD.key -> "c1",
-      PARTITIONPATH_FIELD.key() -> "c8"
+      PARTITIONPATH_FIELD.key() -> "c8",
+      "hoodie.metadata.index.column.stats.enable" -> "false"
     )
     val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json-partition-pruning").toString

@@ -2055,7 +2061,8 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
       PRECOMBINE_FIELD.key -> "c1",
       PARTITIONPATH_FIELD.key() -> "c8",
       // setting IndexType to be INMEMORY to simulate Global Index nature
-      HoodieIndexConfig.INDEX_TYPE.key -> HoodieIndex.IndexType.INMEMORY.name()
+      HoodieIndexConfig.INDEX_TYPE.key -> HoodieIndex.IndexType.INMEMORY.name(),
+      "hoodie.metadata.index.column.stats.enable" -> "false"
     )
     val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json-partition-pruning").toString