diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a182cd7..8236fe4 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -428,6 +428,10 @@
     HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
     HIVEJOINCACHESIZE("hive.join.cache.size", 25000),
 
+    // CBO related
+    HIVE_CBO_ENABLED("hive.cbo.enable", false),
+    HIVE_CBO_MAX_JOINS_SUPPORTED("hive.cbo.max.joins.supported", 5),
+
     // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
     // need to remove by hive .13. Also, do not change default (see SMB operator)
     HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100),
diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
index 1ba5654..a3e62aa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
@@ -48,12 +48,25 @@
   boolean mapJoinRemoved = false;
   boolean hasMapGroupBy = false;
 
+  private int noOfJoins = 0;
+  private int noOfOuterJoins = 0;
+
   public boolean hasJoin() {
-    return hasJoin;
+    return (noOfJoins > 0);
+  }
+
+  public void incrementJoinCount(boolean noOuterJoin) {
+    noOfJoins++;
+    if (!noOuterJoin)
+      noOfOuterJoins++;
+  }
+
+  public int getJoinCount() {
+    return noOfJoins;
   }
 
-  public void setHasJoin(boolean hasJoin) {
-    this.hasJoin = hasJoin;
+  public int getOuterJoinCount() {
+    return noOfOuterJoins;
   }
 
   public boolean hasGroupBy() {
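
The two new ConfVars above, together with the join counters now kept in QueryProperties, are the knobs a planner gate can consult before handing a query to CBO. A minimal sketch of such a gate, not part of this patch (the class and method names are invented for illustration):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryProperties;

public class CBOGateSketch {
  // Hypothetical helper: run CBO only when it is enabled and the query has at
  // least one join but no more joins than hive.cbo.max.joins.supported allows.
  public static boolean shouldRunCBO(HiveConf conf, QueryProperties qp) {
    if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)) {
      return false;
    }
    int maxJoins = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CBO_MAX_JOINS_SUPPORTED);
    return qp.hasJoin() && qp.getJoinCount() <= maxJoins;
  }
}
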
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java
new file mode 100644
index 0000000..e27964c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
+import org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
+import org.apache.hadoop.hive.ql.ppd.PredicateTransitivePropagate;
+
+/*
+ * Runs PredicatePushDown, PartitionPruning, ColumnPruning and stats annotation before CBO.
+ */
+public class PreCBOOptimizer {
+  private ParseContext pctx;
+  private List<Transform> transformations;
+
+  /**
+   * Create the list of transformations.
+   *
+   * @param hiveConf
+   */
+  public void initialize(HiveConf hiveConf) {
+    transformations = new ArrayList<Transform>();
+    // Add the transformation that computes the lineage information.
+    transformations.add(new Generator());
+    transformations.add(new PredicateTransitivePropagate());
+    transformations.add(new PredicatePushDown());
+    transformations.add(new PartitionPruner());
+    transformations.add(new PartitionConditionRemover());
+    transformations.add(new ColumnPruner());
+    transformations.add(new AnnotateWithStatistics());
+  }
+
+  /**
+   * Invoke all the transformations one-by-one, and alter the query plan.
+   *
+   * @return ParseContext
+   * @throws SemanticException
+   */
+  public ParseContext optimize() throws SemanticException {
+    for (Transform t : transformations) {
+      pctx = t.transform(pctx);
+    }
+    return pctx;
+  }
+
+  /**
+   * @return the pctx
+   */
+  public ParseContext getPctx() {
+    return pctx;
+  }
+
+  /**
+   * @param pctx
+   *          the pctx to set
+   */
+  public void setPctx(ParseContext pctx) {
+    this.pctx = pctx;
+  }
+
+}
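
Driving PreCBOOptimizer is the same three-call sequence that SemanticAnalyzer.analyzeInternal() uses further down in this patch; a minimal sketch, assuming a ParseContext has already been built:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.optimizer.PreCBOOptimizer;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

class PreCBOOptimizerUsageSketch {
  // Mirrors the call sequence in SemanticAnalyzer.analyzeInternal() below:
  // attach the ParseContext, build the transform list, then run the transforms.
  static ParseContext runPreCBO(HiveConf conf, ParseContext pCtx) throws SemanticException {
    PreCBOOptimizer optm = new PreCBOOptimizer();
    optm.setPctx(pCtx);
    optm.initialize(conf);
    return optm.optimize();
  }
}
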
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/CostBasedOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/CostBasedOptimizer.java
new file mode 100644
index 0000000..5226323
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/CostBasedOptimizer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.optiq;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+
+/*
+ * Entry point to Optimizations using Optiq.
+ */
+public class CostBasedOptimizer {
+
+  /*
+   * Current contract: given a Hive Operator tree, return the optimal plan as a Hive AST.
+   * If there are Operators that cannot be handled, return null, implying that Semantic
+   * Analysis should proceed on the original AST.
+   */
+  public static ASTNode optimize(@SuppressWarnings("rawtypes") Operator sinkOp,
+      SemanticAnalyzer semanticAnalyzer, ParseContext pCtx) {
+    return null;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
index 52c39c0..a24cad9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
@@ -131,7 +131,7 @@ public String getErrorMessage(RecognitionException e, String[] tokenNames) {
    * so that the graph walking algorithms and the rules framework defined in
    * ql.lib can be used with the AST Nodes.
    */
-  static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
+  public static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
     /**
      * Creates an ASTNode for the given token. The ASTNode is a wrapper around
      * antlr's CommonTree class that implements the Node interface.
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 77388dd..b649af1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -94,6 +94,8 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.PreCBOOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.optiq.CostBasedOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
@@ -251,6 +253,9 @@
   //flag for partial scan during analyze ... compute statistics
   protected boolean partialscan = false;
 
+  private volatile boolean runCBO = true;
+  private volatile boolean disableJoinMerge = false;
+
   /*
    * Capture the CTE definitions in a Query.
    */
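
Two things are worth illustrating here: the null contract of CostBasedOptimizer.optimize(), and the ParseDriver.adaptor visibility change, which presumably exists so code outside the parser (such as an Optiq-plan-to-AST translator) can build ASTNodes with the same adaptor. A hedged sketch of both; neither class below is part of the patch:

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.ParseDriver;

class CBOContractSketch {
  // Honors the CostBasedOptimizer.optimize() contract: a null return means
  // "fall back to the original AST" (analyzeInternal below does exactly this).
  static ASTNode pickAst(ASTNode originalAst, ASTNode cboAst) {
    return (cboAst == null) ? originalAst : cboAst;
  }

  // One plausible use of the now-public adaptor: creating AST nodes outside the
  // parser, e.g. while translating an optimized plan back into a HiveQL AST.
  static ASTNode newQueryNode() {
    return (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_QUERY, "TOK_QUERY");
  }
}
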
@@ -315,6 +320,7 @@
   protected void reset() {
     opParseCtx.clear();
     groupOpToInputTables.clear();
     prunedPartitions.clear();
+    disableJoinMerge = false;
     aliasToCTEs.clear();
   }
@@ -962,7 +968,6 @@ public boolean doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1)
             frm.getToken().getType() == HiveParser.TOK_LATERAL_VIEW_OUTER) {
           processLateralView(qb, frm);
         } else if (isJoinToken(frm)) {
-          queryProperties.setHasJoin(true);
           processJoin(qb, frm);
           qbp.setJoinExpr(frm);
         }else if(frm.getToken().getType() == HiveParser.TOK_PTBLFUNCTION){
@@ -1179,6 +1184,10 @@ private void getMetaData(QBExpr qbexpr, ReadEntity parentInput)
     }
   }
 
+  public Table getTable(TableScanOperator ts) {
+    return topToTable.get(ts);
+  }
+
   public void getMetaData(QB qb) throws SemanticException {
     getMetaData(qb, null);
   }
@@ -6467,6 +6476,7 @@ private Operator genJoinOperatorChildren(QBJoinTree join, Operator left,
       }
       desc.setNullSafes(nullsafes);
     }
+    queryProperties.incrementJoinCount(joinOp.getConf().getNoOuterJoin());
     return putOpInsertMap(joinOp, outputRS);
   }
 
@@ -8823,7 +8833,9 @@ public Operator genPlan(QB qb) throws SemanticException {
           extractJoinCondsFromWhereClause(joinTree, qb, dest, (ASTNode) whereClause.getChild(0) );
         }
       }
-      mergeJoinTree(qb);
+
+      if (!disableJoinMerge)
+        mergeJoinTree(qb);
     }
 
     // if any filters are present in the join tree, push them on top of the
@@ -9093,6 +9105,19 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     getMetaData(qb);
     LOG.info("Completed getting MetaData in Semantic Analysis");
 
+    if (runCBO) {
+      boolean tokenTypeIsQuery = ast.getToken().getType() == HiveParser.TOK_QUERY
+          || ast.getToken().getType() == HiveParser.TOK_EXPLAIN;
+      if (!tokenTypeIsQuery || createVwDesc != null
+          || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)) {
+        runCBO = false;
+      }
+
+      if (runCBO) {
+        disableJoinMerge = true;
+      }
+    }
+
     // Save the result schema derived from the sink operator produced
     // by genPlan. This has the correct column names, which clients
     // such as JDBC would prefer instead of the c0, c1 we'll end
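
With the Phase-1 flag gone, the join counters are maintained where the JoinOperator is actually created (genJoinOperatorChildren above), using the join descriptor's getNoOuterJoin() flag. A small sketch of the resulting counter semantics, exercising only QueryProperties:

import org.apache.hadoop.hive.ql.QueryProperties;

class JoinCountSketch {
  public static void main(String[] args) {
    QueryProperties qp = new QueryProperties();
    qp.incrementJoinCount(true);   // join tree with no outer join: only the total grows
    qp.incrementJoinCount(false);  // join tree containing an outer join: both counters grow
    System.out.println(qp.getJoinCount());      // 2
    System.out.println(qp.getOuterJoinCount()); // 1
    System.out.println(qp.hasJoin());           // true
  }
}
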
@@ -9102,6 +9127,80 @@
     resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp)
         .getRowResolver());
 
+    if (runCBO) {
+      /*
+       * For CBO:
+       *  1. Run PreCBOOptimizer on the plan. This applies the Partition Pruning, Predicate Pushdown,
+       *     Column Pruning and Stats Annotation transformations to the generated plan.
+       *  2. Hand the plan to CBO, which searches the plan space and returns the best plan as an AST.
+       *  3. Run the analysis pipeline on the new AST: Phase 1, Get Metadata, Gen Plan.
+       *     a. During plan generation, join merging is disabled so that the join order chosen
+       *        by CBO is not changed.
+       * Error handling:
+       *  - On failure during CBO optimization: log the error and proceed with the plan
+       *    already generated.
+       *  - On failure during analysis of the new AST: restart the analysis from the beginning
+       *    on the original AST, with runCBO set to false.
+       */
+      ASTNode newAST = null;
+      boolean skipCBOPlan = false;
+      runCBO = false;
+
+      try {
+        ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
+            opToPartList, topOps, topSelOps, opParseCtx, joinContext,
+            smbMapJoinContext, topToTable, topToTableProps, fsopToTable,
+            loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId,
+            uCtx, listMapJoinOpsNoReducer, groupOpToInputTables,
+            prunedPartitions, opToSamplePruner, globalLimitCtx,
+            nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
+            viewAliasToInput,
+            reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
+        PreCBOOptimizer optm = new PreCBOOptimizer();
+        optm.setPctx(pCtx);
+        optm.initialize(conf);
+        pCtx = optm.optimize();
+
+        newAST = CostBasedOptimizer.optimize(sinkOp, this, pCtx);
+        if (newAST == null) {
+          skipCBOPlan = true;
+          LOG.info("CBO failed, skipping CBO");
+        } else if (LOG.isDebugEnabled()) {
+          String newAstExpanded = newAST.dump();
+          LOG.debug("CBO rewritten query: \n" + newAstExpanded);
+        }
+      } catch (Throwable t) {
+        LOG.debug("CBO failed, skipping CBO", t);
+        skipCBOPlan = true;
+      }
+
+      if (!skipCBOPlan) {
+        try {
+          init();
+          ctx_1 = initPhase1Ctx();
+          if (!doPhase1(newAST, qb, ctx_1)) {
+            throw new RuntimeException(
+                "Couldn't do phase1 on CBO optimized query plan");
+          }
+          getMetaData(qb);
+          try {
+            disableJoinMerge = true;
+            sinkOp = genPlan(qb);
+          } finally {
+            disableJoinMerge = false;
+          }
+
+          resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp)
+              .getRowResolver());
+        } catch (Throwable t) {
+          LOG.debug("CBO failed, skipping CBO", t);
+          init();
+          analyzeInternal(ast);
+          return;
+        }
+      }
+    }
+
     ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
         opToPartList, topOps, topSelOps, opParseCtx, joinContext,
         smbMapJoinContext, topToTable, topToTableProps, fsopToTable,
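
The whole path is off by default (hive.cbo.enable is false above), and a failed CBO pass falls back to the plan already generated, so opting in is low risk. A minimal sketch of enabling it programmatically, e.g. from a test (setBoolVar and setIntVar are existing HiveConf setters; the hunks shown only read HIVE_CBO_ENABLED, so the max-joins setting below is speculative):

import org.apache.hadoop.hive.conf.HiveConf;

class EnableCBOSketch {
  static HiveConf cboEnabledConf() {
    HiveConf conf = new HiveConf();
    // Equivalent to "set hive.cbo.enable=true;" in a session.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, true);
    // The join-count ceiling introduced above (default 5); not yet consumed in the shown hunks.
    conf.setIntVar(HiveConf.ConfVars.HIVE_CBO_MAX_JOINS_SUPPORTED, 10);
    return conf;
  }
}
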