diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 29fc99b..fed206b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -1654,18 +1654,6 @@ public static GenericUDTF cloneGenericUDTF(GenericUDTF genericUDTF) { } /** - * Get the UDF class from an exprNodeDesc. Returns null if the exprNodeDesc - * does not contain a UDF class. - */ - private static Class getUDFClassFromExprDesc(ExprNodeDesc desc) { - if (!(desc instanceof ExprNodeGenericFuncDesc)) { - return null; - } - ExprNodeGenericFuncDesc genericFuncDesc = (ExprNodeGenericFuncDesc) desc; - return genericFuncDesc.getGenericUDF().getClass(); - } - - /** * Returns whether a GenericUDF is deterministic or not. */ public static boolean isDeterministic(GenericUDF genericUDF) { @@ -1755,7 +1743,7 @@ public static boolean isOpNot(ExprNodeDesc desc) { * Returns whether the exprNodeDesc is a node of "positive". */ public static boolean isOpPositive(ExprNodeDesc desc) { - Class udfClass = getUDFClassFromExprDesc(desc); + Class udfClass = getGenericUDFClassFromExprDesc(desc); return GenericUDFOPPositive.class == udfClass; } @@ -2040,10 +2028,15 @@ public static boolean isRankingFunction(String name) throws SemanticException { * @return True iff the fnExpr represents a hive built-in function. */ public static boolean isNativeFuncExpr(ExprNodeGenericFuncDesc fnExpr) { - Class udfClass = getUDFClassFromExprDesc(fnExpr); - if (udfClass == null) { - udfClass = getGenericUDFClassFromExprDesc(fnExpr); + Class udfClass = null; + + GenericUDF udf = fnExpr.getGenericUDF(); + if (udf instanceof GenericUDFBridge) { + udfClass = ((GenericUDFBridge) udf).getUdfClass(); + } else { + udfClass = udf.getClass(); } + return nativeUdfs.contains(udfClass); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index 73ee40d..c372602 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -18,11 +18,19 @@ package org.apache.hadoop.hive.ql.optimizer.physical; +import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.all; +import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.auto; +import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.map; +import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.none; + import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Deque; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Stack; @@ -30,8 +38,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FilterOperator; +import org.apache.hadoop.hive.ql.exec.FunctionInfo; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ScriptOperator; +import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; @@ -46,13 +59,16 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.hive.ql.plan.TezWork; -import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.*; +import com.google.common.base.Joiner; /** * LlapDecider takes care of tagging certain vertices in the execution @@ -133,7 +149,7 @@ private boolean evaluateWork(TezWork tezWork, BaseWork work) // first we check if we *can* run in llap. If we need to use // user code to do so (script/udf) we don't. if (!evaluateOperators(work)) { - LOG.info("some operators cannot be run in llap"); + LOG.info("some operators cannot be run in llap"); return false; } @@ -195,15 +211,109 @@ private boolean evaluateWork(TezWork tezWork, BaseWork work) return true; } + private boolean checkExpression(ExprNodeDesc expr) { + Deque exprs = new LinkedList(); + exprs.add(expr); + while (!exprs.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Checking '%s'",expr.getExprString())); + } + + ExprNodeDesc cur = exprs.removeFirst(); + if (cur == null) continue; + if (cur.getChildren() != null) { + exprs.addAll(cur.getChildren()); + } + + if (cur instanceof ExprNodeGenericFuncDesc) { + // getRequiredJars is currently broken (requires init in some cases before you can call it) + // String[] jars = ((ExprNodeGenericFuncDesc)cur).getGenericUDF().getRequiredJars(); + // if (jars != null && !(jars.length == 0)) { + // LOG.info(String.format("%s requires %s", cur.getExprString(), Joiner.on(", ").join(jars))); + // return false; + // } + + if (!FunctionRegistry.isNativeFuncExpr((ExprNodeGenericFuncDesc)cur)) { + LOG.info("Not a built-in function: " + cur.getExprString()); + return false; + } + } + } + return true; + } + + private boolean checkAggregator(AggregationDesc agg) throws SemanticException { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Checking '%s'", agg.getExprString())); + } + + boolean result = checkExpressions(agg.getParameters()); + FunctionInfo fi = FunctionRegistry.getFunctionInfo(agg.getGenericUDAFName()); + result = result && (fi != null) && fi.isNative(); + if (!result) { + LOG.info("Aggregator is not native: " + agg.getExprString()); + } + return result; + } + + private boolean checkExpressions(Collection exprs) { + boolean result = true; + for (ExprNodeDesc expr: exprs) { + result = result && checkExpression(expr); + } + return result; + } + + private boolean checkAggregators(Collection aggs) { + boolean result = true; + try { + for (AggregationDesc agg: aggs) { + result = result && checkAggregator(agg); + } + } catch (SemanticException e) { + LOG.warn("Exception testing aggregators.",e); + result = false; + } + return result; + } + private Map getRules() { Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + ".*"), + opRules.put(new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + "%"), new NodeProcessor() { public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { return new Boolean(false); } }); + opRules.put(new RuleRegExp("No user code in fil", + FilterOperator.getOperatorName() + "%"), + new NodeProcessor() { + public Object process(Node n, Stack s, NodeProcessorCtx c, + Object... os) { + ExprNodeDesc expr = ((FilterOperator)n).getConf().getPredicate(); + return new Boolean(checkExpression(expr)); + } + }); + opRules.put(new RuleRegExp("No user code in gby", + GroupByOperator.getOperatorName() + "%"), + new NodeProcessor() { + public Object process(Node n, Stack s, NodeProcessorCtx c, + Object... os) { + List aggs = ((GroupByOperator)n).getConf().getAggregators(); + return new Boolean(checkAggregators(aggs)); + } + }); + opRules.put(new RuleRegExp("No user code in select", + SelectOperator.getOperatorName() + "%"), + new NodeProcessor() { + public Object process(Node n, Stack s, NodeProcessorCtx c, + Object... os) { + List exprs = ((SelectOperator)n).getConf().getColList(); + return new Boolean(checkExpressions(exprs)); + } + }); + return opRules; } diff --git ql/src/test/queries/clientpositive/llapdecider.q ql/src/test/queries/clientpositive/llapdecider.q index 1fb0d8d..e8e8770 100644 --- ql/src/test/queries/clientpositive/llapdecider.q +++ ql/src/test/queries/clientpositive/llapdecider.q @@ -49,3 +49,11 @@ EXPLAIN SELECT * from src_orc s1 join src_orc s2 on (s1.key = s2.key) order by s set hive.llap.execution.mode=all; EXPLAIN SELECT * from src_orc s1 join src_orc s2 on (s1.key = s2.key) order by s2.value; + +set hive.llap.execution.mode=auto; + +CREATE TEMPORARY FUNCTION test_udf_get_java_string AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFTestGetJavaString'; + +EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(key as int) > 1; +EXPLAIN SELECT sum(cast(test_udf_get_java_string(cast(key as string)) as int) + 1) from src_orc where cast(key as int) > 1; +EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(test_udf_get_java_string(cast(key as string)) as int) > 1; diff --git ql/src/test/results/clientpositive/tez/llapdecider.q.out ql/src/test/results/clientpositive/tez/llapdecider.q.out index cb4443a..a698284 100644 --- ql/src/test/results/clientpositive/tez/llapdecider.q.out +++ ql/src/test/results/clientpositive/tez/llapdecider.q.out @@ -1011,3 +1011,182 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: CREATE TEMPORARY FUNCTION test_udf_get_java_string AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFTestGetJavaString' +PREHOOK: type: CREATEFUNCTION +PREHOOK: Output: test_udf_get_java_string +POSTHOOK: query: CREATE TEMPORARY FUNCTION test_udf_get_java_string AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFTestGetJavaString' +POSTHOOK: type: CREATEFUNCTION +POSTHOOK: Output: test_udf_get_java_string +PREHOOK: query: EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(key as int) > 1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(key as int) > 1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src_orc + Statistics: Num rows: 500 Data size: 88000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (UDFToInteger(key) > 1) (type: boolean) + Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: (UDFToInteger(key) + 1) (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col0) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: llap + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: EXPLAIN SELECT sum(cast(test_udf_get_java_string(cast(key as string)) as int) + 1) from src_orc where cast(key as int) > 1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT sum(cast(test_udf_get_java_string(cast(key as string)) as int) + 1) from src_orc where cast(key as int) > 1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src_orc + Statistics: Num rows: 500 Data size: 88000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (UDFToInteger(key) > 1) (type: boolean) + Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: (UDFToInteger(GenericUDFTestGetJavaString(key)) + 1) (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col0) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(test_udf_get_java_string(cast(key as string)) as int) > 1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT sum(cast(key as int) + 1) from src_orc where cast(test_udf_get_java_string(cast(key as string)) as int) > 1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src_orc + Statistics: Num rows: 500 Data size: 88000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (UDFToInteger(GenericUDFTestGetJavaString(key)) > 1) (type: boolean) + Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: (UDFToInteger(key) + 1) (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 166 Data size: 14442 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(_col0) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink +