diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 49706b1..0e7a295 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -94,6 +94,7 @@ private void initialize(HiveConf hiveConf) {
     if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {
       resolvers.add(new StageIDsRearranger());
     }
+    resolvers.add(new VerifyTopNMemoryUsage());
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java
new file mode 100644
index 0000000..e128e5a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java
@@ -0,0 +1,94 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.ppd.OpProcFactory;
+import org.apache.hadoop.hive.ql.ppd.OpWalkerInfo;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class VerifyTopNMemoryUsage implements PhysicalPlanResolver {
+  static final private Log LOG = LogFactory.getLog(VerifyTopNMemoryUsage.class.getName());
+
+  static class VerifyTopNMemoryProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procContext, Object... nodeOutputs)
+        throws SemanticException {
+      OpWalkerInfo context = (OpWalkerInfo) procContext;
+      ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+      ReduceSinkDesc desc = sink.getConf();
+      HiveConf contextConf = context.getParseContext().getConf();
+      int containerMemory = contextConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") &&
+          contextConf.getIntVar(HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
+          contextConf.getIntVar(HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
+          contextConf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+      float memUsage = desc.getTopNMemoryUsage();
+      int limit = desc.getTopN();
+
+      if (limit >= 0 && memUsage > 0) {
+        // The following allocations are made in TopNHash.initialize().
+        // Verify that they are likely to fit in memory:
+        // this.keys = new byte[topN + 1][];
+        // this.values = new byte[topN + 1][];
+        // this.hashes = new int[topN + 1];
+        // this.distKeyLengths = new int[topN + 1];
+        // The distinct key lengths and hashes are integer arrays; together they are
+        // estimated at 64 * 2 * (limit+1) bytes, i.e. 64*2*(limit+1)/(1024*1024) MB.
+        // The keys array is estimated at 8*(limit+1)*(average row size in bytes)/(1024*1024) MB.
+        // The values array is estimated at 8*(limit+1)*(average row size / number of key and value columns)/(1024*1024) MB.
+
+        double maxSize = (64.0*2*(limit+1))/(1024*1024) + (8.0*(limit+1)*sink.getStatistics().getAvgRowSize())/(1024*1024) +
+            (8.0*(limit+1)*sink.getStatistics().getAvgRowSize())/(1024.0*1024*(desc.getValueCols().size() + desc.getKeyCols().size()));
+
+        // We disable the top-n optimization in any of the following cases:
+        // 1. The limit or maxSize exceeds 90% of the container memory.
+        // 2. maxSize exceeds memUsage * container memory.
+        if (0.9*containerMemory < limit || 0.9*containerMemory < maxSize || memUsage*containerMemory < maxSize) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Estimated TopNHash size of " + maxSize +
+                " MB exceeds the memory threshold for container size " + containerMemory + " MB");
+          }
+          LOG.info("Turning off Limit pushdown because of memory constraints");
+          desc.setTopNMemoryUsage(-1);
+          desc.setTopN(0);
+        }
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"), new VerifyTopNMemoryProcessor());
+    OpWalkerInfo opWalkerInfo = new OpWalkerInfo(pctx.getParseContext());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null,
+        opRules, opWalkerInfo);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getParseContext().getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index a60527b..9c530eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -79,6 +79,7 @@
 import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.physical.VerifyTopNMemoryUsage;
 import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -483,6 +484,7 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
         && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
       physicalCtx = new MemoryDecider().resolve(physicalCtx);
     }
+    physicalCtx = new VerifyTopNMemoryUsage().resolve(physicalCtx);
 
     if ("llap".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
       physicalCtx = new LlapDecider().resolve(physicalCtx);
diff --git a/ql/src/test/queries/clientpositive/topn.q b/ql/src/test/queries/clientpositive/topn.q
new file mode 100644
index 0000000..11f3d8f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/topn.q
@@ -0,0 +1,3 @@
+CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile;
+set hive.limit.pushdown.memory.usage=0.9999999;
+select * from sample_07 order by salary LIMIT 999999999;
diff --git a/ql/src/test/results/clientpositive/topn.q.out b/ql/src/test/results/clientpositive/topn.q.out
new file mode 100644
index 0000000..9584eaa
--- /dev/null
+++ b/ql/src/test/results/clientpositive/topn.q.out
@@ -0,0 +1,16 @@
+PREHOOK: query: CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@sample_07
+POSTHOOK: query: CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@sample_07
+PREHOOK: query: select * from sample_07 order by salary LIMIT 999999999
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sample_07
+#### A masked pattern was here ####
+POSTHOOK: query: select * from sample_07 order by salary LIMIT 999999999
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sample_07
+#### A masked pattern was here ####
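
As a minimal illustration of the estimate used in VerifyTopNMemoryProcessor, the sketch below reproduces the maxSize arithmetic and the three-way disable check outside the planner. The class name and all inputs (average row size, key/value column counts, container size) are made-up assumptions; in the resolver they come from the ReduceSink statistics, ReduceSinkDesc, and HiveConf.

// Illustrative sketch only: mirrors the arithmetic in VerifyTopNMemoryProcessor.process()
// with hard-coded, assumed inputs instead of plan statistics.
public class TopNSizeEstimateSketch {

  // Estimated TopNHash footprint in MB for a given limit.
  static double estimateTopNHashSizeMb(int limit, double avgRowSize, int keyCols, int valueCols) {
    double intArrays = (64.0 * 2 * (limit + 1)) / (1024 * 1024);          // hashes + distKeyLengths
    double keysArray = (8.0 * (limit + 1) * avgRowSize) / (1024 * 1024);  // keys
    double valuesArray = (8.0 * (limit + 1) * avgRowSize)
        / (1024.0 * 1024 * (keyCols + valueCols));                        // values
    return intArrays + keysArray + valuesArray;
  }

  public static void main(String[] args) {
    int limit = 999999999;           // LIMIT used in topn.q
    float memUsage = 0.9999999f;     // hive.limit.pushdown.memory.usage used in topn.q
    double avgRowSize = 100.0;       // assumed average row size in bytes
    int keyCols = 1, valueCols = 4;  // assumed key/value column counts
    int containerMemory = 4096;      // assumed container size in MB

    double maxSize = estimateTopNHashSizeMb(limit, avgRowSize, keyCols, valueCols);
    // Same three conditions as the resolver: a huge limit or estimate turns the pushdown off.
    boolean disable = 0.9 * containerMemory < limit
        || 0.9 * containerMemory < maxSize
        || memUsage * containerMemory < maxSize;
    System.out.println("maxSize = " + maxSize + " MB, disable limit pushdown: " + disable);
  }
}

With these assumed inputs the limit alone already exceeds 0.9 * containerMemory (and maxSize comes out around a million MB), so the pushdown would be disabled instead of letting TopNHash attempt the allocation, which is the behaviour the topn.q test is meant to exercise.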