diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java index 94eaf56..defaf90 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java @@ -150,6 +150,25 @@ public void init(StructObjectInspector structObjectInspector, List proj } /* + * Initialize using an ObjectInspector array and a column projection array. + */ + public void init(TypeInfo[] typeInfos, int[] projectedColumns) + throws HiveException { + + final int count = typeInfos.length; + allocateArrays(count); + + for (int i = 0; i < count; i++) { + + int projectionColumnNum = projectedColumns[i]; + + TypeInfo typeInfo = typeInfos[i]; + + initEntry(i, projectionColumnNum, typeInfo); + } + } + + /* * Initialize using data type names. * No projection -- the column range 0 .. types.size()-1 */ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java index 8133aef..76cfb4a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java @@ -102,36 +102,29 @@ //--------------------------------------------------------------------------- // Whether there is to be a tag added to the end of each key and the tag value. - private transient boolean reduceSkipTag; - private transient byte reduceTagByte; + protected transient boolean reduceSkipTag; + protected transient byte reduceTagByte; // Binary sortable key serializer. protected transient BinarySortableSerializeWrite keyBinarySortableSerializeWrite; - // The serialized all null key and its hash code. - private transient byte[] nullBytes; - private transient int nullKeyHashCode; - // Lazy binary value serializer. - private transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite; + protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite; // This helper object serializes LazyBinary format reducer values from columns of a row // in a vectorized row batch. - private transient VectorSerializeRow valueVectorSerializeRow; + protected transient VectorSerializeRow valueVectorSerializeRow; // The output buffer used to serialize a value into. - private transient Output valueOutput; + protected transient Output valueOutput; // The hive key and bytes writable value needed to pass the key and value to the collector. - private transient HiveKey keyWritable; - private transient BytesWritable valueBytesWritable; + protected transient HiveKey keyWritable; + protected transient BytesWritable valueBytesWritable; // Where to write our key and value pairs. private transient OutputCollector out; - // The object that determines equal key series. 
- protected transient VectorKeySeriesSerialized serializedKeySeries; - private transient long numRows = 0; private transient long cntr = 1; private transient long logEveryNRows = 0; @@ -158,6 +151,8 @@ public VectorReduceSinkCommonOperator(CompilationOpContext ctx, VectorizationContext vContext, OperatorDesc conf) throws HiveException { this(ctx); + LOG.info("VectorReduceSinkCommonOperator constructor"); + ReduceSinkDesc desc = (ReduceSinkDesc) conf; this.conf = desc; vectorDesc = desc.getVectorDesc(); @@ -247,6 +242,48 @@ public VectorReduceSinkCommonOperator(CompilationOpContext ctx, protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); + // UNDONE + LOG.debug("*NEW* useUniformHash " + vectorReduceSinkInfo.getUseUniformHash()); + LOG.debug("*NEW* bucketNumKind " + vectorReduceSinkInfo.getBucketNumKind()); + LOG.debug("*NEW* partitionHashCodeKind " + vectorReduceSinkInfo.getPartitionHashCodeKind()); + + LOG.debug("*NEW* reduceSinkKeyColumnMap " + + (vectorReduceSinkInfo.getReduceSinkKeyColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyColumnMap()))); + LOG.debug("*NEW* reduceSinkKeyTypeInfos " + + (vectorReduceSinkInfo.getReduceSinkKeyTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyTypeInfos()))); + LOG.debug("*NEW* reduceSinkKeyColumnVectorTypes " + + (vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes()))); + LOG.debug("*NEW* reduceSinkKeyExpressions " + + (vectorReduceSinkInfo.getReduceSinkKeyExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyExpressions()))); + + LOG.debug("*NEW* reduceSinkValueColumnMap " + + (vectorReduceSinkInfo.getReduceSinkValueColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueColumnMap()))); + LOG.debug("*NEW* reduceSinkValueTypeInfos " + + (vectorReduceSinkInfo.getReduceSinkValueTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueTypeInfos()))); + LOG.debug("*NEW* reduceSinkValueColumnVectorTypes " + + (vectorReduceSinkInfo.getReduceSinkValueColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueColumnVectorTypes()))); + LOG.debug("*NEW* reduceSinkValueExpressions " + + (vectorReduceSinkInfo.getReduceSinkValueExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueExpressions()))); + + LOG.debug("*NEW* reduceSinkBucketColumnMap " + + (vectorReduceSinkInfo.getReduceSinkBucketColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketColumnMap()))); + LOG.debug("*NEW* reduceSinkBucketTypeInfos " + + (vectorReduceSinkInfo.getReduceSinkBucketTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketTypeInfos()))); + LOG.debug("*NEW* reduceSinkBucketColumnVectorTypes " + + (vectorReduceSinkInfo.getReduceSinkBucketColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketColumnVectorTypes()))); + LOG.debug("*NEW* reduceSinkBucketExpressions " + + (vectorReduceSinkInfo.getReduceSinkBucketExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketExpressions()))); + + LOG.debug("*NEW* reduceSinkPartitionColumnMap " + + (vectorReduceSinkInfo.getReduceSinkPartitionColumnMap() == null ? 
"NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionColumnMap()))); + LOG.debug("*NEW* reduceSinkPartitionTypeInfos " + + (vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos()))); + LOG.debug("*NEW* reduceSinkPartitionColumnVectorTypes " + + (vectorReduceSinkInfo.getReduceSinkPartitionColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionColumnVectorTypes()))); + LOG.debug("*NEW* reduceSinkPartitionExpressions " + + (vectorReduceSinkInfo.getReduceSinkPartitionExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionExpressions()))); + // UNDONE + if (LOG.isDebugEnabled()) { // Determine the name of our map or reduce task for debug tracing. BaseWork work = Utilities.getMapWork(hconf); @@ -280,21 +317,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(columnSortOrder, columnNullMarker, columnNotNullMarker); - // Create all nulls key. - try { - Output nullKeyOutput = new Output(); - keyBinarySortableSerializeWrite.set(nullKeyOutput); - for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) { - keyBinarySortableSerializeWrite.writeNull(); - } - int nullBytesLength = nullKeyOutput.getLength(); - nullBytes = new byte[nullBytesLength]; - System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength); - nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength); - } catch (Exception e) { - throw new HiveException(e); - } - valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(reduceSinkValueColumnMap.length); valueVectorSerializeRow = @@ -312,101 +334,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { batchCounter = 0; } - @Override - public void process(Object row, int tag) throws HiveException { - - try { - VectorizedRowBatch batch = (VectorizedRowBatch) row; - - batchCounter++; - - if (batch.size == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); - } - return; - } - - // Perform any key expressions. Results will go into scratch columns. - if (reduceSinkKeyExpressions != null) { - for (VectorExpression ve : reduceSinkKeyExpressions) { - ve.evaluate(batch); - } - } - - // Perform any value expressions. Results will go into scratch columns. - if (reduceSinkValueExpressions != null) { - for (VectorExpression ve : reduceSinkValueExpressions) { - ve.evaluate(batch); - } - } - - serializedKeySeries.processBatch(batch); - - boolean selectedInUse = batch.selectedInUse; - int[] selected = batch.selected; - - int keyLength; - int logical; - int end; - int batchIndex; - do { - if (serializedKeySeries.getCurrentIsAllNull()) { - - // Use the same logic as ReduceSinkOperator.toHiveKey. - // - if (tag == -1 || reduceSkipTag) { - keyWritable.set(nullBytes, 0, nullBytes.length); - } else { - keyWritable.setSize(nullBytes.length + 1); - System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length); - keyWritable.get()[nullBytes.length] = reduceTagByte; - } - keyWritable.setDistKeyLength(nullBytes.length); - keyWritable.setHashCode(nullKeyHashCode); - - } else { - - // One serialized key for 1 or more rows for the duplicate keys. 
- // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength()); - // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength()); - keyLength = serializedKeySeries.getSerializedLength(); - if (tag == -1 || reduceSkipTag) { - keyWritable.set(serializedKeySeries.getSerializedBytes(), - serializedKeySeries.getSerializedStart(), keyLength); - } else { - keyWritable.setSize(keyLength + 1); - System.arraycopy(serializedKeySeries.getSerializedBytes(), - serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = reduceTagByte; - } - keyWritable.setDistKeyLength(keyLength); - keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode()); - } - - logical = serializedKeySeries.getCurrentLogical(); - end = logical + serializedKeySeries.getCurrentDuplicateCount(); - do { - batchIndex = (selectedInUse ? selected[logical] : logical); - - valueLazyBinarySerializeWrite.reset(); - valueVectorSerializeRow.serializeWrite(batch, batchIndex); - - valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); - - collect(keyWritable, valueBytesWritable); - } while (++logical < end); - - if (!serializedKeySeries.next()) { - break; - } - } while (true); - - } catch (Exception e) { - throw new HiveException(e); - } - } - protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException { // Since this is a terminal operator, update counters explicitly - // forward is not called diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java index 325f773..0bc1cd1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java @@ -32,7 +32,7 @@ /* * Specialized class for native vectorized reduce sink that is reducing on a single long key column. */ -public class VectorReduceSinkLongOperator extends VectorReduceSinkCommonOperator { +public class VectorReduceSinkLongOperator extends VectorReduceSinkUniformHashOperator { private static final long serialVersionUID = 1L; private static final String CLASS_NAME = VectorReduceSinkLongOperator.class.getName(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java index 2027187..1cca94d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java @@ -32,7 +32,7 @@ * Specialized class for native vectorized reduce sink that is reducing on multiple key columns * (or a single non-long / non-string column). 
*/ -public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkCommonOperator { +public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkUniformHashOperator { private static final long serialVersionUID = 1L; private static final String CLASS_NAME = VectorReduceSinkMultiKeyOperator.class.getName(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java new file mode 100644 index 0000000..de5b2e9 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.reducesink; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; +import org.apache.hadoop.hive.ql.exec.TerminalOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo.BucketNumKind; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo.PartitionHashCodeKind; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import 
org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hive.common.util.HashCodeUtil; + +/** + * This class is uniform hash (common) operator class for native vectorized reduce sink. + */ +public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOperator { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorReduceSinkObjectHashOperator.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + protected int[] reduceSinkBucketColumnMap; + protected TypeInfo[] reduceSinkBucketTypeInfos; + + protected VectorExpression[] reduceSinkBucketExpressions; + + protected int[] reduceSinkPartitionColumnMap; + protected TypeInfo[] reduceSinkPartitionTypeInfos; + + protected VectorExpression[] reduceSinkPartitionExpressions; + + protected BucketNumKind bucketNumKind; + + protected PartitionHashCodeKind partitionHashCodeKind; + + + // The above members are initialized by the constructor and must not be + // transient. + //--------------------------------------------------------------------------- + + protected transient Output keyOutput; + protected transient VectorSerializeRow keyVectorSerializeRow; + + private transient ObjectInspector[] bucketObjectInspectors; + private transient VectorExtractRow bucketVectorExtractRow; + private transient Object[] bucketFieldValues; + private transient int numBuckets; + + private transient ObjectInspector[] partitionObjectInspectors; + private transient VectorExtractRow partitionVectorExtractRow; + private transient Object[] partitionFieldValues; + private transient boolean isPartitioned; + private transient Random nonPartitionRandom; + + /** Kryo ctor. 
*/ + protected VectorReduceSinkObjectHashOperator() { + super(); + } + + public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx) { + super(ctx); + } + + public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx, + VectorizationContext vContext, OperatorDesc conf) throws HiveException { + super(ctx, vContext, conf); + + LOG.info("VectorReduceSinkObjectHashOperator constructor vectorReduceSinkInfo " + vectorReduceSinkInfo); + + reduceSinkBucketColumnMap = vectorReduceSinkInfo.getReduceSinkBucketColumnMap(); + reduceSinkBucketTypeInfos = vectorReduceSinkInfo.getReduceSinkBucketTypeInfos(); + reduceSinkBucketExpressions = vectorReduceSinkInfo.getReduceSinkBucketExpressions(); + + reduceSinkPartitionColumnMap = vectorReduceSinkInfo.getReduceSinkPartitionColumnMap(); + reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos(); + reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions(); + + bucketNumKind = vectorReduceSinkInfo.getBucketNumKind(); + partitionHashCodeKind = vectorReduceSinkInfo.getPartitionHashCodeKind(); + } + + private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) { + final int size = typeInfos.length; + ObjectInspector[] objectInspectors = new ObjectInspector[size]; + for(int i = 0; i < size; i++) { + TypeInfo typeInfo = typeInfos[i]; + ObjectInspector standardWritableObjectInspector = + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo); + objectInspectors[i] = standardWritableObjectInspector; + } + return objectInspectors; + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + + keyOutput = new Output(); + keyBinarySortableSerializeWrite.set(keyOutput); + keyVectorSerializeRow = + new VectorSerializeRow( + keyBinarySortableSerializeWrite); + keyVectorSerializeRow.init(reduceSinkKeyTypeInfos, reduceSinkKeyColumnMap); + + numBuckets = conf.getNumBuckets(); + switch (bucketNumKind) { + case NONE: + break; + case REGULAR_EVAL: + bucketObjectInspectors = getObjectInspectorArray(reduceSinkBucketTypeInfos); + LOG.debug("*NEW* bucketObjectInspectors " + Arrays.toString(bucketObjectInspectors)); + bucketVectorExtractRow = new VectorExtractRow(); + bucketVectorExtractRow.init(reduceSinkBucketTypeInfos, reduceSinkBucketColumnMap); + bucketFieldValues = new Object[reduceSinkBucketTypeInfos.length]; + break; + case ACID_UPDATE_OR_DELETE: + throw new RuntimeException("ACID UPDATE or DELETE not supported yet"); + default: + throw new RuntimeException("Unexpected bucketNum kind " + bucketNumKind); + } + + isPartitioned = false; + switch (partitionHashCodeKind) { + case NONE: + case NON_PART_ACID: + break; + case NON_PART_NOT_ACID: + nonPartitionRandom = new Random(12345);; + break; + case PARTITIONED: + isPartitioned = true; + partitionObjectInspectors = getObjectInspectorArray(reduceSinkPartitionTypeInfos); + LOG.debug("*NEW* partitionObjectInspectors " + Arrays.toString(partitionObjectInspectors)); + partitionVectorExtractRow = new VectorExtractRow(); + partitionVectorExtractRow.init(reduceSinkPartitionTypeInfos, reduceSinkPartitionColumnMap); + partitionFieldValues = new Object[reduceSinkPartitionTypeInfos.length]; + break; + default: + throw new RuntimeException("Unexpected partitionHashCode kind " + partitionHashCodeKind); + } + } + + @Override + public void process(Object row, int tag) throws HiveException { + + try { + + VectorizedRowBatch batch = (VectorizedRowBatch) row; + + batchCounter++; + + if 
(batch.size == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); + } + return; + } + + // Perform any key expressions. Results will go into scratch columns. + if (reduceSinkKeyExpressions != null) { + for (VectorExpression ve : reduceSinkKeyExpressions) { + ve.evaluate(batch); + } + } + + // Perform any value expressions. Results will go into scratch columns. + if (reduceSinkValueExpressions != null) { + for (VectorExpression ve : reduceSinkValueExpressions) { + ve.evaluate(batch); + } + } + + // Perform any bucket expressions. Results will go into scratch columns. + if (reduceSinkBucketExpressions != null) { + for (VectorExpression ve : reduceSinkBucketExpressions) { + ve.evaluate(batch); + } + } + + // Perform any partition expressions. Results will go into scratch columns. + if (reduceSinkPartitionExpressions != null) { + for (VectorExpression ve : reduceSinkPartitionExpressions) { + ve.evaluate(batch); + } + } + + final boolean selectedInUse = batch.selectedInUse; + int[] selected = batch.selected; + int batchIndex; + int bucketNum; + int hashCode; + int keyLength; + final int size = batch.size; + for (int logical = 0; logical < size; logical++) { + batchIndex = (selectedInUse ? selected[logical] : logical); + + switch (bucketNumKind) { + case NONE: + bucketNum = -1; + break; + case REGULAR_EVAL: + bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); + bucketNum = ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets); + break; + default: + throw new RuntimeException("Unexpected bucketNum kind " + bucketNumKind); + } + + switch (partitionHashCodeKind) { + case NONE: + hashCode = 0; + break; + case NON_PART_NOT_ACID: + hashCode = nonPartitionRandom.nextInt(); + break; + case NON_PART_ACID: + hashCode = 1; + break; + case PARTITIONED: + partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); + hashCode = ObjectInspectorUtils.getBucketHashCode(partitionFieldValues, partitionObjectInspectors); + break; + default: + throw new RuntimeException("Unexpected partitionHashCode kind " + partitionHashCodeKind); + } + hashCode = bucketNum < 0 ? hashCode : hashCode * 31 + bucketNum; + + keyBinarySortableSerializeWrite.reset(); + keyVectorSerializeRow.serializeWrite(batch, batchIndex); + + // One serialized key for 1 or more rows for the duplicate keys. 
+ keyLength = keyOutput.getLength(); + if (tag == -1 || reduceSkipTag) { + keyWritable.set(keyOutput.getData(), 0, keyLength); + } else { + keyWritable.setSize(keyLength + 1); + System.arraycopy(keyOutput.getData(), 0, keyWritable.get(), 0, keyLength); + keyWritable.get()[keyLength] = reduceTagByte; + } + keyWritable.setDistKeyLength(keyLength); + keyWritable.setHashCode(hashCode); + + valueLazyBinarySerializeWrite.reset(); + valueVectorSerializeRow.serializeWrite(batch, batchIndex); + + valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); + + collect(keyWritable, valueBytesWritable); + } + } catch (Exception e) { + throw new HiveException(e); + } + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java index b655e6e..a838f4c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java @@ -32,7 +32,7 @@ /* * Specialized class for native vectorized reduce sink that is reducing on a single long key column. */ -public class VectorReduceSinkStringOperator extends VectorReduceSinkCommonOperator { +public class VectorReduceSinkStringOperator extends VectorReduceSinkUniformHashOperator { private static final long serialVersionUID = 1L; private static final String CLASS_NAME = VectorReduceSinkStringOperator.class.getName(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java new file mode 100644 index 0000000..a1ac02d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.reducesink; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; +import org.apache.hadoop.hive.ql.exec.TerminalOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo.BucketNumKind; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo.PartitionHashCodeKind; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hive.common.util.HashCodeUtil; + +/** + * This class is uniform hash (common) operator class for native vectorized reduce sink. + */ +public abstract class VectorReduceSinkUniformHashOperator extends VectorReduceSinkCommonOperator { + + private static final long serialVersionUID = 1L; + private static final String CLASS_NAME = VectorReduceSinkUniformHashOperator.class.getName(); + private static final Log LOG = LogFactory.getLog(CLASS_NAME); + + // The above members are initialized by the constructor and must not be + // transient. + //--------------------------------------------------------------------------- + + // The serialized all null key and its hash code. + private transient byte[] nullBytes; + private transient int nullKeyHashCode; + + // The object that determines equal key series. 
+ protected transient VectorKeySeriesSerialized serializedKeySeries; + + + /** Kryo ctor. */ + protected VectorReduceSinkUniformHashOperator() { + super(); + } + + public VectorReduceSinkUniformHashOperator(CompilationOpContext ctx) { + super(ctx); + } + + public VectorReduceSinkUniformHashOperator(CompilationOpContext ctx, + VectorizationContext vContext, OperatorDesc conf) throws HiveException { + super(ctx, vContext, conf); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + + // Create all nulls key. + try { + Output nullKeyOutput = new Output(); + keyBinarySortableSerializeWrite.set(nullKeyOutput); + for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) { + keyBinarySortableSerializeWrite.writeNull(); + } + int nullBytesLength = nullKeyOutput.getLength(); + nullBytes = new byte[nullBytesLength]; + System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength); + nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength); + } catch (Exception e) { + throw new HiveException(e); + } + } + + @Override + public void process(Object row, int tag) throws HiveException { + + try { + VectorizedRowBatch batch = (VectorizedRowBatch) row; + + batchCounter++; + + if (batch.size == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); + } + return; + } + + // Perform any key expressions. Results will go into scratch columns. + if (reduceSinkKeyExpressions != null) { + for (VectorExpression ve : reduceSinkKeyExpressions) { + ve.evaluate(batch); + } + } + + // Perform any value expressions. Results will go into scratch columns. + if (reduceSinkValueExpressions != null) { + for (VectorExpression ve : reduceSinkValueExpressions) { + ve.evaluate(batch); + } + } + + serializedKeySeries.processBatch(batch); + + boolean selectedInUse = batch.selectedInUse; + int[] selected = batch.selected; + + int keyLength; + int logical; + int end; + int batchIndex; + do { + if (serializedKeySeries.getCurrentIsAllNull()) { + + // Use the same logic as ReduceSinkOperator.toHiveKey. + // + if (tag == -1 || reduceSkipTag) { + keyWritable.set(nullBytes, 0, nullBytes.length); + } else { + keyWritable.setSize(nullBytes.length + 1); + System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length); + keyWritable.get()[nullBytes.length] = reduceTagByte; + } + keyWritable.setDistKeyLength(nullBytes.length); + keyWritable.setHashCode(nullKeyHashCode); + + } else { + + // One serialized key for 1 or more rows for the duplicate keys. 
+ // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength()); + // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength()); + keyLength = serializedKeySeries.getSerializedLength(); + if (tag == -1 || reduceSkipTag) { + keyWritable.set(serializedKeySeries.getSerializedBytes(), + serializedKeySeries.getSerializedStart(), keyLength); + } else { + keyWritable.setSize(keyLength + 1); + System.arraycopy(serializedKeySeries.getSerializedBytes(), + serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength); + keyWritable.get()[keyLength] = reduceTagByte; + } + keyWritable.setDistKeyLength(keyLength); + keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode()); + } + + logical = serializedKeySeries.getCurrentLogical(); + end = logical + serializedKeySeries.getCurrentDuplicateCount(); + do { + batchIndex = (selectedInUse ? selected[logical] : logical); + + valueLazyBinarySerializeWrite.reset(); + valueVectorSerializeRow.serializeWrite(batch, batchIndex); + + valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength()); + + collect(keyWritable, valueBytesWritable); + } while (++logical < end); + + if (!serializedKeySeries.next()) { + break; + } + } while (true); + + } catch (Exception e) { + throw new HiveException(e); + } + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index e3d9d7f..a83e1d8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -28,6 +28,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; @@ -61,6 +62,7 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator; import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator; import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator; +import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkObjectHashOperator; import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; @@ -119,9 +121,11 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo.BucketNumKind; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; +import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo.PartitionHashCodeKind; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.UDFAcos; import org.apache.hadoop.hive.ql.udf.UDFAsin; @@ -2271,9 +2275,6 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi Operator op, VectorizationContext vContext, ReduceSinkDesc desc, VectorReduceSinkInfo vectorReduceSinkInfo) throws 
HiveException { - Operator vectorOp = null; - Class> opClass = null; - Type[] reduceSinkKeyColumnVectorTypes = vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes(); // By default, we can always use the multi-key class. @@ -2310,18 +2311,23 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi } } - switch (reduceSinkKeyType) { - case LONG: - opClass = VectorReduceSinkLongOperator.class; - break; - case STRING: - opClass = VectorReduceSinkStringOperator.class; - break; - case MULTI_KEY: - opClass = VectorReduceSinkMultiKeyOperator.class; - break; - default: - throw new HiveException("Unknown reduce sink key type " + reduceSinkKeyType); + Class> opClass = null; + if (vectorReduceSinkInfo.getUseUniformHash()) { + switch (reduceSinkKeyType) { + case LONG: + opClass = VectorReduceSinkLongOperator.class; + break; + case STRING: + opClass = VectorReduceSinkStringOperator.class; + break; + case MULTI_KEY: + opClass = VectorReduceSinkMultiKeyOperator.class; + break; + default: + throw new HiveException("Unknown reduce sink key type " + reduceSinkKeyType); + } + } else { + opClass = VectorReduceSinkObjectHashOperator.class; } VectorReduceSinkDesc vectorDesc = new VectorReduceSinkDesc(); @@ -2329,9 +2335,17 @@ private boolean canSpecializeMapJoin(Operator op, MapJoi vectorDesc.setReduceSinkKeyType(reduceSinkKeyType); vectorDesc.setVectorReduceSinkInfo(vectorReduceSinkInfo); - vectorOp = OperatorFactory.getVectorOperator( - opClass, op.getCompilationOpContext(), op.getConf(), vContext); - LOG.info("Vectorizer vectorizeOperator reduce sink class " + vectorOp.getClass().getSimpleName()); + LOG.info("Vectorizer vectorizeOperator reduce sink class " + opClass.getSimpleName()); + + Operator vectorOp = null; + try { + vectorOp = OperatorFactory.getVectorOperator( + opClass, op.getCompilationOpContext(), op.getConf(), vContext); + } catch (Exception e) { + LOG.info("Vectorizer vectorizeOperator reduce sink class exception " + opClass.getSimpleName() + + " exception " + e); + throw new HiveException(e); + } return vectorOp; } @@ -2345,26 +2359,21 @@ private boolean canSpecializeReduceSink(ReduceSinkDesc desc, return false; } - // Many restrictions. + // Varous restrictions. if (!isTez) { return false; } + /* + * Currently, we do not support UPDATE and DELETE because they require the virtual column + * ROWID and we do not support that yet in vectorization. + */ if (desc.getWriteType() == AcidUtils.Operation.UPDATE || desc.getWriteType() == AcidUtils.Operation.DELETE) { return false; } - if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) { - return false; - } - - boolean useUniformHash = desc.getReducerTraits().contains(UNIFORM); - if (!useUniformHash) { - return false; - } - if (desc.getTopN() >= 0) { return false; } @@ -2448,6 +2457,131 @@ private boolean canSpecializeReduceSink(ReduceSinkDesc desc, vectorReduceSinkInfo.setReduceSinkValueColumnVectorTypes(reduceSinkValueColumnVectorTypes); vectorReduceSinkInfo.setReduceSinkValueExpressions(reduceSinkValueExpressions); + boolean useUniformHash = desc.getReducerTraits().contains(UNIFORM); + vectorReduceSinkInfo.setUseUniformHash(useUniformHash); + + if (useUniformHash) { + + // Check for unexpected conditions... + + if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) { + return false; + } + + if (desc.getPartitionCols().size() == 0) { + // With UNIFORM hash, the keys and partitions are identical. 
+ return false; + } + + } else { + + BucketNumKind bucketNumKind = BucketNumKind.NONE; + PartitionHashCodeKind partitionHashCodeKind = PartitionHashCodeKind.NONE; + + if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) { + + bucketNumKind = BucketNumKind.REGULAR_EVAL; + + } else if (desc.getWriteType() == AcidUtils.Operation.UPDATE || + desc.getWriteType() == AcidUtils.Operation.DELETE) { + + // In the non-partitioned case we still want to compute the bucket number for updates and + // deletes. + // NOTE: Not supported yet. + bucketNumKind = BucketNumKind.ACID_UPDATE_OR_DELETE; + } + + if (desc.getPartitionCols().size() == 0) { + + // (Comment from ReduceSinkOperator): + // If no partition cols and not doing an update or delete, just distribute the data uniformly + // to provide better load balance. If the requirement is to have a single reducer, we should + // set the number of reducers to 1. Use a constant seed to make the code deterministic. + // For acid operations make sure to send all records with the same key to the same + // FileSinkOperator, as the RecordUpdater interface can't manage multiple writers for a file. + if (desc.getWriteType() == AcidUtils.Operation.NOT_ACID) { + partitionHashCodeKind = PartitionHashCodeKind.NON_PART_NOT_ACID; + } else { + partitionHashCodeKind = PartitionHashCodeKind.NON_PART_ACID; + } + } else { + partitionHashCodeKind = PartitionHashCodeKind.PARTITIONED; + } + + int[] reduceSinkBucketColumnMap = null; + TypeInfo[] reduceSinkBucketTypeInfos = null; + Type[] reduceSinkBucketColumnVectorTypes = null; + VectorExpression[] reduceSinkBucketExpressions = null; + + List bucketDescs = desc.getBucketCols(); + if (bucketDescs != null) { + VectorExpression[] allBucketExpressions = vContext.getVectorExpressions(bucketDescs); + + reduceSinkBucketColumnMap = new int[bucketDescs.size()]; + reduceSinkBucketTypeInfos = new TypeInfo[bucketDescs.size()]; + reduceSinkBucketColumnVectorTypes = new Type[bucketDescs.size()]; + ArrayList reduceSinkBucketExpressionsList = new ArrayList(); + for (int i = 0; i < bucketDescs.size(); ++i) { + VectorExpression ve = allBucketExpressions[i]; + reduceSinkBucketColumnMap[i] = ve.getOutputColumn(); + reduceSinkBucketTypeInfos[i] = bucketDescs.get(i).getTypeInfo(); + reduceSinkBucketColumnVectorTypes[i] = + VectorizationContext.getColumnVectorTypeFromTypeInfo(reduceSinkBucketTypeInfos[i]); + if (!IdentityExpression.isColumnOnly(ve)) { + reduceSinkBucketExpressionsList.add(ve); + } + } + if (reduceSinkBucketExpressionsList.size() == 0) { + reduceSinkBucketExpressions = null; + } else { + reduceSinkBucketExpressions = reduceSinkBucketExpressionsList.toArray(new VectorExpression[0]); + } + } + + int[] reduceSinkPartitionColumnMap = null; + TypeInfo[] reduceSinkPartitionTypeInfos = null; + Type[] reduceSinkPartitionColumnVectorTypes = null; + VectorExpression[] reduceSinkPartitionExpressions = null; + + List partitionDescs = desc.getPartitionCols(); + if (partitionDescs != null) { + VectorExpression[] allPartitionExpressions = vContext.getVectorExpressions(partitionDescs); + + reduceSinkPartitionColumnMap = new int[partitionDescs.size()]; + reduceSinkPartitionTypeInfos = new TypeInfo[partitionDescs.size()]; + reduceSinkPartitionColumnVectorTypes = new Type[partitionDescs.size()]; + ArrayList reduceSinkPartitionExpressionsList = new ArrayList(); + for (int i = 0; i < partitionDescs.size(); ++i) { + VectorExpression ve = allPartitionExpressions[i]; + reduceSinkPartitionColumnMap[i] = ve.getOutputColumn(); + 
reduceSinkPartitionTypeInfos[i] = partitionDescs.get(i).getTypeInfo(); + reduceSinkPartitionColumnVectorTypes[i] = + VectorizationContext.getColumnVectorTypeFromTypeInfo(reduceSinkPartitionTypeInfos[i]); + if (!IdentityExpression.isColumnOnly(ve)) { + reduceSinkPartitionExpressionsList.add(ve); + } + } + if (reduceSinkPartitionExpressionsList.size() == 0) { + reduceSinkPartitionExpressions = null; + } else { + reduceSinkPartitionExpressions = reduceSinkPartitionExpressionsList.toArray(new VectorExpression[0]); + } + } + + vectorReduceSinkInfo.setBucketNumKind(bucketNumKind); + vectorReduceSinkInfo.setPartitionHashCodeKind(partitionHashCodeKind); + + vectorReduceSinkInfo.setReduceSinkBucketColumnMap(reduceSinkBucketColumnMap); + vectorReduceSinkInfo.setReduceSinkBucketTypeInfos(reduceSinkBucketTypeInfos); + vectorReduceSinkInfo.setReduceSinkBucketColumnVectorTypes(reduceSinkBucketColumnVectorTypes); + vectorReduceSinkInfo.setReduceSinkBucketExpressions(reduceSinkBucketExpressions); + + vectorReduceSinkInfo.setReduceSinkPartitionColumnMap(reduceSinkPartitionColumnMap); + vectorReduceSinkInfo.setReduceSinkPartitionTypeInfos(reduceSinkPartitionTypeInfos); + vectorReduceSinkInfo.setReduceSinkPartitionColumnVectorTypes(reduceSinkPartitionColumnVectorTypes); + vectorReduceSinkInfo.setReduceSinkPartitionExpressions(reduceSinkPartitionExpressions); + } + return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java index 8c35415..335e3be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkInfo.java @@ -35,6 +35,23 @@ private static long serialVersionUID = 1L; + public static enum BucketNumKind { + NONE, + REGULAR_EVAL, + ACID_UPDATE_OR_DELETE + } + + public static enum PartitionHashCodeKind { + NONE, + NON_PART_NOT_ACID, + NON_PART_ACID, + PARTITIONED + } + + private boolean useUniformHash; + private BucketNumKind bucketNumKind; + private PartitionHashCodeKind partitionHashCodeKind; + private int[] reduceSinkKeyColumnMap; private TypeInfo[] reduceSinkKeyTypeInfos; private Type[] reduceSinkKeyColumnVectorTypes; @@ -45,7 +62,21 @@ private Type[] reduceSinkValueColumnVectorTypes; private VectorExpression[] reduceSinkValueExpressions; + private int[] reduceSinkBucketColumnMap; + private TypeInfo[] reduceSinkBucketTypeInfos; + private Type[] reduceSinkBucketColumnVectorTypes; + private VectorExpression[] reduceSinkBucketExpressions; + + private int[] reduceSinkPartitionColumnMap; + private TypeInfo[] reduceSinkPartitionTypeInfos; + private Type[] reduceSinkPartitionColumnVectorTypes; + private VectorExpression[] reduceSinkPartitionExpressions; + public VectorReduceSinkInfo() { + useUniformHash = false; + bucketNumKind = BucketNumKind.NONE; + partitionHashCodeKind = PartitionHashCodeKind.NONE; + reduceSinkKeyColumnMap = null; reduceSinkKeyTypeInfos = null; reduceSinkKeyColumnVectorTypes = null; @@ -55,6 +86,40 @@ public VectorReduceSinkInfo() { reduceSinkValueTypeInfos = null; reduceSinkValueColumnVectorTypes = null; reduceSinkValueExpressions = null; + + reduceSinkBucketColumnMap = null; + reduceSinkBucketTypeInfos = null; + reduceSinkBucketColumnVectorTypes = null; + reduceSinkBucketExpressions = null; + + reduceSinkPartitionColumnMap = null; + reduceSinkPartitionTypeInfos = null; + reduceSinkPartitionColumnVectorTypes = null; + reduceSinkPartitionExpressions = null; + } + + public boolean getUseUniformHash() { + 
return useUniformHash; + } + + public void setUseUniformHash(boolean useUniformHash) { + this.useUniformHash = useUniformHash; + } + + public BucketNumKind getBucketNumKind() { + return bucketNumKind; + } + + public void setBucketNumKind(BucketNumKind bucketNumKind) { + this.bucketNumKind = bucketNumKind; + } + + public PartitionHashCodeKind getPartitionHashCodeKind() { + return partitionHashCodeKind; + } + + public void setPartitionHashCodeKind(PartitionHashCodeKind partitionHashCodeKind) { + this.partitionHashCodeKind = partitionHashCodeKind; } public int[] getReduceSinkKeyColumnMap() { @@ -120,4 +185,68 @@ public void setReduceSinkValueColumnVectorTypes(Type[] reduceSinkValueColumnVect public void setReduceSinkValueExpressions(VectorExpression[] reduceSinkValueExpressions) { this.reduceSinkValueExpressions = reduceSinkValueExpressions; } + + public int[] getReduceSinkBucketColumnMap() { + return reduceSinkBucketColumnMap; + } + + public void setReduceSinkBucketColumnMap(int[] reduceSinkBucketColumnMap) { + this.reduceSinkBucketColumnMap = reduceSinkBucketColumnMap; + } + + public TypeInfo[] getReduceSinkBucketTypeInfos() { + return reduceSinkBucketTypeInfos; + } + + public void setReduceSinkBucketTypeInfos(TypeInfo[] reduceSinkBucketTypeInfos) { + this.reduceSinkBucketTypeInfos = reduceSinkBucketTypeInfos; + } + + public Type[] getReduceSinkBucketColumnVectorTypes() { + return reduceSinkBucketColumnVectorTypes; + } + + public void setReduceSinkBucketColumnVectorTypes(Type[] reduceSinkBucketColumnVectorTypes) { + this.reduceSinkBucketColumnVectorTypes = reduceSinkBucketColumnVectorTypes; + } + + public VectorExpression[] getReduceSinkBucketExpressions() { + return reduceSinkBucketExpressions; + } + + public void setReduceSinkBucketExpressions(VectorExpression[] reduceSinkBucketExpressions) { + this.reduceSinkBucketExpressions = reduceSinkBucketExpressions; + } + + public int[] getReduceSinkPartitionColumnMap() { + return reduceSinkPartitionColumnMap; + } + + public void setReduceSinkPartitionColumnMap(int[] reduceSinkPartitionColumnMap) { + this.reduceSinkPartitionColumnMap = reduceSinkPartitionColumnMap; + } + + public TypeInfo[] getReduceSinkPartitionTypeInfos() { + return reduceSinkPartitionTypeInfos; + } + + public void setReduceSinkPartitionTypeInfos(TypeInfo[] reduceSinkPartitionTypeInfos) { + this.reduceSinkPartitionTypeInfos = reduceSinkPartitionTypeInfos; + } + + public Type[] getReduceSinkPartitionColumnVectorTypes() { + return reduceSinkPartitionColumnVectorTypes; + } + + public void setReduceSinkPartitionColumnVectorTypes(Type[] reduceSinkPartitionColumnVectorTypes) { + this.reduceSinkPartitionColumnVectorTypes = reduceSinkPartitionColumnVectorTypes; + } + + public VectorExpression[] getReduceSinkPartitionExpressions() { + return reduceSinkPartitionExpressions; + } + + public void setReduceSinkPartitionExpressions(VectorExpression[] reduceSinkPartitionExpressions) { + this.reduceSinkPartitionExpressions = reduceSinkPartitionExpressions; + } }
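
Note (editor): a minimal, self-contained sketch — not Hive code — of how the new non-uniform-hash path composes the HiveKey hash code, mirroring VectorReduceSinkObjectHashOperator.process() in this patch. In the real operator the partition hash comes from ObjectInspectorUtils.getBucketHashCode(partitionFieldValues, partitionObjectInspectors) (or a seeded Random / constant for the non-partitioned cases) and the bucket number from ObjectInspectorUtils.getBucketNumber(bucketFieldValues, bucketObjectInspectors, numBuckets); the class and method names below are illustrative only.

public class HashCompositionSketch {

  // bucketNum < 0 means no bucket number was computed (BucketNumKind.NONE),
  // in which case the partition hash is used as-is; otherwise the bucket
  // number is folded in, matching "hashCode * 31 + bucketNum" in the patch.
  static int composeHashCode(int partitionHash, int bucketNum) {
    return bucketNum < 0 ? partitionHash : partitionHash * 31 + bucketNum;
  }

  public static void main(String[] args) {
    // Non-bucketed, partitioned row: key hash is just the partition hash.
    System.out.println(composeHashCode(12345, -1));  // 12345
    // Bucketed row: bucket number folded into the partition hash.
    System.out.println(composeHashCode(12345, 7));   // 12345 * 31 + 7
  }
}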