diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 2b93e01..d32198b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -58,6 +57,7 @@
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -362,7 +362,6 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
       // each side better have 0 or more RS. if either side is unbalanced, cannot convert.
       // This is a workaround for now. Right fix would be to refactor code in the
       // MapRecordProcessor and ReduceRecordProcessor with respect to the sources.
-      @SuppressWarnings({"rawtypes","unchecked"})
       Set set =
           OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(),
               ReduceSinkOperator.class);
@@ -547,8 +546,8 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
     Set bigTableCandidateSet =
         MapJoinProcessor.getBigTableCandidates(
            joinOp.getConf().getConds());
     int bigTablePosition = -1;
-    // big input cumulative row count
-    long bigInputCumulativeCardinality = -1L;
+    // big input cost
+    long bigInputCost = -1L;
     // stats of the big input
     Statistics bigInputStat = null;
@@ -592,27 +591,27 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         }
       }
 
-      long currentInputCumulativeCardinality;
+      long currentInputCost;
       if (foundInputNotFittingInMemory) {
-        currentInputCumulativeCardinality = -1L;
+        currentInputCost = -1L;
       } else {
-        Long cardinality = computeCumulativeCardinality(parentOp);
-        if (cardinality == null) {
+        Long cost = computeCost(parentOp);
+        if (cost == null) {
           // We could not get stats, we cannot convert
           return -1;
         }
-        currentInputCumulativeCardinality = cardinality;
+        currentInputCost = cost;
       }
 
       // This input is the big table if it is contained in the big candidates set, and either:
       // 1) we have not chosen a big table yet, or
       // 2) it has been chosen as the big table above, or
-      // 3) the cumulative cardinality for this input is higher, or
-      // 4) the cumulative cardinality is equal, but the size is bigger,
+      // 3) the cost for this input is higher, or
+      // 4) the cost is equal, but the size is bigger,
       boolean selectedBigTable = bigTableCandidateSet.contains(pos) &&
               (bigInputStat == null || currentInputNotFittingInMemory ||
-                  (!foundInputNotFittingInMemory && (currentInputCumulativeCardinality > bigInputCumulativeCardinality ||
-                      (currentInputCumulativeCardinality == bigInputCumulativeCardinality && inputSize > bigInputStat.getDataSize()))));
+                  (!foundInputNotFittingInMemory && (currentInputCost > bigInputCost ||
+                      (currentInputCost == bigInputCost && inputSize > bigInputStat.getDataSize()))));
 
       if (bigInputStat != null && selectedBigTable) {
         // We are replacing the current big table with a new one, thus
@@ -632,7 +631,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
 
       if (selectedBigTable) {
         bigTablePosition = pos;
-        bigInputCumulativeCardinality = currentInputCumulativeCardinality;
+        bigInputCost = currentInputCost;
         bigInputStat = currInputStat;
       }
 
@@ -641,6 +640,22 @@
     return bigTablePosition;
   }
 
+  // TODO: broadcast side should be determined with the extended cost model
+  // once it is in place
+  private static Long computeCost(Operator op) {
+    // Cumulative cardinality of the subtree rooted at op
+    // (estimate of computation cost)
+    Long cumulativeCardinality = computeCumulativeCardinality(op);
+    if (cumulativeCardinality == null) {
+      // Bail out
+      return null;
+    }
+    // Cardinality of the top operator in the subtree
+    // (estimate of network cost)
+    Long opCardinality = op.getStatistics().getNumRows();
+    return StatsUtils.safeAdd(cumulativeCardinality, opCardinality);
+  }
+
   // This is akin to CBO cumulative cardinality model
   private static Long computeCumulativeCardinality(Operator op) {
     long cumulativeCardinality = 0L;
@@ -662,7 +677,7 @@ private static Long computeCumulativeCardinality(Operator
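
For illustration, a minimal, self-contained Java sketch of the cost model this patch switches to: each join input is scored as cumulative subtree cardinality (computation cost) plus top-operator cardinality (network cost), and the highest-cost big-table candidate stays as the streamed side, with data size breaking ties. The InputStats class, the hard-coded numbers, and the saturating add below are simplified stand-ins for Hive's Operator/Statistics objects and StatsUtils.safeAdd(); the candidate-set and memory-fit checks from getMapJoinConversionPos are omitted.

// Illustrative sketch only (not part of the patch); simplified stand-ins for
// Hive's Operator/Statistics objects and StatsUtils.safeAdd().
import java.util.Arrays;
import java.util.List;

public class BigTableCostSketch {

  /** Simplified stand-in for the statistics of one join input. */
  static final class InputStats {
    final long cumulativeCardinality; // rows produced by the whole subtree (computation cost)
    final long topCardinality;        // rows emitted by the top operator (network cost)
    final long dataSize;              // bytes, used only to break cost ties

    InputStats(long cumulativeCardinality, long topCardinality, long dataSize) {
      this.cumulativeCardinality = cumulativeCardinality;
      this.topCardinality = topCardinality;
      this.dataSize = dataSize;
    }
  }

  /** Mirrors computeCost(): computation cost plus network cost, overflow-safe. */
  static long cost(InputStats s) {
    long sum = s.cumulativeCardinality + s.topCardinality;
    return sum < 0 ? Long.MAX_VALUE : sum; // saturate instead of overflowing
  }

  /** Picks the streamed (big) input: highest cost wins, data size breaks ties. */
  static int pickBigTable(List<InputStats> inputs) {
    int bigPos = -1;
    long bigCost = -1L;
    long bigSize = -1L;
    for (int pos = 0; pos < inputs.size(); pos++) {
      InputStats s = inputs.get(pos);
      long c = cost(s);
      if (bigPos == -1 || c > bigCost || (c == bigCost && s.dataSize > bigSize)) {
        bigPos = pos;
        bigCost = c;
        bigSize = s.dataSize;
      }
    }
    return bigPos;
  }

  public static void main(String[] args) {
    // Input 1 has the higher combined cost, so it remains the streamed side
    // and input 0 becomes the broadcast (hash) side of the map join.
    List<InputStats> inputs = Arrays.asList(
        new InputStats(1_000_000L, 200_000L, 64L << 20),
        new InputStats(1_200_000L, 300_000L, 512L << 20));
    System.out.println("big table position = " + pickBigTable(inputs)); // prints 1
  }
}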