diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 4a13650..d165630 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -68,7 +68,7 @@
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -209,9 +209,6 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork,
         mrScratchDir.toUri().toString(), ctx);
     Utilities.setInputPaths(conf, inputPaths);
 
-    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf,
-        new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
-
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, mapWork);
 
@@ -221,37 +218,31 @@
     // finally create the vertex
     Vertex map = null;
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
-    if (inputSplitInfo.getNumTasks() != 0) {
-      map = new Vertex(mapWork.getName(),
-          new ProcessorDescriptor(MapTezProcessor.class.getName()).
-              setUserPayload(serializedConf),
-          inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
-      Map<String, String> environment = new HashMap<String, String>();
-      MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
-      map.setTaskEnvironment(environment);
-      map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
-
-      assert mapWork.getAliasToWork().keySet().size() == 1;
-
-      String alias = mapWork.getAliasToWork().keySet().iterator().next();
-      byte[] mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
-      map.addInput(alias,
-          new InputDescriptor(MRInput.class.getName()).
-              setUserPayload(mrInput), null);
-
-      map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
-
-      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-      localResources.put(getBaseName(appJarLr), appJarLr);
-      for (LocalResource lr: additionalLr) {
-        localResources.put(getBaseName(lr), lr);
-      }
-      localResources.put(FilenameUtils.getName(planPath.getName()), planLr);
+    map = new Vertex(mapWork.getName(),
+        new ProcessorDescriptor(MapTezProcessor.class.getName()).
+            setUserPayload(serializedConf), -1,
+        MRHelpers.getMapResource(conf));
+    Map<String, String> environment = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
+    map.setTaskEnvironment(environment);
+    map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
 
-      MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
-          localResources);
-      map.setTaskLocalResources(localResources);
+    assert mapWork.getAliasToWork().keySet().size() == 1;
+
+    String alias = mapWork.getAliasToWork().keySet().iterator().next();
+    byte[] mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+    map.addInput(alias,
+        new InputDescriptor(MRInput.class.getName()).
+            setUserPayload(mrInput), MRInputAMSplitGenerator.class);
+
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    localResources.put(getBaseName(appJarLr), appJarLr);
+    for (LocalResource lr: additionalLr) {
+      localResources.put(getBaseName(lr), lr);
     }
+    localResources.put(FilenameUtils.getName(planPath.getName()), planLr);
+
+    map.setTaskLocalResources(localResources);
 
     return map;
   }
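
The patch shifts MapReduce input split generation from the Hive client into the Tez ApplicationMaster: instead of calling MRHelpers.generateInputSplits() up front, fixing the vertex parallelism to the split count and shipping split files as local resources, the vertex is now created with a parallelism of -1 and MRInputAMSplitGenerator is registered on the MRInput, so the AM computes splits, task count and location hints at run time. Below is a minimal sketch of that pattern, using only API calls visible in the diff; the class name, method name, variable names, the "alias" string and the import locations are illustrative assumptions, not part of the patch.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.dag.api.InputDescriptor;
    import org.apache.tez.dag.api.ProcessorDescriptor;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;
    import org.apache.tez.mapreduce.input.MRInput;

    // Sketch only: illustrates AM-side split generation as used in the patch.
    public class AmSideSplitsSketch {
      static Vertex createMapVertex(String vertexName, JobConf conf) throws Exception {
        byte[] payload = MRHelpers.createUserPayloadFromConf(conf);

        // Parallelism of -1 defers the task count to the input initializer.
        Vertex map = new Vertex(vertexName,
            new ProcessorDescriptor("org.apache.hadoop.hive.ql.exec.tez.MapTezProcessor")
                .setUserPayload(payload),
            -1, MRHelpers.getMapResource(conf));

        // MRInputAMSplitGenerator runs in the Tez AM, computes the splits and
        // supplies the vertex parallelism and task location hints, replacing the
        // client-side generateInputSplits() / setTaskLocationsHint() calls.
        map.addInput("alias",
            new InputDescriptor(MRInput.class.getName())
                .setUserPayload(MRHelpers.createMRInputPayload(payload, null)),
            MRInputAMSplitGenerator.class);
        return map;
      }
    }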