Skip to content

Commit

Permalink
Use the effective predicate when doing partition matching
Browse files Browse the repository at this point in the history
Use the effective predicate instead of the dynamic filter predicate
to check for partition matching.
This change results in short circuiting the page source and not
having to read anymore the data file footer in the exotic case
where a partition filter acts as unenforced
predicate due to table partition spec evolution.
  • Loading branch information
findinpath committed Jan 2, 2024
1 parent 71499d5 commit 007eda2
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,20 +419,20 @@ private TupleDomain<IcebergColumnHandle> getEffectivePredicate(
TupleDomain<IcebergColumnHandle> dynamicFilterPredicate,
TupleDomain<IcebergColumnHandle> unenforcedPredicate)
{
TupleDomain<IcebergColumnHandle> effectivePredicate = unenforcedPredicate.intersect(dynamicFilterPredicate);
if (dynamicFilterPredicate.isAll() || dynamicFilterPredicate.isNone() || partitionKeys.isEmpty()) {
return unenforcedPredicate.intersect(dynamicFilterPredicate);
return effectivePredicate;
}
Set<IcebergColumnHandle> partitionColumns = partitionKeys.keySet().stream()
.map(fieldId -> getColumnHandle(tableSchema.findField(fieldId), typeManager))
.collect(toImmutableSet());
Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> getPartitionValues(partitionColumns, partitionKeys));
if (!partitionMatchesPredicate(partitionColumns, partitionValues, dynamicFilterPredicate)) {
if (!partitionMatchesPredicate(partitionColumns, partitionValues, effectivePredicate)) {
return TupleDomain.none();
}
// Filter out partition columns domains from the dynamic filter because they should be irrelevant at data file level
dynamicFilterPredicate = dynamicFilterPredicate
return effectivePredicate
.filter((columnHandle, domain) -> !partitionKeys.containsKey(columnHandle.getId()));
return unenforcedPredicate.intersect(dynamicFilterPredicate);
}

private Set<IcebergColumnHandle> requiredColumnsForDeletes(Schema schema, List<DeleteFile> deletes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,169 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter()
}
}

@Test
public void testDynamicSplitPruningWithExplicitPartitionFilterPartitionEvolution()
throws IOException
{
String tableName = "sales_table";
String yearColumnName = "year";
ColumnIdentity yearColumnIdentity = new ColumnIdentity(1, yearColumnName, PRIMITIVE, ImmutableList.of());
IcebergColumnHandle yearColumnHandle = new IcebergColumnHandle(yearColumnIdentity, INTEGER, ImmutableList.of(), INTEGER, Optional.empty());
long yearColumnValue = 2023L;
String monthColumnName = "month";
ColumnIdentity monthColumnIdentity = new ColumnIdentity(2, monthColumnName, PRIMITIVE, ImmutableList.of());
IcebergColumnHandle monthColumnHandle = new IcebergColumnHandle(monthColumnIdentity, INTEGER, ImmutableList.of(), INTEGER, Optional.empty());
long monthColumnValue = 1L;
String receiptColumnName = "receipt";
ColumnIdentity receiptColumnIdentity = new ColumnIdentity(3, receiptColumnName, PRIMITIVE, ImmutableList.of());
IcebergColumnHandle receiptColumnHandle = new IcebergColumnHandle(receiptColumnIdentity, VARCHAR, ImmutableList.of(), VARCHAR, Optional.empty());
String receiptColumnValue = "#12345";
String amountColumnName = "amount";
ColumnIdentity amountColumnIdentity = new ColumnIdentity(4, amountColumnName, PRIMITIVE, ImmutableList.of());
DecimalType amountColumnType = DecimalType.createDecimalType(10, 2);
IcebergColumnHandle amountColumnHandle = new IcebergColumnHandle(amountColumnIdentity, amountColumnType, ImmutableList.of(), amountColumnType, Optional.empty());
BigDecimal amountColumnValue = new BigDecimal("1234567.65");
Schema tableSchema = new Schema(
optional(yearColumnIdentity.getId(), yearColumnName, Types.IntegerType.get()),
optional(monthColumnIdentity.getId(), monthColumnName, Types.IntegerType.get()),
optional(receiptColumnIdentity.getId(), receiptColumnName, Types.StringType.get()),
optional(amountColumnIdentity.getId(), amountColumnName, Types.DecimalType.of(10, 2)));
PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema)
.identity(yearColumnName)
.build();

