Skip to content

Commit

Permalink
Extract pruneFileScanTask in IcebergSplitSource
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Dec 27, 2023
1 parent 85cd8e4 commit 88b4c48
Showing 1 changed file with 48 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,50 +235,10 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
FileScanTask wholeFileTask = fileScanIterator.next();
boolean fileHasNoDeletions = wholeFileTask.deletes().isEmpty();

if (fileHasNoDeletions &&
maxScannedFileSizeInBytes.isPresent() &&
wholeFileTask.file().fileSizeInBytes() > maxScannedFileSizeInBytes.get()) {
if (pruneFileScanTask(wholeFileTask, fileHasNoDeletions, dynamicFilterPredicate)) {
continue;
}

if (!pathDomain.includesNullableValue(utf8Slice(wholeFileTask.file().path().toString()))) {
continue;
}
if (!fileModifiedTimeDomain.isAll()) {
long fileModifiedTime = getModificationTime(wholeFileTask.file().path().toString());
if (!fileModifiedTimeDomain.includesNullableValue(packDateTimeWithZone(fileModifiedTime, UTC_KEY))) {
continue;
}
}

Schema fileSchema = wholeFileTask.spec().schema();
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(wholeFileTask);

Set<IcebergColumnHandle> identityPartitionColumns = partitionKeys.keySet().stream()
.map(fieldId -> getColumnHandle(fileSchema.findField(fieldId), typeManager))
.collect(toImmutableSet());

Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> getPartitionValues(identityPartitionColumns, partitionKeys));

if (!dynamicFilterPredicate.isAll() && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) {
if (!partitionMatchesPredicate(
identityPartitionColumns,
partitionValues,
dynamicFilterPredicate)) {
continue;
}
if (!fileMatchesPredicate(
fieldIdToType,
dynamicFilterPredicate,
wholeFileTask.file().lowerBounds(),
wholeFileTask.file().upperBounds(),
wholeFileTask.file().nullValueCounts())) {
continue;
}
}
if (!partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint)) {
continue;
}
if (recordScannedFiles) {
// Positional and Equality deletes can only be cleaned up if the whole table has been optimized.
// Equality deletes may apply to many files, and position deletes may be grouped together. This makes it difficult to know if they are obsolete.
Expand All @@ -305,6 +265,53 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
}

private boolean pruneFileScanTask(FileScanTask fileScanTask, boolean fileHasNoDeletions, TupleDomain<IcebergColumnHandle> dynamicFilterPredicate)
{
if (fileHasNoDeletions &&
maxScannedFileSizeInBytes.isPresent() &&
fileScanTask.file().fileSizeInBytes() > maxScannedFileSizeInBytes.get()) {
return true;
}

if (!pathDomain.includesNullableValue(utf8Slice(fileScanTask.file().path().toString()))) {
return true;
}
if (!fileModifiedTimeDomain.isAll()) {
long fileModifiedTime = getModificationTime(fileScanTask.file().path().toString());
if (!fileModifiedTimeDomain.includesNullableValue(packDateTimeWithZone(fileModifiedTime, UTC_KEY))) {
return true;
}
}

Schema fileSchema = fileScanTask.spec().schema();
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(fileScanTask);

Set<IcebergColumnHandle> identityPartitionColumns = partitionKeys.keySet().stream()
.map(fieldId -> getColumnHandle(fileSchema.findField(fieldId), typeManager))
.collect(toImmutableSet());

Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> getPartitionValues(identityPartitionColumns, partitionKeys));

if (!dynamicFilterPredicate.isAll() && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) {
if (!partitionMatchesPredicate(
identityPartitionColumns,
partitionValues,
dynamicFilterPredicate)) {
return true;
}
if (!fileMatchesPredicate(
fieldIdToType,
dynamicFilterPredicate,
fileScanTask.file().lowerBounds(),
fileScanTask.file().upperBounds(),
fileScanTask.file().nullValueCounts())) {
return true;
}
}

return !partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint);
}

private boolean noDataColumnsProjected(FileScanTask fileScanTask)
{
return fileScanTask.spec().fields().stream()
Expand Down

0 comments on commit 88b4c48

Please sign in to comment.