From 62acd1b1cd69c54fd50bc0a64afb68f50ec0f167 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Wed, 29 Nov 2023 23:14:27 -0800 Subject: [PATCH] Move already created check inside of metastore implementations --- .../plugin/deltalake/DeltaLakeMetadata.java | 56 +----- .../procedure/RegisterTableProcedure.java | 18 +- ...estDeltaLakeCreateSchemaInternalRetry.java | 123 ------------- ...TestDeltaLakeCreateTableInternalRetry.java | 173 ------------------ .../SemiTransactionalHiveMetastore.java | 22 +-- .../metastore/file/FileHiveMetastore.java | 27 ++- .../metastore/glue/GlueHiveMetastore.java | 27 +++ .../thrift/BridgingHiveMetastore.java | 41 ++++- .../TestHiveCreateSchemaInternalRetry.java | 105 ----------- .../metastore/AbstractTestHiveMetastore.java | 100 ++++++++++ .../metastore/TestBridgingHiveMetastore.java | 50 +++++ .../hive/metastore/TestFileHiveMetastore.java | 57 ++++++ .../hive/metastore/TestGlueHiveMetastore.java | 45 +++++ .../hms/AbstractMetastoreTableOperations.java | 24 +-- .../iceberg/catalog/hms/TrinoHiveCatalog.java | 25 +-- .../TestIcebergCreateTableInternalRetry.java | 155 ---------------- 16 files changed, 350 insertions(+), 698 deletions(-) delete mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateSchemaInternalRetry.java delete mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateTableInternalRetry.java delete mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveCreateSchemaInternalRetry.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/AbstractTestHiveMetastore.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestBridgingHiveMetastore.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestFileHiveMetastore.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestGlueHiveMetastore.java delete 
mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCreateTableInternalRetry.java diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 6c71ba67e8f3..49a639f1c0fb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -66,8 +66,6 @@ import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter; import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriterFactory; import io.trino.plugin.hive.HiveType; -import io.trino.plugin.hive.SchemaAlreadyExistsException; -import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.TrinoViewHiveMetastore; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; @@ -836,18 +834,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map existingDatabase = metastore.getDatabase(schemaName); - if (existingDatabase.isEmpty() || !isCreatedBy(existingDatabase.get(), queryId)) { - throw e; - } - } + metastore.createDatabase(database); } @Override @@ -989,21 +976,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe // As a precaution, clear the caches statisticsAccess.invalidateCache(schemaTableName, Optional.of(location)); transactionLogAccess.invalidateCache(schemaTableName, Optional.of(location)); - try { - metastore.createTable( - session, - table, - principalPrivileges); - } - catch (TableAlreadyExistsException e) { - // Ignore TableAlreadyExistsException when table looks like created by us. - // This may happen when an actually successful metastore create call is retried - // e.g. because of a timeout on our side. 
- Optional existingTable = metastore.getRawMetastoreTable(schemaName, tableName); - if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), queryId)) { - throw e; - } - } + metastore.createTable(session, table, principalPrivileges); } public static Table buildTable(ConnectorSession session, SchemaTableName schemaTableName, String location, boolean isExternal) @@ -1317,18 +1290,7 @@ public Optional finishCreateTable( // As a precaution, clear the caches statisticsAccess.invalidateCache(schemaTableName, Optional.of(location)); transactionLogAccess.invalidateCache(schemaTableName, Optional.of(location)); - try { - metastore.createTable(session, table, principalPrivileges); - } - catch (TableAlreadyExistsException e) { - // Ignore TableAlreadyExistsException when table looks like created by us. - // This may happen when an actually successful metastore create call is retried - // e.g. because of a timeout on our side. - Optional
existingTable = metastore.getRawMetastoreTable(schemaName, tableName); - if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), queryId)) { - throw e; - } - } + metastore.createTable(session, table, principalPrivileges); } catch (Exception e) { // Remove the transaction log entry if the table creation fails @@ -1346,18 +1308,6 @@ public Optional finishCreateTable( return Optional.empty(); } - private static boolean isCreatedBy(Database database, String queryId) - { - Optional databaseQueryId = getQueryId(database); - return databaseQueryId.isPresent() && databaseQueryId.get().equals(queryId); - } - - public static boolean isCreatedBy(Table table, String queryId) - { - Optional tableQueryId = getQueryId(table); - return tableQueryId.isPresent() && tableQueryId.get().equals(queryId); - } - @Override public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional comment) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/RegisterTableProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/RegisterTableProcedure.java index 52429e5e891d..de3c38e216cb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/RegisterTableProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/RegisterTableProcedure.java @@ -27,7 +27,6 @@ import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; -import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.Table; import io.trino.spi.TrinoException; @@ -48,7 +47,6 @@ import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_TABLE; import static 
io.trino.plugin.deltalake.DeltaLakeMetadata.buildTable; import static io.trino.plugin.deltalake.DeltaLakeMetadata.getQueryId; -import static io.trino.plugin.deltalake.DeltaLakeMetadata.isCreatedBy; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; @@ -185,21 +183,7 @@ private void doRegisterTable( getQueryId(table).orElseThrow(() -> new IllegalArgumentException("Query id is not present")).equals(queryId), "Table '%s' does not have correct query id set", table); - try { - metastore.createTable( - session, - table, - principalPrivileges); - } - catch (TableAlreadyExistsException e) { - // Ignore TableAlreadyExistsException when table looks like created by us. - // This may happen when an actually successful metastore create call is retried - // e.g. because of a timeout on our side. - Optional
existingTable = metastore.getRawMetastoreTable(schemaName, tableName); - if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), queryId)) { - throw e; - } - } + metastore.createTable(session, table, principalPrivileges); } } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateSchemaInternalRetry.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateSchemaInternalRetry.java deleted file mode 100644 index 6fca8fde9f12..000000000000 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateSchemaInternalRetry.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.deltalake; - -import com.google.common.collect.ImmutableMap; -import io.trino.Session; -import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule; -import io.trino.plugin.hive.NodeVersion; -import io.trino.plugin.hive.SchemaAlreadyExistsException; -import io.trino.plugin.hive.metastore.Database; -import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.HiveMetastoreConfig; -import io.trino.plugin.hive.metastore.file.FileHiveMetastore; -import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Map; -import java.util.Optional; - -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static com.google.inject.util.Modules.EMPTY_MODULE; -import static io.trino.plugin.deltalake.DeltaLakeConnectorFactory.CONNECTOR_NAME; -import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; -import static io.trino.testing.TestingNames.randomNameSuffix; -import static io.trino.testing.TestingSession.testSessionBuilder; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; - -@TestInstance(PER_CLASS) -public class TestDeltaLakeCreateSchemaInternalRetry - extends AbstractTestQueryFramework -{ - private static final String CATALOG_NAME = "delta_lake"; - private static final String TEST_SCHEMA_TIMEOUT = "test_delta_lake_schema_" + randomNameSuffix(); - private static final String TEST_SCHEMA_DIFFERENT_SESSION = "test_delta_lake_schema_" + randomNameSuffix(); - - private String 
dataDirectory; - private HiveMetastore metastore; - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - Session session = testSessionBuilder() - .setCatalog(CATALOG_NAME) - .build(); - DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build(); - - this.dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data").toString(); - this.metastore = new FileHiveMetastore( - new NodeVersion("testversion"), - HDFS_FILE_SYSTEM_FACTORY, - new HiveMetastoreConfig().isHideDeltaLakeTables(), - new FileHiveMetastoreConfig() - .setCatalogDirectory(dataDirectory) - .setMetastoreUser("test")) - { - @Override - public synchronized void createDatabase(Database database) - { - if (database.getDatabaseName().equals(TEST_SCHEMA_DIFFERENT_SESSION)) { - // By modifying query id test simulates that schema was created from different session. - database = Database.builder(database) - .setParameters(ImmutableMap.of(TRINO_QUERY_ID_NAME, "new_query_id")) - .build(); - } - // Simulate retry mechanism with timeout failure. - // 1. createDatabase correctly create schema but timeout is triggered - // 2. 
Retry to createDatabase throws SchemaAlreadyExistsException - super.createDatabase(database); - throw new SchemaAlreadyExistsException(database.getDatabaseName()); - } - }; - - queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.empty(), EMPTY_MODULE)); - queryRunner.createCatalog(CATALOG_NAME, CONNECTOR_NAME, Map.of()); - return queryRunner; - } - - @AfterAll - public void tearDown() - throws IOException - { - if (metastore != null) { - metastore.dropDatabase(TEST_SCHEMA_TIMEOUT, false); - metastore.dropDatabase(TEST_SCHEMA_DIFFERENT_SESSION, false); - deleteRecursively(Path.of(dataDirectory), ALLOW_INSECURE); - } - } - - @Test - public void testSchemaCreationWithTimeout() - { - assertQuerySucceeds("CREATE SCHEMA " + TEST_SCHEMA_TIMEOUT); - assertQuery("SHOW SCHEMAS LIKE '" + TEST_SCHEMA_TIMEOUT + "'", "VALUES ('" + TEST_SCHEMA_TIMEOUT + "')"); - } - - @Test - public void testSchemaCreationFailsWhenCreatedWithDifferentSession() - { - assertQueryFails("CREATE SCHEMA " + TEST_SCHEMA_DIFFERENT_SESSION, "Schema already exists: '" + TEST_SCHEMA_DIFFERENT_SESSION + "'"); - assertQuery("SHOW SCHEMAS LIKE '" + TEST_SCHEMA_DIFFERENT_SESSION + "'", "VALUES ('" + TEST_SCHEMA_DIFFERENT_SESSION + "')"); - } -} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateTableInternalRetry.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateTableInternalRetry.java deleted file mode 100644 index 5e2dac8c5364..000000000000 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateTableInternalRetry.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.deltalake; - -import com.google.common.collect.ImmutableMap; -import io.trino.Session; -import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule; -import io.trino.plugin.hive.NodeVersion; -import io.trino.plugin.hive.TableAlreadyExistsException; -import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.HiveMetastoreConfig; -import io.trino.plugin.hive.metastore.PrincipalPrivileges; -import io.trino.plugin.hive.metastore.Table; -import io.trino.plugin.hive.metastore.file.FileHiveMetastore; -import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Map; -import java.util.Optional; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static com.google.common.base.Verify.verify; -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static com.google.inject.util.Modules.EMPTY_MODULE; -import static io.trino.plugin.deltalake.DeltaLakeConnectorFactory.CONNECTOR_NAME; -import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; -import 
static io.trino.testing.TestingSession.testSessionBuilder; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; - -@TestInstance(PER_CLASS) -public class TestDeltaLakeCreateTableInternalRetry - extends AbstractTestQueryFramework -{ - private static final String CATALOG_NAME = "delta_lake"; - private static final String SCHEMA_NAME = "test_create_table"; - - private String dataDirectory; - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - Session session = testSessionBuilder() - .setCatalog(CATALOG_NAME) - .setSchema(SCHEMA_NAME) - .build(); - DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build(); - - dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data").toString(); - HiveMetastore metastore = new FileHiveMetastore( - new NodeVersion("testversion"), - HDFS_FILE_SYSTEM_FACTORY, - new HiveMetastoreConfig().isHideDeltaLakeTables(), - new FileHiveMetastoreConfig() - .setCatalogDirectory(dataDirectory) - .setMetastoreUser("test")) - { - @Override - public synchronized void createTable(Table table, PrincipalPrivileges principalPrivileges) - { - if (table.getTableName().startsWith("test_different_session")) { - // By modifying query id test simulates that table was created from different session. - table = Table.builder(table) - .setParameters(ImmutableMap.of(TRINO_QUERY_ID_NAME, "new_query_id")) - .build(); - } - // Simulate retry mechanism with timeout failure of ThriftHiveMetastore. - // 1. createTable correctly create table but timeout is triggered - // 2. 
Retry to createTable throws TableAlreadyExistsException - super.createTable(table, principalPrivileges); - throw new TableAlreadyExistsException(table.getSchemaTableName()); - } - }; - queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.empty(), EMPTY_MODULE)); - queryRunner.createCatalog(CATALOG_NAME, CONNECTOR_NAME, Map.of("delta.register-table-procedure.enabled", "true")); - queryRunner.execute("CREATE SCHEMA " + SCHEMA_NAME); - return queryRunner; - } - - @AfterAll - public void tearDown() - throws IOException - { - if (dataDirectory != null) { - deleteRecursively(Path.of(dataDirectory), ALLOW_INSECURE); - } - } - - @Test - public void testCreateTableInternalRetry() - { - assertQuerySucceeds("CREATE TABLE test_ct_internal_retry(a int)"); - assertQuery("SHOW TABLES LIKE 'test_ct_internal_retry'", "VALUES 'test_ct_internal_retry'"); - } - - @Test - public void testCreateTableAsSelectInternalRetry() - { - assertQuerySucceeds("CREATE TABLE test_ctas_internal_retry AS SELECT 1 a"); - assertQuery("SHOW TABLES LIKE 'test_ctas_internal_retry'", "VALUES 'test_ctas_internal_retry'"); - } - - @Test - public void testRegisterTableInternalRetry() - { - assertQuerySucceeds("CREATE TABLE test_register_table_internal_retry AS SELECT 1 a"); - String tableLocation = getTableLocation("test_register_table_internal_retry"); - assertUpdate("CALL system.unregister_table(current_schema, 'test_register_table_internal_retry')"); - - assertQuerySucceeds("CALL system.register_table(current_schema, 'test_register_table_internal_retry', '" + tableLocation + "')"); - assertQuery("SHOW TABLES LIKE 'test_register_table_internal_retry'", "VALUES 'test_register_table_internal_retry'"); - } - - @Test - public void testCreateTableFailureWithDifferentSession() - { - assertQueryFails("CREATE TABLE test_different_session_ct(a int)", "Table already exists: .*"); - assertQuery("SHOW TABLES LIKE 'test_different_session_ct'", "VALUES 
'test_different_session_ct'"); - } - - @Test - public void testCreateTableAsSelectFailureWithDifferentSession() - { - assertQueryFails("CREATE TABLE test_different_session_ctas_failure AS SELECT 1 a", "Failed to write Delta Lake transaction log entry"); - assertQuery("SHOW TABLES LIKE 'test_different_session_ctas_failure'", "VALUES 'test_different_session_ctas_failure'"); - } - - @Test - public void testRegisterTableFailureWithDifferentSession() - { - assertQuerySucceeds("CREATE TABLE test_register_table_failure AS SELECT 1 a"); - String tableLocation = getTableLocation("test_register_table_failure"); - assertUpdate("CALL system.unregister_table(current_schema, 'test_register_table_failure')"); - - assertQueryFails( - "CALL system.register_table(current_schema, 'test_different_session_register_table_failure', '" + tableLocation + "')", - "Table already exists: .*"); - assertQuery("SHOW TABLES LIKE 'test_different_session_register_table_failure'", "VALUES 'test_different_session_register_table_failure'"); - } - - private String getTableLocation(String tableName) - { - Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL); - Matcher m = locationPattern.matcher((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()); - if (m.find()) { - String location = m.group(1); - verify(!m.find(), "Unexpected second match"); - return location; - } - throw new IllegalStateException("Location not found in SHOW CREATE TABLE result"); - } -} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index 833229fbcb28..cabc1de51b45 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -42,7 +42,6 @@ import 
io.trino.plugin.hive.PartitionNotFoundException; import io.trino.plugin.hive.PartitionStatistics; import io.trino.plugin.hive.PartitionUpdateAndMergeResults; -import io.trino.plugin.hive.SchemaAlreadyExistsException; import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.TableInvalidationCallback; import io.trino.plugin.hive.acid.AcidOperation; @@ -464,26 +463,7 @@ public synchronized void createDatabase(ConnectorSession session, Database datab "Database '%s' does not have correct query id set", database.getDatabaseName()); - setExclusive(delegate -> { - try { - delegate.createDatabase(database); - } - catch (SchemaAlreadyExistsException e) { - // Ignore SchemaAlreadyExistsException when database looks like created by us. - // This may happen when an actually successful metastore create call is retried - // e.g. because of a timeout on our side. - Optional existingDatabase = delegate.getDatabase(database.getDatabaseName()); - if (existingDatabase.isEmpty() || !isCreatedBy(existingDatabase.get(), queryId)) { - throw e; - } - } - }); - } - - private static boolean isCreatedBy(Database database, String queryId) - { - Optional databaseQueryId = getQueryId(database); - return databaseQueryId.isPresent() && databaseQueryId.get().equals(queryId); + setExclusive(delegate -> delegate.createDatabase(database)); } public synchronized void dropDatabase(ConnectorSession session, String schemaName) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java index 994b7174b268..95d195032f72 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java @@ -96,6 +96,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED; import static 
io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues; import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; import static io.trino.plugin.hive.TableType.MANAGED_TABLE; @@ -194,7 +195,18 @@ public synchronized void createDatabase(Database database) database.getParameters()); verifyDatabaseNameLength(database.getDatabaseName()); - verifyDatabaseNotExists(database.getDatabaseName()); + + Optional existingDatabase = getDatabase(database.getDatabaseName()); + if (existingDatabase.isPresent()) { + // Do not throw SchemaAlreadyExistsException if this query has already created the database. + // This may happen when an actually successful metastore create call is retried, + // because of a timeout on our side. + String expectedQueryId = database.getParameters().get(TRINO_QUERY_ID_NAME); + if (expectedQueryId != null && expectedQueryId.equals(existingDatabase.get().getParameters().get(TRINO_QUERY_ID_NAME))) { + return; + } + throw new SchemaAlreadyExistsException(database.getDatabaseName()); + } Location databaseMetadataDirectory = getDatabaseMetadataDirectory(database.getDatabaseName()); writeSchemaFile(DATABASE, databaseMetadataDirectory, databaseCodec, new DatabaseMetadata(currentVersion, database), false); @@ -340,7 +352,18 @@ public synchronized void createTable(Table table, PrincipalPrivileges principalP { verifyTableNameLength(table.getTableName()); verifyDatabaseExists(table.getDatabaseName()); - verifyTableNotExists(table.getDatabaseName(), table.getTableName()); + + Optional
existingTable = getTable(table.getDatabaseName(), table.getTableName()); + if (existingTable.isPresent()) { + // Do not throw TableAlreadyExistsException if this query has already created the table. + // This may happen when an actually successful metastore create call is retried, + // because of a timeout on our side. + String expectedQueryId = table.getParameters().get(TRINO_QUERY_ID_NAME); + if (expectedQueryId != null && expectedQueryId.equals(existingTable.get().getParameters().get(TRINO_QUERY_ID_NAME))) { + return; + } + throw new TableAlreadyExistsException(new SchemaTableName(table.getDatabaseName(), table.getTableName())); + } Location tableMetadataDirectory = getTableMetadataDirectory(table); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java index 141b33b30d3b..e1efe5654fcd 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -146,6 +146,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; import static io.trino.plugin.hive.TableType.MANAGED_TABLE; import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW; import static io.trino.plugin.hive.metastore.MetastoreUtil.makePartitionName; @@ -518,6 +519,19 @@ public void createDatabase(Database database) glueClient.createDatabase(new CreateDatabaseRequest().withDatabaseInput(databaseInput))); } catch (AlreadyExistsException e) { + // Do not throw SchemaAlreadyExistsException if this query has already created the database. 
+ // This may happen when an actually successful metastore create call is retried, + // because of a timeout on our side. + String expectedQueryId = database.getParameters().get(TRINO_QUERY_ID_NAME); + if (expectedQueryId != null) { + String existingQueryId = getDatabase(database.getDatabaseName()) + .map(Database::getParameters) + .map(parameters -> parameters.get(TRINO_QUERY_ID_NAME)) + .orElse(null); + if (expectedQueryId.equals(existingQueryId)) { + return; + } + } throw new SchemaAlreadyExistsException(database.getDatabaseName(), e); } catch (AmazonServiceException e) { @@ -594,6 +608,19 @@ public void createTable(Table table, PrincipalPrivileges principalPrivileges) .withTableInput(input))); } catch (AlreadyExistsException e) { + // Do not throw TableAlreadyExistsException if this query has already created the table. + // This may happen when an actually successful metastore create call is retried, + // because of a timeout on our side. + String expectedQueryId = table.getParameters().get(TRINO_QUERY_ID_NAME); + if (expectedQueryId != null) { + String existingQueryId = getTable(table.getDatabaseName(), table.getTableName()) + .map(Table::getParameters) + .map(parameters -> parameters.get(TRINO_QUERY_ID_NAME)) + .orElse(null); + if (expectedQueryId.equals(existingQueryId)) { + return; + } + } throw new TableAlreadyExistsException(new SchemaTableName(table.getDatabaseName(), table.getTableName()), e); } catch (EntityNotFoundException e) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java index bf015ef3af10..5cd62ce4babf 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/BridgingHiveMetastore.java @@ -20,6 +20,8 @@ import io.trino.plugin.hive.HivePartition; import 
io.trino.plugin.hive.HiveType;
 import io.trino.plugin.hive.PartitionStatistics;
+import io.trino.plugin.hive.SchemaAlreadyExistsException;
+import io.trino.plugin.hive.TableAlreadyExistsException;
 import io.trino.plugin.hive.acid.AcidOperation;
 import io.trino.plugin.hive.acid.AcidTransaction;
 import io.trino.plugin.hive.metastore.AcidTransactionOwner;
@@ -54,6 +56,7 @@
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
+import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.isAvroTableWithSchemaSet;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyCanDropColumn;
 import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.csvSchemaFields;
@@ -194,7 +197,24 @@ public Optional> getAllViews()
     @Override
     public void createDatabase(Database database)
     {
-        delegate.createDatabase(toMetastoreApiDatabase(database));
+        try {
+            delegate.createDatabase(toMetastoreApiDatabase(database));
+        }
+        catch (SchemaAlreadyExistsException e) {
+            // A create call that actually succeeded may be retried after a timeout on our side, so treat
+            // "already exists" as success only when the existing database carries this query's id.
+            String expectedQueryId = database.getParameters().get(TRINO_QUERY_ID_NAME);
+            if (expectedQueryId != null) {
+                String existingQueryId = getDatabase(database.getDatabaseName())
+                        .map(existing -> existing.getParameters().get(TRINO_QUERY_ID_NAME))
+                        .orElse(null);
+                if (expectedQueryId.equals(existingQueryId)) {
+                    return;
+                }
+            }
+            // No query id to compare, or the database was created by someone else: report the conflict.
+            throw e;
+        }
     }
 
     @Override
@@ -235,7 +255,24 @@ public void setDatabaseOwner(String databaseName, HivePrincipal principal)
     @Override
     public void createTable(Table table, PrincipalPrivileges principalPrivileges)
     {
-        delegate.createTable(toMetastoreApiTable(table, principalPrivileges));
+        try {
+            delegate.createTable(toMetastoreApiTable(table, principalPrivileges));
+        }
+        catch (TableAlreadyExistsException e) {
+            // A create call that actually succeeded may be retried after a timeout on our side, so treat
+            // "already exists" as success only when the existing table carries this query's id.
+            String expectedQueryId = table.getParameters().get(TRINO_QUERY_ID_NAME);
+            if (expectedQueryId != null) {
+                String existingQueryId = getTable(table.getDatabaseName(), table.getTableName())
+                        .map(existing -> existing.getParameters().get(TRINO_QUERY_ID_NAME))
+                        .orElse(null);
+                if (expectedQueryId.equals(existingQueryId)) {
+                    return;
+                }
+            }
+            // No query id to compare, or the table was created by someone else: report the conflict.
+            throw e;
+        }
     }
 
     @Override
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveCreateSchemaInternalRetry.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveCreateSchemaInternalRetry.java
deleted file mode 100644
index 2aca3f200752..000000000000
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveCreateSchemaInternalRetry.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.hive; - -import com.google.common.collect.ImmutableMap; -import io.trino.plugin.hive.metastore.Database; -import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.HiveMetastoreConfig; -import io.trino.plugin.hive.metastore.file.FileHiveMetastore; -import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.QueryRunner; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; - -import java.io.IOException; - -import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; -import static io.trino.testing.TestingNames.randomNameSuffix; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; - -@TestInstance(PER_CLASS) -public class TestHiveCreateSchemaInternalRetry - extends AbstractTestQueryFramework -{ - private static final String TEST_SCHEMA_TIMEOUT = "test_hive_schema_" + randomNameSuffix(); - private static final String TEST_SCHEMA_DIFFERENT_SESSION = "test_hive_schema_" + randomNameSuffix(); - - private HiveMetastore metastore; - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - return HiveQueryRunner.builder() - .setCreateTpchSchemas(false) - .setMetastore(distributedQueryRunner -> metastore = createMetastore(distributedQueryRunner.getCoordinator().getBaseDataDir().resolve("hive_data").toString())) - 
.build(); - } - - private FileHiveMetastore createMetastore(String dataDirectory) - { - return new FileHiveMetastore( - new NodeVersion("testversion"), - HDFS_FILE_SYSTEM_FACTORY, - new HiveMetastoreConfig().isHideDeltaLakeTables(), - new FileHiveMetastoreConfig() - .setCatalogDirectory(dataDirectory) - .setMetastoreUser("test")) - { - @Override - public synchronized void createDatabase(Database database) - { - if (database.getDatabaseName().equals(TEST_SCHEMA_DIFFERENT_SESSION)) { - // By modifying query id test simulates that schema was created from different session. - database = Database.builder(database) - .setParameters(ImmutableMap.of(TRINO_QUERY_ID_NAME, "new_query_id")) - .build(); - } - // Simulate retry mechanism with timeout failure. - // 1. createDatabase correctly create schema but timeout is triggered - // 2. Retry to createDatabase throws SchemaAlreadyExistsException - super.createDatabase(database); - throw new SchemaAlreadyExistsException(database.getDatabaseName()); - } - }; - } - - @AfterAll - public void tearDown() - throws IOException - { - if (metastore != null) { - metastore.dropDatabase(TEST_SCHEMA_TIMEOUT, false); - metastore.dropDatabase(TEST_SCHEMA_DIFFERENT_SESSION, false); - } - } - - @Test - public void testSchemaCreationWithTimeout() - { - assertQuerySucceeds("CREATE SCHEMA " + TEST_SCHEMA_TIMEOUT); - assertQuery("SHOW SCHEMAS LIKE '" + TEST_SCHEMA_TIMEOUT + "'", "VALUES ('" + TEST_SCHEMA_TIMEOUT + "')"); - } - - @Test - public void testSchemaCreationFailsWhenCreatedWithDifferentSession() - { - assertQueryFails("CREATE SCHEMA " + TEST_SCHEMA_DIFFERENT_SESSION, "Schema already exists: '" + TEST_SCHEMA_DIFFERENT_SESSION + "'"); - assertQuery("SHOW SCHEMAS LIKE '" + TEST_SCHEMA_DIFFERENT_SESSION + "'", "VALUES ('" + TEST_SCHEMA_DIFFERENT_SESSION + "')"); - } -} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/AbstractTestHiveMetastore.java 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/AbstractTestHiveMetastore.java new file mode 100644 index 000000000000..82460db7532c --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/AbstractTestHiveMetastore.java @@ -0,0 +1,121 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore; + +import io.trino.plugin.hive.SchemaAlreadyExistsException; +import io.trino.plugin.hive.TableAlreadyExistsException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; + +import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; +import static io.trino.plugin.hive.HiveStorageFormat.PARQUET; +import static io.trino.plugin.hive.HiveType.HIVE_STRING; +import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; +import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; +import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; + +/** + * Contract tests for the create operations of {@link HiveMetastore} implementations. + * Concrete subclasses supply the implementation under test through {@link #setMetastore}. + */ +@TestInstance(PER_CLASS) +@Execution(SAME_THREAD) +abstract class AbstractTestHiveMetastore +{ +
private HiveMetastore metastore; + + public void setMetastore(HiveMetastore metastore) + { + this.metastore = metastore; + } + + /** + * Creating a database twice with the same query ID succeeds, while a different query ID is rejected. + */ + @Test + void testCreateDatabase() + { + String databaseName = "test_database_" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + Database.Builder database = Database.builder() + .setDatabaseName(databaseName) + .setParameters(Map.of(TRINO_QUERY_ID_NAME, "query_id")) + .setOwnerName(Optional.empty()) + .setOwnerType(Optional.empty()); + metastore.createDatabase(database.build()); + try { + // second call with the same query ID succeeds + metastore.createDatabase(database.build()); + + database.setParameters(Map.of(TRINO_QUERY_ID_NAME, "another_query_id")); + assertThatThrownBy(() -> metastore.createDatabase(database.build())) + .isInstanceOf(SchemaAlreadyExistsException.class); + } + finally { + // drop the database even when an assertion above fails + metastore.dropDatabase(databaseName, false); + } + } + + /** + * Creating a table twice with the same query ID succeeds, while a different query ID is rejected. + */ + @Test + void testCreateTable() + { + String databaseName = "test_database_" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + Database.Builder database = Database.builder() + .setDatabaseName(databaseName) + .setOwnerName(Optional.empty()) + .setOwnerType(Optional.empty()); + metastore.createDatabase(database.build()); + try { + String tableName = "test_table"; + Table.Builder table = Table.builder() + .setDatabaseName(databaseName) + .setTableName(tableName) + .setParameters(Map.of(TRINO_QUERY_ID_NAME, "query_id")) + .setTableType(EXTERNAL_TABLE.name()) + .setDataColumns(List.of(new Column("test_column", HIVE_STRING, Optional.empty(), Map.of()))) + .setOwner(Optional.empty()); + table.getStorageBuilder() + .setLocation(Optional.of("/tmp/location")) + .setStorageFormat(fromHiveStorageFormat(PARQUET)); + metastore.createTable(table.build(), NO_PRIVILEGES); + try { + // second call with the same query ID succeeds + metastore.createTable(table.build(), NO_PRIVILEGES); + + table.setParameters(Map.of(TRINO_QUERY_ID_NAME, "another_query_id")); + assertThatThrownBy(() -> metastore.createTable(table.build(), NO_PRIVILEGES))
+ .isInstanceOf(TableAlreadyExistsException.class); + } + finally { + metastore.dropTable(databaseName, tableName, false); + } + } + finally { + // drop the database even when table creation or an assertion above fails + metastore.dropDatabase(databaseName, false); + } + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestBridgingHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestBridgingHiveMetastore.java new file mode 100644 index 000000000000..f8242a403cfb --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestBridgingHiveMetastore.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.plugin.hive.metastore; + +import io.trino.plugin.hive.containers.HiveHadoop; +import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; +import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; + +import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; + +@TestInstance(PER_CLASS) +@Execution(SAME_THREAD) +final class TestBridgingHiveMetastore + extends AbstractTestHiveMetastore +{ + private final HiveHadoop hiveHadoop; + + TestBridgingHiveMetastore() + { + hiveHadoop = HiveHadoop.builder().build(); + hiveHadoop.start(); + // run the shared tests against the metastore endpoint of the HiveHadoop instance started above; afterAll() stops it + setMetastore(new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder() + .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint()) + .thriftMetastoreConfig(new ThriftMetastoreConfig().setDeleteFilesOnDrop(true)) + .build())); + } + + @AfterAll + void afterAll() + { + hiveHadoop.stop(); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestFileHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestFileHiveMetastore.java new file mode 100644 index 000000000000..61e389e84af3 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestFileHiveMetastore.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore; + +import io.trino.filesystem.local.LocalFileSystemFactory; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; +import org.junit.jupiter.api.AfterAll; + +import java.io.IOException; +import java.nio.file.Path; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static java.nio.file.Files.createTempDirectory; + +final class TestFileHiveMetastore + extends AbstractTestHiveMetastore +{ + private final Path tempDir; + + TestFileHiveMetastore() + throws IOException + { + tempDir = createTempDirectory("test"); + // createTempDirectory has already created the directory; it is removed again in tearDown() + LocalFileSystemFactory fileSystemFactory = new LocalFileSystemFactory(tempDir); + + setMetastore(new FileHiveMetastore( + new NodeVersion("testversion"), + fileSystemFactory, + false, + new FileHiveMetastoreConfig() + .setCatalogDirectory("local:///") + .setMetastoreUser("test") + .setDisableLocationChecks(true))); + } + + @AfterAll + void tearDown() + throws IOException + { + deleteRecursively(tempDir, ALLOW_INSECURE); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestGlueHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestGlueHiveMetastore.java new file mode 100644 index 000000000000..c2ebc9590a3b --- /dev/null +++
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/TestGlueHiveMetastore.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.metastore; + +import org.junit.jupiter.api.AfterAll; + +import java.io.IOException; +import java.nio.file.Path; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.hive.metastore.glue.TestingGlueHiveMetastore.createTestingGlueHiveMetastore; +import static java.nio.file.Files.createTempDirectory; + +final class TestGlueHiveMetastore + extends AbstractTestHiveMetastore +{ + private final Path tempDir; + + TestGlueHiveMetastore() + throws IOException + { + tempDir = createTempDirectory("test"); + // createTempDirectory has already created the directory; it is removed again in tearDown() + setMetastore(createTestingGlueHiveMetastore(tempDir)); + } + + @AfterAll + void tearDown() + throws IOException + { + deleteRecursively(tempDir, ALLOW_INSECURE); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java index c9458be31721..0fc67d7d0c56 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java +++
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java @@ -40,7 +40,6 @@ import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage; import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom; -import static io.trino.plugin.iceberg.IcebergUtil.TRINO_QUERY_ID_NAME; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -125,32 +124,11 @@ protected final void commitNewTable(TableMetadata metadata) try { metastore.createTable(table, privileges); } - catch (SchemaNotFoundException e) { + catch (SchemaNotFoundException | TableAlreadyExistsException e) { // clean up metadata files corresponding to the current transaction fileIo.deleteFile(newMetadataLocation); throw e; } - catch (TableAlreadyExistsException e) { - // Ignore TableAlreadyExistsException when table looks like created by us. - // This may happen when an actually successful metastore create call is retried - // e.g. because of a timeout on our side. 
- refreshFromMetadataLocation(getRefreshedLocation(true)); - if (!isCreatedBy(this.currentMetadata, session.getQueryId())) { - fileIo.deleteFile(newMetadataLocation); - throw e; - } - } - } - - public static boolean isCreatedBy(TableMetadata existingTableMetadata, String queryId) - { - Optional tableQueryId = getQueryId(existingTableMetadata); - return tableQueryId.isPresent() && tableQueryId.get().equals(queryId); - } - - private static Optional getQueryId(TableMetadata tableMetadata) - { - return Optional.ofNullable(tableMetadata.currentSnapshot().summary().get(TRINO_QUERY_ID_NAME)); } protected Table.Builder updateMetastoreTable(Table.Builder builder, TableMetadata metadata, String metadataLocation, Optional previousMetadataLocation) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 6fdf7cff121b..702fd0ccd699 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -25,7 +25,6 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HiveSchemaProperties; -import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.TrinoViewHiveMetastore; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; @@ -346,29 +345,7 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableN .setParameter(METADATA_LOCATION_PROP, tableMetadata.metadataFileLocation()); PrincipalPrivileges privileges = owner.map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); - try { - metastore.createTable(builder.build(), privileges); - } - catch (TableAlreadyExistsException e) { - // Ignore TableAlreadyExistsException when table 
looks like created by us. - // This may happen when an actually successful metastore create call is retried - // e.g. because of a timeout on our side. - Optional existingTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()); - if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), session.getQueryId())) { - throw e; - } - } - } - - public static boolean isCreatedBy(io.trino.plugin.hive.metastore.Table table, String queryId) - { - Optional tableQueryId = getQueryId(table); - return tableQueryId.isPresent() && tableQueryId.get().equals(queryId); - } - - private static Optional getQueryId(io.trino.plugin.hive.metastore.Table table) - { - return Optional.ofNullable(table.getParameters().get(TRINO_QUERY_ID_NAME)); + metastore.createTable(builder.build(), privileges); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCreateTableInternalRetry.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCreateTableInternalRetry.java deleted file mode 100644 index 94f3535b3ac4..000000000000 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergCreateTableInternalRetry.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.iceberg; - -import com.google.common.collect.ImmutableMap; -import io.trino.Session; -import io.trino.plugin.hive.NodeVersion; -import io.trino.plugin.hive.TableAlreadyExistsException; -import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.HiveMetastoreConfig; -import io.trino.plugin.hive.metastore.PrincipalPrivileges; -import io.trino.plugin.hive.metastore.Table; -import io.trino.plugin.hive.metastore.file.FileHiveMetastore; -import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; -import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; - -import java.io.File; -import java.io.IOException; -import java.util.Optional; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static com.google.common.base.Verify.verify; -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static com.google.inject.util.Modules.EMPTY_MODULE; -import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; -import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; -import static io.trino.testing.TestingSession.testSessionBuilder; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; - -@TestInstance(PER_CLASS) -public class TestIcebergCreateTableInternalRetry - extends AbstractTestQueryFramework -{ - private static final String SCHEMA_NAME = "iceberg_internal_retry_schema"; - private File metastoreDir; - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - Session session = 
testSessionBuilder() - .setCatalog(ICEBERG_CATALOG) - .setSchema(SCHEMA_NAME) - .build(); - DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build(); - metastoreDir = queryRunner.getCoordinator().getBaseDataDir().resolve("test_iceberg_table_smoke_test").toFile(); - this.metastoreDir.deleteOnExit(); - HiveMetastore metastore = new FileHiveMetastore( - new NodeVersion("testversion"), - HDFS_FILE_SYSTEM_FACTORY, - new HiveMetastoreConfig().isHideDeltaLakeTables(), - new FileHiveMetastoreConfig() - .setCatalogDirectory(metastoreDir.toURI().toString()) - .setMetastoreUser("test")) - { - @Override - public synchronized void createTable(Table table, PrincipalPrivileges principalPrivileges) - { - if (table.getTableName().startsWith("test_different_session")) { - // By modifying query id test simulates that table was created from different session. - table = Table.builder(table) - .setParameters(ImmutableMap.of(TRINO_QUERY_ID_NAME, "new_query_id")) - .build(); - } - // Simulate retry mechanism with timeout failure of ThriftHiveMetastore. - // 1. createTable correctly create table but timeout is triggered - // 2. 
Retry to createTable throws TableAlreadyExistsException - super.createTable(table, principalPrivileges); - throw new TableAlreadyExistsException(table.getSchemaTableName()); - } - }; - - queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE)); - queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.of("iceberg.register-table-procedure.enabled", "true")); - queryRunner.execute("CREATE SCHEMA " + SCHEMA_NAME); - return queryRunner; - } - - @AfterAll - public void tearDown() - throws IOException - { - deleteRecursively(metastoreDir.toPath(), ALLOW_INSECURE); - } - - @Test - public void testCreateTableInternalRetry() - { - assertQuerySucceeds("CREATE TABLE test_ct_internal_retry(a int)"); - assertQuery("SHOW TABLES LIKE 'test_ct_internal_retry'", "VALUES 'test_ct_internal_retry'"); - } - - @Test - public void testCreateTableAsSelectInternalRetry() - { - assertQuerySucceeds("CREATE TABLE test_ctas_internal_retry AS SELECT 1 a"); - assertQuery("SHOW TABLES LIKE 'test_ctas_internal_retry'", "VALUES 'test_ctas_internal_retry'"); - } - - @Test - public void testRegisterTableInternalRetry() - { - assertQuerySucceeds("CREATE TABLE test_register_table_internal_retry AS SELECT 1 a"); - String tableLocation = getTableLocation("test_register_table_internal_retry"); - assertUpdate("CALL system.unregister_table(current_schema, 'test_register_table_internal_retry')"); - - assertQuerySucceeds("CALL system.register_table(current_schema, 'test_register_table_internal_retry', '" + tableLocation + "')"); - assertQuery("SHOW TABLES LIKE 'test_register_table_internal_retry'", "VALUES 'test_register_table_internal_retry'"); - } - - @Test - public void testRegisterTableFailureWithDifferentSession() - { - assertQuerySucceeds("CREATE TABLE test_register_table_failure AS SELECT 1 a"); - String tableLocation = getTableLocation("test_register_table_failure"); - assertUpdate("CALL 
system.unregister_table(current_schema, 'test_register_table_failure')"); - - assertQueryFails( - "CALL system.register_table(current_schema, 'test_different_session_register_table_failure', '" + tableLocation + "')", - "Table already exists: .*"); - assertQuery("SHOW TABLES LIKE 'test_different_session_register_table_failure'", "VALUES 'test_different_session_register_table_failure'"); - } - - private String getTableLocation(String tableName) - { - Pattern locationPattern = Pattern.compile(".*location = '(.*?)'.*", Pattern.DOTALL); - Matcher m = locationPattern.matcher((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()); - if (m.find()) { - String location = m.group(1); - verify(!m.find(), "Unexpected second match"); - return location; - } - throw new IllegalStateException("Location not found in SHOW CREATE TABLE result"); - } -}