diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index 05748a1..7cdaa41 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -119,11 +119,16 @@ private boolean containsOp(BaseWork work, Class<?> clazz) {
     // of its parent SparkWorks for the small tables
     private final Map<SparkWork, List<SparkWork>> dependencyGraph;
 
+    // Newly generated tasks. We need to keep this so they won't be
+    // processed by the dispatcher.
+    private final Set<SparkTask> generatedTasks;
+
     public SparkMapJoinTaskDispatcher(PhysicalContext pc) {
       super();
       physicalContext = pc;
       sparkWorkMap = new LinkedHashMap<SparkWork, SparkWork>();
       dependencyGraph = new LinkedHashMap<SparkWork, List<SparkWork>>();
+      generatedTasks = new HashSet<SparkTask>();
     }
 
     // Move the specified work from the sparkWork to the targetWork
@@ -282,6 +287,7 @@ private SparkTask createSparkTask(SparkTask originalTask,
       }
 
       createdTaskMap.put(sparkWork, resultTask);
+      generatedTasks.add(resultTask);
       return resultTask;
     }
 
@@ -289,7 +295,7 @@ private SparkTask createSparkTask(SparkTask originalTask,
     public Object dispatch(Node nd, Stack<Node> stack, Object... nos)
         throws SemanticException {
       Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
-      if(currentTask.isMapRedTask()) {
+      if(currentTask.isMapRedTask() && !generatedTasks.contains(currentTask)) {
         if (currentTask instanceof ConditionalTask) {
           List<Task<? extends Serializable>> taskList = ((ConditionalTask) currentTask).getListTasks();