diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index f104c13a49..5cbc772be4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -645,9 +645,26 @@ private void checkHashModeEfficiency() throws HiveException {
         LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d",
             numEntriesHashTable, sumBatchSize, (long)(sumBatchSize * minReductionHashAggr)));
       }
-      if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
+      /*
+       * The grouping sets expand the hash sizes by producing intermediate keys. 3 grouping sets
+       * of (), (col1), (col1,col2) will turn 10 rows into 30 rows. If col1 has an nDV of 2 and
+       * col2 has an nDV of 4, then this turns into a maximum of 1+2+(2*4) or 11 keys in the
+       * hashtable.
+       *
+       * So you get 10 rows in and 11 rows out, which is a reduction of ~3x compared to streaming
+       * mode, but it looks like an increase if the grouping-set expansion is not accounted for.
+       *
+       * For performance, it is definitely better to send 11 rows out to the shuffle and not 30.
+       *
+       * In particular, if the same nDVs are repeated for a thousand rows, streaming would send a
+       * thousand rows to the single reducer which owns the empty grouping set,
+       * instead of sending 1 from the hash.
+       *
+       */
+      final int groupingExpansion = (groupingSets != null) ? groupingSets.length : 1;
+      final long intermediateKeyCount = sumBatchSize * groupingExpansion;
+      if (numEntriesHashTable > intermediateKeyCount * minReductionHashAggr) {
         flush(true);
-        changeToStreamingMode();
       }
     }
   }
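
For reference, below is a minimal standalone sketch of the before/after condition, using the numbers from the patch comment (10 rows, 3 grouping sets, nDVs of 2 and 4) plus an assumed minReductionHashAggr of 0.5. This is not the actual VectorGroupByOperator code; the class name, method names, and constants are illustrative only.

// Standalone sketch, not Hive code: demonstrates how scaling the row count by the
// grouping-set expansion changes the hash-efficiency decision.
public class GroupingSetHashCheckSketch {

  // Old check: compare hash-table size against the raw input row count.
  static boolean oldCheck(long numEntriesHashTable, long sumBatchSize,
      float minReductionHashAggr) {
    return numEntriesHashTable > sumBatchSize * minReductionHashAggr;
  }

  // New check: scale the row count by the grouping-set expansion first,
  // since each input row produces one intermediate key per grouping set.
  static boolean newCheck(long numEntriesHashTable, long sumBatchSize,
      int numGroupingSets, float minReductionHashAggr) {
    final int groupingExpansion = (numGroupingSets > 0) ? numGroupingSets : 1;
    final long intermediateKeyCount = sumBatchSize * groupingExpansion;
    return numEntriesHashTable > intermediateKeyCount * minReductionHashAggr;
  }

  public static void main(String[] args) {
    long sumBatchSize = 10;        // input rows seen so far
    int numGroupingSets = 3;       // (), (col1), (col1,col2)
    long numEntriesHashTable = 11; // max keys: 1 + 2 + (2*4) with nDV(col1)=2, nDV(col2)=4
    float minReductionHashAggr = 0.5f; // assumed example value

    // Old check: 11 > 10 * 0.5 -> true, so the operator would flush even though
    // the hash table is reducing 30 intermediate keys down to 11.
    System.out.println("old check flushes: "
        + oldCheck(numEntriesHashTable, sumBatchSize, minReductionHashAggr));

    // New check: 11 > 30 * 0.5 -> false, so hash aggregation stays on.
    System.out.println("new check flushes: "
        + newCheck(numEntriesHashTable, sumBatchSize, numGroupingSets, minReductionHashAggr));
  }
}

With these example numbers the old condition (11 > 10 * 0.5) triggers a flush, while the adjusted condition (11 > 30 * 0.5) keeps hash aggregation, matching the ~3x reduction described in the patch comment.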