diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f49e848..f4d44ee 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2034,7 +2034,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     SPARK_RPC_CHANNEL_LOG_LEVEL("hive.spark.client.channel.log.level", null,
       "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
-      "Name of the SASL mechanism to use for authentication.");
+      "Name of the SASL mechanism to use for authentication."),
+    NWAYJOINREORDER("hive.reorder.nway.joins", true,
+      "Runs reordering of tables within single n-way join (i.e.: picks streamtable)");
 
   public final String varname;
   private final String defaultExpr;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
index 065edef..e31dc9b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/JoinReorder.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -36,6 +38,8 @@
  * implemented, this transformation can also be done based on costs.
  */
 public class JoinReorder implements Transform {
+
+  private final Map<Operator<?>, Integer> cache = new IdentityHashMap<Operator<?>, Integer>();
   /**
    * Estimate the size of the output based on the STREAMTABLE hints. To do so
    * the whole tree is traversed. Possible sizes: 0: the operator and its
@@ -49,8 +53,25 @@
    * @return The estimated size - 0 (no streamed tables), 1 (streamed tables in
    *         subtree) or 2 (a streamed table)
    */
+
   private int getOutputSize(Operator<? extends OperatorDesc> operator,
       Set<String> bigTables) {
+
+    // memoize decorator for getOutputSizeInternal
+    if (cache.containsKey(operator)) {
+      return cache.get(operator);
+    }
+
+    int result = getOutputSizeInternal(operator, bigTables);
+
+    cache.put(operator, result);
+
+    return result;
+  }
+
+  private int getOutputSizeInternal(Operator<? extends OperatorDesc> operator,
+      Set<String> bigTables) {
+
     // If a join operator contains a big subtree, there is a chance that its
     // output is also big, so the output size is 1 (medium)
     if (operator instanceof JoinOperator) {
@@ -74,6 +95,7 @@ private int getOutputSize(Operator<? extends OperatorDesc> operator,
     int maxSize = 0;
     if (operator.getParentOperators() != null) {
       for (Operator<? extends OperatorDesc> o : operator.getParentOperators()) {
+        // recurse into memoized decorator
         int current = getOutputSize(o, bigTables);
         if (current > maxSize) {
           maxSize = current;
@@ -151,8 +173,10 @@ private void reorder(JoinOperator joinOp, Set<String> bigTables) {
    * @param pactx
    *          current parse context
    */
+  @Override
   public ParseContext transform(ParseContext pactx) throws SemanticException {
     Set<String> bigTables = getBigTables(pactx);
+    cache.clear();
 
     for (JoinOperator joinOp : pactx.getJoinOps()) {
       reorder(joinOp, bigTables);
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 3482a47..ea5efe5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -128,7 +128,11 @@ public void initialize(HiveConf hiveConf) {
     }
 
     transformations.add(new UnionProcessor());
-    transformations.add(new JoinReorder());
+
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.NWAYJOINREORDER)) {
+      transformations.add(new JoinReorder());
+    }
+
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING) &&
         HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") &&
         HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION) &&
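Note on the caching design in this patch: the memo table is an IdentityHashMap, so operators are keyed by reference identity rather than by equals()/hashCode(), and transform() clears it on entry so cached sizes cannot leak between queries compiled through the same JoinReorder instance.

Below is a minimal sketch, not part of the patch, of how the new flag could be toggled programmatically; it assumes only the standard HiveConf accessors, and the class name NwayReorderToggle is hypothetical. The same effect should be available per session via "set hive.reorder.nway.joins=false;".

    import org.apache.hadoop.hive.conf.HiveConf;

    // Hypothetical illustration class; only the ConfVars constant comes
    // from the patch above.
    public class NwayReorderToggle {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // NWAYJOINREORDER defaults to true, so Optimizer adds the
        // JoinReorder transformation unless it is explicitly disabled.
        conf.setBoolVar(HiveConf.ConfVars.NWAYJOINREORDER, false);
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.NWAYJOINREORDER));
      }
    }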