commit 57438e156c09af078dcf3f73591cb45205987635
Author: kellyzly
Date:   Tue Aug 8 03:57:11 2017 -0400

    HIVE-16948.5.patch

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index be0795d..7308b5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.mapred.OutputCollector;
 
@@ -367,7 +368,7 @@ public static void findRoots(Operator<?> op, Collection<Operator<?>> roots) {
    * Remove the branch that contains the specified operator. Do nothing if there's no branching,
    * i.e. all the upstream operators have only one child.
    */
-  public static void removeBranch(Operator<?> op) {
+  public static void removeBranch(SparkPartitionPruningSinkOperator op) {
     Operator<?> child = op;
     Operator<?> curr = op;
 
@@ -409,4 +410,18 @@ public static String getOpNamePretty(Operator<?> op) {
     }
     return op.toString();
   }
+
+  /**
+   * Return true if the operator lies on a branch (i.e. some ancestor has more than one child); otherwise false.
+   */
+  public static boolean isInBranch(SparkPartitionPruningSinkOperator op) {
+    Operator<?> curr = op;
+    while (curr.getChildOperators().size() <= 1) {
+      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+        return false;
+      }
+      curr = curr.getParentOperators().get(0);
+    }
+    return true;
+  }
 }
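The new OperatorUtils.isInBranch above climbs from the pruning sink through single-child ancestors and reports whether it ever reaches an operator with more than one child. A minimal, self-contained sketch of that traversal, using a hypothetical Op node class in place of Hive's Operator (all names here are illustrative, not Hive API):

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hive's operator tree; only parent/child links matter here.
class Op {
  final String name;
  final List<Op> parents = new ArrayList<>();
  final List<Op> children = new ArrayList<>();

  Op(String name) { this.name = name; }

  void addChild(Op child) {
    children.add(child);
    child.parents.add(this);
  }
}

public class IsInBranchSketch {
  // Mirrors OperatorUtils.isInBranch: climb through single-child ancestors and
  // return true as soon as an ancestor with more than one child is found.
  static boolean isInBranch(Op op) {
    Op curr = op;
    while (curr.children.size() <= 1) {
      if (curr.parents.isEmpty()) {
        return false; // reached a root without seeing any branching
      }
      curr = curr.parents.get(0);
    }
    return true;
  }

  public static void main(String[] args) {
    // TS -> SEL -> GBY, where GBY branches into a pruning sink and a reduce sink
    Op ts = new Op("TS"), sel = new Op("SEL"), gby = new Op("GBY");
    Op dppSink = new Op("SPARKPRUNINGSINK"), rs = new Op("RS");
    ts.addChild(sel);
    sel.addChild(gby);
    gby.addChild(dppSink);
    gby.addChild(rs);

    System.out.println(isInBranch(dppSink)); // true: GBY has two children
  }
}

On a straight chain with no multi-child ancestor the walk hits the root and returns false, which is exactly the case where the patch removes the whole BaseWork instead of a branch.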
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index ec192a0..1fab55d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -30,6 +31,10 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.slf4j.Logger;
@@ -57,9 +62,11 @@
  */
 public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
   protected static transient Logger LOG = LoggerFactory.getLogger(CombineEquivalentWorkResolver.class);
-
+  private List<String> removedMapWorkNames = new ArrayList<String>();
+  private PhysicalContext pctx;
   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    this.pctx = pctx;
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getRootTasks());
     TaskGraphWalker taskWalker = new TaskGraphWalker(new EquivalentWorkMatcher());
@@ -83,6 +90,15 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws
       SparkWork sparkWork = sparkTask.getWork();
       Set<BaseWork> roots = sparkWork.getRoots();
       compareWorksRecursively(roots, sparkWork);
+      // In the DPP case, the DPP sink appears in Task1 while the target work of that sink appears in Task2,
+      // and Task2 is a child task of Task1. Task2 is traversed before Task1 because TaskGraphWalker puts
+      // child tasks at the front of the task queue.
+      // So once a SparkWork in Task2 is found to be equal to another work and is removed, the DPP sink
+      // that targets it can be removed when Task1 is traversed (see HIVE-16948 for details).
+      removeDynamicPartitionPruningSink(removedMapWorkNames, sparkWork);
+      if (isEmptySparkWork(sparkWork)) {
+        removeEmptySparkTask(sparkTask);
+      }
     }
     return null;
   }
@@ -171,6 +187,10 @@ private void replaceWork(BaseWork previous, BaseWork current, SparkWork sparkWork) {
       }
     }
     sparkWork.remove(previous);
+    // Record the name of the removed MapWork so DPP sinks targeting it can be cleaned up later (HIVE-16948)
+    if (previous instanceof MapWork) {
+      removedMapWorkNames.add(previous.getName());
+    }
   }
 
   /*
@@ -313,5 +333,71 @@ private boolean compareCurrentOperator(Operator<?> firstOperator, Operator<?> secondOperator) {
           OperatorComparatorFactory.getOperatorComparator(firstOperator.getClass());
       return operatorComparator.equals(firstOperator, secondOperator);
     }
+
+    /**
+     * Traverse the operators in sparkWork to find DPP sink operators whose target work is included
+     * in removedMapWorkList. If such a sink sits on a branch, remove only that branch from its
+     * BaseWork; if there is no branch, remove the whole BaseWork.
+     *
+     * @param removedMapWorkList the names of the MapWorks deleted because they equal other works
+     * @param sparkWork current spark work
+     */
+    private void removeDynamicPartitionPruningSink(List<String> removedMapWorkList, SparkWork sparkWork) {
+      List<BaseWork> allWorks = sparkWork.getAllWork();
+      for (BaseWork baseWork : allWorks) {
+        Set<Operator<?>> rootOperators = baseWork.getAllRootOperators();
+        for (Operator<?> root : rootOperators) {
+          List<Operator<?>> pruningList = new ArrayList<>();
+          SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
+          for (Operator<?> pruneSinkOp : pruningList) {
+            SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
+            if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetWork())) {
+              LOG.debug("Removing the SparkPartitionPruningSinkOperator whose target work is "
+                  + sparkPruneSinkOp.getConf().getTargetWork()
+                  + " because that MapWork equals another map work and has been deleted.");
+              // If the sink is on a branch, remove just that branch; otherwise remove the whole BaseWork.
+              if (OperatorUtils.isInBranch(sparkPruneSinkOp)) {
+                OperatorUtils.removeBranch(sparkPruneSinkOp);
+              } else {
+                sparkWork.remove(baseWork);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    private boolean isEmptySparkWork(SparkWork sparkWork) {
+      List<BaseWork> allWorks = sparkWork.getAllWork();
+      for (BaseWork work : allWorks) {
+        if (work.getAllOperators().size() > 0) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private void removeEmptySparkTask(SparkTask currTask) {
+      // if currTask is a root task, its children become the new roots of the physical context
+      if (pctx.getRootTasks().contains(currTask)) {
+        List<Task<? extends Serializable>> newRoots = currTask.getChildTasks();
+        pctx.removeFromRootTask(currTask);
+        for (Task<? extends Serializable> newRoot : newRoots) {
+          pctx.addToRootTask(newRoot);
+        }
+      }
+      // unlink currTask from its parents; iterate over a copy because
+      // removeDependentTask also removes the parent from currTask's parent list
+      if (currTask.getParentTasks() != null && !currTask.getParentTasks().isEmpty()) {
+        List<Task<? extends Serializable>> parTasks =
+            new ArrayList<Task<? extends Serializable>>(currTask.getParentTasks());
+        for (Task<? extends Serializable> parTask : parTasks) {
+          parTask.removeDependentTask(currTask);
+        }
+      }
+      // unlink currTask from its children
+      currTask.removeFromChildrenTasks();
+    }
   }
 }
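The removeEmptySparkTask method above performs simple DAG surgery: a root task that became empty is replaced by its children in the physical context's root list, and then the task is unlinked in both directions. A toy model of the same surgery, with a hypothetical TaskNode class standing in for Hive's Task DAG (names are illustrative only):

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hive's task DAG; only the dependency links matter here.
class TaskNode {
  final String name;
  final List<TaskNode> parents = new ArrayList<>();
  final List<TaskNode> children = new ArrayList<>();

  TaskNode(String name) { this.name = name; }

  void addDependent(TaskNode child) {
    children.add(child);
    child.parents.add(this);
  }
}

public class RemoveEmptyTaskSketch {
  // Mirrors removeEmptySparkTask: promote children of a removed root task,
  // then sever the dependency links in both directions.
  static void removeTask(List<TaskNode> roots, TaskNode victim) {
    if (roots.remove(victim)) {
      roots.addAll(victim.children); // children of a removed root become roots
    }
    // copy before iterating: unlinking mutates the lists we would iterate over
    for (TaskNode parent : new ArrayList<>(victim.parents)) {
      parent.children.remove(victim);
      victim.parents.remove(parent);
    }
    for (TaskNode child : new ArrayList<>(victim.children)) {
      child.parents.remove(victim);
    }
    victim.children.clear();
  }

  public static void main(String[] args) {
    TaskNode grandpa = new TaskNode("grandpa");
    TaskNode parent = new TaskNode("parent");
    TaskNode child = new TaskNode("child");
    grandpa.addDependent(parent);
    parent.addDependent(child);

    List<TaskNode> roots = new ArrayList<>();
    roots.add(grandpa);
    removeTask(roots, parent);

    System.out.println(grandpa.children.isEmpty()); // true
    System.out.println(child.parents.isEmpty());    // true
  }
}

Note that, as in the patch, removing a non-root task leaves its former children with no parent rather than re-wiring them to the grandparent; the unit test below asserts exactly that behavior.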
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 4c7ec76..a67832a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -22,9 +22,16 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -43,4 +50,59 @@ public void sparkTask_updates_Metrics() throws IOException {
 
     verify(mockMetrics, never()).incrementCounter(MetricsConstant.HIVE_MR_TASKS);
   }
+
+  @Test
+  public void removeEmptySparkTask() {
+    SparkTask grandpa = new SparkTask();
+    SparkWork grandpaWork = new SparkWork("grandpa");
+    grandpaWork.add(new MapWork());
+    grandpa.setWork(grandpaWork);
+
+    SparkTask parent = new SparkTask();
+    SparkWork parentWork = new SparkWork("parent");
+    parentWork.add(new MapWork());
+    parent.setWork(parentWork);
+
+    SparkTask child1 = new SparkTask();
+    SparkWork childWork1 = new SparkWork("child1");
+    childWork1.add(new MapWork());
+    child1.setWork(childWork1);
+
+    grandpa.addDependentTask(parent);
+    parent.addDependentTask(child1);
+
+    Assert.assertEquals(1, grandpa.getChildTasks().size());
+    Assert.assertEquals(1, child1.getParentTasks().size());
+    if (isEmptySparkWork(parent.getWork())) {
+      removeEmptySparkTask(parent);
+    }
+
+    Assert.assertEquals(0, grandpa.getChildTasks().size());
+    Assert.assertEquals(0, child1.getParentTasks().size());
+  }
+
+  private boolean isEmptySparkWork(SparkWork sparkWork) {
+    List<BaseWork> allWorks = sparkWork.getAllWork();
+    for (BaseWork work : allWorks) {
+      if (work.getAllOperators().size() > 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void removeEmptySparkTask(SparkTask currTask) {
+    // unlink currTask from its parents; copy first because removeDependentTask
+    // also removes the parent from currTask's parent list
+    List<Task> parTasks = new ArrayList<Task>(currTask.getParentTasks());
+    for (Task parTask : parTasks) {
+      parTask.removeDependentTask(currTask);
+    }
+    // unlink currTask from its children
+    currTask.removeFromChildrenTasks();
+  }
 }
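The golden-file hunks below show the user-visible effect of the change: the duplicated Spark Partition Pruning Sink branch that targeted the deduplicated Map 4 disappears from the explain output, and only the sink targeting Map 1 survives. A toy model of removeDynamicPartitionPruningSink's branch-versus-work decision (the PruningSink and Work classes are hypothetical stand-ins for Hive's operator and BaseWork types):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DppCleanupSketch {

  static class PruningSink {
    final String targetWork;
    final boolean inBranch; // would OperatorUtils.isInBranch return true for this sink?

    PruningSink(String targetWork, boolean inBranch) {
      this.targetWork = targetWork;
      this.inBranch = inBranch;
    }
  }

  static class Work {
    final List<PruningSink> sinks = new ArrayList<PruningSink>();
  }

  // Sinks whose target MapWork was removed as a duplicate are cut away:
  // just the branch if one exists, otherwise the whole enclosing work.
  static void cleanup(List<Work> allWorks, List<String> removedMapWorks) {
    Iterator<Work> workIt = allWorks.iterator();
    while (workIt.hasNext()) {
      Work work = workIt.next();
      boolean removeWholeWork = false;
      Iterator<PruningSink> sinkIt = work.sinks.iterator();
      while (sinkIt.hasNext()) {
        PruningSink sink = sinkIt.next();
        if (!removedMapWorks.contains(sink.targetWork)) {
          continue; // target still exists, sink stays
        }
        if (sink.inBranch) {
          sinkIt.remove(); // cut just the pruning branch
        } else {
          removeWholeWork = true; // nothing else in this work, drop it entirely
        }
      }
      if (removeWholeWork) {
        workIt.remove();
      }
    }
  }

  public static void main(String[] args) {
    Work map2 = new Work();
    map2.sinks.add(new PruningSink("Map 4", true)); // Map 4 was deduplicated away
    map2.sinks.add(new PruningSink("Map 1", true)); // Map 1 still exists
    List<Work> works = new ArrayList<Work>();
    works.add(map2);

    cleanup(works, Arrays.asList("Map 4"));
    System.out.println(works.get(0).sinks.size()); // 1: only the Map 1 sink remains
  }
}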
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
index e743af1..ece46b7 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
@@ -3700,20 +3700,6 @@ STAGE PLANS:
                         partition key expr: ds
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         target column name: ds
-                        target work: Map 4
-                  Select Operator
-                    expressions: _col0 (type: string)
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      keys: _col0 (type: string)
-                      mode: hash
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                      Spark Partition Pruning Sink Operator
-                        partition key expr: ds
-                        Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                        target column name: ds
                         target work: Map 1
         Reducer 13 
             Reduce Operator Tree:
@@ -3743,20 +3729,6 @@
                         partition key expr: ds
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         target column name: ds
-                        target work: Map 4
-                  Select Operator
-                    expressions: _col0 (type: string)
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      keys: _col0 (type: string)
-                      mode: hash
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                      Spark Partition Pruning Sink Operator
-                        partition key expr: ds
-                        Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                        target column name: ds
                         target work: Map 1
 
   Stage: Stage-1