Commit

save progress
Davis-Zhang-Onehouse committed Jan 9, 2025
1 parent 3040aa5 commit e720dcf
Showing 2 changed files with 39 additions and 11 deletions.
@@ -104,6 +104,7 @@
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
@@ -529,6 +530,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Port at which the timeline server listens for requests. When running embedded in each writer, it picks "
+ "a free port and communicates to all the executors. This should rarely be changed.");

public static final ConfigProperty<Long> CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES = ConfigProperty
.key("hoodie.cdc.file.group.iterator.memory.spill.bytes")
.defaultValue(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.markAdvanced()
.withDocumentation("Amount of memory in bytes to be used in bytes for CDCFileGroupIterator holding data in-memory, before spilling to disk.");

public static final ConfigProperty<String> EMBEDDED_TIMELINE_NUM_SERVER_THREADS = ConfigProperty
.key("hoodie.embed.timeline.server.threads")
.defaultValue("-1")
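For orientation (not part of this commit): a sketch of how the new knob might be supplied on a Spark CDC read, assuming, as the diff below suggests, that CDCFileGroupIterator picks it up from the reader properties. The table path, begin instant, and 512 MB value are hypothetical placeholders; the query options are Hudi's standard incremental/CDC read options.

import org.apache.spark.sql.SparkSession

// Sketch only: the path, instant time, and 512 MB budget are hypothetical.
val spark = SparkSession.builder().appName("cdc-read-sketch").getOrCreate()

val cdc = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")       // incremental query...
  .option("hoodie.datasource.query.incremental.format", "cdc") // ...returning CDC rows
  .option("hoodie.datasource.read.begin.instanttime", "20250101000000000")
  // New in this commit: in-memory budget (bytes) for CDCFileGroupIterator
  // before its image-record maps spill to disk.
  .option("hoodie.cdc.file.group.iterator.memory.spill.bytes", (512L * 1024 * 1024).toString)
  .load("/tmp/hudi/trips_table")

The second changed file, the Scala CDCFileGroupIterator, puts the new budget to use: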
@@ -24,7 +24,7 @@ import org.apache.hudi.HoodieBaseRelation.BaseFileReader
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.config.{HoodieMemoryConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
@@ -36,23 +36,29 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.storage.{StorageConfiguration, StoragePath}

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE

import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType
import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils, Option}
import org.apache.hudi.common.util.collection.ExternalSpillableMap
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Projection
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED

import java.io.Closeable
import java.util.Properties
import java.io.{Closeable, Serializable}
import java.util
import java.util.{Locale, Map, Properties}
import java.util.stream.Collectors

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -192,11 +198,26 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
*/
private var beforeImageRecords: mutable.Map[String, GenericRecord] = mutable.Map.empty

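// Work in progress (per this "save progress" commit): a commented-out sketch of moving
// beforeImageRecords to an ExternalSpillableMap, mirroring afterImageRecords below.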
// private var beforeImageRecords: util.Map[String, GenericRecord] = new ExternalSpillableMap[String, GenericRecord](
// props.getLong(HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(), HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue()),
// props.getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key, FileIOUtils.getDefaultSpillableMapBasePath),
// new DefaultSizeEstimator[String],
// new DefaultSizeEstimator[GenericRecord],
// ExternalSpillableMap.DiskMapType.valueOf(props.getString(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue().toString).toUpperCase(Locale.ROOT)),
// props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));

/**
* Keeps the after-image data. Only one case uses this:
* the CDC infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is [[OP_KEY_ONLY]] or [[DATA_BEFORE]].
*/
private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
private var afterImageRecords: util.Map[String, InternalRow] = new ExternalSpillableMap[String, InternalRow](
props.getLong(HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(), HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue()),
props.getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key, FileIOUtils.getDefaultSpillableMapBasePath),
new DefaultSizeEstimator[String],
new DefaultSizeEstimator[InternalRow],
ExternalSpillableMap.DiskMapType.valueOf(props.getString(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue().toString).toUpperCase(Locale.ROOT)),
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));


private var internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema)

@@ -273,9 +294,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
recordToLoad.update(2, recordToJsonAsUTF8String(before))
parse(op) match {
case INSERT =>
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
case UPDATE =>
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
case _ =>
recordToLoad.update(3, null)
}
@@ -287,10 +308,10 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
parse(op) match {
case INSERT =>
recordToLoad.update(2, null)
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
case UPDATE =>
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
case _ =>
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, null)
@@ -398,7 +419,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
// load afterFileSlice to afterImageRecords
if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
val iter = loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
afterImageRecords = mutable.Map.empty
afterImageRecords.clear()
iter.foreach { row =>
val key = getRecordKey(row)
afterImageRecords.put(key, row.copy())
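For readers new to the collection adopted above, a small self-contained sketch of the ExternalSpillableMap pattern (not from the commit): the constructor call mirrors the afterImageRecords declaration in this diff, while the 16 MB budget, String value type, and wrapper object are illustrative assumptions.

import java.util.Locale

import org.apache.hudi.common.config.HoodieCommonConfig.{DISK_MAP_BITCASK_COMPRESSION_ENABLED, SPILLABLE_DISK_MAP_TYPE}
import org.apache.hudi.common.util.DefaultSizeEstimator
import org.apache.hudi.common.util.FileIOUtils
import org.apache.hudi.common.util.collection.ExternalSpillableMap

object SpillableMapSketch extends App {
  // Mirrors the constructor call in this diff; sizes and types are illustrative.
  val records = new ExternalSpillableMap[String, String](
    16L * 1024 * 1024,                          // in-memory budget before spilling
    FileIOUtils.getDefaultSpillableMapBasePath, // directory for on-disk segments
    new DefaultSizeEstimator[String],           // estimates key sizes against the budget
    new DefaultSizeEstimator[String],           // estimates value sizes against the budget
    ExternalSpillableMap.DiskMapType.valueOf(
      SPILLABLE_DISK_MAP_TYPE.defaultValue().toString.toUpperCase(Locale.ROOT)),
    DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())

  records.put("key-1", "value-1") // stays in memory until the budget is exceeded
  println(records.get("key-1"))   // java.util.Map API: get(...), not Scala's apply
  records.clear()                 // reusable across file splits, as the iterator now does
}

The switch away from a Scala mutable.Map also explains the smaller edits above: ExternalSpillableMap implements java.util.Map, so apply-style lookups become afterImageRecords.get(recordKey), and reassigning a fresh empty map becomes afterImageRecords.clear().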
