diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index b7b3d12..ff5b7b9 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -25,6 +25,7 @@
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,6 +42,7 @@
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.avro.UnresolvedUnionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,11 +137,11 @@ public GenericRecord reencode(GenericRecord r)
    */
   public Object deserialize(List columnNames, List columnTypes,
       Writable writable, Schema readerSchema) throws AvroSerdeException {
-    if(!(writable instanceof AvroGenericRecordWritable)) {
+    if (!(writable instanceof AvroGenericRecordWritable)) {
       throw new AvroSerdeException("Expecting a AvroGenericRecordWritable");
     }
-    if(row == null || row.size() != columnNames.size()) {
+    if (row == null || row.size() != columnNames.size()) {
       row = new ArrayList(columnNames.size());
     } else {
       row.clear();
     }
@@ -152,21 +154,21 @@ public Object deserialize(List columnNames, List columnTypes,
     UID recordReaderId = recordWritable.getRecordReaderID();
     //If the record reader (from which the record is originated) is already seen and valid,
     //no need to re-encode the record.
-    if(!noEncodingNeeded.contains(recordReaderId)) {
+    if (!noEncodingNeeded.contains(recordReaderId)) {
       SchemaReEncoder reEncoder = null;
       //Check if the record record is already encoded once. If it does
       //reuse the encoder.
-      if(reEncoderCache.containsKey(recordReaderId)) {
+      if (reEncoderCache.containsKey(recordReaderId)) {
         reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder
       } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema?
         //Create and store new encoder in the map for re-use
         reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema);
         reEncoderCache.put(recordReaderId, reEncoder);
-      } else{
-        LOG.debug("Adding new valid RRID :" + recordReaderId);
+      } else {
+        LOG.debug("Adding new valid RRID: {}", recordReaderId);
         noEncodingNeeded.add(recordReaderId);
       }
-      if(reEncoder != null) {
+      if (reEncoder != null) {
         if (!warnedOnce) {
           LOG.warn("Received different schemas. Have to re-encode: " +
               r.getSchema().toString(false) + "\nSIZE" + reEncoderCache + " ID " + recordReaderId);
@@ -184,7 +186,7 @@ public Object deserialize(List columnNames, List columnTypes,
   private List workerBase(List objectRow, Schema fileSchema,
       List columnNames, List columnTypes, GenericRecord record)
       throws AvroSerdeException {
-    for(int i = 0; i < columnNames.size(); i++) {
+    for (int i = 0; i < columnNames.size(); i++) {
       TypeInfo columnType = columnTypes.get(i);
       String columnName = columnNames.get(i);
       Object datum = record.get(columnName);
@@ -229,10 +231,10 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
       // This also gets us around the Enum issue since we just take the value
       // and convert it to a string. Yay!
    case BINARY:
-      if (recordSchema.getType() == Type.FIXED){
+      if (recordSchema.getType() == Type.FIXED) {
        Fixed fixed = (Fixed) datum;
        return fixed.bytes();
-      } else if (recordSchema.getType() == Type.BYTES){
+      } else if (recordSchema.getType() == Type.BYTES) {
        return AvroSerdeUtils.getBytesFromByteBuffer((ByteBuffer) datum);
      } else {
        throw new AvroSerdeException("Unexpected Avro schema for Binary TypeInfo: " + recordSchema.getType());
      }
@@ -344,9 +346,9 @@ private Object deserializeSingleItemNullableUnion(Object datum,
       if (datum != null) {
         datumClazz = datum.getClass().getName();
       }
-      String msg = "File schema union could not resolve union. fileSchema = " + fileSchema +
-          ", recordSchema = " + recordSchema + ", datum class = " + datumClazz + ": " + e;
-      LOG.debug(msg, e);
+      LOG.debug("File schema union could not resolve union. fileSchema"
+          + " = {}, recordSchema = {}, datum class = {}", fileSchema,
+          recordSchema, datumClazz, e);
       // This occurs when the datum type is different between
       // the file and record schema. For example if datum is long
@@ -387,29 +389,20 @@ private Object deserializeUnion(Object datum, Schema fileSchema, Schema recordSc
   private Object deserializeList(Object datum, Schema fileSchema,
       Schema recordSchema, ListTypeInfo columnType) throws AvroSerdeException {
     // Need to check the original schema to see if this is actually a Fixed.
-    if(recordSchema.getType().equals(Schema.Type.FIXED)) {
+    if (recordSchema.getType().equals(Schema.Type.FIXED)) {
       // We're faking out Hive to work through a type system impedence mismatch.
       // Pull out the backing array and convert to a list.
       GenericData.Fixed fixed = (GenericData.Fixed) datum;
-      List asList = new ArrayList(fixed.bytes().length);
-      for(int j = 0; j < fixed.bytes().length; j++) {
-        asList.add(fixed.bytes()[j]);
-      }
-      return asList;
-    } else if(recordSchema.getType().equals(Schema.Type.BYTES)) {
+      return Arrays.asList(ArrayUtils.toObject(fixed.bytes()));
+    } else if (recordSchema.getType().equals(Schema.Type.BYTES)) {
       // This is going to be slow... hold on.
       ByteBuffer bb = (ByteBuffer)datum;
-      List asList = new ArrayList(bb.capacity());
-      byte[] array = bb.array();
-      for(int j = 0; j < array.length; j++) {
-        asList.add(array[j]);
-      }
-      return asList;
+      return Arrays.asList(ArrayUtils.toObject(bb.array()));
     } else {
       // An actual list, deser its values
       List listData = (List) datum;
       Schema listSchema = recordSchema.getElementType();
       List listContents = new ArrayList(listData.size());
-      for(Object obj : listData) {
+      for (Object obj : listData) {
         listContents.add(worker(obj, fileSchema == null ? null : fileSchema.getElementType(),
             listSchema, columnType.getListElementTypeInfo()));
       }
@@ -430,7 +423,6 @@ private Object deserializeMap(Object datum, Schema fileSchema, Schema mapSchema,
       map.put(key.toString(), worker(value, fileSchema == null ?
           null : fileSchema.getValueType(), valueSchema, valueTypeInfo));
     }
-
     return map;
   }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
index e1e4d8b..e349c79 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
@@ -29,6 +29,7 @@
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.ClassUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,10 +126,8 @@ public Object getStructFieldData(Object data, StructField f) {
     int fieldID = f.getFieldID();
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Getting struct field data for field: [" + f.getFieldName() + "] on data ["
-          + data.getClass() + "]");
-    }
+    LOG.debug("Getting struct field data for field: [{}] on data [{}]",
+        f.getFieldName(), data.getClass());
 
     if (data instanceof LazyStruct) {
       LazyStruct row = (LazyStruct) data;
@@ -137,10 +136,7 @@ public Object getStructFieldData(Object data, StructField f) {
       Object rowField = row.getField(fieldID);
 
       if (rowField instanceof LazyStruct) {
-
-        if (LOG.isDebugEnabled() && rowField != null) {
-          LOG.debug("Deserializing struct [" + rowField.getClass() + "]");
-        }
+        LOG.debug("Deserializing struct [{}]", rowField.getClass());
 
         return deserializeStruct(rowField, f.getFieldName());
@@ -158,16 +154,12 @@ public Object getStructFieldData(Object data, StructField f) {
         }
       }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Returning a lazy map for field [" + f.getFieldName() + "]");
-      }
+      LOG.debug("Returning a lazy map for field [{}]", f.getFieldName());
 
       return lazyMap;
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Returning [" + rowField + "] for field [" + f.getFieldName() + "]");
-      }
+      LOG.debug("Returning [{}] for field [{}]", rowField, f.getFieldName());
 
      // Just return the object. We need no further operation on it
      return rowField;
@@ -198,16 +190,13 @@ public Object getStructFieldData(Object data, StructField f) {
 
   @Override
   public List getStructFieldsDataAsList(Object data) {
-    if (data == null) {
-      return null;
-    }
-
-    List result = new ArrayList(fields.size());
-
-    for (int i = 0; i < fields.size(); i++) {
-      result.add(getStructFieldData(data, fields.get(i)));
+    List result = null;
+    if (data != null) {
+      result = new ArrayList(fields.size());
+      for (MyField field : fields) {
+        result.add(getStructFieldData(data, field));
+      }
     }
-
     return result;
   }
@@ -223,7 +212,7 @@ private Object deserializeStruct(Object struct, String fieldName) {
     byte[] data = ((LazyStruct) struct).getBytes();
     AvroDeserializer deserializer = new AvroDeserializer();
 
-    if (data == null || data.length == 0) {
+    if (ArrayUtils.isEmpty(data)) {
       return null;
     }
@@ -263,10 +252,8 @@ private Object deserializeStruct(Object struct, String fieldName) {
     }
 
     // adjust the data bytes according to any possible offset that was provided
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Retrieved writer Schema: " + ws.toString());
-      LOG.debug("Retrieved reader Schema: " + rs.toString());
-    }
+    LOG.debug("Retrieved writer Schema: {}", ws);
+    LOG.debug("Retrieved reader Schema: {}", rs);
 
     try {
       avroWritable.readFields(data, offset, data.length, ws, rs);
@@ -403,8 +390,8 @@ private Object toLazyListObject(Object obj, ObjectInspector objectInspector) {
     ObjectInspector listElementOI =
         ((ListObjectInspector) objectInspector).getListElementObjectInspector();
 
-    for (int i = 0; i < listObj.size(); i++) {
-      lazyList.add(toLazyObject(listObj.get(i), listElementOI));
+    for (Object lobj : listObj) {
+      lazyList.add(toLazyObject(lobj, listElementOI));
     }
 
     return retList;
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 1746a0f..cffc5ee 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -24,6 +24,7 @@
 import java.util.Properties;
 
 import org.apache.avro.Schema;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,9 +120,7 @@ public void initialize(Configuration configuration, Properties properties) throw
     properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
         schema.toString());
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Avro schema is " + schema);
-    }
+    LOG.debug("Avro schema is {}", schema);
 
     if (configuration == null) {
       LOG.debug("Configuration null, not inserting schema");
@@ -151,16 +150,14 @@ private boolean hasExternalSchema(Map tableParams) {
   public static Schema getSchemaFromCols(Properties properties,
       List columnNames, List columnTypes, String columnCommentProperty) {
     List columnComments;
-    if (columnCommentProperty == null || columnCommentProperty.isEmpty()) {
+    if (StringUtils.isEmpty(columnCommentProperty)) {
       columnComments = new ArrayList();
     } else {
       //Comments are separated by "\0" in columnCommentProperty, see method getSchema
       //in MetaStoreUtils where this string columns.comments is generated
       columnComments = Arrays.asList(columnCommentProperty.split("\0"));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("columnComments is " + columnCommentProperty);
-      }
+      LOG.debug("columnComments is {}", columnCommentProperty);
     }
     if (columnNames.size() != columnTypes.size()) {
      throw new IllegalArgumentException("AvroSerde initialization failed. Number of column " +
@@ -211,7 +208,7 @@ public Schema determineSchemaOrReturnErrorSchema(Configuration conf, Properties
 
   @Override
   public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
-    if(badSchema) {
+    if (badSchema) {
       throw new BadSchemaException();
     }
     return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema);
@@ -219,7 +216,7 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD
 
   @Override
   public Object deserialize(Writable writable) throws SerDeException {
-    if(badSchema) {
+    if (badSchema) {
       throw new BadSchemaException();
     }
     return getDeserializer().deserialize(columnNames, columnTypes, writable, schema);
@@ -237,15 +234,14 @@ public SerDeStats getSerDeStats() {
   }
 
   private AvroDeserializer getDeserializer() {
-    if(avroDeserializer == null) {
+    if (avroDeserializer == null) {
       avroDeserializer = new AvroDeserializer();
     }
-
     return avroDeserializer;
   }
 
   private AvroSerializer getSerializer() {
-    if(avroSerializer == null) {
+    if (avroSerializer == null) {
       avroSerializer = new AvroSerializer();
     }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
index 2d52020..4101609 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
@@ -29,7 +29,7 @@
  * used against other equivalent versions of those objects. Essentially memoizes instance creation.
  *
  * @param <SeedObject> Object that determines the instance. The cache uses this object as a key for
- *          its hash which is why it is imperative to have appropriate equals and hashcode
+ *          its hash which is why it is imperative to have appropriate equals and hashCode
  *          implementation for this object for the cache to work properly
 * @param <Instance> Instance that will be created from SeedObject.
 */
@@ -53,16 +53,16 @@ public Instance retrieve(SeedObject hv) throws AvroSerdeException {
    */
   public synchronized Instance retrieve(SeedObject hv,
       Set seenSchemas) throws AvroSerdeException {
-    if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString());
+    LOG.debug("Checking instance cache for hv: {}", hv);
 
-    if(cache.containsKey(hv)) {
-      if(LOG.isDebugEnabled()) LOG.debug("Returning cache result.");
-      return cache.get(hv);
+    Instance instance = cache.get(hv);
+    if (instance != null) {
+      LOG.debug("Returning instance cache result");
+      return instance;
     }
 
-    if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache");
-
-    Instance instance = makeInstance(hv, seenSchemas);
+    LOG.debug("Creating new instance and storing in instance cache");
+    instance = makeInstance(hv, seenSchemas);
     cache.put(hv, instance);
     return instance;
   }
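Reviewer note on the two idioms this patch leans on. SLF4J's parameterized logging formats the message only after the level check inside debug()/warn(), which is why the explicit isDebugEnabled() guards can be dropped; a guard remains worthwhile only when computing an *argument* is itself expensive, since arguments are still evaluated at the call site. The second idiom is commons-lang ArrayUtils.toObject(), which boxes byte[] to Byte[] so that Arrays.asList() yields a list of bytes rather than a one-element list holding the whole array. A minimal self-contained sketch (the demo class and the expensive helper are hypothetical, not part of this patch):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical demo class, not part of the patch.
public class PatchIdiomsDemo {
  private static final Logger LOG = LoggerFactory.getLogger(PatchIdiomsDemo.class);

  public static void main(String[] args) {
    // 1. Parameterized logging: the message is only formatted when DEBUG is
    // enabled, so no isDebugEnabled() guard is needed around this call.
    String schema = "example-schema";
    LOG.debug("Avro schema is {}", schema);

    // A guard still pays off when building an argument is costly, because
    // arguments are evaluated before debug() is ever invoked.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Expensive detail: {}", buildExpensiveDetail());
    }

    // 2. The boxing idiom from deserializeList(): ArrayUtils.toObject()
    // converts byte[] to Byte[], so Arrays.asList() produces List<Byte>
    // instead of a single-element List<byte[]>.
    byte[] raw = {1, 2, 3};
    List<Byte> boxed = Arrays.asList(ArrayUtils.toObject(raw));
    LOG.debug("Boxed list: {}", boxed);
  }

  // Hypothetical stand-in for an argument that is expensive to compute.
  private static String buildExpensiveDetail() {
    return "imagine a costly toString() here";
  }
}
```

Note that Arrays.asList over the boxed array is a fixed-size view backed by the Byte[], which matches the read-only way the deserialized list is consumed.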