diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 77a6dc6..533fa98 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -173,7 +173,9 @@ public abstract class BaseSemanticAnalyzer {
     case HiveParser.TOK_TBLRCFILE:
       inputFormat = RCFILE_INPUT;
       outputFormat = RCFILE_OUTPUT;
-      shared.serde = COLUMNAR_SERDE;
+      if (shared.serde == null) {
+        shared.serde = COLUMNAR_SERDE;
+      }
       storageFormat = true;
       break;
     case HiveParser.TOK_TABLEFILEFORMAT:
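Note: this guard is the user-facing point of the patch. Previously the TOK_TBLRCFILE branch always overwrote shared.serde, so a SerDe named explicitly in the DDL was silently replaced by ColumnarSerDe. A sketch of the DDL this enables, assuming the ROW FORMAT clause is visited before the storage clause (which is what the null check relies on; the table name is made up):

    // CREATE TABLE demo_rc (id INT, name STRING)
    //   ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
    //   STORED AS RCFILE;
    // ROW FORMAT SERDE populates shared.serde first; TOK_TBLRCFILE now only
    // supplies the COLUMNAR_SERDE default when no serde was named.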
diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
index 28cf8a1..11f5f07 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
@@ -51,15 +51,7 @@ import org.apache.hadoop.io.Writable;
  * (2) ColumnarSerDe initialize ColumnarStruct's field directly. But under the
  * field level, it works like LazySimpleSerDe
  */
-public class ColumnarSerDe implements SerDe {
-
-  // We need some initial values in case user don't call initialize()
-  private ObjectInspector cachedObjectInspector;
-
-  private long serializedSize;
-  private SerDeStats stats;
-  private boolean lastOperationSerialize;
-  private boolean lastOperationDeserialize;
+public class ColumnarSerDe extends ColumnarSerDeBase {
 
   @Override
   public String toString() {
@@ -104,65 +96,15 @@ public class ColumnarSerDe implements SerDe {
         serdeParams.getNullSequence());
 
     int size = serdeParams.getColumnTypes().size();
-    field = new BytesRefWritable[size];
-    for (int i = 0; i < size; i++) {
-      field[i] = new BytesRefWritable();
-      serializeCache.set(i, field[i]);
-    }
-
+    super.initialize(size);
     LOG.debug("ColumnarSerDe initialized with: columnNames="
         + serdeParams.getColumnNames() + " columnTypes="
        + serdeParams.getColumnTypes() + " separator="
        + Arrays.asList(serdeParams.getSeparators()) + " nullstring="
        + serdeParams.getNullString());
-
-    serializedSize = 0;
-    stats = new SerDeStats();
-    lastOperationSerialize = false;
-    lastOperationDeserialize = false;
-  }
-
-  // The object for storing row data
-  ColumnarStruct cachedLazyStruct;
-
-  /**
-   * Deserialize a row from the Writable to a LazyObject.
-   */
-  public Object deserialize(Writable blob) throws SerDeException {
-
-    if (!(blob instanceof BytesRefArrayWritable)) {
-      throw new SerDeException(getClass().toString()
-          + ": expects BytesRefArrayWritable!");
-    }
-
-    BytesRefArrayWritable cols = (BytesRefArrayWritable) blob;
-    cachedLazyStruct.init(cols);
-    lastOperationSerialize = false;
-    lastOperationDeserialize = true;
-    return cachedLazyStruct;
-  }
 
   /**
-   * Returns the ObjectInspector for the row.
-   */
-  public ObjectInspector getObjectInspector() throws SerDeException {
-    return cachedObjectInspector;
-  }
-
-  /**
-   * Returns the Writable Class after serialization.
-   *
-   * @see SerDe#getSerializedClass()
-   */
-  public Class getSerializedClass() {
-    return BytesRefArrayWritable.class;
-  }
-
-  BytesRefArrayWritable serializeCache = new BytesRefArrayWritable();
-  BytesRefWritable field[];
-  ByteStream.Output serializeStream = new ByteStream.Output();
-
-  /**
    * Serialize a row of data.
    *
    * @param obj
@@ -244,20 +186,4 @@ public class ColumnarSerDe implements SerDe {
 
     return serializeCache;
   }
 
-  /**
-   * Returns the statistics after (de)serialization)
-   */
-
-  public SerDeStats getSerDeStats() {
-    // must be different
-    assert (lastOperationSerialize != lastOperationDeserialize);
-
-    if (lastOperationSerialize) {
-      stats.setRawDataSize(serializedSize);
-    } else {
-      stats.setRawDataSize(cachedLazyStruct.getRawDataSerializedSize());
-    }
-    return stats;
-
-  }
 }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDeBase.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDeBase.java
new file mode 100644
index 0000000..017c9c6
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDeBase.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.columnar;
+
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+public abstract class ColumnarSerDeBase implements SerDe {
+
+  // The object for storing row data
+  ColumnarStructBase cachedLazyStruct;
+  // We need some initial values in case the user doesn't call initialize()
+  protected ObjectInspector cachedObjectInspector;
+
+  protected long serializedSize;
+  protected SerDeStats stats;
+  protected boolean lastOperationSerialize;
+  protected boolean lastOperationDeserialize;
+
+  BytesRefArrayWritable serializeCache = new BytesRefArrayWritable();
+  BytesRefWritable field[];
+  ByteStream.Output serializeStream = new ByteStream.Output();
+
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    if (!(blob instanceof BytesRefArrayWritable)) {
+      throw new SerDeException(getClass().toString()
+          + ": expects BytesRefArrayWritable!");
+    }
+
+    BytesRefArrayWritable cols = (BytesRefArrayWritable) blob;
+    cachedLazyStruct.init(cols);
+    lastOperationSerialize = false;
+    lastOperationDeserialize = true;
+    return cachedLazyStruct;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    // must be different
+    assert (lastOperationSerialize != lastOperationDeserialize);
+
+    if (lastOperationSerialize) {
+      stats.setRawDataSize(serializedSize);
+    } else {
+      stats.setRawDataSize(cachedLazyStruct.getRawDataSerializedSize());
+    }
+    return stats;
+  }
+
+  @Override
+  public Class getSerializedClass() {
+    return BytesRefArrayWritable.class;
+  }
+
+  protected void initialize(int size) throws SerDeException {
+    field = new BytesRefWritable[size];
+    for (int i = 0; i < size; i++) {
+      field[i] = new BytesRefWritable();
+      serializeCache.set(i, field[i]);
+    }
+
+    serializedSize = 0;
+    stats = new SerDeStats();
+    lastOperationSerialize = false;
+    lastOperationDeserialize = false;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+}
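Note: ColumnarSerDeBase is the extracted template: deserialization, stats, and buffer management live here, while subclasses supply initialize() and serialize(). A minimal sketch of the runtime protocol, using the refactored ColumnarSerDe from the previous hunk as the concrete type (imports elided; any ColumnarSerDeBase subclass behaves the same):

    static SerDeStats statsAfterRead(Configuration conf, Properties tbl,
        BytesRefArrayWritable row) throws SerDeException {
      ColumnarSerDeBase serde = new ColumnarSerDe();
      serde.initialize(conf, tbl);  // subclass wires inspector + struct, then super.initialize(n)
      serde.deserialize(row);       // flips lastOperationDeserialize
      return serde.getSerDeStats(); // rawDataSize == sum of the row's field byte lengths
    }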
diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
index e79021d..1335446 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
@@ -18,20 +18,15 @@
 package org.apache.hadoop.hive.serde2.columnar;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.serde2.SerDeStatsStruct;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
-import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
 import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -41,12 +36,10 @@ import org.apache.hadoop.io.Text;
  * lazy way.
  *
  */
-public class ColumnarStruct implements SerDeStatsStruct{
+public class ColumnarStruct extends ColumnarStructBase {
 
   private static final Log LOG = LogFactory.getLog(ColumnarStruct.class);
 
-  int[] prjColIDs = null; // list of projected column IDs
-
   Text nullSequence;
   int lengthNullSequence;
@@ -72,207 +65,28 @@ public class ColumnarStruct implements SerDeStatsStruct{
    */
   public ColumnarStruct(ObjectInspector oi,
       ArrayList<Integer> notSkippedColumnIDs, Text nullSequence) {
-    List<? extends StructField> fieldRefs = ((StructObjectInspector) oi)
-        .getAllStructFieldRefs();
-    int num = fieldRefs.size();
-
-    fieldInfoList = new FieldInfo[num];
-
+    super(oi, notSkippedColumnIDs);
     if (nullSequence != null) {
       this.nullSequence = nullSequence;
       this.lengthNullSequence = nullSequence.getLength();
     }
-
-    // if no columns is set to be skipped, add all columns in
-    // 'notSkippedColumnIDs'
-    if (notSkippedColumnIDs == null || notSkippedColumnIDs.size() == 0) {
-      for (int i = 0; i < num; i++) {
-        notSkippedColumnIDs.add(i);
-      }
-    }
-
-    for (int i = 0; i < num; i++) {
-      fieldInfoList[i] = new FieldInfo(
-          LazyFactory.createLazyObject(fieldRefs.get(i)
-              .getFieldObjectInspector()),
-          !notSkippedColumnIDs.contains(i));
-    }
-
-    // maintain a list of non-NULL column IDs
-    int min = notSkippedColumnIDs.size() > num ? num : notSkippedColumnIDs
-        .size();
-    prjColIDs = new int[min];
-    for (int i = 0, index = 0; i < notSkippedColumnIDs.size(); ++i) {
-      int readCol = notSkippedColumnIDs.get(i).intValue();
-      if (readCol < num) {
-        prjColIDs[index] = readCol;
-        index++;
-      }
-    }
-  }
-
-  /**
-   * Get one field out of the struct.
-   *
-   * If the field is a primitive field, return the actual object. Otherwise
-   * return the LazyObject. This is because PrimitiveObjectInspector does not
-   * have control over the object used by the user - the user simply directly
-   * use the Object instead of going through Object
-   * PrimitiveObjectInspector.get(Object).
-   *
-   * NOTE: separator and nullSequence has to be the same each time this method
-   * is called. These two parameters are used only once to parse each record.
-   *
-   * @param fieldID
-   *          The field ID
-   * @param nullSequence
-   *          The sequence for null value
-   * @return The field as a LazyObject
-   */
-  public Object getField(int fieldID) {
-    return fieldInfoList[fieldID].uncheckedGetField();
   }
 
-  class FieldInfo {
-    LazyObject field;
-    /*
-     * use an array instead of only one object in case in future hive does not do
-     * the byte copy.
-     */
-    ByteArrayRef cachedByteArrayRef;
-    BytesRefWritable rawBytesField;
-    boolean inited;
-    boolean fieldSkipped;
-
-    public FieldInfo(LazyObject lazyObject, boolean fieldSkipped) {
-      field = lazyObject;
-      cachedByteArrayRef = new ByteArrayRef();
-      if (fieldSkipped) {
-        this.fieldSkipped = true;
-        inited = true;
-      } else {
-        inited = false;
+  @Override
+  protected int getLength(ObjectInspector objectInspector, ByteArrayRef cachedByteArrayRef,
+      int start, int fieldLen) {
+    if (fieldLen == lengthNullSequence) {
+      byte[] data = cachedByteArrayRef.getData();
+      if (LazyUtils.compare(data, start, fieldLen,
+          nullSequence.getBytes(), 0, lengthNullSequence) == 0) {
+        return -1;
       }
     }
-
-    /*
-     * ============================ [PERF] ===================================
-     * This function is called for every row. Setting up the selected/projected
-     * columns at the first call, and don't do that for the following calls.
-     * Ideally this should be done in the constructor where we don't need to
-     * branch in the function for each row.
-     * =========================================================================
-     */
-    public void init(BytesRefWritable col) {
-      if (col != null) {
-        rawBytesField= col;
-        inited = false;
-      } else {
-        // select columns that actually do not exist in the file.
-        fieldSkipped = true;
-      }
-    }
-
-    /**
-     * Return the uncompressed size of this field
-     */
-    public long getSerializedSize(){
-      if (rawBytesField == null) {
-        return 0;
-      }
-      return rawBytesField.getLength();
-    }
-
-    /**
-     * Get the field out of the row without checking parsed. This is called by
-     * both getField and getFieldsAsList.
-     *
-     * @param fieldID
-     *          The id of the field starting from 0.
-     * @param nullSequence
-     *          The sequence representing NULL value.
-     * @return The value of the field
-     */
-    protected Object uncheckedGetField() {
-      if (fieldSkipped) {
-        return null;
-      }
-      if (!inited) {
-        try {
-          cachedByteArrayRef.setData(rawBytesField.getData());
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-        field.init(cachedByteArrayRef, rawBytesField
-            .getStart(), rawBytesField.getLength());
-        inited = true;
-      }
-
-
-      int fieldLen = rawBytesField.length;
-      if (fieldLen == lengthNullSequence) {
-        byte[] data = cachedByteArrayRef.getData();
-
-        if (LazyUtils.compare(data, rawBytesField.getStart(), fieldLen,
-            nullSequence.getBytes(), 0, lengthNullSequence) == 0) {
-          return null;
-        }
-      }
-
-      return field.getObject();
-
-    }
+    return fieldLen;
   }
-
-  FieldInfo[] fieldInfoList = null;
-
-
-  /*
-   * ============================ [PERF] ===================================
-   * This function is called for every row. Setting up the selected/projected
-   * columns at the first call, and don't do that for the following calls.
-   * Ideally this should be done in the constructor where we don't need to
-   * branch in the function for each row.
-   * =========================================================================
-   */
-  public void init(BytesRefArrayWritable cols) {
-    for (int i = 0; i < prjColIDs.length; ++i) {
-      int fieldIndex = prjColIDs[i];
-      if (fieldIndex < cols.size()) {
-        fieldInfoList[fieldIndex].init(cols.unCheckedGet(fieldIndex));
-      } else {
-        // select columns that actually do not exist in the file.
-        fieldInfoList[fieldIndex].init(null);
-      }
-    }
-  }
-
-  ArrayList<Object> cachedList;
-
-  /**
-   * Get the values of the fields as an ArrayList.
-   *
-   * @param nullSequence
-   *          The sequence for the NULL value
-   * @return The values of the fields as an ArrayList.
-   */
-  public ArrayList<Object> getFieldsAsList() {
-    if (cachedList == null) {
-      cachedList = new ArrayList<Object>();
-    } else {
-      cachedList.clear();
-    }
-    for (int i = 0; i < fieldInfoList.length; i++) {
-      cachedList.add(fieldInfoList[i].uncheckedGetField());
-    }
-    return cachedList;
-  }
-
-  public long getRawDataSerializedSize() {
-    long serializedSize = 0;
-    for (int i = 0; i < fieldInfoList.length; ++i) {
-      serializedSize += fieldInfoList[i].getSerializedSize();
-    }
-    return serializedSize;
+
+  @Override
+  protected LazyObjectBase createLazyObjectBase(ObjectInspector objectInspector) {
+    return LazyFactory.createLazyObject(objectInspector);
  }
 }
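Note: after the refactor this class answers only one question: is a field's byte range the null sequence? A worked example of the getLength() contract above, assuming Hive's default null sequence of backslash-N:

    Text nullSeq = new Text("\\N");            // two bytes: '\' and 'N'
    byte[] fieldBytes = "\\N".getBytes();
    boolean isNull = fieldBytes.length == nullSeq.getLength()
        && LazyUtils.compare(fieldBytes, 0, fieldBytes.length,
            nullSeq.getBytes(), 0, nullSeq.getLength()) == 0;
    // isNull == true, so getLength(...) returns -1 and the field reads as NULL;
    // any other content falls through and returns fieldLen unchanged.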
diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java
new file mode 100644
index 0000000..379b3e5
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.columnar;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.SerDeStatsStruct;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public abstract class ColumnarStructBase implements SerDeStatsStruct {
+
+  class FieldInfo {
+    LazyObjectBase field;
+    /*
+     * use an array instead of only one object in case in the future Hive does
+     * not do the byte copy.
+     */
+    ByteArrayRef cachedByteArrayRef;
+    BytesRefWritable rawBytesField;
+    boolean inited;
+    boolean fieldSkipped;
+    ObjectInspector objectInspector;
+
+    public FieldInfo(LazyObjectBase lazyObject, boolean fieldSkipped, ObjectInspector oi) {
+      field = lazyObject;
+      cachedByteArrayRef = new ByteArrayRef();
+      objectInspector = oi;
+      if (fieldSkipped) {
+        this.fieldSkipped = true;
+        inited = true;
+      } else {
+        inited = false;
+      }
+    }
+
+    /*
+     * ============================ [PERF] ===================================
+     * This function is called for every row. Set up the selected/projected
+     * columns on the first call and skip that work on the following calls.
+     * Ideally this should be done in the constructor where we don't need to
+     * branch in the function for each row.
+     * =========================================================================
+     */
+    public void init(BytesRefWritable col) {
+      if (col != null) {
+        rawBytesField = col;
+        inited = false;
+      } else {
+        // select columns that actually do not exist in the file.
+        fieldSkipped = true;
+      }
+    }
+
+    /**
+     * Return the uncompressed size of this field
+     */
+    public long getSerializedSize() {
+      if (rawBytesField == null) {
+        return 0;
+      }
+      return rawBytesField.getLength();
+    }
+
+    /**
+     * Get the field out of the row without checking parsed. This is called by
+     * both getField and getFieldsAsList.
+     *
+     * @return The value of the field
+     */
+    protected Object uncheckedGetField() {
+      if (fieldSkipped) {
+        return null;
+      }
+      if (!inited) {
+        try {
+          cachedByteArrayRef.setData(rawBytesField.getData());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        inited = true;
+        int byteLength = getLength(objectInspector, cachedByteArrayRef, rawBytesField.getStart(),
+            rawBytesField.getLength());
+        if (byteLength == -1) {
+          return null;
+        }
+
+        field.init(cachedByteArrayRef, rawBytesField.getStart(), byteLength);
+        return field.getObject();
+      } else {
+        if (getLength(objectInspector, cachedByteArrayRef, rawBytesField.getStart(),
+            rawBytesField.getLength()) == -1) {
+          return null;
+        }
+        return field.getObject();
+      }
+    }
+  }
+
+  protected int[] prjColIDs = null;
+  private FieldInfo[] fieldInfoList = null;
+  private ArrayList<Object> cachedList;
+
+  public ColumnarStructBase(ObjectInspector oi,
+      ArrayList<Integer> notSkippedColumnIDs) {
+    List<? extends StructField> fieldRefs = ((StructObjectInspector) oi)
+        .getAllStructFieldRefs();
+    int num = fieldRefs.size();
+
+    fieldInfoList = new FieldInfo[num];
+
+    // if no column is set to be skipped, add all columns in
+    // 'notSkippedColumnIDs'
+    if (notSkippedColumnIDs == null || notSkippedColumnIDs.size() == 0) {
+      for (int i = 0; i < num; i++) {
+        notSkippedColumnIDs.add(i);
+      }
+    }
+
+    for (int i = 0; i < num; i++) {
+      ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
+      fieldInfoList[i] = new FieldInfo(
+          createLazyObjectBase(foi),
+          !notSkippedColumnIDs.contains(i),
+          foi);
+    }
+
+    // maintain a list of non-NULL column IDs
+    int min = notSkippedColumnIDs.size() > num ? num : notSkippedColumnIDs
+        .size();
+    prjColIDs = new int[min];
+    for (int i = 0, index = 0; i < notSkippedColumnIDs.size(); ++i) {
+      int readCol = notSkippedColumnIDs.get(i).intValue();
+      if (readCol < num) {
+        prjColIDs[index] = readCol;
+        index++;
+      }
+    }
+  }
+
+  /**
+   * Get one field out of the struct.
+   *
+   * If the field is a primitive field, return the actual object. Otherwise
+   * return the LazyObject. This is because PrimitiveObjectInspector does not
+   * have control over the object used by the user - the user simply directly
+   * use the Object instead of going through Object
+   * PrimitiveObjectInspector.get(Object).
+   *
+   * NOTE: separator and nullSequence have to be the same each time this method
+   * is called. These two parameters are used only once to parse each record.
+   *
+   * @param fieldID
+   *          The field ID
+   * @param nullSequence
+   *          The sequence for null value
+   * @return The field as a LazyObject
+   */
+  public Object getField(int fieldID) {
+    return fieldInfoList[fieldID].uncheckedGetField();
+  }
+
+  /**
+   * Check if the object is null and return the length of the stream
+   *
+   * @param objectInspector
+   * @param cachedByteArrayRef
+   *          the bytes of the object
+   * @param start
+   *          the start offset
+   * @param length
+   *          the length
+   *
+   * @return -1 for null, >=0 for length
+   */
+  protected abstract int getLength(ObjectInspector objectInspector,
+      ByteArrayRef cachedByteArrayRef, int start, int length);
+
+  /**
+   * create the lazy object for this field
+   *
+   * @param objectInspector
+   *          the object inspector for the field
+   * @return the lazy object for the field
+   */
+  protected abstract LazyObjectBase createLazyObjectBase(ObjectInspector objectInspector);
+
+  public void init(BytesRefArrayWritable cols) {
+    for (int i = 0; i < prjColIDs.length; ++i) {
+      int fieldIndex = prjColIDs[i];
+      if (fieldIndex < cols.size()) {
+        fieldInfoList[fieldIndex].init(cols.unCheckedGet(fieldIndex));
+      } else {
+        // select columns that actually do not exist in the file.
+        fieldInfoList[fieldIndex].init(null);
+      }
+    }
+  }
+
+  /**
+   * Get the values of the fields as an ArrayList.
+   *
+   * @param nullSequence
+   *          The sequence for the NULL value
+   * @return The values of the fields as an ArrayList.
+   */
+  public ArrayList<Object> getFieldsAsList() {
+    if (cachedList == null) {
+      cachedList = new ArrayList<Object>();
+    } else {
+      cachedList.clear();
+    }
+    for (int i = 0; i < fieldInfoList.length; i++) {
+      cachedList.add(fieldInfoList[i].uncheckedGetField());
+    }
+    return cachedList;
+  }
+
+  public long getRawDataSerializedSize() {
+    long serializedSize = 0;
+    for (int i = 0; i < fieldInfoList.length; ++i) {
+      serializedSize += fieldInfoList[i].getSerializedSize();
+    }
+    return serializedSize;
+  }
+
+}
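Note: the base class now owns the per-row lifecycle (bind bytes, lazily parse on first access), and the two abstract hooks decide format-specific behavior. A minimal sketch of the sequence an RCFile reader drives, assuming "struct" is any ColumnarStructBase over two string columns:

    BytesRefArrayWritable cols = new BytesRefArrayWritable(2);
    cols.set(0, new BytesRefWritable("hive".getBytes(), 0, 4));
    cols.set(1, new BytesRefWritable("\\N".getBytes(), 0, 2));
    struct.init(cols);              // binds only the projected columns (prjColIDs)
    Object f0 = struct.getField(0); // getLength() >= 0: lazy object inited, then parsed
    Object f1 = struct.getField(1); // getLength() == -1 (text subclass): null, no parse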
diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java
new file mode 100644
index 0000000..3795ccd
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.columnar;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * LazyBinaryColumnarSerDe. This SerDe combines elements of ColumnarSerDe and
+ * LazyBinarySerDe to produce a SerDe that serializes columns into a
+ * BytesRefArrayWritable in a compact binary format and deserializes them in a
+ * lazy, i.e. on-demand, fashion.
+ *
+ */
+public class LazyBinaryColumnarSerDe extends ColumnarSerDeBase {
+
+  private List<String> columnNames;
+  private List<TypeInfo> columnTypes;
+
+  @Override
+  public String toString() {
+    return getClass().toString()
+        + "["
+        + columnNames
+        + ":"
+        + columnTypes + "]";
+  }
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    SerDeParameters serdeParams = new SerDeParameters();
+    LazyUtils.extractColumnInfo(tbl, serdeParams, getClass().getName());
+    columnNames = serdeParams.getColumnNames();
+    columnTypes = serdeParams.getColumnTypes();
+
+    cachedObjectInspector = LazyBinaryFactory.createColumnarStructInspector(
+        columnNames, columnTypes);
+    java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
+    cachedLazyStruct = new LazyBinaryColumnarStruct(cachedObjectInspector, notSkipIDs);
+    int size = columnTypes.size();
+    super.initialize(size);
+  }
+
+  static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    if (objInspector.getCategory() != Category.STRUCT) {
+      throw new SerDeException(getClass().toString()
+          + " can only serialize struct types, but we got: "
+          + objInspector.getTypeName());
+    }
+
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> list = soi.getStructFieldsDataAsList(obj);
+
+    boolean warnedOnceNullMapKey = false;
+    serializeStream.reset();
+    serializedSize = 0;
+    int streamOffset = 0;
+    // Serialize each field
+    for (int i = 0; i < fields.size(); i++) {
+      // Get the field objectInspector and the field object.
+      ObjectInspector foi = fields.get(i).getFieldObjectInspector();
+      Object f = (list == null ? null : list.get(i));
+      // Empty strings are marked by an invalid UTF-8 single-byte sequence. A valid
+      // UTF-8 stream cannot produce this sequence.
+      if ((f != null) && (foi.getCategory().equals(ObjectInspector.Category.PRIMITIVE))
+          && ((PrimitiveObjectInspector) foi).getPrimitiveCategory().equals(
+              PrimitiveObjectInspector.PrimitiveCategory.STRING)
+          && ((StringObjectInspector) foi).getPrimitiveJavaObject(f).length() == 0) {
+        serializeStream.write(INVALID_UTF__SINGLE_BYTE, 0, 1);
+      } else {
+        LazyBinarySerDe.serialize(serializeStream, f, foi, true, warnedOnceNullMapKey);
+      }
+      field[i].set(serializeStream.getData(), streamOffset, serializeStream.getCount()
+          - streamOffset);
+      streamOffset = serializeStream.getCount();
+    }
+    serializedSize = serializeStream.getCount();
+    lastOperationSerialize = true;
+    lastOperationDeserialize = false;
+    return serializeCache;
+  }
+}
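Note: the INVALID_UTF__SINGLE_BYTE constant is doing subtle work. With the length prefix skipped (see the LazyBinarySerDe changes below), an empty string would serialize to zero bytes and become indistinguishable from NULL, which also occupies zero bytes. The marker exploits UTF-8: 10111111 (0xBF) is a continuation byte, so no valid encoded string consists of exactly that single byte. A quick check of the bit pattern:

    byte marker = (byte) Integer.parseInt("10111111", 2);
    // marker == (byte) 0xBF == LazyBinaryColumnarSerDe.INVALID_UTF__SINGLE_BYTE[0]
    // on disk: NULL -> 0 bytes, "" -> the 1-byte marker, "x" -> the bytes of "x"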
diff --git serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java
new file mode 100644
index 0000000..0317024
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.columnar;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+
+public class LazyBinaryColumnarStruct extends ColumnarStructBase {
+
+  public LazyBinaryColumnarStruct(ObjectInspector oi, ArrayList<Integer> notSkippedColumnIDs) {
+    super(oi, notSkippedColumnIDs);
+  }
+
+  static VInt vInt = new LazyBinaryUtils.VInt();
+
+  @Override
+  protected int getLength(ObjectInspector objectInspector, ByteArrayRef cachedByteArrayRef,
+      int start, int length) {
+    if (length == 0) {
+      return -1;
+    }
+    Category category = objectInspector.getCategory();
+    if (category.equals(Category.PRIMITIVE)) {
+      PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) objectInspector)
+          .getPrimitiveCategory();
+      if (primitiveCategory.equals(PrimitiveCategory.STRING) && (length == 1)
+          && (cachedByteArrayRef.getData()[start]
+              == LazyBinaryColumnarSerDe.INVALID_UTF__SINGLE_BYTE[0])) {
+        return 0;
+      }
+    }
+    return length;
+  }
+
+  @Override
+  protected LazyObjectBase createLazyObjectBase(ObjectInspector objectInspector) {
+    return LazyBinaryFactory.createLazyBinaryObject(objectInspector);
+  }
+}
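Note: this getLength() is the decode-side half of the convention the serializer above establishes. A compact restatement of the mapping (isString stands in for the primitive-category check done in the method):

    static int decodedLength(byte[] data, int start, int length, boolean isString) {
      if (length == 0) {
        return -1;                        // nothing was written: NULL
      }
      if (isString && length == 1
          && data[start] == LazyBinaryColumnarSerDe.INVALID_UTF__SINGLE_BYTE[0]) {
        return 0;                         // the empty-string marker
      }
      return length;                      // real data, parsed lazily
    }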
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
index e927547..677018f 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
@@ -214,7 +214,7 @@ public final class LazyFactory {
           separators, 1, nullSequence, escaped, escapeChar));
     }
     return ObjectInspectorFactory.getColumnarStructObjectInspector(columnNames,
-        columnObjectInspectors, nullSequence);
+        columnObjectInspectors);
   }
 
   private LazyFactory() {
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
index 2e2896c..996ff6f 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  * A LazyObject can represent any primitive object or hierarchical object like
  * array, map or struct.
  */
-public abstract class LazyObject<OI extends ObjectInspector> {
+public abstract class LazyObject<OI extends ObjectInspector> extends LazyObjectBase {
 
   OI oi;
 
@@ -40,27 +40,6 @@ public abstract class LazyObject<OI extends ObjectInspector> {
     this.oi = oi;
   }
 
-  /**
-   * Set the data for this LazyObject. We take ByteArrayRef instead of byte[] so
-   * that we will be able to drop the reference to byte[] by a single
-   * assignment. The ByteArrayRef object can be reused across multiple rows.
-   *
-   * @param bytes
-   *          The wrapper of the byte[].
-   * @param start
-   *          The start position inside the bytes.
-   * @param length
-   *          The length of the data, starting from "start"
-   * @see ByteArrayRef
-   */
-  public abstract void init(ByteArrayRef bytes, int start, int length);
-
-  /**
-   * If the LazyObject is a primitive Object, then deserialize it and return the
-   * actual primitive Object. Otherwise (array, map, struct), return this.
-   */
-  public abstract Object getObject();
-
   @Override
   public abstract int hashCode();
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java
new file mode 100644
index 0000000..3334dff
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObjectBase.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.lazy;
+
+public abstract class LazyObjectBase {
+
+  /**
+   * Set the data for this LazyObjectBase. We take ByteArrayRef instead of byte[] so
+   * that we will be able to drop the reference to byte[] by a single
+   * assignment. The ByteArrayRef object can be reused across multiple rows.
+   *
+   * @param bytes
+   *          The wrapper of the byte[].
+   * @param start
+   *          The start position inside the bytes.
+   * @param length
+   *          The length of the data, starting from "start"
+   * @see ByteArrayRef
+   */
+  public abstract void init(ByteArrayRef bytes, int start, int length);
+
+  /**
+   * If the LazyObjectBase is a primitive Object, then deserialize it and return the
+   * actual primitive Object. Otherwise (array, map, struct), return this.
+   */
+  public abstract Object getObject();
+
+}
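Note: LazyObjectBase gives the previously unrelated text (LazyObject) and binary (LazyBinaryObject) hierarchies a common root, which is what lets ColumnarStructBase stay format-agnostic. A sketch of the resulting polymorphism (the caller supplies the inspector and bytes):

    static Object readField(ObjectInspector oi, ByteArrayRef bytes, int start,
        int length, boolean binary) {
      LazyObjectBase field = binary
          ? LazyBinaryFactory.createLazyBinaryObject(oi)
          : LazyFactory.createLazyObject(oi);
      field.init(bytes, start, length);   // the shared contract moved into the base
      return field.getObject();
    }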
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
index 24cfacf..de26150 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.serde2.lazy;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -232,39 +231,7 @@
     serdeParams.lastColumnTakesRest = (lastColumnTakesRestString != null && lastColumnTakesRestString
         .equalsIgnoreCase("true"));
 
-    // Read the configuration parameters
-    String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
-    // NOTE: if "columns.types" is missing, all columns will be of String type
-    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
-
-    // Parse the configuration parameters
-
-    if (columnNameProperty != null && columnNameProperty.length() > 0) {
-      serdeParams.columnNames = Arrays.asList(columnNameProperty.split(","));
-    } else {
-      serdeParams.columnNames = new ArrayList<String>();
-    }
-    if (columnTypeProperty == null) {
-      // Default type: all string
-      StringBuilder sb = new StringBuilder();
-      for (int i = 0; i < serdeParams.columnNames.size(); i++) {
-        if (i > 0) {
-          sb.append(":");
-        }
-        sb.append(Constants.STRING_TYPE_NAME);
-      }
-      columnTypeProperty = sb.toString();
-    }
-
-    serdeParams.columnTypes = TypeInfoUtils
-        .getTypeInfosFromTypeString(columnTypeProperty);
-
-    if (serdeParams.columnNames.size() != serdeParams.columnTypes.size()) {
-      throw new SerDeException(serdeName + ": columns has "
-          + serdeParams.columnNames.size()
-          + " elements while columns.types has "
-          + serdeParams.columnTypes.size() + " elements!");
-    }
+    LazyUtils.extractColumnInfo(tbl, serdeParams, serdeName);
 
     // Create the LazyObject for storing the rows
     serdeParams.rowTypeInfo = TypeInfoFactory.getStructTypeInfo(
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
index 2cef006..a8fc716 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
@@ -21,7 +21,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
 
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
@@ -31,6 +37,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -222,6 +229,43 @@ public final class LazyUtils {
     return hash;
   }
 
+  public static void extractColumnInfo(Properties tbl, SerDeParameters serdeParams,
+      String serdeName) throws SerDeException {
+    // Read the configuration parameters
+    String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
+    // NOTE: if "columns.types" is missing, all columns will be of String type
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+
+    // Parse the configuration parameters
+
+    if (columnNameProperty != null && columnNameProperty.length() > 0) {
+      serdeParams.columnNames = Arrays.asList(columnNameProperty.split(","));
+    } else {
+      serdeParams.columnNames = new ArrayList<String>();
+    }
+    if (columnTypeProperty == null) {
+      // Default type: all string
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < serdeParams.columnNames.size(); i++) {
+        if (i > 0) {
+          sb.append(":");
+        }
+        sb.append(Constants.STRING_TYPE_NAME);
+      }
+      columnTypeProperty = sb.toString();
+    }
+
+    serdeParams.columnTypes = TypeInfoUtils
+        .getTypeInfosFromTypeString(columnTypeProperty);
+
+    if (serdeParams.columnNames.size() != serdeParams.columnTypes.size()) {
+      throw new SerDeException(serdeName + ": columns has "
+          + serdeParams.columnNames.size()
+          + " elements while columns.types has "
+          + serdeParams.columnTypes.size() + " elements!");
+    }
+  }
+
   private LazyUtils() {
     // prevent instantiation
   }
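Note: hoisting this parsing into LazyUtils is what lets LazyBinaryColumnarSerDe reuse it verbatim. A minimal usage sketch ("DemoSerDe" is only the name reported in error messages):

    Properties tbl = new Properties();
    tbl.setProperty(Constants.LIST_COLUMNS, "id,name");
    tbl.setProperty(Constants.LIST_COLUMN_TYPES, "int:string");
    SerDeParameters serdeParams = new SerDeParameters();
    LazyUtils.extractColumnInfo(tbl, serdeParams, "DemoSerDe");
    // columnNames -> [id, name]; columnTypes -> the int and string TypeInfos.
    // Omitting columns.types would default every column to string.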
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
index 1440472..b746f8a 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryFactory.java
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hive.serde2.lazybinary;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryListObjectInspector;
 import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryMapObjectInspector;
 import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBooleanObjectInspector;
@@ -32,6 +36,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjec
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableShortObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableVoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 /**
  * LazyBinaryFactory.
@@ -91,4 +96,16 @@ public final class LazyBinaryFactory {
   private LazyBinaryFactory() {
     // prevent instantiation
   }
+
+  public static ObjectInspector createColumnarStructInspector(List<String> columnNames,
+      List<TypeInfo> columnTypes) {
+    ArrayList<ObjectInspector> columnObjectInspectors = new ArrayList<ObjectInspector>(
+        columnTypes.size());
+    for (int i = 0; i < columnTypes.size(); i++) {
+      columnObjectInspectors
+          .add(LazyBinaryUtils.getLazyBinaryObjectInspectorFromTypeInfo(columnTypes.get(i)));
+    }
+    return ObjectInspectorFactory.getColumnarStructObjectInspector(columnNames,
+        columnObjectInspectors);
+  }
 }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java
index ea20b34..598683f 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryObject.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hive.serde2.lazybinary;
 
-import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 /**
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  * A LazyBinaryObject can represent any primitive object or hierarchical object
 * like string, list, map or struct.
  */
-public abstract class LazyBinaryObject<OI extends ObjectInspector> {
+public abstract class LazyBinaryObject<OI extends ObjectInspector> extends LazyObjectBase {
 
   OI oi;
 
@@ -42,30 +42,6 @@ public abstract class LazyBinaryObject<OI extends ObjectInspector> {
     this.oi = oi;
   }
 
-  /**
-   * Set the data for this LazyBinaryObject. We take ByteArrayRef instead of
-   * byte[] so that we will be able to drop the reference to byte[] by a single
-   * assignment. The ByteArrayRef object can be reused across multiple rows.
-   *
-   * Never call this function if the object represent a null!!!
-   *
-   * @param bytes
-   *          The wrapper of the byte[].
-   * @param start
-   *          The start position inside the bytes.
-   * @param length
-   *          The length of the data, starting from "start"
-   * @see ByteArrayRef
-   */
-  public abstract void init(ByteArrayRef bytes, int start, int length);
-
-  /**
-   * If the LazyBinaryObject is a primitive Object, then deserialize it and
-   * return the actual primitive Object. Otherwise (string, list, map, struct),
-   * return this.
-   */
-  public abstract Object getObject();
-
   @Override
   public abstract int hashCode();
 }
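Note: the new factory method mirrors the text-side columnar-inspector construction in LazyFactory shown earlier, but pairs each column type with its writable (lazy-binary) inspector. Usage sketch:

    List<String> names = Arrays.asList("id", "name");
    List<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString("int:string");
    ObjectInspector oi = LazyBinaryFactory.createColumnarStructInspector(names, types);
    // oi is the cached ColumnarStructObjectInspector that LazyBinaryColumnarSerDe
    // hands back from getObjectInspector()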
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
index 4285ab3..5f31d0c 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
@@ -179,6 +179,7 @@ public class LazyBinarySerDe implements SerDe {
    */
   BytesWritable serializeBytesWritable = new BytesWritable();
   ByteStream.Output serializeByteStream = new ByteStream.Output();
+  boolean nullMapKey = false;
 
   /**
    * Serialize an object to a byte buffer in a binary compact way.
@@ -195,8 +196,8 @@ public class LazyBinarySerDe implements SerDe {
     serializeByteStream.reset();
     // serialize the row as a struct
-    serializeStruct(serializeByteStream, obj,
-        (StructObjectInspector) objInspector);
+    nullMapKey = serializeStruct(serializeByteStream, obj,
+        (StructObjectInspector) objInspector, nullMapKey);
     // return the serialized bytes
     serializeBytesWritable.set(serializeByteStream.getData(), 0,
         serializeByteStream.getCount());
@@ -207,8 +208,6 @@
     return serializeBytesWritable;
   }
 
-  boolean nullMapKey = false;
-
   /**
    * Serialize a struct object without writing the byte size. This function is
    * shared by both row serialization and struct serialization.
@@ -219,12 +218,16 @@
    *          the struct object to serialize
    * @param objInspector
    *          the struct object inspector
+   * @param warnedOnceNullMapKey a boolean indicating whether a warning
+   *          has been issued once already when encountering null map keys
+   * @return a boolean indicating whether a warning for null map keys has been
+   *          issued once already
    */
-  private void serializeStruct(Output byteStream, Object obj,
-      StructObjectInspector soi) {
+  private static boolean serializeStruct(Output byteStream, Object obj,
+      StructObjectInspector soi, boolean warnedOnceNullMapKey) {
     // do nothing for null struct
     if (null == obj) {
-      return;
+      return warnedOnceNullMapKey;
     }
     /*
     * Interleave serializing one null byte and 8 struct fields in each round,
@@ -243,15 +246,16 @@
         // if this is the last element and serialize the
         // corresponding 8 struct fields at the same time
         if (7 == i % 8 || i == size - 1) {
-          serializeByteStream.write(nullByte);
+          byteStream.write(nullByte);
           for (int j = lasti; j <= i; j++) {
-            serialize(serializeByteStream, soi.getStructFieldData(obj, fields
-                .get(j)), fields.get(j).getFieldObjectInspector());
+            warnedOnceNullMapKey = serialize(byteStream, soi.getStructFieldData(obj, fields
+                .get(j)), fields.get(j).getFieldObjectInspector(), false, warnedOnceNullMapKey);
           }
           lasti = i + 1;
          nullByte = 0;
         }
       }
+      return warnedOnceNullMapKey;
    }
 
   /**
@@ -264,13 +268,19 @@
    *          the object to serialize
    * @param objInspector
    *          the object inspector
+   * @param skipLengthPrefix a boolean indicating whether the length prefix
+   *          should be skipped for list/map/struct
+   * @param warnedOnceNullMapKey a boolean indicating whether a warning
+   *          has been issued once already when encountering null map keys
+   * @return a boolean indicating whether a warning for null map keys has been
+   *          issued once already
    */
-  private void serialize(Output byteStream, Object obj,
-      ObjectInspector objInspector) {
+  public static boolean serialize(Output byteStream, Object obj,
+      ObjectInspector objInspector, boolean skipLengthPrefix, boolean warnedOnceNullMapKey) {
 
     // do nothing for null object
     if (null == obj) {
-      return;
+      return warnedOnceNullMapKey;
     }
 
     switch (objInspector.getCategory()) {
@@ -278,37 +288,37 @@
       PrimitiveObjectInspector poi = (PrimitiveObjectInspector) objInspector;
       switch (poi.getPrimitiveCategory()) {
       case VOID: {
-        return;
+        return warnedOnceNullMapKey;
       }
       case BOOLEAN: {
         boolean v = ((BooleanObjectInspector) poi).get(obj);
         byteStream.write((byte) (v ? 1 : 0));
-        return;
+        return warnedOnceNullMapKey;
      }
       case BYTE: {
         ByteObjectInspector boi = (ByteObjectInspector) poi;
         byte v = boi.get(obj);
         byteStream.write(v);
-        return;
+        return warnedOnceNullMapKey;
       }
       case SHORT: {
         ShortObjectInspector spoi = (ShortObjectInspector) poi;
         short v = spoi.get(obj);
         byteStream.write((byte) (v >> 8));
         byteStream.write((byte) (v));
-        return;
+        return warnedOnceNullMapKey;
       }
       case INT: {
         IntObjectInspector ioi = (IntObjectInspector) poi;
         int v = ioi.get(obj);
         LazyBinaryUtils.writeVInt(byteStream, v);
-        return;
+        return warnedOnceNullMapKey;
       }
       case LONG: {
         LongObjectInspector loi = (LongObjectInspector) poi;
         long v = loi.get(obj);
         LazyBinaryUtils.writeVLong(byteStream, v);
-        return;
+        return warnedOnceNullMapKey;
       }
       case FLOAT: {
         FloatObjectInspector foi = (FloatObjectInspector) poi;
@@ -317,7 +327,7 @@
         byteStream.write((byte) (v >> 16));
         byteStream.write((byte) (v >> 8));
         byteStream.write((byte) (v));
-        return;
+        return warnedOnceNullMapKey;
       }
       case DOUBLE: {
         DoubleObjectInspector doi = (DoubleObjectInspector) poi;
@@ -330,18 +340,20 @@
         byteStream.write((byte) (v >> 16));
         byteStream.write((byte) (v >> 8));
         byteStream.write((byte) (v));
-        return;
+        return warnedOnceNullMapKey;
       }
       case STRING: {
         StringObjectInspector soi = (StringObjectInspector) poi;
         Text t = soi.getPrimitiveWritableObject(obj);
         /* write byte size of the string which is a vint */
         int length = t.getLength();
-        LazyBinaryUtils.writeVInt(byteStream, length);
+        if (!skipLengthPrefix) {
+          LazyBinaryUtils.writeVInt(byteStream, length);
+        }
         /* write string itself */
         byte[] data = t.getBytes();
         byteStream.write(data, 0, length);
-        return;
+        return warnedOnceNullMapKey;
       }
       default: {
         throw new RuntimeException("Unrecognized type: "
@@ -353,15 +365,18 @@
       ListObjectInspector loi = (ListObjectInspector) objInspector;
       ObjectInspector eoi = loi.getListElementObjectInspector();
 
-      // 1/ reserve spaces for the byte size of the list
-      // which is a integer and takes four bytes
-      int byteSizeStart = byteStream.getCount();
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      int listStart = byteStream.getCount();
-
+      int byteSizeStart = 0;
+      int listStart = 0;
+      if (!skipLengthPrefix) {
+        // 1/ reserve space for the byte size of the list,
+        // which is an integer and takes four bytes
+        byteSizeStart = byteStream.getCount();
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        listStart = byteStream.getCount();
+      }
       // 2/ write the size of the list as a VInt
       int size = loi.getListLength(obj);
       LazyBinaryUtils.writeVInt(byteStream, size);
@@ -383,19 +398,21 @@
       // 4/ write element by element from the list
       for (int eid = 0; eid < size; eid++) {
-        serialize(byteStream, loi.getListElement(obj, eid), eoi);
+        warnedOnceNullMapKey = serialize(byteStream, loi.getListElement(obj, eid), eoi,
+            false, warnedOnceNullMapKey);
       }
 
-      // 5/ update the list byte size
-      int listEnd = byteStream.getCount();
-      int listSize = listEnd - listStart;
-      byte[] bytes = byteStream.getData();
-      bytes[byteSizeStart] = (byte) (listSize >> 24);
-      bytes[byteSizeStart + 1] = (byte) (listSize >> 16);
-      bytes[byteSizeStart + 2] = (byte) (listSize >> 8);
-      bytes[byteSizeStart + 3] = (byte) (listSize);
-
-      return;
+      if (!skipLengthPrefix) {
+        // 5/ update the list byte size
+        int listEnd = byteStream.getCount();
+        int listSize = listEnd - listStart;
+        byte[] bytes = byteStream.getData();
+        bytes[byteSizeStart] = (byte) (listSize >> 24);
+        bytes[byteSizeStart + 1] = (byte) (listSize >> 16);
+        bytes[byteSizeStart + 2] = (byte) (listSize >> 8);
+        bytes[byteSizeStart + 3] = (byte) (listSize);
+      }
+      return warnedOnceNullMapKey;
     }
     case MAP: {
       MapObjectInspector moi = (MapObjectInspector) objInspector;
@@ -403,15 +420,19 @@
       ObjectInspector koi = moi.getMapKeyObjectInspector();
       ObjectInspector voi = moi.getMapValueObjectInspector();
       Map<?, ?> map = moi.getMap(obj);
 
-      // 1/ reserve spaces for the byte size of the map
-      // which is a integer and takes four bytes
-      int byteSizeStart = byteStream.getCount();
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      int mapStart = byteStream.getCount();
-
+      int byteSizeStart = 0;
+      int mapStart = 0;
+      if (!skipLengthPrefix) {
+        // 1/ reserve space for the byte size of the map,
+        // which is an integer and takes four bytes
+        byteSizeStart = byteStream.getCount();
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        mapStart = byteStream.getCount();
+      }
+
       // 2/ write the size of the map which is a VInt
       int size = map.size();
       LazyBinaryUtils.writeVInt(byteStream, size);
@@ -423,8 +444,8 @@
         // set the bit to 1 if a key is not null
         if (null != entry.getKey()) {
           nullByte |= 1 << (b % 8);
-        } else if (!nullMapKey) {
-          nullMapKey = true;
+        } else if (!warnedOnceNullMapKey) {
+          warnedOnceNullMapKey = true;
           LOG.warn("Null map key encountered! Ignoring similar problems.");
         }
         b++;
@@ -443,44 +464,50 @@
       // 4/ write key-value pairs one by one
       for (Map.Entry<?, ?> entry : map.entrySet()) {
-        serialize(byteStream, entry.getKey(), koi);
-        serialize(byteStream, entry.getValue(), voi);
+        warnedOnceNullMapKey = serialize(byteStream, entry.getKey(), koi, false, warnedOnceNullMapKey);
+        warnedOnceNullMapKey = serialize(byteStream, entry.getValue(), voi, false, warnedOnceNullMapKey);
       }
 
-      // 5/ update the byte size of the map
-      int mapEnd = byteStream.getCount();
-      int mapSize = mapEnd - mapStart;
-      byte[] bytes = byteStream.getData();
-      bytes[byteSizeStart] = (byte) (mapSize >> 24);
-      bytes[byteSizeStart + 1] = (byte) (mapSize >> 16);
-      bytes[byteSizeStart + 2] = (byte) (mapSize >> 8);
-      bytes[byteSizeStart + 3] = (byte) (mapSize);
-
-      return;
+      if (!skipLengthPrefix) {
+        // 5/ update the byte size of the map
+        int mapEnd = byteStream.getCount();
+        int mapSize = mapEnd - mapStart;
+        byte[] bytes = byteStream.getData();
+        bytes[byteSizeStart] = (byte) (mapSize >> 24);
+        bytes[byteSizeStart + 1] = (byte) (mapSize >> 16);
+        bytes[byteSizeStart + 2] = (byte) (mapSize >> 8);
+        bytes[byteSizeStart + 3] = (byte) (mapSize);
+      }
+      return warnedOnceNullMapKey;
     }
     case STRUCT: {
-      // 1/ reserve spaces for the byte size of the struct
-      // which is a integer and takes four bytes
-      int byteSizeStart = byteStream.getCount();
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      byteStream.write((byte) 0);
-      int structStart = byteStream.getCount();
-
+      int byteSizeStart = 0;
+      int structStart = 0;
+      if (!skipLengthPrefix) {
+        // 1/ reserve space for the byte size of the struct,
+        // which is an integer and takes four bytes
+        byteSizeStart = byteStream.getCount();
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        byteStream.write((byte) 0);
+        structStart = byteStream.getCount();
+      }
       // 2/ serialize the struct
-      serializeStruct(byteStream, obj, (StructObjectInspector) objInspector);
-
-      // 3/ update the byte size of the struct
-      int structEnd = byteStream.getCount();
-      int structSize = structEnd - structStart;
-      byte[] bytes = byteStream.getData();
-      bytes[byteSizeStart] = (byte) (structSize >> 24);
-      bytes[byteSizeStart + 1] = (byte) (structSize >> 16);
-      bytes[byteSizeStart + 2] = (byte) (structSize >> 8);
-      bytes[byteSizeStart + 3] = (byte) (structSize);
-
-      return;
+      warnedOnceNullMapKey = serializeStruct(byteStream, obj, (StructObjectInspector) objInspector,
+          warnedOnceNullMapKey);
+
+      if (!skipLengthPrefix) {
+        // 3/ update the byte size of the struct
+        int structEnd = byteStream.getCount();
+        int structSize = structEnd - structStart;
+        byte[] bytes = byteStream.getData();
+        bytes[byteSizeStart] = (byte) (structSize >> 24);
+        bytes[byteSizeStart + 1] = (byte) (structSize >> 16);
+        bytes[byteSizeStart + 2] = (byte) (structSize >> 8);
+        bytes[byteSizeStart + 3] = (byte) (structSize);
+      }
+      return warnedOnceNullMapKey;
     }
     default: {
       throw new RuntimeException("Unrecognized type: "
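Note: threading warnedOnceNullMapKey through makes serialize() static and reentrant, so the columnar serde can call it per field; skipLengthPrefix drops redundant length information at the top level only. In an RCFile the container already records each column value's byte length, so the leading four-byte size (or a string's vint) would duplicate it; recursive calls always pass false, keeping nested values self-describing. A byte-level sketch mirroring the STRING case above:

    ByteStream.Output out = new ByteStream.Output();
    Text t = new Text("hi");
    boolean skipLengthPrefix = true;   // true only for a top-level columnar field
    if (!skipLengthPrefix) {
      LazyBinaryUtils.writeVInt(out, t.getLength()); // [0x02]['h']['i'] self-describing
    }
    out.write(t.getBytes(), 0, t.getLength());       //       ['h']['i'] container keeps length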
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java
index 66f4f8d..881c3c1 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java
@@ -23,12 +23,11 @@
 import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarStruct;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarStructBase;
 
 /**
  * ColumnarStructObjectInspector works on struct data that is stored in
- * ColumnarStruct.
+ * ColumnarStructBase.
  *
 * The names of the struct fields and the internal structure of the struct
 * fields are specified in the ctor of the ColumnarStructObjectInspector.
@@ -78,22 +77,18 @@ class ColumnarStructObjectInspector extends StructObjectInspector {
     return ObjectInspectorUtils.getStandardStructTypeName(this);
   }
 
-  Text nullSequence;
-
   /**
    * Call ObjectInspectorFactory.getLazySimpleStructObjectInspector instead.
    */
   public ColumnarStructObjectInspector(List<String> structFieldNames,
-      List<ObjectInspector> structFieldObjectInspectors, Text nullSequence) {
-    init(structFieldNames, structFieldObjectInspectors, nullSequence);
+      List<ObjectInspector> structFieldObjectInspectors) {
+    init(structFieldNames, structFieldObjectInspectors);
   }
 
   protected void init(List<String> structFieldNames,
-      List<ObjectInspector> structFieldObjectInspectors, Text nullSequence) {
+      List<ObjectInspector> structFieldObjectInspectors) {
     assert (structFieldNames.size() == structFieldObjectInspectors.size());
 
-    this.nullSequence = nullSequence;
-
     fields = new ArrayList<MyField>(structFieldNames.size());
     for (int i = 0; i < structFieldNames.size(); i++) {
       fields.add(new MyField(i, structFieldNames.get(i),
@@ -101,14 +96,11 @@ class ColumnarStructObjectInspector extends StructObjectInspector {
   }
 
-  protected ColumnarStructObjectInspector(List<StructField> fields,
-      Text nullSequence) {
-    init(fields, nullSequence);
+  protected ColumnarStructObjectInspector(List<StructField> fields) {
+    init(fields);
   }
 
-  protected void init(List<StructField> fields, Text nullSequence) {
-    this.nullSequence = nullSequence;
-
+  protected void init(List<StructField> fields) {
     this.fields = new ArrayList<MyField>(fields.size());
     for (int i = 0; i < fields.size(); i++) {
       this.fields.add(new MyField(i, fields.get(i).getFieldName(), fields
@@ -138,7 +130,7 @@ class ColumnarStructObjectInspector extends StructObjectInspector {
     if (data == null) {
       return null;
     }
-    ColumnarStruct struct = (ColumnarStruct) data;
+    ColumnarStructBase struct = (ColumnarStructBase) data;
     MyField f = (MyField) fieldRef;
 
     int fieldID = f.getFieldID();
@@ -152,7 +144,7 @@ class ColumnarStructObjectInspector extends StructObjectInspector {
     if (data == null) {
       return null;
     }
-    ColumnarStruct struct = (ColumnarStruct) data;
+    ColumnarStructBase struct = (ColumnarStructBase) data;
     return struct.getFieldsAsList();
   }
 }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
index 90561a1..0c8cc42 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
@@ -30,7 +30,6 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-import org.apache.hadoop.io.Text;
 
 /**
  * ObjectInspectorFactory is the primary way to create new ObjectInspector
@@ -273,16 +272,15 @@
 
   public static ColumnarStructObjectInspector getColumnarStructObjectInspector(
       List<String> structFieldNames,
-      List<ObjectInspector> structFieldObjectInspectors, Text nullSequence) {
+      List<ObjectInspector> structFieldObjectInspectors) {
     ArrayList<Object> signature = new ArrayList<Object>();
     signature.add(structFieldNames);
     signature.add(structFieldObjectInspectors);
-    signature.add(nullSequence.toString());
     ColumnarStructObjectInspector result = cachedColumnarStructObjectInspector
         .get(signature);
     if (result == null) {
       result = new ColumnarStructObjectInspector(structFieldNames,
-          structFieldObjectInspectors, nullSequence);
+          structFieldObjectInspectors);
       cachedColumnarStructObjectInspector.put(signature, result);
     }
     return result;
diff --git serde/src/test/org/apache/hadoop/hive/serde2/columnar/TestLazyBinaryColumnarSerDe.java serde/src/test/org/apache/hadoop/hive/serde2/columnar/TestLazyBinaryColumnarSerDe.java
new file mode 100644
index 0000000..095b84e
--- /dev/null
+++ serde/src/test/org/apache/hadoop/hive/serde2/columnar/TestLazyBinaryColumnarSerDe.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.columnar;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.CrossMapEqualComparer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.SimpleMapEqualComparer;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+public class TestLazyBinaryColumnarSerDe extends TestCase {
+
+  private static class InnerStruct {
+    public InnerStruct(Integer i, Long l) {
+      mInt = i;
+      mLong = l;
+    }
+    Integer mInt;
+    Long mLong;
+  }
+
+  private static class OuterStruct {
+    Byte mByte;
+    Short mShort;
+    Integer mInt;
+    Long mLong;
+    Float mFloat;
+    Double mDouble;
+    String mString;
+    List<InnerStruct> mArray;
+    Map<String, InnerStruct> mMap;
+    InnerStruct mStruct;
+  }
+
+  public void testSerDe() throws SerDeException {
+    StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
+        .getReflectionObjectInspector(OuterStruct.class, ObjectInspectorOptions.JAVA);
+    String cols = ObjectInspectorUtils.getFieldNames(oi);
+    Properties props = new Properties();
+    props.setProperty(Constants.LIST_COLUMNS, cols);
+    props.setProperty(Constants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
+    LazyBinaryColumnarSerDe serde = new LazyBinaryColumnarSerDe();
+    serde.initialize(new Configuration(), props);
+
+    OuterStruct outerStruct = new OuterStruct();
+    outerStruct.mByte = 1;
+    outerStruct.mShort = 2;
+    outerStruct.mInt = 3;
+    outerStruct.mLong = 4l;
+    outerStruct.mFloat = 5.01f;
+    outerStruct.mDouble = 6.001d;
+    outerStruct.mString = "seven";
+    InnerStruct is1 = new InnerStruct(8, 9l);
+    InnerStruct is2 = new InnerStruct(10, 11l);
+    outerStruct.mArray = new ArrayList<InnerStruct>(2);
+    outerStruct.mArray.add(is1);
+    outerStruct.mArray.add(is2);
+    outerStruct.mMap = new TreeMap<String, InnerStruct>();
+    outerStruct.mMap.put(new String("twelve"), new InnerStruct(13, 14l));
+    outerStruct.mMap.put(new String("fifteen"), new InnerStruct(16, 17l));
+    outerStruct.mStruct = new InnerStruct(18, 19l);
+    BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
+
+    ObjectInspector out_oi = serde.getObjectInspector();
+    Object out_o = serde.deserialize(braw);
+    if (0 != ObjectInspectorUtils.compare(outerStruct, oi, out_o, out_oi, new CrossMapEqualComparer())) {
+      System.out.println("expected = "
+          + SerDeUtils.getJSONString(outerStruct, oi));
+      System.out.println("actual = "
+          + SerDeUtils.getJSONString(out_o, out_oi));
+      fail("Deserialized object does not compare");
+    }
+  }
+
+  public void testSerDeEmpties() throws SerDeException {
+    StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
+        .getReflectionObjectInspector(OuterStruct.class, ObjectInspectorOptions.JAVA);
+    String cols = ObjectInspectorUtils.getFieldNames(oi);
+    Properties props = new Properties();
+    props.setProperty(Constants.LIST_COLUMNS, cols);
+    props.setProperty(Constants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
+    LazyBinaryColumnarSerDe serde = new LazyBinaryColumnarSerDe();
+    serde.initialize(new Configuration(), props);
+
+    OuterStruct outerStruct = new OuterStruct();
+    outerStruct.mByte = 101;
+    outerStruct.mShort = 2002;
+    outerStruct.mInt = 3003;
+    outerStruct.mLong = 4004l;
+    outerStruct.mFloat = 5005.01f;
+    outerStruct.mDouble = 6006.001d;
+    outerStruct.mString = "";
+    outerStruct.mArray = new ArrayList<InnerStruct>();
+    outerStruct.mMap = new TreeMap<String, InnerStruct>();
+    outerStruct.mStruct = new InnerStruct(180018, 190019l);
+    BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
+
+    ObjectInspector out_oi = serde.getObjectInspector();
+    Object out_o = serde.deserialize(braw);
+    if (0 != ObjectInspectorUtils.compare(outerStruct, oi, out_o, out_oi, new SimpleMapEqualComparer())) {
+      System.out.println("expected = "
+          + SerDeUtils.getJSONString(outerStruct, oi));
+      System.out.println("actual = "
+          + SerDeUtils.getJSONString(out_o, out_oi));
+      fail("Deserialized object does not compare");
+    }
+  }
+
+
+  public void testSerDeOuterNulls() throws SerDeException {
+    StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
+        .getReflectionObjectInspector(OuterStruct.class, ObjectInspectorOptions.JAVA);
+    String cols = ObjectInspectorUtils.getFieldNames(oi);
+    Properties props = new Properties();
+    props.setProperty(Constants.LIST_COLUMNS, cols);
+    props.setProperty(Constants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
+    LazyBinaryColumnarSerDe serde = new LazyBinaryColumnarSerDe();
+    serde.initialize(new Configuration(), props);
+
+    OuterStruct outerStruct = new OuterStruct();
+    BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
+
+    ObjectInspector out_oi = serde.getObjectInspector();
+    Object out_o = serde.deserialize(braw);
+    if (0 != ObjectInspectorUtils.compare(outerStruct, oi, out_o, out_oi, new SimpleMapEqualComparer())) {
+      System.out.println("expected = "
+          + SerDeUtils.getJSONString(outerStruct, oi));
+      System.out.println("actual = "
+          + SerDeUtils.getJSONString(out_o, out_oi));
+      fail("Deserialized object does not compare");
+    }
+  }
+
+  public void testSerDeInnerNulls() throws SerDeException {
+    StructObjectInspector oi = (StructObjectInspector) ObjectInspectorFactory
+        .getReflectionObjectInspector(OuterStruct.class, ObjectInspectorOptions.JAVA);
+    String cols = ObjectInspectorUtils.getFieldNames(oi);
+    Properties props = new Properties();
+    props.setProperty(Constants.LIST_COLUMNS, cols);
+    props.setProperty(Constants.LIST_COLUMN_TYPES, ObjectInspectorUtils.getFieldTypes(oi));
+    LazyBinaryColumnarSerDe serde = new LazyBinaryColumnarSerDe();
+    serde.initialize(new Configuration(), props);
+
+    OuterStruct outerStruct = new OuterStruct();
+    outerStruct.mByte = 1;
+    outerStruct.mShort = 2;
+    outerStruct.mInt = 3;
+    outerStruct.mLong = 4l;
+    outerStruct.mFloat = 5.01f;
+    outerStruct.mDouble = 6.001d;
+    outerStruct.mString = "seven";
+    InnerStruct is1 = new InnerStruct(null, 9l);
+    InnerStruct is2 = new InnerStruct(10, null);
+    outerStruct.mArray = new ArrayList<InnerStruct>(2);
+    outerStruct.mArray.add(is1);
+    outerStruct.mArray.add(is2);
+    outerStruct.mMap = new HashMap<String, InnerStruct>();
+    outerStruct.mMap.put(null, new InnerStruct(13, 14l));
+    outerStruct.mMap.put(new String("fifteen"), null);
+    outerStruct.mStruct = new InnerStruct(null, null);
+    BytesRefArrayWritable braw = (BytesRefArrayWritable) serde.serialize(outerStruct, oi);
+
+    ObjectInspector out_oi = serde.getObjectInspector();
+    Object out_o = serde.deserialize(braw);
+    if (0 != ObjectInspectorUtils.compare(outerStruct, oi, out_o, out_oi, new SimpleMapEqualComparer())) {
+      System.out.println("expected = "
+          + SerDeUtils.getJSONString(outerStruct, oi));
+      System.out.println("actual = "
+          + SerDeUtils.getJSONString(out_o, out_oi));
+      fail("Deserialized object does not compare");
+    }
+  }
+
+
+}
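Note that each test serializes an OuterStruct, deserializes the resulting BytesRefArrayWritable, and compares the round trip with ObjectInspectorUtils.compare; the TreeMap round trip uses CrossMapEqualComparer, which can match logically-equal keys even when the two maps hold them as different object types, while the other tests use SimpleMapEqualComparer. The tests are JUnit 3 style (they extend TestCase), so they can be driven without annotations; a hypothetical runner, assuming hive-serde and its dependencies are on the classpath:

    // Hypothetical helper class, not part of the patch; uses the standard
    // JUnit 3 text runner to execute the new test case.
    public class RunColumnarSerDeTests {
      public static void main(String[] args) {
        junit.textui.TestRunner.run(
            org.apache.hadoop.hive.serde2.columnar.TestLazyBinaryColumnarSerDe.class);
      }
    }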