diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index 8cdc567..972b147 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -39,14 +40,12 @@
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.UnresolvedUnionException;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.common.type.Date;
-import org.apache.hadoop.hive.common.type.Timestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector;
@@ -59,6 +58,8 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AvroDeserializer {
   private static final Logger LOG = LoggerFactory.getLogger(AvroDeserializer.class);
@@ -109,14 +110,13 @@ public GenericRecord reencode(GenericRecord r)
         binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bais, binaryDecoder);
         return gdr.read(r, binaryDecoder);
-
       } catch (IOException e) {
         throw new AvroSerdeException("Exception trying to re-encode record to new schema", e);
       }
     }
   }
 
-  private List<Object> row;
+  private final ArrayList<Object> row = new ArrayList<>(0);
 
   /**
    * Deserialize an Avro record, recursing into its component fields and
@@ -135,38 +135,37 @@ public GenericRecord reencode(GenericRecord r)
    */
   public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
                             Writable writable, Schema readerSchema) throws AvroSerdeException {
-    if(!(writable instanceof AvroGenericRecordWritable)) {
+    if (!(writable instanceof AvroGenericRecordWritable)) {
       throw new AvroSerdeException("Expecting a AvroGenericRecordWritable");
     }
 
-    if(row == null || row.size() != columnNames.size()) {
-      row = new ArrayList<Object>(columnNames.size());
-    } else {
-      row.clear();
-    }
+    row.clear();
+    row.ensureCapacity(columnNames.size());
 
     AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
     GenericRecord r = recordWritable.getRecord();
     Schema fileSchema = recordWritable.getFileSchema();
-    UID recordReaderId = recordWritable.getRecordReaderID();
-    //If the record reader (from which the record is originated) is already seen and valid,
-    //no need to re-encode the record.
-    if(!noEncodingNeeded.contains(recordReaderId)) {
+    UID recordReaderId = recordWritable.getRecordReaderID();
+    // If the record reader (from which the record is originated) is already
+    // seen and valid, no need to re-encode the record.
+    if (!noEncodingNeeded.contains(recordReaderId)) {
       SchemaReEncoder reEncoder = null;
-      //Check if the record record is already encoded once. If it does
-      //reuse the encoder.
-      if(reEncoderCache.containsKey(recordReaderId)) {
-        reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder
-      } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema?
-        //Create and store new encoder in the map for re-use
+      // Check if the record is already encoded once. If it is,
+      // reuse the encoder.
+      if (reEncoderCache.containsKey(recordReaderId)) {
+        // Reuse the re-encoder
+        reEncoder = reEncoderCache.get(recordReaderId);
+      } else if (!r.getSchema().equals(readerSchema)) {
+        // Evolved schema?
+        // Create and store new encoder in the map for re-use
         reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema);
         reEncoderCache.put(recordReaderId, reEncoder);
-      } else{
-        LOG.debug("Adding new valid RRID :" + recordReaderId);
+      } else {
+        LOG.debug("Adding new valid RRID: {}", recordReaderId);
         noEncodingNeeded.add(recordReaderId);
       }
-      if(reEncoder != null) {
+      if (reEncoder != null) {
         if (!warnedOnce) {
           LOG.warn("Received different schemas. Have to re-encode: " +
               r.getSchema().toString(false) + "\nSIZE" + reEncoderCache + " ID " + recordReaderId);
@@ -180,19 +181,32 @@ public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
     return row;
   }
 
-  // The actual deserialization may involve nested records, which require recursion.
-  private List<Object> workerBase(List<Object> objectRow, Schema fileSchema, List<String> columnNames,
-                          List<TypeInfo> columnTypes, GenericRecord record)
-          throws AvroSerdeException {
-    for(int i = 0; i < columnNames.size(); i++) {
-      TypeInfo columnType = columnTypes.get(i);
+  /**
+   * The actual deserialization may involve nested records, which require
+   * recursion.
+   */
+  private List<Object> workerBase(List<Object> objectRow, Schema fileSchema,
+      List<String> columnNames, List<TypeInfo> columnTypes,
+      GenericRecord record) throws AvroSerdeException {
+    for (int i = 0; i < columnNames.size(); i++) {
       String columnName = columnNames.get(i);
       Object datum = record.get(columnName);
-      Schema datumSchema = record.getSchema().getField(columnName).schema();
-      Schema.Field field = AvroSerdeUtils.isNullableType(fileSchema)?AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema).getField(columnName):fileSchema.getField(columnName);
-      objectRow.add(worker(datum, field == null ? null : field.schema(), datumSchema, columnType));
+      if (datum == null) {
+        objectRow.add(null);
+      } else {
+        Schema columnSchema = null;
+        TypeInfo columnType = columnTypes.get(i);
+        Schema datumSchema = record.getSchema().getField(columnName).schema();
+        Schema.Field field = AvroSerdeUtils.isNullableType(fileSchema)
+            ? AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema)
+                .getField(columnName)
+            : fileSchema.getField(columnName);
+        if (field != null) {
+          columnSchema = field.schema();
+        }
+        objectRow.add(worker(datum, columnSchema, datumSchema, columnType));
+      }
     }
-
     return objectRow;
   }
@@ -212,7 +226,7 @@ private Object worker(Object datum, Schema fileSchema, Schema recordSchema, Type
       fileSchema = AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema);
     }
 
-    switch(columnType.getCategory()) {
+    switch (columnType.getCategory()) {
     case STRUCT:
       return deserializeStruct((GenericData.Record) datum, fileSchema, (StructTypeInfo) columnType);
     case UNION:
@@ -236,14 +250,16 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
     // This also gets us around the Enum issue since we just take the value
     // and convert it to a string. Yay!
     case BINARY:
-      if (recordSchema.getType() == Type.FIXED){
+      final Type recordSchemaType = recordSchema.getType();
+      if (recordSchemaType == Type.FIXED) {
         Fixed fixed = (Fixed) datum;
         return fixed.bytes();
-      } else if (recordSchema.getType() == Type.BYTES){
+      }
+      if (recordSchemaType == Type.BYTES) {
         return AvroSerdeUtils.getBytesFromByteBuffer((ByteBuffer) datum);
-      } else {
-        throw new AvroSerdeException("Unexpected Avro schema for Binary TypeInfo: " + recordSchema.getType());
       }
+      throw new AvroSerdeException(
+          "Unexpected Avro schema for Binary TypeInfo: " + recordSchemaType);
     case DECIMAL:
       if (fileSchema == null) {
         throw new AvroSerdeException("File schema is missing for decimal field. Reader schema is " + columnType);
       }
@@ -329,39 +345,36 @@ private Object deserializeUnion(Object datum, Schema fileSchema, Schema recordSc
     return new StandardUnionObjectInspector.StandardUnion((byte)rsTag, desered);
   }
 
+  @SuppressWarnings("rawtypes")
   private Object deserializeList(Object datum, Schema fileSchema, Schema recordSchema,
                                  ListTypeInfo columnType) throws AvroSerdeException {
     // Need to check the original schema to see if this is actually a Fixed.
-    if(recordSchema.getType().equals(Schema.Type.FIXED)) {
+    if (recordSchema.getType().equals(Schema.Type.FIXED)) {
       // We're faking out Hive to work through a type system impedence mismatch.
       // Pull out the backing array and convert to a list.
       GenericData.Fixed fixed = (GenericData.Fixed) datum;
-      List asList = new ArrayList(fixed.bytes().length);
-      for(int j = 0; j < fixed.bytes().length; j++) {
-        asList.add(fixed.bytes()[j]);
-      }
-      return asList;
-    } else if(recordSchema.getType().equals(Schema.Type.BYTES)) {
+      return Arrays.asList(ArrayUtils.toObject(fixed.bytes()));
+    }
+    if (recordSchema.getType().equals(Schema.Type.BYTES)) {
       // This is going to be slow... hold on.
       ByteBuffer bb = (ByteBuffer)datum;
-      List asList = new ArrayList(bb.capacity());
-      byte[] array = bb.array();
-      for(int j = 0; j < array.length; j++) {
-        asList.add(array[j]);
-      }
-      return asList;
-    } else { // An actual list, deser its values
-      List listData = (List) datum;
-      Schema listSchema = recordSchema.getElementType();
-      List listContents = new ArrayList(listData.size());
-      for(Object obj : listData) {
-        listContents.add(worker(obj, fileSchema == null ?
-            null : fileSchema.getElementType(), listSchema,
-            columnType.getListElementTypeInfo()));
-      }
-      return listContents;
+      return Arrays.asList(ArrayUtils.toObject(bb.array()));
+    }
+    // An actual list, deserialize its values
+    List listData = (List) datum;
+    Schema listSchema = recordSchema.getElementType();
+    if (fileSchema != null) {
+      fileSchema = fileSchema.getElementType();
     }
+    TypeInfo columnTypeInfo = columnType.getListElementTypeInfo();
+    List listContents = new ArrayList(listData.size());
+    for (Object obj : listData) {
+      listContents.add(worker(obj, fileSchema, listSchema, columnTypeInfo));
+    }
+    return listContents;
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   private Object deserializeMap(Object datum, Schema fileSchema, Schema mapSchema,
                                 MapTypeInfo columnType) throws AvroSerdeException {
     // Avro only allows maps with Strings for keys, so we only have to worry
@@ -369,13 +382,15 @@ private Object deserializeMap(Object datum, Schema fileSchema, Schema mapSchema,
     Map map = new HashMap();
     Map mapDatum = (Map)datum;
     Schema valueSchema = mapSchema.getValueType();
+    if (fileSchema != null) {
+      fileSchema = fileSchema.getValueType();
+    }
     TypeInfo valueTypeInfo = columnType.getMapValueTypeInfo();
     for (CharSequence key : mapDatum.keySet()) {
       Object value = mapDatum.get(key);
-      map.put(key.toString(), worker(value, fileSchema == null ? null : fileSchema.getValueType(),
-          valueSchema, valueTypeInfo));
+      map.put(key.toString(),
+          worker(value, fileSchema, valueSchema, valueTypeInfo));
     }
-
     return map;
   }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
index e1e4d8b..d877678 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
@@ -19,6 +19,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,7 @@
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.ClassUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,16 +200,13 @@ public Object getStructFieldData(Object data, StructField f) {
 
   @Override
   public List<Object> getStructFieldsDataAsList(Object data) {
-    if (data == null) {
-      return null;
-    }
-
-    List<Object> result = new ArrayList<Object>(fields.size());
-
-    for (int i = 0; i < fields.size(); i++) {
-      result.add(getStructFieldData(data, fields.get(i)));
+    List<Object> result = null;
+    if (data != null) {
+      result = new ArrayList<>(fields.size());
+      for (MyField field : fields) {
+        result.add(getStructFieldData(data, field));
+      }
     }
-
     return result;
   }
@@ -223,7 +222,7 @@ private Object deserializeStruct(Object struct, String fieldName) {
     byte[] data = ((LazyStruct) struct).getBytes();
     AvroDeserializer deserializer = new AvroDeserializer();
 
-    if (data == null || data.length == 0) {
+    if (ArrayUtils.isEmpty(data)) {
       return null;
     }
@@ -263,10 +262,8 @@ private Object deserializeStruct(Object struct, String fieldName) {
     }
 
     // adjust the data bytes according to any possible offset that was provided
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Retrieved writer Schema: " + ws.toString());
-      LOG.debug("Retrieved reader Schema: " + rs.toString());
-    }
+    LOG.debug("Retrieved writer Schema: {}", ws);
Schema: {}", ws); + LOG.debug("Retrieved reader Schema: {}", rs); try { avroWritable.readFields(data, offset, data.length, ws, rs); @@ -374,7 +371,7 @@ private Object toLazyObject(Object field, ObjectInspector fieldOI) { String objAsString = obj.toString().trim(); - ref.setData(objAsString.getBytes()); + ref.setData(objAsString.getBytes(StandardCharsets.UTF_8)); // initialize the lazy object lazyObject.init(ref, 0, ref.getData().length); @@ -403,8 +400,8 @@ private Object toLazyListObject(Object obj, ObjectInspector objectInspector) { ObjectInspector listElementOI = ((ListObjectInspector) objectInspector).getListElementObjectInspector(); - for (int i = 0; i < listObj.size(); i++) { - lazyList.add(toLazyObject(listObj.get(i), listElementOI)); + for (Object lobj : listObj) { + lazyList.add(toLazyObject(lobj, listElementOI)); } return retList; diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java index 3955611..bbaf35c 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java @@ -19,14 +19,14 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.avro.Schema; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.common.StringInternUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Read or write Avro data from Hive. @@ -76,18 +78,23 @@ private boolean badSchema = false; + /** + * Avro should always use the table properties for initialization. + * (see HIVE-6835) + */ @Override - public void initialize(Configuration configuration, Properties tableProperties, - Properties partitionProperties) throws SerDeException { - // Avro should always use the table properties for initialization (see HIVE-6835). 
+  public void initialize(Configuration configuration,
+      Properties tableProperties, Properties partitionProperties)
+      throws SerDeException {
     initialize(configuration, tableProperties);
   }
 
   @Override
-  public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+  public void initialize(Configuration configuration, Properties properties)
+      throws SerDeException {
     // Reset member variables so we don't get in a half-constructed state
     if (schema != null) {
-      LOG.debug("Resetting already initialized AvroSerDe");
+      LOG.debug("Re-initializing already initialized AvroSerDe");
     }
 
     LOG.info("AvroSerde::initialize(): Preset value of avro.schema.literal == "
@@ -105,29 +112,30 @@ public void initialize(Configuration configuration, Properties properties) throw
         .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
 
     if (hasExternalSchema(properties)
-        || columnNameProperty == null || columnNameProperty.isEmpty()
-        || columnTypeProperty == null || columnTypeProperty.isEmpty()) {
+        || StringUtils.isEmpty(columnNameProperty)
+        || StringUtils.isEmpty(columnTypeProperty)) {
       schema = determineSchemaOrReturnErrorSchema(configuration, properties);
     } else {
       // Get column names and sort order
       columnNames = StringInternUtils.internStringsInList(
           Arrays.asList(columnNameProperty.split(columnNameDelimiter)));
-      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+      columnTypes =
+          TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
 
-      schema = getSchemaFromCols(properties, columnNames, columnTypes, columnCommentProperty);
+      schema = getSchemaFromCols(properties, columnNames, columnTypes,
+          columnCommentProperty);
     }
 
     properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
         schema.toString());
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Avro schema is " + schema);
-    }
+    LOG.debug("Avro schema is {}", schema);
 
     if (configuration == null) {
       LOG.debug("Configuration null, not inserting schema");
     } else {
       configuration.set(
-          AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SCHEMA.getPropName(), schema.toString(false));
+          AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SCHEMA.getPropName(),
+          schema.toString(false));
     }
 
     badSchema = schema.equals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA);
@@ -137,7 +145,7 @@ public void initialize(Configuration configuration, Properties properties) throw
     this.columnTypes = aoig.getColumnTypes();
     this.oi = aoig.getObjectInspector();
 
-    if(!badSchema) {
+    if (!badSchema) {
       this.avroSerializer = new AvroSerializer();
       this.avroDeserializer = new AvroDeserializer();
     }
@@ -156,17 +164,15 @@ private boolean hasExternalSchema(Map<String, String> tableParams) {
 
   public static Schema getSchemaFromCols(Properties properties,
           List<String> columnNames, List<TypeInfo> columnTypes, String columnCommentProperty) {
     List<String> columnComments;
-    if (columnCommentProperty == null || columnCommentProperty.isEmpty()) {
-      columnComments = new ArrayList();
+    if (StringUtils.isEmpty(columnCommentProperty)) {
+      columnComments = Collections.emptyList();
     } else {
       //Comments are separated by "\0" in columnCommentProperty, see method getSchema
       //in MetaStoreUtils where this string columns.comments is generated
       columnComments = Arrays.asList(columnCommentProperty.split("\0"));
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("columnComments is " + columnCommentProperty);
-      }
+      LOG.debug("columnComments is {}", columnCommentProperty);
     }
+
     if (columnNames.size() != columnTypes.size()) {
       throw new IllegalArgumentException("AvroSerde initialization failed. Number of column " +
           "name and column type differs. columnNames = " + columnNames + ", columnTypes = " +
@@ -180,7 +186,6 @@ public static Schema getSchemaFromCols(Properties properties,
         properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_NAMESPACE.getPropName()),
         properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_NAME.getPropName(), tableName),
         properties.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_DOC.getPropName(), tableComment));
-
   }
 
   /**
@@ -216,7 +221,7 @@ public Schema determineSchemaOrReturnErrorSchema(Configuration conf, Properties
 
   @Override
   public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
-    if(badSchema) {
+    if (badSchema) {
       throw new BadSchemaException();
     }
     return avroSerializer.serialize(o, objectInspector, columnNames, columnTypes, schema);
@@ -224,7 +229,7 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD
 
   @Override
   public Object deserialize(Writable writable) throws SerDeException {
-    if(badSchema) {
+    if (badSchema) {
       throw new BadSchemaException();
     }
     return avroDeserializer.deserialize(columnNames, columnTypes, writable, schema);
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
index 2d52020..668dafe 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
@@ -53,17 +53,21 @@ public Instance retrieve(SeedObject hv) throws AvroSerdeException {
    */
   public synchronized Instance retrieve(SeedObject hv,
       Set<SeedObject> seenSchemas) throws AvroSerdeException {
-    if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString());
+    Instance instance = null;
 
-    if(cache.containsKey(hv)) {
-      if(LOG.isDebugEnabled()) LOG.debug("Returning cache result.");
-      return cache.get(hv);
-    }
+    LOG.debug("Checking for hv: {}", hv);
+
+    instance = cache.get(hv);
 
-    if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache");
+    if (instance != null) {
+      LOG.debug("Returning cache result: {}", instance);
+      return instance;
+    }
 
-    Instance instance = makeInstance(hv, seenSchemas);
+    LOG.debug("Creating new instance and storing in cache");
+    instance = makeInstance(hv, seenSchemas);
     cache.put(hv, instance);
+
     return instance;
   }
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
index ef97d2d..192b1ad 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
@@ -43,11 +43,17 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestAvroDeserializer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestAvroDeserializer.class);
+
   private final GenericData GENERIC_DATA = GenericData.get();
 
   @Test
+  @SuppressWarnings("rawtypes")
   public void canDeserializeVoidType() throws IOException, SerDeException {
     String schemaString = "{\n" +
         "  \"type\": \"record\", \n" +
@@ -68,8 +74,8 @@ public void canDeserializeVoidType() throws IOException, SerDeException {
 
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(),
-        aoig.getColumnTypes(), garw, s);
+    List row = (List) de.deserialize(aoig.getColumnNames(),
+        aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object theVoidObject = row.get(0);
     assertNull(theVoidObject);
@@ -83,6 +89,7 @@ public void canDeserializeVoidType() throws IOException, SerDeException {
   }
 
   @Test
+  @SuppressWarnings("rawtypes")
   public void canDeserializeMapsWithPrimitiveKeys() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.MAP_WITH_PRIMITIVE_VALUE_TYPE);
     GenericData.Record record = new GenericData.Record(s);
@@ -94,7 +101,7 @@ public void canDeserializeMapsWithPrimitiveKeys() throws SerDeException, IOExcep
     record.put("aMap", m);
     assertTrue(GENERIC_DATA.validate(s, record));
-    System.out.println("record = " + record);
+    LOG.debug("record = " + record);
 
     AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
 
@@ -102,8 +109,8 @@ public void canDeserializeMapsWithPrimitiveKeys() throws SerDeException, IOExcep
 
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(),
-        aoig.getColumnTypes(), garw, s);
+    List row = (List) de.deserialize(aoig.getColumnNames(),
+        aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object theMapObject = row.get(0);
     assertTrue(theMapObject instanceof Map);
@@ -129,6 +136,7 @@ public void canDeserializeMapsWithPrimitiveKeys() throws SerDeException, IOExcep
   }
 
   @Test
+  @SuppressWarnings("rawtypes")
   public void canDeserializeArrays() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.ARRAY_WITH_PRIMITIVE_ELEMENT_TYPE);
     GenericData.Record record = new GenericData.Record(s);
@@ -140,15 +148,15 @@ public void canDeserializeArrays() throws SerDeException, IOException {
     record.put("anArray", list);
     assertTrue(GENERIC_DATA.validate(s, record));
-    System.out.println("Array-backed record = " + record);
+    LOG.debug("Array-backed record = " + record);
 
     AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
 
     AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
 
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(),
-        aoig.getColumnTypes(), garw, s);
+    List row = (List) de.deserialize(aoig.getColumnNames(),
+        aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object theArrayObject = row.get(0);
     assertTrue(theArrayObject instanceof List);
@@ -183,9 +191,9 @@ public void canDeserializeArrays() throws SerDeException, IOException {
     Object thirdElement = anArrayOI.getListElement(anArrayData, 2);
     assertEquals("Smith", elementOI.getPrimitiveJavaObject(thirdElement));
     assertTrue(thirdElement instanceof String);
-
   }
 
+  @SuppressWarnings("rawtypes")
   public void canDeserializeRecordsInternal(Schema s, Schema fileSchema) throws SerDeException, IOException {
     GenericData.Record record = new GenericData.Record(s);
     GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema());
@@ -200,11 +208,11 @@ public void canDeserializeRecordsInternal(Schema s, Schema fileSchema) throws Se
     AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row =
-        (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+    List row = (List) de.deserialize(aoig.getColumnNames(),
+        aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object theRecordObject = row.get(0);
-    System.out.println("theRecordObject = " + theRecordObject.getClass().getCanonicalName());
System.out.println("theRecordObject = " + theRecordObject.getClass().getCanonicalName()); + LOG.debug("theRecordObject = " + theRecordObject.getClass().getCanonicalName()); // The original record was lost in the deserialization, so just go the // correct way, through objectinspectors @@ -350,6 +358,7 @@ private ResultPair unionTester(Schema ws, GenericData.Record record) return unionTester(ws, ws, record); } + @SuppressWarnings("rawtypes") private ResultPair unionTester(Schema ws, Schema rs, GenericData.Record record) throws SerDeException, IOException { assertTrue(GENERIC_DATA.validate(ws, record)); @@ -358,8 +367,8 @@ private ResultPair unionTester(Schema ws, Schema rs, GenericData.Record record) AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(rs); AvroDeserializer de = new AvroDeserializer(); - ArrayList row = - (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, rs); + List row = (List) de.deserialize(aoig.getColumnNames(), + aoig.getColumnTypes(), garw, rs); assertEquals(1, row.size()); StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector(); List fieldRefs = oi.getAllStructFieldRefs(); @@ -377,6 +386,7 @@ private ResultPair unionTester(Schema ws, Schema rs, GenericData.Record record) } @Test + @SuppressWarnings("rawtypes") public void primitiveSchemaEvolution() throws Exception { Schema fileSchema = AvroSerdeUtils.getSchemaFor( "{\n" @@ -423,12 +433,14 @@ public void primitiveSchemaEvolution() throws Exception { AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(readerSchema); AvroDeserializer de = new AvroDeserializer(); - List row = (List) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, readerSchema); + List row = (List) de.deserialize(aoig.getColumnNames(), + aoig.getColumnTypes(), garw, readerSchema); Assert.assertEquals(1, row.get(0)); Assert.assertNull(row.get(1)); } @Test // Enums are one of two types we fudge for Hive. Enums go in, Strings come out. + @SuppressWarnings("rawtypes") public void canDeserializeEnums() throws SerDeException, IOException { Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.ENUM_SCHEMA); GenericData.Record record = new GenericData.Record(s); @@ -440,8 +452,8 @@ public void canDeserializeEnums() throws SerDeException, IOException { AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s); AvroDeserializer de = new AvroDeserializer(); - ArrayList row = - (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s); + List row = (List) de.deserialize(aoig.getColumnNames(), + aoig.getColumnTypes(), garw, s); assertEquals(1, row.size()); StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector(); List fieldRefs = oi.getAllStructFieldRefs(); @@ -458,7 +470,15 @@ public void canDeserializeEnums() throws SerDeException, IOException { assertEquals("DALEKS", finalValue); } - @Test // Fixed doesn't exist in Hive. Fixeds go in, lists of bytes go out. + /** + * "Fixed" types do not exist in Hive. Fixed types go in, lists of bytes go + * out. 
+   *
+   * @throws SerDeException
+   * @throws IOException
+   */
+  @Test
+  @SuppressWarnings("rawtypes")
   public void canDeserializeFixed() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.FIXED_SCHEMA);
     GenericData.Record record = new GenericData.Record(s);
@@ -471,8 +491,8 @@ public void canDeserializeFixed() throws SerDeException, IOException {
 
     AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row =
-        (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+    List row = (List) de.deserialize(aoig.getColumnNames(),
+        aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object byteObject = row.get(0);
     assertTrue(byteObject instanceof byte[]);
@@ -495,6 +515,7 @@ public void canDeserializeFixed() throws SerDeException, IOException {
   }
 
   @Test
+  @SuppressWarnings("rawtypes")
   public void canDeserializeBytes() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.BYTES_SCHEMA);
     GenericData.Record record = new GenericData.Record(s);
@@ -510,8 +531,8 @@ public void canDeserializeBytes() throws SerDeException, IOException {
 
     AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row =
-        (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+    List row = (List) de.deserialize(aoig.getColumnNames(),
+        aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object byteObject = row.get(0);
     assertTrue(byteObject instanceof byte[]);
@@ -560,6 +581,7 @@ public void canDeserializeNullableEnums() throws IOException, SerDeException {
   }
 
   @Test
+  @SuppressWarnings("rawtypes")
   public void canDeserializeMapWithNullablePrimitiveValues() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator
         .MAP_WITH_NULLABLE_PRIMITIVE_VALUE_TYPE_SCHEMA);
@@ -573,7 +595,7 @@ public void canDeserializeMapWithNullablePrimitiveValues() throws SerDeException
     record.put("aMap", m);
     assertTrue(GENERIC_DATA.validate(s, record));
-    System.out.println("record = " + record);
+    LOG.debug("record = " + record);
 
     AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
 
@@ -581,7 +603,7 @@ public void canDeserializeMapWithNullablePrimitiveValues() throws SerDeException
 
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(),
+    List row = (List) de.deserialize(aoig.getColumnNames(),
         aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object theMapObject = row.get(0);
@@ -610,8 +632,9 @@ public void canDeserializeMapWithNullablePrimitiveValues() throws SerDeException
     assertTrue(theMap2.containsKey("mu"));
     assertEquals(null, theMap2.get("mu"));
   }
-
+
   @Test
+  @SuppressWarnings("rawtypes")
   public void canDeserializeMapsWithJavaLangStringKeys() throws IOException, SerDeException {
     // Ensures maps can be deserialized when avro.java.string=String.
     // See http://stackoverflow.com/a/19868919/312944 for why that might be used.
@@ -638,7 +661,7 @@ public void canDeserializeMapsWithJavaLangStringKeys() throws IOException, SerDe
     record.put("aMap", m);
     assertTrue(GENERIC_DATA.validate(s, record));
-    System.out.println("record = " + record);
+    LOG.debug("record = " + record);
 
     AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
 
@@ -646,7 +669,7 @@ public void canDeserializeMapsWithJavaLangStringKeys() throws IOException, SerDe
 
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(),
+    List row = (List)de.deserialize(aoig.getColumnNames(),
         aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object theMapObject = row.get(0);
@@ -672,6 +695,7 @@ public void canDeserializeMapsWithJavaLangStringKeys() throws IOException, SerDe
     assertEquals(3l, theMap2.get("three"));
   }
 
+  @SuppressWarnings("rawtypes")
   private void verifyNullableType(GenericData.Record record, Schema s, String fieldName,
                                   String expected) throws SerDeException, IOException {
     assertTrue(GENERIC_DATA.validate(s, record));
@@ -680,8 +704,7 @@ private void verifyNullableType(GenericData.Record record, Schema s, String fiel
     AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row =
-        (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+    List row = (List)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
     assertEquals(1, row.size());
     Object rowElement = row.get(0);
@@ -700,6 +723,7 @@ private void verifyNullableType(GenericData.Record record, Schema s, String fiel
   }
 
   @Test
+  @SuppressWarnings("rawtypes")
   public void verifyCaching() throws SerDeException, IOException {
     Schema s = AvroSerdeUtils.getSchemaFor(TestAvroObjectInspectorGenerator.RECORD_SCHEMA);
     GenericData.Record record = new GenericData.Record(s);
@@ -716,14 +740,15 @@ public void verifyCaching() throws SerDeException, IOException {
     AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
     AvroDeserializer de = new AvroDeserializer();
 
-    ArrayList<Object> row =
-        (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+    List row = (List) de.deserialize(aoig.getColumnNames(),
+        aoig.getColumnTypes(), garw, s);
 
     assertEquals(1, de.getNoEncodingNeeded().size());
     assertEquals(0, de.getReEncoderCache().size());
 
     // Read the record with the same record reader ID
-    row = (ArrayList) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+    row = (List) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(),
+        garw, s);
 
     //Expecting not to change the size of internal structures
     assertEquals(1, de.getNoEncodingNeeded().size());
@@ -731,7 +756,8 @@ public void verifyCaching() throws SerDeException, IOException {
 
     //Read the record with **different** record reader ID
     garw.setRecordReaderID(new UID()); //New record reader ID
-    row = (ArrayList) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+    row = (List) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(),
+        garw, s);
 
     //Expecting to change the size of internal structures
     assertEquals(2, de.getNoEncodingNeeded().size());
@@ -741,8 +767,8 @@ public void verifyCaching() throws SerDeException, IOException {
     Schema evolvedSchema = AvroSerdeUtils.getSchemaFor(s.toString());
     evolvedSchema.getField("aRecord").schema().addProp("Testing", "meaningless");
     garw.setRecordReaderID(recordReaderID = new UID()); //New record reader ID
-    row =
-        (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, evolvedSchema);
+    row = (List) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(),
+        garw, evolvedSchema);
 
     //Expecting to change the size of internal structures
     assertEquals(2, de.getNoEncodingNeeded().size());
@@ -750,12 +776,11 @@ public void verifyCaching() throws SerDeException, IOException {
 
     //Read the record with existing record reader ID and same **evolved** schema
     garw.setRecordReaderID(recordReaderID); //Reuse record reader ID
-    row =
-        (ArrayList)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, evolvedSchema);
+    row = (List) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(),
+        garw, evolvedSchema);
 
     //Expecting NOT to change the size of internal structures
     assertEquals(2, de.getNoEncodingNeeded().size());
     assertEquals(1, de.getReEncoderCache().size());
-
   }
 }