Skip to content

Commit

Permalink
Support complex type in migrate procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Oct 19, 2023
1 parent cdb4f8f commit 175c109
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.AppendFiles;
Expand Down Expand Up @@ -303,17 +300,15 @@ private Schema toIcebergSchema(List<Column> columns)
Types.NestedField field = Types.NestedField.of(index, false, column.getName(), type, column.getComment().orElse(null));
icebergColumns.add(field);
}

org.apache.iceberg.types.Type icebergSchema = Types.StructType.of(icebergColumns);
icebergSchema = TypeUtil.assignFreshIds(icebergSchema, nextFieldId::getAndIncrement);
// Assign column IDs starting from 1
icebergSchema = TypeUtil.assignFreshIds(icebergSchema, new AtomicInteger(1)::getAndIncrement);
return new Schema(icebergSchema.asStructType().fields());
}

private static org.apache.iceberg.types.Type toIcebergType(Type type, AtomicInteger nextFieldId)
{
if (type instanceof ArrayType || type instanceof MapType || type instanceof RowType) {
// TODO https://github.com/trinodb/trino/issues/17583 Add support for these complex types
throw new TrinoException(NOT_SUPPORTED, "Migrating %s type is not supported".formatted(type));
}
if (type.equals(TINYINT) || type.equals(SMALLINT)) {
return Types.IntegerType.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -101,6 +102,42 @@ public void testMigrateTableWithTinyintType(IcebergFileFormat fileFormat)

assertUpdate("INSERT INTO " + icebergTableName + " VALUES -2147483648, 2147483647", 2);
assertQuery("SELECT * FROM " + icebergTableName, "VALUES (NULL), (-2147483648), (-128), (127), (2147483647)");
assertUpdate("DROP TABLE " + tableName);
}

@Test(dataProvider = "fileFormats")
public void testMigrateTableWithComplexType(IcebergFileFormat fileFormat)
{
    // Migrates a Hive table holding array, map and row columns and checks that the
    // resulting Iceberg table can be read back and accepts further inserts.
    String tableName = "test_migrate_complex_" + randomNameSuffix();
    String hiveTableName = "hive.tpch." + tableName;
    String icebergTableName = "iceberg.tpch." + tableName;
    String selectAll = "SELECT * FROM " + icebergTableName;
    String selectCount = "SELECT count(*) FROM " + icebergTableName;

    assertUpdate(
            "CREATE TABLE %s WITH (format='%s') AS SELECT 1 x, array[2, 3] a, CAST(map(array['key1'], array['value1']) AS map(varchar, varchar)) b, CAST(row(1) AS row(d integer)) c"
                    .formatted(hiveTableName, fileFormat),
            1);
    // Before migration the Iceberg catalog must refuse to read the Hive table
    assertQueryFails(selectAll, "Not an Iceberg table: .*");

    assertUpdate("CALL iceberg.system.migrate('tpch', '%s')".formatted(tableName));

    // The file format chosen for the Hive table is kept by the migrated table
    String showCreateTable = (String) computeScalar("SHOW CREATE TABLE " + icebergTableName);
    assertThat(showCreateTable).contains("format = '%s'".formatted(fileFormat));

    @Language("SQL") String firstRow = "VALUES (1, ARRAY[2, 3], CAST(map(ARRAY['key1'], ARRAY['value1']) AS map(varchar, varchar)), CAST(row(1) AS row(d integer)))";
    assertThat(query(selectAll)).matches(firstRow);
    assertQuery(selectCount, "VALUES 1");

    // The leading space lets this literal be appended directly after the table name in the INSERT
    @Language("SQL") String secondRow = " VALUES (2, ARRAY[4, 5], CAST(map(ARRAY['key2'], ARRAY['value2']) AS map(varchar, varchar)), CAST(row(2) AS row(d integer)))";
    assertUpdate("INSERT INTO " + icebergTableName + secondRow, 1);
    assertQuery(selectCount, "VALUES 2");
    assertThat(query(selectAll)).matches(firstRow + " UNION ALL " + secondRow);

    assertUpdate("DROP TABLE " + tableName);
}
Expand Down Expand Up @@ -132,6 +169,77 @@ public void testMigrateTableWithSmallintType(IcebergFileFormat fileFormat)
assertUpdate("DROP TABLE " + tableName);
}

