diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java index 3349fc0..22ccf3d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; @@ -141,6 +142,14 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { // Skip continue; } + // If partitions do not match, we currently do not merge + PrunedPartitionList prevTsOpPPList = pctx.getPrunedPartitions(prevTsOp); + PrunedPartitionList tsOpPPList = pctx.getPrunedPartitions(tsOp); + if (!prevTsOpPPList.getPartitions().equals(tsOpPPList.getPartitions()) + || prevTsOpPPList.hasUnknownPartitions() != tsOpPPList.hasUnknownPartitions()) { + // Skip + continue; + } // It seems these two operators can be merged. // Check that plan meets some preconditions before doing it. @@ -226,27 +235,18 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { } private static Set gatherNotValidTableScanOps( - ParseContext pctx, SharedScanOptimizerCache optimizerCache) { + ParseContext pctx, SharedScanOptimizerCache optimizerCache) throws SemanticException { // Find TS operators with partition pruning enabled in plan // because these TS may potentially read different data for // different pipeline. // These can be: - // 1) TS with static partitioning. - // TODO: Check partition list of different TS and do not add if they are identical - // 2) TS with DPP. + // 1) TS with DPP. // TODO: Check if dynamic filters are identical and do not add. - // 3) TS with semijoin DPP. + // 2) TS with semijoin DPP. // TODO: Check for dynamic filters. Set notValidTableScanOps = new HashSet<>(); - // 1) TS with static partitioning. + // 1) TS with DPP. Map topOps = pctx.getTopOps(); - for (TableScanOperator tsOp : topOps.values()) { - if (tsOp.getConf().getPartColumns() != null && - !tsOp.getConf().getPartColumns().isEmpty()) { - notValidTableScanOps.add(tsOp); - } - } - // 2) TS with DPP. Collection> tableScanOps = Lists.>newArrayList(topOps.values()); Set s = @@ -258,7 +258,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { optimizerCache.tableScanToDPPSource.put(dped.getTableScan(), a); } } - // 3) TS with semijoin DPP. + // 2) TS with semijoin DPP. for (Entry e : pctx.getRsToSemiJoinBranchInfo().entrySet()) { notValidTableScanOps.add(e.getValue().getTsOp());