From 400caf61313cbe4289be44f766e0bac5f248160e Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Mon, 6 Jan 2025 20:11:05 -0800 Subject: [PATCH 1/8] Enforce insert deduplicate policy for spark SQL --- .../apache/hudi/HoodieSparkSqlWriter.scala | 66 +++++++-- .../hudi/TestHoodieSparkSqlWriter.scala | 54 ++++++- .../spark/sql/hudi/dml/TestInsertTable.scala | 136 ++++++++++++++++++ 3 files changed, 244 insertions(+), 12 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 6ae1158c221ef..586f096698831 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -29,6 +29,7 @@ import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTable import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams +import org.apache.hudi.HoodieSparkSqlWriterInternal.handleInsertDuplicates import org.apache.hudi.HoodieSparkUtils.sparkAdapter import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema @@ -45,7 +46,7 @@ import org.apache.hudi.common.model._ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys +import org.apache.hudi.common.util.ConfigUtils.{getAllConfigKeys, getStringWithAltKeys} import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION} @@ -66,8 +67,7 @@ import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.sync.common.util.SyncUtilHelpers import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException import org.apache.hudi.util.SparkKeyGenUtils - -import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier @@ -79,7 +79,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext} import org.slf4j.LoggerFactory import java.util.function.BiConsumer - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Success, Try} @@ -506,12 +505,8 @@ class HoodieSparkSqlWriterInternal { case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e) } - val dedupedHoodieRecords = - if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation != WriteOperationType.INSERT_OVERWRITE_TABLE && operation != WriteOperationType.INSERT_OVERWRITE) { - DataSourceUtils.dropDuplicates(jsc, hoodieRecords, parameters.asJava) - } else { - hoodieRecords - } + // Remove duplicates from incoming records based on existing keys from storage. 
+ val dedupedHoodieRecords = handleInsertDuplicates(hoodieRecords, hoodieConfig, operation, jsc, parameters) client.startCommitWithTime(instantTime, commitActionType) try { val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation, @@ -1165,3 +1160,54 @@ class HoodieSparkSqlWriterInternal { .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq newQueryExecution) } } + +object HoodieSparkSqlWriterInternal { + // Check if duplicates should be dropped. + def shouldDropDuplicatesForInserts(hoodieConfig: HoodieConfig): Boolean = { + hoodieConfig.contains(INSERT_DUP_POLICY) && + hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(DROP_INSERT_DUP_POLICY) + } + + // Check if we should fail if duplicates are found. + def shouldFailWhenDuplicatesFound(hoodieConfig: HoodieConfig): Boolean = { + hoodieConfig.contains(INSERT_DUP_POLICY) && + hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(FAIL_INSERT_DUP_POLICY) + } + + // Check if deduplication is required. + def isDeduplicationRequired(hoodieConfig: HoodieConfig): Boolean = { + hoodieConfig.getBoolean(INSERT_DROP_DUPS) || + shouldFailWhenDuplicatesFound(hoodieConfig) || + shouldDropDuplicatesForInserts(hoodieConfig) + } + + // Check if deduplication is needed. + def isDeduplicationNeeded(operation: WriteOperationType): Boolean = { + operation != WriteOperationType.INSERT_OVERWRITE_TABLE && + operation != WriteOperationType.INSERT_OVERWRITE + } + + def handleInsertDuplicates(incomingRecords: JavaRDD[HoodieRecord[_]], + hoodieConfig: HoodieConfig, + operation: WriteOperationType, + jsc: JavaSparkContext, + parameters: Map[String, String]): JavaRDD[HoodieRecord[_]] = { + // If no deduplication is needed, return the incoming records as is + if (!isDeduplicationRequired(hoodieConfig) || !isDeduplicationNeeded(operation)) { + incomingRecords + } else { + // Perform deduplication + val deduplicatedRecords = DataSourceUtils.dropDuplicates(jsc, incomingRecords, parameters.asJava) + // Check if duplicates were found and fail if necessary + if (shouldFailWhenDuplicatesFound(hoodieConfig) && + deduplicatedRecords.count() < incomingRecords.count()) { + throw new HoodieException( + s"Duplicate records detected. " + + s"The number of records after deduplication (${deduplicatedRecords.count()}) " + + s"is less than the incoming records (${incomingRecords.count()})." 
+ ) + } + deduplicatedRecords + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index f31697fdbce41..12e3d7303e604 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -30,9 +30,10 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils} import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient - import org.apache.avro.Schema import org.apache.commons.io.FileUtils +import org.apache.hudi.DataSourceWriteOptions.{DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY, INSERT_DROP_DUPS, INSERT_DUP_POLICY} +import org.apache.hudi.common.config.HoodieConfig import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.sql.functions.{expr, lit} @@ -1245,6 +1246,56 @@ def testBulkInsertForDropPartitionColumn(): Unit = { assert(exc.getMessage.contains("Consistent hashing bucket index does not work with COW table. Use simple bucket index or an MOR table.")) } + @Test + def testShouldDropDuplicatesForInserts(): Unit = { + val hoodieConfig: HoodieConfig = new HoodieConfig() + var shouldDrop: Boolean = HoodieSparkSqlWriterInternal.shouldDropDuplicatesForInserts(hoodieConfig) + assertFalse(shouldDrop) + + hoodieConfig.setValue(INSERT_DUP_POLICY.key, DROP_INSERT_DUP_POLICY) + shouldDrop = HoodieSparkSqlWriterInternal.shouldDropDuplicatesForInserts(hoodieConfig) + assertTrue(shouldDrop) + } + + @Test + def testShouldFailWhenDuplicatesFound(): Unit = { + val hoodieConfig: HoodieConfig = new HoodieConfig() + var shouldFail: Boolean = HoodieSparkSqlWriterInternal.shouldFailWhenDuplicatesFound(hoodieConfig) + assertFalse(shouldFail) + + hoodieConfig.setValue(INSERT_DUP_POLICY.key, FAIL_INSERT_DUP_POLICY) + shouldFail = HoodieSparkSqlWriterInternal.shouldFailWhenDuplicatesFound(hoodieConfig) + assertTrue(shouldFail) + } + + @Test + def testIsDeduplicationRequired(): Unit = { + val hoodieConfig: HoodieConfig = new HoodieConfig() + var isRequired: Boolean = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig) + assertFalse(isRequired) + + hoodieConfig.setValue(INSERT_DUP_POLICY.key, FAIL_INSERT_DUP_POLICY) + isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig) + assertTrue(isRequired) + + hoodieConfig.setValue(INSERT_DUP_POLICY.key, DROP_INSERT_DUP_POLICY) + isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig) + assertTrue(isRequired) + + hoodieConfig.setValue(INSERT_DUP_POLICY.key, "") + hoodieConfig.setValue(INSERT_DROP_DUPS.key, "true") + isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig) + assertTrue(isRequired) + } + + @Test + def testIsDeduplicationNeeded(): Unit = { + assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_OVERWRITE)) + assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_OVERWRITE_TABLE)) + assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT)) + 
assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.UPSERT)) + } + private def fetchActualSchema(): Schema = { val tableMetaClient = createMetaClient(spark, tempBasePath) new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false) @@ -1272,5 +1323,4 @@ object TestHoodieSparkSqlWriter { Arguments.arguments("*5/03/1*", Seq("2016/03/15")), Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17"))) } - } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 7814d75a05e5a..64ab784430111 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -3087,4 +3087,140 @@ class TestInsertTable extends HoodieSparkSqlTestBase { }) } } + + test("Test table with insert dup policy - drop case") { + withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + spark.sql( + s""" + |create table ${targetTable} ( + | `id` string, + | `name` string, + | `dt` bigint, + | `day` STRING, + | `hour` INT + |) using hudi + |tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'MOR', + | 'preCombineField'='dt', + | 'hoodie.index.type' = 'BUCKET', + | 'hoodie.bucket.index.hash.field' = 'id', + | 'hoodie.bucket.index.num.buckets'=512 + | ) + partitioned by (`day`,`hour`) + location '${tablePath}' + """.stripMargin) + + spark.sql("set spark.sql.shuffle.partitions = 11") + spark.sql( + s""" + |insert into ${targetTable} + |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour` + |""".stripMargin) + spark.sql( + s""" + |insert into ${targetTable} + |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour` + |""".stripMargin) + + // check result after insert and merge data into target table + checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")( + Seq("1", "aa", 123, "2024-02-19", 10) + ) + } + } + } + + test("Test table with insert dup policy - fail case") { + withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + spark.sql( + s""" + |create table ${targetTable} ( + | `id` string, + | `name` string, + | `dt` bigint, + | `day` STRING, + | `hour` INT + |) using hudi + |tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'MOR', + | 'preCombineField'='dt', + | 'hoodie.index.type' = 'BUCKET', + | 'hoodie.bucket.index.hash.field' = 'id', + | 'hoodie.bucket.index.num.buckets'=512 + | ) + partitioned by (`day`,`hour`) + location '${tablePath}' + """.stripMargin) + + spark.sql("set spark.sql.shuffle.partitions = 11") + spark.sql( + s""" + |insert into ${targetTable} + |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour` + |""".stripMargin) + checkException( + () => spark.sql(s""" + |insert into ${targetTable} + |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour` + |""".stripMargin))(s"Duplicate records detected. 
The number of records after deduplication (0) is less than the incoming records (1).") + } + } + } + + test("Test table with insert dup policy - none case") { + withSQLConf("hoodie.datasource.insert.dup.policy" -> "none") { + withTempDir { tmp => + val targetTable = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + + spark.sql( + s""" + |create table ${targetTable} ( + | `id` string, + | `name` string, + | `dt` bigint, + | `day` STRING, + | `hour` INT + |) using hudi + |tblproperties ( + | 'primaryKey' = 'id', + | 'type' = 'MOR', + | 'preCombineField'='dt', + | 'hoodie.index.type' = 'BUCKET', + | 'hoodie.bucket.index.hash.field' = 'id', + | 'hoodie.bucket.index.num.buckets'=512 + | ) + partitioned by (`day`,`hour`) + location '${tablePath}' + """.stripMargin) + + spark.sql("set spark.sql.shuffle.partitions = 11") + spark.sql( + s""" + |insert into ${targetTable} + |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour` + |""".stripMargin) + spark.sql( + s""" + |insert into ${targetTable} + |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour` + |""".stripMargin) + + // check result after insert and merge data into target table + checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")( + Seq("1", "aa", 1234, "2024-02-19", 10) + ) + } + } + } } From 9c146545fd7994ec53eb761349d828d8fc50dfc4 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Tue, 7 Jan 2025 00:09:01 -0800 Subject: [PATCH 2/8] Fix some ci failures --- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 13 ++++++++----- .../apache/hudi/TestHoodieSparkSqlWriter.scala | 3 ++- .../spark/sql/hudi/dml/TestInsertTable.scala | 17 ++++++++--------- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 586f096698831..80688999d5c0c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -29,7 +29,7 @@ import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTable import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams -import org.apache.hudi.HoodieSparkSqlWriterInternal.handleInsertDuplicates +import org.apache.hudi.HoodieSparkSqlWriterInternal.{handleInsertDuplicates, shouldDropDuplicatesForInserts, shouldFailWhenDuplicatesFound} import org.apache.hudi.HoodieSparkUtils.sparkAdapter import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema @@ -549,13 +549,16 @@ class HoodieSparkSqlWriterInternal { var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION)) // TODO clean up // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true + // or INSERT_DUP_POLICY is `drop` or `fail`. // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) . 
- if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && + if ((hoodieConfig.getBoolean(INSERT_DROP_DUPS) || + shouldFailWhenDuplicatesFound(hoodieConfig) || + shouldDropDuplicatesForInserts(hoodieConfig)) && operation == WriteOperationType.UPSERT) { log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " + - s"when $INSERT_DROP_DUPS is set to be true, " + + s"when $INSERT_DROP_DUPS is set to be true, or $INSERT_DUP_POLICY is set to fail or drop, " + s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL") operation = WriteOperationType.INSERT @@ -1183,8 +1186,8 @@ object HoodieSparkSqlWriterInternal { // Check if deduplication is needed. def isDeduplicationNeeded(operation: WriteOperationType): Boolean = { - operation != WriteOperationType.INSERT_OVERWRITE_TABLE && - operation != WriteOperationType.INSERT_OVERWRITE + operation == WriteOperationType.INSERT || + operation == WriteOperationType.INSERT_PREPPED } def handleInsertDuplicates(incomingRecords: JavaRDD[HoodieRecord[_]], diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 12e3d7303e604..14a253977fe7f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -1292,8 +1292,9 @@ def testBulkInsertForDropPartitionColumn(): Unit = { def testIsDeduplicationNeeded(): Unit = { assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_OVERWRITE)) assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_OVERWRITE_TABLE)) + assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.UPSERT)) assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT)) - assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.UPSERT)) + assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_PREPPED)) } private def fetchActualSchema(): Schema = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 64ab784430111..7fff304921264 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -379,7 +379,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 12.0, 1000) ) - assertThrows[HoodieDuplicateKeyException] { + assertThrows[HoodieException] { try { spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") } catch { @@ -449,9 +449,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(1, "a1", 10.0), Seq(2, "a2", 12.0) ) - spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") - assertThrows[HoodieDuplicateKeyException] { + assertThrows[HoodieException] { try { spark.sql(s"insert into $tableName select 1, 'a1', 10") } catch { @@ -2787,7 +2786,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { ) if (expectExceptionOnSecondBatch) { - assertThrows[HoodieDuplicateKeyException] { + assertThrows[HoodieException] { try { spark.sql( s""" @@ -3109,7 +3108,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | 
'preCombineField'='dt', | 'hoodie.index.type' = 'BUCKET', | 'hoodie.bucket.index.hash.field' = 'id', - | 'hoodie.bucket.index.num.buckets'=512 + | 'hoodie.bucket.index.num.buckets'= 512 | ) partitioned by (`day`,`hour`) location '${tablePath}' @@ -3153,10 +3152,10 @@ class TestInsertTable extends HoodieSparkSqlTestBase { |tblproperties ( | 'primaryKey' = 'id', | 'type' = 'MOR', - | 'preCombineField'='dt', + | 'preCombineField'= 'dt', | 'hoodie.index.type' = 'BUCKET', | 'hoodie.bucket.index.hash.field' = 'id', - | 'hoodie.bucket.index.num.buckets'=512 + | 'hoodie.bucket.index.num.buckets'= 512 | ) partitioned by (`day`,`hour`) location '${tablePath}' @@ -3195,10 +3194,10 @@ class TestInsertTable extends HoodieSparkSqlTestBase { |tblproperties ( | 'primaryKey' = 'id', | 'type' = 'MOR', - | 'preCombineField'='dt', + | 'preCombineField'= 'dt', | 'hoodie.index.type' = 'BUCKET', | 'hoodie.bucket.index.hash.field' = 'id', - | 'hoodie.bucket.index.num.buckets'=512 + | 'hoodie.bucket.index.num.buckets'= 512 | ) partitioned by (`day`,`hour`) location '${tablePath}' From 4a99cb7c4857fcc691cd7d95df8c1e8975c43e71 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Tue, 7 Jan 2025 08:58:31 -0800 Subject: [PATCH 3/8] Fix ci failures --- .../org/apache/spark/sql/hudi/dml/TestInsertTable.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 7fff304921264..8fc389da4b6f5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -2606,7 +2606,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test FAIL insert dup policy with INSERT_INTO explicit new configs") { withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp => Seq("cow", "mor").foreach { tableType => - val operation = WriteOperationType.UPSERT + val operation = WriteOperationType.INSERT val dupPolicy = FAIL_INSERT_DUP_POLICY withTable(generateTableName) { tableName => ingestAndValidateDataDupPolicy(tableType, tableName, tmp, operation, @@ -2638,6 +2638,9 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field) |LOCATION '${tmp.getCanonicalPath}' """.stripMargin) + // Avoid operation type modification. 
+ spark.sql(s"set ${INSERT_DROP_DUPS.key}=false") + spark.sql(s"set ${INSERT_DUP_POLICY.key}=$NONE_INSERT_DUP_POLICY") // Insert data into partitioned table spark.sql( From d928b49239b1b19fd2c0c34a72797953f7180182 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Tue, 7 Jan 2025 10:20:45 -0800 Subject: [PATCH 4/8] Remove ValidateDuplicateKeyPayload class --- .../spark/sql/hudi/ProvidesHoodieConfig.scala | 41 ++--------------- .../command/ValidateDuplicateKeyPayload.scala | 44 ------------------- 2 files changed, 4 insertions(+), 81 deletions(-) delete mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 7656e61146311..a6008cbf5234f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{filterHoodieConfigs, isUsingHiveCatalog} import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig} -import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload} +import org.apache.spark.sql.hudi.command.SqlKeyGenerator import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PARTITION_OVERWRITE_MODE import org.apache.spark.sql.types.StructType @@ -96,23 +96,6 @@ trait ProvidesHoodieConfig extends Logging { defaultOpts = defaultOpts, overridingOpts = overridingOpts) } - - private def deducePayloadClassNameLegacy(operation: String, tableType: String, insertMode: InsertMode): String = { - if (operation == UPSERT_OPERATION_OPT_VAL && - tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) { - // Validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload - // on reading. - // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when SparkRecordMerger is default - classOf[ValidateDuplicateKeyPayload].getCanonicalName - } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL && - insertMode == InsertMode.STRICT) { - // Validate duplicate key for inserts to COW table when using strict insert mode. - classOf[ValidateDuplicateKeyPayload].getCanonicalName - } else { - classOf[DefaultHoodieRecordPayload].getCanonicalName - } - } - /** * Deduce the sql write operation for INSERT_INTO */ @@ -274,25 +257,9 @@ trait ProvidesHoodieConfig extends Logging { Map() } - // try to use new insert dup policy instead of legacy insert mode to deduce payload class. If only insert mode is explicitly specified, - // w/o specifying any value for insert dup policy, legacy configs will be honored. But on all other cases (i.e when neither of the configs is set, - // or when both configs are set, or when only insert dup policy is set), we honor insert dup policy and ignore the insert mode. 
- val useLegacyInsertDropDupFlow = insertModeSet && !insertDupPolicySet - val deducedPayloadClassName = if (useLegacyInsertDropDupFlow) { - deducePayloadClassNameLegacy(operation, tableType, insertMode) - } else { - if (insertDupPolicy == FAIL_INSERT_DUP_POLICY) { - classOf[ValidateDuplicateKeyPayload].getCanonicalName - } else { - classOf[DefaultHoodieRecordPayload].getCanonicalName - } - } - - val (recordMergeMode, recordMergeStrategy) = if (deducedPayloadClassName.equals(classOf[ValidateDuplicateKeyPayload].getCanonicalName)) { - (RecordMergeMode.CUSTOM.name(), HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID) - } else { - (RecordMergeMode.EVENT_TIME_ORDERING.name(), HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID) - } + val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName + val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name + val recordMergeStrategy = HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) && tableConfig.getRecordMergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING)) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala deleted file mode 100644 index 1a8297e20ce32..0000000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hudi.command - -import org.apache.avro.Schema -import org.apache.avro.generic.{GenericRecord, IndexedRecord} -import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord} -import org.apache.hudi.common.util.{Option => HOption} -import org.apache.hudi.exception.HoodieDuplicateKeyException - -import java.util.Properties - -/** - * Validate the duplicate key for insert statement without enable the INSERT_DROP_DUPS_OPT - * config. 
- */ -class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable[_]) - extends DefaultHoodieRecordPayload(record, orderingVal) { - - def this(record: HOption[GenericRecord]) { - this(if (record.isPresent) record.get else null, 0) - } - - override def combineAndGetUpdateValue(currentValue: IndexedRecord, - schema: Schema, properties: Properties): HOption[IndexedRecord] = { - val key = currentValue.asInstanceOf[GenericRecord].get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString - throw new HoodieDuplicateKeyException(key) - } -} From bf6e175e74104ec0c40556263d46fa91cd21589c Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 8 Jan 2025 13:35:51 -0800 Subject: [PATCH 5/8] Address comments --- .../java/org/apache/hudi/DataSourceUtils.java | 44 ++++++++++++++----- .../apache/hudi/HoodieSparkSqlWriter.scala | 3 +- .../org/apache/hudi/TestDataSourceUtils.java | 2 +- .../spark/sql/hudi/dml/TestInsertTable.scala | 14 +++--- .../hudi/utilities/streamer/StreamSync.java | 2 +- 5 files changed, 47 insertions(+), 18 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index c8666711189a7..8d5da5403a954 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieDuplicateKeyException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.storage.HoodieStorage; @@ -284,24 +285,47 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey, * @param writeConfig HoodieWriteConfig */ @SuppressWarnings("unchecked") - public static JavaRDD dropDuplicates(HoodieSparkEngineContext engineContext, JavaRDD incomingHoodieRecords, - HoodieWriteConfig writeConfig) { + public static JavaRDD doDropDuplicates(HoodieSparkEngineContext engineContext, + JavaRDD incomingHoodieRecords, + HoodieWriteConfig writeConfig, + boolean failOnDuplicates) { try { SparkRDDReadClient client = new SparkRDDReadClient<>(engineContext, writeConfig); return client.tagLocation(incomingHoodieRecords) - .filter(r -> !((HoodieRecord) r).isCurrentLocationKnown()); + .filter(r -> shouldIncludeRecord((HoodieRecord) r, failOnDuplicates)); } catch (TableNotFoundException e) { - // this will be executed when there is no hoodie table yet - // so no dups to drop + // No table exists yet, so no duplicates to drop return incomingHoodieRecords; } } + /** + * Determines if a record should be included in the result after deduplication. + * + * @param record The Hoodie record to evaluate. + * @param failOnDuplicates Whether to fail on detecting duplicates. + * @return true if the record should be included; false otherwise. 
+ */ + private static boolean shouldIncludeRecord(HoodieRecord record, boolean failOnDuplicates) { + if (!record.isCurrentLocationKnown()) { + return true; + } + if (failOnDuplicates) { + // Fail if duplicates are found and the flag is set + throw new HoodieDuplicateKeyException(record.getRecordKey()); + } + return false; + } + @SuppressWarnings("unchecked") - public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRDD incomingHoodieRecords, - Map parameters) { - HoodieWriteConfig writeConfig = - HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build(); - return dropDuplicates(new HoodieSparkEngineContext(jssc), incomingHoodieRecords, writeConfig); + public static JavaRDD dropDuplicates(JavaSparkContext jssc, + JavaRDD incomingHoodieRecords, + Map parameters, + boolean failOnDuplicates) { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(parameters.get("path")) + .withProps(parameters).build(); + return doDropDuplicates( + new HoodieSparkEngineContext(jssc), incomingHoodieRecords, writeConfig, failOnDuplicates); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 80688999d5c0c..ee7afa30d910e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -1200,7 +1200,8 @@ object HoodieSparkSqlWriterInternal { incomingRecords } else { // Perform deduplication - val deduplicatedRecords = DataSourceUtils.dropDuplicates(jsc, incomingRecords, parameters.asJava) + val deduplicatedRecords = DataSourceUtils.dropDuplicates( + jsc, incomingRecords, parameters.asJava, shouldFailWhenDuplicatesFound(hoodieConfig)) // Check if duplicates were found and fail if necessary if (shouldFailWhenDuplicatesFound(hoodieConfig) && deduplicatedRecords.count() < incomingRecords.count()) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java index 375a7e9ded33f..dd51eee18e338 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -52,7 +52,7 @@ void testDeduplicationAgainstRecordsAlreadyInTable() { Map parameters = config.getProps().entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), entry -> entry.getValue().toString())); List newRecords = dataGen.generateInserts(newCommitTime, 10); List inputRecords = Stream.concat(records.subList(0, 10).stream(), newRecords.stream()).collect(Collectors.toList()); - List output = DataSourceUtils.dropDuplicates(jsc, jsc.parallelize(inputRecords, 1), parameters).collect(); + List output = DataSourceUtils.dropDuplicates(jsc, jsc.parallelize(inputRecords, 1), parameters, false).collect(); Set expectedRecordKeys = newRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet()); assertEquals(expectedRecordKeys, output.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet())); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 8fc389da4b6f5..104b2846bce17 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -26,7 +26,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver} import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.config.{HoodieClusteringConfig, HoodieIndexConfig, HoodieWriteConfig} -import org.apache.hudi.exception.{HoodieDuplicateKeyException, HoodieException} +import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient @@ -36,9 +36,12 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCommitMetadata import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, MethodSource} import java.io.File import java.util.concurrent.{CountDownLatch, TimeUnit} +import scala.collection.Seq class TestInsertTable extends HoodieSparkSqlTestBase { @@ -379,7 +382,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 12.0, 1000) ) - assertThrows[HoodieException] { + assertThrows[HoodieDuplicateKeyException] { try { spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") } catch { @@ -450,7 +453,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 12.0) ) spark.sql("set hoodie.merge.allow.duplicate.on.inserts = false") - assertThrows[HoodieException] { + assertThrows[HoodieDuplicateKeyException] { try { spark.sql(s"insert into $tableName select 1, 'a1', 10") } catch { @@ -2789,7 +2792,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { ) if (expectExceptionOnSecondBatch) { - assertThrows[HoodieException] { + assertThrows[HoodieDuplicateKeyException] { try { spark.sql( s""" @@ -2877,7 +2880,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { """.stripMargin) // drop dups is not supported in bulk_insert row writer path. 
- assertThrows[HoodieException] { + assertThrows[HoodieDuplicateKeyException] { try { spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')") } catch { @@ -3226,3 +3229,4 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java index 12c869666e1f7..318db0fbfaff7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java @@ -926,7 +926,7 @@ private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instant JavaRDD records = (JavaRDD) inputBatch.getBatch().orElseGet(() -> hoodieSparkContext.emptyRDD()); // filter dupes if needed if (cfg.filterDupes) { - records = DataSourceUtils.dropDuplicates(hoodieSparkContext, records, writeClient.getConfig()); + records = DataSourceUtils.doDropDuplicates(hoodieSparkContext, records, writeClient.getConfig(), false); } HoodieWriteResult writeResult = null; From 092c79222b3d77c1a56e4b874c6efbf7b2e3ffff Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 8 Jan 2025 14:45:29 -0800 Subject: [PATCH 6/8] Fix the tests --- .../spark/sql/hudi/dml/TestInsertTable.scala | 137 ++++-------------- 1 file changed, 31 insertions(+), 106 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 104b2846bce17..2796a04dbf3fd 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -36,12 +36,9 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCommitMetadata import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{Arguments, MethodSource} import java.io.File import java.util.concurrent.{CountDownLatch, TimeUnit} -import scala.collection.Seq class TestInsertTable extends HoodieSparkSqlTestBase { @@ -3093,59 +3090,16 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } - test("Test table with insert dup policy - drop case") { - withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") { - withTempDir { tmp => - val targetTable = generateTableName - val tablePath = s"${tmp.getCanonicalPath}/$targetTable" - - spark.sql( - s""" - |create table ${targetTable} ( - | `id` string, - | `name` string, - | `dt` bigint, - | `day` STRING, - | `hour` INT - |) using hudi - |tblproperties ( - | 'primaryKey' = 'id', - | 'type' = 'MOR', - | 'preCombineField'='dt', - | 'hoodie.index.type' = 'BUCKET', - | 'hoodie.bucket.index.hash.field' = 'id', - | 'hoodie.bucket.index.num.buckets'= 512 - | ) - partitioned by (`day`,`hour`) - location '${tablePath}' - """.stripMargin) - - spark.sql("set spark.sql.shuffle.partitions = 11") - spark.sql( - s""" - |insert into ${targetTable} - |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour` - |""".stripMargin) - spark.sql( - s""" - |insert into ${targetTable} - |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as 
`hour` - |""".stripMargin) - - // check result after insert and merge data into target table - checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")( - Seq("1", "aa", 123, "2024-02-19", 10) - ) - } - } - } - - test("Test table with insert dup policy - fail case") { - withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") { - withTempDir { tmp => + test("Test table with insert dup policy") { + withTempDir { tmp => + Seq( + NONE_INSERT_DUP_POLICY, + FAIL_INSERT_DUP_POLICY, + DROP_INSERT_DUP_POLICY).foreach(policy => { val targetTable = generateTableName val tablePath = s"${tmp.getCanonicalPath}/$targetTable" + spark.sql(s"set hoodie.datasource.insert.dup.policy=$policy") spark.sql( s""" |create table ${targetTable} ( @@ -3173,59 +3127,30 @@ class TestInsertTable extends HoodieSparkSqlTestBase { |insert into ${targetTable} |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour` |""".stripMargin) - checkException( - () => spark.sql(s""" - |insert into ${targetTable} - |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour` - |""".stripMargin))(s"Duplicate records detected. The number of records after deduplication (0) is less than the incoming records (1).") - } - } - } - - test("Test table with insert dup policy - none case") { - withSQLConf("hoodie.datasource.insert.dup.policy" -> "none") { - withTempDir { tmp => - val targetTable = generateTableName - val tablePath = s"${tmp.getCanonicalPath}/$targetTable" - - spark.sql( - s""" - |create table ${targetTable} ( - | `id` string, - | `name` string, - | `dt` bigint, - | `day` STRING, - | `hour` INT - |) using hudi - |tblproperties ( - | 'primaryKey' = 'id', - | 'type' = 'MOR', - | 'preCombineField'= 'dt', - | 'hoodie.index.type' = 'BUCKET', - | 'hoodie.bucket.index.hash.field' = 'id', - | 'hoodie.bucket.index.num.buckets'= 512 - | ) - partitioned by (`day`,`hour`) - location '${tablePath}' - """.stripMargin) - - spark.sql("set spark.sql.shuffle.partitions = 11") - spark.sql( - s""" - |insert into ${targetTable} - |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour` - |""".stripMargin) - spark.sql( - s""" - |insert into ${targetTable} - |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour` - |""".stripMargin) - - // check result after insert and merge data into target table - checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")( - Seq("1", "aa", 1234, "2024-02-19", 10) - ) - } + if (policy.equals(FAIL_INSERT_DUP_POLICY)) { + checkExceptionContain( + () => spark.sql( + s""" + |insert into ${targetTable} + |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour` + |""".stripMargin))(s"Duplicate key found for insert statement, key is: 1") + } else { + spark.sql( + s""" + |insert into ${targetTable} + |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour` + |""".stripMargin) + if (policy.equals(NONE_INSERT_DUP_POLICY)) { + checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")( + Seq("1", "aa", 1234, "2024-02-19", 10) + ) + } else { + checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")( + Seq("1", "aa", 123, "2024-02-19", 10) + ) + } + } + }) } } } From 46dc4d7d27e14adca2a3bda983d57662ddd9e1c0 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 8 Jan 2025 15:01:17 -0800 Subject: [PATCH 7/8] Add ds test --- .../apache/hudi/TestInsertDedupPolicy.scala | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create 
mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestInsertDedupPolicy.scala diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestInsertDedupPolicy.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestInsertDedupPolicy.scala new file mode 100644 index 0000000000000..a51c5c539c47c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestInsertDedupPolicy.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig} +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig} +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.spark.SparkException +import org.apache.spark.sql.{Dataset, Row, SaveMode} +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, MethodSource} + +class TestInsertDedupPolicy extends SparkClientFunctionalTestHarness { + val firstInsertData: Seq[(Int, String, String, String, Double, Int)] = Seq( + (10, "1", "rider-A", "driver-A", 19.10, 7), + (10, "2", "rider-B", "driver-B", 27.70, 1), + (10, "3", "rider-C", "driver-C", 33.90, 10)) + val secondInsertData: Seq[(Int, String, String, String, Double, Int)] = Seq( + (11, "1", "rider-A", "driver-A", 1.1, 1), + (11, "2", "rider-B", "driver-B", 2.2, 2), + (11, "5", "rider-C", "driver-C", 3.3, 3)) + val expectedForDrop: Seq[(Int, String, String, String, Double, Int)] = Seq( + (10, "1", "rider-A", "driver-A", 19.10, 7), + (10, "2", "rider-B", "driver-B", 27.70, 1), + (10, "3", "rider-C", "driver-C", 33.90, 10), + (11, "5", "rider-C", "driver-C", 3.3, 3)) + val expectedForNone: Seq[(Int, String, String, String, Double, Int)] = Seq( + (11, "1", "rider-A", "driver-A", 1.1, 1), + (11, "2", "rider-B", "driver-B", 2.2, 2), + (10, "3", "rider-C", "driver-C", 33.90, 10), + (11, "5", "rider-C", "driver-C", 3.3, 3)) + + @ParameterizedTest + @MethodSource(Array("provideParams")) + def testInsertLogic(tableType: String, + recordType: String, + dedupPolicy: String): Unit = { + val sparkOpts: Map[String, String] = Map( + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet", + HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key -> classOf[DefaultSparkRecordMerger].getName) + val fgReaderOpts: Map[String, String] = Map( + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> "true", + HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key -> "true", + HoodieWriteConfig.RECORD_MERGE_MODE.key -> "EVENT_TIME_ORDERING") + val 
insertDedupPolicy: Map[String, String] = Map(INSERT_DUP_POLICY.key -> dedupPolicy) + val opts = if (recordType.equals("SPARK")) { + sparkOpts ++ fgReaderOpts ++ insertDedupPolicy + } else { + fgReaderOpts ++ insertDedupPolicy + } + val columns = Seq("ts", "key", "rider", "driver", "fare", "number") + + // Write the first batch of data. + val inserts = spark.createDataFrame(firstInsertData).toDF(columns: _*) + inserts.write.format("hudi"). + option(RECORDKEY_FIELD.key, "key"). + option(PRECOMBINE_FIELD.key, "ts"). + option(TABLE_TYPE.key, tableType). + option(DataSourceWriteOptions.TABLE_NAME.key, "test_table"). + option(HoodieCompactionConfig.INLINE_COMPACT.key, "false"). + options(opts). + mode(SaveMode.Overwrite). + save(basePath) + + val insertsWithDup = spark.createDataFrame(secondInsertData).toDF(columns: _*) + if (dedupPolicy.equals(FAIL_INSERT_DUP_POLICY)) { + // Write and check throws. + Assertions.assertThrows( + classOf[SparkException], + () => insertsWithDup.write.format("hudi").options(opts).mode(SaveMode.Append).save(basePath) + ) + } else { + // Write data. + insertsWithDup.write.format("hudi").options(opts).mode(SaveMode.Append).save(basePath) + // Validate the data. + val df = spark.read.format("hudi").options(opts).load(basePath) + val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number").sort("key") + val expected = if (dedupPolicy.equals(DROP_INSERT_DUP_POLICY)) expectedForDrop else expectedForNone + val expectedDf = spark.createDataFrame(expected).toDF(columns: _*).sort("key") + TestInsertDedupPolicy.validate(expectedDf, finalDf) + } + } +} + +object TestInsertDedupPolicy { + def provideParams(): java.util.List[Arguments] = { + java.util.Arrays.asList( + Arguments.of("MERGE_ON_READ", "AVRO", NONE_INSERT_DUP_POLICY), + Arguments.of("MERGE_ON_READ", "SPARK", NONE_INSERT_DUP_POLICY), + Arguments.of("MERGE_ON_READ", "AVRO", DROP_INSERT_DUP_POLICY), + Arguments.of("MERGE_ON_READ", "SPARK", DROP_INSERT_DUP_POLICY), + Arguments.of("MERGE_ON_READ", "AVRO", FAIL_INSERT_DUP_POLICY), + Arguments.of("MERGE_ON_READ", "SPARK", FAIL_INSERT_DUP_POLICY), + Arguments.of("COPY_ON_WRITE", "AVRO", NONE_INSERT_DUP_POLICY), + Arguments.of("COPY_ON_WRITE", "SPARK", NONE_INSERT_DUP_POLICY), + Arguments.of("COPY_ON_WRITE", "AVRO", DROP_INSERT_DUP_POLICY), + Arguments.of("COPY_ON_WRITE", "SPARK", DROP_INSERT_DUP_POLICY), + Arguments.of("COPY_ON_WRITE", "AVRO", FAIL_INSERT_DUP_POLICY), + Arguments.of("COPY_ON_WRITE", "SPARK", FAIL_INSERT_DUP_POLICY) + ) + } + + def validate(expectedDf: Dataset[Row], actualDf: Dataset[Row]): Unit = { + val expectedMinusActual = expectedDf.except(actualDf) + val actualMinusExpected = actualDf.except(expectedDf) + expectedDf.show(false) + actualDf.show(false) + assertTrue(expectedMinusActual.isEmpty && actualMinusExpected.isEmpty) + } +} + + From 845d6c3734005d81ee2fa46f484675bf8025d761 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Wed, 8 Jan 2025 15:06:06 -0800 Subject: [PATCH 8/8] refactor --- .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index ee7afa30d910e..ba7b2fa7715f2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -1200,18 +1200,8 @@ object HoodieSparkSqlWriterInternal { incomingRecords } else { // Perform deduplication - val deduplicatedRecords = DataSourceUtils.dropDuplicates( + DataSourceUtils.dropDuplicates( jsc, incomingRecords, parameters.asJava, shouldFailWhenDuplicatesFound(hoodieConfig)) - // Check if duplicates were found and fail if necessary - if (shouldFailWhenDuplicatesFound(hoodieConfig) && - deduplicatedRecords.count() < incomingRecords.count()) { - throw new HoodieException( - s"Duplicate records detected. " + - s"The number of records after deduplication (${deduplicatedRecords.count()}) " + - s"is less than the incoming records (${incomingRecords.count()})." - ) - } - deduplicatedRecords } } }
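For reference, a minimal Spark SQL sketch of how the enforced policy is expected to behave after this series, based on the tests above. The table and column names are illustrative only (not taken from the patches); the target table is assumed to have primaryKey 'id' and preCombineField 'dt'.

    -- pick exactly one policy before running the second insert
    set hoodie.datasource.insert.dup.policy=none;  -- duplicates kept; latest dt wins on read
    set hoodie.datasource.insert.dup.policy=drop;  -- incoming rows whose key already exists are dropped
    set hoodie.datasource.insert.dup.policy=fail;  -- second insert throws HoodieDuplicateKeyException

    insert into t select '1' as id, 'aa' as name, 123 as dt;
    insert into t select '1' as id, 'aa' as name, 1234 as dt;
    -- none: reads return dt=1234; drop: reads return dt=123;
    -- fail: "Duplicate key found for insert statement, key is: 1"

With the drop or fail policy (or the legacy hoodie.datasource.write.insert.drop.duplicates=true), an upsert operation is overridden to insert before the policy is applied, as added in PATCH 2/8.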