diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
index fc5aea5..9004a44 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector.reducesink;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Properties;
 
@@ -29,6 +30,7 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
 import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.TopNHash;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
@@ -57,11 +59,13 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hive.common.util.HashCodeUtil;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class is common operator class for native vectorized reduce sink.
  */
 public abstract class VectorReduceSinkCommonOperator extends TerminalOperator
-    implements VectorizationContextRegion {
+    implements Serializable, TopNHash.BinaryCollector, VectorizationContextRegion {
 
   private static final long serialVersionUID = 1L;
   private static final String CLASS_NAME = VectorReduceSinkCommonOperator.class.getName();
@@ -122,6 +126,9 @@
   protected transient HiveKey keyWritable;
   protected transient BytesWritable valueBytesWritable;
 
+  // Picks topN K:V pairs from input.
+  protected transient TopNHash reducerHash;
+
   // Where to write our key and value pairs.
   private transient OutputCollector out;
 
@@ -329,10 +336,46 @@ protected void initializeOp(Configuration hconf) throws HiveException {
 
     valueBytesWritable = new BytesWritable();
 
+    int limit = conf.getTopN();
+    float memUsage = conf.getTopNMemoryUsage();
+
+    if (limit >= 0 && memUsage > 0) {
+      reducerHash = new TopNHash();
+      reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this, conf, hconf);
+    }
+
     batchCounter = 0;
   }
 
-  protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
+  // The collect method override for TopNHash.BinaryCollector
+  @Override
+  public void collect(byte[] key, byte[] value, int hash) throws IOException {
+    HiveKey keyWritable = new HiveKey(key, hash);
+    BytesWritable valueWritable = new BytesWritable(value);
+    doCollect(keyWritable, valueWritable);
+  }
+
+  protected void collect(HiveKey keyWritable, BytesWritable valueWritable)
+      throws HiveException, IOException {
+    if (reducerHash != null) {
+      // NOTE: partColsIsNull is only used for PTF.
+      final int firstIndex =
+          reducerHash.tryStoreKey(keyWritable, /* partColsIsNull */ false);
+
+      if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
+
+      if (firstIndex == TopNHash.FORWARD) {
+        doCollect(keyWritable, valueWritable);
+      } else {
+        Preconditions.checkState(firstIndex >= 0);
+        reducerHash.storeValue(firstIndex, keyWritable.hashCode(), valueWritable, false);
+      }
+    } else {
+      doCollect(keyWritable, valueWritable);
+    }
+  }
+
+  private void doCollect(HiveKey keyWritable, BytesWritable valueWritable) throws IOException {
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
     if (null != out) {
@@ -360,6 +403,9 @@ protected void collect(BytesWritable keyWritable, Writable valueWritable) throws
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
+    if (!abort && reducerHash != null) {
+      reducerHash.flush();
+    }
     super.closeOp(abort);
     out = null;
     if (isLogInfoEnabled) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index bf60c10..e12a00e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -35,8 +35,8 @@
 import java.util.Set;
 import java.util.Stack;
 import java.util.regex.Pattern;
-import org.apache.commons.lang.ArrayUtils;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.calcite.util.Pair;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.slf4j.Logger;
@@ -3009,7 +3009,10 @@ private boolean canSpecializeReduceSink(ReduceSinkDesc desc,
 
     String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
 
-    boolean hasTopN = (desc.getTopN() >= 0);
+    int limit = desc.getTopN();
+    float memUsage = desc.getTopNMemoryUsage();
+
+    boolean hasPTFTopN = (limit >= 0 && memUsage > 0 && desc.isPTFReduceSink());
 
     boolean hasDistinctColumns = (desc.getDistinctColumnIndices().size() > 0);
 
@@ -3186,7 +3189,7 @@ private boolean canSpecializeReduceSink(ReduceSinkDesc desc,
     // Remember the condition variables for EXPLAIN regardless.
     vectorDesc.setIsVectorizationReduceSinkNativeEnabled(isVectorizationReduceSinkNativeEnabled);
     vectorDesc.setEngine(engine);
-    vectorDesc.setHasTopN(hasTopN);
+    vectorDesc.setHasPTFTopN(hasPTFTopN);
     vectorDesc.setHasDistinctColumns(hasDistinctColumns);
     vectorDesc.setIsKeyBinarySortable(isKeyBinarySortable);
     vectorDesc.setIsValueLazyBinary(isValueLazyBinary);
@@ -3199,7 +3202,7 @@ private boolean canSpecializeReduceSink(ReduceSinkDesc desc,
     if (!isVectorizationReduceSinkNativeEnabled ||
         !isTezOrSpark ||
         (useUniformHash && (hasEmptyBuckets || hasNoPartitions)) ||
-        hasTopN ||
+        hasPTFTopN ||
         hasDistinctColumns ||
         !isKeyBinarySortable ||
         !isValueLazyBinary ||
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 38461d5..c882b4e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -534,8 +534,8 @@ public ReduceSinkOperatorExplainVectorization(ReduceSinkDesc reduceSinkDesc, Vec
             engineInSupported,
             engineInSupportedCondName),
         new VectorizationCondition(
-            !vectorReduceSinkDesc.getHasTopN(),
-            "No TopN"),
+            !vectorReduceSinkDesc.getHasPTFTopN(),
+            "No PTF TopN"),
         new VectorizationCondition(
             !vectorReduceSinkDesc.getHasDistinctColumns(),
             "No DISTINCT columns"),
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
index d6230af..91d5be7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
@@ -64,7 +64,7 @@ public VectorReduceSinkInfo getVectorReduceSinkInfo() {
 
   private boolean isVectorizationReduceSinkNativeEnabled;
   private String engine;
-  private boolean hasTopN;
+  private boolean hasPTFTopN;
   private boolean hasDistinctColumns;
   private boolean isKeyBinarySortable;
   private boolean isValueLazyBinary;
@@ -85,11 +85,11 @@ public void setEngine(String engine) {
   public String getEngine() {
     return engine;
   }
-  public void setHasTopN(boolean hasTopN) {
-    this.hasTopN = hasTopN;
+  public void setHasPTFTopN(boolean hasPTFTopN) {
+    this.hasPTFTopN = hasPTFTopN;
   }
-  public boolean getHasTopN() {
-    return hasTopN;
+  public boolean getHasPTFTopN() {
+    return hasPTFTopN;
   }
   public void setHasDistinctColumns(boolean hasDistinctColumns) {
     this.hasDistinctColumns = hasDistinctColumns;
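
Reviewer sketch (not part of the patch): the three-way contract between the operator and TopNHash above is easy to misread, so the standalone toy program below models it. TopNCollectSketch, TinyTopN, and Collector are hypothetical names invented for illustration, not Hive classes; only the FORWARD / EXCLUDE / slot-index verdicts, the storeValue() call that attaches a value to a buffered key, and the flush() on close are meant to mirror the calls made in collect() and closeOp() of VectorReduceSinkCommonOperator.

import java.util.ArrayList;
import java.util.List;

public class TopNCollectSketch {

  // Verdicts modeled on TopNHash's contract: a non-negative return from
  // tryStoreKey() is a slot to fill via storeValue(); FORWARD means emit
  // the row immediately; EXCLUDE means drop it.
  static final int FORWARD = -1;
  static final int EXCLUDE = -2;

  // Toy stand-in for TopNHash: keeps the N smallest keys seen so far
  // (N = limit), with a simple linear scan instead of a real hash.
  static class TinyTopN {
    private final int limit;
    private final List<String[]> slots = new ArrayList<>(); // {key, value} pairs

    TinyTopN(int limit) { this.limit = limit; }

    int tryStoreKey(String key) {
      if (limit < 0) return FORWARD;                 // TopN disabled: pass through
      if (slots.size() < limit) {                    // still room: buffer the key
        slots.add(new String[] { key, null });
        return slots.size() - 1;
      }
      int worst = indexOfLargestKey();
      if (key.compareTo(slots.get(worst)[0]) >= 0) {
        return EXCLUDE;                              // cannot make the top N: drop
      }
      slots.set(worst, new String[] { key, null });  // evict the current worst key
      return worst;
    }

    void storeValue(int slot, String value) {
      slots.get(slot)[1] = value;                    // attach the value to its key
    }

    // Emit everything still buffered; mirrors reducerHash.flush() in closeOp().
    void flush(Collector out) {
      for (String[] kv : slots) {
        out.collect(kv[0], kv[1]);
      }
    }

    private int indexOfLargestKey() {
      int worst = 0;
      for (int i = 1; i < slots.size(); i++) {
        if (slots.get(i)[0].compareTo(slots.get(worst)[0]) > 0) {
          worst = i;
        }
      }
      return worst;
    }
  }

  interface Collector { void collect(String key, String value); }

  public static void main(String[] args) {
    Collector sink = (k, v) -> System.out.println(k + " -> " + v);
    TinyTopN topN = new TinyTopN(2);                 // keep the 2 smallest keys

    String[][] rows = { { "c", "3" }, { "a", "1" }, { "b", "2" }, { "d", "4" } };
    for (String[] row : rows) {
      int idx = topN.tryStoreKey(row[0]);
      if (idx == EXCLUDE) continue;                  // the patch's early return
      if (idx == FORWARD) {
        sink.collect(row[0], row[1]);                // direct pass-through path
      } else {
        topN.storeValue(idx, row[1]);                // buffered for later flush
      }
    }
    topN.flush(sink);                                // prints b -> 2, then a -> 1
  }
}

The same division of labor explains why closeOp() calls reducerHash.flush() only on a clean close: until then the top-N rows live inside the hash, and only rows it rules out (EXCLUDE) or waves through (FORWARD) have reached the output collector.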