diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 49706b1..0e7a295 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -94,6 +94,7 @@ private void initialize(HiveConf hiveConf) {
     if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {
       resolvers.add(new StageIDsRearranger());
     }
+    resolvers.add(new VerifyTopNMemoryUsage());
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java
new file mode 100644
index 0000000..660e57d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/VerifyTopNMemoryUsage.java
@@ -0,0 +1,89 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.ppd.OpWalkerInfo;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Physical plan resolver that turns off limit pushdown (top-n) on a
+ * ReduceSinkOperator when the configured memory usage cannot fit in
+ * the task container.
+ */
+public class VerifyTopNMemoryUsage implements PhysicalPlanResolver {
+  private static final Log LOG = LogFactory.getLog(VerifyTopNMemoryUsage.class.getName());
+
+  static class VerifyTopNMemoryProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procContext, Object... nodeOutputs)
+        throws SemanticException {
+      OpWalkerInfo context = (OpWalkerInfo) procContext;
+      ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+      ReduceSinkDesc desc = sink.getConf();
+      HiveConf contextConf = context.getParseContext().getConf();
+      // Container size in MB: the Tez container size when running on Tez,
+      // otherwise the MapReduce map task memory.
+      int containerMemory =
+          contextConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+              && contextConf.getIntVar(HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0
+          ? contextConf.getIntVar(HiveConf.ConfVars.HIVETEZCONTAINERSIZE)
+          : contextConf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+      float memUsage = desc.getTopNMemoryUsage();
+      int limit = desc.getTopN();
+
+      if (limit >= 0 && memUsage > 0) {
+        long threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - limit * 64L;
+        // If the threshold is negative, the runtime bypasses top-n on its own.
+        if (threshold < 0) {
+          return true;
+        }
+
+        long maxSize = 64 * 2 * ((limit + 1L) / 1024) + 8 * 2 * ((limit + 1L) / 1024);
+
+        if (0.9 * containerMemory < limit || 0.9 * containerMemory < maxSize) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Container memory " + containerMemory
+                + " MB is not enough for the estimated top-n heap size " + maxSize);
+          }
+          LOG.info("Turning off limit pushdown because of memory constraints");
+          desc.setTopNMemoryUsage(-1);
+          desc.setTopN(0);
+        }
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", ReduceSinkOperator.getOperatorName() + "%"),
+        new VerifyTopNMemoryProcessor());
+    OpWalkerInfo opWalkerInfo = new OpWalkerInfo(pctx.getParseContext());
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, opWalkerInfo);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getParseContext().getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 9503fa8..ec611f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.physical.VerifyTopNMemoryUsage;
 import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -482,6 +483,7 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
         && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
       physicalCtx = new MemoryDecider().resolve(physicalCtx);
     }
+    physicalCtx = new VerifyTopNMemoryUsage().resolve(physicalCtx);
 
     // This optimizer will serialize all filters that made it to the
     // table scan operator to avoid having to do it multiple times on
diff --git a/ql/src/test/queries/clientpositive/topn.q b/ql/src/test/queries/clientpositive/topn.q
new file mode 100644
index 0000000..11f3d8f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/topn.q
@@ -0,0 +1,3 @@
+CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile;
+set hive.limit.pushdown.memory.usage=0.9999999;
+select * from sample_07 order by salary LIMIT 999999999;
diff --git a/ql/src/test/results/clientpositive/topn.q.out b/ql/src/test/results/clientpositive/topn.q.out
new file mode 100644
index 0000000..9584eaa
--- /dev/null
+++ b/ql/src/test/results/clientpositive/topn.q.out
@@ -0,0 +1,16 @@
+PREHOOK: query: CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@sample_07
+POSTHOOK: query: CREATE TABLE `sample_07` ( `code` string , `description` string , `total_emp` int , `salary` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@sample_07
+PREHOOK: query: select * from sample_07 order by salary LIMIT 999999999
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sample_07
+#### A masked pattern was here ####
+POSTHOOK: query: select * from sample_07 order by salary LIMIT 999999999
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sample_07
+#### A masked pattern was here ####
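
Note for reviewers: the check in VerifyTopNMemoryProcessor reduces to the standalone sketch below. TopNMemoryCheck and wouldDisableTopN are hypothetical names used only for illustration; the parameters stand in for the values the patch reads from HiveConf and ReduceSinkDesc, and only the arithmetic mirrors the patch.

    // Hypothetical standalone sketch of the patch's heuristic; not part of Hive.
    public class TopNMemoryCheck {
      static boolean wouldDisableTopN(boolean engineIsTez, int tezContainerSizeMb,
          int mapMemoryMb, float memUsage, int limit) {
        if (limit < 0 || memUsage <= 0) {
          return false; // no top-n pushdown configured on this reduce sink
        }
        // Container size in MB: Tez container when on Tez, else MR map memory.
        int containerMemory = engineIsTez && tezContainerSizeMb > 0
            ? tezContainerSizeMb : mapMemoryMb;
        long threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - limit * 64L;
        if (threshold < 0) {
          return false; // the runtime bypasses top-n on its own in this case
        }
        long maxSize = 64 * 2 * ((limit + 1L) / 1024) + 8 * 2 * ((limit + 1L) / 1024);
        return 0.9 * containerMemory < limit || 0.9 * containerMemory < maxSize;
      }

      public static void main(String[] args) {
        // A 1 GB container with hive.limit.pushdown.memory.usage=0.3 and
        // LIMIT 10000: 0.9 * 1024 < 10000, so pushdown is disabled (prints true).
        System.out.println(wouldDisableTopN(true, 1024, 1024, 0.3f, 10000));
      }
    }

One thing worth flagging: containerMemory is in megabytes while limit is a row count, so `0.9 * containerMemory < limit` compares mixed units; the sketch mirrors the patch as written rather than correcting it.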