Increase weight of splits with delete files in Iceberg
Splits with deletes require more work to process than a
comparable split without deletes. This should be accounted
for in the Iceberg split weight.
raunaqmorarka committed Aug 18, 2024
1 parent b512dfe commit 5286f64
Showing 2 changed files with 87 additions and 2 deletions.
@@ -42,6 +42,7 @@
import io.trino.spi.type.TypeManager;
import jakarta.annotation.Nullable;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpecParser;
@@ -100,6 +101,8 @@
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.iceberg.FileContent.EQUALITY_DELETES;
import static org.apache.iceberg.FileContent.POSITION_DELETES;
import static org.apache.iceberg.types.Conversions.fromByteBuffer;

public class IcebergSplitSource
@@ -521,10 +524,28 @@ private IcebergSplit toIcebergSplit(FileScanTask task, TupleDomain<IcebergColumn
task.deletes().stream()
.map(DeleteFile::fromIceberg)
.collect(toImmutableList()),
- SplitWeight.fromProportion(clamp((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight, 1.0)),
+ SplitWeight.fromProportion(clamp(getSplitWeight(task), minimumAssignedSplitWeight, 1.0)),
fileStatisticsDomain,
fileIoProperties,
cachingHostAddressProvider.getHosts(task.file().path().toString(), ImmutableList.of()),
task.file().dataSequenceNumber());
}

private double getSplitWeight(FileScanTask task)
{
double dataWeight = (double) task.length() / tableScan.targetSplitSize();
double weight = dataWeight;
if (task.deletes().stream().anyMatch(deleteFile -> deleteFile.content() == POSITION_DELETES)) {
// Presence of each data position is looked up in a combined bitmap of deleted positions
weight += dataWeight;
}

long equalityDeletes = task.deletes().stream()
.filter(deleteFile -> deleteFile.content() == EQUALITY_DELETES)
.mapToLong(ContentFile::recordCount)
.sum();
// Every row is a separate equality predicate that must be applied to all data rows
weight += equalityDeletes * dataWeight;
return weight;
}
}
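
For illustration, here is a minimal, self-contained sketch of the new weighting formula with made-up numbers (the class name SplitWeightExample, the split length, target split size, delete counts, and minimum weight are hypothetical, not values from this commit):

public class SplitWeightExample
{
    public static void main(String[] args)
    {
        long splitLength = 8_000;            // bytes of data covered by the split (assumed)
        long targetSplitSize = 10_000;       // table's target split size (assumed)
        boolean hasPositionDeletes = true;   // split carries at least one position delete file
        long equalityDeleteRows = 3;         // total record count across equality delete files

        double dataWeight = (double) splitLength / targetSplitSize; // 0.8

        double weight = dataWeight;
        if (hasPositionDeletes) {
            // each data position is checked against the combined deleted-positions bitmap
            weight += dataWeight; // 1.6
        }
        // every equality-delete row is a predicate applied to all data rows
        weight += equalityDeleteRows * dataWeight; // 1.6 + 3 * 0.8 = 4.0

        // as in toIcebergSplit, the proportion is clamped to [minimumAssignedSplitWeight, 1.0]
        double minimumAssignedSplitWeight = 0.05; // assumed configuration value
        double proportion = Math.max(minimumAssignedSplitWeight, Math.min(weight, 1.0)); // 1.0

        System.out.printf("dataWeight=%.2f weight=%.2f proportion=%.2f%n", dataWeight, weight, proportion);
    }
}

Under these assumed inputs, a split with position deletes and three equality-delete rows saturates at the maximum weight of 1.0, while the same data without any deletes would weigh only 0.8.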
@@ -31,6 +31,8 @@
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
import io.trino.plugin.iceberg.catalog.rest.DefaultIcebergFileSystemFactory;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.SplitWeight;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ColumnHandle;
@@ -46,9 +48,18 @@
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorSession;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -57,6 +68,7 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -66,6 +78,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static com.google.common.io.MoreFiles.deleteRecursively;
@@ -74,6 +87,7 @@
import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.plugin.iceberg.IcebergSplitSource.createFileStatisticsDomain;
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.tpch.TpchTable.NATION;
@@ -372,6 +386,56 @@ public void testNullStatisticsMaps()
.isEqualTo(TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, Domain.notNull(BIGINT))));
}

@Test
public void testSplitWeight()
throws Exception
{
SchemaTableName schemaTableName = new SchemaTableName("tpch", "nation");
Table nationTable = catalog.loadTable(SESSION, schemaTableName);
// Decrease target split size so that changes in split weight are significant enough to be detected
nationTable.updateProperties()
.set(TableProperties.SPLIT_SIZE, "10000")
.commit();
IcebergTableHandle tableHandle = createTableHandle(schemaTableName, nationTable, TupleDomain.all());

IcebergSplit split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY);
SplitWeight weightWithoutDelete = split.getSplitWeight();

String dataFilePath = (String) computeActual("SELECT file_path FROM \"" + schemaTableName.getTableName() + "$files\" LIMIT 1").getOnlyValue();

// Write position delete file
FileIO fileIo = new ForwardingFileIo(fileSystemFactory.create(SESSION));
PositionDeleteWriter<Record> writer = Parquet.writeDeletes(fileIo.newOutputFile("local:///delete_file_" + UUID.randomUUID()))
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(nationTable)
.overwrite()
.rowSchema(nationTable.schema())
.withSpec(PartitionSpec.unpartitioned())
.buildPositionWriter();
PositionDelete<Record> positionDelete = PositionDelete.create();
PositionDelete<Record> record = positionDelete.set(dataFilePath, 0, GenericRecord.create(nationTable.schema()));
try (Closeable ignored = writer) {
writer.write(record);
}
nationTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();

split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY);
SplitWeight splitWeightWithPositionDelete = split.getSplitWeight();
assertThat(splitWeightWithPositionDelete.getRawValue()).isGreaterThan(weightWithoutDelete.getRawValue());

// Write equality delete file
writeEqualityDeleteForTable(
nationTable,
fileSystemFactory,
Optional.of(nationTable.spec()),
Optional.of(new PartitionData(new Long[] {1L})),
ImmutableMap.of("regionkey", 1L),
Optional.empty());

split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY);
assertThat(split.getSplitWeight().getRawValue()).isGreaterThan(splitWeightWithPositionDelete.getRawValue());
}

private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHandle, DynamicFilter dynamicFilter)
throws Exception
{
@@ -387,7 +451,7 @@ private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHa
alwaysTrue(),
new TestingTypeManager(),
false,
- new IcebergConfig().getMinimumAssignedSplitWeight(),
+ 0, // no minimum weight floor, so differences in computed split weight stay observable
new DefaultCachingHostAddressProvider())) {
ImmutableList.Builder<IcebergSplit> builder = ImmutableList.builder();
while (!splitSource.isFinished()) {
