diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java
index bc8afbf..282805d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java
@@ -18,20 +18,19 @@
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 import java.util.Stack;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -45,6 +44,8 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This optimizer attempts following two optimizations:
@@ -54,27 +55,32 @@
  */
 public class NullScanOptimizer implements PhysicalPlanResolver {
 
-  private static final Logger LOG = LoggerFactory.getLogger(NullScanOptimizer.class.getName());
-  @Override
-  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NullScanOptimizer.class);
 
-    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%.*" +
-        FilterOperator.getOperatorName() + "%"), new WhereFalseProcessor());
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx)
+      throws SemanticException {
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<>();
+    opRules.put(
+        new RuleRegExp("R1",
+            TableScanOperator.getOperatorName() + "%.*" +
+                FilterOperator.getOperatorName() + "%"),
+        new WhereFalseProcessor());
 
     Dispatcher disp = new NullScanTaskDispatcher(pctx, opRules);
     GraphWalker ogw = new DefaultGraphWalker(disp);
-    ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.getRootTasks());
+    List<Node> topNodes = new ArrayList<>(pctx.getRootTasks());
     ogw.startWalking(topNodes, null);
 
     opRules.clear();
-    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName()+ "%"),new TSMarker());
-    opRules.put(new RuleRegExp("R2", LimitOperator.getOperatorName()+ "%"), new Limit0Processor());
+    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+        new TSMarker());
+    opRules.put(new RuleRegExp("R2", LimitOperator.getOperatorName() + "%"),
+        new Limit0Processor());
     disp = new NullScanTaskDispatcher(pctx, opRules);
     ogw = new DefaultGraphWalker(disp);
-    topNodes = new ArrayList<Node>();
-    topNodes.addAll(pctx.getRootTasks());
+    topNodes = new ArrayList<>(pctx.getRootTasks());
     ogw.startWalking(topNodes, null);
     return pctx;
   }
@@ -82,30 +88,28 @@ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
   //We need to make sure that Null Operator (LIM or FIL) is present in all branches of multi-insert query before
   //applying the optimization. This method does full tree traversal starting from TS and will return true only if
   //it finds target Null operator on each branch.
-  static private boolean isNullOpPresentInAllBranches(TableScanOperator ts, Node causeOfNullNode) {
-    Node curNode = null;
-    List<? extends Node> curChd = null;
-    LinkedList<Node> middleNodes = new LinkedList<Node>();
-    middleNodes.addLast(ts);
+  private static boolean isNullOpPresentInAllBranches(TableScanOperator ts, Node causeOfNullNode) {
+    Queue<Node> middleNodes = new ArrayDeque<>();
+    middleNodes.add(ts);
     while (!middleNodes.isEmpty()) {
-      curNode = middleNodes.remove();
-      curChd = curNode.getChildren();
+      Node curNode = middleNodes.remove();
+      List<? extends Node> curChd = curNode.getChildren();
       for (Node chd: curChd) {
-        if (chd.getChildren() == null || chd.getChildren().isEmpty() || chd == causeOfNullNode) {
-          if (chd != causeOfNullNode) { // If there is an end node that not the limit0/wherefalse..
+        List<? extends Node> children = chd.getChildren();
+        if (CollectionUtils.isEmpty(children) || chd == causeOfNullNode) {
+          // If there is an end node that not the limit0/wherefalse..
+          if (chd != causeOfNullNode) {
            return false;
          }
-        }
-        else {
-          middleNodes.addLast(chd);
+        } else {
+          middleNodes.add(chd);
         }
       }
-
     }
     return true;
  }

-  static private class WhereFalseProcessor implements NodeProcessor {
+  private static class WhereFalseProcessor implements NodeProcessor {
 
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
@@ -126,7 +130,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
       if (op instanceof TableScanOperator) {
         if (isNullOpPresentInAllBranches((TableScanOperator)op, filter)) {
           ctx.setMayBeMetadataOnly((TableScanOperator)op);
-          LOG.info("Found where false TableScan. " + op);
+          LOG.debug("Found where false TableScan. {}", op);
         }
       }
     }
@@ -135,32 +139,33 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
     }
   }
 
-  static private class Limit0Processor implements NodeProcessor {
+  private static class Limit0Processor implements NodeProcessor {
 
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
 
       LimitOperator limitOp = (LimitOperator)nd;
-      if(!(limitOp.getConf().getLimit() == 0)) {
+      if (!(limitOp.getConf().getLimit() == 0)) {
         return null;
       }
 
-      HashSet<TableScanOperator> tsOps = ((WalkerCtx)procCtx).getMayBeMetadataOnlyTableScans();
+      Set<TableScanOperator> tsOps =
+          ((WalkerCtx) procCtx).getMayBeMetadataOnlyTableScans();
       if (tsOps != null) {
         for (Iterator<TableScanOperator> tsOp = tsOps.iterator(); tsOp.hasNext();) {
-          if (!isNullOpPresentInAllBranches(tsOp.next(),limitOp))
+          if (!isNullOpPresentInAllBranches(tsOp.next(), limitOp)) {
             tsOp.remove();
+          }
         }
       }
-      LOG.info("Found Limit 0 TableScan. " + nd);
+      LOG.debug("Found Limit 0 TableScan. {}", nd);
       ((WalkerCtx)procCtx).convertMetadataOnly();
       return null;
     }
-
   }
 
-  static private class TSMarker implements NodeProcessor {
+  private static class TSMarker implements NodeProcessor {
 
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
index 6c0e71d..ec9813d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
@@ -19,33 +19,31 @@
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
 import java.io.IOException;
-
-import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-
-import org.apache.hadoop.hive.ql.io.ZeroRowsInputFormat;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.Stack;
+import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.NullScanFileSystem;
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
+import org.apache.hadoop.hive.ql.io.ZeroRowsInputFormat;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -61,39 +59,44 @@
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.NullStructSerDe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Iterate over all tasks one by one and removes all input paths from task if conditions as
- * defined in rules match.
+ * Iterate over all tasks one by one and removes all input paths from task if
+ * conditions as defined in rules match.
  */
 public class NullScanTaskDispatcher implements Dispatcher {
 
-  static final Logger LOG = LoggerFactory.getLogger(NullScanTaskDispatcher.class.getName());
+  static final Logger LOG =
+      LoggerFactory.getLogger(NullScanTaskDispatcher.class);
 
   private final PhysicalContext physicalContext;
   private final Map<Rule, NodeProcessor> rules;
 
-  public NullScanTaskDispatcher(PhysicalContext context, Map<Rule, NodeProcessor> rules) {
+  public NullScanTaskDispatcher(PhysicalContext context,
+      Map<Rule, NodeProcessor> rules) {
     super();
-    physicalContext = context;
+    this.physicalContext = context;
     this.rules = rules;
   }
 
   private String getAliasForTableScanOperator(MapWork work, TableScanOperator tso) {
-
-    for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
-        work.getAliasToWork().entrySet()) {
+    for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : work
+        .getAliasToWork().entrySet()) {
       if (entry.getValue() == tso) {
         return entry.getKey();
       }
     }
-
     return null;
   }
 
-  private PartitionDesc changePartitionToMetadataOnly(PartitionDesc desc, Path path) {
-    if (desc == null) return null;
+  private PartitionDesc changePartitionToMetadataOnly(PartitionDesc desc,
+      Path path) {
+    if (desc == null) {
+      return null;
+    }
     boolean isEmpty = false;
     try {
       isEmpty = Utilities.isEmptyPath(physicalContext.getConf(), path);
@@ -104,25 +107,23 @@ private PartitionDesc changePartitionToMetadataOnly(PartitionDesc desc, Path pat
       isEmpty ? ZeroRowsInputFormat.class : OneNullRowInputFormat.class);
     desc.setOutputFileFormatClass(HiveIgnoreKeyTextOutputFormat.class);
     desc.getProperties().setProperty(serdeConstants.SERIALIZATION_LIB,
-      NullStructSerDe.class.getName());
+        NullStructSerDe.class.getName());
     return desc;
   }
 
-  private void processAlias(MapWork work, Path path, ArrayList<String> aliasesAffected,
-      ArrayList<String> aliases) {
+  private void processAlias(MapWork work, Path path,
+      Collection<String> aliasesAffected, Set<String> aliases) {
     // the aliases that are allowed to map to a null scan.
-    ArrayList<String> allowed = new ArrayList<String>();
-    for (String alias : aliasesAffected) {
-      if (aliases.contains(alias)) {
-        allowed.add(alias);
-      }
-    }
-    if (allowed.size() > 0) {
+    Collection<String> allowed = aliasesAffected.stream()
+        .filter(a -> aliases.contains(a)).collect(Collectors.toList());
+    if (!allowed.isEmpty()) {
       PartitionDesc partDesc = work.getPathToPartitionInfo().get(path).clone();
-      PartitionDesc newPartition = changePartitionToMetadataOnly(partDesc, path);
+      PartitionDesc newPartition =
+          changePartitionToMetadataOnly(partDesc, path);
       // Prefix partition with something to avoid it being a hidden file.
-      Path fakePath = new Path(NullScanFileSystem.getBase() + newPartition.getTableName()
-          + "/part" + encode(newPartition.getPartSpec()));
+      Path fakePath =
+          new Path(NullScanFileSystem.getBase() + newPartition.getTableName()
+              + "/part" + encode(newPartition.getPartSpec()));
       StringInternUtils.internUriStringsInPath(fakePath);
       work.addPathToPartitionInfo(fakePath, newPartition);
       work.addPathToAlias(fakePath, new ArrayList<>(allowed));
@@ -134,12 +135,11 @@ private void processAlias(MapWork work, Path path, ArrayList aliasesAffe
     }
   }
 
-  private void processAlias(MapWork work, HashSet<TableScanOperator> tableScans) {
-    ArrayList<String> aliases = new ArrayList<String>();
+  private void processAlias(MapWork work, Set<TableScanOperator> tableScans) {
+    Set<String> aliases = new HashSet<>();
     for (TableScanOperator tso : tableScans) {
       // use LinkedHashMap<String, Operator<? extends OperatorDesc>>
-      // getAliasToWork()
-      // should not apply this for non-native table
+      // getAliasToWork() should not apply this for non-native table
       if (tso.getConf().getTableMetadata().getStorageHandler() != null) {
         continue;
       }
@@ -148,10 +148,10 @@ private void processAlias(MapWork work, HashSet tableScans) {
       tso.getConf().setIsMetadataOnly(true);
     }
     // group path alias according to work
-    LinkedHashMap<Path, ArrayList<String>> candidates = new LinkedHashMap<>();
+    Map<Path, ArrayList<String>> candidates = new HashMap<>();
     for (Path path : work.getPaths()) {
       ArrayList<String> aliasesAffected = work.getPathToAliases().get(path);
-      if (aliasesAffected != null && aliasesAffected.size() > 0) {
+      if (CollectionUtils.isNotEmpty(aliasesAffected)) {
         candidates.put(path, aliasesAffected);
       }
     }
@@ -183,10 +183,10 @@ public int compare(MapWork o1, MapWork o2) {
     });
 
     for (MapWork mapWork : mapWorks) {
-      LOG.debug("Looking at: "+mapWork.getName());
-      Collection<Operator<? extends OperatorDesc>> topOperators
-        = mapWork.getAliasToWork().values();
-      if (topOperators.size() == 0) {
+      LOG.debug("Looking at: {}", mapWork.getName());
+      Collection<Operator<? extends OperatorDesc>> topOperators =
+          mapWork.getAliasToWork().values();
+      if (topOperators.isEmpty()) {
         LOG.debug("No top operators");
         return null;
       }
@@ -199,11 +199,11 @@ public int compare(MapWork o1, MapWork o2) {
       GraphWalker ogw = new PreOrderOnceWalker(disp);
 
       // Create a list of topOp nodes
-      ArrayList<Node> topNodes = new ArrayList<Node>();
+      ArrayList<Node> topNodes = new ArrayList<>();
       // Get the top Nodes for this task
-      for (Operator<? extends OperatorDesc>
-        workOperator : topOperators) {
-        if (parseContext.getTopOps().values().contains(workOperator)) {
+      Collection<TableScanOperator> topOps = parseContext.getTopOps().values();
+      for (Operator<? extends OperatorDesc> workOperator : topOperators) {
+        if (topOps.contains(workOperator)) {
           topNodes.add(workOperator);
         }
       }
@@ -215,10 +215,11 @@ public int compare(MapWork o1, MapWork o2) {
 
       ogw.startWalking(topNodes, null);
 
-      LOG.debug(String.format("Found %d null table scans",
-          walkerCtx.getMetadataOnlyTableScans().size()));
-      if (walkerCtx.getMetadataOnlyTableScans().size() > 0)
+      int scanTableSize = walkerCtx.getMetadataOnlyTableScans().size();
+      LOG.debug("Found {} null table scans", scanTableSize);
+      if (scanTableSize > 0) {
         processAlias(mapWork, walkerCtx.getMetadataOnlyTableScans());
+      }
     }
     return null;
   }
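Reviewer note, not part of the patch: the reworked isNullOpPresentInAllBranches() is a breadth-first walk over the operator DAG that starts at the TableScanOperator and answers true only when every branch terminates in the node that caused the null scan (the LIMIT 0 or WHERE FALSE operator). The sketch below restates that traversal outside of Hive; DagNode, BranchCheck and endsInCauseOnAllBranches are hypothetical stand-ins introduced only for illustration, not the Hive classes.

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the Hive Node interface, only for illustration.
interface DagNode {
  List<? extends DagNode> getChildren();
}

final class BranchCheck {
  // True only if every leaf reachable from start is exactly the cause node.
  static boolean endsInCauseOnAllBranches(DagNode start, DagNode cause) {
    Queue<DagNode> pending = new ArrayDeque<>();
    pending.add(start);
    while (!pending.isEmpty()) {
      DagNode current = pending.remove();
      List<? extends DagNode> children = current.getChildren();
      if (children == null) {
        continue; // defensive; in the patch the starting TableScanOperator always has children
      }
      for (DagNode child : children) {
        List<? extends DagNode> grandChildren = child.getChildren();
        // Leaf test; the patch uses CollectionUtils.isEmpty() for the same null-or-empty check.
        if (grandChildren == null || grandChildren.isEmpty() || child == cause) {
          if (child != cause) {
            return false; // a branch ends in something other than the cause node
          }
        } else {
          pending.add(child); // interior node, keep walking this branch
        }
      }
    }
    return true;
  }
}

As in the patch, reaching the cause node ends that branch successfully without exploring anything below it, so only branches that dead-end elsewhere disqualify the table scan.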