diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index 9d35805..765a647 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -32,9 +32,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.ByteStream.Output; @@ -163,7 +163,7 @@ public GetAdaptor(MapJoinKey key) { } @Override - public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapperBase kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { if (currentKey == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 027e39a..13f1702 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -39,10 +39,10 @@ import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.rowbytescontainer.VectorRowBytesContainer; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; @@ -812,7 +812,7 @@ public GetAdaptor() { } @Override - public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapperBase kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { if (nulls == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 033bbdb..b632e1d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -30,9 +30,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; -import 
org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.ByteStream.Output; @@ -519,7 +519,7 @@ public GetAdaptor() { } @Override - public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapperBase kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { if (nulls == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java index 6504a5f..2e3716c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java @@ -24,9 +24,9 @@ import java.util.HashSet; import java.util.List; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -118,7 +118,7 @@ public static MapJoinKey readFromVector(Output output, MapJoinKey key, Object[] * Serializes row to output for vectorized path. * @param byteStream Output to reuse. Can be null, in that case a new one would be created. 
*/ - public static Output serializeVector(Output byteStream, VectorHashKeyWrapper kw, + public static Output serializeVector(Output byteStream, VectorHashKeyWrapperBase kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch, boolean[] nulls, boolean[] sortableSortOrders, byte[] nullMarkers, byte[] notNullMarkers) throws HiveException, SerDeException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKeyObject.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKeyObject.java index 5c750a3..555ccdf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKeyObject.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKeyObject.java @@ -25,10 +25,10 @@ import java.util.List; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -149,7 +149,7 @@ public void readFromRow(Object[] fieldObjs, List keyFieldsOI) return nulls; } - public void readFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, + public void readFromVector(VectorHashKeyWrapperBase kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { if (key == null || key.length != keyOutputWriters.length) { key = new Object[keyOutputWriters.length]; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index b0c7574..2c4229f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -24,9 +24,9 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.common.MemoryEstimate; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; -import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -43,7 +43,7 @@ * Changes current rows to which adaptor is referring to the rows corresponding to * the key represented by a VHKW object, and writers and batch used to interpret it. 
*/ - JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, + JoinUtil.JoinResult setFromVector(VectorHashKeyWrapperBase kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java index 2f15749..f24b953 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java @@ -38,34 +38,34 @@ /** * indices of LONG primitive keys. */ - protected int[] longIndices; + public int[] longIndices; /** * indices of DOUBLE primitive keys. */ - protected int[] doubleIndices; + public int[] doubleIndices; /** * indices of string (byte[]) primitive keys. */ - protected int[] stringIndices; + public int[] stringIndices; /** * indices of decimal primitive keys. */ - protected int[] decimalIndices; + public int[] decimalIndices; /** * indices of TIMESTAMP primitive keys. */ - protected int[] timestampIndices; + public int[] timestampIndices; /** * indices of INTERVAL_DAY_TIME primitive keys. */ - protected int[] intervalDayTimeIndices; + public int[] intervalDayTimeIndices; - final protected int keyCount; + final public int keyCount; private int addKeyIndex; private int addLongIndex; @@ -78,9 +78,9 @@ // Given the keyIndex these arrays return: // The ColumnVector.Type, // The type specific index into longIndices, doubleIndices, etc... - protected TypeInfo[] typeInfos; - protected ColumnVector.Type[] columnVectorTypes; - protected int[] columnTypeSpecificIndices; + public TypeInfo[] typeInfos; + public ColumnVector.Type[] columnVectorTypes; + public int[] columnTypeSpecificIndices; protected VectorColumnSetInfo(int keyCount) { this.keyCount = keyCount; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 43f1162..7816cbb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -44,6 +44,8 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; @@ -453,7 +455,7 @@ public void close(boolean aborted) throws HiveException { if (!aborted && sumBatchSize == 0 && GroupByOperator.shouldEmitSummaryRow(conf)) { // in case the empty grouping set is preset; but no output has done // the "summary row" still needs to be emitted - VectorHashKeyWrapper kw = keyWrappersBatch.getVectorHashKeyWrappers()[0]; + VectorHashKeyWrapperBase kw = keyWrappersBatch.getVectorHashKeyWrappers()[0]; kw.setNull(); int pos = conf.getGroupingSetPosition(); if (pos >= 0) { @@ -481,13 +483,13 @@ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws // We now have to probe the global hash and find-or-allocate // the 
aggregation buffers to use for each key present in the batch - VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers(); + VectorHashKeyWrapperBase[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers(); final int n = keyExpressions.length == 0 ? 1 : batch.size; // note - the row mapping is not relevant when aggregationBatchInfo::getDistinctBufferSetCount() == 1 for (int i=0; i < n; ++i) { - VectorHashKeyWrapper kw = keyWrappers[i]; + VectorHashKeyWrapperBase kw = keyWrappers[i]; VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw); if (null == aggregationBuffer) { // the probe failed, we must allocate a set of aggregation buffers @@ -564,7 +566,7 @@ private void flush(boolean all) throws HiveException { while(iter.hasNext()) { Map.Entry pair = iter.next(); - writeSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue()); + writeSingleRow((VectorHashKeyWrapperBase) pair.getKey(), pair.getValue()); if (!all) { iter.remove(); @@ -659,13 +661,13 @@ private void checkHashModeEfficiency() throws HiveException { /** * The current key, used in streaming mode */ - private VectorHashKeyWrapper streamingKey; + private VectorHashKeyWrapperBase streamingKey; /** * The keys that needs to be flushed at the end of the current batch */ - private final VectorHashKeyWrapper[] keysToFlush = - new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE]; + private final VectorHashKeyWrapperBase[] keysToFlush = + new VectorHashKeyWrapperBase[VectorizedRowBatch.DEFAULT_SIZE]; /** * The aggregates that needs to be flushed at the end of the current batch @@ -723,9 +725,9 @@ public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet, keyWrappersBatch.evaluateBatchGroupingSets(batch, currentGroupingSetsOverrideIsNulls); } - VectorHashKeyWrapper[] batchKeys = keyWrappersBatch.getVectorHashKeyWrappers(); + VectorHashKeyWrapperBase[] batchKeys = keyWrappersBatch.getVectorHashKeyWrappers(); - final VectorHashKeyWrapper prevKey = streamingKey; + final VectorHashKeyWrapperBase prevKey = streamingKey; if (streamingKey == null) { // This is the first batch we process after switching from hash mode currentStreamingAggregators = streamAggregationBufferRowPool.getFromPool(); @@ -760,7 +762,7 @@ public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet, } if (streamingKey != prevKey) { - streamingKey = (VectorHashKeyWrapper) streamingKey.copyKey(); + streamingKey = (VectorHashKeyWrapperBase) streamingKey.copyKey(); } } @@ -1127,7 +1129,7 @@ public void process(Object row, int tag) throws HiveException { * @param agg * @throws HiveException */ - private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg) + private void writeSingleRow(VectorHashKeyWrapperBase kw, VectorAggregationBufferRow agg) throws HiveException { int colNum = 0; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java deleted file mode 100644 index 38c31a5..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java +++ /dev/null @@ -1,682 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector; - -import org.apache.hadoop.hive.serde2.io.DateWritableV2; -import org.apache.hive.common.util.Murmur3; - -import java.sql.Date; -import java.sql.Timestamp; -import java.util.Arrays; - -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; -import org.apache.hadoop.hive.ql.exec.KeyWrapper; -import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.util.JavaDataModel; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; - -import com.google.common.base.Preconditions; - -/** - * A hash map key wrapper for vectorized processing. - * It stores the key values as primitives in arrays for each supported primitive type. - * This works in conjunction with - * {@link org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch VectorHashKeyWrapperBatch} - * to hash vectorized processing units (batches). - */ -public class VectorHashKeyWrapper extends KeyWrapper { - - public static final class HashContext { - private final Murmur3.IncrementalHash32 bytesHash = new Murmur3.IncrementalHash32(); - - public static Murmur3.IncrementalHash32 getBytesHash(HashContext ctx) { - if (ctx == null) { - return new Murmur3.IncrementalHash32(); - } - return ctx.bytesHash; - } - } - - private static final int[] EMPTY_INT_ARRAY = new int[0]; - private static final long[] EMPTY_LONG_ARRAY = new long[0]; - private static final double[] EMPTY_DOUBLE_ARRAY = new double[0]; - private static final byte[][] EMPTY_BYTES_ARRAY = new byte[0][]; - private static final HiveDecimalWritable[] EMPTY_DECIMAL_ARRAY = new HiveDecimalWritable[0]; - private static final Timestamp[] EMPTY_TIMESTAMP_ARRAY = new Timestamp[0]; - private static final HiveIntervalDayTime[] EMPTY_INTERVAL_DAY_TIME_ARRAY = new HiveIntervalDayTime[0]; - - public static final VectorHashKeyWrapper EMPTY_KEY_WRAPPER = new EmptyVectorHashKeyWrapper(); - - private long[] longValues; - private double[] doubleValues; - - private byte[][] byteValues; - private int[] byteStarts; - private int[] byteLengths; - - private HiveDecimalWritable[] decimalValues; - - private Timestamp[] timestampValues; - private static Timestamp ZERO_TIMESTAMP = new Timestamp(0); - - private HiveIntervalDayTime[] intervalDayTimeValues; - private static HiveIntervalDayTime ZERO_INTERVALDAYTIME= new HiveIntervalDayTime(0, 0); - - // NOTE: The null array is indexed by keyIndex, which is not available internally. The mapping - // from a long, double, etc index to key index is kept once in the separate - // VectorColumnSetInfo object. 
- private boolean[] isNull; - - private int hashcode; - - private HashContext hashCtx; - - private VectorHashKeyWrapper(HashContext ctx, int longValuesCount, int doubleValuesCount, - int byteValuesCount, int decimalValuesCount, int timestampValuesCount, - int intervalDayTimeValuesCount, - int keyCount) { - hashCtx = ctx; - longValues = longValuesCount > 0 ? new long[longValuesCount] : EMPTY_LONG_ARRAY; - doubleValues = doubleValuesCount > 0 ? new double[doubleValuesCount] : EMPTY_DOUBLE_ARRAY; - decimalValues = decimalValuesCount > 0 ? new HiveDecimalWritable[decimalValuesCount] : EMPTY_DECIMAL_ARRAY; - timestampValues = timestampValuesCount > 0 ? new Timestamp[timestampValuesCount] : EMPTY_TIMESTAMP_ARRAY; - intervalDayTimeValues = intervalDayTimeValuesCount > 0 ? new HiveIntervalDayTime[intervalDayTimeValuesCount] : EMPTY_INTERVAL_DAY_TIME_ARRAY; - for(int i = 0; i < decimalValuesCount; ++i) { - decimalValues[i] = new HiveDecimalWritable(HiveDecimal.ZERO); - } - if (byteValuesCount > 0) { - byteValues = new byte[byteValuesCount][]; - byteStarts = new int[byteValuesCount]; - byteLengths = new int[byteValuesCount]; - } else { - byteValues = EMPTY_BYTES_ARRAY; - byteStarts = EMPTY_INT_ARRAY; - byteLengths = EMPTY_INT_ARRAY; - } - for(int i = 0; i < timestampValuesCount; ++i) { - timestampValues[i] = new Timestamp(0); - } - for(int i = 0; i < intervalDayTimeValuesCount; ++i) { - intervalDayTimeValues[i] = new HiveIntervalDayTime(); - } - isNull = new boolean[keyCount]; - hashcode = 0; - } - - private VectorHashKeyWrapper() { - } - - public static VectorHashKeyWrapper allocate(HashContext ctx, int longValuesCount, int doubleValuesCount, - int byteValuesCount, int decimalValuesCount, int timestampValuesCount, - int intervalDayTimeValuesCount, int keyCount) { - if ((longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount - + timestampValuesCount + intervalDayTimeValuesCount) == 0) { - return EMPTY_KEY_WRAPPER; - } - return new VectorHashKeyWrapper(ctx, longValuesCount, doubleValuesCount, byteValuesCount, - decimalValuesCount, timestampValuesCount, intervalDayTimeValuesCount, - keyCount); - } - - @Override - public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException { - throw new HiveException("Should not be called"); - } - - @Override - public void setHashKey() { - // compute locally and assign - int hash = Arrays.hashCode(longValues) ^ - Arrays.hashCode(doubleValues) ^ - Arrays.hashCode(isNull); - - for (int i = 0; i < decimalValues.length; i++) { - // Use the new faster hash code since we are hashing memory objects. - hash ^= decimalValues[i].newFasterHashCode(); - } - - for (int i = 0; i < timestampValues.length; i++) { - hash ^= timestampValues[i].hashCode(); - } - - for (int i = 0; i < intervalDayTimeValues.length; i++) { - hash ^= intervalDayTimeValues[i].hashCode(); - } - - // This code, with branches and all, is not executed if there are no string keys - Murmur3.IncrementalHash32 bytesHash = null; - for (int i = 0; i < byteValues.length; ++i) { - /* - * Hashing the string is potentially expensive so is better to branch. - * Additionally not looking at values for nulls allows us not reset the values. 
- */ - if (byteLengths[i] == -1) { - continue; - } - if (bytesHash == null) { - bytesHash = HashContext.getBytesHash(hashCtx); - bytesHash.start(hash); - } - bytesHash.add(byteValues[i], byteStarts[i], byteLengths[i]); - } - if (bytesHash != null) { - hash = bytesHash.end(); - } - this.hashcode = hash; - } - - @Override - public int hashCode() { - return hashcode; - } - - @Override - public boolean equals(Object that) { - if (that instanceof VectorHashKeyWrapper) { - VectorHashKeyWrapper keyThat = (VectorHashKeyWrapper)that; - // not comparing hashCtx - irrelevant - return hashcode == keyThat.hashcode && - Arrays.equals(longValues, keyThat.longValues) && - Arrays.equals(doubleValues, keyThat.doubleValues) && - Arrays.equals(decimalValues, keyThat.decimalValues) && - Arrays.equals(timestampValues, keyThat.timestampValues) && - Arrays.equals(intervalDayTimeValues, keyThat.intervalDayTimeValues) && - Arrays.equals(isNull, keyThat.isNull) && - byteValues.length == keyThat.byteValues.length && - (0 == byteValues.length || bytesEquals(keyThat)); - } - return false; - } - - private boolean bytesEquals(VectorHashKeyWrapper keyThat) { - //By the time we enter here the byteValues.lentgh and isNull must have already been compared - for (int i = 0; i < byteValues.length; ++i) { - // the byte comparison is potentially expensive so is better to branch on null - if (byteLengths[i] != -1) { - if (!StringExpr.equal( - byteValues[i], - byteStarts[i], - byteLengths[i], - keyThat.byteValues[i], - keyThat.byteStarts[i], - keyThat.byteLengths[i])) { - return false; - } - } - } - return true; - } - - @Override - protected Object clone() { - VectorHashKeyWrapper clone = new VectorHashKeyWrapper(); - duplicateTo(clone); - return clone; - } - - public void duplicateTo(VectorHashKeyWrapper clone) { - clone.hashCtx = hashCtx; - clone.longValues = (longValues.length > 0) ? longValues.clone() : EMPTY_LONG_ARRAY; - clone.doubleValues = (doubleValues.length > 0) ? doubleValues.clone() : EMPTY_DOUBLE_ARRAY; - clone.isNull = isNull.clone(); - - if (decimalValues.length > 0) { - // Decimal columns use HiveDecimalWritable. - clone.decimalValues = new HiveDecimalWritable[decimalValues.length]; - for(int i = 0; i < decimalValues.length; ++i) { - clone.decimalValues[i] = new HiveDecimalWritable(decimalValues[i]); - } - } else { - clone.decimalValues = EMPTY_DECIMAL_ARRAY; - } - - if (byteLengths.length > 0) { - clone.byteValues = new byte[byteValues.length][]; - clone.byteStarts = new int[byteValues.length]; - clone.byteLengths = byteLengths.clone(); - for (int i = 0; i < byteValues.length; ++i) { - // avoid allocation/copy of nulls, because it potentially expensive. - // branch instead. 
- if (byteLengths[i] != -1) { - clone.byteValues[i] = Arrays.copyOfRange(byteValues[i], - byteStarts[i], byteStarts[i] + byteLengths[i]); - } - } - } else { - clone.byteValues = EMPTY_BYTES_ARRAY; - clone.byteStarts = EMPTY_INT_ARRAY; - clone.byteLengths = EMPTY_INT_ARRAY; - } - if (timestampValues.length > 0) { - clone.timestampValues = new Timestamp[timestampValues.length]; - for(int i = 0; i < timestampValues.length; ++i) { - clone.timestampValues[i] = (Timestamp) timestampValues[i].clone(); - } - } else { - clone.timestampValues = EMPTY_TIMESTAMP_ARRAY; - } - if (intervalDayTimeValues.length > 0) { - clone.intervalDayTimeValues = new HiveIntervalDayTime[intervalDayTimeValues.length]; - for(int i = 0; i < intervalDayTimeValues.length; ++i) { - clone.intervalDayTimeValues[i] = (HiveIntervalDayTime) intervalDayTimeValues[i].clone(); - } - } else { - clone.intervalDayTimeValues = EMPTY_INTERVAL_DAY_TIME_ARRAY; - } - - clone.hashcode = hashcode; - assert clone.equals(this); - } - - @Override - public KeyWrapper copyKey() { - return (KeyWrapper) clone(); - } - - @Override - public void copyKey(KeyWrapper oldWrapper) { - throw new UnsupportedOperationException(); - } - - @Override - public Object[] getKeyArray() { - throw new UnsupportedOperationException(); - } - - public void assignLong(int keyIndex, int index, long v) { - isNull[keyIndex] = false; - longValues[index] = v; - } - - // FIXME: isNull is not updated; which might cause problems - @Deprecated - public void assignLong(int index, long v) { - longValues[index] = v; - } - - public void assignNullLong(int keyIndex, int index) { - isNull[keyIndex] = true; - longValues[index] = 0; // assign 0 to simplify hashcode - } - - public void assignDouble(int index, double d) { - doubleValues[index] = d; - } - - public void assignNullDouble(int keyIndex, int index) { - isNull[keyIndex] = true; - doubleValues[index] = 0; // assign 0 to simplify hashcode - } - - public void assignString(int index, byte[] bytes, int start, int length) { - Preconditions.checkState(bytes != null); - byteValues[index] = bytes; - byteStarts[index] = start; - byteLengths[index] = length; - } - - public void assignNullString(int keyIndex, int index) { - isNull[keyIndex] = true; - byteValues[index] = null; - byteStarts[index] = 0; - // We need some value that indicates NULL. - byteLengths[index] = -1; - } - - public void assignDecimal(int index, HiveDecimalWritable value) { - decimalValues[index].set(value); - } - - public void assignNullDecimal(int keyIndex, int index) { - isNull[keyIndex] = true; - decimalValues[index].set(HiveDecimal.ZERO); // assign 0 to simplify hashcode - } - - public void assignTimestamp(int index, Timestamp value) { - // Do not assign the input value object to the timestampValues array element. - // Always copy value using set* methods. 
- timestampValues[index].setTime(value.getTime()); - timestampValues[index].setNanos(value.getNanos()); - } - - public void assignTimestamp(int index, TimestampColumnVector colVector, int elementNum) { - colVector.timestampUpdate(timestampValues[index], elementNum); - } - - public void assignNullTimestamp(int keyIndex, int index) { - isNull[keyIndex] = true; - // assign 0 to simplify hashcode - timestampValues[index].setTime(ZERO_TIMESTAMP.getTime()); - timestampValues[index].setNanos(ZERO_TIMESTAMP.getNanos()); - } - - public void assignIntervalDayTime(int index, HiveIntervalDayTime value) { - intervalDayTimeValues[index].set(value); - } - - public void assignIntervalDayTime(int index, IntervalDayTimeColumnVector colVector, int elementNum) { - intervalDayTimeValues[index].set(colVector.asScratchIntervalDayTime(elementNum)); - } - - public void assignNullIntervalDayTime(int keyIndex, int index) { - isNull[keyIndex] = true; - intervalDayTimeValues[index].set(ZERO_INTERVALDAYTIME); // assign 0 to simplify hashcode - } - - /* - * This method is mainly intended for debug display purposes. - */ - public String stringifyKeys(VectorColumnSetInfo columnSetInfo) - { - StringBuilder sb = new StringBuilder(); - boolean isFirstKey = true; - - if (longValues.length > 0) { - isFirstKey = false; - sb.append("longs "); - boolean isFirstValue = true; - for (int i = 0; i < columnSetInfo.longIndices.length; i++) { - if (isFirstValue) { - isFirstValue = false; - } else { - sb.append(", "); - } - int keyIndex = columnSetInfo.longIndices[i]; - if (isNull[keyIndex]) { - sb.append("null"); - } else { - sb.append(longValues[i]); - PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) columnSetInfo.typeInfos[keyIndex]; - // FUTURE: Add INTERVAL_YEAR_MONTH, etc, as desired. - switch (primitiveTypeInfo.getPrimitiveCategory()) { - case DATE: - { - Date dt = new Date(0); - dt.setTime(DateWritableV2.daysToMillis((int) longValues[i])); - sb.append(" date "); - sb.append(dt.toString()); - } - break; - default: - // Add nothing more. 
- break; - } - } - } - } - if (doubleValues.length > 0) { - if (isFirstKey) { - isFirstKey = false; - } else { - sb.append(", "); - } - sb.append("doubles "); - boolean isFirstValue = true; - for (int i = 0; i < columnSetInfo.doubleIndices.length; i++) { - if (isFirstValue) { - isFirstValue = false; - } else { - sb.append(", "); - } - int keyIndex = columnSetInfo.doubleIndices[i]; - if (isNull[keyIndex]) { - sb.append("null"); - } else { - sb.append(doubleValues[i]); - } - } - } - if (byteValues.length > 0) { - if (isFirstKey) { - isFirstKey = false; - } else { - sb.append(", "); - } - sb.append("byte lengths "); - boolean isFirstValue = true; - for (int i = 0; i < columnSetInfo.stringIndices.length; i++) { - if (isFirstValue) { - isFirstValue = false; - } else { - sb.append(", "); - } - int keyIndex = columnSetInfo.stringIndices[i]; - if (isNull[keyIndex]) { - sb.append("null"); - } else { - sb.append(byteLengths[i]); - } - } - } - if (decimalValues.length > 0) { - if (isFirstKey) { - isFirstKey = true; - } else { - sb.append(", "); - } - sb.append("decimals "); - boolean isFirstValue = true; - for (int i = 0; i < columnSetInfo.decimalIndices.length; i++) { - if (isFirstValue) { - isFirstValue = false; - } else { - sb.append(", "); - } - int keyIndex = columnSetInfo.decimalIndices[i]; - if (isNull[keyIndex]) { - sb.append("null"); - } else { - sb.append(decimalValues[i]); - } - } - } - if (timestampValues.length > 0) { - if (isFirstKey) { - isFirstKey = false; - } else { - sb.append(", "); - } - sb.append("timestamps "); - boolean isFirstValue = true; - for (int i = 0; i < columnSetInfo.timestampIndices.length; i++) { - if (isFirstValue) { - isFirstValue = false; - } else { - sb.append(", "); - } - int keyIndex = columnSetInfo.timestampIndices[i]; - if (isNull[keyIndex]) { - sb.append("null"); - } else { - sb.append(timestampValues[i]); - } - } - } - if (intervalDayTimeValues.length > 0) { - if (isFirstKey) { - isFirstKey = false; - } else { - sb.append(", "); - } - sb.append("interval day times "); - boolean isFirstValue = true; - for (int i = 0; i < columnSetInfo.intervalDayTimeIndices.length; i++) { - if (isFirstValue) { - isFirstValue = false; - } else { - sb.append(", "); - } - int keyIndex = columnSetInfo.intervalDayTimeIndices[i]; - if (isNull[keyIndex]) { - sb.append("null"); - } else { - sb.append(intervalDayTimeValues[i]); - } - } - } - - return sb.toString(); - } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder(); - boolean isFirst = true; - if (longValues.length > 0) { - isFirst = false; - sb.append("longs "); - sb.append(Arrays.toString(longValues)); - } - if (doubleValues.length > 0) { - if (isFirst) { - isFirst = false; - } else { - sb.append(", "); - } - sb.append("doubles "); - sb.append(Arrays.toString(doubleValues)); - } - if (byteValues.length > 0) { - if (isFirst) { - isFirst = false; - } else { - sb.append(", "); - } - sb.append("byte lengths "); - sb.append(Arrays.toString(byteLengths)); - } - if (decimalValues.length > 0) { - if (isFirst) { - isFirst = false; - } else { - sb.append(", "); - } - sb.append("decimals "); - sb.append(Arrays.toString(decimalValues)); - } - if (timestampValues.length > 0) { - if (isFirst) { - isFirst = false; - } else { - sb.append(", "); - } - sb.append("timestamps "); - sb.append(Arrays.toString(timestampValues)); - } - if (intervalDayTimeValues.length > 0) { - if (isFirst) { - isFirst = false; - } else { - sb.append(", "); - } - sb.append("interval day times "); - 
sb.append(Arrays.toString(intervalDayTimeValues)); - } - - if (isFirst) { - isFirst = false; - } else { - sb.append(", "); - } - sb.append("nulls "); - sb.append(Arrays.toString(isNull)); - - return sb.toString(); - } - - public long getLongValue(int i) { - return longValues[i]; - } - - public double getDoubleValue(int i) { - return doubleValues[i]; - } - - public byte[] getBytes(int i) { - return byteValues[i]; - } - - public int getByteStart(int i) { - return byteStarts[i]; - } - - public int getByteLength(int i) { - return byteLengths[i]; - } - - public int getVariableSize() { - int variableSize = 0; - for (int i=0; i - *
- * <ul>
- * <li>Evaluates each key vector expression.</li>
- * <li>Copies out each key's primitive values into the key wrappers</li>
- * <li>computes the hashcode of the key wrappers</li>
- * </ul>
  • - * - * @param batch - * @throws HiveException - */ - public void evaluateBatch(VectorizedRowBatch batch) throws HiveException { - - if (keyCount == 0) { - // all keywrappers must be EmptyVectorHashKeyWrapper - return; - } - - for(int i=0;i evaluate(VectorHashKeyWrapper kw) throws HiveException; + List evaluate(VectorHashKeyWrapperBase kw) throws HiveException; } /** Kryo ctor. */ @@ -193,7 +195,7 @@ public SMBJoinKeyEvaluator init() { } @Override - public List evaluate(VectorHashKeyWrapper kw) throws HiveException { + public List evaluate(VectorHashKeyWrapperBase kw) throws HiveException { for(int i = 0; i < keyExpressions.length; ++i) { key.set(i, keyWrapperBatch.getWritableKeyValue(kw, i, keyOutputWriters[i])); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBase.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBase.java new file mode 100644 index 0000000..3150b90 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBase.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.wrapper; + +import org.apache.hive.common.util.Murmur3; + +import java.sql.Timestamp; + +import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; +import org.apache.hadoop.hive.ql.exec.KeyWrapper; +import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSetInfo; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * A hash map key wrapper for vectorized processing. + * It stores the key values as primitives in arrays for each supported primitive type. + * This works in conjunction with + * {@link org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch VectorHashKeyWrapperBatch} + * to hash vectorized processing units (batches). 
+ */ +public abstract class VectorHashKeyWrapperBase extends KeyWrapper { + + public static final class HashContext { + private final Murmur3.IncrementalHash32 bytesHash = new Murmur3.IncrementalHash32(); + + public static Murmur3.IncrementalHash32 getBytesHash(HashContext ctx) { + if (ctx == null) { + return new Murmur3.IncrementalHash32(); + } + return ctx.bytesHash; + } + } + + protected final int keyCount; + protected int hashcode; + + protected VectorHashKeyWrapperBase(int keyCount) { + this.keyCount = keyCount; + hashcode = 0; + } + + @Override + public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException { + throw new HiveException("Should not be called"); + } + + @Override + public void setHashKey() { + throw new RuntimeException("Not implemented"); + } + + @Override + public int hashCode() { + return hashcode; + } + + @Override + public boolean equals(Object that) { + throw new RuntimeException("Not implemented"); + } + + @Override + protected Object clone() { + throw new RuntimeException("Not implemented"); + } + + @Override + public KeyWrapper copyKey() { + return (KeyWrapper) clone(); + } + + @Override + public void copyKey(KeyWrapper oldWrapper) { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] getKeyArray() { + throw new UnsupportedOperationException(); + } + + public void assignLong(int keyIndex, int index, long v) { + throw new RuntimeException("Not implemented"); + } + + // FIXME: isNull is not updated; which might cause problems + @Deprecated + public void assignLong(int index, long v) { + throw new RuntimeException("Not implemented"); + } + + public void assignNullLong(int keyIndex, int index) { + throw new RuntimeException("Not implemented"); + } + + public void assignDouble(int index, double d) { + throw new RuntimeException("Not implemented"); + } + + public void assignNullDouble(int keyIndex, int index) { + throw new RuntimeException("Not implemented"); + } + + public void assignString(int index, byte[] bytes, int start, int length) { + throw new RuntimeException("Not implemented"); + } + + public void assignNullString(int keyIndex, int index) { + throw new RuntimeException("Not implemented"); + } + + public void assignDecimal(int index, HiveDecimalWritable value) { + throw new RuntimeException("Not implemented"); + } + + public void assignNullDecimal(int keyIndex, int index) { + throw new RuntimeException("Not implemented"); + } + + public void assignTimestamp(int index, Timestamp value) { + throw new RuntimeException("Not implemented"); + } + + public void assignTimestamp(int index, TimestampColumnVector colVector, int elementNum) { + throw new RuntimeException("Not implemented"); + } + + public void assignNullTimestamp(int keyIndex, int index) { + throw new RuntimeException("Not implemented"); + } + + public void assignIntervalDayTime(int index, HiveIntervalDayTime value) { + throw new RuntimeException("Not implemented"); + } + + public void assignIntervalDayTime(int index, IntervalDayTimeColumnVector colVector, int elementNum) { + throw new RuntimeException("Not implemented"); + } + + public void assignNullIntervalDayTime(int keyIndex, int index) { + throw new RuntimeException("Not implemented"); + } + + /* + * This method is mainly intended for debug display purposes. 
+ */ + public String stringifyKeys(VectorColumnSetInfo columnSetInfo) + { + throw new RuntimeException("Not implemented"); + } + + @Override + public String toString() + { + throw new RuntimeException("Not implemented"); + } + + public long getLongValue(int i) { + throw new RuntimeException("Not implemented"); + } + + public double getDoubleValue(int i) { + throw new RuntimeException("Not implemented"); + } + + public byte[] getBytes(int i) { + throw new RuntimeException("Not implemented"); + } + + public int getByteStart(int i) { + throw new RuntimeException("Not implemented"); + } + + public int getByteLength(int i) { + throw new RuntimeException("Not implemented"); + } + + public HiveDecimalWritable getDecimal(int i) { + throw new RuntimeException("Not implemented"); + } + + public Timestamp getTimestamp(int i) { + throw new RuntimeException("Not implemented"); + } + + public HiveIntervalDayTime getIntervalDayTime(int i) { + throw new RuntimeException("Not implemented"); + } + + public int getVariableSize() { + throw new RuntimeException("Not implemented"); + } + + public void clearIsNull() { + throw new RuntimeException("Not implemented"); + } + + public void setNull() { + throw new RuntimeException("Not implemented"); + } + + public boolean isNull(int keyIndex) { + throw new RuntimeException("Not implemented"); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java new file mode 100644 index 0000000..26e5734 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperBatch.java @@ -0,0 +1,1078 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.wrapper; + +import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSetInfo; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +/** + * Class for handling vectorized hash map key wrappers. It evaluates the key columns in a + * row batch in a vectorized fashion. + * This class stores additional information about keys needed to evaluate and output the key values. + * + */ +public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo { + + public VectorHashKeyWrapperBatch(int keyCount) { + super(keyCount); + } + + /** + * Number of object references in 'this' (for size computation) + */ + private static final int MODEL_REFERENCES_COUNT = 7; + + /** + * The key expressions that require evaluation and output the primitive values for each key. + */ + private VectorExpression[] keyExpressions; + + /** + * Pre-allocated batch size vector of keys wrappers. + * N.B. these keys are **mutable** and should never be used in a HashMap. + * Always clone the key wrapper to obtain an immutable keywrapper suitable + * to use a key in a HashMap. + */ + private VectorHashKeyWrapperBase[] vectorHashKeyWrappers; + + /** + * The fixed size of the key wrappers. + */ + private int keysFixedSize; + + /** + * Shared hashcontext for all keys in this batch + */ + private final VectorHashKeyWrapperBase.HashContext hashCtx = new VectorHashKeyWrapperBase.HashContext(); + + /** + * Returns the compiled fixed size for the key wrappers. + * @return + */ + public int getKeysFixedSize() { + return keysFixedSize; + } + + /** + * Accessor for the batch-sized array of key wrappers. + */ + public VectorHashKeyWrapperBase[] getVectorHashKeyWrappers() { + return vectorHashKeyWrappers; + } + + /** + * Processes a batch: + *
<ul>
+ * <li>Evaluates each key vector expression.</li>
+ * <li>Copies out each key's primitive values into the key wrappers</li>
+ * <li>computes the hashcode of the key wrappers</li>
+ * </ul>
    + * @param batch + * @throws HiveException + */ + public void evaluateBatch(VectorizedRowBatch batch) throws HiveException { + + if (keyCount == 0) { + // all keywrappers must be EmptyVectorHashKeyWrapper + return; + } + + for(int i=0;i 0 ? new long[longValuesCount] : EMPTY_LONG_ARRAY; + doubleValues = doubleValuesCount > 0 ? new double[doubleValuesCount] : EMPTY_DOUBLE_ARRAY; + decimalValues = decimalValuesCount > 0 ? new HiveDecimalWritable[decimalValuesCount] : EMPTY_DECIMAL_ARRAY; + timestampValues = timestampValuesCount > 0 ? new Timestamp[timestampValuesCount] : EMPTY_TIMESTAMP_ARRAY; + intervalDayTimeValues = intervalDayTimeValuesCount > 0 ? new HiveIntervalDayTime[intervalDayTimeValuesCount] : EMPTY_INTERVAL_DAY_TIME_ARRAY; + for(int i = 0; i < decimalValuesCount; ++i) { + decimalValues[i] = new HiveDecimalWritable(HiveDecimal.ZERO); + } + if (byteValuesCount > 0) { + byteValues = new byte[byteValuesCount][]; + byteStarts = new int[byteValuesCount]; + byteLengths = new int[byteValuesCount]; + } else { + byteValues = EMPTY_BYTES_ARRAY; + byteStarts = EMPTY_INT_ARRAY; + byteLengths = EMPTY_INT_ARRAY; + } + for(int i = 0; i < timestampValuesCount; ++i) { + timestampValues[i] = new Timestamp(0); + } + for(int i = 0; i < intervalDayTimeValuesCount; ++i) { + intervalDayTimeValues[i] = new HiveIntervalDayTime(); + } + isNull = new boolean[keyCount]; + } + + private VectorHashKeyWrapperGeneral(int keyCount) { + super(keyCount); + } + + @Override + public void setHashKey() { + // compute locally and assign + int hash = Arrays.hashCode(longValues) ^ + Arrays.hashCode(doubleValues) ^ + Arrays.hashCode(isNull); + + for (int i = 0; i < decimalValues.length; i++) { + // Use the new faster hash code since we are hashing memory objects. + hash ^= decimalValues[i].newFasterHashCode(); + } + + for (int i = 0; i < timestampValues.length; i++) { + hash ^= timestampValues[i].hashCode(); + } + + for (int i = 0; i < intervalDayTimeValues.length; i++) { + hash ^= intervalDayTimeValues[i].hashCode(); + } + + // This code, with branches and all, is not executed if there are no string keys + Murmur3.IncrementalHash32 bytesHash = null; + for (int i = 0; i < byteValues.length; ++i) { + /* + * Hashing the string is potentially expensive so is better to branch. + * Additionally not looking at values for nulls allows us not reset the values. 
+ */ + if (byteLengths[i] == -1) { + continue; + } + if (bytesHash == null) { + bytesHash = HashContext.getBytesHash(hashCtx); + bytesHash.start(hash); + } + bytesHash.add(byteValues[i], byteStarts[i], byteLengths[i]); + } + if (bytesHash != null) { + hash = bytesHash.end(); + } + this.hashcode = hash; + } + + @Override + public int hashCode() { + return hashcode; + } + + @Override + public boolean equals(Object that) { + if (that instanceof VectorHashKeyWrapperGeneral) { + VectorHashKeyWrapperGeneral keyThat = (VectorHashKeyWrapperGeneral)that; + // not comparing hashCtx - irrelevant + return hashcode == keyThat.hashcode && + Arrays.equals(longValues, keyThat.longValues) && + Arrays.equals(doubleValues, keyThat.doubleValues) && + Arrays.equals(decimalValues, keyThat.decimalValues) && + Arrays.equals(timestampValues, keyThat.timestampValues) && + Arrays.equals(intervalDayTimeValues, keyThat.intervalDayTimeValues) && + Arrays.equals(isNull, keyThat.isNull) && + byteValues.length == keyThat.byteValues.length && + (0 == byteValues.length || bytesEquals(keyThat)); + } + return false; + } + + private boolean bytesEquals(VectorHashKeyWrapperGeneral keyThat) { + //By the time we enter here the byteValues.lentgh and isNull must have already been compared + for (int i = 0; i < byteValues.length; ++i) { + // the byte comparison is potentially expensive so is better to branch on null + if (byteLengths[i] != -1) { + if (!StringExpr.equal( + byteValues[i], + byteStarts[i], + byteLengths[i], + keyThat.byteValues[i], + keyThat.byteStarts[i], + keyThat.byteLengths[i])) { + return false; + } + } + } + return true; + } + + @Override + protected Object clone() { + VectorHashKeyWrapperGeneral clone = + new VectorHashKeyWrapperGeneral(keyCount); + duplicateTo(clone); + return clone; + } + + private void duplicateTo(VectorHashKeyWrapperGeneral clone) { + clone.hashCtx = hashCtx; + clone.longValues = (longValues.length > 0) ? longValues.clone() : EMPTY_LONG_ARRAY; + clone.doubleValues = (doubleValues.length > 0) ? doubleValues.clone() : EMPTY_DOUBLE_ARRAY; + clone.isNull = isNull.clone(); + + if (decimalValues.length > 0) { + // Decimal columns use HiveDecimalWritable. + clone.decimalValues = new HiveDecimalWritable[decimalValues.length]; + for(int i = 0; i < decimalValues.length; ++i) { + clone.decimalValues[i] = new HiveDecimalWritable(decimalValues[i]); + } + } else { + clone.decimalValues = EMPTY_DECIMAL_ARRAY; + } + + if (byteLengths.length > 0) { + clone.byteValues = new byte[byteValues.length][]; + clone.byteStarts = new int[byteValues.length]; + clone.byteLengths = byteLengths.clone(); + for (int i = 0; i < byteValues.length; ++i) { + // avoid allocation/copy of nulls, because it potentially expensive. + // branch instead. 
+ if (byteLengths[i] != -1) { + clone.byteValues[i] = Arrays.copyOfRange(byteValues[i], + byteStarts[i], byteStarts[i] + byteLengths[i]); + } + } + } else { + clone.byteValues = EMPTY_BYTES_ARRAY; + clone.byteStarts = EMPTY_INT_ARRAY; + clone.byteLengths = EMPTY_INT_ARRAY; + } + if (timestampValues.length > 0) { + clone.timestampValues = new Timestamp[timestampValues.length]; + for(int i = 0; i < timestampValues.length; ++i) { + clone.timestampValues[i] = (Timestamp) timestampValues[i].clone(); + } + } else { + clone.timestampValues = EMPTY_TIMESTAMP_ARRAY; + } + if (intervalDayTimeValues.length > 0) { + clone.intervalDayTimeValues = new HiveIntervalDayTime[intervalDayTimeValues.length]; + for(int i = 0; i < intervalDayTimeValues.length; ++i) { + clone.intervalDayTimeValues[i] = (HiveIntervalDayTime) intervalDayTimeValues[i].clone(); + } + } else { + clone.intervalDayTimeValues = EMPTY_INTERVAL_DAY_TIME_ARRAY; + } + + clone.hashcode = hashcode; + assert clone.equals(this); + } + + @Override + public void assignLong(int keyIndex, int index, long v) { + isNull[keyIndex] = false; + longValues[index] = v; + } + + // FIXME: isNull is not updated; which might cause problems + @Deprecated + @Override + public void assignLong(int index, long v) { + longValues[index] = v; + } + + @Override + public void assignNullLong(int keyIndex, int index) { + isNull[keyIndex] = true; + longValues[index] = 0; // assign 0 to simplify hashcode + } + + @Override + public void assignDouble(int index, double d) { + doubleValues[index] = d; + } + + @Override + public void assignNullDouble(int keyIndex, int index) { + isNull[keyIndex] = true; + doubleValues[index] = 0; // assign 0 to simplify hashcode + } + + @Override + public void assignString(int index, byte[] bytes, int start, int length) { + Preconditions.checkState(bytes != null); + byteValues[index] = bytes; + byteStarts[index] = start; + byteLengths[index] = length; + } + + @Override + public void assignNullString(int keyIndex, int index) { + isNull[keyIndex] = true; + byteValues[index] = null; + byteStarts[index] = 0; + // We need some value that indicates NULL. + byteLengths[index] = -1; + } + + @Override + public void assignDecimal(int index, HiveDecimalWritable value) { + decimalValues[index].set(value); + } + + @Override + public void assignNullDecimal(int keyIndex, int index) { + isNull[keyIndex] = true; + decimalValues[index].set(HiveDecimal.ZERO); // assign 0 to simplify hashcode + } + + @Override + public void assignTimestamp(int index, Timestamp value) { + // Do not assign the input value object to the timestampValues array element. + // Always copy value using set* methods. 
+ timestampValues[index].setTime(value.getTime()); + timestampValues[index].setNanos(value.getNanos()); + } + + @Override + public void assignTimestamp(int index, TimestampColumnVector colVector, int elementNum) { + colVector.timestampUpdate(timestampValues[index], elementNum); + } + + @Override + public void assignNullTimestamp(int keyIndex, int index) { + isNull[keyIndex] = true; + // assign 0 to simplify hashcode + timestampValues[index].setTime(ZERO_TIMESTAMP.getTime()); + timestampValues[index].setNanos(ZERO_TIMESTAMP.getNanos()); + } + + @Override + public void assignIntervalDayTime(int index, HiveIntervalDayTime value) { + intervalDayTimeValues[index].set(value); + } + + @Override + public void assignIntervalDayTime(int index, IntervalDayTimeColumnVector colVector, int elementNum) { + intervalDayTimeValues[index].set(colVector.asScratchIntervalDayTime(elementNum)); + } + + @Override + public void assignNullIntervalDayTime(int keyIndex, int index) { + isNull[keyIndex] = true; + intervalDayTimeValues[index].set(ZERO_INTERVALDAYTIME); // assign 0 to simplify hashcode + } + + /* + * This method is mainly intended for debug display purposes. + */ + @Override + public String stringifyKeys(VectorColumnSetInfo columnSetInfo) + { + StringBuilder sb = new StringBuilder(); + boolean isFirstKey = true; + + if (longValues.length > 0) { + isFirstKey = false; + sb.append("longs "); + boolean isFirstValue = true; + for (int i = 0; i < columnSetInfo.longIndices.length; i++) { + if (isFirstValue) { + isFirstValue = false; + } else { + sb.append(", "); + } + int keyIndex = columnSetInfo.longIndices[i]; + if (isNull[keyIndex]) { + sb.append("null"); + } else { + sb.append(longValues[i]); + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) columnSetInfo.typeInfos[keyIndex]; + // FUTURE: Add INTERVAL_YEAR_MONTH, etc, as desired. + switch (primitiveTypeInfo.getPrimitiveCategory()) { + case DATE: + { + Date dt = new Date(0); + dt.setTime(DateWritableV2.daysToMillis((int) longValues[i])); + sb.append(" date "); + sb.append(dt.toString()); + } + break; + default: + // Add nothing more. 
+ break; + } + } + } + } + if (doubleValues.length > 0) { + if (isFirstKey) { + isFirstKey = false; + } else { + sb.append(", "); + } + sb.append("doubles "); + boolean isFirstValue = true; + for (int i = 0; i < columnSetInfo.doubleIndices.length; i++) { + if (isFirstValue) { + isFirstValue = false; + } else { + sb.append(", "); + } + int keyIndex = columnSetInfo.doubleIndices[i]; + if (isNull[keyIndex]) { + sb.append("null"); + } else { + sb.append(doubleValues[i]); + } + } + } + if (byteValues.length > 0) { + if (isFirstKey) { + isFirstKey = false; + } else { + sb.append(", "); + } + sb.append("byte lengths "); + boolean isFirstValue = true; + for (int i = 0; i < columnSetInfo.stringIndices.length; i++) { + if (isFirstValue) { + isFirstValue = false; + } else { + sb.append(", "); + } + int keyIndex = columnSetInfo.stringIndices[i]; + if (isNull[keyIndex]) { + sb.append("null"); + } else { + sb.append(byteLengths[i]); + } + } + } + if (decimalValues.length > 0) { + if (isFirstKey) { + isFirstKey = true; + } else { + sb.append(", "); + } + sb.append("decimals "); + boolean isFirstValue = true; + for (int i = 0; i < columnSetInfo.decimalIndices.length; i++) { + if (isFirstValue) { + isFirstValue = false; + } else { + sb.append(", "); + } + int keyIndex = columnSetInfo.decimalIndices[i]; + if (isNull[keyIndex]) { + sb.append("null"); + } else { + sb.append(decimalValues[i]); + } + } + } + if (timestampValues.length > 0) { + if (isFirstKey) { + isFirstKey = false; + } else { + sb.append(", "); + } + sb.append("timestamps "); + boolean isFirstValue = true; + for (int i = 0; i < columnSetInfo.timestampIndices.length; i++) { + if (isFirstValue) { + isFirstValue = false; + } else { + sb.append(", "); + } + int keyIndex = columnSetInfo.timestampIndices[i]; + if (isNull[keyIndex]) { + sb.append("null"); + } else { + sb.append(timestampValues[i]); + } + } + } + if (intervalDayTimeValues.length > 0) { + if (isFirstKey) { + isFirstKey = false; + } else { + sb.append(", "); + } + sb.append("interval day times "); + boolean isFirstValue = true; + for (int i = 0; i < columnSetInfo.intervalDayTimeIndices.length; i++) { + if (isFirstValue) { + isFirstValue = false; + } else { + sb.append(", "); + } + int keyIndex = columnSetInfo.intervalDayTimeIndices[i]; + if (isNull[keyIndex]) { + sb.append("null"); + } else { + sb.append(intervalDayTimeValues[i]); + } + } + } + + return sb.toString(); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + boolean isFirst = true; + if (longValues.length > 0) { + isFirst = false; + sb.append("longs "); + sb.append(Arrays.toString(longValues)); + } + if (doubleValues.length > 0) { + if (isFirst) { + isFirst = false; + } else { + sb.append(", "); + } + sb.append("doubles "); + sb.append(Arrays.toString(doubleValues)); + } + if (byteValues.length > 0) { + if (isFirst) { + isFirst = false; + } else { + sb.append(", "); + } + sb.append("byte lengths "); + sb.append(Arrays.toString(byteLengths)); + } + if (decimalValues.length > 0) { + if (isFirst) { + isFirst = false; + } else { + sb.append(", "); + } + sb.append("decimals "); + sb.append(Arrays.toString(decimalValues)); + } + if (timestampValues.length > 0) { + if (isFirst) { + isFirst = false; + } else { + sb.append(", "); + } + sb.append("timestamps "); + sb.append(Arrays.toString(timestampValues)); + } + if (intervalDayTimeValues.length > 0) { + if (isFirst) { + isFirst = false; + } else { + sb.append(", "); + } + sb.append("interval day times "); + 
+      sb.append(Arrays.toString(intervalDayTimeValues));
+    }
+
+    if (isFirst) {
+      isFirst = false;
+    } else {
+      sb.append(", ");
+    }
+    sb.append("nulls ");
+    sb.append(Arrays.toString(isNull));
+
+    return sb.toString();
+  }
+
+  @Override
+  public long getLongValue(int i) {
+    return longValues[i];
+  }
+
+  @Override
+  public double getDoubleValue(int i) {
+    return doubleValues[i];
+  }
+
+  @Override
+  public byte[] getBytes(int i) {
+    return byteValues[i];
+  }
+
+  @Override
+  public int getByteStart(int i) {
+    return byteStarts[i];
+  }
+
+  @Override
+  public int getByteLength(int i) {
+    return byteLengths[i];
+  }
+
+  @Override
+  public HiveDecimalWritable getDecimal(int i) {
+    return decimalValues[i];
+  }
+
+  @Override
+  public Timestamp getTimestamp(int i) {
+    return timestampValues[i];
+  }
+
+  @Override
+  public HiveIntervalDayTime getIntervalDayTime(int i) {
+    return intervalDayTimeValues[i];
+  }
+
+  @Override
+  public int getVariableSize() {
+    int variableSize = 0;
+    for (int i=0; i>> 16 ^
+      HashCodeUtil.calculateLongHashCode(longValue1);
+    }
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof VectorHashKeyWrapperTwoLong) {
+      VectorHashKeyWrapperTwoLong keyThat = (VectorHashKeyWrapperTwoLong) that;
+      return
+          isNull0 == keyThat.isNull0 &&
+          longValue0 == keyThat.longValue0 &&
+          isNull1 == keyThat.isNull1 &&
+          longValue1 == keyThat.longValue1;
+    }
+    return false;
+  }
+
+  @Override
+  protected Object clone() {
+    VectorHashKeyWrapperTwoLong clone = new VectorHashKeyWrapperTwoLong();
+    clone.isNull0 = isNull0;
+    clone.longValue0 = longValue0;
+    clone.isNull1 = isNull1;
+    clone.longValue1 = longValue1;
+    clone.hashcode = hashcode;
+    return clone;
+  }
+
+  @Override
+  public void assignLong(int keyIndex, int index, long v) {
+    if (keyIndex == 0 && index == 0) {
+      isNull0 = false;
+      longValue0 = v;
+    } else if (keyIndex == 1 && index == 1) {
+      isNull1 = false;
+      longValue1 = v;
+    } else {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  }
+
+  // FIXME: isNull is not updated; which might cause problems
+  @Deprecated
+  @Override
+  public void assignLong(int index, long v) {
+    if (index == 0) {
+      longValue0 = v;
+    } else if (index == 1) {
+      longValue1 = v;
+    } else {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  }
+
+  @Override
+  public void assignNullLong(int keyIndex, int index) {
+    if (keyIndex == 0 && index == 0) {
+      isNull0 = true;
+      longValue0 = 0; // Assign 0 to make equals simple.
+    } else if (keyIndex == 1 && index == 1) {
+      isNull1 = true;
+      longValue1 = 0; // Assign 0 to make equals simple.
+    } else {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  }
+
+  /*
+   * This method is mainly intended for debug display purposes.
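+   * The columnSetInfo parameter is not consulted in this specialization: the key is always
+   * exactly two long columns (longValue0 and longValue1).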
+   */
+  @Override
+  public String stringifyKeys(VectorColumnSetInfo columnSetInfo)
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("longs [");
+    if (!isNull0) {
+      sb.append(longValue0);
+    } else {
+      sb.append("null");
+    }
+    sb.append(", ");
+    if (!isNull1) {
+      sb.append(longValue1);
+    } else {
+      sb.append("null");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    sb.append("longs [");
+    sb.append(longValue0);
+    sb.append(", ");
+    sb.append(longValue1);
+    sb.append("], nulls [");
+    sb.append(isNull0);
+    sb.append(", ");
+    sb.append(isNull1);
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override
+  public long getLongValue(int i) {
+    if (i == 0) {
+      return longValue0;
+    } else if (i == 1) {
+      return longValue1;
+    } else {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  }
+
+  @Override
+  public int getVariableSize() {
+    return 0;
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorHashKeyWrapperBatch.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorHashKeyWrapperBatch.java
index b2818ad..da60d2b 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorHashKeyWrapperBatch.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorHashKeyWrapperBatch.java
@@ -30,6 +30,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.util.FakeVectorRowBatchFromObjectIterables;
+import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase;
+import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -77,8 +79,8 @@ public void testVectorHashKeyWrapperBatch() throws HiveException {
     batch.size = 3;
     vhkwb.evaluateBatch(batch);
 
-    VectorHashKeyWrapper[] vhkwArray = vhkwb.getVectorHashKeyWrappers();
-    VectorHashKeyWrapper vhk = vhkwArray[0];
+    VectorHashKeyWrapperBase[] vhkwArray = vhkwb.getVectorHashKeyWrappers();
+    VectorHashKeyWrapperBase vhk = vhkwArray[0];
     assertTrue(vhk.isNull(0));
     vhk = vhkwArray[1];
     assertFalse(vhk.isNull(0));