[HUDI-8758] Enforce insert deduplicate policy for spark SQL #12588
base: master
Conversation
}

// Check if deduplication is needed.
def isDeduplicationNeeded(operation: WriteOperationType): Boolean = {
Where did you pull this from?
Is it somewhere in master, or are we introducing it newly in this patch?
I took it from master, LINE 510. I modified it to narrow the impact down to insert only; previously it could affect upsert as well.
gotcha.
btw, this could apply only for INSERT and not INSERT_PREPPED
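For illustration, a minimal sketch of that narrowed check, assuming the WriteOperationType values used elsewhere in Hudi (not necessarily the exact predicate the patch ends up with):

import org.apache.hudi.common.model.WriteOperationType

// Dedup only applies to plain INSERTs; INSERT_PREPPED records arrive already
// prepared/tagged by the caller, so re-checking them would be redundant work.
def isDeduplicationNeeded(operation: WriteOperationType): Boolean = {
  operation == WriteOperationType.INSERT
}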
  incomingRecords
} else {
  // Perform deduplication
  val deduplicatedRecords = DataSourceUtils.dropDuplicates(jsc, incomingRecords, parameters.asJava)
It would be better if we push this down into this method in DataSourceUtils only:

public static JavaRDD<HoodieRecord> dropDuplicates(HoodieSparkEngineContext engineContext, JavaRDD<HoodieRecord> incomingHoodieRecords,
                                                   HoodieWriteConfig writeConfig) {
  try {
    SparkRDDReadClient client = new SparkRDDReadClient<>(engineContext, writeConfig);
    return client.tagLocation(incomingHoodieRecords)
        .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
  } catch (TableNotFoundException e) {
    // this will be executed when there is no hoodie table yet
    // so no dups to drop
    return incomingHoodieRecords;
  }
}

Why trigger the DAG twice?
Will do.
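To make the push-down concrete, here is a rough Scala sketch of handling all three policies off a single location-tagging pass; the handleDuplicates name, the policy strings, and the injected tagLocation function are assumptions for illustration, not the final API in the patch:

import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.exception.HoodieException
import org.apache.spark.api.java.JavaRDD

def handleDuplicates(tagLocation: JavaRDD[HoodieRecord[_]] => JavaRDD[HoodieRecord[_]],
                     incomingRecords: JavaRDD[HoodieRecord[_]],
                     policy: String): JavaRDD[HoodieRecord[_]] = {
  // Tag lazily so the "none" path never touches the existing table at all.
  lazy val tagged = tagLocation(incomingRecords).rdd
  policy match {
    case "drop" =>
      // keep only records whose key is not already present in the table
      tagged.filter(r => !r.isCurrentLocationKnown).toJavaRDD()
    case "fail" =>
      if (!tagged.filter(_.isCurrentLocationKnown).isEmpty()) {
        throw new HoodieException("Duplicate key(s) found and insert dup policy is 'fail'")
      }
      incomingRecords
    case _ =>
      // "none" (default): no dedup, the common path
      incomingRecords
  }
}

With this shape the tagging DAG runs at most once per insert, and the "none" path pays no tagging cost at all.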
@@ -3087,4 +3089,140 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
    })
  }
}

test("Test table with insert dup policy - drop case") { | |||
withSQLConf("hoodie.datasource.insert.dup.policy" -> "drop") { |
Can we add tests for all 3 insert dup policies at the Spark DS layer?
I added the test in the second part. I can move it here.
|""".stripMargin) | ||
|
||
// check result after insert and merge data into target table | ||
checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")( |
Can we have an if, else if, and else branch here and have just 1 test method? Why duplicate the test code?
That way, if one case fails, the other cases will stop. I remember parameterization does not work here; need to confirm.
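One possible middle ground, sketched below, assumes HoodieSparkSqlTestBase exposes ScalaTest-style test registration plus the withTempDir / generateTableName helpers seen elsewhere in these tests: share one body but register one test per policy, so a failure in one policy does not abort the others (the DDL/DML is elided and illustrative):

Seq("none", "drop", "fail").foreach { policy =>
  test(s"Test table with insert dup policy - $policy case") {
    withSQLConf("hoodie.datasource.insert.dup.policy" -> policy) {
      withTempDir { tmp =>
        val targetTable = generateTableName
        // ... create $targetTable under tmp and insert the same key twice ...
        policy match {
          case "drop" => // expect a single row: the duplicate insert is dropped
          case "fail" => // expect the second insert statement to throw
          case _      => // "none": expect both copies of the row to be visible
        }
      }
    }
  }
}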
@@ -284,24 +285,47 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
 * @param writeConfig HoodieWriteConfig
 */
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(HoodieSparkEngineContext engineContext, JavaRDD<HoodieRecord> incomingHoodieRecords,
                                                   HoodieWriteConfig writeConfig) {
public static JavaRDD<HoodieRecord> doDropDuplicates(HoodieSparkEngineContext engineContext,
we should rename this to handleDuplicates
  HoodieWriteConfig writeConfig =
      HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
  return dropDuplicates(new HoodieSparkEngineContext(jssc), incomingHoodieRecords, writeConfig);
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
Again, let's rename this.
Can we add Javadocs, please?
(10, "3", "rider-C", "driver-C", 33.90, 10), | ||
(11, "5", "rider-C", "driver-C", 3.3, 3)) | ||
val expectedForNone: Seq[(Int, String, String, String, Double, Int)] = Seq( | ||
(11, "1", "rider-A", "driver-A", 1.1, 1), |
Shouldn't we expect to see duplicate records with NONE as the policy value?
The concat handle should be enabled by default, right?
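If it helps to confirm, a quick hand check could look like the sketch below; hoodie.merge.allow.duplicate.on.inserts is, as far as I know, the flag behind the concat handle, and the table name and schema here are purely illustrative:

spark.sql("set hoodie.datasource.insert.dup.policy=none")
spark.sql("set hoodie.merge.allow.duplicate.on.inserts=true") // concat handle: append, don't merge
spark.sql("insert into hudi_tbl values (1, 'rider-A', 1.1)")
spark.sql("insert into hudi_tbl values (1, 'rider-A', 1.1)") // same key inserted again
// with the concat handle the second insert is appended, so this should return 2
spark.sql("select count(*) from hudi_tbl where id = 1").show()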
java.util.Arrays.asList(
    Arguments.of("MERGE_ON_READ", "AVRO", NONE_INSERT_DUP_POLICY),
    Arguments.of("MERGE_ON_READ", "SPARK", NONE_INSERT_DUP_POLICY),
    Arguments.of("MERGE_ON_READ", "AVRO", DROP_INSERT_DUP_POLICY),
We can avoid multiple record types; just table type and dup policy combos would do.
|""".stripMargin) | ||
if (policy.equals(NONE_INSERT_DUP_POLICY)) { | ||
checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 10")( | ||
Seq("1", "aa", 1234, "2024-02-19", 10) |
same comment as above. we should see duplicates.
Change Logs

Based on the value of hoodie.datasource.insert.dup.policy, we do the following: if its value is drop, we remove the duplicate records before insert; if its value is fail, we fail the insert query; and if its value is none, or by default, we do not do dedup (the common path).

Impact
This helps the Spark SQL insert query logic.
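As a quick illustration of the three policy values described in the Change Logs above (table name and data are made up; the exact exception thrown in the fail case is whatever the patch uses):

// drop: duplicate records are removed before the insert, so the second row is silently skipped
spark.sql("set hoodie.datasource.insert.dup.policy=drop")
spark.sql("insert into hudi_tbl values (1, 'a1', 10.0)")
spark.sql("insert into hudi_tbl values (1, 'a1', 10.0)") // same key again: dropped

// fail: the insert query fails when an incoming key already exists in the table
spark.sql("set hoodie.datasource.insert.dup.policy=fail")
// spark.sql("insert into hudi_tbl values (1, 'a1', 10.0)") // would throw

// none (default): no dedup is attempted, the common path
spark.sql("set hoodie.datasource.insert.dup.policy=none")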
Risk level (write none, low, medium or high below)
Medium.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Contributor's checklist