@Test(dataProvider = "fileFormats")
public void testMigrateTableSchemaEvolution(IcebergFileFormat fileFormat)
        throws Exception
{
    // Checks that a migrated table can read a data file that lacks one of the table's columns:
    // rows coming from that file are expected to expose the missing column as NULL.
    String randomNameSuffix = randomNameSuffix();
    String tableNameOneColumn = "test_migrate_one_column_" + randomNameSuffix;
    String tableNameTwoColumns = "test_migrate_two_columns_" + randomNameSuffix;
    String hiveTableNameOneColumn = "hive.tpch." + tableNameOneColumn;
    String hiveTableNameTwoColumns = "hive.tpch." + tableNameTwoColumns;
    String icebergTableNameTwoColumns = "iceberg.tpch." + tableNameTwoColumns;

    assertUpdate("CREATE TABLE " + hiveTableNameOneColumn + " WITH (format='" + fileFormat + "') AS SELECT 1 col1", 1);
    assertUpdate("CREATE TABLE " + hiveTableNameTwoColumns + " WITH (format='" + fileFormat + "') AS SELECT 2 col1, CAST(row(10, 20) AS row(x integer, y integer)) AS nested", 1);

    // Copy the data file containing only one column into the location of the table with two columns
    // (the file is in whichever format the data provider supplied)
    Path tableNameOneColumnLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableNameOneColumn));
    Path tableNameTwoColumnsLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableNameTwoColumns));
    try (Stream<Path> files = Files.list(tableNameOneColumnLocation)) {
        // Skip dot-prefixed entries; exactly one data file is expected to remain
        Path file = files.filter(path -> !path.getFileName().toString().startsWith(".")).collect(onlyElement());
        Files.copy(file, tableNameTwoColumnsLocation.resolve(file.getFileName()));
    }

    assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableNameTwoColumns + "')");

    assertThat(query("SELECT * FROM " + icebergTableNameTwoColumns))
            .skippingTypesCheck()
            .matches("VALUES (1, CAST(null AS row(x integer, y integer))), (2, row(10, 20))");

    assertUpdate("INSERT INTO " + icebergTableNameTwoColumns + " VALUES (3, row(100, 200))", 1);
    assertThat(query("SELECT * FROM " + icebergTableNameTwoColumns))
            .skippingTypesCheck()
            .matches("VALUES (1, CAST(null AS row(x integer, y integer))), (2, row(10, 20)), (3, row(100, 200))");

    assertUpdate("DROP TABLE " + icebergTableNameTwoColumns);
    // The one-column table only served as the source of the copied data file; drop it as well
    // so the test does not leave a table behind
    assertUpdate("DROP TABLE " + hiveTableNameOneColumn);
}

@Test(dataProvider = "fileFormats")
public void testMigrateTableRowColumnSchemaEvolution(IcebergFileFormat fileFormat)
        throws Exception
{
    // Checks that a migrated table can read a data file whose row column has fewer fields than
    // the table's row column: the missing nested field is expected to be read as NULL.
    String randomNameSuffix = randomNameSuffix();
    String tableNameRowOneField = "test_migrate_row_one_field_" + randomNameSuffix;
    String tableNameRowTwoFields = "test_migrate_row_two_fields_" + randomNameSuffix;
    String hiveTableNameRowOneField = "hive.tpch." + tableNameRowOneField;
    String hiveTableNameRowTwoFields = "hive.tpch." + tableNameRowTwoFields;
    String icebergTableNameRowTwoFields = "iceberg.tpch." + tableNameRowTwoFields;

    assertUpdate("CREATE TABLE " + hiveTableNameRowOneField + " WITH (format='" + fileFormat + "') AS SELECT CAST(row(1) AS row(x integer)) as nested", 1);
    assertUpdate("CREATE TABLE " + hiveTableNameRowTwoFields + " WITH (format='" + fileFormat + "') AS SELECT CAST(row(10, 20) AS row(x integer, y integer)) AS nested", 1);

    // Copy the data file whose row has a single field into the location of the table whose row has two fields
    Path tableNameRowOneFieldLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableNameRowOneField));
    Path tableNameRowTwoFieldsLocation = Path.of("%s/tpch/%s".formatted(dataDirectory, tableNameRowTwoFields));
    try (Stream<Path> files = Files.list(tableNameRowOneFieldLocation)) {
        // Skip dot-prefixed entries; exactly one data file is expected to remain
        Path file = files.filter(path -> !path.getFileName().toString().startsWith(".")).collect(onlyElement());
        Files.copy(file, tableNameRowTwoFieldsLocation.resolve(file.getFileName()));
    }

    assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableNameRowTwoFields + "')");

    assertThat(query("SELECT * FROM " + icebergTableNameRowTwoFields))
            .skippingTypesCheck()
            .matches("VALUES row(CAST((1,null) AS row(x integer, y integer))), row(row(10, 20))");

    assertUpdate("INSERT INTO " + icebergTableNameRowTwoFields + " VALUES (row(row(100, 200)))", 1);
    assertThat(query("SELECT * FROM " + icebergTableNameRowTwoFields))
            .skippingTypesCheck()
            .matches("VALUES row(CAST((1, null) AS row(x integer, y integer))), row(row(10, 20)), row(row(100, 200))");

    assertUpdate("DROP TABLE " + icebergTableNameRowTwoFields);
    // The one-field table only served as the source of the copied data file; drop it as well
    // so the test does not leave a table behind
    assertUpdate("DROP TABLE " + hiveTableNameRowOneField);
}

@DataProvider
public static Object[][] fileFormats()
{
Expand Down Expand Up @@ -318,26 +426,6 @@ public void testMigrateUnsupportedColumnType()
assertUpdate("DROP TABLE " + hiveTableName);
}

