diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c7e14b6b4e1b9..fb938ef6d31f0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -104,6 +104,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
 import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
 import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
@@ -529,6 +530,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Port at which the timeline server listens for requests. When running embedded in each writer, it picks "
           + "a free port and communicates to all the executors. This should rarely be changed.");
 
+  public static final ConfigProperty<Long> CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES = ConfigProperty
+      .key("hoodie.cdc.file.group.iterator.memory.spill.bytes")
+      .defaultValue(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+      .markAdvanced()
+      .withDocumentation("Amount of memory in bytes for CDCFileGroupIterator to hold records in memory before spilling to disk.");
+
   public static final ConfigProperty<String> EMBEDDED_TIMELINE_NUM_SERVER_THREADS = ConfigProperty
       .key("hoodie.embed.timeline.server.threads")
      .defaultValue("-1")
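Usage sketch (illustrative, not part of the patch): given a SparkSession `spark`, the new knob could be passed like any other Hudi read option on a CDC query, assuming reader options are propagated into the iterator's TypedProperties. The table path and begin instant below are placeholders.

```scala
// Hypothetical example of tuning the new spill threshold on a Spark CDC read.
val cdcDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.query.incremental.format", "cdc")
  .option("hoodie.datasource.read.begin.instanttime", "20240101000000000")
  // new config from this patch: bytes held in memory before spilling to disk
  .option("hoodie.cdc.file.group.iterator.memory.spill.bytes", (64L * 1024 * 1024).toString)
  .load("/path/to/hudi_table")
```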
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index c9516ff249e3a..ed466a39600c3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.HoodieBaseRelation.BaseFileReader
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.config.{HoodieMemoryConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
@@ -36,11 +36,16 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodiePayloadConfig
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE
+
+import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType
+import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils, Option}
+import org.apache.hudi.common.util.collection.ExternalSpillableMap
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
@@ -48,11 +53,12 @@ import org.apache.spark.sql.catalyst.expressions.Projection
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED
 
-import java.io.Closeable
-import java.util.Properties
+import java.io.{Closeable, Serializable}
+import java.util
+import java.util.{Locale, Map, Properties}
 import java.util.stream.Collectors
-
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -192,11 +198,26 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
    */
   private var beforeImageRecords: mutable.Map[String, GenericRecord] = mutable.Map.empty
 
+//  private var beforeImageRecords: util.Map[String, GenericRecord] = new ExternalSpillableMap[String, GenericRecord](
+//    props.getLong(HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(), HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue()),
+//    props.getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key, FileIOUtils.getDefaultSpillableMapBasePath),
+//    new DefaultSizeEstimator[String],
+//    new DefaultSizeEstimator[GenericRecord],
+//    ExternalSpillableMap.DiskMapType.valueOf(props.getString(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue().toString).toUpperCase(Locale.ROOT)),
+//    props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
+
   /**
    * Keep the after-image data. Only one case will use this:
    * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is [[OP_KEY_ONLY]] or [[DATA_BEFORE]].
   */
-  private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
+  private var afterImageRecords: util.Map[String, InternalRow] = new ExternalSpillableMap[String, InternalRow](
+    props.getLong(HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(), HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue()),
+    props.getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key, FileIOUtils.getDefaultSpillableMapBasePath),
+    new DefaultSizeEstimator[String],
+    new DefaultSizeEstimator[InternalRow],
+    ExternalSpillableMap.DiskMapType.valueOf(props.getString(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue().toString).toUpperCase(Locale.ROOT)),
+    props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
+
 
   private var internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema)
 
@@ -273,9 +294,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
         recordToLoad.update(2, recordToJsonAsUTF8String(before))
         parse(op) match {
           case INSERT =>
-            recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
+            recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
           case UPDATE =>
-            recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
+            recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
           case _ =>
             recordToLoad.update(3, null)
         }
@@ -287,10 +308,10 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
         parse(op) match {
           case INSERT =>
             recordToLoad.update(2, null)
-            recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
+            recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
           case UPDATE =>
             recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
-            recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
+            recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
           case _ =>
             recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
             recordToLoad.update(3, null)
@@ -398,7 +419,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       // load afterFileSlice to afterImageRecords
       if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
         val iter = loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
-        afterImageRecords = mutable.Map.empty
+        afterImageRecords.clear()
         iter.foreach { row =>
           val key = getRecordKey(row)
           afterImageRecords.put(key, row.copy())
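For reference, a minimal standalone sketch of the spillable-map pattern this change applies to afterImageRecords; the constructor arguments mirror the ones used in CDCFileGroupIterator above, while the 16 MB budget, the String values, and the BITCASK/compression literals are illustrative only.

```scala
import org.apache.hudi.common.util.collection.ExternalSpillableMap
import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils}

// Bounded in-memory map that spills overflow entries to a disk-backed map.
val spillable = new ExternalSpillableMap[String, String](
  16L * 1024 * 1024,                          // in-memory budget in bytes before spilling
  FileIOUtils.getDefaultSpillableMapBasePath,  // directory used for the on-disk portion
  new DefaultSizeEstimator[String],            // key size estimator
  new DefaultSizeEstimator[String],            // value size estimator
  ExternalSpillableMap.DiskMapType.BITCASK,    // disk map type; BITCASK is Hudi's default
  false)                                       // bitcask compression disabled in this sketch

spillable.put("key-1", "value-1")
val value = spillable.get("key-1") // java.util.Map API: get(...) instead of Scala's apply
spillable.clear()                  // reset and reuse rather than re-allocating the map
spillable.close()                  // release any spill files when done
```

Because ExternalSpillableMap implements java.util.Map, the hunks above switch lookups from afterImageRecords(recordKey) to afterImageRecords.get(recordKey) and reset the map with clear() per file split instead of rebinding it to a fresh mutable.Map.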