From 2e1de3b151e57d01658f7001a700db5fe5979175 Mon Sep 17 00:00:00 2001 From: Jack Klamer Date: Wed, 27 Dec 2023 11:33:33 -0600 Subject: [PATCH] Allow union data to conform to smaller union HIVE/AVRO: It is possible for data that is written using a 3 element union to be read with a 2 element union provided that either all data types can be coerced (already possible) or the offending data type(s) isn't present. This change delays all type errors to read time to allow more type leniency. --- .../hive/formats/avro/AvroPageDataReader.java | 20 ++++++++++- ...tAvroPageDataReaderWithoutTypeManager.java | 35 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java index 2d3f1bfb4acc..365f421ed7cc 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/avro/AvroPageDataReader.java @@ -208,11 +208,29 @@ private static BlockBuildingDecoder createBlockBuildingDecoderForAction(Resolver yield new ReaderUnionCoercedIntoRowBlockBuildingDecoder((Resolver.ReaderUnion) action, typeManager); } } - case ERROR -> throw new AvroTypeException("Resolution action returned with error " + action); + case ERROR -> new TypeErrorThrower((Resolver.ErrorAction) action); case SKIP -> throw new IllegalStateException("Skips filtered by row step"); }; } + private static class TypeErrorThrower + extends BlockBuildingDecoder + { + private final Resolver.ErrorAction action; + + public TypeErrorThrower(Resolver.ErrorAction action) + { + this.action = requireNonNull(action, "action is null"); + } + + @Override + protected void decodeIntoBlock(Decoder decoder, BlockBuilder builder) + throws IOException + { + throw new IOException(new AvroTypeException("Resolution action returned with error " + action)); + } + } + // Different plugins may have different Avro Schema to Type mappings // that are currently transforming GenericDatumReader returned objects into their target type during the record reading process // This block building decoder allows plugin writers to port that code directly and use within this reader diff --git a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/avro/TestAvroPageDataReaderWithoutTypeManager.java b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/avro/TestAvroPageDataReaderWithoutTypeManager.java index bcc88c9068b9..2234c0a3bed4 100644 --- a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/avro/TestAvroPageDataReaderWithoutTypeManager.java +++ b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/avro/TestAvroPageDataReaderWithoutTypeManager.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import io.airlift.slice.Slice; import io.trino.filesystem.TrinoInputFile; import io.trino.spi.Page; @@ -335,4 +336,38 @@ public void testCoercionOfUnionToStruct() assertThat(totalRecords).isEqualTo(3); } } + + @Test + public void testRead3UnionWith2UnionDataWith2Union() + throws IOException, AvroTypeException + { + Schema twoUnion = Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT)); + Schema threeUnion = Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT), Schema.create(Schema.Type.STRING)); + + Schema twoUnionRecord = SchemaBuilder.builder() + .record("aRecord") + .fields() + .name("aField") + .type(twoUnion) + .noDefault() + .endRecord(); + + Schema threeUnionRecord = SchemaBuilder.builder() + .record("aRecord") + .fields() + .name("aField") + .type(threeUnion) + .noDefault() + .endRecord(); + + // write a file with the 3 union schema, using 2 union data + TrinoInputFile inputFile = createWrittenFileWithData(threeUnionRecord, ImmutableList.copyOf(Iterables.transform(new RandomData(twoUnionRecord, 1000), object -> (GenericRecord) object))); + + //read the file with the 2 union schema and ensure that no error thrown + try (AvroFileReader avroFileReader = new AvroFileReader(inputFile, twoUnionRecord, NoOpAvroTypeManager.INSTANCE)) { + while (avroFileReader.hasNext()) { + assertThat(avroFileReader.next()).isNotNull(); + } + } + } }