diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 104b2846bce1..2796a04dbf3f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -36,12 +36,9 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCommitMetadata
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
 import java.io.File
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import scala.collection.Seq
 
 class TestInsertTable extends HoodieSparkSqlTestBase {
 
@@ -3093,59 +3090,16 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test table with insert dup policy - drop case") {
-    withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") {
-      withTempDir { tmp =>
-        val targetTable = generateTableName
-        val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
-
-        spark.sql(
-          s"""
-             |create table ${targetTable} (
-             |  `id` string,
-             |  `name` string,
-             |  `dt` bigint,
-             |  `day` STRING,
-             |  `hour` INT
-             |) using hudi
-             |tblproperties (
-             |  'primaryKey' = 'id',
-             |  'type' = 'MOR',
-             |  'preCombineField'='dt',
-             |  'hoodie.index.type' = 'BUCKET',
-             |  'hoodie.bucket.index.hash.field' = 'id',
-             |  'hoodie.bucket.index.num.buckets'= 512
-             | )
-             partitioned by (`day`,`hour`)
-             location '${tablePath}'
-             """.stripMargin)
-
-        spark.sql("set spark.sql.shuffle.partitions = 11")
-        spark.sql(
-          s"""
-             |insert into ${targetTable}
-             |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
-             |""".stripMargin)
-        spark.sql(
-          s"""
-             |insert into ${targetTable}
-             |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
-             |""".stripMargin)
-
-        // check result after insert and merge data into target table
-        checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
-          Seq("1", "aa", 123, "2024-02-19", 10)
-        )
-      }
-    }
-  }
-
-  test("Test table with insert dup policy - fail case") {
-    withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") {
-      withTempDir { tmp =>
+  test("Test table with insert dup policy") {
+    withTempDir { tmp =>
+      Seq(
+        NONE_INSERT_DUP_POLICY,
+        FAIL_INSERT_DUP_POLICY,
+        DROP_INSERT_DUP_POLICY).foreach(policy => {
         val targetTable = generateTableName
         val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
 
+        spark.sql(s"set hoodie.datasource.insert.dup.policy=$policy")
         spark.sql(
           s"""
             |create table ${targetTable} (
@@ -3173,59 +3127,30 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
             |insert into ${targetTable}
             |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
             |""".stripMargin)
-        checkException(
-          () => spark.sql(s"""
-               |insert into ${targetTable}
-               |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
-               |""".stripMargin))(s"Duplicate records detected. The number of records after deduplication (0) is less than the incoming records (1).")
-      }
-    }
-  }
-
-  test("Test table with insert dup policy - none case") {
-    withSQLConf("hoodie.datasource.insert.dup.policy" -> "none") {
-      withTempDir { tmp =>
-        val targetTable = generateTableName
-        val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
-
-        spark.sql(
-          s"""
-             |create table ${targetTable} (
-             |  `id` string,
-             |  `name` string,
-             |  `dt` bigint,
-             |  `day` STRING,
-             |  `hour` INT
-             |) using hudi
-             |tblproperties (
-             |  'primaryKey' = 'id',
-             |  'type' = 'MOR',
-             |  'preCombineField'= 'dt',
-             |  'hoodie.index.type' = 'BUCKET',
-             |  'hoodie.bucket.index.hash.field' = 'id',
-             |  'hoodie.bucket.index.num.buckets'= 512
-             | )
-             partitioned by (`day`,`hour`)
-             location '${tablePath}'
-             """.stripMargin)
-
-        spark.sql("set spark.sql.shuffle.partitions = 11")
-        spark.sql(
-          s"""
-             |insert into ${targetTable}
-             |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
-             |""".stripMargin)
-        spark.sql(
-          s"""
-             |insert into ${targetTable}
-             |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
-             |""".stripMargin)
-
-        // check result after insert and merge data into target table
-        checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
-          Seq("1", "aa", 1234, "2024-02-19", 10)
-        )
-      }
+        if (policy.equals(FAIL_INSERT_DUP_POLICY)) {
+          checkExceptionContain(
+            () => spark.sql(
+              s"""
+                 |insert into ${targetTable}
+                 |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
+                 |""".stripMargin))(s"Duplicate key found for insert statement, key is: 1")
+        } else {
+          spark.sql(
+            s"""
+               |insert into ${targetTable}
+               |select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
+               |""".stripMargin)
+          if (policy.equals(NONE_INSERT_DUP_POLICY)) {
+            checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
+              Seq("1", "aa", 1234, "2024-02-19", 10)
+            )
+          } else {
+            checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
+              Seq("1", "aa", 123, "2024-02-19", 10)
+            )
+          }
+        }
+      })
     }
   }
 }
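For reference, here is a minimal standalone sketch (plain Scala, no Spark or Hudi dependency) of the expectation matrix the consolidated test encodes when the same key is inserted twice, first with dt=123 and then dt=1234. The object name `InsertDupPolicySketch`, the helper `expectedDt`, and the lowercase policy strings are illustrative assumptions standing in for Hudi's NONE/DROP/FAIL insert-dup-policy constants; this is not Hudi's implementation.

```scala
// Illustrative sketch only: mirrors the branches of the consolidated test,
// not Hudi internals. Policy strings are assumed stand-ins for
// NONE_INSERT_DUP_POLICY / DROP_INSERT_DUP_POLICY / FAIL_INSERT_DUP_POLICY.
object InsertDupPolicySketch {

  // Outcome of inserting key "1" twice: first with dt=123, then with dt=1234.
  // Right(dt) = the dt value expected to survive; Left(msg) = expected failure.
  def expectedDt(policy: String): Either[String, Long] = policy match {
    case "none" => Right(1234L) // second insert wins (upsert-like semantics)
    case "drop" => Right(123L)  // duplicate insert is silently dropped
    case "fail" => Left("Duplicate key found for insert statement, key is: 1")
    case other  => Left(s"unknown policy: $other")
  }

  def main(args: Array[String]): Unit =
    Seq("none", "drop", "fail").foreach { p =>
      println(s"$p -> ${expectedDt(p)}")
    }
}
```

This is the same three-way branch the rewritten test walks with its `foreach` over the policy constants: one `checkExceptionContain` path for the fail policy and two `checkAnswer` paths that differ only in which dt value is expected to remain.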