From 3517ae7c3dcf68ca5080729b016ed8e1a9c9465e Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Wed, 27 Dec 2023 16:28:57 +0530 Subject: [PATCH] Add parquet_ignore_statistics session property to delta lake --- .../DeltaLakePageSourceProvider.java | 4 +- .../deltalake/DeltaLakeSessionProperties.java | 11 +++++ .../TableChangesFunctionProcessor.java | 4 +- .../deltalake/TestPredicatePushdown.java | 42 +++++++++++++++++++ 4 files changed, 59 insertions(+), 2 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index 4a56b213cd53..0b0b0d27c8bc 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -86,6 +86,7 @@ import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetSmallFileThreshold; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetIgnoreStatistics; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex; import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; @@ -204,7 +205,8 @@ public ConnectorPageSource createPageSource( ParquetReaderOptions options = parquetReaderOptions.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)) .withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session)) .withSmallFileThreshold(getParquetSmallFileThreshold(session)) - .withUseColumnIndex(isParquetUseColumnIndex(session)); + .withUseColumnIndex(isParquetUseColumnIndex(session)) + .withIgnoreStatistics(isParquetIgnoreStatistics(session)); Map parquetFieldIdToName = columnMappingMode == ColumnMappingMode.ID ? loadParquetIdAndNameMapping(inputFile, options) : ImmutableMap.of(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java index 753ca07a8069..065ac2e187fa 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java @@ -56,6 +56,7 @@ public final class DeltaLakeSessionProperties private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count"; private static final String PARQUET_SMALL_FILE_THRESHOLD = "parquet_small_file_threshold"; private static final String PARQUET_USE_COLUMN_INDEX = "parquet_use_column_index"; + private static final String PARQUET_IGNORE_STATISTICS = "parquet_ignore_statistics"; private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size"; @@ -131,6 +132,11 @@ public DeltaLakeSessionProperties( "Use Parquet column index", parquetReaderConfig.isUseColumnIndex(), false), + booleanProperty( + PARQUET_IGNORE_STATISTICS, + "Ignore statistics from Parquet to allow querying files with corrupted or incorrect statistics", + parquetReaderConfig.isIgnoreStatistics(), + false), dataSizeProperty( PARQUET_WRITER_BLOCK_SIZE, "Parquet: Writer block size", @@ -257,6 +263,11 @@ public static boolean isParquetUseColumnIndex(ConnectorSession session) return session.getProperty(PARQUET_USE_COLUMN_INDEX, Boolean.class); } + public static boolean isParquetIgnoreStatistics(ConnectorSession session) + { + return session.getProperty(PARQUET_IGNORE_STATISTICS, Boolean.class); + } + public static DataSize getParquetWriterBlockSize(ConnectorSession session) { return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java index d5cf1f482538..d18e32462a17 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java @@ -47,6 +47,7 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetIgnoreStatistics; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex; import static io.trino.plugin.deltalake.functions.tablechanges.TableChangesFileType.CDF_FILE; import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED; @@ -175,7 +176,8 @@ private static DeltaLakePageSource createDeltaLakePageSource( parquetReaderOptions = parquetReaderOptions .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)) .withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session)) - .withUseColumnIndex(isParquetUseColumnIndex(session)); + .withUseColumnIndex(isParquetUseColumnIndex(session)) + .withIgnoreStatistics(isParquetIgnoreStatistics(session)); List splitColumns = switch (split.fileType()) { case CDF_FILE -> ImmutableList.builder().addAll(handle.columns()) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java index ec95becbea82..f09ef7ed3d2f 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestPredicatePushdown.java @@ -15,21 +15,27 @@ import com.google.common.collect.ContiguousSet; import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.operator.OperatorStats; import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.spi.QueryId; import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Test; import java.nio.file.Path; import java.util.OptionalLong; import java.util.Set; +import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner; +import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; @@ -130,6 +136,42 @@ public void testUpdatePushdown() table)); } + @Test + public void testIgnoreParquetStatistics() + { + String table = testTable.register("ignore_parquet_statistics"); + @Language("SQL") String query = "SELECT * FROM " + table + " WHERE custkey = 1450"; + + DistributedQueryRunner queryRunner = getDistributedQueryRunner(); + MaterializedResultWithQueryId resultWithoutParquetStatistics = queryRunner.executeWithQueryId( + Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "parquet_ignore_statistics", "true") + .build(), + query); + OperatorStats queryStatsWithoutParquetStatistics = getOperatorStats(resultWithoutParquetStatistics.getQueryId()); + assertThat(queryStatsWithoutParquetStatistics.getPhysicalInputPositions()).isGreaterThan(0); + + MaterializedResultWithQueryId resultWithParquetStatistics = queryRunner.executeWithQueryId(getSession(), query); + OperatorStats queryStatsWithParquetStatistics = getOperatorStats(resultWithParquetStatistics.getQueryId()); + assertThat(queryStatsWithParquetStatistics.getPhysicalInputPositions()).isGreaterThan(0); + assertThat(queryStatsWithParquetStatistics.getPhysicalInputPositions()) + .isLessThan(queryStatsWithoutParquetStatistics.getPhysicalInputPositions()); + + assertEqualsIgnoreOrder(resultWithParquetStatistics.getResult(), resultWithoutParquetStatistics.getResult()); + } + + private OperatorStats getOperatorStats(QueryId queryId) + { + return getDistributedQueryRunner().getCoordinator() + .getQueryManager() + .getFullQueryInfo(queryId) + .getQueryStats() + .getOperatorSummaries() + .stream() + .filter(summary -> summary.getOperatorType().startsWith("TableScan") || summary.getOperatorType().startsWith("Scan")) + .collect(onlyElement()); + } + /** * Assert on the number of rows read and updated by a read operation *