Enforce insert deduplication policy for Spark SQL
linliu-code committed Jan 7, 2025
1 parent 6c73075 commit 66d6247
Showing 3 changed files with 243 additions and 12 deletions.
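For orientation before the diff hunks: the sketch below is not part of this commit's diff. It shows how the insert dup policy enforced here would typically be supplied from the datasource write path, assuming a running SparkSession named spark with the Hudi Spark bundle on the classpath; the target path, table name, and columns are hypothetical, while the hoodie.datasource.insert.dup.policy key and its drop/fail/none values are the ones exercised by the tests in this commit.

// Illustrative sketch only, not part of this commit's diff.
// Assumes a SparkSession named spark with the Hudi Spark bundle available;
// the table name, path, and columns below are hypothetical.
import org.apache.spark.sql.SaveMode

val df = spark.sql(
  "select '1' as id, 'aa' as name, 123L as dt, '2024-02-19' as day, 10 as hour")

df.write.format("hudi")
  .option("hoodie.table.name", "dup_policy_demo")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "dt")
  .option("hoodie.datasource.write.operation", "insert")
  // "drop" discards incoming records whose keys already exist in the table,
  // "fail" throws a HoodieException when such duplicates are detected,
  // "none" leaves the default insert behavior unchanged.
  .option("hoodie.datasource.insert.dup.policy", "fail")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/dup_policy_demo")

With the fail policy, re-running the same insert would be expected to raise the "Duplicate records detected" error introduced by this commit.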
@@ -29,6 +29,7 @@ import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTable
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
import org.apache.hudi.HoodieSparkSqlWriterInternal.handleInsertDuplicates
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema
@@ -45,7 +46,7 @@ import org.apache.hudi.common.model._
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
import org.apache.hudi.common.util.ConfigUtils.{getAllConfigKeys, getStringWithAltKeys}
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION}
@@ -66,8 +67,7 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
import org.apache.hudi.util.SparkKeyGenUtils

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -79,7 +79,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.slf4j.LoggerFactory

import java.util.function.BiConsumer

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
@@ -506,12 +505,8 @@ class HoodieSparkSqlWriterInternal {
case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
}

val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation != WriteOperationType.INSERT_OVERWRITE_TABLE && operation != WriteOperationType.INSERT_OVERWRITE) {
DataSourceUtils.dropDuplicates(jsc, hoodieRecords, parameters.asJava)
} else {
hoodieRecords
}
// Remove duplicates from incoming records based on existing keys from storage.
val dedupedHoodieRecords = handleInsertDuplicates(hoodieRecords, hoodieConfig, operation, jsc, parameters)
client.startCommitWithTime(instantTime, commitActionType)
try {
val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
@@ -1165,3 +1160,53 @@ class HoodieSparkSqlWriterInternal {
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq newQueryExecution)
}
}

object HoodieSparkSqlWriterInternal {
// Check if duplicates should be dropped.
def shouldDropDuplicatesForInserts(hoodieConfig: HoodieConfig): Boolean = {
hoodieConfig.contains(INSERT_DUP_POLICY) &&
hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(DROP_INSERT_DUP_POLICY)
}

// Check if we should fail if duplicates are found.
def shouldFailWhenDuplicatesFound(hoodieConfig: HoodieConfig): Boolean = {
hoodieConfig.contains(INSERT_DUP_POLICY) &&
hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(FAIL_INSERT_DUP_POLICY)
}

// Check if any dedup-related config (insert dup policy or insert-drop-dups) requires deduplication.
def isDeduplicationRequired(hoodieConfig: HoodieConfig): Boolean = {
hoodieConfig.getBoolean(INSERT_DROP_DUPS) ||
shouldFailWhenDuplicatesFound(hoodieConfig) ||
shouldDropDuplicatesForInserts(hoodieConfig)
}

// Deduplication does not apply to insert-overwrite operations.
def isDeduplicationNeeded(operation: WriteOperationType): Boolean = {
operation != WriteOperationType.INSERT_OVERWRITE_TABLE &&
operation != WriteOperationType.INSERT_OVERWRITE
}

def handleInsertDuplicates(incomingRecords: JavaRDD[HoodieRecord[_]],
hoodieConfig: HoodieConfig,
operation: WriteOperationType,
jsc: JavaSparkContext,
parameters: Map[String, String]): JavaRDD[HoodieRecord[_]] = {
// If no deduplication is needed, return the incoming records as is
if (!isDeduplicationRequired(hoodieConfig) || !isDeduplicationNeeded(operation)) {
return incomingRecords
}
// Perform deduplication
val deduplicatedRecords = DataSourceUtils.dropDuplicates(jsc, incomingRecords, parameters.asJava)
// Check if duplicates were found and fail if necessary
if (shouldFailWhenDuplicatesFound(hoodieConfig) &&
deduplicatedRecords.count() < incomingRecords.count()) {
throw new HoodieException(
s"Duplicate records detected. " +
s"The number of records after deduplication (${deduplicatedRecords.count()}) " +
s"is less than the incoming records (${incomingRecords.count()})."
)
}
deduplicatedRecords
}
}
@@ -30,9 +30,10 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient

import org.apache.avro.Schema
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions.{DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY, INSERT_DROP_DUPS, INSERT_DUP_POLICY}
import org.apache.hudi.common.config.HoodieConfig
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{expr, lit}
@@ -1245,6 +1246,56 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
assert(exc.getMessage.contains("Consistent hashing bucket index does not work with COW table. Use simple bucket index or an MOR table."))
}

@Test
def testShouldDropDuplicatesForInserts(): Unit = {
val hoodieConfig: HoodieConfig = new HoodieConfig()
var shouldDrop: Boolean = HoodieSparkSqlWriterInternal.shouldDropDuplicatesForInserts(hoodieConfig)
assertFalse(shouldDrop)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, DROP_INSERT_DUP_POLICY)
shouldDrop = HoodieSparkSqlWriterInternal.shouldDropDuplicatesForInserts(hoodieConfig)
assertTrue(shouldDrop)
}

@Test
def testShouldFailWhenDuplicatesFound(): Unit = {
val hoodieConfig: HoodieConfig = new HoodieConfig()
var shouldFail: Boolean = HoodieSparkSqlWriterInternal.shouldFailWhenDuplicatesFound(hoodieConfig)
assertFalse(shouldFail)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, FAIL_INSERT_DUP_POLICY)
shouldFail = HoodieSparkSqlWriterInternal.shouldFailWhenDuplicatesFound(hoodieConfig)
assertTrue(shouldFail)
}

@Test
def testIsDeduplicationRequired(): Unit = {
val hoodieConfig: HoodieConfig = new HoodieConfig()
var isRequired: Boolean = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig)
assertFalse(isRequired)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, FAIL_INSERT_DUP_POLICY)
isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig)
assertTrue(isRequired)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, DROP_INSERT_DUP_POLICY)
isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig)
assertTrue(isRequired)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, "")
hoodieConfig.setValue(INSERT_DROP_DUPS.key, "true")
isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig)
assertTrue(isRequired)
}

@Test
def testIsDeduplicationNeeded(): Unit = {
assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_OVERWRITE))
assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_OVERWRITE_TABLE))
assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT))
assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.UPSERT))
}

private def fetchActualSchema(): Schema = {
val tableMetaClient = createMetaClient(spark, tempBasePath)
new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false)
@@ -1272,5 +1323,4 @@ object TestHoodieSparkSqlWriter {
Arguments.arguments("*5/03/1*", Seq("2016/03/15")),
Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
}

}
@@ -3087,4 +3087,140 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
}

test("Test table with insert dup policy - drop case") {
withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") {
withTempDir { tmp =>
val targetTable = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$targetTable"

spark.sql(
s"""
|create table ${targetTable} (
| `id` string,
| `name` string,
| `dt` bigint,
| `day` STRING,
| `hour` INT
|) using hudi
|tblproperties (
| 'primaryKey' = 'id',
| 'type' = 'MOR',
| 'preCombineField'='dt',
| 'hoodie.index.type' = 'BUCKET',
| 'hoodie.bucket.index.hash.field' = 'id',
| 'hoodie.bucket.index.num.buckets'=512
| )
partitioned by (`day`,`hour`)
location '${tablePath}'
""".stripMargin)

spark.sql("set spark.sql.shuffle.partitions = 11")
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)

// Check the result after both inserts: the duplicate key is dropped, so the first record (dt=123) is kept.
checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
Seq("1", "aa", 123, "2024-02-19", 10)
)
}
}
}

test("Test table with insert dup policy - fail case") {
withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") {
withTempDir { tmp =>
val targetTable = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$targetTable"

spark.sql(
s"""
|create table ${targetTable} (
| `id` string,
| `name` string,
| `dt` bigint,
| `day` STRING,
| `hour` INT
|) using hudi
|tblproperties (
| 'primaryKey' = 'id',
| 'type' = 'MOR',
| 'preCombineField'='dt',
| 'hoodie.index.type' = 'BUCKET',
| 'hoodie.bucket.index.hash.field' = 'id',
| 'hoodie.bucket.index.num.buckets'=512
| )
partitioned by (`day`,`hour`)
location '${tablePath}'
""".stripMargin)

spark.sql("set spark.sql.shuffle.partitions = 11")
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)
checkException(
() => spark.sql(s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin))(s"Duplicate records detected. The number of records after deduplication (0) is less than the incoming records (1).")
}
}
}

test("Test table with insert dup policy - none case") {
withSQLConf("hoodie.datasource.insert.dup.policy" -> "none") {
withTempDir { tmp =>
val targetTable = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$targetTable"

spark.sql(
s"""
|create table ${targetTable} (
| `id` string,
| `name` string,
| `dt` bigint,
| `day` STRING,
| `hour` INT
|) using hudi
|tblproperties (
| 'primaryKey' = 'id',
| 'type' = 'MOR',
| 'preCombineField'='dt',
| 'hoodie.index.type' = 'BUCKET',
| 'hoodie.bucket.index.hash.field' = 'id',
| 'hoodie.bucket.index.num.buckets'=512
| )
partitioned by (`day`,`hour`)
location '${tablePath}'
""".stripMargin)

spark.sql("set spark.sql.shuffle.partitions = 11")
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)
spark.sql(
s"""
|insert into ${targetTable}
|select '1' as id, 'aa' as name, 1234 as dt, '2024-02-19' as `day`, 10 as `hour`
|""".stripMargin)

// Check the result after both inserts: with the none policy the second record (dt=1234) is written and kept.
checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")(
Seq("1", "aa", 1234, "2024-02-19", 10)
)
}
}
}
}
