diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index ec192a0..ea2ecb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -30,6 +30,9 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.slf4j.Logger;
@@ -57,7 +60,7 @@
  */
 public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
   protected static transient Logger LOG = LoggerFactory.getLogger(CombineEquivalentWorkResolver.class);
-
+  private List<String> removedMapWorkNames = new ArrayList<>();
   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
     List<Node> topNodes = new ArrayList<Node>();
@@ -83,6 +86,7 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws
         SparkWork sparkWork = sparkTask.getWork();
         Set<BaseWork> roots = sparkWork.getRoots();
         compareWorksRecursively(roots, sparkWork);
+        removeDynamicPartitionPruningSinkBranch(removedMapWorkNames, sparkWork);
       }
       return null;
     }
@@ -171,6 +175,10 @@ private void replaceWork(BaseWork previous, BaseWork current, SparkWork sparkWor
         }
       }
       sparkWork.remove(previous);
+      // Remember the removed MapWork's name so DPP sinks still targeting it can be pruned later (HIVE-16948).
+      if (previous instanceof MapWork) {
+        removedMapWorkNames.add(previous.getName());
+      }
     }
 
     /*
@@ -313,5 +321,29 @@ private boolean compareCurrentOperator(Operator<?> firstOperator, Operator<?> se
           OperatorComparatorFactory.getOperatorComparator(firstOperator.getClass());
       return operatorComparator.equals(firstOperator, secondOperator);
     }
+
+    /**
+     * Removes every SparkPartitionPruningSinkOperator branch whose target MapWork was
+     * deleted while combining equivalent works, so that no pruning sink is left dangling.
+     */
+    private void removeDynamicPartitionPruningSinkBranch(List<String> removedMapWorkNames, SparkWork sparkWork) {
+      List<BaseWork> children = sparkWork.getAllWork();
+      for (BaseWork child : children) {
+        Set<Operator<?>> rootOperators = child.getAllRootOperators();
+        for (Operator<?> root : rootOperators) {
+          List<Operator<?>> pruningList = new ArrayList<>();
+          SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
+          for (Operator<?> pruneSinkOp : pruningList) {
+            SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
+            if (removedMapWorkNames.contains(sparkPruneSinkOp.getConf().getTargetWork())) {
+              LOG.debug("Removing the SparkPartitionPruningSinkOperator targeting work "
+                  + sparkPruneSinkOp.getConf().getTargetWork()
+                  + " because that MapWork was combined with an equivalent MapWork and removed.");
+              OperatorUtils.removeBranch(pruneSinkOp);
+            }
+          }
+        }
+      }
+    }
   }
 }
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
index d476172..74d9dca 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
@@ -3701,20 +3701,6 @@ STAGE PLANS:
                       Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                       target column name: ds
                       target work: Map 1
-                Select Operator
-                  expressions: _col0 (type: string)
-                  outputColumnNames: _col0
-                  Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                  Group By Operator
-                    keys: _col0 (type: string)
-                    mode: hash
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                    Spark Partition Pruning Sink Operator
-                      partition key expr: ds
-                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                      target column name: ds
-                      target work: Map 4
         Reducer 13 
             Reduce Operator Tree:
               Group By Operator
@@ -3744,20 +3730,6 @@
                       Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                       target column name: ds
                       target work: Map 1
-                Select Operator
-                  expressions: _col0 (type: string)
-                  outputColumnNames: _col0
-                  Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                  Group By Operator
-                    keys: _col0 (type: string)
-                    mode: hash
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                    Spark Partition Pruning Sink Operator
-                      partition key expr: ds
-                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                      target column name: ds
-                      target work: Map 4
 
   Stage: Stage-1
     Spark