# Patch summary (free text placed ahead of the first "diff --git" header):
#   - HiveConf: adds HIVE_MAX_TOPN_ALLOWED ("hive.optimize.topnkey.max", default 128).
#   - TopNKeyFilter: replaces the PriorityQueue with a KeyWrapper[] of topN + 1 slots that is
#     kept in comparator order; canForward() locates a key with binarySearch, returns false when
#     the insertion point is >= topN, and otherwise shifts the tail and stores kw.copyKey().
#   - TopNKeyOperator / VectorTopNKeyOperator: construct TopNKeyFilter without the diamond operator.
#   - TopNKeyProcessor: takes maxTopNAllowed in its constructor; process() returns null when
#     reduceSinkDesc.getTopN() is greater than that limit.
#   - TezCompiler: reads HIVE_MAX_TOPN_ALLOWED from procCtx.conf and passes it to TopNKeyProcessor.
# NOTE(review): generic type arguments appear to be missing from this copy (the removed "@param"
# Javadoc line names no parameter, and the removed/added TopNKeyFilter field lines in
# VectorTopNKeyOperator are textually identical) - confirm against the original commit before applying.
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e7724f9084f..96367a6074d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2384,6 +2384,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."),
 
     HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable top n key optimizer."),
+    HIVE_MAX_TOPN_ALLOWED("hive.optimize.topnkey.max", 128, "Maximum topN value allowed by topNkey optimizer."),
 
     HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
         "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
index 4998766f064..c43733ade91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyFilter.java
@@ -17,38 +17,47 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
+import static java.util.Arrays.binarySearch;
+
+import java.util.Arrays;
 import java.util.Comparator;
-import java.util.PriorityQueue;
 
 /**
  * Implementation of filtering out keys.
  * An instance of this class is wrapped in {@link TopNKeyOperator} and
  * {@link org.apache.hadoop.hive.ql.exec.vector.VectorTopNKeyOperator}
- * @param - Type of {@link KeyWrapper}. Each key is stored in a KeyWrapper instance.
  */
-public class TopNKeyFilter {
-  private final PriorityQueue priorityQueue;
+public class TopNKeyFilter {
   private final int topN;
+  private Comparator comparator;
+  private KeyWrapper[] sortedTopItems;
+  private int size = 0;
 
-  public TopNKeyFilter(int topN, Comparator comparator) {
-    // We need a reversed comparator because the PriorityQueue.poll() method is used for filtering out keys.
-    // Ex.: When ORDER BY key1 ASC then call of poll() should remove the largest key.
-    this.priorityQueue = new PriorityQueue<>(topN + 1, comparator.reversed());
+  public TopNKeyFilter(int topN, Comparator comparator) {
+    this.comparator = comparator;
+    this.sortedTopItems = new KeyWrapper[topN +1];
     this.topN = topN;
   }
 
-  public boolean canForward(T kw) {
-    if (!priorityQueue.contains(kw)) {
-      priorityQueue.offer((T) kw.copyKey());
+  public boolean canForward(KeyWrapper kw) {
+    int pos = binarySearch(sortedTopItems, 0, size, kw, (Comparator) comparator);
+    if (pos >= 0) { // found
+      return true;
     }
-    if (priorityQueue.size() > topN) {
-      priorityQueue.poll();
+    pos = -pos -1; // not found, calculate insertion point
+    if (pos >= topN) { // would be inserted to the end, there are topN elements which are smaller/larger
+      return false;
     }
-
-    return priorityQueue.contains(kw);
+    System.arraycopy(sortedTopItems, pos, sortedTopItems, pos +1, size - pos); // make space by shifting
+    sortedTopItems[pos] = kw.copyKey();
+    if (size < topN) {
+      size++;
+    }
+    return true;
   }
 
   public void clear() {
-    priorityQueue.clear();
+    this.size = 0;
+    Arrays.fill(sortedTopItems, null);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
index b7c12502204..254760244c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -38,7 +38,7 @@
 
   private static final long serialVersionUID = 1L;
 
-  private transient TopNKeyFilter topNKeyFilter;
+  private transient TopNKeyFilter topNKeyFilter;
 
   private transient KeyWrapper keyWrapper;
 
@@ -75,7 +75,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
           ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE);
     }
 
-    this.topNKeyFilter = new TopNKeyFilter<>(conf.getTopN(), new KeyWrapperComparator(
+    this.topNKeyFilter = new TopNKeyFilter(conf.getTopN(), new KeyWrapperComparator(
             keyObjectInspectors, currentKeyObjectInspectors, columnSortOrder, nullSortOrder));
 
     KeyWrapperFactory keyWrapperFactory = new KeyWrapperFactory(keyFields, keyObjectInspectors,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
index 5faa038c18d..2deef0e3ae0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -46,7 +46,7 @@
   // Batch processing
   private transient int[] temporarySelected;
   private transient VectorHashKeyWrapperBatch keyWrappersBatch;
-  private transient TopNKeyFilter topNKeyFilter;
+  private transient TopNKeyFilter topNKeyFilter;
 
   public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
       VectorizationContext vContext, VectorDesc vectorDesc) {
@@ -80,7 +80,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
 
     keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
-    this.topNKeyFilter = new TopNKeyFilter<>(conf.getTopN(), keyWrappersBatch.getComparator(
+    this.topNKeyFilter = new TopNKeyFilter(conf.getTopN(), keyWrappersBatch.getComparator(
             conf.getColumnSortOrder(), conf.getNullOrder()));
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
index ce6efa49192..f62fb252442 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
@@ -41,8 +41,10 @@
  */
 public class TopNKeyProcessor implements NodeProcessor {
   private static final Logger LOG = LoggerFactory.getLogger(TopNKeyProcessor.class);
+  private final int maxTopNAllowed;
 
-  public TopNKeyProcessor() {
+  public TopNKeyProcessor(int maxTopNAllowed) {
+    this.maxTopNAllowed = maxTopNAllowed;
   }
 
   @Override
@@ -58,6 +60,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
       return null;
     }
 
+    if (reduceSinkDesc.getTopN() > maxTopNAllowed) {
+      return null;
+    }
+
     // Currently, per partitioning top n key is not supported
     // in TopNKey operator
     if (reduceSinkDesc.isPTFReduceSink()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index ff815434f0c..832440da209 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -1290,7 +1290,7 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx)
     Map opRules = new LinkedHashMap();
     opRules.put(
         new RuleRegExp("Top n key optimization", ReduceSinkOperator.getOperatorName() + "%"),
-        new TopNKeyProcessor());
+        new TopNKeyProcessor(HiveConf.getIntVar(procCtx.conf, HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED)));
     opRules.put(
         new RuleRegExp("Top n key pushdown", TopNKeyOperator.getOperatorName() + "%"),
         new TopNKeyPushdownProcessor());