// Patch summary (explanatory preamble; the patch text itself follows unchanged).
//
// Target file: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
// Scope: memory sizing for hybrid grace hash join tables in load(), for the
// case where more than two map-join tables are present (n-way join).
//
// Hunk 1 (@@ -92,24 +92,25 @@, inside load()):
//   * maxSize now starts at Long.MIN_VALUE instead of 0, and the two separate
//     ternary assignments are replaced by a single `if` that updates maxSize
//     and `biggest` together, so the two values can no longer disagree.
//   * The memory budget for the biggest small table is no longer computed
//     inline as noConditionalTaskThreshold * (maxSize / totalSize); it is
//     obtained from the new helper estimateHybridHashTableMemory(...).
//   * calcNumPartitions(...) now receives the already-computed maxSize instead
//     of re-reading desc.getParentDataSizes().get(biggest).
//
// Hunk 2 (@@ -160,8 +161,8 @@, inside load()):
//   * In the mapJoinTables.length > 2 branch, the per-table memory is computed
//     with the same helper (passing `pos`) instead of the inline
//     size / totalSize expression. The binary-join branch is untouched.
//
// Hunk 3 (@@ -186,6 +187,16 @@):
//   * Adds private static estimateHybridHashTableMemory(mapJoinTables, desc,
//     pos, totalSize, noConditionalTaskThreshold). It starts from an equal
//     share, 1 / max(mapJoinTables.length - 1, 1), and switches to the
//     proportional share size(pos) / totalSize only when both size(pos) and
//     totalSize are non-zero. The result is
//     (long) (noConditionalTaskThreshold * share), so a zero totalSize is
//     never used as a divisor.
//
// NOTE(review): when size(pos) is 0 but totalSize is non-zero, that table gets
// the equal share while the other tables still get proportional shares, so the
// shares can add up to more than 1 -- confirm this over-allocation is intended.
// NOTE(review): the share is computed in `float`; for large thresholds the
// result is only as precise as float allows -- confirm that is acceptable.
// NOTE(review): the helper only tests `!= 0`; whether sizes can be negative is
// not visible here -- verify against the producer of getParentDataSizes().
// NOTE: in this copy the patch's line breaks are collapsed (the hunk headers
// declare 24/25, 8/8 and 6/16 lines); restore them before applying.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 536b92c..4ec5dd7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -92,24 +92,25 @@ public void load(MapJoinTableContainer[] mapJoinTables, nwayConf = new HybridHashTableConf(); // Find the biggest small table; also calculate total data size of all small tables - long maxSize = 0; // the size of the biggest small table + long maxSize = Long.MIN_VALUE; // the size of the biggest small table for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; } totalSize += desc.getParentDataSizes().get(pos); - biggest = desc.getParentDataSizes().get(pos) > maxSize ? pos : biggest; - maxSize = desc.getParentDataSizes().get(pos) > maxSize ? desc.getParentDataSizes().get(pos) - : maxSize; + if (maxSize < desc.getParentDataSizes().get(pos)) { + maxSize = desc.getParentDataSizes().get(pos); + biggest = pos; + } } // Using biggest small table, calculate number of partitions to create for each small table - float percentage = (float) maxSize / totalSize; - long memory = (long) (noConditionalTaskThreshold * percentage); + long memory = estimateHybridHashTableMemory(mapJoinTables, desc, biggest, totalSize, + noConditionalTaskThreshold); int numPartitions = 0; try { numPartitions = HybridHashTableContainer.calcNumPartitions(memory, - desc.getParentDataSizes().get(biggest), + maxSize, HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS), HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE), nwayConf); @@ -160,8 +161,8 @@ public void load(MapJoinTableContainer[] mapJoinTables, if (useHybridGraceHashJoin) { if (mapJoinTables.length > 2) { // Allocate n-way join memory proportionally - float percentage = (float) 
desc.getParentDataSizes().get(pos) / totalSize; - memory = (long) (noConditionalTaskThreshold * percentage); + memory = estimateHybridHashTableMemory(mapJoinTables, desc, pos, totalSize, + noConditionalTaskThreshold); } else { // binary join memory = noConditionalTaskThreshold; } @@ -186,6 +187,16 @@ public void load(MapJoinTableContainer[] mapJoinTables, } } + private static long estimateHybridHashTableMemory(MapJoinTableContainer[] mapJoinTables, MapJoinDesc desc, + int pos, long totalSize, long noConditionalTaskThreshold) { + float percentage = ((float) 1) / Math.max(mapJoinTables.length - 1, 1); + if (desc.getParentDataSizes().get(pos) != 0 && totalSize != 0) { + percentage = (float) desc.getParentDataSizes().get(pos) / totalSize; + } + long memory = (long) (noConditionalTaskThreshold * percentage); + return memory; + } + private String describeOi(String desc, ObjectInspector keyOi) { for (StructField field : ((StructObjectInspector)keyOi).getAllStructFieldRefs()) { ObjectInspector oi = field.getFieldObjectInspector();