diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index eff4d30..92d7382 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -772,9 +772,12 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "hybrid grace hash join, how often (how many rows apart) we check if memory is full. " +
         "This number should be power of 2."),
     HIVEHYBRIDGRACEHASHJOINMINWBSIZE("hive.mapjoin.hybridgrace.minwbsize", 524288, "For hybrid grace" +
         " hash join, the minimum write buffer size used by optimized hashtable. Default is 512 KB."),
     HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS("hive.mapjoin.hybridgrace.minnumpartitions", 16, "For" +
         " hybrid grace hash join, the minimum number of partitions to create."),
+    HIVEHYBRIDGRACEHASHPREALLOCMEMORY("hive.mapjoin.hybridgrace.prealloc.percentmemory", (float) 0.5,
+        "Portion of memory to preallocate for hash table, this is fraction of estimate data size\n" +
+        "if more memory is needed more WriteBuffers will be allocated"),
     HIVEHASHTABLEWBSIZE("hive.mapjoin.optimized.hashtable.wbsize", 10 * 1024 * 1024,
         "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" +
         "store data. This is one buffer size. HT may be slightly faster if this is larger, but for small\n" +
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index f80ffc5..80ed9f9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -217,20 +217,23 @@ public void clear() {
 
   public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable,
                                   long estimatedTableSize, HybridHashTableConf nwayConf)
       throws SerDeException, IOException {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
-        estimatedTableSize, keyCount, memoryAvailable, nwayConf);
+        estimatedTableSize, keyCount, memoryAvailable,
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHPREALLOCMEMORY),
+        nwayConf);
   }
 
   private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor,
                                    int memCheckFreq, int minWbSize, int minNumParts,
                                    long estimatedTableSize, long keyCount,
-                                   long memoryAvailable, HybridHashTableConf nwayConf)
+                                   long memoryAvailable, float preAllocMemoryFraction,
+                                   HybridHashTableConf nwayConf)
       throws SerDeException, IOException {
     directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
 
@@ -269,6 +272,15 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
         writeBufferSize = (int)(memoryThreshold / numPartitions);
       }
     }
+
+    // Scale the write buffer size down to the configured preallocation fraction.
+    // A fraction of 0 yields 0 here; the clamp below then raises it to minWbSize
+    // (HIVEHYBRIDGRACEHASHJOINMINWBSIZE).
+    writeBufferSize = (int) (writeBufferSize * preAllocMemoryFraction);
+    // Multiply as long: writeBufferSize * numPartitions can exceed Integer.MAX_VALUE.
+    LOG.info("Will preallocate: " + ((long) writeBufferSize * numPartitions) +
+        " out of " + memoryThreshold);
+
     writeBufferSize = writeBufferSize < minWbSize ? minWbSize : writeBufferSize;
     LOG.info("Write buffer size: " + writeBufferSize);
     hashPartitions = new HashPartition[numPartitions];