Fix the tests
linliu-code committed Jan 8, 2025
1 parent bf6e175 commit 092c792
Showing 1 changed file with 31 additions and 106 deletions.
@@ -36,12 +36,9 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCommitMetadata
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}

import java.io.File
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.Seq

class TestInsertTable extends HoodieSparkSqlTestBase {

@@ -3093,59 +3090,16 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}

test("Test table with insert dup policy - drop case") {
withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") {
withTempDir { tmp =>
val targetTable = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$targetTable"

spark.sql(
s"""
|create table ${targetTable} (
| `id` string,
| `name` string,
| `dt` bigint,
| `day` STRING,
| `hour` INT
|) using hudi
|tblproperties (
| 'primaryKey' = 'id',
| 'type' = 'MOR',
| 'preCombineField'='dt',
| 'hoodie.index.type' = 'BUCKET',
| 'hoodie.bucket.index.hash.field' = 'id',
| 'hoodie.bucket.index.num.buckets'= 512
| )
partitioned by (`day`,`hour`)
location '${tablePath}'
""".stripMargin)

spark.sql("set spark.sql.shuffle.partitions = 11")
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)

// check result after insert and merge data into target table
checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
Seq("1", "aa", 123, "2024-02-19", 10)
)
}
}
}

test("Test table with insert dup policy - fail case") {
withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") {
withTempDir { tmp =>
test("Test table with insert dup policy") {
withTempDir { tmp =>
Seq(
NONE_INSERT_DUP_POLICY,
FAIL_INSERT_DUP_POLICY,
DROP_INSERT_DUP_POLICY).foreach(policy => {
val targetTable = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$targetTable"

spark.sql(s"set hoodie.datasource.insert.dup.policy=$policy")
spark.sql(
s"""
|create table ${targetTable} (
@@ -3173,59 +3127,30 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)
checkException(
() => spark.sql(s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin))(s"Duplicate records detected. The number of records after deduplication (0) is less than the incoming records (1).")
}
}
}

test("Test table with insert dup policy - none case") {
withSQLConf("hoodie.datasource.insert.dup.policy" -> "none") {
withTempDir { tmp =>
val targetTable = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$targetTable"

spark.sql(
s"""
|create table ${targetTable} (
| `id` string,
| `name` string,
| `dt` bigint,
| `day` STRING,
| `hour` INT
|) using hudi
|tblproperties (
| 'primaryKey' = 'id',
| 'type' = 'MOR',
| 'preCombineField'= 'dt',
| 'hoodie.index.type' = 'BUCKET',
| 'hoodie.bucket.index.hash.field' = 'id',
| 'hoodie.bucket.index.num.buckets'= 512
| )
partitioned by (`day`,`hour`)
location '${tablePath}'
""".stripMargin)

spark.sql("set spark.sql.shuffle.partitions = 11")
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)

// check result after insert and merge data into target table
checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
Seq("1", "aa", 1234, "2024-02-19", 10)
)
}
if (policy.equals(FAIL_INSERT_DUP_POLICY)) {
checkExceptionContain(
() => spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin))(s"Duplicate key found for insert statement, key is: 1")
} else {
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)
if (policy.equals(NONE_INSERT_DUP_POLICY)) {
checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
Seq("1", "aa", 1234, "2024-02-19", 10)
)
} else {
checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
Seq("1", "aa", 123, "2024-02-19", 10)
)
}
}
})
}
}
}
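
For quick reference, a minimal sketch of what the consolidated test asserts per duplicate-insert policy, with values taken from the assertions in the diff above (the policy constants referenced in the test are assumed to resolve to the strings "none", "drop", and "fail"):

// Sketch only: expected outcome of hoodie.datasource.insert.dup.policy when a
// second record with the same primary key ('1') is inserted, per the test above.
val expectedRowByPolicy = Map(
  "none" -> Seq("1", "aa", 1234, "2024-02-19", 10), // second record wins on preCombineField dt
  "drop" -> Seq("1", "aa", 123, "2024-02-19", 10)   // incoming duplicate is silently dropped
)
// "fail" writes nothing: the second insert throws
//   "Duplicate key found for insert statement, key is: 1"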
