diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 555343e..79c38c1 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -783,7 +783,9 @@ HIVE_PTF_PARTITION_PERSISTENCE_CLASS("hive.ptf.partition.persistence", "org.apache.hadoop.hive.ql.exec.PTFPersistence$PartitionedByteBasedList"), HIVE_PTF_PARTITION_PERSISTENT_SIZE("hive.ptf.partition.persistence.memsize", - (int) Math.pow(2, (5 + 10 + 10)) ), // 32MB + (int) Math.pow(2, (5 + 10 + 10)) ), // 32MB + + HIVE_OPTIMIZE_TEZ("hive.optimize.tez", false), ; public final String varname; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 458d259..ca48f5e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -112,6 +112,8 @@ public Operator() { id = String.valueOf(seqId.getAndIncrement()); + childOperators = new ArrayList>(); + parentOperators = new ArrayList>(); } public static void resetId() { @@ -131,6 +133,9 @@ public Operator(Reporter reporter) { public void setChildOperators( List> childOperators) { + if (childOperators == null) { + childOperators = new ArrayList>(); + } this.childOperators = childOperators; } @@ -164,6 +169,9 @@ public int getNumChild() { public void setParentOperators( List> parentOperators) { + if (parentOperators == null) { + parentOperators = new ArrayList>(); + } this.parentOperators = parentOperators; } @@ -691,7 +699,7 @@ public void removeChild(Operator child) { int childIndex = childOperators.indexOf(child); assert childIndex != -1; if (childOperators.size() == 1) { - childOperators = null; + setChildOperators(null); } else { childOperators.remove(childIndex); } @@ -740,7 +748,7 @@ public void removeParent(Operator parent) { int parentIndex = parentOperators.indexOf(parent); assert parentIndex != -1; if (parentOperators.size() == 1) { - parentOperators = null; + setParentOperators(null); } else { parentOperators.remove(parentIndex); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index d0807d2..c723994 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask; import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask; @@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.StatsWork; +import org.apache.hadoop.hive.ql.plan.TezWork; /** * TaskFactory implementation. 
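Note on the Operator change above (illustration only, not part of the patch): with the child and parent lists initialized to empty lists in the constructor, and the setters mapping null back to an empty list, removeChild/removeParent no longer leave the fields null. Assuming that guarantee holds wherever the lists are assigned, tree-walking code can iterate them without null checks. The helper below is hypothetical and only sketches that contract.

// Hypothetical helper, not part of the patch: relies on getChildOperators()
// returning an empty list rather than null after the constructor/setter
// changes above.
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

class OperatorTreeSketch {
  static int countOperators(Operator<? extends OperatorDesc> root) {
    int count = 1;
    for (Operator<? extends OperatorDesc> child : root.getChildOperators()) {
      count += countOperators(child);   // no null check needed any more
    }
    return count;
  }
}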
@@ -89,6 +91,7 @@ public taskTuple(Class workClass, Class> taskClass) { DependencyCollectionTask.class)); taskvec.add(new taskTuple(PartialScanWork.class, PartialScanTask.class)); + taskvec.add(new taskTuple(TezWork.class, TezTask.class)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java new file mode 100644 index 0000000..cf43743 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse; + +import java.io.Serializable; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.TezWork; + +/** + * GenTezProcContext. GenTezProcContext maintains information + * about the tasks and operators as we walk the operator tree + * to break them into TezTasks. + * + */ +public class GenTezProcContext implements NodeProcessorCtx{ + + public final ParseContext parseContext; + public final HiveConf conf; + public final List> moveTask; + + // rootTasks is the entry point for all generated tasks + public final List> rootTasks; + + public final Set inputs; + public final Set outputs; + + // rootOperators are all the table scan operators in sequence + // of traversal + public final Deque> rootOperators; + + // holds the root of the operator tree we're currently processing + // this could be a table scan, but also a join, ptf, etc (i.e.: + // first operator of a reduce task. + public Operator currentRootOperator; + + // tez task we're currently processing + public TezTask currentTask; + + // last work we've processed (in order to hook it up to the current + // one. + public BaseWork preceedingWork; + + // map that keeps track of the last operator of a task to the work + // that follows it. This is used for connecting them later. 
+ public final Map, BaseWork> leafOperatorToFollowingWork; + + + @SuppressWarnings("unchecked") + public GenTezProcContext(HiveConf conf, ParseContext parseContext, + List> moveTask, List> rootTasks, + Set inputs, Set outputs, Deque> rootOperators) { + + this.conf = conf; + this.parseContext = parseContext; + this.moveTask = moveTask; + this.rootTasks = rootTasks; + this.inputs = inputs; + this.outputs = outputs; + this.currentTask = (TezTask) TaskFactory.get(new TezWork(), conf); + this.leafOperatorToFollowingWork = new HashMap, BaseWork>(); + this.rootOperators = rootOperators; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezTaskWalker.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezTaskWalker.java new file mode 100644 index 0000000..d16fd24 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezTaskWalker.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse; + +import java.util.List; + +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.Node; + +/** + * Walks the operator tree in DFS fashion. + */ +public class GenTezTaskWalker extends DefaultGraphWalker { + + /** + * constructor of the walker - the dispatcher is passed. + * + * @param disp + * the dispatcher to be called for each node visited + */ + public GenTezTaskWalker(Dispatcher disp) { + super(disp); + } + + /** + * Walk the given operator. + * + * @param nd + * operator being walked + */ + @Override + public void walk(Node nd) throws SemanticException { + List children = nd.getChildren(); + + // maintain the stack of operators encountered + opStack.push(nd); + Boolean result = dispatchAndReturn(nd, opStack); + + // move all the children to the front of queue + for (Node ch : children) { + walk(ch); + } + + // done with this operator + opStack.pop(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java new file mode 100644 index 0000000..ecca44c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse; + +import java.util.ArrayList; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.TezWork; + +/** + * GenTezWork separates the operator tree into tez tasks. + * It is called once per leaf operator (operator that forces + * a new execution unit.) and break the operators into work + * and tasks along the way. + */ +public class GenTezWork implements NodeProcessor { + + static final private Log LOG = LogFactory.getLog(GenTezWork.class.getName()); + + @SuppressWarnings("unchecked") + @Override + public Object process(Node nd, Stack stack, + NodeProcessorCtx procContext, Object... nodeOutputs) + throws SemanticException { + + GenTezProcContext context = (GenTezProcContext) procContext; + + // Operator is a file sink or reduce sink. Something that forces + // a new vertex. + Operator operator = (Operator) nd; + + TezWork tezWork = context.currentTask.getWork(); + if (!context.rootTasks.contains(context.currentTask)) { + context.rootTasks.add(context.currentTask); + } + + // root is the start of the operator pipeline we're currently + // packing into a vertex, typically a table scan, union or join + Operator root = context.currentRootOperator; + if (root == null) { + // null means that we're starting with a new table scan + // the graph walker walks the rootOperators in the same + // order so we can just take the next + context.preceedingWork = null; + root = context.rootOperators.pop(); + } + + LOG.debug("Root operator: " + root); + LOG.debug("Leaf operator: " + operator); + + // Right now the work graph is pretty simple. If there is no + // Preceding work we have a root and will generate a map + // vertex. If there is a preceding work we will generate + // a reduce vertex + BaseWork work; + if (context.preceedingWork == null) { + assert root.getParentOperators().isEmpty(); + LOG.debug("Adding map work for " + root); + MapWork mapWork = new MapWork(); + mapWork.getAliasToWork().put("", root); + tezWork.add(mapWork); + work = mapWork; + } else { + assert !root.getParentOperators().isEmpty(); + LOG.debug("Adding reduce work for " + root); + ReduceWork reduceWork = new ReduceWork(); + reduceWork.setReducer(root); + tezWork.add(reduceWork); + tezWork.connect( + context.preceedingWork, + reduceWork); + work = reduceWork; + } + + // We're scanning the operator from table scan to final file sink. + // We're scanning a tree from roots to leaf (this is not technically + // correct, demux and mux operators might form a diamond shape, but + // we will only scan one path and ignore the others, because the + // diamond shape is always contained in a single vertex). 
The scan + // is depth first and because we remove parents when we pack a pipeline + // into a vertex we will never visit any node twice. But because of that + // we might have a situation where we need to connect 'work' that comes after + // the 'work' we're currently looking at. + // + // Also note: the concept of leaf and root is reversed in hive for historical + // reasons. Roots are data sources, leaves are data sinks. I know. + if (context.leafOperatorToFollowingWork.containsKey(operator)) { + tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator)); + } + + // This is where we cut the tree as described above. We also remember that + // we might have to connect parent work with this work later. + for (Operator parent: new ArrayList>(root.getParentOperators())) { + assert !context.leafOperatorToFollowingWork.containsKey(parent); + context.leafOperatorToFollowingWork.put(parent, work); + LOG.debug("Removing " + parent + " as parent from " + root); + root.removeParent(parent); + } + + // No children means we're at the bottom. If there are more operators to scan + // the next item will be a new root. + if (!operator.getChildOperators().isEmpty()) { + assert operator.getChildOperators().size() == 1; + context.currentRootOperator = operator.getChildOperators().get(0); + context.preceedingWork = work; + } else { + context.currentRootOperator = null; + context.preceedingWork = null; + } + + return null; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java index cca30b7..c1c1da5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java @@ -22,10 +22,10 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,21 +33,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; @@ -61,9 +54,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; import 
org.apache.hadoop.hive.ql.optimizer.GenMROperator; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext; @@ -73,298 +63,25 @@ import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3; import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1; import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1; -import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer; -import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; -import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; -import org.apache.hadoop.hive.ql.plan.CreateTableDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.FetchWork; -import org.apache.hadoop.hive.ql.plan.LoadFileDesc; -import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.shims.ShimLoader; -public class MapReduceCompiler { +public class MapReduceCompiler extends TaskCompiler { protected final Log LOG = LogFactory.getLog(MapReduceCompiler.class); - private Hive db; - protected LogHelper console; - private HiveConf conf; - public MapReduceCompiler() { } - public void init(HiveConf conf, LogHelper console, Hive db) { - this.conf = conf; - this.db = db; - this.console = console; - } - - @SuppressWarnings({"nls", "unchecked"}) - public void compile(final ParseContext pCtx, final List> rootTasks, - final HashSet inputs, final HashSet outputs) throws SemanticException { - - Context ctx = pCtx.getContext(); - GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx(); - QB qb = pCtx.getQB(); - List> mvTask = new ArrayList>(); - - List loadTableWork = pCtx.getLoadTableWork(); - List loadFileWork = pCtx.getLoadFileWork(); - - boolean isCStats = qb.isAnalyzeRewrite(); - - if (pCtx.getFetchTask() != null) { - return; - } - - /* - * In case of a select, use a fetch task instead of a move task. - * If the select is from analyze table column rewrite, don't create a fetch task. Instead create - * a column stats task later. - */ - if (pCtx.getQB().getIsQuery() && !isCStats) { - if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) { - throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg()); - } - String cols = loadFileWork.get(0).getColumns(); - String colTypes = loadFileWork.get(0).getColumnTypes(); - - String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); - - FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), - resultTab, qb.getParseInfo().getOuterQueryLimit()); - - pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch, conf)); - - // For the FetchTask, the limit optimization requires we fetch all the rows - // in memory and count how many rows we get. 
It's not practical if the - // limit factor is too big - int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH); - if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) { - LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit - + ". Doesn't qualify limit optimiztion."); - globalLimitCtx.disableOpt(); - } - } else if (!isCStats) { - for (LoadTableDesc ltd : loadTableWork) { - Task tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); - mvTask.add(tsk); - // Check to see if we are stale'ing any indexes and auto-update them if we want - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { - IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf); - try { - List> indexUpdateTasks = indexUpdater - .generateUpdateTasks(); - for (Task updateTask : indexUpdateTasks) { - tsk.addDependentTask(updateTask); - } - } catch (HiveException e) { - console - .printInfo("WARNING: could not auto-update stale indexes, which are not in sync"); - } - } - } - - boolean oneLoadFile = true; - for (LoadFileDesc lfd : loadFileWork) { - if (qb.isCTAS()) { - assert (oneLoadFile); // should not have more than 1 load file for - // CTAS - // make the movetask's destination directory the table's destination. - String location = qb.getTableDesc().getLocation(); - if (location == null) { - // get the table's default location - Table dumpTable; - Path targetPath; - try { - dumpTable = db.newTable(qb.getTableDesc().getTableName()); - if (!db.databaseExists(dumpTable.getDbName())) { - throw new SemanticException("ERROR: The database " + dumpTable.getDbName() - + " does not exist."); - } - Warehouse wh = new Warehouse(conf); - targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable - .getTableName()); - } catch (HiveException e) { - throw new SemanticException(e); - } catch (MetaException e) { - throw new SemanticException(e); - } - - location = targetPath.toString(); - } - lfd.setTargetDir(location); - - oneLoadFile = false; - } - mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf)); - } - } - - // generate map reduce plans - ParseContext tempParseContext = getParseContext(pCtx, rootTasks); - GenMRProcContext procCtx = new GenMRProcContext( - conf, - new HashMap, Task>(), - tempParseContext, mvTask, rootTasks, - new LinkedHashMap, GenMapRedCtx>(), - inputs, outputs); - - // create a walker which walks the tree in a DFS manner while maintaining - // the operator stack. 
- // The dispatcher generates the plan from the operator tree - Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp(new String("R1"), - TableScanOperator.getOperatorName() + "%"), - new GenMRTableScan1()); - opRules.put(new RuleRegExp(new String("R2"), - TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink1()); - opRules.put(new RuleRegExp(new String("R3"), - ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink2()); - opRules.put(new RuleRegExp(new String("R4"), - FileSinkOperator.getOperatorName() + "%"), - new GenMRFileSink1()); - opRules.put(new RuleRegExp(new String("R5"), - UnionOperator.getOperatorName() + "%"), - new GenMRUnion1()); - opRules.put(new RuleRegExp(new String("R6"), - UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink3()); - opRules.put(new RuleRegExp(new String("R7"), - MapJoinOperator.getOperatorName() + "%"), - MapJoinFactory.getTableScanMapJoin()); - - // The dispatcher fires the processor corresponding to the closest matching - // rule and passes the context along - Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, - procCtx); - - GraphWalker ogw = new GenMapRedWalker(disp); - ArrayList topNodes = new ArrayList(); - topNodes.addAll(pCtx.getTopOps().values()); - ogw.startWalking(topNodes, null); - - /* - * If the query was the result of analyze table column compute statistics rewrite, create - * a column stats task instead of a fetch task to persist stats to the metastore. - */ - if (isCStats) { - genColumnStatsTask(qb, loadTableWork, loadFileWork, rootTasks); - } - - // reduce sink does not have any kids - since the plan by now has been - // broken up into multiple - // tasks, iterate over all tasks. 
- // For each task, go over all operators recursively - for (Task rootTask : rootTasks) { - breakTaskTree(rootTask); - } - - // For each task, set the key descriptor for the reducer - for (Task rootTask : rootTasks) { - GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask); - } - - // If a task contains an operator which instructs bucketizedhiveinputformat - // to be used, please do so - for (Task rootTask : rootTasks) { - setInputFormat(rootTask); - } - - PhysicalContext physicalContext = new PhysicalContext(conf, - getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask()); - PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer( - physicalContext, conf); - physicalOptimizer.optimize(); - - // For each operator, generate the counters if needed - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS)) { - for (Task rootTask : rootTasks) { - generateCountersTask(rootTask); - } - } - - decideExecMode(rootTasks, ctx, globalLimitCtx); - - if (qb.isCTAS()) { - // generate a DDL task and make it a dependent task of the leaf - CreateTableDesc crtTblDesc = qb.getTableDesc(); - - crtTblDesc.validate(); - - // Clear the output for CTAS since we don't need the output from the - // mapredWork, the - // DDLWork at the tail of the chain will have the output - outputs.clear(); - - Task crtTblTask = TaskFactory.get(new DDLWork( - inputs, outputs, crtTblDesc), conf); - - // find all leaf tasks and make the DDLTask as a dependent task of all of - // them - HashSet> leaves = new HashSet>(); - getLeafTasks(rootTasks, leaves); - assert (leaves.size() > 0); - for (Task task : leaves) { - if (task instanceof StatsTask) { - // StatsTask require table to already exist - for (Task parentOfStatsTask : task.getParentTasks()) { - parentOfStatsTask.addDependentTask(crtTblTask); - } - for (Task parentOfCrtTblTask : crtTblTask.getParentTasks()) { - parentOfCrtTblTask.removeDependentTask(task); - } - crtTblTask.addDependentTask(task); - } else { - task.addDependentTask(crtTblTask); - } - } - } - - if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) { - LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit()); - pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit()); - } - - if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) { - LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit()); - globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit()); - List mrTasks = Utilities.getMRTasks(rootTasks); - for (ExecDriver tsk : mrTasks) { - tsk.setRetryCmdWhenFail(true); - } - } - } - - private void setInputFormat(MapWork work, Operator op) { - if (op.isUseBucketizedHiveInputFormat()) { - work.setUseBucketizedHiveInputFormat(true); - return; - } - - if (op.getChildOperators() != null) { - for (Operator childOp : op.getChildOperators()) { - setInputFormat(work, childOp); - } - } - } - // loop over all the tasks recursively - private void setInputFormat(Task task) { + @Override + protected void setInputFormat(Task task) { if (task instanceof ExecDriver) { MapWork work = ((MapredWork) task.getWork()).getMapWork(); HashMap> opMap = work.getAliasToWork(); @@ -388,8 +105,22 @@ private void setInputFormat(Task task) { } } + private void setInputFormat(MapWork work, Operator op) { + if (op.isUseBucketizedHiveInputFormat()) { + work.setUseBucketizedHiveInputFormat(true); + return; + } + + if (op.getChildOperators() != null) { + for (Operator childOp : 
op.getChildOperators()) { + setInputFormat(work, childOp); + } + } + } + // loop over all the tasks recursively - private void generateCountersTask(Task task) { + @Override + protected void generateCountersTask(Task task) { if (task instanceof ExecDriver) { HashMap> opMap = ((MapredWork) task .getWork()).getMapWork().getAliasToWork(); @@ -437,6 +168,7 @@ private void generateCountersOperator(Operator op) { } } + @Override public ParseContext getParseContext(ParseContext pCtx, List> rootTasks) { return new ParseContext(conf, pCtx.getQB(), pCtx.getParseTree(), pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(), @@ -497,67 +229,6 @@ private void breakOperatorTree(Operator topOp) { } /** - * A helper function to generate a column stats task on top of map-red task. The column stats - * task fetches from the output of the map-red task, constructs the column stats object and - * persists it to the metastore. - * - * This method generates a plan with a column stats task on top of map-red task and sets up the - * appropriate metadata to be used during execution. - * - * @param qb - */ - @SuppressWarnings("unchecked") - private void genColumnStatsTask(QB qb, List loadTableWork, - List loadFileWork, List> rootTasks) { - QBParseInfo qbParseInfo = qb.getParseInfo(); - ColumnStatsTask cStatsTask = null; - ColumnStatsWork cStatsWork = null; - FetchWork fetch = null; - String tableName = qbParseInfo.getTableName(); - String partName = qbParseInfo.getPartName(); - List colName = qbParseInfo.getColName(); - List colType = qbParseInfo.getColType(); - boolean isTblLevel = qbParseInfo.isTblLvl(); - - String cols = loadFileWork.get(0).getColumns(); - String colTypes = loadFileWork.get(0).getColumnTypes(); - - String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); - - fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), - resultTab, qb.getParseInfo().getOuterQueryLimit()); - - ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName, - colName, colType, isTblLevel); - cStatsWork = new ColumnStatsWork(fetch, cStatsDesc); - cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf); - rootTasks.add(cStatsTask); - } - - /** - * Find all leaf tasks of the list of root tasks. 
- */ - private void getLeafTasks(List> rootTasks, - HashSet> leaves) { - - for (Task root : rootTasks) { - getLeafTasks(root, leaves); - } - } - - private void getLeafTasks(Task task, - HashSet> leaves) { - if (task.getDependentTasks() == null) { - if (!leaves.contains(task)) { - leaves.add(task); - } - } else { - getLeafTasks(task.getDependentTasks(), leaves); - } - } - - /** * Make a best guess at trying to find the number of reducers */ private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) { @@ -572,7 +243,8 @@ private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) { return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS); } - private void decideExecMode(List> rootTasks, Context ctx, + @Override + protected void decideExecMode(List> rootTasks, Context ctx, GlobalLimitCtx globalLimitCtx) throws SemanticException { @@ -648,4 +320,74 @@ public boolean accept(Path file) { console.printInfo("Automatically selecting local only mode for query"); } } + + @Override + protected void optimizeTaskPlan(List> rootTasks, + ParseContext pCtx, Context ctx) throws SemanticException { + // reduce sink does not have any kids - since the plan by now has been + // broken up into multiple + // tasks, iterate over all tasks. + // For each task, go over all operators recursively + for (Task rootTask : rootTasks) { + breakTaskTree(rootTask); + } + + + PhysicalContext physicalContext = new PhysicalContext(conf, + getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask()); + PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer( + physicalContext, conf); + physicalOptimizer.optimize(); + + } + + @Override + protected void generateTaskTree(List> rootTasks, ParseContext pCtx, + List> mvTask, Set inputs, Set outputs) throws SemanticException { + + // generate map reduce plans + ParseContext tempParseContext = getParseContext(pCtx, rootTasks); + GenMRProcContext procCtx = new GenMRProcContext( + conf, + new HashMap, Task>(), + tempParseContext, mvTask, rootTasks, + new LinkedHashMap, GenMapRedCtx>(), + inputs, outputs); + + // create a walker which walks the tree in a DFS manner while maintaining + // the operator stack. 
+ // The dispatcher generates the plan from the operator tree + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp(new String("R1"), + TableScanOperator.getOperatorName() + "%"), + new GenMRTableScan1()); + opRules.put(new RuleRegExp(new String("R2"), + TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink1()); + opRules.put(new RuleRegExp(new String("R3"), + ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink2()); + opRules.put(new RuleRegExp(new String("R4"), + FileSinkOperator.getOperatorName() + "%"), + new GenMRFileSink1()); + opRules.put(new RuleRegExp(new String("R5"), + UnionOperator.getOperatorName() + "%"), + new GenMRUnion1()); + opRules.put(new RuleRegExp(new String("R6"), + UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink3()); + opRules.put(new RuleRegExp(new String("R7"), + MapJoinOperator.getOperatorName() + "%"), + MapJoinFactory.getTableScanMapJoin()); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, + procCtx); + + GraphWalker ogw = new GenMapRedWalker(disp); + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pCtx.getTopOps().values()); + ogw.startWalking(topNodes, null); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index a6c28ed..ccb8688 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -8328,7 +8328,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { if (!ctx.getExplainLogical()) { // At this point we have the complete operator tree // from which we want to create the map-reduce plan - MapReduceCompiler compiler = new MapReduceCompiler(); + TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf); compiler.init(conf, console, db); compiler.compile(pCtx, rootTasks, inputs, outputs); fetchTask = pCtx.getFetchTask(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java new file mode 100644 index 0000000..dbcac8b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -0,0 +1,372 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.parse; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; +import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; +import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.LoadFileDesc; +import org.apache.hadoop.hive.ql.plan.LoadTableDesc; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; + +/** + * TaskCompiler is a the base class for classes that compile + * operator pipelines into tasks. + */ +public abstract class TaskCompiler { + + protected final Log LOG = LogFactory.getLog(TezCompiler.class); + + protected Hive db; + protected LogHelper console; + protected HiveConf conf; + + public void init(HiveConf conf, LogHelper console, Hive db) { + this.conf = conf; + this.db = db; + this.console = console; + } + + @SuppressWarnings({"nls", "unchecked"}) + public void compile(final ParseContext pCtx, final List> rootTasks, + final HashSet inputs, final HashSet outputs) throws SemanticException { + + Context ctx = pCtx.getContext(); + GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx(); + QB qb = pCtx.getQB(); + List> mvTask = new ArrayList>(); + + List loadTableWork = pCtx.getLoadTableWork(); + List loadFileWork = pCtx.getLoadFileWork(); + + boolean isCStats = qb.isAnalyzeRewrite(); + + if (pCtx.getFetchTask() != null) { + return; + } + + /* + * In case of a select, use a fetch task instead of a move task. + * If the select is from analyze table column rewrite, don't create a fetch task. Instead create + * a column stats task later. 
+ */ + if (pCtx.getQB().getIsQuery() && !isCStats) { + if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) { + throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg()); + } + String cols = loadFileWork.get(0).getColumns(); + String colTypes = loadFileWork.get(0).getColumnTypes(); + + String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); + + FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), + resultTab, qb.getParseInfo().getOuterQueryLimit()); + + pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch, conf)); + + // For the FetchTask, the limit optimization requires we fetch all the rows + // in memory and count how many rows we get. It's not practical if the + // limit factor is too big + int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH); + if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) { + LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit + + ". Doesn't qualify limit optimiztion."); + globalLimitCtx.disableOpt(); + } + } else if (!isCStats) { + for (LoadTableDesc ltd : loadTableWork) { + Task tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); + mvTask.add(tsk); + // Check to see if we are stale'ing any indexes and auto-update them if we want + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { + IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf); + try { + List> indexUpdateTasks = indexUpdater + .generateUpdateTasks(); + for (Task updateTask : indexUpdateTasks) { + tsk.addDependentTask(updateTask); + } + } catch (HiveException e) { + console + .printInfo("WARNING: could not auto-update stale indexes, which are not in sync"); + } + } + } + + boolean oneLoadFile = true; + for (LoadFileDesc lfd : loadFileWork) { + if (qb.isCTAS()) { + assert (oneLoadFile); // should not have more than 1 load file for + // CTAS + // make the movetask's destination directory the table's destination. + String location = qb.getTableDesc().getLocation(); + if (location == null) { + // get the table's default location + Table dumpTable; + Path targetPath; + try { + dumpTable = db.newTable(qb.getTableDesc().getTableName()); + if (!db.databaseExists(dumpTable.getDbName())) { + throw new SemanticException("ERROR: The database " + dumpTable.getDbName() + + " does not exist."); + } + Warehouse wh = new Warehouse(conf); + targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable + .getTableName()); + } catch (HiveException e) { + throw new SemanticException(e); + } catch (MetaException e) { + throw new SemanticException(e); + } + + location = targetPath.toString(); + } + lfd.setTargetDir(location); + + oneLoadFile = false; + } + mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf)); + } + } + + generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs); + + /* + * If the query was the result of analyze table column compute statistics rewrite, create + * a column stats task instead of a fetch task to persist stats to the metastore. 
+ */ + if (isCStats) { + genColumnStatsTask(qb, loadTableWork, loadFileWork, rootTasks); + } + + // For each task, set the key descriptor for the reducer + for (Task rootTask : rootTasks) { + GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask); + } + + // If a task contains an operator which instructs bucketizedhiveinputformat + // to be used, please do so + for (Task rootTask : rootTasks) { + setInputFormat(rootTask); + } + + optimizeTaskPlan(rootTasks, pCtx, ctx); + + // For each operator, generate the counters if needed + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS)) { + for (Task rootTask : rootTasks) { + generateCountersTask(rootTask); + } + } + + decideExecMode(rootTasks, ctx, globalLimitCtx); + + if (qb.isCTAS()) { + // generate a DDL task and make it a dependent task of the leaf + CreateTableDesc crtTblDesc = qb.getTableDesc(); + + crtTblDesc.validate(); + + // Clear the output for CTAS since we don't need the output from the + // mapredWork, the + // DDLWork at the tail of the chain will have the output + outputs.clear(); + + Task crtTblTask = TaskFactory.get(new DDLWork( + inputs, outputs, crtTblDesc), conf); + + // find all leaf tasks and make the DDLTask as a dependent task of all of + // them + HashSet> leaves = new HashSet>(); + getLeafTasks(rootTasks, leaves); + assert (leaves.size() > 0); + for (Task task : leaves) { + if (task instanceof StatsTask) { + // StatsTask require table to already exist + for (Task parentOfStatsTask : task.getParentTasks()) { + parentOfStatsTask.addDependentTask(crtTblTask); + } + for (Task parentOfCrtTblTask : crtTblTask.getParentTasks()) { + parentOfCrtTblTask.removeDependentTask(task); + } + crtTblTask.addDependentTask(task); + } else { + task.addDependentTask(crtTblTask); + } + } + } + + if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) { + LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit()); + pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit()); + } + + if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) { + LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit()); + globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit()); + List mrTasks = Utilities.getMRTasks(rootTasks); + for (ExecDriver tsk : mrTasks) { + tsk.setRetryCmdWhenFail(true); + } + } + } + + + /** + * A helper function to generate a column stats task on top of map-red task. The column stats + * task fetches from the output of the map-red task, constructs the column stats object and + * persists it to the metastore. + * + * This method generates a plan with a column stats task on top of map-red task and sets up the + * appropriate metadata to be used during execution. 
+ * + * @param qb + */ + @SuppressWarnings("unchecked") + protected void genColumnStatsTask(QB qb, List loadTableWork, + List loadFileWork, List> rootTasks) { + QBParseInfo qbParseInfo = qb.getParseInfo(); + ColumnStatsTask cStatsTask = null; + ColumnStatsWork cStatsWork = null; + FetchWork fetch = null; + String tableName = qbParseInfo.getTableName(); + String partName = qbParseInfo.getPartName(); + List colName = qbParseInfo.getColName(); + List colType = qbParseInfo.getColType(); + boolean isTblLevel = qbParseInfo.isTblLvl(); + + String cols = loadFileWork.get(0).getColumns(); + String colTypes = loadFileWork.get(0).getColumnTypes(); + + String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); + + fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), + resultTab, qb.getParseInfo().getOuterQueryLimit()); + + ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName, + colName, colType, isTblLevel); + cStatsWork = new ColumnStatsWork(fetch, cStatsDesc); + cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf); + rootTasks.add(cStatsTask); + } + + + /** + * Find all leaf tasks of the list of root tasks. + */ + protected void getLeafTasks(List> rootTasks, + HashSet> leaves) { + + for (Task root : rootTasks) { + getLeafTasks(root, leaves); + } + } + + private void getLeafTasks(Task task, + HashSet> leaves) { + if (task.getDependentTasks() == null) { + if (!leaves.contains(task)) { + leaves.add(task); + } + } else { + getLeafTasks(task.getDependentTasks(), leaves); + } + } + + /* + * Called to transform tasks into local tasks where possible/desirable + */ + protected abstract void decideExecMode(List> rootTasks, Context ctx, + GlobalLimitCtx globalLimitCtx) throws SemanticException; + + /* + * Called to setup counters for the generated tasks + */ + protected abstract void generateCountersTask(Task rootTask); + + /* + * Called after the tasks have been generated to run another round of optimization + */ + protected abstract void optimizeTaskPlan(List> rootTasks, + ParseContext pCtx, Context ctx) throws SemanticException; + + /* + * Called to set the appropriate input format for tasks + */ + protected abstract void setInputFormat(Task rootTask); + + /* + * Called to generate the taks tree from the parse context/operator tree + */ + protected abstract void generateTaskTree(List> rootTasks, ParseContext pCtx, + List> mvTask, Set inputs, Set outputs) throws SemanticException; + + /** + * Create a clone of the parse context + */ + public ParseContext getParseContext(ParseContext pCtx, List> rootTasks) { + return new ParseContext(conf, pCtx.getQB(), pCtx.getParseTree(), + pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(), + pCtx.getTopSelOps(), pCtx.getOpParseCtx(), pCtx.getJoinContext(), + pCtx.getSmbMapJoinContext(), pCtx.getTopToTable(), pCtx.getTopToProps(), + pCtx.getFsopToTable(), + pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(), + pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(), + pCtx.getListMapJoinOpsNoReducer(), pCtx.getGroupOpToInputTables(), + pCtx.getPrunedPartitions(), pCtx.getOpToSamplePruner(), pCtx.getGlobalLimitCtx(), + pCtx.getNameToSplitSample(), pCtx.getSemanticInputs(), rootTasks, + pCtx.getOpToPartToSkewedPruner(), pCtx.getViewAliasToInput(), + pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(), + pCtx.getQueryProperties()); + } +} diff 
--git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
new file mode 100644
index 0000000..3308e18
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * TaskCompilerFactory is a factory class to choose the appropriate
+ * TaskCompiler.
+ */
+public class TaskCompilerFactory {
+
+  private TaskCompilerFactory() {
+    // avoid instantiation
+  }
+
+  /**
+   * Returns the appropriate compiler to translate the operator tree
+   * into executable units.
+   */
+  public static TaskCompiler getCompiler(HiveConf conf) {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ)) {
+      return new TezCompiler();
+    } else {
+      return new MapReduceCompiler();
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
new file mode 100644
index 0000000..5abedfe
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.parse; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.TezWork; + +/** + * TezCompiler translates the operator plan into TezTasks. + */ +public class TezCompiler extends TaskCompiler { + + protected final Log LOG = LogFactory.getLog(TezCompiler.class); + + public TezCompiler() { + } + + @Override + protected void generateTaskTree(List> rootTasks, ParseContext pCtx, + List> mvTask, Set inputs, Set outputs) + throws SemanticException { + + // generate map reduce plans + ParseContext tempParseContext = getParseContext(pCtx, rootTasks); + + Deque> deque = new LinkedList>(); + deque.addAll(pCtx.getTopOps().values()); + + GenTezProcContext procCtx = new GenTezProcContext( + conf, tempParseContext, mvTask, rootTasks, inputs, outputs, deque); + + // create a walker which walks the tree in a DFS manner while maintaining + // the operator stack. 
+ // The dispatcher generates the plan from the operator tree + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp(new String("Split Work - ReduceSink"), + ReduceSinkOperator.getOperatorName() + "%"), + new GenTezWork()); + opRules.put(new RuleRegExp(new String("Split Work - FileSink"), + FileSinkOperator.getOperatorName() + "%"), + new GenTezWork()); + + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); + List topNodes = new ArrayList(); + topNodes.addAll(pCtx.getTopOps().values()); + GraphWalker ogw = new GenTezTaskWalker(disp); + ogw.startWalking(topNodes, null); + } + + @Override + protected void setInputFormat(Task task) { + if (task instanceof TezTask) { + TezWork work = ((TezTask)task).getWork(); + Set roots = work.getRoots(); + for (BaseWork w: roots) { + assert w instanceof MapWork; + MapWork mapWork = (MapWork)w; + HashMap> opMap = mapWork.getAliasToWork(); + if (!opMap.isEmpty()) { + for (Operator op : opMap.values()) { + setInputFormat(mapWork, op); + } + } + } + } else if (task instanceof ConditionalTask) { + List> listTasks + = ((ConditionalTask) task).getListTasks(); + for (Task tsk : listTasks) { + setInputFormat(tsk); + } + } + + if (task.getChildTasks() != null) { + for (Task childTask : task.getChildTasks()) { + setInputFormat(childTask); + } + } + } + + private void setInputFormat(MapWork work, Operator op) { + if (op.isUseBucketizedHiveInputFormat()) { + work.setUseBucketizedHiveInputFormat(true); + return; + } + + if (op.getChildOperators() != null) { + for (Operator childOp : op.getChildOperators()) { + setInputFormat(work, childOp); + } + } + } + + @Override + protected void generateCountersTask(Task task) { + if (task instanceof TezTask) { + TezWork work = ((TezTask)task).getWork(); + List workItems = work.getAllWork(); + for (BaseWork w: workItems) { + List> ops = w.getAllOperators(); + for (Operator op: ops) { + generateCountersOperator(op); + } + } + } else if (task instanceof ConditionalTask) { + List> listTasks = ((ConditionalTask) task) + .getListTasks(); + for (Task tsk : listTasks) { + generateCountersTask(tsk); + } + } + + Operator.resetLastEnumUsed(); + + if (task.getChildTasks() == null) { + return; + } + + for (Task childTask : task.getChildTasks()) { + generateCountersTask(childTask); + } + } + + private void generateCountersOperator(Operator op) { + op.assignCounterNameToEnum(); + + if (op.getChildOperators() == null) { + return; + } + + for (Operator child : op.getChildOperators()) { + generateCountersOperator(child); + } + } + + @Override + protected void decideExecMode(List> rootTasks, Context ctx, + GlobalLimitCtx globalLimitCtx) + throws SemanticException { + // currently all Tez work is on the cluster + return; + } + + @Override + protected void optimizeTaskPlan(List> rootTasks, ParseContext pCtx, + Context ctx) throws SemanticException { + // no additional optimization needed + return; + } +}
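To make the GenTezWork/TezCompiler flow above concrete, here is a minimal sketch of the work graph produced for a simple scan, reduce sink, file sink pipeline. It uses only calls that appear in this patch (TezWork.add/connect, MapWork.getAliasToWork, ReduceWork.setReducer); the method itself and its operator parameters are hypothetical stand-ins for the pipeline roots the graph walker hands to GenTezWork.

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork;

class TezWorkGraphSketch {
  // Illustrative only: the shape GenTezWork builds for one map/reduce pipeline.
  static TezWork sketch(Operator<? extends OperatorDesc> scanRoot,
      Operator<? extends OperatorDesc> reduceRoot) {
    TezWork tezWork = new TezWork();

    // The table-scan pipeline becomes a map vertex.
    MapWork mapWork = new MapWork();
    mapWork.getAliasToWork().put("", scanRoot);
    tezWork.add(mapWork);

    // The operator below the reduce sink becomes a reduce vertex,
    // connected to the preceding map work.
    ReduceWork reduceWork = new ReduceWork();
    reduceWork.setReducer(reduceRoot);
    tezWork.add(reduceWork);
    tezWork.connect(mapWork, reduceWork);

    return tezWork;
  }
}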
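And a sketch of how a caller reaches the new code path, mirroring the SemanticAnalyzer change earlier in this patch: TaskCompilerFactory inspects hive.optimize.tez and returns either the TezCompiler or the existing MapReduceCompiler. The wrapper method and its parameters are placeholders (in practice SemanticAnalyzer supplies them); only the factory/init/compile calls are taken from the patch, and the generic parameter types are assumptions since they are not shown here.

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TaskCompiler;
import org.apache.hadoop.hive.ql.parse.TaskCompilerFactory;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;

class TaskCompilerFlowSketch {
  // Hypothetical caller; SemanticAnalyzer.analyzeInternal() does this for real.
  static void compilePlan(HiveConf conf, LogHelper console, Hive db,
      ParseContext pCtx, List<Task<? extends Serializable>> rootTasks,
      HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs)
      throws SemanticException {
    // hive.optimize.tez (added to HiveConf above) switches between compilers.
    TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf);
    compiler.init(conf, console, db);
    // compile() builds move/fetch tasks, calls generateTaskTree() (the GenTezWork
    // rules for Tez), and then runs the shared post-processing (stats, CTAS, limits).
    compiler.compile(pCtx, rootTasks, inputs, outputs);
  }
}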