diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java index ce67b33..5e47434 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.physical; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; @@ -50,8 +51,7 @@ import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin; -import - org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx; +import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx; import org.apache.hadoop.hive.ql.plan.ConditionalWork; import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; @@ -98,6 +98,7 @@ * task would be merged with the map-reduce task to create a single map-reduce task. */ public class CommonJoinResolver implements PhysicalPlanResolver { + @Override public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { @@ -121,6 +122,8 @@ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { */ class CommonJoinTaskDispatcher implements Dispatcher { + long largestTableSize = 0; + HashMap aliasToSize = null; private final PhysicalContext physicalContext; public CommonJoinTaskDispatcher(PhysicalContext context) { @@ -146,7 +149,8 @@ private int getPosition(MapredWork work, Operator joinOp * A task and its child task has been converted from join to mapjoin. * See if the two tasks can be merged. 
*/
-    private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
+    private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf)
+        throws IOException {
       MapRedTask childTask = (MapRedTask)task.getChildTasks().get(0);
       MapredWork work = task.getWork();
       MapredLocalWork localWork = work.getMapLocalWork();
@@ -175,10 +179,7 @@ private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
       FileSinkOperator fop = (FileSinkOperator)op;
       String workDir = fop.getConf().getDirName();
 
-      Map> childPathToAliases = childWork.getPathToAliases();
-      if (childPathToAliases.size() > 1) {
-        return;
-      }
+      HashMap> childPathToAliases = childWork.getPathToAliases();
 
       // The filesink writes to a different directory
       if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
@@ -191,9 +192,20 @@ private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
         return;
       }
 
-      // Merge the trees
-      if (childWork.getAliasToWork().size() > 1) {
-        return;
+      long mapJoinSize = HiveConf.getLongVar(conf,
+          HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+
+      long localTableTotalSize = 0;
+      for (String alias : localWork.getAliasToWork().keySet()) {
+        localTableTotalSize += aliasToSize.get(alias);
+      }
+
+      for (String alias : childLocalWork.getAliasToWork().keySet()) {
+        long size = aliasToSize.get(alias);
+        localTableTotalSize += size;
+        if (localTableTotalSize > mapJoinSize) {
+          return;
+        }
       }
 
       Operator childAliasOp =
@@ -234,6 +246,56 @@ private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
       }
     }
 
+    private int getBigTablePosition(Map> aliasToWork,
+        HashMap aliasToSize, MapredWork currWork, JoinOperator joinOp,
+        Configuration conf) {
+      // Choose the big table for a map-join without a conditional task. The result is that
+      // table's getPosition() value, or -1 when the option is off or the size checks fail.
+      boolean convertJoinMapJoin = HiveConf.getBoolVar(conf,
+          HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK);
+      int bigTablePosition = -1;
+      if (convertJoinMapJoin) {
+        // This is the threshold that the user has specified to fit in mapjoin
+        long mapJoinSize = HiveConf.getLongVar(conf,
+            HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+
+        boolean bigTableFound = false;
+        largestTableSize = 0;
+        long sumTableSizes = 0;
+        for (String alias : aliasToWork.keySet()) {
+          Long size = aliasToSize.get(alias);
+          // The size is not available at compile time if the input is a sub-query.
+          // If the sizes of at least n-1 inputs of an n-way join are available at compile time,
+          // and their sum does not exceed the specified threshold, then convert the join
+          // into a map-join without the conditional task.
+          if ((size == null) || (size > mapJoinSize)) {
+            sumTableSizes += largestTableSize;
+            if (bigTableFound || (sumTableSizes > mapJoinSize)) {
+              convertJoinMapJoin = false;
+              break;
+            }
+            bigTableFound = true;
+            bigTablePosition = getPosition(currWork, joinOp, alias);
+            largestTableSize = mapJoinSize + 1;
+          } else {
+            if (size > largestTableSize) {
+              sumTableSizes += largestTableSize;
+              largestTableSize = size;
+              bigTablePosition = getPosition(currWork, joinOp, alias);
+            } else {
+              sumTableSizes += size;
+            }
+            if (sumTableSizes > mapJoinSize) {
+              convertJoinMapJoin = false;
+              break;
+            }
+          }
+        }
+      }
+      // A position recorded before the scan gave up is stale, so report -1 in that case.
+      return convertJoinMapJoin ? bigTablePosition : -1;
+    }
+
     // create map join task and set big table as bigTablePosition
     private ObjectPair convertTaskToMapJoinTask(String xml,
         int bigTablePosition) throws UnsupportedEncodingException, SemanticException {
@@ -283,7 +345,11 @@ private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
       int numAliases = order.length;
 
       long aliasTotalKnownInputSize = 0;
-      HashMap aliasToSize = new HashMap();
+
+      if (aliasToSize == null) {
+        aliasToSize = new HashMap();
+      }
+
       try {
         // go over all the input paths, and calculate a known total size, known
         // size for each input alias.
@@ -318,49 +384,12 @@ private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) { } Configuration conf = context.getConf(); + int bigTablePosition = getBigTablePosition(aliasToWork, aliasToSize, + currWork, joinOp, conf); - // If sizes of atleast n-1 tables in a n-way join is known, and their sum is smaller than - // the threshold size, convert the join into map-join and don't create a conditional task - boolean convertJoinMapJoin = HiveConf.getBoolVar(conf, - HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK); - int bigTablePosition = -1; - if (convertJoinMapJoin) { - // This is the threshold that the user has specified to fit in mapjoin - long mapJoinSize = HiveConf.getLongVar(conf, - HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); - - boolean bigTableFound = false; - long largestTableSize = 0; - long sumTableSizes = 0; - for (String alias : aliasToWork.keySet()) { - Long size = aliasToSize.get(alias); - // The size is not available at compile time if the input is a sub-query. - // If the size of atleast n-1 inputs for a n-way join are available at compile time, - // and the sum of them is less than the specified threshold, then convert the join - // into a map-join without the conditional task. 
- if ((size == null) || (size > mapJoinSize)) { - sumTableSizes += largestTableSize; - if (bigTableFound || (sumTableSizes > mapJoinSize)) { - convertJoinMapJoin = false; - break; - } - bigTableFound = true; - bigTablePosition = getPosition(currWork, joinOp, alias); - largestTableSize = mapJoinSize + 1; - } else { - if (size > largestTableSize) { - sumTableSizes += largestTableSize; - largestTableSize = size; - bigTablePosition = getPosition(currWork, joinOp, alias); - } else { - sumTableSizes += size; - } - if (sumTableSizes > mapJoinSize) { - convertJoinMapJoin = false; - break; - } - } - } + boolean convertJoinMapJoin = true; + if (bigTablePosition == -1) { + convertJoinMapJoin = false; } String bigTableAlias = null; @@ -381,7 +410,7 @@ private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) { // followed by a mapjoin can be performed in a single MR job. if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1) && (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) { - mergeMapJoinTaskWithChildMapJoinTask(newTask); + mergeMapJoinTaskWithChildMapJoinTask(newTask, conf); } return newTask;