diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index f66916b..ad2fffa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -338,10 +338,11 @@ public void init() throws HiveException {
    * @param batch
    * @param batchIndex
    * @param logicalColumnIndex
+   * @param canReferenceBytes whether the batch may reference currentBytes instead of copying
    * @throws IOException
    */
   private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex,
-      int logicalColumnIndex) throws IOException {
+      int logicalColumnIndex, boolean canReferenceBytes) throws IOException {
     Category sourceCategory = sourceCategories[logicalColumnIndex];
     if (sourceCategory == null) {
       /*
@@ -406,11 +407,19 @@ private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex,
       break;
     case BINARY:
     case STRING:
-      ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
-          batchIndex,
-          deserializeRead.currentBytes,
-          deserializeRead.currentBytesStart,
-          deserializeRead.currentBytesLength);
+      if (deserializeRead.currentBytesAreByReference && canReferenceBytes) {
+        ((BytesColumnVector) batch.cols[projectionColumnNum]).setRef(
+            batchIndex,
+            deserializeRead.currentBytes,
+            deserializeRead.currentBytesStart,
+            deserializeRead.currentBytesLength);
+      } else {
+        ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+            batchIndex,
+            deserializeRead.currentBytes,
+            deserializeRead.currentBytesStart,
+            deserializeRead.currentBytesLength);
+      }
       break;
     case VARCHAR:
       {
@@ -421,11 +430,19 @@ private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex,
             deserializeRead.currentBytesStart,
             deserializeRead.currentBytesLength,
             maxLengths[logicalColumnIndex]);
-        ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
-            batchIndex,
-            deserializeRead.currentBytes,
-            deserializeRead.currentBytesStart,
-            adjustedLength);
+        if (deserializeRead.currentBytesAreByReference && canReferenceBytes) {
+          ((BytesColumnVector) batch.cols[projectionColumnNum]).setRef(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              adjustedLength);
+        } else {
+          ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              adjustedLength);
+        }
       }
       break;
     case CHAR:
@@ -437,11 +454,19 @@ private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex,
             deserializeRead.currentBytesStart,
             deserializeRead.currentBytesLength,
             maxLengths[logicalColumnIndex]);
-        ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
-            batchIndex,
-            deserializeRead.currentBytes,
-            deserializeRead.currentBytesStart,
-            adjustedLength);
+        if (deserializeRead.currentBytesAreByReference && canReferenceBytes) {
+          ((BytesColumnVector) batch.cols[projectionColumnNum]).setRef(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              adjustedLength);
+        } else {
+          ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal(
+              batchIndex,
+              deserializeRead.currentBytes,
+              deserializeRead.currentBytesStart,
+              adjustedLength);
+        }
       }
       break;
     case DECIMAL:
@@ -663,12 +688,39 @@ public void deserialize(VectorizedRowBatch batch, int batchIndex) throws IOExcep
       if (isConvert[i]) {
         deserializeConvertRowColumn(batch, batchIndex, i);
       } else {
-        deserializeRowColumn(batch, batchIndex, i);
+        deserializeRowColumn(batch, batchIndex, i, /* canReferenceBytes */ false);
       }
     }
     deserializeRead.extraFieldsCheck();
   }
 
+  /**
+   * Deserialize a row from the range of bytes specified by setBytes.
+   *
+   * Use this method instead of deserialize when the byte source for DeserializeRead is
+   * immutable, so that STRING and CHAR/VARCHAR data can be set in the BytesColumnVector
+   * with setRef instead of setVal (i.e. without a data copy).
+   *
+   * Use getDetailedReadPositionString to get detailed read position information to help
+   * diagnose exceptions that are thrown.
+   *
+   * @param batch
+   * @param batchIndex
+   * @throws IOException
+   */
+  public void deserializeByRef(VectorizedRowBatch batch, int batchIndex) throws IOException {
+    final int count = isConvert.length;
+    for (int i = 0; i < count; i++) {
+      if (isConvert[i]) {
+        deserializeConvertRowColumn(batch, batchIndex, i);
+      } else {
+        deserializeRowColumn(batch, batchIndex, i, /* canReferenceBytes */ true);
+      }
+    }
+    deserializeRead.extraFieldsCheck();
+  }
+
   public String getDetailedReadPositionString() {
     return deserializeRead.getDetailedReadPositionString();
   }
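
The hunks above hinge on the difference between BytesColumnVector.setVal (copy the range into the vector's own buffer) and setRef (store only array/start/length). A minimal standalone sketch of that contract, separate from the patch itself (the class name and main wrapper are mine; the two calls and their signatures are the ones used above):

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public class SetRefVsSetValSketch {
      public static void main(String[] args) {
        BytesColumnVector col = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
        col.initBuffer();
        byte[] source = "hello".getBytes(StandardCharsets.UTF_8);

        // setVal copies the range into the vector's internal buffer.
        col.setVal(0, source, 0, source.length);

        // setRef records only (source, start, length); no bytes are copied, so
        // 'source' must stay unmodified while the batch entry is live.
        col.setRef(1, source, 0, source.length);

        source[0] = (byte) 'H';  // entry 0 still reads "hello"; entry 1 now reads "Hello"
      }
    }

This is why deserializeRowColumn needs both flags: the per-value currentBytesAreByReference (did the reader avoid a temp-buffer copy?) and the per-call canReferenceBytes (is the underlying byte source immutable?). setRef is only safe when both hold.
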
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 469f86a..be89d96 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -144,7 +144,8 @@ protected void doSmallTableDeserializeRow(VectorizedRowBatch batch, int batchInd
     smallTableVectorDeserializeRow.setBytes(bytes, offset, length);
 
     try {
-      smallTableVectorDeserializeRow.deserialize(batch, batchIndex);
+      // Our hash tables are immutable, so STRING and CHAR/VARCHAR bytes can safely be set by reference.
+      smallTableVectorDeserializeRow.deserializeByRef(batch, batchIndex);
     } catch (Exception e) {
       throw new HiveException(
           "\nHashMapResult detail: " +
diff --git serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
index 003a2d4..5038153 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
@@ -276,14 +276,63 @@ public boolean readCheckNull() throws IOException {
     case CHAR:
     case VARCHAR:
       {
-        if (tempText == null) {
-          tempText = new Text();
+        /*
+         * This code is a modified version of BinarySortableSerDe.deserializeText that lets us
+         * detect whether we can return a reference to the bytes directly.
+         */
+
+        // Get the actual length first.
+        final int bytesStart = inputByteBuffer.tell();
+        final boolean invert = columnSortOrderIsDesc[fieldIndex];
+        int length = 0;
+        do {
+          byte b = inputByteBuffer.read(invert);
+          if (b == 0) {
+            // End of string.
+            break;
+          }
+          if (b == 1) {
+            // This is an escape char.  Read the actual (escaped) char.
+            inputByteBuffer.read(invert);
+          }
+          length++;
+        } while (true);
+
+        // The loop also consumed the null terminator, hence the - 1.  Descending (inverted)
+        // columns store complemented bytes, so they can never be referenced raw.
+        if (!invert && length == inputByteBuffer.tell() - bytesStart - 1) {
+          // No inversion or escaping happened, so we can safely reference the bytes directly.
+          currentBytesAreByReference = true;
+          currentBytes = inputByteBuffer.getData();
+          currentBytesStart = bytesStart;
+          currentBytesLength = length;
+        } else {
+          // Inversion or escaping happened; we need to copy byte-by-byte.
+          if (tempText == null) {
+            tempText = new Text();
+          }
+          // 1. Set the length first.
+          tempText.set(inputByteBuffer.getData(), bytesStart, length);
+          // 2. Reset the pointer.
+          inputByteBuffer.seek(bytesStart);
+          // 3. Copy the data.
+          byte[] rdata = tempText.getBytes();
+          for (int i = 0; i < length; i++) {
+            byte b = inputByteBuffer.read(invert);
+            if (b == 1) {
+              // This is an escape char.  Read the actual char.
+              // The serialization format escapes \0 to \1\1 and \1 to \1\2,
+              // to make sure the string stays null-terminated.
+              b = (byte) (inputByteBuffer.read(invert) - 1);
+            }
+            rdata[i] = b;
+          }
+          // 4. Read the null terminator.
+          byte b = inputByteBuffer.read(invert);
+          assert (b == 0);
+          currentBytesAreByReference = false;
+          currentBytes = tempText.getBytes();
+          currentBytesStart = 0;
+          currentBytesLength = tempText.getLength();
         }
-        BinarySortableSerDe.deserializeText(
-            inputByteBuffer, columnSortOrderIsDesc[fieldIndex], tempText);
-        currentBytes = tempText.getBytes();
-        currentBytesStart = 0;
-        currentBytesLength = tempText.getLength();
       }
       break;
     case INTERVAL_YEAR_MONTH:
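
The two-pass scan above is driven by the binary-sortable escaping scheme that the patch's own comments describe: \0 and \1 bytes are written as a \1 escape marker followed by the byte plus one, and every string ends in a \0 terminator, which keeps the encoding both null-terminated and order-preserving. A sketch of the encoder side (my code, written from those comments; it ignores the descending-order 'invert' handling):

    import java.io.ByteArrayOutputStream;

    public class BinarySortableEscapeSketch {
      static byte[] encode(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte b : data) {
          if (b == 0 || b == 1) {
            out.write(1);      // escape marker
            out.write(b + 1);  // \0 -> \1, \1 -> \2
          } else {
            out.write(b);
          }
        }
        out.write(0);          // null terminator
        return out.toByteArray();
      }

      public static void main(String[] args) {
        // {'a', 0, 'b'} encodes to {'a', 1, 1, 'b', 0}: five bytes for a three-byte
        // value.  The length-counting loop in the patch detects this because the
        // bytes consumed (minus the terminator) exceed the logical length, which
        // forces the copy branch; unescaped values take the setRef-able branch.
        System.out.println(encode(new byte[] {'a', 0, 'b'}).length);  // prints 5
      }
    }
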
diff --git serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
index 8f3e771..9981cc9 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
@@ -96,6 +96,9 @@ public DeserializeRead(TypeInfo[] typeInfos) {
           // No writable needed for this data type.
         }
       }
+
+      // Assume the bytes are always by reference; a deserializer clears the flag when it copies.
+      currentBytesAreByReference = true;
     }
 
     columnsToInclude = null;
@@ -194,7 +197,10 @@ public void setColumnsToInclude(boolean[] columnsToInclude) {
    *
    * For CHAR and VARCHAR when the caller takes responsibility for
    * truncation/padding issues.
+   *
+   * When currentBytesAreByReference is true, currentBytes references a range of the input buffer.
    */
+  public boolean currentBytesAreByReference;
   public byte[] currentBytes;
   public int currentBytesStart;
   public int currentBytesLength;
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
index ac44390..de6bd20 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
@@ -396,12 +396,15 @@ public boolean readCheckNull() {
       case CHAR:
       case VARCHAR:
         if (isEscaped) {
+          // CONSIDER: Is copying always necessary here, or only when an escape char is present?
+          currentBytesAreByReference = false;
           LazyUtils.copyAndEscapeStringDataToText(bytes, fieldStart, fieldLength, escapeChar,
               tempText);
           currentBytes = tempText.getBytes();
           currentBytesStart = 0;
           currentBytesLength = tempText.getLength();
         } else {
-          // if the data is not escaped, simply copy the data.
+          // If the data is not escaped, reference the data instead of copying it.
+          currentBytesAreByReference = true;
           currentBytes = bytes;
           currentBytesStart = fieldStart;
           currentBytesLength = fieldLength;
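
The LazySimple change makes the lifetime question concrete: in the unescaped branch, currentBytes now aliases the row buffer handed to the reader, so the reference is only valid until that buffer is overwritten. A small self-contained demo of the hazard (all names mine; nothing here is Hive API):

    import java.nio.charset.StandardCharsets;

    public class ByRefLifetimeSketch {
      public static void main(String[] args) {
        byte[] rowBuffer = "abc\tdef".getBytes(StandardCharsets.UTF_8);

        // Suppose a reader reports field 0 by reference as (rowBuffer, 0, 3).
        String before = new String(rowBuffer, 0, 3, StandardCharsets.UTF_8);

        // If the producer recycles the buffer for the next row before the
        // consumer copies, every by-reference view changes underneath it:
        System.arraycopy("xyz".getBytes(StandardCharsets.UTF_8), 0, rowBuffer, 0, 3);
        String after = new String(rowBuffer, 0, 3, StandardCharsets.UTF_8);

        System.out.println(before + " -> " + after);  // prints: abc -> xyz
      }
    }

This is exactly why only the map-join call site opts in via deserializeByRef: its hash-table bytes are immutable for the lifetime of the batch.
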
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
index 0df1d79..e74ba09 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import org.apache.hadoop.io.WritableUtils;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Directly deserialize with the caller reading field-by-field the LazyBinary serialization format.
  *
@@ -75,6 +77,9 @@ public LazyBinaryDeserializeRead(TypeInfo[] typeInfos) {
     tempVLong = new VLong();
     readBeyondConfiguredFieldsWarned = false;
     bufferRangeHasExtraDataWarned = false;
+
+    // Yes, for this format we always refer to the bytes by reference.
+    Preconditions.checkState(currentBytesAreByReference);
   }
 
   // Not public since we must have the field count so every 8 fields NULL bytes can be navigated.
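
The Preconditions.checkState call documents, and enforces with an IllegalStateException, that the lazy-binary reader never copies: every field it reports points into the original serialized buffer. A trimmed sketch of the pattern, assuming (as DeserializeRead does above) that a base class defaults the flag to true:

    import com.google.common.base.Preconditions;

    public class CheckStateSketch {
      // Stand-in for the field the DeserializeRead base class initializes to true.
      protected boolean currentBytesAreByReference = true;

      public CheckStateSketch() {
        // Fail fast if a refactor ever flips the default this format relies on.
        Preconditions.checkState(currentBytesAreByReference);
      }

      public static void main(String[] args) {
        new CheckStateSketch();  // constructs silently; the invariant holds
      }
    }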