Skip to content

Commit

Permalink
Add config to reject Iceberg queries without partition filter
Browse files Browse the repository at this point in the history
Co-authored-by: zhangminglei <zhangminglei@bilibili.com>
  • Loading branch information
2 people authored and Marton Bod committed Oct 12, 2023
1 parent 5548599 commit 94d05bd
Show file tree
Hide file tree
Showing 11 changed files with 401 additions and 13 deletions.
4 changes: 4 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ implementation is used:
* - ``iceberg.register-table-procedure.enabled``
- Enable to allow user to call ``register_table`` procedure.
- ``false``
* - ``iceberg.query-partition-filter-required``
- Set to ``true`` to force a query to use a partition filter.
You can use the ``query_partition_filter_required`` catalog session property for temporary, catalog specific use.
- ``false``
```

## Type mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class IcebergConfig
private double minimumAssignedSplitWeight = 0.05;
private Optional<String> materializedViewsStorageSchema = Optional.empty();
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -367,4 +368,17 @@ public IcebergConfig setSortedWritingEnabled(boolean sortedWritingEnabled)
this.sortedWritingEnabled = sortedWritingEnabled;
return this;
}

@Config("iceberg.query-partition-filter-required")
@ConfigDescription("Require a filter on at least one partition column")
public IcebergConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
{
this.queryPartitionFilterRequired = queryPartitionFilterRequired;
return this;
}

public boolean isQueryPartitionFilterRequired()
{
return queryPartitionFilterRequired;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
Expand Down Expand Up @@ -217,6 +218,7 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
Expand Down Expand Up @@ -258,6 +260,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.STALE;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN;
Expand Down Expand Up @@ -429,6 +432,8 @@ public ConnectorTableHandle getTableHandle(
table.location(),
table.properties(),
false,
Optional.empty(),
ImmutableSet.of(),
Optional.empty());
}

Expand Down Expand Up @@ -657,6 +662,41 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
.build();
}

@Override
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
IcebergTableHandle table = (IcebergTableHandle) handle;
if (isQueryPartitionFilterRequired(session) && table.getEnforcedPredicate().isAll() && table.getAnalyzeColumns().isEmpty()) {
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Optional<PartitionSpec> partitionSpec = table.getPartitionSpecJson()
.map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson));
if (partitionSpec.isEmpty() || partitionSpec.get().isUnpartitioned()) {
return;
}
Set<Integer> columnsWithPredicates = new HashSet<>();
table.getConstraintColumns().stream()
.map(IcebergColumnHandle::getId)
.forEach(columnsWithPredicates::add);
table.getUnenforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream()
.map(IcebergColumnHandle::getId)
.forEach(columnsWithPredicates::add));
Set<Integer> partitionColumns = partitionSpec.get().fields().stream()
.filter(field -> !field.transform().isVoid())
.map(PartitionField::sourceId)
.collect(toImmutableSet());
if (Collections.disjoint(columnsWithPredicates, partitionColumns)) {
String partitionColumnNames = partitionSpec.get().fields().stream()
.filter(field -> !field.transform().isVoid())
.map(PartitionField::sourceId)
.map(id -> schema.idToName().get(id))
.collect(joining(", "));
throw new TrinoException(
QUERY_REJECTED,
format("Filter required for %s on at least one of the partition columns: %s", table.getSchemaTableName(), partitionColumnNames));
}
}
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
Expand Down Expand Up @@ -2001,7 +2041,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
});

return new ConnectorAnalyzeMetadata(
tableHandle,
handle.withAnalyzeColumns(analyzeColumnNames.or(() -> Optional.of(ImmutableSet.of()))),
getStatisticsCollectionMetadata(
tableMetadata,
analyzeColumnNames,
Expand Down Expand Up @@ -2384,7 +2424,9 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
table.getTableLocation(),
table.getStorageProperties(),
table.isRecordScannedFiles(),
table.getMaxScannedFileSize());
table.getMaxScannedFileSize(),
table.getConstraintColumns(),
table.getAnalyzeColumns());

return Optional.of(new LimitApplicationResult<>(table, false, false));
}
Expand All @@ -2395,7 +2437,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
IcebergTableHandle table = (IcebergTableHandle) handle;
ConstraintExtractor.ExtractionResult extractionResult = extractTupleDomain(constraint);
TupleDomain<IcebergColumnHandle> predicate = extractionResult.tupleDomain();
if (predicate.isAll()) {
if (predicate.isAll() && constraint.getPredicateColumns().isEmpty()) {
return Optional.empty();
}
if (table.getLimit().isPresent()) {
Expand Down Expand Up @@ -2455,8 +2497,16 @@ else if (isMetadataColumnId(columnHandle.getId())) {
remainingConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(TupleDomain.withColumnDomains(unsupported));
}

Set<IcebergColumnHandle> newConstraintColumns = constraint.getPredicateColumns()
.map(columnHandles -> columnHandles.stream()
.map(columnHandle -> (IcebergColumnHandle) columnHandle)
.collect(toImmutableSet()))
.orElse(ImmutableSet.of());

if (newEnforcedConstraint.equals(table.getEnforcedPredicate())
&& newUnenforcedConstraint.equals(table.getUnenforcedPredicate())) {
&& newUnenforcedConstraint.equals(table.getUnenforcedPredicate())
&& newConstraintColumns.equals(table.getConstraintColumns())
&& constraint.getPredicateColumns().isEmpty()) {
return Optional.empty();
}

Expand All @@ -2478,7 +2528,9 @@ else if (isMetadataColumnId(columnHandle.getId())) {
table.getTableLocation(),
table.getStorageProperties(),
table.isRecordScannedFiles(),
table.getMaxScannedFileSize()),
table.getMaxScannedFileSize(),
Sets.union(table.getConstraintColumns(), newConstraintColumns),
table.getAnalyzeColumns()),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
extractionResult.remainingExpression(),
false));
Expand Down Expand Up @@ -2626,7 +2678,9 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
originalHandle.getTableLocation(),
originalHandle.getStorageProperties(),
originalHandle.isRecordScannedFiles(),
originalHandle.getMaxScannedFileSize()),
originalHandle.getMaxScannedFileSize(),
originalHandle.getConstraintColumns(),
originalHandle.getAnalyzeColumns()),
handle -> {
Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
return TableStatisticsReader.getTableStatistics(typeManager, session, handle, icebergTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public final class IcebergSessionProperties
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention";
private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -299,6 +300,11 @@ public IcebergSessionProperties(
"Enable sorted writing to tables with a specified sort order",
icebergConfig.isSortedWritingEnabled(),
false))
.add(booleanProperty(
QUERY_PARTITION_FILTER_REQUIRED,
"Require filter on partition column",
icebergConfig.isQueryPartitionFilterRequired(),
false))
.build();
}

Expand Down Expand Up @@ -489,4 +495,9 @@ public static boolean isSortedWritingEnabled(ConnectorSession session)
{
return session.getProperty(SORTED_WRITING_ENABLED, Boolean.class);
}

public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class IcebergTableHandle
// Filter guaranteed to be enforced by Iceberg connector
private final TupleDomain<IcebergColumnHandle> enforcedPredicate;

// Columns that are present in {@link Constraint#predicate()} applied on the table scan
private final Set<IcebergColumnHandle> constraintColumns;

// semantically limit is applied after enforcedPredicate
private final OptionalLong limit;

Expand All @@ -65,6 +68,9 @@ public class IcebergTableHandle
private final boolean recordScannedFiles;
private final Optional<DataSize> maxScannedFileSize;

// ANALYZE only
private final Optional<Set<String>> analyzeColumns;

@JsonCreator
public static IcebergTableHandle fromJsonForDeserializationOnly(
@JsonProperty("catalog") CatalogHandle catalog,
Expand Down Expand Up @@ -100,6 +106,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
tableLocation,
storageProperties,
false,
Optional.empty(),
ImmutableSet.of(),
Optional.empty());
}

Expand All @@ -120,7 +128,9 @@ public IcebergTableHandle(
String tableLocation,
Map<String, String> storageProperties,
boolean recordScannedFiles,
Optional<DataSize> maxScannedFileSize)
Optional<DataSize> maxScannedFileSize,
Set<IcebergColumnHandle> constraintColumns,
Optional<Set<String>> analyzeColumns)
{
this.catalog = requireNonNull(catalog, "catalog is null");
this.schemaName = requireNonNull(schemaName, "schemaName is null");
Expand All @@ -139,6 +149,8 @@ public IcebergTableHandle(
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
this.analyzeColumns = requireNonNull(analyzeColumns, "analyzeColumns is null");
}

@JsonProperty
Expand Down Expand Up @@ -244,6 +256,18 @@ public Optional<DataSize> getMaxScannedFileSize()
return maxScannedFileSize;
}

@JsonIgnore
public Set<IcebergColumnHandle> getConstraintColumns()
{
return constraintColumns;
}

@JsonIgnore
public Optional<Set<String>> getAnalyzeColumns()
{
return analyzeColumns;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
Expand Down Expand Up @@ -273,7 +297,33 @@ public IcebergTableHandle withProjectedColumns(Set<IcebergColumnHandle> projecte
tableLocation,
storageProperties,
recordScannedFiles,
maxScannedFileSize);
maxScannedFileSize,
constraintColumns,
analyzeColumns);
}

public IcebergTableHandle withAnalyzeColumns(Optional<Set<String>> analyzeColumns)
{
return new IcebergTableHandle(
catalog,
schemaName,
tableName,
tableType,
snapshotId,
tableSchemaJson,
partitionSpecJson,
formatVersion,
unenforcedPredicate,
enforcedPredicate,
limit,
projectedColumns,
nameMappingJson,
tableLocation,
storageProperties,
recordScannedFiles,
maxScannedFileSize,
constraintColumns,
analyzeColumns);
}

public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
Expand All @@ -295,7 +345,9 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc
tableLocation,
storageProperties,
recordScannedFiles,
Optional.of(maxScannedFileSize));
Optional.of(maxScannedFileSize),
constraintColumns,
analyzeColumns);
}

@Override
Expand Down Expand Up @@ -325,7 +377,9 @@ public boolean equals(Object o)
Objects.equals(nameMappingJson, that.nameMappingJson) &&
Objects.equals(tableLocation, that.tableLocation) &&
Objects.equals(storageProperties, that.storageProperties) &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize);
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
Objects.equals(constraintColumns, that.constraintColumns) &&
Objects.equals(analyzeColumns, that.analyzeColumns);
}

@Override
Expand All @@ -348,7 +402,9 @@ public int hashCode()
tableLocation,
storageProperties,
recordScannedFiles,
maxScannedFileSize);
maxScannedFileSize,
constraintColumns,
analyzeColumns);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,24 @@ public void testMetadataTables()
}
}

@Test
public void testPartitionFilterRequired()
{
String tableName = "test_partition_" + randomNameSuffix();

Session session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", "query_partition_filter_required", "true")
.build();

assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])");
assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a')", 1);
String query = "SELECT id FROM " + tableName + " WHERE a = 'a'";
String failureMessage = "Filter required for tpch.*\\." + tableName + " on at least one of the partition columns: ds";
assertQueryFails(session, query, failureMessage);
assertQueryFails(session, "EXPLAIN " + query, failureMessage);
assertUpdate(session, "DROP TABLE " + tableName);
}

protected abstract boolean isFileSorted(Location path, String sortColumnName);

@Test
Expand Down
Loading

0 comments on commit 94d05bd

Please sign in to comment.