diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e3ee06ab5fa..a84b6b967ad 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2384,6 +2384,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."), HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable top n key optimizer."), + HIVE_MAX_TOPN_ALLOWED("hive.optimize.topnkey.max", 128, "Maximum topN value allowed by top n key optimizer.\n" + + "If the LIMIT is greater than this value then top n key optimization won't be used."), HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true, "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java index 4998766f064..42f3fac1227 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java @@ -17,38 +17,68 @@ */ package org.apache.hadoop.hive.ql.exec; +import static java.util.Arrays.binarySearch; + +import java.util.Arrays; import java.util.Comparator; -import java.util.PriorityQueue; + +import org.apache.commons.lang.builder.ToStringBuilder; /** * Implementation of filtering out keys. * An instance of this class is wrapped in {@link TopNKeyOperator} and * {@link org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator} - * @param - Type of {@link KeyWrapper}. Each key is stored in a KeyWrapper instance. */ -public class TopNKeyFilter { - private final PriorityQueue priorityQueue; +public final class TopNKeyFilter { private final int topN; + private Comparator comparator; + private KeyWrapper[] sortedTopItems; + private int size = 0; + private long repeated = 0; + private long added = 0; + private long total = 0; - public TopNKeyFilter(int topN, Comparator comparator) { - // We need a reversed comparator because the PriorityQueue.poll() method is used for filtering out keys. - // Ex.: When ORDER BY key1 ASC then call of poll() should remove the largest key. - this.priorityQueue = new PriorityQueue<>(topN + 1, comparator.reversed()); + public TopNKeyFilter(int topN, Comparator comparator) { + this.comparator = comparator; + this.sortedTopItems = new KeyWrapper[topN +1]; this.topN = topN; } - public boolean canForward(T kw) { - if (!priorityQueue.contains(kw)) { - priorityQueue.offer((T) kw.copyKey()); + public final boolean canForward(KeyWrapper kw) { + total++; + int pos = binarySearch(sortedTopItems, 0, size, kw, (Comparator) comparator); + if (pos >= 0) { // found + repeated++; + return true; } - if (priorityQueue.size() > topN) { - priorityQueue.poll(); + pos = -pos -1; // not found, calculate insertion point + if (pos >= topN) { // would be inserted to the end, there are topN elements which are smaller/larger + return false; } - - return priorityQueue.contains(kw); + System.arraycopy(sortedTopItems, pos, sortedTopItems, pos +1, size - pos); // make space by shifting + sortedTopItems[pos] = kw.copyKey(); + added++; + if (size < topN) { + size++; + } + return true; } public void clear() { - priorityQueue.clear(); + this.size = 0; + this.repeated = 0; + this.added = 0; + this.total = 0; + Arrays.fill(sortedTopItems, null); + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("topN", topN) + .append("repeated", repeated) + .append("added", added) + .append("total", total) + .toString(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java index 0ccaeea1da5..f7499b60328 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java @@ -18,6 +18,13 @@ package org.apache.hadoop.hive.ql.exec; +import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -27,13 +34,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import java.io.Serializable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY; - /** * TopNKeyOperator passes rows that contains top N keys only. */ @@ -41,7 +41,7 @@ private static final long serialVersionUID = 1L; - private transient Map> topNKeyFilters; + private transient Map topNKeyFilters; private transient KeyWrapper partitionKeyWrapper; private transient KeyWrapper keyWrapper; @@ -108,9 +108,9 @@ public void process(Object row, int tag) throws HiveException { partitionKeyWrapper.getNewKey(row, inputObjInspectors[tag]); partitionKeyWrapper.setHashKey(); - TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKeyWrapper); + TopNKeyFilter topNKeyFilter = topNKeyFilters.get(partitionKeyWrapper); if (topNKeyFilter == null) { - topNKeyFilter = new TopNKeyFilter<>(conf.getTopN(), keyWrapperComparator); + topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrapperComparator); topNKeyFilters.put(partitionKeyWrapper.copyKey(), topNKeyFilter); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java index 5faa038c18d..f03d65030dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java @@ -46,7 +46,7 @@ // Batch processing private transient int[] temporarySelected; private transient VectorHashKeyWrapperBatch keyWrappersBatch; - private transient TopNKeyFilter topNKeyFilter; + private transient TopNKeyFilter topNKeyFilter; public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf, VectorizationContext vContext, VectorDesc vectorDesc) { @@ -80,7 +80,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE]; keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); - this.topNKeyFilter = new TopNKeyFilter<>(conf.getTopN(), keyWrappersBatch.getComparator( + this.topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrappersBatch.getComparator( conf.getColumnSortOrder(), conf.getNullOrder())); } @@ -169,6 +169,7 @@ public OperatorType getType() { @Override protected void closeOp(boolean abort) throws HiveException { + LOG.info("Closing TopNKeyFilter: {}.", topNKeyFilter); topNKeyFilter.clear(); super.closeOp(abort); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java index 0786c82b7be..b487480b938 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java @@ -1084,7 +1084,9 @@ public int getVariableSize(int batchSize) { comparator.addColumnComparator( i, columnTypeSpecificIndex, columnVectorType, columnSortOrder.charAt(i), nullOrder.charAt(i)); } - + if (comparator.getComparators().size() == 1) { // don't use the composite comparator for n=1 + return comparator.getComparators().get(0); + } return comparator; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java index 8cb48473785..06ac661028f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneralComparator.java @@ -133,4 +133,8 @@ public int compare(VectorHashKeyWrapperBase o1, VectorHashKeyWrapperBase o2) { } return 0; } + + public List getComparators() { + return comparators; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java index a9ff6b4a830..8ad52d0934f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java @@ -43,8 +43,10 @@ */ public class TopNKeyProcessor implements NodeProcessor { private static final Logger LOG = LoggerFactory.getLogger(TopNKeyProcessor.class); + private final int maxTopNAllowed; - public TopNKeyProcessor() { + public TopNKeyProcessor(int maxTopNAllowed) { + this.maxTopNAllowed = maxTopNAllowed; } @Override @@ -60,6 +62,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return null; } + if (reduceSinkDesc.getTopN() > maxTopNAllowed) { + return null; + } + // Check whether there already is a top n key operator Operator parentOperator = reduceSinkOperator.getParentOperators().get(0); if (parentOperator instanceof TopNKeyOperator) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index ff815434f0c..832440da209 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -1290,7 +1290,7 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx) Map opRules = new LinkedHashMap(); opRules.put( new RuleRegExp("Top n key optimization", ReduceSinkOperator.getOperatorName() + "%"), - new TopNKeyProcessor()); + new TopNKeyProcessor(HiveConf.getIntVar(procCtx.conf, HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED))); opRules.put( new RuleRegExp("Top n key pushdown", TopNKeyOperator.getOperatorName() + "%"), new TopNKeyPushdownProcessor());