diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 1adab5feb5e7..5212772b99e9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -15,11 +15,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import com.google.common.io.Closer; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import io.trino.cache.NonEvictableCache; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; @@ -61,6 +65,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; @@ -72,6 +77,8 @@ import static com.google.common.collect.Sets.intersection; import static com.google.common.math.LongMath.saturatedAdd; import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; +import static io.trino.cache.SafeCaches.buildNonEvictableCache; import static io.trino.plugin.iceberg.ExpressionConverter.isConvertableToIcebergExpression; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle; @@ -109,7 +116,7 @@ public class IcebergSplitSource private final DynamicFilter dynamicFilter; private final long dynamicFilteringWaitTimeoutMillis; private final Stopwatch dynamicFilterWaitStopwatch; - private final Constraint constraint; + private final PartitionConstraintMatcher partitionConstraintMatcher; private final TypeManager typeManager; private final Closer closer = Closer.create(); private final double minimumAssignedSplitWeight; @@ -151,7 +158,7 @@ public IcebergSplitSource( this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null"); this.dynamicFilteringWaitTimeoutMillis = dynamicFilteringWaitTimeout.toMillis(); this.dynamicFilterWaitStopwatch = Stopwatch.createStarted(); - this.constraint = requireNonNull(constraint, "constraint is null"); + this.partitionConstraintMatcher = new PartitionConstraintMatcher(constraint); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.recordScannedFiles = recordScannedFiles; this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; @@ -309,7 +316,7 @@ private boolean pruneFileScanTask(FileScanTask fileScanTask, boolean fileHasNoDe } } - return !partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint); + return !partitionConstraintMatcher.matches(identityPartitionColumns, partitionValues); } private boolean noDataColumnsProjected(FileScanTask fileScanTask) @@ -436,19 +443,38 @@ else if (upperBound != null) { return Domain.create(ValueSet.ofRanges(statisticsRange), mayContainNulls); } - static boolean partitionMatchesConstraint( - Set identityPartitionColumns, - Supplier> partitionValues, - Constraint constraint) + private static class PartitionConstraintMatcher { - // We use Constraint just to pass functional predicate here from DistributedExecutionPlanner - verify(constraint.getSummary().isAll()); + private final NonEvictableCache, Boolean> partitionConstraintResults; + private final Optional>> predicate; + private final Optional> predicateColumns; + + private PartitionConstraintMatcher(Constraint constraint) + { + // We use Constraint just to pass functional predicate here from DistributedExecutionPlanner + verify(constraint.getSummary().isAll()); + this.predicate = constraint.predicate(); + this.predicateColumns = constraint.getPredicateColumns(); + this.partitionConstraintResults = buildNonEvictableCache(CacheBuilder.newBuilder().maximumSize(1000)); + } - if (constraint.predicate().isEmpty() || - intersection(constraint.getPredicateColumns().orElseThrow(), identityPartitionColumns).isEmpty()) { - return true; + boolean matches( + Set identityPartitionColumns, + Supplier> partitionValuesSupplier) + { + if (predicate.isEmpty()) { + return true; + } + Set predicatePartitionColumns = intersection(predicateColumns.orElseThrow(), identityPartitionColumns); + if (predicatePartitionColumns.isEmpty()) { + return true; + } + Map partitionValues = partitionValuesSupplier.get(); + return uncheckedCacheGet( + partitionConstraintResults, + ImmutableMap.copyOf(Maps.filterKeys(partitionValues, predicatePartitionColumns::contains)), + () -> predicate.orElseThrow().test(partitionValues)); } - return constraint.predicate().get().test(partitionValues.get()); } @VisibleForTesting diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index ba732ec9dacf..4570584a180f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -4236,6 +4236,18 @@ public void testSplitPruningForFilterOnPartitionColumn() verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey % 5 = 3", 1); assertUpdate("DROP TABLE " + tableName); + + // Partition by multiple columns + assertUpdate(noRedistributeWrites, "CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey', 'nationkey']) AS SELECT * FROM nation", 25); + // Create 2 files per partition + assertUpdate(noRedistributeWrites, "INSERT INTO " + tableName + " SELECT * FROM nation", 25); + // sanity check that table contains exactly 50 files + assertThat(computeScalar("SELECT count(*) FROM \"" + tableName + "$files\"")).isEqualTo(50L); + + verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey % 5 = 3", 10); + verifySplitCount("SELECT * FROM " + tableName + " WHERE (regionkey * 2) - nationkey = 0", 6); + + assertUpdate("DROP TABLE " + tableName); } @Test