diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index fc5ea89..ed40480 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -986,6 +986,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" + "However, if it is on, and the sum of size for n-1 of the tables/partitions for a n-way join is smaller than this size, \n" + "the join is directly converted to a mapjoin(there is no conditional task). The default is 10MB"), + HIVECONVERTJOINMAXNUMBER("hive.auto.convert.join.max.number", 2500000L, + "If the sum of row count for n-1 of the tables/partitions for a n-way join is bigger than this size, \n" + + "the join cannot be converted to a mapjoin. The default is 2500000L"), HIVECONVERTJOINUSENONSTAGED("hive.auto.convert.join.use.nonstaged", false, "For conditional joins, if input stream from a small alias can be directly applied to join operator without \n" + "filtering or projection, the alias need not to be pre-staged in distributed cache via mapred local task.\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index a1b47f4..e5b2000 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -99,6 +99,7 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -2673,6 +2674,37 @@ public static long sumOfExcept(Map aliasToSize, return 
total; } + public static long numberOf(LinkedHashMap aliasToPartnInfo, Set aliases) { + return numsberOfExcept(aliasToPartnInfo, aliases, null); + } + /** + * Sums the StatsSetupConst.ROW_COUNT property over the given aliases; an alias with no partition info, no properties, or no row-count property adds nothing. + * @param aliasToPartnInfo map from alias to its PartitionDesc; properties are read from the partition when isPartitioned() is true, otherwise from its table descriptor + * @param aliases aliases whose row counts are summed + * @param excepts aliases to skip; when null, no alias is skipped + * @return sum of the row counts found; each value goes through Long.parseLong, so a non-numeric value throws NumberFormatException + */ + public static long numsberOfExcept(LinkedHashMap aliasToPartnInfo, + Set aliases, Set excepts) { + long nums = 0; + for (String alias : aliases) { + if (excepts != null && excepts.contains(alias)) { + continue; + } + PartitionDesc partInfo = aliasToPartnInfo.get(alias); + if (partInfo != null) { + Properties properties = partInfo.isPartitioned() ? + partInfo.getProperties() : partInfo.getTableDesc().getProperties(); + if(properties != null) { + Object rowCount = properties.get(StatsSetupConst.ROW_COUNT); + if (rowCount != null) + nums += Long.parseLong(rowCount.toString()); + } + } + } + return nums; + } + public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx) throws Exception { if (ctx != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index f500a5e..69da38e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -429,6 +429,10 @@ public static boolean cannotConvert(long aliasKnownSize, // the threshold size, convert the join into map-join and don't create a conditional task boolean convertJoinMapJoin = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK); + + // Row-count limit (hive.auto.convert.join.max.number) that a mapjoin candidate must not exceed + long mapJoinNumber = HiveConf.getLongVar(conf, + HiveConf.ConfVars.HIVECONVERTJOINMAXNUMBER); int bigTablePosition = -1; if (convertJoinMapJoin) { // This is the threshold that the user has specified to fit in mapjoin @@ -444,6 
+448,10 @@ public static boolean cannotConvert(long aliasKnownSize, if (sumOfOthers < 0 || sumOfOthers > mapJoinSize) { continue; // some small alias is not known or too big } + long numberOfOthers = Utilities.numsberOfExcept(currWork.getAliasToPartnInfo(), aliases, participants); + if (numberOfOthers > mapJoinNumber) { + continue; // summed row count of the aliases outside participants exceeds mapJoinNumber + } if (bigTableSize == null && bigTablePosition >= 0 && tablePosition < bigTablePosition) { continue; // prefer right most alias } @@ -493,6 +501,11 @@ public static boolean cannotConvert(long aliasKnownSize, Operator startOp = joinOp.getParentOperators().get(pos); Set aliases = GenMapRedUtils.findAliases(currWork, startOp); + long numberOf = Utilities.numberOf(currWork.getAliasToPartnInfo(), aliases); + if (numberOf > mapJoinNumber) { + continue; // summed row count of these aliases exceeds mapJoinNumber + } + long aliasKnownSize = Utilities.sumOf(aliasToSize, aliases); if (cannotConvert(aliasKnownSize, aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) { continue;