Add $all_entries metadata table to Iceberg
ebyhr committed Dec 24, 2024
1 parent 4f48087 commit d783cb1
Showing 6 changed files with 49 additions and 5 deletions.
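
Per the Iceberg docs referenced in the diff below, "$entries" exposes the manifest entries of the table's current snapshot, while the new "$all_entries" table exposes entries from all snapshots (in the entries schema, status 1 marks an added file and 2 a deleted one). A minimal usage sketch based on the new test case further down; the table name test_all_entries is illustrative:

SELECT status FROM "test_all_entries$entries";                  -- current snapshot only
SELECT status FROM "test_all_entries$all_entries";              -- entries from every snapshot
SELECT * FROM "test_all_entries$all_entries" WHERE status = 2;  -- matches "$entries" after a DELETE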
@@ -27,6 +27,7 @@
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import jakarta.annotation.Nullable;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetricsUtil.ReadableMetricsStruct;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
@@ -43,6 +44,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.wrappedHeapBuffer;
import static io.trino.plugin.iceberg.FilesTable.getIcebergIdToTypeMapping;
@@ -59,8 +61,10 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
import static org.apache.iceberg.MetadataTableType.ENTRIES;

// https://iceberg.apache.org/docs/latest/spark-queries/#all-entries
// https://iceberg.apache.org/docs/latest/spark-queries/#entries
public class EntriesTable
extends BaseSystemTable
@@ -70,15 +74,16 @@ public class EntriesTable
private final Optional<IcebergPartitionColumn> partitionColumn;
private final List<Type> partitionTypes;

public EntriesTable(TypeManager typeManager, SchemaTableName tableName, Table icebergTable, ExecutorService executor)
public EntriesTable(TypeManager typeManager, SchemaTableName tableName, Table icebergTable, MetadataTableType metadataTableType, ExecutorService executor)
{
super(
requireNonNull(icebergTable, "icebergTable is null"),
new ConnectorTableMetadata(
requireNonNull(tableName, "tableName is null"),
columns(requireNonNull(typeManager, "typeManager is null"), icebergTable)),
ENTRIES,
metadataTableType,
executor);
checkArgument(metadataTableType == ALL_ENTRIES || metadataTableType == ENTRIES, "Unexpected metadata table type: %s", metadataTableType);
idToTypeMapping = getIcebergIdToTypeMapping(icebergTable.schema());
primitiveFields = IcebergUtil.primitiveFields(icebergTable.schema()).stream()
.sorted(Comparator.comparing(Types.NestedField::name))
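Both suffixes are served by this same EntriesTable class, so "$all_entries" and "$entries" share one column layout; the new testAllEntriesTable case below checks this with a DESCRIBE comparison. A short sketch, again with an illustrative table name:

DESCRIBE "test_all_entries$entries";
DESCRIBE "test_all_entries$all_entries";  -- expected to list the same columns
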
@@ -358,6 +358,8 @@
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
import static org.apache.iceberg.MetadataTableType.ENTRIES;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations;
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
@@ -707,7 +709,8 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, icebergScanExecutor));
case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table), icebergScanExecutor));
case ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, icebergScanExecutor));
case ALL_ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ALL_ENTRIES, icebergScanExecutor));
case ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ENTRIES, icebergScanExecutor));
case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table));
case REFS -> Optional.of(new RefsTable(tableName, table, icebergScanExecutor));
};
@@ -23,6 +23,7 @@ public enum TableType
MANIFESTS,
PARTITIONS,
FILES,
ALL_ENTRIES,
ENTRIES,
PROPERTIES,
REFS,
@@ -563,6 +563,27 @@ public void testFilesTableWithDelete()
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_delete");
}

@Test
void testAllEntriesTable()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_all_entries", "AS SELECT 1 id, DATE '2014-01-01' dt")) {
assertThat(query("DESCRIBE \"" + table.getName() + "$all_entries\""))
.matches("DESCRIBE \"" + table.getName() + "$entries\"");

assertThat(query("SELECT * FROM \"" + table.getName() + "$all_entries\""))
.matches("SELECT * FROM \"" + table.getName() + "$entries\"");

assertUpdate("DELETE FROM " + table.getName(), 1);

assertThat(computeActual("SELECT status FROM \"" + table.getName() + "$all_entries\"").getOnlyColumnAsSet())
.containsExactly(1, 2);
assertThat(computeActual("SELECT status FROM \"" + table.getName() + "$entries\"").getOnlyColumnAsSet())
.containsExactly(2);
assertThat(query("SELECT * FROM \"" + table.getName() + "$all_entries\" WHERE status = 2"))
.matches("SELECT * FROM \"" + table.getName() + "$entries\"");
}
}

@Test
void testEntriesTable()
{
@@ -36,6 +36,7 @@
import static io.trino.plugin.hive.metastore.MetastoreMethod.GET_TABLES;
import static io.trino.plugin.hive.metastore.MetastoreMethod.REPLACE_TABLE;
import static io.trino.plugin.iceberg.IcebergSessionProperties.COLLECT_EXTENDED_STATISTICS_ON_WRITE;
import static io.trino.plugin.iceberg.TableType.ALL_ENTRIES;
import static io.trino.plugin.iceberg.TableType.ALL_MANIFESTS;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TableType.ENTRIES;
@@ -339,6 +340,12 @@ public void testSelectSystemTable()
.addCopies(GET_TABLE, 1)
.build());

// select from $all_entries
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$all_entries\"",
ImmutableMultiset.<MetastoreMethod>builder()
.addCopies(GET_TABLE, 1)
.build());

// select from $entries
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$entries\"",
ImmutableMultiset.<MetastoreMethod>builder()
@@ -356,7 +363,7 @@

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, ENTRIES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, ALL_ENTRIES, ENTRIES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
}

@Test
@@ -50,6 +50,7 @@
import static io.trino.plugin.hive.metastore.glue.GlueMetastoreMethod.GET_TABLES;
import static io.trino.plugin.hive.metastore.glue.GlueMetastoreMethod.UPDATE_TABLE;
import static io.trino.plugin.iceberg.IcebergSessionProperties.COLLECT_EXTENDED_STATISTICS_ON_WRITE;
import static io.trino.plugin.iceberg.TableType.ALL_ENTRIES;
import static io.trino.plugin.iceberg.TableType.ALL_MANIFESTS;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TableType.ENTRIES;
@@ -471,6 +472,12 @@ public void testSelectSystemTable()
.add(GET_TABLE)
.build());

// select from $all_entries
assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$all_entries\"",
ImmutableMultiset.<GlueMetastoreMethod>builder()
.add(GET_TABLE)
.build());

// select from $entries
assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$entries\"",
ImmutableMultiset.<GlueMetastoreMethod>builder()
@@ -494,7 +501,7 @@

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, ENTRIES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, ALL_ENTRIES, ENTRIES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
}
finally {
getQueryRunner().execute("DROP TABLE IF EXISTS test_select_snapshots");
