diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index 737d9c3..f72e171 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -75,7 +75,7 @@ * graph as "llap", which in turn causes them to be submitted to an * llap daemon instead of a regular yarn container. * - * The actual algoritm used is driven by LLAP_EXECUTION_MODE. "all", + * The actual algorithm used is driven by LLAP_EXECUTION_MODE. "all", * "none" and "map" mechanically tag those elements. "auto" tries to * be smarter by looking for suitable vertices. * @@ -114,7 +114,7 @@ public LlapDecisionDispatcher(PhysicalContext pctx, LlapMode mode) { @Override public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) - throws SemanticException { + throws SemanticException { @SuppressWarnings("unchecked") Task currTask = (Task) nd; if (currTask instanceof TezTask) { @@ -127,14 +127,19 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) } private void handleWork(TezWork tezWork, BaseWork work) - throws SemanticException { + throws SemanticException { if (evaluateWork(tezWork, work)) { convertWork(tezWork, work); + } else { + if (mode == all) { + throw new SemanticException("Llap mode is set to all but cannot run work in llap mode. " + + "Set hive.llap.execution.mode = auto or set hive.execution.mode = auto"); + } } } private void convertWork(TezWork tezWork, BaseWork work) - throws SemanticException { + throws SemanticException { if (shouldUber) { // let's see if we can go one step further and just uber this puppy @@ -150,7 +155,7 @@ private void convertWork(TezWork tezWork, BaseWork work) } private boolean evaluateWork(TezWork tezWork, BaseWork work) - throws SemanticException { + throws SemanticException { LOG.info("Evaluating work item: " + work.getName()); @@ -226,6 +231,7 @@ private boolean evaluateWork(TezWork tezWork, BaseWork work) } // couldn't convince you otherwise? well then let's llap. + LOG.info("Can run work " + work.getName() + " in llap mode."); return true; } @@ -249,7 +255,7 @@ private boolean checkExpression(ExprNodeDesc expr) { if (!isBuiltIn) { if (!arePermanentFnsAllowed) { LOG.info("Not a built-in function: " + cur.getExprString() - + " (permanent functions are disabled)"); + + " (permanent functions are disabled)"); return false; } if (!FunctionRegistry.isPermanentFunction(funcDesc)) { @@ -299,39 +305,46 @@ private boolean checkAggregators(Collection aggs) { Map opRules = new LinkedHashMap(); opRules.put(new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + "%"), new NodeProcessor() { - @Override - public Object process(Node n, Stack s, NodeProcessorCtx c, - Object... os) { - return new Boolean(false); - } - }); + @Override + public Object process(Node n, Stack s, NodeProcessorCtx c, + Object... os) { + LOG.info("Cannot run operator [" + n + "] in llap mode."); + return new Boolean(false); + } + }); opRules.put(new RuleRegExp("No user code in fil", FilterOperator.getOperatorName() + "%"), new NodeProcessor() { - @Override - public Object process(Node n, Stack s, NodeProcessorCtx c, - Object... os) { - ExprNodeDesc expr = ((FilterOperator)n).getConf().getPredicate(); - return new Boolean(checkExpression(expr)); - } - }); + @Override + public Object process(Node n, Stack s, NodeProcessorCtx c, + Object... os) { + ExprNodeDesc expr = ((FilterOperator)n).getConf().getPredicate(); + Boolean retval = new Boolean(checkExpression(expr)); + LOG.info("Can filter operator [" + n + "] can be run in llap mode? " + retval); + return new Boolean(retval); + } + }); opRules.put(new RuleRegExp("No user code in gby", GroupByOperator.getOperatorName() + "%"), new NodeProcessor() { - @Override - public Object process(Node n, Stack s, NodeProcessorCtx c, - Object... os) { - List aggs = ((GroupByOperator)n).getConf().getAggregators(); - return new Boolean(checkAggregators(aggs)); - } - }); + @Override + public Object process(Node n, Stack s, NodeProcessorCtx c, + Object... os) { + List aggs = ((GroupByOperator)n).getConf().getAggregators(); + Boolean retval = new Boolean(checkAggregators(aggs)); + LOG.info("Can group by operator [" + n + "] be run in llap mode? " + retval); + return new Boolean(retval); + } + }); opRules.put(new RuleRegExp("No user code in select", SelectOperator.getOperatorName() + "%"), new NodeProcessor() { - @Override - public Object process(Node n, Stack s, NodeProcessorCtx c, - Object... os) { - List exprs = ((SelectOperator)n).getConf().getColList(); - return new Boolean(checkExpressions(exprs)); - } - }); + @Override + public Object process(Node n, Stack s, NodeProcessorCtx c, + Object... os) { + List exprs = ((SelectOperator)n).getConf().getColList(); + Boolean retval = new Boolean(checkExpressions(exprs)); + LOG.info("Can select operator [" + n + "] be run in llap mode? " + retval); + return new Boolean(retval); + } + }); return opRules; } @@ -372,10 +385,10 @@ private boolean checkInputsVectorized(MapWork mapWork) { for (String path : mapWork.getPathToPartitionInfo().keySet()) { PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path); List> interfaceList = - Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); + Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); if (!interfaceList.contains(VectorizedInputFormatInterface.class)) { LOG.info("Input format: " + pd.getInputFileFormatClassName() - + ", doesn't provide vectorized input"); + + ", doesn't provide vectorized input"); return false; } }