diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 3a1eff8..762ce7d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -185,8 +185,15 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) JobConf jobConf = cloneJobConf(mapWork); Class ifClass = getInputFormat(jobConf, mapWork); - JavaPairRDD hadoopRDD = sc.hadoopRDD(jobConf, ifClass, - WritableComparable.class, Writable.class); + JavaPairRDD hadoopRDD; + if (mapWork.getNumMapTasks() != null) { + jobConf.setNumMapTasks(mapWork.getNumMapTasks()); + hadoopRDD = sc.hadoopRDD(jobConf, ifClass, + WritableComparable.class, Writable.class, mapWork.getNumMapTasks()); + } else { + hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); + } + // Caching is disabled for MapInput due to HIVE-8920 MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); return result; @@ -285,6 +292,10 @@ private JobConf cloneJobConf(BaseWork work) throws Exception { } else { cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName()); } + if (((MapWork) work).getMinSplitSize() != null) { + HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZE, + ((MapWork) work).getMinSplitSize()); + } // remember the JobConf cloned for each MapWork, so we won't clone for it again workToJobConf.put(work, cloned); } else if (work instanceof ReduceWork) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index f3fb541..7ebd18d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -297,15 +297,17 @@ public static void processSkewJoin(JoinOperator joinOp, Task