diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index ecfe15f59d..62aab84f83 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -189,7 +189,9 @@ public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
       String columnName = columnNames.get(i);
       Object datum = record.get(columnName);
       Schema datumSchema = record.getSchema().getField(columnName).schema();
-      Schema.Field field = AvroSerdeUtils.isNullableType(fileSchema)?AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema).getField(columnName):fileSchema.getField(columnName);
+      Schema.Field field = AvroSerdeUtils.isNullableType(fileSchema)
+          ? AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema).getField(columnName)
+          : fileSchema.getField(columnName);
       objectRow.add(worker(datum, field == null ? null : field.schema(), datumSchema, columnType));
     }
 
@@ -198,11 +200,18 @@ public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
 
   private Object worker(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType)
       throws AvroSerdeException {
-    // Klaxon! Klaxon! Klaxon!
-    // Avro requires NULLable types to be defined as unions of some type T
-    // and NULL. This is annoying and we're going to hide it from the user.
-    if(AvroSerdeUtils.isNullableType(recordSchema)) {
-      return deserializeNullableUnion(datum, fileSchema, recordSchema);
+
+    if (datum == null)
+      return null;
+
+    // Avro requires nullable types to be defined as unions of some type T
+    // and NULL. This is annoying and we're going to hide it from the user.
+
+    if (recordSchema.getType() == Schema.Type.UNION) {
+      recordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema);
+    }
+    if (fileSchema != null && fileSchema.getType() == Schema.Type.UNION) {
+      fileSchema = AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema);
     }
 
     switch(columnType.getCategory()) {
@@ -300,71 +309,6 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
     }
   }
 
-  /**
-   * Extract either a null or the correct type from a Nullable type. This is
-   * horrible in that we rebuild the TypeInfo every time.
-   */
-  private Object deserializeNullableUnion(Object datum, Schema fileSchema, Schema recordSchema)
-      throws AvroSerdeException {
-    if (recordSchema.getTypes().size() == 2) {
-      // A type like [NULL, T]
-      return deserializeSingleItemNullableUnion(datum, fileSchema, recordSchema);
-    } else {
-      // Types like [NULL, T1, T2, ...]
-      if (datum == null) {
-        return null;
-      } else {
-        Schema newRecordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema);
-        return worker(datum, fileSchema, newRecordSchema,
-            SchemaToTypeInfo.generateTypeInfo(newRecordSchema, null));
-      }
-    }
-  }
-
-  private Object deserializeSingleItemNullableUnion(Object datum,
-                                                    Schema fileSchema,
-                                                    Schema recordSchema)
-      throws AvroSerdeException {
-    int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
-    Schema schema = recordSchema.getTypes().get(tag);
-    if (schema.getType().equals(Type.NULL)) {
-      return null;
-    }
-
-    Schema currentFileSchema = null;
-    if (fileSchema != null) {
-      if (fileSchema.getType() == Type.UNION) {
-        // The fileSchema may have the null value in a different position, so
-        // we need to get the correct tag
-        try {
-          tag = GenericData.get().resolveUnion(fileSchema, datum);
-          currentFileSchema = fileSchema.getTypes().get(tag);
-        } catch (UnresolvedUnionException e) {
-          if (LOG.isDebugEnabled()) {
-            String datumClazz = null;
-            if (datum != null) {
-              datumClazz = datum.getClass().getName();
-            }
-            String msg = "File schema union could not resolve union. fileSchema = " + fileSchema +
-                ", recordSchema = " + recordSchema + ", datum class = " + datumClazz + ": " + e;
-            LOG.debug(msg, e);
-          }
-          // This occurs when the datum type is different between
-          // the file and record schema. For example if datum is long
-          // and the field in the file schema is int. See HIVE-9462.
-          // in this case we will re-use the record schema as the file
-          // schema, Ultimately we need to clean this code up and will
-          // do as a follow-on to HIVE-9462.
-          currentFileSchema = schema;
-        }
-      } else {
-        currentFileSchema = fileSchema;
-      }
-    }
-    return worker(datum, currentFileSchema, schema,
-        SchemaToTypeInfo.generateTypeInfo(schema, null));
-  }
-
   private Object deserializeStruct(GenericData.Record datum, Schema fileSchema,
       StructTypeInfo columnType) throws AvroSerdeException {
     // No equivalent Java type for the backing structure, need to recurse and build a list
@@ -390,7 +334,7 @@ private Object deserializeList(Object datum, Schema fileSchema, Schema recordSch
                                  ListTypeInfo columnType) throws AvroSerdeException {
     // Need to check the original schema to see if this is actually a Fixed.
     if(recordSchema.getType().equals(Schema.Type.FIXED)) {
-      // We're faking out Hive to work through a type system impedence mismatch.
+      // We're faking out Hive to work through a type system impedance mismatch.
       // Pull out the backing array and convert to a list.
       GenericData.Fixed fixed = (GenericData.Fixed) datum;
       List<Byte> asList = new ArrayList<Byte>(fixed.bytes().length);
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 5467d8a84f..0658e8cea5 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -74,7 +74,6 @@
 
   private AvroDeserializer avroDeserializer = null;
   private AvroSerializer avroSerializer = null;
-  private boolean badSchema = false;
 
   @Override
   public void initialize(Configuration configuration, Properties tableProperties,
@@ -126,12 +125,17 @@ public void initialize(Configuration configuration, Properties properties) throw
           AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SCHEMA.getPropName(),
           schema.toString(false));
     }
 
-    badSchema = schema.equals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA);
+    if(schema.equals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA)) {
+      throw new BadSchemaException();
+    }
 
     AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(schema);
     this.columnNames = StringInternUtils.internStringsInList(aoig.getColumnNames());
     this.columnTypes = aoig.getColumnTypes();
     this.oi = aoig.getObjectInspector();
+
+    this.avroDeserializer = new AvroDeserializer();
+    this.avroSerializer = new AvroSerializer();
   }
 
@@ -207,18 +211,12 @@ public Schema determineSchemaOrReturnErrorSchema(Configuration conf, Properties
 
   @Override
   public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
-    if(badSchema) {
-      throw new BadSchemaException();
-    }
-    return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema);
+    return avroSerializer.serialize(o, objectInspector, columnNames, columnTypes, schema);
   }
 
   @Override
   public Object deserialize(Writable writable) throws SerDeException {
-    if(badSchema) {
-      throw new BadSchemaException();
-    }
-    return getDeserializer().deserialize(columnNames, columnTypes, writable, schema);
+    return avroDeserializer.deserialize(columnNames, columnTypes, writable, schema);
  }
 
   @Override
diff --git serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
index 391a300549..e8c662a80e 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
@@ -205,12 +205,25 @@ public static boolean isNullableType(Schema schema) {
   }
 
   /**
-   * In a nullable type, get the schema for the non-nullable type. This method
-   * does no checking that the provides Schema is nullable.
+   * If the union schema is a nullable union, get the schema for the non-nullable type.
+   * This method does no checking that the provided Schema is nullable. If the provided
+   * union schema is non-nullable, it simply returns the union schema.
    */
-  public static Schema getOtherTypeFromNullableType(Schema schema) {
-    List<Schema> itemSchemas = new ArrayList<>();
-    for (Schema itemSchema : schema.getTypes()) {
+  public static Schema getOtherTypeFromNullableType(Schema unionSchema) {
+    final List<Schema> types = unionSchema.getTypes();
+    if (types.size() == 2) { // most common scenario
+      if (types.get(0).getType() == Schema.Type.NULL) {
+        return types.get(1);
+      }
+      if (types.get(1).getType() == Schema.Type.NULL) {
+        return types.get(0);
+      }
+      // not a nullable union
+      return unionSchema;
+    }
+
+    final List<Schema> itemSchemas = new ArrayList<>();
+    for (Schema itemSchema : types) {
       if (!Schema.Type.NULL.equals(itemSchema.getType())) {
         itemSchemas.add(itemSchema);
       }
diff --git serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
index 3dc33311c8..e10ac07319 100644
--- serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
+++ serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestAvroDeserializer {
@@ -361,6 +362,59 @@ private ResultPair unionTester(Schema ws, Schema rs, GenericData.Record record)
     return new ResultPair(fieldObjectInspector, value, theUnion);
   }
 
+  @Test
+  public void primitiveSchemaEvolution() throws Exception {
+    Schema fileSchema = AvroSerdeUtils.getSchemaFor(
+        "{\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"name\": \"r1\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"int_field\",\n" +
+        "      \"type\": \"int\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}"
+    );
+    Schema readerSchema = AvroSerdeUtils.getSchemaFor(
+        "{\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"name\": \"r1\",\n" +
+        "  \"fields\": [\n" +
+        "    {\n" +
+        "      \"name\": \"int_field\",\n" +
+        "      \"type\": \"int\"\n" +
+        "    },\n" +
+        "    {\n" +
+        "      \"name\": \"dec_field\",\n" +
+        "      \"type\": [\n" +
+        "        \"null\",\n" +
+        "        {\n" +
+        "          \"type\": \"bytes\",\n" +
+        "          \"logicalType\": \"decimal\",\n" +
+        "          \"precision\": 5,\n" +
+        "          \"scale\": 4\n" +
+        "        }\n" +
+        "      ],\n" +
+        "      \"default\": null\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}"
+    );
+    GenericData.Record record = new GenericData.Record(fileSchema);
+
+    record.put("int_field", 1);
+    assertTrue(GENERIC_DATA.validate(fileSchema, record));
+    AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(readerSchema);
+
+    AvroDeserializer de = new AvroDeserializer();
+    List row = (List) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, readerSchema);
+    System.out.println(row);
+    Assert.assertEquals(1, row.get(0));
+    Assert.assertNull(row.get(1));
+  }
+
   @Test // Enums are one of two types we fudge for Hive. Enums go in, Strings come out.
   public void canDeserializeEnums() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.ENUM_SCHEMA);
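
Reviewer note: the sketch below is not part of the patch; it is a standalone illustration of the nullable-union handling the patch moves into worker() and getOtherTypeFromNullableType(). The class and method names (NullableUnionExample, unwrapNullable) are hypothetical; only the org.apache.avro.Schema API is real.

// Standalone sketch, assuming only Avro on the classpath, of the union-unwrapping
// logic introduced above: a two-branch [NULL, T] union collapses to T, any other
// union is rebuilt without its NULL branches, and non-nullable unions pass through.
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;

public class NullableUnionExample {

  // Mirrors the patched fast path in getOtherTypeFromNullableType().
  static Schema unwrapNullable(Schema unionSchema) {
    List<Schema> types = unionSchema.getTypes();
    if (types.size() == 2) {
      if (types.get(0).getType() == Schema.Type.NULL) {
        return types.get(1);
      }
      if (types.get(1).getType() == Schema.Type.NULL) {
        return types.get(0);
      }
      return unionSchema; // not a nullable union
    }
    // General case: drop NULL branches and keep the rest.
    List<Schema> nonNull = new ArrayList<>();
    for (Schema s : types) {
      if (s.getType() != Schema.Type.NULL) {
        nonNull.add(s);
      }
    }
    return nonNull.size() > 1 ? Schema.createUnion(nonNull) : nonNull.get(0);
  }

  public static void main(String[] args) {
    Schema nullableInt = new Schema.Parser().parse("[\"null\", \"int\"]");
    // Prints "int": the NULL branch is hidden, so the caller can treat the
    // column as a plain primitive, which is what worker() now relies on.
    System.out.println(unwrapNullable(nullableInt));

    Schema multiUnion = new Schema.Parser().parse("[\"null\", \"int\", \"string\"]");
    // Prints ["int","string"]: NULL is dropped but the union itself remains.
    System.out.println(unwrapNullable(multiUnion));
  }
}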