Add parquet_ignore_statistics session property to delta lake
raunaqmorarka committed Jan 2, 2024
1 parent de86270 commit 3517ae7
Showing 4 changed files with 59 additions and 2 deletions.
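
Background for the change: with Parquet statistics honored (the default), the reader skips row groups whose min/max statistics cannot match a query filter; ignoring statistics forces a full scan, which is the escape hatch for files written with corrupted or incorrect statistics. A minimal usage sketch in SQL, assuming a Delta Lake catalog named delta (the catalog, schema, and table names are placeholders, not part of this commit):

    -- Override the catalog default for the current session only.
    SET SESSION delta.parquet_ignore_statistics = true;

    -- This scan now reads every matching file in full instead of pruning
    -- row groups on min/max statistics.
    SELECT * FROM delta.tpch.orders WHERE custkey = 1450;

    -- Revert to the catalog default.
    RESET SESSION delta.parquet_ignore_statistics;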
File 1 of 4:
@@ -86,6 +86,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetSmallFileThreshold;
+import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetIgnoreStatistics;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex;
import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
@@ -204,7 +205,8 @@ public ConnectorPageSource createPageSource(
ParquetReaderOptions options = parquetReaderOptions.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
.withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
.withSmallFileThreshold(getParquetSmallFileThreshold(session))
-.withUseColumnIndex(isParquetUseColumnIndex(session));
+.withUseColumnIndex(isParquetUseColumnIndex(session))
+.withIgnoreStatistics(isParquetIgnoreStatistics(session));

Map<Integer, String> parquetFieldIdToName = columnMappingMode == ColumnMappingMode.ID ? loadParquetIdAndNameMapping(inputFile, options) : ImmutableMap.of();

File 2 of 4:
@@ -56,6 +56,7 @@ public final class DeltaLakeSessionProperties
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
private static final String PARQUET_SMALL_FILE_THRESHOLD = "parquet_small_file_threshold";
private static final String PARQUET_USE_COLUMN_INDEX = "parquet_use_column_index";
+private static final String PARQUET_IGNORE_STATISTICS = "parquet_ignore_statistics";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
@@ -131,6 +132,11 @@ public DeltaLakeSessionProperties(
"Use Parquet column index",
parquetReaderConfig.isUseColumnIndex(),
false),
+booleanProperty(
+        PARQUET_IGNORE_STATISTICS,
+        "Ignore statistics from Parquet to allow querying files with corrupted or incorrect statistics",
+        parquetReaderConfig.isIgnoreStatistics(),
+        false),
dataSizeProperty(
PARQUET_WRITER_BLOCK_SIZE,
"Parquet: Writer block size",
@@ -257,6 +263,11 @@ public static boolean isParquetUseColumnIndex(ConnectorSession session)
return session.getProperty(PARQUET_USE_COLUMN_INDEX, Boolean.class);
}

+public static boolean isParquetIgnoreStatistics(ConnectorSession session)
+{
+    return session.getProperty(PARQUET_IGNORE_STATISTICS, Boolean.class);
+}
+
public static DataSize getParquetWriterBlockSize(ConnectorSession session)
{
return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class);
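Note that the property default above is taken from parquetReaderConfig.isIgnoreStatistics(), so the session property overrides a catalog-level default rather than a hard-coded value. In the Hive connector the matching catalog configuration property is parquet.ignore-statistics, and the shared ParquetReaderConfig suggests the same name applies here, though the config name does not appear in this diff, so treat it as an assumption. A sketch for inspecting the effective value in a session:

    -- List the property with its current value, default, and description.
    SHOW SESSION LIKE '%parquet_ignore_statistics%';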
File 3 of 4:
@@ -47,6 +47,7 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize;
+import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetIgnoreStatistics;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex;
import static io.trino.plugin.deltalake.functions.tablechanges.TableChangesFileType.CDF_FILE;
import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
@@ -175,7 +176,8 @@ private static DeltaLakePageSource createDeltaLakePageSource(
parquetReaderOptions = parquetReaderOptions
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
.withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
-.withUseColumnIndex(isParquetUseColumnIndex(session));
+.withUseColumnIndex(isParquetUseColumnIndex(session))
+.withIgnoreStatistics(isParquetIgnoreStatistics(session));

List<DeltaLakeColumnHandle> splitColumns = switch (split.fileType()) {
case CDF_FILE -> ImmutableList.<DeltaLakeColumnHandle>builder().addAll(handle.columns())
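This file wires the same reader option into the table_changes table function, so change data feed reads honor the property as well. A hedged sketch of such a query (catalog, schema, table, and version are placeholders; the named arguments follow the Trino Delta Lake table_changes documentation):

    SET SESSION delta.parquet_ignore_statistics = true;

    -- Read the change data feed starting at table version 0.
    SELECT *
    FROM TABLE(delta.system.table_changes(
            schema_name => 'tpch',
            table_name => 'orders',
            since_version => 0));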
File 4 of 4:
@@ -15,21 +15,27 @@

import com.google.common.collect.ContiguousSet;
import com.google.common.collect.ImmutableMap;
+import io.trino.Session;
+import io.trino.operator.OperatorStats;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
+import io.trino.spi.QueryId;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
+import io.trino.testing.MaterializedResultWithQueryId;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;

import java.nio.file.Path;
import java.util.OptionalLong;
import java.util.Set;

+import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner;
+import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
@@ -130,6 +136,42 @@ public void testUpdatePushdown()
table));
}

+@Test
+public void testIgnoreParquetStatistics()
+{
+    String table = testTable.register("ignore_parquet_statistics");
+    @Language("SQL") String query = "SELECT * FROM " + table + " WHERE custkey = 1450";
+
+    DistributedQueryRunner queryRunner = getDistributedQueryRunner();
+    MaterializedResultWithQueryId resultWithoutParquetStatistics = queryRunner.executeWithQueryId(
+            Session.builder(getSession())
+                    .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "parquet_ignore_statistics", "true")
+                    .build(),
+            query);
+    OperatorStats queryStatsWithoutParquetStatistics = getOperatorStats(resultWithoutParquetStatistics.getQueryId());
+    assertThat(queryStatsWithoutParquetStatistics.getPhysicalInputPositions()).isGreaterThan(0);
+
+    MaterializedResultWithQueryId resultWithParquetStatistics = queryRunner.executeWithQueryId(getSession(), query);
+    OperatorStats queryStatsWithParquetStatistics = getOperatorStats(resultWithParquetStatistics.getQueryId());
+    assertThat(queryStatsWithParquetStatistics.getPhysicalInputPositions()).isGreaterThan(0);
+    assertThat(queryStatsWithParquetStatistics.getPhysicalInputPositions())
+            .isLessThan(queryStatsWithoutParquetStatistics.getPhysicalInputPositions());
+
+    assertEqualsIgnoreOrder(resultWithParquetStatistics.getResult(), resultWithoutParquetStatistics.getResult());
+}
+
+private OperatorStats getOperatorStats(QueryId queryId)
+{
+    return getDistributedQueryRunner().getCoordinator()
+            .getQueryManager()
+            .getFullQueryInfo(queryId)
+            .getQueryStats()
+            .getOperatorSummaries()
+            .stream()
+            .filter(summary -> summary.getOperatorType().startsWith("TableScan") || summary.getOperatorType().startsWith("Scan"))
+            .collect(onlyElement());
+}
+
/**
* Assert on the number of rows read and updated by a read operation
*
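The test above runs the same point query twice and compares physical input positions on the scan operator: with statistics ignored the scan must read strictly more positions than with statistics honored, and both runs must return identical rows, confirming that statistics only prune data rather than change results. Outside a test harness, a comparable check is EXPLAIN ANALYZE, which reports per-operator input rows; a sketch under the same placeholder names as above:

    EXPLAIN ANALYZE SELECT * FROM delta.tpch.orders WHERE custkey = 1450;

    SET SESSION delta.parquet_ignore_statistics = true;

    EXPLAIN ANALYZE SELECT * FROM delta.tpch.orders WHERE custkey = 1450;

    -- Compare the input row counts reported for the TableScan operator
    -- between the two runs; the second should be higher.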
