commit 72610aa7353795a918f5cad417f9e03fd823c55d
Author: kellyzly
Date:   Fri Aug 18 11:38:58 2017 +0800

    HIVE-16948.7.patch

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index be0795d..7308b5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -367,7 +368,7 @@ public static void findRoots(Operator<?> op, Collection<Operator<?>> roots) {
    * Remove the branch that contains the specified operator. Do nothing if there's no branching,
    * i.e. all the upstream operators have only one child.
    */
-  public static void removeBranch(Operator op) {
+  public static void removeBranch(SparkPartitionPruningSinkOperator op) {
     Operator child = op;
     Operator curr = op;
@@ -409,4 +410,18 @@ public static String getOpNamePretty(Operator op) {
     }
     return op.toString();
   }
+
+  /**
+   * Return true if the operator sits on a branch (i.e. some ancestor has more than one child), otherwise false.
+   */
+  public static boolean isInBranch(SparkPartitionPruningSinkOperator op) {
+    Operator curr = op;
+    while (curr.getChildOperators().size() <= 1) {
+      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+        return false;
+      }
+      curr = curr.getParentOperators().get(0);
+    }
+    return true;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index eb9883a..fac3cea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
 
 import com.google.common.base.Preconditions;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -207,4 +209,22 @@ public static void collectOp(Collection<Operator<?>> result, Operator<?> root, Class<?> clazz) {
       collectOp(result, child, clazz);
     }
   }
+
+  /**
+   * Remove currTask from the children of its parent tasks, and
+   * remove currTask from the parents of its child tasks.
+   * @param currTask
+   */
+  public static void removeEmptySparkTask(SparkTask currTask) {
+    // remove currTask from its parent tasks
+    ArrayList parTasks = new ArrayList();
+    parTasks.addAll(currTask.getParentTasks());
+
+    Object[] parTaskArr = parTasks.toArray();
+    for (Object parTask : parTaskArr) {
+      ((Task) parTask).removeDependentTask(currTask);
+    }
+    // remove currTask from its child tasks
+    currTask.removeFromChildrenTasks();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 95ad962..2641c1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.spark;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -30,6 +31,10 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.slf4j.Logger;
@@ -56,9 +61,11 @@
  */
 public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
   protected static transient Logger LOG = LoggerFactory.getLogger(CombineEquivalentWorkResolver.class);
-
+  private List<String> removedMapWorkNames = new ArrayList<String>();
+  private PhysicalContext pctx;
   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    this.pctx = pctx;
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getRootTasks());
     TaskGraphWalker taskWalker = new TaskGraphWalker(new EquivalentWorkMatcher());
@@ -82,6 +89,17 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws SemanticException {
       SparkWork sparkWork = sparkTask.getWork();
       Set<BaseWork> roots = sparkWork.getRoots();
       compareWorksRecursively(roots, sparkWork);
+      // In the DPP case, the DPP sink appears in Task1 while its target work appears in Task2,
+      // and Task2 is a child task of Task1. Task2 is traversed before Task1 because
+      // TaskGraphWalker puts child tasks at the front of the task queue.
+      // So if a SparkWork that equals another work is found and removed while traversing Task2,
+      // the corresponding DPP sink can be removed when Task1 is traversed (see HIVE-16948 for details).
+      if (removedMapWorkNames.size() > 0) {
+        removeDynamicPartitionPruningSink(removedMapWorkNames, sparkWork);
+        if (sparkWork.getAllWork().size() == 0) {
+          removeEmptySparkTask(sparkTask);
+        }
+      }
     }
     return null;
   }
@@ -170,6 +188,10 @@ private void replaceWork(BaseWork previous, BaseWork current, SparkWork sparkWork) {
       }
     }
     sparkWork.remove(previous);
+    // Record the removed MapWork's name so its DPP sink can be cleaned up later (HIVE-16948)
+    if (previous instanceof MapWork) {
+      removedMapWorkNames.add(previous.getName());
+    }
   }
 
   /*
@@ -306,5 +328,55 @@ private boolean compareOperatorChain(Operator firstOperator, Operator secondOperator) {
   private boolean compareCurrentOperator(Operator firstOperator, Operator secondOperator) {
     return firstOperator.logicalEquals(secondOperator);
   }
+
+  /**
+   * Traverse the works in sparkWork to find DPP sink operators whose target work is included in
+   * removedMapWorkList.
+   * If the sink sits on a branch, remove only that branch from the BaseWork;
+   * if there is no branch, remove the whole BaseWork.
+   *
+   * @param removedMapWorkList the names of the MapWorks that were removed because they equal other works
+   * @param sparkWork the current SparkWork
+   */
+  private void removeDynamicPartitionPruningSink(List<String> removedMapWorkList, SparkWork sparkWork) {
+    List<BaseWork> allWorks = sparkWork.getAllWork();
+    for (BaseWork baseWork : allWorks) {
+      Set<Operator<?>> rootOperators = baseWork.getAllRootOperators();
+      for (Operator<?> root : rootOperators) {
+        List<Operator<?>> pruningList = new ArrayList<>();
+        SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
+        for (Operator<?> pruneSinkOp : pruningList) {
+          SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
+          if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetWork())) {
+            LOG.debug("Ready to remove the sparkPruneSinkOp whose target work is "
+                + sparkPruneSinkOp.getConf().getTargetWork() + " because that MapWork equals another map work "
+                + "and has been deleted!");
+            // If the sink is on a branch, remove only that branch from the baseWork;
+            // if there is no branch, remove the whole baseWork.
+            if (OperatorUtils.isInBranch(sparkPruneSinkOp)) {
+              OperatorUtils.removeBranch(sparkPruneSinkOp);
+            } else {
+              sparkWork.remove(baseWork);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void removeEmptySparkTask(SparkTask currTask) {
+    // If currTask is a root task, remove it from the root tasks and add each of its children
+    // whose only parent is currTask to the root tasks.
+    if (pctx.getRootTasks().contains(currTask)) {
+      pctx.removeFromRootTask(currTask);
+      List<Task<? extends Serializable>> newRoots = currTask.getChildTasks();
+      for (Task<? extends Serializable> newRoot : newRoots) {
+        if (newRoot.getParentTasks().size() == 1) {
+          pctx.addToRootTask(newRoot);
+        }
+      }
+    }
+    SparkUtilities.removeEmptySparkTask(currTask);
+  }
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 4c7ec76..44931f0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -22,9 +22,16 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -43,4 +50,46 @@ public void sparkTask_updates_Metrics() throws IOException {
     verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_MR_TASKS);
   }
 
+  @Test
+  public void removeEmptySparkTask() {
+    SparkTask grandpa = new SparkTask();
+    SparkWork grandpaWork = new SparkWork("grandpa");
+    grandpaWork.add(new MapWork());
+    grandpa.setWork(grandpaWork);
+
+    SparkTask parent = new SparkTask();
+    SparkWork parentWork = new SparkWork("parent");
+    parentWork.add(new MapWork());
+    parent.setWork(parentWork);
+
+    SparkTask child1 = new SparkTask();
+    SparkWork childWork1 = new SparkWork("child1");
+    childWork1.add(new MapWork());
+    child1.setWork(childWork1);
+
+
+    grandpa.addDependentTask(parent);
+    parent.addDependentTask(child1);
+
+    Assert.assertEquals(grandpa.getChildTasks().size(), 1);
+    Assert.assertEquals(child1.getParentTasks().size(), 1);
+    if (isEmptySparkWork(parent.getWork())) {
+      SparkUtilities.removeEmptySparkTask(parent);
+    }
+
+    Assert.assertEquals(grandpa.getChildTasks().size(), 0);
+    Assert.assertEquals(child1.getParentTasks().size(), 0);
+  }
+
+  private boolean isEmptySparkWork(SparkWork sparkWork) {
+    List<BaseWork> allWorks = sparkWork.getAllWork();
+    boolean allWorksIsEmpty = true;
+    for (BaseWork work : allWorks) {
+      if (work.getAllOperators().size() > 0) {
+        allWorksIsEmpty = false;
+        break;
+      }
+    }
+    return allWorksIsEmpty;
+  }
 }
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
index d9da762..41c6c10 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
@@ -4228,20 +4228,6 @@ STAGE PLANS:
                         partition key expr: ds
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         target work: Map 1
-                    Select Operator
-                      expressions: _col0 (type: string)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                      Group By Operator
-                        keys: _col0 (type: string)
-                        mode: hash
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                        Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
-                          Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 4
         Reducer 13 
             Reduce Operator Tree:
               Group By Operator
@@ -4271,20 +4257,6 @@ STAGE PLANS:
                         partition key expr: ds
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         target work: Map 1
-                    Select Operator
-                      expressions: _col0 (type: string)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                      Group By Operator
-                        keys: _col0 (type: string)
-                        mode: hash
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                        Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
-                          Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 4
 
   Stage: Stage-1
     Spark
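
The isInBranch check added to OperatorUtils climbs the single-parent chain from the DPP sink and reports whether any ancestor operator has more than one child; that is what the removed Select/Group By/Spark Partition Pruning Sink block in the q.out above corresponds to. Below is a toy, self-contained model of that walk, not part of the patch: Node is a hypothetical stand-in for Hive's Operator, used only to illustrate the logic.

import java.util.ArrayList;
import java.util.List;

public class IsInBranchSketch {

  // Hypothetical stand-in for Hive's Operator: only parent/child links matter here.
  static class Node {
    final String name;
    final List<Node> parents = new ArrayList<>();
    final List<Node> children = new ArrayList<>();

    Node(String name) {
      this.name = name;
    }

    void addChild(Node child) {
      children.add(child);
      child.parents.add(this);
    }
  }

  // Same walk as the new OperatorUtils.isInBranch: climb single-child ancestors until one forks.
  static boolean isInBranch(Node op) {
    Node curr = op;
    while (curr.children.size() <= 1) {
      if (curr.parents.isEmpty()) {
        return false; // reached a root without ever seeing a fork
      }
      curr = curr.parents.get(0);
    }
    return true; // some ancestor has more than one child
  }

  public static void main(String[] args) {
    // GBY feeds both a SEL -> DPP-sink chain and a ReduceSink, so the sink sits on a branch.
    Node gby = new Node("GBY");
    Node sel = new Node("SEL");
    Node dppSink = new Node("SPARKPRUNINGSINK");
    Node reduceSink = new Node("RS");
    gby.addChild(sel);
    gby.addChild(reduceSink);
    sel.addChild(dppSink);
    System.out.println(isInBranch(dppSink)); // true: only the SEL -> sink branch needs removing

    // A straight chain with no fork is not "in a branch"; the whole BaseWork would be dropped instead.
    Node ts = new Node("TS");
    Node sel2 = new Node("SEL");
    Node sink2 = new Node("SPARKPRUNINGSINK");
    ts.addChild(sel2);
    sel2.addChild(sink2);
    System.out.println(isInBranch(sink2)); // false
  }
}

This mirrors the two cases in removeDynamicPartitionPruningSink: when a fork exists, OperatorUtils.removeBranch cuts only the sink's branch; when none exists, the sink was the BaseWork's only purpose and the whole work is removed from the SparkWork.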
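
For reference, a minimal usage sketch of the new SparkUtilities.removeEmptySparkTask helper, mirroring the unit test above. It is not part of the patch; it assumes the Hive ql classes are on the classpath and sits in the same package as TestSparkTask, and the class name RemoveEmptySparkTaskSketch and the helper newTask are illustrative only.

package org.apache.hadoop.hive.ql.exec.spark; // same package as TestSparkTask, so SparkTask is directly visible

import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;

public class RemoveEmptySparkTaskSketch {

  public static void main(String[] args) {
    // Build the same three-task chain as the unit test: grandpa -> parent -> child.
    SparkTask grandpa = newTask("grandpa");
    SparkTask parent = newTask("parent");
    SparkTask child = newTask("child");
    grandpa.addDependentTask(parent);
    parent.addDependentTask(child);

    // Unlink "parent" from both directions of the task DAG, as the resolver does once
    // every work it held has been combined into an equivalent work of another task.
    SparkUtilities.removeEmptySparkTask(parent);

    System.out.println("grandpa children: " + grandpa.getChildTasks().size()); // expected: 0
    System.out.println("child parents:    " + child.getParentTasks().size());  // expected: 0
  }

  private static SparkTask newTask(String name) {
    SparkTask task = new SparkTask();
    SparkWork work = new SparkWork(name);
    work.add(new MapWork());
    task.setWork(work);
    return task;
  }
}

The unlinking has to happen in both directions: removing the task only from its parents would leave its children still pointing at it, which is why the helper clears the parent side first and then calls removeFromChildrenTasks.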