Delete the oldest tracked version metadata files after commit
7hong authored and ebyhr committed Dec 24, 2024
1 parent 47720cb commit 9c05d05
Showing 16 changed files with 136 additions and 4 deletions.
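
With this change, Trino's Iceberg table operations call Iceberg's CatalogUtil.deleteRemovedMetadataFiles after every successful commit to an existing table (a newly created table has no previous metadata to prune). When the table property write.metadata.delete-after-commit.enabled is true, Iceberg keeps the current metadata file plus at most write.metadata.previous-versions-max previous versions and deletes the older tracked versions. The following is a minimal sketch of that pruning logic, not the Iceberg source; the class and method names are hypothetical, and only the org.apache.iceberg types and the table property names are real.

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.io.FileIO;

import java.util.HashSet;
import java.util.Set;

public final class MetadataPruningSketch
{
    private MetadataPruningSketch() {}

    // Sketch: delete previous metadata files that `base` tracked but the
    // freshly committed `metadata` no longer does.
    public static void pruneRemovedMetadataFiles(FileIO io, TableMetadata base, TableMetadata metadata)
    {
        boolean deleteAfterCommit = Boolean.parseBoolean(
                metadata.properties().getOrDefault("write.metadata.delete-after-commit.enabled", "false"));
        if (base == null || !deleteAfterCommit) {
            return;
        }
        Set<String> stillTracked = new HashSet<>();
        for (MetadataLogEntry entry : metadata.previousFiles()) {
            stillTracked.add(entry.file());
        }
        for (MetadataLogEntry entry : base.previousFiles()) {
            // the metadata log is capped at write.metadata.previous-versions-max,
            // so the oldest entries fall off the log and can be removed from storage
            if (!stillTracked.contains(entry.file())) {
                io.deleteFile(entry.file());
            }
        }
    }
}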
@@ -63,6 +63,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;
import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION;
import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION_DEFAULT;
@@ -174,6 +175,7 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
}
else {
commitToExistingTable(base, metadata);
deleteRemovedMetadataFiles(fileIo, base, metadata);
}

shouldRefresh = true;
@@ -38,23 +38,32 @@
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergTestUtils.getMetadataFileAndUpdatedMillis;
import static io.trino.plugin.iceberg.IcebergTestUtils.withSmallRowGroups;
import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_TABLE;
import static io.trino.testing.TestingAccessControlManager.privilege;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_TABLE;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;
@@ -887,6 +896,39 @@ protected AutoCloseable createAdditionalTables(String schema)
return () -> {};
}

@Test
public void testMetadataDeleteAfterCommitEnabled()
throws IOException
{
if (!hasBehavior(SUPPORTS_CREATE_TABLE)) {
return;
}

int metadataPreviousVersionCount = 5;
String tableName = "test_metadata_delete_after_commit_enabled" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + "(_bigint BIGINT, _varchar VARCHAR)");
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES extra_properties = MAP(ARRAY['write.metadata.delete-after-commit.enabled'], ARRAY['true'])");
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES extra_properties = MAP(ARRAY['write.metadata.previous-versions-max'], ARRAY['" + metadataPreviousVersionCount + "'])");
String tableLocation = getTableLocation(tableName);

Map<String, Long> historyMetadataFiles = getMetadataFileAndUpdatedMillis(fileSystem, tableLocation);
for (int i = 0; i < 10; i++) {
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
Map<String, Long> metadataFiles = getMetadataFileAndUpdatedMillis(fileSystem, tableLocation);
historyMetadataFiles.putAll(metadataFiles);
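// at most the current metadata file plus metadataPreviousVersionCount previous versions may remain,
// and the survivors must be the most recently updated files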
assertThat(metadataFiles.size()).isLessThanOrEqualTo(1 + metadataPreviousVersionCount);
Set<String> expectMetadataFiles = historyMetadataFiles
.entrySet()
.stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.limit(metadataPreviousVersionCount + 1)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
assertThat(metadataFiles.keySet()).containsAll(expectMetadataFiles);
}
assertUpdate("DROP TABLE " + tableName);
}

private long getMostRecentSnapshotId(String tableName)
{
return (long) Iterables.getOnlyElement(getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName))
@@ -901,7 +943,14 @@ private ZonedDateTime getSnapshotTime(String tableName, long snapshotId)

protected String getTableLocation(String tableName)
{
return (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName);
Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL);
Matcher m = locationPattern.matcher((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue());
if (m.find()) {
String location = m.group(1);
verify(!m.find(), "Unexpected second match");
return location;
}
throw new IllegalStateException("Location not found in SHOW CREATE TABLE result");
}
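
The helper now parses the location out of SHOW CREATE TABLE instead of deriving it from a data file's "$path". This matters for the new test, which reads the table location right after CREATE TABLE, before any data files exist, where the "$path"-based query would return no rows.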

protected abstract void dropTableFromMetastore(String tableName);
@@ -95,6 +95,7 @@ protected QueryRunner createQueryRunner()
.put("s3.max-connections", "2") // verify no leaks
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.put("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max")
.putAll(getAdditionalIcebergProperties())
.buildOrThrow())
.setSchemaInitializer(
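
The smoke test above sets the Iceberg table properties through Trino's extra_properties table property, which only accepts names whitelisted by the catalog's iceberg.allowed-extra-properties configuration; the same whitelist entry is therefore added to each query runner that inherits the test, as in this and the following files.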
@@ -15,6 +15,8 @@

import io.airlift.slice.Slice;
import io.trino.Session;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
@@ -40,15 +42,20 @@
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.TestingTypeManager;
import io.trino.testing.QueryRunner;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

@@ -60,6 +67,8 @@
import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.createPerTransactionCache;
import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METADATA_JSON;
import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.fromFilePath;
import static io.trino.testing.TestingConnectorSession.SESSION;

public final class IcebergTestUtils
@@ -190,4 +199,19 @@ public static BaseTable loadTable(String tableName,
directExecutor());
return (BaseTable) loadIcebergTable(catalog, tableOperationsProvider, SESSION, new SchemaTableName(schemaName, tableName));
}

public static Map<String, Long> getMetadataFileAndUpdatedMillis(TrinoFileSystem trinoFileSystem, String tableLocation)
throws IOException
{
FileIterator fileIterator = trinoFileSystem.listFiles(Location.of(tableLocation + "/metadata"));
Map<String, Long> metadataFiles = new HashMap<>();
while (fileIterator.hasNext()) {
FileEntry entry = fileIterator.next();
if (fromFilePath(entry.location().path()) == METADATA_JSON) {
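// only lastUpdatedMillis is needed here, so the metadata file is parsed without a backing FileIO (null)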
TableMetadata tableMetadata = TableMetadataParser.read(null, new ForwardingInputFile(trinoFileSystem.newInputFile(entry.location())));
metadataFiles.put(entry.location().path(), tableMetadata.lastUpdatedMillis());
}
}
return metadataFiles;
}
}
@@ -92,6 +92,7 @@ protected QueryRunner createQueryRunner()
.put("azure.access-key", accessKey)
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.put("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max")
.buildOrThrow())
.setSchemaInitializer(
SchemaInitializer.builder()
@@ -55,7 +55,8 @@ protected QueryRunner createQueryRunner()
.setIcebergProperties(ImmutableMap.of(
"iceberg.file-format", format.name(),
"iceberg.register-table-procedure.enabled", "true",
"iceberg.writer-sort-buffer-size", "1MB"))
"iceberg.writer-sort-buffer-size", "1MB",
"iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max"))
.build();
metastore = getHiveMetastore(queryRunner);
return queryRunner;
@@ -101,6 +101,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.file-format", format.name())
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.put("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max")
.buildOrThrow())
.setSchemaInitializer(
SchemaInitializer.builder()
@@ -20,6 +20,7 @@
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.metastore.Column;
import io.trino.metastore.HiveMetastore;
@@ -74,6 +75,7 @@
import org.junit.jupiter.api.TestInstance;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -96,6 +98,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
import static io.trino.plugin.iceberg.IcebergTestUtils.getMetadataFileAndUpdatedMillis;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTableWithSchema;
import static io.trino.spi.type.BigintType.BIGINT;
@@ -113,6 +116,8 @@
import static org.apache.iceberg.FileFormat.ORC;
import static org.apache.iceberg.FileFormat.PARQUET;
import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.mapping.NameMappingParser.toJson;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1516,6 +1521,39 @@ void testEnvironmentContext()
}
}

@Test
public void testMetadataDeleteAfterCommitEnabled()
throws IOException
{
int metadataPreviousVersionCount = 5;
String tableName = "test_metadata_delete_after_commit_enabled" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + "(_bigint BIGINT, _varchar VARCHAR)");
BaseTable icebergTable = loadTable(tableName);
String location = icebergTable.location();
icebergTable.updateProperties()
.set(METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
.set(METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(metadataPreviousVersionCount))
.commit();

TrinoFileSystem trinoFileSystem = fileSystemFactory.create(SESSION);
Map<String, Long> historyMetadataFiles = getMetadataFileAndUpdatedMillis(trinoFileSystem, location);
for (int i = 0; i < 10; i++) {
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
Map<String, Long> metadataFiles = getMetadataFileAndUpdatedMillis(trinoFileSystem, location);
historyMetadataFiles.putAll(metadataFiles);
assertThat(metadataFiles.size()).isLessThanOrEqualTo(1 + metadataPreviousVersionCount);
Set<String> expectMetadataFiles = historyMetadataFiles
.entrySet()
.stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.limit(metadataPreviousVersionCount + 1)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
assertThat(metadataFiles.keySet()).containsAll(expectMetadataFiles);
}
assertUpdate("DROP TABLE " + tableName);
}
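
Unlike the smoke-test variant above, this test sets the two properties directly through the Iceberg API (updateProperties().set(...)) rather than through extra_properties, so it exercises the pruning path without depending on the iceberg.allowed-extra-properties whitelist.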

private void testHighlyNestedFieldPartitioningWithTimestampTransform(String partitioning, String partitionDirectoryRegex, Set<String> expectedPartitionDirectories)
{
String tableName = "test_highly_nested_field_partitioning_with_timestamp_transform_" + randomNameSuffix();
@@ -102,7 +102,8 @@ protected QueryRunner createQueryRunner()
"iceberg.catalog.type", "glue",
"hive.metastore.glue.default-warehouse-dir", schemaPath(),
"iceberg.register-table-procedure.enabled", "true",
"iceberg.writer-sort-buffer-size", "1MB"))
"iceberg.writer-sort-buffer-size", "1MB",
"iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max"))
.setSchemaInitializer(
SchemaInitializer.builder()
.withClonedTpchTables(REQUIRED_TPCH_TABLES)
@@ -108,6 +108,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.jdbc-catalog.catalog-name", "tpch")
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.put("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max")
.put("iceberg.jdbc-catalog.default-warehouse-dir", warehouseLocation.getAbsolutePath())
.put("iceberg.jdbc-catalog.retryable-status-codes", "57P01,57P05")
.buildOrThrow())
@@ -103,7 +103,8 @@ protected QueryRunner createQueryRunner()
"iceberg.catalog.type", "nessie",
"iceberg.nessie-catalog.uri", nessieContainer.getRestApiUri(),
"iceberg.nessie-catalog.default-warehouse-dir", tempDir.toString(),
"iceberg.writer-sort-buffer-size", "1MB"))
"iceberg.writer-sort-buffer-size", "1MB",
"iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max"))
.setSchemaInitializer(
SchemaInitializer.builder()
.withClonedTpchTables(ImmutableList.<TpchTable<?>>builder()
@@ -83,6 +83,7 @@ protected QueryRunner createQueryRunner()
.addIcebergProperty("iceberg.file-format", format.name())
.addIcebergProperty("iceberg.register-table-procedure.enabled", "true")
.addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB")
.addIcebergProperty("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max")
.addIcebergProperty("iceberg.catalog.type", "rest")
.addIcebergProperty("iceberg.rest-catalog.nested-namespace-enabled", "true")
.addIcebergProperty("iceberg.rest-catalog.uri", polarisCatalog.restUri() + "/api/catalog")
@@ -104,6 +104,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString())
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.put("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max")
.buildOrThrow();

Map<String, String> nestedNamespaceEnabled = ImmutableMap.<String, String>builder()
@@ -102,6 +102,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString())
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.put("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max")
.buildOrThrow())
.setInitialTables(REQUIRED_TPCH_TABLES)
.build();
@@ -528,4 +528,12 @@ public void testIcebergTablesFunction()
assertThatThrownBy(super::testIcebergTablesFunction)
.hasStackTraceContaining("Access Denied");
}

@Test
@Override
public void testMetadataDeleteAfterCommitEnabled()
{
assertThatThrownBy(super::testMetadataDeleteAfterCommitEnabled)
.hasStackTraceContaining("Access Denied");
}
}
@@ -120,6 +120,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.rest-catalog.uri", "http://" + restCatalogBackendContainer.getRestCatalogEndpoint())
.put("iceberg.rest-catalog.vended-credentials-enabled", "true")
.put("iceberg.writer-sort-buffer-size", "1MB")
.put("iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max")
.put("fs.hadoop.enabled", "false")
.put("fs.native-s3.enabled", "true")
.put("s3.region", MINIO_REGION)
