diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e9cc267..8a311f1 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2011,6 +2011,20 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     LLAP_ORC_CACHE_MAX_SIZE("hive.llap.io.cache.orc.size", 1024L * 1024 * 1024, ""),
     LLAP_USE_LRFU("hive.llap.io.use.lrfu", false, ""),
     LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f, ""),
+    LLAP_AUTO_ENFORCE_TREE("hive.llap.auto.enforce.tree", true,
+        "Enforce that all parents are in llap, before considering vertex"),
+    LLAP_AUTO_ENFORCE_VECTORIZED("hive.llap.auto.enforce.vectorized", true,
+        "Enforce that inputs are vectorized, before considering vertex"),
+    LLAP_AUTO_ENFORCE_STATS("hive.llap.auto.enforce.stats", true,
+        "Enforce that col stats are available, before considering vertex"),
+    LLAP_AUTO_MAX_INPUT("hive.llap.auto.max.input.size", 10*1024*1024*1024L,
+        "Check input size, before considering vertex (-1 disables check)"),
+    LLAP_AUTO_MAX_OUTPUT("hive.llap.auto.max.output.size", 1*1024*1024*1024L,
+        "Check output size, before considering vertex (-1 disables check)"),
+    LLAP_EXECUTION_MODE("hive.llap.execution.mode", "auto",
+        new StringSet("auto", "none", "all", "map"),
+        "Chooses whether query fragments will run in container or in llap"),
+
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
         "60s", new TimeValidator(TimeUnit.SECONDS),
@@ -2034,7 +2048,8 @@
     SPARK_RPC_CHANNEL_LOG_LEVEL("hive.spark.client.channel.log.level", null,
       "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
-      "Name of the SASL mechanism to use for authentication."); ;
+      "Name of the SASL mechanism to use for authentication.")
+    ;
 
   public final String varname;
   private final String defaultExpr;
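Note on the two size defaults above: the trailing L suffix is load-bearing. Without it, 10*1024*1024*1024 is evaluated in 32-bit int arithmetic and silently wraps to a negative value before being widened to long, which would effectively disable the input-size check. A minimal standalone demonstration (plain Java; the class name is illustrative, not part of the patch):

    // Why the size defaults need the 'L' suffix: 10 * 2^30 does not fit
    // in a 32-bit int, so pure-int constant arithmetic wraps around.
    public class LongLiteralDemo {
      public static void main(String[] args) {
        long wrong = 10 * 1024 * 1024 * 1024;  // int math wraps: -2147483648
        long right = 10 * 1024 * 1024 * 1024L; // long math: 10737418240
        System.out.println(wrong + " vs " + right);
      }
    }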
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
new file mode 100644
index 0000000..9e227e0
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+
+import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.*;
+
+/**
+ * LlapDecider takes care of tagging certain vertices in the execution
+ * graph as "llap", which in turn causes them to be submitted to an
+ * llap daemon instead of a regular yarn container.
+ *
+ * The actual algorithm used is driven by LLAP_EXECUTION_MODE. "all",
+ * "none" and "map" mechanically tag those elements. "auto" tries to
+ * be smarter by looking for suitable vertices.
+ *
+ * Regardless of the algorithm used, it is always ensured that no user
+ * code (i.e. script operators, temporary functions, etc.) is sent to
+ * the daemon.
+ */
+public class LlapDecider implements PhysicalPlanResolver {
+
+  protected static transient final Log LOG
+    = LogFactory.getLog(LlapDecider.class);
+
+  private PhysicalContext physicalContext;
+
+  private HiveConf conf;
+
+  public enum LlapMode {
+    map,  // map operators only
+    all,  // all operators
+    none, // no operators
+    auto  // please hive, choose for me
+  }
+
+  private LlapMode mode;
+
+  class LlapDecisionDispatcher implements Dispatcher {
+
+    private PhysicalContext pctx;
+    private HiveConf conf;
+
+    public LlapDecisionDispatcher(PhysicalContext pctx) {
+      this.pctx = pctx;
+      this.conf = pctx.getConf();
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w: work.getAllWork()) {
+          handleWork(work, w);
+        }
+      }
+      return null;
+    }
+
+    private void handleWork(TezWork tezWork, BaseWork work)
+        throws SemanticException {
+      if (evaluateWork(tezWork, work)) {
+        convertWork(tezWork, work);
+      }
+    }
+
+    private void convertWork(TezWork tezWork, BaseWork work)
+        throws SemanticException {
+      work.setLlapMode(true);
+    }
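The evaluateWork method below makes the actual choice. Three of the four modes are mechanical and are handled up front; only "auto" goes on to the cost-based checks. A standalone mirror of that mechanical part (plain Java with illustrative names; a sketch of the logic, not the Hive API):

    enum LlapMode { map, all, none, auto }

    class MechanicalModes {
      // "none" always rejects, "all" always accepts, "map" accepts map-side
      // work only; null stands for "auto", which defers to the cost checks.
      static Boolean decide(LlapMode mode, boolean isMapWork) {
        switch (mode) {
          case none: return Boolean.FALSE;
          case all:  return Boolean.TRUE;
          case map:  return isMapWork;
          default:   return null;
        }
      }

      public static void main(String[] args) {
        System.out.println(decide(LlapMode.map, true));  // true
        System.out.println(decide(LlapMode.auto, true)); // null: run cost checks
      }
    }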
+
+    private boolean evaluateWork(TezWork tezWork, BaseWork work)
+        throws SemanticException {
+
+      LOG.info("Evaluating work item: " + work.getName());
+
+      // no means no
+      if (mode == none) {
+        return false;
+      }
+
+      // first we check if we *can* run in llap. If we need to use
+      // user code to do so (script/udf) we don't.
+      if (!evaluateOperators(work)) {
+        return false;
+      }
+
+      // --- From here on out we choose whether we *want* to run in llap
+
+      // if mode is all, just run it
+      if (mode == all) {
+        return true;
+      }
+
+      // in map mode, run iff the work is map work
+      if (mode == map) {
+        return work instanceof MapWork;
+      }
+
+      // --- From here on we evaluate the auto mode
+      assert mode == auto;
+
+      // if the parents aren't in llap, neither should the child be
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_AUTO_ENFORCE_TREE)
+          && !checkParentsInLlap(tezWork, work)) {
+        LOG.info("Parent not in llap.");
+        return false;
+      }
+
+      // only vectorized orc input is cached, so there's a reason to
+      // limit to that for now.
+      if (work instanceof MapWork
+          && HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_AUTO_ENFORCE_VECTORIZED)
+          && !checkInputsVectorized((MapWork) work)) {
+        LOG.info("Inputs not vectorized.");
+        return false;
+      }
+
+      // check if there's at least some degree of stats available
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_AUTO_ENFORCE_STATS)
+          && !checkPartialStatsAvailable(work)) {
+        LOG.info("No column stats available.");
+        return false;
+      }
+
+      // now let's take a look at input sizes
+      long maxInput = HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_AUTO_MAX_INPUT);
+      long expectedInput = computeInputSize(work);
+      if (maxInput >= 0 && (expectedInput > maxInput)) {
+        LOG.info(String.format("Inputs too big (%d > %d)", expectedInput, maxInput));
+        return false;
+      }
+
+      // and finally let's check output sizes
+      long maxOutput = HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_AUTO_MAX_OUTPUT);
+      long expectedOutput = computeOutputSize(work);
+      if (maxOutput >= 0 && (expectedOutput > maxOutput)) {
+        LOG.info(String.format("Outputs too big (%d > %d)", expectedOutput, maxOutput));
+        return false;
+      }
+
+      // couldn't convince you otherwise? well then let's llap.
+      return true;
+    }
+
+    private Map<Rule, NodeProcessor> getRules() {
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(
+          new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + ".*"),
+          new NodeProcessor() {
+            public Object process(Node n, Stack<Node> s, NodeProcessorCtx c,
+                Object... os) {
+              return new Boolean(false);
+            }
+          });
+      return opRules;
+    }
+
+    private boolean evaluateOperators(BaseWork work) throws SemanticException {
+      // let's take a look at the operators. we're checking for user
+      // code in those. we will not run that in llap.
+      Dispatcher disp = new DefaultRuleDispatcher(null, getRules(), null);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(work.getAllRootOperators());
+
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+
+      for (Node n : nodeOutput.keySet()) {
+        if (nodeOutput.get(n) != null) {
+          if (!((Boolean)nodeOutput.get(n))) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
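evaluateOperators accepts a vertex only if no rule in the walk produced FALSE; the single "No scripts" rule vetoes anything rooted in a ScriptOperator, since that would ship user code to the daemon. A standalone mirror of the veto pattern (plain Java; the node names are made up for illustration):

    import java.util.HashMap;
    import java.util.Map;

    class VetoWalk {
      public static void main(String[] args) {
        // node name -> rule output; null means no rule fired for that node
        Map<String, Boolean> nodeOutput = new HashMap<String, Boolean>();
        nodeOutput.put("TS_0", null);           // table scan: no opinion
        nodeOutput.put("SCR_1", Boolean.FALSE); // script operator: veto

        boolean canRunInLlap = true;
        for (Boolean v : nodeOutput.values()) {
          if (v != null && !v) {
            canRunInLlap = false;
            break;
          }
        }
        System.out.println(canRunInLlap ? "llap" : "container");
      }
    }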
+
+    private boolean checkParentsInLlap(TezWork tezWork, BaseWork base) {
+      for (BaseWork w: tezWork.getParents(base)) {
+        if (!w.getLlapMode()) {
+          LOG.info("Not all parents are run in llap");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean checkInputsVectorized(MapWork mapWork) {
+      for (String path : mapWork.getPathToPartitionInfo().keySet()) {
+        PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
+        List<Class<?>> interfaceList =
+            Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
+        if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
+          LOG.info("Input format: " + pd.getInputFileFormatClassName()
+              + ", doesn't provide vectorized input");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean checkPartialStatsAvailable(BaseWork base) {
+      for (Operator<?> o: base.getAllRootOperators()) {
+        if (o.getStatistics().getColumnStatsState() == Statistics.State.NONE) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private long computeEdgeSize(BaseWork base, boolean input) {
+      long size = 0;
+      for (Operator<?> o: (input ? base.getAllRootOperators() : base.getAllLeafOperators())) {
+        if (o.getStatistics() == null) {
+          // return worst case if unknown
+          return Long.MAX_VALUE;
+        }
+
+        long currSize = o.getStatistics().getDataSize();
+        if ((currSize < 0) || ((Long.MAX_VALUE - size) < currSize)) {
+          // overflow
+          return Long.MAX_VALUE;
+        }
+        size += currSize;
+      }
+      return size;
+    }
+
+    private long computeInputSize(BaseWork base) {
+      return computeEdgeSize(base, true);
+    }
+
+    private long computeOutputSize(BaseWork base) {
+      return computeEdgeSize(base, false);
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+
+    this.physicalContext = pctx;
+    this.conf = pctx.getConf();
+
+    this.mode = LlapMode.valueOf(HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_EXECUTION_MODE));
+
+    if (mode == none) {
+      LOG.info("LLAP disabled.");
+      return pctx;
+    }
+
+    // create dispatcher and graph walker
+    Dispatcher disp = new LlapDecisionDispatcher(pctx);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // get all the task nodes from the root tasks
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+
+    // begin to walk through the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+}
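computeEdgeSize sums per-operator statistics with saturation: a missing Statistics object or a sum that would overflow collapses to Long.MAX_VALUE, the conservative worst case for the input/output threshold checks in evaluateWork. The same idea in isolation (plain Java; the class name is illustrative):

    class SaturatingSum {
      // Mirrors the guards in computeEdgeSize(): negative (unknown) sizes
      // and sums that would overflow both saturate to Long.MAX_VALUE.
      static long add(long acc, long next) {
        if (next < 0 || Long.MAX_VALUE - acc < next) {
          return Long.MAX_VALUE;
        }
        return acc + next;
      }

      public static void main(String[] args) {
        System.out.println(add(100L, 42L));                // 142
        System.out.println(add(Long.MAX_VALUE - 5L, 10L)); // saturates
      }
    }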
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 1737a34..e1aed9c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -60,6 +60,8 @@ public BaseWork(String name) {
   // Vectorization.
   protected Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps = null;
   protected Map<String, Map<String, Integer>> allColumnVectorMaps = null;
+
+  protected boolean llapMode = false;
   protected boolean vectorMode = false;
 
   public void setGatheringStats(boolean gatherStats) {
@@ -184,6 +186,14 @@ public boolean getVectorMode() {
     return vectorMode;
   }
 
+  public void setLlapMode(boolean llapMode) {
+    this.llapMode = llapMode;
+  }
+
+  public boolean getLlapMode() {
+    return llapMode;
+  }
+
   public abstract void configureJobConf(JobConf job);
 
   public void setTag(int tag) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index f6616fb..6c64318 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -313,8 +313,17 @@ private void setAliases() {
   }
 
   @Explain(displayName = "Execution mode")
-  public String getVectorModeOn() {
-    return vectorMode ? "vectorized" : null;
+  public String getExecutionMode() {
+    if (vectorMode) {
+      if (llapMode) {
+        return "vectorized, llap";
+      } else {
+        return "vectorized";
+      }
+    } else if (llapMode) {
+      return "llap";
+    }
+    return null;
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index c78184b..598f230 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -149,8 +149,17 @@ public void setTagToValueDesc(final List<TableDesc> tagToValueDesc) {
   }
 
   @Explain(displayName = "Execution mode")
-  public String getVectorModeOn() {
-    return vectorMode ? "vectorized" : null;
+  public String getExecutionMode() {
+    if (vectorMode) {
+      if (llapMode) {
+        return "vectorized, llap";
+      } else {
+        return "vectorized";
+      }
+    } else if (llapMode) {
+      return "llap";
+    }
+    return null;
   }
 
   @Explain(displayName = "Reduce Operator Tree")
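The renamed getExecutionMode accessors in MapWork and ReduceWork fold both flags into the single "Execution mode" line of EXPLAIN output (e.g. "Execution mode: vectorized, llap"); when both flags are off the method returns null and the line is omitted. A standalone mirror of the string construction (plain Java; the class name is illustrative, not part of the patch):

    class ExecutionModeLabel {
      // Mirrors getExecutionMode() in MapWork/ReduceWork.
      static String label(boolean vectorMode, boolean llapMode) {
        if (vectorMode) {
          return llapMode ? "vectorized, llap" : "vectorized";
        }
        return llapMode ? "llap" : null;
      }

      public static void main(String[] args) {
        System.out.println(label(true, true));   // vectorized, llap
        System.out.println(label(false, true));  // llap
        System.out.println(label(false, false)); // null: line omitted
      }
    }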