IcebergConfig icebergConfig = new IcebergConfig();
HiveTransactionHandle transaction = new HiveTransactionHandle(false);
try (TempFile file = new TempFile()) {
Files.delete(file.path());

TrinoOutputFile outputFile = new LocalOutputFile(file.file());
TrinoInputFile inputFile = new LocalInputFile(file.file());
List<String> columnNames = ImmutableList.of(yearColumnName, monthColumnName, receiptColumnName, amountColumnName);
List<Type> types = ImmutableList.of(INTEGER, INTEGER, VARCHAR, amountColumnType);

try (OrcWriter writer = new OrcWriter(
OutputStreamOrcDataSink.create(outputFile),
columnNames,
types,
toOrcType(tableSchema),
NONE,
new OrcWriterOptions(),
ImmutableMap.of(),
true,
OrcWriteValidation.OrcWriteValidationMode.BOTH,
new OrcWriterStats())) {
BlockBuilder yearBuilder = INTEGER.createBlockBuilder(null, 1);
INTEGER.writeLong(yearBuilder, yearColumnValue);
BlockBuilder monthBuilder = INTEGER.createBlockBuilder(null, 1);
INTEGER.writeLong(monthBuilder, monthColumnValue);
BlockBuilder receiptBuilder = VARCHAR.createBlockBuilder(null, 1);
VARCHAR.writeString(receiptBuilder, receiptColumnValue);
BlockBuilder amountBuilder = amountColumnType.createBlockBuilder(null, 1);
writeShortDecimal(amountBuilder, amountColumnValue.unscaledValue().longValueExact());
writer.write(new Page(yearBuilder.build(), monthBuilder.build(), receiptBuilder.build(), amountBuilder.build()));
}

IcebergSplit split = new IcebergSplit(
inputFile.toString(),
0,
inputFile.length(),
inputFile.length(),
-1, // invalid; normally known
ORC,
PartitionSpecParser.toJson(partitionSpec),
PartitionData.toJson(new PartitionData(new Object[] {yearColumnValue})),
ImmutableList.of(),
SplitWeight.standard());

String tablePath = inputFile.location().fileName();
// Simulate the situation where `month` column is added at a later phase as partitioning column
// in addition to the `year` column, which leads to use it as unenforced predicate in the table handle
// after applying the filter
TableHandle tableHandle = new TableHandle(
TEST_CATALOG_HANDLE,
new IcebergTableHandle(
CatalogHandle.fromId("iceberg:NORMAL:v12345"),
"test_schema",
tableName,
TableType.DATA,
Optional.empty(),
SchemaParser.toJson(tableSchema),
Optional.of(PartitionSpecParser.toJson(partitionSpec)),
2,
TupleDomain.withColumnDomains(
ImmutableMap.of(
yearColumnHandle,
Domain.create(ValueSet.ofRanges(Range.range(INTEGER, 2023L, true, 2024L, true)), true))),
TupleDomain.withColumnDomains(
ImmutableMap.of(
monthColumnHandle,
Domain.create(ValueSet.ofRanges(Range.range(INTEGER, 1L, true, 12L, true)), true))),
OptionalLong.empty(),
ImmutableSet.of(yearColumnHandle, monthColumnHandle, receiptColumnHandle, amountColumnHandle),
Optional.empty(),
tablePath,
ImmutableMap.of(),
false,
Optional.empty(),
ImmutableSet.of(),
Optional.of(false)),
transaction);

// Simulate situations where the dynamic filter (e.g.: while performing a JOIN with another table) reduces considerably
// the amount of data to be processed from the current table
TupleDomain<ColumnHandle> differentYearPredicate = TupleDomain.withColumnDomains(
ImmutableMap.of(
yearColumnHandle,
Domain.singleValue(INTEGER, 2024L)));
TupleDomain<ColumnHandle> sameYearAndDifferentMonthPredicate = TupleDomain.withColumnDomains(
ImmutableMap.of(
yearColumnHandle,
Domain.singleValue(INTEGER, 2023L),
monthColumnHandle,
Domain.singleValue(INTEGER, 2L)));
for (TupleDomain<ColumnHandle> partitionPredicate : List.of(differentYearPredicate, sameYearAndDifferentMonthPredicate)) {
try (ConnectorPageSource emptyPageSource = createTestingPageSource(
transaction,
icebergConfig,
split,
tableHandle,
ImmutableList.of(yearColumnHandle, monthColumnHandle, receiptColumnHandle, amountColumnHandle),
getDynamicFilter(partitionPredicate))) {
assertThat(emptyPageSource.getNextPage()).isNull();
}
}

TupleDomain<ColumnHandle> sameYearPredicate = TupleDomain.withColumnDomains(
ImmutableMap.of(
yearColumnHandle,
Domain.singleValue(INTEGER, 2023L)));
TupleDomain<ColumnHandle> sameYearAndMonthPredicate = TupleDomain.withColumnDomains(
ImmutableMap.of(
yearColumnHandle,
Domain.singleValue(INTEGER, 2023L),
monthColumnHandle,
Domain.singleValue(INTEGER, 1L)));
for (TupleDomain<ColumnHandle> partitionPredicate : List.of(sameYearPredicate, sameYearAndMonthPredicate)) {
try (ConnectorPageSource nonEmptyPageSource = createTestingPageSource(
transaction,
icebergConfig,
split,
tableHandle,
ImmutableList.of(yearColumnHandle, monthColumnHandle, receiptColumnHandle, amountColumnHandle),
getDynamicFilter(partitionPredicate))) {
Page page = nonEmptyPageSource.getNextPage();
assertThat(page).isNotNull();
assertThat(page.getPositionCount()).isEqualTo(1);
assertThat(page.getBlock(0).getInt(0, 0)).isEqualTo(2023L);
assertThat(page.getBlock(1).getInt(0, 0)).isEqualTo(1L);
assertThat(page.getBlock(2).getSlice(0, 0, page.getBlock(2).getSliceLength(0)).toStringUtf8()).isEqualTo(receiptColumnValue);
assertThat(((SqlDecimal) amountColumnType.getObjectValue(null, page.getBlock(3), 0)).toBigDecimal()).isEqualTo(amountColumnValue);
}
}
}
}

private static ConnectorPageSource createTestingPageSource(
HiveTransactionHandle transaction,
IcebergConfig icebergConfig,
Expand Down

0 comments on commit 007eda2

Please sign in to comment.