diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7f4afd9..5607f3f 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1658,6 +1658,9 @@
     HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false,
         "This flag should be set to true to enable vectorized mode of query execution.\n" +
         "The default value is false."),
+    HIVE_VECTORIZATION_REDUCE_ENABLED("hive.vectorized.execution.reduce.enabled", true,
+        "This flag should be set to true to enable vectorized mode of the reduce-side of query execution.\n" +
+        "The default value is true."),
     HIVE_VECTORIZATION_GROUPBY_CHECKINTERVAL("hive.vectorized.groupby.checkinterval", 100000,
         "Number of entries added to the group by aggregation hash before a recomputation of average entry size is performed."),
     HIVE_VECTORIZATION_GROUPBY_MAXENTRIES("hive.vectorized.groupby.maxentries", 1000000,
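
Note: the Vectorizer hunk further down only calls convertReduceWork() when this flag is on. A minimal sketch of reading it the same way, assuming a HiveConf instance named conf is in scope:

    // Reads the new flag exactly as the Vectorizer does below; defaults to true.
    boolean reduceVectorizationEnabled = HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED);
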
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
new file mode 100644
index 0000000..d9c16dc
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * Class to keep information on a set of typed vector columns.  Used by
+ * other classes to efficiently access the set of columns.
+ */
+public class VectorColumnSetInfo {
+
+  // For simpler access, we make these members protected instead of
+  // providing get methods.
+
+  /**
+   * indices of LONG primitive keys.
+   */
+  protected int[] longIndices;
+
+  /**
+   * indices of DOUBLE primitive keys.
+   */
+  protected int[] doubleIndices;
+
+  /**
+   * indices of string (byte[]) primitive keys.
+   */
+  protected int[] stringIndices;
+
+  /**
+   * indices of decimal primitive keys.
+   */
+  protected int[] decimalIndices;
+
+  /**
+   * Helper class for looking up a key value based on key index.
+   */
+  public class KeyLookupHelper {
+    public int longIndex;
+    public int doubleIndex;
+    public int stringIndex;
+    public int decimalIndex;
+
+    private static final int INDEX_UNUSED = -1;
+
+    private void resetIndices() {
+      this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED;
+    }
+
+    public void setLong(int index) {
+      resetIndices();
+      this.longIndex = index;
+    }
+
+    public void setDouble(int index) {
+      resetIndices();
+      this.doubleIndex = index;
+    }
+
+    public void setString(int index) {
+      resetIndices();
+      this.stringIndex = index;
+    }
+
+    public void setDecimal(int index) {
+      resetIndices();
+      this.decimalIndex = index;
+    }
+  }
+
+  /**
+   * Lookup vector to map from key index to primitive type index.
+   */
+  protected KeyLookupHelper[] indexLookup;
+
+  private int keyCount;
+  private int addIndex;
+
+  protected int longIndicesIndex;
+  protected int doubleIndicesIndex;
+  protected int stringIndicesIndex;
+  protected int decimalIndicesIndex;
+
+  protected VectorColumnSetInfo(int keyCount) {
+    this.keyCount = keyCount;
+    this.addIndex = 0;
+
+    // We'll over allocate and then shrink the array for each type
+    longIndices = new int[this.keyCount];
+    longIndicesIndex = 0;
+    doubleIndices = new int[this.keyCount];
+    doubleIndicesIndex = 0;
+    stringIndices = new int[this.keyCount];
+    stringIndicesIndex = 0;
+    decimalIndices = new int[this.keyCount];
+    decimalIndicesIndex = 0;
+    indexLookup = new KeyLookupHelper[this.keyCount];
+  }
+
+  protected void addKey(String outputType) throws HiveException {
+    indexLookup[addIndex] = new KeyLookupHelper();
+    if (VectorizationContext.isIntFamily(outputType) ||
+        VectorizationContext.isDatetimeFamily(outputType)) {
+      longIndices[longIndicesIndex] = addIndex;
+      indexLookup[addIndex].setLong(longIndicesIndex);
+      ++longIndicesIndex;
+    } else if (VectorizationContext.isFloatFamily(outputType)) {
+      doubleIndices[doubleIndicesIndex] = addIndex;
+      indexLookup[addIndex].setDouble(doubleIndicesIndex);
+      ++doubleIndicesIndex;
+    } else if (VectorizationContext.isStringFamily(outputType)) {
+      stringIndices[stringIndicesIndex] = addIndex;
+      indexLookup[addIndex].setString(stringIndicesIndex);
+      ++stringIndicesIndex;
+    } else if (VectorizationContext.isDecimalFamily(outputType)) {
+      decimalIndices[decimalIndicesIndex] = addIndex;
+      indexLookup[addIndex].setDecimal(decimalIndicesIndex);
+      ++decimalIndicesIndex;
+    } else {
+      throw new HiveException("Unsupported vector output type: " + outputType);
+    }
+    addIndex++;
+  }
+
+  protected void finishAdding() {
+    longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
+    doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
+    stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex);
+    decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex);
+  }
+}
\ No newline at end of file
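
Note: a minimal usage sketch for the new class, mirroring how VectorGroupKeyHelper.init() drives it below. The anonymous subclass and the literal type names are illustrative only; addKey/finishAdding are protected, so this assumes code in the same package, and exception handling is elided:

    VectorColumnSetInfo info = new VectorColumnSetInfo(3) { };  // anonymous subclass for illustration
    info.addKey("bigint");  // int family    -> longIndices[0] = 0, indexLookup[0].longIndex = 0
    info.addKey("double");  // float family  -> doubleIndices[0] = 1
    info.addKey("string");  // string family -> stringIndices[0] = 2
    info.finishAdding();    // shrinks each per-type index array to its used length
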
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 07d7b8b..32e821d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -32,8 +32,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -47,13 +50,17 @@
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
 
 /**
  * Vectorized GROUP BY operator implementation. Consumes the vectorized input and
  * stores the aggregate operators' intermediate states. Emits row mode output.
  *
  */
-public class VectorGroupByOperator extends GroupByOperator {
+public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion {
 
   private static final Log LOG = LogFactory.getLog(
       VectorGroupByOperator.class.getName());
@@ -70,6 +77,13 @@
    */
   private VectorExpression[] keyExpressions;
 
+  private boolean isVectorOutput;
+
+  // Create a new outgoing vectorization context because column name map will change.
+  private VectorizationContext vOutContext;
+
+  private String fileKey;
+
   private transient VectorExpressionWriter[] keyOutputWriters;
 
   /**
@@ -85,11 +99,18 @@
 
   private transient Object[] forwardCache;
 
+  private transient VectorizedRowBatch outputBatch;
+  private transient VectorizedRowBatchCtx vrbCtx;
+
+  private transient VectorColumnAssign[] vectorColumnAssign;
+
   /**
-   * Interface for processing mode: global, hash or streaming
+   * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
   private static interface IProcessingMode {
     public void initialize(Configuration hconf) throws HiveException;
+    public void startGroup() throws HiveException;
+    public void endGroup() throws HiveException;
     public void processBatch(VectorizedRowBatch batch) throws HiveException;
     public void close(boolean aborted) throws HiveException;
   }
@@ -98,6 +119,15 @@
    * Base class for all processing modes
    */
   private abstract class ProcessingModeBase implements IProcessingMode {
+
+    // Overridden and used in sorted reduce group batch processing mode.
+    public void startGroup() throws HiveException {
+      // Do nothing.
+    }
+    public void endGroup() throws HiveException {
+      // Do nothing.
+    }
+
     /**
      * Evaluates the aggregators on the current batch.
     * The aggregationBatchInfo must have been prepared
@@ -170,7 +200,7 @@ public void processBatch(VectorizedRowBatch batch) throws HiveException {
     @Override
     public void close(boolean aborted) throws HiveException {
       if (!aborted) {
-        flushSingleRow(null, aggregationBuffers);
+        writeSingleRow(null, aggregationBuffers);
       }
     }
   }
@@ -426,7 +456,7 @@ private void flush(boolean all) throws HiveException {
 
     while(iter.hasNext()) {
       Map.Entry pair = iter.next();
-      flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
+      writeSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
 
       if (!all) {
         iter.remove();
@@ -501,20 +531,21 @@ private void checkHashModeEfficiency() throws HiveException {
       if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
         flush(true);
 
-        changeToStreamingMode();
+        changeToUnsortedStreamingMode();
       }
     }
   }
 
   /**
-   * Streaming processing mode. Intermediate values are flushed each time key changes.
-   * In this mode we're relying on the MR shuffle and merge the intermediates in the reduce.
+   * Unsorted streaming processing mode. Each input VectorizedRowBatch may have
+   * a mix of different keys (hence unsorted). Intermediate values are flushed
+   * each time the key changes.
    */
-  private class ProcessingModeStreaming extends ProcessingModeBase {
+  private class ProcessingModeUnsortedStreaming extends ProcessingModeBase {
 
     /**
-     * The aggreagation buffers used in streaming mode
+     * The aggregation buffers used in streaming mode
      */
     private VectorAggregationBufferRow currentStreamingAggregators;
 
@@ -557,7 +588,7 @@ public void free(VectorAggregationBufferRow t) {
         // Nothing to do
       }
     });
-    LOG.info("using streaming aggregation processing mode");
+    LOG.info("using unsorted streaming aggregation processing mode");
   }
 
   @Override
@@ -601,7 +632,7 @@ public void processBatch(VectorizedRowBatch batch) throws HiveException {
 
     // Now flush/forward all keys/rows, except the last (current) one
     for (int i = 0; i < flushMark; ++i) {
-      flushSingleRow(keysToFlush[i], rowsToFlush[i]);
+      writeSingleRow(keysToFlush[i], rowsToFlush[i]);
       rowsToFlush[i].reset();
      streamAggregationBufferRowPool.putInPool(rowsToFlush[i]);
     }
@@ -610,7 +641,79 @@ public void processBatch(VectorizedRowBatch batch) throws HiveException {
     @Override
     public void close(boolean aborted) throws HiveException {
       if (!aborted && null != streamingKey) {
-        flushSingleRow(streamingKey, currentStreamingAggregators);
+        writeSingleRow(streamingKey, currentStreamingAggregators);
       }
     }
   }
+
+  /**
+   * Sorted reduce group batch processing mode. Each input VectorizedRowBatch will have the
+   * same key.  On endGroup (or close), the intermediate values are flushed.
+   */
+  private class ProcessingModeGroupBatches extends ProcessingModeBase {
+
+    private boolean inGroup;
+    private boolean first;
+
+    /**
+     * The group vector key helper.
+     */
+    VectorGroupKeyHelper groupKeyHelper;
+
+    /**
+     * The group vector aggregation buffers.
+     */
+    private VectorAggregationBufferRow groupAggregators;
+
+    /**
+     * Buffer to hold string values.
+     */
+    private DataOutputBuffer buffer;
+
+    @Override
+    public void initialize(Configuration hconf) throws HiveException {
+      inGroup = false;
+      groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length);
+      groupKeyHelper.init(keyExpressions);
+      groupAggregators = allocateAggregationBuffer();
+      buffer = new DataOutputBuffer();
+      LOG.info("using sorted group batch aggregation processing mode");
+    }
+
+    @Override
+    public void startGroup() throws HiveException {
+      inGroup = true;
+      first = true;
+    }
+
+    @Override
+    public void endGroup() throws HiveException {
+      if (inGroup && !first) {
+        writeGroupRow(groupAggregators, buffer);
+        groupAggregators.reset();
+      }
+      inGroup = false;
+    }
+
+    @Override
+    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+      assert(inGroup);
+      if (first) {
+        // Copy the group key to output batch now.  We'll copy in the aggregates at the end of the group.
+        first = false;
+        groupKeyHelper.copyGroupKey(batch, outputBatch, buffer);
+      }
+
+      // Aggregate this batch.
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch);
+      }
+    }
+
+    @Override
+    public void close(boolean aborted) throws HiveException {
+      if (!aborted && inGroup && !first) {
+        writeGroupRow(groupAggregators, buffer);
+      }
+    }
+  }
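
Note: the contract of this mode is that the reduce side delivers one key group at a time, so the group key is copied once per group and each batch only contributes aggregates. An illustrative call sequence for a single group (mode stands for a ProcessingModeGroupBatches instance):

    mode.startGroup();          // first = true; the next batch will contribute the group key
    mode.processBatch(batch1);  // copies the key into outputBatch, then aggregates batch1
    mode.processBatch(batch2);  // same key, so only aggregates
    mode.endGroup();            // writeGroupRow() emits one output row; buffers are reset
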
@@ -635,6 +738,18 @@ public VectorGroupByOperator(VectorizationContext vContext, OperatorDesc conf)
       AggregationDesc aggDesc = aggrDesc.get(i);
       aggregators[i] = vContext.getAggregatorExpression(aggDesc);
     }
+
+    isVectorOutput = desc.getVectorExtra().isVectorOutput();
+
+    List<String> outColNames = desc.getOutputColumnNames();
+    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
+    int outColIndex = 0;
+    for(String outCol: outColNames) {
+      mapOutCols.put(outCol, outColIndex++);
+    }
+    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+    vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_");
+    fileKey = vOutContext.getFileKey();
   }
 
   public VectorGroupByOperator() {
@@ -662,13 +777,23 @@ protected void initializeOp(Configuration hconf) throws HiveException {
         objectInspectors.add(aggregators[i].getOutputObjectInspector());
       }
 
-      keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
-      aggregationBatchInfo = new VectorAggregationBufferBatch();
-      aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
-
+      if (!conf.getVectorExtra().isVectorGroupBatches()) {
+        // These data structures are only used by the map-side processing modes.
+        keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+        aggregationBatchInfo = new VectorAggregationBufferBatch();
+        aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
+      }
+      LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput);
       List<String> outputFieldNames = conf.getOutputColumnNames();
       outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
           outputFieldNames, objectInspectors);
+      if (isVectorOutput) {
+        vrbCtx = new VectorizedRowBatchCtx();
+        vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector);
+        outputBatch = vrbCtx.createVectorizedRowBatch();
+        vectorColumnAssign = VectorColumnAssignFactory.buildAssigners(
+            outputBatch, outputObjInspector, vOutContext.getColumnMap(), conf.getOutputColumnNames());
+      }
 
     } catch (HiveException he) {
       throw he;
@@ -678,32 +803,43 @@ protected void initializeOp(Configuration hconf) throws HiveException {
 
     initializeChildren(hconf);
 
-    forwardCache =new Object[keyExpressions.length + aggregators.length];
+    forwardCache = new Object[keyExpressions.length + aggregators.length];
 
-    if (keyExpressions.length == 0) {
+    if (conf.getVectorExtra().isVectorGroupBatches()) {
+      // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce).
+      processingMode = this.new ProcessingModeGroupBatches();
+    } else if (keyExpressions.length == 0) {
       processingMode = this.new ProcessingModeGlobalAggregate();
-    }
-    else {
-      //TODO: consider if parent can offer order guarantees
-      // If input is sorted, is more efficient to use the streaming mode
+    } else {
+      // We start in hash mode and may dynamically switch to unsorted stream mode.
      processingMode = this.new ProcessingModeHashAggregate();
     }
     processingMode.initialize(hconf);
   }
 
   /**
-   * changes the processing mode to streaming
+   * changes the processing mode to unsorted streaming
    * This is done at the request of the hash agg mode, if the number of keys
    * exceeds the minReductionHashAggr factor
    * @throws HiveException
    */
-  private void changeToStreamingMode() throws HiveException {
-    processingMode = this.new ProcessingModeStreaming();
+  private void changeToUnsortedStreamingMode() throws HiveException {
+    processingMode = this.new ProcessingModeUnsortedStreaming();
     processingMode.initialize(null);
     LOG.trace("switched to streaming mode");
   }
 
   @Override
+  public void startGroup() throws HiveException {
+    processingMode.startGroup();
+  }
+
+  @Override
+  public void endGroup() throws HiveException {
+    processingMode.endGroup();
+  }
+
+  @Override
   public void processOp(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
@@ -719,26 +855,72 @@ public void processOp(Object row, int tag) throws HiveException {
    * @param agg
    * @throws HiveException
    */
-  private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
+  private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
       throws HiveException {
     int fi = 0;
-    for (int i = 0; i < keyExpressions.length; ++i) {
-      forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
-          kw, i, keyOutputWriters[i]);
+    if (!isVectorOutput) {
+      // Output row.
+      for (int i = 0; i < keyExpressions.length; ++i) {
+        forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
+            kw, i, keyOutputWriters[i]);
+      }
+      for (int i = 0; i < aggregators.length; ++i) {
+        forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("forwarding keys: %s: %s",
+            kw, Arrays.toString(forwardCache)));
+      }
+      forward(forwardCache, outputObjInspector);
+    } else {
+      // Output keys and aggregates into the output batch.
+      for (int i = 0; i < keyExpressions.length; ++i) {
+        vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue (
+            kw, i, keyOutputWriters[i]), outputBatch.size);
+      }
+      for (int i = 0; i < aggregators.length; ++i) {
+        vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
+            agg.getAggregationBuffer(i)), outputBatch.size);
+      }
+      ++outputBatch.size;
+      if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+        flushOutput();
+      }
     }
+  }
+
+  /**
+   * Emits a (reduce) group row, made from the key (copied in at the beginning of the group) and
+   * the row aggregation buffers values
+   * @param agg
+   * @param buffer
+   * @throws HiveException
+   */
+  private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer)
+      throws HiveException {
+    int fi = keyExpressions.length;   // Start after group keys.
     for (int i = 0; i < aggregators.length; ++i) {
-      forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
+      vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
+          agg.getAggregationBuffer(i)), outputBatch.size);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("forwarding keys: %s: %s",
-          kw, Arrays.toString(forwardCache)));
+    ++outputBatch.size;
+    if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+      flushOutput();
+      buffer.reset();
     }
-    forward(forwardCache, outputObjInspector);
+  }
+
+  private void flushOutput() throws HiveException {
+    forward(outputBatch, null);
+    outputBatch.reset();
   }
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
     processingMode.close(aborted);
+    if (!aborted && isVectorOutput && outputBatch.size > 0) {
+      flushOutput();
+    }
   }
 
   static public String getOperatorName() {
@@ -761,4 +943,8 @@ public void setAggregators(VectorAggregateExpression[] aggregators) {
     this.aggregators = aggregators;
   }
 
+  @Override
+  public VectorizationContext getOuputVectorizationContext() {
+    return vOutContext;
+  }
 }
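
Note: with this change a child operator can receive either an Object[] row (row mode) or a whole VectorizedRowBatch (vector mode) from VectorGroupByOperator. A hypothetical child-side dispatch, for illustration only:

    @Override
    public void processOp(Object row, int tag) throws HiveException {
      if (row instanceof VectorizedRowBatch) {
        // Vectorized parent: up to VectorizedRowBatch.DEFAULT_SIZE rows per call.
        VectorizedRowBatch batch = (VectorizedRowBatch) row;
        // ... consume batch.size rows from batch.cols ...
      } else {
        // Row-mode parent: one row per call.
        Object[] fields = (Object[]) row;
        // ... consume one row ...
      }
    }
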
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
new file mode 100644
index 0000000..51beb7c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * Class for copying the group key from an input batch to an output batch.
+ */
+public class VectorGroupKeyHelper extends VectorColumnSetInfo {
+
+  public VectorGroupKeyHelper(int keyCount) {
+    super(keyCount);
+  }
+
+  void init(VectorExpression[] keyExpressions) throws HiveException {
+    // Inspect the output type of each key expression.
+    for(int i = 0; i < keyExpressions.length; ++i) {
+      addKey(keyExpressions[i].getOutputType());
+    }
+    finishAdding();
+  }
+
+  public void copyGroupKey(VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch,
+      DataOutputBuffer buffer) throws HiveException {
+    // Grab the key at index 0.  We don't care about selected or repeating since all keys
+    // in the input batch are the same.
+    for(int i = 0; i < longIndices.length; ++i) {
+      int keyIndex = longIndices[i];
+      LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[keyIndex];
+      LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[keyIndex];
+      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+        outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
+      } else {
+        outputColumnVector.noNulls = false;
+        outputColumnVector.isNull[outputBatch.size] = true;
+      }
+    }
+    for(int i = 0; i < doubleIndices.length; ++i) {
+      int keyIndex = doubleIndices[i];
+      DoubleColumnVector inputColumnVector = (DoubleColumnVector) inputBatch.cols[keyIndex];
+      DoubleColumnVector outputColumnVector = (DoubleColumnVector) outputBatch.cols[keyIndex];
+      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+        outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
+      } else {
+        outputColumnVector.noNulls = false;
+        outputColumnVector.isNull[outputBatch.size] = true;
+      }
+    }
+    for(int i = 0; i < stringIndices.length; ++i) {
+      int keyIndex = stringIndices[i];
+      BytesColumnVector inputColumnVector = (BytesColumnVector) inputBatch.cols[keyIndex];
+      BytesColumnVector outputColumnVector = (BytesColumnVector) outputBatch.cols[keyIndex];
+      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+        // Copy the bytes into the scratch buffer; the output batch keeps a reference.
+        int start = buffer.getLength();
+        int length = inputColumnVector.length[0];
+        try {
+          buffer.write(inputColumnVector.vector[0], inputColumnVector.start[0], length);
+        } catch (IOException ioe) {
+          throw new HiveException("Unable to copy string group key", ioe);
+        }
+        outputColumnVector.setRef(outputBatch.size, buffer.getData(), start, length);
+      } else {
+        outputColumnVector.noNulls = false;
+        outputColumnVector.isNull[outputBatch.size] = true;
+      }
+    }
+    for(int i = 0; i < decimalIndices.length; ++i) {
+      int keyIndex = decimalIndices[i];
+      DecimalColumnVector inputColumnVector = (DecimalColumnVector) inputBatch.cols[keyIndex];
+      DecimalColumnVector outputColumnVector = (DecimalColumnVector) outputBatch.cols[keyIndex];
+      if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+        outputColumnVector.vector[outputBatch.size].update(inputColumnVector.vector[0]);
+      } else {
+        outputColumnVector.noNulls = false;
+        outputColumnVector.isNull[outputBatch.size] = true;
+      }
+    }
+  }
+}
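
Note: a minimal sketch of how the group-batch mode above wires this helper together (keyExpressions, inputBatch, and outputBatch are assumed to be in scope). The DataOutputBuffer must outlive the output batch, because string keys are copied into it and referenced (setRef) rather than duplicated per row:

    VectorGroupKeyHelper groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length);
    groupKeyHelper.init(keyExpressions);                // classify keys by output type
    DataOutputBuffer buffer = new DataOutputBuffer();   // backs string keys until the batch is flushed
    groupKeyHelper.copyGroupKey(inputBatch, outputBatch, buffer);
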
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ ... @@
+    Map<String, Map<Integer, String>> scratchColumnVectorTypes;
+    if (mapredWork.getMapWork() != null) {
+      scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes();
+    } else {
+      scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes();
+    }
+    columnTypeMap = scratchColumnVectorTypes.get(fileKey);
     this.rowOI= rowOI;
     this.rawRowOI = rowOI;
   }
@@ -566,13 +572,17 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveExcepti
     }
   }
 
-  private void addScratchColumnsToBatch(VectorizedRowBatch vrb) {
+  private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException {
     if (columnTypeMap != null && !columnTypeMap.isEmpty()) {
       int origNumCols = vrb.numCols;
       int newNumCols = vrb.cols.length + columnTypeMap.keySet().size();
       vrb.cols = Arrays.copyOf(vrb.cols, newNumCols);
       for (int i = origNumCols; i < newNumCols; i++) {
-        vrb.cols[i] = allocateColumnVector(columnTypeMap.get(i),
+        String typeName = columnTypeMap.get(i);
+        if (typeName == null) {
+          throw new HiveException("No type name for column " + i);
+        }
+        vrb.cols[i] = allocateColumnVector(typeName,
             VectorizedRowBatch.DEFAULT_SIZE);
       }
       vrb.numCols = vrb.cols.length;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index ba4ac69..df7cadb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
@@ -61,6 +62,7 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -71,6 +73,7 @@
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByExtra;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -284,23 +287,26 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs)
         throws SemanticException {
       Task currTask = (Task) nd;
       if (currTask instanceof MapRedTask) {
-        convertMapWork(((MapRedTask) currTask).getWork().getMapWork());
+        convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false);
       } else if (currTask instanceof TezTask) {
         TezWork work = ((TezTask) currTask).getWork();
         for (BaseWork w: work.getAllWork()) {
           if (w instanceof MapWork) {
-            convertMapWork((MapWork)w);
+            convertMapWork((MapWork) w, true);
           } else if (w instanceof ReduceWork) {
             // We are only vectorizing Reduce under Tez.
-            convertReduceWork((ReduceWork)w);
+            if (HiveConf.getBoolVar(pctx.getConf(),
+                HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
+              convertReduceWork((ReduceWork) w);
+            }
           }
         }
       }
 
       return null;
     }
 
-    private void convertMapWork(MapWork mapWork) throws SemanticException {
-      boolean ret = validateMapWork(mapWork);
+    private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      boolean ret = validateMapWork(mapWork, isTez);
       if (ret) {
         vectorizeMapWork(mapWork);
       }
@@ -313,7 +319,8 @@ private void addMapWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np)
           + ReduceSinkOperator.getOperatorName()), np);
     }
 
-    private boolean validateMapWork(MapWork mapWork) throws SemanticException {
+    private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      LOG.info("Validating MapWork...");
 
       // Validate the input format
       for (String path : mapWork.getPathToPartitionInfo().keySet()) {
@@ -327,7 +334,7 @@ private boolean validateMapWork(MapWork mapWork) throws SemanticException {
         }
       }
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor();
+      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(isTez);
       addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -411,9 +418,12 @@ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws Sema
     private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
       opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np);
       opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np);
+      opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + ".*"), np);
     }
 
     private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+      LOG.info("Validating ReduceWork...");
+
       // Validate input to ReduceWork.
       if (!getOnlyStructObjectInspectors(reduceWork)) {
         return false;
@@ -481,16 +491,21 @@ private void vectorizeReduceWork(ReduceWork reduceWork) throws SemanticException
 
     class MapWorkValidationNodeProcessor implements NodeProcessor {
 
+      private boolean isTez;
+
+      public MapWorkValidationNodeProcessor(boolean isTez) {
+        this.isTez = isTez;
+      }
+
       @Override
       public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
           Object... nodeOutputs) throws SemanticException {
         for (Node n : stack) {
           Operator op = (Operator) n;
-          if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) &&
-              op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
+          if (nonVectorizableChildOfGroupBy(op)) {
            return new Boolean(true);
           }
-          boolean ret = validateMapWorkOperator(op);
+          boolean ret = validateMapWorkOperator(op, isTez);
           if (!ret) {
             LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
             return new Boolean(false);
@@ -507,6 +522,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
           Object... nodeOutputs) throws SemanticException {
         for (Node n : stack) {
           Operator op = (Operator) n;
+          if (nonVectorizableChildOfGroupBy(op)) {
+            return new Boolean(true);
+          }
           boolean ret = validateReduceWorkOperator(op);
           if (!ret) {
             LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
@@ -573,21 +591,6 @@ public VectorizationContext walkStackToFindVectorizationContext(Stack stac
       return vContext;
     }
 
-    public Boolean nonVectorizableChildOfGroupBy(Operator op) {
-      Operator currentOp = op;
-      while (currentOp.getParentOperators().size() > 0) {
-        currentOp = currentOp.getParentOperators().get(0);
-        if (currentOp.getType().equals(OperatorType.GROUPBY)) {
-          // No need to vectorize
-          if (!opsDone.contains(op)) {
-            opsDone.add(op);
-          }
-          return true;
-        }
-      }
-      return false;
-    }
-
     public Operator doVectorize(Operator op, VectorizationContext vContext)
         throws SemanticException {
       Operator vectorOp = op;
@@ -659,9 +662,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
 
       assert vContext != null;
 
-      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize
-      // any operators below GROUPBY.
+      // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+      // vectorize the operators below it.
       if (nonVectorizableChildOfGroupBy(op)) {
+        // No need to vectorize
+        if (!opsDone.contains(op)) {
+          opsDone.add(op);
+        }
         return null;
       }
 
@@ -713,13 +720,22 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
 
       assert vContext != null;
 
-      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize
-      // any operators below GROUPBY.
+      // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+      // vectorize the operators below it.
       if (nonVectorizableChildOfGroupBy(op)) {
+        // No need to vectorize
+        if (!opsDone.contains(op)) {
+          opsDone.add(op);
+        }
         return null;
       }
 
       Operator vectorOp = doVectorize(op, vContext);
+      if (vectorOp instanceof VectorGroupByOperator) {
+        VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp;
+        VectorGroupByExtra vectorExtra = groupBy.getConf().getVectorExtra();
+        vectorExtra.setVectorGroupBatches(true);
+      }
 
       if (saveRootVectorOp && op != vectorOp) {
         rootVectorOp = vectorOp;
       }
@@ -766,7 +782,7 @@ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
     return pctx;
   }
 
-  boolean validateMapWorkOperator(Operator op) {
+  boolean validateMapWorkOperator(Operator op, boolean isTez) {
     boolean ret = false;
     switch (op.getType()) {
       case MAPJOIN:
@@ -777,7 +793,7 @@ boolean validateMapWorkOperator(Operator op) {
         }
         break;
       case GROUPBY:
-        ret = validateGroupByOperator((GroupByOperator) op);
+        ret = validateGroupByOperator((GroupByOperator) op, false, isTez);
         break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
@@ -808,6 +824,17 @@ boolean validateReduceWorkOperator(Operator op) {
       case EXTRACT:
         ret = validateExtractOperator((ExtractOperator) op);
         break;
+      case MAPJOIN:
+        // Does MAPJOIN actually get planned in Reduce?
+        if (op instanceof MapJoinOperator) {
+          ret = validateMapJoinOperator((MapJoinOperator) op);
+        } else if (op instanceof SMBMapJoinOperator) {
+          ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+        }
+        break;
+      case GROUPBY:
+        ret = validateGroupByOperator((GroupByOperator) op, true, true);
+        break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
         break;
@@ -828,6 +855,23 @@ boolean validateReduceWorkOperator(Operator op) {
     return ret;
   }
 
+  public Boolean nonVectorizableChildOfGroupBy(Operator op) {
+    Operator currentOp = op;
+    while (currentOp.getParentOperators().size() > 0) {
+      currentOp = currentOp.getParentOperators().get(0);
+      if (currentOp.getType().equals(OperatorType.GROUPBY)) {
+        GroupByDesc desc = (GroupByDesc) currentOp.getConf();
+        boolean isVectorOutput = desc.getVectorExtra().isVectorOutput();
+        if (isVectorOutput) {
+          // This GROUP BY does vectorize its output.
+          return false;
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
   private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
     SMBJoinDesc desc = op.getConf();
     // Validation is the same as for map join, since the 'small' tables are not vectorized
@@ -878,16 +922,49 @@ private boolean validateFilterOperator(FilterOperator op) {
     return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER);
   }
 
-  private boolean validateGroupByOperator(GroupByOperator op) {
-    if (op.getConf().isGroupingSetsPresent()) {
+  private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) {
+    GroupByDesc desc = op.getConf();
+    VectorGroupByExtra vectorExtra = desc.getVectorExtra();
+
+    if (desc.isGroupingSetsPresent()) {
       LOG.warn("Grouping sets not supported in vector mode");
       return false;
     }
-    boolean ret = validateExprNodeDesc(op.getConf().getKeys());
+    boolean ret = validateExprNodeDesc(desc.getKeys());
+    if (!ret) {
+      return false;
+    }
+    ret = validateAggregationDesc(desc.getAggregators());
     if (!ret) {
       return false;
     }
-    return validateAggregationDesc(op.getConf().getAggregators());
+    boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators());
+    LOG.warn("All aggregators have primitive output " + isVectorOutput);
+    vectorExtra.setVectorOutput(isVectorOutput);
+    if (isReduce) {
+      if (desc.isDistinct()) {
+        LOG.warn("Distinct not supported in reduce vector mode");
+        return false;
+      }
+      // Sort-based GroupBy?
+      if (desc.getMode() != GroupByDesc.Mode.COMPLETE &&
+          desc.getMode() != GroupByDesc.Mode.PARTIAL1 &&
+          desc.getMode() != GroupByDesc.Mode.PARTIAL2 &&
+          desc.getMode() != GroupByDesc.Mode.MERGEPARTIAL) {
+        LOG.warn("Reduce vector mode not supported when input for GROUP BY not sorted");
+        return false;
+      }
+      if (desc.getGroupKeyNotReductionKey()) {
+        LOG.warn("Reduce vector mode not supported when group key is not reduction key");
+        return false;
+      }
+      if (!isVectorOutput) {
+        LOG.warn("Reduce vector mode only supported when aggregate outputs are primitive types");
+        return false;
+      }
+      vectorExtra.setVectorGroupBatches(true);
+    }
+    return true;
   }
 
   private boolean validateExtractOperator(ExtractOperator op) {
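
Note: taken together, the reduce-side conditions above mean "sorted-input GROUP BY modes only, no DISTINCT, and all aggregate outputs primitive (min/max/sum for now)". An illustrative check from test code in the same package, using the new signatures; the reduce-side call is hypothetical wiring, not an existing test:

    Vectorizer v = new Vectorizer();
    Assert.assertTrue(v.validateMapWorkOperator(gbyOp, false));  // map side, non-Tez
    Assert.assertTrue(v.validateReduceWorkOperator(gbyOp));      // reduce side: applies the checks above
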
+ LOG.info("getVectorExpression returned null"); return false; } } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to vectorize", e); - } + LOG.info("Failed to vectorize", e); return false; } return true; @@ -1006,6 +1080,27 @@ private boolean validateAggregationDesc(AggregationDesc aggDesc) { return true; } + private boolean aggregatorsOutputIsPrimitive(List descs) { + for (AggregationDesc d : descs) { + boolean ret = aggregatorsOutputIsPrimitive(d); + if (!ret) { + return false; + } + } + return true; + } + + private boolean aggregatorsOutputIsPrimitive(AggregationDesc aggDesc) { + String name = aggDesc.getGenericUDAFName().toLowerCase(); + // For now, look for aggregators known to return primitive types... + if (name.equals("min") || + name.equals("max") || + name.equals("sum")) { + return true; + } + return false; + } + private boolean validateDataType(String type) { return supportedDataTypesPattern.matcher(type.toLowerCase()).matches(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java index 4475b76..74793e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java @@ -69,7 +69,11 @@ transient private boolean isDistinct; private boolean dontResetAggrsDistinct; + // Extra parameters only for vectorization. + private VectorGroupByExtra vectorExtra; + public GroupByDesc() { + vectorExtra = new VectorGroupByExtra(); } public GroupByDesc( @@ -102,6 +106,7 @@ public GroupByDesc( final boolean groupingSetsPresent, final int groupingSetsPosition, final boolean isDistinct) { + vectorExtra = new VectorGroupByExtra(); this.mode = mode; this.outputColumnNames = outputColumnNames; this.keys = keys; @@ -116,6 +121,14 @@ public GroupByDesc( this.isDistinct = isDistinct; } + public void setVectorExtra(VectorGroupByExtra vectorExtra) { + this.vectorExtra = vectorExtra; + } + + public VectorGroupByExtra getVectorExtra() { + return vectorExtra; + } + public Mode getMode() { return mode; } @@ -268,14 +281,6 @@ public void setGroupingSetPosition(int groupingSetPosition) { this.groupingSetPosition = groupingSetPosition; } - public boolean isDistinct() { - return isDistinct; - } - - public void setDistinct(boolean isDistinct) { - this.isDistinct = isDistinct; - } - public boolean isDontResetAggrsDistinct() { return dontResetAggrsDistinct; } @@ -283,4 +288,13 @@ public boolean isDontResetAggrsDistinct() { public void setDontResetAggrsDistinct(boolean dontResetAggrsDistinct) { this.dontResetAggrsDistinct = dontResetAggrsDistinct; } + + public boolean isDistinct() { + return isDistinct; + } + + public void setDistinct(boolean isDistinct) { + this.isDistinct = isDistinct; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorAbstractExtra.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorAbstractExtra.java new file mode 100644 index 0000000..86e737b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorAbstractExtra.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorAbstractExtra.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorAbstractExtra.java
new file mode 100644
index 0000000..86e737b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorAbstractExtra.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+public class VectorAbstractExtra implements VectorExtra {
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    throw new CloneNotSupportedException("clone not supported");
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorExtra.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorExtra.java
new file mode 100644
index 0000000..22ef7ce
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorExtra.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+public interface VectorExtra extends Serializable, Cloneable {
+  public Object clone() throws CloneNotSupportedException;
+}
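
Note: the planner communicates with the vectorized operator through these side objects instead of subclassing the descriptor. A minimal sketch of the flow, assuming desc is the GroupByDesc of a reduce-side GROUP BY that passed validation:

    VectorGroupByExtra extra = desc.getVectorExtra();
    extra.setVectorOutput(true);        // aggregate outputs are primitive, so emit batches
    extra.setVectorGroupBatches(true);  // reduce side: one sorted key group per batch
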
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByExtra.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByExtra.java
new file mode 100644
index 0000000..f654c0b
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorGroupByExtra.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+/**
+ * VectorGroupByExtra.
+ *
+ * Extra parameters beyond GroupByDesc just for the VectorGroupByOperator.
+ *
+ * We don't extend GroupByDesc because the base OperatorDesc doesn't support
+ * clone and adding it is a lot of work for little gain.
+ */
+public class VectorGroupByExtra extends VectorAbstractExtra {
+
+  private static final long serialVersionUID = 1L;
+
+  private boolean isVectorGroupBatches;
+  private boolean isVectorOutput;
+
+  public VectorGroupByExtra() {
+    this.isVectorGroupBatches = false;
+  }
+
+  public boolean isVectorGroupBatches() {
+    return isVectorGroupBatches;
+  }
+
+  public void setVectorGroupBatches(boolean isVectorGroupBatches) {
+    this.isVectorGroupBatches = isVectorGroupBatches;
+  }
+
+  public boolean isVectorOutput() {
+    return isVectorOutput;
+  }
+
+  public void setVectorOutput(boolean isVectorOutput) {
+    this.isVectorOutput = isVectorOutput;
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
index 0a0e0c3..fbab3af 100644
--- ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
+++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestVectorizer.java
@@ -107,7 +107,7 @@ public void testAggregateOnUDF() throws HiveException {
     gbyOp.setConf(desc);
 
     Vectorizer v = new Vectorizer();
-    Assert.assertTrue(v.validateMapWorkOperator(gbyOp));
+    Assert.assertTrue(v.validateMapWorkOperator(gbyOp, false));
     VectorGroupByOperator vectorOp = (VectorGroupByOperator) v.vectorizeOperator(gbyOp, vContext);
     Assert.assertEquals(VectorUDAFSumLong.class, vectorOp.getAggregators()[0].getClass());
     VectorUDAFSumLong udaf = (VectorUDAFSumLong) vectorOp.getAggregators()[0];
@@ -150,7 +150,7 @@ public void testValidateNestedExpressions() {
   /**
    * prepareAbstractMapJoin prepares a join operator descriptor, used as helper by SMB and Map join tests.
    */
-  private void prepareAbstractMapJoin(AbstractMapJoinOperator mop, MapJoinDesc mjdesc) {
+  private void prepareAbstractMapJoin(AbstractMapJoinOperator map, MapJoinDesc mjdesc) {
     mjdesc.setPosBigTable(0);
     List<ExprNodeDesc> expr = new ArrayList<ExprNodeDesc>();
     expr.add(new ExprNodeColumnDesc(Integer.class, "col1", "T", false));
@@ -180,14 +180,14 @@ private void prepareAbstractMapJoin(AbstractMapJoinOperator