diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index e4910e4..c9dcc94 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1466,7 +1466,8 @@ spark.query.files=add_part_multiple.q, \ # Unlike "spark.query.files" above, these tests only run # under Spark engine. -spark.only.query.files=spark_dynamic_partition_pruning.q,\ +spark.only.query.files=spark_combine_equivalent_work.q,\ + spark_dynamic_partition_pruning.q,\ spark_dynamic_partition_pruning_2.q,\ spark_vectorized_dynamic_partition_pruning.q diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java index 41e9ba6..0b0a60d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer.spark; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -27,8 +28,13 @@ import java.util.Set; import java.util.Stack; +import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -216,6 +222,11 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork return false; } + // need to check paths and partition desc for MapWorks + if (first instanceof MapWork && 
!compareMapWork((MapWork) first, (MapWork) second)) { + return false; + } + Iterator<Operator<?>> firstIterator = firstRootOperators.iterator(); Iterator<Operator<?>> secondIterator = secondRootOperators.iterator(); while (firstIterator.hasNext()) { @@ -228,6 +239,53 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork return true; } + private boolean compareMapWork(MapWork first, MapWork second) { + Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo(); + Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo(); + if (pathToPartition1.size() == pathToPartition2.size()) { + for (Map.Entry<Path, PartitionDesc> entry : pathToPartition1.entrySet()) { + Path path1 = entry.getKey(); + PartitionDesc partitionDesc1 = entry.getValue(); + PartitionDesc partitionDesc2 = pathToPartition2.get(path1); + if (partitionDesc2 == null || !samePartitionDesc(partitionDesc1, partitionDesc2)) { + return false; + } + } + return true; + } + return false; + } + + private boolean samePartitionDesc(PartitionDesc first, PartitionDesc second) { + boolean ret = first.getTableDesc().equals(second.getTableDesc()); + if (ret) { + Map<String, String> partSpec1 = first.getPartSpec(); + Map<String, String> partSpec2 = second.getPartSpec(); + ret = (partSpec1 == null && partSpec2 == null) || + (partSpec1 != null && partSpec2 != null && partSpec1.equals(partSpec2)); + if (ret) { + ret = sameVecPartitionDesc(first.getVectorPartitionDesc(), + second.getVectorPartitionDesc()); + } + } + return ret; + } + + private boolean sameVecPartitionDesc(VectorPartitionDesc first, VectorPartitionDesc second) { + if (first == null && second == null) { + return true; + } + return first != null && second != null && Strings.nullToEmpty( + first.getInputFileFormatClassName()).equals(Strings.nullToEmpty( + second.getInputFileFormatClassName())) && + Strings.nullToEmpty(first.getRowDeserializerClassName()).equals( + Strings.nullToEmpty(second.getRowDeserializerClassName())) && + first.getVectorDeserializeType() == second.getVectorDeserializeType() && 
first.getVectorMapOperatorReadType() == second.getVectorMapOperatorReadType() && + first.getIsInputFileFormatSelfDescribing() == second.getIsInputFileFormatSelfDescribing() && + Arrays.equals(first.getDataTypeInfos(), second.getDataTypeInfos()); + } + private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) { boolean result = true; List<BaseWork> firstParents = sparkWork.getParents(first); diff --git a/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q new file mode 100644 index 0000000..f85d8ee --- /dev/null +++ b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q @@ -0,0 +1,33 @@ +set hive.vectorized.execution.enabled = false; + +create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string); +create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string); + +alter table a1 add partition(END_DT='20161020'); +alter table a1 add partition(END_DT='20161021'); + +insert into table a1 partition(END_DT='20161020') values('2000721360','20161001'); + + +SELECT T1.KEHHAO,COUNT(1) FROM ( +SELECT KEHHAO FROM a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +UNION ALL +SELECT KEHHAO FROM a2 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +) T1 +GROUP BY T1.KEHHAO +HAVING COUNT(1)>1; + +SELECT T1.KEHHAO,COUNT(1) FROM ( +SELECT KEHHAO FROM a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +UNION ALL +SELECT KEHHAO FROM a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +) T1 +GROUP BY T1.KEHHAO +HAVING COUNT(1)>1; + +DROP TABLE a1; +DROP TABLE a2; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out new file mode 100644 index 0000000..93d07d6 --- 
/dev/null +++ b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out @@ -0,0 +1,115 @@ +PREHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@a1 +POSTHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@a1 +PREHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@a2 +POSTHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@a2 +PREHOOK: query: alter table a1 add partition(END_DT='20161020') +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@a1 +POSTHOOK: query: alter table a1 add partition(END_DT='20161020') +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@a1 +POSTHOOK: Output: default@a1@end_dt=20161020 +PREHOOK: query: alter table a1 add partition(END_DT='20161021') +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Output: default@a1 +POSTHOOK: query: alter table a1 add partition(END_DT='20161021') +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Output: default@a1 +POSTHOOK: Output: default@a1@end_dt=20161021 +PREHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001') +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@a1@end_dt=20161020 +POSTHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001') +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@a1@end_dt=20161020 +POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).kehhao SIMPLE 
[(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).start_dt SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM ( +SELECT KEHHAO FROM a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +UNION ALL +SELECT KEHHAO FROM a2 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +) T1 +GROUP BY T1.KEHHAO +HAVING COUNT(1)>1 +PREHOOK: type: QUERY +PREHOOK: Input: default@a1 +PREHOOK: Input: default@a1@end_dt=20161020 +PREHOOK: Input: default@a1@end_dt=20161021 +PREHOOK: Input: default@a2 +#### A masked pattern was here #### +POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM ( +SELECT KEHHAO FROM a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +UNION ALL +SELECT KEHHAO FROM a2 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +) T1 +GROUP BY T1.KEHHAO +HAVING COUNT(1)>1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@a1 +POSTHOOK: Input: default@a1@end_dt=20161020 +POSTHOOK: Input: default@a1@end_dt=20161021 +POSTHOOK: Input: default@a2 +#### A masked pattern was here #### +PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM ( +SELECT KEHHAO FROM a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +UNION ALL +SELECT KEHHAO FROM a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +) T1 +GROUP BY T1.KEHHAO +HAVING COUNT(1)>1 +PREHOOK: type: QUERY +PREHOOK: Input: default@a1 +PREHOOK: Input: default@a1@end_dt=20161020 +PREHOOK: Input: default@a1@end_dt=20161021 +#### A masked pattern was here #### +POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM ( +SELECT KEHHAO FROM a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +UNION ALL +SELECT KEHHAO FROM 
a1 T +WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1 +) T1 +GROUP BY T1.KEHHAO +HAVING COUNT(1)>1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@a1 +POSTHOOK: Input: default@a1@end_dt=20161020 +POSTHOOK: Input: default@a1@end_dt=20161021 +#### A masked pattern was here #### +2000721360 2 +PREHOOK: query: DROP TABLE a1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@a1 +PREHOOK: Output: default@a1 +POSTHOOK: query: DROP TABLE a1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@a1 +POSTHOOK: Output: default@a1 +PREHOOK: query: DROP TABLE a2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@a2 +PREHOOK: Output: default@a2 +POSTHOOK: query: DROP TABLE a2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@a2 +POSTHOOK: Output: default@a2