diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 12a76a7..5f85f9e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -298,12 +298,13 @@ private JobConf cloneJobConf(BaseWork work) throws Exception { throw new IllegalArgumentException(msg, e); } if (work instanceof MapWork) { + MapWork mapWork = (MapWork) work; cloned.setBoolean("mapred.task.is.map", true); - List inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, + List inputPaths = Utilities.getInputPaths(cloned, mapWork, scratchDir, context, false); Utilities.setInputPaths(cloned, inputPaths); - Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false); - Utilities.createTmpDirs(cloned, (MapWork) work); + Utilities.setMapWork(cloned, mapWork, scratchDir, false); + Utilities.createTmpDirs(cloned, mapWork); if (work instanceof MergeFileWork) { MergeFileWork mergeFileWork = (MergeFileWork) work; cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName()); @@ -313,9 +314,21 @@ private JobConf cloneJobConf(BaseWork work) throws Exception { } else { cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName()); } - if (((MapWork) work).getMinSplitSize() != null) { + if (mapWork.getMaxSplitSize() != null) { + HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, + mapWork.getMaxSplitSize()); + } + if (mapWork.getMinSplitSize() != null) { HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZE, - ((MapWork) work).getMinSplitSize()); + mapWork.getMinSplitSize()); + } + if (mapWork.getMinSplitSizePerNode() != null) { + HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, + mapWork.getMinSplitSizePerNode()); + } + if (mapWork.getMinSplitSizePerRack() != null) { + HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, + mapWork.getMinSplitSizePerRack()); } // remember the JobConf cloned for each MapWork, so we won't clone for it again workToJobConf.put(work, cloned);