diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c2456714c2..11a11b23be 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2317,6 +2317,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" +
         "after a work boundary after shared work optimizer has been executed. Requires hive.optimize.shared.work\n" +
         "to be set to true. Tez only."),
+    HIVE_SHARED_WORK_SEMIJOIN_OPTIMIZATION("hive.optimize.shared.work.semijoin", true,
+        "Whether to enable shared work extended optimizer for semijoins. Tez only."),
     HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE("hive.optimize.shared.work.mapjoin.cache.reuse", true,
         "When shared work optimizer is enabled, whether we should reuse the cache for the broadcast side\n" +
         "of mapjoin operators that share same broadcast input. Requires hive.optimize.shared.work\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
index 0cb3b21fd8..2908038750 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -138,7 +138,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     gatherDPPTableScanOps(pctx, optimizerCache);
 
     // Map of dbName.TblName -> TSOperator
-    Multimap<String, TableScanOperator> tableNameToOps = splitTableScanOpsByTable(pctx);
+    ArrayListMultimap<String, TableScanOperator> tableNameToOps = splitTableScanOpsByTable(pctx);
 
     // We enforce a certain order when we do the reutilization.
     // In particular, we use size of table x number of reads to
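A note on the narrowed type: ArrayListMultimap backs each key with an ArrayList, so duplicate scans of the same table are kept and per-key iteration order is deterministic. A standalone Guava sketch of the behavior the optimizer relies on (illustrative only, not patch code; strings stand in for TS operators):

import com.google.common.collect.ArrayListMultimap;

public class MultimapOrderDemo {
  public static void main(String[] args) {
    // ArrayListMultimap keeps duplicate values and preserves the insertion
    // order of the values for each key, so passes that walk "all scans of
    // db.t1" see them in a stable sequence, and the semijoin round can
    // rebuild the same structure and get comparable results.
    ArrayListMultimap<String, String> tableNameToOps = ArrayListMultimap.create();
    tableNameToOps.put("db.t1", "TS[0]");
    tableNameToOps.put("db.t1", "TS[4]");
    tableNameToOps.put("db.t2", "TS[2]");
    System.out.println(tableNameToOps.get("db.t1")); // [TS[0], TS[4]], a List
  }
}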
@@ -146,7 +146,126 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     List<Entry<String, Long>> sortedTables = rankTablesByAccumulatedSize(pctx);
     LOG.debug("Sorted tables by size: {}", sortedTables);
 
-    // Execute optimization
+    // Execute shared scan optimization
+    sharedWorkOptimization(pctx, optimizerCache, tableNameToOps, sortedTables, false);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values()));
+    }
+
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION)) {
+      // Execute extended shared work optimization
+      sharedWorkExtendedOptimization(pctx, optimizerCache);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After SharedWorkExtendedOptimizer:\n"
+            + Operator.toString(pctx.getTopOps().values()));
+      }
+    }
+
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_SEMIJOIN_OPTIMIZATION)) {
+      // Map of dbName.TblName -> TSOperator
+      tableNameToOps = splitTableScanOpsByTable(pctx);
+      // We rank by size of table x number of reads
+      sortedTables = rankTablesByAccumulatedSize(pctx);
+
+      // Execute shared scan optimization with semijoin removal
+      boolean optimized = sharedWorkOptimization(pctx, optimizerCache, tableNameToOps, sortedTables, true);
+      if (optimized) {
+        // If the plan was further optimized, execute a second round of the extended shared work optimizer
+        sharedWorkExtendedOptimization(pctx, optimizerCache);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After SharedWorkSJOptimizer:\n"
+            + Operator.toString(pctx.getTopOps().values()));
+      }
+    }
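The driver logic above only pays for a second extended round when the semijoin-aware merge round actually changed the plan. A minimal sketch of that retry pattern, under the assumption that each pass reports whether it mutated anything (the Pass interface is hypothetical, not a Hive API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetryDriverSketch {
  interface Pass {
    // Returns true if the pass changed the plan.
    boolean run(List<String> plan);
  }

  static void optimize(List<String> plan, Pass merge, Pass extended) {
    merge.run(plan);           // basic shared-scan merging
    extended.run(plan);        // merge equal operators across work boundaries
    if (merge.run(plan)) {     // second merge round (semijoin-aware in the patch)
      extended.run(plan);      // a second extended round only if something changed
    }
  }

  public static void main(String[] args) {
    List<String> plan = new ArrayList<>(Arrays.asList("TS[0]", "TS[1]", "TS[1]'"));
    Pass merge = p -> p.remove("TS[1]'");   // toy merge: drop a duplicate scan
    Pass extended = p -> false;             // toy extended round: no-op
    optimize(plan, merge, extended);
    System.out.println(plan);               // [TS[0], TS[1]]
  }
}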
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
+      // Try to reuse cache for the broadcast side in mapjoin operators that
+      // share the same input.
+      // First we group together all the mapjoin operators that share the same
+      // reduce sink operator.
+      final Multimap<Operator<?>, MapJoinOperator> parentToMapJoinOperators =
+          ArrayListMultimap.create();
+      final Set<Operator<?>> visitedOperators = new HashSet<>();
+      for (Entry<Operator<?>, Collection<Operator<?>>> e :
+          optimizerCache.operatorToWorkOperators.asMap().entrySet()) {
+        if (visitedOperators.contains(e.getKey())) {
+          // Already visited this work, we move on
+          continue;
+        }
+        for (Operator<?> op : e.getValue()) {
+          if (op instanceof MapJoinOperator) {
+            MapJoinOperator mapJoinOp = (MapJoinOperator) op;
+            // Only allowed for mapjoin operator
+            if (!mapJoinOp.getConf().isBucketMapJoin() &&
+                !mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
+              parentToMapJoinOperators.put(
+                  obtainBroadcastInput(mapJoinOp).getParentOperators().get(0), mapJoinOp);
+            }
+          }
+          visitedOperators.add(op);
+        }
+      }
+      // For each group, set the cache key accordingly if there is more than one operator
+      // and the input RS operators are equal
+      for (Collection<MapJoinOperator> c : parentToMapJoinOperators.asMap().values()) {
+        Map<ReduceSinkOperator, String> rsOpToCacheKey = new HashMap<>();
+        for (MapJoinOperator mapJoinOp : c) {
+          ReduceSinkOperator rsOp = obtainBroadcastInput(mapJoinOp);
+          String cacheKey = null;
+          for (Entry<ReduceSinkOperator, String> e : rsOpToCacheKey.entrySet()) {
+            if (compareOperator(pctx, rsOp, e.getKey())) {
+              cacheKey = e.getValue();
+              break;
+            }
+          }
+          if (cacheKey == null) {
+            // Either it is the first map join operator or there was no equivalent RS,
+            // hence generate a cache key
+            cacheKey = MapJoinDesc.generateCacheKey(mapJoinOp.getOperatorId());
+            rsOpToCacheKey.put(rsOp, cacheKey);
+          }
+          // Set it in the conf of the map join operator
+          mapJoinOp.getConf().setCacheKey(cacheKey);
+        }
+      }
+    }
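The grouping above reduces to: within each set of map joins hanging off the same parent, assign one cache key per equivalence class of broadcast inputs, so equivalent hash tables are built once and shared. A self-contained sketch with strings standing in for RS signatures (the key format shown is hypothetical):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CacheKeySketch {
  // Stand-in for compareOperator: equivalence of two broadcast inputs.
  static boolean equivalent(String a, String b) {
    return a.equals(b);
  }

  public static void main(String[] args) {
    List<String> broadcastInputs = Arrays.asList("RS(key=a)", "RS(key=a)", "RS(key=b)");
    Map<String, String> sigToCacheKey = new HashMap<>();
    int nextId = 0;
    for (String sig : broadcastInputs) {
      String cacheKey = null;
      for (Map.Entry<String, String> e : sigToCacheKey.entrySet()) {
        if (equivalent(sig, e.getKey())) {
          cacheKey = e.getValue();
          break;
        }
      }
      if (cacheKey == null) {
        cacheKey = "MAPJOIN_CACHE_" + (nextId++);  // hypothetical key format
        sigToCacheKey.put(sig, cacheKey);
      }
      // Equivalent broadcast inputs end up sharing one hash-table cache entry.
      System.out.println(sig + " -> " + cacheKey);
    }
  }
}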
+
+    // If we are running tests, we are going to verify that the contents of the cache
+    // correspond with the contents of the plan, and otherwise we fail.
+    // This check always runs when we are in test mode, independently of whether
+    // we use the basic or the extended version of the optimizer.
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_IN_TEST)) {
+      Set<Operator<?>> visited = new HashSet<>();
+      Iterator<Entry<String, TableScanOperator>> it = topOps.entrySet().iterator();
+      while (it.hasNext()) {
+        Entry<String, TableScanOperator> e = it.next();
+        for (Operator<?> op : OperatorUtils.findOperators(e.getValue(), Operator.class)) {
+          if (!visited.contains(op)) {
+            Set<Operator<?>> workCachedOps = findWorkOperators(optimizerCache, op);
+            Set<Operator<?>> workPlanOps = findWorkOperators(op, new HashSet<>());
+            if (!workCachedOps.equals(workPlanOps)) {
+              throw new SemanticException("Error in shared work optimizer: operator cache contents "
+                  + "and actual plan differ\nIn cache: " + workCachedOps + "\nIn plan: " + workPlanOps);
+            }
+            visited.add(op);
+          }
+        }
+      }
+    }
+
+    return pctx;
+  }
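The HIVE_IN_TEST block is an invariant check: compute the same work twice, once from the optimizer cache and once from the plan itself, and fail loudly on any disagreement. The same idea in isolation (toy data, not Hive types):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class InvariantCheckSketch {
  public static void main(String[] args) {
    // Pretend these were computed independently: one from an optimizer cache,
    // one by walking the actual plan. They must agree, or the cache is stale.
    Set<String> workFromCache = new HashSet<>(Arrays.asList("TS[0]", "FIL[1]", "RS[2]"));
    Set<String> workFromPlan = new HashSet<>(Arrays.asList("TS[0]", "FIL[1]", "RS[2]"));
    if (!workFromCache.equals(workFromPlan)) {
      throw new IllegalStateException("cache/plan mismatch\nIn cache: "
          + workFromCache + "\nIn plan: " + workFromPlan);
    }
    System.out.println("cache and plan agree");
  }
}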
+
+  private static boolean sharedWorkOptimization(ParseContext pctx, SharedWorkOptimizerCache optimizerCache,
+      ArrayListMultimap<String, TableScanOperator> tableNameToOps, List<Entry<String, Long>> sortedTables,
+      boolean removeSemijoin) throws SemanticException {
+    // Boolean to keep track of whether this method actually merged any TS operators
+    boolean mergedExecuted = false;
+
     Multimap<String, TableScanOperator> existingOps = ArrayListMultimap.create();
     Set<Operator<?>> removedOps = new HashSet<>();
     for (Entry<String, Long> tablePair : sortedTables) {
@@ -163,40 +282,68 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
           continue;
         }
 
-        // First we quickly check if the two table scan operators can actually be merged
-        boolean mergeable = areMergeable(pctx, optimizerCache, retainableTsOp, discardableTsOp);
-        if (!mergeable) {
-          // Skip
-          LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
-          continue;
-        }
+        SharedResult sr;
+        if (removeSemijoin) {
+          // We check if the two table scan operators can actually be merged modulo semijoins.
+          // Hence, two conditions should be met:
+          // (i) the TS ops should be mergeable excluding any kind of DPP, and
+          // (ii) the DPP branches (excluding SJs) should be the same
+          boolean mergeable = areMergeable(pctx, optimizerCache, retainableTsOp, discardableTsOp);
+          if (!mergeable) {
+            // Skip
+            LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
+            continue;
+          }
+          boolean validMerge = areMergeableExcludeSemijoinsExtendedCheck(
+              pctx, optimizerCache, retainableTsOp, discardableTsOp);
+          if (!validMerge) {
+            // Skip
+            LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
+            continue;
+          }
 
-        // Secondly, we extract information about the part of the tree that can be merged
-        // as well as some structural information (memory consumption) that needs to be
-        // used to determined whether the merge can happen
-        SharedResult sr = extractSharedOptimizationInfoForRoot(
-            pctx, optimizerCache, retainableTsOp, discardableTsOp);
-
-        // It seems these two operators can be merged.
-        // Check that plan meets some preconditions before doing it.
-        // In particular, in the presence of map joins in the upstream plan:
-        // - we cannot exceed the noconditional task size, and
-        // - if we already merged the big table, we cannot merge the broadcast
-        // tables.
-        if (!validPreConditions(pctx, optimizerCache, sr)) {
-          // Skip
-          LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
-          continue;
+          // If these checks pass, we extract the shared work optimization information
+          // about the part of the tree that can be merged. This information needs to be
+          // regenerated because semijoin operators have been removed
+          sr = extractSharedOptimizationInfoForRoot(
+              pctx, optimizerCache, retainableTsOp, discardableTsOp);
+        } else {
+          // First we quickly check if the two table scan operators can actually be merged
+          if (!areMergeable(pctx, optimizerCache, retainableTsOp, discardableTsOp) ||
+              !areMergeableExtendedCheck(pctx, optimizerCache, retainableTsOp, discardableTsOp)) {
+            // Skip
+            LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp);
+            continue;
+          }
+
+          // Secondly, we extract information about the part of the tree that can be merged
+          // as well as some structural information (memory consumption) that needs to be
+          // used to determine whether the merge can happen
+          sr = extractSharedOptimizationInfoForRoot(
+              pctx, optimizerCache, retainableTsOp, discardableTsOp);
+
+          // It seems these two operators can be merged.
+          // Check that plan meets some preconditions before doing it.
+          // In particular, in the presence of map joins in the upstream plan:
+          // - we cannot exceed the noconditional task size, and
+          // - if we already merged the big table, we cannot merge the broadcast
+          // tables.
+          if (!validPreConditions(pctx, optimizerCache, sr)) {
+            // Skip
+            LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp);
+            continue;
+          }
+        }
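One precondition named in the comments is a memory bound: the broadcast sides that land in a single merged work must still fit under the noconditional task size. Roughly the following arithmetic, though the real validPreConditions accounts for more state (the sizes and threshold here are invented):

public class MapJoinBudgetSketch {
  public static void main(String[] args) {
    long noconditionalTaskSize = 300L * 1024 * 1024;  // e.g. a 300 MB budget
    long[] broadcastSidesInMergedWork = {120L << 20, 100L << 20, 90L << 20};
    long total = 0;
    for (long sz : broadcastSidesInMergedWork) {
      total += sz;
    }
    // Merging the scans is only valid if the combined hash tables still fit;
    // otherwise the merged work could OOM where the separate works would not.
    System.out.println(total <= noconditionalTaskSize
        ? "merge allowed" : "merge rejected: would exceed noconditional task size");
  }
}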
 
         // We can merge
+        mergedExecuted = true;
         if (sr.retainableOps.size() > 1) {
           // More than TS operator
           Operator<?> lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1);
           Operator<?> lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1);
           if (lastDiscardableOp.getNumChild() != 0) {
             List<Operator<? extends OperatorDesc>> allChildren =
-                Lists.newArrayList(lastDiscardableOp.getChildOperators());
+              Lists.newArrayList(lastDiscardableOp.getChildOperators());
             for (Operator<? extends OperatorDesc> op : allChildren) {
               lastDiscardableOp.getChildOperators().remove(op);
               op.replaceParent(lastDiscardableOp, lastRetainableOp);
@@ -233,13 +380,13 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
           if (exprNode.getChildren().size() == newChildren.size()) {
             newChildren.add(tsExprNode);
             exprNode = ExprNodeGenericFuncDesc.newInstance(
-                new GenericUDFOPOr(),
-                newChildren);
+              new GenericUDFOPOr(),
+              newChildren);
           }
         } else {
           exprNode = ExprNodeGenericFuncDesc.newInstance(
-              new GenericUDFOPOr(),
-              Arrays.asList(exprNode, tsExprNode));
+            new GenericUDFOPOr(),
+            Arrays.asList(exprNode, tsExprNode));
         }
       }
@@ -247,7 +394,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
         retainableTsOp.getConf().setFilterExpr(exprNode);
 
         // Replace table scan operator
         List<Operator<? extends OperatorDesc>> allChildren =
-            Lists.newArrayList(discardableTsOp.getChildOperators());
+          Lists.newArrayList(discardableTsOp.getChildOperators());
         for (Operator<? extends OperatorDesc> op : allChildren) {
           discardableTsOp.getChildOperators().remove(op);
           op.replaceParent(discardableTsOp, retainableTsOp);
@@ -267,17 +414,17 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
           if (op instanceof ReduceSinkOperator) {
             SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
             if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) &&
-                !sr.discardableInputOps.contains(sjbi.getTsOp())) {
+              !sr.discardableInputOps.contains(sjbi.getTsOp())) {
               GenTezUtils.removeSemiJoinOperator(
-                  pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
+                pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
               optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
             }
           } else if (op instanceof AppMasterEventOperator) {
             DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
             if (!sr.discardableOps.contains(dped.getTableScan()) &&
-                !sr.discardableInputOps.contains(dped.getTableScan())) {
+              !sr.discardableInputOps.contains(dped.getTableScan())) {
               GenTezUtils.removeSemiJoinOperator(
-                  pctx, (AppMasterEventOperator) op, dped.getTableScan());
+                pctx, (AppMasterEventOperator) op, dped.getTableScan());
               optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
             }
           }
@@ -296,17 +443,17 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
           // and it means that we have merged filter expressions for it. Thus, we
           // might need to remove DPP predicates from the retainable TableScanOperator
           Collection<Operator<?>> c =
-              optimizerCache.tableScanToDPPSource.get((TableScanOperator) op);
+            optimizerCache.tableScanToDPPSource.get((TableScanOperator) op);
           for (Operator<?> dppSource : c) {
             if (dppSource instanceof ReduceSinkOperator) {
               GenTezUtils.removeSemiJoinOperator(pctx,
-                  (ReduceSinkOperator) dppSource,
-                  (TableScanOperator) sr.retainableOps.get(0));
+                (ReduceSinkOperator) dppSource,
+                (TableScanOperator) sr.retainableOps.get(0));
               optimizerCache.tableScanToDPPSource.remove(sr.retainableOps.get(0), op);
             } else if (dppSource instanceof AppMasterEventOperator) {
               GenTezUtils.removeSemiJoinOperator(pctx,
-                  (AppMasterEventOperator) dppSource,
-                  (TableScanOperator) sr.retainableOps.get(0));
+                (AppMasterEventOperator) dppSource,
+                (TableScanOperator) sr.retainableOps.get(0));
               optimizerCache.tableScanToDPPSource.remove(sr.retainableOps.get(0), op);
             }
           }
@@ -328,271 +475,179 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
       }
 
       // Remove unused table scan operators
-      Iterator<Entry<String, TableScanOperator>> it = topOps.entrySet().iterator();
-      while (it.hasNext()) {
-        Entry<String, TableScanOperator> e = it.next();
-        if (e.getValue().getNumChild() == 0) {
-          it.remove();
-        }
-      }
+    pctx.getTopOps().entrySet().removeIf(
+        (Entry<String, TableScanOperator> e) -> e.getValue().getNumChild() == 0);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values()));
+    return mergedExecuted;
+  }
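Note the modernization at the end: the Iterator-with-it.remove() sweep becomes Map.entrySet().removeIf, the idiomatic delete-while-iterating form. Standalone:

import java.util.HashMap;
import java.util.Map;

public class RemoveIfSketch {
  public static void main(String[] args) {
    // Table name -> number of children of its TableScanOperator (toy model).
    Map<String, Integer> topOps = new HashMap<>();
    topOps.put("db.t1", 2);
    topOps.put("db.t2", 0);  // orphaned scan after merging: no children left
    topOps.put("db.t3", 1);
    // Equivalent to the Iterator/it.remove() loop it replaces, minus the
    // boilerplate and the risk of ConcurrentModificationException.
    topOps.entrySet().removeIf(e -> e.getValue() == 0);
    System.out.println(topOps.keySet()); // db.t2 is gone
  }
}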
+
+  private static void sharedWorkExtendedOptimization(ParseContext pctx, SharedWorkOptimizerCache optimizerCache)
+      throws SemanticException {
+    // Gather RS operators that 1) belong to root works, i.e., works containing TS operators,
+    // and 2) share the same input operator.
+    // These will be the first target for extended shared work optimization
+    Multimap<Operator<?>, ReduceSinkOperator> parentToRsOps = ArrayListMultimap.create();
+    Set<Operator<?>> visited = new HashSet<>();
+    for (Entry<String, TableScanOperator> e : pctx.getTopOps().entrySet()) {
+      gatherReduceSinkOpsByInput(parentToRsOps, visited,
+          findWorkOperators(optimizerCache, e.getValue()));
+    }
 
-    if(pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION)) {
-      // Gather RS operators that 1) belong to root works, i.e., works containing TS operators,
-      // and 2) share the same input operator.
-      // These will be the first target for extended shared work optimization
-      Multimap<Operator<?>, ReduceSinkOperator> parentToRsOps = ArrayListMultimap.create();
-      Set<Operator<?>> visited = new HashSet<>();
-      for (Entry<String, TableScanOperator> e : topOps.entrySet()) {
-        gatherReduceSinkOpsByInput(parentToRsOps, visited,
-            findWorkOperators(optimizerCache, e.getValue()));
-      }
+    Set<Operator<?>> removedOps = new HashSet<>();
+    while (!parentToRsOps.isEmpty()) {
+      // As above, we enforce a certain order when we do the reutilization.
+      // In particular, we use size of data in RS x number of uses.
+      List<Entry<Operator<?>, Long>> sortedRSGroups =
+          rankOpsByAccumulatedSize(parentToRsOps.keySet());
+      LOG.debug("Sorted operators by size: {}", sortedRSGroups);
+
+      // Execute extended optimization
+      // For each RS, check whether other RS in the same work could be merged into this one.
+      // If they are merged, RS operators in the resulting work will be considered
+      // mergeable in the next loop iteration.
+      Multimap<Operator<?>, ReduceSinkOperator> existingRsOps = ArrayListMultimap.create();
+      for (Entry<Operator<?>, Long> rsGroupInfo : sortedRSGroups) {
+        Operator<?> rsParent = rsGroupInfo.getKey();
+        for (ReduceSinkOperator discardableRsOp : parentToRsOps.get(rsParent)) {
+          if (removedOps.contains(discardableRsOp)) {
+            LOG.debug("Skip {} as it has already been removed", discardableRsOp);
+            continue;
+          }
+          Collection<ReduceSinkOperator> otherRsOps = existingRsOps.get(rsParent);
+          for (ReduceSinkOperator retainableRsOp : otherRsOps) {
+            if (removedOps.contains(retainableRsOp)) {
+              LOG.debug("Skip {} as it has already been removed", retainableRsOp);
+              continue;
+            }
 
-    while (!parentToRsOps.isEmpty()) {
-      // As above, we enforce a certain order when we do the reutilization.
-      // In particular, we use size of data in RS x number of uses.
-      List<Entry<Operator<?>, Long>> sortedRSGroups =
-          rankOpsByAccumulatedSize(parentToRsOps.keySet());
-      LOG.debug("Sorted operators by size: {}", sortedRSGroups);
-
-      // Execute extended optimization
-      // For each RS, check whether other RS in same work could be merge into this one.
-      // If they are merged, RS operators in the resulting work will be considered
-      // mergeable in next loop iteration.
-      Multimap<Operator<?>, ReduceSinkOperator> existingRsOps = ArrayListMultimap.create();
-      for (Entry<Operator<?>, Long> rsGroupInfo : sortedRSGroups) {
-        Operator<?> rsParent = rsGroupInfo.getKey();
-        for (ReduceSinkOperator discardableRsOp : parentToRsOps.get(rsParent)) {
-          if (removedOps.contains(discardableRsOp)) {
-            LOG.debug("Skip {} as it has already been removed", discardableRsOp);
 
+            // First we quickly check if the two RS operators can actually be merged.
+            // We already know that these two RS operators have the same parent, but
+            // we need to check whether both RS are actually equal. Further, we check
+            // whether their child is also equal. If any of these conditions are not
+            // met, we are not going to try to merge.
+            boolean mergeable = compareOperator(pctx, retainableRsOp, discardableRsOp) &&
+                compareOperator(pctx, retainableRsOp.getChildOperators().get(0),
+                    discardableRsOp.getChildOperators().get(0));
+            if (!mergeable) {
+              // Skip
+              LOG.debug("{} and {} cannot be merged", retainableRsOp, discardableRsOp);
+              continue;
+            }
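rankOpsByAccumulatedSize drives the greedy order: the largest producers are tried first so the biggest reuse wins. A sketch of that ranking, descending by accumulated size (toy numbers):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RankBySizeSketch {
  public static void main(String[] args) {
    // Operator -> estimated output size x number of consumers (toy numbers).
    Map<String, Long> accumulatedSize = new HashMap<>();
    accumulatedSize.put("RS[3]", 40L);
    accumulatedSize.put("RS[7]", 900L);
    accumulatedSize.put("RS[5]", 120L);
    List<Map.Entry<String, Long>> sorted = new ArrayList<>(accumulatedSize.entrySet());
    // Largest first: merging the biggest shared input saves the most work.
    sorted.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
    System.out.println(sorted); // [RS[7]=900, RS[5]=120, RS[3]=40]
  }
}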
-              continue;
-            }
-            Collection<ReduceSinkOperator> otherRsOps = existingRsOps.get(rsParent);
-            for (ReduceSinkOperator retainableRsOp : otherRsOps) {
-              if (removedOps.contains(retainableRsOp)) {
-                LOG.debug("Skip {} as it has already been removed", retainableRsOp);
-                continue;
-              }
 
-              // First we quickly check if the two RS operators can actually be merged.
-              // We already know that these two RS operators have the same parent, but
-              // we need to check whether both RS are actually equal. Further, we check
-              // whether their child is also equal. If any of these conditions are not
-              // met, we are not going to try to merge.
-              boolean mergeable = compareOperator(pctx, retainableRsOp, discardableRsOp) &&
-                  compareOperator(pctx, retainableRsOp.getChildOperators().get(0),
-                      discardableRsOp.getChildOperators().get(0));
-              if (!mergeable) {
-                // Skip
-                LOG.debug("{} and {} cannot be merged", retainableRsOp, discardableRsOp);
-                continue;
-              }
 
+            LOG.debug("Checking additional conditions for merging subtree starting at {}"
+                + " into subtree starting at {}", discardableRsOp, retainableRsOp);
+
+            // Secondly, we extract information about the part of the tree that can be merged
+            // as well as some structural information (memory consumption) that needs to be
+            // used to determine whether the merge can happen
+            Operator<?> retainableRsOpChild = retainableRsOp.getChildOperators().get(0);
+            Operator<?> discardableRsOpChild = discardableRsOp.getChildOperators().get(0);
+            SharedResult sr = extractSharedOptimizationInfo(
+                pctx, optimizerCache, retainableRsOp, discardableRsOp,
+                retainableRsOpChild, discardableRsOpChild);
+
+            // It seems these two operators can be merged.
+            // Check that plan meets some preconditions before doing it.
+            // In particular, in the presence of map joins in the upstream plan:
+            // - we cannot exceed the noconditional task size, and
+            // - if we already merged the big table, we cannot merge the broadcast
+            // tables.
+            if (sr.retainableOps.isEmpty() || !validPreConditions(pctx, optimizerCache, sr)) {
+              // Skip
+              LOG.debug("{} and {} do not meet preconditions", retainableRsOp, discardableRsOp);
+              continue;
+            }
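compareOperator, used in the quick check above, is structural: same operator kind and equivalent configuration, with children compared separately. In miniature (the Op class is a stand-in, not a Hive type):

public class CompareOperatorSketch {
  // Toy operator descriptor: an operator kind plus a config signature.
  static final class Op {
    final String kind;
    final String confSignature;
    Op(String kind, String confSignature) {
      this.kind = kind;
      this.confSignature = confSignature;
    }
  }

  static boolean comparable(Op a, Op b) {
    // Merge candidates must be the same kind of operator with equivalent
    // descriptors; children are compared separately, as in the RS case above.
    return a.kind.equals(b.kind) && a.confSignature.equals(b.confSignature);
  }

  public static void main(String[] args) {
    Op rs1 = new Op("RS", "keys=[a], partitions=[a]");
    Op rs2 = new Op("RS", "keys=[a], partitions=[a]");
    Op rs3 = new Op("RS", "keys=[b], partitions=[b]");
    System.out.println(comparable(rs1, rs2)); // true  -> try to merge
    System.out.println(comparable(rs1, rs3)); // false -> skip
  }
}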
-              LOG.debug("Checking additional conditions for merging subtree starting at {}"
-                  + " into subtree starting at {}", discardableRsOp, retainableRsOp);
-
-              // Secondly, we extract information about the part of the tree that can be merged
-              // as well as some structural information (memory consumption) that needs to be
-              // used to determined whether the merge can happen
-              Operator<?> retainableRsOpChild = retainableRsOp.getChildOperators().get(0);
-              Operator<?> discardableRsOpChild = discardableRsOp.getChildOperators().get(0);
-              SharedResult sr = extractSharedOptimizationInfo(
-                  pctx, optimizerCache, retainableRsOp, discardableRsOp,
-                  retainableRsOpChild, discardableRsOpChild);
-
-              // It seems these two operators can be merged.
-              // Check that plan meets some preconditions before doing it.
-              // In particular, in the presence of map joins in the upstream plan:
-              // - we cannot exceed the noconditional task size, and
-              // - if we already merged the big table, we cannot merge the broadcast
-              // tables.
-              if (sr.retainableOps.isEmpty() || !validPreConditions(pctx, optimizerCache, sr)) {
-                // Skip
-                LOG.debug("{} and {} do not meet preconditions", retainableRsOp, discardableRsOp);
-                continue;
-              }
 
+            deduplicateReduceTraits(retainableRsOp.getConf(), discardableRsOp.getConf());
 
-              deduplicateReduceTraits(retainableRsOp.getConf(), discardableRsOp.getConf());
-
-              // We can merge
-              Operator<?> lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1);
-              Operator<?> lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1);
-              if (lastDiscardableOp.getNumChild() != 0) {
-                List<Operator<? extends OperatorDesc>> allChildren =
-                    Lists.newArrayList(lastDiscardableOp.getChildOperators());
-                for (Operator<? extends OperatorDesc> op : allChildren) {
-                  lastDiscardableOp.getChildOperators().remove(op);
-                  op.replaceParent(lastDiscardableOp, lastRetainableOp);
-                  lastRetainableOp.getChildOperators().add(op);
-                }
 
+            // We can merge
+            Operator<?> lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1);
+            Operator<?> lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1);
+            if (lastDiscardableOp.getNumChild() != 0) {
+              List<Operator<? extends OperatorDesc>> allChildren =
+                  Lists.newArrayList(lastDiscardableOp.getChildOperators());
+              for (Operator<? extends OperatorDesc> op : allChildren) {
+                lastDiscardableOp.getChildOperators().remove(op);
+                op.replaceParent(lastDiscardableOp, lastRetainableOp);
+                lastRetainableOp.getChildOperators().add(op);
+              }
+            }
 
-              LOG.debug("Merging subtree starting at {} into subtree starting at {}",
-                  discardableRsOp, retainableRsOp);
-
-              // First we remove the input operators of the expression that
-              // we are going to eliminate
-              for (Operator<?> op : sr.discardableInputOps) {
-                OperatorUtils.removeOperator(op);
-                optimizerCache.removeOp(op);
-                removedOps.add(op);
-                // Remove DPP predicates
-                if (op instanceof ReduceSinkOperator) {
-                  SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
-                  if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) &&
-                      !sr.discardableInputOps.contains(sjbi.getTsOp())) {
-                    GenTezUtils.removeSemiJoinOperator(
-                        pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
-                    optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
-                  }
-                } else if (op instanceof AppMasterEventOperator) {
-                  DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
-                  if (!sr.discardableOps.contains(dped.getTableScan()) &&
-                      !sr.discardableInputOps.contains(dped.getTableScan())) {
-                    GenTezUtils.removeSemiJoinOperator(
-                        pctx, (AppMasterEventOperator) op, dped.getTableScan());
-                    optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
-                  }
 
+            LOG.debug("Merging subtree starting at {} into subtree starting at {}",
+                discardableRsOp, retainableRsOp);
+
+            // First we remove the input operators of the expression that
+            // we are going to eliminate
+            for (Operator<?> op : sr.discardableInputOps) {
+              OperatorUtils.removeOperator(op);
+              optimizerCache.removeOp(op);
+              removedOps.add(op);
+              // Remove DPP predicates
+              if (op instanceof ReduceSinkOperator) {
+                SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
+                if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) &&
+                    !sr.discardableInputOps.contains(sjbi.getTsOp())) {
+                  GenTezUtils.removeSemiJoinOperator(
+                      pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
+                  optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
+                }
+              } else if (op instanceof AppMasterEventOperator) {
+                DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
+                if (!sr.discardableOps.contains(dped.getTableScan()) &&
+                    !sr.discardableInputOps.contains(dped.getTableScan())) {
+                  GenTezUtils.removeSemiJoinOperator(
+                      pctx, (AppMasterEventOperator) op, dped.getTableScan());
+                  optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
+                }
-                LOG.debug("Input operator removed: {}", op);
-              }
-              // We remove the discardable RS operator
-              OperatorUtils.removeOperator(discardableRsOp);
-              optimizerCache.removeOp(discardableRsOp);
-              removedOps.add(discardableRsOp);
-              LOG.debug("Operator removed: {}", discardableRsOp);
-              // Then we merge the operators of the works we are going to merge
-              optimizerCache.removeOpAndCombineWork(discardableRsOpChild, retainableRsOpChild);
-              // Finally we remove the rest of the expression from the tree
-              for (Operator<?> op : sr.discardableOps) {
-                OperatorUtils.removeOperator(op);
-                optimizerCache.removeOp(op);
-                removedOps.add(op);
-                LOG.debug("Operator removed: {}", op);
-              }
-
-              break;
+              }
+              LOG.debug("Input operator removed: {}", op);
+            }
+            // We remove the discardable RS operator
+            OperatorUtils.removeOperator(discardableRsOp);
+            optimizerCache.removeOp(discardableRsOp);
+            removedOps.add(discardableRsOp);
+            LOG.debug("Operator removed: {}", discardableRsOp);
+            // Then we merge the operators of the works we are going to merge
+            optimizerCache.removeOpAndCombineWork(discardableRsOpChild, retainableRsOpChild);
+            // Finally we remove the rest of the expression from the tree
+            for (Operator<?> op : sr.discardableOps) {
+              OperatorUtils.removeOperator(op);
+              optimizerCache.removeOp(op);
+              removedOps.add(op);
+              LOG.debug("Operator removed: {}", op);
+            }
 
-            if (removedOps.contains(discardableRsOp)) {
-              // This operator has been removed, remove it from the list of existing operators
-              existingRsOps.remove(rsParent, discardableRsOp);
-            } else {
-              // This operator has not been removed, include it in the list of existing operators
-              existingRsOps.put(rsParent, discardableRsOp);
+            break;
+          }
 
+          if (removedOps.contains(discardableRsOp)) {
+            // This operator has been removed, remove it from the list of existing operators
+            existingRsOps.remove(rsParent, discardableRsOp);
+          } else {
+            // This operator has not been removed, include it in the list of existing operators
+            existingRsOps.put(rsParent, discardableRsOp);
+          }
+        }
+      }
 
-        // We gather the operators that will be used for next iteration of extended optimization
-        // (if any)
-        parentToRsOps = ArrayListMultimap.create();
-        visited = new HashSet<>();
-        for (Entry<Operator<?>, ReduceSinkOperator> e : existingRsOps.entries()) {
-          if (removedOps.contains(e.getValue()) || e.getValue().getNumChild() < 1) {
-            // If 1) RS has been removed, or 2) it does not have a child (for instance, it is a
-            // semijoin RS), we can quickly skip this one
-            continue;
-          }
-          gatherReduceSinkOpsByInput(parentToRsOps, visited,
-              findWorkOperators(optimizerCache, e.getValue().getChildOperators().get(0)));
-        }
-      }
-
-      // Remove unused table scan operators
-      it = topOps.entrySet().iterator();
-      while (it.hasNext()) {
-        Entry<String, TableScanOperator> e = it.next();
-        if (e.getValue().getNumChild() == 0) {
-          it.remove();
-        }
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("After SharedWorkExtendedOptimizer:\n"
-            + Operator.toString(pctx.getTopOps().values()));
-      }
-    }
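The removedOps/existingRsOps bookkeeping is the standard way to mutate a graph while iterating over a snapshot of it: record deletions in a set and have every later loop consult it, rather than restructuring collections mid-iteration. A compact sketch:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemovedSetSketch {
  public static void main(String[] args) {
    List<String> snapshot = Arrays.asList("RS[1]", "RS[2]", "RS[3]");
    Set<String> removed = new HashSet<>();
    for (String op : snapshot) {
      if (removed.contains(op)) {
        continue;              // operator was deleted by an earlier merge
      }
      if (op.equals("RS[2]")) {
        removed.add("RS[3]");  // merging RS[2] happened to delete RS[3]
      }
    }
    // The snapshot list is never structurally modified mid-iteration;
    // only the "removed" set grows, so iteration stays safe.
    System.out.println(removed); // [RS[3]]
  }
}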
-    if(pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_REUSE_MAPJOIN_CACHE)) {
-      // Try to reuse cache for broadcast side in mapjoin operators that
-      // share same input.
-      // First we group together all the mapjoin operators that share same
-      // reduce sink operator.
-      final Multimap<Operator<?>, MapJoinOperator> parentToMapJoinOperators =
-          ArrayListMultimap.create();
-      final Set<Operator<?>> visitedOperators = new HashSet<>();
-      for (Entry<Operator<?>, Collection<Operator<?>>> e :
-          optimizerCache.operatorToWorkOperators.asMap().entrySet()) {
-        if (visitedOperators.contains(e.getKey())) {
-          // Already visited this work, we move on
-          continue;
-        }
-        for (Operator<?> op : e.getValue()) {
-          if (op instanceof MapJoinOperator) {
-            MapJoinOperator mapJoinOp = (MapJoinOperator) op;
-            // Only allowed for mapjoin operator
-            if (!mapJoinOp.getConf().isBucketMapJoin() &&
-                !mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
-              parentToMapJoinOperators.put(
-                  obtainBroadcastInput(mapJoinOp).getParentOperators().get(0), mapJoinOp);
-            }
-          }
-          visitedOperators.add(op);
-        }
-      }
-      // For each group, set the cache key accordingly if there is more than one operator
-      // and input RS operator are equal
-      for (Collection<MapJoinOperator> c : parentToMapJoinOperators.asMap().values()) {
-        Map<ReduceSinkOperator, String> rsOpToCacheKey = new HashMap<>();
-        for (MapJoinOperator mapJoinOp : c) {
-          ReduceSinkOperator rsOp = obtainBroadcastInput(mapJoinOp);
-          String cacheKey = null;
-          for (Entry<ReduceSinkOperator, String> e: rsOpToCacheKey.entrySet()) {
-            if (compareOperator(pctx, rsOp, e.getKey())) {
-              cacheKey = e.getValue();
-              break;
-            }
-          }
-          if (cacheKey == null) {
-            // Either it is the first map join operator or there was no equivalent RS,
-            // hence generate cache key
-            cacheKey = MapJoinDesc.generateCacheKey(mapJoinOp.getOperatorId());
-            rsOpToCacheKey.put(rsOp, cacheKey);
-          }
-          // Set in the conf of the map join operator
-          mapJoinOp.getConf().setCacheKey(cacheKey);
-        }
-      }
-    }
 
-    // If we are running tests, we are going to verify that the contents of the cache
-    // correspond with the contents of the plan, and otherwise we fail.
-    // This check always run when we are running in test mode, independently on whether
-    // we use the basic or the extended version of the optimizer.
-    if (pctx.getConf().getBoolVar(ConfVars.HIVE_IN_TEST)) {
-      Set<Operator<?>> visited = new HashSet<>();
-      it = topOps.entrySet().iterator();
-      while (it.hasNext()) {
-        Entry<String, TableScanOperator> e = it.next();
-        for (Operator<?> op : OperatorUtils.findOperators(e.getValue(), Operator.class)) {
-          if (!visited.contains(op)) {
-            Set<Operator<?>> workCachedOps = findWorkOperators(optimizerCache, op);
-            Set<Operator<?>> workPlanOps = findWorkOperators(op, new HashSet<>());
-            if (!workCachedOps.equals(workPlanOps)) {
-              throw new SemanticException("Error in shared work optimizer: operator cache contents "
-                  + "and actual plan differ\nIn cache: " + workCachedOps + "\nIn plan: " + workPlanOps);
-            }
-            visited.add(op);
-          }
-        }
-      }
-    }
 
+      // We gather the operators that will be used for the next iteration of the
+      // extended optimization (if any)
+      parentToRsOps = ArrayListMultimap.create();
+      visited = new HashSet<>();
+      for (Entry<Operator<?>, ReduceSinkOperator> e : existingRsOps.entries()) {
+        if (removedOps.contains(e.getValue()) || e.getValue().getNumChild() < 1) {
+          // If 1) the RS has been removed, or 2) it does not have a child (for instance, it is a
+          // semijoin RS), we can quickly skip this one
+          continue;
+        }
+        gatherReduceSinkOpsByInput(parentToRsOps, visited,
+            findWorkOperators(optimizerCache, e.getValue().getChildOperators().get(0)));
+      }
+    }
 
-    return pctx;
+    // Remove unused table scan operators
+    pctx.getTopOps().entrySet().removeIf(
+        (Entry<String, TableScanOperator> e) -> e.getValue().getNumChild() == 0);
   }
 
   /**
@@ -634,9 +689,9 @@ private static void gatherDPPTableScanOps(
     LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource);
   }
 
-  private static Multimap<String, TableScanOperator> splitTableScanOpsByTable(
+  private static ArrayListMultimap<String, TableScanOperator> splitTableScanOpsByTable(
       ParseContext pctx) {
-    Multimap<String, TableScanOperator> tableNameToOps = ArrayListMultimap.create();
+    ArrayListMultimap<String, TableScanOperator> tableNameToOps = ArrayListMultimap.create();
     // Sort by operator ID so we get deterministic results
     Map<String, TableScanOperator> sortedTopOps = new TreeMap<>(pctx.getTopOps());
     for (Entry<String, TableScanOperator> e : sortedTopOps.entrySet()) {
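splitTableScanOpsByTable wraps topOps in a TreeMap before iterating; sorting by operator ID matters because merge decisions depend on visit order, and HashMap iteration order would make plans (and tests) nondeterministic. Standalone:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class DeterministicOrderSketch {
  public static void main(String[] args) {
    Map<String, String> topOps = new HashMap<>(); // iteration order unspecified
    topOps.put("TS_12", "scan of db.t2");
    topOps.put("TS_3", "scan of db.t1");
    topOps.put("TS_7", "scan of db.t1");
    // TreeMap iterates in sorted key order, so every run (and every test)
    // sees the table scans in the same sequence.
    for (Map.Entry<String, String> e : new TreeMap<>(topOps).entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}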
@@ -750,6 +805,11 @@ private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache
     if (!prevTsOpPPList.getPartitions().equals(tsOpPPList.getPartitions())) {
       return false;
     }
+    return true;
+  }
+
+  private static boolean areMergeableExtendedCheck(ParseContext pctx, SharedWorkOptimizerCache optimizerCache,
+      TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException {
     // If is a DPP, check if actually it refers to same target, column, etc.
     // Further, the DPP value needs to be generated from same subtree
     List<Operator<?>> dppsOp1 = new ArrayList<>(optimizerCache.tableScanToDPPSource.get(tsOp1));
@@ -805,6 +865,117 @@ private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache
     return true;
   }
 
+  private static boolean areMergeableExcludeSemijoinsExtendedCheck(ParseContext pctx, SharedWorkOptimizerCache optimizerCache,
+      TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException {
+    // We remove RS-based SJs from consideration, then we compare
+    List<Operator<?>> dppsOp1 = new ArrayList<>(optimizerCache.tableScanToDPPSource.get(tsOp1));
+    boolean removedDppOp1 = false;
+    List<ReduceSinkOperator> rsOpsSemijoin1 = new ArrayList<>();
+    List<Operator<?>> dppsOp2 = new ArrayList<>(optimizerCache.tableScanToDPPSource.get(tsOp2));
+    boolean removedDppOp2 = false;
+    List<ReduceSinkOperator> rsOpsSemijoin2 = new ArrayList<>();
+    for (int i = 0; i < dppsOp1.size(); i++) {
+      Operator<?> op = dppsOp1.get(i);
+      if (op instanceof ReduceSinkOperator) {
+        rsOpsSemijoin1.add((ReduceSinkOperator) op);
+        // Remove and step back so the element shifted into this slot is not skipped
+        dppsOp1.remove(i--);
+        removedDppOp1 = true;
+      }
+    }
+    for (int i = 0; i < dppsOp2.size(); i++) {
+      Operator<?> op = dppsOp2.get(i);
+      if (op instanceof ReduceSinkOperator) {
+        rsOpsSemijoin2.add((ReduceSinkOperator) op);
+        // Remove and step back so the element shifted into this slot is not skipped
+        dppsOp2.remove(i--);
+        removedDppOp2 = true;
+      }
+    }
+    if (removedDppOp1 && removedDppOp2) {
+      // TODO: We do not merge, since currently we only merge when exactly one
+      // of the TS operators is targeted by a SJ edge
+      return false;
+    }
+    if (!removedDppOp1 && !removedDppOp2) {
+      // Neither of them is targeted by a SJ, we skip them
+      return false;
+    }
+    if (dppsOp1.size() != dppsOp2.size()) {
+      // We cannot merge, we move to the next pair
+      return false;
+    }
+    // Check if DPP branches are equal
+    boolean equalBranches = true;
+    BitSet bs = new BitSet();
+    for (int i = 0; i < dppsOp1.size(); i++) {
+      Operator<?> dppOp1 = dppsOp1.get(i);
+      for (int j = 0; j < dppsOp2.size(); j++) {
+        if (!bs.get(j)) {
+          // If not visited yet
+          Operator<?> dppOp2 = dppsOp2.get(j);
+          if (compareAndGatherOps(pctx, dppOp1, dppOp2) != null) {
+            // The DPP operator/branch are equal
+            bs.set(j);
+            break;
+          }
+        }
+      }
+      if (bs.cardinality() < i + 1) {
+        // We cannot merge, we move to the next group
+        equalBranches = false;
+        break;
+      }
+    }
+    if (!equalBranches) {
+      // Skip
+      return false;
+    }
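The BitSet loop above is a greedy bipartite matching: every DPP branch of the first scan must claim a distinct, equal branch of the second. The same skeleton with strings in place of operator branches:

import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

public class BranchMatchSketch {
  public static void main(String[] args) {
    List<String> dppsOp1 = Arrays.asList("DPP(part=a)", "DPP(part=b)");
    List<String> dppsOp2 = Arrays.asList("DPP(part=b)", "DPP(part=a)");
    boolean equalBranches = true;
    BitSet bs = new BitSet();
    for (int i = 0; i < dppsOp1.size(); i++) {
      for (int j = 0; j < dppsOp2.size(); j++) {
        // Only consider right-hand branches that were not matched yet.
        if (!bs.get(j) && dppsOp1.get(i).equals(dppsOp2.get(j))) {
          bs.set(j);
          break;
        }
      }
      if (bs.cardinality() < i + 1) {
        equalBranches = false;  // branch i found no partner: the lists differ
        break;
      }
    }
    System.out.println(equalBranches); // true: same branches, different order
  }
}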
+
+    // We reached this point, so apart from the semijoins the DPP sources are the
+    // same and these two TS operators could potentially be merged. Hence, we
+    // perform the last check. To do this, we temporarily remove the SJ operators,
+    // but we remember their position in the plan. If the final checks pass, we
+    // merge and remove the semijoins for good; otherwise we reintroduce them,
+    // abort the shared scan optimization for this pair, and we are done
+    TableScanOperator targetTSOp;
+    List<ReduceSinkOperator> semijoinRsOps;
+    List<SemiJoinBranchInfo> sjBranches = new ArrayList<>();
+    if (removedDppOp1) {
+      targetTSOp = tsOp1;
+      semijoinRsOps = rsOpsSemijoin1;
+    } else {
+      targetTSOp = tsOp2;
+      semijoinRsOps = rsOpsSemijoin2;
+    }
+    optimizerCache.tableScanToDPPSource.get(targetTSOp).removeAll(semijoinRsOps);
+    for (ReduceSinkOperator rsOp : semijoinRsOps) {
+      sjBranches.add(pctx.getRsToSemiJoinBranchInfo().remove(rsOp));
+    }
+
+    boolean validMerge = validPreConditions(pctx, optimizerCache,
+        extractSharedOptimizationInfoForRoot(pctx, optimizerCache, tsOp1, tsOp2));
+
+    if (validMerge) {
+      // We are going to merge, hence we remove the semijoins completely
+      for (ReduceSinkOperator semijoinRsOp : semijoinRsOps) {
+        Operator<?> branchOp = GenTezUtils.removeBranch(semijoinRsOp);
+        while (branchOp != null) {
+          optimizerCache.removeOp(branchOp);
+          branchOp = branchOp.getNumChild() > 0 ?
+              branchOp.getChildOperators().get(0) : null;
+        }
+        GenTezUtils.removeSemiJoinOperator(pctx, semijoinRsOp, targetTSOp);
+      }
+    } else {
+      // Otherwise, we put the semijoins back into the auxiliary data structures
+      optimizerCache.tableScanToDPPSource.get(targetTSOp).addAll(semijoinRsOps);
+      for (int i = 0; i < semijoinRsOps.size(); i++) {
+        pctx.getRsToSemiJoinBranchInfo().put(semijoinRsOps.get(i), sjBranches.get(i));
+      }
+    }
+
+    return validMerge;
+  }
+
   private static SharedResult extractSharedOptimizationInfoForRoot(ParseContext pctx,
       SharedWorkOptimizerCache optimizerCache,
       TableScanOperator retainableTsOp,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index dd1d6a1924..f8c7e18eb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -499,7 +499,7 @@ private static void findRoots(Operator<?> op, List<Operator<?>> ops) {
    * Remove an operator branch. When we see a fork, we know it's time to do the removal.
    * @param event the leaf node of which branch to be removed
    */
-  public static void removeBranch(Operator<?> event) {
+  public static Operator<?> removeBranch(Operator<?> event) {
     Operator<?> child = event;
     Operator<?> curr = event;
 
@@ -509,6 +509,8 @@ public static void removeBranch(Operator<?> event) {
     }
 
     curr.removeChild(child);
+
+    return child;
   }
 
   public static EdgeType determineEdgeType(BaseWork preceedingWork, BaseWork followingWork, ReduceSinkOperator reduceSinkOperator) {
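Returning the detached child from removeBranch lets the caller walk the orphaned operator chain and purge each node from auxiliary structures, which is exactly what the semijoin-removal loop above does with optimizerCache.removeOp. A minimal sketch with a toy single-child chain:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class BranchCleanupSketch {
  static class Node {
    final String name;
    Node child;
    Node(String name) { this.name = name; }
  }

  public static void main(String[] args) {
    // SEL -> GBY -> RS: the branch that removeBranch detached and returned.
    Node sel = new Node("SEL");
    sel.child = new Node("GBY");
    sel.child.child = new Node("RS");

    Set<String> cache = new HashSet<>(Arrays.asList("TS", "SEL", "GBY", "RS"));
    // Mirror of the while loop above: follow the (single-child) chain and
    // drop every operator of the detached branch from the cache.
    for (Node branchOp = sel; branchOp != null; branchOp = branchOp.child) {
      cache.remove(branchOp.name);
    }
    System.out.println(cache); // [TS] -- only the live plan remains
  }
}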