Added partition column to Iceberg's files metadata table
osscm committed Jul 28, 2023
1 parent 2d669da commit f7f6462
Showing 7 changed files with 409 additions and 155 deletions.
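
As a hedged illustration of the user-visible effect (catalog, schema, table, and partitioning are hypothetical): after this commit, a query against an Iceberg table's $files metadata table also returns each data file's partition tuple as a partition ROW column, alongside existing columns such as file_path and record_count.

-- Hypothetical table "orders" partitioned by a date column "ds";
-- the new "partition" column surfaces the file's partition values as a ROW.
SELECT file_path, record_count, partition
FROM iceberg.demo."orders$files";
-- Illustrative output row: <file path> | 42 | {ds=2023-07-28}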
FilesTable.java
@@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.block.Block;
@@ -34,7 +35,10 @@
import jakarta.annotation.Nullable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableGroup;
@@ -44,6 +48,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;

import java.io.IOException;
import java.io.UncheckedIOException;
@@ -53,9 +58,11 @@
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionColumnType;
import static io.trino.plugin.iceberg.IcebergUtil.partitionTypes;
import static io.trino.plugin.iceberg.PartitionTable.getAllPartitionFields;
import static io.trino.spi.block.MapValueBuilder.buildMapValue;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
@@ -71,29 +78,39 @@ public class FilesTable
private final TypeManager typeManager;
private final Table icebergTable;
private final Optional<Long> snapshotId;
private final Optional<IcebergPartitionColumn> partitionColumnType;
private final List<PartitionField> partitionFields;
private final Map<Integer, Type.PrimitiveType> idToPrimitiveTypeMapping;

public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table icebergTable, Optional<Long> snapshotId)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");

tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("content", INTEGER))
.add(new ColumnMetadata("file_path", VARCHAR))
.add(new ColumnMetadata("file_format", VARCHAR))
.add(new ColumnMetadata("record_count", BIGINT))
.add(new ColumnMetadata("file_size_in_bytes", BIGINT))
.add(new ColumnMetadata("column_sizes", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("null_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("nan_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("lower_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
.add(new ColumnMetadata("upper_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
.add(new ColumnMetadata("key_metadata", VARBINARY))
.add(new ColumnMetadata("split_offsets", new ArrayType(BIGINT)))
.add(new ColumnMetadata("equality_ids", new ArrayType(INTEGER)))
.build());
this.partitionFields = getAllPartitionFields(icebergTable);

this.partitionColumnType = getPartitionColumnType(partitionFields, icebergTable.schema(), typeManager);

List<ColumnMetadata> connectorMetadata = Lists.newArrayList(
new ColumnMetadata("content", INTEGER),
new ColumnMetadata("file_path", VARCHAR),
new ColumnMetadata("file_format", VARCHAR),
new ColumnMetadata("record_count", BIGINT),
new ColumnMetadata("file_size_in_bytes", BIGINT),
new ColumnMetadata("column_sizes", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))),
new ColumnMetadata("value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))),
new ColumnMetadata("null_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))),
new ColumnMetadata("nan_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))),
new ColumnMetadata("lower_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))),
new ColumnMetadata("upper_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))),
new ColumnMetadata("key_metadata", VARBINARY),
new ColumnMetadata("split_offsets", new ArrayType(BIGINT)),
new ColumnMetadata("equality_ids", new ArrayType(INTEGER)));
partitionColumnType.ifPresent(icebergPartitionColumn ->
connectorMetadata.add(new ColumnMetadata("partition", icebergPartitionColumn.rowType())));

idToPrimitiveTypeMapping = IcebergUtil.primitiveFieldTypes(icebergTable.schema());
tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), connectorMetadata);
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
}

@@ -124,7 +141,8 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
.useSnapshot(snapshotId.get())
.includeColumnStats();

PlanFilesIterable planFilesIterable = new PlanFilesIterable(tableScan.planFiles(), idToTypeMapping, types, typeManager);
PlanFilesIterable planFilesIterable = new PlanFilesIterable(tableScan.planFiles(), idToTypeMapping, types, typeManager, partitionColumnType, partitionFields, idToPrimitiveTypeMapping);
return planFilesIterable.cursor();
}

@@ -134,18 +152,32 @@ private static class PlanFilesIterable
{
private final CloseableIterable<FileScanTask> planFiles;
private final Map<Integer, Type> idToTypeMapping;
private final Map<Integer, Type.PrimitiveType> idToPrimitiveTypeMapping;
private final List<io.trino.spi.type.Type> types;
private boolean closed;
private final MapType integerToBigintMapType;
private final MapType integerToVarcharMapType;
private final TypeManager typeManager;
private final Optional<IcebergPartitionColumn> partitionColumnType;
private final List<PartitionField> partitionFields;

public PlanFilesIterable(CloseableIterable<FileScanTask> planFiles, Map<Integer, Type> idToTypeMapping, List<io.trino.spi.type.Type> types, TypeManager typeManager)
public PlanFilesIterable(CloseableIterable<FileScanTask> planFiles,
Map<Integer, Type> idToTypeMapping,
List<io.trino.spi.type.Type> types,
TypeManager typeManager,
Optional<IcebergPartitionColumn> partitionColumnType,
List<PartitionField> partitionFields,
Map<Integer, Type.PrimitiveType> idToPrimitiveTypeMapping)
{
this.planFiles = requireNonNull(planFiles, "planFiles is null");
this.idToTypeMapping = ImmutableMap.copyOf(requireNonNull(idToTypeMapping, "idToTypeMapping is null"));
this.idToPrimitiveTypeMapping = ImmutableMap.copyOf(requireNonNull(idToPrimitiveTypeMapping, "idToPrimitiveTypeMapping is null"));
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
this.integerToBigintMapType = new MapType(INTEGER, BIGINT, typeManager.getTypeOperators());
this.integerToVarcharMapType = new MapType(INTEGER, VARCHAR, typeManager.getTypeOperators());
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.partitionColumnType = requireNonNull(partitionColumnType, "partitionColumnType is null");
this.partitionFields = ImmutableList.copyOf(requireNonNull(partitionFields, "partitionFields is null"));
addCloseable(planFiles);
}

@@ -182,7 +214,8 @@ public boolean hasNext()
@Override
public List<Object> next()
{
return getRecord(planFilesIterator.next().file());
FileScanTask fileScanTask = planFilesIterator.next();
return getRecord(fileScanTask.file(), fileScanTask.spec());
}

@Override
@@ -195,7 +228,8 @@ public void close()
};
}

private List<Object> getRecord(DataFile dataFile)
private List<Object> getRecord(DataFile dataFile,
PartitionSpec spec)
{
List<Object> columns = new ArrayList<>();
columns.add(dataFile.content().id());
@@ -212,7 +246,18 @@ private List<Object> getRecord(DataFile dataFile)
columns.add(toVarbinarySlice(dataFile.keyMetadata()));
columns.add(toBigintArrayBlock(dataFile.splitOffsets()));
columns.add(toIntegerArrayBlock(dataFile.equalityFieldIds()));

Types.StructType structType = spec.partitionType();
StructLike partitionStruct = dataFile.partition();
StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(partitionStruct);
StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = new StructLikeWrapperWithFieldIdToIndex(partitionWrapper, structType);
List<Type> partitionTypes = partitionTypes(partitionFields, idToPrimitiveTypeMapping);
List<? extends Class<?>> partitionColumnClass = partitionTypes.stream()
.map(type -> type.typeId().javaClass())
.collect(toImmutableList());

// add data for partition columns
IcebergUtil.addPartitionColumn(partitionColumnType, partitionTypes, partitionColumnClass, structLikeWrapperWithFieldIdToIndex, columns);
// validate only after the optional partition column has been appended
checkArgument(columns.size() == types.size(), "Expected %s types in row, but got %s values", types.size(), columns.size());
return columns;
}

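As the constructor change above shows, the partition column is appended only when getAllPartitionFields finds at least one partition field across the table's partition specs; unpartitioned tables keep the previous schema. A hedged sketch of the widened schema (same hypothetical table):

-- Hypothetical: the metadata table's schema now ends with the partition column;
-- for a table with no partition fields in any spec, the column is absent.
DESCRIBE iceberg.demo."orders$files";
-- ..., split_offsets array(bigint), equality_ids array(integer), partition row(ds date)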
IcebergPartitionColumn.java
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import io.trino.spi.type.RowType;

import java.util.List;
import java.util.Objects;

public class IcebergPartitionColumn
{
private final RowType rowType;
private final List<Integer> fieldIds;

public IcebergPartitionColumn(RowType rowType, List<Integer> fieldIds)
{
this.rowType = rowType;
this.fieldIds = fieldIds;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IcebergPartitionColumn that = (IcebergPartitionColumn) o;
return Objects.equals(rowType, that.rowType()) && Objects.equals(fieldIds, that.fieldIds());
}

public RowType rowType()
{
return rowType;
}

public List<Integer> fieldIds()
{
return fieldIds;
}

@Override
public int hashCode()
{
return Objects.hash(rowType, fieldIds);
}
}
IcebergUtil.java
@@ -37,6 +37,7 @@
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Int128;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeOperators;
@@ -111,6 +112,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergTableProperties.getSortOrder;
import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.plugin.iceberg.SortFieldUtils.parseSortFields;
@@ -124,6 +126,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.block.RowValueBuilder.buildRowValue;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
@@ -137,6 +140,7 @@
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
@@ -763,4 +767,59 @@ public static Map<String, Integer> columnNameToPositionInSchema(Schema schema)
(column, position) -> immutableEntry(column.name(), Long.valueOf(position).intValue()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
}

public static Optional<IcebergPartitionColumn> getPartitionColumnType(List<PartitionField> fields, Schema schema, TypeManager typeManager)
{
if (fields.isEmpty()) {
return Optional.empty();
}
List<RowType.Field> partitionFields = fields.stream()
.map(field -> RowType.field(
field.name(),
toTrinoType(field.transform().getResultType(schema.findType(field.sourceId())), typeManager)))
.collect(toImmutableList());
List<Integer> fieldIds = fields.stream()
.map(PartitionField::fieldId)
.collect(toImmutableList());
return Optional.of(new IcebergPartitionColumn(RowType.from(partitionFields), fieldIds));
}

public static void addPartitionColumn(Optional<IcebergPartitionColumn> partitionColumn,
List<org.apache.iceberg.types.Type> partitionTypes,
List<? extends Class<?>> partitionColumnClass,
StructLikeWrapperWithFieldIdToIndex partitionStruct,
List<Object> row)
{
partitionColumn.ifPresent(partitionColumnType -> {
row.add(buildRowValue(partitionColumnType.rowType(), fields -> {
List<io.trino.spi.type.Type> partitionColumnTypes = partitionColumnType.rowType().getFields().stream()
.map(RowType.Field::getType)
.collect(toImmutableList());
for (int i = 0; i < partitionColumnTypes.size(); i++) {
io.trino.spi.type.Type trinoType = partitionColumnType.rowType().getFields().get(i).getType();
Object value = null;
Integer fieldId = partitionColumnType.fieldIds().get(i);
if (partitionStruct.fieldIdToIndex().containsKey(fieldId)) {
value = convertIcebergValueToTrino(
partitionTypes.get(i),
partitionStruct.structLikeWrapper().get().get(partitionStruct.fieldIdToIndex().get(fieldId), partitionColumnClass.get(i)));
}
writeNativeValue(trinoType, fields.get(i), value);
}
}));
});
}

public static List<org.apache.iceberg.types.Type> partitionTypes(
List<PartitionField> partitionFields,
Map<Integer, org.apache.iceberg.types.Type.PrimitiveType> idToPrimitiveTypeMapping)
{
ImmutableList.Builder<org.apache.iceberg.types.Type> partitionTypeBuilder = ImmutableList.builder();
for (PartitionField partitionField : partitionFields) {
org.apache.iceberg.types.Type.PrimitiveType sourceType = idToPrimitiveTypeMapping.get(partitionField.sourceId());
org.apache.iceberg.types.Type type = partitionField.transform().getResultType(sourceType);
partitionTypeBuilder.add(type);
}
return partitionTypeBuilder.build();
}
}
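
Taken together: getPartitionColumnType derives a Trino ROW type with one named field per partition field, partitionTypes resolves each field's Iceberg result type from its source column, and addPartitionColumn converts a file's partition struct into that ROW, writing NULL for any field id absent from the file's spec. A hedged sketch of dereferencing the resulting column (table and partitioning hypothetical):

-- Hypothetical table partitioned by ds and bucket(16, user_id); Iceberg's
-- default partition field names would be "ds" and "user_id_bucket".
SELECT partition.ds, partition.user_id_bucket, count(*) AS file_count
FROM iceberg.demo."events$files"
GROUP BY 1, 2;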