diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index eff4d30..49b8f97 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -772,10 +772,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "hybrid grace hash join, how often (how many rows apart) we check if memory is full. " + "This number should be power of 2."), HIVEHYBRIDGRACEHASHJOINMINWBSIZE("hive.mapjoin.hybridgrace.minwbsize", 524288, "For hybrid grace" + - " hash join, the minimum write buffer size used by optimized hashtable. Default is 512 KB."), + "Hash join, the minimum write buffer size used by optimized hashtable. Default is 512 KB."), HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS("hive.mapjoin.hybridgrace.minnumpartitions", 16, "For" + - " hybrid grace hash join, the minimum number of partitions to create."), - HIVEHASHTABLEWBSIZE("hive.mapjoin.optimized.hashtable.wbsize", 10 * 1024 * 1024, + "Hybrid grace hash join, the minimum number of partitions to create."), + HIVEHASHTABLEWBSIZE("hive.mapjoin.optimized.hashtable.wbsize", 8 * 1024 * 1024, "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" + "store data. This is one buffer size. HT may be slightly faster if this is larger, but for small\n" + "joins unnecessary memory will be allocated and then trimmed."), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index f80ffc5..3428e6e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -217,20 +217,20 @@ public void clear() { public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable, long estimatedTableSize, HybridHashTableConf nwayConf) - throws SerDeException, IOException { - this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD), - HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE), - HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS), - estimatedTableSize, keyCount, memoryAvailable, nwayConf); + throws SerDeException, IOException { + this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD), + HiveConf.getFloatVar(hconf,HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), + HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ), + HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE), + HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), + HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS), + estimatedTableSize, keyCount, memoryAvailable, nwayConf); } private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor, - int memCheckFreq, int minWbSize, int minNumParts, - long estimatedTableSize, long keyCount, - long memoryAvailable, HybridHashTableConf nwayConf) + int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, long estimatedTableSize, + long keyCount, long memoryAvailable, HybridHashTableConf nwayConf) throws SerDeException, IOException { directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter(); @@ -269,8 +269,11 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac writeBufferSize = (int)(memoryThreshold / numPartitions); } } - writeBufferSize = writeBufferSize < minWbSize ? minWbSize : writeBufferSize; + + // Cap WriteBufferSize to avoid large preallocations + writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize, writeBufferSize); LOG.info("Write buffer size: " + writeBufferSize); + hashPartitions = new HashPartition[numPartitions]; int numPartitionsSpilledOnCreation = 0; memoryUsed = 0;