diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index f4c3b81..7e41b7a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -185,6 +185,7 @@ void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyT new BinarySortableDeserializeRead( VectorizedBatchUtil.typeInfosFromStructObjectInspector( keyStructInspector), + /* useExternalBuffer */ true, binarySortableSerDe.getSortOrders())); keyBinarySortableDeserializeToRow.init(0); @@ -194,7 +195,8 @@ void init(JobConf jconf, Operator reducer, boolean vectorized, TableDesc keyT new VectorDeserializeRow( new LazyBinaryDeserializeRead( VectorizedBatchUtil.typeInfosFromStructObjectInspector( - valueStructInspectors))); + valueStructInspectors), + /* useExternalBuffer */ true)); valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset); // Create data buffers for value bytes column vectors. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java index f66916b..8f413be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java @@ -74,6 +74,12 @@ private TypeInfo[] sourceTypeInfos; + private byte[] inputBytes; + + /** + * @param deserializeRead Set useExternalBuffer to true to avoid buffer copying and to get + * more efficient reading. + */ public VectorDeserializeRow(T deserializeRead) { this(); this.deserializeRead = deserializeRead; @@ -338,10 +344,11 @@ public void init() throws HiveException { * @param batch * @param batchIndex * @param logicalColumnIndex + * @param canRetainByteRef * @throws IOException */ private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex, - int logicalColumnIndex) throws IOException { + int logicalColumnIndex, boolean canRetainByteRef) throws IOException { Category sourceCategory = sourceCategories[logicalColumnIndex]; if (sourceCategory == null) { /* @@ -406,42 +413,114 @@ private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex, break; case BINARY: case STRING: - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength); + { + BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); + if (deserializeRead.currentExternalBufferNeeded) { + bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); + deserializeRead.copyToExternalBuffer( + bytesColVec.getValPreallocatedBytes(), bytesColVec.getValPreallocatedStart()); + bytesColVec.setValPreallocated( + batchIndex, + deserializeRead.currentExternalBufferNeededLen); + } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { + bytesColVec.setRef( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength); + } else { + bytesColVec.setVal( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength); + } + } break; case VARCHAR: { // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method // that does not use Java String objects. 
- int adjustedLength = StringExpr.truncate( - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex]); - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - adjustedLength); + BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); + if (deserializeRead.currentExternalBufferNeeded) { + // Write directly into our BytesColumnVector value buffer. + bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); + byte[] convertBuffer = bytesColVec.getValPreallocatedBytes(); + int convertBufferStart = bytesColVec.getValPreallocatedStart(); + deserializeRead.copyToExternalBuffer( + convertBuffer, + convertBufferStart); + bytesColVec.setValPreallocated( + batchIndex, + StringExpr.truncate( + convertBuffer, + convertBufferStart, + deserializeRead.currentExternalBufferNeededLen, + maxLengths[logicalColumnIndex])); + } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { + bytesColVec.setRef( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.truncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + maxLengths[logicalColumnIndex])); + } else { + bytesColVec.setVal( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.truncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + maxLengths[logicalColumnIndex])); + } } break; case CHAR: { // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method // that does not use Java String objects. - int adjustedLength = StringExpr.rightTrimAndTruncate( - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex]); - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - adjustedLength); + BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); + if (deserializeRead.currentExternalBufferNeeded) { + // Write directly into our BytesColumnVector value buffer. 
+          bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen);
+          byte[] convertBuffer = bytesColVec.getValPreallocatedBytes();
+          int convertBufferStart = bytesColVec.getValPreallocatedStart();
+          deserializeRead.copyToExternalBuffer(
+              convertBuffer,
+              convertBufferStart);
+          bytesColVec.setValPreallocated(
+              batchIndex,
+              StringExpr.rightTrimAndTruncate(
+                  convertBuffer,
+                  convertBufferStart,
+                  deserializeRead.currentExternalBufferNeededLen,
+                  maxLengths[logicalColumnIndex]));
+        } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) {
+          bytesColVec.setRef(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              StringExpr.rightTrimAndTruncate(
+                  deserializeRead.currentBytes,
+                  deserializeRead.currentBytesStart,
+                  deserializeRead.currentBytesLength,
+                  maxLengths[logicalColumnIndex]));
+        } else {
+          bytesColVec.setVal(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              StringExpr.rightTrimAndTruncate(
+                  deserializeRead.currentBytes,
+                  deserializeRead.currentBytesStart,
+                  deserializeRead.currentBytesLength,
+                  maxLengths[logicalColumnIndex]));
+        }
+      }
       break;
     case DECIMAL:
@@ -644,6 +723,7 @@ private void deserializeConvertRowColumn(VectorizedRowBatch batch, int batchInde
    * @param length
    */
   public void setBytes(byte[] bytes, int offset, int length) {
+    inputBytes = bytes;
     deserializeRead.set(bytes, offset, length);
   }

@@ -663,12 +743,45 @@ public void deserialize(VectorizedRowBatch batch, int batchIndex) throws IOExcep
       if (isConvert[i]) {
         deserializeConvertRowColumn(batch, batchIndex, i);
       } else {
-        deserializeRowColumn(batch, batchIndex, i);
+        deserializeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ false);
       }
     }
     deserializeRead.extraFieldsCheck();
   }

+  /**
+   * Deserialize a row from the range of bytes specified by setBytes.
+   *
+   * Use this method instead of deserialize when it is safe to retain references to the bytes
+   * source for DeserializeRead. I.e., the STRING and CHAR/VARCHAR data can be set in
+   * BytesColumnVector with setRef instead of with setVal, which copies data.
+   *
+   * An example of a safe usage:
+   *   Referring to bytes in a hash table entry that is immutable.
+   *
+   * An example of an unsafe usage:
+   *   Referring to bytes in a reduce receive buffer that will be overwritten with new data.
+   *
+   * Use getDetailedReadPositionString to get detailed read position information to help
+   * diagnose exceptions that are thrown.
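+   *
+   * A minimal sketch of the intended calling pattern (the locals below are illustrative,
+   * not members of this class):
+   *
+   *     // entryBytes belongs to an immutable hash table entry, so it is safe
+   *     // to retain references into it.
+   *     vectorDeserializeRow.setBytes(entryBytes, entryOffset, entryLength);
+   *     vectorDeserializeRow.deserializeByRef(batch, batchIndex);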
+ * + * @param batch + * @param batchIndex + * @throws IOException + */ + public void deserializeByRef(VectorizedRowBatch batch, int batchIndex) throws IOException { + final int count = isConvert.length; + for (int i = 0; i < count; i++) { + if (isConvert[i]) { + deserializeConvertRowColumn(batch, batchIndex, i); + } else { + deserializeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ true); + } + } + deserializeRead.extraFieldsCheck(); + } + + public String getDetailedReadPositionString() { return deserializeRead.getDetailedReadPositionString(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index 2bdc59b..c7fa0db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -261,7 +261,10 @@ public void init(Configuration hconf) LazySimpleSerDe.class.getName()); LazySimpleDeserializeRead lazySimpleDeserializeRead = - new LazySimpleDeserializeRead(dataTypeInfos, simpleSerdeParams); + new LazySimpleDeserializeRead( + dataTypeInfos, + /* useExternalBuffer */ true, + simpleSerdeParams); vectorDeserializeRow = new VectorDeserializeRow(lazySimpleDeserializeRead); @@ -277,7 +280,9 @@ public void init(Configuration hconf) case LAZY_BINARY: { LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(dataTypeInfos); + new LazyBinaryDeserializeRead( + dataTypeInfos, + /* useExternalBuffer */ true); vectorDeserializeRow = new VectorDeserializeRow(lazyBinaryDeserializeRead); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 24668f9..c288731 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -571,7 +571,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { new VectorDeserializeRow( new LazyBinaryDeserializeRead( VectorizedBatchUtil.typeInfosFromTypeNames( - smallTableMapping.getTypeNames()))); + smallTableMapping.getTypeNames()), + /* useExternalBuffer */ true)); smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 469f86a..21a01e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -144,7 +144,8 @@ protected void doSmallTableDeserializeRow(VectorizedRowBatch batch, int batchInd smallTableVectorDeserializeRow.setBytes(bytes, offset, length); try { - smallTableVectorDeserializeRow.deserialize(batch, batchIndex); + // Our hash tables are immutable. We can safely do by reference STRING, CHAR/VARCHAR, etc. 
+ smallTableVectorDeserializeRow.deserializeByRef(batch, batchIndex); } catch (Exception e) { throw new HiveException( "\nHashMapResult detail: " + @@ -442,7 +443,9 @@ private void setupSpillSerDe(VectorizedRowBatch batch) throws HiveException { bigTableVectorDeserializeRow = new VectorDeserializeRow( - new LazyBinaryDeserializeRead(bigTableTypeInfos)); + new LazyBinaryDeserializeRead( + bigTableTypeInfos, + /* useExternalBuffer */ true)); bigTableVectorDeserializeRow.init(noNullsProjection); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java index ee66d5b..726a937 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java @@ -267,7 +267,10 @@ public VectorMapJoinFastLongHashTable( this.isOuterJoin = isOuterJoin; this.hashTableKeyType = hashTableKeyType; PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() }; - keyBinarySortableDeserializeRead = new BinarySortableDeserializeRead(primitiveTypeInfos); + keyBinarySortableDeserializeRead = + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false); allocateBucketArray(); useMinMax = minMaxEnabled; min = Long.MAX_VALUE; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java index bf378ac..456e6ba 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java @@ -67,6 +67,8 @@ public VectorMapJoinFastStringCommon(boolean isOuterJoin) { this.isOuterJoin = isOuterJoin; PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo }; keyBinarySortableDeserializeRead = - new BinarySortableDeserializeRead(primitiveTypeInfos); + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false); } } \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java index 0eabc44..ac85899 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java @@ -161,8 +161,6 @@ public VectorMapJoinOptimizedLongCommon( min = Long.MAX_VALUE; max = Long.MIN_VALUE; this.hashTableKeyType = hashTableKeyType; - // PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() }; - // keyBinarySortableDeserializeRead = new BinarySortableDeserializeRead(primitiveTypeInfos); keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(1); output = new Output(); keyBinarySortableSerializeWrite.set(output); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java index c6704f9..7a5eb34 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java +++ 
ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; import org.apache.hadoop.hive.serde2.fast.DeserializeRead; @@ -305,7 +306,8 @@ void serializeBatch(VectorizedRowBatch batch, VectorSerializeRow vectorSerialize } } - void testVectorSerializeRow(int caseNum, Random r, SerializationType serializationType) throws HiveException, IOException, SerDeException { + void testVectorSerializeRow(int caseNum, Random r, SerializationType serializationType) + throws HiveException, IOException, SerDeException { String[] emptyScratchTypeNames = new String[0]; @@ -324,11 +326,11 @@ void testVectorSerializeRow(int caseNum, Random r, SerializationType serializati SerializeWrite serializeWrite; switch (serializationType) { case BINARY_SORTABLE: - deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos()); + deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos(), /* useExternalBuffer */ false); serializeWrite = new BinarySortableSerializeWrite(fieldCount); break; case LAZY_BINARY: - deserializeRead = new LazyBinaryDeserializeRead(source.primitiveTypeInfos()); + deserializeRead = new LazyBinaryDeserializeRead(source.primitiveTypeInfos(), /* useExternalBuffer */ false); serializeWrite = new LazyBinarySerializeWrite(fieldCount); break; case LAZY_SIMPLE: @@ -336,7 +338,7 @@ void testVectorSerializeRow(int caseNum, Random r, SerializationType serializati StructObjectInspector rowObjectInspector = source.rowStructObjectInspector(); LazySerDeParameters lazySerDeParams = getSerDeParams(rowObjectInspector); byte separator = (byte) '\t'; - deserializeRead = new LazySimpleDeserializeRead(source.primitiveTypeInfos(), + deserializeRead = new LazySimpleDeserializeRead(source.primitiveTypeInfos(), /* useExternalBuffer */ false, separator, lazySerDeParams); serializeWrite = new LazySimpleSerializeWrite(fieldCount, separator, lazySerDeParams); @@ -367,7 +369,7 @@ void testVectorSerializeRow(int caseNum, Random r, SerializationType serializati } void examineBatch(VectorizedRowBatch batch, VectorExtractRow vectorExtractRow, - Object[][] randomRows, int firstRandomRowIndex ) { + PrimitiveTypeInfo[] primitiveTypeInfos, Object[][] randomRows, int firstRandomRowIndex ) { int rowSize = vectorExtractRow.getCount(); Object[] row = new Object[rowSize]; @@ -381,13 +383,14 @@ void examineBatch(VectorizedRowBatch batch, VectorExtractRow vectorExtractRow, fail("Unexpected NULL from extractRow"); } if (!row[c].equals(expectedRow[c])) { - fail("Row " + (firstRandomRowIndex + i) + " and column " + c + " mismatch"); + fail("Row " + (firstRandomRowIndex + i) + " and column " + c + " mismatch (" + primitiveTypeInfos[c].getPrimitiveCategory() + " actual value " + row[c] + " and expected value " + expectedRow[c] + ")"); } } } } - private Output serializeRow(Object[] row, VectorRandomRowSource source, SerializeWrite serializeWrite) throws HiveException, IOException { + private Output serializeRow(Object[] row, VectorRandomRowSource source, + SerializeWrite serializeWrite) throws HiveException, IOException { Output output = new Output(); 
serializeWrite.set(output); PrimitiveTypeInfo[] primitiveTypeInfos = source.primitiveTypeInfos(); @@ -536,7 +539,9 @@ private LazySerDeParameters getSerDeParams(StructObjectInspector rowObjectInspec return new LazySerDeParameters(conf, tbl, LazySimpleSerDe.class.getName()); } - void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializationType) throws HiveException, IOException, SerDeException { + void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializationType, + boolean alternate1, boolean useExternalBuffer) + throws HiveException, IOException, SerDeException { String[] emptyScratchTypeNames = new String[0]; @@ -552,16 +557,44 @@ void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializa Arrays.fill(cv.isNull, true); } + PrimitiveTypeInfo[] primitiveTypeInfos = source.primitiveTypeInfos(); int fieldCount = source.typeNames().size(); DeserializeRead deserializeRead; SerializeWrite serializeWrite; switch (serializationType) { case BINARY_SORTABLE: - deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos()); - serializeWrite = new BinarySortableSerializeWrite(fieldCount); + boolean useColumnSortOrderIsDesc = alternate1; + if (!useColumnSortOrderIsDesc) { + deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos(), useExternalBuffer); + serializeWrite = new BinarySortableSerializeWrite(fieldCount); + } else { + boolean[] columnSortOrderIsDesc = new boolean[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + columnSortOrderIsDesc[i] = r.nextBoolean(); + } + deserializeRead = new BinarySortableDeserializeRead(source.primitiveTypeInfos(), useExternalBuffer, + columnSortOrderIsDesc); + + byte[] columnNullMarker = new byte[fieldCount]; + byte[] columnNotNullMarker = new byte[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + if (columnSortOrderIsDesc[i]) { + // Descending + // Null last (default for descending order) + columnNullMarker[i] = BinarySortableSerDe.ZERO; + columnNotNullMarker[i] = BinarySortableSerDe.ONE; + } else { + // Ascending + // Null first (default for ascending order) + columnNullMarker[i] = BinarySortableSerDe.ZERO; + columnNotNullMarker[i] = BinarySortableSerDe.ONE; + } + } + serializeWrite = new BinarySortableSerializeWrite(columnSortOrderIsDesc, columnNullMarker, columnNotNullMarker); + } break; case LAZY_BINARY: - deserializeRead = new LazyBinaryDeserializeRead(source.primitiveTypeInfos()); + deserializeRead = new LazyBinaryDeserializeRead(source.primitiveTypeInfos(), useExternalBuffer); serializeWrite = new LazyBinarySerializeWrite(fieldCount); break; case LAZY_SIMPLE: @@ -569,7 +602,7 @@ void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializa StructObjectInspector rowObjectInspector = source.rowStructObjectInspector(); LazySerDeParameters lazySerDeParams = getSerDeParams(rowObjectInspector); byte separator = (byte) '\t'; - deserializeRead = new LazySimpleDeserializeRead(source.primitiveTypeInfos(), + deserializeRead = new LazySimpleDeserializeRead(source.primitiveTypeInfos(), useExternalBuffer, separator, lazySerDeParams); serializeWrite = new LazySimpleSerializeWrite(fieldCount, separator, lazySerDeParams); @@ -597,23 +630,33 @@ void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializa Output output = serializeRow(row, source, serializeWrite); vectorDeserializeRow.setBytes(output.getData(), 0, output.getLength()); - vectorDeserializeRow.deserialize(batch, batch.size); + try { + 
vectorDeserializeRow.deserialize(batch, batch.size);
+      } catch (Exception e) {
+        throw new HiveException(
+            "\nDeserializeRead details: " +
+                vectorDeserializeRow.getDetailedReadPositionString(),
+            e);
+      }
       batch.size++;
       if (batch.size == batch.DEFAULT_SIZE) {
-        examineBatch(batch, vectorExtractRow, randomRows, firstRandomRowIndex);
+        examineBatch(batch, vectorExtractRow, primitiveTypeInfos, randomRows, firstRandomRowIndex);
         firstRandomRowIndex = i + 1;
         batch.reset();
       }
     }
     if (batch.size > 0) {
-      examineBatch(batch, vectorExtractRow, randomRows, firstRandomRowIndex);
+      examineBatch(batch, vectorExtractRow, primitiveTypeInfos, randomRows, firstRandomRowIndex);
     }
   }

   public void testVectorSerDeRow() throws Throwable {
-    try {
     Random r = new Random(5678);
+
+    /*
+     * SERIALIZE tests.
+     */
     for (int c = 0; c < 10; c++) {
       testVectorSerializeRow(c, r, SerializationType.BINARY_SORTABLE);
     }
@@ -624,20 +667,64 @@ public void testVectorSerDeRow() throws Throwable {
       testVectorSerializeRow(c, r, SerializationType.LAZY_SIMPLE);
     }

-    for (int c = 0; c < 10; c++) {
-      testVectorDeserializeRow(c, r, SerializationType.BINARY_SORTABLE);
-    }
-    for (int c = 0; c < 10; c++) {
-      testVectorDeserializeRow(c, r, SerializationType.LAZY_BINARY);
-    }
-    for (int c = 0; c < 10; c++) {
-      testVectorDeserializeRow(c, r, SerializationType.LAZY_SIMPLE);
-    }
-
-
-    } catch (Throwable e) {
-      e.printStackTrace();
-      throw e;
-    }
+    /*
+     * DESERIALIZE tests.
+     */
+    int c = 0;
+
+    // BINARY_SORTABLE
+    // UNDONE: 0 and 1 get escaped...
+    do {
+      testVectorDeserializeRow(c, r,
+          SerializationType.BINARY_SORTABLE,
+          /* alternate1 = useColumnSortOrderIsDesc */ false,
+          /* useExternalBuffer */ false);
+    } while (++c < 5);
+    do {
+      testVectorDeserializeRow(c, r,
+          SerializationType.BINARY_SORTABLE,
+          /* alternate1 = useColumnSortOrderIsDesc */ true,
+          /* useExternalBuffer */ false);
+    } while (++c < 10);
+    do {
+      testVectorDeserializeRow(c, r,
+          SerializationType.BINARY_SORTABLE,
+          /* alternate1 = useColumnSortOrderIsDesc */ false,
+          /* useExternalBuffer */ true);
+    } while (++c < 15);
+    do {
+      testVectorDeserializeRow(c, r,
+          SerializationType.BINARY_SORTABLE,
+          /* alternate1 = useColumnSortOrderIsDesc */ true,
+          /* useExternalBuffer */ true);
+    } while (++c < 20);
+
+    // LAZY_BINARY
+    do {
+      testVectorDeserializeRow(c, r,
+          SerializationType.LAZY_BINARY,
+          /* alternate1 = unused */ false,
+          /* useExternalBuffer */ false);
+    } while (++c < 25);
+    do {
+      testVectorDeserializeRow(c, r,
+          SerializationType.LAZY_BINARY,
+          /* alternate1 = unused */ false,
+          /* useExternalBuffer */ true);
+    } while (++c < 30);
+
+    // LAZY_SIMPLE
+    do {
+      testVectorDeserializeRow(c, r,
+          SerializationType.LAZY_SIMPLE,
+          /* alternate1 = unused */ false,
+          /* useExternalBuffer */ false);
+    } while (++c < 35);
+    do {
+      testVectorDeserializeRow(c, r,
+          SerializationType.LAZY_SIMPLE,
+          /* alternate1 = unused */ false,
+          /* useExternalBuffer */ true);
+    } while (++c < 40);
   }
 }
\ No newline at end of file
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
index 0bcfb56..7f68186 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
@@ -69,7 +69,9 @@ public static void verifyHashMapRows(List rows, int[] actualToValueMap
     int length = ref.getLength();

     LazyBinaryDeserializeRead lazyBinaryDeserializeRead =
-        new
LazyBinaryDeserializeRead(typeInfos); + new LazyBinaryDeserializeRead( + typeInfos, + /* useExternalBuffer */ false); lazyBinaryDeserializeRead.set(bytes, offset, length); @@ -127,7 +129,9 @@ public static void verifyHashMapRowsMore(List rows, int[] actualToValu } LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(typeInfos); + new LazyBinaryDeserializeRead( + typeInfos, + /* useExternalBuffer */ false); lazyBinaryDeserializeRead.set(bytes, offset, length); diff --git serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java index 003a2d4..5348fd6 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java +++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.io.Text; /* * Directly deserialize with the caller reading field-by-field the LazyBinary serialization format. @@ -64,8 +63,12 @@ private int end; private int fieldStart; + private int bytesStart; + + private int internalBufferLen; + private byte[] internalBuffer; + private byte[] tempTimestampBytes; - private Text tempText; private byte[] tempDecimalBuffer; @@ -77,13 +80,14 @@ /* * Use this constructor when only ascending sort order is used. */ - public BinarySortableDeserializeRead(PrimitiveTypeInfo[] primitiveTypeInfos) { - this(primitiveTypeInfos, null); + public BinarySortableDeserializeRead(PrimitiveTypeInfo[] primitiveTypeInfos, + boolean useExternalBuffer) { + this(primitiveTypeInfos, useExternalBuffer, null); } - public BinarySortableDeserializeRead(TypeInfo[] typeInfos, + public BinarySortableDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer, boolean[] columnSortOrderIsDesc) { - super(typeInfos); + super(typeInfos, useExternalBuffer); fieldCount = typeInfos.length; if (columnSortOrderIsDesc != null) { this.columnSortOrderIsDesc = columnSortOrderIsDesc; @@ -94,6 +98,7 @@ public BinarySortableDeserializeRead(TypeInfo[] typeInfos, inputByteBuffer = new InputByteBuffer(); readBeyondConfiguredFieldsWarned = false; bufferRangeHasExtraDataWarned = false; + internalBufferLen = -1; } // Not public since we must have column information. @@ -139,6 +144,8 @@ public String getDetailedReadPositionString() { sb.append(" current read offset "); sb.append(inputByteBuffer.tell()); } + sb.append(" column sort order "); + sb.append(Arrays.toString(columnSortOrderIsDesc)); return sb.toString(); } @@ -276,14 +283,54 @@ public boolean readCheckNull() throws IOException { case CHAR: case VARCHAR: { - if (tempText == null) { - tempText = new Text(); + /* + * This code is a modified version of BinarySortableSerDe.deserializeText that lets us + * detect if we can return a reference to the bytes directly. + */ + + // Get the actual length first + bytesStart = inputByteBuffer.tell(); + final boolean invert = columnSortOrderIsDesc[fieldIndex]; + int length = 0; + do { + byte b = inputByteBuffer.read(invert); + if (b == 0) { + // end of string + break; + } + if (b == 1) { + // the last char is an escape char. 
read the actual char
+          inputByteBuffer.read(invert);
+        }
+        length++;
+      } while (true);
+
+      if (!invert && length == inputByteBuffer.tell() - bytesStart - 1) {
+        // No inversion or escaping happened, so we can reference the bytes directly.
+        currentExternalBufferNeeded = false;
+        currentBytes = inputByteBuffer.getData();
+        currentBytesStart = bytesStart;
+        currentBytesLength = length;
+      } else {
+        // We are now positioned at the end of this field's bytes.
+        if (useExternalBuffer) {
+          // Even if the caller does not reposition and re-read the buffer to copy it with
+          // copyToExternalBuffer, we will still be correctly positioned for the next field.
+          currentExternalBufferNeeded = true;
+          currentExternalBufferNeededLen = length;
+        } else {
+          // The copyToBuffer will reposition and re-read the input buffer.
+          currentExternalBufferNeeded = false;
+          if (internalBufferLen < length) {
+            internalBufferLen = length;
+            internalBuffer = new byte[internalBufferLen];
+          }
+          copyToBuffer(internalBuffer, 0, length);
+          currentBytes = internalBuffer;
+          currentBytesStart = 0;
+          currentBytesLength = length;
+        }
       }
-      BinarySortableSerDe.deserializeText(
-          inputByteBuffer, columnSortOrderIsDesc[fieldIndex], tempText);
-      currentBytes = tempText.getBytes();
-      currentBytesStart = 0;
-      currentBytesLength = tempText.getLength();
     }
     break;
   case INTERVAL_YEAR_MONTH:
@@ -317,7 +364,9 @@ public boolean readCheckNull() throws IOException {
         final boolean invert = columnSortOrderIsDesc[fieldIndex];

         int b = inputByteBuffer.read(invert) - 1;
-        assert (b == 1 || b == -1 || b == 0);
+        if (!(b == 1 || b == -1 || b == 0)) {
+          throw new IOException("Unexpected byte value " + (int)b + " in binary sortable format data (invert " + invert + ")");
+        }
         boolean positive = b != -1;

         int factor = inputByteBuffer.read(invert) ^ 0x80;
@@ -334,7 +383,10 @@ public boolean readCheckNull() throws IOException {

         do {
           b = inputByteBuffer.read(positive ? invert : !invert);
-          assert(b != 1);
+          if (b == 1) {
+            throw new IOException("Unexpected byte value " + (int)b + " in binary sortable format data (invert " + invert + ")");
+          }
+
           if (b == 0) {
             // end of digits
@@ -396,6 +448,32 @@ public boolean readCheckNull() throws IOException {
     return isNull;
   }

+  @Override
+  public void copyToExternalBuffer(byte[] externalBuffer, int externalBufferStart) throws IOException {
+    copyToBuffer(externalBuffer, externalBufferStart, currentExternalBufferNeededLen);
+  }
+
+  private void copyToBuffer(byte[] buffer, int bufferStart, int bufferLength) throws IOException {
+    final boolean invert = columnSortOrderIsDesc[fieldIndex];
+    inputByteBuffer.seek(bytesStart);
+    // Copy the data, undoing the escaping as we go.
+    for (int i = 0; i < bufferLength; i++) {
+      byte b = inputByteBuffer.read(invert);
+      if (b == 1) {
+        // The last char is an escape char, read the actual char.
+        // The serialization format escapes \0 to \1 and \1 to \2
+        // to make sure the string is null-terminated.
+        b = (byte) (inputByteBuffer.read(invert) - 1);
+      }
+      buffer[bufferStart + i] = b;
+    }
+    // Read the null terminator.
+    byte b = inputByteBuffer.read(invert);
+    if (b != 0) {
+      throw new RuntimeException("Expected 0 terminating byte");
+    }
+  }
+
   /*
    * Call this method after all fields have been read to check for extra fields.
*/ diff --git serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java index 8f3e771..dcfc2c2 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java +++ serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java @@ -47,12 +47,14 @@ protected TypeInfo[] typeInfos; + protected boolean useExternalBuffer; + protected boolean[] columnsToInclude; protected Category[] categories; protected PrimitiveCategory[] primitiveCategories; - public DeserializeRead(TypeInfo[] typeInfos) { + public DeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer) { this.typeInfos = typeInfos; final int count = typeInfos.length; categories = new Category[count]; @@ -96,6 +98,8 @@ public DeserializeRead(TypeInfo[] typeInfos) { // No writable needed for this data type. } } + + this.useExternalBuffer = useExternalBuffer; } columnsToInclude = null; @@ -194,7 +198,19 @@ public void setColumnsToInclude(boolean[] columnsToInclude) { * * For CHAR and VARCHAR when the caller takes responsibility for * truncation/padding issues. + * + * When currentExternalBufferNeeded is true, conversion is needed into an external buffer of + * at least currentExternalBufferNeededLen bytes. Use copyToExternalBuffer to get the result. + * + * Otherwise, currentBytes, currentBytesStart, and currentBytesLength are the result. */ + public boolean currentExternalBufferNeeded; + public int currentExternalBufferNeededLen; + + public void copyToExternalBuffer(byte[] externalBuffer, int externalBufferStart) throws IOException { + throw new RuntimeException("Not implemented"); + } + public byte[] currentBytes; public int currentBytesStart; public int currentBytesLength; diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java index ac44390..07709d8 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.serde2.lazy.fast; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.charset.CharacterCodingException; import java.sql.Date; @@ -78,15 +79,17 @@ private int fieldStart; private int fieldLength; - private Text tempText; + private int internalBufferLen; + private byte[] internalBuffer; + private TimestampParser timestampParser; private boolean extraFieldWarned; private boolean missingFieldWarned; - public LazySimpleDeserializeRead(TypeInfo[] typeInfos, + public LazySimpleDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer, byte separator, LazySerDeParameters lazyParams) { - super(typeInfos); + super(typeInfos, useExternalBuffer); // Field length is difference between positions hence one extra. 
startPosition = new int[typeInfos.length + 1]; @@ -100,13 +103,14 @@ public LazySimpleDeserializeRead(TypeInfo[] typeInfos, lastColumnTakesRest = lazyParams.isLastColumnTakesRest(); fieldCount = typeInfos.length; - tempText = new Text(); extraFieldWarned = false; missingFieldWarned = false; + internalBufferLen = -1; } - public LazySimpleDeserializeRead(TypeInfo[] typeInfos, LazySerDeParameters lazyParams) { - this(typeInfos, lazyParams.getSeparators()[0], lazyParams); + public LazySimpleDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer, + LazySerDeParameters lazyParams) { + this(typeInfos, useExternalBuffer, lazyParams.getSeparators()[0], lazyParams); } // Not public since we must have the field count so every 8 fields NULL bytes can be navigated. @@ -395,16 +399,48 @@ public boolean readCheckNull() { case STRING: case CHAR: case VARCHAR: - if (isEscaped) { - LazyUtils.copyAndEscapeStringDataToText(bytes, fieldStart, fieldLength, escapeChar, tempText); - currentBytes = tempText.getBytes(); - currentBytesStart = 0; - currentBytesLength = tempText.getLength(); - } else { - // if the data is not escaped, simply copy the data. - currentBytes = bytes; - currentBytesStart = fieldStart; - currentBytesLength = fieldLength; + { + if (isEscaped) { + // First calculate the length of the output string + int outputLength = 0; + for (int i = 0; i < fieldLength; i++) { + if (bytes[fieldStart + i] != escapeChar) { + outputLength++; + } else { + outputLength++; + i++; + } + } + if (outputLength == fieldLength) { + // No escaping. + currentExternalBufferNeeded = false; + currentBytes = bytes; + currentBytesStart = fieldStart; + currentBytesLength = outputLength; + } else { + if (useExternalBuffer) { + currentExternalBufferNeeded = true; + currentExternalBufferNeededLen = outputLength; + } else { + // The copyToBuffer will reposition and re-read the input buffer. + currentExternalBufferNeeded = false; + if (internalBufferLen < outputLength) { + internalBufferLen = outputLength; + internalBuffer = new byte[internalBufferLen]; + } + copyToBuffer(internalBuffer, 0, outputLength); + currentBytes = internalBuffer; + currentBytesStart = 0; + currentBytesLength = outputLength; + } + } + } else { + // If the data is not escaped, reference the data directly. 
+ currentExternalBufferNeeded = false; + currentBytes = bytes; + currentBytesStart = fieldStart; + currentBytesLength = fieldLength; + } } break; case BINARY: @@ -528,6 +564,32 @@ public boolean readCheckNull() { return false; } + @Override + public void copyToExternalBuffer(byte[] externalBuffer, int externalBufferStart) { + copyToBuffer(externalBuffer, externalBufferStart, currentExternalBufferNeededLen); + } + + private void copyToBuffer(byte[] buffer, int bufferStart, int bufferLength) { + int k = 0; + for (int i = 0; i < bufferLength; i++) { + byte b = bytes[fieldStart + i]; + if (b == escapeChar && i < bufferLength - 1) { + ++i; + // Check if it's '\r' or '\n' + if (bytes[fieldStart + i] == 'r') { + buffer[bufferStart + k++] = '\r'; + } else if (bytes[fieldStart + i] == 'n') { + buffer[bufferStart + k++] = '\n'; + } else { + // get the next byte + buffer[bufferStart + k++] = bytes[fieldStart + i]; + } + } else { + buffer[bufferStart + k++] = b; + } + } + } + public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength, String dataType) { try { if(LOG.isDebugEnabled()) { diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java index 0df1d79..472ace7 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java @@ -68,11 +68,12 @@ private boolean readBeyondConfiguredFieldsWarned; private boolean bufferRangeHasExtraDataWarned; - public LazyBinaryDeserializeRead(TypeInfo[] typeInfos) { - super(typeInfos); + public LazyBinaryDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer) { + super(typeInfos, useExternalBuffer); fieldCount = typeInfos.length; tempVInt = new VInt(); tempVLong = new VLong(); + currentExternalBufferNeeded = false; readBeyondConfiguredFieldsWarned = false; bufferRangeHasExtraDataWarned = false; } diff --git serde/src/test/org/apache/hadoop/hive/serde2/binarysortable/TestBinarySortableFast.java serde/src/test/org/apache/hadoop/hive/serde2/binarysortable/TestBinarySortableFast.java index 7babf7a..49ee9c6 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/binarysortable/TestBinarySortableFast.java +++ serde/src/test/org/apache/hadoop/hive/serde2/binarysortable/TestBinarySortableFast.java @@ -115,7 +115,10 @@ private void testBinarySortableFast( for (int i = 0; i < rowCount; i++) { Object[] row = rows[i]; BinarySortableDeserializeRead binarySortableDeserializeRead = - new BinarySortableDeserializeRead(primitiveTypeInfos, columnSortOrderIsDesc); + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false, + columnSortOrderIsDesc); if (useIncludeColumns) { binarySortableDeserializeRead.setColumnsToInclude(columnsToInclude); @@ -143,7 +146,10 @@ private void testBinarySortableFast( * Clip off one byte and expect to get an EOFException on the write field. 
*/ BinarySortableDeserializeRead binarySortableDeserializeRead2 = - new BinarySortableDeserializeRead(primitiveTypeInfos, columnSortOrderIsDesc); + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false, + columnSortOrderIsDesc); if (useIncludeColumns) { binarySortableDeserializeRead2.setColumnsToInclude(columnsToInclude); @@ -247,7 +253,10 @@ private void testBinarySortableFast( for (int i = 0; i < rowCount; i++) { Object[] row = rows[i]; BinarySortableDeserializeRead binarySortableDeserializeRead = - new BinarySortableDeserializeRead(primitiveTypeInfos, columnSortOrderIsDesc); + new BinarySortableDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false, + columnSortOrderIsDesc); if (useIncludeColumns) { binarySortableDeserializeRead.setColumnsToInclude(columnsToInclude); diff --git serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazySimpleFast.java serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazySimpleFast.java index 66c6203..8285c06 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazySimpleFast.java +++ serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazySimpleFast.java @@ -96,7 +96,9 @@ private void testLazySimpleFast( for (int i = 0; i < rowCount; i++) { Object[] row = rows[i]; LazySimpleDeserializeRead lazySimpleDeserializeRead = - new LazySimpleDeserializeRead(writePrimitiveTypeInfos, + new LazySimpleDeserializeRead( + writePrimitiveTypeInfos, + /* useExternalBuffer */ false, separator, serdeParams); if (useIncludeColumns) { @@ -186,7 +188,9 @@ private void testLazySimpleFast( Object[] row = rows[i]; LazySimpleDeserializeRead lazySimpleDeserializeRead = - new LazySimpleDeserializeRead(writePrimitiveTypeInfos, + new LazySimpleDeserializeRead( + writePrimitiveTypeInfos, + /* useExternalBuffer */ false, separator, serdeParams); if (useIncludeColumns) { diff --git serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java index 5af11cd..e64d67d 100644 --- serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java +++ serde/src/test/org/apache/hadoop/hive/serde2/lazybinary/TestLazyBinaryFast.java @@ -91,7 +91,9 @@ private void testLazyBinaryFast( // Specifying the right type info length tells LazyBinaryDeserializeRead which is the last // column. LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(writePrimitiveTypeInfos); + new LazyBinaryDeserializeRead( + writePrimitiveTypeInfos, + /* useExternalBuffer */ false); if (useIncludeColumns) { lazyBinaryDeserializeRead.setColumnsToInclude(columnsToInclude); @@ -191,7 +193,9 @@ private void testLazyBinaryFast( // When doWriteFewerColumns, try to read more fields than exist in buffer. 
LazyBinaryDeserializeRead lazyBinaryDeserializeRead = - new LazyBinaryDeserializeRead(primitiveTypeInfos); + new LazyBinaryDeserializeRead( + primitiveTypeInfos, + /* useExternalBuffer */ false); if (useIncludeColumns) { lazyBinaryDeserializeRead.setColumnsToInclude(columnsToInclude); diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java index a6d932c..3caa584 100644 --- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java +++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java @@ -170,6 +170,37 @@ public void setVal(int elementNum, byte[] sourceBuf) { } /** + * Preallocate space in the local buffer so the caller can fill in the value bytes themselves. + * + * Always use with getValPreallocatedBytes, getValPreallocatedStart, and setValPreallocated. + */ + public void ensureValPreallocated(int length) { + if ((nextFree + length) > buffer.length) { + increaseBufferSpace(length); + } + } + + public byte[] getValPreallocatedBytes() { + return buffer; + } + + public int getValPreallocatedStart() { + return nextFree; + } + + /** + * Set the length of the preallocated values bytes used. + * @param elementNum + * @param length + */ + public void setValPreallocated(int elementNum, int length) { + vector[elementNum] = buffer; + this.start[elementNum] = nextFree; + this.length[elementNum] = length; + nextFree += length; + } + + /** * Set a field to the concatenation of two string values. Result data is copied * into the internal buffer. *
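
Taken together, the new DeserializeRead fields (currentExternalBufferNeeded, currentExternalBufferNeededLen, copyToExternalBuffer) and the BytesColumnVector preallocation methods above form a small consumer-side protocol. The sketch below is illustrative only, not part of the patch; it assumes a DeserializeRead "read" positioned on a string-category field and a "colVec"/"batchIndex" from the target batch, and it mirrors the pattern the patch adds in VectorDeserializeRow.deserializeRowColumn:

  // Illustrative helper, assuming the names above; throws IOException from copyToExternalBuffer.
  static void readStringField(DeserializeRead read, BytesColumnVector colVec, int batchIndex)
      throws IOException {
    if (read.currentExternalBufferNeeded) {
      // Unescaping is required: reserve room in the column vector's own buffer,
      // let the deserializer write the unescaped bytes straight into it, then
      // commit the value without an intermediate copy.
      colVec.ensureValPreallocated(read.currentExternalBufferNeededLen);
      read.copyToExternalBuffer(colVec.getValPreallocatedBytes(), colVec.getValPreallocatedStart());
      colVec.setValPreallocated(batchIndex, read.currentExternalBufferNeededLen);
    } else {
      // The field bytes are already usable in place; copy them (or, when the
      // source is immutable, reference them with setRef as deserializeByRef does).
      colVec.setVal(batchIndex, read.currentBytes, read.currentBytesStart, read.currentBytesLength);
    }
  }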