diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index d8f37ae..467a878 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -196,25 +196,36 @@ private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoi // max. This table is either the big table or we cannot convert. boolean bigTableFound = false; boolean useTsStats = context.getConf().getBoolean(HiveConf.ConfVars.SPARK_USE_FILE_SIZE_FOR_MAPJOIN.varname, false); - boolean hasUpstreamSinks = false; - // Check whether there's any upstream RS. - // If so, don't use TS stats because they could be inaccurate. - for (Operator parentOp : joinOp.getParentOperators()) { - Set parentSinks = - OperatorUtils.findOperatorsUpstream(parentOp, ReduceSinkOperator.class); - parentSinks.remove(parentOp); - if (!parentSinks.isEmpty()) { - hasUpstreamSinks = true; + // Check each branch and see if there's any upstream RS on the branch + // If so, mark the branch as the big table. + if (useTsStats) { + for (Operator parentOp : joinOp.getParentOperators()) { + Set parentSinks = + OperatorUtils.findOperatorsUpstream(parentOp, ReduceSinkOperator.class); + parentSinks.remove(parentOp); + if (!parentSinks.isEmpty()) { + if (bigTablePosition < 0 && bigTableCandidateSet.contains(pos)) { + bigTablePosition = pos; + bigTableFound = true; + } else { + // Upstream RS found, but either a big table was already chosen or this branch is not a valid big table candidate. Disable mapjoin. + return new long[]{-1, 0, 0}; + } + } + pos++; + } } - // If we are using TS stats and this JOIN has at least one upstream RS, disable MapJoin conversion. 
- if (useTsStats && hasUpstreamSinks) { - return new long[]{-1, 0, 0}; - } + pos = 0; for (Operator parentOp : joinOp.getParentOperators()) { + // Skip the potential big table identified above + if (pos == bigTablePosition) { + pos++; + continue; + } + Statistics currInputStat; if (useTsStats) { currInputStat = new Statistics(); @@ -255,9 +266,7 @@ private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoi } long inputSize = currInputStat.getDataSize(); - if ((bigInputStat == null) - || ((bigInputStat != null) - && (inputSize > bigInputStat.getDataSize()))) { + if (bigInputStat == null || inputSize > bigInputStat.getDataSize()) { if (bigTableFound) { // cannot convert to map join; we've already chosen a big table