diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java @@ -248,7 +248,7 @@ if(schema.getType().equals(Schema.Type.NULL)) { return null; } - return worker(datum, schema, SchemaToTypeInfo.generateTypeInfo(schema)); + return worker(datum, schema, SchemaToTypeInfo.generateTypeInfo(schema, null)); } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroObjectInspectorGenerator.java @@ -49,7 +49,7 @@ verifySchemaIsARecord(schema); this.columnNames = generateColumnNames(schema); - this.columnTypes = SchemaToTypeInfo.generateColumnTypes(schema); + this.columnTypes = SchemaToTypeInfo.generateColumnTypes(schema, null); assert columnNames.size() == columnTypes.size(); this.oi = createObjectInspector(); } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java @@ -144,10 +144,12 @@ final InstanceCache> enums = new InstanceCache>() { @Override - protected InstanceCache makeInstance(final Schema schema) { + protected InstanceCache makeInstance(final Schema schema, + Map> schemaSeenMap) { return new InstanceCache() { @Override - protected GenericEnumSymbol makeInstance(Object seed) { + protected GenericEnumSymbol makeInstance(Object seed, + Map schemaSeenMap) { return new GenericData.EnumSymbol(schema, seed.toString()); } }; @@ -155,7 +157,7 @@ }; private Object serializeEnum(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException { - return enums.retrieve(schema).retrieve(serializePrimitive(typeInfo, fieldOI, structFieldData, schema)); + return enums.retrieve(schema, null).retrieve(serializePrimitive(typeInfo, fieldOI, structFieldData, schema), null); } private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) throws AvroSerdeException { diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory; import java.util.HashMap; +import java.util.Map; /** * Cache for objects whose creation only depends on some other set of objects @@ -40,7 +41,8 @@ * Retrieve (or create if it doesn't exist) the correct Instance for this * SeedObject */ - public Instance retrieve(SeedObject hv) throws AvroSerdeException { + public Instance retrieve(SeedObject hv, + Map schemaSeenMap) throws AvroSerdeException { if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString()); if(cache.containsKey(hv.hashCode())) { @@ -50,10 +52,11 @@ if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache"); - Instance instance = makeInstance(hv); + Instance instance = makeInstance(hv, schemaSeenMap); cache.put(hv.hashCode(), instance); return instance; } - protected abstract Instance makeInstance(SeedObject hv) throws AvroSerdeException; + protected abstract Instance makeInstance(SeedObject hv, + Map schemaSeenMap) throws AvroSerdeException; } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Hashtable; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -76,17 +77,19 @@ * currently public due to some weirdness in deserializing unions, but * will be made private once that is resolved. * @param schema Schema to generate field types for + * @param schemaSeenMap map to handle circular references * @return List of TypeInfos, each element of which is a TypeInfo derived * from the schema. * @throws AvroSerdeException for problems during conversion. */ - public static List generateColumnTypes(Schema schema) throws AvroSerdeException { + public static List generateColumnTypes(Schema schema, + Map schemaSeenMap) throws AvroSerdeException { List fields = schema.getFields(); List types = new ArrayList(fields.size()); for (Schema.Field field : fields) { - types.add(generateTypeInfo(field.schema())); + types.add(generateTypeInfo(field.schema(), schemaSeenMap)); } return types; @@ -94,25 +97,30 @@ static InstanceCache typeInfoCache = new InstanceCache() { @Override - protected TypeInfo makeInstance(Schema s) throws AvroSerdeException { - return generateTypeInfoWorker(s); + protected TypeInfo makeInstance(Schema s, + Map schemaSeenMap) + throws AvroSerdeException { + return generateTypeInfoWorker(s, schemaSeenMap); } }; /** * Convert an Avro Schema into an equivalent Hive TypeInfo. * @param schema to record. Must be of record type. + * @param schemaSeenMap map to handle circular references * @return TypeInfo matching the Avro schema * @throws AvroSerdeException for any problems during conversion. */ - public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException { - return typeInfoCache.retrieve(schema); + public static TypeInfo generateTypeInfo(Schema schema, + Map schemaSeenMap) throws AvroSerdeException { + return typeInfoCache.retrieve(schema, schemaSeenMap); } - private static TypeInfo generateTypeInfoWorker(Schema schema) throws AvroSerdeException { + private static TypeInfo generateTypeInfoWorker(Schema schema, + Map schemaSeenMap) throws AvroSerdeException { // Avro requires NULLable types to be defined as unions of some type T // and NULL. This is annoying and we're going to hide it from the user. if(AvroSerdeUtils.isNullableType(schema)) { - return generateTypeInfo(AvroSerdeUtils.getOtherTypeFromNullableType(schema)); + return generateTypeInfo(AvroSerdeUtils.getOtherTypeFromNullableType(schema), schemaSeenMap); } Schema.Type type = schema.getType(); @@ -122,25 +130,32 @@ } switch(type) { - case RECORD: return generateRecordTypeInfo(schema); - case MAP: return generateMapTypeInfo(schema); - case ARRAY: return generateArrayTypeInfo(schema); - case UNION: return generateUnionTypeInfo(schema); + case RECORD: return generateRecordTypeInfo(schema, schemaSeenMap); + case MAP: return generateMapTypeInfo(schema, schemaSeenMap); + case ARRAY: return generateArrayTypeInfo(schema, schemaSeenMap); + case UNION: return generateUnionTypeInfo(schema, schemaSeenMap); case ENUM: return generateEnumTypeInfo(schema); default: throw new AvroSerdeException("Do not yet support: " + schema); } } - private static TypeInfo generateRecordTypeInfo(Schema schema) throws AvroSerdeException { + private static TypeInfo generateRecordTypeInfo(Schema schema, + Map schemaSeenMap) throws AvroSerdeException { assert schema.getType().equals(Schema.Type.RECORD); + if (schemaSeenMap == null) + schemaSeenMap = new IdentityHashMap(); + if (schemaSeenMap.containsKey(schema)) + return schemaSeenMap.get(schema); + schemaSeenMap.put(schema, primitiveTypeToTypeInfo.get(Schema.Type.NULL)); + List fields = schema.getFields(); List fieldNames = new ArrayList(fields.size()); List typeInfos = new ArrayList(fields.size()); for(int i = 0; i < fields.size(); i++) { fieldNames.add(i, fields.get(i).name()); - typeInfos.add(i, generateTypeInfo(fields.get(i).schema())); + typeInfos.add(i, generateTypeInfo(fields.get(i).schema(), schemaSeenMap)); } return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos); @@ -150,23 +165,26 @@ * Generate a TypeInfo for an Avro Map. This is made slightly simpler in that * Avro only allows maps with strings for keys. */ - private static TypeInfo generateMapTypeInfo(Schema schema) throws AvroSerdeException { + private static TypeInfo generateMapTypeInfo(Schema schema, + Map schemaSeenMap) throws AvroSerdeException { assert schema.getType().equals(Schema.Type.MAP); Schema valueType = schema.getValueType(); - TypeInfo ti = generateTypeInfo(valueType); + TypeInfo ti = generateTypeInfo(valueType, schemaSeenMap); return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti); } - private static TypeInfo generateArrayTypeInfo(Schema schema) throws AvroSerdeException { + private static TypeInfo generateArrayTypeInfo(Schema schema, + Map schemaSeenMap) throws AvroSerdeException { assert schema.getType().equals(Schema.Type.ARRAY); Schema itemsType = schema.getElementType(); - TypeInfo itemsTypeInfo = generateTypeInfo(itemsType); + TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, schemaSeenMap); return TypeInfoFactory.getListTypeInfo(itemsTypeInfo); } - private static TypeInfo generateUnionTypeInfo(Schema schema) throws AvroSerdeException { + private static TypeInfo generateUnionTypeInfo(Schema schema, + Map schemaSeenMap) throws AvroSerdeException { assert schema.getType().equals(Schema.Type.UNION); List types = schema.getTypes(); @@ -174,7 +192,7 @@ List typeInfos = new ArrayList(types.size()); for(Schema type : types) { - typeInfos.add(generateTypeInfo(type)); + typeInfos.add(generateTypeInfo(type, schemaSeenMap)); } return TypeInfoFactory.getUnionTypeInfo(typeInfos); diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java --- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.serde2.avro; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -83,6 +84,36 @@ } @Test + public void canSerializeCyclesInSchema() throws SerDeException, IOException { + // Create parent-child avro-record and avro-schema + AvroCycleParent parent = new AvroCycleParent(); + AvroCycleChild child = new AvroCycleChild(); + parent.setChild (child); + Schema parentS = ReflectData.AllowNull.get().getSchema(AvroCycleParent.class); + GenericData.Record parentRec = new GenericData.Record(parentS); + Schema childS = ReflectData.AllowNull.get().getSchema(AvroCycleChild.class); + GenericData.Record childRec = new GenericData.Record(childS); + parentRec.put("child", childRec); + + // Initialize Avro SerDe + AvroSerializer as = new AvroSerializer(); + AvroDeserializer ad = new AvroDeserializer(); + AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(parentS); + ObjectInspector oi = aoig.getObjectInspector(); + List columnNames = aoig.getColumnNames(); + List columnTypes = aoig.getColumnTypes(); + + // Check serialization and deserialization + AvroGenericRecordWritable agrw = Utils.serializeAndDeserializeRecord(parentRec); + Object obj = ad.deserialize(columnNames, columnTypes, agrw, parentS); + + Writable result = as.serialize(obj, oi, columnNames, columnTypes, parentS); + assertTrue(result instanceof AvroGenericRecordWritable); + GenericRecord r2 = ((AvroGenericRecordWritable) result).getRecord(); + assertEquals(parentS, r2.getSchema()); + } + + @Test public void canSerializeStrings() throws SerDeException, IOException { singleFieldTest("string1", "hello", "string"); } @@ -431,3 +462,17 @@ } } + +class AvroCycleParent { + AvroCycleChild child; + public AvroCycleChild getChild () {return child;} + public void setChild (AvroCycleChild child) {this.child = child;} +} + +class AvroCycleChild { + AvroCycleParent parent; + AvroCycleChild next; + Map map; + public AvroCycleParent getParent () {return parent;} + public void setParent (AvroCycleParent parent) {this.parent = parent;} +} diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java --- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.serde2.avro; +import java.util.Map; import org.junit.Test; import static org.junit.Assert.assertSame; @@ -41,18 +42,18 @@ public void instanceCachesOnlyCreateOneInstance() throws AvroSerdeException { InstanceCache> ic = new InstanceCache>() { @Override - protected Wrapper makeInstance(Foo hv) { + protected Wrapper makeInstance(Foo hv, Map> seenMap) { return new Wrapper(hv); } }; Foo f1 = new Foo(); - Wrapper fc = ic.retrieve(f1); + Wrapper fc = ic.retrieve(f1, null); assertSame(f1, fc.wrapped); // Our original foo should be in the wrapper Foo f2 = new Foo(); // Different instance, same value - Wrapper fc2 = ic.retrieve(f2); + Wrapper fc2 = ic.retrieve(f2, null); assertSame(fc2,fc); // Since equiv f, should get back first container assertSame(fc2.wrapped, f1); } @@ -61,18 +62,18 @@ public void instanceCacheReturnsCorrectInstances() throws AvroSerdeException { InstanceCache> ic = new InstanceCache>() { @Override - protected Wrapper makeInstance(String hv) { + protected Wrapper makeInstance(String hv, Map> seenMap) { return new Wrapper(hv); } }; - Wrapper one = ic.retrieve("one"); - Wrapper two = ic.retrieve("two"); + Wrapper one = ic.retrieve("one", null); + Wrapper two = ic.retrieve("two", null); - Wrapper anotherOne = ic.retrieve("one"); + Wrapper anotherOne = ic.retrieve("one", null); assertSame(one, anotherOne); - Wrapper anotherTwo = ic.retrieve("two"); + Wrapper anotherTwo = ic.retrieve("two", null); assertSame(two, anotherTwo); } }