diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CreateTableException.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CreateTableException.java new file mode 100644 index 000000000000..66dfccc6a424 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CreateTableException.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.spi.TrinoException; +import io.trino.spi.connector.SchemaTableName; +import org.apache.iceberg.exceptions.CleanableFailure; + +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; +import static java.lang.String.format; + +public class CreateTableException + extends TrinoException + implements CleanableFailure +{ + public CreateTableException(Throwable throwable, SchemaTableName tableName) + { + super(ICEBERG_COMMIT_ERROR, format("Failed to create table %s: %s", tableName, throwable.getMessage()), throwable); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java index 10120a8804f9..afdc4dbd211f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java @@ -16,14 +16,13 @@ import io.trino.annotation.NotThreadSafe; import io.trino.metastore.PrincipalPrivileges; import io.trino.metastore.Table; -import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.metastore.MetastoreUtil; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; +import io.trino.plugin.iceberg.CreateTableException; import io.trino.plugin.iceberg.UnknownTableTypeException; import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; -import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.TableNotFoundException; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.io.FileIO; @@ -124,10 +123,11 @@ protected final void commitNewTable(TableMetadata metadata) try { metastore.createTable(table, privileges); } - catch (SchemaNotFoundException | TableAlreadyExistsException e) { - // clean up metadata files corresponding to the current transaction + catch (Exception e) { + // clean up metadata file corresponding to the current transaction fileIo.deleteFile(newMetadataLocation); - throw e; + // wrap exception in CleanableFailure to ensure that manifest list Avro files are also cleaned up + throw new CreateTableException(e, getSchemaTableName()); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 02ce4a0305c2..dd808c868dc6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -8162,7 +8162,7 @@ public void testDynamicFilterWithExplicitPartitionFilter() @Override protected void verifyTableNameLengthFailurePermissible(Throwable e) { - assertThat(e).hasMessageMatching("Table name must be shorter than or equal to '128' characters but got .*"); + assertThat(e).hasMessageMatching(".*Table name must be shorter than or equal to '128' characters but got .*"); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreCreateTableFailure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreCreateTableFailure.java new file mode 100644 index 000000000000..3196539ce7a7 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/file/TestIcebergFileMetastoreCreateTableFailure.java @@ -0,0 +1,125 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.file; + +import io.trino.Session; +import io.trino.filesystem.local.LocalFileSystemFactory; +import io.trino.metastore.HiveMetastore; +import io.trino.metastore.PrincipalPrivileges; +import io.trino.metastore.Table; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.metastore.HiveMetastoreConfig; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; +import io.trino.plugin.iceberg.TestingIcebergPlugin; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; + +@TestInstance(PER_CLASS) +@Execution(SAME_THREAD) +public class TestIcebergFileMetastoreCreateTableFailure + extends AbstractTestQueryFramework +{ + private static final String ICEBERG_CATALOG = "iceberg"; + private static final String SCHEMA_NAME = "test_schema"; + + private Path dataDirectory; + private HiveMetastore metastore; + private final AtomicReference testException = new AtomicReference<>(); + + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + this.dataDirectory = Files.createTempDirectory("test_iceberg_create_table_failure"); + // Using FileHiveMetastore as approximation of HMS + this.metastore = new FileHiveMetastore( + new NodeVersion("testversion"), + new LocalFileSystemFactory(Path.of(dataDirectory.toString())), + new HiveMetastoreConfig().isHideDeltaLakeTables(), + new FileHiveMetastoreConfig() + .setCatalogDirectory("local://")) + { + @Override + public synchronized void createTable(Table table, PrincipalPrivileges principalPrivileges) + { + if (testException.get() != null) { + throw testException.get(); + } + } + }; + + Session session = testSessionBuilder() + .setCatalog(ICEBERG_CATALOG) + .setSchema(SCHEMA_NAME) + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build(); + queryRunner.installPlugin(new TestingIcebergPlugin(Path.of(dataDirectory.toString()), Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)))); + queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg"); + queryRunner.execute("CREATE SCHEMA " + SCHEMA_NAME); + + return queryRunner; + } + + @AfterAll + public void cleanup() + throws Exception + { + if (metastore != null) { + metastore.dropDatabase(SCHEMA_NAME, true); + } + if (dataDirectory != null) { + deleteRecursively(dataDirectory, ALLOW_INSECURE); + } + } + + @Test + public void testCreateTableFailureMetadataCleanedUp() + { + testException.set(new SchemaNotFoundException("simulated_test_schema")); + String tableName = "test_create_failure_" + randomNameSuffix(); + String tableLocation = "local:///" + tableName; + String createTableSql = "CREATE TABLE " + tableName + " (a varchar) WITH (location = '" + tableLocation + "')"; + assertThatThrownBy(() -> getQueryRunner().execute(createTableSql)) + .hasMessageContaining("Schema simulated_test_schema not found"); + + Path metadataDirectory = dataDirectory.resolve(tableName, "metadata"); + assertThat(metadataDirectory).as("Metadata file should not exist").isEmptyDirectory(); + + // it should be possible to create a table with the same name after the failure + testException.set(null); + getQueryRunner().execute(createTableSql); + assertThat(metadataDirectory).as("Metadata file should not exist").isNotEmptyDirectory(); + } +}