diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index 688b072..1b42122 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -271,7 +271,8 @@ private Object deserializeNullableUnion(Object datum, Schema fileSchema, Schema
       currentFileSchema = fileSchema.getType() == Type.UNION ? fileSchema.getTypes().get(tag) : fileSchema;
     }
-    return worker(datum, currentFileSchema, schema, SchemaToTypeInfo.generateTypeInfo(schema));
+    return worker(datum, currentFileSchema, schema,
+        SchemaToTypeInfo.generateTypeInfo(schema, null));
 
   }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java
index 46cdb4f..9ac075a 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java
@@ -49,7 +49,7 @@ public AvroObjectInspectorGenerator(Schema schema) throws SerDeException {
     verifySchemaIsARecord(schema);
 
     this.columnNames = generateColumnNames(schema);
-    this.columnTypes = SchemaToTypeInfo.generateColumnTypes(schema);
+    this.columnTypes = SchemaToTypeInfo.generateColumnTypes(schema, null);
     assert columnNames.size() == columnTypes.size();
 
     this.oi = createObjectInspector();
   }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
index 2bd48ca..54c6a5d 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
@@ -145,10 +145,12 @@ private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object stru
   final InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>> enums
       = new InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>>() {
         @Override
-        protected InstanceCache<Object, GenericEnumSymbol> makeInstance(final Schema schema) {
+        protected InstanceCache<Object, GenericEnumSymbol> makeInstance(final Schema schema,
+            Map<Schema, InstanceCache<Object, GenericEnumSymbol>> schemaSeenMap) {
           return new InstanceCache<Object, GenericEnumSymbol>() {
             @Override
-            protected GenericEnumSymbol makeInstance(Object seed) {
+            protected GenericEnumSymbol makeInstance(Object seed,
+                Map<Object, GenericEnumSymbol> schemaSeenMap) {
               return new GenericData.EnumSymbol(schema, seed.toString());
             }
           };
@@ -156,7 +158,7 @@ protected GenericEnumSymbol makeInstance(Object seed) {
   };
 
   private Object serializeEnum(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
-    return enums.retrieve(schema).retrieve(serializePrimitive(typeInfo, fieldOI, structFieldData, schema));
+    return enums.retrieve(schema, null).retrieve(serializePrimitive(typeInfo, fieldOI, structFieldData, schema), null);
   }
 
   private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) throws AvroSerdeException {
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
index d848005..7d03214 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
@@ -21,6 +21,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Cache for objects whose creation only depends on some other set of objects
@@ -40,7 +41,8 @@ public InstanceCache() {}
   /**
    * Retrieve (or create if it doesn't exist) the correct Instance for this
    * SeedObject
    */
-  public Instance retrieve(SeedObject hv) throws AvroSerdeException {
+  public Instance retrieve(SeedObject hv,
+      Map<SeedObject, Instance> schemaSeenMap) throws AvroSerdeException {
     if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString());
 
     if(cache.containsKey(hv.hashCode())) {
@@ -50,10 +52,11 @@ public Instance retrieve(SeedObject hv) throws AvroSerdeException {
     if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache");
 
-    Instance instance = makeInstance(hv);
+    Instance instance = makeInstance(hv, schemaSeenMap);
     cache.put(hv.hashCode(), instance);
     return instance;
   }
 
-  protected abstract Instance makeInstance(SeedObject hv) throws AvroSerdeException;
+  protected abstract Instance makeInstance(SeedObject hv,
+      Map<SeedObject, Instance> schemaSeenMap) throws AvroSerdeException;
 }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
index 23e024f..f8e3061 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Hashtable;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -77,17 +78,19 @@
    * currently public due to some weirdness in deserializing unions, but
    * will be made private once that is resolved.
    * @param schema Schema to generate field types for
+   * @param schemaSeenMap map to handle circular references
    * @return List of TypeInfos, each element of which is a TypeInfo derived
    *         from the schema.
    * @throws AvroSerdeException for problems during conversion.
    */
-  public static List<TypeInfo> generateColumnTypes(Schema schema) throws AvroSerdeException {
+  public static List<TypeInfo> generateColumnTypes(Schema schema,
+      Map<Schema, TypeInfo> schemaSeenMap) throws AvroSerdeException {
     List<Schema.Field> fields = schema.getFields();
 
     List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
 
     for (Schema.Field field : fields) {
-      types.add(generateTypeInfo(field.schema()));
+      types.add(generateTypeInfo(field.schema(), schemaSeenMap));
     }
 
     return types;
@@ -95,17 +98,21 @@
   static InstanceCache<Schema, TypeInfo> typeInfoCache = new InstanceCache<Schema, TypeInfo>() {
     @Override
-    protected TypeInfo makeInstance(Schema s) throws AvroSerdeException {
-      return generateTypeInfoWorker(s);
+    protected TypeInfo makeInstance(Schema s,
+        Map<Schema, TypeInfo> schemaSeenMap)
+        throws AvroSerdeException {
+      return generateTypeInfoWorker(s, schemaSeenMap);
     }
   };
 
   /**
    * Convert an Avro Schema into an equivalent Hive TypeInfo.
    * @param schema to record. Must be of record type.
+   * @param schemaSeenMap map to handle circular references
    * @return TypeInfo matching the Avro schema
    * @throws AvroSerdeException for any problems during conversion.
    */
-  public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException {
+  public static TypeInfo generateTypeInfo(Schema schema,
+      Map<Schema, TypeInfo> schemaSeenMap) throws AvroSerdeException {
     // For bytes type, it can be mapped to decimal.
     Schema.Type type = schema.getType();
     if (type == Schema.Type.BYTES &&
@@ -128,14 +135,16 @@ public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException
       return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
     }
 
-    return typeInfoCache.retrieve(schema);
+    return typeInfoCache.retrieve(schema, schemaSeenMap);
   }
 
-  private static TypeInfo generateTypeInfoWorker(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateTypeInfoWorker(Schema schema,
+      Map<Schema, TypeInfo> schemaSeenMap) throws AvroSerdeException {
     // Avro requires NULLable types to be defined as unions of some type T
     // and NULL.  This is annoying and we're going to hide it from the user.
     if(AvroSerdeUtils.isNullableType(schema)) {
-      return generateTypeInfo(AvroSerdeUtils.getOtherTypeFromNullableType(schema));
+      return generateTypeInfo(
+          AvroSerdeUtils.getOtherTypeFromNullableType(schema), schemaSeenMap);
     }
 
     Schema.Type type = schema.getType();
@@ -144,25 +153,32 @@ private static TypeInfo generateTypeInfoWorker(Schema schema) throws AvroSerdeEx
     }
 
     switch(type) {
-      case RECORD: return generateRecordTypeInfo(schema);
-      case MAP:    return generateMapTypeInfo(schema);
-      case ARRAY:  return generateArrayTypeInfo(schema);
-      case UNION:  return generateUnionTypeInfo(schema);
+      case RECORD: return generateRecordTypeInfo(schema, schemaSeenMap);
+      case MAP:    return generateMapTypeInfo(schema, schemaSeenMap);
+      case ARRAY:  return generateArrayTypeInfo(schema, schemaSeenMap);
+      case UNION:  return generateUnionTypeInfo(schema, schemaSeenMap);
       case ENUM:   return generateEnumTypeInfo(schema);
       default:     throw new AvroSerdeException("Do not yet support: " + schema);
     }
   }
 
-  private static TypeInfo generateRecordTypeInfo(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateRecordTypeInfo(Schema schema,
+      Map<Schema, TypeInfo> schemaSeenMap) throws AvroSerdeException {
     assert schema.getType().equals(Schema.Type.RECORD);
 
+    if (schemaSeenMap == null)
+      schemaSeenMap = new IdentityHashMap<Schema, TypeInfo>();
+    if (schemaSeenMap.containsKey(schema))
+      return schemaSeenMap.get(schema);
+    schemaSeenMap.put(schema, primitiveTypeToTypeInfo.get(Schema.Type.NULL));
+
     List<Schema.Field> fields = schema.getFields();
     List<String> fieldNames = new ArrayList<String>(fields.size());
     List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
 
     for(int i = 0; i < fields.size(); i++) {
       fieldNames.add(i, fields.get(i).name());
-      typeInfos.add(i, generateTypeInfo(fields.get(i).schema()));
+      typeInfos.add(i, generateTypeInfo(fields.get(i).schema(), schemaSeenMap));
     }
 
     return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos);
@@ -172,23 +188,26 @@ private static TypeInfo generateRecordTypeInfo(Schema schema) throws AvroSerdeEx
   /**
   * Generate a TypeInfo for an Avro Map.  This is made slightly simpler in that
   * Avro only allows maps with strings for keys.
   */
-  private static TypeInfo generateMapTypeInfo(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateMapTypeInfo(Schema schema,
+      Map<Schema, TypeInfo> schemaSeenMap) throws AvroSerdeException {
     assert schema.getType().equals(Schema.Type.MAP);
     Schema valueType = schema.getValueType();
-    TypeInfo ti = generateTypeInfo(valueType);
+    TypeInfo ti = generateTypeInfo(valueType, schemaSeenMap);
 
     return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti);
   }
 
-  private static TypeInfo generateArrayTypeInfo(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateArrayTypeInfo(Schema schema,
+      Map<Schema, TypeInfo> schemaSeenMap) throws AvroSerdeException {
     assert schema.getType().equals(Schema.Type.ARRAY);
     Schema itemsType = schema.getElementType();
-    TypeInfo itemsTypeInfo = generateTypeInfo(itemsType);
+    TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, schemaSeenMap);
 
     return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
   }
 
-  private static TypeInfo generateUnionTypeInfo(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateUnionTypeInfo(Schema schema,
+      Map<Schema, TypeInfo> schemaSeenMap) throws AvroSerdeException {
     assert schema.getType().equals(Schema.Type.UNION);
     List<Schema> types = schema.getTypes();
 
@@ -196,7 +215,7 @@ private static TypeInfo generateUnionTypeInfo(Schema schema) throws AvroSerdeExc
     List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
 
     for(Schema type : types) {
-      typeInfos.add(generateTypeInfo(type));
+      typeInfos.add(generateTypeInfo(type, schemaSeenMap));
     }
 
     return TypeInfoFactory.getUnionTypeInfo(typeInfos);
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
index f8161da..779c2ed 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -486,4 +487,48 @@ public void canSerializeFixed() throws SerDeException, IOException {
     assertArrayEquals(fixed.bytes(), ((GenericData.Fixed) r.get("fixed1")).bytes());
   }
 
+  @Test
+  public void canSerializeCyclesInSchema() throws SerDeException, IOException {
+    // Create parent-child avro-record and avro-schema
+    AvroCycleParent parent = new AvroCycleParent();
+    AvroCycleChild child = new AvroCycleChild();
+    parent.setChild (child);
+    Schema parentS = ReflectData.AllowNull.get().getSchema(AvroCycleParent.class);
+    GenericData.Record parentRec = new GenericData.Record(parentS);
+    Schema childS = ReflectData.AllowNull.get().getSchema(AvroCycleChild.class);
+    GenericData.Record childRec = new GenericData.Record(childS);
+    parentRec.put("child", childRec);
+
+    // Initialize Avro SerDe
+    AvroSerializer as = new AvroSerializer();
+    AvroDeserializer ad = new AvroDeserializer();
+    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(parentS);
+    ObjectInspector oi = aoig.getObjectInspector();
+    List<String> columnNames = aoig.getColumnNames();
+    List<TypeInfo> columnTypes = aoig.getColumnTypes();
+
+    // Check serialization and deserialization
+    AvroGenericRecordWritable agrw = Utils.serializeAndDeserializeRecord(parentRec);
+    Object obj = ad.deserialize(columnNames, columnTypes, agrw, parentS);
+
+    Writable result = as.serialize(obj, oi, columnNames, columnTypes, parentS);
+    assertTrue(result instanceof AvroGenericRecordWritable);
+    GenericRecord r2 = ((AvroGenericRecordWritable) result).getRecord();
+    assertEquals(parentS, r2.getSchema());
+  }
+
+}
+
+class AvroCycleParent {
+  AvroCycleChild child;
+  public AvroCycleParent () {}
+  public AvroCycleChild getChild () {return child;}
+  public void setChild (AvroCycleChild child) {this.child = child;}
+}
+
+class AvroCycleChild {
+  AvroCycleParent parent;
+  AvroCycleChild next;
+  Map<String, AvroCycleChild> map;
+  public AvroCycleParent getParent () {return parent;}
+  public void setParent (AvroCycleParent parent) {this.parent = parent;}
 }
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java
index 1df88ee..57d2b5d 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import java.util.Map;
 import org.junit.Test;
 
 import static org.junit.Assert.assertSame;
@@ -41,18 +42,19 @@ private Wrapper(T wrapped) {
   public void instanceCachesOnlyCreateOneInstance() throws AvroSerdeException {
     InstanceCache<Foo, Wrapper<Foo>> ic = new InstanceCache<Foo, Wrapper<Foo>>() {
       @Override
-      protected Wrapper<Foo> makeInstance(Foo hv) {
+      protected Wrapper<Foo> makeInstance(Foo hv,
+          Map<Foo, Wrapper<Foo>> seenMap) {
         return new Wrapper<Foo>(hv);
       }
     };
     Foo f1 = new Foo();
 
-    Wrapper<Foo> fc = ic.retrieve(f1);
+    Wrapper<Foo> fc = ic.retrieve(f1, null);
     assertSame(f1, fc.wrapped); // Our original foo should be in the wrapper
 
     Foo f2 = new Foo(); // Different instance, same value
 
-    Wrapper<Foo> fc2 = ic.retrieve(f2);
+    Wrapper<Foo> fc2 = ic.retrieve(f2, null);
     assertSame(fc2,fc); // Since equiv f, should get back first container
     assertSame(fc2.wrapped, f1);
   }
@@ -60,19 +62,20 @@ protected Wrapper makeInstance(Foo hv) {
   @Test
   public void instanceCacheReturnsCorrectInstances() throws AvroSerdeException {
     InstanceCache<String, Wrapper<String>> ic = new InstanceCache<String, Wrapper<String>>() {
-      @Override
-      protected Wrapper<String> makeInstance(String hv) {
-        return new Wrapper<String>(hv);
-      }
-    };
+        @Override
+        protected Wrapper<String> makeInstance(
+            String hv, Map<String, Wrapper<String>> seenMap) {
+          return new Wrapper<String>(hv);
+        }
+      };
 
-    Wrapper<String> one = ic.retrieve("one");
-    Wrapper<String> two = ic.retrieve("two");
+    Wrapper<String> one = ic.retrieve("one", null);
+    Wrapper<String> two = ic.retrieve("two", null);
 
-    Wrapper<String> anotherOne = ic.retrieve("one");
+    Wrapper<String> anotherOne = ic.retrieve("one", null);
     assertSame(one, anotherOne);
 
-    Wrapper<String> anotherTwo = ic.retrieve("two");
+    Wrapper<String> anotherTwo = ic.retrieve("two", null);
     assertSame(two, anotherTwo);
   }
 }