From 88b4c4820c7f0eae359d801fab32ab21236f6d5d Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Wed, 27 Dec 2023 17:09:12 +0530 Subject: [PATCH] Extract pruneFileScanTask in IcebergSplitSource --- .../plugin/iceberg/IcebergSplitSource.java | 89 ++++++++++--------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 6432e6a8df19..9f12d7621d99 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -235,50 +235,10 @@ public CompletableFuture getNextBatch(int maxSize) FileScanTask wholeFileTask = fileScanIterator.next(); boolean fileHasNoDeletions = wholeFileTask.deletes().isEmpty(); - if (fileHasNoDeletions && - maxScannedFileSizeInBytes.isPresent() && - wholeFileTask.file().fileSizeInBytes() > maxScannedFileSizeInBytes.get()) { + if (pruneFileScanTask(wholeFileTask, fileHasNoDeletions, dynamicFilterPredicate)) { continue; } - if (!pathDomain.includesNullableValue(utf8Slice(wholeFileTask.file().path().toString()))) { - continue; - } - if (!fileModifiedTimeDomain.isAll()) { - long fileModifiedTime = getModificationTime(wholeFileTask.file().path().toString()); - if (!fileModifiedTimeDomain.includesNullableValue(packDateTimeWithZone(fileModifiedTime, UTC_KEY))) { - continue; - } - } - - Schema fileSchema = wholeFileTask.spec().schema(); - Map> partitionKeys = getPartitionKeys(wholeFileTask); - - Set identityPartitionColumns = partitionKeys.keySet().stream() - .map(fieldId -> getColumnHandle(fileSchema.findField(fieldId), typeManager)) - .collect(toImmutableSet()); - - Supplier> partitionValues = memoize(() -> getPartitionValues(identityPartitionColumns, partitionKeys)); - - if (!dynamicFilterPredicate.isAll() && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) { - if (!partitionMatchesPredicate( - identityPartitionColumns, - partitionValues, - dynamicFilterPredicate)) { - continue; - } - if (!fileMatchesPredicate( - fieldIdToType, - dynamicFilterPredicate, - wholeFileTask.file().lowerBounds(), - wholeFileTask.file().upperBounds(), - wholeFileTask.file().nullValueCounts())) { - continue; - } - } - if (!partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint)) { - continue; - } if (recordScannedFiles) { // Positional and Equality deletes can only be cleaned up if the whole table has been optimized. // Equality deletes may apply to many files, and position deletes may be grouped together. This makes it difficult to know if they are obsolete. @@ -305,6 +265,53 @@ public CompletableFuture getNextBatch(int maxSize) return completedFuture(new ConnectorSplitBatch(splits, isFinished())); } + private boolean pruneFileScanTask(FileScanTask fileScanTask, boolean fileHasNoDeletions, TupleDomain dynamicFilterPredicate) + { + if (fileHasNoDeletions && + maxScannedFileSizeInBytes.isPresent() && + fileScanTask.file().fileSizeInBytes() > maxScannedFileSizeInBytes.get()) { + return true; + } + + if (!pathDomain.includesNullableValue(utf8Slice(fileScanTask.file().path().toString()))) { + return true; + } + if (!fileModifiedTimeDomain.isAll()) { + long fileModifiedTime = getModificationTime(fileScanTask.file().path().toString()); + if (!fileModifiedTimeDomain.includesNullableValue(packDateTimeWithZone(fileModifiedTime, UTC_KEY))) { + return true; + } + } + + Schema fileSchema = fileScanTask.spec().schema(); + Map> partitionKeys = getPartitionKeys(fileScanTask); + + Set identityPartitionColumns = partitionKeys.keySet().stream() + .map(fieldId -> getColumnHandle(fileSchema.findField(fieldId), typeManager)) + .collect(toImmutableSet()); + + Supplier> partitionValues = memoize(() -> getPartitionValues(identityPartitionColumns, partitionKeys)); + + if (!dynamicFilterPredicate.isAll() && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) { + if (!partitionMatchesPredicate( + identityPartitionColumns, + partitionValues, + dynamicFilterPredicate)) { + return true; + } + if (!fileMatchesPredicate( + fieldIdToType, + dynamicFilterPredicate, + fileScanTask.file().lowerBounds(), + fileScanTask.file().upperBounds(), + fileScanTask.file().nullValueCounts())) { + return true; + } + } + + return !partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint); + } + private boolean noDataColumnsProjected(FileScanTask fileScanTask) { return fileScanTask.spec().fields().stream()