[HUDI-8758] Enforce insert deduplicate policy for spark SQL #12588

Open · wants to merge 8 commits into base: master
@@ -38,6 +38,7 @@
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.storage.HoodieStorage;
@@ -284,24 +285,47 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
* @param writeConfig HoodieWriteConfig
*/
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(HoodieSparkEngineContext engineContext, JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig) {
public static JavaRDD<HoodieRecord> doDropDuplicates(HoodieSparkEngineContext engineContext,
Review comment (Contributor): we should rename this to handleDuplicates
JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig,
boolean failOnDuplicates) {
try {
SparkRDDReadClient client = new SparkRDDReadClient<>(engineContext, writeConfig);
return client.tagLocation(incomingHoodieRecords)
.filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
.filter(r -> shouldIncludeRecord((HoodieRecord<HoodieRecordPayload>) r, failOnDuplicates));
} catch (TableNotFoundException e) {
// this will be executed when there is no hoodie table yet
// so no dups to drop
// No table exists yet, so no duplicates to drop
return incomingHoodieRecords;
}
}

/**
* Determines if a record should be included in the result after deduplication.
*
* @param record The Hoodie record to evaluate.
* @param failOnDuplicates Whether to fail on detecting duplicates.
* @return true if the record should be included; false otherwise.
*/
private static boolean shouldIncludeRecord(HoodieRecord<?> record, boolean failOnDuplicates) {
if (!record.isCurrentLocationKnown()) {
return true;
}
if (failOnDuplicates) {
// Fail if duplicates are found and the flag is set
throw new HoodieDuplicateKeyException(record.getRecordKey());
}
return false;
}

@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
Map<String, String> parameters) {
HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
return dropDuplicates(new HoodieSparkEngineContext(jssc), incomingHoodieRecords, writeConfig);
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
Review comment (Contributor): again, let's rename this.

Review comment (Contributor): can we add Javadocs please?
JavaRDD<HoodieRecord> incomingHoodieRecords,
Map<String, String> parameters,
boolean failOnDuplicates) {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(parameters.get("path"))
.withProps(parameters).build();
return doDropDuplicates(
new HoodieSparkEngineContext(jssc), incomingHoodieRecords, writeConfig, failOnDuplicates);
}
}
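
Taken together, the new `doDropDuplicates`/`shouldIncludeRecord` pair either filters or rejects records whose keys are already tagged to a location in the table. Below is a minimal Scala sketch of a caller, assuming hypothetical `jsc`, `records`, and `params` values; only the four-argument `dropDuplicates` overload it invokes is part of this patch.

```scala
import org.apache.hudi.DataSourceUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

import scala.collection.JavaConverters._

// Assumed inputs (placeholders): a JavaSparkContext, the incoming records, and
// write parameters that include the table "path".
def dedupe(jsc: JavaSparkContext,
           records: JavaRDD[HoodieRecord[_]],
           params: Map[String, String],
           failOnDuplicates: Boolean): JavaRDD[HoodieRecord[_]] = {
  // failOnDuplicates = false: records whose keys already exist in the table are
  // silently filtered out.
  // failOnDuplicates = true: the filter throws HoodieDuplicateKeyException for
  // such records once the RDD is evaluated.
  DataSourceUtils.dropDuplicates(jsc, records, params.asJava, failOnDuplicates)
}
```

Note that the exception is raised lazily inside the RDD filter, so callers only observe it when a Spark action materializes the deduplicated records.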
@@ -29,6 +29,7 @@ import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTable
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
import org.apache.hudi.HoodieSparkSqlWriterInternal.{handleInsertDuplicates, shouldDropDuplicatesForInserts, shouldFailWhenDuplicatesFound}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema
@@ -45,7 +46,7 @@ import org.apache.hudi.common.model._
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils.getAllConfigKeys
import org.apache.hudi.common.util.ConfigUtils.{getAllConfigKeys, getStringWithAltKeys}
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION}
@@ -66,8 +67,7 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
import org.apache.hudi.util.SparkKeyGenUtils

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -79,7 +79,6 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.slf4j.LoggerFactory

import java.util.function.BiConsumer

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
@@ -506,12 +505,8 @@ class HoodieSparkSqlWriterInternal {
case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
}

val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation != WriteOperationType.INSERT_OVERWRITE_TABLE && operation != WriteOperationType.INSERT_OVERWRITE) {
DataSourceUtils.dropDuplicates(jsc, hoodieRecords, parameters.asJava)
} else {
hoodieRecords
}
// Remove duplicates from incoming records based on existing keys from storage.
val dedupedHoodieRecords = handleInsertDuplicates(hoodieRecords, hoodieConfig, operation, jsc, parameters)
client.startCommitWithTime(instantTime, commitActionType)
try {
val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
@@ -554,13 +549,16 @@ class HoodieSparkSqlWriterInternal {
var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
// TODO clean up
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
// or INSERT_DUP_POLICY is `drop` or `fail`.
// Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
// or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
if ((hoodieConfig.getBoolean(INSERT_DROP_DUPS) ||
shouldFailWhenDuplicatesFound(hoodieConfig) ||
shouldDropDuplicatesForInserts(hoodieConfig)) &&
operation == WriteOperationType.UPSERT) {

log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS is set to be true, " +
s"when $INSERT_DROP_DUPS is set to be true, or $INSERT_DUP_POLICY is set to fail or drop, " +
s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")

operation = WriteOperationType.INSERT
@@ -1165,3 +1163,45 @@ class HoodieSparkSqlWriterInternal {
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq newQueryExecution)
}
}

object HoodieSparkSqlWriterInternal {
// Check if duplicates should be dropped.
def shouldDropDuplicatesForInserts(hoodieConfig: HoodieConfig): Boolean = {
hoodieConfig.contains(INSERT_DUP_POLICY) &&
hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(DROP_INSERT_DUP_POLICY)
}

// Check if we should fail if duplicates are found.
def shouldFailWhenDuplicatesFound(hoodieConfig: HoodieConfig): Boolean = {
hoodieConfig.contains(INSERT_DUP_POLICY) &&
hoodieConfig.getString(INSERT_DUP_POLICY).equalsIgnoreCase(FAIL_INSERT_DUP_POLICY)
}

// Check if the write configs require deduplication (legacy drop-dups flag, or a drop/fail dup policy).
def isDeduplicationRequired(hoodieConfig: HoodieConfig): Boolean = {
hoodieConfig.getBoolean(INSERT_DROP_DUPS) ||
shouldFailWhenDuplicatesFound(hoodieConfig) ||
shouldDropDuplicatesForInserts(hoodieConfig)
}

// Check if deduplication applies to the given write operation type.
def isDeduplicationNeeded(operation: WriteOperationType): Boolean = {
Review comment (Contributor): Where did you pull this from? Is it somewhere in master, or are we introducing it newly in this patch?

Review comment (Contributor, Author): I took it from master, line 510. I modified it to narrow the impact down to inserts only; previously it could impact upserts as well.

Review comment (Contributor): Gotcha. By the way, this could apply only to INSERT and not INSERT_PREPPED.
operation == WriteOperationType.INSERT ||
operation == WriteOperationType.INSERT_PREPPED
}

def handleInsertDuplicates(incomingRecords: JavaRDD[HoodieRecord[_]],
hoodieConfig: HoodieConfig,
operation: WriteOperationType,
jsc: JavaSparkContext,
parameters: Map[String, String]): JavaRDD[HoodieRecord[_]] = {
// If no deduplication is needed, return the incoming records as is
if (!isDeduplicationRequired(hoodieConfig) || !isDeduplicationNeeded(operation)) {
incomingRecords
} else {
// Perform deduplication
DataSourceUtils.dropDuplicates(
jsc, incomingRecords, parameters.asJava, shouldFailWhenDuplicatesFound(hoodieConfig))
}
}
}
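
From the SQL/DataFrame side, these helpers are driven by the insert dup policy and the legacy drop-dups option. Below is a hedged end-to-end sketch; the option keys are the standard Hudi write options, while the table path, table name, and fields are hypothetical placeholders.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("insert-dup-policy-example").getOrCreate()
val df = spark.read.parquet("/tmp/incoming")   // placeholder input with duplicate keys
val basePath = "/tmp/hudi/trips"               // placeholder table path

df.write.format("hudi")
  .option("hoodie.table.name", "trips")
  .option("hoodie.datasource.write.operation", "insert")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  // "drop" filters out records whose keys already exist in the table;
  // "fail" aborts the write with HoodieDuplicateKeyException instead.
  .option("hoodie.datasource.insert.dup.policy", "fail")
  .mode(SaveMode.Append)
  .save(basePath)
```

If `hoodie.datasource.write.insert.drop.duplicates` is set to `true` instead, `isDeduplicationRequired` still returns true, and the write follows the same drop behavior as the `drop` policy.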
@@ -43,7 +43,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{filterHoodieConfigs, isUsingHiveCatalog}
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, getPartitionPathFieldWriteConfig}
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PARTITION_OVERWRITE_MODE
import org.apache.spark.sql.types.StructType
@@ -96,23 +96,6 @@ trait ProvidesHoodieConfig extends Logging {
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}


private def deducePayloadClassNameLegacy(operation: String, tableType: String, insertMode: InsertMode): String = {
if (operation == UPSERT_OPERATION_OPT_VAL &&
tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
// Validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload
// on reading.
// TODO use HoodieSparkValidateDuplicateKeyRecordMerger when SparkRecordMerger is default
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else if (operation == INSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL &&
insertMode == InsertMode.STRICT) {
// Validate duplicate key for inserts to COW table when using strict insert mode.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
classOf[DefaultHoodieRecordPayload].getCanonicalName
}
}

/**
* Deduce the sql write operation for INSERT_INTO
*/
@@ -274,25 +257,9 @@
Map()
}

// try to use new insert dup policy instead of legacy insert mode to deduce payload class. If only insert mode is explicitly specified,
// w/o specifying any value for insert dup policy, legacy configs will be honored. But on all other cases (i.e when neither of the configs is set,
// or when both configs are set, or when only insert dup policy is set), we honor insert dup policy and ignore the insert mode.
val useLegacyInsertDropDupFlow = insertModeSet && !insertDupPolicySet
val deducedPayloadClassName = if (useLegacyInsertDropDupFlow) {
deducePayloadClassNameLegacy(operation, tableType, insertMode)
} else {
if (insertDupPolicy == FAIL_INSERT_DUP_POLICY) {
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
classOf[DefaultHoodieRecordPayload].getCanonicalName
}
}

val (recordMergeMode, recordMergeStrategy) = if (deducedPayloadClassName.equals(classOf[ValidateDuplicateKeyPayload].getCanonicalName)) {
(RecordMergeMode.CUSTOM.name(), HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)
} else {
(RecordMergeMode.EVENT_TIME_ORDERING.name(), HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID)
}
val deducedPayloadClassName = classOf[DefaultHoodieRecordPayload].getCanonicalName
val recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING.name
val recordMergeStrategy = HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID

if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
tableConfig.getRecordMergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING)) {

This file was deleted.

@@ -52,7 +52,7 @@ void testDeduplicationAgainstRecordsAlreadyInTable() {
Map<String, String> parameters = config.getProps().entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), entry -> entry.getValue().toString()));
List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, 10);
List<HoodieRecord> inputRecords = Stream.concat(records.subList(0, 10).stream(), newRecords.stream()).collect(Collectors.toList());
List<HoodieRecord> output = DataSourceUtils.dropDuplicates(jsc, jsc.parallelize(inputRecords, 1), parameters).collect();
List<HoodieRecord> output = DataSourceUtils.dropDuplicates(jsc, jsc.parallelize(inputRecords, 1), parameters, false).collect();
Set<String> expectedRecordKeys = newRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
assertEquals(expectedRecordKeys, output.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet()));
}
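
A hedged companion sketch for the `fail` path of the same utility, written in Scala for consistency with the other examples even though the surrounding test class is Java; `jsc`, `inputRecords`, and `parameters` are assumed fixtures, and Spark may deliver the executor-side failure wrapped in another exception.

```scala
import org.apache.hudi.DataSourceUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.spark.api.java.JavaSparkContext

// Assumed fixtures: inputRecords contains keys that already exist in the table,
// and parameters carries the table "path" and write configs.
def assertFailPolicyThrows(jsc: JavaSparkContext,
                           inputRecords: java.util.List[HoodieRecord[_]],
                           parameters: java.util.Map[String, String]): Unit = {
  val thrown =
    try {
      // failOnDuplicates = true: collecting should surface the duplicate-key failure.
      DataSourceUtils.dropDuplicates(jsc, jsc.parallelize(inputRecords, 1), parameters, true).collect()
      None
    } catch {
      case t: Throwable => Some(t)
    }
  // Walk the cause chain because Spark may wrap the original exception.
  val hasDuplicateKeyFailure = thrown.exists { t =>
    Iterator.iterate[Throwable](t)(_.getCause).takeWhile(_ != null)
      .exists(_.isInstanceOf[HoodieDuplicateKeyException])
  }
  assert(hasDuplicateKeyFailure, s"Expected HoodieDuplicateKeyException, got: $thrown")
}
```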
@@ -30,9 +30,10 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestUtils}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient

import org.apache.avro.Schema
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions.{DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY, INSERT_DROP_DUPS, INSERT_DUP_POLICY}
import org.apache.hudi.common.config.HoodieConfig
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{expr, lit}
@@ -1245,6 +1246,57 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
assert(exc.getMessage.contains("Consistent hashing bucket index does not work with COW table. Use simple bucket index or an MOR table."))
}

@Test
def testShouldDropDuplicatesForInserts(): Unit = {
val hoodieConfig: HoodieConfig = new HoodieConfig()
var shouldDrop: Boolean = HoodieSparkSqlWriterInternal.shouldDropDuplicatesForInserts(hoodieConfig)
assertFalse(shouldDrop)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, DROP_INSERT_DUP_POLICY)
shouldDrop = HoodieSparkSqlWriterInternal.shouldDropDuplicatesForInserts(hoodieConfig)
assertTrue(shouldDrop)
}

@Test
def testShouldFailWhenDuplicatesFound(): Unit = {
val hoodieConfig: HoodieConfig = new HoodieConfig()
var shouldFail: Boolean = HoodieSparkSqlWriterInternal.shouldFailWhenDuplicatesFound(hoodieConfig)
assertFalse(shouldFail)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, FAIL_INSERT_DUP_POLICY)
shouldFail = HoodieSparkSqlWriterInternal.shouldFailWhenDuplicatesFound(hoodieConfig)
assertTrue(shouldFail)
}

@Test
def testIsDeduplicationRequired(): Unit = {
val hoodieConfig: HoodieConfig = new HoodieConfig()
var isRequired: Boolean = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig)
assertFalse(isRequired)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, FAIL_INSERT_DUP_POLICY)
isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig)
assertTrue(isRequired)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, DROP_INSERT_DUP_POLICY)
isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig)
assertTrue(isRequired)

hoodieConfig.setValue(INSERT_DUP_POLICY.key, "")
hoodieConfig.setValue(INSERT_DROP_DUPS.key, "true")
isRequired = HoodieSparkSqlWriterInternal.isDeduplicationRequired(hoodieConfig)
assertTrue(isRequired)
}

@Test
def testIsDeduplicationNeeded(): Unit = {
assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_OVERWRITE))
assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_OVERWRITE_TABLE))
assertFalse(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.UPSERT))
assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT))
assertTrue(HoodieSparkSqlWriterInternal.isDeduplicationNeeded(WriteOperationType.INSERT_PREPPED))
}
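
One more hedged sketch, covering the pass-through branch of `handleInsertDuplicates`; the test name and the reuse of the suite's `spark` session are assumptions and not part of this patch.

```scala
@Test
def testHandleInsertDuplicatesPassThrough(): Unit = {
  // With no dup policy and no drop-dups flag configured, the incoming RDD must be
  // returned unchanged for insert operations.
  // (Assumes org.apache.spark.api.java.JavaRDD and org.apache.hudi.common.model.HoodieRecord
  // are imported alongside the existing test imports.)
  val hoodieConfig: HoodieConfig = new HoodieConfig()
  val jsc = new JavaSparkContext(spark.sparkContext)
  val records: JavaRDD[HoodieRecord[_]] =
    jsc.parallelize(new java.util.ArrayList[HoodieRecord[_]]())

  val result = HoodieSparkSqlWriterInternal.handleInsertDuplicates(
    records, hoodieConfig, WriteOperationType.INSERT, jsc, Map.empty[String, String])

  // Pass-through means the very same RDD reference is returned.
  assertTrue(result eq records)
}
```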

private def fetchActualSchema(): Schema = {
val tableMetaClient = createMetaClient(spark, tempBasePath)
new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false)
@@ -1272,5 +1324,4 @@ object TestHoodieSparkSqlWriter {
Arguments.arguments("*5/03/1*", Seq("2016/03/15")),
Arguments.arguments("2016/03/*", Seq("2015/03/16", "2015/03/17")))
}

}