diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f49e848..f4d44ee 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2034,7 +2034,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     SPARK_RPC_CHANNEL_LOG_LEVEL("hive.spark.client.channel.log.level", null,
       "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
-      "Name of the SASL mechanism to use for authentication.");
+      "Name of the SASL mechanism to use for authentication."),
+    NWAYJOINREORDER("hive.reorder.nway.joins", true,
+      "Runs reordering of tables within single n-way join (i.e.: picks streamtable)");
 
   public final String varname;
   private final String defaultExpr;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
index 065edef..7bed53d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -36,6 +38,8 @@
  * implemented, this transformation can also be done based on costs.
  */
 public class JoinReorder implements Transform {
+
+  private final Map<Operator<? extends OperatorDesc>, Integer> cache = new HashMap<Operator<? extends OperatorDesc>, Integer>();
   /**
    * Estimate the size of the output based on the STREAMTABLE hints. To do so
    * the whole tree is traversed. Possible sizes: 0: the operator and its
@@ -51,37 +55,44 @@
    */
   private int getOutputSize(Operator<? extends OperatorDesc> operator,
       Set<String> bigTables) {
+
+    if (cache.containsKey(operator)) {
+      return cache.get(operator);
+    }
+
+    int result = 0;
     // If a join operator contains a big subtree, there is a chance that its
     // output is also big, so the output size is 1 (medium)
     if (operator instanceof JoinOperator) {
       for (Operator<? extends OperatorDesc> o : operator.getParentOperators()) {
         if (getOutputSize(o, bigTables) != 0) {
-          return 1;
+          result = 1;
+          break;
         }
       }
     }
 
     // If a table is in bigTables then its output is big (2)
-    if (operator instanceof TableScanOperator) {
+    if (result == 0 && operator instanceof TableScanOperator) {
       String alias = ((TableScanOperator) operator).getConf().getAlias();
       if (bigTables.contains(alias)) {
-        return 2;
+        result = 2;
       }
     }
 
     // For all other kinds of operators, assume the output is as big as the
     // the biggest output from a parent
-    int maxSize = 0;
-    if (operator.getParentOperators() != null) {
+    if (result == 0 && operator.getParentOperators() != null) {
       for (Operator<? extends OperatorDesc> o : operator.getParentOperators()) {
         int current = getOutputSize(o, bigTables);
-        if (current > maxSize) {
-          maxSize = current;
+        if (current > result) {
+          result = current;
         }
       }
     }
 
-    return maxSize;
+    cache.put(operator, result);
+    return result;
   }
 
   /**
@@ -151,8 +162,10 @@ private void reorder(JoinOperator joinOp, Set<String> bigTables) {
    * @param pactx
    *          current parse context
    */
+  @Override
   public ParseContext transform(ParseContext pactx) throws SemanticException {
     Set<String> bigTables = getBigTables(pactx);
+    cache.clear();
 
     for (JoinOperator joinOp : pactx.getJoinOps()) {
       reorder(joinOp, bigTables);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 3482a47..ea5efe5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -128,7 +132,11 @@ public void initialize(HiveConf hiveConf) {
     }
     transformations.add(new UnionProcessor());
-    transformations.add(new JoinReorder());
+
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.NWAYJOINREORDER)) {
+      transformations.add(new JoinReorder());
+    }
+
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING) &&
         HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") &&
         HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION) &&