Skip to content

Commit

Permalink
Avoid redundant MV freshness calculation when result unused
Browse files Browse the repository at this point in the history
Avoid `getMaterializedViewFreshness` call when reading
`system.metadata.materialized_views` and `freshness`, `last_fresh_time`
columns are neither predicated on, nor selected by the query.
  • Loading branch information
findepi committed Oct 18, 2023
1 parent 6ed291e commit 00ddd6c
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.LongTimestampWithTimeZone;

import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.collect.Streams.mapWithIndex;
import static io.trino.connector.system.jdbc.FilterUtil.isImpossibleObjectName;
import static io.trino.connector.system.jdbc.FilterUtil.tablePrefix;
import static io.trino.connector.system.jdbc.FilterUtil.tryGetSingleVarcharValue;
Expand All @@ -50,6 +54,8 @@
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static java.lang.Math.toIntExact;
import static java.util.Map.entry;
import static java.util.Objects.requireNonNull;

public class MaterializedViewSystemTable
Expand Down Expand Up @@ -95,7 +101,8 @@ public ConnectorTableMetadata getTableMetadata()
public RecordCursor cursor(
ConnectorTransactionHandle transactionHandle,
ConnectorSession connectorSession,
TupleDomain<Integer> constraint)
TupleDomain<Integer> constraint,
Set<Integer> requiredColumns)
{
Session session = ((FullConnectorSession) connectorSession).getSession();
InMemoryRecordSet.Builder displayTable = InMemoryRecordSet.builder(getTableMetadata());
Expand All @@ -109,6 +116,7 @@ public RecordCursor cursor(
}

Optional<String> tableFilter = tryGetSingleVarcharValue(tableDomain);
boolean needFreshness = requiredColumns.contains(columnIndex("freshness")) || requiredColumns.contains(columnIndex("last_fresh_time"));

listCatalogNames(session, metadata, accessControl, catalogDomain).forEach(catalogName -> {
// TODO A connector may be able to pull information from multiple schemas at once, so pass the schema filter to the connector instead.
Expand All @@ -119,29 +127,31 @@ public RecordCursor cursor(
if (isImpossibleObjectName(schemaName)) {
continue;
}
addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.of(schemaName), tableFilter));
addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.of(schemaName), tableFilter), needFreshness);
}
}
else {
addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.empty(), tableFilter));
addMaterializedViewForCatalog(session, displayTable, tablePrefix(catalogName, Optional.empty(), tableFilter), needFreshness);
}
});

return displayTable.build().cursor();
}

