diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
index 8cdc567..d42daa5 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 import java.rmi.server.UID;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -39,14 +40,12 @@
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.UnresolvedUnionException;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.common.type.Date;
-import org.apache.hadoop.hive.common.type.Timestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector;
@@ -59,6 +58,8 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AvroDeserializer {
   private static final Logger LOG = LoggerFactory.getLogger(AvroDeserializer.class);
@@ -116,7 +117,7 @@ public GenericRecord reencode(GenericRecord r)
     }
   }
 
-  private List row;
+  private final ArrayList row = new ArrayList<>(0);
 
   /**
    * Deserialize an Avro record, recursing into its component fields and
@@ -135,14 +136,11 @@ public GenericRecord reencode(GenericRecord r)
    */
   public Object deserialize(List columnNames, List columnTypes,
       Writable writable, Schema readerSchema) throws AvroSerdeException {
-    if(!(writable instanceof AvroGenericRecordWritable)) {
+    if (!(writable instanceof AvroGenericRecordWritable)) {
       throw new AvroSerdeException("Expecting a AvroGenericRecordWritable");
     }
 
-    if(row == null || row.size() != columnNames.size()) {
-      row = new ArrayList(columnNames.size());
-    } else {
-      row.clear();
-    }
+    row.clear();
+    row.ensureCapacity(columnNames.size());
 
     AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
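Note on the hunk above: deserialize() used to re-allocate the row list whenever the column count changed; the patch keeps one final ArrayList per deserializer and recycles it (clear() empties the list but keeps its backing array). A minimal sketch of that buffer-reuse pattern, with illustrative names that are not the actual Hive fields:

    import java.util.ArrayList;
    import java.util.List;

    class RowBuffer {
      // One backing array for the lifetime of the deserializer.
      private final ArrayList<Object> row = new ArrayList<>(0);

      List<Object> startRecord(int columnCount) {
        row.clear();                      // drop the previous record's values
        row.ensureCapacity(columnCount);  // grow the backing array once, up front
        return row;                       // caller add()s one value per column
      }
    }

The same list instance is handed out for every record, which mirrors Hive's row-object reuse convention: a consumer that needs a row beyond the next deserialize() call must copy it.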
@@ -152,21 +152,21 @@ public Object deserialize(List columnNames, List columnTypes,
     UID recordReaderId = recordWritable.getRecordReaderID();
     //If the record reader (from which the record is originated) is already seen and valid,
     //no need to re-encode the record.
-    if(!noEncodingNeeded.contains(recordReaderId)) {
+    if (!noEncodingNeeded.contains(recordReaderId)) {
       SchemaReEncoder reEncoder = null;
       //Check if the record record is already encoded once. If it does
       //reuse the encoder.
-      if(reEncoderCache.containsKey(recordReaderId)) {
+      if (reEncoderCache.containsKey(recordReaderId)) {
         reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder
       } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema?
         //Create and store new encoder in the map for re-use
         reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema);
         reEncoderCache.put(recordReaderId, reEncoder);
       } else{
-        LOG.debug("Adding new valid RRID :" + recordReaderId);
+        LOG.debug("Adding new valid RRID : {}", recordReaderId);
         noEncodingNeeded.add(recordReaderId);
       }
-      if(reEncoder != null) {
+      if (reEncoder != null) {
         if (!warnedOnce) {
           LOG.warn("Received different schemas. Have to re-encode: " +
               r.getSchema().toString(false) + "\nSIZE" + reEncoderCache + " ID " + recordReaderId);
@@ -184,13 +184,17 @@ public Object deserialize(List columnNames, List columnTypes,
 
   private List workerBase(List objectRow, Schema fileSchema, List columnNames,
                           List columnTypes, GenericRecord record)
           throws AvroSerdeException {
-    for(int i = 0; i < columnNames.size(); i++) {
+    for (int i = 0; i < columnNames.size(); i++) {
       TypeInfo columnType = columnTypes.get(i);
       String columnName = columnNames.get(i);
       Object datum = record.get(columnName);
       Schema datumSchema = record.getSchema().getField(columnName).schema();
-      Schema.Field field = AvroSerdeUtils.isNullableType(fileSchema)?AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema).getField(columnName):fileSchema.getField(columnName);
-      objectRow.add(worker(datum, field == null ? null : field.schema(), datumSchema, columnType));
+      Schema.Field field = AvroSerdeUtils.isNullableType(fileSchema)
+          ? AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema)
+              .getField(columnName)
+          : fileSchema.getField(columnName);
+      objectRow.add(worker(datum, field == null ? null : field.schema(),
+          datumSchema, columnType));
     }
 
     return objectRow;
@@ -212,7 +216,7 @@ private Object worker(Object datum, Schema fileSchema, Schema recordSchema, Type
       fileSchema = AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema);
     }
 
-    switch(columnType.getCategory()) {
+    switch (columnType.getCategory()) {
     case STRUCT:
       return deserializeStruct((GenericData.Record) datum, fileSchema, (StructTypeInfo) columnType);
     case UNION:
@@ -236,14 +240,16 @@ private Object deserializePrimitive(Object datum, Schema fileSchema, Schema reco
     // This also gets us around the Enum issue since we just take the value
     // and convert it to a string. Yay!
     case BINARY:
-      if (recordSchema.getType() == Type.FIXED){
+      final Type recordSchemaType = recordSchema.getType();
+      if (recordSchemaType == Type.FIXED) {
         Fixed fixed = (Fixed) datum;
         return fixed.bytes();
-      } else if (recordSchema.getType() == Type.BYTES){
+      }
+      if (recordSchemaType == Type.BYTES) {
        return AvroSerdeUtils.getBytesFromByteBuffer((ByteBuffer) datum);
-      } else {
-        throw new AvroSerdeException("Unexpected Avro schema for Binary TypeInfo: " + recordSchema.getType());
       }
+      throw new AvroSerdeException(
+          "Unexpected Avro schema for Binary TypeInfo: " + recordSchemaType);
     case DECIMAL:
       if (fileSchema == null) {
         throw new AvroSerdeException("File schema is missing for decimal field. Reader schema is " + columnType);
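Note on the BINARY case above: a Hive binary column can be backed by either an Avro fixed or an Avro bytes schema, and both paths end in a plain byte[]. A self-contained sketch of the same dispatch — the class and method names are illustrative, and in the real code AvroSerdeUtils.getBytesFromByteBuffer does the BYTES copy:

    import java.nio.ByteBuffer;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;

    final class BinaryShim {
      static byte[] toBytes(Object datum, Schema.Type type) {
        if (type == Schema.Type.FIXED) {
          return ((GenericData.Fixed) datum).bytes(); // fixed-width backing array, no copy
        }
        if (type == Schema.Type.BYTES) {
          ByteBuffer bb = ((ByteBuffer) datum).duplicate(); // don't disturb the position
          byte[] out = new byte[bb.remaining()];
          bb.get(out); // copy out: Avro may reuse the buffer for the next record
          return out;
        }
        throw new IllegalArgumentException("Not a binary-compatible type: " + type);
      }
    }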
@@ -329,39 +335,36 @@ private Object deserializeUnion(Object datum, Schema fileSchema, Schema recordSc
     return new StandardUnionObjectInspector.StandardUnion((byte)rsTag, desered);
   }
 
+  @SuppressWarnings("rawtypes")
   private Object deserializeList(Object datum, Schema fileSchema, Schema recordSchema,
                                  ListTypeInfo columnType) throws AvroSerdeException {
     // Need to check the original schema to see if this is actually a Fixed.
-    if(recordSchema.getType().equals(Schema.Type.FIXED)) {
+    if (recordSchema.getType().equals(Schema.Type.FIXED)) {
       // We're faking out Hive to work through a type system impedence mismatch.
       // Pull out the backing array and convert to a list.
       GenericData.Fixed fixed = (GenericData.Fixed) datum;
-      List asList = new ArrayList(fixed.bytes().length);
-      for(int j = 0; j < fixed.bytes().length; j++) {
-        asList.add(fixed.bytes()[j]);
-      }
-      return asList;
-    } else if(recordSchema.getType().equals(Schema.Type.BYTES)) {
+      return Arrays.asList(ArrayUtils.toObject(fixed.bytes()));
+    }
+    if (recordSchema.getType().equals(Schema.Type.BYTES)) {
       // This is going to be slow... hold on.
       ByteBuffer bb = (ByteBuffer)datum;
-      List asList = new ArrayList(bb.capacity());
-      byte[] array = bb.array();
-      for(int j = 0; j < array.length; j++) {
-        asList.add(array[j]);
-      }
-      return asList;
-    } else { // An actual list, deser its values
-      List listData = (List) datum;
-      Schema listSchema = recordSchema.getElementType();
-      List listContents = new ArrayList(listData.size());
-      for(Object obj : listData) {
-        listContents.add(worker(obj, fileSchema == null ? null : fileSchema.getElementType(), listSchema,
-            columnType.getListElementTypeInfo()));
-      }
-      return listContents;
+      return Arrays.asList(ArrayUtils.toObject(bb.array()));
+    }
+    // An actual list; deserialize its values.
+    List listData = (List) datum;
+    Schema listSchema = recordSchema.getElementType();
+    if (fileSchema != null) {
+      fileSchema = fileSchema.getElementType();
     }
+    TypeInfo columnTypeInfo = columnType.getListElementTypeInfo();
+    List listContents = new ArrayList(listData.size());
+    for (Object obj : listData) {
+      listContents.add(worker(obj, fileSchema, listSchema, columnTypeInfo));
+    }
+    return listContents;
   }
 
+  @SuppressWarnings("unchecked")
   private Object deserializeMap(Object datum, Schema fileSchema, Schema mapSchema,
                                 MapTypeInfo columnType) throws AvroSerdeException {
     // Avro only allows maps with Strings for keys, so we only have to worry
@@ -369,13 +372,15 @@ private Object deserializeMap(Object datum, Schema fileSchema, Schema mapSchema,
     Map map = new HashMap();
     Map mapDatum = (Map)datum;
     Schema valueSchema = mapSchema.getValueType();
+    if (fileSchema != null) {
+      fileSchema = fileSchema.getValueType();
+    }
     TypeInfo valueTypeInfo = columnType.getMapValueTypeInfo();
     for (CharSequence key : mapDatum.keySet()) {
       Object value = mapDatum.get(key);
-      map.put(key.toString(), worker(value, fileSchema == null ? null : fileSchema.getValueType(),
-          valueSchema, valueTypeInfo));
+      map.put(key.toString(),
+          worker(value, fileSchema, valueSchema, valueTypeInfo));
     }
-
     return map;
   }
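Note on the two hunks above: the element/value schema is now resolved once before the loop instead of once per entry, and the hand-rolled byte-boxing loops collapse into ArrayUtils.toObject(byte[]), which boxes into a Byte[] in one call (Arrays.asList over it yields a fixed-size list, which only matters if a consumer tried to mutate it). The rewrite leans on a detail of the Avro Schema API worth spelling out:

    import org.apache.avro.Schema;

    Schema array = Schema.createArray(Schema.create(Schema.Type.INT));
    Schema map = Schema.createMap(Schema.create(Schema.Type.INT));

    Schema arrayElement = array.getElementType(); // arrays expose getElementType()
    Schema mapValue = map.getValueType();         // maps expose getValueType();
    // map.getElementType() would throw AvroRuntimeException ("Not an array")

so the hoisted lookup in deserializeList uses getElementType() while the one in deserializeMap must use getValueType(), matching the inline call each replaced.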
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
index e1e4d8b..d877678 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroLazyObjectInspector.java
@@ -19,6 +19,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +30,7 @@
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.ClassUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,16 +200,13 @@ public Object getStructFieldData(Object data, StructField f) {
 
   @Override
   public List getStructFieldsDataAsList(Object data) {
-    if (data == null) {
-      return null;
-    }
-
-    List result = new ArrayList(fields.size());
-
-    for (int i = 0; i < fields.size(); i++) {
-      result.add(getStructFieldData(data, fields.get(i)));
+    List result = null;
+    if (data != null) {
+      result = new ArrayList(fields.size());
+      for (MyField field : fields) {
+        result.add(getStructFieldData(data, field));
+      }
     }
-
     return result;
   }
 
@@ -223,7 +222,7 @@ private Object deserializeStruct(Object struct, String fieldName) {
     byte[] data = ((LazyStruct) struct).getBytes();
     AvroDeserializer deserializer = new AvroDeserializer();
 
-    if (data == null || data.length == 0) {
+    if (ArrayUtils.isEmpty(data)) {
       return null;
     }
@@ -263,10 +262,8 @@ private Object deserializeStruct(Object struct, String fieldName) {
     }
 
     // adjust the data bytes according to any possible offset that was provided
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Retrieved writer Schema: " + ws.toString());
-      LOG.debug("Retrieved reader Schema: " + rs.toString());
-    }
+    LOG.debug("Retrieved writer Schema: {}", ws);
+    LOG.debug("Retrieved reader Schema: {}", rs);
 
     try {
       avroWritable.readFields(data, offset, data.length, ws, rs);
@@ -374,7 +371,7 @@ private Object toLazyObject(Object field, ObjectInspector fieldOI) {
 
     String objAsString = obj.toString().trim();
 
-    ref.setData(objAsString.getBytes());
+    ref.setData(objAsString.getBytes(StandardCharsets.UTF_8));
 
     // initialize the lazy object
     lazyObject.init(ref, 0, ref.getData().length);
@@ -403,8 +400,8 @@ private Object toLazyListObject(Object obj, ObjectInspector objectInspector) {
     ObjectInspector listElementOI =
         ((ListObjectInspector) objectInspector).getListElementObjectInspector();
 
-    for (int i = 0; i < listObj.size(); i++) {
-      lazyList.add(toLazyObject(listObj.get(i), listElementOI));
+    for (Object lobj : listObj) {
+      lazyList.add(toLazyObject(lobj, listElementOI));
     }
 
     return retList;
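Note on the AvroLazyObjectInspector changes: ArrayUtils.isEmpty(data) folds the null and zero-length checks into one call, and String.getBytes() now pins UTF-8 instead of silently using the JVM default charset. A two-line illustration of why the explicit charset matters (sketch, not Hive code):

    import java.nio.charset.StandardCharsets;

    byte[] platformDependent = "héllo".getBytes();                    // varies with file.encoding
    byte[] deterministic = "héllo".getBytes(StandardCharsets.UTF_8);  // same bytes on every JVM

Since these bytes feed a LazyObject via a ByteArrayRef, a deterministic encoding keeps the lazy-parsed field stable across differently configured nodes.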
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
index 3955611..333d849 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
@@ -19,14 +19,14 @@
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.avro.Schema;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -38,6 +38,8 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Read or write Avro data from Hive.
@@ -105,8 +107,8 @@ public void initialize(Configuration configuration, Properties properties) throw
         .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
 
     if (hasExternalSchema(properties)
-        || columnNameProperty == null || columnNameProperty.isEmpty()
-        || columnTypeProperty == null || columnTypeProperty.isEmpty()) {
+        || StringUtils.isEmpty(columnNameProperty)
+        || StringUtils.isEmpty(columnTypeProperty)) {
       schema = determineSchemaOrReturnErrorSchema(configuration, properties);
     } else {
       // Get column names and sort order
@@ -119,9 +121,7 @@ public void initialize(Configuration configuration, Properties properties) throw
 
     properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString());
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Avro schema is " + schema);
-    }
+    LOG.debug("Avro schema is {}", schema);
 
     if (configuration == null) {
       LOG.debug("Configuration null, not inserting schema");
@@ -137,7 +137,7 @@ public void initialize(Configuration configuration, Properties properties) throw
     this.columnTypes = aoig.getColumnTypes();
     this.oi = aoig.getObjectInspector();
 
-    if(!badSchema) {
+    if (!badSchema) {
       this.avroSerializer = new AvroSerializer();
       this.avroDeserializer = new AvroDeserializer();
     }
@@ -156,16 +156,14 @@ private boolean hasExternalSchema(Map tableParams) {
   public static Schema getSchemaFromCols(Properties properties,
           List columnNames, List columnTypes, String columnCommentProperty) {
     List columnComments;
-    if (columnCommentProperty == null || columnCommentProperty.isEmpty()) {
-      columnComments = new ArrayList();
+    if (StringUtils.isEmpty(columnCommentProperty)) {
+      columnComments = Collections.emptyList();
     } else {
       //Comments are separated by "\0" in columnCommentProperty, see method getSchema
       //in MetaStoreUtils where this string columns.comments is generated
       columnComments = Arrays.asList(columnCommentProperty.split("\0"));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("columnComments is " + columnCommentProperty);
-      }
+      LOG.debug("columnComments is {}", columnCommentProperty);
     }
     if (columnNames.size() != columnTypes.size()) {
       throw new IllegalArgumentException("AvroSerde initialization failed. Number of column " +
@@ -216,7 +214,7 @@ public Schema determineSchemaOrReturnErrorSchema(Configuration conf, Properties
 
   @Override
   public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
-    if(badSchema) {
+    if (badSchema) {
       throw new BadSchemaException();
     }
     return avroSerializer.serialize(o, objectInspector, columnNames, columnTypes, schema);
@@ -224,7 +222,7 @@ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerD
 
   @Override
   public Object deserialize(Writable writable) throws SerDeException {
-    if(badSchema) {
+    if (badSchema) {
       throw new BadSchemaException();
     }
     return avroDeserializer.deserialize(columnNames, columnTypes, writable, schema);
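Note on the AvroSerDe changes: the null-or-empty double checks collapse into commons-lang StringUtils.isEmpty(), and string-concatenation debug lines become SLF4J parameterized calls. The parameterized form makes the explicit isDebugEnabled() guards redundant, because the message (including schema.toString()) is only built after the level check passes. Sketch of the equivalence (illustrative):

    // Before: guard needed, otherwise schema.toString() runs even when DEBUG is off.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Avro schema is " + schema);
    }

    // After: the {} placeholder defers formatting until the level check inside debug().
    LOG.debug("Avro schema is {}", schema);

Collections.emptyList() likewise replaces an allocation with the shared immutable empty list, which is safe here because the comments list is only read.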
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
index 2d52020..668dafe 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
@@ -53,17 +53,17 @@ public Instance retrieve(SeedObject hv) throws AvroSerdeException {
    */
   public synchronized Instance retrieve(SeedObject hv, Set seenSchemas)
       throws AvroSerdeException {
-    if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString());
+    LOG.debug("Checking for hv: {}", hv);
 
-    if(cache.containsKey(hv)) {
-      if(LOG.isDebugEnabled()) LOG.debug("Returning cache result.");
-      return cache.get(hv);
+    Instance instance = cache.get(hv);
+    if (instance != null) {
+      LOG.debug("Returning cache result: {}", instance);
+      return instance;
     }
 
-    if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache");
+    LOG.debug("Creating new instance and storing in cache");
+    instance = makeInstance(hv, seenSchemas);
+    cache.put(hv, instance);
 
-    Instance instance = makeInstance(hv, seenSchemas);
-    cache.put(hv, instance);
     return instance;
   }
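Note on the InstanceCache change: containsKey() followed by get() costs two hash lookups; a single get() plus null check costs one. This assumes the cache never maps a key to null, which holds here as long as makeInstance() never returns null, since its result is stored unconditionally. The pattern reduced to its core (sketch over a generic map, not the Hive class):

    import java.util.Map;
    import java.util.function.Function;

    static <K, V> V retrieve(Map<K, V> cache, K key, Function<K, V> make) {
      V value = cache.get(key);   // single lookup
      if (value == null) {        // miss: only then build and store
        value = make.apply(key);
        cache.put(key, value);
      }
      return value;
    }

Map.computeIfAbsent would express the same thing in one call, but the Hive method also threads a seenSchemas set into makeInstance() and logs each branch, which is why the explicit form is kept.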