diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index f4c3b81..7e41b7a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -185,6 +185,7 @@ void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyT new BinarySortableDeserializeRead( VectorizedBatchUtil.typeInfosFromStructObjectInspector( keyStructInspector), + /* useExternalBuffer */ true, binarySortableSerDe.getSortOrders())); keyBinarySortableDeserializeToRow.init(0); @@ -194,7 +195,8 @@ void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyT new VectorDeserializeRow( new LazyBinaryDeserializeRead( VectorizedBatchUtil.typeInfosFromStructObjectInspector( - valueStructInspectors))); + valueStructInspectors), + /* useExternalBuffer */ true)); valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset); // Create data buffers for value bytes column vectors. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java index f66916b..47bef43 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java @@ -74,6 +74,12 @@ private TypeInfo[] sourceTypeInfos; + private byte[] inputBytes; + + /** + * @param deserializeRead Set useExternalBuffer to true to avoid buffer copying and to get + * more efficient reading. + */ public VectorDeserializeRow(T deserializeRead) { this(); this.deserializeRead = deserializeRead; @@ -338,10 +344,15 @@ public void init() throws HiveException { * @param batch * @param batchIndex * @param logicalColumnIndex + * @param canRetainByteRef Specify true when it is safe to retain references to the bytes + * source for DeserializeRead. I.e. the STRING, CHAR/VARCHAR data + * can be set in BytesColumnVector with setRef instead of with setVal + * which copies data. An example of a safe usage is referring to bytes + * in a hash table entry that is immutable. * @throws IOException */ private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex, - int logicalColumnIndex) throws IOException { + int logicalColumnIndex, boolean canRetainByteRef) throws IOException { Category sourceCategory = sourceCategories[logicalColumnIndex]; if (sourceCategory == null) { /* @@ -406,42 +417,114 @@ private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex, break; case BINARY: case STRING: - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength); + { + BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); + if (deserializeRead.currentExternalBufferNeeded) { + bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); + deserializeRead.copyToExternalBuffer( + bytesColVec.getValPreallocatedBytes(), bytesColVec.getValPreallocatedStart()); + bytesColVec.setValPreallocated( + batchIndex, + deserializeRead.currentExternalBufferNeededLen); + } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { + bytesColVec.setRef( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength); + } else { + bytesColVec.setVal( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength); + } + } break; case VARCHAR: { // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method // that does not use Java String objects. - int adjustedLength = StringExpr.truncate( - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex]); - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - adjustedLength); + BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); + if (deserializeRead.currentExternalBufferNeeded) { + // Write directly into our BytesColumnVector value buffer. + bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); + byte[] convertBuffer = bytesColVec.getValPreallocatedBytes(); + int convertBufferStart = bytesColVec.getValPreallocatedStart(); + deserializeRead.copyToExternalBuffer( + convertBuffer, + convertBufferStart); + bytesColVec.setValPreallocated( + batchIndex, + StringExpr.truncate( + convertBuffer, + convertBufferStart, + deserializeRead.currentExternalBufferNeededLen, + maxLengths[logicalColumnIndex])); + } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { + bytesColVec.setRef( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.truncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + maxLengths[logicalColumnIndex])); + } else { + bytesColVec.setVal( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.truncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + maxLengths[logicalColumnIndex])); + } } break; case CHAR: { // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method // that does not use Java String objects. - int adjustedLength = StringExpr.rightTrimAndTruncate( - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex]); - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - adjustedLength); + BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); + if (deserializeRead.currentExternalBufferNeeded) { + // Write directly into our BytesColumnVector value buffer. + bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); + byte[] convertBuffer = bytesColVec.getValPreallocatedBytes(); + int convertBufferStart = bytesColVec.getValPreallocatedStart(); + deserializeRead.copyToExternalBuffer( + convertBuffer, + convertBufferStart); + bytesColVec.setValPreallocated( + batchIndex, + StringExpr.rightTrimAndTruncate( + convertBuffer, + convertBufferStart, + deserializeRead.currentExternalBufferNeededLen, + maxLengths[logicalColumnIndex])); + } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { + bytesColVec.setRef( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.rightTrimAndTruncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + maxLengths[logicalColumnIndex])); + } else { + bytesColVec.setVal( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.rightTrimAndTruncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + maxLengths[logicalColumnIndex])); + } } break; case DECIMAL: @@ -644,6 +727,7 @@ private void deserializeConvertRowColumn(VectorizedRowBatch batch, int batchInde * @param length */ public void setBytes(byte[] bytes, int offset, int length) { + inputBytes = bytes; deserializeRead.set(bytes, offset, length); } @@ -653,6 +737,10 @@ public void setBytes(byte[] bytes, int offset, int length) { * Use getDetailedReadPositionString to get detailed read position information to help * diagnose exceptions that are thrown... * + * This version of deserialize does not keep byte references to string/char/varchar/binary data + * type field. The bytes are copied into the BytesColumnVector buffer with setVal. + * (See deserializeByRef below if keep references is safe). + * * @param batch * @param batchIndex * @throws IOException @@ -663,12 +751,49 @@ public void deserialize(VectorizedRowBatch batch, int batchIndex) throws IOExcep if (isConvert[i]) { deserializeConvertRowColumn(batch, batchIndex, i); } else { - deserializeRowColumn(batch, batchIndex, i); + // Pass false for canRetainByteRef since we will NOT be keeping byte references to the input + // bytes with the BytesColumnVector.setRef method. + deserializeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ false); } } deserializeRead.extraFieldsCheck(); } + /** + * Deserialize a row from the range of bytes specified by setBytes. + * + * Use this method instead of deserialize when it is safe to retain references to the bytes source + * for DeserializeRead. I.e. the STRING, CHAR/VARCHAR data can be set in BytesColumnVector with + * setRef instead of with setVal which copies data. + * + * An example of a safe usage: + * Referring to bytes in a hash table entry that is immutable. + * + * An example of a unsafe usage: + * Referring to bytes in a reduce receive buffer that will be overwritten with new data. + * + * Use getDetailedReadPositionString to get detailed read position information to help + * diagnose exceptions that are thrown... + * + * @param batch + * @param batchIndex + * @throws IOException + */ + public void deserializeByRef(VectorizedRowBatch batch, int batchIndex) throws IOException { + final int count = isConvert.length; + for (int i = 0; i < count; i++) { + if (isConvert[i]) { + deserializeConvertRowColumn(batch, batchIndex, i); + } else { + // Pass true for canRetainByteRef since we will be keeping byte references to the input + // bytes with the BytesColumnVector.setRef method. + deserializeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ true); + } + } + deserializeRead.extraFieldsCheck(); + } + + public String getDetailedReadPositionString() { return deserializeRead.getDetailedReadPositionString(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index 2bdc59b..c7fa0db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -261,7 +261,10 @@ public void init(Configuration hconf) LazySimpleSerDe.class.getName()); LazySimpleDeserializeRead lazySimpleDeserializeRead = - new LazySimpleDeserializeRead(dataTypeInfos, simpleSerdeParams); + new LazySimpleDeserializeRead( + dataTypeInfos, + /* useExternalBuffer */ true, + simpleSerdeParams); vectorDeserializeRow = new VectorDeserializeRow(lazySimpleDeserializeRead); @@ -277,7 +280,9 @@ public void init(Configuration hconf) case LAZY_BINARY: { LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(dataTypeInfos); + new LazyBinaryDeserializeRead( + dataTypeInfos, + /* useExternalBuffer */ true); vectorDeserializeRow = new VectorDeserializeRow(lazyBinaryDeserializeRead); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 24668f9..c288731 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -571,7 +571,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { new VectorDeserializeRow( new LazyBinaryDeserializeRead( VectorizedBatchUtil.typeInfosFromTypeNames( - smallTableMapping.getTypeNames()))); + smallTableMapping.getTypeNames()), + /* useExternalBuffer */ true)); smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 469f86a..21a01e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -144,7 +144,8 @@ protected void doSmallTableDeserializeRow(VectorizedRowBatch batch, int batchInd smallTableVectorDeserializeRow.setBytes(bytes, offset, length); try { - smallTableVectorDeserializeRow.deserialize(batch, batchIndex); + // Our hash tables are immutable. We can safely do by reference STRING, CHAR/VARCHAR, etc. + smallTableVectorDeserializeRow.deserializeByRef(batch, batchIndex); } catch (Exception e) { throw new HiveException( "\nHashMapResult detail: " + @@ -442,7 +443,9 @@ private void setupSpillSerDe(VectorizedRowBatch batch) throws HiveException { bigTableVectorDeserializeRow = new VectorDeserializeRow( - new LazyBinaryDeserializeRead(bigTableTypeInfos)); + new LazyBinaryDeserializeRead( + bigTableTypeInfos, + /* useExternalBuffer */ true)); bigTableVectorDeserializeRow.init(noNullsProjection); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java index ee66d5b..726a937 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java @@ -267,7 +267,10 @@ public VectorMapJoinFastLongHashTable( this.isOuterJoin = isOuterJoin; this.hashTableKeyType = hashTableKeyType; PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() }; - keyBinarySortableDeserializeRead = new BinarySortableDeserializeRead(primitiveTypeInfos); + keyBinarySortableDeserializeRead = + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false); allocateBucketArray(); useMinMax = minMaxEnabled; min = Long.MAX_VALUE; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java index bf378ac..456e6ba 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java @@ -67,6 +67,8 @@ public VectorMapJoinFastStringCommon(boolean isOuterJoin) { this.isOuterJoin = isOuterJoin; PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo }; keyBinarySortableDeserializeRead = - new BinarySortableDeserializeRead(primitiveTypeInfos); + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false); } } \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java index 0eabc44..ac85899 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java @@ -161,8 +161,6 @@ public VectorMapJoinOptimizedLongCommon( min = Long.MAX_VALUE; max = Long.MIN_VALUE; this.hashTableKeyType = hashTableKeyType; - // PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() }; - // keyBinarySortableDeserializeRead = new BinarySortableDeserializeRead(primitiveTypeInfos); keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(1); output = new Output(); keyBinarySortableSerializeWrite.set(output); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java index c6704f9..238c136 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.OpenCSVSerde; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; @@ -45,6 +46,7 @@ import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; import org.apache.hadoop.hive.serde2.fast.DeserializeRead; @@ -69,6 +71,8 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import com.google.common.base.Charsets; + import junit.framework.TestCase; /** @@ -305,7 +309,8 @@ void serializeBatch(VectorizedRowBatch batch, VectorSerializeRow vectorSerialize } } - void testVectorSerializeRow(int caseNum, Random r, SerializationType serializationType) throws HiveException, IOException, SerDeException { + void testVectorSerializeRow(int caseNum, Random r, SerializationType serializationType) + throws HiveException, IOException, SerDeException { String[] emptyScratchTypeNames = new String[0]; @@ -324,11 +329,11 @@ void testVectorSerializeRow(int caseNum, Random r, SerializationType serializati SerializeWrite serializeWrite; switch (serializationType) { case BINARY_SORTABLE: - deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos()); + deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos(), /* useExternalBuffer */ false); serializeWrite = new BinarySortableSerializeWrite(fieldCount); break; case LAZY_BINARY: - deserializeRead = new LazyBinaryDeserializeRead(source.primitiveTypeInfos()); + deserializeRead = new LazyBinaryDeserializeRead(source.primitiveTypeInfos(), /* useExternalBuffer */ false); serializeWrite = new LazyBinarySerializeWrite(fieldCount); break; case LAZY_SIMPLE: @@ -336,7 +341,7 @@ void testVectorSerializeRow(int caseNum, Random r, SerializationType serializati StructObjectInspector rowObjectInspector = source.rowStructObjectInspector(); LazySerDeParameters lazySerDeParams = getSerDeParams(rowObjectInspector); byte separator = (byte) '\t'; - deserializeRead = new LazySimpleDeserializeRead(source.primitiveTypeInfos(), + deserializeRead = new LazySimpleDeserializeRead(source.primitiveTypeInfos(), /* useExternalBuffer */ false, separator, lazySerDeParams); serializeWrite = new LazySimpleSerializeWrite(fieldCount, separator, lazySerDeParams); @@ -367,7 +372,7 @@ void testVectorSerializeRow(int caseNum, Random r, SerializationType serializati } void examineBatch(VectorizedRowBatch batch, VectorExtractRow vectorExtractRow, - Object[][] randomRows, int firstRandomRowIndex ) { + PrimitiveTypeInfo[] primitiveTypeInfos, Object[][] randomRows, int firstRandomRowIndex ) { int rowSize = vectorExtractRow.getCount(); Object[] row = new Object[rowSize]; @@ -381,13 +386,14 @@ void examineBatch(VectorizedRowBatch batch, VectorExtractRow vectorExtractRow, fail("Unexpected NULL from extractRow"); } if (!row[c].equals(expectedRow[c])) { - fail("Row " + (firstRandomRowIndex + i) + " and column " + c + " mismatch"); + fail("Row " + (firstRandomRowIndex + i) + " and column " + c + " mismatch (" + primitiveTypeInfos[c].getPrimitiveCategory() + " actual value " + row[c] + " and expected value " + expectedRow[c] + ")"); } } } } - private Output serializeRow(Object[] row, VectorRandomRowSource source, SerializeWrite serializeWrite) throws HiveException, IOException { + private Output serializeRow(Object[] row, VectorRandomRowSource source, + SerializeWrite serializeWrite) throws HiveException, IOException { Output output = new Output(); serializeWrite.set(output); PrimitiveTypeInfo[] primitiveTypeInfos = source.primitiveTypeInfos(); @@ -514,9 +520,7 @@ private Output serializeRow(Object[] row, VectorRandomRowSource source, Serializ return output; } - private Properties createProperties(String fieldNames, String fieldTypes) { - Properties tbl = new Properties(); - + private void addToProperties(Properties tbl, String fieldNames, String fieldTypes) { // Set the configuration parameters tbl.setProperty(serdeConstants.SERIALIZATION_FORMAT, "9"); @@ -524,19 +528,23 @@ private Properties createProperties(String fieldNames, String fieldTypes) { tbl.setProperty("columns.types", fieldTypes); tbl.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL"); + } - return tbl; + private LazySerDeParameters getSerDeParams( StructObjectInspector rowObjectInspector) throws SerDeException { + return getSerDeParams(new Configuration(), new Properties(), rowObjectInspector); } - private LazySerDeParameters getSerDeParams(StructObjectInspector rowObjectInspector) throws SerDeException { + private LazySerDeParameters getSerDeParams(Configuration conf, Properties tbl, StructObjectInspector rowObjectInspector) throws SerDeException { String fieldNames = ObjectInspectorUtils.getFieldNames(rowObjectInspector); String fieldTypes = ObjectInspectorUtils.getFieldTypes(rowObjectInspector); - Configuration conf = new Configuration(); - Properties tbl = createProperties(fieldNames, fieldTypes); + addToProperties(tbl, fieldNames, fieldTypes); return new LazySerDeParameters(conf, tbl, LazySimpleSerDe.class.getName()); } - void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializationType) throws HiveException, IOException, SerDeException { + void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializationType, + boolean alternate1, boolean alternate2, + boolean useExternalBuffer) + throws HiveException, IOException, SerDeException { String[] emptyScratchTypeNames = new String[0]; @@ -552,24 +560,88 @@ void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializa Arrays.fill(cv.isNull, true); } + PrimitiveTypeInfo[] primitiveTypeInfos = source.primitiveTypeInfos(); int fieldCount = source.typeNames().size(); DeserializeRead deserializeRead; SerializeWrite serializeWrite; switch (serializationType) { case BINARY_SORTABLE: - deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos()); - serializeWrite = new BinarySortableSerializeWrite(fieldCount); + boolean useColumnSortOrderIsDesc = alternate1; + if (!useColumnSortOrderIsDesc) { + deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos(), useExternalBuffer); + serializeWrite = new BinarySortableSerializeWrite(fieldCount); + } else { + boolean[] columnSortOrderIsDesc = new boolean[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + columnSortOrderIsDesc[i] = r.nextBoolean(); + } + deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos(), useExternalBuffer, + columnSortOrderIsDesc); + + byte[] columnNullMarker = new byte[fieldCount]; + byte[] columnNotNullMarker = new byte[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + if (columnSortOrderIsDesc[i]) { + // Descending + // Null last (default for descending order) + columnNullMarker[i] = BinarySortableSerDe.ZERO; + columnNotNullMarker[i] = BinarySortableSerDe.ONE; + } else { + // Ascending + // Null first (default for ascending order) + columnNullMarker[i] = BinarySortableSerDe.ZERO; + columnNotNullMarker[i] = BinarySortableSerDe.ONE; + } + } + serializeWrite = new BinarySortableSerializeWrite(columnSortOrderIsDesc, columnNullMarker, columnNotNullMarker); + } + boolean useBinarySortableCharsNeedingEscape = alternate2; + if (useBinarySortableCharsNeedingEscape) { + source.addBinarySortableAlphabets(); + } break; case LAZY_BINARY: - deserializeRead = new LazyBinaryDeserializeRead(source.primitiveTypeInfos()); + deserializeRead = new LazyBinaryDeserializeRead(source.primitiveTypeInfos(), useExternalBuffer); serializeWrite = new LazyBinarySerializeWrite(fieldCount); break; case LAZY_SIMPLE: { StructObjectInspector rowObjectInspector = source.rowStructObjectInspector(); - LazySerDeParameters lazySerDeParams = getSerDeParams(rowObjectInspector); + Configuration conf = new Configuration(); + Properties tbl = new Properties(); + tbl.setProperty(serdeConstants.FIELD_DELIM, "\t"); + tbl.setProperty(serdeConstants.LINE_DELIM, "\n"); byte separator = (byte) '\t'; - deserializeRead = new LazySimpleDeserializeRead(source.primitiveTypeInfos(), + boolean useLazySimpleEscapes = alternate1; + if (useLazySimpleEscapes) { + tbl.setProperty(serdeConstants.QUOTE_CHAR, "'"); + String escapeString = "\\"; + tbl.setProperty(serdeConstants.ESCAPE_CHAR, escapeString); + } + + LazySerDeParameters lazySerDeParams = getSerDeParams(conf, tbl, rowObjectInspector); + + if (useLazySimpleEscapes) { + // LazySimple seems to throw away everything but \n and \r. + boolean[] needsEscape = lazySerDeParams.getNeedsEscape(); + StringBuilder sb = new StringBuilder(); + if (needsEscape['\n']) { + sb.append('\n'); + } + if (needsEscape['\r']) { + sb.append('\r'); + } + // for (int i = 0; i < needsEscape.length; i++) { + // if (needsEscape[i]) { + // sb.append((char) i); + // } + // } + String needsEscapeStr = sb.toString(); + if (needsEscapeStr.length() > 0) { + source.addEscapables(needsEscapeStr); + } + } + deserializeRead = new LazySimpleDeserializeRead(source.primitiveTypeInfos(), useExternalBuffer, separator, lazySerDeParams); serializeWrite = new LazySimpleSerializeWrite(fieldCount, separator, lazySerDeParams); @@ -597,47 +669,133 @@ void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializa Output output = serializeRow(row, source, serializeWrite); vectorDeserializeRow.setBytes(output.getData(), 0, output.getLength()); - vectorDeserializeRow.deserialize(batch, batch.size); + try { + vectorDeserializeRow.deserialize(batch, batch.size); + } catch (Exception e) { + throw new HiveException( + "\nDeserializeRead details: " + + vectorDeserializeRow.getDetailedReadPositionString(), + e); + } batch.size++; if (batch.size == batch.DEFAULT_SIZE) { - examineBatch(batch, vectorExtractRow, randomRows, firstRandomRowIndex); + examineBatch(batch, vectorExtractRow, primitiveTypeInfos, randomRows, firstRandomRowIndex); firstRandomRowIndex = i + 1; batch.reset(); } } if (batch.size > 0) { - examineBatch(batch, vectorExtractRow, randomRows, firstRandomRowIndex); + examineBatch(batch, vectorExtractRow, primitiveTypeInfos, randomRows, firstRandomRowIndex); } } public void testVectorSerDeRow() throws Throwable { - try { Random r = new Random(5678); - for (int c = 0; c < 10; c++) { - testVectorSerializeRow(c, r, SerializationType.BINARY_SORTABLE); - } - for (int c = 0; c < 10; c++) { - testVectorSerializeRow(c, r, SerializationType.LAZY_BINARY); - } - for (int c = 0; c < 10; c++) { - testVectorSerializeRow(c, r, SerializationType.LAZY_SIMPLE); - } - for (int c = 0; c < 10; c++) { - testVectorDeserializeRow(c, r, SerializationType.BINARY_SORTABLE); - } - for (int c = 0; c < 10; c++) { - testVectorDeserializeRow(c, r, SerializationType.LAZY_BINARY); - } - for (int c = 0; c < 10; c++) { - testVectorDeserializeRow(c, r, SerializationType.LAZY_SIMPLE); - } - - - } catch (Throwable e) { - e.printStackTrace(); - throw e; - } + int c = 0; + + /* + * SERIALIZE tests. + */ + testVectorSerializeRow(c++, r, SerializationType.BINARY_SORTABLE); + + testVectorSerializeRow(c++, r, SerializationType.LAZY_BINARY); + + testVectorSerializeRow(c++, r, SerializationType.LAZY_SIMPLE); + + /* + * DESERIALIZE tests. + */ + + // BINARY_SORTABLE + + testVectorDeserializeRow(c++, r, + SerializationType.BINARY_SORTABLE, + /* alternate1 = useColumnSortOrderIsDesc */ false, + /* alternate2 = useBinarySortableCharsNeedingEscape */ false, + /* useExternalBuffer */ false); + + testVectorDeserializeRow(c++, r, + SerializationType.BINARY_SORTABLE, + /* alternate1 = useColumnSortOrderIsDesc */ true, + /* alternate2 = useBinarySortableCharsNeedingEscape */ false, + /* useExternalBuffer */ false); + + testVectorDeserializeRow(c++, r, + SerializationType.BINARY_SORTABLE, + /* alternate1 = useColumnSortOrderIsDesc */ false, + /* alternate2 = useBinarySortableCharsNeedingEscape */ false, + /* useExternalBuffer */ true); + + testVectorDeserializeRow(c++, r, + SerializationType.BINARY_SORTABLE, + /* alternate1 = useColumnSortOrderIsDesc */ true, + /* alternate2 = useBinarySortableCharsNeedingEscape */ false, + /* useExternalBuffer */ true); + + testVectorDeserializeRow(c++, r, + SerializationType.BINARY_SORTABLE, + /* alternate1 = useColumnSortOrderIsDesc */ false, + /* alternate2 = useBinarySortableCharsNeedingEscape */ true, + /* useExternalBuffer */ false); + + testVectorDeserializeRow(c++, r, + SerializationType.BINARY_SORTABLE, + /* alternate1 = useColumnSortOrderIsDesc */ true, + /* alternate2 = useBinarySortableCharsNeedingEscape */ true, + /* useExternalBuffer */ false); + + testVectorDeserializeRow(c++, r, + SerializationType.BINARY_SORTABLE, + /* alternate1 = useColumnSortOrderIsDesc */ false, + /* alternate2 = useBinarySortableCharsNeedingEscape */ true, + /* useExternalBuffer */ true); + + testVectorDeserializeRow(c++, r, + SerializationType.BINARY_SORTABLE, + /* alternate1 = useColumnSortOrderIsDesc */ true, + /* alternate2 = useBinarySortableCharsNeedingEscape */ true, + /* useExternalBuffer */ true); + + // LAZY_BINARY + + testVectorDeserializeRow(c++, r, + SerializationType.LAZY_BINARY, + /* alternate1 = unused */ false, + /* alternate2 = unused */ false, + /* useExternalBuffer */ false); + + testVectorDeserializeRow(c++, r, + SerializationType.LAZY_BINARY, + /* alternate1 = unused */ false, + /* alternate2 = unused */ false, + /* useExternalBuffer */ true); + + // LAZY_SIMPLE + + testVectorDeserializeRow(c++, r, + SerializationType.LAZY_SIMPLE, + /* alternate1 = useLazySimpleEscapes */ false, + /* alternate2 = unused */ false, + /* useExternalBuffer */ false); + + testVectorDeserializeRow(c++, r, + SerializationType.LAZY_SIMPLE, + /* alternate1 = useLazySimpleEscapes */ false, + /* alternate2 = unused */ false, + /* useExternalBuffer */ true); + + testVectorDeserializeRow(c++, r, + SerializationType.LAZY_SIMPLE, + /* alternate1 = useLazySimpleEscapes */ true, + /* alternate2 = unused */ false, + /* useExternalBuffer */ false); + + testVectorDeserializeRow(c++, r, + SerializationType.LAZY_SIMPLE, + /* alternate1 = useLazySimpleEscapes */ true, + /* alternate2 = unused */ false, + /* useExternalBuffer */ true); } } \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/VectorRandomRowSource.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/VectorRandomRowSource.java index 349c76a..57bf60d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/VectorRandomRowSource.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/VectorRandomRowSource.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.common.type.RandomTypeUtil; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; @@ -61,6 +62,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hive.common.util.DateUtils; +import com.google.common.base.Charsets; + /** * Generate object inspector and random row object[]. */ @@ -80,6 +83,11 @@ private StructObjectInspector rowStructObjectInspector; + private String[] alphabets; + + private boolean addEscapables; + private String needsEscapeStr; + public List typeNames() { return typeNames; } @@ -201,6 +209,35 @@ private void chooseSchema() { typeNames.add(typeName); } rowStructObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, primitiveObjectInspectorList); + alphabets = new String[columnCount]; + } + + public void addBinarySortableAlphabets() { + for (int c = 0; c < columnCount; c++) { + switch (primitiveCategories[c]) { + case STRING: + case CHAR: + case VARCHAR: + byte[] bytes = new byte[10 + r.nextInt(10)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) (32 + r.nextInt(96)); + } + int alwaysIndex = r.nextInt(bytes.length); + bytes[alwaysIndex] = 0; // Must be escaped by BinarySortable. + int alwaysIndex2 = r.nextInt(bytes.length); + bytes[alwaysIndex2] = 1; // Must be escaped by BinarySortable. + alphabets[c] = new String(bytes, Charsets.UTF_8); + break; + default: + // No alphabet needed. + break; + } + } + } + + public void addEscapables(String needsEscapeStr) { + addEscapables = true; + this.needsEscapeStr = needsEscapeStr; } public Object[][] randomRows(int n) { @@ -327,65 +364,107 @@ public static Object getWritableObject(int column, Object object, } public Object randomObject(int column) { - return randomObject(column, r, primitiveCategories, primitiveTypeInfos); + return randomObject(column, r, primitiveCategories, primitiveTypeInfos, alphabets, addEscapables, needsEscapeStr); } public static Object randomObject(int column, Random r, PrimitiveCategory[] primitiveCategories, PrimitiveTypeInfo[] primitiveTypeInfos) { + return randomObject(column, r, primitiveCategories, primitiveTypeInfos, null, false, ""); + } + + public static Object randomObject(int column, Random r, PrimitiveCategory[] primitiveCategories, + PrimitiveTypeInfo[] primitiveTypeInfos, String[] alphabets, boolean addEscapables, String needsEscapeStr) { PrimitiveCategory primitiveCategory = primitiveCategories[column]; PrimitiveTypeInfo primitiveTypeInfo = primitiveTypeInfos[column]; - switch (primitiveCategory) { - case BOOLEAN: - return Boolean.valueOf(r.nextInt(1) == 1); - case BYTE: - return Byte.valueOf((byte) r.nextInt()); - case SHORT: - return Short.valueOf((short) r.nextInt()); - case INT: - return Integer.valueOf(r.nextInt()); - case LONG: - return Long.valueOf(r.nextLong()); - case DATE: - return RandomTypeUtil.getRandDate(r); - case FLOAT: - return Float.valueOf(r.nextFloat() * 10 - 5); - case DOUBLE: - return Double.valueOf(r.nextDouble() * 10 - 5); - case STRING: - return RandomTypeUtil.getRandString(r); - case CHAR: - return getRandHiveChar(r, (CharTypeInfo) primitiveTypeInfo); - case VARCHAR: - return getRandHiveVarchar(r, (VarcharTypeInfo) primitiveTypeInfo); - case BINARY: - return getRandBinary(r, 1 + r.nextInt(100)); - case TIMESTAMP: - return RandomTypeUtil.getRandTimestamp(r); - case INTERVAL_YEAR_MONTH: - return getRandIntervalYearMonth(r); - case INTERVAL_DAY_TIME: - return getRandIntervalDayTime(r); - case DECIMAL: - return getRandHiveDecimal(r, (DecimalTypeInfo) primitiveTypeInfo); - default: - throw new Error("Unknown primitive category " + primitiveCategory); + try { + switch (primitiveCategory) { + case BOOLEAN: + return Boolean.valueOf(r.nextInt(1) == 1); + case BYTE: + return Byte.valueOf((byte) r.nextInt()); + case SHORT: + return Short.valueOf((short) r.nextInt()); + case INT: + return Integer.valueOf(r.nextInt()); + case LONG: + return Long.valueOf(r.nextLong()); + case DATE: + return RandomTypeUtil.getRandDate(r); + case FLOAT: + return Float.valueOf(r.nextFloat() * 10 - 5); + case DOUBLE: + return Double.valueOf(r.nextDouble() * 10 - 5); + case STRING: + case CHAR: + case VARCHAR: + { + String result; + if (alphabets != null && alphabets[column] != null) { + result = RandomTypeUtil.getRandString(r, alphabets[column], r.nextInt(10)); + } else { + result = RandomTypeUtil.getRandString(r); + } + if (addEscapables && result.length() > 0) { + int escapeCount = 1 + r.nextInt(2); + for (int i = 0; i < escapeCount; i++) { + int index = r.nextInt(result.length()); + String begin = result.substring(0, index); + String end = result.substring(index); + Character needsEscapeChar = needsEscapeStr.charAt(r.nextInt(needsEscapeStr.length())); + result = begin + needsEscapeChar + end; + } + } + switch (primitiveCategory) { + case STRING: + return result; + case CHAR: + return new HiveChar(result, ((CharTypeInfo) primitiveTypeInfo).getLength()); + case VARCHAR: + return new HiveChar(result, ((VarcharTypeInfo) primitiveTypeInfo).getLength()); + default: + throw new Error("Unknown primitive category " + primitiveCategory); + } + } + case BINARY: + return getRandBinary(r, 1 + r.nextInt(100)); + case TIMESTAMP: + return RandomTypeUtil.getRandTimestamp(r); + case INTERVAL_YEAR_MONTH: + return getRandIntervalYearMonth(r); + case INTERVAL_DAY_TIME: + return getRandIntervalDayTime(r); + case DECIMAL: + return getRandHiveDecimal(r, (DecimalTypeInfo) primitiveTypeInfo); + default: + throw new Error("Unknown primitive category " + primitiveCategory); + } + } catch (Exception e) { + throw new RuntimeException("randomObject failed on column " + column + " type " + primitiveCategory, e); } } - public static HiveChar getRandHiveChar(Random r, CharTypeInfo charTypeInfo) { + public static HiveChar getRandHiveChar(Random r, CharTypeInfo charTypeInfo, String alphabet) { int maxLength = 1 + r.nextInt(charTypeInfo.getLength()); - String randomString = RandomTypeUtil.getRandString(r, "abcdefghijklmnopqrstuvwxyz", 100); + String randomString = RandomTypeUtil.getRandString(r, alphabet, 100); HiveChar hiveChar = new HiveChar(randomString, maxLength); return hiveChar; } - public static HiveVarchar getRandHiveVarchar(Random r, VarcharTypeInfo varcharTypeInfo) { + public static HiveChar getRandHiveChar(Random r, CharTypeInfo charTypeInfo) { + return getRandHiveChar(r, charTypeInfo, "abcdefghijklmnopqrstuvwxyz"); + } + + public static HiveVarchar getRandHiveVarchar(Random r, VarcharTypeInfo varcharTypeInfo, String alphabet) { int maxLength = 1 + r.nextInt(varcharTypeInfo.getLength()); - String randomString = RandomTypeUtil.getRandString(r, "abcdefghijklmnopqrstuvwxyz", 100); + String randomString = RandomTypeUtil.getRandString(r, alphabet, 100); HiveVarchar hiveVarchar = new HiveVarchar(randomString, maxLength); return hiveVarchar; } + public static HiveVarchar getRandHiveVarchar(Random r, VarcharTypeInfo varcharTypeInfo) { + return getRandHiveVarchar(r, varcharTypeInfo, "abcdefghijklmnopqrstuvwxyz"); + } + public static byte[] getRandBinary(Random r, int len){ byte[] bytes = new byte[len]; for (int j = 0; j < len; j++){ diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java index 0bcfb56..7f68186 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java @@ -69,7 +69,9 @@ public static void verifyHashMapRows(List rows, int[] actualToValueMap int length = ref.getLength(); LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(typeInfos); + new LazyBinaryDeserializeRead( + typeInfos, + /* useExternalBuffer */ false); lazyBinaryDeserializeRead.set(bytes, offset, length); @@ -127,7 +129,9 @@ public static void verifyHashMapRowsMore(List rows, int[] actualToValu } LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(typeInfos); + new LazyBinaryDeserializeRead( + typeInfos, + /* useExternalBuffer */ false); lazyBinaryDeserializeRead.set(bytes, offset, length); diff --git serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java index 003a2d4..0cbc8d0 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java +++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.io.Text; /* * Directly deserialize with the caller reading field-by-field the LazyBinary serialization format. @@ -64,8 +63,12 @@ private int end; private int fieldStart; + private int bytesStart; + + private int internalBufferLen; + private byte[] internalBuffer; + private byte[] tempTimestampBytes; - private Text tempText; private byte[] tempDecimalBuffer; @@ -77,13 +80,14 @@ /* * Use this constructor when only ascending sort order is used. */ - public BinarySortableDeserializeRead(PrimitiveTypeInfo[] primitiveTypeInfos) { - this(primitiveTypeInfos, null); + public BinarySortableDeserializeRead(PrimitiveTypeInfo[] primitiveTypeInfos, + boolean useExternalBuffer) { + this(primitiveTypeInfos, useExternalBuffer, null); } - public BinarySortableDeserializeRead(TypeInfo[] typeInfos, + public BinarySortableDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer, boolean[] columnSortOrderIsDesc) { - super(typeInfos); + super(typeInfos, useExternalBuffer); fieldCount = typeInfos.length; if (columnSortOrderIsDesc != null) { this.columnSortOrderIsDesc = columnSortOrderIsDesc; @@ -94,6 +98,7 @@ public BinarySortableDeserializeRead(TypeInfo[] typeInfos, inputByteBuffer = new InputByteBuffer(); readBeyondConfiguredFieldsWarned = false; bufferRangeHasExtraDataWarned = false; + internalBufferLen = -1; } // Not public since we must have column information. @@ -139,6 +144,8 @@ public String getDetailedReadPositionString() { sb.append(" current read offset "); sb.append(inputByteBuffer.tell()); } + sb.append(" column sort order "); + sb.append(Arrays.toString(columnSortOrderIsDesc)); return sb.toString(); } @@ -276,14 +283,55 @@ public boolean readCheckNull() throws IOException { case CHAR: case VARCHAR: { - if (tempText == null) { - tempText = new Text(); + /* + * This code is a modified version of BinarySortableSerDe.deserializeText that lets us + * detect if we can return a reference to the bytes directly. + */ + + // Get the actual length first + bytesStart = inputByteBuffer.tell(); + final boolean invert = columnSortOrderIsDesc[fieldIndex]; + int length = 0; + do { + byte b = inputByteBuffer.read(invert); + if (b == 0) { + // end of string + break; + } + if (b == 1) { + // the last char is an escape char. read the actual char + inputByteBuffer.read(invert); + } + length++; + } while (true); + + if (length == 0 || + (!invert && length == inputByteBuffer.tell() - bytesStart - 1)) { + // No inversion or escaping happened, so we are can reference directly. + currentExternalBufferNeeded = false; + currentBytes = inputByteBuffer.getData(); + currentBytesStart = bytesStart; + currentBytesLength = length; + } else { + // We are now positioned at the end of this field's bytes. + if (useExternalBuffer) { + // If we decided not to reposition and re-read the buffer to copy it with + // copyToExternalBuffer, we we will still be correctly positioned for the next field. + currentExternalBufferNeeded = true; + currentExternalBufferNeededLen = length; + } else { + // The copyToBuffer will reposition and re-read the input buffer. + currentExternalBufferNeeded = false; + if (internalBufferLen < length) { + internalBufferLen = length; + internalBuffer = new byte[internalBufferLen]; + } + copyToBuffer(internalBuffer, 0, length); + currentBytes = internalBuffer; + currentBytesStart = 0; + currentBytesLength = length; + } } - BinarySortableSerDe.deserializeText( - inputByteBuffer, columnSortOrderIsDesc[fieldIndex], tempText); - currentBytes = tempText.getBytes(); - currentBytesStart = 0; - currentBytesLength = tempText.getLength(); } break; case INTERVAL_YEAR_MONTH: @@ -317,7 +365,9 @@ public boolean readCheckNull() throws IOException { final boolean invert = columnSortOrderIsDesc[fieldIndex]; int b = inputByteBuffer.read(invert) - 1; - assert (b == 1 || b == -1 || b == 0); + if (!(b == 1 || b == -1 || b == 0)) { + throw new IOException("Unexpected byte value " + (int)b + " in binary sortable format data (invert " + invert + ")"); + } boolean positive = b != -1; int factor = inputByteBuffer.read(invert) ^ 0x80; @@ -334,7 +384,10 @@ public boolean readCheckNull() throws IOException { do { b = inputByteBuffer.read(positive ? invert : !invert); - assert(b != 1); + if (b == 1) { + throw new IOException("Expected -1 and found byte value " + (int)b + " in binary sortable format data (invert " + invert + ")"); + } + if (b == 0) { // end of digits @@ -396,6 +449,32 @@ public boolean readCheckNull() throws IOException { return isNull; } + @Override + public void copyToExternalBuffer(byte[] externalBuffer, int externalBufferStart) throws IOException { + copyToBuffer(externalBuffer, externalBufferStart, currentExternalBufferNeededLen); + } + + private void copyToBuffer(byte[] buffer, int bufferStart, int bufferLength) throws IOException { + final boolean invert = columnSortOrderIsDesc[fieldIndex]; + inputByteBuffer.seek(bytesStart); + // 3. Copy the data. + for (int i = 0; i < bufferLength; i++) { + byte b = inputByteBuffer.read(invert); + if (b == 1) { + // The last char is an escape char, read the actual char. + // The serialization format escape \0 to \1, and \1 to \2, + // to make sure the string is null-terminated. + b = (byte) (inputByteBuffer.read(invert) - 1); + } + buffer[bufferStart + i] = b; + } + // 4. Read the null terminator. + byte b = inputByteBuffer.read(invert); + if (b != 0) { + throw new RuntimeException("Expected 0 terminating byte"); + } + } + /* * Call this method after all fields have been read to check for extra fields. */ diff --git serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java index 8f3e771..1600fec 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java +++ serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java @@ -47,12 +47,34 @@ protected TypeInfo[] typeInfos; + protected boolean useExternalBuffer; + protected boolean[] columnsToInclude; protected Category[] categories; protected PrimitiveCategory[] primitiveCategories; - public DeserializeRead(TypeInfo[] typeInfos) { + /** + * Constructor. + * + * When useExternalBuffer is specified true and readCheckNull reads a string/char/varchar/binary + * field, it will request an external buffer to receive the data of format conversion. + * + * if (!deserializeRead.readCheckNull()) { + * if (deserializeRead.currentExternalBufferNeeded) { + * + * deserializeRead.copyToExternalBuffer(externalBuffer, externalBufferStart); + * } else { + * + * } + * + * @param typeInfos + * @param useExternalBuffer Specify true when the caller is prepared to provide a bytes buffer + * to receive a string/char/varchar/binary field that needs format + * conversion. + */ + public DeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer) { this.typeInfos = typeInfos; final int count = typeInfos.length; categories = new Category[count]; @@ -96,6 +118,8 @@ public DeserializeRead(TypeInfo[] typeInfos) { // No writable needed for this data type. } } + + this.useExternalBuffer = useExternalBuffer; } columnsToInclude = null; @@ -194,7 +218,19 @@ public void setColumnsToInclude(boolean[] columnsToInclude) { * * For CHAR and VARCHAR when the caller takes responsibility for * truncation/padding issues. + * + * When currentExternalBufferNeeded is true, conversion is needed into an external buffer of + * at least currentExternalBufferNeededLen bytes. Use copyToExternalBuffer to get the result. + * + * Otherwise, currentBytes, currentBytesStart, and currentBytesLength are the result. */ + public boolean currentExternalBufferNeeded; + public int currentExternalBufferNeededLen; + + public void copyToExternalBuffer(byte[] externalBuffer, int externalBufferStart) throws IOException { + throw new RuntimeException("Not implemented"); + } + public byte[] currentBytes; public int currentBytesStart; public int currentBytesLength; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java index ac44390..07709d8 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.serde2.lazy.fast; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.charset.CharacterCodingException; import java.sql.Date; @@ -78,15 +79,17 @@ private int fieldStart; private int fieldLength; - private Text tempText; + private int internalBufferLen; + private byte[] internalBuffer; + private TimestampParser timestampParser; private boolean extraFieldWarned; private boolean missingFieldWarned; - public LazySimpleDeserializeRead(TypeInfo[] typeInfos, + public LazySimpleDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer, byte separator, LazySerDeParameters lazyParams) { - super(typeInfos); + super(typeInfos, useExternalBuffer); // Field length is difference between positions hence one extra. startPosition = new int[typeInfos.length + 1]; @@ -100,13 +103,14 @@ public LazySimpleDeserializeRead(TypeInfo[] typeInfos, lastColumnTakesRest = lazyParams.isLastColumnTakesRest(); fieldCount = typeInfos.length; - tempText = new Text(); extraFieldWarned = false; missingFieldWarned = false; + internalBufferLen = -1; } - public LazySimpleDeserializeRead(TypeInfo[] typeInfos, LazySerDeParameters lazyParams) { - this(typeInfos, lazyParams.getSeparators()[0], lazyParams); + public LazySimpleDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer, + LazySerDeParameters lazyParams) { + this(typeInfos, useExternalBuffer, lazyParams.getSeparators()[0], lazyParams); } // Not public since we must have the field count so every 8 fields NULL bytes can be navigated. @@ -395,16 +399,48 @@ public boolean readCheckNull() { case STRING: case CHAR: case VARCHAR: - if (isEscaped) { - LazyUtils.copyAndEscapeStringDataToText(bytes, fieldStart, fieldLength, escapeChar, tempText); - currentBytes = tempText.getBytes(); - currentBytesStart = 0; - currentBytesLength = tempText.getLength(); - } else { - // if the data is not escaped, simply copy the data. - currentBytes = bytes; - currentBytesStart = fieldStart; - currentBytesLength = fieldLength; + { + if (isEscaped) { + // First calculate the length of the output string + int outputLength = 0; + for (int i = 0; i < fieldLength; i++) { + if (bytes[fieldStart + i] != escapeChar) { + outputLength++; + } else { + outputLength++; + i++; + } + } + if (outputLength == fieldLength) { + // No escaping. + currentExternalBufferNeeded = false; + currentBytes = bytes; + currentBytesStart = fieldStart; + currentBytesLength = outputLength; + } else { + if (useExternalBuffer) { + currentExternalBufferNeeded = true; + currentExternalBufferNeededLen = outputLength; + } else { + // The copyToBuffer will reposition and re-read the input buffer. + currentExternalBufferNeeded = false; + if (internalBufferLen < outputLength) { + internalBufferLen = outputLength; + internalBuffer = new byte[internalBufferLen]; + } + copyToBuffer(internalBuffer, 0, outputLength); + currentBytes = internalBuffer; + currentBytesStart = 0; + currentBytesLength = outputLength; + } + } + } else { + // If the data is not escaped, reference the data directly. + currentExternalBufferNeeded = false; + currentBytes = bytes; + currentBytesStart = fieldStart; + currentBytesLength = fieldLength; + } } break; case BINARY: @@ -528,6 +564,32 @@ public boolean readCheckNull() { return false; } + @Override + public void copyToExternalBuffer(byte[] externalBuffer, int externalBufferStart) { + copyToBuffer(externalBuffer, externalBufferStart, currentExternalBufferNeededLen); + } + + private void copyToBuffer(byte[] buffer, int bufferStart, int bufferLength) { + int k = 0; + for (int i = 0; i < bufferLength; i++) { + byte b = bytes[fieldStart + i]; + if (b == escapeChar && i < bufferLength - 1) { + ++i; + // Check if it's '\r' or '\n' + if (bytes[fieldStart + i] == 'r') { + buffer[bufferStart + k++] = '\r'; + } else if (bytes[fieldStart + i] == 'n') { + buffer[bufferStart + k++] = '\n'; + } else { + // get the next byte + buffer[bufferStart + k++] = bytes[fieldStart + i]; + } + } else { + buffer[bufferStart + k++] = b; + } + } + } + public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength, String dataType) { try { if(LOG.isDebugEnabled()) { diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java index 0df1d79..472ace7 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java @@ -68,11 +68,12 @@ private boolean readBeyondConfiguredFieldsWarned; private boolean bufferRangeHasExtraDataWarned; - public LazyBinaryDeserializeRead(TypeInfo[] typeInfos) { - super(typeInfos); + public LazyBinaryDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer) { + super(typeInfos, useExternalBuffer); fieldCount = typeInfos.length; tempVInt = new VInt(); tempVLong = new VLong(); + currentExternalBufferNeeded = false; readBeyondConfiguredFieldsWarned = false; bufferRangeHasExtraDataWarned = false; } diff --git serde/src/test/org/apache/hadoop/hive/serde2/binarysortable/TestBinarySortableFast.java serde/src/test/org/apache/hadoop/hive/serde2/binarysortable/TestBinarySortableFast.java index 7babf7a..49ee9c6 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/binarysortable/TestBinarySortableFast.java +++ serde/src/test/org/apache/hadoop/hive/serde2/binarysortable/TestBinarySortableFast.java @@ -115,7 +115,10 @@ private void testBinarySortableFast( for (int i = 0; i < rowCount; i++) { Object[] row = rows[i]; BinarySortableDeserializeRead binarySortableDeserializeRead = - new BinarySortableDeserializeRead(primitiveTypeInfos, columnSortOrderIsDesc); + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false, + columnSortOrderIsDesc); if (useIncludeColumns) { binarySortableDeserializeRead.setColumnsToInclude(columnsToInclude); @@ -143,7 +146,10 @@ private void testBinarySortableFast( * Clip off one byte and expect to get an EOFException on the write field. */ BinarySortableDeserializeRead binarySortableDeserializeRead2 = - new BinarySortableDeserializeRead(primitiveTypeInfos, columnSortOrderIsDesc); + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false, + columnSortOrderIsDesc); if (useIncludeColumns) { binarySortableDeserializeRead2.setColumnsToInclude(columnsToInclude); @@ -247,7 +253,10 @@ private void testBinarySortableFast( for (int i = 0; i < rowCount; i++) { Object[] row = rows[i]; BinarySortableDeserializeRead binarySortableDeserializeRead = - new BinarySortableDeserializeRead(primitiveTypeInfos, columnSortOrderIsDesc); + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false, + columnSortOrderIsDesc); if (useIncludeColumns) { binarySortableDeserializeRead.setColumnsToInclude(columnsToInclude); diff --git serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazySimpleFast.java serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazySimpleFast.java index 66c6203..8285c06 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazySimpleFast.java +++ serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazySimpleFast.java @@ -96,7 +96,9 @@ private void testLazySimpleFast( for (int i = 0; i < rowCount; i++) { Object[] row = rows[i]; LazySimpleDeserializeRead lazySimpleDeserializeRead = - new LazySimpleDeserializeRead(writePrimitiveTypeInfos, + new LazySimpleDeserializeRead( + writePrimitiveTypeInfos, + /* useExternalBuffer */ false, separator, serdeParams); if (useIncludeColumns) { @@ -186,7 +188,9 @@ private void testLazySimpleFast( Object[] row = rows[i]; LazySimpleDeserializeRead lazySimpleDeserializeRead = - new LazySimpleDeserializeRead(writePrimitiveTypeInfos, + new LazySimpleDeserializeRead( + writePrimitiveTypeInfos, + /* useExternalBuffer */ false, separator, serdeParams); if (useIncludeColumns) { diff --git serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java index 5af11cd..e64d67d 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java +++ serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java @@ -91,7 +91,9 @@ private void testLazyBinaryFast( // Specifying the right type info length tells LazyBinaryDeserializeRead which is the last // column. LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(writePrimitiveTypeInfos); + new LazyBinaryDeserializeRead( + writePrimitiveTypeInfos, + /* useExternalBuffer */ false); if (useIncludeColumns) { lazyBinaryDeserializeRead.setColumnsToInclude(columnsToInclude); @@ -191,7 +193,9 @@ private void testLazyBinaryFast( // When doWriteFewerColumns, try to read more fields than exist in buffer. LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(primitiveTypeInfos); + new LazyBinaryDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false); if (useIncludeColumns) { lazyBinaryDeserializeRead.setColumnsToInclude(columnsToInclude); diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java index a6d932c..3caa584 100644 --- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java +++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java @@ -170,6 +170,37 @@ public void setVal(int elementNum, byte[] sourceBuf) { } /** + * Preallocate space in the local buffer so the caller can fill in the value bytes themselves. + * + * Always use with getValPreallocatedBytes, getValPreallocatedStart, and setValPreallocated. + */ + public void ensureValPreallocated(int length) { + if ((nextFree + length) > buffer.length) { + increaseBufferSpace(length); + } + } + + public byte[] getValPreallocatedBytes() { + return buffer; + } + + public int getValPreallocatedStart() { + return nextFree; + } + + /** + * Set the length of the preallocated values bytes used. + * @param elementNum + * @param length + */ + public void setValPreallocated(int elementNum, int length) { + vector[elementNum] = buffer; + this.start[elementNum] = nextFree; + this.length[elementNum] = length; + nextFree += length; + } + + /** * Set a field to the concatenation of two string values. Result data is copied * into the internal buffer. *