Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude files that do not satisfy the metadata predicate #17693

Merged
merged 2 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2801,7 +2801,12 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
Optional.empty()), // forAnalyze does not affect stats
handle -> {
Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
return TableStatisticsReader.getTableStatistics(typeManager, session, handle, icebergTable);
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
return TableStatisticsReader.getTableStatistics(
typeManager,
session,
handle,
icebergTable,
fileSystemFactory.create(session.getIdentity(), icebergTable.io().properties()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@
import io.airlift.units.DataSize;
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
import io.airlift.units.Duration;
import io.trino.cache.NonEvictableCache;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.plugin.iceberg.delete.DeleteFile;
import io.trino.plugin.iceberg.util.DataFileWithDeleteFiles;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
Expand Down Expand Up @@ -82,16 +79,16 @@
import static io.trino.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.plugin.iceberg.ExpressionConverter.isConvertableToIcebergExpression;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getSplitSize;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.getFileModifiedTimePathDomain;
import static io.trino.plugin.iceberg.IcebergUtil.getModificationTime;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
import static io.trino.plugin.iceberg.IcebergUtil.getPathDomain;
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
Expand Down Expand Up @@ -298,7 +295,7 @@ private boolean pruneFileScanTask(FileScanTask fileScanTask, boolean fileHasNoDe
return true;
}
if (!fileModifiedTimeDomain.isAll()) {
long fileModifiedTime = getModificationTime(fileScanTask.file().path().toString());
long fileModifiedTime = getModificationTime(fileScanTask.file().path().toString(), fileSystemFactory.create(session.getIdentity(), fileIoProperties));
if (!fileModifiedTimeDomain.includesNullableValue(packDateTimeWithZone(fileModifiedTime, UTC_KEY))) {
return true;
}
Expand Down Expand Up @@ -337,17 +334,6 @@ private boolean noDataColumnsProjected(FileScanTask fileScanTask)
.containsAll(projectedBaseColumns);
}

private long getModificationTime(String path)
{
try {
TrinoInputFile inputFile = fileSystemFactory.create(session.getIdentity(), fileIoProperties).newInputFile(Location.of(path));
return inputFile.lastModified().toEpochMilli();
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to get file modification time: " + path, e);
}
}

private void finish()
{
close();
Expand Down Expand Up @@ -533,26 +519,4 @@ private IcebergSplit toIcebergSplit(FileScanTask task, TupleDomain<IcebergColumn
fileIoProperties,
cachingHostAddressProvider.getHosts(task.file().path().toString(), ImmutableList.of()));
}

private static Domain getPathDomain(TupleDomain<IcebergColumnHandle> effectivePredicate)
{
IcebergColumnHandle pathColumn = pathColumnHandle();
Domain domain = effectivePredicate.getDomains().orElseThrow(() -> new IllegalArgumentException("Unexpected NONE tuple domain"))
.get(pathColumn);
if (domain == null) {
return Domain.all(pathColumn.getType());
}
return domain;
}

private static Domain getFileModifiedTimePathDomain(TupleDomain<IcebergColumnHandle> effectivePredicate)
{
IcebergColumnHandle fileModifiedTimeColumn = fileModifiedTimeColumnHandle();
Domain domain = effectivePredicate.getDomains().orElseThrow(() -> new IllegalArgumentException("Unexpected NONE tuple domain"))
.get(fileModifiedTimeColumn);
if (domain == null) {
return Domain.all(fileModifiedTimeColumn.getType());
}
return domain;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
Expand All @@ -42,6 +43,7 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Int128;
Expand Down Expand Up @@ -102,7 +104,9 @@
import static io.trino.plugin.base.io.ByteBuffers.getWrappedBytes;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnMetadata;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
Expand Down Expand Up @@ -883,4 +887,37 @@ else if (versionNumber == latestMetadataVersion) {
}
return getOnlyElement(latestMetadataLocations).toString();
}

public static Domain getPathDomain(TupleDomain<IcebergColumnHandle> effectivePredicate)
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
{
IcebergColumnHandle pathColumn = pathColumnHandle();
Domain domain = effectivePredicate.getDomains().orElseThrow(() -> new IllegalArgumentException("Unexpected NONE tuple domain"))
.get(pathColumn);
if (domain == null) {
return Domain.all(pathColumn.getType());
}
return domain;
}

public static Domain getFileModifiedTimePathDomain(TupleDomain<IcebergColumnHandle> effectivePredicate)
{
IcebergColumnHandle fileModifiedTimeColumn = fileModifiedTimeColumnHandle();
Domain domain = effectivePredicate.getDomains().orElseThrow(() -> new IllegalArgumentException("Unexpected NONE tuple domain"))
.get(fileModifiedTimeColumn);
if (domain == null) {
return Domain.all(fileModifiedTimeColumn.getType());
}
return domain;
}

public static long getModificationTime(String path, TrinoFileSystem fileSystem)
{
try {
TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path));
return inputFile.lastModified().toEpochMilli();
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to get file modification time: " + path, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airlift.log.Logger;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.statistics.ColumnStatistics;
import io.trino.spi.statistics.DoubleRange;
Expand Down Expand Up @@ -56,11 +58,17 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Streams.stream;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getFileModifiedTimePathDomain;
import static io.trino.plugin.iceberg.IcebergUtil.getModificationTime;
import static io.trino.plugin.iceberg.IcebergUtil.getPathDomain;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Long.parseLong;
Expand All @@ -77,15 +85,16 @@ private TableStatisticsReader() {}

public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";

public static TableStatistics getTableStatistics(TypeManager typeManager, ConnectorSession session, IcebergTableHandle tableHandle, Table icebergTable)
public static TableStatistics getTableStatistics(TypeManager typeManager, ConnectorSession session, IcebergTableHandle tableHandle, Table icebergTable, TrinoFileSystem fileSystem)
{
return makeTableStatistics(
typeManager,
icebergTable,
tableHandle.getSnapshotId(),
tableHandle.getEnforcedPredicate(),
tableHandle.getUnenforcedPredicate(),
isExtendedStatisticsEnabled(session));
isExtendedStatisticsEnabled(session),
fileSystem);
}

@VisibleForTesting
Expand All @@ -95,7 +104,8 @@ public static TableStatistics makeTableStatistics(
Optional<Long> snapshot,
TupleDomain<IcebergColumnHandle> enforcedConstraint,
TupleDomain<IcebergColumnHandle> unenforcedConstraint,
boolean extendedStatisticsEnabled)
boolean extendedStatisticsEnabled,
TrinoFileSystem fileSystem)
{
if (snapshot.isEmpty()) {
// No snapshot, so no data.
Expand Down Expand Up @@ -125,15 +135,28 @@ public static TableStatistics makeTableStatistics(
.map(column -> Maps.immutableEntry(column.fieldId(), column.type()))
.collect(toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

Domain pathDomain = getPathDomain(effectivePredicate);
Domain fileModifiedTimeDomain = getFileModifiedTimePathDomain(effectivePredicate);
TableScan tableScan = icebergTable.newScan()
// Table enforced constraint may include eg $path column predicate which is not handled by Iceberg library TODO apply $path and $file_modified_time filters here
.filter(toIcebergExpression(effectivePredicate.filter((column, domain) -> !isMetadataColumnId(column.getId()))))
.useSnapshot(snapshotId)
.includeColumnStats();

IcebergStatistics.Builder icebergStatisticsBuilder = new IcebergStatistics.Builder(columns, typeManager);
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
fileScanTasks.forEach(fileScanTask -> icebergStatisticsBuilder.acceptDataFile(fileScanTask.file(), fileScanTask.spec()));
fileScanTasks.forEach(fileScanTask -> {
chenjian2664 marked this conversation as resolved.
Show resolved Hide resolved
if (!pathDomain.isAll() && !pathDomain.includesNullableValue(utf8Slice(fileScanTask.file().path().toString()))) {
return;
}
if (!fileModifiedTimeDomain.isAll()) {
long fileModifiedTime = getModificationTime(fileScanTask.file().path().toString(), fileSystem);
if (!fileModifiedTimeDomain.includesNullableValue(packDateTimeWithZone(fileModifiedTime, UTC_KEY))) {
return;
}
}

icebergStatisticsBuilder.acceptDataFile(fileScanTask.file(), fileScanTask.spec());
});
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Loading