From 00ddd6c3e0f35c22c16786c7d3e6c777023fc6e0 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 17 Oct 2023 12:19:55 +0200 Subject: [PATCH] Avoid redundant MV freshness calculation when result unused Avoid the `getMaterializedViewFreshness` call when reading `system.metadata.materialized_views` if the `freshness` and `last_fresh_time` columns are neither predicated on nor selected by the query. --- .../system/MaterializedViewSystemTable.java | 48 +++++++++++++------ .../system/SystemPageSourceProvider.java | 17 +++++-- .../io/trino/spi/connector/SystemTable.java | 7 +++ .../ClassLoaderSafeSystemTable.java | 10 ++++ .../TestIcebergMetadataFileOperations.java | 4 +- .../TestIcebergMetastoreAccessOperations.java | 4 +- 6 files changed, 69 insertions(+), 21 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/connector/system/MaterializedViewSystemTable.java b/core/trino-main/src/main/java/io/trino/connector/system/MaterializedViewSystemTable.java index d12e7074b66d..8af514cbaa46 100644 --- a/core/trino-main/src/main/java/io/trino/connector/system/MaterializedViewSystemTable.java +++ b/core/trino-main/src/main/java/io/trino/connector/system/MaterializedViewSystemTable.java @@ -36,8 +36,12 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.LongTimestampWithTimeZone; +import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; +import static com.google.common.collect.MoreCollectors.onlyElement; +import static com.google.common.collect.Streams.mapWithIndex; import static io.trino.connector.system.jdbc.FilterUtil.isImpossibleObjectName; import static io.trino.connector.system.jdbc.FilterUtil.tablePrefix; import static io.trino.connector.system.jdbc.FilterUtil.tryGetSingleVarcharValue; @@ -50,6 +54,8 @@ import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; +import static 
java.lang.Math.toIntExact; +import static java.util.Map.entry; import static java.util.Objects.requireNonNull; public class MaterializedViewSystemTable @@ -95,7 +101,8 @@ public ConnectorTableMetadata getTableMetadata() public RecordCursor cursor( ConnectorTransactionHandle transactionHandle, ConnectorSession connectorSession, - TupleDomain constraint) + TupleDomain constraint, + Set requiredColumns) { Session session = ((FullConnectorSession) connectorSession).getSession(); InMemoryRecordSet.Builder displayTable = InMemoryRecordSet.builder(getTableMetadata()); @@ -109,6 +116,7 @@ public RecordCursor cursor( } Optional tableFilter = tryGetSingleVarcharValue(tableDomain); + boolean needFreshness = requiredColumns.contains(columnIndex("freshness")) || requiredColumns.contains(columnIndex("last_fresh_time")); listCatalogNames(session, metadata, accessControl, catalogDomain).forEach(catalogName -> { // TODO A connector may be able to pull information from multiple schemas at once, so pass the schema filter to the connector instead. 
@@ -119,29 +127,31 @@ public RecordCursor cursor( if (isImpossibleObjectName(schemaName)) { continue; } - addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.of(schemaName), tableFilter)); + addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.of(schemaName), tableFilter), needFreshness); } } else { - addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.empty(), tableFilter)); + addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.empty(), tableFilter), needFreshness); } }); return displayTable.build().cursor(); } - private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Builder displayTable, QualifiedTablePrefix tablePrefix) + private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Builder displayTable, QualifiedTablePrefix tablePrefix, boolean needFreshness) { getMaterializedViews(session, metadata, accessControl, tablePrefix).forEach((tableName, definition) -> { QualifiedObjectName name = new QualifiedObjectName(tablePrefix.getCatalogName(), tableName.getSchemaName(), tableName.getTableName()); - MaterializedViewFreshness freshness; + Optional freshness = Optional.empty(); - try { - freshness = metadata.getMaterializedViewFreshness(session, name); - } - catch (MaterializedViewNotFoundException e) { - // Ignore materialized view that was dropped during query execution (race condition) - return; + if (needFreshness) { + try { + freshness = Optional.of(metadata.getMaterializedViewFreshness(session, name)); + } + catch (MaterializedViewNotFoundException e) { + // Ignore materialized view that was dropped during query execution (race condition) + return; + } } Object[] materializedViewRow = createMaterializedViewRow(name, freshness, definition); @@ -151,7 +161,7 @@ private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Bu private static Object[] 
createMaterializedViewRow( QualifiedObjectName name, - MaterializedViewFreshness freshness, + Optional freshness, ViewInfo definition) { return new Object[] { @@ -168,9 +178,11 @@ private static Object[] createMaterializedViewRow( .map(storageTable -> storageTable.getSchemaTableName().getTableName()) .orElse(""), // freshness - freshness.getFreshness().name(), + freshness.map(MaterializedViewFreshness::getFreshness) + .map(Enum::name) + .orElse(null), // last_fresh_time - freshness.getLastFreshTime() + freshness.flatMap(MaterializedViewFreshness::getLastFreshTime) .map(instant -> LongTimestampWithTimeZone.fromEpochSecondsAndFraction( instant.getEpochSecond(), (long) instant.getNano() * PICOSECONDS_PER_NANOSECOND, @@ -180,4 +192,12 @@ private static Object[] createMaterializedViewRow( definition.getOriginalSql() }; } + + private static int columnIndex(String columnName) + { + return toIntExact(mapWithIndex(TABLE_DEFINITION.getColumns().stream(), (column, index) -> entry(column.getName(), index)) + .filter(entry -> entry.getKey().equals(columnName)) + .map(Entry::getValue) + .collect(onlyElement())); + } } diff --git a/core/trino-main/src/main/java/io/trino/connector/system/SystemPageSourceProvider.java b/core/trino-main/src/main/java/io/trino/connector/system/SystemPageSourceProvider.java index 952f8486e8ad..66816f39bfe4 100644 --- a/core/trino-main/src/main/java/io/trino/connector/system/SystemPageSourceProvider.java +++ b/core/trino-main/src/main/java/io/trino/connector/system/SystemPageSourceProvider.java @@ -14,6 +14,7 @@ package io.trino.connector.system; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import io.trino.plugin.base.MappedPageSource; import io.trino.plugin.base.MappedRecordSet; import io.trino.spi.TrinoException; @@ -38,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static com.google.common.collect.ImmutableList.toImmutableList; import 
static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -83,6 +85,7 @@ public ConnectorPageSource createPageSource( } ImmutableList.Builder userToSystemFieldIndex = ImmutableList.builder(); + ImmutableSet.Builder requiredColumns = ImmutableSet.builder(); for (ColumnHandle column : columns) { String columnName = ((SystemColumnHandle) column).getColumnName(); @@ -92,6 +95,7 @@ public ConnectorPageSource createPageSource( } userToSystemFieldIndex.add(index); + requiredColumns.add(index); } TupleDomain constraint = systemSplit.getConstraint(); @@ -105,11 +109,18 @@ public ConnectorPageSource createPageSource( return new MappedPageSource(systemTable.pageSource(systemTransaction.getConnectorTransactionHandle(), session, newConstraint), userToSystemFieldIndex.build()); } catch (UnsupportedOperationException e) { - return new RecordPageSource(new MappedRecordSet(toRecordSet(systemTransaction.getConnectorTransactionHandle(), systemTable, session, newConstraint), userToSystemFieldIndex.build())); + return new RecordPageSource(new MappedRecordSet( + toRecordSet( + systemTransaction.getConnectorTransactionHandle(), + systemTable, + session, + newConstraint, + requiredColumns.build()), + userToSystemFieldIndex.build())); } } - private static RecordSet toRecordSet(ConnectorTransactionHandle sourceTransaction, SystemTable table, ConnectorSession session, TupleDomain constraint) + private static RecordSet toRecordSet(ConnectorTransactionHandle sourceTransaction, SystemTable table, ConnectorSession session, TupleDomain constraint, Set requiredColumns) { return new RecordSet() { @@ -126,7 +137,7 @@ public List getColumnTypes() @Override public RecordCursor cursor() { - return table.cursor(sourceTransaction, session, constraint); + return table.cursor(sourceTransaction, session, constraint, requiredColumns); } }; } diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/SystemTable.java b/core/trino-spi/src/main/java/io/trino/spi/connector/SystemTable.java index 
ad5af77d9ebc..a5313f7a4f75 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/SystemTable.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/SystemTable.java @@ -15,6 +15,8 @@ import io.trino.spi.predicate.TupleDomain; +import java.util.Set; + /** * Exactly one of {@link #cursor} or {@link #pageSource} must be implemented. */ @@ -40,6 +42,11 @@ default RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connec throw new UnsupportedOperationException(); } + default RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint, Set requiredColumns) + { + return cursor(transactionHandle, session, constraint); + } + /** * Create a page source for the data in this table. * diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeSystemTable.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeSystemTable.java index 29aa46ef4e06..a2bffa436b28 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeSystemTable.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeSystemTable.java @@ -23,6 +23,8 @@ import io.trino.spi.connector.SystemTable; import io.trino.spi.predicate.TupleDomain; +import java.util.Set; + import static java.util.Objects.requireNonNull; public class ClassLoaderSafeSystemTable @@ -62,6 +64,14 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect } } + @Override + public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint, Set requiredColumns) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.cursor(transactionHandle, session, constraint, requiredColumns); + } + } + @Override public ConnectorPageSource pageSource(ConnectorTransactionHandle 
transactionHandle, ConnectorSession session, TupleDomain constraint) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java index e9554c138826..eddd2a409eae 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java @@ -715,7 +715,7 @@ public void testSystemMetadataMaterializedViews() // Bulk retrieval without selecting freshness assertFileSystemAccesses(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA", ImmutableMultiset.builder() - .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 2) .build()); // Bulk retrieval for two schemas @@ -733,7 +733,7 @@ public void testSystemMetadataMaterializedViews() // Pointed lookup without selecting freshness assertFileSystemAccesses(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'", ImmutableMultiset.builder() - .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 3) + .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM)) .build()); assertUpdate("DROP SCHEMA " + schemaName + " CASCADE"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index e7187ea60e98..5a759f918532 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -456,7 +456,7 @@ 
public void testSystemMetadataMaterializedViews() assertMetastoreInvocations(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA", ImmutableMultiset.builder() .add(GET_TABLES_WITH_PARAMETER) - .addCopies(GET_TABLE, 6) + .addCopies(GET_TABLE, 4) .build()); // Bulk retrieval for two schemas @@ -475,7 +475,7 @@ public void testSystemMetadataMaterializedViews() // Pointed lookup without selecting freshness assertMetastoreInvocations(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 2) .build()); assertUpdate("DROP SCHEMA " + schemaName + " CASCADE");