diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java new file mode 100644 index 0000000..13951fc --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java @@ -0,0 +1,632 @@ +package org.apache.hadoop.hive.ql.parse; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; +import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; +import org.apache.hadoop.hive.ql.optimizer.GenMROperator; +import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext; +import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; +import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1; +import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2; +import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3; +import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1; +import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; +import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; +import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer; +import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; +import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; +import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import 
org.apache.hadoop.hive.ql.plan.LoadFileDesc; +import org.apache.hadoop.hive.ql.plan.LoadTableDesc; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.shims.ShimLoader; + +public class MapReduceCompiler { + + protected final Log LOG = LogFactory.getLog(MapReduceCompiler.class); + private Hive db; + protected LogHelper console; + private HiveConf conf; + + + public MapReduceCompiler() { + } + + public void init(HiveConf conf, LogHelper console, Hive db) { + this.conf = conf; + this.db = db; + this.console = console; + } + + @SuppressWarnings({"nls", "unchecked"}) + public void compile(final ParseContext pCtx, final List> rootTasks, + final HashSet inputs, final HashSet outputs) throws SemanticException { + + Context ctx = pCtx.getContext(); + GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx(); + QB qb = pCtx.getQB(); + List> mvTask = new ArrayList>(); + + List loadTableWork = pCtx.getLoadTableWork(); + List loadFileWork = pCtx.getLoadFileWork(); + + boolean isCStats = qb.isAnalyzeRewrite(); + + if (pCtx.getFetchTask() != null) { + return; + } + + /* + * In case of a select, use a fetch task instead of a move task. + * If the select is from analyze table column rewrite, don't create a fetch task. Instead create + * a column stats task later. + */ + if (pCtx.getQB().getIsQuery() && !isCStats) { + if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) { + throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg()); + } + String cols = loadFileWork.get(0).getColumns(); + String colTypes = loadFileWork.get(0).getColumnTypes(); + + String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); + + FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), + resultTab, qb.getParseInfo().getOuterQueryLimit()); + + pCtx.setFetchTask((FetchTask) TaskFactory.get(fetch, conf)); + + // For the FetchTask, the limit optimization requires we fetch all the rows + // in memory and count how many rows we get. It's not practical if the + // limit factor is too big + int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH); + if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) { + LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit + + ". 
Doesn't qualify limit optimization."); + globalLimitCtx.disableOpt(); + } + } else if (!isCStats) { + for (LoadTableDesc ltd : loadTableWork) { + Task tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); + mvTask.add(tsk); + // Check to see if we are stale'ing any indexes and auto-update them if we want + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { + IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf); + try { + List> indexUpdateTasks = indexUpdater + .generateUpdateTasks(); + for (Task updateTask : indexUpdateTasks) { + tsk.addDependentTask(updateTask); + } + } catch (HiveException e) { + console + .printInfo("WARNING: could not auto-update stale indexes, which are not in sync"); + } + } + } + + boolean oneLoadFile = true; + for (LoadFileDesc lfd : loadFileWork) { + if (qb.isCTAS()) { + assert (oneLoadFile); // should not have more than 1 load file for + // CTAS + // make the movetask's destination directory the table's destination. + String location = qb.getTableDesc().getLocation(); + if (location == null) { + // get the table's default location + Table dumpTable; + Path targetPath; + try { + dumpTable = db.newTable(qb.getTableDesc().getTableName()); + if (!db.databaseExists(dumpTable.getDbName())) { + throw new SemanticException("ERROR: The database " + dumpTable.getDbName() + + " does not exist."); + } + Warehouse wh = new Warehouse(conf); + targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable + .getTableName()); + } catch (HiveException e) { + throw new SemanticException(e); + } catch (MetaException e) { + throw new SemanticException(e); + } + + location = targetPath.toString(); + } + lfd.setTargetDir(location); + + oneLoadFile = false; + } + mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf)); + } + } + + // generate map reduce plans + ParseContext tempParseContext = getParseContext(pCtx, rootTasks); + GenMRProcContext procCtx = new GenMRProcContext( + conf, + new HashMap, Task>(), + new ArrayList>(), tempParseContext, + mvTask, rootTasks, + new LinkedHashMap, GenMapRedCtx>(), + inputs, outputs); + + // create a walker which walks the tree in a DFS manner while maintaining + // the operator stack.
+ // The dispatcher generates the plan from the operator tree + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp(new String("R1"), + TableScanOperator.getOperatorName() + "%"), + new GenMRTableScan1()); + opRules.put(new RuleRegExp(new String("R2"), + TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink1()); + opRules.put(new RuleRegExp(new String("R3"), + ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink2()); + opRules.put(new RuleRegExp(new String("R4"), + FileSinkOperator.getOperatorName() + "%"), + new GenMRFileSink1()); + opRules.put(new RuleRegExp(new String("R5"), + UnionOperator.getOperatorName() + "%"), + new GenMRUnion1()); + opRules.put(new RuleRegExp(new String("R6"), + UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), + new GenMRRedSink3()); + opRules.put(new RuleRegExp(new String("R7"), + MapJoinOperator.getOperatorName() + "%"), + MapJoinFactory.getTableScanMapJoin()); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, + procCtx); + + GraphWalker ogw = new GenMapRedWalker(disp); + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pCtx.getTopOps().values()); + ogw.startWalking(topNodes, null); + + /* + * If the query was the result of analyze table column compute statistics rewrite, create + * a column stats task instead of a fetch task to persist stats to the metastore. + */ + if (isCStats) { + genColumnStatsTask(qb, loadTableWork, loadFileWork, rootTasks); + } + + // reduce sink does not have any kids - since the plan by now has been + // broken up into multiple + // tasks, iterate over all tasks. 
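[Illustrative sketch, not part of the patch] A minimal example of the rule-driven walk configured just above, to make the matching semantics explicit. It reuses only classes this file already imports and assumes the procCtx and pCtx variables from the surrounding method; the rule name "RX" and the single rule shown are illustrative.

    // The walker visits the operator tree depth-first while maintaining the
    // current operator stack; a RuleRegExp fires when the names on that stack
    // match its pattern. Here, a ReduceSinkOperator reached anywhere below a
    // TableScanOperator triggers GenMRRedSink1, which wires that reduce
    // boundary into the map-reduce task being generated.
    Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
    rules.put(new RuleRegExp("RX",
        TableScanOperator.getOperatorName() + "%.*"
            + ReduceSinkOperator.getOperatorName() + "%"),
        new GenMRRedSink1());
    // Operators matched by no rule fall through to the default processor
    // (GenMROperator), which simply propagates the GenMapRedCtx to its children.
    Dispatcher sketchDisp = new DefaultRuleDispatcher(new GenMROperator(), rules, procCtx);
    ArrayList<Node> sketchTopNodes = new ArrayList<Node>();
    sketchTopNodes.addAll(pCtx.getTopOps().values());
    new GenMapRedWalker(sketchDisp).startWalking(sketchTopNodes, null);
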
+ // For each task, go over all operators recursively + for (Task rootTask : rootTasks) { + breakTaskTree(rootTask); + } + + // For each task, set the key descriptor for the reducer + for (Task rootTask : rootTasks) { + GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask); + } + + // If a task contains an operator which instructs bucketizedhiveinputformat + // to be used, please do so + for (Task rootTask : rootTasks) { + setInputFormat(rootTask); + } + + PhysicalContext physicalContext = new PhysicalContext(conf, + getParseContext(pCtx, rootTasks), ctx, rootTasks, pCtx.getFetchTask()); + PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer( + physicalContext, conf); + physicalOptimizer.optimize(); + + // For each operator, generate the counters if needed + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS)) { + for (Task rootTask : rootTasks) { + generateCountersTask(rootTask); + } + } + + decideExecMode(rootTasks, ctx, globalLimitCtx); + + if (qb.isCTAS()) { + // generate a DDL task and make it a dependent task of the leaf + CreateTableDesc crtTblDesc = qb.getTableDesc(); + + crtTblDesc.validate(); + + // Clear the output for CTAS since we don't need the output from the + // mapredWork, the + // DDLWork at the tail of the chain will have the output + outputs.clear(); + + Task crtTblTask = TaskFactory.get(new DDLWork( + inputs, outputs, crtTblDesc), conf); + + // find all leaf tasks and make the DDLTask as a dependent task of all of + // them + HashSet> leaves = new HashSet>(); + getLeafTasks(rootTasks, leaves); + assert (leaves.size() > 0); + for (Task task : leaves) { + if (task instanceof StatsTask) { + // StatsTask require table to already exist + for (Task parentOfStatsTask : task.getParentTasks()) { + parentOfStatsTask.addDependentTask(crtTblTask); + } + for (Task parentOfCrtTblTask : crtTblTask.getParentTasks()) { + parentOfCrtTblTask.removeDependentTask(task); + } + crtTblTask.addDependentTask(task); + } else { + task.addDependentTask(crtTblTask); + } + } + } + + if (globalLimitCtx.isEnable() && pCtx.getFetchTask() != null) { + LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit()); + pCtx.getFetchTask().getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit()); + } + + if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) { + LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit()); + globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit()); + List mrTasks = Utilities.getMRTasks(rootTasks); + for (ExecDriver tsk : mrTasks) { + tsk.setRetryCmdWhenFail(true); + } + } + } + + private void setInputFormat(MapredWork work, Operator op) { + if (op.isUseBucketizedHiveInputFormat()) { + work.setUseBucketizedHiveInputFormat(true); + return; + } + + if (op.getChildOperators() != null) { + for (Operator childOp : op.getChildOperators()) { + setInputFormat(work, childOp); + } + } + } + + // loop over all the tasks recursively + private void setInputFormat(Task task) { + if (task instanceof ExecDriver) { + MapredWork work = (MapredWork) task.getWork(); + HashMap> opMap = work.getAliasToWork(); + if (!opMap.isEmpty()) { + for (Operator op : opMap.values()) { + setInputFormat(work, op); + } + } + } else if (task instanceof ConditionalTask) { + List> listTasks + = ((ConditionalTask) task).getListTasks(); + for (Task tsk : listTasks) { + setInputFormat(tsk); + } + } + + if (task.getChildTasks() != null) { + for (Task childTask : task.getChildTasks()) { + 
setInputFormat(childTask); + } + } + } + + // loop over all the tasks recursively + private void generateCountersTask(Task task) { + if (task instanceof ExecDriver) { + HashMap> opMap = ((MapredWork) task + .getWork()).getAliasToWork(); + if (!opMap.isEmpty()) { + for (Operator op : opMap.values()) { + generateCountersOperator(op); + } + } + + Operator reducer = ((MapredWork) task.getWork()) + .getReducer(); + if (reducer != null) { + LOG.info("Generating counters for operator " + reducer); + generateCountersOperator(reducer); + } + } else if (task instanceof ConditionalTask) { + List> listTasks = ((ConditionalTask) task) + .getListTasks(); + for (Task tsk : listTasks) { + generateCountersTask(tsk); + } + } + + // Start the counters from scratch - a hack for hadoop 17. + Operator.resetLastEnumUsed(); + + if (task.getChildTasks() == null) { + return; + } + + for (Task childTask : task.getChildTasks()) { + generateCountersTask(childTask); + } + } + + private void generateCountersOperator(Operator op) { + op.assignCounterNameToEnum(); + + if (op.getChildOperators() == null) { + return; + } + + for (Operator child : op.getChildOperators()) { + generateCountersOperator(child); + } + } + + public ParseContext getParseContext(ParseContext pCtx, List> rootTasks) { + return new ParseContext(conf, pCtx.getQB(), pCtx.getParseTree(), + pCtx.getOpToPartPruner(), pCtx.getOpToPartList(), pCtx.getTopOps(), + pCtx.getTopSelOps(), pCtx.getOpParseCtx(), pCtx.getJoinContext(), + pCtx.getSmbMapJoinContext(), pCtx.getTopToTable(), pCtx.getFsopToTable(), + pCtx.getLoadTableWork(), pCtx.getLoadFileWork(), pCtx.getContext(), + pCtx.getIdToTableNameMap(), pCtx.getDestTableId(), pCtx.getUCtx(), + pCtx.getListMapJoinOpsNoReducer(), pCtx.getGroupOpToInputTables(), + pCtx.getPrunedPartitions(), pCtx.getOpToSamplePruner(), pCtx.getGlobalLimitCtx(), + pCtx.getNameToSplitSample(), pCtx.getSemanticInputs(), rootTasks, + pCtx.getOpToPartToSkewedPruner(), pCtx.getViewAliasToInput(), + pCtx.getReduceSinkOperatorsAddedByEnforceBucketingSorting(), + pCtx.getQueryProperties()); + } + + // loop over all the tasks recursively + private void breakTaskTree(Task task) { + + if (task instanceof ExecDriver) { + HashMap> opMap = ((MapredWork) task + .getWork()).getAliasToWork(); + if (!opMap.isEmpty()) { + for (Operator op : opMap.values()) { + breakOperatorTree(op); + } + } + } else if (task instanceof ConditionalTask) { + List> listTasks = ((ConditionalTask) task) + .getListTasks(); + for (Task tsk : listTasks) { + breakTaskTree(tsk); + } + } + + if (task.getChildTasks() == null) { + return; + } + + for (Task childTask : task.getChildTasks()) { + breakTaskTree(childTask); + } + } + + // loop over all the operators recursively + private void breakOperatorTree(Operator topOp) { + if (topOp instanceof ReduceSinkOperator) { + topOp.setChildOperators(null); + } + + if (topOp.getChildOperators() == null) { + return; + } + + for (Operator op : topOp.getChildOperators()) { + breakOperatorTree(op); + } + } + + /** + * A helper function to generate a column stats task on top of map-red task. The column stats + * task fetches from the output of the map-red task, constructs the column stats object and + * persists it to the metastore. + * + * This method generates a plan with a column stats task on top of map-red task and sets up the + * appropriate metadata to be used during execution. 
+ * + * @param qb + */ + @SuppressWarnings("unchecked") + private void genColumnStatsTask(QB qb, List loadTableWork, + List loadFileWork, List> rootTasks) { + QBParseInfo qbParseInfo = qb.getParseInfo(); + ColumnStatsTask cStatsTask = null; + ColumnStatsWork cStatsWork = null; + FetchWork fetch = null; + String tableName = qbParseInfo.getTableName(); + String partName = qbParseInfo.getPartName(); + List colName = qbParseInfo.getColName(); + List colType = qbParseInfo.getColType(); + boolean isTblLevel = qbParseInfo.isTblLvl(); + + String cols = loadFileWork.get(0).getColumns(); + String colTypes = loadFileWork.get(0).getColumnTypes(); + + String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); + + fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), + resultTab, qb.getParseInfo().getOuterQueryLimit()); + + ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName, + colName, colType, isTblLevel); + cStatsWork = new ColumnStatsWork(fetch, cStatsDesc); + cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf); + rootTasks.add(cStatsTask); + } + + /** + * Find all leaf tasks of the list of root tasks. + */ + private void getLeafTasks(List> rootTasks, + HashSet> leaves) { + + for (Task root : rootTasks) { + getLeafTasks(root, leaves); + } + } + + private void getLeafTasks(Task task, + HashSet> leaves) { + if (task.getDependentTasks() == null) { + if (!leaves.contains(task)) { + leaves.add(task); + } + } else { + getLeafTasks(task.getDependentTasks(), leaves); + } + } + + /** + * Make a best guess at trying to find the number of reducers + */ + private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) { + if (mrwork.getReducer() == null) { + return 0; + } + + if (mrwork.getNumReduceTasks() >= 0) { + return mrwork.getNumReduceTasks(); + } + + return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS); + } + + private void decideExecMode(List> rootTasks, Context ctx, + GlobalLimitCtx globalLimitCtx) + throws SemanticException { + + // bypass for explain queries for now + if (ctx.getExplain()) { + return; + } + + // user has told us to run in local mode or doesn't want auto-local mode + if (ctx.isLocalOnlyExecutionMode() || + !conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) { + return; + } + + final Context lCtx = ctx; + PathFilter p = new PathFilter() { + public boolean accept(Path file) { + return !lCtx.isMRTmpFileURI(file.toUri().getPath()); + } + }; + List mrtasks = Utilities.getMRTasks(rootTasks); + + // map-reduce jobs will be run locally based on data size + // first find out if any of the jobs needs to run non-locally + boolean hasNonLocalJob = false; + for (ExecDriver mrtask : mrtasks) { + try { + ContentSummary inputSummary = Utilities.getInputSummary + (ctx, (MapredWork) mrtask.getWork(), p); + int numReducers = getNumberOfReducers(mrtask.getWork(), conf); + + long estimatedInput; + + if (globalLimitCtx != null && globalLimitCtx.isEnable()) { + // If the global limit optimization is triggered, we will + // estimate input data actually needed based on limit rows. 
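[Illustrative aside, not part of the patch] A rough worked instance of the estimate spelled out in the next comment and computed a few lines below; all numbers are hypothetical.

    // Suppose LIMIT 100 with hive.limit.row.max.size = 100000 bytes,
    // 1 GB of input, and a 64 MB minimum split size:
    //   estimatedNumMap = 1073741824 / 67108864 + 1     = 17
    //   estimatedInput  = (100 * 100000) * (17 + 1)     = 180000000 bytes
    // so roughly 180 MB, rather than the full 1 GB, is compared against the
    // local-mode thresholds by MapRedTask.isEligibleForLocalMode().
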
+ // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2) + // + long sizePerRow = HiveConf.getLongVar(conf, + HiveConf.ConfVars.HIVELIMITMAXROWSIZE); + estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow; + long minSplitSize = HiveConf.getLongVar(conf, + HiveConf.ConfVars.MAPREDMINSPLITSIZE); + long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1; + estimatedInput = estimatedInput * (estimatedNumMap + 1); + } else { + estimatedInput = inputSummary.getLength(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Task: " + mrtask.getId() + ", Summary: " + + inputSummary.getLength() + "," + inputSummary.getFileCount() + "," + + numReducers + ", estimated Input: " + estimatedInput); + } + + if (MapRedTask.isEligibleForLocalMode(conf, numReducers, + estimatedInput, inputSummary.getFileCount()) != null) { + hasNonLocalJob = true; + break; + } else { + mrtask.setLocalMode(true); + } + } catch (IOException e) { + throw new SemanticException(e); + } + } + + if (!hasNonLocalJob) { + // Entire query can be run locally. + // Save the current tracker value and restore it when done. + ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf)); + ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, "local"); + console.printInfo("Automatically selecting local only mode for query"); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 0bad4be..142c974 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -38,9 +37,7 @@ import org.antlr.runtime.tree.TreeWizard; import org.antlr.runtime.tree.TreeWizard.ContextVisitor; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -51,21 +48,16 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; -import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnInfo; -import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; -import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RecordReader; @@ -73,7 +65,6 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import 
org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; -import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -81,21 +72,15 @@ import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo; -import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; -import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; -import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; -import org.apache.hadoop.hive.ql.lib.NodeProcessor; -import org.apache.hadoop.hive.ql.lib.Rule; -import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -104,20 +89,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; -import org.apache.hadoop.hive.ql.optimizer.GenMROperator; -import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext; -import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx; -import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1; -import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2; -import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3; -import org.apache.hadoop.hive.ql.optimizer.GenMRTableScan1; -import org.apache.hadoop.hive.ql.optimizer.GenMRUnion1; -import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; -import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory; import org.apache.hadoop.hive.ql.optimizer.Optimizer; -import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; -import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType; import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression; @@ -139,8 +111,6 @@ import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec; import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec; import org.apache.hadoop.hive.ql.plan.AggregationDesc; -import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; -import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc; import org.apache.hadoop.hive.ql.plan.CreateViewDesc; @@ -154,7 +124,6 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; import org.apache.hadoop.hive.ql.plan.ExtractDesc; -import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; @@ 
-170,8 +139,6 @@ import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PTFDesc; import org.apache.hadoop.hive.ql.plan.PTFDesc.OrderExpressionDef; @@ -205,7 +172,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.InputFormat; /** @@ -885,7 +851,7 @@ public boolean doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1) ErrorMsg.ORDERBY_DISTRIBUTEBY_CONFLICT.getMsg())); } break; - + case HiveParser.TOK_SORTBY: // Get the sort by aliases - these are aliased to the entries in the // select list @@ -8224,434 +8190,6 @@ private void LVmergeRowResolvers(RowResolver source, RowResolver dest, } } - /** - * A helper function to generate a column stats task on top of map-red task. The column stats - * task fetches from the output of the map-red task, constructs the column stats object and - * persists it to the metastore. - * - * This method generates a plan with a column stats task on top of map-red task and sets up the - * appropriate metadata to be used during execution. - * - * @param qb - */ - private void genColumnStatsTask(QB qb) { - QBParseInfo qbParseInfo = qb.getParseInfo(); - ColumnStatsTask cStatsTask = null; - ColumnStatsWork cStatsWork = null; - FetchWork fetch = null; - String tableName = qbParseInfo.getTableName(); - String partName = qbParseInfo.getPartName(); - List colName = qbParseInfo.getColName(); - List colType = qbParseInfo.getColType(); - boolean isTblLevel = qbParseInfo.isTblLvl(); - - String cols = loadFileWork.get(0).getColumns(); - String colTypes = loadFileWork.get(0).getColumnTypes(); - - String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); - - fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), - resultTab, qb.getParseInfo().getOuterQueryLimit()); - - ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, partName, - colName, colType, isTblLevel); - cStatsWork = new ColumnStatsWork(fetch, cStatsDesc); - cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf); - rootTasks.add(cStatsTask); - } - - @SuppressWarnings("nls") - private void genMapRedTasks(ParseContext pCtx) throws SemanticException { - boolean isCStats = qb.isAnalyzeRewrite(); - - if (pCtx.getFetchTask() != null) { - // replaced by single fetch task - initParseCtx(pCtx); - return; - } - - initParseCtx(pCtx); - List> mvTask = new ArrayList>(); - - /* - * In case of a select, use a fetch task instead of a move task. - * If the select is from analyze table column rewrite, don't create a fetch task. Instead create - * a column stats task later. 
- */ - if (qb.getIsQuery() && !isCStats) { - if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) { - throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg()); - } - String cols = loadFileWork.get(0).getColumns(); - String colTypes = loadFileWork.get(0).getColumnTypes(); - - String resFileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - TableDesc resultTab = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, resFileFormat); - - FetchWork fetch = new FetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), - resultTab, qb.getParseInfo().getOuterQueryLimit()); - - FetchTask fetchTask = (FetchTask) TaskFactory.get(fetch, conf); - setFetchTask(fetchTask); - - // For the FetchTask, the limit optimiztion requires we fetch all the rows - // in memory and count how many rows we get. It's not practical if the - // limit factor is too big - int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH); - if (globalLimitCtx.isEnable() && globalLimitCtx.getGlobalLimit() > fetchLimit) { - LOG.info("For FetchTask, LIMIT " + globalLimitCtx.getGlobalLimit() + " > " + fetchLimit - + ". Doesn't qualify limit optimiztion."); - globalLimitCtx.disableOpt(); - } - } else if (!isCStats) { - for (LoadTableDesc ltd : loadTableWork) { - Task tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), - conf); - mvTask.add(tsk); - // Check to see if we are stale'ing any indexes and auto-update them if we want - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { - IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, getInputs(), conf); - try { - List> indexUpdateTasks = indexUpdater - .generateUpdateTasks(); - for (Task updateTask : indexUpdateTasks) { - tsk.addDependentTask(updateTask); - } - } catch (HiveException e) { - console - .printInfo("WARNING: could not auto-update stale indexes, which are not in sync"); - } - } - } - - boolean oneLoadFile = true; - for (LoadFileDesc lfd : loadFileWork) { - if (qb.isCTAS()) { - assert (oneLoadFile); // should not have more than 1 load file for - // CTAS - // make the movetask's destination directory the table's destination. - String location = qb.getTableDesc().getLocation(); - if (location == null) { - // get the table's default location - Table dumpTable; - Path targetPath; - try { - dumpTable = db.newTable(qb.getTableDesc().getTableName()); - if (!db.databaseExists(dumpTable.getDbName())) { - throw new SemanticException("ERROR: The database " + dumpTable.getDbName() - + " does not exist."); - } - Warehouse wh = new Warehouse(conf); - targetPath = wh.getTablePath(db.getDatabase(dumpTable.getDbName()), dumpTable - .getTableName()); - } catch (HiveException e) { - throw new SemanticException(e); - } catch (MetaException e) { - throw new SemanticException(e); - } - - location = targetPath.toString(); - } - lfd.setTargetDir(location); - - oneLoadFile = false; - } - mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), - conf)); - } - } - - // generate map reduce plans - ParseContext tempParseContext = getParseContext(); - GenMRProcContext procCtx = new GenMRProcContext( - conf, - new HashMap, Task>(), - new ArrayList>(), tempParseContext, - mvTask, rootTasks, - new LinkedHashMap, GenMapRedCtx>(), - inputs, outputs); - - // create a walker which walks the tree in a DFS manner while maintaining - // the operator stack. 
- // The dispatcher generates the plan from the operator tree - Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp(new String("R1"), - TableScanOperator.getOperatorName() + "%"), - new GenMRTableScan1()); - opRules.put(new RuleRegExp(new String("R2"), - TableScanOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink1()); - opRules.put(new RuleRegExp(new String("R3"), - ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink2()); - opRules.put(new RuleRegExp(new String("R4"), - FileSinkOperator.getOperatorName() + "%"), - new GenMRFileSink1()); - opRules.put(new RuleRegExp(new String("R5"), - UnionOperator.getOperatorName() + "%"), - new GenMRUnion1()); - opRules.put(new RuleRegExp(new String("R6"), - UnionOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"), - new GenMRRedSink3()); - opRules.put(new RuleRegExp(new String("R7"), - MapJoinOperator.getOperatorName() + "%"), - MapJoinFactory.getTableScanMapJoin()); - - // The dispatcher fires the processor corresponding to the closest matching - // rule and passes the context along - Dispatcher disp = new DefaultRuleDispatcher(new GenMROperator(), opRules, - procCtx); - - GraphWalker ogw = new GenMapRedWalker(disp); - ArrayList topNodes = new ArrayList(); - topNodes.addAll(topOps.values()); - ogw.startWalking(topNodes, null); - - /* - * If the query was the result of analyze table column compute statistics rewrite, create - * a column stats task instead of a fetch task to persist stats to the metastore. - */ - if (isCStats) { - genColumnStatsTask(qb); - } - - // reduce sink does not have any kids - since the plan by now has been - // broken up into multiple - // tasks, iterate over all tasks. 
- // For each task, go over all operators recursively - for (Task rootTask : rootTasks) { - breakTaskTree(rootTask); - } - - // For each task, set the key descriptor for the reducer - for (Task rootTask : rootTasks) { - GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask); - } - - // If a task contains an operator which instructs bucketizedhiveinputformat - // to be used, please do so - for (Task rootTask : rootTasks) { - setInputFormat(rootTask); - } - - PhysicalContext physicalContext = new PhysicalContext(conf, - getParseContext(), ctx, rootTasks, fetchTask); - PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer( - physicalContext, conf); - physicalOptimizer.optimize(); - - // For each operator, generate the counters if needed - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS)) { - for (Task rootTask : rootTasks) { - generateCountersTask(rootTask); - } - } - - decideExecMode(rootTasks, ctx, globalLimitCtx); - - if (qb.isCTAS()) { - // generate a DDL task and make it a dependent task of the leaf - CreateTableDesc crtTblDesc = qb.getTableDesc(); - - crtTblDesc.validate(); - - // Clear the output for CTAS since we don't need the output from the - // mapredWork, the - // DDLWork at the tail of the chain will have the output - getOutputs().clear(); - - Task crtTblTask = TaskFactory.get(new DDLWork( - getInputs(), getOutputs(), crtTblDesc), conf); - - // find all leaf tasks and make the DDLTask as a dependent task of all of - // them - HashSet> leaves = new HashSet>(); - getLeafTasks(rootTasks, leaves); - assert (leaves.size() > 0); - for (Task task : leaves) { - if (task instanceof StatsTask) { - // StatsTask require table to already exist - for (Task parentOfStatsTask : task.getParentTasks()) { - parentOfStatsTask.addDependentTask(crtTblTask); - } - for (Task parentOfCrtTblTask : crtTblTask.getParentTasks()) { - parentOfCrtTblTask.removeDependentTask(task); - } - crtTblTask.addDependentTask(task); - } else { - task.addDependentTask(crtTblTask); - } - } - } - - if (globalLimitCtx.isEnable() && fetchTask != null) { - int fetchLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVELIMITOPTMAXFETCH); - LOG.info("set least row check for FetchTask: " + globalLimitCtx.getGlobalLimit()); - fetchTask.getWork().setLeastNumRows(globalLimitCtx.getGlobalLimit()); - } - - if (globalLimitCtx.isEnable() && globalLimitCtx.getLastReduceLimitDesc() != null) { - LOG.info("set least row check for LimitDesc: " + globalLimitCtx.getGlobalLimit()); - globalLimitCtx.getLastReduceLimitDesc().setLeastRows(globalLimitCtx.getGlobalLimit()); - List mrTasks = Utilities.getMRTasks(rootTasks); - for (ExecDriver tsk : mrTasks) { - tsk.setRetryCmdWhenFail(true); - } - } - } - - /** - * Find all leaf tasks of the list of root tasks. 
- */ - private void getLeafTasks(List> rootTasks, - HashSet> leaves) { - - for (Task root : rootTasks) { - getLeafTasks(root, leaves); - } - } - - private void getLeafTasks(Task task, - HashSet> leaves) { - if (task.getDependentTasks() == null) { - if (!leaves.contains(task)) { - leaves.add(task); - } - } else { - getLeafTasks(task.getDependentTasks(), leaves); - } - } - - // loop over all the tasks recursviely - private void generateCountersTask(Task task) { - if (task instanceof ExecDriver) { - HashMap> opMap = ((MapredWork) task - .getWork()).getAliasToWork(); - if (!opMap.isEmpty()) { - for (Operator op : opMap.values()) { - generateCountersOperator(op); - } - } - - Operator reducer = ((MapredWork) task.getWork()) - .getReducer(); - if (reducer != null) { - LOG.info("Generating counters for operator " + reducer); - generateCountersOperator(reducer); - } - } else if (task instanceof ConditionalTask) { - List> listTasks = ((ConditionalTask) task) - .getListTasks(); - for (Task tsk : listTasks) { - generateCountersTask(tsk); - } - } - - // Start the counters from scratch - a hack for hadoop 17. - Operator.resetLastEnumUsed(); - - if (task.getChildTasks() == null) { - return; - } - - for (Task childTask : task.getChildTasks()) { - generateCountersTask(childTask); - } - } - - private void generateCountersOperator(Operator op) { - op.assignCounterNameToEnum(); - - if (op.getChildOperators() == null) { - return; - } - - for (Operator child : op.getChildOperators()) { - generateCountersOperator(child); - } - } - - // loop over all the tasks recursviely - private void breakTaskTree(Task task) { - - if (task instanceof ExecDriver) { - HashMap> opMap = ((MapredWork) task - .getWork()).getAliasToWork(); - if (!opMap.isEmpty()) { - for (Operator op : opMap.values()) { - breakOperatorTree(op); - } - } - } else if (task instanceof ConditionalTask) { - List> listTasks = ((ConditionalTask) task) - .getListTasks(); - for (Task tsk : listTasks) { - breakTaskTree(tsk); - } - } - - if (task.getChildTasks() == null) { - return; - } - - for (Task childTask : task.getChildTasks()) { - breakTaskTree(childTask); - } - } - - // loop over all the operators recursviely - private void breakOperatorTree(Operator topOp) { - if (topOp instanceof ReduceSinkOperator) { - topOp.setChildOperators(null); - } - - if (topOp.getChildOperators() == null) { - return; - } - - for (Operator op : topOp.getChildOperators()) { - breakOperatorTree(op); - } - } - - private void setInputFormat(MapredWork work, Operator op) { - if (op.isUseBucketizedHiveInputFormat()) { - work.setUseBucketizedHiveInputFormat(true); - return; - } - - if (op.getChildOperators() != null) { - for (Operator childOp : op.getChildOperators()) { - setInputFormat(work, childOp); - } - } - } - - // loop over all the tasks recursviely - private void setInputFormat(Task task) { - if (task instanceof ExecDriver) { - MapredWork work = (MapredWork) task.getWork(); - HashMap> opMap = work.getAliasToWork(); - if (!opMap.isEmpty()) { - for (Operator op : opMap.values()) { - setInputFormat(work, op); - } - } - } else if (task instanceof ConditionalTask) { - List> listTasks = ((ConditionalTask) task).getListTasks(); - for (Task tsk : listTasks) { - setInputFormat(tsk); - } - } - - if (task.getChildTasks() != null) { - for (Task childTask : task.getChildTasks()) { - setInputFormat(childTask); - } - } - } - @SuppressWarnings("nls") public Phase1Ctx initPhase1Ctx() { @@ -8780,8 +8318,11 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { } // At this point 
we have the complete operator tree - // from which we want to find the reduce operator - genMapRedTasks(pCtx); + // from which we want to create the map-reduce plan + MapReduceCompiler compiler = new MapReduceCompiler(); + compiler.init(conf, console, db); + compiler.compile(pCtx, rootTasks, inputs, outputs); + fetchTask = pCtx.getFetchTask(); LOG.info("Completed plan generation"); @@ -9569,98 +9110,6 @@ private void validateCreateView(CreateViewDesc createVwDesc) } } - private void decideExecMode(List> rootTasks, Context ctx, - GlobalLimitCtx globalLimitCtx) - throws SemanticException { - - // bypass for explain queries for now - if (ctx.getExplain()) { - return; - } - - // user has told us to run in local mode or doesn't want auto-local mode - if (ctx.isLocalOnlyExecutionMode() || - !conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) { - return; - } - - final Context lCtx = ctx; - PathFilter p = new PathFilter() { - public boolean accept(Path file) { - return !lCtx.isMRTmpFileURI(file.toUri().getPath()); - } - }; - List mrtasks = Utilities.getMRTasks(rootTasks); - - // map-reduce jobs will be run locally based on data size - // first find out if any of the jobs needs to run non-locally - boolean hasNonLocalJob = false; - for (ExecDriver mrtask : mrtasks) { - try { - ContentSummary inputSummary = Utilities.getInputSummary - (ctx, (MapredWork) mrtask.getWork(), p); - int numReducers = getNumberOfReducers(mrtask.getWork(), conf); - - long estimatedInput; - - if (globalLimitCtx != null && globalLimitCtx.isEnable()) { - // If the global limit optimization is triggered, we will - // estimate input data actually needed based on limit rows. - // estimated Input = (num_limit * max_size_per_row) * (estimated_map + 2) - // - long sizePerRow = HiveConf.getLongVar(conf, - HiveConf.ConfVars.HIVELIMITMAXROWSIZE); - estimatedInput = globalLimitCtx.getGlobalLimit() * sizePerRow; - long minSplitSize = HiveConf.getLongVar(conf, - HiveConf.ConfVars.MAPREDMINSPLITSIZE); - long estimatedNumMap = inputSummary.getLength() / minSplitSize + 1; - estimatedInput = estimatedInput * (estimatedNumMap + 1); - } else { - estimatedInput = inputSummary.getLength(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Task: " + mrtask.getId() + ", Summary: " + - inputSummary.getLength() + "," + inputSummary.getFileCount() + "," - + numReducers + ", estimated Input: " + estimatedInput); - } - - if (MapRedTask.isEligibleForLocalMode(conf, numReducers, - estimatedInput, inputSummary.getFileCount()) != null) { - hasNonLocalJob = true; - break; - } else { - mrtask.setLocalMode(true); - } - } catch (IOException e) { - throw new SemanticException(e); - } - } - - if (!hasNonLocalJob) { - // Entire query can be run locally. - // Save the current tracker value and restore it when done. - ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf)); - ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf, "local"); - console.printInfo("Automatically selecting local only mode for query"); - } - } - - /** - * Make a best guess at trying to find the number of reducers - */ - private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) { - if (mrwork.getReducer() == null) { - return 0; - } - - if (mrwork.getNumReduceTasks() >= 0) { - return mrwork.getNumReduceTasks(); - } - - return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS); - } - // Process the position alias in GROUPBY and ORDERBY private void processPositionAlias(ASTNode ast) throws SemanticException { if (HiveConf.getBoolVar(conf,