diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java index 030a73c..630046d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java @@ -18,38 +18,68 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.util.JavaDataModel; + /** - * This maps a batch to the aggregation buffers sets to use for each row (key) + * This maps a batch to the aggregation buffers sets to use for each row (key) * */ public class VectorAggregationBufferBatch { - + /** - * Batch sized array of aggregation buffer sets. + * Batch sized array of aggregation buffer sets. * The array is preallocated and is reused for each batch, but the individual entries * will reference different aggregation buffer set from batch to batch. * the array is not reset between batches, content past this.index will be stale. */ - private VectorAggregationBufferRow[] aggregationBuffers; - + private final VectorAggregationBufferRow[] aggregationBuffers; + /** - * the selection vector that maps row within a batch to the - * specific aggregation buffer set to use. + * Same as aggregationBuffers but only distinct buffers */ - private int[] selection; - + private final VectorAggregationBufferRow[] distinctAggregationBuffers; + /** * versioning number gets incremented on each batch. This allows us to cache the selection - * mapping info in the aggregation buffer set themselves while still being able to + * mapping info in the aggregation buffer set themselves while still being able to * detect stale info. */ private int version; - + /** * Get the number of distinct aggregation buffer sets (ie. keys) used in current batch. */ private int distinctCount; - + + /** + * Memory consumed by a set of aggregation buffers + */ + private int aggregatorsFixedSize; + + /** + * Array of indexes for aggregators that have variable size + */ + private int[] variableSizeAggregators; + + /** + * Returns true if any of the aggregators has a variable size + * @return + */ + public boolean getHasVariableSize() { + return variableSizeAggregators.length > 0; + } + + /** + * Returns the fixed size consumed by the aggregation buffers + * @return + */ + public int getAggregatorsFixedSize() { + return aggregatorsFixedSize; + } + /** * the array of aggregation buffers for the current batch. * content past the {@link #getDistinctBufferSetCount()} index @@ -59,49 +89,77 @@ public VectorAggregationBufferRow[] getAggregationBuffers() { return aggregationBuffers; } - + /** - * number of distinct aggregation buffer sets (ie. keys) in the current batch. + * number of distinct aggregation buffer sets (ie. keys) in the current batch. * @return */ public int getDistinctBufferSetCount () { return distinctCount; } - /** - * gets the selection vector to use for the current batch. This maps the batch rows by position - * (row number) to an index in the {@link #getAggregationBuffers()} array.
- * @return - */ - public int[] getSelectionVector() { - return selection; - } - + public VectorAggregationBufferBatch() { aggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE]; - selection = new int [VectorizedRowBatch.DEFAULT_SIZE]; + distinctAggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE]; } - + /** - * resets the internal aggregation buffers sets index and increments the versioning + * resets the internal aggregation buffers sets index and increments the versioning * used to optimize the selection vector population. */ public void startBatch() { version++; distinctCount = 0; } - + /** * assigns the given aggregation buffer set to a given batch row (by row number). * populates the selection vector appropriately. This is where the versioning numbers - * play a role in determining if the index cached on the aggregation buffer set is stale. + * play a role in determining if the index cached on the aggregation buffer set is stale. */ public void mapAggregationBufferSet(VectorAggregationBufferRow bufferSet, int row) { if (version != bufferSet.getVersion()) { bufferSet.setVersionAndIndex(version, distinctCount); + distinctAggregationBuffers[distinctCount] = bufferSet; ++distinctCount; } aggregationBuffers[row] = bufferSet; } + public void compileAggregationBatchInfo(VectorAggregateExpression[] aggregators) { + JavaDataModel model = JavaDataModel.get(); + int[] variableSizeAggregators = new int[aggregators.length]; + int indexVariableSizes = 0; + + aggregatorsFixedSize = JavaDataModel.alignUp( + model.object() + + model.primitive1()*2 + + model.ref(), + model.memoryAlign()); + + aggregatorsFixedSize += model.lengthForObjectArrayOfSize(aggregators.length); + for(int i=0;i mapKeysAggregationBuffers; @@ -103,15 +157,15 @@ protected void initializeOp(Configuration hconf) throws HiveException { List keysDesc = conf.getKeys(); keyExpressions = vContext.getVectorExpressions(keysDesc); - + keyOutputWriters = new VectorExpressionWriter[keyExpressions.length]; - + for(int i = 0; i < keyExpressions.length; ++i) { keyOutputWriters[i] = VectorExpressionWriterFactory. genVectorExpressionWritable(keysDesc.get(i)); objectInspectors.add(keyOutputWriters[i].getObjectInspector()); } - + ArrayList aggrDesc = conf.getAggregators(); aggregators = new VectorAggregateExpression[aggrDesc.size()]; for (int i = 0; i < aggrDesc.size(); ++i) { @@ -119,9 +173,10 @@ protected void initializeOp(Configuration hconf) throws HiveException { aggregators[i] = vContext.getAggregatorExpression (desc); objectInspectors.add(aggregators[i].getOutputObjectInspector()); } - + keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); aggregationBatchInfo = new VectorAggregationBufferBatch(); + aggregationBatchInfo.compileAggregationBatchInfo(aggregators); mapKeysAggregationBuffers = new HashMap(); List outputFieldNames = conf.getOutputColumnNames(); @@ -133,38 +188,178 @@ protected void initializeOp(Configuration hconf) throws HiveException { } catch (Throwable e) { throw new HiveException(e); } + + computeMemoryLimits(); + initializeChildren(hconf); } + /** + * Computes the memory limits for hash table flush (spill). 
+ */ + private void computeMemoryLimits() { + JavaDataModel model = JavaDataModel.get(); + + fixedHashEntrySize = + model.hashMapEntry() + + keyWrappersBatch.getKeysFixedSize() + + aggregationBatchInfo.getAggregatorsFixedSize(); + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + memoryThreshold = conf.getMemoryThreshold(); + // Tests may leave this uninitialized, so better set it to 1 + if (memoryThreshold == 0.0f) { + memoryThreshold = 1.0f; + } + + maxHashTblMemory = (int)(maxMemory * memoryThreshold); + + LOG.info(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)", + maxHashTblMemory/1024/1024, + maxMemory/1024/1024, + memoryThreshold, + fixedHashEntrySize, + keyWrappersBatch.getKeysFixedSize(), + aggregationBatchInfo.getAggregatorsFixedSize())); + + } + @Override public void processOp(Object row, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) row; - + // First we traverse the batch to evaluate and prepare the KeyWrappers // After this the KeyWrappers are properly set and hash code is computed keyWrappersBatch.evaluateBatch(batch); // Next we locate the aggregation buffer set for each key prepareBatchAggregationBufferSets(batch); - + // Finally, evaluate the aggregators processAggregators(batch); + + // Flush if memory limits were reached + if (shouldFlush(batch)) { + flush(false); + } + + if (sumBatchSize == 0 && 0 != batch.size) { + // Sample the first batch processed for variable sizes. + updateAvgVariableSize(batch); + } + + sumBatchSize += batch.size; } - + + /** + * Flushes the entries in the hash table by emitting output (forward). + * When parameter 'all' is true all the entries are flushed. + * @param all + * @throws HiveException + */ + private void flush(boolean all) throws HiveException { + + int entriesToFlush = all ? numEntriesHashTable : + (int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH); + int entriesFlushed = 0; + + LOG.info(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)", + entriesToFlush, all ? 
"(all)" : "", + numEntriesHashTable, fixedHashEntrySize, avgVariableSize, + numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024, + maxHashTblMemory/1024/1024)); + + Object[] forwardCache = new Object[keyExpressions.length + aggregators.length]; + if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) { + + // if this is a global aggregation (no keys) and empty set, must still emit NULLs + VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer(); + for (int i = 0; i < aggregators.length; ++i) { + forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i)); + } + forward(forwardCache, outputObjInspector); + } else { + + /* Iterate the global (keywrapper,aggregationbuffers) map and emit + a row for each key */ + Iterator> iter = + mapKeysAggregationBuffers.entrySet().iterator(); + while(iter.hasNext()) { + Map.Entry pair = iter.next(); + int fi = 0; + for (int i = 0; i < keyExpressions.length; ++i) { + VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey(); + forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( + kw, i, keyOutputWriters[i]); + } + for (int i = 0; i < aggregators.length; ++i) { + forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue() + .getAggregationBuffer(i)); + } + LOG.debug(String.format("forwarding keys: %s: %s", + pair.getKey().toString(), Arrays.toString(forwardCache))); + forward(forwardCache, outputObjInspector); + + if (!all) { + iter.remove(); + --numEntriesHashTable; + if (++entriesFlushed >= entriesToFlush) { + break; + } + } + } + } + + if (all) { + mapKeysAggregationBuffers.clear(); + numEntriesHashTable = 0; + } + } + + /** + * Returns true if the memory threshold for the hash table was reached. + */ + private boolean shouldFlush(VectorizedRowBatch batch) { + if (numEntriesSinceCheck < FLUSH_CHECK_THRESHOLD || + batch.size == 0) { + return false; + } + // Were going to update the average variable row size by sampling the current batch + updateAvgVariableSize(batch); + numEntriesSinceCheck = 0; + return numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory; + } + + /** + * Updates the average variable size of the hash table entries. + * The average is only updates by probing the batch that added the entry in the hash table + * that caused the check threshold to be reached. + */ + private void updateAvgVariableSize(VectorizedRowBatch batch) { + int keyVariableSize = keyWrappersBatch.getVariableSize(batch.size); + int aggVariableSize = aggregationBatchInfo.getVariableSize(batch.size); + + // This assumes the distribution of variable size keys/aggregates in the input + // is the same as the distribution of variable sizes in the hash entries + avgVariableSize = (int)((avgVariableSize * sumBatchSize + keyVariableSize +aggVariableSize) / + (sumBatchSize + batch.size)); + } + /** * Evaluates the aggregators on the current batch. * The aggregationBatchInfo must have been prepared - * by calling {@link #prepareBatchAggregationBufferSets} first. + * by calling {@link #prepareBatchAggregationBufferSets} first. */ private void processAggregators(VectorizedRowBatch batch) throws HiveException { // We now have a vector of aggregation buffer sets to use for each row // We can start computing the aggregates. 
// If the number of distinct keys in the batch is 1 we can // use the optimized code path of aggregateInput - VectorAggregationBufferRow[] aggregationBufferSets = + VectorAggregationBufferRow[] aggregationBufferSets = aggregationBatchInfo.getAggregationBuffers(); if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) { - VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = + VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = aggregationBufferSets[0].getAggregationBuffers(); for (int i = 0; i < aggregators.length; ++i) { aggregators[i].aggregateInput(aggregationBuffers[i], batch); @@ -178,16 +373,16 @@ private void processAggregators(VectorizedRowBatch batch) throws HiveException { } } } - + /** * Locates the aggregation buffer sets to use for each key in the current batch. * The keyWrappersBatch must have evaluated the current batch first. */ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException { // The aggregation batch vector needs to know when we start a new batch - // to bump its internal version. + // to bump its internal version. aggregationBatchInfo.startBatch(); - + // We now have to probe the global hash and find-or-allocate // the aggregation buffers to use for each key present in the batch VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers(); @@ -195,12 +390,14 @@ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws VectorHashKeyWrapper kw = keyWrappers[i]; VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw); if (null == aggregationBuffer) { - // the probe failed, we must allocate a set of aggregation buffers + // the probe failed, we must allocate a set of aggregation buffers // and push the (keywrapper,buffers) pair into the hash. // is very important to clone the keywrapper, the one we have from our // keyWrappersBatch is going to be reset/reused on next batch. aggregationBuffer = allocateAggregationBuffer(); mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer); + numEntriesHashTable++; + numEntriesSinceCheck++; } aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i); } @@ -210,7 +407,7 @@ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws * allocates a new aggregation buffer set. 
*/ private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException { - VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = + VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = new VectorAggregateExpression.AggregationBuffer[aggregators.length]; for (int i=0; i < aggregators.length; ++i) { aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer(); @@ -223,36 +420,7 @@ private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveExcept @Override public void closeOp(boolean aborted) throws HiveException { if (!aborted) { - Object[] forwardCache = new Object[keyExpressions.length + aggregators.length]; - if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) { - - // if this is a global aggregation (no keys) and empty set, must still emit NULLs - VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer(); - for (int i = 0; i < aggregators.length; ++i) { - forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i)); - } - forward(forwardCache, outputObjInspector); - } else { - - /* Iterate the global (keywrapper,aggregationbuffers) map and emit - a row for each key */ - for(Map.Entry pair: - mapKeysAggregationBuffers.entrySet()){ - int fi = 0; - for (int i = 0; i < keyExpressions.length; ++i) { - VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey(); - forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue ( - kw, i, keyOutputWriters[i]); - } - for (int i = 0; i < aggregators.length; ++i) { - forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue() - .getAggregationBuffer(i)); - } - LOG.debug(String.format("forwarding keys: %s: %s", - pair.getKey().toString(), Arrays.toString(forwardCache))); - forward(forwardCache, outputObjInspector); - } - } + flush(true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java index 01dd7be..70e3cfd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.KeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; /** @@ -233,6 +234,15 @@ public int getByteLength(int i) { return byteLengths[i - longValues.length - doubleValues.length]; } + public int getVariableSize() { + int variableSize = 0; + for (int i=0; i extends VectorAggregateExpression { count++; } } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } } private VectorExpression inputExpression; @@ -435,5 +441,14 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return soi; } + + @Override + public int getAggregationBufferFixedSize() { + JavaDataModel model = JavaDataModel.get(); + return JavaDataModel.alignUp( + model.object() + + model.primitive2() * 2, + model.memoryAlign()); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt deleted file mode 100644 index 69ff67b..0000000 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen; - -import java.util.ArrayList; - -import org.apache.hadoop.hive.ql.exec.Description; -import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; -import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates. - VectorAggregateExpression.AggregationBuffer; -import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; - -/** -* . Vectorized implementation for COUNT aggregates. -*/ -@Description(name = "count", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized)") -public class extends VectorAggregateExpression { - - /** - /* class for storing the current aggregate value. 
- */ - static class Aggregation implements AggregationBuffer { - long value; - boolean isNull; - - public void initIfNull() { - if (isNull) { - isNull = false; - value = 0; - } - } - } - - private VectorExpression inputExpression; - private LongWritable result; - - public (VectorExpression inputExpression) { - super(); - this.inputExpression = inputExpression; - result = new LongWritable(0); - } - - private Aggregation getCurrentAggregationBuffer( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int row) { - VectorAggregationBufferRow mySet = aggregationBufferSets[row]; - Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex); - return myagg; - } - - @Override - public void aggregateInputSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - VectorizedRowBatch batch) throws HiveException { - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - inputExpression.evaluate(batch); - - VectorColumn inputVector = (VectorColumn)batch. - cols[this.inputExpression.getOutputColumn()]; - - if (inputVector.noNulls) { - // if there are no nulls then the iteration is the same on all cases - iterateNoNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, batchSize); - } else if (!batch.selectedInUse) { - iterateHasNullsWithAggregationSelection( - aggregationBufferSets, aggregateIndex, - batchSize, inputVector.isNull); - } else if (batch.selectedInUse) { - iterateHasNullsSelectionWithAggregationSelection( - aggregationBufferSets, aggregateIndex, - batchSize, batch.selected, inputVector.isNull); - } - } - - private void iterateNoNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize) { - - for (int i=0; i < batchSize; ++i) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - myagg.initIfNull(); - myagg.value++; - } - } - - private void iterateHasNullsWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize, - boolean[] isNull) { - - for (int i=0; i < batchSize; ++i) { - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - i); - myagg.initIfNull(); - myagg.value++; - } - } - } - - private void iterateHasNullsSelectionWithAggregationSelection( - VectorAggregationBufferRow[] aggregationBufferSets, - int aggregateIndex, - int batchSize, - int[] selection, - boolean[] isNull) { - - for (int j=0; j < batchSize; ++j) { - int i = selection[j]; - if (!isNull[i]) { - Aggregation myagg = getCurrentAggregationBuffer( - aggregationBufferSets, - aggregateIndex, - j); - myagg.initIfNull(); - myagg.value++; - } - } - } - - - @Override - public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) - throws HiveException { - - inputExpression.evaluate(batch); - - VectorColumn inputVector = (VectorColumn)batch. 
- cols[this.inputExpression.getOutputColumn()]; - - int batchSize = batch.size; - - if (batchSize == 0) { - return; - } - - Aggregation myagg = (Aggregation)agg; - - myagg.initIfNull(); - - if (inputVector.isRepeating) { - if (inputVector.noNulls || !inputVector.isNull[0]) { - myagg.value += batchSize; - } - return; - } - - if (inputVector.noNulls) { - myagg.value += batchSize; - return; - } - else if (!batch.selectedInUse) { - iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull); - } - else { - iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected); - } - } - - private void iterateSelectionHasNulls( - Aggregation myagg, - int batchSize, - boolean[] isNull, - int[] selected) { - - for (int j=0; j< batchSize; ++j) { - int i = selected[j]; - if (!isNull[i]) { - myagg.value += 1; - } - } - } - - private void iterateNoSelectionHasNulls( - Aggregation myagg, - int batchSize, - boolean[] isNull) { - - for (int i=0; i< batchSize; ++i) { - if (!isNull[i]) { - myagg.value += 1; - } - } - } - - @Override - public AggregationBuffer getNewAggregationBuffer() throws HiveException { - return new Aggregation(); - } - - @Override - public void reset(AggregationBuffer agg) throws HiveException { - Aggregation myAgg = (Aggregation) agg; - myAgg.isNull = true; - } - - @Override - public Object evaluateOutput(AggregationBuffer agg) throws HiveException { - Aggregation myagg = (Aggregation) agg; - if (myagg.isNull) { - return null; - } - else { - result.set (myagg.value); - return result; - } - } - - @Override - public ObjectInspector getOutputObjectInspector() { - return PrimitiveObjectInspectorFactory.writableLongObjectInspector; - } - -} - diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt index d00d9ae..0637f8f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -63,6 +64,11 @@ public class extends VectorAggregateExpression { this.value = value; } } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } } private VectorExpression inputExpression; @@ -411,5 +417,15 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return ; } + + @Override + public int getAggregationBufferFixedSize() { + JavaDataModel model = JavaDataModel.get(); + return JavaDataModel.alignUp( + model.object() + + model.primitive2(), + model.memoryAlign()); + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt index 96ce80c..43c1a04 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggreg import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -74,6 +75,12 @@ public class extends VectorAggregateExpression { System.arraycopy(bytes, start, this.bytes, 0, length); this.length = length; } + @Override + public int getVariableSize() { + JavaDataModel model = JavaDataModel.get(); + return model.lengthForByteArrayOfSize(bytes.length); + } + } private VectorExpression inputExpression; @@ -359,5 +366,21 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return PrimitiveObjectInspectorFactory.writableStringObjectInspector; } + + @Override + public int getAggregationBufferFixedSize() { + JavaDataModel model = JavaDataModel.get(); + return JavaDataModel.alignUp( + model.object() + + model.ref()+ + model.primitive1()*2, + model.memoryAlign()); + } + + @Override + public boolean hasVariableSize() { + return true; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt index c4e6a19..6db3aa4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -62,6 +63,11 @@ public class extends VectorAggregateExpression { sum += value; } } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } } VectorExpression inputExpression; @@ -404,5 +410,15 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return ; } + + @Override + public int getAggregationBufferFixedSize() { + JavaDataModel model = JavaDataModel.get(); + return JavaDataModel.alignUp( + model.object(), + model.memoryAlign()); + } + + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt index 9c8ad94..96d0f40 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; 
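The aggregate templates in the hunks above each tell the new size accounting what one of their aggregation buffers costs: getAggregationBufferFixedSize() charges the object header plus the primitive and reference fields, rounded up with JavaDataModel.alignUp() to the 8-byte alignment, while fixed-size buffers throw UnsupportedOperationException from getVariableSize() and only the string min/max buffer reports a variable part (its retained byte[]) and overrides hasVariableSize(). A hypothetical buffer written against the same JavaDataModel helpers, not one of the generated Hive classes, shows the pattern:

import org.apache.hadoop.hive.ql.util.JavaDataModel;

// Hypothetical aggregation buffer: a long counter plus a retained byte[] value.
// Mirrors how the generated templates report their footprint; not actual Hive code.
class SampleStringAggregationBuffer {
  long count;
  byte[] value;

  // Fixed part: object header + one long field + one reference, aligned to 8 bytes.
  static int aggregationBufferFixedSize() {
    JavaDataModel model = JavaDataModel.get();
    return JavaDataModel.alignUp(
        model.object() + model.primitive2() + model.ref(),
        model.memoryAlign());
  }

  // Variable part: the retained byte[] payload, array header included and aligned up.
  int variableSize() {
    return JavaDataModel.get().lengthForByteArrayOfSize(value == null ? 0 : value.length);
  }
}

VectorGroupByOperator sums the fixed parts once per aggregator when compileAggregationBatchInfo() runs at operator initialization, so only the variable part has to be re-sampled while batches stream through.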
import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -62,6 +63,11 @@ public class extends VectorAggregateExpression { count = 0; variance = 0; } + + @Override + public int getVariableSize() { + throw new UnsupportedOperationException(); + } } private VectorExpression inputExpression; @@ -482,5 +488,16 @@ public class extends VectorAggregateExpression { public ObjectInspector getOutputObjectInspector() { return soi; } + + @Override + public int getAggregationBufferFixedSize() { + JavaDataModel model = JavaDataModel.get(); + return JavaDataModel.alignUp( + model.object() + + model.primitive2()*3+ + model.primitive1(), + model.memoryAlign()); + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java index e3eec02..7a4812b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java +++ ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java @@ -27,95 +27,133 @@ public enum JavaDataModel { JAVA32 { + @Override public int object() { return JAVA32_OBJECT; } + @Override public int array() { return JAVA32_ARRAY; } + @Override public int ref() { return JAVA32_REF; } + @Override public int hashMap(int entry) { // base = JAVA32_OBJECT + PRIMITIVES1 * 4 + JAVA32_FIELDREF * 3 + JAVA32_ARRAY; // entry = JAVA32_OBJECT + JAVA32_FIELDREF + PRIMITIVES1 return 64 + 24 * entry; } + @Override + public int hashMapEntry() { + return 24; + } + + @Override public int hashSet(int entry) { // hashMap += JAVA32_OBJECT return 80 + 24 * entry; } + @Override public int linkedHashMap(int entry) { // hashMap += JAVA32_FIELDREF + PRIMITIVES1 // hashMap.entry += JAVA32_FIELDREF * 2 return 72 + 32 * entry; } + @Override public int linkedList(int entry) { // base = JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_FIELDREF; // entry = JAVA32_OBJECT + JAVA32_FIELDREF * 2 return 28 + 24 * entry; } + @Override public int arrayList() { // JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_ARRAY; return 44; } + + @Override + public int memoryAlign() { + return 8; + } }, JAVA64 { + @Override public int object() { return JAVA64_OBJECT; } + @Override public int array() { return JAVA64_ARRAY; } + @Override public int ref() { return JAVA64_REF; } + @Override public int hashMap(int entry) { // base = JAVA64_OBJECT + PRIMITIVES1 * 4 + JAVA64_FIELDREF * 3 + JAVA64_ARRAY; // entry = JAVA64_OBJECT + JAVA64_FIELDREF + PRIMITIVES1 return 112 + 44 * entry; } + @Override + public int hashMapEntry() { + return 44; + } + + @Override public int hashSet(int entry) { // hashMap += JAVA64_OBJECT return 144 + 44 * entry; } + @Override public int linkedHashMap(int entry) { // hashMap += JAVA64_FIELDREF + PRIMITIVES1 // hashMap.entry += JAVA64_FIELDREF * 2 return 128 + 60 * entry; } + @Override public int linkedList(int entry) { // base = JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_FIELDREF; // entry = JAVA64_OBJECT + JAVA64_FIELDREF * 2 return 48 + 48 * entry; } + @Override public int arrayList() { // JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_ARRAY; return 80; } + + @Override + public int memoryAlign() { + return 8; + } }; public abstract int object(); public abstract int array(); public abstract int ref(); public abstract int hashMap(int entry); + public abstract int hashMapEntry(); public abstract int hashSet(int entry); public abstract int 
linkedHashMap(int entry); public abstract int linkedList(int entry); public abstract int arrayList(); + public abstract int memoryAlign(); // ascii string public int lengthFor(String string) { @@ -161,6 +199,10 @@ public int primitive2() { return PRIMITIVES2; } + public static int alignUp(int value, int align) { + return (value + align - 1) & ~(align - 1); + } + public static final int JAVA32_META = 12; public static final int JAVA32_ARRAY_META = 16; public static final int JAVA32_REF = 4; @@ -176,6 +218,8 @@ public int primitive2() { public static final int PRIMITIVES1 = 4; // void, boolean, byte, short, int, float public static final int PRIMITIVES2 = 8; // long, double + public static final int PRIMITIVE_BYTE = 1; // byte + private static JavaDataModel current; public static JavaDataModel get() { @@ -200,4 +244,27 @@ public static int round(int size) { } return ((size + 8) >> 3) << 3; } + + private int lengthForPrimitiveArrayOfSize(int primitiveSize, int length) { + return alignUp(array() + primitiveSize*length, memoryAlign()); + } + + public int lengthForByteArrayOfSize(int length) { + return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length); + } + public int lengthForObjectArrayOfSize(int length) { + return lengthForPrimitiveArrayOfSize(ref(), length); + } + public int lengthForLongArrayOfSize(int length) { + return lengthForPrimitiveArrayOfSize(primitive2(), length); + } + public int lengthForDoubleArrayOfSize(int length) { + return lengthForPrimitiveArrayOfSize(primitive2(), length); + } + public int lengthForIntArrayOfSize(int length) { + return lengthForPrimitiveArrayOfSize(primitive1(), length); + } + public int lengthForBooleanArrayOfSize(int length) { + return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java index 0050ebc..c5a1c40 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java @@ -43,18 +43,17 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; - +import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; @@ -151,12 +150,12 @@ private static GroupByDesc buildKeyGroupByDesc( ArrayList keys = new ArrayList(); keys.add(keyExp); desc.setKeys(keys); - + desc.getOutputColumnNames().add("_col1"); return desc; } - + @Test public void testDoubleValueTypeSum() throws HiveException { testKeyTypeAggregate( @@ -168,7 +167,7 @@ public void testDoubleValueTypeSum() throws HiveException { 
Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 20.0, null, 19.0)); } - + @Test public void testDoubleValueTypeSumOneKey() throws HiveException { testKeyTypeAggregate( @@ -179,8 +178,8 @@ public void testDoubleValueTypeSumOneKey() throws HiveException { Arrays.asList(new Object[]{ 1, 1, 1, 1}), Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 39.0)); - } - + } + @Test public void testDoubleValueTypeCount() throws HiveException { testKeyTypeAggregate( @@ -192,7 +191,7 @@ public void testDoubleValueTypeCount() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 2L, null, 1L)); } - + public void testDoubleValueTypeCountOneKey() throws HiveException { testKeyTypeAggregate( "count", @@ -202,8 +201,8 @@ public void testDoubleValueTypeCountOneKey() throws HiveException { Arrays.asList(new Object[]{ 1, 1, 1, 1}), Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 3L)); - } - + } + @Test public void testDoubleValueTypeAvg() throws HiveException { testKeyTypeAggregate( @@ -215,7 +214,7 @@ public void testDoubleValueTypeAvg() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 10.0, null, 19.0)); } - + @Test public void testDoubleValueTypeAvgOneKey() throws HiveException { testKeyTypeAggregate( @@ -226,8 +225,8 @@ public void testDoubleValueTypeAvgOneKey() throws HiveException { Arrays.asList(new Object[]{ 1, 1, 1, 1}), Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 13.0)); - } - + } + @Test public void testDoubleValueTypeMin() throws HiveException { testKeyTypeAggregate( @@ -239,7 +238,7 @@ public void testDoubleValueTypeMin() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 7.0, null, 19.0)); } - + @Test public void testDoubleValueTypeMinOneKey() throws HiveException { testKeyTypeAggregate( @@ -251,7 +250,7 @@ public void testDoubleValueTypeMinOneKey() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 7.0)); } - + @Test public void testDoubleValueTypeMax() throws HiveException { testKeyTypeAggregate( @@ -287,7 +286,7 @@ public void testDoubleValueTypeVariance() throws HiveException { Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 9.0, null, 0.0)); } - + @Test public void testDoubleValueTypeVarianceOneKey() throws HiveException { testKeyTypeAggregate( @@ -298,7 +297,7 @@ public void testDoubleValueTypeVarianceOneKey() throws HiveException { Arrays.asList(new Object[]{ 1, 1, 1, 1}), Arrays.asList(new Object[]{13.0,null,7.0, 19.0})), buildHashMap((byte)1, 24.0)); - } + } @Test public void testTinyintKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -310,7 +309,7 @@ public void testTinyintKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((byte)1, 20L, null, 19L)); } - + @Test public void testSmallintKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -321,8 +320,8 @@ public void testSmallintKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{ 1,null, 1, null}), Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((short)1, 20L, null, 19L)); - } - + } + @Test public void testIntKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -334,7 +333,7 @@ public void testIntKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((int)1, 20L, 
null, 19L)); } - + @Test public void testBigintKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -346,7 +345,7 @@ public void testBigintKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((long)1L, 20L, null, 19L)); } - + @Test public void testBooleanKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -358,7 +357,7 @@ public void testBooleanKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap(true, 20L, null, 19L)); } - + @Test public void testTimestampKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -369,8 +368,8 @@ public void testTimestampKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{new Timestamp(1),null, new Timestamp(1), null}), Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap(new Timestamp(1), 20L, null, 19L)); - } - + } + @Test public void testFloatKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -382,7 +381,7 @@ public void testFloatKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((float)1.0, 20L, null, 19L)); } - + @Test public void testDoubleKeyTypeAggregate() throws HiveException { testKeyTypeAggregate( @@ -393,8 +392,8 @@ public void testDoubleKeyTypeAggregate() throws HiveException { Arrays.asList(new Object[]{ 1,null, 1, null}), Arrays.asList(new Object[]{13L,null,7L, 19L})), buildHashMap((double)1.0, 20L, null, 19L)); - } - + } + @Test public void testCountStar() throws HiveException { testAggregateCountStar( @@ -1262,18 +1261,18 @@ public void testStdDevSampLongRepeat () throws HiveException { 1024, (double)0); } - + private void testKeyTypeAggregate( String aggregateName, FakeVectorRowBatchFromObjectIterables data, Map expected) throws HiveException { - + Map mapColumnNames = new HashMap(); mapColumnNames.put("Key", 0); mapColumnNames.put("Value", 1); VectorizationContext ctx = new VectorizationContext(mapColumnNames, 2); Set keys = new HashSet(); - + AggregationDesc agg = buildAggregationDesc(ctx, aggregateName, "Value", TypeInfoFactory.getPrimitiveTypeInfo(data.getTypes()[1])); ArrayList aggs = new ArrayList(); @@ -1287,7 +1286,7 @@ private void testKeyTypeAggregate( desc.setOutputColumnNames(outputColumnNames); desc.setAggregators(aggs); - ExprNodeDesc keyExp = buildColumnDesc(ctx, "Key", + ExprNodeDesc keyExp = buildColumnDesc(ctx, "Key", TypeInfoFactory.getPrimitiveTypeInfo(data.getTypes()[0])); ArrayList keysDesc = new ArrayList(); keysDesc.add(keyExp); @@ -1338,10 +1337,10 @@ public void inspectRow(Object row, int tag) throws HiveException { BooleanWritable bwKey = (BooleanWritable)key; keyValue = bwKey.get(); } else { - Assert.fail(String.format("Not implemented key output type %s: %s", + Assert.fail(String.format("Not implemented key output type %s: %s", key.getClass().getName(), key)); } - + assertTrue(expected.containsKey(keyValue)); Object expectedValue = expected.get(keyValue); Object value = fields[1]; @@ -1367,8 +1366,8 @@ public void inspectRow(Object row, int tag) throws HiveException { List outBatchList = out.getCapturedRows(); assertNotNull(outBatchList); assertEquals(expected.size(), outBatchList.size()); - assertEquals(expected.size(), keys.size()); - } + assertEquals(expected.size(), keys.size()); + } public void testAggregateLongRepeats ( @@ -1498,9 +1497,9 @@ public void validate(Object expected, Object result) { } else if (arr[0] instanceof LongWritable) { LongWritable lw = (LongWritable) 
arr[0]; assertEquals((Long) expected, (Long) lw.get()); - } else if (arr[0] instanceof BytesWritable) { - BytesWritable bw = (BytesWritable) arr[0]; - String sbw = new String(bw.getBytes()); + } else if (arr[0] instanceof Text) { + Text tx = (Text) arr[0]; + String sbw = tx.toString(); assertEquals((String) expected, sbw); } else if (arr[0] instanceof DoubleWritable) { DoubleWritable dw = (DoubleWritable) arr[0]; @@ -1849,9 +1848,9 @@ public void inspectRow(Object row, int tag) throws HiveException { Object key = fields[0]; String keyValue = null; if (null != key) { - assertTrue(key instanceof BytesWritable); - BytesWritable bwKey = (BytesWritable)key; - keyValue = new String(bwKey.get()); + assertTrue(key instanceof Text); + Text bwKey = (Text)key; + keyValue = bwKey.toString(); } assertTrue(expected.containsKey(keyValue)); Object expectedValue = expected.get(keyValue);
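Stepping back to the VectorGroupByOperator.java hunks: computeMemoryLimits(), shouldFlush() and flush(false) together form the spill policy. The operator estimates its hash table footprint as numEntriesHashTable * (fixedHashEntrySize + avgVariableSize), compares the estimate against memoryThreshold times the maximum heap, and when the ceiling is crossed it forwards and evicts only a fraction of the keys instead of draining the whole table. A condensed standalone sketch of that policy, with illustrative class and method names (the eviction fraction stands in for PERCENT_ENTRIES_TO_FLUSH):

import java.lang.management.ManagementFactory;

// Illustrative sketch of the spill policy added to VectorGroupByOperator; not the Hive class itself.
public class HashAggregationMemoryPolicy {
  private final long maxHashTableMemory; // memoryThreshold * max heap, computed once at setup
  private final int fixedEntrySize;      // HashMap entry overhead + fixed key size + fixed aggregation buffer size
  private int avgVariableSize;           // running average of variable-length bytes per entry
  private long numEntries;               // current number of keys in the hash table
  private long rowsSeen;                 // rows folded into the running average so far

  public HashAggregationMemoryPolicy(float memoryThreshold, int fixedEntrySize) {
    long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    this.maxHashTableMemory = (long) (maxHeap * memoryThreshold);
    this.fixedEntrySize = fixedEntrySize;
  }

  // Mirrors updateAvgVariableSize(): fold one sampled batch into the running average.
  public void sampleBatch(int batchRows, int batchVariableBytes) {
    if (batchRows == 0) {
      return;
    }
    avgVariableSize = (int) ((avgVariableSize * rowsSeen + batchVariableBytes) / (rowsSeen + batchRows));
    rowsSeen += batchRows;
  }

  // Called for every key newly inserted into the hash table.
  public void keyAdded() {
    numEntries++;
  }

  // Mirrors shouldFlush(): estimated footprint versus the configured ceiling.
  public boolean shouldFlush() {
    return numEntries * (long) (fixedEntrySize + avgVariableSize) > maxHashTableMemory;
  }

  // Mirrors flush(false): evict only a fraction of the keys so the operator keeps streaming.
  public long entriesToEvict(double fractionToFlush) {
    long evicted = (long) (numEntries * fractionToFlush);
    numEntries -= evicted;
    return evicted;
  }
}

Note that shouldFlush() in the patch only re-samples the variable sizes after FLUSH_CHECK_THRESHOLD new keys have been added, so the cost of the estimate stays negligible on the per-row path.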
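The JavaDataModel.java hunk supplies the low-level arithmetic behind all of this sizing: hashMapEntry() is the per-entry overhead of a HashMap, memoryAlign() is the 8-byte object alignment, alignUp() rounds a raw field total up to that alignment, and the new lengthFor*ArrayOfSize() helpers charge an array header plus the element payload, aligned. A small sanity check of those helpers; the key and aggregate fixed sizes at the end are made-up placeholders, and the asserts need -ea to fire:

import org.apache.hadoop.hive.ql.util.JavaDataModel;

// Exercises the JavaDataModel helpers added by this patch.
public class JavaDataModelSizingExample {
  public static void main(String[] args) {
    // alignUp rounds up to the next multiple of the (power-of-two) alignment.
    assert JavaDataModel.alignUp(13, 8) == 16;
    assert JavaDataModel.alignUp(16, 8) == 16;

    JavaDataModel model = JavaDataModel.get();

    // A byte[] is charged its array header plus one byte per element, aligned up.
    int byteArrayEstimate = model.lengthForByteArrayOfSize(10);
    assert byteArrayEstimate == JavaDataModel.alignUp(model.array() + 10, model.memoryAlign());

    // Per-entry cost of the operator's hash table: HashMap entry overhead plus the fixed
    // key and aggregation buffer sizes computed once at operator initialization.
    int fixedKeySize = 24;  // placeholder for keyWrappersBatch.getKeysFixedSize()
    int fixedAggSize = 32;  // placeholder for aggregationBatchInfo.getAggregatorsFixedSize()
    int fixedHashEntrySize = model.hashMapEntry() + fixedKeySize + fixedAggSize;
    System.out.println("estimated fixed bytes per hash table entry: " + fixedHashEntrySize);
  }
}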