Skip to content

Commit

Permalink
Cache result of evaluating constraint per partition in iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Jan 10, 2024
1 parent 6021f33 commit d071fa2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.io.Closer;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.cache.NonEvictableCache;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
Expand Down Expand Up @@ -61,6 +65,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -72,6 +77,8 @@
import static com.google.common.collect.Sets.intersection;
import static com.google.common.math.LongMath.saturatedAdd;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.cache.CacheUtils.uncheckedCacheGet;
import static io.trino.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.plugin.iceberg.ExpressionConverter.isConvertableToIcebergExpression;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle;
Expand Down Expand Up @@ -109,7 +116,7 @@ public class IcebergSplitSource
private final DynamicFilter dynamicFilter;
private final long dynamicFilteringWaitTimeoutMillis;
private final Stopwatch dynamicFilterWaitStopwatch;
private final Constraint constraint;
private final PartitionConstraintMatcher partitionConstraintMatcher;
private final TypeManager typeManager;
private final Closer closer = Closer.create();
private final double minimumAssignedSplitWeight;
Expand Down Expand Up @@ -151,7 +158,7 @@ public IcebergSplitSource(
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.dynamicFilteringWaitTimeoutMillis = dynamicFilteringWaitTimeout.toMillis();
this.dynamicFilterWaitStopwatch = Stopwatch.createStarted();
this.constraint = requireNonNull(constraint, "constraint is null");
this.partitionConstraintMatcher = new PartitionConstraintMatcher(constraint);
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.recordScannedFiles = recordScannedFiles;
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
Expand Down Expand Up @@ -309,7 +316,7 @@ private boolean pruneFileScanTask(FileScanTask fileScanTask, boolean fileHasNoDe
}
}

return !partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint);
return !partitionConstraintMatcher.matches(identityPartitionColumns, partitionValues);
}

private boolean noDataColumnsProjected(FileScanTask fileScanTask)
Expand Down Expand Up @@ -436,19 +443,38 @@ else if (upperBound != null) {
return Domain.create(ValueSet.ofRanges(statisticsRange), mayContainNulls);
}

static boolean partitionMatchesConstraint(
Set<IcebergColumnHandle> identityPartitionColumns,
Supplier<Map<ColumnHandle, NullableValue>> partitionValues,
Constraint constraint)
/**
 * Evaluates the functional predicate carried by a {@code Constraint} against
 * partition values, caching each result so that the predicate is not
 * re-evaluated for a combination of values that has already been seen.
 * The cache key is the partition values restricted to the predicate columns
 * that are also identity partition columns.
 */
private static class PartitionConstraintMatcher
{
// NOTE(review): the following comment and verify(...) call sit directly in the class body;
// they look like leftovers of the replaced partitionMatchesConstraint method in this
// diff rendering rather than part of the new class — confirm against the real file.
// We use Constraint just to pass functional predicate here from DistributedExecutionPlanner
verify(constraint.getSummary().isAll());
// Result of the predicate per distinct combination of relevant partition values
private final NonEvictableCache<Map<ColumnHandle, NullableValue>, Boolean> partitionConstraintResults;
// Functional predicate taken from the constraint; empty means every partition matches
private final Optional<Predicate<Map<ColumnHandle, NullableValue>>> predicate;
// Columns the predicate reads; used to decide which partition values form the cache key
private final Optional<Set<ColumnHandle>> predicateColumns;

/**
 * Captures the predicate and predicate columns of {@code constraint} and
 * creates the result cache.
 *
 * @param constraint constraint whose summary must be "all"; only its
 *        predicate and predicate columns are used
 */
private PartitionConstraintMatcher(Constraint constraint)
{
// We use Constraint just to pass functional predicate here from DistributedExecutionPlanner
verify(constraint.getSummary().isAll());
this.predicate = constraint.predicate();
this.predicateColumns = constraint.getPredicateColumns();
// Cache is configured with a maximum size of 1000 entries
this.partitionConstraintResults = buildNonEvictableCache(CacheBuilder.newBuilder().maximumSize(1000));
}

// NOTE(review): the next three lines reference `constraint`, which is not a field of this
// class; they appear to be removed lines of the old partitionMatchesConstraint method
// shown by the diff view — confirm against the real file.
if (constraint.predicate().isEmpty() ||
intersection(constraint.getPredicateColumns().orElseThrow(), identityPartitionColumns).isEmpty()) {
return true;
/**
 * Tells whether a partition satisfies the predicate.
 *
 * @param identityPartitionColumns identity partition columns of the table
 * @param partitionValuesSupplier supplies the partition's values; it is only
 *        invoked when the predicate has to be consulted
 * @return {@code true} when there is no predicate, when none of the predicate
 *         columns is an identity partition column, or when the predicate
 *         accepts the partition values; otherwise {@code false}
 */
boolean matches(
Set<IcebergColumnHandle> identityPartitionColumns,
Supplier<Map<ColumnHandle, NullableValue>> partitionValuesSupplier)
{
if (predicate.isEmpty()) {
return true;
}
// orElseThrow: predicate columns are expected to be present whenever a predicate is
Set<ColumnHandle> predicatePartitionColumns = intersection(predicateColumns.orElseThrow(), identityPartitionColumns);
if (predicatePartitionColumns.isEmpty()) {
return true;
}
Map<ColumnHandle, NullableValue> partitionValues = partitionValuesSupplier.get();
// The key holds only the values of predicate columns that are identity partition columns,
// while the predicate itself is tested against the full partitionValues map.
// NOTE(review): this presumes the predicate result depends only on the key columns — confirm.
return uncheckedCacheGet(
partitionConstraintResults,
ImmutableMap.copyOf(Maps.filterKeys(partitionValues, predicatePartitionColumns::contains)),
() -> predicate.orElseThrow().test(partitionValues));
}
// NOTE(review): stray return below also references `constraint`; presumably a removed
// line of the old method in this diff rendering — confirm.
return constraint.predicate().get().test(partitionValues.get());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4236,6 +4236,18 @@ public void testSplitPruningForFilterOnPartitionColumn()
verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey % 5 = 3", 1);

assertUpdate("DROP TABLE " + tableName);

// Partition by multiple columns
assertUpdate(noRedistributeWrites, "CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey', 'nationkey']) AS SELECT * FROM nation", 25);
// Create 2 files per partition
assertUpdate(noRedistributeWrites, "INSERT INTO " + tableName + " SELECT * FROM nation", 25);
// sanity check that table contains exactly 50 files
assertThat(computeScalar("SELECT count(*) FROM \"" + tableName + "$files\"")).isEqualTo(50L);

verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey % 5 = 3", 10);
verifySplitCount("SELECT * FROM " + tableName + " WHERE (regionkey * 2) - nationkey = 0", 6);

assertUpdate("DROP TABLE " + tableName);
}

@Test
Expand Down

0 comments on commit d071fa2

Please sign in to comment.