diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e4910e4..c9dcc94 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1466,7 +1466,8 @@ spark.query.files=add_part_multiple.q, \
 
 # Unlike "spark.query.files" above, these tests only run
 # under Spark engine.
-spark.only.query.files=spark_dynamic_partition_pruning.q,\
+spark.only.query.files=spark_combine_equivalent_work.q,\
+  spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_vectorized_dynamic_partition_pruning.q
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 41e9ba6..ec192a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -29,6 +29,9 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -210,6 +213,11 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork
         return false;
       }
 
+      // need to check paths and partition desc for MapWorks
+      if (first instanceof MapWork && !compareMapWork((MapWork) first, (MapWork) second)) {
+        return false;
+      }
+
       Set<Operator<?>> firstRootOperators = first.getAllRootOperators();
       Set<Operator<?>> secondRootOperators = second.getAllRootOperators();
       if (firstRootOperators.size() != secondRootOperators.size()) {
         return false;
       }
@@ -228,6 +236,23 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork
       return true;
     }
 
+    private boolean compareMapWork(MapWork first, MapWork second) {
+      Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
+      Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
+      if (pathToPartition1.size() == pathToPartition2.size()) {
+        for (Map.Entry<Path, PartitionDesc> entry : pathToPartition1.entrySet()) {
+          Path path1 = entry.getKey();
+          PartitionDesc partitionDesc1 = entry.getValue();
+          PartitionDesc partitionDesc2 = pathToPartition2.get(path1);
+          if (!partitionDesc1.equals(partitionDesc2)) {
+            return false;
+          }
+        }
+        return true;
+      }
+      return false;
+    }
+
     private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
       boolean result = true;
       List<BaseWork> firstParents = sparkWork.getParents(first);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 921461f..73981e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -298,6 +298,74 @@ public PartitionDesc clone() {
     return ret;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    boolean cond = o instanceof PartitionDesc;
+    if (!cond) {
+      return false;
+    }
+
+    PartitionDesc other = (PartitionDesc) o;
+    Class<? extends InputFormat> input1 = getInputFileFormatClass();
+    Class<? extends InputFormat> input2 = other.getInputFileFormatClass();
+    cond = (input1 == null && input2 == null) || (input1 != null && input1.equals(input2));
+    if (!cond) {
+      return false;
+    }
+
+    Class<? extends OutputFormat> output1 = getOutputFileFormatClass();
+    Class<? extends OutputFormat> output2 = other.getOutputFileFormatClass();
+    cond = (output1 == null && output2 == null) || (output1 != null && output1.equals(output2));
+    if (!cond) {
+      return false;
+    }
+
+    Properties properties1 = getProperties();
+    Properties properties2 = other.getProperties();
+    cond = (properties1 == null && properties2 == null) ||
+        (properties1 != null && properties1.equals(properties2));
+    if (!cond) {
+      return false;
+    }
+
+    TableDesc tableDesc1 = getTableDesc();
+    TableDesc tableDesc2 = other.getTableDesc();
+    cond = (tableDesc1 == null && tableDesc2 == null) ||
+        (tableDesc1 != null && tableDesc1.equals(tableDesc2));
+    if (!cond) {
+      return false;
+    }
+
+    Map<String, String> partSpec1 = getPartSpec();
+    Map<String, String> partSpec2 = other.getPartSpec();
+    cond = (partSpec1 == null && partSpec2 == null) ||
+        (partSpec1 != null && partSpec1.equals(partSpec2));
+    if (!cond) {
+      return false;
+    }
+
+    VectorPartitionDesc vecPartDesc1 = getVectorPartitionDesc();
+    VectorPartitionDesc vecPartDesc2 = other.getVectorPartitionDesc();
+    return (vecPartDesc1 == null && vecPartDesc2 == null) ||
+        (vecPartDesc1 != null && vecPartDesc1.equals(vecPartDesc2));
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = result * prime +
+        (getInputFileFormatClass() == null ? 0 : getInputFileFormatClass().hashCode());
+    result = result * prime +
+        (getOutputFileFormatClass() == null ? 0 : getOutputFileFormatClass().hashCode());
+    result = result * prime + (getProperties() == null ? 0 : getProperties().hashCode());
+    result = result * prime + (getTableDesc() == null ? 0 : getTableDesc().hashCode());
+    result = result * prime + (getPartSpec() == null ? 0 : getPartSpec().hashCode());
+    result = result * prime +
+        (getVectorPartitionDesc() == null ? 0 : getVectorPartitionDesc().hashCode());
+    return result;
+  }
+
   /**
    * Attempt to derive a virtual base file name property from the
    * path. If path format is unrecognized, just use the full path.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
index 2b61ec0..7bf70c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
@@ -21,6 +21,7 @@
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 /**
@@ -156,6 +157,39 @@ public VectorPartitionDesc clone() {
     return result;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof VectorPartitionDesc) {
+      VectorPartitionDesc other = (VectorPartitionDesc) o;
+      return Strings.nullToEmpty(getInputFileFormatClassName()).equals(
+          Strings.nullToEmpty(other.getInputFileFormatClassName())) &&
+          Strings.nullToEmpty(getRowDeserializerClassName()).equals(
+          Strings.nullToEmpty(other.getRowDeserializerClassName())) &&
+          getVectorDeserializeType() == other.getVectorDeserializeType() &&
+          getVectorMapOperatorReadType() == other.getVectorMapOperatorReadType() &&
+          getIsInputFileFormatSelfDescribing() == other.getIsInputFileFormatSelfDescribing() &&
+          Arrays.equals(getDataTypeInfos(), other.getDataTypeInfos());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = result * prime +
+        (getInputFileFormatClassName() == null ? 0 : getInputFileFormatClassName().hashCode());
+    result = result * prime +
+        (getRowDeserializerClassName() == null ? 0 : getRowDeserializerClassName().hashCode());
+    result = result * prime +
+        (getVectorDeserializeType() == null ? 0 : getVectorDeserializeType().hashCode());
+    result = result * prime +
+        (getVectorMapOperatorReadType() == null ? 0 : getVectorMapOperatorReadType().hashCode());
+    result = result * prime + Boolean.valueOf(getIsInputFileFormatSelfDescribing()).hashCode();
+    result = result * prime + Arrays.hashCode(getDataTypeInfos());
+    return result;
+  }
+
   public VectorMapOperatorReadType getVectorMapOperatorReadType() {
     return vectorMapOperatorReadType;
   }
diff --git a/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
new file mode 100644
index 0000000..f85d8ee
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
@@ -0,0 +1,33 @@
+set hive.vectorized.execution.enabled = false;
+
+create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string);
+create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string);
+
+alter table a1 add partition(END_DT='20161020');
+alter table a1 add partition(END_DT='20161021');
+
+insert into table a1 partition(END_DT='20161020') values('2000721360','20161001');
+
+
+SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1;
+
+SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1;
+
+DROP TABLE a1;
+DROP TABLE a2;
\ No newline at end of file
diff --git a/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out
new file mode 100644
index 0000000..93d07d6
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out
@@ -0,0 +1,115 @@
+PREHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@a1
+POSTHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@a1
+PREHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@a2
+POSTHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@a2
+PREHOOK: query: alter table a1 add partition(END_DT='20161020')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@a1
+POSTHOOK: query: alter table a1 add partition(END_DT='20161020')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@a1
+POSTHOOK: Output: default@a1@end_dt=20161020
+PREHOOK: query: alter table a1 add partition(END_DT='20161021')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@a1
+POSTHOOK: query: alter table a1 add partition(END_DT='20161021')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@a1
+POSTHOOK: Output: default@a1@end_dt=20161021
+PREHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@a1@end_dt=20161020
+POSTHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@a1@end_dt=20161020
+POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).kehhao SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).start_dt SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@a1
+PREHOOK: Input: default@a1@end_dt=20161020
+PREHOOK: Input: default@a1@end_dt=20161021
+PREHOOK: Input: default@a2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@a1
+POSTHOOK: Input: default@a1@end_dt=20161020
+POSTHOOK: Input: default@a1@end_dt=20161021
+POSTHOOK: Input: default@a2
+#### A masked pattern was here ####
+PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@a1
+PREHOOK: Input: default@a1@end_dt=20161020
+PREHOOK: Input: default@a1@end_dt=20161021
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@a1
+POSTHOOK: Input: default@a1@end_dt=20161020
+POSTHOOK: Input: default@a1@end_dt=20161021
+#### A masked pattern was here ####
+2000721360	2
+PREHOOK: query: DROP TABLE a1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@a1
+PREHOOK: Output: default@a1
+POSTHOOK: query: DROP TABLE a1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@a1
+POSTHOOK: Output: default@a1
+PREHOOK: query: DROP TABLE a2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@a2
+PREHOOK: Output: default@a2
+POSTHOOK: query: DROP TABLE a2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@a2
+POSTHOOK: Output: default@a2