[Early feedback] [HUDI-8163] Refactor UnMergedLogHandler with iterators #12608

Draft · wants to merge 1 commit into base: master
@@ -190,9 +190,16 @@ private static List<HoodieRecord> getUnmergedLogFileRecords(List<String> logFile
.withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
.withReaderSchema(readerSchema)
.withTableMetaClient(metaClient)
.withLogRecordScannerCallback(records::add)
.build();
scanner.scan(false);
Iterator<HoodieRecord<?>> recordIterator = scanner.iterator();
Contributor:
Is it possible to also return an iterator from the method getUnmergedLogFileRecords, so the list add/get can be simplified?
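A minimal sketch of what that suggestion could look like, assuming the same builder configuration as in the hunk above; the parameter list is abridged because the original signature is truncated in the diff, and the elided builder calls are marked with a comment rather than spelled out:

```java
// Sketch only, not the PR's code: hand back the scanner's iterator directly instead of
// materializing the records into a List first.
private static Iterator<HoodieRecord<?>> getUnmergedLogFileRecords(List<String> logFilePaths,
                                                                   HoodieTableMetaClient metaClient,
                                                                   Schema readerSchema) {
  HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
      .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline()
          .lastInstant().get().requestedTime())
      .withReaderSchema(readerSchema)
      .withTableMetaClient(metaClient)
      // ... remaining builder options as configured in the hunk above ...
      .build();
  // Callers consume lazily; no intermediate List and no add/get loop.
  return scanner.iterator();
}
```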

while (recordIterator.hasNext()) {
try {
records.add(recordIterator.next());
} catch (Exception e) {
throw new HoodieException("Error while inserting record into queue", e);
}
}
return records;
}

@@ -617,6 +617,8 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA
/**
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
* handle it.
* TODO:
* 1. what is the purpose of this method? should the HoodieRecord be added to a queue and consumed by an iterator?
*/
private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
Contributor:
This should be the main entry point for handling the log blocks; each kind of log block would trigger invocation of a specific row-level handling method.

checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
@@ -675,6 +677,7 @@ private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int
processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt);
break;
case DELETE_BLOCK:
// TODO: same question as processDataBlock
Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
break;
case CORRUPT_BLOCK:
@@ -797,7 +800,7 @@ public List<String> getValidBlockInstants() {
return validBlockInstants;
}

private Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
protected Pair<ClosableIterator<HoodieRecord>, Schema> getRecordsIterator(
HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
ClosableIterator<HoodieRecord> blockRecordsIterator;
if (keySpecOpt.isPresent()) {
@@ -25,15 +25,22 @@
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

@@ -42,20 +49,14 @@
*/
public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner {

private final LogRecordScannerCallback callback;
private final RecordDeletionCallback recordDeletionCallback;

private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean reverseReader, int bufferSize,
LogRecordScannerCallback callback, RecordDeletionCallback recordDeletionCallback,
Option<InstantRange> instantRange, InternalSchema internalSchema,
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange,
false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger,
hoodieTableMetaClientOption);
this.callback = callback;
this.recordDeletionCallback = recordDeletionCallback;
}

/**
@@ -76,22 +77,23 @@ public static HoodieUnMergedLogRecordScanner.Builder newBuilder() {
return new Builder();
}

// TODO: check whether we can remove this method
@Override
protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Exception {
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into queue of BoundedInMemoryExecutor.
// Just call callback without merging
if (callback != null) {
callback.apply(hoodieRecord.copy());
}
// if (callback != null) {
// callback.apply(hoodieRecord.copy());
// }
}

@Override
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
if (recordDeletionCallback != null) {
recordDeletionCallback.apply(deleteRecord.getHoodieKey());
}
// if (recordDeletionCallback != null) {
// recordDeletionCallback.apply(deleteRecord.getHoodieKey());
// }
}

/**
@@ -111,6 +113,32 @@ public interface RecordDeletionCallback {
void apply(HoodieKey deletedKey);
}

/**
* Returns an iterator over the log records.
*/
public Iterator<HoodieRecord<?>> iterator() {
List<HoodieRecord<?>> records = new ArrayList<>();
try {
scan();

while (!getCurrentInstantLogBlocks().isEmpty()) {
HoodieLogBlock lastBlock = getCurrentInstantLogBlocks().pollLast();
if (lastBlock instanceof HoodieDataBlock) {
HoodieDataBlock dataBlock = (HoodieDataBlock) lastBlock;
Pair<ClosableIterator<HoodieRecord>, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, Option.empty());
try (ClosableIterator<HoodieRecord> recordIterator = recordsIteratorSchemaPair.getLeft()) {
while (recordIterator.hasNext()) {
records.add(recordIterator.next());
}
}
}
}
} catch (Exception e) {
throw new HoodieException("Error while iterating over log records", e);
}
return records.iterator();
}

/**
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
*/
@@ -185,16 +213,6 @@ public Builder withInstantRange(Option<InstantRange> instantRange) {
return this;
}

public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
this.callback = callback;
return this;
}

public Builder withRecordDeletionCallback(RecordDeletionCallback recordDeletionCallback) {
this.recordDeletionCallback = recordDeletionCallback;
Contributor:
The recordDeletionCallback is the only consumer of deleted records from the delete Avro block; we may need another iterator impl specifically for it.
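A hedged sketch of what such a dedicated deleted-key iterator could look like, following the same collect-then-iterate shape as the iterator() added in this PR. It reuses only identifiers that already appear in this diff (getCurrentInstantLogBlocks, HoodieDeleteBlock.getRecordsToDelete, DeleteRecord.getHoodieKey); the method itself and its imports are illustrative, not part of the change:

```java
// Illustrative sketch, not part of the PR: a separate iterator for deleted keys.
public Iterator<HoodieKey> deletedKeyIterator() {
  List<HoodieKey> deletedKeys = new ArrayList<>();
  try {
    scan();
    while (!getCurrentInstantLogBlocks().isEmpty()) {
      HoodieLogBlock lastBlock = getCurrentInstantLogBlocks().pollLast();
      if (lastBlock instanceof HoodieDeleteBlock) {
        // Same accessor used in processQueuedBlocksForInstant for DELETE_BLOCKs.
        Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete())
            .map(DeleteRecord::getHoodieKey)
            .forEach(deletedKeys::add);
      }
    }
  } catch (Exception e) {
    throw new HoodieException("Error while iterating over deleted records", e);
  }
  return deletedKeys.iterator();
}
```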

return this;
}

@Override
public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
@@ -219,7 +237,7 @@ public HoodieUnMergedLogRecordScanner build() {
ValidationUtils.checkArgument(recordMerger != null);

return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
latestInstantTime, reverseReader, bufferSize, callback, recordDeletionCallback, instantRange,
latestInstantTime, reverseReader, bufferSize, instantRange,
internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient));
}
}
@@ -127,6 +127,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -1053,14 +1054,21 @@ public static Set<String> getRecordKeys(List<String> logFilePaths, HoodieTableMe
.withLatestInstantTime(latestCommitTimestamp)
.withReaderSchema(writerSchemaOpt.get())
.withTableMetaClient(datasetMetaClient);
if (includeValidKeys) {
builder.withLogRecordScannerCallback(record -> allRecordKeys.add(record.getRecordKey()));
}
if (includeDeletedKeys) {
builder.withRecordDeletionCallback(deletedKey -> allRecordKeys.add(deletedKey.getRecordKey()));

Iterator<HoodieRecord<?>> recordIterator = builder.build().iterator();
while (recordIterator.hasNext()) {
try {
HoodieRecord<?> record = recordIterator.next();
if (includeValidKeys) {
allRecordKeys.add(record.getRecordKey());
}
if (includeDeletedKeys) {
allRecordKeys.add(record.getRecordKey());
}
} catch (Exception e) {
throw new HoodieException("Error while inserting record into queue", e);
}
}
HoodieUnMergedLogRecordScanner scanner = builder.build();
scanner.scan();
return allRecordKeys;
}
return Collections.emptySet();
@@ -1656,9 +1664,17 @@ public static List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeM
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
.withReaderSchema(writerSchemaOpt.get())
.withTableMetaClient(datasetMetaClient)
.withLogRecordScannerCallback(records::add)
.build();
scanner.scan();

Iterator<HoodieRecord<?>> recordIterator = scanner.iterator();
while (recordIterator.hasNext()) {
try {
records.add(recordIterator.next());
} catch (Exception e) {
Contributor:
The code is exactly the same if we collect all the records into a collection again; maybe we should not remove those two callbacks and instead just add a new iterator impl for the case that really needs it: the Flink streaming reader.
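For reference, a minimal sketch of the alternative described here, keeping the callback-based wiring at collection-style call sites like this one. The builder methods shown are the ones this PR removes; the rest of the configuration is elided and assumed identical to the hunk above:

```java
// Sketch of the reviewer's alternative: keep the callbacks for simple collection call
// sites and reserve a new iterator impl for the Flink streaming reader.
HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
    // ... same configuration as in the hunk above ...
    .withLogRecordScannerCallback(records::add)  // collect valid records directly
    .build();
scanner.scan();  // fills `records` via the callback; no iterator round-trip
```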

throw new HoodieException("Error while inserting record into queue", e);
}
}

if (records.isEmpty()) {
return Collections.emptyList();
}
@@ -36,6 +36,7 @@
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -237,10 +238,16 @@ private List<HoodieProducer<HoodieRecord<?>>> getParallelProducers(
) {
List<HoodieProducer<HoodieRecord<?>>> producers = new ArrayList<>();
producers.add(new FunctionBasedQueueProducer<>(queue -> {
HoodieUnMergedLogRecordScanner scanner =
scannerBuilder.withLogRecordScannerCallback(queue::insertRecord).build();
// Scan all the delta-log files, filling in the queue
scanner.scan();
HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();
// Use the iterator to process records
Iterator<HoodieRecord<?>> recordIterator = scanner.iterator();
while (recordIterator.hasNext()) {
try {
queue.insertRecord(recordIterator.next());
} catch (Exception e) {
throw new HoodieException("Error while inserting record into queue", e);
}
}
return null;
}));

@@ -19,6 +19,7 @@
package org.apache.hudi.hadoop.realtime;

import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
@@ -27,6 +28,7 @@
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -105,16 +107,17 @@ private List<HoodieProducer<ArrayWritable>> getParallelProducers(
) {
return Arrays.asList(
new FunctionBasedQueueProducer<>(queue -> {
HoodieUnMergedLogRecordScanner scanner =
scannerBuilder.withLogRecordScannerCallback(record -> {
// convert Hoodie log record to Hadoop AvroWritable and buffer
GenericRecord rec = (GenericRecord) record.toIndexedRecord(getReaderSchema(), payloadProps).get().getData();
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp());
queue.insertRecord(aWritable);
})
.build();
// Scan all the delta-log files, filling in the queue
scanner.scan();
HoodieUnMergedLogRecordScanner scanner = scannerBuilder.build();
Iterator<HoodieRecord<?>> recordIterator = scanner.iterator();
while (recordIterator.hasNext()) {
try {
GenericRecord rec = (GenericRecord) recordIterator.next().toIndexedRecord(getReaderSchema(), payloadProps).get().getData();
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp());
queue.insertRecord(aWritable);
} catch (Exception e) {
throw new HoodieException("Error while inserting record into queue", e);
}
}
return null;
}),
new IteratorBasedQueueProducer<>(parquetRecordsIterator)
@@ -33,6 +33,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -59,12 +60,21 @@ public static Option<Row> getLogFileColumnRangeMetadata(String filePath, HoodieT
.withBufferSize(maxBufferSize)
.withLatestInstantTime(latestCommitTime)
.withReaderSchema(writerSchemaOpt.get())
.withLogRecordScannerCallback(records::add)
.build();
scanner.scan();

Iterator<HoodieRecord<?>> recordIterator = scanner.iterator();
while (recordIterator.hasNext()) {
try {
records.add(recordIterator.next());
} catch (Exception e) {
throw new HoodieException("Error while inserting record into queue", e);
}
}

if (records.isEmpty()) {
return Option.empty();
}

Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataMap =
collectColumnRangeMetadata(records, fieldsToIndex, filePath, writerSchemaOpt.get());
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMetadataMap.values());