diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index f104c13a49..d4d18ef4fa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -645,9 +645,26 @@ private void checkHashModeEfficiency() throws HiveException {
       LOG.debug(String.format("checkHashModeEfficiency: HT:%d RC:%d MIN:%d",
           numEntriesHashTable, sumBatchSize, (long)(sumBatchSize * minReductionHashAggr)));
     }
-    if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
+    /*
+     * The grouping sets expand the hash sizes by producing intermediate keys. 3 grouping sets
+     * of (),(col1),(col1,col2) will turn 10 rows into 30 rows. If col1 has an nDV of 2 and
+     * col2 has an nDV of 5, then this turns into a maximum of 1+3+(2*5) or 14 keys in the
+     * hashtable.
+     *
+     * So you get 10 rows in and 14 rows out, which is a reduction of ~2x vs Streaming mode,
+     * but it looks like an increase if the grouping-set expansion is not accounted for.
+     *
+     * For performance, it is definitely better to send 14 rows out to the shuffle and not 30.
+     *
+     * In particular, if the same nDVs are repeated for a thousand rows, this would send a
+     * thousand rows via streaming to the single reducer which owns the empty grouping set,
+     * instead of sending 1 from the hash.
+     */
+    final int groupingExpansion = (groupingSets != null) ? groupingSets.length : 1;
+    final long intermediateKeyCount = sumBatchSize * groupingExpansion;
+    if (numEntriesHashTable > intermediateKeyCount * minReductionHashAggr) {
       flush(true);
-      changeToStreamingMode();
     }
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index e8586fce25..12df385c3a 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -111,9 +112,10 @@ private static AggregationDesc buildAggregationDesc(
       String column,
       TypeInfo typeInfo) {

-    ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
-    ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
+    TypeInfo[] typeInfos = new TypeInfo[] {typeInfo};
+
+    ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>(1);
+    ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
     params.add(inputColumn);

     AggregationDesc agg = new AggregationDesc();
@@ -121,10 +123,7 @@ private static AggregationDesc buildAggregationDesc(
     agg.setMode(mode);
     agg.setParameters(params);

-    TypeInfo[] typeInfos = new TypeInfo[] { typeInfo };
-
     final GenericUDAFEvaluator evaluator;
-    PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
     try {
       switch (aggregate) {
       case "count":
@@ -232,14 +231,13 @@ private static AggregationDesc buildAggregationDescCountStar(
     return new Pair<GroupByDesc, VectorGroupByDesc>(desc, vectorDesc);
   }

-
   private static Pair<GroupByDesc, VectorGroupByDesc> buildKeyGroupByDesc(
       VectorizationContext ctx,
       String aggregate,
       String column,
       TypeInfo dataTypeInfo,
-      String key,
-      TypeInfo keyTypeInfo) {
+      String[] keys,
+      TypeInfo[] keyTypeInfos) {

     Pair<GroupByDesc, VectorGroupByDesc> pair = buildGroupByDescType(ctx, aggregate,
         GenericUDAFEvaluator.Mode.PARTIAL1, column, dataTypeInfo);
@@ -247,10 +245,14 @@ private static AggregationDesc buildAggregationDescCountStar(
     VectorGroupByDesc vectorDesc = pair.snd;
     vectorDesc.setProcessingMode(ProcessingMode.HASH);

-    ExprNodeDesc keyExp = buildColumnDesc(ctx, key, keyTypeInfo);
-    ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
-    keys.add(keyExp);
-    desc.setKeys(keys);
+    ArrayList<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>(keys.length);
+    for (int i = 0; i < keys.length; i++) {
+      final String key = keys[i];
+      final TypeInfo keyTypeInfo = keyTypeInfos[i];
+      final ExprNodeDesc keyExp = buildColumnDesc(ctx, key, keyTypeInfo);
+      keyExprs.add(keyExp);
+    }
+    desc.setKeys(keyExprs);

     desc.getOutputColumnNames().add("_col1");
@@ -269,7 +271,8 @@ public void testMemoryPressureFlush() throws HiveException {

     Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "max",
         "Value", TypeInfoFactory.longTypeInfo,
-        "Key", TypeInfoFactory.longTypeInfo);
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
@@ -359,7 +362,8 @@ public void testMemoryPressureFlushLlap() throws HiveException {

     Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc(ctx, "max",
         "Value", TypeInfoFactory.longTypeInfo,
-        "Key", TypeInfoFactory.longTypeInfo);
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
@@ -440,6 +444,170 @@ public void remove() {
       }
     }

+  @Test
+  public void testRollupAggregation() throws HiveException {
+
+    List<String> mapColumnNames = new ArrayList<String>();
+    mapColumnNames.add("k1");
+    mapColumnNames.add("k2");
+    mapColumnNames.add("v");
+    VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+    // select count(v) from name group by rollup (k1,k2);
+
+    Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "count",
+        "v", TypeInfoFactory.longTypeInfo,
+        new String[] { "k1", "k2" },
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo, TypeInfoFactory.longTypeInfo});
+    GroupByDesc desc = pair.fst;
+    VectorGroupByDesc vectorDesc = pair.snd;
+
+    desc.setGroupingSetsPresent(true);
+    ArrayList<Long> groupingSets = new ArrayList<>();
+    // grouping set ids
+    groupingSets.add(0L);
+    groupingSets.add(1L);
+    groupingSets.add(2L);
+    desc.setListGroupingSets(groupingSets);
+    // add the grouping sets dummy key
+    ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+    // this only works because buildKeyGroupByDesc used an ArrayList;
+    // don't do this in the actual compiler
+    desc.getKeys().add(groupingSetDummyKey);
+    // grouping set position
+    desc.setGroupingSetPosition(2);
+
+    CompilationOpContext cCtx = new CompilationOpContext();
+
+    desc.setMinReductionHashAggr(0.5f);
+    // set a really low check interval
+    hconf.set("hive.groupby.mapaggr.checkinterval", "10");
+    hconf.set("hive.vectorized.groupby.checkinterval", "10");
+
+    Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+    VectorGroupByOperator vgo =
+        (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+    FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+    vgo.initialize(hconf, null);
+
+    this.outputRowCount = 0;
+    out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+      @Override
+      public void inspectRow(Object row, int tag) throws HiveException {
+        ++outputRowCount;
+      }
+    });
+
+    // k1 has an nDV of 2
+    Iterable<Object> k1 = new Iterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return new Iterator<Object>() {
+          int value = 0;
+          int ndv = 2;
+
+          @Override
+          public boolean hasNext() {
+            return true;
+          }
+
+          @Override
+          public Object next() {
+            value = (value + 1) % ndv;
+            return value;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+
+    // k2 has an nDV of 6
+    Iterable<Object> k2 = new Iterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return new Iterator<Object>() {
+          int value = 0;
+          int ndv = 6;
+
+          @Override
+          public boolean hasNext() {
+            return true;
+          }
+
+          @Override
+          public Object next() {
+            value = (value + 1) % ndv;
+            return value;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+
+    // a single repeated value; we're running "count", so the value itself doesn't matter
+    Iterable<Object> v = new Iterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return new Iterator<Object>() {
+          int value = 0;
+          int ndv = 1;
+
+          @Override
+          public boolean hasNext() {
+            return true;
+          }
+
+          @Override
+          public Object next() {
+            value = (value + 1) % ndv;
+            return value;
+          }
+
+          @Override
+          public void remove() {
+          }
+        };
+      }
+    };
+
+    // vrbs of 2 rows each
+    FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+        2,
+        new String[] {"long", "long", "long", "long"},
+        k1,
+        k2,
+        v,
+        v); // output col
+
+    long countRowsProduced = 0;
+    for (VectorizedRowBatch unit: data) {
+      // after 24 rows, we'd have seen all the keys and
+      // found the 14 distinct keys in the hashmap,
+      // but 24*0.5 = 12, so the old check would have given up on hash mode;
+      // hash mode is kept because the check now accounts for the 3 grouping sets.
+      // If hash mode were turned off here, we'd get 14 + 3*(100-24) rows out.
+      countRowsProduced += unit.size;
+      vgo.process(unit, 0);
+
+      if (countRowsProduced >= 100) {
+        break;
+      }
+
+    }
+    vgo.close(false);
+    // all grouping sets combined: the rollup produces 14 distinct keys/rows
+    assertEquals(1+3+10, outputRowCount);
+  }
+
   @Test
   public void testMaxHTEntriesFlush() throws HiveException {
@@ -450,7 +618,8 @@ public void testMaxHTEntriesFlush() throws HiveException {

     Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "max",
         "Value", TypeInfoFactory.longTypeInfo,
-        "Key", TypeInfoFactory.longTypeInfo);
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
@@ -2784,7 +2953,9 @@ public void testAggregateLongKeyIterable (
     Set<Object> keys = new HashSet<Object>();

     Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, aggregateName, "Value",
-        TypeInfoFactory.longTypeInfo, "Key", TypeInfoFactory.longTypeInfo);
+        TypeInfoFactory.longTypeInfo,
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
@@ -2856,7 +3027,9 @@ public void testAggregateStringKeyIterable (
     Set<Object> keys = new HashSet<Object>();

     Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, aggregateName, "Value",
-        dataTypeInfo, "Key", TypeInfoFactory.stringTypeInfo);
+        dataTypeInfo,
+        new String[] {"Key"},
+        new TypeInfo[] {TypeInfoFactory.stringTypeInfo});
     GroupByDesc desc = pair.fst;
     VectorGroupByDesc vectorDesc = pair.snd;
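
The core of the change in checkHashModeEfficiency() can be illustrated in isolation. The sketch below mirrors the patched condition with the same figures the new test exercises (24 rows seen, 14 hash entries, minReductionHashAggr = 0.5, 3 grouping sets); the class name and main() harness are illustrative only and are not part of the patch. Note that in the patched operator a failed reduction target now only triggers flush(true) and no longer switches to streaming mode.

// Standalone sketch of the adjusted hash-mode efficiency check.
// Variable names mirror checkHashModeEfficiency(); the class itself is hypothetical.
public class HashModeEfficiencySketch {

  static boolean reductionBelowTarget(long numEntriesHashTable, long sumBatchSize,
      float minReductionHashAggr, int numGroupingSets) {
    // Each input row becomes one intermediate key per grouping set, so the
    // reduction target is measured against the expanded key count.
    final int groupingExpansion = (numGroupingSets > 0) ? numGroupingSets : 1;
    final long intermediateKeyCount = sumBatchSize * groupingExpansion;
    return numEntriesHashTable > intermediateKeyCount * minReductionHashAggr;
  }

  public static void main(String[] args) {
    // Numbers from testRollupAggregation: 24 rows seen, 14 distinct keys,
    // minReductionHashAggr = 0.5, 3 grouping sets ((), (k1), (k1,k2)).
    long hashEntries = 14;
    long rowsSeen = 24;
    float minReduction = 0.5f;

    // Old check ignored the expansion: 14 > 24 * 0.5 = 12, so the hash looked inefficient.
    System.out.println("old check fires: " + (hashEntries > rowsSeen * minReduction));

    // New check accounts for it: 14 > 24 * 3 * 0.5 = 36 is false, so hash mode is kept.
    System.out.println("new check fires: "
        + reductionBelowTarget(hashEntries, rowsSeen, minReduction, 3));
  }
}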
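The arithmetic in the test's loop comment ("14 + 3*(100-24) rows") can also be spelled out. A rough back-of-the-envelope estimate, assuming the efficiency check fires after 24 rows and every remaining row expands into one output row per grouping set under streaming; the class and its constants are taken from the test's comments and are illustrative only.

// Rough comparison of rows sent downstream in the rollup scenario. Illustrative only.
public class RollupOutputEstimate {
  public static void main(String[] args) {
    final int totalInputRows = 100;   // rows fed to the operator before close()
    final int rowsAtCheck = 24;       // rows processed when the efficiency check fires
    final int groupingSets = 3;       // (), (k1), (k1,k2) for rollup(k1, k2)
    final int distinctKeys = 14;      // distinct (keys, grouping-set) combinations in the data

    // Staying in hash mode: only the distinct keys are emitted when the hash flushes.
    final long hashModeRows = distinctKeys;

    // Falling back to streaming at the check point: every remaining input row is
    // expanded into one output row per grouping set, on top of the flushed keys.
    final long streamingRows = distinctKeys + (long) groupingSets * (totalInputRows - rowsAtCheck);

    System.out.println("hash mode emits ~" + hashModeRows + " rows");           // ~14
    System.out.println("streaming fallback emits ~" + streamingRows + " rows"); // ~242
  }
}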