diff --git ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
index 9e4612d..f22694b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.lib;
 
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
@@ -58,6 +60,12 @@ public void walk(Node nd) throws SemanticException {
       for (Node n : nd.getChildren()) {
         walk(n);
       }
+    } else if (nd instanceof ConditionalTask) {
+      for (Task n : ((ConditionalTask) nd).getListTasks()) {
+        if (n.getParentTasks() == null || n.getParentTasks().isEmpty()) {
+          walk(n);
+        }
+      }
     }
 
     opStack.pop();
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 52e6be8..1252cb2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -335,6 +335,7 @@ public static void processSkewJoin(JoinOperator joinOp, Task
+  private final Set<Task<? extends Serializable>> visitedTasks = new HashSet<>();
+
   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
@@ -78,6 +81,15 @@ private boolean containsOp(BaseWork work, Class clazz) {
     return matchingOps != null && !matchingOps.isEmpty();
   }
 
+  private boolean containsOp(SparkWork sparkWork, Class clazz) {
+    for (BaseWork work : sparkWork.getAllWorkUnsorted()) {
+      if (containsOp(work, clazz)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public static Set<Operator<?>> getOp(BaseWork work, Class clazz) {
     Set<Operator<?>> ops = new HashSet<Operator<?>>();
     if (work instanceof MapWork) {
@@ -172,70 +184,68 @@ private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork)
 
   private void generateLocalWork(SparkTask originalTask) {
     SparkWork originalWork = originalTask.getWork();
-    Collection<BaseWork> allBaseWorks = originalWork.getAllWorkUnsorted();
-
-    for (BaseWork work : allBaseWorks) {
-      if (containsOp(work, SparkHashTableSinkOperator.class) ||
-          containsOp(work, MapJoinOperator.class)) {
-        work.setMapRedLocalWork(new MapredLocalWork());
-      }
-    }
-
+    Collection<BaseWork> allBaseWorks = originalWork.getAllWork();
     Context ctx = physicalContext.getContext();
 
     for (BaseWork work : allBaseWorks) {
-      Set<Operator<?>> ops = getOp(work, MapJoinOperator.class);
-      if (ops == null || ops.isEmpty()) {
-        continue;
-      }
-      Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
-      MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork();
-      List<Operator<? extends OperatorDesc>> dummyOps =
-        new ArrayList<Operator<? extends OperatorDesc>>(work.getDummyOps());
-      bigTableLocalWork.setDummyParentOp(dummyOps);
-      bigTableLocalWork.setTmpPath(tmpPath);
-
-      // In one work, only one map join operator can be bucketed
-      SparkBucketMapJoinContext bucketMJCxt = null;
-      for (Operator op: ops) {
-        MapJoinOperator mapJoinOp = (MapJoinOperator) op;
-        MapJoinDesc mapJoinDesc = mapJoinOp.getConf();
-        if (mapJoinDesc.isBucketMapJoin()) {
-          bucketMJCxt = new SparkBucketMapJoinContext(mapJoinDesc);
-          bucketMJCxt.setBucketMatcherClass(
-            org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
-          bucketMJCxt.setPosToAliasMap(mapJoinOp.getPosToAliasMap());
-          ((MapWork) work).setUseBucketizedHiveInputFormat(true);
-          bigTableLocalWork.setBucketMapjoinContext(bucketMJCxt);
-          bigTableLocalWork.setInputFileChangeSensitive(true);
-          break;
+      if (work.getMapRedLocalWork() == null) {
+        if (containsOp(work, SparkHashTableSinkOperator.class) ||
+            containsOp(work, MapJoinOperator.class)) {
+          work.setMapRedLocalWork(new MapredLocalWork());
         }
-      }
-
-      for (BaseWork parentWork : originalWork.getParents(work)) {
-        Set<Operator<?>> hashTableSinkOps =
-          getOp(parentWork, SparkHashTableSinkOperator.class);
-        if (hashTableSinkOps == null || hashTableSinkOps.isEmpty()) {
+        Set<Operator<?>> ops = getOp(work, MapJoinOperator.class);
+        if (ops == null || ops.isEmpty()) {
           continue;
         }
-        MapredLocalWork parentLocalWork = parentWork.getMapRedLocalWork();
-        parentLocalWork.setTmpHDFSPath(tmpPath);
-        if (bucketMJCxt != null) {
-          // We only need to update the work with the hashtable
-          // sink operator with the same mapjoin desc. We can tell
-          // that by comparing the bucket file name mapping map
-          // instance. They should be exactly the same one due to
-          // the way how the bucket mapjoin context is constructed.
-          for (Operator op: hashTableSinkOps) {
-            SparkHashTableSinkOperator hashTableSinkOp = (SparkHashTableSinkOperator) op;
-            SparkHashTableSinkDesc hashTableSinkDesc = hashTableSinkOp.getConf();
-            BucketMapJoinContext original = hashTableSinkDesc.getBucketMapjoinContext();
-            if (original != null && original.getBucketFileNameMapping()
-                == bucketMJCxt.getBucketFileNameMapping()) {
-              ((MapWork) parentWork).setUseBucketizedHiveInputFormat(true);
-              parentLocalWork.setBucketMapjoinContext(bucketMJCxt);
-              parentLocalWork.setInputFileChangeSensitive(true);
-              break;
+        Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
+        MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork();
+        List<Operator<? extends OperatorDesc>> dummyOps =
+            new ArrayList<Operator<? extends OperatorDesc>>(work.getDummyOps());
+        bigTableLocalWork.setDummyParentOp(dummyOps);
+        bigTableLocalWork.setTmpPath(tmpPath);
+
+        // In one work, only one map join operator can be bucketed
+        SparkBucketMapJoinContext bucketMJCxt = null;
+        for (Operator op : ops) {
+          MapJoinOperator mapJoinOp = (MapJoinOperator) op;
+          MapJoinDesc mapJoinDesc = mapJoinOp.getConf();
+          if (mapJoinDesc.isBucketMapJoin()) {
+            bucketMJCxt = new SparkBucketMapJoinContext(mapJoinDesc);
+            bucketMJCxt.setBucketMatcherClass(
+                org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+            bucketMJCxt.setPosToAliasMap(mapJoinOp.getPosToAliasMap());
+            ((MapWork) work).setUseBucketizedHiveInputFormat(true);
+            bigTableLocalWork.setBucketMapjoinContext(bucketMJCxt);
+            bigTableLocalWork.setInputFileChangeSensitive(true);
+            break;
+          }
+        }
+
+        for (BaseWork parentWork : originalWork.getParents(work)) {
+          Set<Operator<?>> hashTableSinkOps =
+              getOp(parentWork, SparkHashTableSinkOperator.class);
+          if (hashTableSinkOps == null || hashTableSinkOps.isEmpty()) {
+            continue;
+          }
+          MapredLocalWork parentLocalWork = parentWork.getMapRedLocalWork();
+          parentLocalWork.setTmpHDFSPath(tmpPath);
+          if (bucketMJCxt != null) {
+            // We only need to update the work with the hashtable
+            // sink operator with the same mapjoin desc. We can tell
+            // that by comparing the bucket file name mapping map
+            // instance. They should be exactly the same one due to
+            // the way how the bucket mapjoin context is constructed.
+            for (Operator op : hashTableSinkOps) {
+              SparkHashTableSinkOperator hashTableSinkOp = (SparkHashTableSinkOperator) op;
+              SparkHashTableSinkDesc hashTableSinkDesc = hashTableSinkOp.getConf();
+              BucketMapJoinContext original = hashTableSinkDesc.getBucketMapjoinContext();
+              if (original != null && original.getBucketFileNameMapping()
+                  == bucketMJCxt.getBucketFileNameMapping()) {
+                ((MapWork) parentWork).setUseBucketizedHiveInputFormat(true);
+                parentLocalWork.setBucketMapjoinContext(bucketMJCxt);
+                parentLocalWork.setInputFileChangeSensitive(true);
+                break;
+              }
             }
           }
         }
@@ -296,10 +306,12 @@ public Object dispatch(Node nd, Stack stack, Object... nos)
         for (Task tsk : taskList) {
           if (tsk instanceof SparkTask) {
             processCurrentTask((SparkTask) tsk, (ConditionalTask) currentTask);
+            visitedTasks.add(tsk);
           }
         }
       } else if (currentTask instanceof SparkTask) {
        processCurrentTask((SparkTask) currentTask, null);
+        visitedTasks.add(currentTask);
       }
     }
 
@@ -312,32 +324,47 @@ public Object dispatch(Node nd, Stack stack, Object... nos)
    * wrapped in its task list.
    */
   private void processCurrentTask(SparkTask sparkTask, ConditionalTask conditionalTask) {
-    dependencyGraph.clear();
-    sparkWorkMap.clear();
     SparkWork sparkWork = sparkTask.getWork();
+    if (!visitedTasks.contains(sparkTask)) {
+      dependencyGraph.clear();
+      sparkWorkMap.clear();
 
-    // Generate MapredLocalWorks for MJ and HTS
-    generateLocalWork(sparkTask);
+      // Generate MapredLocalWorks for MJ and HTS
+      generateLocalWork(sparkTask);
 
-    dependencyGraph.put(sparkWork, new ArrayList());
-    Set<BaseWork> leaves = sparkWork.getLeaves();
-    for (BaseWork leaf : leaves) {
-      moveWork(sparkWork, leaf, sparkWork);
-    }
+      dependencyGraph.put(sparkWork, new ArrayList());
+      Set<BaseWork> leaves = sparkWork.getLeaves();
+      for (BaseWork leaf : leaves) {
+        moveWork(sparkWork, leaf, sparkWork);
+      }
 
-    // Now remove all BaseWorks in all the childSparkWorks that we created
-    // from the original SparkWork
-    for (SparkWork newSparkWork : sparkWorkMap.values()) {
-      for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
-        sparkWork.remove(work);
+      // Now remove all BaseWorks in all the childSparkWorks that we created
+      // from the original SparkWork
+      for (SparkWork newSparkWork : sparkWorkMap.values()) {
+        for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
+          sparkWork.remove(work);
+        }
       }
-    }
 
-    Map createdTaskMap = new LinkedHashMap();
+      Map createdTaskMap = new LinkedHashMap();
 
-    // Now create SparkTasks from the SparkWorks, also set up dependency
-    for (SparkWork work : dependencyGraph.keySet()) {
-      createSparkTask(sparkTask, work, createdTaskMap, conditionalTask);
+      // Now create SparkTasks from the SparkWorks, also set up dependency
+      for (SparkWork work : dependencyGraph.keySet()) {
+        createSparkTask(sparkTask, work, createdTaskMap, conditionalTask);
+      }
+    } else if (conditionalTask != null) {
+      // We may need to update the conditional task's list. This happens when a common map join
+      // task exists in the task list and has already been processed. In such a case,
+      // the current task is the map join task and we need to replace it with
+      // its parent, i.e. the small table task.
+      if (sparkTask.getParentTasks() != null && sparkTask.getParentTasks().size() == 1 &&
+          sparkTask.getParentTasks().get(0) instanceof SparkTask) {
+        SparkTask parent = (SparkTask) sparkTask.getParentTasks().get(0);
+        if (containsOp(sparkWork, MapJoinOperator.class) &&
+            containsOp(parent.getWork(), SparkHashTableSinkOperator.class)) {
+          updateConditionalTask(conditionalTask, sparkTask, parent);
+        }
+      }
     }
   }
 
@@ -382,6 +409,10 @@ private void updateConditionalTask(ConditionalTask conditionalTask,
           }
         }
         context.setDirToTaskMap(newbigKeysDirToTaskMap);
+        // update no skew task
+        if (context.getNoSkewTask().equals(originalTask)) {
+          context.setNoSkewTask(newTask);
+        }
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
index 4c2f42e..00743bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 
 import java.io.Serializable;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.Stack;
@@ -58,6 +59,9 @@
  * Spark-version of SkewJoinProcFactory.
  */
 public class SparkSkewJoinProcFactory {
+  // let's remember the join operators we have processed
+  private static final Set visitedJoinOp = new HashSet();
+
   private SparkSkewJoinProcFactory() {
     // prevent instantiation
   }
@@ -84,10 +88,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
       if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask &&
          reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork) &&
          GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(
-            op, currentTsk, parseContext.getConf())) {
+            op, currentTsk, parseContext.getConf()) && !visitedJoinOp.contains(op)) {
        // first we try to split the task
        splitTask((SparkTask) currentTsk, reduceWork, parseContext);
        GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, reduceWork, parseContext);
+        visitedJoinOp.add(op);
      }
      return null;
    }
@@ -112,8 +117,7 @@ private static void splitTask(SparkTask currentTask, ReduceWork reduceWork,
      SparkWork newWork =
          new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID));
      newWork.add(childWork);
-      copyWorkGraph(currentWork, newWork, childWork, true);
-      copyWorkGraph(currentWork, newWork, childWork, false);
+      copyWorkGraph(currentWork, newWork, childWork);
      // remove them from current spark work
      for (BaseWork baseWork : newWork.getAllWorkUnsorted()) {
        currentWork.remove(baseWork);
@@ -196,22 +200,26 @@ private static boolean canSplit(SparkWork sparkWork) {
   /**
    * Copy a sub-graph from originWork to newWork.
    */
-  private static void copyWorkGraph(SparkWork originWork, SparkWork newWork,
-      BaseWork baseWork, boolean upWards) {
-    if (upWards) {
-      for (BaseWork parent : originWork.getParents(baseWork)) {
-        newWork.add(parent);
-        SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork);
-        newWork.connect(parent, baseWork, edgeProperty);
-        copyWorkGraph(originWork, newWork, parent, true);
-      }
-    } else {
-      for (BaseWork child : originWork.getChildren(baseWork)) {
+  private static void copyWorkGraph(SparkWork originWork, SparkWork newWork, BaseWork baseWork) {
+    for (BaseWork child : originWork.getChildren(baseWork)) {
+      if (!newWork.contains(child)) {
         newWork.add(child);
         SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(baseWork, child);
         newWork.connect(baseWork, child, edgeProperty);
-        copyWorkGraph(originWork, newWork, child, false);
+        copyWorkGraph(originWork, newWork, child);
+      }
+    }
+    for (BaseWork parent : originWork.getParents(baseWork)) {
+      if (!newWork.contains(parent)) {
+        newWork.add(parent);
+        SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork);
+        newWork.connect(parent, baseWork, edgeProperty);
+        copyWorkGraph(originWork, newWork, parent);
       }
     }
   }
+
+  public static Set getVisitedJoinOp() {
+    return visitedJoinOp;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
index 984380d..608a0de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -53,8 +54,10 @@ public class SparkSkewJoinResolver implements PhysicalPlanResolver {
 
   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    SparkSkewJoinProcFactory.getVisitedJoinOp().clear();
     Dispatcher disp = new SparkSkewJoinTaskDispatcher(pctx);
-    GraphWalker ogw = new DefaultGraphWalker(disp);
+    // since we may split current task, use a pre-order walker
+    GraphWalker ogw = new PreOrderWalker(disp);
     ArrayList topNodes = new ArrayList();
     topNodes.addAll(pctx.getRootTasks());
     ogw.startWalking(topNodes, null);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
index a7c896d..8500b21 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
@@ -236,6 +236,7 @@ public void remove(BaseWork work) {
     List parents = getParents(work);
 
     for (BaseWork w: children) {
+      edgeProperties.remove(new ImmutablePair(work, w));
       invertedWorkGraph.get(w).remove(work);
       if (invertedWorkGraph.get(w).size() == 0) {
         roots.add(w);
@@ -243,6 +244,7 @@ public void remove(BaseWork work) {
     }
 
     for (BaseWork w: parents) {
+      edgeProperties.remove(new ImmutablePair(w, work));
       workGraph.get(w).remove(work);
       if (workGraph.get(w).size() == 0) {
         leaves.add(w);