diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 74bb863..cbed26d 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1670,6 +1670,9 @@ HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false, "This flag should be set to true to enable vectorized mode of query execution.\n" + "The default value is false."), + HIVE_VECTORIZATION_REDUCE_ENABLED("hive.vectorized.execution.reduce.enabled", true, + "This flag should be set to true to enable vectorized mode of the reduce-side of query execution.\n" + + "The default value is true."), HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval", 100000, "Number of entries added to the group by aggregation hash before a recomputation of average entry size is performed."), HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 1000000, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java new file mode 100644 index 0000000..d9c16dc --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +/** + * Class to keep information on a set of typed vector columns. Used by + * other classes to efficiently access the set of columns. + */ +public class VectorColumnSetInfo { + + // For simpler access, we make these members protected instead of + // providing get methods. + + /** + * indices of LONG primitive keys. + */ + protected int[] longIndices; + + /** + * indices of DOUBLE primitive keys. + */ + protected int[] doubleIndices; + + /** + * indices of string (byte[]) primitive keys. + */ + protected int[] stringIndices; + + /** + * indices of decimal primitive keys. + */ + protected int[] decimalIndices; + + /** + * Helper class for looking up a key value based on key index. 
+ */ + public class KeyLookupHelper { + public int longIndex; + public int doubleIndex; + public int stringIndex; + public int decimalIndex; + + private static final int INDEX_UNUSED = -1; + + private void resetIndices() { + this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED; + } + public void setLong(int index) { + resetIndices(); + this.longIndex = index; + } + + public void setDouble(int index) { + resetIndices(); + this.doubleIndex = index; + } + + public void setString(int index) { + resetIndices(); + this.stringIndex = index; + } + + public void setDecimal(int index) { + resetIndices(); + this.decimalIndex = index; + } + } + + /** + * Lookup vector to map from key index to primitive type index. + */ + protected KeyLookupHelper[] indexLookup; + + private int keyCount; + private int addIndex; + + protected int longIndicesIndex; + protected int doubleIndicesIndex; + protected int stringIndicesIndex; + protected int decimalIndicesIndex; + + protected VectorColumnSetInfo(int keyCount) { + this.keyCount = keyCount; + this.addIndex = 0; + + // We'll over-allocate and then shrink the array for each type + longIndices = new int[this.keyCount]; + longIndicesIndex = 0; + doubleIndices = new int[this.keyCount]; + doubleIndicesIndex = 0; + stringIndices = new int[this.keyCount]; + stringIndicesIndex = 0; + decimalIndices = new int[this.keyCount]; + decimalIndicesIndex = 0; + indexLookup = new KeyLookupHelper[this.keyCount]; + } + + protected void addKey(String outputType) throws HiveException { + indexLookup[addIndex] = new KeyLookupHelper(); + if (VectorizationContext.isIntFamily(outputType) || + VectorizationContext.isDatetimeFamily(outputType)) { + longIndices[longIndicesIndex] = addIndex; + indexLookup[addIndex].setLong(longIndicesIndex); + ++longIndicesIndex; + } else if (VectorizationContext.isFloatFamily(outputType)) { + doubleIndices[doubleIndicesIndex] = addIndex; + indexLookup[addIndex].setDouble(doubleIndicesIndex); + ++doubleIndicesIndex; + } else if (VectorizationContext.isStringFamily(outputType)) { + stringIndices[stringIndicesIndex] = addIndex; + indexLookup[addIndex].setString(stringIndicesIndex); + ++stringIndicesIndex; + } else if (VectorizationContext.isDecimalFamily(outputType)) { + decimalIndices[decimalIndicesIndex] = addIndex; + indexLookup[addIndex].setDecimal(decimalIndicesIndex); + ++decimalIndicesIndex; + } else { + throw new HiveException("Unsupported vector output type: " + outputType); + } + addIndex++; + } + + protected void finishAdding() { + longIndices = Arrays.copyOf(longIndices, longIndicesIndex); + doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex); + stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex); + decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex); + } +} \ No newline at end of file
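For context, a standalone illustration (a simplified re-implementation written for this write-up, not the Hive class itself) of the bookkeeping pattern VectorColumnSetInfo uses: one index array per primitive family is over-allocated, addKey records each key's slot in the array for its type family, and finishAdding shrinks the arrays to their final sizes.

import java.util.Arrays;

public class TypedIndexDemo {
  public static void main(String[] args) {
    String[] keyTypes = { "bigint", "string", "double", "int" };
    // Over-allocate one index array per primitive family (as the constructor does).
    int[] longIndices = new int[keyTypes.length];
    int[] doubleIndices = new int[keyTypes.length];
    int[] stringIndices = new int[keyTypes.length];
    int nLong = 0, nDouble = 0, nString = 0;
    for (int keyIndex = 0; keyIndex < keyTypes.length; ++keyIndex) {
      String t = keyTypes[keyIndex];
      if (t.equals("bigint") || t.equals("int")) {
        longIndices[nLong++] = keyIndex;       // key is stored in a long column
      } else if (t.equals("double")) {
        doubleIndices[nDouble++] = keyIndex;   // key is stored in a double column
      } else {
        stringIndices[nString++] = keyIndex;   // key is stored in a bytes column
      }
    }
    // Shrink to the exact counts, mirroring finishAdding().
    longIndices = Arrays.copyOf(longIndices, nLong);
    doubleIndices = Arrays.copyOf(doubleIndices, nDouble);
    stringIndices = Arrays.copyOf(stringIndices, nString);
    System.out.println(Arrays.toString(longIndices));   // [0, 3]
    System.out.println(Arrays.toString(doubleIndices)); // [2]
    System.out.println(Arrays.toString(stringIndices)); // [1]
  }
}

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 07d7b8b..5b4fb63 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -32,8 +32,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import 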
org.apache.hadoop.hive.ql.exec.KeyWrapper; +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; @@ -47,13 +50,17 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.DataOutputBuffer; /** * Vectorized GROUP BY operator implementation. Consumes the vectorized input and * stores the aggregate operators' intermediate states. Emits row mode output. * */ -public class VectorGroupByOperator extends GroupByOperator { +public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion { private static final Log LOG = LogFactory.getLog( VectorGroupByOperator.class.getName()); @@ -70,6 +77,17 @@ */ private VectorExpression[] keyExpressions; + private boolean isVectorOutput; + + // Create a new outgoing vectorization context because column name map will change. + private VectorizationContext vOutContext = null; + + private String fileKey; + + // The above members are initialized by the constructor and must not be + // transient. + //--------------------------------------------------------------------------- + private transient VectorExpressionWriter[] keyOutputWriters; /** @@ -85,11 +103,18 @@ private transient Object[] forwardCache; + private transient VectorizedRowBatch outputBatch; + private transient VectorizedRowBatchCtx vrbCtx; + + private transient VectorColumnAssign[] vectorColumnAssign; + /** - * Interface for processing mode: global, hash or streaming + * Interface for processing mode: global, hash, unsorted streaming, or group batch */ private static interface IProcessingMode { public void initialize(Configuration hconf) throws HiveException; + public void startGroup() throws HiveException; + public void endGroup() throws HiveException; public void processBatch(VectorizedRowBatch batch) throws HiveException; public void close(boolean aborted) throws HiveException; } @@ -98,6 +123,15 @@ * Base class for all processing modes */ private abstract class ProcessingModeBase implements IProcessingMode { + + // Overridden and used in sorted reduce group batch processing mode. + public void startGroup() throws HiveException { + // Do nothing. + } + public void endGroup() throws HiveException { + // Do nothing. + } + /** * Evaluates the aggregators on the current batch. 
* The aggregationBatchInfo must have been prepared @@ -170,7 +204,7 @@ public void processBatch(VectorizedRowBatch batch) throws HiveException { @Override public void close(boolean aborted) throws HiveException { if (!aborted) { - flushSingleRow(null, aggregationBuffers); + writeSingleRow(null, aggregationBuffers); } } } @@ -426,7 +460,7 @@ private void flush(boolean all) throws HiveException { while(iter.hasNext()) { Map.Entry pair = iter.next(); - flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue()); + writeSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue()); if (!all) { iter.remove(); @@ -501,20 +535,21 @@ private void checkHashModeEfficiency() throws HiveException { if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) { flush(true); - changeToStreamingMode(); + changeToUnsortedStreamingMode(); } } } } /** - * Streaming processing mode. Intermediate values are flushed each time key changes. - * In this mode we're relying on the MR shuffle and merge the intermediates in the reduce. + * Unsorted streaming processing mode. Each input VectorizedRowBatch may have + * a mix of different keys (hence unsorted). Intermediate values are flushed + * each time key changes. */ - private class ProcessingModeStreaming extends ProcessingModeBase { + private class ProcessingModeUnsortedStreaming extends ProcessingModeBase { /** - * The aggreagation buffers used in streaming mode + * The aggregation buffers used in streaming mode */ private VectorAggregationBufferRow currentStreamingAggregators; @@ -557,7 +592,7 @@ public void free(VectorAggregationBufferRow t) { // Nothing to do } }); - LOG.info("using streaming aggregation processing mode"); + LOG.info("using unsorted streaming aggregation processing mode"); } @Override @@ -601,7 +636,7 @@ public void processBatch(VectorizedRowBatch batch) throws HiveException { // Now flush/forward all keys/rows, except the last (current) one for (int i = 0; i < flushMark; ++i) { - flushSingleRow(keysToFlush[i], rowsToFlush[i]); + writeSingleRow(keysToFlush[i], rowsToFlush[i]); rowsToFlush[i].reset(); streamAggregationBufferRowPool.putInPool(rowsToFlush[i]); } @@ -610,7 +645,79 @@ public void processBatch(VectorizedRowBatch batch) throws HiveException { @Override public void close(boolean aborted) throws HiveException { if (!aborted && null != streamingKey) { - flushSingleRow(streamingKey, currentStreamingAggregators); + writeSingleRow(streamingKey, currentStreamingAggregators); + } + } + } + + /** + * Sorted reduce group batch processing mode. Each input VectorizedRowBatch will have the + * same key. On endGroup (or close), the intermediate values are flushed. + */ + private class ProcessingModeGroupBatches extends ProcessingModeBase { + + private boolean inGroup; + private boolean first; + + /** + * The group vector key helper. + */ + VectorGroupKeyHelper groupKeyHelper; + + /** + * The group vector aggregation buffers. + */ + private VectorAggregationBufferRow groupAggregators; + + /** + * Buffer to hold string values. 
+ */ + private DataOutputBuffer buffer; + + @Override + public void initialize(Configuration hconf) throws HiveException { + inGroup = false; + groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length); + groupKeyHelper.init(keyExpressions); + groupAggregators = allocateAggregationBuffer(); + buffer = new DataOutputBuffer(); + LOG.info("using sorted group batch aggregation processing mode"); + } + + @Override + public void startGroup() throws HiveException { + inGroup = true; + first = true; + } + + @Override + public void endGroup() throws HiveException { + if (inGroup && !first) { + writeGroupRow(groupAggregators, buffer); + groupAggregators.reset(); + } + inGroup = false; + } + + @Override + public void processBatch(VectorizedRowBatch batch) throws HiveException { + assert(inGroup); + if (first) { + // Copy the group key to output batch now. We'll copy in the aggregates at the end of the group. + first = false; + groupKeyHelper.copyGroupKey(batch, outputBatch, buffer); + } + + // Aggregate this batch. + for (int i = 0; i < aggregators.length; ++i) { + aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch); + } + } + + @Override + public void close(boolean aborted) throws HiveException { + if (!aborted && inGroup && !first) { + writeGroupRow(groupAggregators, buffer); + } + } + } @@ -635,6 +742,18 @@ public VectorGroupByOperator(VectorizationContext vContext, OperatorDesc conf) AggregationDesc aggDesc = aggrDesc.get(i); aggregators[i] = vContext.getAggregatorExpression(aggDesc); } + + isVectorOutput = desc.getVectorDesc().isVectorOutput(); + + List<String> outColNames = desc.getOutputColumnNames(); + Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size()); + int outColIndex = 0; + for (String outCol : outColNames) { + mapOutCols.put(outCol, outColIndex++); + } + vOutContext = new VectorizationContext(mapOutCols, outColIndex); + vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_"); + fileKey = vOutContext.getFileKey(); } public VectorGroupByOperator() {
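A standalone sketch (simplified, hypothetical types; not patch code) of the sorted group-batch contract that ProcessingModeGroupBatches implements above: the group key is captured once per group, aggregates are updated for every batch, and a single row is emitted on endGroup.

public class GroupBatchesDemo {
  static long sum;
  static boolean first;
  static void startGroup() { first = true; }
  static void processBatch(long key, long[] values) {
    if (first) {
      // The key is the same for every batch of the group, so capture it once.
      System.out.println("group key = " + key);
      first = false;
    }
    for (long v : values) {
      sum += v; // aggregate each batch into the running buffer
    }
  }
  static void endGroup() {
    System.out.println("sum = " + sum); // emit one output row per group
    sum = 0;
  }
  public static void main(String[] args) {
    startGroup();
    processBatch(7, new long[]{1, 2});
    processBatch(7, new long[]{3});
    endGroup(); // prints: sum = 6
  }
}

@@ -662,13 +781,23 @@ protected void initializeOp(Configuration hconf) throws HiveException { objectInspectors.add(aggregators[i].getOutputObjectInspector()); } - keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); - aggregationBatchInfo = new VectorAggregationBufferBatch(); - aggregationBatchInfo.compileAggregationBatchInfo(aggregators); - + if (!conf.getVectorDesc().isVectorGroupBatches()) { + // These data structures are only used by the map-side processing modes. 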
+ keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); + aggregationBatchInfo = new VectorAggregationBufferBatch(); + aggregationBatchInfo.compileAggregationBatchInfo(aggregators); + } + LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput); List outputFieldNames = conf.getOutputColumnNames(); outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector( outputFieldNames, objectInspectors); + if (isVectorOutput) { + vrbCtx = new VectorizedRowBatchCtx(); + vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector); + outputBatch = vrbCtx.createVectorizedRowBatch(); + vectorColumnAssign = VectorColumnAssignFactory.buildAssigners( + outputBatch, outputObjInspector, vOutContext.getColumnMap(), conf.getOutputColumnNames()); + } } catch (HiveException he) { throw he; @@ -678,32 +807,43 @@ protected void initializeOp(Configuration hconf) throws HiveException { initializeChildren(hconf); - forwardCache =new Object[keyExpressions.length + aggregators.length]; + forwardCache = new Object[keyExpressions.length + aggregators.length]; - if (keyExpressions.length == 0) { + if (conf.getVectorDesc().isVectorGroupBatches()) { + // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce). + processingMode = this.new ProcessingModeGroupBatches(); + } else if (keyExpressions.length == 0) { processingMode = this.new ProcessingModeGlobalAggregate(); - } - else { - //TODO: consider if parent can offer order guarantees - // If input is sorted, is more efficient to use the streaming mode + } else { + // We start in hash mode and may dynamically switch to unsorted stream mode. processingMode = this.new ProcessingModeHashAggregate(); } processingMode.initialize(hconf); } /** - * changes the processing mode to streaming + * changes the processing mode to unsorted streaming * This is done at the request of the hash agg mode, if the number of keys * exceeds the minReductionHashAggr factor * @throws HiveException */ - private void changeToStreamingMode() throws HiveException { - processingMode = this.new ProcessingModeStreaming(); + private void changeToUnsortedStreamingMode() throws HiveException { + processingMode = this.new ProcessingModeUnsortedStreaming(); processingMode.initialize(null); LOG.trace("switched to streaming mode"); } @Override + public void startGroup() throws HiveException { + processingMode.startGroup(); + } + + @Override + public void endGroup() throws HiveException { + processingMode.endGroup(); + } + + @Override public void processOp(Object row, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) row; @@ -719,26 +859,72 @@ public void processOp(Object row, int tag) throws HiveException { * @param agg * @throws HiveException */ - private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg) + private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg) throws HiveException { int fi = 0; - for (int i = 0; i < keyExpressions.length; ++i) { - forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( - kw, i, keyOutputWriters[i]); + if (!isVectorOutput) { + // Output row. 
+ for (int i = 0; i < keyExpressions.length; ++i) { + forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]); + } + for (int i = 0; i < aggregators.length; ++i) { + forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)); + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("forwarding keys: %s: %s", + kw, Arrays.toString(forwardCache))); + } + forward(forwardCache, outputObjInspector); + } else { + // Output keys and aggregates into the output batch. + for (int i = 0; i < keyExpressions.length; ++i) { + vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]), outputBatch.size); + } + for (int i = 0; i < aggregators.length; ++i) { + vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput( + agg.getAggregationBuffer(i)), outputBatch.size); + } + ++outputBatch.size; + if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + flushOutput(); + } } + } + + /** + * Emits a (reduce) group row, made from the key (copied in at the beginning of the group) and + * the row aggregation buffers values + * @param agg + * @param buffer + * @throws HiveException + */ + private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer) + throws HiveException { + int fi = keyExpressions.length; // Start after group keys. for (int i = 0; i < aggregators.length; ++i) { - forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)); + vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput( + agg.getAggregationBuffer(i)), outputBatch.size); } - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("forwarding keys: %s: %s", - kw, Arrays.toString(forwardCache))); + ++outputBatch.size; + if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + flushOutput(); + buffer.reset(); } - forward(forwardCache, outputObjInspector); + } + + private void flushOutput() throws HiveException { + forward(outputBatch, null); + outputBatch.reset(); } @Override public void closeOp(boolean aborted) throws HiveException { processingMode.close(aborted); + if (!aborted && isVectorOutput && outputBatch.size > 0) { + flushOutput(); + } } static public String getOperatorName() { @@ -761,4 +947,8 @@ public void setAggregators(VectorAggregateExpression[] aggregators) { this.aggregators = aggregators; } + @Override + public VectorizationContext getOuputVectorizationContext() { + return vOutContext; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java new file mode 100644 index 0000000..51beb7c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.DataOutputBuffer; + +/** + * Class for copying the group key from an input batch to an output batch. + */ +public class VectorGroupKeyHelper extends VectorColumnSetInfo { + + public VectorGroupKeyHelper(int keyCount) { + super(keyCount); + } + + void init(VectorExpression[] keyExpressions) throws HiveException { + // Inspect the output type of each key expression. + for (int i = 0; i < keyExpressions.length; ++i) { + addKey(keyExpressions[i].getOutputType()); + } + finishAdding(); + } + + public void copyGroupKey(VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch, + DataOutputBuffer buffer) throws HiveException { + // Grab the key at index 0. We don't care about selected or repeating since all keys in the input batch are the same. + for (int i = 0; i < longIndices.length; ++i) { + int keyIndex = longIndices[i]; + LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[keyIndex]; + LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[keyIndex]; + if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) { + outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0]; + } else { + outputColumnVector.noNulls = false; + outputColumnVector.isNull[outputBatch.size] = true; + } + } + for (int i = 0; i < doubleIndices.length; ++i) { + int keyIndex = doubleIndices[i]; + DoubleColumnVector inputColumnVector = (DoubleColumnVector) inputBatch.cols[keyIndex]; + DoubleColumnVector outputColumnVector = (DoubleColumnVector) outputBatch.cols[keyIndex]; + if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) { + outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0]; + } else { + outputColumnVector.noNulls = false; + outputColumnVector.isNull[outputBatch.size] = true; + } + } + for (int i = 0; i < stringIndices.length; ++i) { + int keyIndex = stringIndices[i]; + BytesColumnVector inputColumnVector = (BytesColumnVector) inputBatch.cols[keyIndex]; + BytesColumnVector outputColumnVector = (BytesColumnVector) outputBatch.cols[keyIndex]; + if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) { + // Copy the bytes into the scratch buffer so the output batch does not reference input batch memory. + int start = buffer.getLength(); + int length = inputColumnVector.length[0]; + try { + buffer.write(inputColumnVector.vector[0], inputColumnVector.start[0], length); + } catch (IOException ioe) { + throw new HiveException("Unable to copy group key bytes", ioe); + } + outputColumnVector.setRef(outputBatch.size, buffer.getData(), start, length); + } else { + outputColumnVector.noNulls = false; + outputColumnVector.isNull[outputBatch.size] = true; + } + } + for (int i = 0; i < decimalIndices.length; ++i) { + int keyIndex = decimalIndices[i]; + DecimalColumnVector inputColumnVector = (DecimalColumnVector) inputBatch.cols[keyIndex]; + DecimalColumnVector outputColumnVector = (DecimalColumnVector) outputBatch.cols[keyIndex]; + if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) { + outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0]; + } else { + outputColumnVector.noNulls = false; + outputColumnVector.isNull[outputBatch.size] = true; + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ ... @@ + Map<String, Map<Integer, String>> scratchColumnVectorTypes; + if (mapredWork.getMapWork() != null) { + scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes(); + } else { + scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes(); + } + columnTypeMap = scratchColumnVectorTypes.get(fileKey); this.rowOI= rowOI; this.rawRowOI = rowOI; } @@ -566,13 +572,17 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveExcepti } } - private void addScratchColumnsToBatch(VectorizedRowBatch vrb) { + private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException { if (columnTypeMap != null && !columnTypeMap.isEmpty()) { int origNumCols = vrb.numCols; int newNumCols = vrb.cols.length+columnTypeMap.keySet().size(); vrb.cols = Arrays.copyOf(vrb.cols, newNumCols); for (int i = origNumCols; i < newNumCols; i++) { - vrb.cols[i] = allocateColumnVector(columnTypeMap.get(i), + String typeName = columnTypeMap.get(i); + if (typeName == null) { + throw new HiveException("No type name for column " + i); + } + vrb.cols[i] = allocateColumnVector(typeName, VectorizedRowBatch.DEFAULT_SIZE); } vrb.numCols = vrb.cols.length;
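A standalone sketch (simplified stand-in types, not the Hive classes) of the scratch-column pattern in addScratchColumnsToBatch above: the batch's column array is grown with Arrays.copyOf, and each new slot is allocated from the type name recorded for that column index.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ScratchColumnsDemo {
  public static void main(String[] args) {
    // Stand-in for VectorizedRowBatch.cols (a ColumnVector[] in Hive).
    Object[] cols = new Object[] { new long[1024], new double[1024] };
    // Column index -> type name, as recorded for scratch columns.
    Map<Integer, String> columnTypeMap = new HashMap<Integer, String>();
    columnTypeMap.put(2, "bigint");
    int origNumCols = cols.length;
    cols = Arrays.copyOf(cols, origNumCols + columnTypeMap.size());
    for (int i = origNumCols; i < cols.length; i++) {
      String typeName = columnTypeMap.get(i);
      if (typeName == null) {
        throw new IllegalStateException("No type name for column " + i);
      }
      // Allocate the scratch column from its recorded type.
      cols[i] = typeName.equals("bigint") ? new long[1024] : new double[1024];
    }
    System.out.println("batch now has " + cols.length + " columns");
  }
}

diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index a622095..0b5248b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -38,9 +38,11 @@ import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator; import 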
org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; @@ -71,6 +74,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.UDFAcos; import org.apache.hadoop.hive.ql.udf.UDFAsin; @@ -284,23 +288,26 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws SemanticException { Task currTask = (Task) nd; if (currTask instanceof MapRedTask) { - convertMapWork(((MapRedTask) currTask).getWork().getMapWork()); + convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false); } else if (currTask instanceof TezTask) { TezWork work = ((TezTask) currTask).getWork(); for (BaseWork w: work.getAllWork()) { if (w instanceof MapWork) { - convertMapWork((MapWork)w); + convertMapWork((MapWork) w, true); } else if (w instanceof ReduceWork) { // We are only vectorizing Reduce under Tez. 
- convertReduceWork((ReduceWork)w); + if (HiveConf.getBoolVar(pctx.getConf(), + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) { + convertReduceWork((ReduceWork) w); + } } } } return null; } - private void convertMapWork(MapWork mapWork) throws SemanticException { - boolean ret = validateMapWork(mapWork); + private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException { + boolean ret = validateMapWork(mapWork, isTez); if (ret) { vectorizeMapWork(mapWork); } @@ -313,7 +320,8 @@ private void addMapWorkRules(Map opRules, NodeProcessor np) + ReduceSinkOperator.getOperatorName()), np); } - private boolean validateMapWork(MapWork mapWork) throws SemanticException { + private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException { + LOG.info("Validating MapWork..."); // Validate the input format for (String path : mapWork.getPathToPartitionInfo().keySet()) { @@ -327,7 +335,7 @@ private boolean validateMapWork(MapWork mapWork) throws SemanticException { } } Map opRules = new LinkedHashMap(); - MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(); + MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(isTez); addMapWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -411,9 +419,12 @@ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws Sema private void addReduceWorkRules(Map opRules, NodeProcessor np) { opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np); opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np); + opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + ".*"), np); } private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException { + LOG.info("Validating ReduceWork..."); + // Validate input to ReduceWork. if (!getOnlyStructObjectInspectors(reduceWork)) { return false; @@ -481,16 +492,21 @@ private void vectorizeReduceWork(ReduceWork reduceWork) throws SemanticException class MapWorkValidationNodeProcessor implements NodeProcessor { + private boolean isTez; + + public MapWorkValidationNodeProcessor(boolean isTez) { + this.isTez = isTez; + } + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { for (Node n : stack) { Operator op = (Operator) n; - if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) && - op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) { + if (nonVectorizableChildOfGroupBy(op)) { return new Boolean(true); } - boolean ret = validateMapWorkOperator(op); + boolean ret = validateMapWorkOperator(op, isTez); if (!ret) { LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); @@ -507,6 +523,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { for (Node n : stack) { Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n; + if (nonVectorizableChildOfGroupBy(op)) { + return new Boolean(true); + } boolean ret = validateReduceWorkOperator(op); if (!ret) { LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized."); @@ -573,21 +592,6 @@ public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stac return vContext; } - public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) { - Operator<? extends OperatorDesc> currentOp = op; - while (currentOp.getParentOperators().size() > 0) { - currentOp = currentOp.getParentOperators().get(0); - if (currentOp.getType().equals(OperatorType.GROUPBY)) { - // No need to vectorize - if (!opsDone.contains(op)) { - opsDone.add(op); - } - return true; - } - } - return false; - } - public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext) throws SemanticException { Operator<? extends OperatorDesc> vectorOp = op; @@ -659,9 +663,13 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, assert vContext != null; - // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize - // any operators below GROUPBY. + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. if (nonVectorizableChildOfGroupBy(op)) { + // No need to vectorize + if (!opsDone.contains(op)) { + opsDone.add(op); + } return null; } @@ -713,13 +721,22 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, assert vContext != null; - // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize - // any operators below GROUPBY. + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. if (nonVectorizableChildOfGroupBy(op)) { + // No need to vectorize + if (!opsDone.contains(op)) { + opsDone.add(op); + } return null; } Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext); + if (vectorOp instanceof VectorGroupByOperator) { + VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp; + VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc(); + vectorDesc.setVectorGroupBatches(true); + } if (saveRootVectorOp && op != vectorOp) { rootVectorOp = vectorOp; } @@ -766,7 +783,7 @@ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { return pctx; } - boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op) { + boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, boolean isTez) { boolean ret = false; switch (op.getType()) { case MAPJOIN: @@ -777,7 +794,7 @@ boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op) { } break; case GROUPBY: - ret = validateGroupByOperator((GroupByOperator) op); + ret = validateGroupByOperator((GroupByOperator) op, false, isTez); break; case FILTER: ret = validateFilterOperator((FilterOperator) op);
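A standalone sketch (with a hypothetical Node type, not the Hive operator classes) of the parent walk that nonVectorizableChildOfGroupBy performs: climb the first-parent chain to the nearest GROUP BY and let its vector-output flag decide whether the operators below it can stay vectorized.

public class ParentWalkDemo {
  static class Node {
    Node parent;
    boolean isGroupBy;
    boolean vectorOutput;
  }
  static boolean nonVectorizableChild(Node op) {
    // Climb the first-parent chain, mirroring getParentOperators().get(0).
    for (Node cur = op.parent; cur != null; cur = cur.parent) {
      if (cur.isGroupBy) {
        // A row-mode GROUP BY blocks vectorization of everything below it.
        return !cur.vectorOutput;
      }
    }
    return false;
  }
  public static void main(String[] args) {
    Node gby = new Node();
    gby.isGroupBy = true;
    gby.vectorOutput = true;
    Node sel = new Node();
    sel.parent = gby;
    System.out.println(nonVectorizableChild(sel)); // false: this GROUP BY emits batches
  }
}

@@ -808,6 +825,17 @@ boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) { case EXTRACT: ret = validateExtractOperator((ExtractOperator) op); break; + case MAPJOIN: + // Does MAPJOIN actually get planned in Reduce? 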
+ if (op instanceof MapJoinOperator) { + ret = validateMapJoinOperator((MapJoinOperator) op); + } else if (op instanceof SMBMapJoinOperator) { + ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op); + } + break; + case GROUPBY: + ret = validateGroupByOperator((GroupByOperator) op, true, true); + break; case FILTER: ret = validateFilterOperator((FilterOperator) op); break; @@ -828,6 +856,23 @@ boolean validateReduceWorkOperator(Operator op) { return ret; } + public Boolean nonVectorizableChildOfGroupBy(Operator op) { + Operator currentOp = op; + while (currentOp.getParentOperators().size() > 0) { + currentOp = currentOp.getParentOperators().get(0); + if (currentOp.getType().equals(OperatorType.GROUPBY)) { + GroupByDesc desc = (GroupByDesc)currentOp.getConf(); + boolean isVectorOutput = desc.getVectorDesc().isVectorOutput(); + if (isVectorOutput) { + // This GROUP BY does vectorize its output. + return false; + } + return true; + } + } + return false; + } + private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) { SMBJoinDesc desc = op.getConf(); // Validation is the same as for map join, since the 'small' tables are not vectorized @@ -878,16 +923,50 @@ private boolean validateFilterOperator(FilterOperator op) { return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER); } - private boolean validateGroupByOperator(GroupByOperator op) { - if (op.getConf().isGroupingSetsPresent()) { + private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) { + GroupByDesc desc = op.getConf(); + VectorGroupByDesc vectorDesc = desc.getVectorDesc(); + + if (desc.isGroupingSetsPresent()) { LOG.warn("Grouping sets not supported in vector mode"); return false; } - boolean ret = validateExprNodeDesc(op.getConf().getKeys()); + boolean ret = validateExprNodeDesc(desc.getKeys()); + if (!ret) { + return false; + } + ret = validateAggregationDesc(desc.getAggregators()); if (!ret) { return false; } - return validateAggregationDesc(op.getConf().getAggregators()); + boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators()); + vectorDesc.setVectorOutput(isVectorOutput); + if (isReduce) { + if (desc.isDistinct()) { + LOG.warn("Distinct not supported in reduce vector mode"); + return false; + } + // Sort-based GroupBy? 
+ if (desc.getMode() != GroupByDesc.Mode.COMPLETE && + desc.getMode() != GroupByDesc.Mode.PARTIAL1 && + desc.getMode() != GroupByDesc.Mode.PARTIAL2 && + desc.getMode() != GroupByDesc.Mode.MERGEPARTIAL) { + LOG.warn("Reduce vector mode not supported when input for GROUP BY not sorted"); + return false; + } + if (desc.getGroupKeyNotReductionKey()) { + LOG.warn("Reduce vector mode not supported when group key is not reduction key"); + return false; + } + if (!isVectorOutput) { + LOG.warn("Reduce vector mode only supported when aggregate outputs are primitive types"); + return false; + } + vectorDesc.setVectorGroupBatches(true); + } else { + LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput); + } + return true; } private boolean validateExtractOperator(ExtractOperator op) { @@ -935,9 +1014,7 @@ private boolean validateExprNodeDescRecursive(ExprNodeDesc desc) { String typeName = desc.getTypeInfo().getTypeName(); boolean ret = validateDataType(typeName); if (!ret) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot vectorize " + desc.toString() + " of type " + typeName); - } + LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName); return false; } if (desc instanceof ExprNodeGenericFuncDesc) { @@ -970,12 +1047,11 @@ boolean validateExprNodeDesc(ExprNodeDesc desc, VectorExpressionDescriptor.Mode VectorizationContext vc = new ValidatorVectorizationContext(); if (vc.getVectorExpression(desc, mode) == null) { // TODO: this cannot happen - VectorizationContext throws in such cases. + LOG.info("getVectorExpression returned null"); return false; } } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to vectorize", e); - } + LOG.info("Failed to vectorize", e); return false; } return true; @@ -1001,9 +1077,49 @@ private boolean validateAggregationDesc(AggregationDesc aggDesc) { if (aggDesc.getParameters() != null) { return validateExprNodeDesc(aggDesc.getParameters()); } + // See if we can vectorize the aggregation. + try { + VectorizationContext vc = new ValidatorVectorizationContext(); + if (vc.getAggregatorExpression(aggDesc) == null) { + // TODO: this cannot happen - VectorizationContext throws in such cases. + LOG.info("getAggregatorExpression returned null"); + return false; + } + } catch (Exception e) { + LOG.info("Failed to vectorize", e); + return false; + } + return true; + } + + private boolean aggregatorsOutputIsPrimitive(List<AggregationDesc> descs) { + for (AggregationDesc d : descs) { + boolean ret = aggregatorsOutputIsPrimitive(d); + if (!ret) { + return false; + } + } return true; } + private boolean aggregatorsOutputIsPrimitive(AggregationDesc aggDesc) { + VectorizationContext vc = new ValidatorVectorizationContext(); + VectorAggregateExpression vectorAggrExpr; + try { + vectorAggrExpr = vc.getAggregatorExpression(aggDesc); + } catch (Exception e) { + // We should have already attempted to vectorize in validateAggregationDesc. + LOG.info("Vectorization of aggregation should have succeeded ", e); + return false; + } + + ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector(); + if (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) { + return true; + } + return false; + } + private boolean validateDataType(String type) { return supportedDataTypesPattern.matcher(type.toLowerCase()).matches(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java new file mode 100644 index 0000000..5157ebd --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractVectorDesc.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +public class AbstractVectorDesc implements VectorDesc { + + @Override + public Object clone() throws CloneNotSupportedException { + throw new CloneNotSupportedException("clone not supported"); + } +}
+ private VectorGroupByDesc vectorDesc; + public GroupByDesc() { + vectorDesc = new VectorGroupByDesc(); } public GroupByDesc( @@ -102,6 +106,7 @@ public GroupByDesc( final boolean groupingSetsPresent, final int groupingSetsPosition, final boolean isDistinct) { + vectorDesc = new VectorGroupByDesc(); this.mode = mode; this.outputColumnNames = outputColumnNames; this.keys = keys; @@ -116,6 +121,14 @@ public GroupByDesc( this.isDistinct = isDistinct; } + public void setVectorDesc(VectorGroupByDesc vectorDesc) { + this.vectorDesc = vectorDesc; + } + + public VectorGroupByDesc getVectorDesc() { + return vectorDesc; + } + public Mode getMode() { return mode; } @@ -268,14 +281,6 @@ public void setGroupingSetPosition(int groupingSetPosition) { this.groupingSetPosition = groupingSetPosition; } - public boolean isDistinct() { - return isDistinct; - } - - public void setDistinct(boolean isDistinct) { - this.isDistinct = isDistinct; - } - public boolean isDontResetAggrsDistinct() { return dontResetAggrsDistinct; } @@ -283,4 +288,13 @@ public boolean isDontResetAggrsDistinct() { public void setDontResetAggrsDistinct(boolean dontResetAggrsDistinct) { this.dontResetAggrsDistinct = dontResetAggrsDistinct; } + + public boolean isDistinct() { + return isDistinct; + } + + public void setDistinct(boolean isDistinct) { + this.isDistinct = isDistinct; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java new file mode 100644 index 0000000..3a2efdb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorDesc.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.io.Serializable; + +public interface VectorDesc extends Serializable, Cloneable { + public Object clone() throws CloneNotSupportedException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java new file mode 100644 index 0000000..c4de39b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByDesc.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +/** + * VectorGroupByDesc. + * + * Extra parameters beyond GroupByDesc just for the VectorGroupByOperator. + * + * We don't extend GroupByDesc because the base OperatorDesc doesn't support + * clone and adding it is a lot of work for little gain. + */ +public class VectorGroupByDesc extends AbstractVectorDesc { + + private static final long serialVersionUID = 1L; + + private boolean isVectorGroupBatches; + private boolean isVectorOutput; + + public VectorGroupByDesc() { + this.isVectorGroupBatches = false; + } + + public boolean isVectorGroupBatches() { + return isVectorGroupBatches; + } + + public void setVectorGroupBatches(boolean isVectorGroupBatches) { + this.isVectorGroupBatches = isVectorGroupBatches; + } + + public boolean isVectorOutput() { + return isVectorOutput; + } + + public void setVectorOutput(boolean isVectorOutput) { + this.isVectorOutput = isVectorOutput; + } +}
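A usage sketch of the two VectorGroupByDesc flags defined above (illustrative driver code, not part of the patch): the Vectorizer sets both flags for a reduce-side GROUP BY, and only vector output for a map-side GROUP BY whose children can consume batches.

import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;

public class VectorGroupByDescDemo {
  public static void main(String[] args) {
    VectorGroupByDesc vectorDesc = new VectorGroupByDesc();
    vectorDesc.setVectorOutput(true);        // the operator emits VectorizedRowBatch downstream
    vectorDesc.setVectorGroupBatches(true);  // reduce side: each input batch holds a single group key
    System.out.println(vectorDesc.isVectorOutput() && vectorDesc.isVectorGroupBatches());
  }
}

diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java index 0a0e0c3..fbab3af 100644 --- ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java +++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java @@ -107,7 +107,7 @@ public void testAggregateOnUDF() throws HiveException { gbyOp.setConf(desc); Vectorizer v = new Vectorizer(); - Assert.assertTrue(v.validateMapWorkOperator(gbyOp)); + Assert.assertTrue(v.validateMapWorkOperator(gbyOp, false)); VectorGroupByOperator vectorOp = (VectorGroupByOperator) v.vectorizeOperator(gbyOp, vContext); Assert.assertEquals(VectorUDAFSumLong.class, vectorOp.getAggregators()[0].getClass()); VectorUDAFSumLong udaf = (VectorUDAFSumLong) vectorOp.getAggregators()[0]; @@ -150,7 +150,7 @@ public void testValidateNestedExpressions() { /** * prepareAbstractMapJoin prepares a join operator descriptor, used as helper by SMB and Map join tests. 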
*/ - private void prepareAbstractMapJoin(AbstractMapJoinOperator<? extends MapJoinDesc> mop, MapJoinDesc mjdesc) { + private void prepareAbstractMapJoin(AbstractMapJoinOperator<? extends MapJoinDesc> map, MapJoinDesc mjdesc) { mjdesc.setPosBigTable(0); List<ExprNodeDesc> expr = new ArrayList<ExprNodeDesc>(); expr.add(new ExprNodeColumnDesc(Integer.class, "col1", "T", false)); @@ -180,14 +180,14 @@ private void prepareAbstractMapJoin(AbstractMapJoinOperator<? extends MapJoinDesc> diff --git ql/src/test/results/clientpositive/tez/vectorization_limit.q.out ql/src/test/results/clientpositive/tez/vectorization_limit.q.out new file mode 100644 --- /dev/null +++ ql/src/test/results/clientpositive/tez/vectorization_limit.q.out +PREHOOK: query: explain SELECT cbigint, cdouble FROM alltypesorc WHERE cbigint < cdouble and cint > 0 limit 7 +PREHOOK: type: QUERY +POSTHOOK: query: explain SELECT cbigint, cdouble FROM alltypesorc WHERE cbigint < cdouble and cint > 0 limit 7 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: 7 + Processor Tree: + TableScan + alias: alltypesorc + Statistics: Num rows: 18861 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((cbigint < cdouble) and (cint > 0)) (type: boolean) + Statistics: Num rows: 2095 Data size: 41901 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cbigint (type: bigint), cdouble (type: double) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2095 Data size: 41901 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 7 + Statistics: Num rows: 7 Data size: 140 Basic stats: COMPLETE Column stats: NONE + ListSink + +WARNING: Comparing a bigint and a double may result in a loss of precision. +PREHOOK: query: SELECT cbigint, cdouble FROM alltypesorc WHERE cbigint < cdouble and cint > 0 limit 7 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: SELECT cbigint, cdouble FROM alltypesorc WHERE cbigint < cdouble and cint > 0 limit 7 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +-1887561756 1839.0 +-1887561756 -10011.0 +-1887561756 -13877.0 +-1887561756 10361.0 +-1887561756 -8881.0 +-1887561756 -2281.0 +-1887561756 9531.0 +PREHOOK: query: -- HIVE-3562 Some limit can be pushed down to map stage - c/p parts from limit_pushdown + +explain +select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20 +PREHOOK: type: QUERY +POSTHOOK: query: -- HIVE-3562 Some limit can be pushed down to map stage - c/p parts from limit_pushdown + +explain +select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: alltypesorc + Statistics: Num rows: 23577 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ctinyint is not null (type: boolean) + Statistics: Num rows: 11789 Data size: 188626 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ctinyint (type: tinyint), cdouble (type: double), csmallint (type: smallint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 11789 Data size: 188626 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: tinyint), _col1 (type: double) + sort order: ++ + Statistics: Num rows: 11789 Data size: 188626 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.3 + value expressions: _col2 (type: smallint) + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: 
tinyint), KEY.reducesinkkey1 (type: double), VALUE._col0 (type: smallint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 11789 Data size: 188626 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 20 + Statistics: Num rows: 20 Data size: 320 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 320 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized + + Stage: Stage-0 + Fetch Operator + limit: 20 + Processor Tree: + ListSink + +PREHOOK: query: select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select ctinyint,cdouble,csmallint from alltypesorc where ctinyint is not null order by ctinyint,cdouble limit 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +-64 -15920.0 -15920 +-64 -10462.0 -10462 +-64 -9842.0 -9842 +-64 -8080.0 -8080 +-64 -7196.0 -7196 +-64 -7196.0 -7196 +-64 -7196.0 -7196 +-64 -7196.0 -7196 +-64 -7196.0 -7196 +-64 -7196.0 -7196 +-64 -7196.0 -7196 +-64 -6907.0 -6907 +-64 -4803.0 -4803 +-64 -4040.0 -4040 +-64 -4018.0 -4018 +-64 -3586.0 -3586 +-64 -3097.0 -3097 +-64 -2919.0 -2919 +-64 -1600.0 -1600 +-64 -200.0 -200 +PREHOOK: query: -- deduped RS +explain +select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20 +PREHOOK: type: QUERY +POSTHOOK: query: -- deduped RS +explain +select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: alltypesorc + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ctinyint (type: tinyint), cdouble (type: double) + outputColumnNames: ctinyint, cdouble + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: avg((cdouble + 1)) + keys: ctinyint (type: tinyint) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: tinyint) + sort order: + + Map-reduce partition columns: _col0 (type: tinyint) + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.3 + value expressions: _col1 (type: struct) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: avg(VALUE._col0) + keys: KEY._col0 (type: tinyint) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: tinyint), _col1 (type: double) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 20 + Statistics: Num rows: 20 Data size: 240 Basic 
stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 240 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 20 + Processor Tree: + ListSink + +PREHOOK: query: select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select ctinyint,avg(cdouble + 1) from alltypesorc group by ctinyint order by ctinyint limit 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +NULL 9370.0945309795 +-64 373.52941176470586 +-63 2178.7272727272725 +-62 245.69387755102042 +-61 914.3404255319149 +-60 1071.82 +-59 318.27272727272725 +-58 3483.2444444444445 +-57 1867.0535714285713 +-56 2595.818181818182 +-55 2385.595744680851 +-54 2712.7272727272725 +-53 -532.7567567567568 +-52 2810.705882352941 +-51 -96.46341463414635 +-50 -960.0192307692307 +-49 768.7659574468086 +-48 1672.909090909091 +-47 -574.6428571428571 +-46 3033.55 +PREHOOK: query: -- distincts +explain +select distinct(ctinyint) from alltypesorc limit 20 +PREHOOK: type: QUERY +POSTHOOK: query: -- distincts +explain +select distinct(ctinyint) from alltypesorc limit 20 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: alltypesorc + Statistics: Num rows: 94309 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ctinyint (type: tinyint) + outputColumnNames: ctinyint + Statistics: Num rows: 94309 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: ctinyint (type: tinyint) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 94309 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: tinyint) + sort order: + + Map-reduce partition columns: _col0 (type: tinyint) + Statistics: Num rows: 94309 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.3 + Execution mode: vectorized + Reducer 2 + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: tinyint) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 47154 Data size: 188616 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: tinyint) + outputColumnNames: _col0 + Statistics: Num rows: 47154 Data size: 188616 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 20 + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 80 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 20 + Processor Tree: + ListSink + +PREHOOK: query: select distinct(ctinyint) from alltypesorc limit 20 +PREHOOK: type: 
QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select distinct(ctinyint) from alltypesorc limit 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +NULL +-64 +-63 +-62 +-61 +-60 +-59 +-58 +-57 +-56 +-55 +-54 +-53 +-52 +-51 +-50 +-49 +-48 +-47 +-46 +PREHOOK: query: explain +select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: alltypesorc + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ctinyint (type: tinyint), cdouble (type: double) + outputColumnNames: ctinyint, cdouble + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(DISTINCT cdouble) + keys: ctinyint (type: tinyint), cdouble (type: double) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: tinyint), _col1 (type: double) + sort order: ++ + Map-reduce partition columns: _col0 (type: tinyint) + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.3 + Execution mode: vectorized + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(DISTINCT KEY._col1:0._col0) + keys: KEY._col0 (type: tinyint) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: tinyint), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 20 + Statistics: Num rows: 20 Data size: 240 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 240 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 20 + Processor Tree: + ListSink + +PREHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +NULL 2932 +-64 24 +-63 19 +-62 27 +-61 25 +-60 27 +-59 31 +-58 23 +-57 35 +-56 36 +-55 29 +-54 26 +-53 22 +-52 33 +-51 21 +-50 30 +-49 26 +-48 29 +-47 22 +-46 24 +PREHOOK: query: -- limit zero +explain +select ctinyint,cdouble from alltypesorc order by ctinyint limit 0 +PREHOOK: type: QUERY +POSTHOOK: query: -- limit zero +explain +select ctinyint,cdouble from alltypesorc order by 
ctinyint limit 0 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: 0 + Processor Tree: + ListSink + +PREHOOK: query: select ctinyint,cdouble from alltypesorc order by ctinyint limit 0 +PREHOOK: type: QUERY +#### A masked pattern was here #### +POSTHOOK: query: select ctinyint,cdouble from alltypesorc order by ctinyint limit 0 +POSTHOOK: type: QUERY +#### A masked pattern was here #### +PREHOOK: query: -- 2MR (applied to last RS) +explain +select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20 +PREHOOK: type: QUERY +POSTHOOK: query: -- 2MR (applied to last RS) +explain +select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: alltypesorc + Statistics: Num rows: 31436 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ctinyint is not null (type: boolean) + Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cdouble (type: double), ctinyint (type: tinyint) + outputColumnNames: cdouble, ctinyint + Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: sum(ctinyint) + keys: cdouble (type: double) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: double) + sort order: + + Map-reduce partition columns: _col0 (type: double) + Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + keys: KEY._col0 (type: double) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 7859 Data size: 94309 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: double), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 7859 Data size: 94309 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col1 (type: bigint), _col0 (type: double) + sort order: ++ + Statistics: Num rows: 7859 Data size: 94309 Basic stats: COMPLETE Column stats: NONE + TopN Hash Memory Usage: 0.3 + Reducer 3 + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: double), KEY.reducesinkkey0 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 7859 Data size: 94309 Basic stats: COMPLETE Column stats: NONE + Limit + Number of rows: 20 + Statistics: Num rows: 20 Data size: 240 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 20 Data size: 240 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized + + 
Stage: Stage-0 + Fetch Operator + limit: 20 + Processor Tree: + ListSink + +PREHOOK: query: select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select cdouble, sum(ctinyint) as sum from alltypesorc where ctinyint is not null group by cdouble order by sum, cdouble limit 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +NULL -32768 +-7196.0 -2009 +15601.0 -1733 +4811.0 -115 +-11322.0 -101 +-1121.0 -89 +7705.0 -88 +3520.0 -86 +-8118.0 -80 +5241.0 -80 +-11492.0 -78 +9452.0 -76 +557.0 -75 +10496.0 -67 +-15920.0 -64 +-10462.0 -64 +-9842.0 -64 +-8080.0 -64 +-6907.0 -64 +-4803.0 -64 diff --git ql/src/test/results/clientpositive/tez/vectorized_distinct_gby.q.out ql/src/test/results/clientpositive/tez/vectorized_distinct_gby.q.out new file mode 100644 index 0000000..f0a1743 --- /dev/null +++ ql/src/test/results/clientpositive/tez/vectorized_distinct_gby.q.out @@ -0,0 +1,157 @@ +PREHOOK: query: create table dtest(a int, b int) clustered by (a) sorted by (a) into 1 buckets stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dtest +POSTHOOK: query: create table dtest(a int, b int) clustered by (a) sorted by (a) into 1 buckets stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dtest +PREHOOK: query: insert into table dtest select c,b from (select array(300,300,300,300,300) as a, 1 as b from src limit 1) y lateral view explode(a) t1 as c +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dtest +POSTHOOK: query: insert into table dtest select c,b from (select array(300,300,300,300,300) as a, 1 as b from src limit 1) y lateral view explode(a) t1 as c +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@dtest +POSTHOOK: Lineage: dtest.a SIMPLE [] +POSTHOOK: Lineage: dtest.b EXPRESSION [] +PREHOOK: query: explain select sum(distinct a), count(distinct a) from dtest +PREHOOK: type: QUERY +POSTHOOK: query: explain select sum(distinct a), count(distinct a) from dtest +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: dtest + Statistics: Num rows: 5 Data size: 40 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: a (type: int) + outputColumnNames: a + Statistics: Num rows: 5 Data size: 40 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: sum(DISTINCT a), count(DISTINCT a) + bucketGroup: true + keys: a (type: int) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 5 Data size: 40 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Statistics: Num rows: 5 Data size: 40 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: sum(DISTINCT KEY._col0:0._col0), count(DISTINCT KEY._col0:1._col0) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: 
_col0 (type: bigint), _col1 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 24 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select sum(distinct a), count(distinct a) from dtest +PREHOOK: type: QUERY +PREHOOK: Input: default@dtest +#### A masked pattern was here #### +POSTHOOK: query: select sum(distinct a), count(distinct a) from dtest +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dtest +#### A masked pattern was here #### +300 1 +PREHOOK: query: explain select sum(distinct cint), count(distinct cint), avg(distinct cint), std(distinct cint) from alltypesorc +PREHOOK: type: QUERY +POSTHOOK: query: explain select sum(distinct cint), count(distinct cint), avg(distinct cint), std(distinct cint) from alltypesorc +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: alltypesorc + Statistics: Num rows: 94309 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: cint + Statistics: Num rows: 94309 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: sum(DISTINCT cint), count(DISTINCT cint), avg(DISTINCT cint), std(DISTINCT cint) + keys: cint (type: int) + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 94309 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Statistics: Num rows: 94309 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Execution mode: vectorized + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: sum(DISTINCT KEY._col0:0._col0), count(DISTINCT KEY._col0:1._col0), avg(DISTINCT KEY._col0:2._col0), std(DISTINCT KEY._col0:3._col0) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 32 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: bigint), _col1 (type: bigint), _col2 (type: double), _col3 (type: double) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 32 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 32 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select sum(distinct cint), count(distinct cint), avg(distinct cint), std(distinct cint) from alltypesorc +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select sum(distinct cint), count(distinct cint), avg(distinct cint), std(distinct 
cint) from alltypesorc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +-3482841611 6082 -572647.4204209142 6.153814687328991E8 diff --git ql/src/test/results/clientpositive/tez/vectorized_nested_mapjoin.q.out ql/src/test/results/clientpositive/tez/vectorized_nested_mapjoin.q.out index 120eba6..05c6608 100644 --- ql/src/test/results/clientpositive/tez/vectorized_nested_mapjoin.q.out +++ ql/src/test/results/clientpositive/tez/vectorized_nested_mapjoin.q.out @@ -114,6 +114,7 @@ STAGE PLANS: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized Stage: Stage-0 Fetch Operator diff --git ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out index 982e8ba..6dead26 100644 --- ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out +++ ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out @@ -1779,6 +1779,7 @@ STAGE PLANS: tag: -1 value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: int) auto parallelism: true + Execution mode: vectorized Reducer 4 Needs Tagging: false Reduce Operator Tree: @@ -4579,6 +4580,7 @@ STAGE PLANS: TotalFiles: 1 GatherStats: false MultiFileSpray: false + Execution mode: vectorized Stage: Stage-0 Fetch Operator @@ -4826,6 +4828,7 @@ STAGE PLANS: tag: -1 value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: double) auto parallelism: true + Execution mode: vectorized Reducer 3 Needs Tagging: false Reduce Operator Tree: diff --git ql/src/test/results/clientpositive/tez/vectorized_rcfile_columnar.q.out ql/src/test/results/clientpositive/tez/vectorized_rcfile_columnar.q.out new file mode 100644 index 0000000..ee8959b --- /dev/null +++ ql/src/test/results/clientpositive/tez/vectorized_rcfile_columnar.q.out @@ -0,0 +1,62 @@ +PREHOOK: query: --This query must pass even when vectorized reader is not available for +--RC files. The query must fall back to the non-vector mode and run successfully. + +CREATE table columnTable (key STRING, value STRING) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' +STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@columnTable +POSTHOOK: query: --This query must pass even when vectorized reader is not available for +--RC files. The query must fall back to the non-vector mode and run successfully. 
+ +CREATE table columnTable (key STRING, value STRING) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe' +STORED AS + INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' + OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@columnTable +PREHOOK: query: FROM src +INSERT OVERWRITE TABLE columnTable SELECT src.key, src.value LIMIT 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@columntable +POSTHOOK: query: FROM src +INSERT OVERWRITE TABLE columnTable SELECT src.key, src.value LIMIT 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@columntable +POSTHOOK: Lineage: columntable.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: columntable.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: describe columnTable +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@columntable +POSTHOOK: query: describe columnTable +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@columntable +key string +value string +PREHOOK: query: SELECT key, value FROM columnTable ORDER BY key +PREHOOK: type: QUERY +PREHOOK: Input: default@columntable +#### A masked pattern was here #### +POSTHOOK: query: SELECT key, value FROM columnTable ORDER BY key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@columntable +#### A masked pattern was here #### +165 val_165 +238 val_238 +255 val_255 +27 val_27 +278 val_278 +311 val_311 +409 val_409 +484 val_484 +86 val_86 +98 val_98 diff --git ql/src/test/results/clientpositive/tez/vectorized_string_funcs.q.out ql/src/test/results/clientpositive/tez/vectorized_string_funcs.q.out new file mode 100644 index 0000000..456e464 --- /dev/null +++ ql/src/test/results/clientpositive/tez/vectorized_string_funcs.q.out @@ -0,0 +1,126 @@ +PREHOOK: query: -- Test string functions in vectorized mode to verify end-to-end functionality. + +explain +select + substr(cstring1, 1, 2) + ,substr(cstring1, 2) + ,lower(cstring1) + ,upper(cstring1) + ,ucase(cstring1) + ,length(cstring1) + ,trim(cstring1) + ,ltrim(cstring1) + ,rtrim(cstring1) + ,concat(cstring1, cstring2) + ,concat('>', cstring1) + ,concat(cstring1, '<') + ,concat(substr(cstring1, 1, 2), substr(cstring2, 1, 2)) +from alltypesorc +-- Limit the number of rows of output to a reasonable amount. +where cbigint % 237 = 0 +-- Test function use in the WHERE clause. +and length(substr(cstring1, 1, 2)) <= 2 +and cstring1 like '%' +PREHOOK: type: QUERY +POSTHOOK: query: -- Test string functions in vectorized mode to verify end-to-end functionality. + +explain +select + substr(cstring1, 1, 2) + ,substr(cstring1, 2) + ,lower(cstring1) + ,upper(cstring1) + ,ucase(cstring1) + ,length(cstring1) + ,trim(cstring1) + ,ltrim(cstring1) + ,rtrim(cstring1) + ,concat(cstring1, cstring2) + ,concat('>', cstring1) + ,concat(cstring1, '<') + ,concat(substr(cstring1, 1, 2), substr(cstring2, 1, 2)) +from alltypesorc +-- Limit the number of rows of output to a reasonable amount. +where cbigint % 237 = 0 +-- Test function use in the WHERE clause. 
+and length(substr(cstring1, 1, 2)) <= 2 +and cstring1 like '%' +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + TableScan + alias: alltypesorc + Statistics: Num rows: 1813 Data size: 377237 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: ((((cbigint % 237) = 0) and (length(substr(cstring1, 1, 2)) <= 2)) and (cstring1 like '%')) (type: boolean) + Statistics: Num rows: 151 Data size: 31419 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: substr(cstring1, 1, 2) (type: string), substr(cstring1, 2) (type: string), lower(cstring1) (type: string), upper(cstring1) (type: string), upper(cstring1) (type: string), length(cstring1) (type: int), trim(cstring1) (type: string), ltrim(cstring1) (type: string), rtrim(cstring1) (type: string), concat(cstring1, cstring2) (type: string), concat('>', cstring1) (type: string), concat(cstring1, '<') (type: string), concat(substr(cstring1, 1, 2), substr(cstring2, 1, 2)) (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8, _col9, _col10, _col11, _col12 + Statistics: Num rows: 151 Data size: 31419 Basic stats: COMPLETE Column stats: NONE + ListSink + +PREHOOK: query: select + substr(cstring1, 1, 2) + ,substr(cstring1, 2) + ,lower(cstring1) + ,upper(cstring1) + ,ucase(cstring1) + ,length(cstring1) + ,trim(cstring1) + ,ltrim(cstring1) + ,rtrim(cstring1) + ,concat(cstring1, cstring2) + ,concat('>', cstring1) + ,concat(cstring1, '<') + ,concat(substr(cstring1, 1, 2), substr(cstring2, 1, 2)) +from alltypesorc +-- Limit the number of rows of output to a reasonable amount. +where cbigint % 237 = 0 +-- Test function use in the WHERE clause. +and length(substr(cstring1, 1, 2)) <= 2 +and cstring1 like '%' +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select + substr(cstring1, 1, 2) + ,substr(cstring1, 2) + ,lower(cstring1) + ,upper(cstring1) + ,ucase(cstring1) + ,length(cstring1) + ,trim(cstring1) + ,ltrim(cstring1) + ,rtrim(cstring1) + ,concat(cstring1, cstring2) + ,concat('>', cstring1) + ,concat(cstring1, '<') + ,concat(substr(cstring1, 1, 2), substr(cstring2, 1, 2)) +from alltypesorc +-- Limit the number of rows of output to a reasonable amount. +where cbigint % 237 = 0 +-- Test function use in the WHERE clause. 
+and length(substr(cstring1, 1, 2)) <= 2 +and cstring1 like '%' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +Vi iqXS6s88N1yr14lj7I viqxs6s88n1yr14lj7i VIQXS6S88N1YR14LJ7I VIQXS6S88N1YR14LJ7I 19 ViqXS6s88N1yr14lj7I ViqXS6s88N1yr14lj7I ViqXS6s88N1yr14lj7I ViqXS6s88N1yr14lj7ITh638b67kn8o >ViqXS6s88N1yr14lj7I ViqXS6s88N1yr14lj7I< ViTh +R4 4e7Gf r4e7gf R4E7GF R4E7GF 6 R4e7Gf R4e7Gf R4e7Gf R4e7GfPTBh56R3LS7L13sB4 >R4e7Gf R4e7Gf< R4PT +3g gubGh4J18TV 3gubgh4j18tv 3GUBGH4J18TV 3GUBGH4J18TV 12 3gubGh4J18TV 3gubGh4J18TV 3gubGh4J18TV 3gubGh4J18TVpJucOe4dN4R5XURJW8 >3gubGh4J18TV 3gubGh4J18TV< 3gpJ +EP PCRx8ObNv51rOF epcrx8obnv51rof EPCRX8OBNV51ROF EPCRX8OBNV51ROF 15 EPCRx8ObNv51rOF EPCRx8ObNv51rOF EPCRx8ObNv51rOF EPCRx8ObNv51rOFysaU2Xm11f715L0I35rut2 >EPCRx8ObNv51rOF EPCRx8ObNv51rOF< EPys +8e eiti74gc5m01xyMKSjUIx 8eiti74gc5m01xymksjuix 8EITI74GC5M01XYMKSJUIX 8EITI74GC5M01XYMKSJUIX 22 8eiti74gc5m01xyMKSjUIx 8eiti74gc5m01xyMKSjUIx 8eiti74gc5m01xyMKSjUIx 8eiti74gc5m01xyMKSjUIxI8x87Fm1J4hE8g4CWNo >8eiti74gc5m01xyMKSjUIx 8eiti74gc5m01xyMKSjUIx< 8eI8 +m0 0hbv1516qk8 m0hbv1516qk8 M0HBV1516QK8 M0HBV1516QK8 12 m0hbv1516qk8 m0hbv1516qk8 m0hbv1516qk8 m0hbv1516qk8N8i3sxF54C4x5h0 >m0hbv1516qk8 m0hbv1516qk8< m0N8 +uT T5e2 ut5e2 UT5E2 UT5E2 5 uT5e2 uT5e2 uT5e2 uT5e2SJp57VKYsDtA2r1Xb2H >uT5e2 uT5e2< uTSJ +l3 35W8012cM77E227Ts l35w8012cm77e227ts L35W8012CM77E227TS L35W8012CM77E227TS 18 l35W8012cM77E227Ts l35W8012cM77E227Ts l35W8012cM77E227Ts l35W8012cM77E227TsMH38bE >l35W8012cM77E227Ts l35W8012cM77E227Ts< l3MH +o1 1uPH5EflET5ts1RjSB74 o1uph5eflet5ts1rjsb74 O1UPH5EFLET5TS1RJSB74 O1UPH5EFLET5TS1RJSB74 21 o1uPH5EflET5ts1RjSB74 o1uPH5EflET5ts1RjSB74 o1uPH5EflET5ts1RjSB74 o1uPH5EflET5ts1RjSB74a1U3DRA788kW7I0UTF203 >o1uPH5EflET5ts1RjSB74 o1uPH5EflET5ts1RjSB74< o1a1 +Ix x8dXlDbC3S44L1FQJqpwa ix8dxldbc3s44l1fqjqpwa IX8DXLDBC3S44L1FQJQPWA IX8DXLDBC3S44L1FQJQPWA 22 Ix8dXlDbC3S44L1FQJqpwa Ix8dXlDbC3S44L1FQJqpwa Ix8dXlDbC3S44L1FQJqpwa Ix8dXlDbC3S44L1FQJqpwa8wQR4X28CiccBVXGqPL7 >Ix8dXlDbC3S44L1FQJqpwa Ix8dXlDbC3S44L1FQJqpwa< Ix8w +OT Tn0Dj2HiBi05Baq1Xt otn0dj2hibi05baq1xt OTN0DJ2HIBI05BAQ1XT OTN0DJ2HIBI05BAQ1XT 19 OTn0Dj2HiBi05Baq1Xt OTn0Dj2HiBi05Baq1Xt OTn0Dj2HiBi05Baq1Xt OTn0Dj2HiBi05Baq1XtAoQ21J1lQ27kYSmfA >OTn0Dj2HiBi05Baq1Xt OTn0Dj2HiBi05Baq1Xt< OTAo +a0 0P3sn1ihxJCsTLDb a0p3sn1ihxjcstldb A0P3SN1IHXJCSTLDB A0P3SN1IHXJCSTLDB 17 a0P3sn1ihxJCsTLDb a0P3sn1ihxJCsTLDb a0P3sn1ihxJCsTLDb a0P3sn1ihxJCsTLDbfT4Jlw38k8kmd6Dt1wv >a0P3sn1ihxJCsTLDb a0P3sn1ihxJCsTLDb< a0fT diff --git ql/src/test/results/clientpositive/vectorization_limit.q.out ql/src/test/results/clientpositive/vectorization_limit.q.out index cef59ba..17fc2c9 100644 --- ql/src/test/results/clientpositive/vectorization_limit.q.out +++ ql/src/test/results/clientpositive/vectorization_limit.q.out @@ -31,7 +31,6 @@ STAGE PLANS: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - Execution mode: vectorized Stage: Stage-0 Fetch Operator @@ -89,7 +88,6 @@ STAGE PLANS: Statistics: Num rows: 11789 Data size: 188626 Basic stats: COMPLETE Column stats: NONE TopN Hash Memory Usage: 0.3 value expressions: _col2 (type: smallint) - Execution mode: vectorized Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: tinyint), KEY.reducesinkkey1 (type: double), VALUE._col0 (type: smallint) @@ -176,7 +174,6 @@ STAGE PLANS: Statistics: Num rows: 31436 Data size: 377237 Basic stats: 
COMPLETE Column stats: NONE TopN Hash Memory Usage: 0.3 value expressions: _col1 (type: struct<count:bigint,sum:double,input:double>) - Execution mode: vectorized Reduce Operator Tree: Group By Operator aggregations: avg(VALUE._col0) @@ -476,7 +473,6 @@ STAGE PLANS: Map-reduce partition columns: _col0 (type: double) Statistics: Num rows: 15718 Data size: 188618 Basic stats: COMPLETE Column stats: NONE value expressions: _col1 (type: bigint) - Execution mode: vectorized Reduce Operator Tree: Group By Operator aggregations: sum(VALUE._col0)
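Taken together, the plan diffs above reduce to where the `Execution mode: vectorized` marker appears: the new Tez golden files gain it on vertices that now run vectorized, while several map-side occurrences disappear from the MapReduce `vectorization_limit.q.out` plans. The hypothetical helper below is not part of this patch; it sketches how that expectation could be checked mechanically against EXPLAIN text instead of by eyeballing diffs, using a deliberately naive line scan over the indented Tez plan layout shown in these `.q.out` files.

```java
/**
 * Hypothetical test helper: scans EXPLAIN output of the shape shown above
 * and reports whether a given Tez vertex (e.g. "Map 1" or "Reducer 2")
 * carries the "Execution mode: vectorized" marker before the next vertex
 * or stage begins.
 */
public final class PlanVectorizationCheck {

  private PlanVectorizationCheck() {
  }

  public static boolean isVertexVectorized(String explainOutput, String vertexName) {
    boolean inVertex = false;
    for (String raw : explainOutput.split("\n")) {
      String line = raw.trim();
      if (line.equals(vertexName)) {
        inVertex = true;                 // entered the vertex of interest
        continue;
      }
      if (inVertex) {
        // A new vertex header ("Map 3", "Reducer 2", ...) or a new stage
        // ends the current vertex's section.
        if (line.matches("(Map|Reducer) \\d+") || line.startsWith("Stage:")) {
          return false;
        }
        if (line.equals("Execution mode: vectorized")) {
          return true;
        }
      }
    }
    return false;
  }
}
```

Against the `select distinct(ctinyint)` plan above, one would expect `isVertexVectorized(plan, "Map 1")` to return true and `isVertexVectorized(plan, "Reducer 2")` false, matching the marker placement in that golden file.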