diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index 21511aecdaac..7a9eac782690 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -42,6 +42,7 @@
 import io.trino.spi.type.TypeManager;
 import jakarta.annotation.Nullable;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpecParser;
@@ -100,6 +101,8 @@
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.iceberg.FileContent.EQUALITY_DELETES;
+import static org.apache.iceberg.FileContent.POSITION_DELETES;
 import static org.apache.iceberg.types.Conversions.fromByteBuffer;
 
 public class IcebergSplitSource
@@ -521,10 +524,28 @@ private IcebergSplit toIcebergSplit(FileScanTask task, TupleDomain<IcebergColumnHandle> fileStatisticsDomain)
+        if (task.deletes().stream().anyMatch(deleteFile -> deleteFile.content() == POSITION_DELETES)) {
+            // Presence of each data position is looked up in a combined bitmap of deleted positions
+            weight += dataWeight;
+        }
+
+        long equalityDeletes = task.deletes().stream()
+                .filter(deleteFile -> deleteFile.content() == EQUALITY_DELETES)
+                .mapToLong(ContentFile::recordCount)
+                .sum();
+        // Every row is a separate equality predicate that must be applied to all data rows
+        weight += equalityDeletes * dataWeight;
+        return weight;
+    }
 }
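The added lines charge each split for the delete files it has to apply on top of its plain data weight. Below is a minimal standalone sketch of that model, assuming dataWeight stands for the task length as a fraction of the table's target split size and that the caller still clamps the result between the configured minimum split weight and 1.0 before calling SplitWeight.fromProportion (those surrounding toIcebergSplit lines are not visible in the hunk above); the class and method names in the sketch are illustrative, not part of the change:

    // Sketch of the split-weight model introduced above (illustrative names, not Trino code):
    // a position delete adds one extra data weight, and every equality-delete row adds another
    // full data weight on top of the plain data proportion.
    class SplitWeightModelSketch
    {
        static double splitWeight(double dataWeight, boolean hasPositionDeletes, long equalityDeleteRowCount)
        {
            double weight = dataWeight;
            if (hasPositionDeletes) {
                // each data position is checked against a combined bitmap of deleted positions
                weight += dataWeight;
            }
            // each equality-delete row is a predicate applied to all data rows
            return weight + equalityDeleteRowCount * dataWeight;
        }

        public static void main(String[] args)
        {
            double dataWeight = 0.5; // e.g. a 5 kB task against a 10 kB target split size
            System.out.println(splitWeight(dataWeight, false, 0)); // 0.5: data only
            System.out.println(splitWeight(dataWeight, true, 0));  // 1.0: data plus position deletes
            System.out.println(splitWeight(dataWeight, true, 3));  // 2.5: data plus position deletes plus 3 equality-delete rows
        }
    }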
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
index 6e6c8be146a6..de96d8d8bd94 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
@@ -31,6 +31,8 @@
 import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
 import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
 import io.trino.plugin.iceberg.catalog.rest.DefaultIcebergFileSystemFactory;
+import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
+import io.trino.spi.SplitWeight;
 import io.trino.spi.catalog.CatalogName;
 import io.trino.spi.connector.CatalogHandle;
 import io.trino.spi.connector.ColumnHandle;
@@ -46,9 +48,18 @@
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.QueryRunner;
 import io.trino.testing.TestingConnectorSession;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -57,6 +68,7 @@
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -66,6 +78,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 import static com.google.common.io.MoreFiles.deleteRecursively;
@@ -74,6 +87,7 @@
 import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
 import static io.trino.plugin.iceberg.IcebergSplitSource.createFileStatisticsDomain;
 import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
+import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
 import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.tpch.TpchTable.NATION;
@@ -372,6 +386,56 @@ public void testNullStatisticsMaps()
                 .isEqualTo(TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, Domain.notNull(BIGINT))));
     }
 
+    @Test
+    public void testSplitWeight()
+            throws Exception
+    {
+        SchemaTableName schemaTableName = new SchemaTableName("tpch", "nation");
+        Table nationTable = catalog.loadTable(SESSION, schemaTableName);
+        // Decrease target split size so that changes in split weight are significant enough to be detected
+        nationTable.updateProperties()
+                .set(TableProperties.SPLIT_SIZE, "10000")
+                .commit();
+        IcebergTableHandle tableHandle = createTableHandle(schemaTableName, nationTable, TupleDomain.all());
+
+        IcebergSplit split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY);
+        SplitWeight weightWithoutDelete = split.getSplitWeight();
+
+        String dataFilePath = (String) computeActual("SELECT file_path FROM \"" + schemaTableName.getTableName() + "$files\" LIMIT 1").getOnlyValue();
+
+        // Write position delete file
+        FileIO fileIo = new ForwardingFileIo(fileSystemFactory.create(SESSION));
+        PositionDeleteWriter<Record> writer = Parquet.writeDeletes(fileIo.newOutputFile("local:///delete_file_" + UUID.randomUUID()))
+                .createWriterFunc(GenericParquetWriter::buildWriter)
+                .forTable(nationTable)
+                .overwrite()
+                .rowSchema(nationTable.schema())
+                .withSpec(PartitionSpec.unpartitioned())
+                .buildPositionWriter();
+        PositionDelete<Record> positionDelete = PositionDelete.create();
+        PositionDelete<Record> record = positionDelete.set(dataFilePath, 0, GenericRecord.create(nationTable.schema()));
+        try (Closeable ignored = writer) {
+            writer.write(record);
+        }
+        nationTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
+
+        split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY);
+        SplitWeight splitWeightWithPositionDelete = split.getSplitWeight();
+        assertThat(splitWeightWithPositionDelete.getRawValue()).isGreaterThan(weightWithoutDelete.getRawValue());
+
+        // Write equality delete file
+        writeEqualityDeleteForTable(
+                nationTable,
+                fileSystemFactory,
+                Optional.of(nationTable.spec()),
+                Optional.of(new PartitionData(new Long[] {1L})),
+                ImmutableMap.of("regionkey", 1L),
+                Optional.empty());
+
+        split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY);
+        assertThat(split.getSplitWeight().getRawValue()).isGreaterThan(splitWeightWithPositionDelete.getRawValue());
+    }
+
     private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHandle, DynamicFilter dynamicFilter)
             throws Exception
     {
@@ -387,7 +451,7 @@ private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHandle, DynamicFilter dynamicFilter)
                 alwaysTrue(),
                 new TestingTypeManager(),
                 false,
-                new IcebergConfig().getMinimumAssignedSplitWeight(),
+                0,
                 new DefaultCachingHostAddressProvider())) {
             ImmutableList.Builder<IcebergSplit> builder = ImmutableList.builder();
             while (!splitSource.isFinished()) {
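The generateSplit change from new IcebergConfig().getMinimumAssignedSplitWeight() to 0 removes the lower clamp from the measurement, so the three assertions in testSplitWeight compare the computed proportions directly rather than values potentially raised to a common floor. Below is a small sketch of how a non-zero floor can mask such differences, assuming the split source clamps the computed weight between the configured minimum and 1.0 before building the SplitWeight; the floor and proportions used are illustrative numbers, not values taken from the test:

    class MinimumSplitWeightClampSketch
    {
        // Assumed clamping shape: the weight is bounded below by the configured minimum and above by 1.0
        static double clampWeight(double weight, double minimumAssignedSplitWeight)
        {
            return Math.min(1.0, Math.max(minimumAssignedSplitWeight, weight));
        }

        public static void main(String[] args)
        {
            double withoutDeletes = 0.01;     // illustrative data-only proportion
            double withPositionDelete = 0.02; // doubled by a position delete file

            // With a non-zero floor both proportions clamp to the floor and become indistinguishable
            System.out.println(clampWeight(withoutDeletes, 0.05) == clampWeight(withPositionDelete, 0.05)); // true
            // With the floor at 0 the ordering asserted by testSplitWeight is preserved
            System.out.println(clampWeight(withoutDeletes, 0.0) < clampWeight(withPositionDelete, 0.0));    // true
        }
    }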