Short circuit page source in case of partition mismatch
In case the dynamic filter completes after the split has been scheduled on the
worker, the split's results will now be pruned when there is a partition
predicate mismatch.
findinpath authored and raunaqmorarka committed Dec 30, 2023
1 parent 31e74bc commit 89105bd
Showing 3 changed files with 241 additions and 4 deletions.
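The idea, as a minimal self-contained sketch: once the dynamic filter has completed, a split whose identity-partition values fall outside the filter's allowed values can be answered with an empty page source instead of reading the file. The sketch below uses toy types and a hypothetical `canSkipSplit` helper rather than Trino's actual `TupleDomain`/`NullableValue` machinery.

```java
import java.util.Map;
import java.util.Set;

// Toy model of the short circuit added in this commit (not Trino's actual types):
// a "dynamic filter" maps column names to their allowed values, and a split carries
// the values of its identity-partition columns.
public class PartitionShortCircuitSketch
{
    // Returns true if the split cannot produce any row matching the dynamic filter,
    // in which case the page source provider can hand back an empty page source.
    static boolean canSkipSplit(Map<String, Set<Object>> dynamicFilter, Map<String, Object> partitionValues)
    {
        if (dynamicFilter.isEmpty() || partitionValues.isEmpty()) {
            return false; // no completed filter or unpartitioned split: nothing to prune on
        }
        for (Map.Entry<String, Object> partition : partitionValues.entrySet()) {
            Set<Object> allowed = dynamicFilter.get(partition.getKey());
            if (allowed != null && !allowed.contains(partition.getValue())) {
                return true; // the partition value is excluded by the dynamic filter
            }
        }
        return false;
    }

    public static void main(String[] args)
    {
        // Split partitioned on date = 2023-01-10, dynamic filter admitting only 2023-02-02
        System.out.println(canSkipSplit(
                Map.of("date", Set.of("2023-02-02")),
                Map.of("date", "2023-01-10"))); // prints "true"
    }
}
```

In the actual change below, `getEffectivePredicate` returns `TupleDomain.none()` when `partitionMatchesPredicate` rejects the split's partition values, and the existing `effectivePredicate.isNone()` check then returns an `EmptyPageSource`.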
@@ -13,7 +13,6 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.base.Suppliers;
import com.google.common.base.VerifyException;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.BiMap;
@@ -133,6 +132,7 @@
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -170,9 +170,11 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
import static io.trino.plugin.iceberg.IcebergSessionProperties.useParquetBloomFilter;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergSplitSource.partitionMatchesPredicate;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles;
import static io.trino.plugin.iceberg.delete.EqualityDeleteFilter.readEqualityDeletes;
import static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes;
@@ -332,8 +334,11 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
}
});

TupleDomain<IcebergColumnHandle> effectivePredicate = unenforcedPredicate
.intersect(dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast))
TupleDomain<IcebergColumnHandle> effectivePredicate = getEffectivePredicate(
tableSchema,
partitionKeys,
dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast),
unenforcedPredicate)
.simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);
if (effectivePredicate.isNone()) {
return new EmptyPageSource();
@@ -386,7 +391,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
.map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toList()))
.orElse(requiredColumns);

Supplier<Optional<RowPredicate>> deletePredicate = Suppliers.memoize(() -> {
Supplier<Optional<RowPredicate>> deletePredicate = memoize(() -> {
List<DeleteFilter> deleteFilters = readDeletes(
session,
tableSchema,
@@ -408,6 +413,26 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
deletePredicate);
}

private TupleDomain<IcebergColumnHandle> getEffectivePredicate(
Schema tableSchema,
Map<Integer, Optional<String>> partitionKeys,
TupleDomain<IcebergColumnHandle> dynamicFilterPredicate,
TupleDomain<IcebergColumnHandle> unenforcedPredicate)
{
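// No partition-based short circuit possible: the dynamic filter is trivial or the split has no identity-partition values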
if (dynamicFilterPredicate.isAll() || dynamicFilterPredicate.isNone() || partitionKeys.isEmpty()) {
return unenforcedPredicate.intersect(dynamicFilterPredicate);
}
Set<IcebergColumnHandle> partitionColumns = partitionKeys.keySet().stream()
.map(fieldId -> getColumnHandle(tableSchema.findField(fieldId), typeManager))
.collect(toImmutableSet());
Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> getPartitionValues(partitionColumns, partitionKeys));
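// Short circuit: a partition that cannot satisfy the dynamic filter yields no rows, so the file can be skipped entirely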
if (!partitionMatchesPredicate(partitionColumns, partitionValues, dynamicFilterPredicate)) {
return TupleDomain.none();
}

return unenforcedPredicate.intersect(dynamicFilterPredicate);
}

private Set<IcebergColumnHandle> requiredColumnsForDeletes(Schema schema, List<DeleteFile> deletes)
{
ImmutableSet.Builder<IcebergColumnHandle> requiredColumns = ImmutableSet.builder();
@@ -31,6 +31,7 @@
import io.trino.operator.OperatorStats;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.server.DynamicFilterService;
import io.trino.spi.QueryId;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Constraint;
@@ -101,6 +102,7 @@
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static io.trino.SystemSessionProperties.DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED;
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.SystemSessionProperties.SCALE_WRITERS;
import static io.trino.SystemSessionProperties.TASK_MAX_WRITER_COUNT;
import static io.trino.SystemSessionProperties.TASK_MIN_WRITER_COUNT;
@@ -7353,6 +7355,65 @@ public void testUuidDynamicFilter()
}
}

@Test
public void testDynamicFilterWithExplicitPartitionFilter()
{
String catalog = getSession().getCatalog().orElseThrow();
try (TestTable salesTable = new TestTable(getQueryRunner()::execute, "sales_table", "(date date, receipt_id varchar, amount decimal(10,2)) with (partitioning=array['date'])");
TestTable dimensionTable = new TestTable(getQueryRunner()::execute, "dimension_table", "(date date, following_holiday boolean, year int)")) {
assertUpdate("""
INSERT INTO %s
VALUES
(DATE '2023-01-01' , false, 2023),
(DATE '2023-01-02' , true, 2023),
(DATE '2023-01-03' , false, 2023)""".formatted(dimensionTable.getName()), 3);
assertUpdate("""
INSERT INTO %s
VALUES
(DATE '2023-01-02' , '#2023#1', DECIMAL '122.12'),
(DATE '2023-01-02' , '#2023#2', DECIMAL '124.12'),
(DATE '2023-01-02' , '#2023#3', DECIMAL '99.99'),
(DATE '2023-01-02' , '#2023#4', DECIMAL '95.12'),
(DATE '2023-01-03' , '#2023#5', DECIMAL '199.12'),
(DATE '2023-01-04' , '#2023#6', DECIMAL '99.55'),
(DATE '2023-01-05' , '#2023#7', DECIMAL '50.11'),
(DATE '2023-01-05' , '#2023#8', DECIMAL '60.20'),
(DATE '2023-01-05' , '#2023#9', DECIMAL '70.75'),
(DATE '2023-01-05' , '#2023#10', DECIMAL '80.12')""".formatted(salesTable.getName()), 10);

String selectQuery = """
SELECT receipt_id
FROM %s s
JOIN %s d
ON s.date = d.date
WHERE
d.following_holiday = true AND
d.date BETWEEN DATE '2023-01-01' AND DATE '2024-01-01'""".formatted(salesTable.getName(), dimensionTable.getName());
MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(
Session.builder(getSession())
.setCatalogSessionProperty(catalog, DYNAMIC_FILTERING_WAIT_TIMEOUT, "10s")
.build(),
selectQuery);
MaterializedResult expected = computeActual(
Session.builder(getSession())
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "false")
.build(),
selectQuery);
assertEqualsIgnoreOrder(result.getResult(), expected);

DynamicFilterService.DynamicFiltersStats dynamicFiltersStats = getDistributedQueryRunner().getCoordinator()
.getQueryManager()
.getFullQueryInfo(result.getQueryId())
.getQueryStats()
.getDynamicFiltersStats();
// The dynamic filter reduces the range specified for the partition column `date` from `date :: [[2023-01-01, 2024-01-01]]` to `date :: {[2023-01-02]}`
assertThat(dynamicFiltersStats.getTotalDynamicFilters()).isEqualTo(1L);
assertThat(dynamicFiltersStats.getLazyDynamicFilters()).isEqualTo(1L);
assertThat(dynamicFiltersStats.getReplicatedDynamicFilters()).isEqualTo(0L);
assertThat(dynamicFiltersStats.getDynamicFiltersCompleted()).isEqualTo(1L);
}
}

@Override
protected void verifyTableNameLengthFailurePermissible(Throwable e)
{
@@ -42,7 +42,11 @@
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.SqlDecimal;
import io.trino.spi.type.Type;
import io.trino.testing.TestingConnectorSession;
import org.apache.iceberg.PartitionSpec;
@@ -53,7 +57,9 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -67,6 +73,8 @@
import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE;
import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
import static io.trino.plugin.iceberg.util.OrcTypeConverter.toOrcType;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.Decimals.writeShortDecimal;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE;
@@ -186,6 +194,149 @@ public void testDynamicSplitPruningOnUnpartitionedTable()
}
}

@Test
public void testDynamicSplitPruningWithExplicitPartitionFilter()
throws IOException
{
String tableName = "sales_table";
String dateColumnName = "date";
ColumnIdentity dateColumnIdentity = new ColumnIdentity(1, dateColumnName, PRIMITIVE, ImmutableList.of());
IcebergColumnHandle dateColumnHandle = new IcebergColumnHandle(dateColumnIdentity, DATE, ImmutableList.of(), DATE, Optional.empty());
long dateColumnValue = LocalDate.of(2023, 1, 10).toEpochDay();
String receiptColumnName = "receipt";
ColumnIdentity receiptColumnIdentity = new ColumnIdentity(2, receiptColumnName, PRIMITIVE, ImmutableList.of());
IcebergColumnHandle receiptColumnHandle = new IcebergColumnHandle(receiptColumnIdentity, VARCHAR, ImmutableList.of(), VARCHAR, Optional.empty());
String receiptColumnValue = "#12345";
String amountColumnName = "amount";
ColumnIdentity amountColumnIdentity = new ColumnIdentity(3, amountColumnName, PRIMITIVE, ImmutableList.of());
DecimalType amountColumnType = DecimalType.createDecimalType(10, 2);
IcebergColumnHandle amountColumnHandle = new IcebergColumnHandle(amountColumnIdentity, amountColumnType, ImmutableList.of(), amountColumnType, Optional.empty());
BigDecimal amountColumnValue = new BigDecimal("1234567.65");
Schema tableSchema = new Schema(
optional(dateColumnIdentity.getId(), dateColumnName, Types.DateType.get()),
optional(receiptColumnIdentity.getId(), receiptColumnName, Types.StringType.get()),
optional(amountColumnIdentity.getId(), amountColumnName, Types.DecimalType.of(10, 2)));
PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema)
.identity(dateColumnName)
.build();

IcebergConfig icebergConfig = new IcebergConfig();
HiveTransactionHandle transaction = new HiveTransactionHandle(false);
try (TempFile file = new TempFile()) {
Files.delete(file.path());

TrinoOutputFile outputFile = new LocalOutputFile(file.file());
TrinoInputFile inputFile = new LocalInputFile(file.file());
List<String> columnNames = ImmutableList.of(dateColumnName, receiptColumnName, amountColumnName);
List<Type> types = ImmutableList.of(DATE, VARCHAR, amountColumnType);

try (OrcWriter writer = new OrcWriter(
OutputStreamOrcDataSink.create(outputFile),
columnNames,
types,
toOrcType(tableSchema),
NONE,
new OrcWriterOptions(),
ImmutableMap.of(),
true,
OrcWriteValidation.OrcWriteValidationMode.BOTH,
new OrcWriterStats())) {
BlockBuilder dateBuilder = DATE.createBlockBuilder(null, 1);
DATE.writeLong(dateBuilder, dateColumnValue);
BlockBuilder receiptBuilder = VARCHAR.createBlockBuilder(null, 1);
VARCHAR.writeString(receiptBuilder, receiptColumnValue);
BlockBuilder amountBuilder = amountColumnType.createBlockBuilder(null, 1);
writeShortDecimal(amountBuilder, amountColumnValue.unscaledValue().longValueExact());
writer.write(new Page(dateBuilder.build(), receiptBuilder.build(), amountBuilder.build()));
}

IcebergSplit split = new IcebergSplit(
inputFile.toString(),
0,
inputFile.length(),
inputFile.length(),
-1, // invalid; normally known
ORC,
PartitionSpecParser.toJson(partitionSpec),
PartitionData.toJson(new PartitionData(new Object[] {dateColumnValue})),
ImmutableList.of(),
SplitWeight.standard());

String tablePath = inputFile.location().fileName();
TableHandle tableHandle = new TableHandle(
TEST_CATALOG_HANDLE,
new IcebergTableHandle(
CatalogHandle.fromId("iceberg:NORMAL:v12345"),
"test_schema",
tableName,
TableType.DATA,
Optional.empty(),
SchemaParser.toJson(tableSchema),
Optional.of(PartitionSpecParser.toJson(partitionSpec)),
2,
TupleDomain.all(),
TupleDomain.all(),
OptionalLong.empty(),
ImmutableSet.of(dateColumnHandle),
Optional.empty(),
tablePath,
ImmutableMap.of(),
false,
Optional.empty(),
ImmutableSet.of(),
Optional.of(false)),
transaction);

// Simulate situations where the dynamic filter (e.g. one produced while performing a JOIN with another table)
// considerably reduces the amount of data to be processed from the current table

TupleDomain<ColumnHandle> differentDatePredicate = TupleDomain.withColumnDomains(
ImmutableMap.of(
dateColumnHandle,
Domain.singleValue(DATE, LocalDate.of(2023, 2, 2).toEpochDay())));
TupleDomain<ColumnHandle> nonOverlappingDatePredicate = TupleDomain.withColumnDomains(
ImmutableMap.of(
dateColumnHandle,
Domain.create(ValueSet.ofRanges(Range.greaterThanOrEqual(DATE, LocalDate.of(2023, 2, 2).toEpochDay())), true)));
for (TupleDomain<ColumnHandle> partitionPredicate : List.of(differentDatePredicate, nonOverlappingDatePredicate)) {
try (ConnectorPageSource emptyPageSource = createTestingPageSource(
transaction,
icebergConfig,
split,
tableHandle,
ImmutableList.of(dateColumnHandle, receiptColumnHandle, amountColumnHandle),
getDynamicFilter(partitionPredicate))) {
assertThat(emptyPageSource.getNextPage()).isNull();
}
}

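// Predicates that match the split's partition value must not short circuit the page source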
TupleDomain<ColumnHandle> sameDatePredicate = TupleDomain.withColumnDomains(
ImmutableMap.of(
dateColumnHandle,
Domain.singleValue(DATE, dateColumnValue)));
TupleDomain<ColumnHandle> overlappingDatePredicate = TupleDomain.withColumnDomains(
ImmutableMap.of(
dateColumnHandle,
Domain.create(ValueSet.ofRanges(Range.range(DATE, LocalDate.of(2023, 1, 1).toEpochDay(), true, LocalDate.of(2023, 2, 1).toEpochDay(), false)), true)));
for (TupleDomain<ColumnHandle> partitionPredicate : List.of(sameDatePredicate, overlappingDatePredicate)) {
try (ConnectorPageSource nonEmptyPageSource = createTestingPageSource(
transaction,
icebergConfig,
split,
tableHandle,
ImmutableList.of(dateColumnHandle, receiptColumnHandle, amountColumnHandle),
getDynamicFilter(partitionPredicate))) {
Page page = nonEmptyPageSource.getNextPage();
assertThat(page).isNotNull();
assertThat(page.getPositionCount()).isEqualTo(1);
assertThat(page.getBlock(0).getInt(0, 0)).isEqualTo(dateColumnValue);
assertThat(page.getBlock(1).getSlice(0, 0, page.getBlock(1).getSliceLength(0)).toStringUtf8()).isEqualTo(receiptColumnValue);
assertThat(((SqlDecimal) amountColumnType.getObjectValue(null, page.getBlock(2), 0)).toBigDecimal()).isEqualTo(amountColumnValue);
}
}
}
}

private static ConnectorPageSource createTestingPageSource(
HiveTransactionHandle transaction,
IcebergConfig icebergConfig,
