diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index a3ec990..1f60344 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -141,11 +141,16 @@ public SparkMapJoinTaskDispatcher(PhysicalContext pc) {
     // Move the specified work from the sparkWork to the targetWork
     // Note that, in order not to break the graph (since we need it for the edges),
     // we don't remove the work from the sparkWork here. The removal is done later.
-    private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork) {
+    private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork, List<BaseWork> createdSparkWork) {
       List<BaseWork> parentWorks = sparkWork.getParents(work);
+      for (BaseWork parent : parentWorks) {
+        if (createdSparkWork.contains(parent)) {
+          return;
+        }
+      }
       if (sparkWork != targetWork) {
         targetWork.add(work);
-
+        createdSparkWork.add(work);
         // If any child work for this work is already added to the targetWork earlier,
         // we should connect this work with it
         for (BaseWork childWork : sparkWork.getChildren(work)) {
@@ -157,7 +162,7 @@ private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork)
 
       if (!containsOp(work, MapJoinOperator.class)) {
         for (BaseWork parent : parentWorks) {
-          moveWork(sparkWork, parent, targetWork);
+          moveWork(sparkWork, parent, targetWork, createdSparkWork);
         }
       } else {
         // Create a new SparkWork for all the small tables of this work
@@ -174,9 +179,9 @@ private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork)
         sparkWorkMap.put(work, parentWork);
         for (BaseWork parent : parentWorks) {
           if (containsOp(parent, SparkHashTableSinkOperator.class)) {
-            moveWork(sparkWork, parent, parentWork);
+            moveWork(sparkWork, parent, parentWork, createdSparkWork);
           } else {
-            moveWork(sparkWork, parent, targetWork);
+            moveWork(sparkWork, parent, targetWork, createdSparkWork);
           }
         }
       }
@@ -186,23 +191,28 @@ private void generateLocalWork(SparkTask originalTask) {
       SparkWork originalWork = originalTask.getWork();
       Collection<BaseWork> allBaseWorks = originalWork.getAllWork();
       Context ctx = physicalContext.getContext();
-
+      Path tmpPath = null;
       for (BaseWork work : allBaseWorks) {
         if (work.getMapRedLocalWork() == null) {
           if (containsOp(work, SparkHashTableSinkOperator.class) ||
               containsOp(work, MapJoinOperator.class)) {
             work.setMapRedLocalWork(new MapredLocalWork());
           }
+          if (containsOp(work, SparkHashTableSinkOperator.class)) {
+            if (tmpPath == null) {
+              tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
+            }
+            work.getMapRedLocalWork().setTmpHDFSPath(tmpPath);
+          }
           Set<Operator<? extends OperatorDesc>> ops = getOp(work, MapJoinOperator.class);
           if (ops == null || ops.isEmpty()) {
             continue;
           }
-          Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
+
           MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork();
           List<Operator<? extends OperatorDesc>> dummyOps =
               new ArrayList<Operator<? extends OperatorDesc>>(work.getDummyOps());
           bigTableLocalWork.setDummyParentOp(dummyOps);
-          bigTableLocalWork.setTmpPath(tmpPath);
 
           // In one work, only one map join operator can be bucketed
           SparkBucketMapJoinContext bucketMJCxt = null;
@@ -228,7 +238,7 @@ private void generateLocalWork(SparkTask originalTask) {
               continue;
             }
             MapredLocalWork parentLocalWork = parentWork.getMapRedLocalWork();
-            parentLocalWork.setTmpHDFSPath(tmpPath);
+            bigTableLocalWork.setTmpPath(parentLocalWork.getTmpHDFSPath());
             if (bucketMJCxt != null) {
               // We only need to update the work with the hashtable
               // sink operator with the same mapjoin desc. We can tell
@@ -334,8 +344,9 @@ private void processCurrentTask(SparkTask sparkTask, ConditionalTask conditional
       dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
 
       Set<BaseWork> leaves = sparkWork.getLeaves();
+      List<BaseWork> createdSparkWork = new ArrayList<BaseWork>();
       for (BaseWork leaf : leaves) {
-        moveWork(sparkWork, leaf, sparkWork);
+        moveWork(sparkWork, leaf, sparkWork, createdSparkWork);
       }
 
       // Now remove all BaseWorks in all the childSparkWorks that we created
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 005fad2..69635c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -202,27 +202,6 @@ private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoi
         return new long[]{-1, 0, 0};
       }
 
-      // Union is hard to handle. For instance, the following case:
-      //            TS    TS
-      //             |     |
-      //            FIL   FIL
-      //             |     |
-      //            SEL   SEL
-      //              \   /
-      //              UNION
-      //                |
-      //                RS
-      //                |
-      //              JOIN
-      // If we treat this as a MJ case, then after the RS is removed, we would
-      // create two MapWorks, for each of the TS. Each of these MapWork will contain
-      // a MJ operator, which is wrong.
-      // Otherwise, we could try to break the op tree at the UNION, and create two MapWorks
-      // for the branches above. Then, MJ will be in the following ReduceWork.
-      // But, this is tricky to implement, and we'll leave it as a future work for now.
-      if (containUnionWithoutRS(parentOp.getParentOperators().get(0))) {
-        return new long[]{-1, 0, 0};
-      }
 
       long inputSize = currInputStat.getDataSize();
       if ((bigInputStat == null)