diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 6bd404c..9c3ec8e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -498,26 +498,26 @@ private void continueProcess(HashPartition partition, HybridHashTableContainer h
   private void reloadHashTable(HashPartition partition,
                                HybridHashTableContainer hybridHtContainer)
       throws IOException, ClassNotFoundException, HiveException, SerDeException {
-    // Deserialize the on-disk hash table
-    // We're sure this part is smaller than memory limit
-    BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk();
-    int rowCount = restoredHashMap.getNumValues();
-    LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
-    LOG.info("Hybrid Grace Hash Join: Number of rows restored from hashmap: " + rowCount);
 
     // Merge the sidefile into the newly created hash table
     // This is where the spilling may happen again
     KeyValueContainer kvContainer = partition.getSidefileKVContainer();
-    rowCount += kvContainer.size();
+    int rowCount = kvContainer.size();
     LOG.info("Hybrid Grace Hash Join: Number of rows restored from KeyValueContainer: " +
         kvContainer.size());
 
+    // Deserialize the on-disk hash table
+    // We're sure this part is smaller than memory limit
+    BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(rowCount);
+    rowCount += restoredHashMap.getNumValues();
+    LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
+    LOG.info("Hybrid Grace Hash Join: Number of rows in hashmap: " + rowCount);
+
     // If based on the new key count, keyCount is smaller than a threshold,
     // then just load the entire restored hashmap into memory.
     // The size of deserialized partition shouldn't exceed half of memory limit
     if (rowCount * hybridHtContainer.getTableRowSize() >= hybridHtContainer.getMemoryThreshold() / 2) {
-      throw new RuntimeException("Hybrid Grace Hash Join: Hash table cannot be reloaded since it" +
-          " will be greater than memory limit. Recursive spilling is currently not supported");
+      LOG.info("Hybrid Grace Hash Join: Hash table reload can fail since it will be greater than memory limit. Recursive spilling is currently not supported");
     }
 
     KeyValueHelper writeHelper = hybridHtContainer.getWriteHelper();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 2312ccb..5488f8c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -146,7 +146,7 @@
   private long[] refs;
   private int startingHashBitCount, hashBitCount;
 
-  private int metricPutConflict = 0, metricExpands = 0, metricExpandsUs = 0;
+  private int metricPutConflict = 0, metricGetConflict = 0, metricExpands = 0, metricExpandsMs = 0;
 
   /** We have 39 bits to store list pointer from the first record; this is size limit */
   final static long MAX_WB_SIZE = ((long)1) << 38;
@@ -341,6 +341,17 @@ public void clear() {
     this.keysAssigned = 0;
   }
 
+  public void expandAndRehashToTarget(int estimateNewRowCount) {
+    int oldRefsCount = refs.length;
+    int newRefsCount = oldRefsCount + estimateNewRowCount;
+    if (resizeThreshold <= newRefsCount) {
+      newRefsCount =
+          (Long.bitCount(newRefsCount) == 1) ? estimateNewRowCount : nextHighestPowerOfTwo(newRefsCount);
+      expandAndRehashImpl(newRefsCount);
+      LOG.info("Expand and rehash to " + newRefsCount + " from " + oldRefsCount);
+    }
+  }
+
   private static void validateCapacity(long capacity) {
     if (Long.bitCount(capacity) != 1) {
       throw new AssertionError("Capacity must be a power of two");
@@ -405,6 +416,7 @@ private long findKeyRefToRead(byte[] key, int length) {
       if (isSameKey(key, length, ref, hashCode)) {
         return ref;
       }
+      ++metricGetConflict;
       probeSlot += (++i);
       if (i > largestNumberOfSteps) {
         // We know we never went that far when we were inserting.
@@ -501,9 +513,13 @@ private long getFirstRecordLengthsOffset(long ref) {
   }
 
   private void expandAndRehash() {
-    long expandTime = System.nanoTime();
-    final long[] oldRefs = refs;
     long capacity = refs.length << 1;
+    expandAndRehashImpl(capacity);
+  }
+
+  private void expandAndRehashImpl(long capacity) {
+    long expandTime = System.currentTimeMillis();
+    final long[] oldRefs = refs;
     validateCapacity(capacity);
     long[] newRefs = new long[(int)capacity];
 
@@ -533,9 +549,8 @@ private long getFirstRecordLengthsOffset(long ref) {
     this.largestNumberOfSteps = maxSteps;
     this.hashBitCount = newHashBitCount;
     this.resizeThreshold = (int)(capacity * loadFactor);
-    metricExpandsUs += (System.nanoTime() - expandTime);
+    metricExpandsMs += (System.currentTimeMillis() - expandTime);
     ++metricExpands;
-
   }
 
   /**
@@ -753,7 +768,8 @@ private static String dumpRef(long ref) {
   public void debugDumpMetrics() {
     LOG.info("Map metrics: keys allocated " + this.refs.length +", keys assigned " + keysAssigned
         + ", write conflict " + metricPutConflict + ", write max dist " + largestNumberOfSteps
-        + ", expanded " + metricExpands + " times in " + metricExpandsUs + "us");
+        + ", read conflict " + metricGetConflict
+        + ", expanded " + metricExpands + " times in " + metricExpandsMs + "ms");
   }
 
   private void debugDumpKeyProbe(long keyOffset, int keyLength, int hashCode, int finalSlot) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index c8e6584..3ad7655 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -123,15 +123,20 @@ public BytesBytesMultiHashMap getHashMapFromMemory() {
   /* Restore the hashmap from disk by deserializing it.
    * Currently Kryo is used for this purpose.
    */
-  public BytesBytesMultiHashMap getHashMapFromDisk()
+  public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity)
       throws IOException, ClassNotFoundException {
     if (hashMapSpilledOnCreation) {
-      return new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, -1);
+      return new BytesBytesMultiHashMap(Math.max(threshold, initialCapacity), loadFactor, wbSize, -1);
     } else {
       InputStream inputStream = Files.newInputStream(hashMapLocalPath);
       com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
       Kryo kryo = Utilities.runtimeSerializationKryo.get();
       BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
+
+      if (initialCapacity > 0) {
+        restoredHashMap.expandAndRehashToTarget(initialCapacity);
+      }
+
       input.close();
       inputStream.close();
       Files.delete(hashMapLocalPath);
@@ -163,7 +168,8 @@ public boolean isHashMapOnDisk() {
 
   public HybridHashTableContainer(Configuration hconf, long keyCount, long memUsage, long tableSize)
       throws SerDeException {
-    this(HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
+    this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
         HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
@@ -171,22 +177,27 @@ public HybridHashTableContainer(Configuration hconf, long keyCount, long memUsag
         tableSize, keyCount, memUsage);
   }
 
-  private HybridHashTableContainer(int threshold, float loadFactor, int wbSize,
+  private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor, int wbSize,
                                    long noConditionalTaskThreshold, int memCheckFreq, long tableSize,
                                    long keyCount, long memUsage) throws SerDeException {
+
+    int newKeyCount = HashMapWrapper.calculateTableSize(
+        keyCountAdj, threshold, loadFactor, keyCount);
+
     memoryThreshold = noConditionalTaskThreshold;
-    tableRowSize = tableSize / keyCount;
+    tableRowSize = tableSize / newKeyCount;
     memoryCheckFrequency = memCheckFreq;
 
     int numPartitions = calcNumPartitions(tableSize, wbSize); // estimate # of partitions to create
     hashPartitions = new HashPartition[numPartitions];
     int numPartitionsSpilledOnCreation = 0;
     long memoryAllocated = 0;
+    int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
    for (int i = 0; i < numPartitions; i++) {
       if (i == 0) { // We unconditionally create a hashmap for the first hash partition
-        hashPartitions[i] = new HashPartition(threshold, loadFactor, wbSize, memUsage, true);
+        hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, wbSize, memUsage, true);
       } else {
-        hashPartitions[i] = new HashPartition(threshold, loadFactor, wbSize, memUsage,
+        hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, wbSize, memUsage,
             memoryAllocated + wbSize < memoryThreshold);
       }
       if (isHashMapSpilledOnCreation(i)) {
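
Reviewer note (not part of the patch): the reload path now counts the sidefile rows first and passes that count into getHashMapFromDisk(), so the restored hashmap can be pre-sized once via expandAndRehashToTarget() instead of doubling repeatedly while the sidefile is merged in. Below is a minimal, self-contained sketch of that capacity calculation; the class and helper names are illustrative, and unlike the patch (which reuses estimateNewRowCount when the combined count is already a power of two) the sketch simply keeps the combined count.

// CapacitySketch.java -- illustrative only, not part of the patch.
public class CapacitySketch {

  // Round up to the next power of two; stands in for the map's nextHighestPowerOfTwo() helper.
  static int nextHighestPowerOfTwo(int v) {
    return Integer.highestOneBit(v - 1) << 1;
  }

  // Target slot count when 'estimateNewRowCount' extra rows will be merged into a map that
  // currently has 'oldRefsCount' slots; the result stays a power of two so hash masking works.
  static int targetCapacity(int oldRefsCount, int estimateNewRowCount) {
    int newRefsCount = oldRefsCount + estimateNewRowCount;
    return (Integer.bitCount(newRefsCount) == 1) ? newRefsCount : nextHighestPowerOfTwo(newRefsCount);
  }

  public static void main(String[] args) {
    // A map restored with 1024 slots that must absorb ~3000 sidefile rows is expanded once to 4096.
    System.out.println(targetCapacity(1024, 3000)); // prints 4096
  }
}

Doing this as a single rehash before the merge avoids the repeated expand-and-rehash passes the old code paid while re-inserting sidefile rows one at a time.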
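Similarly (reviewer note, not part of the patch), the container constructor now derives each partition's initial hashmap capacity from the statistics-based key count rather than the static threshold. A rough sketch of that sizing follows; calculateTableSize() here is only an assumed stand-in for HashMapWrapper.calculateTableSize, not a copy of its formula.

// PartitionSizingSketch.java -- illustrative only; the formula below is an assumption.
public class PartitionSizingSketch {

  // With statistics available, scale the key count estimate by the configured adjustment
  // and leave headroom for the load factor; otherwise fall back to the static threshold.
  static int calculateTableSize(float keyCountAdj, int threshold, float loadFactor, long keyCount) {
    if (keyCount >= 0 && keyCountAdj != 0) {
      return (int) Math.ceil(keyCount / (keyCountAdj * loadFactor));
    }
    return threshold;
  }

  public static void main(String[] args) {
    int threshold = 100_000;      // example value for HIVEHASHTABLETHRESHOLD
    float loadFactor = 0.75f;     // example value for HIVEHASHTABLELOADFACTOR
    float keyCountAdj = 1.0f;     // example value for HIVEHASHTABLEKEYCOUNTADJUSTMENT
    long keyCount = 1_000_000L;   // estimate from table/column statistics
    int numPartitions = 16;

    int newKeyCount = calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
    // Each partition's hashmap starts large enough for its share of the adjusted estimate,
    // but never smaller than its share of the configured threshold.
    int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
    System.out.println(initialCapacity); // 1,333,334 / 16 -> 83333
  }
}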