diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 2ba622e..d1d66f5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -184,19 +184,6 @@ public BytesBytesMultiHashMap(int initialCapacity, this(initialCapacity, loadFactor, wbSize, -1); } - public class ThreadSafeGetter { - private WriteBuffers.Position position = new WriteBuffers.Position(); - public byte getValueResult(byte[] key, int offset, int length, - BytesBytesMultiHashMap.Result hashMapResult) { - return BytesBytesMultiHashMap.this.getValueResult(key, offset, length, hashMapResult, position); - } - - public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { - // Convenience method, populateValue is thread-safe. - BytesBytesMultiHashMap.this.populateValue(valueRef); - } - } - /** * The result of looking up a key in the multi-hash map. * @@ -232,6 +219,14 @@ public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { public Result() { hasRows = false; byteSegmentRef = new WriteBuffers.ByteSegmentRef(); + readPos = new WriteBuffers.Position(); + } + + /** + * Return the thread-safe read position. + */ + public WriteBuffers.Position getReadPos() { + return readPos; } /** @@ -260,14 +255,11 @@ public boolean isSingleRow() { * Whether there are multiple values (true) or just a single value (false). * @param offsetAfterListRecordKeyLen * The offset of just after the key length in the list record. Or, 0 when single row. - * @param readPos - * Holds mutable read position for thread safety. */ public void set(BytesBytesMultiHashMap hashMap, long firstOffset, boolean hasList, - long offsetAfterListRecordKeyLen, WriteBuffers.Position readPos) { + long offsetAfterListRecordKeyLen) { this.hashMap = hashMap; - this.readPos = readPos; this.firstOffset = firstOffset; this.hasList = hasList; @@ -481,29 +473,23 @@ public void put(KvSource kv, int keyHashCode) throws SerDeException { ++numValues; } - public ThreadSafeGetter createGetterForThread() { - return new ThreadSafeGetter(); - } - - /** Not thread-safe! Use createGetterForThread. */ - public byte getValueResult(byte[] key, int offset, int length, Result hashMapResult) { - return getValueResult(key, offset, length, hashMapResult, writeBuffers.getReadPosition()); - } - /** * Finds a key. Values can be read with the supplied result object. * + * Important Note: The caller is expected to pre-allocate the hashMapResult and not + * share it among other threads. + * * @param key Key buffer. * @param offset the offset to the key in the buffer * @param hashMapResult The object to fill in that can read the values. - * @param readPos Holds mutable read position for thread safety. * @return The state byte. */ - private byte getValueResult(byte[] key, int offset, int length, Result hashMapResult, - WriteBuffers.Position readPos) { + public byte getValueResult(byte[] key, int offset, int length, Result hashMapResult) { hashMapResult.forget(); + WriteBuffers.Position readPos = hashMapResult.getReadPos(); + // First, find first record for the key. long ref = findKeyRefToRead(key, offset, length, readPos); if (ref == 0) { @@ -515,8 +501,7 @@ private byte getValueResult(byte[] key, int offset, int length, Result hashMapRe // This relies on findKeyRefToRead doing key equality check and leaving read ptr where needed. long offsetAfterListRecordKeyLen = hasList ? writeBuffers.getReadPoint(readPos) : 0; - hashMapResult.set(this, Ref.getOffset(ref), hasList, offsetAfterListRecordKeyLen, - readPos); + hashMapResult.set(this, Ref.getOffset(ref), hasList, offsetAfterListRecordKeyLen); return Ref.getStateByte(ref); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 6deaafc..34b3aa9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -500,7 +500,6 @@ public int directSpillPartitionId() { private byte aliasFilter; /** Hash table wrapper specific to the container. */ - private final BytesBytesMultiHashMap.ThreadSafeGetter threadSafeHashMapGetter; private BytesBytesMultiHashMap.Result hashMapResult; /** @@ -520,14 +519,13 @@ public ReusableRowContainer() { valueStruct = null; // No rows? } uselessIndirection = new ByteArrayRef(); - threadSafeHashMapGetter = hashMap.createGetterForThread(); hashMapResult = new BytesBytesMultiHashMap.Result(); clearRows(); } public JoinUtil.JoinResult setFromOutput(Output output) { - aliasFilter = threadSafeHashMapGetter.getValueResult( + aliasFilter = hashMap.getValueResult( output.getData(), 0, output.getLength(), hashMapResult); dummyRow = null; if (hashMapResult.hasRows()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java index d1b8939..b40564c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java @@ -41,7 +41,7 @@ private VectorExpression conditionEvaluator = null; // Temporary selected vector - private final int[] temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE]; + private transient int[] temporarySelected; // filterMode is 1 if condition is always true, -1 if always false // and 0 if condition needs to be computed. @@ -77,6 +77,9 @@ public VectorFilterOperator() { filterMode = -1; } } + + temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE]; + return result; }