diff --git c/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java w/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index b7b3d12254d710b1ed17058eda6bd466d4541a77..982552cb6f011413d992b1d8d3fd223a5a052091 100644
--- c/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ w/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -198,11 +198,17 @@ public Object deserialize(List columnNames, List columnTypes,
 
   private Object worker(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType)
       throws AvroSerdeException {
-    // Klaxon! Klaxon! Klaxon!
-    // Avro requires NULLable types to be defined as unions of some type T
-    // and NULL. This is annoying and we're going to hide it from the user.
+    if (datum == null)
+      return null;
+
+    // Avro requires nullable types to be defined as unions of some type T
+    // and NULL. This is annoying and we're going to hide it from the user.
+
     if (AvroSerdeUtils.isNullableType(recordSchema)) {
-      return deserializeNullableUnion(datum, fileSchema, recordSchema, columnType);
+      recordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema);
+    }
+    if (fileSchema != null && AvroSerdeUtils.isNullableType(fileSchema)) {
+      fileSchema = AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema);
     }
 
     switch(columnType.getCategory()) {
@@ -300,68 +306,6 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
     }
   }
 
-  /**
-   * Extract either a null or the correct type from a Nullable type.
-   */
-  private Object deserializeNullableUnion(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType)
-      throws AvroSerdeException {
-    if (recordSchema.getTypes().size() == 2) {
-      // A type like [NULL, T]
-      return deserializeSingleItemNullableUnion(datum, fileSchema, recordSchema, columnType);
-    } else {
-      // Types like [NULL, T1, T2, ...]
-      if (datum == null) {
-        return null;
-      } else {
-        Schema newRecordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema);
-        return worker(datum, fileSchema, newRecordSchema, columnType);
-      }
-    }
-  }
-
-  private Object deserializeSingleItemNullableUnion(Object datum,
-                                                    Schema fileSchema,
-                                                    Schema recordSchema,
-                                                    TypeInfo columnType)
-      throws AvroSerdeException {
-    int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
-    Schema schema = recordSchema.getTypes().get(tag);
-    if (schema.getType().equals(Type.NULL)) {
-      return null;
-    }
-
-    Schema currentFileSchema = null;
-    if (fileSchema != null) {
-      if (fileSchema.getType() == Type.UNION) {
-        // The fileSchema may have the null value in a different position, so
-        // we need to get the correct tag
-        try {
-          tag = GenericData.get().resolveUnion(fileSchema, datum);
-          currentFileSchema = fileSchema.getTypes().get(tag);
-        } catch (UnresolvedUnionException e) {
-          if (LOG.isDebugEnabled()) {
-            String datumClazz = null;
-            if (datum != null) {
-              datumClazz = datum.getClass().getName();
-            }
-            String msg = "File schema union could not resolve union. fileSchema = " + fileSchema +
-                ", recordSchema = " + recordSchema + ", datum class = " + datumClazz + ": " + e;
-            LOG.debug(msg, e);
-          }
-          // This occurs when the datum type is different between
-          // the file and record schema. For example if datum is long
-          // and the field in the file schema is int. See HIVE-9462.
-          // in this case we will re-use the record schema as the file
-          // schema, Ultimately we need to clean this code up and will
-          // do as a follow-on to HIVE-9462.
-          currentFileSchema = schema;
-        }
-      } else {
-        currentFileSchema = fileSchema;
-      }
-    }
-    return worker(datum, currentFileSchema, schema, columnType);
-  }
 
   private Object deserializeStruct(GenericData.Record datum, Schema fileSchema,
       StructTypeInfo columnType) throws AvroSerdeException {
diff --git c/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java w/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 1746a0f3a3c7e8dd2aa150333b630362ffd20a98..3955611733c9eb511dc343aaf2891f2ab0285239 100644
--- c/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ w/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -136,6 +136,11 @@ public void initialize(Configuration configuration, Properties properties) throw
     this.columnNames = StringInternUtils.internStringsInList(aoig.getColumnNames());
     this.columnTypes = aoig.getColumnTypes();
     this.oi = aoig.getObjectInspector();
+
+    if(!badSchema) {
+      this.avroSerializer = new AvroSerializer();
+      this.avroDeserializer = new AvroDeserializer();
+    }
   }
 
   private boolean hasExternalSchema(Properties properties) {
@@ -214,7 +219,7 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD
     if(badSchema) {
       throw new BadSchemaException();
     }
-    return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema);
+    return avroSerializer.serialize(o, objectInspector, columnNames, columnTypes, schema);
   }
 
   @Override
@@ -222,7 +227,7 @@ public Object deserialize(Writable writable) throws SerDeException {
     if(badSchema) {
      throw new BadSchemaException();
    }
-    return getDeserializer().deserialize(columnNames, columnTypes, writable, schema);
+    return avroDeserializer.deserialize(columnNames, columnTypes, writable, schema);
   }
 
   @Override
@@ -236,22 +241,6 @@ public SerDeStats getSerDeStats() {
     return null;
   }
 
-  private AvroDeserializer getDeserializer() {
-    if(avroDeserializer == null) {
-      avroDeserializer = new AvroDeserializer();
-    }
-
-    return avroDeserializer;
-  }
-
-  private AvroSerializer getSerializer() {
-    if(avroSerializer == null) {
-      avroSerializer = new AvroSerializer();
-    }
-
-    return avroSerializer;
-  }
-
   @Override
   public boolean shouldStoreFieldsInMetastore(Map tableParams) {
     return !hasExternalSchema(tableParams);
diff --git c/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java w/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
index 391a300549d8e549fa3cfa089696a14b1240df1a..d16abdb88c4b372272fd603b4fecce66eabb5f4b 100644
--- c/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
+++ w/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
@@ -205,12 +205,25 @@ public static boolean isNullableType(Schema schema) {
   }
 
   /**
-   * In a nullable type, get the schema for the non-nullable type. This method
-   * does no checking that the provides Schema is nullable.
+   * If the union schema is a nullable union, get the schema for the non-nullable type.
+   * This method does no checking that the provided Schema is nullable. If the provided
+   * union schema is non-nullable, it simply returns the union schema
    */
-  public static Schema getOtherTypeFromNullableType(Schema schema) {
-    List itemSchemas = new ArrayList<>();
-    for (Schema itemSchema : schema.getTypes()) {
+  public static Schema getOtherTypeFromNullableType(Schema unionSchema) {
+    final List types = unionSchema.getTypes();
+    if (types.size() == 2) { // most common scenario
+      if (types.get(0).getType() == Schema.Type.NULL) {
+        return types.get(1);
+      }
+      if (types.get(1).getType() == Schema.Type.NULL) {
+        return types.get(0);
+      }
+      // not a nullable union
+      return unionSchema;
+    }
+
+    final List itemSchemas = new ArrayList<>();
+    for (Schema itemSchema : types) {
       if (!Schema.Type.NULL.equals(itemSchema.getType())) {
         itemSchemas.add(itemSchema);
       }
diff --git c/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java w/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
index 3dc33311c8ce8b3aac6eda0c4e4b3fdcd2eeaeb1..ef97d2dd83d9909075086fe6781ad1c4ac88b9d3 100644
--- c/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
+++ w/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestAvroDeserializer {
@@ -255,6 +256,20 @@ private ResultPair(ObjectInspector oi, Object value, Object unionObject) {
     }
   }
 
+  @Test
+  public void canDeserializeSingleItemUnions() throws SerDeException, IOException {
+    Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.SINGLE_ITEM_UNION_SCHEMA);
+    GenericData.Record record = new GenericData.Record(s);
+
+    record.put("aUnion", "this is a string");
+
+    ResultPair result = unionTester(s, record);
+    assertTrue(result.value instanceof String);
+    assertEquals("this is a string", result.value);
+    UnionObjectInspector uoi = (UnionObjectInspector)result.oi;
+    assertEquals(0, uoi.getTag(result.unionObject));
+  }
+
   @Test
   public void canDeserializeUnions() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.UNION_SCHEMA);
@@ -361,6 +376,58 @@ private ResultPair unionTester(Schema ws, Schema rs, GenericData.Record record)
     return new ResultPair(fieldObjectInspector, value, theUnion);
   }
 
+  @Test
+  public void primitiveSchemaEvolution() throws Exception {
+    Schema fileSchema = AvroSerdeUtils.getSchemaFor(
+        "{\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"name\": \"r1\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"int_field\",\n" +
+        "      \"type\": \"int\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}"
+    );
+    Schema readerSchema = AvroSerdeUtils.getSchemaFor(
+        "{\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"name\": \"r1\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"int_field\",\n" +
+        "      \"type\": \"int\"\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"name\": \"dec_field\",\n" +
+        "      \"type\": [\n" +
+        "        \"null\",\n" +
+        "        {\n" +
+        "          \"type\": \"bytes\",\n" +
+        "          \"logicalType\": \"decimal\",\n" +
+        "          \"precision\": 5,\n" +
+        "          \"scale\": 4\n" +
+        "        }\n" +
+        "      ],\n" +
+        "      \"default\": null\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}"
+    );
+    GenericData.Record record = new GenericData.Record(fileSchema);
+
+    record.put("int_field", 1);
+    assertTrue(GENERIC_DATA.validate(fileSchema, record));
+    AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(readerSchema);
+
+    AvroDeserializer de = new AvroDeserializer();
+    List row = (List) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, readerSchema);
+    Assert.assertEquals(1, row.get(0));
+    Assert.assertNull(row.get(1));
+  }
+
   @Test // Enums are one of two types we fudge for Hive. Enums go in, Strings come out.
   public void canDeserializeEnums() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.ENUM_SCHEMA);
diff --git c/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java w/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
index 3736a1f8fcc089469efb79a2c4b22db032b7dc58..ee83ba360ba784eaebcefa40f933d6ac3c8885e4 100644
--- c/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
+++ w/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
@@ -101,6 +101,17 @@
       "  ]\n" +
       "}";
   public static final String NULLABLE_RECORD_SCHEMA = "[\"null\", " + RECORD_SCHEMA + "]";
+  public static final String SINGLE_ITEM_UNION_SCHEMA = "{\n" +
+      "  \"namespace\": \"test.a.rossa\",\n" +
+      "  \"name\": \"oneUnion\",\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"fields\": [\n" +
+      "    {\n" +
+      "      \"name\":\"aUnion\",\n" +
+      "      \"type\":[\"string\"]\n" +
+      "    }\n" +
+      "  ]\n" +
+      "}";
   public static final String UNION_SCHEMA = "{\n" +
       "  \"namespace\": \"test.a.rossa\",\n" +
       "  \"name\": \"oneUnion\",\n" +
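Not part of the patch above: a minimal, self-contained sketch of the nullable-union unwrapping that the reworked worker() and the new two-branch fast path in AvroSerdeUtils.getOtherTypeFromNullableType rely on. It uses only the public Avro Schema API; the class name NullableUnionSketch and the unwrap() helper are illustrative stand-ins, not Hive code.

import java.util.List;

import org.apache.avro.Schema;

public class NullableUnionSketch {

  // Return T for a two-branch ["null", T] (or [T, "null"]) union,
  // otherwise return the schema unchanged.
  static Schema unwrap(Schema schema) {
    if (schema.getType() != Schema.Type.UNION) {
      return schema;
    }
    List<Schema> branches = schema.getTypes();
    if (branches.size() == 2) {
      if (branches.get(0).getType() == Schema.Type.NULL) {
        return branches.get(1);
      }
      if (branches.get(1).getType() == Schema.Type.NULL) {
        return branches.get(0);
      }
    }
    return schema;
  }

  public static void main(String[] args) {
    Schema nullableString = new Schema.Parser().parse("[\"null\", \"string\"]");
    // Prints STRING: with the union stripped up front, a null datum can be
    // short-circuited to null and everything else dispatches on the real type.
    System.out.println(unwrap(nullableString).getType());
  }
}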
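Also not part of the patch: the primitiveSchemaEvolution test routes writer/reader schema resolution through Hive's AvroDeserializer; the sketch below shows the same resolution with plain Avro only, assuming a nullable string field is an acceptable stand-in for the test's nullable decimal (bytes) field. The class name SchemaEvolutionSketch is illustrative.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionSketch {
  public static void main(String[] args) throws IOException {
    Schema fileSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r1\",\"fields\":["
            + "{\"name\":\"int_field\",\"type\":\"int\"}]}");
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r1\",\"fields\":["
            + "{\"name\":\"int_field\",\"type\":\"int\"},"
            + "{\"name\":\"new_field\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    GenericRecord record = new GenericData.Record(fileSchema);
    record.put("int_field", 1);

    // Serialize with the file (writer) schema.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(fileSchema).write(record, encoder);
    encoder.flush();

    // Deserialize with the evolved reader schema: the old field comes through,
    // the new nullable field falls back to its default (null).
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord evolved =
        new GenericDatumReader<GenericRecord>(fileSchema, readerSchema).read(null, decoder);
    System.out.println(evolved.get("int_field")); // 1
    System.out.println(evolved.get("new_field")); // null
  }
}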