diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a182cd7..7e224b8 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -428,6 +428,17 @@
     HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
     HIVEJOINCACHESIZE("hive.join.cache.size", 25000),
 
+    /*
+     * Flag to control enabling Cost Based Optimizations using the Optiq framework.
+     */
+    HIVE_CBO_ENABLED("hive.cbo.enable", false),
+    /*
+     * Controls which queries are considered for join reordering, based on the number
+     * of joins they contain. Beyond a certain number of joins, the cost of considering
+     * all possible permutations is prohibitive.
+     */
+    HIVE_CBO_MAX_JOINS_SUPPORTED("hive.cbo.max.joins.supported", 5),
+
     // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
     // need to remove by hive .13. Also, do not change default (see SMB operator)
     HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 0d08aa2..8bf95df 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2344,4 +2344,23 @@
 
 
+<property>
+  <name>hive.cbo.enable</name>
+  <value>false</value>
+  <description>
+    Placeholder flag to control enabling Cost Based Optimizations using the Optiq framework.
+    Currently, turning this flag on does nothing.
+  </description>
+</property>
+
+<property>
+  <name>hive.cbo.max.joins.supported</name>
+  <value>5</value>
+  <description>
+    Controls which queries are considered for join reordering, based on the number of joins
+    they contain. Beyond a certain number of joins, the cost of considering all possible
+    permutations is prohibitive.
+  </description>
+</property>
+
 
 </configuration>
diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
index 1ba5654..6decc83 100644
--- ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
@@ -48,12 +48,26 @@
   boolean mapJoinRemoved = false;
   boolean hasMapGroupBy = false;
 
+  private int noOfJoins = 0;
+  private int noOfOuterJoins = 0;
+
   public boolean hasJoin() {
-    return hasJoin;
+    return (noOfJoins > 0);
+  }
+
+  public void incrementJoinCount(boolean noOuterJoin) {
+    noOfJoins++;
+    if (!noOuterJoin) {
+      noOfOuterJoins++;
+    }
+  }
+
+  public int getJoinCount() {
+    return noOfJoins;
   }
 
-  public void setHasJoin(boolean hasJoin) {
-    this.hasJoin = hasJoin;
+  public int getOuterJoinCount() {
+    return noOfOuterJoins;
   }
 
   public boolean hasGroupBy() {
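[Note: the following gating sketch is not part of this patch. It illustrates how the two
new ConfVars and the QueryProperties join counters are meant to compose once the
join-reordering limit is enforced; the helper class and method name are hypothetical.]

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.QueryProperties;

    public class CboGateSketch {
      // True only when CBO is enabled and the number of joins in the query
      // is small enough for join-permutation search to remain tractable.
      static boolean shouldRunCbo(HiveConf conf, QueryProperties props) {
        return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)
            && props.getJoinCount() <= HiveConf.getIntVar(conf,
                HiveConf.ConfVars.HIVE_CBO_MAX_JOINS_SUPPORTED);
      }
    }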
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/CostBasedOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/CostBasedOptimizer.java
new file mode 100644
index 0000000..77384b7
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CostBasedOptimizer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/*
+ * Entry point to optimizations using Optiq.
+ */
+public class CostBasedOptimizer {
+
+  /*
+   * The current contract: given a Hive operator tree, return an optimal plan
+   * as a Hive AST. If there are operators that cannot be handled, return
+   * null, implying that Semantic Analysis should proceed on the original
+   * AST.
+   */
+  public static ASTNode optimize(Operator<? extends OperatorDesc> sinkOp,
+      SemanticAnalyzer semanticAnalyzer, ParseContext pCtx) {
+    return null;
+  }
+}
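[Note: a condensed caller-side sketch of the contract above, not part of the patch;
the real call site is in the SemanticAnalyzer hunk further down.]

    ASTNode newAST = CostBasedOptimizer.optimize(sinkOp, this, pCtx);
    if (newAST == null) {
      // Some operator could not be handled: fall back to the original AST.
      skipCBOPlan = true;
    }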
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java
new file mode 100644
index 0000000..e27964c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
+import org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
+import org.apache.hadoop.hive.ql.ppd.PredicateTransitivePropagate;
+
+/*
+ * Runs Predicate Pushdown, Partition Pruning, Column Pruning and Stats Annotation before CBO.
+ */
+public class PreCBOOptimizer {
+  private ParseContext pctx;
+  private List<Transform> transformations;
+
+  /**
+   * Create the list of transformations.
+   *
+   * @param hiveConf
+   */
+  public void initialize(HiveConf hiveConf) {
+    transformations = new ArrayList<Transform>();
+    // Add the transformation that computes the lineage information.
+    transformations.add(new Generator());
+    transformations.add(new PredicateTransitivePropagate());
+    transformations.add(new PredicatePushDown());
+    transformations.add(new PartitionPruner());
+    transformations.add(new PartitionConditionRemover());
+    transformations.add(new ColumnPruner());
+    transformations.add(new AnnotateWithStatistics());
+  }
+
+  /**
+   * Invoke all the transformations one-by-one, and alter the query plan.
+   *
+   * @return ParseContext
+   * @throws SemanticException
+   */
+  public ParseContext optimize() throws SemanticException {
+    for (Transform t : transformations) {
+      pctx = t.transform(pctx);
+    }
+    return pctx;
+  }
+
+  /**
+   * @return the pctx
+   */
+  public ParseContext getPctx() {
+    return pctx;
+  }
+
+  /**
+   * @param pctx
+   *          the pctx to set
+   */
+  public void setPctx(ParseContext pctx) {
+    this.pctx = pctx;
+  }
+
+}
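[Note: usage sketch, not part of the patch; it mirrors the call sequence used in the
SemanticAnalyzer hunk further down.]

    PreCBOOptimizer optm = new PreCBOOptimizer();
    optm.setPctx(pCtx);      // hand over the ParseContext first
    optm.initialize(conf);   // then build the transformation list
    pCtx = optm.optimize();  // then run the pre-CBO transformations in order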
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
index 52c39c0..a24cad9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
@@ -131,7 +131,7 @@ public String getErrorMessage(RecognitionException e, String[] tokenNames) {
    * so that the graph walking algorithms and the rules framework defined in
    * ql.lib can be used with the AST Nodes.
    */
-  static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
+  public static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
     /**
      * Creates an ASTNode for the given token. The ASTNode is a wrapper around
      * antlr's CommonTree class that implements the Node interface.
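[Note: hypothetical illustration, not part of the patch. Widening ParseDriver.adaptor to
public lets code outside the parser package, such as an optimizer that emits an AST,
mint ASTNodes the same way the parser does:]

    // TreeAdaptor.create(int, String) is the standard ANTLR factory method.
    ASTNode select =
        (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_SELECT, "TOK_SELECT");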
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 77388dd..2fa5afe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -93,7 +93,9 @@
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.CostBasedOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.PreCBOOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
@@ -251,6 +253,9 @@
   //flag for partial scan during analyze ... compute statistics
   protected boolean partialscan = false;
 
+  private volatile boolean runCBO = true;
+  private volatile boolean disableJoinMerge = false;
+
   /*
    * Capture the CTE definitions in a Query.
    */
@@ -315,6 +320,7 @@ protected void reset() {
     opParseCtx.clear();
     groupOpToInputTables.clear();
     prunedPartitions.clear();
+    disableJoinMerge = false;
     aliasToCTEs.clear();
   }
 
@@ -962,7 +968,6 @@ public boolean doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1)
           frm.getToken().getType() == HiveParser.TOK_LATERAL_VIEW_OUTER) {
         processLateralView(qb, frm);
       } else if (isJoinToken(frm)) {
-        queryProperties.setHasJoin(true);
         processJoin(qb, frm);
         qbp.setJoinExpr(frm);
       }else if(frm.getToken().getType() == HiveParser.TOK_PTBLFUNCTION){
@@ -1179,6 +1184,10 @@ private void getMetaData(QBExpr qbexpr, ReadEntity parentInput)
     }
   }
 
+  public Table getTable(TableScanOperator ts) {
+    return topToTable.get(ts);
+  }
+
   public void getMetaData(QB qb) throws SemanticException {
     getMetaData(qb, null);
   }
@@ -6467,6 +6476,7 @@ private Operator genJoinOperatorChildren(QBJoinTree join, Operator left,
       }
       desc.setNullSafes(nullsafes);
     }
+    queryProperties.incrementJoinCount(joinOp.getConf().getNoOuterJoin());
     return putOpInsertMap(joinOp, outputRS);
   }
 
@@ -8823,7 +8833,10 @@ public Operator genPlan(QB qb) throws SemanticException {
         extractJoinCondsFromWhereClause(joinTree, qb, dest, (ASTNode) whereClause.getChild(0) );
       }
     }
-      mergeJoinTree(qb);
+
+      if (!disableJoinMerge) {
+        mergeJoinTree(qb);
+      }
     }
 
     // if any filters are present in the join tree, push them on top of the
@@ -9093,6 +9106,19 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     getMetaData(qb);
     LOG.info("Completed getting MetaData in Semantic Analysis");
 
+    if (runCBO) {
+      boolean tokenTypeIsQuery = ast.getToken().getType() == HiveParser.TOK_QUERY
+          || ast.getToken().getType() == HiveParser.TOK_EXPLAIN;
+      if (!tokenTypeIsQuery || createVwDesc != null
+          || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)) {
+        runCBO = false;
+      }
+
+      if (runCBO) {
+        disableJoinMerge = true;
+      }
+    }
+
     // Save the result schema derived from the sink operator produced
     // by genPlan. This has the correct column names, which clients
     // such as JDBC would prefer instead of the c0, c1 we'll end
@@ -9102,6 +9128,79 @@
     resultSchema =
         convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
 
+    if (runCBO) {
+      /*
+       * For CBO:
+       * 1. Run PreCBOOptimizer on the plan. This applies the Partition Pruning,
+       *    Predicate Pushdown, Column Pruning and Stats Annotation transformations
+       *    to the generated plan.
+       * 2. Hand the plan to CBO, which searches the plan space and returns the
+       *    best plan as an AST.
+       * 3. Run the analysis pipeline on the new AST: Phase 1, Get Metadata, Gen
+       *    Plan. During plan generation, join merging is disabled so that the
+       *    join order chosen by CBO is not changed.
+       * Error handling: on failure during CBO optimization, log the error and
+       * proceed with the plan generated so far; on failure during analysis of
+       * the new AST, restart the analysis from the beginning on the original
+       * AST, with runCBO set to false.
+       */
+      ASTNode newAST = null;
+      boolean skipCBOPlan = false;
+      runCBO = false;
+
+      try {
+        ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
+            opToPartList, topOps, topSelOps, opParseCtx, joinContext,
+            smbMapJoinContext, topToTable, topToTableProps, fsopToTable,
+            loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId,
+            uCtx, listMapJoinOpsNoReducer, groupOpToInputTables,
+            prunedPartitions, opToSamplePruner, globalLimitCtx,
+            nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
+            viewAliasToInput,
+            reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
+        PreCBOOptimizer optm = new PreCBOOptimizer();
+        optm.setPctx(pCtx);
+        optm.initialize(conf);
+        pCtx = optm.optimize();
+
+        newAST = CostBasedOptimizer.optimize(sinkOp, this, pCtx);
+        if (newAST == null) {
+          skipCBOPlan = true;
+          LOG.info("CBO failed, skipping CBO");
+        } else if (LOG.isDebugEnabled()) {
+          String newAstExpanded = newAST.dump();
+          LOG.debug("CBO rewritten query: \n" + newAstExpanded);
+        }
+      } catch (Exception e) {
+        LOG.warn("CBO failed, skipping CBO", e);
+        skipCBOPlan = true;
+      }
+
+      if (!skipCBOPlan) {
+        try {
+          init();
+          ctx_1 = initPhase1Ctx();
+          if (!doPhase1(newAST, qb, ctx_1)) {
+            throw new RuntimeException(
+                "Couldn't do phase1 on CBO optimized query plan");
+          }
+          getMetaData(qb);
+          try {
+            disableJoinMerge = true;
+            sinkOp = genPlan(qb);
+          } finally {
+            disableJoinMerge = false;
+          }
+
+          resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp)
+              .getRowResolver());
+        } catch (Exception e) {
+          LOG.warn("CBO failed, skipping CBO", e);
+          init();
+          analyzeInternal(ast);
+          return;
+        }
+      }
+    }
+
     ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
         opToPartList, topOps, topSelOps, opParseCtx, joinContext,
         smbMapJoinContext, topToTable, topToTableProps, fsopToTable,
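[Note: a distilled, self-contained sketch of the two-level fallback implemented above;
names and types are illustrative, not Hive code.]

    void analyzeInternal(Ast ast) {
      Plan plan = genPlan(ast);               // normal plan generation
      if (runCBO) {
        runCBO = false;                       // at most one CBO attempt per query
        Ast newAst = null;
        try {
          newAst = costBasedRewrite(plan);    // returns null for unsupported plans
        } catch (Exception e) {
          // CBO itself failed: keep the plan already generated
        }
        if (newAst != null) {
          try {
            plan = genPlan(newAst);           // re-run the analysis pipeline
          } catch (Exception e) {
            analyzeInternal(ast);             // restart from scratch with CBO off
            return;
          }
        }
      }
      // continue compiling 'plan' ...
    }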