diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 7918194..43e03bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -33,7 +33,9 @@ import javolution.util.FastBitSet;
+import javolution.util.FastMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveBaseChar;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -45,6 +47,8 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveBaseCharWritable;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyBinary;
 import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
@@ -73,9 +77,6 @@ private static final long serialVersionUID = 1L;
   private static final int NUMROWSESTIMATESIZE = 1000;
 
-  private transient ExprNodeEvaluator[] keyFields;
-  private transient ObjectInspector[] keyObjectInspectors;
-
   private transient ExprNodeEvaluator[][] aggregationParameterFields;
   private transient ObjectInspector[][] aggregationParameterObjectInspectors;
   private transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
@@ -99,13 +100,14 @@ // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
   // MERGEPARTIAL
+  private transient KeyWrapperFactory keyFactory;
   private transient KeyWrapper currentKeys;
   private transient KeyWrapper newKeys;
   private transient AggregationBuffer[] aggregations;
   private transient Object[][] aggregationsParametersLastInvoke;
 
   // Used by hash-based GroupBy: Mode = HASH, PARTIALS
-  private transient HashMap hashAggregations;
+  private transient FastMap hashAggregations;
 
   private transient boolean firstRow;
   private transient boolean hashAggr;
@@ -117,9 +119,6 @@ private transient int outputKeyLength;
 
-  // current Key ObjectInspectors are standard ObjectInspectors
-  private transient ObjectInspector[] currentKeyObjectInspectors;
-
   private transient MemoryMXBean memoryMXBean;
 
   private transient boolean groupingSetsPresent;
   // generates grouping set
@@ -191,15 +190,11 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     // init keyFields
     int numKeys = conf.getKeys().size();
-    keyFields = new ExprNodeEvaluator[numKeys];
-    keyObjectInspectors = new ObjectInspector[numKeys];
-    currentKeyObjectInspectors = new ObjectInspector[numKeys];
+    ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numKeys];
+    ObjectInspector[] keyObjectInspectors = new ObjectInspector[numKeys];
     for (int i = 0; i < numKeys; i++) {
       keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
       keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
-      currentKeyObjectInspectors[i] = ObjectInspectorUtils
-          .getStandardObjectInspector(keyObjectInspectors[i],
-              ObjectInspectorCopyOption.WRITABLE);
     }
 
     // Initialize the constants for the grouping sets, so that they can be re-used for
@@ -327,6 +322,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       AggregationDesc agg = conf.getAggregators().get(i);
       aggregationEvaluators[i] = agg.getGenericUDAFEvaluator();
     }