@Test
public void testMigrateUnsupportedComplexColumnType()
{
    // TODO https://github.com/trinodb/trino/issues/17583 Add support for these complex types
    String tableName = "test_migrate_unsupported_complex_column_type_" + randomNameSuffix();
    String hiveTableName = "hive.tpch." + tableName;

    // The same three statements are repeated for an array, a map and a row column
    String createTableAsSelect = "CREATE TABLE " + hiveTableName + " AS SELECT ";
    String migrateCall = "CALL iceberg.system.migrate('tpch', '" + tableName + "')";
    String dropTable = "DROP TABLE " + hiveTableName;

    assertUpdate(createTableAsSelect + "array[1] x", 1);
    assertQueryFails(migrateCall, "\\QMigrating array(integer) type is not supported");
    assertUpdate(dropTable);

    assertUpdate(createTableAsSelect + "map(array['key'], array[2]) x", 1);
    assertQueryFails(migrateCall, "\\QMigrating map(varchar(3), integer) type is not supported");
    assertUpdate(dropTable);

    assertUpdate(createTableAsSelect + "CAST(row(1) AS row(y integer)) x", 1);
    assertQueryFails(migrateCall, "\\QMigrating row(y integer) type is not supported");
    assertUpdate(dropTable);
}

@Test
public void testMigrateUnsupportedTableFormat()
{
Expand Down
10 changes: 10 additions & 0 deletions testing/trino-product-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@
<artifactId>java-driver-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
*/
package io.trino.tests.product.iceberg;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.tempto.ProductTest;
import io.trino.tempto.assertions.QueryAssert.Row;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.sql.JDBCType;
import java.util.List;
import java.util.function.Consumer;

Expand All @@ -32,6 +36,10 @@
import static io.trino.tests.product.utils.QueryExecutors.onSpark;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static java.sql.JDBCType.ARRAY;
import static java.sql.JDBCType.INTEGER;
import static java.sql.JDBCType.JAVA_OBJECT;
import static java.sql.JDBCType.STRUCT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -111,6 +119,38 @@ public void testMigrateHiveTableWithSmallintType(String fileFormat)
onTrino().executeQuery("DROP TABLE " + icebergTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "fileFormats")
public void testMigrateHiveTableWithComplexType(String fileFormat)
        throws JsonProcessingException
{
    // Migrates a Hive table with array, map and row columns and reads the result through both
    // Trino and Spark, checking reported JDBC column types and the rendered row value.
    String tableName = "test_migrate_complex_" + randomNameSuffix();
    String hiveTableName = "hive.default." + tableName;
    String icebergTableName = "iceberg.default." + tableName;
    String sparkTableName = "iceberg_test.default." + tableName;

    onTrino().executeQuery("DROP TABLE IF EXISTS " + hiveTableName);
    onTrino().executeQuery(format(
            "CREATE TABLE %s WITH (format='%s') AS SELECT 1 x, array[2, 3] a, CAST(map(array['key'], array['value']) AS map(varchar, varchar)) b, CAST(row(1) AS row(d integer)) c",
            hiveTableName,
            fileFormat));

    onTrino().executeQuery(format("CALL iceberg.system.migrate('default', '%s')", tableName));

    List<Object> expectedRowValue = ImmutableList.of(
            1,
            ImmutableList.of(2, 3),
            ImmutableMap.of("key", "value"),
            ImmutableMap.of("d", 1));

    // Trino side: the row is compared against the Java toString() rendering of the expected values
    List<JDBCType> expectedTypesInTrino = ImmutableList.of(INTEGER, ARRAY, JAVA_OBJECT, JAVA_OBJECT);
    String expectedTextInTrino = expectedRowValue.toString();
    assertThat(onTrino().executeQuery("SELECT * FROM " + icebergTableName))
            .hasRowsCount(1)
            .matches(result -> result.getColumnTypes().equals(expectedTypesInTrino), "Complex type columns type mismatch in Trino")
            .matches(result -> result.row(0).toString().equals(expectedTextInTrino), "Complex type columns value mismatch in Trino");

    // Spark side: the row (with spaces stripped) is compared against the JSON rendering of the expected values
    List<JDBCType> expectedTypesInSpark = ImmutableList.of(INTEGER, ARRAY, JAVA_OBJECT, STRUCT);
    String expectedTextInSpark = new ObjectMapper().writeValueAsString(expectedRowValue);
    assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName))
            .hasRowsCount(1)
            .matches(result -> result.getColumnTypes().equals(expectedTypesInSpark), "Complex type columns type mismatch in Spark")
            .matches(result -> result.row(0).toString().replaceAll(" ", "").equals(expectedTextInSpark), "Complex type columns value mismatch in Spark");

    onTrino().executeQuery("DROP TABLE IF EXISTS " + icebergTableName);
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testMigrateHivePartitionedTable()
{
Expand Down

0 comments on commit 175c109

Please sign in to comment.