diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java index d5006bd..50ca8f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; public class OperatorUtils { @@ -381,6 +382,23 @@ public static void removeBranch(Operator op) { curr.removeChild(child); } + public static void removeOperator(Operator op) { + if (op.getNumParent() != 0) { + List> allParent = + Lists.newArrayList(op.getParentOperators()); + for (Operator parentOp : allParent) { + parentOp.removeChild(op); + } + } + if (op.getNumChild() != 0) { + List> allChildren = + Lists.newArrayList(op.getChildOperators()); + for (Operator childOp : allChildren) { + childOp.removeParent(op); + } + } + } + public static String getOpNamePretty(Operator op) { if (op instanceof TableScanOperator) { return op.toString() + " (" + ((TableScanOperator) op).getConf().getAlias() + ")"; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java index 1da9164..17e73f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java @@ -25,6 +25,7 @@ import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.CollectOperator; import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.DemuxOperator; @@ -56,6 +57,8 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator; import 
org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator; +import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc; +import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; @@ -100,6 +103,7 @@ comparatorMapping.put(VectorLimitOperator.class, new LimitOperatorComparator()); comparatorMapping.put(ScriptOperator.class, new ScriptOperatorComparator()); comparatorMapping.put(TemporaryHashSinkOperator.class, new HashTableSinkOperatorComparator()); + comparatorMapping.put(AppMasterEventOperator.class, new AppMasterEventOperatorComparator()); // these operators does not have state, so they always equal with the same kind. comparatorMapping.put(UnionOperator.class, new AlwaysTrueOperatorComparator()); comparatorMapping.put(ForwardOperator.class, new AlwaysTrueOperatorComparator()); @@ -288,9 +292,9 @@ public boolean equals(ReduceSinkOperator op1, ReduceSinkOperator op2) { ReduceSinkDesc op1Conf = op1.getConf(); ReduceSinkDesc op2Conf = op2.getConf(); - if (compareExprNodeDescList(op1Conf.getKeyCols(), op2Conf.getKeyCols()) && - compareExprNodeDescList(op1Conf.getValueCols(), op2Conf.getValueCols()) && - compareExprNodeDescList(op1Conf.getPartitionCols(), op2Conf.getPartitionCols()) && + if (compareString(op1Conf.getKeyCols().toString(), op2Conf.getKeyCols().toString()) && + compareString(op1Conf.getValueCols().toString(), op2Conf.getValueCols().toString()) && + compareString(op1Conf.getPartitionCols().toString(), op2Conf.getPartitionCols().toString()) && op1Conf.getTag() == op2Conf.getTag() && compareString(op1Conf.getOrder(), op2Conf.getOrder()) && op1Conf.getTopN() == op2Conf.getTopN() && @@ -516,6 +520,37 @@ public boolean equals(UDTFOperator op1, UDTFOperator op2) { } } + static class AppMasterEventOperatorComparator implements 
OperatorComparator { + + @Override + public boolean equals(AppMasterEventOperator op1, AppMasterEventOperator op2) { + Preconditions.checkNotNull(op1); + Preconditions.checkNotNull(op2); + AppMasterEventDesc op1Conf = op1.getConf(); + AppMasterEventDesc op2Conf = op2.getConf(); + + if (compareString(op1Conf.getInputName(), op2Conf.getInputName()) && + compareString(op1Conf.getVertexName(), op2Conf.getVertexName()) && + compareObject(op1Conf.getTable(), op2Conf.getTable())) { + if (op1Conf instanceof DynamicPruningEventDesc && op2Conf instanceof DynamicPruningEventDesc) { + DynamicPruningEventDesc op1DPPConf = (DynamicPruningEventDesc) op1Conf; + DynamicPruningEventDesc op2DPPConf = (DynamicPruningEventDesc) op2Conf; + if (compareString(op1DPPConf.getTargetColumnName(), op2DPPConf.getTargetColumnName()) && + compareString(op1DPPConf.getTargetColumnType(), op2DPPConf.getTargetColumnType()) && + compareString(op1DPPConf.getPartKeyString(), op2DPPConf.getPartKeyString())) { + return true; + } + return false; + } else if (op1Conf instanceof DynamicPruningEventDesc || op2Conf instanceof DynamicPruningEventDesc) { + return false; + } + return true; + } else { + return false; + } + } + } + static boolean compareString(String first, String second) { return compareObject(first, second); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java index e31119f..688a4f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java @@ -20,11 +20,14 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import 
java.util.Map; @@ -43,26 +46,35 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UnionOperator; +import org.apache.hadoop.hive.ql.parse.GenTezUtils; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.datanucleus.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; +import com.google.common.collect.TreeMultiset; /** * Shared scan optimizer. This rule finds scan operator over the same table @@ -72,6 +84,9 @@ * | | -> / \ * Op Op Op Op * + * Extension for sharing subplans. Current limitations: 1) does not span + * over multiple stages, and 2) once merged, that is the limit operator. + * *

Currently it only works with the Tez execution engine. */ public class SharedScanOptimizer extends Transform { @@ -87,95 +102,97 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } + if (LOG.isDebugEnabled()) { + LOG.debug("Before SharedScanOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); + } + // Cache to use during optimization SharedScanOptimizerCache optimizerCache = new SharedScanOptimizerCache(); - // We will not apply this optimization on some table scan operators. - Set excludeTableScanOps = gatherNotValidTableScanOps(pctx, optimizerCache); - LOG.debug("Exclude TableScan ops: {}", excludeTableScanOps); + // Gather information about the DPP table scans and store it in the cache + gatherDPPTableScanOps(pctx, optimizerCache); - // Map of dbName.TblName -> Pair(tableAlias, TSOperator) - Multimap> tableNameToOps = splitTableScanOpsByTable(pctx); + // Map of dbName.TblName -> TSOperator + Multimap tableNameToOps = splitTableScanOpsByTable(pctx); // We enforce a certain order when we do the reutilization. // In particular, we use size of table x number of reads to // rank the tables. 
- List> sortedTables = rankTablesByAccumulatedSize(pctx, excludeTableScanOps); + List> sortedTables = rankTablesByAccumulatedSize(pctx); LOG.debug("Sorted tables by size: {}", sortedTables); // Execute optimization Multimap existingOps = ArrayListMultimap.create(); - Set entriesToRemove = new HashSet<>(); + Set> removedOps = new HashSet<>(); for (Entry tablePair : sortedTables) { - for (Entry tableScanOpPair : tableNameToOps.get(tablePair.getKey())) { - TableScanOperator tsOp = tableScanOpPair.getValue(); - if (excludeTableScanOps.contains(tsOp)) { - // Skip operator, currently we do not merge + String tableName = tablePair.getKey(); + for (TableScanOperator discardableTsOp : tableNameToOps.get(tableName)) { + if (removedOps.contains(discardableTsOp)) { + LOG.debug("Skip {} as it has been already removed", discardableTsOp); continue; } - String tableName = tablePair.getKey(); Collection prevTsOps = existingOps.get(tableName); - if (!prevTsOps.isEmpty()) { - for (TableScanOperator prevTsOp : prevTsOps) { - - // First we check if the two table scan operators can actually be merged - // If schemas do not match, we currently do not merge - List prevTsOpNeededColumns = prevTsOp.getNeededColumns(); - List tsOpNeededColumns = tsOp.getNeededColumns(); - if (prevTsOpNeededColumns.size() != tsOpNeededColumns.size()) { - // Skip - continue; - } - boolean notEqual = false; - for (int i = 0; i < prevTsOpNeededColumns.size(); i++) { - if (!prevTsOpNeededColumns.get(i).equals(tsOpNeededColumns.get(i))) { - notEqual = true; - break; - } - } - if (notEqual) { - // Skip - continue; - } - // If row limit does not match, we currently do not merge - if (prevTsOp.getConf().getRowLimit() != tsOp.getConf().getRowLimit()) { - // Skip - continue; - } - // If partitions do not match, we currently do not merge - PrunedPartitionList prevTsOpPPList = pctx.getPrunedPartitions(prevTsOp); - PrunedPartitionList tsOpPPList = pctx.getPrunedPartitions(tsOp); - if (prevTsOpPPList.hasUnknownPartitions() 
- || tsOpPPList.hasUnknownPartitions() - || !prevTsOpPPList.getPartitions().equals(tsOpPPList.getPartitions())) { - // Skip - continue; - } + for (TableScanOperator retainableTsOp : prevTsOps) { + if (removedOps.contains(retainableTsOp)) { + LOG.debug("Skip {} as it has been already removed", retainableTsOp); + continue; + } + + // First we quickly check if the two table scan operators can actually be merged + boolean mergeable = areMergeable(pctx, optimizerCache, retainableTsOp, discardableTsOp); + if (!mergeable) { + // Skip + LOG.debug("{} and {} cannot be merged", retainableTsOp, discardableTsOp); + continue; + } + + // Secondly, we extract information about the part of the tree that can be merged + // as well as some structural information (memory consumption) that needs to be + // used to determined whether the merge can happen + SharedResult sr = extractSharedOptimizationInfo( + pctx, optimizerCache, retainableTsOp, discardableTsOp); - // It seems these two operators can be merged. - // Check that plan meets some preconditions before doing it. - // In particular, in the presence of map joins in the upstream plan: - // - we cannot exceed the noconditional task size, and - // - if we already merged the big table, we cannot merge the broadcast - // tables. - if (!validPreConditions(pctx, optimizerCache, prevTsOp, tsOp)) { - // Skip - LOG.debug("{} does not meet preconditions", tsOp); - continue; + // It seems these two operators can be merged. + // Check that plan meets some preconditions before doing it. + // In particular, in the presence of map joins in the upstream plan: + // - we cannot exceed the noconditional task size, and + // - if we already merged the big table, we cannot merge the broadcast + // tables. 
+ if (!validPreConditions(pctx, optimizerCache, sr)) { + // Skip + LOG.debug("{} and {} do not meet preconditions", retainableTsOp, discardableTsOp); + continue; + } + + // We can merge + if (sr.retainableOps.size() > 1) { + // More than TS operator + Operator lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1); + Operator lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1); + if (lastDiscardableOp.getNumChild() != 0) { + List> allChildren = + Lists.newArrayList(lastDiscardableOp.getChildOperators()); + for (Operator op : allChildren) { + lastDiscardableOp.getChildOperators().remove(op); + op.replaceParent(lastDiscardableOp, lastRetainableOp); + lastRetainableOp.getChildOperators().add(op); + } } - // We can merge + LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp); + } else { + // Only TS operator ExprNodeGenericFuncDesc exprNode = null; - if (prevTsOp.getConf().getFilterExpr() != null) { + if (retainableTsOp.getConf().getFilterExpr() != null) { // Push filter on top of children - pushFilterToTopOfTableScan(optimizerCache, prevTsOp); + pushFilterToTopOfTableScan(optimizerCache, retainableTsOp); // Clone to push to table scan - exprNode = (ExprNodeGenericFuncDesc) prevTsOp.getConf().getFilterExpr(); + exprNode = (ExprNodeGenericFuncDesc) retainableTsOp.getConf().getFilterExpr(); } - if (tsOp.getConf().getFilterExpr() != null) { + if (discardableTsOp.getConf().getFilterExpr() != null) { // Push filter on top - pushFilterToTopOfTableScan(optimizerCache, tsOp); - ExprNodeGenericFuncDesc tsExprNode = tsOp.getConf().getFilterExpr(); + pushFilterToTopOfTableScan(optimizerCache, discardableTsOp); + ExprNodeGenericFuncDesc tsExprNode = discardableTsOp.getConf().getFilterExpr(); if (exprNode != null && !exprNode.isSame(tsExprNode)) { // We merge filters from previous scan by ORing with filters from current scan if (exprNode.getGenericUDF() instanceof GenericUDFOPOr) { @@ -201,52 
+218,87 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { } } // Replace filter - prevTsOp.getConf().setFilterExpr(exprNode); + retainableTsOp.getConf().setFilterExpr(exprNode); // Replace table scan operator List> allChildren = - Lists.newArrayList(tsOp.getChildOperators()); + Lists.newArrayList(discardableTsOp.getChildOperators()); for (Operator op : allChildren) { - tsOp.getChildOperators().remove(op); - op.replaceParent(tsOp, prevTsOp); - prevTsOp.getChildOperators().add(op); + discardableTsOp.getChildOperators().remove(op); + op.replaceParent(discardableTsOp, retainableTsOp); + retainableTsOp.getChildOperators().add(op); } - entriesToRemove.add(tableScanOpPair.getKey()); - // Remove and combine - optimizerCache.removeOpAndCombineWork(tsOp, prevTsOp); - LOG.debug("Merged {} into {}", tsOp, prevTsOp); + LOG.debug("Merging {} into {}", discardableTsOp, retainableTsOp); + } - break; + // First we remove the input operators of the expression that + // we are going to eliminate + for (Operator op : sr.discardableInputOps) { + OperatorUtils.removeOperator(op); + optimizerCache.removeOp(op); + removedOps.add(op); + // Remove semijoin DPP predicates + SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op); + if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) && + !sr.discardableInputOps.contains(sjbi.getTsOp())) { + GenTezUtils.removeSemiJoinOperator(pctx, (ReduceSinkOperator) op, sjbi.getTsOp()); + } + // TODO: Remove other DPP predicates + // dppSource instanceof AppMasterEventOperator + LOG.debug("Input operator removed: {}", op); } - if (!entriesToRemove.contains(tableScanOpPair.getKey())) { - existingOps.put(tableName, tsOp); + // Then we merge the operators of the works we are going to merge + optimizerCache.removeOpAndCombineWork(discardableTsOp, retainableTsOp); + removedOps.add(discardableTsOp); + // Finally we remove the expression from the tree + for (Operator op : sr.discardableOps) { + 
OperatorUtils.removeOperator(op); + optimizerCache.removeOp(op); + removedOps.add(op); + LOG.debug("Operator removed: {}", op); } + + break; + } + + if (removedOps.contains(discardableTsOp)) { + // This operator has been removed, remove it from the list of existing operators + existingOps.remove(tableName, discardableTsOp); } else { - // Add to existing ops - existingOps.put(tableName, tsOp); + // This operator has not been removed, include it in the list of existing operators + existingOps.put(tableName, discardableTsOp); } } } - // Remove unused operators - for (String key : entriesToRemove) { - topOps.remove(key); + + // Remove unused table scan operators + Iterator> it = topOps.entrySet().iterator(); + while (it.hasNext()) { + Entry e = it.next(); + if (e.getValue().getNumChild() == 0) { + it.remove(); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("After SharedScanOptimizer:\n" + Operator.toString(pctx.getTopOps().values())); } return pctx; } - private static Set gatherNotValidTableScanOps( + /** + * This method gathers the TS operators with DPP from the context and + * stores them into the input optimization cache. + */ + private static void gatherDPPTableScanOps( ParseContext pctx, SharedScanOptimizerCache optimizerCache) throws SemanticException { // Find TS operators with partition pruning enabled in plan // because these TS may potentially read different data for // different pipeline. // These can be: // 1) TS with DPP. - // TODO: Check if dynamic filters are identical and do not add. // 2) TS with semijoin DPP. - // TODO: Check for dynamic filters. - Set notValidTableScanOps = new HashSet<>(); - // 1) TS with DPP. 
Map topOps = pctx.getTopOps(); Collection> tableScanOps = Lists.>newArrayList(topOps.values()); @@ -255,40 +307,32 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { for (AppMasterEventOperator a : s) { if (a.getConf() instanceof DynamicPruningEventDesc) { DynamicPruningEventDesc dped = (DynamicPruningEventDesc) a.getConf(); - notValidTableScanOps.add(dped.getTableScan()); optimizerCache.tableScanToDPPSource.put(dped.getTableScan(), a); } } - // 2) TS with semijoin DPP. for (Entry e : pctx.getRsToSemiJoinBranchInfo().entrySet()) { - notValidTableScanOps.add(e.getValue().getTsOp()); optimizerCache.tableScanToDPPSource.put(e.getValue().getTsOp(), e.getKey()); } - return notValidTableScanOps; + LOG.debug("DPP information stored in the cache: {}", optimizerCache.tableScanToDPPSource); } - private static Multimap> splitTableScanOpsByTable( + private static Multimap splitTableScanOpsByTable( ParseContext pctx) { - Multimap> tableNameToOps = ArrayListMultimap.create(); + Multimap tableNameToOps = ArrayListMultimap.create(); for (Entry e : pctx.getTopOps().entrySet()) { TableScanOperator tsOp = e.getValue(); tableNameToOps.put( tsOp.getConf().getTableMetadata().getDbName() + "." - + tsOp.getConf().getTableMetadata().getTableName(), e); + + tsOp.getConf().getTableMetadata().getTableName(), tsOp); } return tableNameToOps; } - private static List> rankTablesByAccumulatedSize(ParseContext pctx, - Set excludeTables) { + private static List> rankTablesByAccumulatedSize(ParseContext pctx) { Map tableToTotalSize = new HashMap<>(); for (Entry e : pctx.getTopOps().entrySet()) { TableScanOperator tsOp = e.getValue(); - if (excludeTables.contains(tsOp)) { - // Skip operator, currently we do not merge - continue; - } String tableName = tsOp.getConf().getTableMetadata().getDbName() + "." + tsOp.getConf().getTableMetadata().getTableName(); long tableSize = tsOp.getStatistics() != null ? 
@@ -312,8 +356,369 @@ public int compare(Map.Entry o1, Map.Entry o2) { return sortedTables; } + private static boolean areMergeable(ParseContext pctx, SharedScanOptimizerCache optimizerCache, + TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException { + // First we check if the two table scan operators can actually be merged + // If schemas do not match, we currently do not merge + List prevTsOpNeededColumns = tsOp1.getNeededColumns(); + List tsOpNeededColumns = tsOp2.getNeededColumns(); + if (prevTsOpNeededColumns.size() != tsOpNeededColumns.size()) { + return false; + } + boolean notEqual = false; + for (int i = 0; i < prevTsOpNeededColumns.size(); i++) { + if (!prevTsOpNeededColumns.get(i).equals(tsOpNeededColumns.get(i))) { + notEqual = true; + break; + } + } + if (notEqual) { + return false; + } + // If row limit does not match, we currently do not merge + if (tsOp1.getConf().getRowLimit() != tsOp2.getConf().getRowLimit()) { + return false; + } + // If partitions do not match, we currently do not merge + PrunedPartitionList prevTsOpPPList = pctx.getPrunedPartitions(tsOp1); + PrunedPartitionList tsOpPPList = pctx.getPrunedPartitions(tsOp2); + if (!prevTsOpPPList.getPartitions().equals(tsOpPPList.getPartitions())) { + return false; + } + // If is a DPP, check if actually it refers to same target, column, etc. 
+ // Further, the DPP value needs to be generated from same subtree + List> dppsOp1 = new ArrayList<>(optimizerCache.tableScanToDPPSource.get(tsOp1)); + List> dppsOp2 = new ArrayList<>(optimizerCache.tableScanToDPPSource.get(tsOp2)); + if (dppsOp1.isEmpty() && dppsOp2.isEmpty()) { + return true; + } + for (int i = 0; i < dppsOp1.size(); i++) { + Operator op = dppsOp1.get(i); + if (op instanceof ReduceSinkOperator) { + Set> ascendants = + findAscendantWorkOperators(pctx, optimizerCache, op); + if (ascendants.contains(tsOp2)) { + dppsOp1.remove(i); + i--; + } + } + } + for (int i = 0; i < dppsOp2.size(); i++) { + Operator op = dppsOp2.get(i); + if (op instanceof ReduceSinkOperator) { + Set> ascendants = + findAscendantWorkOperators(pctx, optimizerCache, op); + if (ascendants.contains(tsOp1)) { + dppsOp2.remove(i); + i--; + } + } + } + if (dppsOp1.size() != dppsOp2.size()) { + // Only first or second operator contains DPP pruning + return false; + } + // Check if DPP branches are equal + for (int i = 0; i < dppsOp1.size(); i++) { + Operator dppOp1 = dppsOp1.get(i); + BitSet bs = new BitSet(); + for (int j = 0; j < dppsOp2.size(); j++) { + if (!bs.get(j)) { + // If not visited yet + Operator dppOp2 = dppsOp2.get(j); + if (compareAndGatherOps(dppOp1, dppOp2) != null) { + // The DPP operator/branch are equal + bs.set(j); + break; + } + } + } + if (bs.cardinality() == i) { + return false; + } + } + return true; + } + + private static SharedResult extractSharedOptimizationInfo(ParseContext pctx, + SharedScanOptimizerCache optimizerCache, + TableScanOperator retainableTsOp, + TableScanOperator discardableTsOp) { + Set> retainableOps = new LinkedHashSet<>(); + Set> discardableOps = new LinkedHashSet<>(); + Set> discardableInputOps = new HashSet<>(); + long dataSize = 0l; + long maxDataSize = 0l; + + retainableOps.add(retainableTsOp); + discardableOps.add(discardableTsOp); + Operator equalOp1 = retainableTsOp; + Operator equalOp2 = discardableTsOp; + if 
(equalOp1.getNumChild() > 1 || equalOp2.getNumChild() > 1) { + // TODO: Support checking multiple child operators to merge further. + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps)); + return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize); + } + Operator currentOp1 = retainableTsOp.getChildOperators().get(0); + Operator currentOp2 = discardableTsOp.getChildOperators().get(0); + + // Special treatment for Filter operator that ignores the DPP predicates + if (currentOp1 instanceof FilterOperator && currentOp2 instanceof FilterOperator) { + boolean equalFilters = false; + FilterDesc op1Conf = ((FilterOperator) currentOp1).getConf(); + FilterDesc op2Conf = ((FilterOperator) currentOp2).getConf(); + + if (op1Conf.getIsSamplingPred() == op2Conf.getIsSamplingPred() && + StringUtils.areStringsEqual(op1Conf.getSampleDescExpr(), op2Conf.getSampleDescExpr())) { + Multiset conjsOp1String = extractConjsIgnoringDPPPreds(op1Conf.getPredicate()); + Multiset conjsOp2String = extractConjsIgnoringDPPPreds(op2Conf.getPredicate()); + if (conjsOp1String.equals(conjsOp2String)) { + equalFilters = true; + } + } + + if (equalFilters) { + equalOp1 = currentOp1; + equalOp2 = currentOp2; + retainableOps.add(equalOp1); + discardableOps.add(equalOp2); + if (currentOp1.getChildOperators().size() > 1 || + currentOp2.getChildOperators().size() > 1) { + // TODO: Support checking multiple child operators to merge further. 
+ discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableInputOps)); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps)); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps)); + return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize); + } + currentOp1 = currentOp1.getChildOperators().get(0); + currentOp2 = currentOp2.getChildOperators().get(0); + } else { + // Bail out + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableInputOps)); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps)); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps)); + return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize); + } + } + + // Try to merge rest of operators + while (!(currentOp1 instanceof ReduceSinkOperator)) { + // Check whether current operators are equal + if (!compareOperator(currentOp1, currentOp2)) { + // If they are not equal, we could zip up till here + break; + } + if (currentOp1.getParentOperators().size() != + currentOp2.getParentOperators().size()) { + // If they are not equal, we could zip up till here + break; + } + if (currentOp1.getParentOperators().size() > 1) { + List> discardableOpsForCurrentOp = new ArrayList<>(); + int idx = 0; + for (; idx < currentOp1.getParentOperators().size(); idx++) { + Operator parentOp1 = currentOp1.getParentOperators().get(idx); + Operator parentOp2 = currentOp2.getParentOperators().get(idx); + if (parentOp1 == equalOp1 && parentOp2 == equalOp2) { + continue; + } + if ((parentOp1 == equalOp1 && parentOp2 != equalOp2) || + (parentOp1 != equalOp1 && parentOp2 == equalOp2)) { + // Input operator is not in the same position + break; + } + // Compare input + List> removeOpsForCurrentInput = compareAndGatherOps(parentOp1, 
parentOp2); + if (removeOpsForCurrentInput == null) { + // Inputs are not the same, bail out + break; + } + // Add inputs to ops to remove + discardableOpsForCurrentOp.addAll(removeOpsForCurrentInput); + } + if (idx != currentOp1.getParentOperators().size()) { + // If inputs are not equal, we could zip up till here + break; + } + discardableInputOps.addAll(discardableOpsForCurrentOp); + } + + equalOp1 = currentOp1; + equalOp2 = currentOp2; + retainableOps.add(equalOp1); + discardableOps.add(equalOp2); + if (equalOp1 instanceof MapJoinOperator) { + MapJoinOperator mop = (MapJoinOperator) equalOp1; + dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + } + if (currentOp1.getChildOperators().size() > 1 || + currentOp2.getChildOperators().size() > 1) { + // TODO: Support checking multiple child operators to merge further. + break; + } + // Update for next iteration + currentOp1 = currentOp1.getChildOperators().get(0); + currentOp2 = currentOp2.getChildOperators().get(0); + } + + // Add the rest to the memory consumption + Set> opsWork1 = findWorkOperators(optimizerCache, currentOp1); + for (Operator op : opsWork1) { + if (op instanceof MapJoinOperator && !retainableOps.contains(op)) { + MapJoinOperator mop = (MapJoinOperator) op; + dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + } + } + Set> opsWork2 = findWorkOperators(optimizerCache, currentOp2); + for (Operator op : opsWork2) { + if (op instanceof MapJoinOperator && !discardableOps.contains(op)) { + MapJoinOperator mop = (MapJoinOperator) op; + dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); + maxDataSize = mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize(); + } + } + + discardableInputOps.addAll(gatherDPPBranchOps(pctx, 
optimizerCache, discardableInputOps)); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps)); + discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps)); + return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize); + } + + private static Multiset extractConjsIgnoringDPPPreds(ExprNodeDesc predicate) { + List conjsOp = ExprNodeDescUtils.split(predicate); + Multiset conjsOpString = TreeMultiset.create(); + for (int i = 0; i < conjsOp.size(); i++) { + if (conjsOp.get(i) instanceof ExprNodeGenericFuncDesc) { + ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) conjsOp.get(i); + if (GenericUDFInBloomFilter.class == func.getGenericUDF().getClass()) { + continue; + } else if (GenericUDFBetween.class == func.getGenericUDF().getClass() && + (func.getChildren().get(2) instanceof ExprNodeDynamicValueDesc || + func.getChildren().get(3) instanceof ExprNodeDynamicValueDesc)) { + continue; + } + } + conjsOpString.add(conjsOp.get(i).toString()); + } + return conjsOpString; + } + + private static Set> gatherDPPBranchOps(ParseContext pctx, + SharedScanOptimizerCache optimizerCache, Set> ops) { + Set> dppBranches = new HashSet<>(); + for (Operator op : ops) { + if (op instanceof TableScanOperator) { + Collection> c = optimizerCache.tableScanToDPPSource.get((TableScanOperator) op); + for (Operator dppSource : c) { + // Remove the branches + Operator currentOp = dppSource; + while (currentOp.getNumChild() <= 1) { + dppBranches.add(currentOp); + currentOp = currentOp.getParentOperators().get(0); + } + } + } + } + return dppBranches; + } + + private static Set> gatherDPPBranchOps(ParseContext pctx, + SharedScanOptimizerCache optimizerCache, Set> ops, + Set> discardedOps) { + Set> dppBranches = new HashSet<>(); + for (Operator op : ops) { + if (op instanceof TableScanOperator) { + Collection> c = optimizerCache.tableScanToDPPSource.get((TableScanOperator) op); + 
for (Operator dppSource : c) { + Set> ascendants = + findAscendantWorkOperators(pctx, optimizerCache, dppSource); + if (!Collections.disjoint(ascendants, discardedOps)) { + // Remove branch + Operator currentOp = dppSource; + while (currentOp.getNumChild() <= 1) { + dppBranches.add(currentOp); + currentOp = currentOp.getParentOperators().get(0); + } + } + } + } + } + return dppBranches; + } + + private static List> compareAndGatherOps(Operator op1, Operator op2) { + List> result = new ArrayList<>(); + boolean mergeable = compareAndGatherOps(op1, op2, result, true); + if (!mergeable) { + return null; + } + return result; + } + + private static boolean compareAndGatherOps(Operator op1, Operator op2, + List> result, boolean gather) { + if (!compareOperator(op1, op2)) { + LOG.debug("Operators not equal: {} and {}", op1, op2); + return false; + } + + if (gather) { + result.add(op2); + } + + List> op1ParentOperators = op1.getParentOperators(); + List> op2ParentOperators = op2.getParentOperators(); + if (op1ParentOperators != null && op2ParentOperators != null) { + if (op1ParentOperators.size() != op2ParentOperators.size()) { + return false; + } + for (int i = 0; i < op1ParentOperators.size(); i++) { + Operator op1ParentOp = op1ParentOperators.get(i); + Operator op2ParentOp = op2ParentOperators.get(i); + boolean mergeable; + if (gather && op2ParentOp.getChildOperators().size() < 2) { + mergeable = compareAndGatherOps(op1ParentOp, op2ParentOp, result, true); + } else { + mergeable = compareAndGatherOps(op1ParentOp, op2ParentOp, result, false); + } + if (!mergeable) { + return false; + } + } + } else if (op1ParentOperators != null || op2ParentOperators != null) { + return false; + } + + return true; + } + + private static boolean compareOperator(Operator op1, Operator op2) { + if (!op1.getClass().getName().equals(op2.getClass().getName())) { + return false; + } + + OperatorComparatorFactory.OperatorComparator operatorComparator = + 
OperatorComparatorFactory.getOperatorComparator(op1.getClass()); + return operatorComparator.equals(op1, op2); + } + private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizerCache optimizerCache, - TableScanOperator prevTsOp, TableScanOperator tsOp) { + SharedResult sr) { + + // We check whether merging the works would cause the size of + // the data in memory grow too large. + // TODO: Currently ignores GBY and PTF which may also buffer data in memory. + if (sr.dataSize > sr.maxDataSize) { + // Size surpasses limit, we cannot convert + LOG.debug("accumulated data size: {} / max size: {}", sr.dataSize, sr.maxDataSize); + return false; + } + + TableScanOperator tsOp1 = (TableScanOperator) sr.retainableOps.get(0); + TableScanOperator tsOp2 = (TableScanOperator) sr.discardableOps.get(0); + // 1) The set of operators in the works of the TS operators need to meet // some requirements. In particular: // 1.1. None of the works that contain the TS operators can contain a Union @@ -324,8 +729,8 @@ private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizer // MergeJoinProc that needs to be further explored. // If any of these conditions are not met, we cannot merge. // TODO: Extend rule so it can be applied for these cases. - final Set> workOps1 = findWorkOperators(optimizerCache, prevTsOp); - final Set> workOps2 = findWorkOperators(optimizerCache, tsOp); + final Set> workOps1 = findWorkOperators(optimizerCache, tsOp1); + final Set> workOps2 = findWorkOperators(optimizerCache, tsOp2); boolean foundDummyStoreOp = false; for (Operator op : workOps1) { if (op instanceof UnionOperator) { @@ -355,8 +760,8 @@ private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizer // If we do, we cannot merge. The reason is that Tez currently does // not support parallel edges, i.e., multiple edges from same work x // into same work y. 
- final Set> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, prevTsOp); - final Set> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, tsOp); + final Set> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, tsOp1); + final Set> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, tsOp2); if (!Collections.disjoint(outputWorksOps1, outputWorksOps2)) { // We cannot merge return false; @@ -369,8 +774,9 @@ private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizer // // If we do, we cannot merge. The reason is the same as above, currently // Tez currently does not support parallel edges. - final Set> inputWorksOps1 = findParentWorkOperators(pctx, optimizerCache, prevTsOp); - final Set> inputWorksOps2 = findParentWorkOperators(pctx, optimizerCache, tsOp); + final Set> inputWorksOps1 = findParentWorkOperators(pctx, optimizerCache, tsOp1); + final Set> inputWorksOps2 = + findParentWorkOperators(pctx, optimizerCache, tsOp2, sr.discardableInputOps); if (!Collections.disjoint(inputWorksOps1, inputWorksOps2)) { // We cannot merge return false; @@ -386,36 +792,24 @@ private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizer // // If we do, we cannot merge, as we would end up with a cycle in the DAG. final Set> descendantWorksOps1 = - findDescendantWorkOperators(pctx, optimizerCache, prevTsOp); + findDescendantWorkOperators(pctx, optimizerCache, tsOp1, sr.discardableInputOps); final Set> descendantWorksOps2 = - findDescendantWorkOperators(pctx, optimizerCache, tsOp); + findDescendantWorkOperators(pctx, optimizerCache, tsOp2, sr.discardableInputOps); if (!Collections.disjoint(descendantWorksOps1, workOps2) || !Collections.disjoint(workOps1, descendantWorksOps2)) { return false; } - // 5) We check whether merging the works would cause the size of - // the data in memory grow too large. - // TODO: Currently ignores GBY and PTF which may also buffer data in memory. 
- final Set> newWorkOps = workOps1; - newWorkOps.addAll(workOps2); - long dataSize = 0L; - for (Operator op : newWorkOps) { - if (op instanceof MapJoinOperator) { - MapJoinOperator mop = (MapJoinOperator) op; - dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize()); - if (dataSize > mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize()) { - // Size surpasses limit, we cannot convert - LOG.debug("accumulated data size: {} / max size: {}", - dataSize, mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize()); - return false; - } - } - } return true; } private static Set> findParentWorkOperators(ParseContext pctx, SharedScanOptimizerCache optimizerCache, Operator start) { + return findParentWorkOperators(pctx, optimizerCache, start, ImmutableSet.of()); + } + + private static Set> findParentWorkOperators(ParseContext pctx, + SharedScanOptimizerCache optimizerCache, Operator start, + Set> excludeOps) { // Find operators in work Set> workOps = findWorkOperators(optimizerCache, start); // Gather input works operators @@ -423,20 +817,51 @@ private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizer for (Operator op : workOps) { if (op.getParentOperators() != null) { for (Operator parent : op.getParentOperators()) { - if (parent instanceof ReduceSinkOperator) { + if (parent instanceof ReduceSinkOperator && !excludeOps.contains(parent)) { set.addAll(findWorkOperators(optimizerCache, parent)); } } } else if (op instanceof TableScanOperator) { // Check for DPP and semijoin DPP for (Operator parent : optimizerCache.tableScanToDPPSource.get((TableScanOperator) op)) { - set.addAll(findWorkOperators(optimizerCache, parent)); + if (!excludeOps.contains(parent)) { + set.addAll(findWorkOperators(optimizerCache, parent)); + } } } } return set; } + private static Set> findAscendantWorkOperators(ParseContext pctx, + SharedScanOptimizerCache optimizerCache, Operator start) { + // Find operators in work + Set> 
workOps = findWorkOperators(optimizerCache, start); + // Gather input works operators + Set> result = new HashSet>(); + Set> set; + while (!workOps.isEmpty()) { + set = new HashSet>(); + for (Operator op : workOps) { + if (op.getParentOperators() != null) { + for (Operator parent : op.getParentOperators()) { + if (parent instanceof ReduceSinkOperator) { + set.addAll(findWorkOperators(optimizerCache, parent)); + } + } + } else if (op instanceof TableScanOperator) { + // Check for DPP and semijoin DPP + for (Operator parent : optimizerCache.tableScanToDPPSource.get((TableScanOperator) op)) { + set.addAll(findWorkOperators(optimizerCache, parent)); + } + } + } + workOps = set; + result.addAll(set); + } + return result; + } + private static Set> findChildWorkOperators(ParseContext pctx, SharedScanOptimizerCache optimizerCache, Operator start) { // Find operators in work @@ -469,6 +894,12 @@ private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizer private static Set> findDescendantWorkOperators(ParseContext pctx, SharedScanOptimizerCache optimizerCache, Operator start) { + return findDescendantWorkOperators(pctx, optimizerCache, start, ImmutableSet.of()); + } + + private static Set> findDescendantWorkOperators(ParseContext pctx, + SharedScanOptimizerCache optimizerCache, Operator start, + Set> excludeOps) { // Find operators in work Set> workOps = findWorkOperators(optimizerCache, start); // Gather output works operators @@ -477,6 +908,9 @@ private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizer while (!workOps.isEmpty()) { set = new HashSet>(); for (Operator op : workOps) { + if (excludeOps.contains(op)) { + continue; + } if (op instanceof ReduceSinkOperator) { if (op.getChildOperators() != null) { // All children of RS are descendants @@ -583,6 +1017,23 @@ private static void pushFilterToTopOfTableScan( } } + private static class SharedResult { + final List> retainableOps; + final List> discardableOps; + final Set> 
// NOTE(review): continuation — remaining SharedResult fields and constructor; the class
// header and first fields are on the previous line.
 discardableInputOps;
    final long dataSize;
    final long maxDataSize;

    private SharedResult(Collection<Operator<?>> retainableOps, Collection<Operator<?>> discardableOps,
        Set<Operator<?>> discardableInputOps, long dataSize, long maxDataSize) {
      // Defensive immutable copies so later graph mutation cannot corrupt the result.
      this.retainableOps = ImmutableList.copyOf(retainableOps);
      this.discardableOps = ImmutableList.copyOf(discardableOps);
      this.discardableInputOps = ImmutableSet.copyOf(discardableInputOps);
      this.dataSize = dataSize;
      this.maxDataSize = maxDataSize;
    }
  }

  /** Cache to accelerate optimization */
  private static class SharedScanOptimizerCache {
    // Operators that belong to each work
    // NOTE(review): diff hunk boundary — fields and putIfWorkExists() elided except the
    // closing braces of that method.
      }
    }

    // Remove operator
    void removeOp(Operator<?> opToRemove) {
      Set<Operator<?>> s = operatorToWorkOperators.get(opToRemove);
      s.remove(opToRemove);
      // Snapshot before mutating the multimap to avoid concurrent-modification issues.
      List<Operator<?>> c1 = ImmutableList.copyOf(s);
      if (!c1.isEmpty()) {
        for (Operator<?> op1 : c1) {
          operatorToWorkOperators.remove(op1, opToRemove); // Remove operator
        }
        operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator
      }
    }

    // Remove operator and combine
    void removeOpAndCombineWork(Operator<?> opToRemove, Operator<?> replacementOp) {
      Set<Operator<?>> s = operatorToWorkOperators.get(opToRemove);