diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a182cd7..4bd9ed1 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -428,6 +428,17 @@
HIVEJOINEMITINTERVAL("hive.join.emit.interval", 1000),
HIVEJOINCACHESIZE("hive.join.cache.size", 25000),
+ /*
+ * Flag to control enabling Cost Based Optimizations using the Optiq framework.
+ */
+ HIVE_CBO_ENABLED("hive.cbo.enable", false),
+ /*
+ * Controls which queries will be considered for join reordering, based on the number of
+ * joins in them. Beyond a certain number of joins, the cost of considering possible
+ * permutations is prohibitive.
+ */
+ HIVE_CBO_MAX_JOINS_SUPPORTED("hive.cbo.max.joins.supported", 10),
+
// hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
// need to remove by hive .13. Also, do not change default (see SMB operator)
HIVEMAPJOINBUCKETCACHESIZE("hive.mapjoin.bucket.cache.size", 100),
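
For context, the two new ConfVars are read like any other HiveConf flag; below is a minimal sketch of a guard built on them (the cboApplies helper is illustrative only and not part of this patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class CboConfExample {
      // Illustrative helper: should CBO be attempted for a query with this many joins?
      static boolean cboApplies(HiveConf conf, int joinCount) {
        boolean enabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED);
        int maxJoins = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CBO_MAX_JOINS_SUPPORTED);
        return enabled && joinCount <= maxJoins;
      }
    }
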
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 0d08aa2..eb18f67 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2344,4 +2344,23 @@
+<property>
+  <name>hive.cbo.enable</name>
+  <value>false</value>
+  <description>
+    Placeholder flag to control enabling Cost Based Optimizations using the Optiq framework.
+    Currently, turning on this flag does nothing.
+  </description>
+</property>
+
+<property>
+  <name>hive.cbo.max.joins.supported</name>
+  <value>10</value>
+  <description>
+    Controls which queries will be considered for join reordering, based on the number of
+    joins in them. Beyond a certain number of joins, the cost of considering possible
+    permutations is prohibitive.
+  </description>
+</property>
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
index 1ba5654..6decc83 100644
--- ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
@@ -48,12 +48,26 @@
boolean mapJoinRemoved = false;
boolean hasMapGroupBy = false;
+ private int noOfJoins = 0;
+ private int noOfOuterJoins = 0;
+
public boolean hasJoin() {
- return hasJoin;
+ return (noOfJoins > 0);
+ }
+
+ public void incrementJoinCount(boolean noOuterJoin) {
+ noOfJoins++;
+ if (!noOuterJoin) {
+ noOfOuterJoins++;
+ }
+ }
+
+ public int getJoinCount() {
+ return noOfJoins;
}
- public void setHasJoin(boolean hasJoin) {
- this.hasJoin = hasJoin;
+ public int getOuterJoinCount() {
+ return noOfOuterJoins;
}
public boolean hasGroupBy() {
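
A small usage sketch clarifying the new counters; note that the flag passed in is noOuterJoin (taken from joinOp.getConf().getNoOuterJoin() in the SemanticAnalyzer change below), i.e. true when the join has no outer joins:

    import org.apache.hadoop.hive.ql.QueryProperties;

    public class QueryPropertiesJoinCountExample {
      public static void main(String[] args) {
        QueryProperties qp = new QueryProperties();
        qp.incrementJoinCount(true);   // noOuterJoin == true: an inner join
        qp.incrementJoinCount(false);  // noOuterJoin == false: an outer join
        System.out.println(qp.hasJoin());            // true
        System.out.println(qp.getJoinCount());       // 2
        System.out.println(qp.getOuterJoinCount());  // 1
      }
    }
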
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/CostBasedOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/CostBasedOptimizer.java
new file mode 100644
index 0000000..77384b7
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/CostBasedOptimizer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/*
+ * Entry point to Optimizations using Optiq.
+ */
+public class CostBasedOptimizer {
+
+ /*
+ * The current contract is: given a Hive Operator tree, return an optimal
+ * plan as a Hive AST. If there are Operators that cannot be handled,
+ * return null, implying that Semantic Analysis should proceed on the
+ * original AST.
+ */
+ public static ASTNode optimize(Operator<? extends OperatorDesc> sinkOp,
+ SemanticAnalyzer semanticAnalyzer, ParseContext pCtx) {
+ return null;
+ }
+}
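
The null-return contract above is the important part of this stub: callers are expected to fall back to the original AST when CBO declines. A minimal sketch of that pattern (the rewriteOrFallback helper name is hypothetical; the real wiring is in the SemanticAnalyzer changes further down in this patch):

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.optimizer.CostBasedOptimizer;
    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public class CboFallbackSketch {
      // Return the CBO-rewritten AST, or the original AST when CBO cannot handle the plan.
      static ASTNode rewriteOrFallback(Operator<? extends OperatorDesc> sinkOp,
          SemanticAnalyzer sa, ParseContext pCtx, ASTNode originalAst) {
        ASTNode newAST = CostBasedOptimizer.optimize(sinkOp, sa, pCtx);
        return newAST == null ? originalAst : newAST;
      }
    }
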
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java
new file mode 100644
index 0000000..e27964c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/PreCBOOptimizer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
+import org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
+import org.apache.hadoop.hive.ql.ppd.PredicateTransitivePropagate;
+
+/*
+ * Run Predicate Pushdown, Partition Pruning, Column Pruning and Stats Annotation before CBO.
+ */
+public class PreCBOOptimizer {
+ private ParseContext pctx;
+ private List<Transform> transformations;
+
+ /**
+ * Create the list of transformations.
+ *
+ * @param hiveConf
+ */
+ public void initialize(HiveConf hiveConf) {
+ transformations = new ArrayList<Transform>();
+ // Add the transformation that computes the lineage information.
+ transformations.add(new Generator());
+ transformations.add(new PredicateTransitivePropagate());
+ transformations.add(new PredicatePushDown());
+ transformations.add(new PartitionPruner());
+ transformations.add(new PartitionConditionRemover());
+ transformations.add(new ColumnPruner());
+ transformations.add(new AnnotateWithStatistics());
+ }
+
+ /**
+ * Invoke all the transformations one-by-one, and alter the query plan.
+ *
+ * @return ParseContext
+ * @throws SemanticException
+ */
+ public ParseContext optimize() throws SemanticException {
+ for (Transform t : transformations) {
+ pctx = t.transform(pctx);
+ }
+ return pctx;
+ }
+
+ /**
+ * @return the pctx
+ */
+ public ParseContext getPctx() {
+ return pctx;
+ }
+
+ /**
+ * @param pctx
+ * the pctx to set
+ */
+ public void setPctx(ParseContext pctx) {
+ this.pctx = pctx;
+ }
+
+}
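
For reference, the intended driving sequence, mirroring how the SemanticAnalyzer changes below use this class (seed the ParseContext, initialize with the session HiveConf, then run the transformation chain):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.optimizer.PreCBOOptimizer;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    public class PreCboDriverSketch {
      // Runs the pre-CBO transformation chain and returns the transformed ParseContext.
      static ParseContext runPreCbo(ParseContext pCtx, HiveConf conf) throws SemanticException {
        PreCBOOptimizer optm = new PreCBOOptimizer();
        optm.setPctx(pCtx);
        optm.initialize(conf);
        return optm.optimize();
      }
    }
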
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
index 52c39c0..a24cad9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
@@ -131,7 +131,7 @@ public String getErrorMessage(RecognitionException e, String[] tokenNames) {
* so that the graph walking algorithms and the rules framework defined in
* ql.lib can be used with the AST Nodes.
*/
- static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
+ public static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
/**
* Creates an ASTNode for the given token. The ASTNode is a wrapper around
* antlr's CommonTree class that implements the Node interface.
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 77388dd..2fa5afe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -93,7 +93,9 @@
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.CostBasedOptimizer;
import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.PreCBOOptimizer;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
@@ -251,6 +253,9 @@
//flag for partial scan during analyze ... compute statistics
protected boolean partialscan = false;
+ private volatile boolean runCBO = true;
+ private volatile boolean disableJoinMerge = false;
+
/*
* Capture the CTE definitions in a Query.
*/
@@ -315,6 +320,7 @@ protected void reset() {
opParseCtx.clear();
groupOpToInputTables.clear();
prunedPartitions.clear();
+ disableJoinMerge = false;
aliasToCTEs.clear();
}
@@ -962,7 +968,6 @@ public boolean doPhase1(ASTNode ast, QB qb, Phase1Ctx ctx_1)
frm.getToken().getType() == HiveParser.TOK_LATERAL_VIEW_OUTER) {
processLateralView(qb, frm);
} else if (isJoinToken(frm)) {
- queryProperties.setHasJoin(true);
processJoin(qb, frm);
qbp.setJoinExpr(frm);
}else if(frm.getToken().getType() == HiveParser.TOK_PTBLFUNCTION){
@@ -1179,6 +1184,10 @@ private void getMetaData(QBExpr qbexpr, ReadEntity parentInput)
}
}
+ public Table getTable(TableScanOperator ts) {
+ return topToTable.get(ts);
+ }
+
public void getMetaData(QB qb) throws SemanticException {
getMetaData(qb, null);
}
@@ -6467,6 +6476,7 @@ private Operator genJoinOperatorChildren(QBJoinTree join, Operator left,
}
desc.setNullSafes(nullsafes);
}
+ queryProperties.incrementJoinCount(joinOp.getConf().getNoOuterJoin());
return putOpInsertMap(joinOp, outputRS);
}
@@ -8823,7 +8833,10 @@ public Operator genPlan(QB qb) throws SemanticException {
extractJoinCondsFromWhereClause(joinTree, qb, dest, (ASTNode) whereClause.getChild(0) );
}
}
- mergeJoinTree(qb);
+
+ if (!disableJoinMerge) {
+ mergeJoinTree(qb);
+ }
}
// if any filters are present in the join tree, push them on top of the
@@ -9093,6 +9106,19 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
getMetaData(qb);
LOG.info("Completed getting MetaData in Semantic Analysis");
+ if (runCBO) {
+ boolean tokenTypeIsQuery = ast.getToken().getType() == HiveParser.TOK_QUERY
+ || ast.getToken().getType() == HiveParser.TOK_EXPLAIN;
+ if (!tokenTypeIsQuery || createVwDesc != null
+ || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)) {
+ runCBO = false;
+ }
+
+ if (runCBO) {
+ disableJoinMerge = true;
+ }
+ }
+
// Save the result schema derived from the sink operator produced
// by genPlan. This has the correct column names, which clients
// such as JDBC would prefer instead of the c0, c1 we'll end
@@ -9102,6 +9128,79 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
resultSchema =
convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
+ if (runCBO) {
+ /*
+ * For CBO:
+ * 1. Run PreCBOOptimizer on the plan. This applies the Partition Pruning,
+ *    Predicate Pushdown, Column Pruning and Stats Annotation transformations
+ *    to the generated plan.
+ * 2. Hand the plan to CBO, which searches the plan space and returns the
+ *    best plan as an AST.
+ * 3. Run the analysis pipeline on the new AST: Phase 1, Get Metadata, Gen Plan.
+ *    a. During plan generation, Join Merging is disabled, because we don't
+ *       want the join order to be changed.
+ *
+ * Error handling:
+ * - On failure during CBO optimization, log the error and proceed with the
+ *   plan already generated.
+ * - On failure during analysis of the new AST, restart the analysis from the
+ *   beginning on the original AST, with runCBO set to false.
+ */
+ ASTNode newAST = null;
+ boolean skipCBOPlan = false;
+ runCBO = false;
+
+ try {
+ ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
+ opToPartList, topOps, topSelOps, opParseCtx, joinContext,
+ smbMapJoinContext, topToTable, topToTableProps, fsopToTable,
+ loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId,
+ uCtx, listMapJoinOpsNoReducer, groupOpToInputTables,
+ prunedPartitions, opToSamplePruner, globalLimitCtx,
+ nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
+ viewAliasToInput,
+ reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
+ PreCBOOptimizer optm = new PreCBOOptimizer();
+ optm.setPctx(pCtx);
+ optm.initialize(conf);
+ pCtx = optm.optimize();
+
+ newAST = CostBasedOptimizer.optimize(sinkOp, this, pCtx);
+ if (newAST == null) {
+ skipCBOPlan = true;
+ LOG.info("CBO failed, skipping CBO");
+ } else if (LOG.isDebugEnabled()) {
+ String newAstExpanded = newAST.dump();
+ LOG.debug("CBO rewritten query: \n" + newAstExpanded);
+ }
+ } catch (Exception e) {
+ LOG.warn("CBO failed, skipping CBO", e);
+ skipCBOPlan = true;
+ }
+
+ if (!skipCBOPlan) {
+ try {
+ init();
+ ctx_1 = initPhase1Ctx();
+ if (!doPhase1(newAST, qb, ctx_1)) {
+ throw new RuntimeException(
+ "Couldn't do phase1 on CBO optimized query plan");
+ }
+ getMetaData(qb);
+ try {
+ disableJoinMerge = true;
+ sinkOp = genPlan(qb);
+ } finally {
+ disableJoinMerge = false;
+ }
+
+ resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp)
+ .getRowResolver());
+ } catch (Exception e) {
+ LOG.warn("CBO failed, skipping CBO", e);
+ init();
+ analyzeInternal(ast);
+ return;
+ }
+ }
+ }
+
ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext,
topToTable, topToTableProps, fsopToTable,