Keep only needed mapping in TableToPartitionMapping
Keep the mapping only for columns that are needed for query execution.
With a big enough table the mapping alone could take 100 kB+. A recent
change reduced its size by a factor of ~8 by using Int2IntArrayMap
internally, but 10 kB+ of memory per split is still noticeable memory
pressure on the coordinator.
losipiuk committed Oct 10, 2023
1 parent 9709be6 commit 823423e
Showing 1 changed file with 46 additions and 9 deletions.
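
The diff threads a set of needed column names from getSplits down to where the mapping is built, so entries for columns the query never reads are not created at all. A minimal standalone sketch of that idea, using simplified Column and map types rather than the actual Trino classes (names and numbers here are illustrative only); column names are normalized to lower case as in the diff:

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

class NeededColumnMappingSketch
{
    record Column(String name, String type) {}

    // Build index -> partition type only for columns the query actually reads,
    // instead of materializing an entry for every column of a wide table.
    static Map<Integer, String> coercionsForNeededColumns(
            List<Column> tableColumns,
            List<Column> partitionColumns,
            Set<String> neededColumnNames)
    {
        Map<Integer, String> coercions = new HashMap<>();
        for (int i = 0; i < Math.min(tableColumns.size(), partitionColumns.size()); i++) {
            String name = tableColumns.get(i).name().toLowerCase(Locale.ENGLISH);
            if (!neededColumnNames.contains(name)) {
                continue; // skip columns not used in the query
            }
            if (!tableColumns.get(i).type().equals(partitionColumns.get(i).type())) {
                coercions.put(i, partitionColumns.get(i).type());
            }
        }
        return coercions;
    }

    public static void main(String[] args)
    {
        List<Column> table = List.of(new Column("id", "bigint"), new Column("ts", "timestamp(6)"));
        List<Column> partition = List.of(new Column("id", "bigint"), new Column("ts", "timestamp(3)"));
        // only "ts" is projected/constrained, so only its coercion is kept
        System.out.println(coercionsForNeededColumns(table, partition, Set.of("ts")));
    }
}
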
@@ -18,6 +18,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Streams;
 import com.google.inject.Inject;
 import io.airlift.concurrent.BoundedExecutor;
 import io.airlift.stats.CounterStat;
@@ -53,6 +54,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
@@ -62,6 +64,7 @@
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Iterators.peekingIterator;
 import static com.google.common.collect.Iterators.singletonIterator;
 import static com.google.common.collect.Iterators.transform;
@@ -235,12 +238,18 @@ public ConnectorSplitSource getSplits(
             return emptySplitSource();
         }

+        Set<String> neededColumnNames = Streams.concat(hiveTable.getProjectedColumns().stream(), hiveTable.getConstraintColumns().stream())
+                .map(columnHandle -> ((HiveColumnHandle) columnHandle).getBaseColumnName()) // possible duplicates are handled by toImmutableSet at the end
+                .map(columnName -> columnName.toLowerCase(ENGLISH))
+                .collect(toImmutableSet());
+
         Iterator<HivePartitionMetadata> hivePartitions = getPartitionMetadata(
                 session,
                 metastore,
                 table,
                 peekingIterator(partitions),
-                bucketHandle.map(HiveBucketHandle::toTableBucketProperty));
+                bucketHandle.map(HiveBucketHandle::toTableBucketProperty),
+                neededColumnNames);

         HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
                 table,
@@ -292,7 +301,8 @@ private Iterator<HivePartitionMetadata> getPartitionMetadata(
             SemiTransactionalHiveMetastore metastore,
             Table table,
             PeekingIterator<HivePartition> hivePartitions,
-            Optional<HiveBucketProperty> bucketProperty)
+            Optional<HiveBucketProperty> bucketProperty,
+            Set<String> neededColumnNames)
     {
         if (!hivePartitions.hasNext()) {
             return emptyIterator();
@@ -338,7 +348,8 @@ private Iterator<HivePartitionMetadata> getPartitionMetadata(
                         table,
                         bucketProperty,
                         hivePartition,
-                        partition.get()));
+                        partition.get(),
+                        neededColumnNames));
             }

             return results.build();
@@ -356,7 +367,8 @@ private static HivePartitionMetadata toPartitionMetadata(
             Table table,
             Optional<HiveBucketProperty> bucketProperty,
             HivePartition hivePartition,
-            Partition partition)
+            Partition partition,
+            Set<String> neededColumnNames)
     {
         SchemaTableName tableName = table.getSchemaTableName();
         String partName = makePartitionName(table, partition);
@@ -379,7 +391,8 @@ private static HivePartitionMetadata toPartitionMetadata(
         if ((tableColumns == null) || (partitionColumns == null)) {
             throw new TrinoException(HIVE_INVALID_METADATA, format("Table '%s' or partition '%s' has null columns", tableName, partName));
         }
-        TableToPartitionMapping tableToPartitionMapping = getTableToPartitionMapping(usePartitionColumnNames, typeManager, hiveTimestampPrecision, tableName, partName, tableColumns, partitionColumns);
+
+        TableToPartitionMapping tableToPartitionMapping = getTableToPartitionMapping(usePartitionColumnNames, typeManager, hiveTimestampPrecision, tableName, partName, tableColumns, partitionColumns, neededColumnNames);

         if (bucketProperty.isPresent()) {
             HiveBucketProperty partitionBucketProperty = partition.getStorage().getBucketProperty()
@@ -417,13 +430,25 @@ private static HivePartitionMetadata toPartitionMetadata(
         return new HivePartitionMetadata(hivePartition, Optional.of(partition), tableToPartitionMapping);
     }

-    private static TableToPartitionMapping getTableToPartitionMapping(boolean usePartitionColumnNames, TypeManager typeManager, HiveTimestampPrecision hiveTimestampPrecision, SchemaTableName tableName, String partName, List<Column> tableColumns, List<Column> partitionColumns)
+    private static TableToPartitionMapping getTableToPartitionMapping(
+            boolean usePartitionColumnNames,
+            TypeManager typeManager,
+            HiveTimestampPrecision hiveTimestampPrecision,
+            SchemaTableName tableName,
+            String partName,
+            List<Column> tableColumns,
+            List<Column> partitionColumns,
+            Set<String> neededColumnNames)
     {
         if (usePartitionColumnNames) {
-            return getTableToPartitionMappingByColumnNames(typeManager, tableName, partName, tableColumns, partitionColumns, hiveTimestampPrecision);
+            return getTableToPartitionMappingByColumnNames(typeManager, tableName, partName, tableColumns, partitionColumns, neededColumnNames, hiveTimestampPrecision);
         }
         ImmutableMap.Builder<Integer, HiveTypeName> columnCoercions = ImmutableMap.builder();
         for (int i = 0; i < min(partitionColumns.size(), tableColumns.size()); i++) {
+            if (!neededColumnNames.contains(tableColumns.get(i).getName().toLowerCase(ENGLISH))) {
+                // skip columns not used in the query
+                continue;
+            }
             HiveType tableType = tableColumns.get(i).getType();
             HiveType partitionType = partitionColumns.get(i).getType();
             if (!tableType.equals(partitionType)) {
@@ -449,11 +474,23 @@ private static boolean isPartitionUsesColumnNames(ConnectorSession session, Opti
         };
     }

-    private static TableToPartitionMapping getTableToPartitionMappingByColumnNames(TypeManager typeManager, SchemaTableName tableName, String partName, List<Column> tableColumns, List<Column> partitionColumns, HiveTimestampPrecision hiveTimestampPrecision)
+    private static TableToPartitionMapping getTableToPartitionMappingByColumnNames(
+            TypeManager typeManager,
+            SchemaTableName tableName,
+            String partName,
+            List<Column> tableColumns,
+            List<Column> partitionColumns,
+            Set<String> neededColumnNames,
+            HiveTimestampPrecision hiveTimestampPrecision)
     {
         ImmutableMap.Builder<String, Integer> partitionColumnIndexesBuilder = ImmutableMap.builderWithExpectedSize(partitionColumns.size());
         for (int i = 0; i < partitionColumns.size(); i++) {
-            partitionColumnIndexesBuilder.put(partitionColumns.get(i).getName().toLowerCase(ENGLISH), i);
+            String columnName = partitionColumns.get(i).getName().toLowerCase(ENGLISH);
+            if (!neededColumnNames.contains(columnName)) {
+                // skip columns not used in the query
+                continue;
+            }
+            partitionColumnIndexesBuilder.put(columnName, i);
         }
         Map<String, Integer> partitionColumnsByIndex = partitionColumnIndexesBuilder.buildOrThrow();

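To get a feel for the per-split savings the commit message describes, here is a hedged back-of-the-envelope sketch. It uses fastutil's Int2IntArrayMap, which the commit message names; the column counts are made up for illustration, and the byte estimate only counts the payload of the two parallel int arrays that Int2IntArrayMap keeps per entry, not object headers:

import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
import it.unimi.dsi.fastutil.ints.Int2IntMap;

public class MappingFootprintSketch
{
    public static void main(String[] args)
    {
        int tableColumns = 1_000;   // hypothetical wide table
        int neededColumns = 5;      // hypothetical: columns actually projected or constrained

        // Before: one mapping entry per column of the table, carried by every split.
        Int2IntMap full = new Int2IntArrayMap(tableColumns);
        for (int i = 0; i < tableColumns; i++) {
            full.put(i, i);
        }

        // After: entries only for the columns the query needs.
        Int2IntMap filtered = new Int2IntArrayMap(neededColumns);
        for (int i = 0; i < neededColumns; i++) {
            filtered.put(i, i);
        }

        // Int2IntArrayMap stores one int in a key array and one in a value array
        // per entry, so each entry costs roughly 8 bytes of array payload.
        System.out.printf("full: ~%d bytes, filtered: ~%d bytes%n", full.size() * 8, filtered.size() * 8);
    }
}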