diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
index f93b420..62709d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFTopNHash.java
@@ -174,7 +174,16 @@ public int getVectorizedKeyHashCode(int batchIndex) {
     TopNHash partHeap = partitionHeaps.get(pk);
     return partHeap.getVectorizedKeyHashCode(batchIndex);
   }
-
+
+  public static double getMemoryOverhead(int topN, long avgRowSize) {
+    // This is the sum of the memory allocated for the expensive fields:
+    // 1. The underlying TopNHash object.
+    // 2. The partition heap map:
+    //    each entry in the hash map costs approximately 32 bytes on average,
+    //    and there are at most topN entries.
+    return TopNHash.getMemoryOverhead(topN, avgRowSize) + (32.0 * topN) / (1024.0 * 1024.0);
+  }
+
   static class Key {
     boolean isNull;
     int hashCode;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index e33c1d4..6bb7dae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -24,11 +24,8 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.Future;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -520,6 +517,24 @@ public void collect(byte[] key, byte[] value, int hash) throws IOException {
     collect(keyWritable, valueWritable);
   }
 
+  /**
+   * Get the memory requirement for the TopN reducer hash.
+   * @param topN Number of top rows
+   * @return The estimated memory overhead for the TopNHash in MB.
+   */
+  public double getTopNMemoryRequirement(int topN) {
+    // If we don't have statistics, approximate the average row size as
+    // 8 bytes per column over the key and value columns.
+    long avgRowSize = this.getStatistics() == null ? 8 * (this.conf.getValueCols().size() +
+        this.conf.getKeyCols().size()) : this.getStatistics().getAvgRowSize();
+
+    // If this ReduceSink feeds a PTF, return the PTF TopNHash memory footprint.
+    if (conf.isPTFReduceSink()) {
+      return PTFTopNHash.getMemoryOverhead(topN, avgRowSize);
+    }
+    return TopNHash.getMemoryOverhead(topN, avgRowSize);
+  }
+
   protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index 484006a..eb4d3ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -21,12 +21,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
-
 import com.google.common.collect.MinMaxPriorityQueue;
 
 import org.apache.commons.logging.Log;
@@ -34,7 +29,6 @@
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
 
@@ -91,6 +85,31 @@ public int compare(Integer o1, Integer o2) {
     }
   };
 
+  /**
+   * Computes the memory overhead based on the statistics.
+   * @param topN number of top rows
+   * @param avgRowSize average row size in bytes
+   * @return Estimated memory overhead in MB
+   */
+  public static double getMemoryOverhead(int topN, long avgRowSize) {
+    // The following allocations are made in TopNHash.initialize(); the estimate
+    // is used to verify that they will most likely fit in memory.
+    // this.keys = new byte[topN + 1][];
+    // this.values = new byte[topN + 1][];
+    // this.hashes = new int[topN + 1];
+    // this.distKeyLengths = new int[topN + 1];
+    // The distinct key lengths and hashes are integer arrays.
+    // (1) Each of them is budgeted 64 * (limit+1) bytes, i.e.
+    //     64*2*(limit+1)/(1024*1024) MB in total.
+    // (2) The estimated size of the keys array is
+    //     (8 * (limit+1) * (average key length in bytes)) / (1024*1024) MB.
+    // (3) The estimated size of the values array is
+    //     (8 * (limit+1) * (average value length in bytes)) / (1024*1024) MB.
+    // (2) + (3) is (8 * (limit+1) * (average row size in bytes)) / (1024*1024) MB.
+    return (64.0 * 2.0 * (topN + 1)) / (1024.0 * 1024.0) +
+        (8.0 * (topN + 1) * avgRowSize) / (1024.0 * 1024.0);
+  }
+
   public void initialize(
       int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
     assert topN >= 0 && memUsage > 0;
@@ -103,8 +122,20 @@ public void initialize(
       return; // topN == 0 will cause a short-circuit, don't need any initialization
     }
 
-    // limit * 64 : compensation of arrays for key/value/hashcodes
-    this.threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - topN * 64;
+    // If the collector is an operator, e.g. ReduceSinkOperator, we can use statistics to
+    // get a better estimate of the memory overhead.
+    if (collector instanceof Operator) {
+      double memoryOverHeadInMb = getMemoryOverhead(topN,
+          ((Operator) collector).getStatistics() == null ?
+          ((Operator) collector).conf.getMemoryNeeded() :
+          ((Operator) collector).getStatistics().getAvgRowSize());
+      this.threshold = (long)
+          ((((memUsage * Runtime.getRuntime().maxMemory()) / (double) (1024 * 1024)) -
+          memoryOverHeadInMb) * 1024 * 1024);
+    } else {
+      // limit * 64 : compensation of arrays for key/value/hashcodes
+      this.threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - topN * 64;
+    }
     if (threshold < 0) {
       return;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 49706b1..0e7a295 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -94,6 +94,7 @@ private void initialize(HiveConf hiveConf) {
     if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {
       resolvers.add(new StageIDsRearranger());
     }
+    resolvers.add(new VerifyTopNMemoryUsage());
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java
new file mode 100644
index 0000000..480c8d2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java
@@ -0,0 +1,80 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.ppd.OpProcFactory;
+import org.apache.hadoop.hive.ql.ppd.OpWalkerInfo;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class VerifyTopNMemoryUsage implements PhysicalPlanResolver {
+  static final private Log LOG = LogFactory.getLog(VerifyTopNMemoryUsage.class.getName());
+
+  static class VerifyTopNMemoryProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procContext, Object... nodeOutputs)
+        throws SemanticException {
+      OpWalkerInfo context = (OpWalkerInfo) procContext;
+      ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+      ReduceSinkDesc desc = sink.getConf();
+      HiveConf contextConf = context.getParseContext().getConf();
+      int containerMemory = contextConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") &&
+          contextConf.getIntVar(HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
+          contextConf.getIntVar(HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
+          contextConf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+      float memUsage = desc.getTopNMemoryUsage();
+      int limit = desc.getTopN();
+
+      if (limit >= 0 && memUsage > 0) {
+        double topNMemRequired = sink.getTopNMemoryRequirement(limit);
+
+        // Disable the topN optimization if the required memory exceeds memUsage * container memory.
+        if (memUsage * containerMemory < topNMemRequired) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Available memory from containerMemory = " + containerMemory +
+                " is less than the memory required for TopNHash initialization, " + topNMemRequired);
+          }
+          LOG.info("Turning off Limit pushdown because of memory constraints");
+          desc.setTopNMemoryUsage(-1);
+          desc.setTopN(0);
+        }
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"), new VerifyTopNMemoryProcessor());
+    OpWalkerInfo opWalkerInfo = new OpWalkerInfo(pctx.getParseContext());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null,
+        opRules, opWalkerInfo);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getParseContext().getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index a60527b..9c530eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -79,6 +79,7 @@
 import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.physical.VerifyTopNMemoryUsage;
 import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -483,6 +484,7 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
         && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
       physicalCtx = new MemoryDecider().resolve(physicalCtx);
     }
+    physicalCtx = new VerifyTopNMemoryUsage().resolve(physicalCtx);
 
     if ("llap".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
       physicalCtx = new LlapDecider().resolve(physicalCtx);
diff --git a/ql/src/test/queries/clientpositive/topn.q b/ql/src/test/queries/clientpositive/topn.q
new file mode 100644
index 0000000..11f3d8f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/topn.q
@@ -0,0 +1,3 @@
+CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile;
+set hive.limit.pushdown.memory.usage=0.9999999;
+select * from sample_07 order by salary LIMIT 999999999;
diff --git a/ql/src/test/results/clientpositive/topn.q.out b/ql/src/test/results/clientpositive/topn.q.out
new file mode 100644
index 0000000..9584eaa
--- /dev/null
+++ b/ql/src/test/results/clientpositive/topn.q.out
@@ -0,0 +1,16 @@
+PREHOOK: query: CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@sample_07
+POSTHOOK: query: CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@sample_07
+PREHOOK: query: select * from sample_07 order by salary LIMIT 999999999
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sample_07
+#### A masked pattern was here ####
+POSTHOOK: query: select * from sample_07 order by salary LIMIT 999999999
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sample_07
+#### A masked pattern was here ####
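
For reference, the arithmetic behind the new estimates can be checked with a small standalone sketch. The class below is hypothetical and not part of the patch; it only reproduces the MB formulas used by TopNHash.getMemoryOverhead and PTFTopNHash.getMemoryOverhead, plus the 8-bytes-per-column fallback row size that ReduceSinkOperator.getTopNMemoryRequirement uses when no statistics are available.

// Hypothetical sketch, not part of the patch: reproduces the patch's MB estimates.
public class TopNMemoryEstimateSketch {
  static final double MB = 1024.0 * 1024.0;

  // TopNHash.initialize() allocates hashes/distKeyLengths (budgeted at 64*2 bytes per slot
  // in the patch) plus keys/values (estimated at 8 * avgRowSize bytes per slot).
  static double topNHashOverheadMb(int topN, long avgRowSize) {
    return (64.0 * 2.0 * (topN + 1)) / MB + (8.0 * (topN + 1) * avgRowSize) / MB;
  }

  // The PTF variant adds roughly 32 bytes per entry for the partition heap map.
  static double ptfTopNHashOverheadMb(int topN, long avgRowSize) {
    return topNHashOverheadMb(topN, avgRowSize) + (32.0 * topN) / MB;
  }

  public static void main(String[] args) {
    int topN = 999999999;     // the LIMIT used in topn.q
    long avgRowSize = 8 * 4;  // fallback estimate: 8 bytes per column, 4 columns in sample_07
    System.out.printf("TopNHash estimate:     %.1f MB%n", topNHashOverheadMb(topN, avgRowSize));
    System.out.printf("PTF TopNHash estimate: %.1f MB%n", ptfTopNHashOverheadMb(topN, avgRowSize));
  }
}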
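
The resolver's decision itself reduces to a single comparison. The helper below is likewise a hypothetical sketch, not the patch's code: it mirrors how VerifyTopNMemoryUsage picks the container size (hive.tez.container.size on Tez when it is positive, otherwise mapreduce.map.memory.mb) and disables limit pushdown when the estimated TopNHash footprint does not fit in the configured fraction of that container.

// Hypothetical sketch, not part of the patch: the disable decision in VerifyTopNMemoryUsage.
public class TopNPushdownCheckSketch {
  static boolean shouldDisableLimitPushdown(float memUsage, int containerMemoryMb,
      double topNMemRequiredMb) {
    // memUsage is hive.limit.pushdown.memory.usage; the TopN hash may use at most that
    // fraction of the container. Disable the optimization when the estimate does not fit.
    return memUsage * containerMemoryMb < topNMemRequiredMb;
  }

  public static void main(String[] args) {
    // e.g. a 4096 MB container, memUsage close to 1.0, and a ~366000 MB estimate
    System.out.println(shouldDisableLimitPushdown(0.9999999f, 4096, 366000.0));  // true
  }
}

With the topn.q settings (LIMIT 999999999 and the ~32-byte fallback row size) the estimate works out to a few hundred gigabytes, far beyond any realistic container, so the resolver resets topN and topNMemoryUsage and the query runs without the pushdown.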