diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java index 9bbaadd3a3948c39be17bc28ca0154cfe9e35d33..7c6bf7b80075c8305c9c97344afe28e4471c90a6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.UnionDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ReturnObjectInspectorResolver; @@ -42,6 +43,7 @@ StructObjectInspector[] parentObjInspectors; List[] parentFields; + List[] relatedParentFieldsIndexes; ReturnObjectInspectorResolver[] columnTypeResolvers; boolean[] needsTransform; @@ -62,16 +64,27 @@ int parents = parentOperators.size(); parentObjInspectors = new StructObjectInspector[parents]; parentFields = new List[parents]; + relatedParentFieldsIndexes = new List[parents]; + Operator curPop = null; for (int p = 0; p < parents; p++) { parentObjInspectors[p] = (StructObjectInspector) inputObjInspectors[p]; parentFields[p] = parentObjInspectors[p].getAllStructFieldRefs(); + curPop = parentOperators.get(p); + if(curPop instanceof FilterOperator) { + int posInParent = curPop.getChildOperators().indexOf(this); + relatedParentFieldsIndexes[p] = ((FilterOperator)curPop).getConf().getRelatedFieldsIndexes(posInParent); + } else { + relatedParentFieldsIndexes[p] = null; + } } // Get columnNames from the first parent - int columns = parentFields[0].size(); + int columns = (relatedParentFieldsIndexes[0] == null? parentFields[0].size(): relatedParentFieldsIndexes[0].size()) ; ArrayList columnNames = new ArrayList(columns); + int orgC; for (int c = 0; c < columns; c++) { - columnNames.add(parentFields[0].get(c).getFieldName()); + orgC = (relatedParentFieldsIndexes[0] == null? c : relatedParentFieldsIndexes[0].get(c)); + columnNames.add(parentFields[0].get(orgC).getFieldName()); } // Get outputFieldOIs @@ -80,10 +93,13 @@ columnTypeResolvers[c] = new ReturnObjectInspectorResolver(true); } + int matchParentFieldSize; for (int p = 0; p < parents; p++) { - assert (parentFields[p].size() == columns); + matchParentFieldSize = relatedParentFieldsIndexes[p] == null? parentFields[p].size() : relatedParentFieldsIndexes[p].size(); + assert (matchParentFieldSize == columns); for (int c = 0; c < columns; c++) { - if (!columnTypeResolvers[c].updateForUnionAll(parentFields[p].get(c) + orgC = (relatedParentFieldsIndexes[p] == null? c : relatedParentFieldsIndexes[p].get(c)); + if (!columnTypeResolvers[c].updateForUnionAll(parentFields[p].get(orgC) .getFieldObjectInspector())) { // checked in SemanticAnalyzer. Should not happen throw new HiveException("Incompatible types for union operator"); @@ -95,7 +111,8 @@ columns); for (int c = 0; c < columns; c++) { // can be null for void type - ObjectInspector fieldOI = parentFields[0].get(c).getFieldObjectInspector(); + orgC = (relatedParentFieldsIndexes[0] == null? c : relatedParentFieldsIndexes[0].get(c)); + ObjectInspector fieldOI = parentFields[0].get(orgC).getFieldObjectInspector(); outputFieldOIs.add(columnTypeResolvers[c].get(fieldOI)); } @@ -127,11 +144,13 @@ public synchronized void process(Object row, int tag) throws HiveException { StructObjectInspector soi = parentObjInspectors[tag]; List fields = parentFields[tag]; - - if (needsTransform[tag]) { - for (int c = 0; c < fields.size(); c++) { + int unionFieldsSize = (relatedParentFieldsIndexes[tag] == null? parentFields[tag].size(): relatedParentFieldsIndexes[tag].size()) ; + int orgC; + if (needsTransform[tag] || relatedParentFieldsIndexes[tag] != null) { //relatedParentFieldsIndexes[tag] not null means union col num different from its parent. + for (int c = 0; c < unionFieldsSize; c++) { + orgC = (relatedParentFieldsIndexes[tag] == null? c : relatedParentFieldsIndexes[tag].get(c)) ; outputRow.set(c, columnTypeResolvers[c].convertIfNecessary(soi - .getStructFieldData(row, fields.get(c)), fields.get(c) + .getStructFieldData(row, fields.get(orgC)), fields.get(orgC) .getFieldObjectInspector())); } forward(outputRow, outputObjInspector); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java index 692319e1529a081ce4d43af6ee6d7aad5b8a7035..d4101d1822f03199cae99b36cd0c8939c197edbe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java @@ -25,14 +25,17 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; +import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; @@ -212,4 +215,54 @@ public ParseContext getParseContext() { } return columns; } + + /** + * If the input filter operator has direct child(ren) which are union operator, + * and the filter's column is not the same as union's + * create list to map child's col order to the parent's. + * + * @param curOp + * The filter operator which need to handle children. + * @throws SemanticException + */ + public void handleFilterUnionChildren(Operator curOp) + throws SemanticException { + if (curOp.getChildOperators() == null || !(curOp instanceof FilterOperator)) { + return; + } + List parentPrunList = prunedColLists.get(curOp); + if(parentPrunList == null || parentPrunList.size() == 0) { + return; + } + FilterOperator filOp = (FilterOperator)curOp; + FilterDesc desc = filOp.getConf(); + List prunList = null; + List[] childToParentIndex = null; + + int childPos = 0; + for (Operator child : curOp.getChildOperators()) { + if (child instanceof UnionOperator) { + prunList = prunedColLists.get(child); + if (prunList == null || parentPrunList.size() == prunList.size()) { + ++childPos; + continue; + } + if(childToParentIndex == null) { + childToParentIndex = new List[curOp.getChildOperators().size()]; + for (int pos = 0; pos < curOp.getChildOperators().size(); ++pos) { + childToParentIndex[pos] = null; + } + } + List unionColInx = new ArrayList(); + int inx = 0; + for (String colName: prunList) { + unionColInx.add(inx, parentPrunList.indexOf(colName)); + ++inx; + } + childToParentIndex[childPos] = unionColInx; + } + ++childPos; + } + desc.setChildOpColsToParentIndex(childToParentIndex); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index 26b83cac89f3cd8d7797fdb29eaf0207f1865ad6..7a66031f72fb1b648988c967c71271edaa3a8e6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -106,6 +106,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, filterOpPrunedColListsOrderPreserved); pruneOperator(cppCtx, op, cppCtx.getPrunedColLists().get(op)); + cppCtx.handleFilterUnionChildren(op); return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java index 5408dc87d3097ca1415a450a2a7dfbdb4efefd51..ba685ba5c7e8bb186f195f654592e2b31a9b0696 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan; import java.util.List; + import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -83,6 +84,7 @@ public String toString() { //Is this a filter that should perform a comparison for sorted searches private boolean isSortedFilter; private transient boolean isGenerated; + private List[] childOpColsToParentIndex; public FilterDesc() { } @@ -93,6 +95,7 @@ public FilterDesc( this.predicate = predicate; this.isSamplingPred = isSamplingPred; sampleDescr = null; + childOpColsToParentIndex = null; } public FilterDesc( @@ -101,6 +104,7 @@ public FilterDesc( this.predicate = predicate; this.isSamplingPred = isSamplingPred; this.sampleDescr = sampleDescr; + childOpColsToParentIndex = null; } @Explain(displayName = "predicate", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -162,6 +166,21 @@ public void setGenerated(boolean isGenerated) { this.isGenerated = isGenerated; } + public List getRelatedFieldsIndexes(int childPos) { + if(childOpColsToParentIndex != null) { + return childOpColsToParentIndex[childPos]; + } + return null; + } + + public List[] getChildOpColsToParentIndex() { + return childOpColsToParentIndex; + } + + public void setChildOpColsToParentIndex(List[] childOpColsToParentIndex) { + this.childOpColsToParentIndex = childOpColsToParentIndex; + } + @Override public Object clone() { FilterDesc filterDesc = new FilterDesc(getPredicate().clone(), getIsSamplingPred()); @@ -169,6 +188,7 @@ public Object clone() { filterDesc.setSampleDescr(getSampleDescr()); } filterDesc.setSortedFilter(isSortedFilter()); + filterDesc.setChildOpColsToParentIndex(getChildOpColsToParentIndex()); return filterDesc; } } diff --git a/ql/src/test/queries/clientpositive/unionall_unbalancedppd.q b/ql/src/test/queries/clientpositive/unionall_unbalancedppd.q new file mode 100644 index 0000000000000000000000000000000000000000..a13d710b22416f1dda386d4c5848458dd2a1198b --- /dev/null +++ b/ql/src/test/queries/clientpositive/unionall_unbalancedppd.q @@ -0,0 +1,86 @@ +set hive.optimize.ppd=true; +drop table if exists union_all_bug_test_1; +drop table if exists union_all_bug_test_2; +create table if not exists union_all_bug_test_1 +( +f1 int, +f2 int +); + +create table if not exists union_all_bug_test_2 +( +f1 int +); + +SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1); + +insert into table union_all_bug_test_1 values (1,1); +insert into table union_all_bug_test_2 values (1); +insert into table union_all_bug_test_1 values (0,0); +insert into table union_all_bug_test_2 values (0); + +SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1); + +SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 0); + +SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1 or filter == 0); diff --git a/ql/src/test/results/clientpositive/unionall_unbalancedppd.q.out b/ql/src/test/results/clientpositive/unionall_unbalancedppd.q.out new file mode 100644 index 0000000000000000000000000000000000000000..d1f6940dc97b27e1b802317dd7e41be40573012d --- /dev/null +++ b/ql/src/test/results/clientpositive/unionall_unbalancedppd.q.out @@ -0,0 +1,244 @@ +PREHOOK: query: drop table if exists union_all_bug_test_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists union_all_bug_test_1 +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists union_all_bug_test_2 +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists union_all_bug_test_2 +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table if not exists union_all_bug_test_1 +( +f1 int, +f2 int +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@union_all_bug_test_1 +POSTHOOK: query: create table if not exists union_all_bug_test_1 +( +f1 int, +f2 int +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@union_all_bug_test_1 +PREHOOK: query: create table if not exists union_all_bug_test_2 +( +f1 int +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@union_all_bug_test_2 +POSTHOOK: query: create table if not exists union_all_bug_test_2 +( +f1 int +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@union_all_bug_test_2 +PREHOOK: query: SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1) +PREHOOK: type: QUERY +PREHOOK: Input: default@union_all_bug_test_1 +PREHOOK: Input: default@union_all_bug_test_2 +#### A masked pattern was here #### +POSTHOOK: query: SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@union_all_bug_test_1 +POSTHOOK: Input: default@union_all_bug_test_2 +#### A masked pattern was here #### +PREHOOK: query: insert into table union_all_bug_test_1 values (1,1) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@union_all_bug_test_1 +POSTHOOK: query: insert into table union_all_bug_test_1 values (1,1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@union_all_bug_test_1 +POSTHOOK: Lineage: union_all_bug_test_1.f1 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: union_all_bug_test_1.f2 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: insert into table union_all_bug_test_2 values (1) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__2 +PREHOOK: Output: default@union_all_bug_test_2 +POSTHOOK: query: insert into table union_all_bug_test_2 values (1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__2 +POSTHOOK: Output: default@union_all_bug_test_2 +POSTHOOK: Lineage: union_all_bug_test_2.f1 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: insert into table union_all_bug_test_1 values (0,0) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__3 +PREHOOK: Output: default@union_all_bug_test_1 +POSTHOOK: query: insert into table union_all_bug_test_1 values (0,0) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__3 +POSTHOOK: Output: default@union_all_bug_test_1 +POSTHOOK: Lineage: union_all_bug_test_1.f1 EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: union_all_bug_test_1.f2 EXPRESSION [(values__tmp__table__3)values__tmp__table__3.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: insert into table union_all_bug_test_2 values (0) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__4 +PREHOOK: Output: default@union_all_bug_test_2 +POSTHOOK: query: insert into table union_all_bug_test_2 values (0) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__4 +POSTHOOK: Output: default@union_all_bug_test_2 +POSTHOOK: Lineage: union_all_bug_test_2.f1 EXPRESSION [(values__tmp__table__4)values__tmp__table__4.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1) +PREHOOK: type: QUERY +PREHOOK: Input: default@union_all_bug_test_1 +PREHOOK: Input: default@union_all_bug_test_2 +#### A masked pattern was here #### +POSTHOOK: query: SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@union_all_bug_test_1 +POSTHOOK: Input: default@union_all_bug_test_2 +#### A masked pattern was here #### +1 +PREHOOK: query: SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 0) +PREHOOK: type: QUERY +PREHOOK: Input: default@union_all_bug_test_1 +PREHOOK: Input: default@union_all_bug_test_2 +#### A masked pattern was here #### +POSTHOOK: query: SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 0) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@union_all_bug_test_1 +POSTHOOK: Input: default@union_all_bug_test_2 +#### A masked pattern was here #### +0 +1 +0 +PREHOOK: query: SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1 or filter == 0) +PREHOOK: type: QUERY +PREHOOK: Input: default@union_all_bug_test_1 +PREHOOK: Input: default@union_all_bug_test_2 +#### A masked pattern was here #### +POSTHOOK: query: SELECT f1 +FROM ( + +SELECT +f1 +, if('helloworld' like '%hello%' ,f1,f2) as filter +FROM union_all_bug_test_1 + +union all + +select +f1 +, 0 as filter +from union_all_bug_test_2 +) A +WHERE (filter = 1 or filter == 0) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@union_all_bug_test_1 +POSTHOOK: Input: default@union_all_bug_test_2 +#### A masked pattern was here #### +1 +0 +1 +0