diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index 34da50d..2b11832 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -25,6 +25,7 @@
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,9 +42,7 @@
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.UnresolvedUnionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -60,6 +59,9 @@
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.Writable;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 class AvroDeserializer {
   private static final Logger LOG = LoggerFactory.getLogger(AvroDeserializer.class);
   /**
@@ -116,7 +118,7 @@ public GenericRecord reencode(GenericRecord r)
     }
   }
 
-  private List<Object> row;
+  private final ArrayList<Object> row = new ArrayList<>(0);
 
   /**
    * Deserialize an Avro record, recursing into its component fields and
@@ -135,14 +137,13 @@ public GenericRecord reencode(GenericRecord r)
    */
   public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
       Writable writable, Schema readerSchema) throws AvroSerdeException {
-    if(!(writable instanceof AvroGenericRecordWritable)) {
+    if (!(writable instanceof AvroGenericRecordWritable)) {
       throw new AvroSerdeException("Expecting a AvroGenericRecordWritable");
     }
 
-    if(row == null || row.size() != columnNames.size()) {
-      row = new ArrayList(columnNames.size());
-    } else {
-      row.clear();
+    row.clear();
+    if (row.size() != columnNames.size()) {
+      row.ensureCapacity(columnNames.size());
     }
 
     AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
@@ -152,21 +153,21 @@ public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
     UID recordReaderId = recordWritable.getRecordReaderID();
     //If the record reader (from which the record is originated) is already seen and valid,
     //no need to re-encode the record.
-    if(!noEncodingNeeded.contains(recordReaderId)) {
+    if (!noEncodingNeeded.contains(recordReaderId)) {
      SchemaReEncoder reEncoder = null;
       //Check if the record record is already encoded once. If it does
       //reuse the encoder.
-      if(reEncoderCache.containsKey(recordReaderId)) {
+      if (reEncoderCache.containsKey(recordReaderId)) {
        reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder
       } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema?
         //Create and store new encoder in the map for re-use
         reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema);
         reEncoderCache.put(recordReaderId, reEncoder);
       } else{
-        LOG.debug("Adding new valid RRID :" + recordReaderId);
+        LOG.debug("Adding new valid RRID : {}", recordReaderId);
         noEncodingNeeded.add(recordReaderId);
       }
-      if(reEncoder != null) {
+      if (reEncoder != null) {
         if (!warnedOnce) {
           LOG.warn("Received different schemas. Have to re-encode: " +
               r.getSchema().toString(false) + "\nSIZE" + reEncoderCache + " ID " + recordReaderId);
@@ -184,7 +185,7 @@ public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes,
   private List<Object> workerBase(List<Object> objectRow, Schema fileSchema, List<String> columnNames,
       List<TypeInfo> columnTypes, GenericRecord record)
       throws AvroSerdeException {
-    for(int i = 0; i < columnNames.size(); i++) {
+    for (int i = 0; i < columnNames.size(); i++) {
       TypeInfo columnType = columnTypes.get(i);
       String columnName = columnNames.get(i);
       Object datum = record.get(columnName);
@@ -236,14 +237,14 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
     // This also gets us around the Enum issue since we just take the value
     // and convert it to a string. Yay!
     case BINARY:
-      if (recordSchema.getType() == Type.FIXED){
+      if (recordSchema.getType() == Type.FIXED) {
         Fixed fixed = (Fixed) datum;
         return fixed.bytes();
-      } else if (recordSchema.getType() == Type.BYTES){
+      }
+      if (recordSchema.getType() == Type.BYTES) {
         return AvroSerdeUtils.getBytesFromByteBuffer((ByteBuffer) datum);
-      } else {
-        throw new AvroSerdeException("Unexpected Avro schema for Binary TypeInfo: " + recordSchema.getType());
       }
+      throw new AvroSerdeException("Unexpected Avro schema for Binary TypeInfo: " + recordSchema.getType());
     case DECIMAL:
       if (fileSchema == null) {
         throw new AvroSerdeException("File schema is missing for decimal field. Reader schema is " + columnType);
@@ -332,34 +333,28 @@ private Object deserializeUnion(Object datum, Schema fileSchema, Schema recordSc
   private Object deserializeList(Object datum, Schema fileSchema, Schema recordSchema,
       ListTypeInfo columnType) throws AvroSerdeException {
     // Need to check the original schema to see if this is actually a Fixed.
-    if(recordSchema.getType().equals(Schema.Type.FIXED)) {
+    if (recordSchema.getType().equals(Schema.Type.FIXED)) {
       // We're faking out Hive to work through a type system impedence mismatch.
       // Pull out the backing array and convert to a list.
       GenericData.Fixed fixed = (GenericData.Fixed) datum;
-      List<Byte> asList = new ArrayList<Byte>(fixed.bytes().length);
-      for(int j = 0; j < fixed.bytes().length; j++) {
-        asList.add(fixed.bytes()[j]);
-      }
-      return asList;
-    } else if(recordSchema.getType().equals(Schema.Type.BYTES)) {
+      return Arrays.asList(ArrayUtils.toObject(fixed.bytes()));
+    }
+
+    if (recordSchema.getType().equals(Schema.Type.BYTES)) {
       // This is going to be slow... hold on.
       ByteBuffer bb = (ByteBuffer)datum;
-      List<Byte> asList = new ArrayList<Byte>(bb.capacity());
-      byte[] array = bb.array();
-      for(int j = 0; j < array.length; j++) {
-        asList.add(array[j]);
-      }
-      return asList;
-    } else { // An actual list, deser its values
-      List listData = (List) datum;
-      Schema listSchema = recordSchema.getElementType();
-      List<Object> listContents = new ArrayList<Object>(listData.size());
-      for(Object obj : listData) {
-        listContents.add(worker(obj, fileSchema == null ? null : fileSchema.getElementType(), listSchema,
-            columnType.getListElementTypeInfo()));
-      }
-      return listContents;
+      return Arrays.asList(ArrayUtils.toObject(bb.array()));
+    }
+
+    // An actual list, deser its values
+    List listData = (List) datum;
+    Schema listSchema = recordSchema.getElementType();
+    List<Object> listContents = new ArrayList<Object>(listData.size());
+    for (Object obj : listData) {
+      listContents.add(worker(obj, fileSchema == null ? null : fileSchema.getElementType(), listSchema,
+          columnType.getListElementTypeInfo()));
     }
+    return listContents;
   }
 
   private Object deserializeMap(Object datum, Schema fileSchema, Schema mapSchema, MapTypeInfo columnType)
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
index e1e4d8b..3fdcfe4 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
@@ -30,6 +30,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -125,10 +126,8 @@ public Object getStructFieldData(Object data, StructField f) {
 
     int fieldID = f.getFieldID();
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Getting struct field data for field: [" + f.getFieldName() + "] on data ["
-          + data.getClass() + "]");
-    }
+    LOG.debug("Getting struct field data for field: [{}] on data [{}]",
+        f.getFieldName(), data.getClass());
 
     if (data instanceof LazyStruct) {
       LazyStruct row = (LazyStruct) data;
@@ -138,10 +137,9 @@ public Object getStructFieldData(Object data, StructField f) {
 
       if (rowField instanceof LazyStruct) {
-        if (LOG.isDebugEnabled() && rowField != null) {
-          LOG.debug("Deserializing struct [" + rowField.getClass() + "]");
+        if (rowField != null) {
+          LOG.debug("Deserializing struct [{}]", rowField.getClass());
         }
-
         return deserializeStruct(rowField, f.getFieldName());
       } else if (rowField instanceof LazyMap) {
@@ -158,16 +156,11 @@ public Object getStructFieldData(Object data, StructField f) {
           }
         }
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Returning a lazy map for field [" + f.getFieldName() + "]");
-        }
-
+        LOG.debug("Returning a lazy map for field [{}]", f.getFieldName());
         return lazyMap;
       } else {
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Returning [" + rowField + "] for field [" + f.getFieldName() + "]");
-        }
+        LOG.debug("Returning [{}] for field [{}]", rowField, f.getFieldName());
 
         // Just return the object. We need no further operation on it
         return rowField;
@@ -198,16 +191,13 @@ public Object getStructFieldData(Object data, StructField f) {
 
   @Override
   public List<Object> getStructFieldsDataAsList(Object data) {
-    if (data == null) {
-      return null;
-    }
-
-    List<Object> result = new ArrayList<Object>(fields.size());
-
-    for (int i = 0; i < fields.size(); i++) {
-      result.add(getStructFieldData(data, fields.get(i)));
+    List<Object> result = null;
+    if (data != null) {
+      result = new ArrayList<Object>(fields.size());
+      for (MyField field : fields) {
+        result.add(getStructFieldData(data, field));
+      }
     }
-
     return result;
   }
 
@@ -223,7 +213,7 @@ private Object deserializeStruct(Object struct, String fieldName) {
     byte[] data = ((LazyStruct) struct).getBytes();
     AvroDeserializer deserializer = new AvroDeserializer();
 
-    if (data == null || data.length == 0) {
+    if (ArrayUtils.isEmpty(data)) {
       return null;
     }
 
@@ -263,10 +253,8 @@ private Object deserializeStruct(Object struct, String fieldName) {
     }
 
     // adjust the data bytes according to any possible offset that was provided
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Retrieved writer Schema: " + ws.toString());
-      LOG.debug("Retrieved reader Schema: " + rs.toString());
-    }
+    LOG.debug("Retrieved writer Schema: {}", ws);
+    LOG.debug("Retrieved reader Schema: {}", rs);
 
     try {
       avroWritable.readFields(data, offset, data.length, ws, rs);
@@ -403,8 +391,8 @@ private Object toLazyListObject(Object obj, ObjectInspector objectInspector) {
     ObjectInspector listElementOI =
         ((ListObjectInspector) objectInspector).getListElementObjectInspector();
 
-    for (int i = 0; i < listObj.size(); i++) {
-      lazyList.add(toLazyObject(listObj.get(i), listElementOI));
+    for (Object lobj : listObj) {
+      lazyList.add(toLazyObject(lobj, listElementOI));
     }
 
     return retList;
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 3955611..c3ae573 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -19,11 +19,13 @@
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.avro.Schema;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,8 +107,8 @@ public void initialize(Configuration configuration, Properties properties) throw
         .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
 
     if (hasExternalSchema(properties)
-        || columnNameProperty == null || columnNameProperty.isEmpty()
-        || columnTypeProperty == null || columnTypeProperty.isEmpty()) {
+        || StringUtils.isEmpty(columnNameProperty)
+        || StringUtils.isEmpty(columnTypeProperty)) {
       schema = determineSchemaOrReturnErrorSchema(configuration, properties);
     } else {
       // Get column names and sort order
@@ -119,9 +121,7 @@ public void initialize(Configuration configuration, Properties properties) throw
     properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
         schema.toString());
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Avro schema is " + schema);
-    }
+    LOG.debug("Avro schema is {}", schema);
 
     if (configuration == null) {
       LOG.debug("Configuration null, not inserting schema");
@@ -137,7 +137,7 @@ public void initialize(Configuration configuration, Properties properties) throw
     this.columnTypes = aoig.getColumnTypes();
     this.oi = aoig.getObjectInspector();
 
-    if(!badSchema) {
+    if (!badSchema) {
       this.avroSerializer = new AvroSerializer();
       this.avroDeserializer = new AvroDeserializer();
     }
@@ -156,16 +156,13 @@ private boolean hasExternalSchema(Map<String, String> tableParams) {
   public static Schema getSchemaFromCols(Properties properties, List<String> columnNames,
       List<TypeInfo> columnTypes, String columnCommentProperty) {
     List<String> columnComments;
-    if (columnCommentProperty == null || columnCommentProperty.isEmpty()) {
-      columnComments = new ArrayList();
+    if (StringUtils.isEmpty(columnCommentProperty)) {
+      columnComments = Collections.emptyList();
     } else {
       //Comments are separated by "\0" in columnCommentProperty, see method getSchema
       //in MetaStoreUtils where this string columns.comments is generated
       columnComments = Arrays.asList(columnCommentProperty.split("\0"));
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("columnComments is " + columnCommentProperty);
-      }
+      LOG.debug("columnComments is {}", columnCommentProperty);
     }
     if (columnNames.size() != columnTypes.size()) {
       throw new IllegalArgumentException("AvroSerde initialization failed. Number of column " +
@@ -216,7 +213,7 @@ public Schema determineSchemaOrReturnErrorSchema(Configuration conf, Properties
 
   @Override
   public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
-    if(badSchema) {
+    if (badSchema) {
       throw new BadSchemaException();
     }
     return avroSerializer.serialize(o, objectInspector, columnNames, columnTypes, schema);
@@ -224,7 +221,7 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD
 
   @Override
   public Object deserialize(Writable writable) throws SerDeException {
-    if(badSchema) {
+    if (badSchema) {
       throw new BadSchemaException();
     }
     return avroDeserializer.deserialize(columnNames, columnTypes, writable, schema);
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
index 2d52020..f0cbc05 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
@@ -53,14 +53,14 @@ public Instance retrieve(SeedObject hv) throws AvroSerdeException {
    */
   public synchronized Instance retrieve(SeedObject hv, Set<SeedObject> seenSchemas) throws AvroSerdeException {
-    if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString());
+    LOG.debug("Checking for hv: {}", hv);
 
-    if(cache.containsKey(hv)) {
-      if(LOG.isDebugEnabled()) LOG.debug("Returning cache result.");
+    if (cache.containsKey(hv)) {
+      LOG.debug("Returning cache result.");
       return cache.get(hv);
     }
 
-    if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache");
+    LOG.debug("Creating new instance and storing in cache");
     Instance instance = makeInstance(hv, seenSchemas);
     cache.put(hv, instance);