diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 8db833e..9f72056 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -218,6 +218,8 @@ public static String REDUCE_PLAN_NAME = "reduce.xml"; public static String MERGE_PLAN_NAME = "merge.xml"; public static final String INPUT_NAME = "iocontext.input.name"; + public static final String HAS_MAP_WORK = "has.map.work"; + public static final String HAS_REDUCE_WORK = "has.reduce.work"; public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; public static final String HIVE_ADDED_JARS = "hive.added.jars"; @@ -302,6 +304,9 @@ public static void setMapWork(Configuration conf, MapWork work) { } public static MapWork getMapWork(Configuration conf) { + if (!conf.getBoolean(HAS_MAP_WORK, true)) { + return null; + } return (MapWork) getBaseWork(conf, MAP_PLAN_NAME); } @@ -310,6 +315,9 @@ public static void setReduceWork(Configuration conf, ReduceWork work) { } public static ReduceWork getReduceWork(Configuration conf) { + if (!conf.getBoolean(HAS_REDUCE_WORK, true)) { + return null; + } return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 34b683c..9c0d135 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -264,6 +264,9 @@ public int execute(DriverContext driverContext) { job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0); job.setReducerClass(ExecReducer.class); + if (rWork == null) { + job.setBoolean(Utilities.HAS_REDUCE_WORK, false); + } // set input format information if necessary setInputAttributes(job); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 66ffe5d..8dd37fc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -306,6 +306,7 @@ private JobConf cloneJobConf(BaseWork work) throws Exception { scratchDir, context, false); Utilities.setInputPaths(cloned, inputPaths); Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false); + cloned.setBoolean(Utilities.HAS_REDUCE_WORK, false); Utilities.createTmpDirs(cloned, (MapWork) work); if (work instanceof MergeFileWork) { MergeFileWork mergeFileWork = (MergeFileWork) work; @@ -325,6 +326,7 @@ private JobConf cloneJobConf(BaseWork work) throws Exception { } else if (work instanceof ReduceWork) { cloned.setBoolean("mapred.task.is.map", false); Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false); + cloned.setBoolean(Utilities.HAS_MAP_WORK, false); Utilities.createTmpDirs(cloned, (ReduceWork) work); cloned.set(Utilities.MAPRED_REDUCER_CLASS, ExecReducer.class.getName()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index b3b6431..6ca0184 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -101,6 +101,7 @@ public int execute(DriverContext driverContext) { job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(0); + job.setBoolean(Utilities.HAS_REDUCE_WORK, false); // create the temp directories Path outputPath = work.getOutputDir(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java index dcd0e97..c119eed 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java @@ -134,6 +134,7 @@ public int execute(DriverContext driverContext) { // zero reducers job.setNumReduceTasks(0); + job.setBoolean(Utilities.HAS_REDUCE_WORK, false); if (work.getMinSplitSize() != null) { HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java index fcfcf2f..941abf7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java @@ -108,6 +108,7 @@ public int execute(DriverContext driverContext) { // zero reducers job.setNumReduceTasks(0); + job.setBoolean(Utilities.HAS_REDUCE_WORK, false); if (work.getMinSplitSize() != null) { HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work