diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 988579e..259507a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.optimizer.spark;

-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -28,14 +27,16 @@
 import java.util.Set;
 import java.util.Stack;

+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -46,9 +47,9 @@
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc.DPPTargetInfo;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -63,14 +64,13 @@ */
 public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
   protected static transient Logger LOG = LoggerFactory.getLogger(CombineEquivalentWorkResolver.class);
-  private List<String> removedMapWorkNames = new ArrayList<String>();
   private PhysicalContext pctx;

   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
     this.pctx = pctx;
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getRootTasks());
-    TaskGraphWalker taskWalker = new TaskGraphWalker(new EquivalentWorkMatcher());
+    GraphWalker taskWalker = new PreOrderWalker(new EquivalentWorkMatcher());
     HashMap<Node, Object> nodeOutput = Maps.newHashMap();
     taskWalker.startWalking(topNodes, nodeOutput);
     return pctx;
@@ -84,25 +84,18 @@ public int compare(BaseWork o1, BaseWork o2) {
       }
     };

-    // maps from a work to the DPPs it contains
+    // maps from a work to the DPPs it contains -- used to combine equivalent DPP sinks
     private Map<BaseWork, List<SparkPartitionPruningSinkDesc>> workToDpps = new HashMap<>();

+    // Maps from work ID to the DPP sinks it contains -- used to update the DPP sinks when
+    // target map works are removed
+    private Map<String, List<SparkPartitionPruningSinkOperator>> idToDpps = new HashMap<>();
+
     @Override
     public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
       if (nd instanceof SparkTask) {
         SparkTask sparkTask = (SparkTask) nd;
         SparkWork sparkWork = sparkTask.getWork();
-        // For dpp case, dpp sink will appear in Task1 and the target work of dpp sink will appear in Task2.
-        // Task2 is the child task of Task1. Task2 will be traversed before task1 because TaskGraphWalker will first
-        // put children task in the front of task queue.
-        // If a spark work which is equal to other is found and removed in Task2, the dpp sink can be removed when Task1
-        // is traversed(More detailed see HIVE-16948)
-        if (removedMapWorkNames.size() > 0) {
-          removeDynamicPartitionPruningSink(removedMapWorkNames, sparkWork);
-          if (sparkWork.getAllWork().size() == 0) {
-            removeEmptySparkTask(sparkTask);
-          }
-        }

         Set<BaseWork> roots = sparkWork.getRoots();
         compareWorksRecursively(roots, sparkWork);
@@ -129,7 +122,28 @@ private void compareWorksRecursively(Set<BaseWork> works, SparkWork sparkWork) {
       }
     }

+    private void collectDPPInfos(Set<BaseWork> works) {
+      for (BaseWork work : works) {
+        String id = SparkUtilities.getWorkId(work);
+        if (!idToDpps.containsKey(id)) {
+          Set<Operator<?>> roots = work.getAllRootOperators();
+          for (Operator<?> root : roots) {
+            List<Operator<?>> sinks = new ArrayList<>();
+            SparkUtilities.collectOp(sinks, root, SparkPartitionPruningSinkOperator.class);
+            if (!sinks.isEmpty()) {
+              List<SparkPartitionPruningSinkOperator> list = idToDpps.computeIfAbsent(
+                  id, v -> new ArrayList<>());
+              for (Operator<?> sink : sinks) {
+                list.add((SparkPartitionPruningSinkOperator) sink);
+              }
+            }
+          }
+        }
+      }
+    }
+
     private Set<Set<BaseWork>> compareChildWorks(Set<BaseWork> children, SparkWork sparkWork) {
+      collectDPPInfos(children);
       Set<Set<BaseWork>> equivalentChildren = Sets.newHashSet();
       if (children.size() > 1) {
         for (BaseWork work : children) {
@@ -219,16 +233,17 @@ private void combineEquivalentDPPSinks(SparkPartitionPruningSinkDesc first,
         String firstId = SparkUtilities.getWorkId(first);
         while (iterator.hasNext()) {
           BaseWork next = iterator.next();
+          String secondId = SparkUtilities.getWorkId(next);
           if (dppList1 != null) {
             List<SparkPartitionPruningSinkDesc> dppList2 = workToDpps.get(next);
             // equivalent works must have dpp lists of same size
             for (int i = 0; i < dppList1.size(); i++) {
-              combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i),
-                  firstId, SparkUtilities.getWorkId(next));
+              combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i), firstId, secondId);
             }
           }
           replaceWork(next, first, sparkWork);
           removedWorks.add(next);
+          idToDpps.remove(secondId);
         }
       }
     }
@@ -255,7 +270,31 @@ private void replaceWork(BaseWork previous, BaseWork current, SparkWork sparkWork) {
       sparkWork.remove(previous);
       // In order to fix HIVE-16948
       if (previous instanceof MapWork) {
-        removedMapWorkNames.add(previous.getName());
+        removeTargetFromDPP((MapWork) previous);
+      }
+    }
+
+    // Remove the map work from DPP sinks that have it as a target
+    private void removeTargetFromDPP(MapWork target) {
+      Set<String> dppIds = target.getEventSourceColumnNameMap().keySet();
+      for (String dppId : dppIds) {
+        List<SparkPartitionPruningSinkOperator> sinks = idToDpps.get(dppId);
+        if (sinks != null) {
+          for (SparkPartitionPruningSinkOperator sink : sinks) {
+            List<DPPTargetInfo> targetInfos = sink.getConf().getTargetInfos();
+            List<DPPTargetInfo> toRemove = new ArrayList<>(targetInfos.size());
+            for (DPPTargetInfo targetInfo : targetInfos) {
+              if (targetInfo.work == target) {
+                toRemove.add(targetInfo);
+              }
+            }
+            targetInfos.removeAll(toRemove);
+            // If the target can be removed, it means there's another MapWork that shares the same
+            // DPP sink, and therefore it cannot be the only target.
+            Preconditions.checkState(!targetInfos.isEmpty(),
+                "The removed target work is the only target.");
+          }
+        }
       }
     }

@@ -331,6 +370,10 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork) {
     }

     private boolean compareMapWork(MapWork first, MapWork second) {
+      return hasSamePathToPartition(first, second) && targetsOfSameDPPSink(first, second);
+    }
+
+    private boolean hasSamePathToPartition(MapWork first, MapWork second) {
       Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
       Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
       if (pathToPartition1.size() == pathToPartition2.size()) {
@@ -347,6 +390,65 @@ private boolean compareMapWork(MapWork first, MapWork second) {
       return false;
     }

+    // Checks whether two MapWorks will be pruned by the same DPP sink
+    private boolean targetsOfSameDPPSink(MapWork first, MapWork second) {
+      Set<String> sources1 = first.getEventSourceColumnNameMap().keySet();
+      Set<String> sources2 = second.getEventSourceColumnNameMap().keySet();
+      if (!sources1.equals(sources2)) {
+        return false;
+      }
+      // make sure they share same DPP sink
+      for (String source : sources1) {
+        List<SparkPartitionPruningSinkOperator> sinks = idToDpps.get(source);
+        for (SparkPartitionPruningSinkOperator sink : sinks) {
+          List<DPPTargetInfo> targetInfos = sink.getConf().getTargetInfos();
+          int foundFirst = 0;
+          int foundSecond = 0;
+          for (DPPTargetInfo targetInfo : targetInfos) {
+            if (targetInfo.work == first) {
+              foundFirst++;
+            }
+            if (targetInfo.work == second) {
+              foundSecond++;
+            }
+          }
+          if (foundFirst != foundSecond) {
+            return false;
+          }
+        }
+      }
+      // make sure they have same target columns
+      for (String source : sources1) {
+        List<String> names1 = first.getEventSourceColumnNameMap().get(source);
+        List<String> names2 = second.getEventSourceColumnNameMap().get(source);
+        if (names2.size() != names1.size()) {
+          return false;
+        }
+        for (int i = 0; i < names1.size(); i++) {
+          if (!equalsIgnoreId(names1.get(i), names2.get(i))) {
+            return false;
+          }
+        }
+      }
+      if (!first.getEventSourceColumnTypeMap().equals(second.getEventSourceColumnTypeMap()) ||
+          !first.getEventSourceTableDescMap().equals(second.getEventSourceTableDescMap())) {
+        return false;
+      }
+      for (String source : first.getEventSourcePartKeyExprMap().keySet()) {
+        List<ExprNodeDesc> descs1 = first.getEventSourcePartKeyExprMap().get(source);
+        List<ExprNodeDesc> descs2 = second.getEventSourcePartKeyExprMap().get(source);
+        if (!ExprNodeDescUtils.isSame(descs1, descs2)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean equalsIgnoreId(String colName1, String colName2) {
+      return colName1.substring(colName1.indexOf(":") + 1).equals(
+          colName2.substring(colName2.indexOf(":") + 1));
+    }
+
     private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
       boolean result = true;
       List<BaseWork> firstParents = sparkWork.getParents(first);
@@ -409,55 +511,5 @@ private boolean compareOperatorChain(Operator<?> firstOperator, Operator<?> secondOperator) {
     private boolean compareCurrentOperator(Operator<?> firstOperator, Operator<?> secondOperator) {
       return firstOperator.logicalEquals(secondOperator);
     }
-
-    /**
-     * traverse the children in sparkWork to find the dpp sink operator which target work is included in
-     * removedMapWorkList
-     * If there is branch, remove prune sink operator branch in the BaseWork
-     * If there is no branch, remove the whole BaseWork
-     *
-     * @param removedMapWorkList: the name of the map work has been deleted because they are equals to other works.
-     * @param sparkWork: current spark work
-     */
-    private void removeDynamicPartitionPruningSink(List<String> removedMapWorkList, SparkWork sparkWork) {
-      List<BaseWork> allWorks = sparkWork.getAllWork();
-      for (BaseWork baseWork : allWorks) {
-        Set<Operator<?>> rootOperators = baseWork.getAllRootOperators();
-        for (Operator<?> root : rootOperators) {
-          List<Operator<?>> pruningList = new ArrayList<>();
-          SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
-          for (Operator<?> pruneSinkOp : pruningList) {
-            SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
-            if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetMapWork().getName())) {
-              LOG.debug("ready to remove the sparkPruneSinkOp which target work is " +
-                  sparkPruneSinkOp.getConf().getTargetWorks() + " because the MapWork is equals to other map work and " +
-                  "has been deleted!");
-              // If there is branch, remove prune sink operator branch in the baseWork
-              // If there is no branch, remove the whole baseWork
-              if (OperatorUtils.isInBranch(sparkPruneSinkOp)) {
-                OperatorUtils.removeBranch(sparkPruneSinkOp);
-              } else {
-                sparkWork.remove(baseWork);
-              }
-            }
-          }
-        }
-      }
-    }
-
-    private void removeEmptySparkTask(SparkTask currTask) {
-      // If currTask is rootTasks, remove it and add its children to the rootTasks which currTask is its only parent
-      // task
-      if (pctx.getRootTasks().contains(currTask)) {
-        pctx.removeFromRootTask(currTask);
-        List<Task<? extends Serializable>> newRoots = currTask.getChildTasks();
-        for (Task<? extends Serializable> newRoot : newRoots) {
-          if (newRoot.getParentTasks().size() == 1) {
-            pctx.addToRootTask(newRoot);
-          }
-        }
-      }
-      SparkUtilities.removeEmptySparkTask(currTask);
-    }
   }
 }
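
Note on the column-name comparison: targetsOfSameDPPSink() compares event-source column names with equalsIgnoreId(), which ignores everything up to and including the first colon, so two names that differ only in that id prefix are treated as the same column. A minimal, self-contained sketch of that comparison follows; the sample names (e.g. "RS_3:part_col") are hypothetical and only illustrate the prefix/suffix split, they are not taken from a real plan.

// Standalone illustration of the id-insensitive column-name comparison used by
// targetsOfSameDPPSink(); the class and sample names below are made up for the example.
public class EqualsIgnoreIdExample {
  static boolean equalsIgnoreId(String colName1, String colName2) {
    // Compare only the part after the first ':', ignoring any id prefix.
    // If there is no ':', indexOf returns -1 and the whole name is compared.
    return colName1.substring(colName1.indexOf(":") + 1).equals(
        colName2.substring(colName2.indexOf(":") + 1));
  }

  public static void main(String[] args) {
    System.out.println(equalsIgnoreId("RS_3:part_col", "RS_7:part_col"));  // true: same column, different id
    System.out.println(equalsIgnoreId("RS_3:part_col", "RS_3:other_col")); // false: different columns
    System.out.println(equalsIgnoreId("part_col", "part_col"));            // true: no prefix at all
  }
}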