diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
index 5c323ba..d97152f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
@@ -3178,6 +3178,8 @@ public void testRightTrimAndTruncateScalar() {
   public void testLoadBytesColumnVectorByValueLargeData() {
     BytesColumnVector bcv = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
     bcv.initBuffer(10); // initialize with estimated element size 10
+    // Record initial buffer size
+    int initialBufferSize = bcv.bufferSize();
     String s = "0123456789";
     while (s.length() < 500) {
       s += s;
     }
@@ -3191,7 +3193,8 @@ public void testLoadBytesColumnVectorByValueLargeData() {
     for (int i = 0; i != VectorizedRowBatch.DEFAULT_SIZE; i++) {
       bcv.setVal(i, b, 0, b.length);
     }
-    Assert.assertTrue(bcv.bufferSize() >= b.length * VectorizedRowBatch.DEFAULT_SIZE);
+    // Current buffer size should be larger than initial size
+    Assert.assertTrue(bcv.bufferSize() > initialBufferSize);
   }
 
   @Test
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 552982c..fa3e67b 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -43,15 +43,27 @@
    * in vector[0] and isRepeating from the superclass is set to true.
    */
   public int[] length;
+
+  // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to
+  // a byte[] with sufficient space for the specified size.
   private byte[] buffer;   // optional buffer to use when actually copying in data
   private int nextFree;    // next free position in buffer
 
+  // Hang onto a byte array for holding smaller byte values
+  private byte[] smallBuffer;
+  private int smallBufferNextFree;
+
+  private int bufferAllocationCount;
+
   // Estimate that there will be 16 bytes per entry
   static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
 
   // Proportion of extra space to provide when allocating more buffer space.
   static final float EXTRA_SPACE_FACTOR = (float) 1.2;
 
+  // Largest size allowed in smallBuffer
+  static final int MAX_SIZE_FOR_SMALL_BUFFER = 1024 * 1024;
+
   /**
    * Use this constructor for normal operation.
    * All column vectors should be the default size normally.
@@ -79,6 +91,7 @@ public BytesColumnVector(int size) {
   public void reset() {
     super.reset();
     initBuffer(0);
+    bufferAllocationCount = 0;
   }
 
   /** Set a field by reference.
@@ -103,9 +116,17 @@ public void setRef(int elementNum, byte[] sourceBuf, int start, int length) {
    */
   public void initBuffer(int estimatedValueSize) {
     nextFree = 0;
+    smallBufferNextFree = 0;
 
     // if buffer is already allocated, keep using it, don't re-allocate
     if (buffer != null) {
+      // Free up any previously allocated buffers that are referenced by vector
+      if (bufferAllocationCount > 0) {
+        for (int idx = 0; idx < vector.length; ++idx) {
+          vector[idx] = null;
+        }
+        buffer = smallBuffer; // In case last row was a large bytes value
+      }
       return;
     }
 
@@ -115,6 +136,7 @@ public void initBuffer(int estimatedValueSize) {
       bufferSize = DEFAULT_BUFFER_SIZE;
     }
     buffer = new byte[bufferSize];
+    smallBuffer = buffer;
   }
 
   /**
@@ -238,17 +260,51 @@ public void setConcat(int elementNum, byte[] leftSourceBuf, int leftStart, int l
    * @param nextElemLength size of next element to be added
    */
   public void increaseBufferSpace(int nextElemLength) {
+    // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to
+    // a byte[] with sufficient space for the specified size.
+    // This will either point to smallBuffer, or to a newly allocated byte array for larger values.
+
+    if (nextElemLength > MAX_SIZE_FOR_SMALL_BUFFER) {
+      // Larger allocations will be special-cased and will not use the normal buffer.
+      // buffer/nextFree will be set to a newly allocated array just for the current row.
+      // The next row will require another call to increaseBufferSpace() since this new buffer should be used up.
+      byte[] newBuffer = new byte[nextElemLength];
+      ++bufferAllocationCount;
+      // If the buffer was pointing to smallBuffer, then nextFree keeps track of the current state
+      // of the free index for smallBuffer. We now need to save this value to smallBufferNextFree
+      // so we don't lose this. A bit of a weird dance here.
+      if (smallBuffer == buffer) {
+        smallBufferNextFree = nextFree;
+      }
+      buffer = newBuffer;
+      nextFree = 0;
+    } else {
+      // This value should go into smallBuffer.
+      if (smallBuffer != buffer) {
+        // Previous row was for a large bytes value ( > MAX_SIZE_FOR_SMALL_BUFFER).
+        // Use smallBuffer if possible.
+        buffer = smallBuffer;
+        nextFree = smallBufferNextFree;
+      }
 
-    // Keep doubling buffer size until there will be enough space for next element.
-    int newLength = 2 * buffer.length;
-    while((nextFree + nextElemLength) > newLength) {
-      newLength *= 2;
+      // smallBuffer might still be out of space
+      if ((nextFree + nextElemLength) > buffer.length) {
+        int newLength = smallBuffer.length * 2;
+        while (newLength < nextElemLength) {
+          if (newLength < 0) {
+            throw new RuntimeException("Overflow of newLength. smallBuffer.length=" +
+                smallBuffer.length + ", nextElemLength=" + nextElemLength);
+          }
+          newLength *= 2;
+        }
+        smallBuffer = new byte[newLength];
+        ++bufferAllocationCount;
+        smallBufferNextFree = 0;
+        // Update buffer
+        buffer = smallBuffer;
+        nextFree = 0;
+      }
     }
-
-    // Allocate new buffer, copy data to it, and set buffer to new buffer.
-    byte[] newBuffer = new byte[newLength];
-    System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
-    buffer = newBuffer;
   }
 
   /** Copy the current object contents into the output. Only copy selected entries,
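A minimal sketch (not part of this patch) of the caller-visible behavior the BytesColumnVector changes above are aiming for, assuming the 1 MB MAX_SIZE_FOR_SMALL_BUFFER threshold; the BufferReuseSketch class name and the example sizes are made up for illustration.

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class BufferReuseSketch {
  public static void main(String[] args) {
    BytesColumnVector col = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    col.initBuffer(16);                        // small values are copied into the shared smallBuffer

    byte[] small = new byte[100];
    byte[] large = new byte[2 * 1024 * 1024];  // larger than MAX_SIZE_FOR_SMALL_BUFFER (1 MB)

    col.setVal(0, small, 0, small.length);     // appended to smallBuffer
    col.setVal(1, large, 0, large.length);     // one-off allocation used only for this row
    col.setVal(2, small, 0, small.length);     // buffer switches back to smallBuffer and keeps appending

    // bufferSize() reports the capacity of whatever array the column vector is currently writing into.
    System.out.println("current buffer capacity: " + col.bufferSize());
  }
}

With the old doubling-and-copying code, the single 2 MB value would have forced the shared buffer itself to grow past 2 MB and stay that large; with this patch the large value gets its own array and the shared smallBuffer stays small.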
diff --git a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
new file mode 100644
index 0000000..e14abf1
--- /dev/null
+++ b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBytesColumnVector {
+  @Test
+  public void testSmallBufferReuse() {
+    BytesColumnVector col = new BytesColumnVector();
+    int smallWriteSize = 1024;
+    int largeWriteSize = 1024 * 1024 * 2;
+
+    int rowIdx = 0;
+    int bytesWrittenToBytes1 = 0;
+    col.reset();
+
+    // Initial write (small value)
+    byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+    bytesWrittenToBytes1 += smallWriteSize;
+
+    // Write a large value. This should use a different byte buffer
+    rowIdx++;
+    byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 2);
+    assertFalse(bytes1 == bytes2);
+
+    // Another small write. smallBuffer should be re-used for this write
+    rowIdx++;
+    byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+    bytesWrittenToBytes1 += smallWriteSize;
+    assertTrue(bytes1 == bytes3);
+
+    // Write another large value. This should use a different byte buffer
+    rowIdx++;
+    byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 3);
+    assertFalse(bytes1 == bytes4);
+    assertFalse(bytes2 == bytes4);
+
+    // Eventually enough small writes should result in another buffer getting created
+    boolean gotNewBuffer = false;
+    // Test is dependent on getting a new buffer within 1MB.
+    // This may need to change as the implementation changes.
+    for (int i = 0; i < 1024; ++i) {
+      rowIdx++;
+      byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+      if (currBytes == bytes1) {
+        bytesWrittenToBytes1 += smallWriteSize;
+      } else {
+        gotNewBuffer = true;
+        break;
+      }
+    }
+
+    assertTrue(gotNewBuffer);
+
+    // All small writes to the first buffer should be in contiguous memory
+    for (int i = 0; i < bytesWrittenToBytes1; ++i) {
+      assertEquals((byte) 1, bytes1[i]);
+    }
+  }
+
+  // Write a value to the column vector, and return back the byte buffer used.
+  private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, byte val) {
+    col.ensureValPreallocated(writeSize);
+    byte[] bytes = col.getValPreallocatedBytes();
+    int startIdx = col.getValPreallocatedStart();
+    Arrays.fill(bytes, startIdx, startIdx + writeSize, val);
+    col.setValPreallocated(rowIdx, writeSize);
+    return bytes;
+  }
+}
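For context on the helper above: a minimal sketch of how a producer outside the test might drive the same preallocation API (ensureValPreallocated / getValPreallocatedBytes / getValPreallocatedStart / setValPreallocated). The PreallocatedWriteSketch class, its writeString() method, and the sample strings are hypothetical, not part of this patch.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class PreallocatedWriteSketch {
  // Copy a UTF-8 string into the column vector through the preallocation API.
  static void writeString(BytesColumnVector col, int rowIdx, String value) {
    byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
    // Ask the vector for room; this grows or switches the internal buffer as needed.
    col.ensureValPreallocated(utf8.length);
    byte[] dest = col.getValPreallocatedBytes();
    int start = col.getValPreallocatedStart();
    System.arraycopy(utf8, 0, dest, start, utf8.length);
    // Record the row's start offset and length against the bytes just written.
    col.setValPreallocated(rowIdx, utf8.length);
  }

  public static void main(String[] args) {
    BytesColumnVector col = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    col.reset();
    writeString(col, 0, "hello");
    writeString(col, 1, "world");
  }
}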