diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
index 08b603c..0e2fa25 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
@@ -96,10 +96,10 @@ protected void replaceTask(
     currTask.setParentTasks(null);
     if (parentTasks != null) {
       for (Task tsk : parentTasks) {
-        // make new generated task depends on all the parent tasks of current task.
-        tsk.addDependentTask(newTask);
         // remove the current task from its original parent task's dependent task
         tsk.removeDependentTask(currTask);
+        // make new generated task depends on all the parent tasks of current task.
+        tsk.addDependentTask(newTask);
       }
     } else {
       // remove from current root task and add conditional task to root tasks
@@ -112,10 +112,10 @@ protected void replaceTask(
     currTask.setChildTasks(null);
     if (oldChildTasks != null) {
       for (Task tsk : oldChildTasks) {
-        // make new generated task depends on all the parent tasks of current task.
-        newTask.addDependentTask(tsk);
         // remove the current task from its original parent task's dependent task
         tsk.getParentTasks().remove(currTask);
+        // make new generated task depends on all the parent tasks of current task.
+        newTask.addDependentTask(tsk);
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
index c049092..2b2632d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
@@ -370,11 +370,8 @@ private void mayBeMergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Conf
   }

   // create map join task and set big table as bigTablePosition
-  private ObjectPair convertTaskToMapJoinTask(String xml,
+  private ObjectPair convertTaskToMapJoinTask(MapredWork newWork,
       int bigTablePosition) throws UnsupportedEncodingException, SemanticException {
-    // deep copy a new mapred work from xml
-    InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-    MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
     // create a mapred task for this work
     MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
         .getParseContext().getConf());
@@ -493,11 +490,10 @@ private void mayBeMergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Conf
     String bigTableAlias = null;
     currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
     currWork.setJoinTree(joinTree);
-    String xml = currWork.toXML();

     if (convertJoinMapJoin) {
       // create map join task and set big table as bigTablePosition
-      MapRedTask newTask = convertTaskToMapJoinTask(xml, bigTablePosition).getFirst();
+      MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();

       newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
       replaceTask(currTask, newTask, physicalContext);
@@ -526,14 +522,18 @@ private void mayBeMergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Conf
     long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf,
         HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
     boolean foundBigTableCandidates = false;
+    String xml = currWork.toXML();
     for (int i = 0; i < numAliases; i++) {
       // this table cannot be big table
       if (!bigTableCandidates.contains(i)) {
         continue;
       }
+      // deep copy a new mapred work from xml
+      InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
+      MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
       // create map join task and set big table as i
-      ObjectPair newTaskAlias = convertTaskToMapJoinTask(xml, i);
+      ObjectPair newTaskAlias = convertTaskToMapJoinTask(newWork, i);
       MapRedTask newTask = newTaskAlias.getFirst();
       bigTableAlias = newTaskAlias.getSecond();
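Note (not part of the patch): the last two hunks hoist the XML round-trip out of convertTaskToMapJoinTask. The plan is serialized once before the big-table loop and deserialized into a fresh copy per candidate, while the convertJoinMapJoin path now passes currWork directly and skips the copy entirely. Below is a minimal, self-contained sketch of that "serialize once, deep-copy per candidate" shape; it uses plain java.io serialization on a hypothetical stand-in Plan class rather than Hive's MapredWork, currWork.toXML(), or Utilities.deserializeMapRedWork, purely for illustration.

import java.io.*;
import java.util.*;

public class DeepCopyPerCandidate {
  // Hypothetical stand-in for MapredWork: any serializable plan object.
  static class Plan implements Serializable {
    int bigTablePosition = -1;
    List<String> aliases = new ArrayList<>(Arrays.asList("a", "b", "c"));
  }

  // Serialize once, analogous to calling currWork.toXML() before the loop.
  static byte[] snapshot(Plan p) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(p);
    }
    return bos.toByteArray();
  }

  // Deserialize per candidate, analogous to Utilities.deserializeMapRedWork.
  static Plan deepCopy(byte[] bytes) throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (Plan) ois.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Plan original = new Plan();
    byte[] xmlLike = snapshot(original);        // one serialization, outside the loop

    for (int i = 0; i < original.aliases.size(); i++) {
      Plan candidate = deepCopy(xmlLike);       // fresh deep copy per big-table candidate
      candidate.bigTablePosition = i;           // mutate the copy; original stays untouched
      System.out.println("candidate big table: " + candidate.aliases.get(i));
    }
  }
}

The single-conversion path needs no copy at all, which is why the patch changes convertTaskToMapJoinTask to accept a MapredWork and lets the loop decide when a deep copy is actually required.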