diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index a516d60..0d80c9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -33,6 +33,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.IConfigureJobConf;
@@ -148,6 +150,7 @@
   private float memoryThreshold;
 
+  private boolean isLlap = false;
   /**
    * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
@@ -517,7 +520,7 @@ private void computeMemoryLimits() {
         aggregationBatchInfo.getAggregatorsFixedSize();
 
     MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+    maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
     memoryThreshold = conf.getMemoryThreshold();
     // Tests may leave this unitialized, so better set it to 1
     if (memoryThreshold == 0.0f) {
@@ -527,13 +530,14 @@ private void computeMemoryLimits() {
 
     maxHashTblMemory = (int)(maxMemory * memoryThreshold);
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
-          maxHashTblMemory/1024/1024,
-          maxMemory/1024/1024,
-          memoryThreshold,
-          fixedHashEntrySize,
-          keyWrappersBatch.getKeysFixedSize(),
-          aggregationBatchInfo.getAggregatorsFixedSize()));
+      LOG.debug("GBY memory limits - isLlap: {} maxMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})",
+          isLlap,
+          LlapUtil.humanReadableByteCount(maxHashTblMemory),
+          LlapUtil.humanReadableByteCount(maxMemory),
+          memoryThreshold,
+          fixedHashEntrySize,
+          keyWrappersBatch.getKeysFixedSize(),
+          aggregationBatchInfo.getAggregatorsFixedSize());
     }
   }
@@ -977,6 +981,7 @@ private void setupGroupingSets() {
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
+    isLlap = LlapProxy.isDaemon();
     VectorExpression.doTransientInit(keyExpressions);
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
@@ -1233,4 +1238,7 @@ public void configureJobConf(JobConf job) {
     }
   }
 
+  public long getMaxMemory() {
+    return maxMemory;
+  }
 }
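The substantive change in the hunks above is the cap selection in computeMemoryLimits(): inside an LLAP daemon the JVM heap is shared by many concurrent executors, so the operator now bounds its hash table by the per-operator grant carried on the plan (getConf().getMaxMemoryAvailable()) rather than by the whole heap. A minimal standalone sketch of that pattern, not part of the patch (chooseMaxMemory and its parameters are illustrative stand-ins for LlapProxy.isDaemon() and GroupByDesc.getMaxMemoryAvailable(), not the Hive API):

import java.lang.management.ManagementFactory;

public class MaxMemorySketch {
  // Stand-in for the patch's selection logic; isLlap mirrors LlapProxy.isDaemon()
  // and perOperatorGrant mirrors GroupByDesc.getMaxMemoryAvailable(). Neither
  // name here is part of Hive itself.
  static long chooseMaxMemory(boolean isLlap, long perOperatorGrant) {
    return isLlap
        ? perOperatorGrant // LLAP: the heap is shared, so honor the plan's grant
        : ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); // standalone: the whole heap is available
  }

  public static void main(String[] args) {
    long grant = 512L * 1024 * 1024; // e.g. a 512MB per-operator grant
    System.out.println(chooseMaxMemory(true, grant));  // prints 536870912
    System.out.println(chooseMaxMemory(false, grant)); // prints the JVM heap max
  }
}

The design point: getHeapMemoryUsage().getMax() is a per-JVM number, so sizing every GroupBy hash table against it under LLAP would let concurrent fragments collectively overcommit the daemon's heap.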
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index fe1375b..278f167 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -38,6 +38,8 @@
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -288,6 +290,8 @@ public void testMemoryPressureFlush() throws HiveException {
     FakeCaptureVectorToRowOutputOperator out =
         FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
     vgo.initialize(hconf, null);
+    long expected = vgo.getMaxMemory();
+    assertEquals(expected, maxMemory);
     this.outputRowCount = 0;
     out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
       @Override
@@ -345,6 +349,98 @@ public void remove() {
     }
   }
   @Test
+  public void testMemoryPressureFlushLlap() throws HiveException {
+
+    try {
+      List<String> mapColumnNames = new ArrayList<String>();
+      mapColumnNames.add("Key");
+      mapColumnNames.add("Value");
+      VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+      Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc(ctx, "max",
+          "Value", TypeInfoFactory.longTypeInfo,
+          "Key", TypeInfoFactory.longTypeInfo);
+      GroupByDesc desc = pair.fst;
+      VectorGroupByDesc vectorDesc = pair.snd;
+
+      LlapProxy.setDaemon(true);
+
+      CompilationOpContext cCtx = new CompilationOpContext();
+
+      Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+      VectorGroupByOperator vgo =
+          (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+      FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+      long maxMemory = 512 * 1024 * 1024L;
+      vgo.getConf().setMaxMemoryAvailable(maxMemory);
+      float threshold = 100.0f * 1024.0f / maxMemory;
+      desc.setMemoryThreshold(threshold);
+      vgo.initialize(hconf, null);
+
+      long got = vgo.getMaxMemory();
+      assertEquals(maxMemory, got);
+      this.outputRowCount = 0;
+      out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+        @Override
+        public void inspectRow(Object row, int tag) throws HiveException {
+          ++outputRowCount;
+        }
+      });
+
+      Iterable<Object> it = new Iterable<Object>() {
+        @Override
+        public Iterator<Object> iterator() {
+          return new Iterator<Object>() {
+            long value = 0;
+
+            @Override
+            public boolean hasNext() {
+              return true;
+            }
+
+            @Override
+            public Object next() {
+              return ++value;
+            }
+
+            @Override
+            public void remove() {
+            }
+          };
+        }
+      };
+
+      FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+          100,
+          new String[]{"long", "long"},
+          it,
+          it);
+
+      // The 'it' data source will produce data w/o ever ending
+      // We want to see that memory pressure kicks in and some
+      // entries in the VGBY are flushed.
+      long countRowsProduced = 0;
+      for (VectorizedRowBatch unit : data) {
+        countRowsProduced += 100;
+        vgo.process(unit, 0);
+        if (0 < outputRowCount) {
+          break;
+        }
+        // Set an upper bound on how much we're willing to push before it should flush;
+        // we've set the memory threshold at 100kb, each key is distinct.
+        // It should not go beyond 100k/16 (key+data)
+        assertTrue(countRowsProduced < 100 * 1024 / 16);
+      }
+
+      assertTrue(0 < outputRowCount);
+    } finally {
+      LlapProxy.setDaemon(false);
+    }
+  }
+
+  @Test
   public void testMultiKeyIntStringInt() throws HiveException {
     testMultiKey(
         "sum",
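A note on the bound the new test asserts (an editorial gloss, not part of the patch): initialize() runs computeMemoryLimits(), so maxHashTblMemory = maxMemory * threshold = 512MB * (100 * 1024 / 512MB) = 100KB. Each distinct long key costs at least 16 bytes of hash table space (an 8-byte key plus the 8-byte running max aggregate), so the memory-pressure flush must fire before roughly 100 * 1024 / 16 = 6400 distinct keys have been inserted. Since every generated row carries a distinct key, countRowsProduced is a valid proxy for the key count, which is exactly what the assertTrue(countRowsProduced < 100 * 1024 / 16) guard inside the loop checks.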