[HUDI-8766][DNM] Enabling col stats by default with writer #12595

Draft · wants to merge 1 commit into base: master
Changes from all commits
@@ -32,6 +32,7 @@
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -80,6 +81,7 @@ public class HoodieMetadataWriteUtils {
    * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
    * @param failedWritesCleaningPolicy Cleaning policy on failed writes
    */
+  @VisibleForTesting
   public static HoodieWriteConfig createMetadataWriteConfig(
       HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
     String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
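Reviewer note: with @VisibleForTesting, tests can derive the metadata-table write config directly instead of going through a full writer. A minimal sketch, assuming HoodieMetadataWriteUtils lives in org.apache.hudi.metadata and METADATA_TABLE_NAME_SUFFIX is "_metadata" (both from memory, not confirmed by this diff):

import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieMetadataWriteUtils;

public class MetadataWriteConfigSketch {
  public static void main(String[] args) {
    // Build a minimal data-table write config (path and table name are hypothetical).
    HoodieWriteConfig dataConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_trips_cow")
        .forTable("trips")
        .build();

    // Derive the metadata-table config the same way the writer does internally.
    HoodieWriteConfig mdtConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(
        dataConfig, HoodieFailedWritesCleaningPolicy.EAGER);

    // Expect the metadata table name to carry the "_metadata" suffix.
    System.out.println(mdtConfig.getTableName()); // trips_metadata
  }
}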
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -473,6 +474,7 @@ public void testMetadataTableServices() throws Exception {
         .enable(true)
         .enableMetrics(false)
         .withMaxNumDeltaCommitsBeforeCompaction(3) // after 3 delta commits for regular writer operations, compaction should kick in.
+        .withMetadataIndexColumnStats(false) // HUDI-8774
         .build()).build();
     initWriteConfigAndMetatableWriter(writeConfig, true);
 
@@ -572,6 +574,7 @@ public void testMetadataTableCompactionWithPendingInstants() throws Exception {
         .enable(true)
         .enableMetrics(false)
         .withMaxNumDeltaCommitsBeforeCompaction(4)
+        .withMetadataIndexColumnStats(false) // HUDI-8774
         .build()).build();
     initWriteConfigAndMetatableWriter(writeConfig, true);
     doWriteOperation(testTable, metaClient.createNewInstantTime(), INSERT);
@@ -1681,6 +1684,7 @@ public void testMetadataMultiWriter() throws Exception {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withEngineType(EngineType.JAVA)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
@@ -23,6 +23,7 @@
 import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -105,6 +106,7 @@ public void init(HoodieTableType tableType, Option<HoodieWriteConfig> writeConfi
         ? writeConfig.get() : getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true,
             enableMetadataTable, enableMetrics, true,
             validateMetadataPayloadStateConsistency)
+        .withEngineType(EngineType.JAVA)
         .build();
     initWriteConfigAndMetatableWriter(this.writeConfig, enableMetadataTable);
   }
@@ -306,6 +308,7 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
             .enable(useFileListingMetadata)
             .enableMetrics(enableMetrics)
             .ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
+            .withMetadataIndexColumnStats(false) // HUDI-8774
             .build())
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
             .withExecutorMetrics(enableMetrics).withReporterType(MetricsReporterType.INMEMORY.name()).build())
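Reviewer note: since this PR flips the col stats default to on for Spark writers, tests built on the test-table infra (which never populates column stats) have to opt out explicitly, as in the hunks above. A minimal sketch of the opt-out outside the test helpers, assuming HoodieWriteConfig.getMetadataConfig() and HoodieMetadataConfig.isColumnStatsIndexEnabled() keep their current names:

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ColStatsOptOutSketch {
  public static void main(String[] args) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_trips_cow") // hypothetical base path
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexColumnStats(false) // opt out of the new default
            .build())
        .build();

    // Should print false: an explicitly set value wins over the engine default,
    // because build() only applies defaults for unset keys.
    System.out.println(writeConfig.getMetadataConfig().isColumnStatsIndexEnabled());
  }
}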
@@ -408,7 +408,8 @@ public void testRollbackCommit() throws Exception {
         .withRollbackUsingMarkers(false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build(); // HUDI-8815
 
     try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
       HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context));
@@ -633,7 +634,8 @@ public void testAutoRollbackInflightCommit() throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).build();
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build();
 
     try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
       HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter, Option.of(context));
@@ -668,7 +670,8 @@ public void testAutoRollbackInflightCommit() throws Exception {
     // Set Failed Writes rollback to EAGER
     config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withRollbackUsingMarkers(false)
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build();
     final String commitTime5 = "20160506030631";
     try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
       client.startCommitWithTime(commitTime5);
@@ -824,7 +827,8 @@ public void testFallbackToListingBasedRollbackForCompletedInstant() throws Excep
         .withRollbackUsingMarkers(true) // rollback using markers to test fallback to listing based rollback for completed instant
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()).build();
 
     // create test table with all commits completed
     try (HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(metaClient.getStorageConf(), config, context)) {
@@ -249,7 +249,9 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata)
-            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable).build())
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable)
+            .withMetadataIndexColumnStats(false).build())
+        // this test uses the test-table infra, so col stats are not available/populated.
         .withWriteConcurrencyMode(writeConcurrencyMode)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
             .build())
@@ -343,7 +345,7 @@ public void testArchivalWithAutoAdjustmentBasedOnCleanConfigs(String cleaningPol
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(5).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(5).withMetadataIndexColumnStats(false).build())
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
             .withCleanerPolicy(HoodieCleaningPolicy.valueOf(cleaningPolicy))
@@ -1652,6 +1654,7 @@ public void testArchivalAndCompactionInMetadataTable() throws Exception {
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
             .withMaxNumDeltaCommitsBeforeCompaction(8)
+            .withMetadataIndexColumnStats(false) // this test uses the test-table infra, so col stats are not available/populated.
             .build())
         .forTable("test-trip-table").build();
     initWriteConfigAndMetatableWriter(writeConfig, true);
@@ -25,6 +25,7 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -360,8 +361,11 @@ public void testRollbackBackup() throws Exception {
         .withBaseFilesInPartition(p1, "id21").getLeft()
         .withBaseFilesInPartition(p2, "id22").getLeft();
 
+    // we are using the test-table infra, so col stats are not populated.
     HoodieTable table =
-        this.getHoodieTable(metaClient, getConfigBuilder().withRollbackBackupEnabled(true).build());
+        this.getHoodieTable(metaClient, getConfigBuilder().withRollbackBackupEnabled(true)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build())
+            .build());
     HoodieInstant needRollBackInstant = HoodieTestUtils.getCompleteInstant(
         metaClient.getStorage(), metaClient.getTimelinePath(),
         "002", HoodieTimeline.COMMIT_ACTION);
@@ -342,7 +342,7 @@ public void testKeepLatestFileVersions() throws Exception {
   public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build())
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withCleanBootstrapBaseFileEnabled(true)
                .withCleanerParallelism(1)
@@ -108,6 +108,7 @@
 import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
 
 /**
@@ -116,6 +117,7 @@
 public class HoodieAvroUtils {
 
   public static final String AVRO_VERSION = Schema.class.getPackage().getImplementationVersion();
+
   private static final ThreadLocal<BinaryEncoder> BINARY_ENCODER = ThreadLocal.withInitial(() -> null);
   private static final ThreadLocal<BinaryDecoder> BINARY_DECODER = ThreadLocal.withInitial(() -> null);
 
@@ -1447,17 +1449,30 @@ public static Object wrapValueIntoAvro(Comparable<?> value) {
     }
   }
 
+  public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
+    return unwrapAvroValueWrapper(avroValueWrapper, false, Option.empty(), Option.empty());
+  }
+
   /**
    * Unwraps Avro value wrapper into Java value.
    *
    * @param avroValueWrapper A wrapped value with Avro type wrapper.
    * @return Java value.
    */
-  public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
+  public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, boolean handleObfuscatedFlow, Option<String> fieldName, Option<GenericRecord> record) {
     if (avroValueWrapper == null) {
       return null;
-    } else if (avroValueWrapper instanceof DateWrapper) {
-      return LocalDate.ofEpochDay(((DateWrapper) avroValueWrapper).getValue());
+    }
+
+    if (handleObfuscatedFlow) {
+      Pair<Boolean, String> isValueWrapperObfuscated = getIsValueWrapperObfuscated(record.get(), fieldName.get());
+      if (isValueWrapperObfuscated.getKey()) {
+        return unwrapAvroValueWrapper(avroValueWrapper, isValueWrapperObfuscated.getValue());
+      }
+    }
+
+    if (avroValueWrapper instanceof DateWrapper) {
+      return Date.valueOf(LocalDate.ofEpochDay(((DateWrapper) avroValueWrapper).getValue()));
     } else if (avroValueWrapper instanceof DecimalWrapper) {
       Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
       return AVRO_DECIMAL_CONVERSION.fromBytes(((DecimalWrapper) avroValueWrapper).getValue(), valueSchema, valueSchema.getLogicalType());
@@ -1481,10 +1496,38 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
       // NOTE: This branch could be hit b/c Avro records could be reconstructed
       //       as {@code GenericRecord)
       // TODO add logical type decoding
-      GenericRecord record = (GenericRecord) avroValueWrapper;
-      return (Comparable<?>) record.get("value");
+      GenericRecord genRec = (GenericRecord) avroValueWrapper;
+      return (Comparable<?>) genRec.get("value");
     } else {
       throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
     }
   }
+
+  public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper, String wrapperClassName) {
+    if (avroValueWrapper == null) {
+      return null;
+    } else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
+      return Date.valueOf(LocalDate.ofEpochDay((Integer) ((Record) avroValueWrapper).get(0)));
+    } else if (TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
+      Instant instant = microsToInstant((Long) ((Record) avroValueWrapper).get(0));
+      return Timestamp.from(instant);
+    } else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) {
+      Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
+      return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) ((Record) avroValueWrapper).get(0), valueSchema, valueSchema.getLogicalType());
+    } else {
+      throw new UnsupportedOperationException(String.format("Unsupported type of the value (%s)", avroValueWrapper.getClass()));
+    }
+  }
+
+  private static Pair<Boolean, String> getIsValueWrapperObfuscated(GenericRecord record, String subFieldName) {
+    Object statsValue = ((GenericRecord) record.get(SCHEMA_FIELD_ID_COLUMN_STATS)).get(subFieldName);
+    if (statsValue != null) {
+      boolean toReturn = ((GenericRecord) statsValue).getSchema().getName().equals(DateWrapper.class.getSimpleName())
+          || ((GenericRecord) statsValue).getSchema().getName().equals(TimestampMicrosWrapper.class.getSimpleName());
+      if (toReturn) {
+        return Pair.of(true, ((GenericRecord) statsValue).getSchema().getName());
+      }
+    }
+    return Pair.of(false, null);
+  }
 }
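Reviewer note: the new flow matters when a DateWrapper or TimestampMicrosWrapper comes back as a plain GenericRecord (e.g. read out of the col stats partition), since the logical type is lost and only the schema name still identifies the wrapper. A minimal sketch of the class-name path, assuming DateWrapper's generated SCHEMA$ has a single int field named "value" (standard for the generated Avro model classes):

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.DateWrapper;

public class UnwrapSketch {
  public static void main(String[] args) {
    // Simulate a DateWrapper that was reconstructed as a generic record.
    GenericRecord rec = new GenericData.Record(DateWrapper.SCHEMA$);
    rec.put("value", 20000); // days since epoch

    // The class-name overload restores a typed java.sql.Date.
    Comparable<?> typed = HoodieAvroUtils.unwrapAvroValueWrapper(rec, "DateWrapper");
    System.out.println(typed); // 2024-10-04
  }
}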
@@ -757,6 +757,8 @@ public Builder withPartitionStatsIndexParallelism(int parallelism) {
 
     public HoodieMetadataConfig build() {
       metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
+      metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
+      // FIXME: disable when schema-on-read is enabled.
       metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
       return metadataConfig;
     }
@@ -772,6 +774,18 @@ private boolean getDefaultMetadataEnable(EngineType engineType) {
         throw new HoodieNotSupportedException("Unsupported engine " + engineType);
     }
   }
+
+  private boolean getDefaultColStatsEnable(EngineType engineType) {
+    switch (engineType) {
+      case SPARK:
+        return true;
+      case FLINK:
+      case JAVA:
+        return false; // HUDI-8814
+      default:
+        throw new HoodieNotSupportedException("Unsupported engine " + engineType);
+    }
+  }
 }
 
 /**
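Reviewer note: the net effect is an engine-dependent default. A minimal sketch, assuming HoodieMetadataConfig.Builder.withEngineType(...) and the isColumnStatsIndexEnabled() accessor keep their current signatures:

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.EngineType;

public class ColStatsDefaultSketch {
  public static void main(String[] args) {
    // Spark writers pick up the new default: col stats on.
    HoodieMetadataConfig sparkConfig = HoodieMetadataConfig.newBuilder()
        .withEngineType(EngineType.SPARK)
        .build();

    // Flink and Java writers stay off until HUDI-8814 is resolved.
    HoodieMetadataConfig javaConfig = HoodieMetadataConfig.newBuilder()
        .withEngineType(EngineType.JAVA)
        .build();

    System.out.println(sparkConfig.isColumnStatsIndexEnabled()); // expected: true
    System.out.println(javaConfig.isColumnStatsIndexEnabled());  // expected: false
  }
}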
@@ -156,8 +156,8 @@ public static HoodieColumnRangeMetadata<Comparable> fromColumnStats(HoodieMetada
     return HoodieColumnRangeMetadata.<Comparable>create(
         columnStats.getFileName(),
         columnStats.getColumnName(),
-        unwrapAvroValueWrapper(columnStats.getMinValue()),
-        unwrapAvroValueWrapper(columnStats.getMaxValue()),
+        unwrapAvroValueWrapper(columnStats.getMinValue()), // misses the special handling for obfuscated wrappers.
+        unwrapAvroValueWrapper(columnStats.getMaxValue()), // misses the special handling for obfuscated wrappers.
         columnStats.getNullCount(),
         columnStats.getValueCount(),
         columnStats.getTotalSize(),