diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
index d4ebbd4..a809284 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
@@ -54,7 +54,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     leastRow = conf.getLeastRows();
     offset = (conf.getOffset() == null) ? 0 : conf.getOffset();
     currCount = 0;
-    isMap = hconf.getBoolean("mapred.task.is.map", true);
+    isMap = hconf.getBoolean(Utilities.MAPRED_TASK_IS_MAP, true);
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8db833e..94be0e8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -220,6 +220,7 @@
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
+  public static final String MAPRED_TASK_IS_MAP = "mapred.task.is.map";
   public static final String HIVE_ADDED_JARS = "hive.added.jars";
   public static final String VECTOR_MODE = "VECTOR_MODE";
   public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = "USE_VECTORIZED_INPUT_FILE_FORMAT";
@@ -288,11 +289,30 @@ public static void clearWork(Configuration conf) {
 
   public static MapredWork getMapRedWork(Configuration conf) {
     MapredWork w = new MapredWork();
-    w.setMapWork(getMapWork(conf));
-    w.setReduceWork(getReduceWork(conf));
+    boolean isMap = conf.getBoolean(MAPRED_TASK_IS_MAP, true);
+    if (isMap) {
+      w.setMapWork(getMapWork(conf));
+    }
+    if (!isMap || w.getMapWork() == null || workHasOp(w.getMapWork(), ReduceSinkOperator.class)) {
+      w.setReduceWork(getReduceWork(conf));
+    }
     return w;
   }
 
+  /**
+   * Check if 'work' contains an operator of class 'clazz'. Returns true if so.
+   */
+  public static boolean workHasOp(BaseWork work, Class<?> clazz) {
+    if (work != null) {
+      for (Operator<?> op : work.getAllOperators()) {
+        if (clazz.isInstance(op)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   public static void cacheMapWork(Configuration conf, MapWork work, Path hiveScratchDir) {
     cacheBaseWork(conf, MAP_PLAN_NAME, work, hiveScratchDir);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 66ffe5d..f136ed1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -301,7 +301,7 @@ private JobConf cloneJobConf(BaseWork work) throws Exception {
       throw new IllegalArgumentException(msg, e);
     }
     if (work instanceof MapWork) {
-      cloned.setBoolean("mapred.task.is.map", true);
+      cloned.setBoolean(Utilities.MAPRED_TASK_IS_MAP, true);
       List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work,
           scratchDir, context, false);
       Utilities.setInputPaths(cloned, inputPaths);
@@ -323,7 +323,7 @@ private JobConf cloneJobConf(BaseWork work) throws Exception {
       // remember the JobConf cloned for each MapWork, so we won't clone for it again
       workToJobConf.put(work, cloned);
     } else if (work instanceof ReduceWork) {
-      cloned.setBoolean("mapred.task.is.map", false);
+      cloned.setBoolean(Utilities.MAPRED_TASK_IS_MAP, false);
       Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (ReduceWork) work);
       cloned.set(Utilities.MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index aa2dfc7..c007cdd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -597,6 +597,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
 
     // remember mapping of plan to input
     conf.set(Utilities.INPUT_NAME, mapWork.getName());
+    conf.setBoolean(Utilities.MAPRED_TASK_IS_MAP, true);
 
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
       // set up the operator plan. (before setting up splits on the AM)
@@ -712,6 +713,7 @@ private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
 
     // set up operator plan
     conf.set(Utilities.INPUT_NAME, reduceWork.getName());
+    conf.setBoolean(Utilities.MAPRED_TASK_IS_MAP, false);
     Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false);
 
     // create the directories FileSinkOperators need
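Note for reviewers: the core of the change is that Utilities.getMapRedWork now deserializes the reduce plan only when the task is a reducer, or when the map plan feeds a ReduceSinkOperator, instead of always loading both. Below is a minimal, self-contained sketch of the scan the new workHasOp helper performs; the FakeOperator/FakeReduceSink/FakeLocalReduceSink/WorkHasOpSketch names are hypothetical stand-ins for Hive's Operator hierarchy, used only so the sketch compiles and runs outside a Hive build. It illustrates why Class.isInstance is used rather than an exact class comparison: specialized subclasses of the requested operator class still count.

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Hive's Operator / ReduceSinkOperator types.
class FakeOperator {}
class FakeReduceSink extends FakeOperator {}
class FakeLocalReduceSink extends FakeReduceSink {}

public class WorkHasOpSketch {
  // Mirrors the shape of the patch's Utilities.workHasOp: walk every
  // operator in the work tree and report whether any is an instance
  // of 'clazz'.
  static boolean workHasOp(List<FakeOperator> allOperators, Class<?> clazz) {
    if (allOperators != null) {
      for (FakeOperator op : allOperators) {
        // Class.isInstance also matches subclasses, so a specialized
        // reduce-sink operator still registers as a reduce sink.
        if (clazz.isInstance(op)) {
          return true;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<FakeOperator> mapWithSink = Arrays.asList(new FakeOperator(), new FakeLocalReduceSink());
    List<FakeOperator> mapOnly = Arrays.asList(new FakeOperator());

    // A map plan that ends in a reduce sink implies a reduce stage exists,
    // so the reduce plan must still be loaded ...
    System.out.println(workHasOp(mapWithSink, FakeReduceSink.class)); // true
    // ... while for a map-only plan getMapRedWork can skip deserializing
    // the reduce plan entirely.
    System.out.println(workHasOp(mapOnly, FakeReduceSink.class)); // false
  }
}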