diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 41e9ba6..1337cc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -29,6 +30,10 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -216,6 +221,11 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork
         return false;
       }
 
+      // need to check paths and partition desc for MapWorks
+      if (first instanceof MapWork && !compareMapWork((MapWork) first, (MapWork) second)) {
+        return false;
+      }
+
       Iterator<Operator<?>> firstIterator = firstRootOperators.iterator();
       Iterator<Operator<?>> secondIterator = secondRootOperators.iterator();
       while (firstIterator.hasNext()) {
@@ -228,6 +238,50 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork
       return true;
     }
 
+    private boolean compareMapWork(MapWork first, MapWork second) {
+      Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
+      Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
+      if (pathToPartition1.size() == pathToPartition2.size()) {
+        for (Map.Entry<Path, PartitionDesc> entry : pathToPartition1.entrySet()) {
+          Path path1 = entry.getKey();
+          PartitionDesc partitionDesc1 = entry.getValue();
+          PartitionDesc partitionDesc2 = pathToPartition2.get(path1);
+          if (partitionDesc2 == null || !samePartitionDesc(partitionDesc1, partitionDesc2)) {
+            return false;
+          }
+        }
+        return true;
+      }
+      return false;
+    }
+
+    private boolean samePartitionDesc(PartitionDesc first, PartitionDesc second) {
+      boolean ret = first.getTableDesc().equals(second.getTableDesc());
+      if (ret) {
+        Map<String, String> partSpec1 = first.getPartSpec();
+        Map<String, String> partSpec2 = second.getPartSpec();
+        ret = (partSpec1 == null && partSpec2 == null) ||
+            (partSpec1 != null && partSpec2 != null && partSpec1.equals(partSpec2));
+        if (ret) {
+          VectorPartitionDesc vectorPartitionDesc1 = first.getVectorPartitionDesc();
+          VectorPartitionDesc vectorPartitionDesc2 = second.getVectorPartitionDesc();
+          ret = (vectorPartitionDesc1 == null && vectorPartitionDesc2 == null) ||
+              (vectorPartitionDesc1 != null && vectorPartitionDesc2 != null &&
+                  sameVecPartitionDesc(vectorPartitionDesc1, vectorPartitionDesc2));
+        }
+      }
+      return ret;
+    }
+
+    private boolean sameVecPartitionDesc(VectorPartitionDesc first, VectorPartitionDesc second) {
+      return first.getInputFileFormatClassName().equals(second.getInputFileFormatClassName()) &&
+          first.getRowDeserializerClassName().equals(second.getRowDeserializerClassName()) &&
+          first.getVectorDeserializeType() == second.getVectorDeserializeType() &&
+          first.getVectorMapOperatorReadType() == second.getVectorMapOperatorReadType() &&
+          first.getIsInputFileFormatSelfDescribing() == second.getIsInputFileFormatSelfDescribing() &&
+          Arrays.equals(first.getDataTypeInfos(), second.getDataTypeInfos());
+    }
+
     private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
       boolean result = true;
       List<BaseWork> firstParents = sparkWork.getParents(first);
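
Note: for readers skimming the patch, the sketch below restates the size-then-lookup
comparison idiom that compareMapWork applies to the path-to-PartitionDesc maps. It is a
standalone, JDK-only illustration; the class name PartitionMapCompare and the generic
helper sameEntries are hypothetical and not part of the patch, which compares Hive's
PartitionDesc values directly via samePartitionDesc.

    import java.util.Map;
    import java.util.function.BiPredicate;

    final class PartitionMapCompare {
      // Two maps are considered "the same" when they have equal size and every key of
      // the first resolves, in the second, to a value the supplied predicate accepts.
      // This mirrors compareMapWork: size check first, then a per-key lookup that
      // rejects on a missing key (null) or a non-matching value.
      static <K, V> boolean sameEntries(Map<K, V> first, Map<K, V> second,
          BiPredicate<V, V> sameValue) {
        if (first.size() != second.size()) {
          return false;
        }
        for (Map.Entry<K, V> entry : first.entrySet()) {
          V other = second.get(entry.getKey());
          if (other == null || !sameValue.test(entry.getValue(), other)) {
            return false;
          }
        }
        return true;
      }
    }

A side observation on samePartitionDesc: its paired null checks, e.g.
(partSpec1 == null && partSpec2 == null) || (partSpec1 != null && partSpec2 != null &&
partSpec1.equals(partSpec2)), are the manual expansion of
java.util.Objects.equals(partSpec1, partSpec2); the vector-desc branch keeps the explicit
form because it delegates to the custom sameVecPartitionDesc rather than to equals().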