diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 3ad7655..2208a13 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -20,6 +20,7 @@ import com.esotericsoftware.kryo.Kryo; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -27,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap.ThreadSafeGetter; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -36,6 +38,7 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.WriteBuffers; import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.WriteBuffers.ByteSegmentRef; import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector; @@ -90,7 +93,9 @@ */ public static class HashPartition { BytesBytesMultiHashMap hashMap; // In memory hashMap + private ThreadSafeGetter threadLocalGetter; // Concurrent reader for hashMap KeyValueContainer sidefileKVContainer; // Stores small table key/value pairs + @SuppressWarnings("rawtypes") ObjectContainer matchfileObjContainer; // Stores big table rows Path hashMapLocalPath; // Local file system path for spilled hashMap boolean hashMapOnDisk; // Status of hashMap. true: on disk, false: in memory @@ -106,6 +111,7 @@ public HashPartition(int threshold, float loadFactor, int wbSize, long memUsage, boolean createHashMap) { if (createHashMap) { hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, memUsage); + threadLocalGetter = hashMap.createGetterForThread(); } else { hashMapSpilledOnCreation = true; hashMapOnDisk = true; @@ -625,14 +631,13 @@ public ReusableRowContainer() { return JoinUtil.JoinResult.SPILL; } else { - byte aliasFilter = hashPartitions[partitionId].hashMap.getValueRefs( - output.getData(), output.getLength(), refs); + byte aliasFilter = hashPartitions[partitionId].threadLocalGetter + .getValueRefs(output.getData(), output.getLength(), refs); this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter; this.dummyRow = null; if (refs.isEmpty()) { return JoinUtil.JoinResult.NOMATCH; - } - else { + } else { return JoinUtil.JoinResult.MATCH; } }