diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 011dadf495..a052936afb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -1121,6 +1121,8 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo } } + // Before returning, add the mapjoin to the parseContext + context.parseContext.getMapJoinOps().add(mapJoinOp); return mapJoinOp; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index c3eb886fd2..eb41fc6be1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -29,8 +29,11 @@ import java.util.Map; import java.util.Set; import java.util.Stack; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; @@ -65,6 +68,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker; +import org.apache.hadoop.hive.ql.lib.PreOrderWalker; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -428,7 +432,8 @@ private void semijoinRemovalBasedTransformations(OptimizeTezProcContext procCtx, final boolean dynamicPartitionPruningEnabled = procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING); final boolean semiJoinReductionEnabled = dynamicPartitionPruningEnabled && - 
procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION); + procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) && + procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0; final boolean extendedReductionEnabled = dynamicPartitionPruningEnabled && procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_EXTENDED); @@ -438,46 +443,31 @@ private void semijoinRemovalBasedTransformations(OptimizeTezProcContext procCtx, } perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run remove dynamic pruning by size"); - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); if (semiJoinReductionEnabled) { + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); markSemiJoinForDPP(procCtx); - } - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Mark certain semijoin edges important based "); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Mark certain semijoin edges important based "); - // Removing semijoin optimization when it may not be beneficial - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); - if (semiJoinReductionEnabled) { - removeSemijoinOptimizationByBenefit(procCtx); - } - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove Semijoins based on cost benefits"); - - // Remove any parallel edge between semijoin and mapjoin. - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); - if (semiJoinReductionEnabled) { + // Remove any parallel edge between semijoin and mapjoin. 
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); removeSemijoinsParallelToMapJoin(procCtx); - } - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove any parallel edge between semijoin and mapjoin"); - - // Remove semijoin optimization if it creates a cycle with mapside joins - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); - if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) { - removeSemiJoinCyclesDueToMapsideJoins(procCtx); - } - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if it creates a cycle with mapside join"); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove any parallel edge between semijoin and mapjoin"); - // Remove semijoin optimization if SMB join is created. - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); - if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) { + // Remove semijoin optimization if SMB join is created. 
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); removeSemijoinOptimizationFromSMBJoins(procCtx); - } - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if needed"); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if needed"); - // Remove bloomfilter if no stats generated - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); - if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) { + // Remove bloomfilter if no stats generated + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); removeSemiJoinIfNoStats(procCtx); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove bloom filter optimizations if needed"); + + // Removing semijoin optimization when it may not be beneficial + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); + removeSemijoinOptimizationByBenefit(procCtx); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove Semijoins based on cost benefits"); } - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove bloom filter optimizations if needed"); // after the stats phase we might have some cyclic dependencies that we need // to take care of. 
@@ -487,6 +477,13 @@ private void semijoinRemovalBasedTransformations(OptimizeTezProcContext procCtx, } perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run cycle analysis for partition pruning"); + if (semiJoinReductionEnabled) { + // Remove semijoin optimization if it creates a cycle with mapside joins + perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); + removeSemiJoinsDueToTaskLevelCycles(procCtx); + perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if it creates a cycle with mapside join"); + } + // remove redundant dpp and semijoins perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER); if (extendedReductionEnabled) { @@ -843,7 +840,7 @@ private static void removeSemijoinOptimizationFromSMBJoins( } private static class SemiJoinCycleRemovalDueTOMapsideJoinContext implements NodeProcessorCtx { - HashMap,Operator> childParentMap = new HashMap,Operator>(); + Map, Operator>> parentChildMap = new HashMap<>(); } private static class SemiJoinCycleRemovalDueToMapsideJoins implements NodeProcessor { @@ -854,7 +851,16 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, SemiJoinCycleRemovalDueTOMapsideJoinContext ctx = (SemiJoinCycleRemovalDueTOMapsideJoinContext) procCtx; - ctx.childParentMap.put((Operator)stack.get(stack.size() - 2), (Operator) nd); + Operator child = (Operator) nd; + String childName = OperatorUtils.getOpNamePretty(child); + for (int i = stack.size() - 2; i >= 0; i--) { + Operator curOp = (Operator)stack.get(i); + if (curOp instanceof MapJoinOperator || curOp instanceof CommonMergeJoinOperator) { + // Make pretty name string + String pairName = OperatorUtils.getOpNamePretty(curOp) + " " + childName; + ctx.parentChildMap.put(pairName, new ImmutablePair<>(curOp, child)); + } + } return null; } } @@ -863,19 +869,19 @@ private static void removeSemiJoinCyclesDueToMapsideJoins( OptimizeTezProcContext 
procCtx) throws SemanticException { Map opRules = new LinkedHashMap(); opRules.put( - new RuleRegExp("R1", MapJoinOperator.getOperatorName() + "%" + + new RuleRegExp("R1", MapJoinOperator.getOperatorName() + "%.*%" + MapJoinOperator.getOperatorName() + "%"), new SemiJoinCycleRemovalDueToMapsideJoins()); opRules.put( - new RuleRegExp("R2", MapJoinOperator.getOperatorName() + "%" + + new RuleRegExp("R2", MapJoinOperator.getOperatorName() + "%.*%" + CommonMergeJoinOperator.getOperatorName() + "%"), new SemiJoinCycleRemovalDueToMapsideJoins()); opRules.put( - new RuleRegExp("R3", CommonMergeJoinOperator.getOperatorName() + "%" + + new RuleRegExp("R3", CommonMergeJoinOperator.getOperatorName() + "%.*%" + MapJoinOperator.getOperatorName() + "%"), new SemiJoinCycleRemovalDueToMapsideJoins()); opRules.put( - new RuleRegExp("R4", CommonMergeJoinOperator.getOperatorName() + "%" + + new RuleRegExp("R4", CommonMergeJoinOperator.getOperatorName() + "%.*%" + CommonMergeJoinOperator.getOperatorName() + "%"), new SemiJoinCycleRemovalDueToMapsideJoins()); @@ -884,19 +890,21 @@ private static void removeSemiJoinCyclesDueToMapsideJoins( Dispatcher disp = new DefaultRuleDispatcher(null, opRules, ctx); List topNodes = new ArrayList(); topNodes.addAll(procCtx.parseContext.getTopOps().values()); - GraphWalker ogw = new PreOrderOnceWalker(disp); + GraphWalker ogw = new PreOrderWalker(disp); ogw.startWalking(topNodes, null); // process the list ParseContext pCtx = procCtx.parseContext; - for (Operator parentJoin : ctx.childParentMap.keySet()) { - Operator childJoin = ctx.childParentMap.get(parentJoin); + for (Pair, Operator> pair : ctx.parentChildMap.values()) { + Operator parentJoin = pair.getLeft(); + Operator childJoin = pair.getRight(); if (parentJoin.getChildOperators().size() == 1) { continue; } - for (Operator child :
parentJoin.getChildOperators()) { + for (Node childNode : parentJoin.getChildren()) { + Operator child = (Operator) childNode; if (!(child instanceof SelectOperator)) { continue; } @@ -919,28 +929,194 @@ private static void removeSemiJoinCyclesDueToMapsideJoins( // This is a semijoin branch. Find if this is creating a potential // cycle with childJoin. for (Operator parent : childJoin.getParentOperators()) { - if (parent == parentJoin) { + if (parent == parentJoin || + !(parent instanceof ReduceSinkOperator)) { continue; } - assert parent instanceof ReduceSinkOperator; - while (parent.getParentOperators().size() > 0) { - parent = parent.getParentOperators().get(0); + // Fetch all the parent TS ops + Set tsOps = OperatorUtils.findOperatorsUpstream(parent, + TableScanOperator.class); + for (TableScanOperator parentTS : tsOps) { + // If the parent is same as the ts, then we have a cycle. + if (ts == parentTS) { + // We have a cycle! + if (sjInfo.getIsHint()) { + throw new SemanticException("Removing hinted semijoin as it is creating cycles with mapside joins " + rs + " : " + ts); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Semijoin cycle due to mapjoin. Removing semijoin " + + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); + } + GenTezUtils.removeBranch(rs); + GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts); + break; + } } + } + } + } + } - if (parent == ts) { - // We have a cycle! - if (sjInfo.getIsHint()) { - throw new SemanticException("Removing hinted semijoin as it is creating cycles with mapside joins " + rs + " : " + ts); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Semijoin cycle due to mapjoin. 
Removing semijoin " - + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); - } - GenTezUtils.removeBranch(rs); - GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts); + public static Set> + findWorkOperatorsAndSemiJoinEdges(Operator start, Set> found, + final Map rsToSemiJoinBranchInfo, + Set rsOps) { + found.add(start); + if (start.getParentOperators() != null) { + for (Operator parent : start.getParentOperators()) { + if (parent instanceof ReduceSinkOperator) { + continue; + } + if (!found.contains(parent)) { + findWorkOperatorsAndSemiJoinEdges(parent, found, rsToSemiJoinBranchInfo, rsOps); + } + } + } + if (start instanceof ReduceSinkOperator) { + // This could be RS1 in semijoin edge which looks like, + // SEL->GBY1->RS1->GBY2->RS2 + if (start.getChildOperators().size() == 1) { + Operator gb2 = start.getChildOperators().get(0); + if (gb2 instanceof GroupByOperator && gb2.getChildOperators().size() == 1) { + Operator rs2 = gb2.getChildOperators().get(0); + if (rs2 instanceof ReduceSinkOperator && (rsToSemiJoinBranchInfo.get(rs2) != null)) { + // Semijoin edge found. 
All the operators to the set + found.add(start); + found.add(gb2); + found.add(rs2); + rsOps.add((ReduceSinkOperator)rs2); + } + } + } + return found; + } + if (start.getChildOperators() != null) { + for (Operator child : start.getChildOperators()) { + if (!found.contains(child)) { + findWorkOperatorsAndSemiJoinEdges(child, found, rsToSemiJoinBranchInfo, rsOps); + } + } + } + return found; + } + + private Set getTsOpsUpstream(Operator op, + Map, Set> opToTsSet) { + /*Set tsSet = opToTsSet.get(op); + if (tsSet != null) { + return tsSet; + }*/ + // It is guaranteed that the current op is not traversed + Set tsSet = new HashSet<>(); + for (Operator parent : op.getParentOperators()) { + if (parent instanceof TableScanOperator) { + // TS Op found, add to the set + tsSet.add((TableScanOperator)parent); + } else { + Set parentTsSet = opToTsSet.get(parent); + if (parentTsSet != null) { + // Node already traversed + tsSet.addAll(parentTsSet); + } else { + // traverse the node + tsSet.addAll(getTsOpsUpstream(parent, opToTsSet)); + } + } + } + opToTsSet.put(op, tsSet); + return tsSet; + } + + private static class WorkOpsInfo { + public Set> allworkOps; + public Set rsOps; + + WorkOpsInfo() { + allworkOps = new HashSet<>(); + rsOps = new HashSet<>(); + } + + WorkOpsInfo(Set> allworkOps, Set rsOps) { + this.allworkOps = allworkOps; + this.rsOps = rsOps; + } + } + + // Hashjoins may create complex task level cycles which may involve more than 2 tasks. + private void removeSemiJoinsDueToTaskLevelCycles(OptimizeTezProcContext procCtx) + throws SemanticException { + ParseContext pCtx = procCtx.parseContext; + // Get the set of hashjoins + Set mapJoins = pCtx.getMapJoinOps(); + + // Create a map of mapjoins to work ops to ensure no work is examined more than once + Map mapJoinToWorkOps = new HashMap<>(); + + // Get all the work operator sets. 
+ for (MapJoinOperator mj : mapJoins) { + WorkOpsInfo workOpsInfo = mapJoinToWorkOps.get(mj); + if (workOpsInfo != null) { + continue; // done with this mapjoin + } + + // If the set contains at least one of the RS from semijoin edges, it's a candidate + Set rsOps = new HashSet<>(); + Set> ops = findWorkOperatorsAndSemiJoinEdges(mj, + new HashSet<>(), pCtx.getRsToSemiJoinBranchInfo(), rsOps); + + WorkOpsInfo empty = new WorkOpsInfo(); + WorkOpsInfo candidate = new WorkOpsInfo(ops, rsOps); + // A work may contain more than 1 mapjoin, traverse the set to add this hashset for all of them. + for (Operator op : ops) { + if (op instanceof MapJoinOperator) { + mapJoinToWorkOps.put((MapJoinOperator)op, rsOps.isEmpty() ? empty : candidate); + } + } + } + + boolean cycleFree = false; + while(!cycleFree) { + cycleFree = true; + // Create a map of operator to TS ops set upstream, to reuse and avoid multiple traversals + Map, Set> opToTsSet = new HashMap<>(); + Set tsSet = new HashSet<>(); + for (WorkOpsInfo workOpsInfo : mapJoinToWorkOps.values()) { + if (workOpsInfo.rsOps.isEmpty()) { + continue; // Must have been processed before + } + // Go through each operator in the set + for (Operator op : workOpsInfo.allworkOps) { + // aggregate them all + tsSet.addAll(getTsOpsUpstream(op, opToTsSet)); + } + // Map to store cycle creating semijoins + Map rsToTsMap = new HashMap<>(); + // Go through each RS from semijoin edges in the work and collect the + // ones creating cycle + for (ReduceSinkOperator rs : workOpsInfo.rsOps) { + TableScanOperator ts = pCtx.getRsToSemiJoinBranchInfo().get(rs).getTsOp(); + if (tsSet.contains(ts)) { + // cycle! + rsToTsMap.put(rs, ts); + cycleFree = false; } } + + // Remove the bad ones + for (ReduceSinkOperator rs : rsToTsMap.keySet()) { + TableScanOperator ts = rsToTsMap.get(rs); + if (LOG.isDebugEnabled()) { + LOG.debug("removeSemiJoinsDueToTaskLevelCycles : Semijoin cycle due to mapjoin.
Removing semijoin " + + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts)); + } + GenTezUtils.removeBranch(rs); + GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts); + + // Cleanup internal data structures + workOpsInfo.rsOps.remove(rs); + workOpsInfo.allworkOps.remove(rs); + } } } } @@ -1558,7 +1734,7 @@ private static double computeBloomFilterNetBenefit( private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx) throws SemanticException { - List semijoinRsToRemove = new ArrayList(); + List semijoinRsToRemove = new ArrayList<>(); Map map = procCtx.parseContext.getRsToSemiJoinBranchInfo(); double semijoinReductionThreshold = procCtx.conf.getFloatVar( HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD);