diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index a8b7ac6..ac396d5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -111,11 +111,8 @@ private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork)
       SparkWork parentWork =
          new SparkWork(physicalContext.conf.getVar(HiveConf.ConfVars.HIVEQUERYID));

-      // Update dependency graph
-      if (!dependencyGraph.containsKey(targetWork)) {
-        dependencyGraph.put(targetWork, new ArrayList<SparkWork>());
-      }
       dependencyGraph.get(targetWork).add(parentWork);
+      dependencyGraph.put(parentWork, new ArrayList<SparkWork>());

       // this work is now moved to the parentWork, thus we should
       // update this information in sparkWorkMap
@@ -139,7 +136,7 @@ private SparkTask createSparkTask(Task<? extends Serializable> originalTask,
      return createdTaskMap.get(sparkWork);
    }
    SparkTask resultTask = (SparkTask) TaskFactory.get(sparkWork, physicalContext.conf);
-    if (dependencyGraph.get(sparkWork) != null) {
+    if (!dependencyGraph.get(sparkWork).isEmpty()) {
      for (SparkWork parentWork : dependencyGraph.get(sparkWork)) {
        SparkTask parentTask = createSparkTask(originalTask, parentWork, createdTaskMap);
        parentTask.addDependentTask(resultTask);
@@ -155,6 +152,8 @@ private SparkTask createSparkTask(Task<? extends Serializable> originalTask,
        physicalContext.removeFromRootTask(originalTask);
      }
    }
+
+    createdTaskMap.put(sparkWork, resultTask);
    return resultTask;
  }

@@ -164,6 +163,8 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nos)
    Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
    if (currentTask instanceof SparkTask) {
      SparkWork sparkWork = ((SparkTask) currentTask).getWork();
+
+      dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
      Set<BaseWork> leaves = sparkWork.getLeaves();
      for (BaseWork leaf : leaves) {
        moveWork(sparkWork, leaf, sparkWork);
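
Reviewer note (not part of the patch): the edits above replace the lazy, null-guarded initialization of dependencyGraph with eager registration. dispatch() now puts an empty list for each SparkWork up front, and moveWork() registers every newly created parentWork, so dependencyGraph.get(...) can be assumed non-null and isEmpty() becomes the emptiness check. The added createdTaskMap.put(sparkWork, resultTask) also makes the memoizing early return at the top of createSparkTask reachable, preventing duplicate SparkTask creation when two works share a parent. A minimal standalone sketch of the eager-registration pattern under that assumption; the names Work and DependencyGraphDemo are hypothetical, not Hive classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the register-before-use pattern the patch adopts:
// every node is put into the graph with an empty list before any
// edge is added, so get() never returns null and isEmpty() is a
// safe emptiness check.
public class DependencyGraphDemo {
  static class Work {
    final String name;
    Work(String name) { this.name = name; }
    @Override public String toString() { return name; }
  }

  private final Map<Work, List<Work>> dependencyGraph = new HashMap<>();

  // Mirrors the put() added in dispatch(): register each node eagerly.
  void register(Work w) {
    dependencyGraph.put(w, new ArrayList<Work>());
  }

  // Mirrors moveWork(): the child is already registered, and the new
  // parent is registered immediately so later lookups cannot NPE.
  void addParent(Work child, Work parent) {
    dependencyGraph.get(child).add(parent);
    register(parent);
  }

  boolean hasParents(Work w) {
    // Safe only because of the eager registration above; this is the
    // analogue of the new !dependencyGraph.get(sparkWork).isEmpty() check.
    return !dependencyGraph.get(w).isEmpty();
  }

  public static void main(String[] args) {
    DependencyGraphDemo g = new DependencyGraphDemo();
    Work leaf = new Work("leaf");
    g.register(leaf);
    g.addParent(leaf, new Work("parent"));
    System.out.println(g.hasParents(leaf)); // prints: true
  }
}

One trade-off worth noting: any code path that touches a SparkWork before it has been registered now fails fast with a NullPointerException instead of being silently tolerated by the old containsKey guard, which makes a missed registration easier to catch but stricter about call order.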