Skip to content

Commit

Permalink
Improve performance when listing columns in Iceberg
Browse files: browse the repository at this point in the history
  • Loading branch information
shohamyamin authored and ebyhr committed Dec 23, 2024
1 parent 715eb7b commit 4f48087
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -228,6 +231,7 @@
import static io.trino.plugin.base.filter.UtcConstraintExtractor.extractTupleDomain;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.base.projection.ApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.base.util.ExecutorUtil.processWithAdditionalThreads;
import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL;
import static io.trino.plugin.hive.HiveTimestampPrecision.DEFAULT_PRECISION;
Expand Down Expand Up @@ -407,6 +411,7 @@ public class IcebergMetadata
private final boolean addFilesProcedureEnabled;
private final Predicate<String> allowedExtraProperties;
private final ExecutorService icebergScanExecutor;
private final Executor metadataFetchingExecutor;

private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();

Expand All @@ -423,7 +428,8 @@ public IcebergMetadata(
Optional<HiveMetastoreFactory> metastoreFactory,
boolean addFilesProcedureEnabled,
Predicate<String> allowedExtraProperties,
ExecutorService icebergScanExecutor)
ExecutorService icebergScanExecutor,
Executor metadataFetchingExecutor)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null");
Expand All @@ -435,6 +441,7 @@ public IcebergMetadata(
this.addFilesProcedureEnabled = addFilesProcedureEnabled;
this.allowedExtraProperties = requireNonNull(allowedExtraProperties, "allowedExtraProperties is null");
this.icebergScanExecutor = requireNonNull(icebergScanExecutor, "icebergScanExecutor is null");
this.metadataFetchingExecutor = requireNonNull(metadataFetchingExecutor, "metadataFetchingExecutor is null");
}

@Override
Expand Down Expand Up @@ -982,23 +989,40 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
tableMetadatas.add(TableColumnsMetadata.forTable(tableName, columns));
});

for (SchemaTableName tableName : remainingTables) {
try {
Table icebergTable = catalog.loadTable(session, tableName);
List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema(), typeManager);
tableMetadatas.add(TableColumnsMetadata.forTable(tableName, columns));
}
catch (TableNotFoundException e) {
// Table disappeared during listing operation
}
catch (UnknownTableTypeException e) {
// Skip unsupported table type in case that the table redirects are not enabled
}
catch (RuntimeException e) {
// The table may be in the process of being removed, which can cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of table %s during streaming table columns for %s", tableName, prefix);
}
List<Callable<Optional<TableColumnsMetadata>>> tasks = remainingTables.stream()
.map(tableName -> (Callable<Optional<TableColumnsMetadata>>) () -> {
try {
Table icebergTable = catalog.loadTable(session, tableName);
List<ColumnMetadata> columns = getColumnMetadatas(icebergTable.schema(), typeManager);
return Optional.of(TableColumnsMetadata.forTable(tableName, columns));
}
catch (TableNotFoundException e) {
// Table disappeared during listing operation
return Optional.empty();
}
catch (UnknownTableTypeException e) {
// Skip unsupported table type in case that the table redirects are not enabled
return Optional.empty();
}
catch (RuntimeException e) {
// The table may be in the process of being removed, which can cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of table %s during streaming table columns for %s", tableName, prefix);
return Optional.empty();
}
})
.collect(toImmutableList());

try {
List<TableColumnsMetadata> taskResults = processWithAdditionalThreads(tasks, metadataFetchingExecutor).stream()
.flatMap(Optional::stream) // Flatten the Optionals into a stream
.collect(toImmutableList());

tableMetadatas.addAll(taskResults);
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}

return tableMetadatas.build();
})
.flatMap(List::stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory;
Expand All @@ -25,9 +26,11 @@
import io.trino.spi.type.TypeManager;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;

public class IcebergMetadataFactory
Expand All @@ -42,6 +45,7 @@ public class IcebergMetadataFactory
private final boolean addFilesProcedureEnabled;
private final Predicate<String> allowedExtraProperties;
private final ExecutorService icebergScanExecutor;
private final Executor metadataFetchingExecutor;

@Inject
public IcebergMetadataFactory(
Expand All @@ -53,6 +57,7 @@ public IcebergMetadataFactory(
TableStatisticsWriter tableStatisticsWriter,
@RawHiveMetastoreFactory Optional<HiveMetastoreFactory> metastoreFactory,
@ForIcebergScanPlanning ExecutorService icebergScanExecutor,
@ForIcebergMetadata ExecutorService metadataExecutorService,
IcebergConfig config)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
Expand All @@ -70,6 +75,13 @@ public IcebergMetadataFactory(
else {
this.allowedExtraProperties = ImmutableSet.copyOf(requireNonNull(config.getAllowedExtraProperties(), "allowedExtraProperties is null"))::contains;
}

if (config.getMetadataParallelism() == 1) {
this.metadataFetchingExecutor = directExecutor();
}
else {
this.metadataFetchingExecutor = new BoundedExecutor(metadataExecutorService, config.getMetadataParallelism());
}
}

public IcebergMetadata create(ConnectorIdentity identity)
Expand All @@ -84,6 +96,7 @@ public IcebergMetadata create(ConnectorIdentity identity)
metastoreFactory,
addFilesProcedureEnabled,
allowedExtraProperties,
icebergScanExecutor);
icebergScanExecutor,
metadataFetchingExecutor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Optional;
import java.util.UUID;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.metastore.TableInfo.ExtendedRelationType.TABLE;
Expand Down Expand Up @@ -122,7 +123,8 @@ public void testNonLowercaseNamespace()
Optional.empty(),
false,
_ -> false,
newDirectExecutorService());
newDirectExecutorService(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isFalse();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public void testNonLowercaseGlueDatabase()
Optional.empty(),
false,
_ -> false,
newDirectExecutorService());
newDirectExecutorService(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)")
.isFalse();
assertThat(icebergMetadata.schemaExists(SESSION, trinoSchemaName)).as("icebergMetadata.schemaExists(trinoSchemaName)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Optional;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
Expand Down Expand Up @@ -191,7 +192,8 @@ public void testNonLowercaseNamespace()
Optional.empty(),
false,
_ -> false,
newDirectExecutorService());
newDirectExecutorService(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.metastore.TableInfo.ExtendedRelationType.OTHER_VIEW;
Expand Down Expand Up @@ -129,7 +130,8 @@ public void testNonLowercaseNamespace()
Optional.empty(),
false,
_ -> false,
newDirectExecutorService());
newDirectExecutorService(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Map;
import java.util.Optional;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.trino.plugin.iceberg.catalog.snowflake.TestIcebergSnowflakeCatalogConnectorSmokeTest.S3_ACCESS_KEY;
Expand Down Expand Up @@ -225,7 +226,8 @@ public void testNonLowercaseNamespace()
Optional.empty(),
false,
_ -> false,
newDirectExecutorService());
newDirectExecutorService(),
directExecutor());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
Expand Down

0 comments on commit 4f48087

Please sign in to comment.