diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 9c4c25e..e6f043d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -31,6 +31,7 @@
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
@@ -41,11 +42,13 @@
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 /**
@@ -204,6 +207,11 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork
         return false;
       }
 
+      // for MapWorks, the scanned paths and their partition descriptors must match as well
+      if (first instanceof MapWork && !compareMapWork((MapWork) first, (MapWork) second)) {
+        return false;
+      }
+
       // a leaf work's output may be read by a further SparkWork/FetchWork, so we should not
       // combine leaf works without notifying that further SparkWork/FetchWork
       if (sparkWork.getLeaves().contains(first) && sparkWork.getLeaves().contains(second)) {
@@ -228,6 +236,24 @@
       return true;
     }
 
+
+    private boolean compareMapWork(MapWork first, MapWork second) {
+      Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
+      Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
+      if (pathToPartition1.size() == pathToPartition2.size()) {
+        for (Map.Entry<Path, PartitionDesc> entry : pathToPartition1.entrySet()) {
+          Path path1 = entry.getKey();
+          PartitionDesc partitionDesc1 = entry.getValue();
+          PartitionDesc partitionDesc2 = pathToPartition2.get(path1);
+          if (!partitionDesc1.equals(partitionDesc2)) {
+            return false;
+          }
+        }
+        return true;
+      }
+      return false;
+    }
+
     private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
       boolean result = true;
       List<BaseWork> firstParents = sparkWork.getParents(first);
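The new compareMapWork guard exists because operator-tree equality alone is not enough for MapWorks: two works with identical operator trees can still scan different paths or partitions, and combining them silently drops one of the scans. Below is a minimal, self-contained sketch of the same size-then-lookup comparison, using plain JDK maps of strings as stand-ins for Hive's Map<Path, PartitionDesc>; the class and all names are illustrative, not part of the patch.

    import java.util.HashMap;
    import java.util.Map;

    // Standalone model of compareMapWork's check: two "map works" are only
    // equivalent when they scan exactly the same paths with equal partition info.
    public class ComparePathsSketch {
      static boolean sameScans(Map<String, String> a, Map<String, String> b) {
        if (a.size() != b.size()) {
          return false;                 // different number of paths: not equivalent
        }
        for (Map.Entry<String, String> e : a.entrySet()) {
          // get() returns null for a missing path, so equals() also rejects the
          // case where b scans a different set of paths of the same size
          if (!e.getValue().equals(b.get(e.getKey()))) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, String> w1 = new HashMap<>();
        Map<String, String> w2 = new HashMap<>();
        w1.put("/warehouse/a1/end_dt=20161020", "a1-part");
        w2.put("/warehouse/a2/end_dt=20161020", "a2-part");
        System.out.println(sameScans(w1, w1)); // true: identical scans may combine
        System.out.println(sameScans(w1, w2)); // false: different paths must not
      }
    }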
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index b032349..aac4b0e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -313,4 +313,73 @@ public void setVectorPartitionDesc(VectorPartitionDesc vectorPartitionDesc) {
   public VectorPartitionDesc getVectorPartitionDesc() {
     return vectorPartitionDesc;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    boolean cond = o instanceof PartitionDesc;
+    if (!cond) {
+      return false;
+    }
+
+    PartitionDesc other = (PartitionDesc) o;
+    Class<?> input1 = getInputFileFormatClass();
+    Class<?> input2 = other.getInputFileFormatClass();
+    cond = (input1 == null && input2 == null) || (input1 != null && input1.equals(input2));
+    if (!cond) {
+      return false;
+    }
+
+    Class<?> output1 = getOutputFileFormatClass();
+    Class<?> output2 = other.getOutputFileFormatClass();
+    cond = (output1 == null && output2 == null) || (output1 != null && output1.equals(output2));
+    if (!cond) {
+      return false;
+    }
+
+    Properties properties1 = getProperties();
+    Properties properties2 = other.getProperties();
+    cond = (properties1 == null && properties2 == null) ||
+        (properties1 != null && properties1.equals(properties2));
+    if (!cond) {
+      return false;
+    }
+
+    TableDesc tableDesc1 = getTableDesc();
+    TableDesc tableDesc2 = other.getTableDesc();
+    cond = (tableDesc1 == null && tableDesc2 == null) ||
+        (tableDesc1 != null && tableDesc1.equals(tableDesc2));
+    if (!cond) {
+      return false;
+    }
+
+    Map<String, String> partSpec1 = getPartSpec();
+    Map<String, String> partSpec2 = other.getPartSpec();
+    cond = (partSpec1 == null && partSpec2 == null) ||
+        (partSpec1 != null && partSpec1.equals(partSpec2));
+    if (!cond) {
+      return false;
+    }
+
+    VectorPartitionDesc vecPartDesc1 = getVectorPartitionDesc();
+    VectorPartitionDesc vecPartDesc2 = other.getVectorPartitionDesc();
+    return (vecPartDesc1 == null && vecPartDesc2 == null) ||
+        (vecPartDesc1 != null && vecPartDesc1.equals(vecPartDesc2));
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = result * prime +
+        (getInputFileFormatClass() == null ? 0 : getInputFileFormatClass().hashCode());
+    result = result * prime +
+        (getOutputFileFormatClass() == null ? 0 : getOutputFileFormatClass().hashCode());
+    result = result * prime + (getProperties() == null ? 0 : getProperties().hashCode());
+    result = result * prime + (getTableDesc() == null ? 0 : getTableDesc().hashCode());
+    result = result * prime + (getPartSpec() == null ? 0 : getPartSpec().hashCode());
+    result = result * prime +
+        (getVectorPartitionDesc() == null ? 0 : getVectorPartitionDesc().hashCode());
+    return result;
+  }
 }
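The equals and hashCode above are built over the same six fields (input format, output format, properties, table desc, partition spec, vector partition desc), which keeps the equals/hashCode contract intact: two PartitionDescs that compare equal also hash alike, as required once descriptors are looked up through maps in compareMapWork. For reference, the hand-rolled null-safe pattern is behaviorally the same as the java.util.Objects helpers available since Java 7; a compact illustrative rewrite follows (the PairDesc class and its fields are invented for the example, not part of the patch):

    import java.util.Objects;

    // Illustrative only: the same null-safe equals/hashCode contract as the
    // patch, written with java.util.Objects instead of by hand.
    final class PairDesc {
      private final String inputFormat;   // stand-ins for PartitionDesc's fields
      private final String outputFormat;

      PairDesc(String inputFormat, String outputFormat) {
        this.inputFormat = inputFormat;
        this.outputFormat = outputFormat;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof PairDesc)) {
          return false;
        }
        PairDesc other = (PairDesc) o;
        // Objects.equals(a, b) is true when both are null, matching the
        // (x1 == null && x2 == null) || (x1 != null && x1.equals(x2)) pattern
        return Objects.equals(inputFormat, other.inputFormat)
            && Objects.equals(outputFormat, other.outputFormat);
      }

      @Override
      public int hashCode() {
        // Objects.hash combines the fields null-safely with the same
        // 31-based scheme as the hand-written version
        return Objects.hash(inputFormat, outputFormat);
      }
    }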
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
index 45151f2..7ceb809 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
@@ -107,4 +107,23 @@ public void setTypeInfos(List<TypeInfo> typeInfoList) {
   public int getNonPartColumnCount() {
     return typeInfos.length;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof VectorPartitionDesc) {
+      VectorPartitionDesc other = (VectorPartitionDesc) o;
+      return
+          getVectorMapOperatorReadType() == other.getVectorMapOperatorReadType();
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = result * prime +
+        (getVectorMapOperatorReadType() == null ? 0 : getVectorMapOperatorReadType().hashCode());
+    return result;
+  }
 }
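VectorPartitionDesc reduces equality to the read type: VectorMapOperatorReadType is an enum, so == compares identity, is null-safe, and for enums is equivalent to equals(). A standalone sketch of that idiom (the enum constants and the Desc class below are illustrative, not Hive's actual types):

    // Illustrative enum-keyed equality, mirroring VectorPartitionDesc.
    public class EnumEqualitySketch {
      enum ReadType { FILE_FORMAT, VECTOR_DESERIALIZE, ROW_DESERIALIZE }

      static final class Desc {
        final ReadType readType; // may be null, like an unset read type
        Desc(ReadType readType) { this.readType = readType; }

        @Override
        public boolean equals(Object o) {
          // == on enums never throws on null and matches equals() semantics
          return o instanceof Desc && readType == ((Desc) o).readType;
        }

        @Override
        public int hashCode() {
          return 31 + (readType == null ? 0 : readType.hashCode());
        }
      }

      public static void main(String[] args) {
        System.out.println(new Desc(ReadType.VECTOR_DESERIALIZE)
            .equals(new Desc(ReadType.VECTOR_DESERIALIZE))); // true
        System.out.println(new Desc(null).equals(new Desc(null))); // true: == handles null
      }
    }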
diff --git ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
new file mode 100644
index 0000000..f85d8ee
--- /dev/null
+++ ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
@@ -0,0 +1,33 @@
+set hive.vectorized.execution.enabled = false;
+
+create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string);
+create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string);
+
+alter table a1 add partition(END_DT='20161020');
+alter table a1 add partition(END_DT='20161021');
+
+insert into table a1 partition(END_DT='20161020') values('2000721360','20161001');
+
+
+SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1;
+
+SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1;
+
+DROP TABLE a1;
+DROP TABLE a2;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/spark_combine_equivalent_work.q.out ql/src/test/results/clientpositive/spark_combine_equivalent_work.q.out
new file mode 100644
index 0000000..93d07d6
--- /dev/null
+++ ql/src/test/results/clientpositive/spark_combine_equivalent_work.q.out
@@ -0,0 +1,115 @@
+PREHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@a1
+POSTHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@a1
+PREHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@a2
+POSTHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@a2
+PREHOOK: query: alter table a1 add partition(END_DT='20161020')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@a1
+POSTHOOK: query: alter table a1 add partition(END_DT='20161020')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@a1
+POSTHOOK: Output: default@a1@end_dt=20161020
+PREHOOK: query: alter table a1 add partition(END_DT='20161021')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@a1
+POSTHOOK: query: alter table a1 add partition(END_DT='20161021')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@a1
+POSTHOOK: Output: default@a1@end_dt=20161021
+PREHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@a1@end_dt=20161020
+POSTHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@a1@end_dt=20161020
+POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).kehhao SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).start_dt SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@a1
+PREHOOK: Input: default@a1@end_dt=20161020
+PREHOOK: Input: default@a1@end_dt=20161021
+PREHOOK: Input: default@a2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@a1
+POSTHOOK: Input: default@a1@end_dt=20161020
+POSTHOOK: Input: default@a1@end_dt=20161021
+POSTHOOK: Input: default@a2
+#### A masked pattern was here ####
+PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@a1
+PREHOOK: Input: default@a1@end_dt=20161020
+PREHOOK: Input: default@a1@end_dt=20161021
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@a1
+POSTHOOK: Input: default@a1@end_dt=20161020
+POSTHOOK: Input: default@a1@end_dt=20161021
+#### A masked pattern was here ####
+2000721360	2
+PREHOOK: query: DROP TABLE a1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@a1
+PREHOOK: Output: default@a1
+POSTHOOK: query: DROP TABLE a1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@a1
+POSTHOOK: Output: default@a1
+PREHOOK: query: DROP TABLE a2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@a2
+PREHOOK: Output: default@a2
+POSTHOOK: query: DROP TABLE a2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@a2
+POSTHOOK: Output: default@a2
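The expected output above matches the intent of the second test query: a UNION ALL of two identical branches over a1 must still count each qualifying row twice after the two equivalent map works are combined, so HAVING COUNT(1)>1 keeps the row and the result is 2000721360 with a count of 2. A tiny standalone model of that expectation (all names illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Models the second test query: UNION ALL of two identical branches must
    // yield each qualifying row twice even when the scans are shared, so the
    // HAVING COUNT(1)>1 filter keeps the row.
    public class UnionAllCountSketch {
      public static void main(String[] args) {
        List<String> branch = Arrays.asList("2000721360"); // rows passing the WHERE filter
        List<String> unionAll = new ArrayList<>(branch);
        unionAll.addAll(branch);                           // the second, identical branch
        long count = unionAll.stream().filter("2000721360"::equals).count();
        System.out.println(count > 1 ? "2000721360\t" + count : "(no rows)"); // prints 2000721360  2
      }
    }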