+    aggregations = newAggregations();
 
     MapredContext context = MapredContext.get();
     if (context != null) {
@@ -338,11 +334,9 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
     if ((conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) &&
         (!groupingSetsPresent)) {
-      aggregations = newAggregations();
       hashAggr = false;
     } else {
-      hashAggregations = new HashMap(256);
-      aggregations = newAggregations();
+      hashAggregations = new FastMap(256);
       hashAggr = true;
       keyPositionsSize = new ArrayList();
       aggrPositions = new List[aggregations.length];
@@ -361,12 +355,14 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     // see ColumnPrunerGroupByProc
     outputKeyLength = conf.pruneGroupingSetId() ? keyFields.length - 1 : keyFields.length;
 
+    keyFactory = new KeyWrapperFactory(keyFields, keyObjectInspectors);
+    newKeys = keyFactory.getKeyWrapper();
+
     // init objectInspectors
     ObjectInspector[] objectInspectors =
         new ObjectInspector[outputKeyLength + aggregationEvaluators.length];
-    for (int i = 0; i < outputKeyLength; i++) {
-      objectInspectors[i] = currentKeyObjectInspectors[i];
-    }
+
+    System.arraycopy(keyFactory.getOutputOI(), 0, objectInspectors, 0, outputKeyLength);
     for (int i = 0; i < aggregationEvaluators.length; i++) {
       objectInspectors[outputKeyLength + i] = aggregationEvaluators[i].init(conf.getAggregators()
          .get(i).getMode(), aggregationParameterObjectInspectors[i]);
@@ -375,11 +371,6 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     outputObjInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(fieldNames, Arrays.asList(objectInspectors));
 
-    KeyWrapperFactory keyWrapperFactory =
-        new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
-
-    newKeys = keyWrapperFactory.getKeyWrapper();
-
     firstRow = true;
     // estimate the number of hash table entries based on the size of each
     // entry. Since the size of a entry
@@ -424,6 +415,7 @@ private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException
    * @return the size of this datatype
    **/
   private int getSize(int pos, PrimitiveCategory category) {
+    JavaDataModel model = JavaDataModel.get();
     switch (category) {
     case VOID:
     case BOOLEAN:
@@ -431,17 +423,22 @@ private int getSize(int pos, PrimitiveCategory category) {
     case SHORT:
     case INT:
     case LONG:
+      return model.object() + model.ref() + JavaDataModel.PRIMITIVES1;
     case FLOAT:
     case DOUBLE:
-      return javaSizePrimitiveType;
+      return model.object() + model.ref() + JavaDataModel.PRIMITIVES2;
+    case DECIMAL:
+      return model.lengthOfHiveDecimal();
     case STRING:
+    case CHAR:
+    case VARCHAR:
+    case BINARY:
       keyPositionsSize.add(new Integer(pos));
-      return javaObjectOverHead;
-    case BINARY:
-      keyPositionsSize.add(new Integer(pos));
-      return javaObjectOverHead;
+      return model.object() + model.ref();
+    case DATE:
+      return model.object() + model.primitive1();
     case TIMESTAMP:
-      return javaObjectOverHead + javaSizePrimitiveType;
+      return model.lengthOfTimestampWritable();
     default:
       return javaSizeUnknownType;
     }
@@ -462,27 +459,36 @@ private int getSize(int pos, PrimitiveCategory category) {
    * @return the size of this datatype
    **/
   private int getSize(int pos, Class c, Field f) {
-    if (c.isPrimitive()
-        || c.isInstance(Boolean.valueOf(true))
-        || c.isInstance(Byte.valueOf((byte) 0))
-        || c.isInstance(Short.valueOf((short) 0))
-        || c.isInstance(Integer.valueOf(0))
-        || c.isInstance(Long.valueOf(0))
-        || c.isInstance(new Float(0))
-        || c.isInstance(new Double(0))) {
-      return javaSizePrimitiveType;
+    if (c.isPrimitive()) {
+      if (c == long.class || c == double.class) {
+        return JavaDataModel.PRIMITIVES2;
+      }
+      return JavaDataModel.PRIMITIVES1;
+    }
+
+    if (c.isInstance(Long.class) || c.isInstance(Double.class)) {
+      return JavaDataModel.get().object() + JavaDataModel.PRIMITIVES2;
+    }
+
+    if (c.isInstance(Boolean.class)
+        || c.isInstance(Byte.class)
+        || c.isInstance(Short.class)
+        || c.isInstance(Integer.class)
+        || c.isInstance(Float.class)) {
+      return JavaDataModel.get().object() + JavaDataModel.PRIMITIVES1;
     }
 
-    if (c.isInstance(new Timestamp(0))){
-      return javaObjectOverHead + javaSizePrimitiveType;
+    if (c.isInstance(Timestamp.class)){
+      return JavaDataModel.get().lengthOfTimestamp();
     }
 
-    if (c.isInstance(new String()) || c.isInstance(new ByteArrayRef())) {
+    if (c.isInstance(String.class) || c.isInstance(ByteArrayRef.class)
+        || c.isInstance(byte[].class)) {
       if (aggrPositions[pos] == null) {
         aggrPositions[pos] = new ArrayList();
       }
       aggrPositions[pos].add(f);
-      return javaObjectOverHead;
+      return JavaDataModel.get().object();
     }
 
     return javaSizeUnknownType;
@@ -506,16 +512,18 @@ private int getSize(int pos, TypeInfo typeInfo) {
    * @return the size of each row
    **/
   private void estimateRowSize() throws HiveException {
+    JavaDataModel model = JavaDataModel.get();
     // estimate the size of each entry -
     // a datatype with unknown size (String/Struct etc. - is assumed to be 256
     // bytes for now).
-    // 64 bytes is the overhead for a reference
-    fixedRowSize = javaHashEntryOverHead;
+    fixedRowSize = model.hashMapEntry();
 
     ArrayList keys = conf.getKeys();
 
     // Go over all the keys and get the size of the fields of fixed length. Keep
     // track of the variable length keys
+    fixedRowSize += model.ref() + model.primitive1();
+    fixedRowSize += model.lengthForObjectArrayOfSize(keys.size());
     for (int pos = 0; pos < keys.size(); pos++) {
       fixedRowSize += getSize(pos, keys.get(pos).getTypeInfo());
     }
@@ -526,7 +534,7 @@ private void estimateRowSize() throws HiveException {
     estimableAggregationEvaluators = new boolean[aggregationEvaluators.length];
     for (int i = 0; i < aggregationEvaluators.length; i++) {
-      fixedRowSize += javaObjectOverHead;
+      fixedRowSize += model.object();
      AggregationBuffer agg = aggregationEvaluators[i].getNewAggregationBuffer();
      if (GenericUDAFEvaluator.isEstimable(agg)) {
        estimableAggregationEvaluators[i] = true;
@@ -733,13 +741,13 @@ public void processOp(Object row, int tag) throws HiveException {
       newKeys.getNewKey(row, rowInspector);
 
       if (groupingSetsPresent) {
-        Object[] newKeysArray = newKeys.getKeyArray();
-        Object[] cloneNewKeysArray = new Object[newKeysArray.length];
-        for (int keyPos = 0; keyPos < groupingSetsPosition; keyPos++) {
-          cloneNewKeysArray[keyPos] = newKeysArray[keyPos];
-        }
-
+        Object[] cloneNewKeysArray = new Object[outputKeyLength];
+        System.arraycopy(newKeys.getKeyArray(), 0, cloneNewKeysArray, 0, outputKeyLength);
+        for (int groupingSetPos = 0; groupingSetPos < groupingSets.size(); groupingSetPos++) {
+          Object[] newKeysArray = newKeys.getKeyArray();
+          System.arraycopy(cloneNewKeysArray, groupingSetsPosition, newKeysArray,
+              groupingSetsPosition, outputKeyLength - groupingSetsPosition);
           for (int keyPos = 0; keyPos < groupingSetsPosition; keyPos++) {
             newKeysArray[keyPos] = null;
           }
@@ -767,21 +775,20 @@ public void processOp(Object row, int tag) throws HiveException {
   private void processHashAggr(Object row, ObjectInspector rowInspector,
       KeyWrapper newKeys) throws HiveException {
     // Prepare aggs for updating
-    AggregationBuffer[] aggs = null;
     boolean newEntryForHashAggr = false;
 
     // hash-based aggregations
-    aggs = hashAggregations.get(newKeys);
+    AggregationBuffer[] aggs = hashAggregations.putIfAbsent(newKeys, aggregations);
     if (aggs == null) {
-      KeyWrapper newKeyProber = newKeys.copyKey();
-      aggs = newAggregations();
-      hashAggregations.put(newKeyProber, aggs);
+      aggs = aggregations;
       newEntryForHashAggr = true;
+      newKeys.copyKey(newKeys); // copy to self
       numRowsHashTbl++; // new entry in the hash table
     }
 
+    boolean mergeToHashAggregation = newEntryForHashAggr;
     // Update the aggs
-    updateAggregations(aggs, row, rowInspector, true, newEntryForHashAggr, null);
+    updateAggregations(aggs, row, rowInspector, true, mergeToHashAggregation, null);
 
     // We can only flush after the updateAggregations is done, or the
     // potentially new entry "aggs"
@@ -792,6 +799,10 @@ private void processHashAggr(Object row, ObjectInspector rowInspector,
     if ( shouldBeFlushed(newKeys)) {
       flushHashTable(false);
     }
+    if (newEntryForHashAggr) {
+      newKeys = keyFactory.getKeyWrapper();
+      aggregations = newAggregations();
+    }
   }
 
   // Non-hash aggregation
@@ -804,24 +815,20 @@ private void processAggr(Object row,
     //boolean keysAreEqual = (currentKeys != null && newKeys != null)?
    //    newKeyStructEqualComparer.areEqual(currentKeys, newKeys) : false;
 
-    boolean keysAreEqual = (currentKeys != null && newKeys != null)?
-        newKeys.equals(currentKeys) : false;
-
-    // Forward the current keys if needed for sort-based aggregation
-    if (currentKeys != null && !keysAreEqual) {
-      // This is to optimize queries of the form:
-      // select count(distinct key) from T
-      // where T is sorted and bucketized by key
-      // Partial aggregation is performed on the mapper, and the
-      // reducer gets 1 row (partial result) per mapper.
-      if (!conf.isDontResetAggrsDistinct()) {
-        forward(currentKeys.getKeyArray(), aggregations);
-        countAfterReport = 0;
-      }
-    }
-
-    // Need to update the keys?
-    if (currentKeys == null || !keysAreEqual) {
+    if (currentKeys == null || !newKeys.equals(currentKeys)) {
+      // Forward the current keys if needed for sort-based aggregation
+      if (currentKeys != null) {
+        // This is to optimize queries of the form:
+        // select count(distinct key) from T
+        // where T is sorted and bucketized by key
+        // Partial aggregation is performed on the mapper, and the
+        // reducer gets 1 row (partial result) per mapper.
+        if (!conf.isDontResetAggrsDistinct()) {
+          forward(currentKeys.getKeyArray(), aggregations);
+          countAfterReport = 0;
+        }
+      }
       if (currentKeys == null) {
         currentKeys = newKeys.copyKey();
       } else {
@@ -868,10 +875,14 @@ private boolean shouldBeFlushed(KeyWrapper newKeys) {
       if(rate > memoryThreshold){
         return true;
       }
+
+      Object[] keyArray = newKeys.getKeyArray();
+
       for (Integer pos : keyPositionsSize) {
-        Object key = newKeys.getKeyArray()[pos.intValue()];
+        Object key = keyArray[pos.intValue()];
         // Ignore nulls
         if (key != null) {
+          totalVariableSize += JavaDataModel.get().ref();
           if (key instanceof LazyString) {
             totalVariableSize += ((LazyPrimitive) key).
@@ -888,6 +899,12 @@ private boolean shouldBeFlushed(KeyWrapper newKeys) {
             totalVariableSize += ((BytesWritable) key).getLength();
           } else if (key instanceof ByteArrayRef) {
             totalVariableSize += ((ByteArrayRef) key).getData().length;
+          } else if (key instanceof byte[]) {
+            totalVariableSize += ((byte[]) key).length;
+          } else if (key instanceof HiveBaseChar) {
+            totalVariableSize += ((HiveBaseChar) key).getValue().length();
+          } else if (key instanceof HiveBaseCharWritable) {
+            totalVariableSize += ((HiveBaseCharWritable) key).getTextValue().getLength();
           }
         }
       }
@@ -1023,7 +1040,6 @@ public void flush() throws HiveException{
            .next();
        forward(m.getKey().getKeyArray(), m.getValue());
-        iter.remove();
      }
      hashAggregations.clear();
    } else if (aggregations != null) {
@@ -1052,7 +1068,7 @@ public void closeOp(boolean abort) throws HiveException {
     if (!abort) {
       try {
         // If there is no grouping key and no row came to this operator
-        if (firstRow && (keyFields.length == 0)) {
+        if (firstRow && outputKeyLength == 0) {
           firstRow = false;
 
           // There is no grouping key - simulate a null row
@@ -1069,9 +1085,6 @@ public void closeOp(boolean abort) throws HiveException {
           }
 
           // Calculate the parameters
-          for (int pi = 0; pi < aggregationParameterFields[ai].length; pi++) {
-            o[pi] = null;
-          }
           aggregationEvaluators[ai].aggregate(aggregations[ai], o);
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
index bf4ba7f..09b5bdd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
@@ -25,40 +25,72 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 
 public class KeyWrapperFactory {
-  public KeyWrapperFactory(ExprNodeEvaluator[] keyFields, ObjectInspector[] keyObjectInspectors,
-      ObjectInspector[] currentKeyObjectInspectors) {
+  public KeyWrapperFactory(ExprNodeEvaluator[] keyFields, ObjectInspector[] keyObjectInspectors) {
     this.keyFields = keyFields;
     this.keyObjectInspectors = keyObjectInspectors;
-    this.currentKeyObjectInspectors = currentKeyObjectInspectors;
+    if (allPrimitives(keyObjectInspectors)) {
+      outKeyObjectInspectors = createSimpleKeyOutputOI(keyObjectInspectors);
+      preferWritable = new boolean[keyFields.length];
+      for (int i = 0; i < preferWritable.length; i++) {
+        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) outKeyObjectInspectors[i];
+        preferWritable[i] = poi.preferWritable();
+      }
+    } else {
+      outKeyObjectInspectors = createListKeyOutputOI(keyObjectInspectors);
+      preferWritable = null;
+    }
+  }
+
+  public ObjectInspector[] getOutputOI() {
+    return outKeyObjectInspectors;
+  }
+
+  private boolean allPrimitives(ObjectInspector[] inputOI) {
+    for (ObjectInspector oi : inputOI) {
+      if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+        return false;
+      }
+    }
+    return true;
+  }
+  private ObjectInspector[] createSimpleKeyOutputOI(ObjectInspector[] inputOI) {
+    ObjectInspector[] outputOI = new ObjectInspector[inputOI.length];
+    for (int i = 0; i < inputOI.length; i++) {
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) inputOI[i];
+      ObjectInspectorCopyOption option =
+          !poi.getPrimitiveCategory().isJavaPrimitive() && poi.preferWritable() ?
+              ObjectInspectorCopyOption.WRITABLE : ObjectInspectorCopyOption.JAVA;
+      outputOI[i] = ObjectInspectorUtils.getStandardObjectInspector(inputOI[i], option);
+    }
+    return outputOI;
+  }
+
+  private ObjectInspector[] createListKeyOutputOI(ObjectInspector[] inputOI) {
+    ObjectInspector[] outputOI = new ObjectInspector[inputOI.length];
+    for (int i = 0; i < inputOI.length; i++) {
+      outputOI[i] = ObjectInspectorUtils.getStandardObjectInspector(inputOI[i],
+          ObjectInspectorCopyOption.WRITABLE);
+    }
+    return outputOI;
   }
 
   public KeyWrapper getKeyWrapper() {
-    if (keyFields.length == 1
-        && TypeInfoUtils.getTypeInfoFromObjectInspector(keyObjectInspectors[0]).equals(
-            TypeInfoFactory.stringTypeInfo)) {
-      assert(TypeInfoUtils.getTypeInfoFromObjectInspector(currentKeyObjectInspectors[0]).equals(
-          TypeInfoFactory.stringTypeInfo));
-      soi_new = (StringObjectInspector) keyObjectInspectors[0];
-      soi_copy = (StringObjectInspector) currentKeyObjectInspectors[0];
-      return new TextKeyWrapper(false);
-    } else {
-      currentStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, currentKeyObjectInspectors);
-      newKeyStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, keyObjectInspectors);
-      return new ListKeyWrapper(false);
+    if (preferWritable != null) {
+      return new SimpleKeys();
     }
+    currentStructEqualComparer = new ListObjectsEqualComparer(outKeyObjectInspectors, outKeyObjectInspectors);
+    newKeyStructEqualComparer = new ListObjectsEqualComparer(outKeyObjectInspectors, keyObjectInspectors);
+    return new ListKeyWrapper(false);
   }
 
-  transient ExprNodeEvaluator[] keyFields;
-  transient ObjectInspector[] keyObjectInspectors;
-  transient ObjectInspector[] currentKeyObjectInspectors;
-
+  private final ExprNodeEvaluator[] keyFields;
+  private final ObjectInspector[] keyObjectInspectors;
+  private final ObjectInspector[] outKeyObjectInspectors;
+  private final boolean[] preferWritable;
   transient ListObjectsEqualComparer currentStructEqualComparer;
   transient ListObjectsEqualComparer newKeyStructEqualComparer;
@@ -107,7 +139,7 @@ public void setHashKey() {
     }
 
     @Override
-    public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+    public void getNewKey(Object row, ObjectInspector inputOI) throws HiveException {
       // Compute the keys
       for (int i = 0; i < keyFields.length; i++) {
         keys[i] = keyFields[i].evaluate(row);
@@ -153,24 +185,18 @@ private void deepCopyElements(Object[] keys,
       }
     }
 
-  transient Object[] singleEleArray = new Object[1];
-  transient StringObjectInspector soi_new, soi_copy;
-
-  class TextKeyWrapper extends KeyWrapper {
+  class SimpleKeys extends KeyWrapper {
+
     int hashcode;
-    Object key;
-    boolean isCopy;
+    final Object[] keys;
 
-    public TextKeyWrapper(boolean isCopy) {
-      this(-1, null, isCopy);
+    public SimpleKeys() {
+      this(-1, new Object[keyFields.length]);
     }
 
-    private TextKeyWrapper(int hashcode, Object key,
-        boolean isCopy) {
-      super();
+    private SimpleKeys(int hashcode, Object[] keys) {
       this.hashcode = hashcode;
-      this.key = key;
-      this.isCopy = isCopy;
+      this.keys = keys;
     }
 
     @Override
@@ -180,59 +206,55 @@ public int hashCode() {
 
     @Override
     public boolean equals(Object other) {
-      Object obj = ((TextKeyWrapper) other).key;
-      Text t1;
-      Text t2;
-      if (isCopy) {
-        t1 = soi_copy.getPrimitiveWritableObject(key);
-        t2 = soi_copy.getPrimitiveWritableObject(obj);
-      } else {
-        t1 = soi_new.getPrimitiveWritableObject(key);
-        t2 = soi_copy.getPrimitiveWritableObject(obj);
-      }
-      if (t1 == null && t2 == null) {
-        return true;
-      } else if (t1 == null || t2 == null) {
-        return false;
-      } else {
-        return t1.equals(t2);
-      }
+      return Arrays.equals(keys, ((SimpleKeys)other).keys);
     }
 
     @Override
     public void setHashKey() {
-      if (key == null) {
-        hashcode = 0;
-      } else{
-        hashcode = key.hashCode();
-      }
+      hashcode = Arrays.hashCode(keys);
     }
 
     @Override
-    public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+    public void getNewKey(Object row, ObjectInspector inputOI) throws HiveException {
       // Compute the keys
-      key = keyFields[0].evaluate(row);
+      for (int i = 0; i < keyFields.length; i++) {
+        Object value = keyFields[i].evaluate(row);
+        if (value != null) {
+          PrimitiveObjectInspector inspector = (PrimitiveObjectInspector) keyObjectInspectors[i];
+          if (preferWritable[i]) {
+            value = inspector.getPrimitiveWritableObject(value);
+          } else {
+            value = inspector.getPrimitiveJavaObject(value);
+          }
+        }
+        keys[i] = value;
+      }
     }
 
     @Override
     public KeyWrapper copyKey() {
-      return new TextKeyWrapper(hashcode, ObjectInspectorUtils.copyToStandardObject(key,
-          soi_new, ObjectInspectorCopyOption.WRITABLE), true);
+      SimpleKeys copy = new SimpleKeys();
+      copy.copyKey(this);
+      return copy;
     }
 
     @Override
     public void copyKey(KeyWrapper oldWrapper) {
-      TextKeyWrapper textWrapper = (TextKeyWrapper) oldWrapper;
+      SimpleKeys textWrapper = (SimpleKeys) oldWrapper;
       hashcode = textWrapper.hashcode;
-      isCopy = true;
-      key = ObjectInspectorUtils.copyToStandardObject(textWrapper.key,
-          soi_new, ObjectInspectorCopyOption.WRITABLE);
+      for (int i = 0; i < keys.length; i++) {
+        keys[i] = textWrapper.keys[i];
+        if (preferWritable[i]) {
+          keys[i] = ((PrimitiveObjectInspector) outKeyObjectInspectors[i]).copyObject(textWrapper.keys[i]);
+        } else if (this != oldWrapper) {
+          keys[i] = textWrapper.keys[i];
+        }
+      }
     }
 
     @Override
     public Object[] getKeyArray() {
-      singleEleArray[0] = key;
-      return singleEleArray;
+      return keys;
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
index e95505c..9a720c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
@@ -24,7 +24,6 @@ import java.util.Stack;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -39,8 +38,6 @@ import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 public class PTFOperator extends Operator implements Serializable {
@@ -106,14 +103,11 @@ public void processOp(Object row, int tag) throws HiveException {
      * - set currentKey to the newKey if it is null or has changed.
      */
     newKeys.getNewKey(row, inputObjInspectors[0]);
-    boolean keysAreEqual = (currentKeys != null && newKeys != null)?
-        newKeys.equals(currentKeys) : false;
-
-    if (currentKeys != null && !keysAreEqual) {
-      ptfInvocation.finishPartition();
-    }
-
-    if (currentKeys == null || !keysAreEqual) {
+
+    if (currentKeys == null || !newKeys.equals(currentKeys)) {
+      if (currentKeys != null) {
+        ptfInvocation.finishPartition();
+      }
       ptfInvocation.startPartition();
       if (currentKeys == null) {
         currentKeys = newKeys.copyKey();
@@ -149,7 +143,6 @@ protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
     int numExprs = exprs.size();
     ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
     ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
-    ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
     for(int i=0; i