diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index cddf14f..46f0ecd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
 import javolution.util.FastBitSet;
@@ -146,6 +147,10 @@
   private transient int countAfterReport; // report or forward
   private transient int heartbeatInterval;
 
+  private transient boolean isTez;
+  private transient boolean isLlap;
+  private transient int numExecutors;
+
   /**
    * Total amount of memory allowed for JVM heap.
    */
@@ -391,17 +396,20 @@ protected void initializeOp(Configuration hconf) throws HiveException {
         new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
 
     newKeys = keyWrapperFactory.getKeyWrapper();
-
+    isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+    isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
+    numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
     firstRow = true;
     // estimate the number of hash table entries based on the size of each
     // entry. Since the size of a entry
     // is not known, estimate that based on the number of entries
     if (hashAggr) {
-      computeMaxEntriesHashAggr(hconf);
+      computeMaxEntriesHashAggr();
     }
     memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+    maxMemory = isTez ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax();
     memoryThreshold = this.getConf().getMemoryThreshold();
+    LOG.info("isTez: {} isLlap: {} numExecutors: {} maxMemory: {}", isTez, isLlap, numExecutors, maxMemory);
   }
 
   /**
@@ -413,9 +421,14 @@
    * @return number of entries that can fit in hash table - useful for map-side
    *         aggregation only
    **/
-  private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException {
+  private void computeMaxEntriesHashAggr() throws HiveException {
     float memoryPercentage = this.getConf().getGroupByMemoryUsage();
-    maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
+    if (isTez) {
+      maxHashTblMemory = (long) (memoryPercentage * getConf().getMaxMemoryAvailable());
+    } else {
+      maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
+    }
+    LOG.info("Max hash table memory: {} bytes", maxHashTblMemory);
     estimateRowSize();
   }
 
@@ -875,6 +888,9 @@ private boolean shouldBeFlushed(KeyWrapper newKeys) {
     if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
       //check how much memory left
       usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+      // TODO: there is no easy and reliable way to compute the memory used by the executor threads and on-heap cache.
+      // Assuming the used memory is equally divided among all executors.
+      usedMemory = isLlap ? usedMemory / numExecutors : usedMemory;
       rate = (float) usedMemory / (float) maxMemory;
       if(rate > memoryThreshold){
         return true;
@@ -957,7 +973,6 @@ else if (o instanceof ByteArrayRef){
    * @throws HiveException
    */
   private void flushHashTable(boolean complete) throws HiveException {
-
     countAfterReport = 0;
 
     // Currently, the algorithm flushes 10% of the entries - this can be
@@ -973,7 +988,7 @@
       hashAggregations.clear();
       hashAggregations = null;
       if (isLogInfoEnabled) {
-      LOG.info("Hash Table completed flushed");
+        LOG.info("Hash Table completed flushed");
       }
       return;
     }
@@ -991,9 +1006,9 @@
       iter.remove();
       numDel++;
       if (numDel * 10 >= oldSize) {
-      if (isLogInfoEnabled) {
-        LOG.info("Hash Table flushed: new size = " + hashAggregations.size());
-      }
+        if (isLogInfoEnabled) {
+          LOG.info("Hash Table flushed: new size = " + hashAggregations.size());
+        }
         return;
       }
     }
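The hunks above change GroupByOperator's memory accounting in two ways: on Tez the hash-table budget is derived from the per-task maxMemoryAvailable instead of the whole JVM heap, and under LLAP the measured heap usage is divided by the executor count, since all executors of a daemon share one JVM. A self-contained sketch of the resulting flush check follows; the class and method names (GroupByFlushSketch, shouldFlush) are illustrative and not part of the patch.

import java.lang.management.ManagementFactory;

public class GroupByFlushSketch {
  static boolean shouldFlush(boolean isLlap, int numExecutors, long maxMemory, float memoryThreshold) {
    // Measure the whole JVM heap, exactly as the patched operator does.
    long usedMemory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    // Patch assumption: under LLAP the used heap is split evenly among executors.
    if (isLlap) {
      usedMemory /= numExecutors;
    }
    float rate = (float) usedMemory / (float) maxMemory;
    return rate > memoryThreshold;
  }

  public static void main(String[] args) {
    // e.g. a 4 GB task budget, 12 LLAP executors, flush beyond 90% of the budget
    System.out.println(shouldFlush(true, 12, 4L << 30, 0.9f));
  }
}

As the TODO in the hunk notes, per-executor usage cannot be measured directly, so the even split is a heuristic rather than an exact accounting.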
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index a8ed74c..d294e25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -330,4 +330,20 @@ public static int countOperatorsUpstream(Operator<?> start, Set<Class<? extends
   }
+
+  public static void setMemoryAvailable(final List<Operator<? extends OperatorDesc>> operators,
+      final long memoryAvailableToTask) {
+    if (operators == null) {
+      return;
+    }
+
+    for (Operator<? extends OperatorDesc> op : operators) {
+      if (op.getConf() != null) {
+        op.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+      }
+      if (op.getChildOperators() != null && !op.getChildOperators().isEmpty()) {
+        setMemoryAvailable(op.getChildOperators(), memoryAvailableToTask);
+      }
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
index f93b420..2bc83fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
@@ -24,9 +24,11 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.BytesWritable;
 
@@ -38,10 +40,15 @@
   private TopNHash largestPartition;
   private boolean prevIndexPartIsNull;
   private Set<Integer> indexesWithNullPartKey;
+  private OperatorDesc conf;
+  private Configuration hconf;
 
   public void initialize(
-      int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
-    super.initialize(topN, memUsage, isMapGroupBy, collector);
+      int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, final OperatorDesc conf,
+      final Configuration hconf) {
+    super.initialize(topN, memUsage, isMapGroupBy, collector, conf, hconf);
+    this.conf = conf;
+    this.hconf = hconf;
     this.isMapGroupBy = isMapGroupBy;
     this.memUsage = memUsage;
     partitionHeaps = new HashMap<>();
@@ -76,7 +83,7 @@ public int _tryStoreKey(HiveKey key, boolean partColsIsNull, int batchIndex) thr
     TopNHash partHeap = partitionHeaps.get(pk);
     if ( partHeap == null ) {
       partHeap = new TopNHash();
-      partHeap.initialize(topN, memUsage, isMapGroupBy, collector);
+      partHeap.initialize(topN, memUsage, isMapGroupBy, collector, conf, hconf);
       if ( batchIndex >= 0 ) {
         partHeap.startVectorizedBatch(batchSize);
       }
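The two diffs above move the budget around in different ways: OperatorUtils.setMemoryAvailable() pushes it down the operator tree by plain recursion, while PTFTopNHash keeps conf and hconf so every lazily created per-partition heap sees the same budget. A runnable stand-alone model of the tree walk follows, using simplified stand-ins (Op, Desc) rather than Hive's Operator/OperatorDesc classes.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MemoryPropagationSketch {
  static class Desc { long maxMemoryAvailable; }
  static class Op {
    final Desc conf = new Desc();
    List<Op> children = Collections.emptyList();
  }

  static void setMemoryAvailable(List<Op> operators, long memoryAvailableToTask) {
    if (operators == null) {
      return;
    }
    for (Op op : operators) {
      if (op.conf != null) {
        op.conf.maxMemoryAvailable = memoryAvailableToTask;    // budget every descriptor
      }
      if (op.children != null && !op.children.isEmpty()) {
        setMemoryAvailable(op.children, memoryAvailableToTask); // recurse into children
      }
    }
  }

  public static void main(String[] args) {
    Op root = new Op();
    Op leaf = new Op();
    root.children = Arrays.asList(leaf);
    setMemoryAvailable(Arrays.asList(root), 1L << 30);          // 1 GB task budget
    System.out.println(leaf.conf.maxMemoryAvailable);           // 1073741824
  }
}

Re-assigning the same value when an operator is reachable through more than one parent is harmless, which is why the walk gets away without a visited set.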
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 9f8acc9..789d2a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -244,7 +244,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
 
       if (limit >= 0 && memUsage > 0) {
         reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
-        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this, conf, hconf);
       }
 
       useUniformHash = conf.getReducerTraits().contains(UNIFORM);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index e400368..f3c7c77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -19,24 +19,24 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
 
-import com.google.common.collect.MinMaxPriorityQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.MinMaxPriorityQueue;
 
 /**
  * Stores binary key/value in sorted manner to get top-n key/value
@@ -92,7 +92,8 @@ public int compare(Integer o1, Integer o2) {
   };
 
   public void initialize(
-      int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
+      int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector, final OperatorDesc conf,
+      final Configuration hconf) {
     assert topN >= 0 && memUsage > 0;
     assert !this.isEnabled;
     this.isEnabled = false;
@@ -103,11 +104,23 @@ public void initialize(
       return; // topN == 0 will cause a short-circuit, don't need any initialization
     }
 
+    final boolean isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+    final boolean isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
+    final int numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
+
     // Used Memory = totalMemory() - freeMemory();
     // Total Free Memory = maxMemory() - Used Memory;
     long totalFreeMemory = Runtime.getRuntime().maxMemory() -
       Runtime.getRuntime().totalMemory() + Runtime.getRuntime().freeMemory();
 
+    if (isTez) {
+      MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+      // TODO: For LLAP, assumption is off-heap cache.
+      final long memoryUsedPerExecutor = (memoryMXBean.getHeapMemoryUsage().getUsed() / numExecutors);
+      // this is total free memory available per executor in case of LLAP
+      totalFreeMemory = conf.getMaxMemoryAvailable() - memoryUsedPerExecutor;
+    }
+
     // limit * 64 : compensation of arrays for key/value/hashcodes
     this.threshold = (long) (memUsage * totalFreeMemory) - topN * 64L;
     if (threshold < 0) {
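On Tez the free-memory estimate in TopNHash.initialize() above no longer comes from Runtime alone: the budget becomes the operator's maxMemoryAvailable minus an assumed per-executor share of the used heap. A runnable sketch of the resulting threshold computation follows; the class and method names are illustrative, not from the patch.

import java.lang.management.ManagementFactory;

public class TopNThresholdSketch {
  static long threshold(boolean isTez, int numExecutors, long maxMemoryAvailable,
      float memUsage, int topN) {
    // Used Memory = totalMemory() - freeMemory(); Total Free = maxMemory() - Used
    long totalFreeMemory = Runtime.getRuntime().maxMemory()
        - Runtime.getRuntime().totalMemory() + Runtime.getRuntime().freeMemory();
    if (isTez) {
      long used = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
      // Patch assumption: heap use divides evenly across executors (LLAP cache is off-heap).
      totalFreeMemory = maxMemoryAvailable - used / numExecutors;
    }
    // topN * 64: compensation for the key/value/hashcode arrays
    return (long) (memUsage * totalFreeMemory) - topN * 64L;
  }

  public static void main(String[] args) {
    // e.g. a 4 GB task budget, 12 executors, 10% memory usage, top 100 rows
    System.out.println(threshold(true, 12, 4L << 30, 0.1f, 100));
  }
}

As in the original code, a negative result disables the top-n optimization, so an over-subscribed daemon simply falls back to forwarding all rows.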
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 6f36dfb..955fa80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -193,6 +194,7 @@ public Object call() {
         } else {
           mapOp = new MapOperator(runtimeCtx);
         }
+
         // Not synchronizing creation of mapOp with an invocation. Check immediately
         // after creation in case abort has been set.
         // Relying on the regular flow to clean up the actual operator. i.e. If an exception is
@@ -283,6 +285,14 @@ public Object call() {
       mapOp.passExecContext(execContext);
       l4j.info(mapOp.dump(0));
 
+      // set memory available for operators
+      long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
+      if (mapOp.getConf() != null) {
+        mapOp.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+        l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+      }
+      OperatorUtils.setMemoryAvailable(mapOp.getChildOperators(), memoryAvailableToTask);
+
       mapOp.initializeLocalWork(jconf);
       checkAbortCondition();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index e4c13fb..d80f201 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -27,6 +27,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -161,6 +162,13 @@ void init(
     reducer = reduceWork.getReducer();
     // Check immediately after reducer is assigned, in case the abort came in during
     checkAbortCondition();
+    // set memory available for operators
+    long memoryAvailableToTask = processorContext.getTotalMemoryAvailableToTask();
+    if (reducer.getConf() != null) {
+      reducer.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
+      l4j.info("Memory available for operators set to {}", LlapUtil.humanReadableByteCount(memoryAvailableToTask));
+    }
+    OperatorUtils.setMemoryAvailable(reducer.getChildOperators(), memoryAvailableToTask);
     if (numTags > 1) {
       sources = new ReduceRecordSource[numTags];
       mainWorkOIs = new ObjectInspector[numTags];
@@ -199,6 +207,7 @@ void init(
     checkAbortCondition();
 
     reducer = reduceWork.getReducer();
+
     // initialize reduce operator tree
     try {
       l4j.info(reducer.dump(0));
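The same seed-then-propagate block now appears in both MapRecordProcessor and ReduceRecordProcessor. A hypothetical shared helper, sketched against the APIs the patch already uses (it is not part of the patch), would keep the two call sites from drifting apart:

  // Hypothetical helper, not in this patch: seed the root operator's descriptor with the
  // task budget reported by Tez, log it, then propagate to the rest of the tree.
  static void seedTaskMemory(Operator<? extends OperatorDesc> rootOp,
      long memoryAvailableToTask, Logger log) {
    if (rootOp.getConf() != null) {
      rootOp.getConf().setMaxMemoryAvailable(memoryAvailableToTask);
      log.info("Memory available for operators set to {}",
          LlapUtil.humanReadableByteCount(memoryAvailableToTask));
    }
    OperatorUtils.setMemoryAvailable(rootOp.getChildOperators(), memoryAvailableToTask);
  }

Either processor would then call seedTaskMemory(mapOp or reducer, processorContext.getTotalMemoryAvailableToTask(), l4j) at the point where the inline blocks sit today.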
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
index e217bdf..7df9d07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
@@ -33,6 +33,7 @@
   protected transient OpTraits opTraits;
   protected transient Map<String, Object> opProps;
   protected long memNeeded = 0;
+  protected long memAvailable = 0;
   protected String runtimeStatsTmpDir;
 
   @Override
@@ -93,6 +94,16 @@ public void setMemoryNeeded(long memNeeded) {
     this.memNeeded = memNeeded;
   }
 
+  @Override
+  public long getMaxMemoryAvailable() {
+    return memAvailable;
+  }
+
+  @Override
+  public void setMaxMemoryAvailable(final long memoryAvailble) {
+    this.memAvailable = memoryAvailble;
+  }
+
   public String getRuntimeStatsTmpDir() {
     return runtimeStatsTmpDir;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
index ad620c2..850576c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
@@ -30,6 +30,8 @@
   public Map<String, Object> getOpProps();
   public long getMemoryNeeded();
   public void setMemoryNeeded(long memoryNeeded);
+  public long getMaxMemoryAvailable();
+  public void setMaxMemoryAvailable(long memoryAvailble);
   public String getRuntimeStatsTmpDir();
   public void setRuntimeStatsTmpDir(String runtimeStatsTmpDir);
 }
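memAvailable defaults to 0 and is only seeded on the Tez path, so consumers gate on the execution engine before trusting it, as GroupByOperator does above. One possible consumer-side pattern, illustrative only; it adds a defensive greater-than-zero check that the patch itself does not use, since the record processors always seed the value before operators initialize on Tez:

  // Hypothetical helper, not in this patch: prefer the seeded task budget on Tez,
  // fall back to the whole-JVM view otherwise.
  static long effectiveMemoryBound(OperatorDesc desc, boolean isTez) {
    long taskBudget = desc.getMaxMemoryAvailable();  // stays 0 until a record processor seeds it
    if (isTez && taskBudget > 0) {
      return taskBudget;
    }
    return Runtime.getRuntime().maxMemory();         // non-Tez: whole JVM heap, as before
  }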