diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 3d664c1..44b8b19 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -297,4 +298,22 @@ public static int countOperatorsUpstream(Operator<?> start, Set<Class<? extends Operator<?>>> clazzes) {
     return numberOperators;
   }
+
+  public static Long computeCumulativeCardinality(Operator<? extends OperatorDesc> op) {
+    long cumulativeCardinality = 0L;
+    for (Operator<? extends OperatorDesc> inputOp : op.getParentOperators()) {
+      Long inputCardinality = computeCumulativeCardinality(inputOp);
+      if (inputCardinality == null) {
+        return null;
+      }
+      cumulativeCardinality += inputCardinality;
+    }
+    Statistics currInputStat = op.getStatistics();
+    if (currInputStat == null) {
+      LOG.warn("Couldn't get statistics from: " + op);
+      return null;
+    }
+    cumulativeCardinality += currInputStat.getNumRows();
+    return cumulativeCardinality;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 00bc193..e4fff29 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -27,27 +27,21 @@
 import java.util.Set;
 import java.util.Stack;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
-import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -64,8 +58,8 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
@@ -78,16 +72,6 @@
   private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName());
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  private static final Set<Class<? extends Operator<?>>> COSTLY_OPERATORS =
-      new ImmutableSet.Builder()
-          .add(CommonJoinOperator.class)
-          .add(GroupByOperator.class)
-          .add(LateralViewJoinOperator.class)
-          .add(PTFOperator.class)
-          .add(ReduceSinkOperator.class)
-          .add(UDTFOperator.class)
-          .build();
 
   @Override
   /*
@@ -144,11 +128,13 @@
           return null;
         }
       }
+
+      // We need to calculate back the map join conversion pos with no bucket scaling.
+      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
     }
 
     LOG.info("Convert to non-bucketed map join");
     // check if we can convert to map join no bucket scaling.
-    mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
@@ -557,8 +543,8 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
 
     int bigTablePosition = -1;
-    // number of costly ops (Join, GB, PTF/Windowing, TF) below the big input
-    int bigInputNumberCostlyOps = -1;
+    // big input cumulative row count
+    Long bigInputCumulativeCardinality = -1L;
     // stats of the big input
     Statistics bigInputStat = null;
@@ -602,18 +588,27 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         }
       }
 
-      int currentInputNumberCostlyOps = foundInputNotFittingInMemory ?
-          -1 : OperatorUtils.countOperatorsUpstream(parentOp, COSTLY_OPERATORS);
+      long currentInputCumulativeCardinality;
+      if (foundInputNotFittingInMemory) {
+        currentInputCumulativeCardinality = -1L;
+      } else {
+        Long cardinality = OperatorUtils.computeCumulativeCardinality(parentOp);
+        if (cardinality == null) {
+          // We could not get stats, we cannot convert
+          return -1;
+        }
+        currentInputCumulativeCardinality = cardinality;
+      }
 
       // This input is the big table if it is contained in the big candidates set, and either:
       // 1) we have not chosen a big table yet, or
       // 2) it has been chosen as the big table above, or
-      // 3) the number of costly operators for this input is higher, or
-      // 4) the number of costly operators is equal, but the size is bigger,
+      // 3) the cumulative cardinality for this input is higher, or
+      // 4) the cumulative cardinality is equal, but the size is bigger,
       boolean selectedBigTable = bigTableCandidateSet.contains(pos) &&
           (bigInputStat == null || currentInputNotFittingInMemory ||
-              (!foundInputNotFittingInMemory && (currentInputNumberCostlyOps > bigInputNumberCostlyOps ||
-                  (currentInputNumberCostlyOps == bigInputNumberCostlyOps && inputSize > bigInputStat.getDataSize()))));
+              (!foundInputNotFittingInMemory && (currentInputCumulativeCardinality > bigInputCumulativeCardinality ||
+                  (currentInputCumulativeCardinality == bigInputCumulativeCardinality && inputSize > bigInputStat.getDataSize()))));
 
       if (bigInputStat != null && selectedBigTable) {
         // We are replacing the current big table with a new one, thus
@@ -633,7 +628,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
 
       if (selectedBigTable) {
         bigTablePosition = pos;
-        bigInputNumberCostlyOps = currentInputNumberCostlyOps;
+        bigInputCumulativeCardinality = currentInputCumulativeCardinality;
         bigInputStat = currInputStat;
       }