private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Builder displayTable, QualifiedTablePrefix tablePrefix)
private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Builder displayTable, QualifiedTablePrefix tablePrefix, boolean needFreshness)
{
getMaterializedViews(session, metadata, accessControl, tablePrefix).forEach((tableName, definition) -> {
QualifiedObjectName name = new QualifiedObjectName(tablePrefix.getCatalogName(), tableName.getSchemaName(), tableName.getTableName());
MaterializedViewFreshness freshness;
Optional<MaterializedViewFreshness> freshness = Optional.empty();

try {
freshness = metadata.getMaterializedViewFreshness(session, name);
}
catch (MaterializedViewNotFoundException e) {
// Ignore materialized view that was dropped during query execution (race condition)
return;
if (needFreshness) {
try {
freshness = Optional.of(metadata.getMaterializedViewFreshness(session, name));
}
catch (MaterializedViewNotFoundException e) {
// Ignore materialized view that was dropped during query execution (race condition)
return;
}
}

Object[] materializedViewRow = createMaterializedViewRow(name, freshness, definition);
Expand All @@ -151,7 +161,7 @@ private void addMaterializedViewForCatalog(Session session, InMemoryRecordSet.Bu

private static Object[] createMaterializedViewRow(
QualifiedObjectName name,
MaterializedViewFreshness freshness,
Optional<MaterializedViewFreshness> freshness,
ViewInfo definition)
{
return new Object[] {
Expand All @@ -168,9 +178,11 @@ private static Object[] createMaterializedViewRow(
.map(storageTable -> storageTable.getSchemaTableName().getTableName())
.orElse(""),
// freshness
freshness.getFreshness().name(),
freshness.map(MaterializedViewFreshness::getFreshness)
.map(Enum::name)
.orElse(null),
// last_fresh_time
freshness.getLastFreshTime()
freshness.flatMap(MaterializedViewFreshness::getLastFreshTime)
.map(instant -> LongTimestampWithTimeZone.fromEpochSecondsAndFraction(
instant.getEpochSecond(),
(long) instant.getNano() * PICOSECONDS_PER_NANOSECOND,
Expand All @@ -180,4 +192,12 @@ private static Object[] createMaterializedViewRow(
definition.getOriginalSql()
};
}

private static int columnIndex(String columnName)
{
return toIntExact(mapWithIndex(TABLE_DEFINITION.getColumns().stream(), (column, index) -> entry(column.getName(), index))
.filter(entry -> entry.getKey().equals(columnName))
.map(Entry::getValue)
.collect(onlyElement()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.connector.system;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.base.MappedPageSource;
import io.trino.plugin.base.MappedRecordSet;
import io.trino.spi.TrinoException;
Expand All @@ -38,6 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
Expand Down Expand Up @@ -83,6 +85,7 @@ public ConnectorPageSource createPageSource(
}

ImmutableList.Builder<Integer> userToSystemFieldIndex = ImmutableList.builder();
ImmutableSet.Builder<Integer> requiredColumns = ImmutableSet.builder();
for (ColumnHandle column : columns) {
String columnName = ((SystemColumnHandle) column).getColumnName();

Expand All @@ -92,6 +95,7 @@ public ConnectorPageSource createPageSource(
}

userToSystemFieldIndex.add(index);
requiredColumns.add(index);
}

TupleDomain<ColumnHandle> constraint = systemSplit.getConstraint();
Expand All @@ -105,11 +109,18 @@ public ConnectorPageSource createPageSource(
return new MappedPageSource(systemTable.pageSource(systemTransaction.getConnectorTransactionHandle(), session, newConstraint), userToSystemFieldIndex.build());
}
catch (UnsupportedOperationException e) {
return new RecordPageSource(new MappedRecordSet(toRecordSet(systemTransaction.getConnectorTransactionHandle(), systemTable, session, newConstraint), userToSystemFieldIndex.build()));
return new RecordPageSource(new MappedRecordSet(
toRecordSet(
systemTransaction.getConnectorTransactionHandle(),
systemTable,
session,
newConstraint,
requiredColumns.build()),
userToSystemFieldIndex.build()));
}
}

private static RecordSet toRecordSet(ConnectorTransactionHandle sourceTransaction, SystemTable table, ConnectorSession session, TupleDomain<Integer> constraint)
private static RecordSet toRecordSet(ConnectorTransactionHandle sourceTransaction, SystemTable table, ConnectorSession session, TupleDomain<Integer> constraint, Set<Integer> requiredColumns)
{
return new RecordSet()
{
Expand All @@ -126,7 +137,7 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return table.cursor(sourceTransaction, session, constraint);
return table.cursor(sourceTransaction, session, constraint, requiredColumns);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import io.trino.spi.predicate.TupleDomain;

import java.util.Set;

/**
* Exactly one of {@link #cursor} or {@link #pageSource} must be implemented.
*/
Expand All @@ -40,6 +42,11 @@ default RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connec
throw new UnsupportedOperationException();
}

default RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint, Set<Integer> requiredColumns)
{
return cursor(transactionHandle, session, constraint);
}

/**
* Create a page source for the data in this table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;

import java.util.Set;

import static java.util.Objects.requireNonNull;

public class ClassLoaderSafeSystemTable
Expand Down Expand Up @@ -62,6 +64,14 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
}
}

@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint, Set<Integer> requiredColumns)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.cursor(transactionHandle, session, constraint, requiredColumns);
}
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ public void testSystemMetadataMaterializedViews()
// Bulk retrieval without selecting freshness
assertFileSystemAccesses(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 4)
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 2)
.build());

// Bulk retrieval for two schemas
Expand All @@ -733,7 +733,7 @@ public void testSystemMetadataMaterializedViews()
// Pointed lookup without selecting freshness
assertFileSystemAccesses(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 3)
.add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM))
.build());

assertUpdate("DROP SCHEMA " + schemaName + " CASCADE");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public void testSystemMetadataMaterializedViews()
assertMetastoreInvocations(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA",
ImmutableMultiset.builder()
.add(GET_TABLES_WITH_PARAMETER)
.addCopies(GET_TABLE, 6)
.addCopies(GET_TABLE, 4)
.build());

// Bulk retrieval for two schemas
Expand All @@ -475,7 +475,7 @@ public void testSystemMetadataMaterializedViews()
// Pointed lookup without selecting freshness
assertMetastoreInvocations(session, "SELECT schema_name, name FROM system.metadata.materialized_views WHERE schema_name = CURRENT_SCHEMA AND name = 'mv1'",
ImmutableMultiset.builder()
.addCopies(GET_TABLE, 4)
.addCopies(GET_TABLE, 2)
.build());

assertUpdate("DROP SCHEMA " + schemaName + " CASCADE");
Expand Down

0 comments on commit 00ddd6c

Please sign in to comment.