diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c49a0f2..b832aee 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1031,6 +1031,7 @@
         "Whether to push predicates down into storage handlers. Ignored when hive.optimize.ppd is false."),
     // Constant propagation optimizer
     HIVEOPTCONSTANTPROPAGATION("hive.optimize.constant.propagation", true, "Whether to enable constant propagation optimizer"),
+    HIVEIDENTITYPROJECTREMOVER("hive.optimize.remove.identity.project", true, "Removes identity project from operator tree"),
     HIVEMETADATAONLYQUERIES("hive.optimize.metadataonly", true, ""),
     HIVENULLSCANOPTIMIZE("hive.optimize.null.scan", true, "Dont scan relations which are guaranteed to not generate any rows"),
     HIVEOPTPPD_STORAGE("hive.optimize.ppd.storage", true,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 273691e..8966cfc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -44,11 +45,12 @@
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Base operator implementation.
  **/
@@ -705,12 +707,30 @@ public void removeChild(Operator<? extends OperatorDesc> child) {
    * @throws SemanticException
    */
   public void removeChildAndAdoptItsChildren(
-      Operator<? extends OperatorDesc> child) throws SemanticException {
+      Operator<? extends OperatorDesc> child) throws SemanticException {
+    removeChildAndAdoptItsChildren(child, false);
+  }
+
+  public boolean removeChildAndAdoptItsChildren(
+      Operator<? extends OperatorDesc> child, boolean translateColExprs) throws SemanticException {
     int childIndex = childOperators.indexOf(child);
     if (childIndex == -1) {
       throw new SemanticException(
           "Exception when trying to remove partition predicates: fail to find child from parent");
     }
+    Map<String, ExprNodeDesc> childColExprMap = child.getColumnExprMap();
+    if (translateColExprs && child.getChildOperators() != null) {
+      // First check that we can translate all the colExprMaps.
+      for (Operator<? extends OperatorDesc> grandChild : child.getChildOperators()) {
+        Map<String, ExprNodeDesc> gcColExprMap = grandChild.getColumnExprMap();
+        if (gcColExprMap == null) continue; // TODO: do we need to patch the great-grandchild?
+        for (Map.Entry<String, ExprNodeDesc> e : gcColExprMap.entrySet()) {
+          if (!(e.getValue() instanceof ExprNodeColumnDesc)) {
+            return false; // We cannot translate this map.
+          }
+        }
+      }
+    }
     childOperators.remove(childIndex);
     if (child.getChildOperators() != null &&
         child.getChildOperators().size() > 0) {
@@ -723,10 +743,28 @@ public void removeChildAndAdoptItsChildren(
       int index = parents.indexOf(child);
       if (index == -1) {
         throw new SemanticException(
-            "Exception when trying to remove partition predicates: fail to find parent from child");
+            "Exception when trying to remove operator: fail to find parent from child");
       }
       parents.set(index, this);
+      if (translateColExprs) {
+        Map<String, ExprNodeDesc> gcColExprMap = gc.getColumnExprMap();
+        if (gcColExprMap == null) {
+          LOG.warn("colExprMap of " + gc + " is null while removing " + child);
+          continue;
+        }
+        for (Map.Entry<String, ExprNodeDesc> e : gcColExprMap.entrySet()) {
+          String colName = ((ExprNodeColumnDesc) e.getValue()).getColumn();
+          ExprNodeDesc childToParentExpr = childColExprMap.get(colName);
+          if (childToParentExpr != null) {
+            e.setValue(childToParentExpr);
+          } else {
+            LOG.warn("Cannot translate " + e.getKey() + " in " + gc
+                + ": nothing found in " + child + " for " + colName);
+          }
+        }
+      }
     }
+    return true;
   }
 
   public void removeParent(Operator<? extends OperatorDesc> parent) {
@@ -1330,4 +1368,41 @@ public void processOp(Object row, int tag) {
     }
     Map dummyOps = parentOperators.get(0).getTagToOperatorTree();
     return dummyOps;
   }
+
+  public String toVerboseString() {
+    String s = "{colExprMap: {";
+    if (colExprMap != null) {
+      for (Map.Entry<String, ExprNodeDesc> entry : colExprMap.entrySet()) {
+        s += entry.getKey() + " => " + entry.getValue().getExprString() + ", ";
+      }
+    }
+    s += "}, output oi " + toOiString(outputObjInspector) + ", input ois [";
+    if (inputObjInspectors != null) {
+      for (ObjectInspector oi : inputObjInspectors) {
+        s += toOiString(oi) + ", ";
+      }
+    }
+    return s + "]}";
+  }
+
+  public String toOiString(ObjectInspector oi) {
+    String s = "{" + (oi == null ? "null" : oi.toString()
+        + " (" + oi.getClass().getSimpleName() + ")") + " with fields: [";
+    if (oi instanceof StructObjectInspector) {
+      StructObjectInspector soi = (StructObjectInspector) oi;
+      for (StructField sf : soi.getAllStructFieldRefs()) {
+        s += sf.getFieldName() + ", ";
+      }
+    }
+    s += "]}";
+    return s;
+  }
+
+  public String toChildVerboseString() {
+    String s = "[";
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      s += op.toVerboseString() + ", ";
+    }
+    return s + "]";
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/IdentityProjectRemover.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/IdentityProjectRemover.java
new file mode 100644
index 0000000..c52625d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/IdentityProjectRemover.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * This optimization tries to remove {@link SelectOperator}s from the tree that don't do any
+ * processing except forwarding columns from their parent to their children, e.g.,
+ * select * from (select * from src where key = value) t1 join (select * from src where key = value) t2;
+ *
+ * Query tree without this optimization:
+ *
+ *   TS -> FIL -> SEL -> RS ->
+ *                             JOIN -> SEL -> FS
+ *   TS -> FIL -> SEL -> RS ->
+ *
+ * With this optimization:
+ *
+ *   TS -> FIL -> RS ->
+ *                      JOIN -> FS
+ *   TS -> FIL -> RS ->
+ *
+ * Note the absence of the select operators after the filters and after the join.
+ * Also see: identity_proj_remove.q
+ */
+public class IdentityProjectRemover implements Transform {
+
+  private static final Log LOG = LogFactory.getLog(IdentityProjectRemover.class);
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    if (!pctx.getLoadTableWork().isEmpty()) {
+      // TODO: for insert queries, the top-level column names change, which breaks this
+      // optimization. Disable it for insert queries for now.
+      return pctx;
+    }
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1",
+        "(" + SelectOperator.getOperatorName() + "%)"), new ProjectRemover());
+    GraphWalker ogw = new DefaultGraphWalker(new DefaultRuleDispatcher(null, opRules, null));
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  private static class ProjectRemover implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      SelectOperator sel = (SelectOperator) nd;
+      List<Operator<? extends OperatorDesc>> parents = sel.getParentOperators();
+      if (parents.size() != 1) {
+        // Multiple parents; can't handle that.
+        return null;
+      }
+      Operator<? extends OperatorDesc> parent = parents.get(0);
+      RowSchema prs = parent.getSchema();
+      ArrayList<ColumnInfo> pcols;
+      List<ExprNodeDesc> selCols = sel.getConf().getColList();
+      List<String> outputColNames = sel.getConf().getOutputColumnNames();
+      if (selCols == null || null == prs || (pcols = prs.getSignature()) == null) {
+        // This is unlikely, but let's be defensive; this is an optimization, after all.
+        return null;
+      }
+      if (selCols.size() != pcols.size()/* || outputColNames.size() != pcols.size()*/) {
+        return null;
+      }
+
+      for (int i = 0; i < pcols.size(); i++) {
+        ExprNodeDesc desc = selCols.get(i);
+        if (!(desc instanceof ExprNodeColumnDesc)) {
+          // This needs to be a simple project.
+          return null;
+        }
+        // TODO: for a strict identity check, we should be comparing outputColNames with pcols,
+        // but below we translate ColRef-s, so it should work. Do we even need to check getExprString?
+        if (!(desc.getTypeInfo().equals(pcols.get(i).getType())) ||
+            !(desc.getExprString().equals(pcols.get(i).getInternalName()))) {
+          // Type and name should match.
+          return null;
+        }
+      }
+      // All looks good!
+      if (parent.removeChildAndAdoptItsChildren(sel, true)) {
+        LOG.debug("Identity project remover optimization removed: " + sel.toVerboseString()
+            + " with parent " + parent.toVerboseString() + " and children " + sel.toChildVerboseString());
+      }
+      return null;
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 3e32558..fc1161a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -122,6 +122,9 @@ public void initialize(HiveConf hiveConf) {
       transformations.add(new ReduceSinkDeDuplication());
     }
     transformations.add(new NonBlockingOpDeDupProc());
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEIDENTITYPROJECTREMOVER)) {
+      transformations.add(new IdentityProjectRemover());
+    }
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVELIMITOPTENABLE)) {
       transformations.add(new GlobalLimitOptimizer());
     }
diff --git ql/src/test/queries/clientpositive/identity_proj_remove.q ql/src/test/queries/clientpositive/identity_proj_remove.q
new file mode 100644
index 0000000..cc8be74
--- /dev/null
+++ ql/src/test/queries/clientpositive/identity_proj_remove.q
@@ -0,0 +1,9 @@
+set hive.optimize.remove.identity.project=false;
+explain
+select * from (select * from src where key = value) t1 join (select * from src where key = value) t2;
+
+set hive.optimize.remove.identity.project=true;
+explain
+select * from (select * from src where key = value) t1 join (select * from src where key = value) t2;
+
+select * from (select * from src where key = value) t1 join (select * from src where key = value) t2;
diff --git ql/src/test/results/clientpositive/auto_join0.q.out ql/src/test/results/clientpositive/auto_join0.q.out
index 9261ce0..cb9b3dd 100644
--- ql/src/test/results/clientpositive/auto_join0.q.out
+++ ql/src/test/results/clientpositive/auto_join0.q.out
@@ -42,17 +42,13 @@ STAGE PLANS:
           Filter Operator
             predicate: (key < 10) (type: boolean)
             Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
-            Select Operator
-              expressions: key (type: string), value (type: string)
-              outputColumnNames: _col0, _col1
-              Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
-              HashTable Sink Operator
-                condition expressions:
-                  0 {_col0} {_col1}
-                  1 {_col0} {_col1}
-                keys:
-                  0 
-                  1 
+            HashTable Sink Operator
+              condition expressions:
+                0 {key} {value}
+                1 {key} {value}
+              keys:
+                0 
+                1 
 
   Stage: Stage-2
     Map Reduce
@@ -63,29 +59,21 @@
           Filter Operator
             predicate: (key < 10) (type: boolean)
             Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
-            Select Operator
-              expressions: key (type: string), value (type: string)
-              outputColumnNames: _col0, _col1
-              Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
-              Map Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                condition expressions:
-                  0 {_col0} {_col1}
-                  1 {_col0} {_col1}
-                keys:
-                  0 
-                  1 
-                outputColumnNames: _col0, _col1, _col2, _col3
+            Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {key} {value}
+                1 {key} {value}
+              keys:
+                0 
+                1 
+              outputColumnNames: _col0, _col1, _col2, _col3
+              Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+                sort order: ++++
                 Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
-                  outputColumnNames: _col0, _col1, _col2, _col3
-                  Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
-                  Reduce Output Operator
-                    key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
-                    sort order: ++++
-                    Statistics: Num rows: 182 Data size: 1939 Basic stats: COMPLETE Column stats: NONE
       Local Work:
         Map Reduce Local Work
       Reduce Operator Tree:
@@ -119,17 +107,13 @@
             mode: mergepartial
             outputColumnNames: _col0
             Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-            Select Operator
-              expressions: _col0 (type: bigint)
-              outputColumnNames: _col0
+            File Output Operator
+              compressed: false
               Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-              File Output Operator
-                compressed: false
-                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-                table:
-                    input format: org.apache.hadoop.mapred.TextInputFormat
-                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
diff --git ql/src/test/results/clientpositive/identity_proj_remove.q.out ql/src/test/results/clientpositive/identity_proj_remove.q.out
new file mode 100644
index 0000000..993d6cc
--- /dev/null
+++ ql/src/test/results/clientpositive/identity_proj_remove.q.out
@@ -0,0 +1,137 @@
+Warning: Shuffle Join JOIN[8][tables = [t1, t2]] in Stage 'Stage-1:MAPRED' is a cross product
+PREHOOK: query: explain
+select * from (select * from src where key = value) t1 join (select * from src where key = value) t2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from (select * from src where key = value) t1 join (select * from src where key = value) t2
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key = value) (type: boolean)
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string)
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key = value) (type: boolean)
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string), _col1 (type: string)
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {VALUE._col0} {VALUE._col1}
+            1 {VALUE._col0} {VALUE._col1}
+          outputColumnNames: _col0, _col1, _col2, _col3
+          Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+            outputColumnNames: _col0, _col1, _col2, _col3
+            Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+Warning: Shuffle Join JOIN[8][tables = [t1, t2]] in Stage 'Stage-1:MAPRED' is a cross product
+PREHOOK: query: explain
+select * from (select * from src where key = value) t1 join (select * from src where key = value) t2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from (select * from src where key = value) t1 join (select * from src where key = value) t2
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key = value) (type: boolean)
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                sort order: 
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                value expressions: _col0 (type: string), _col1 (type: string)
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key = value) (type: boolean)
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                sort order: 
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                value expressions: _col0 (type: string), _col1 (type: string)
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {VALUE._col0} {VALUE._col1}
+            1 {VALUE._col0} {VALUE._col1}
+          outputColumnNames: _col0, _col1, _col2, _col3
+          Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+Warning: Shuffle Join JOIN[8][tables = [t1, t2]] in Stage 'Stage-1:MAPRED' is a cross product
+PREHOOK: query: select * from (select * from src where key = value) t1 join (select * from src where key = value) t2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select * from (select * from src where key = value) t1 join (select * from src where key = value) t2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
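For anyone trying the patch outside the .q harness, here is a minimal, hypothetical sketch (not part of the patch) that toggles the new flag programmatically and prints the resulting plan; it is the programmatic equivalent of the `set hive.optimize.remove.identity.project=...;` lines in identity_proj_remove.q. The `HIVEIDENTITYPROJECTREMOVER` ConfVar comes from the HiveConf change above; the `Driver`/`SessionState` usage is sketched from the Hive APIs of this vintage and may need adjusting on other branches.

```java
import java.util.ArrayList;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

// Hypothetical demo class, not part of the patch.
public class IdentityProjectRemoverDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Flag added by this patch; it defaults to true in HiveConf.
    conf.setBoolVar(HiveConf.ConfVars.HIVEIDENTITYPROJECTREMOVER, true);
    SessionState.start(conf);
    Driver driver = new Driver(conf);
    // Same query as identity_proj_remove.q; with the flag on, the identity
    // Select Operators should be absent from the printed plan.
    driver.run("explain select * from (select * from src where key = value) t1 "
        + "join (select * from src where key = value) t2");
    ArrayList<String> plan = new ArrayList<String>();
    driver.getResults(plan);
    for (String line : plan) {
      System.out.println(line);
    }
  }
}
```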