[HUDI-8553] Support writing record positions to log blocks from Spark SQL UPDATE and DELETE statements #12612

Draft: wants to merge 1 commit into base: master
@@ -591,4 +591,20 @@ private static void findAnyMissingFieldsRec(Schema incomingSchema,
public static String createSchemaErrorString(String errorMessage, Schema writerSchema, Schema tableSchema) {
return String.format("%s\nwriterSchema: %s\ntableSchema: %s", errorMessage, writerSchema, tableSchema);
}

/**
 * Projects the schema onto the given field names and appends an extra field at the end.
 *
 * @param schema     source Avro record schema
 * @param fieldNames names of the fields to keep, in the desired order
 * @param newField   additional field appended after the projected fields
 * @return a new record schema containing the projected fields followed by {@code newField}
 */
public static Schema projectSchema(Schema schema, List<String> fieldNames, Schema.Field newField) {
  List<Schema.Field> fieldList = fieldNames.stream().map(name -> {
    Schema.Field originalField = schema.getField(name);
    // Field instances cannot be reused across schemas, so copy each one
    return new Schema.Field(
        originalField.name(),
        originalField.schema(),
        originalField.doc(),
        originalField.defaultVal());
  }).collect(Collectors.toList());
  fieldList.add(newField);
  Schema newSchema = Schema.createRecord(
      schema.getName(), "", schema.getNamespace(), false);
  newSchema.setFields(fieldList);
  return newSchema;
}
}
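Note: a minimal usage sketch of the new projection helper; the source schema and the appended field name below are made up for illustration.

import org.apache.avro.{Schema, SchemaBuilder}
import java.util.Arrays

// Illustrative two-field source schema
val source: Schema = SchemaBuilder.record("rec").namespace("ns").fields()
  .requiredString("id").requiredString("name").endRecord()
// Keep only "id" and append a nullable long field (hypothetical name "_tmp_row_index")
val positionField = new Schema.Field(
  "_tmp_row_index",
  Schema.createUnion(Schema.create(Schema.Type.NULL), SchemaBuilder.builder().longType()))
val projected = AvroSchemaUtils.projectSchema(source, Arrays.asList("id"), positionField)
// projected now contains the fields: id, _tmp_row_index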
@@ -338,6 +338,12 @@ object DataSourceWriteOptions {
* operations are already prepped.
*/
val SPARK_SQL_WRITES_PREPPED_KEY = "_hoodie.spark.sql.writes.prepped";
val ROW_POSITION_META_COLUMN = "_hoodie_row_position"
val SPARK_READ_ATTACH_ROW_POSITION: ConfigProperty[Boolean] = ConfigProperty
.key("_hoodie.spark.read.attach.row.position")
.defaultValue(false)
.withDocumentation("When enabled, the datasource read and SQL query return the results with "
+ s"row position column, $ROW_POSITION_META_COLUMN, attached.")

/**
* May be derive partition path from incoming df if not explicitly set.
@@ -21,6 +21,8 @@ package org.apache.hudi
import org.apache.hudi.common.data.HoodieData

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._

@@ -63,4 +65,11 @@ object HoodieCatalystUtils extends SparkAdapterSupport {
data.unpersist()
}
}

/**
 * Rewrites any resolved [[AttributeReference]] in the expression into an
 * [[UnresolvedAttribute]] of the same name, so the expression can be
 * re-resolved against a different logical plan.
 */
def toUnresolved(e: Expression): Expression = {
  e transform {
    case AttributeReference(name, _, _, _) =>
      UnresolvedAttribute(name)
  }
}
}
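Note: a small illustration of the helper's effect; the attribute and predicate are made up.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.IntegerType

// A predicate over a resolved attribute (made-up name)
val resolved = AttributeReference("id", IntegerType)()
val predicate = EqualTo(resolved, Literal(1))
// Every AttributeReference is rewritten into an UnresolvedAttribute of the same name,
// so the predicate can be re-resolved against a different logical plan
val rebindable = HoodieCatalystUtils.toUnresolved(predicate)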
@@ -18,26 +18,27 @@

package org.apache.hudi

import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, PAYLOAD_CLASS_NAME, PRECOMBINE_FIELD}
import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, PRECOMBINE_FIELD}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model._
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, SparkKeyGeneratorInterface}

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.TaskContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, HoodieInternalRowUtils}
import org.apache.spark.sql.HoodieInternalRowUtils.getCachedUnsafeRowWriter
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, HoodieInternalRowUtils}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
@@ -225,12 +226,13 @@ object HoodieCreateRecordUtils {
else {
None
}
val recordLocation: Option[HoodieRecordLocation] = if (instantTime.isDefined && fileName.isDefined) {
val fileId = FSUtils.getFileId(fileName.get)
Some(new HoodieRecordLocation(instantTime.get, fileId))
val recordPosition: Option[Long] = if (fetchRecordLocationFromMetaFields) {
Option(avroRec.get(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)).map(_.asInstanceOf[Long])
} else {
None
}
val recordLocation: Option[HoodieRecordLocation] =
createHoodieRecordLocation(instantTime, fileName, recordPosition)
(hoodieKey, recordLocation)
}

@@ -264,13 +266,27 @@
None
}

val recordLocation: Option[HoodieRecordLocation] = if (instantTime.isDefined && fileName.isDefined) {
val fileId = FSUtils.getFileId(fileName.get)
Some(new HoodieRecordLocation(instantTime.get, fileId))
val recordPosition: Option[Long] = if (fetchRecordLocationFromMetaFields) {
  // TODO(yihua): resolve the row-index column ordinal from the schema instead of hardcoding 6
  Option(sourceRow.getLong(6))
} else {
None
}

val recordLocation: Option[HoodieRecordLocation] =
createHoodieRecordLocation(instantTime, fileName, recordPosition)

(new HoodieKey(recordKey, partitionPath), recordLocation)
}

private def createHoodieRecordLocation(instantTime: Option[String],
                                       fileName: Option[String],
                                       recordPosition: Option[Long]): Option[HoodieRecordLocation] = {
  // A record location carrying the position can only be built when the instant time,
  // base file name, and record position are all available
  if (instantTime.isDefined && fileName.isDefined && recordPosition.isDefined) {
    val fileId = FSUtils.getFileId(fileName.get)
    Some(new HoodieRecordLocation(instantTime.get, fileId, recordPosition.get))
  } else {
    None
  }
}
}
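Note: a sketch of the new helper's contract; it is private, and the instant time, file name, and position values below are made up.

// All three inputs present: the returned location carries the record position in the base file
val withPosition = createHoodieRecordLocation(
  Some("20240101000000000"),
  Some("fileId-0_0-1-1_20240101000000000.parquet"),
  Some(42L))
// Any missing input yields None
val missing = createHoodieRecordLocation(Some("20240101000000000"), None, Some(42L))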
@@ -18,16 +18,25 @@
package org.apache.spark.sql.hudi.command

import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES, SPARK_SQL_WRITES_PREPPED_KEY}
import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.HoodieCatalystUtils.toUnresolved

import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.spark.sql._
import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, UpdateTable}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, UpdateTable}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.types.LongType

import scala.jdk.CollectionConverters.{mapAsJavaMapConverter, seqAsJavaListConverter}

case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableCommand
with SparkAdapterSupport with ProvidesHoodieConfig {
@@ -45,11 +54,16 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
case Assignment(attr: AttributeReference, value) => attr -> value
}

val rowIndexAttr = AttributeReference(
ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = true)()

val attributeSeq = ut.table.output

val filteredOutput = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
, SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
ut.table.output
attributeSeq
} else {
removeMetaFields(ut.table.output)
removeMetaFields(attributeSeq)
}

val targetExprs = filteredOutput.map { targetAttr =>
@@ -62,7 +76,22 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
}

val condition = ut.condition.getOrElse(TrueLiteral)
val filteredPlan = Filter(condition, Project(targetExprs, ut.table))

// Project the table schema onto the output attributes and append a nullable long field
// for the Parquet row index, so record positions can flow through the write path
val avroSchema = convertToAvroSchema(catalogTable.tableSchema, catalogTable.tableName)
val schema = AvroSchemaUtils.projectSchema(
  avroSchema,
  attributeSeq.map(e => e.name).asJava,
  new Schema.Field(
    ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
    Schema.createUnion(
      Schema.create(Schema.Type.NULL), SchemaBuilder.builder().longType())
  ))
val options = Map(
"hoodie.datasource.query.type" -> "snapshot",
"path" -> catalogTable.metaClient.getBasePath.toString)
val relation = sparkAdapter.createRelation(
sparkSession.sqlContext, catalogTable.metaClient, schema, Array.empty, options.asJava)
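// Rebuild the plan on top of the snapshot relation (whose schema carries the row-index
// field) instead of ut.table; the condition is converted back to unresolved attributes
// so it can be re-resolved against the new relation.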
val filteredPlan = Filter(toUnresolved(condition), LogicalRelation(relation))

val config = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
, SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
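Note: for context, a hedged example of the kind of statement that exercises this command; the table and column names are placeholders.

// Sketch: a Spark SQL UPDATE routed through UpdateHoodieTableCommand
spark.sql("UPDATE hudi_tbl SET price = price * 1.1 WHERE id = 1")
// With the optimized write path enabled, the rewritten plan reads from a snapshot relation
// that exposes the Parquet row index, so record positions can be written to log blocks.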
@@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -23,15 +23,16 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient

import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.assertEquals

class TestUpdateTable extends HoodieSparkSqlTestBase {

test("Test Update Table") {
withRecordType()(withTempDir { tmp =>
Seq(true, false).foreach { sparkSqlOptimizedWrites =>
Seq("cow", "mor").foreach { tableType =>
Seq(true).foreach { sparkSqlOptimizedWrites =>
Seq("mor").foreach { tableType =>
val tableName = generateTableName
// create table
spark.sql(
@@ -20,11 +20,12 @@
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.DataSourceWriteOptions.SPARK_READ_ATTACH_ROW_POSITION

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
@@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{LongType, StructType}

class Spark35ParquetReader(enableVectorizedReader: Boolean,
datetimeRebaseModeInRead: String,
@@ -53,7 +54,8 @@ class Spark35ParquetReader(enableVectorizedReader: Boolean,
capacity: Int,
returningBatch: Boolean,
enableRecordFilter: Boolean,
timeZoneId: Option[String]) extends SparkParquetReaderBase(
timeZoneId: Option[String],
attachRowPosition: Boolean) extends SparkParquetReaderBase(
enableVectorizedReader = enableVectorizedReader,
enableParquetFilterPushDown = enableParquetFilterPushDown,
pushDownDate = pushDownDate,
@@ -91,6 +93,7 @@ class Spark35ParquetReader(enableVectorizedReader: Boolean,
val filePath = file.toPath
val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])

// StructType is immutable: add() returns a new schema, so capture the result instead of discarding it
val requiredSchemaWithRowIndex =
  requiredSchema.add(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = true)
val schemaEvolutionUtils = new Spark3ParquetSchemaEvolutionUtils(sharedConf, filePath, requiredSchemaWithRowIndex,
  partitionSchema, internalSchemaOpt)

@@ -285,6 +288,9 @@ object Spark35ParquetReader extends SparkParquetReaderBuilder {
capacity = sqlConf.parquetVectorizedReaderBatchSize,
returningBatch = returningBatch,
enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
timeZoneId = Some(sqlConf.sessionLocalTimeZone))
timeZoneId = Some(sqlConf.sessionLocalTimeZone),
attachRowPosition = sqlConf.getConfString(
SPARK_READ_ATTACH_ROW_POSITION.key,
SPARK_READ_ATTACH_ROW_POSITION.defaultValue.toString).toBoolean)
}
}
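Note: a brief sketch of the row-index mechanism this reader relies on; the schema below is illustrative.

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{LongType, StructType}

// When the temporary row-index column appears in the required schema, Spark's Parquet
// readers populate it with each row's position within the Parquet file
val readSchema = new StructType()
  .add("id", LongType)
  .add(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = true)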