Index: serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java (revision 10111)
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java (working copy)
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.serde2.objectinspector;
 
-import java.util.ArrayList;
-
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
@@ -155,15 +153,15 @@
    * @param ol1
    * @return True if object in ol0 and ol1 are all identical
    */
-  public boolean areEqual(ArrayList<Object> ol0, ArrayList<Object> ol1) {
-    if (ol0.size() != numFields || ol1.size() != numFields) {
-      if (ol0.size() != ol1.size()) {
+  public boolean areEqual(Object[] ol0, Object[] ol1) {
+    if (ol0.length != numFields || ol1.length != numFields) {
+      if (ol0.length != ol1.length) {
         return false;
       }
-      assert (ol0.size() <= numFields);
-      assert (ol1.size() <= numFields);
-      for (int i = 0; i < Math.min(ol0.size(), ol1.size()); i++) {
-        if (!fieldComparers[i].areEqual(ol0.get(i), ol1.get(i))) {
+      assert (ol0.length <= numFields);
+      assert (ol1.length <= numFields);
+      for (int i = 0; i < Math.min(ol0.length, ol1.length); i++) {
+        if (!fieldComparers[i].areEqual(ol0[i], ol1[i])) {
          return false;
        }
      }
@@ -171,7 +169,7 @@
     }
 
     for (int i = 0; i < numFields; i++) {
-      if (!fieldComparers[i].areEqual(ol0.get(i), ol1.get(i))) {
+      if (!fieldComparers[i].areEqual(ol0[i], ol1[i])) {
         return false;
       }
     }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java (revision 0)
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+public abstract class KeyWrapper {
+  abstract void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException;
+  abstract void setHashKey();
+  abstract KeyWrapper copyKey();
+  abstract void copyKey(KeyWrapper oldWrapper);
+  abstract Object[] getKeyArray();
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (revision 10111)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (working copy)
@@ -44,7 +44,6 @@
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
 import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyStringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectsEqualComparer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -73,7 +72,6 @@
 
   protected transient ExprNodeEvaluator[] keyFields;
   protected transient ObjectInspector[] keyObjectInspectors;
-  protected transient Object[] keyObjects;
 
   protected transient ExprNodeEvaluator[][] aggregationParameterFields;
   protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
@@ -99,10 +97,11 @@
 
   protected transient ArrayList<ObjectInspector> objectInspectors;
   transient ArrayList<String> fieldNames;
+  transient KeyWrapperFactory keyWrapperFactory;
 
   // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
   // MERGEPARTIAL
-  protected transient ArrayList<Object> currentKeys;
-  protected transient ArrayList<Object> newKeys;
+  protected transient KeyWrapper currentKeys;
+  protected transient KeyWrapper newKeys;
   protected transient AggregationBuffer[] aggregations;
   protected transient Object[][] aggregationsParametersLastInvoke;
@@ -110,7 +109,7 @@
 
   protected transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
 
   // Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
-  protected transient HashSet<ArrayList<Object>> keysCurrentGroup;
+  protected transient HashSet<KeyWrapper> keysCurrentGroup;
 
   transient boolean bucketGroup;
@@ -134,8 +133,6 @@
   // new Key ObjectInspectors are objectInspectors from the parent
   transient StructObjectInspector newKeyObjectInspector;
   transient StructObjectInspector currentKeyObjectInspector;
-  transient ListObjectsEqualComparer currentStructEqualComparer;
-  transient ListObjectsEqualComparer newKeyStructEqualComparer;
 
   /**
    * This is used to store the position and field names for variable length
@@ -192,16 +189,13 @@
     keyFields = new ExprNodeEvaluator[conf.getKeys().size()];
     keyObjectInspectors = new ObjectInspector[conf.getKeys().size()];
     currentKeyObjectInspectors = new ObjectInspector[conf.getKeys().size()];
-    keyObjects = new Object[conf.getKeys().size()];
     for (int i = 0; i < keyFields.length; i++) {
       keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
       keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
       currentKeyObjectInspectors[i] = ObjectInspectorUtils
           .getStandardObjectInspector(keyObjectInspectors[i],
           ObjectInspectorCopyOption.WRITABLE);
-      keyObjects[i] = null;
     }
-    newKeys = new ArrayList<Object>(keyFields.length);
 
     // initialize unionExpr for reduce-side
     // reduce KEY has union field as the last field if there are distinct
@@ -344,7 +338,7 @@
         HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
     groupKeyIsNotReduceKey = conf.getGroupKeyNotReductionKey();
     if (groupKeyIsNotReduceKey) {
-      keysCurrentGroup = new HashSet<ArrayList<Object>>();
+      keysCurrentGroup = new HashSet<KeyWrapper>();
     }
   }
 
@@ -365,12 +359,14 @@
     currentKeyObjectInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(keyNames, Arrays
             .asList(currentKeyObjectInspectors));
-    currentStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, currentKeyObjectInspectors);
-    newKeyStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, keyObjectInspectors);
 
     outputObjInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(fieldNames, objectInspectors);
 
+    keyWrapperFactory = new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
+
+    newKeys = keyWrapperFactory.getKeyWrapper();
+
     firstRow = true;
     // estimate the number of hash table entries based on the size of each
     // entry. Since the size of a entry
@@ -713,17 +709,9 @@
     try {
       countAfterReport++;
 
-      // Compute the keys
-      newKeys.clear();
-      for (int i = 0; i < keyFields.length; i++) {
-        if (keyObjectInspectors[i] == null) {
-          keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
-        }
-        keyObjects[i] = keyFields[i].evaluate(row);
-        newKeys.add(keyObjects[i]);
-      }
-
+      newKeys.getNewKey(row, rowInspector);
       if (hashAggr) {
+        newKeys.setHashKey();
         processHashAggr(row, rowInspector, newKeys);
       } else {
         processAggr(row, rowInspector, newKeys);
@@ -743,84 +731,16 @@
     }
   }
 
-  private static ArrayList<Object> deepCopyElements(Object[] keys,
-      ObjectInspector[] keyObjectInspectors,
-      ObjectInspectorCopyOption copyOption) {
-    ArrayList<Object> result = new ArrayList<Object>(keys.length);
-    deepCopyElements(keys, keyObjectInspectors, result, copyOption);
-    return result;
-  }
-
-  private static void deepCopyElements(Object[] keys,
-      ObjectInspector[] keyObjectInspectors, ArrayList<Object> result,
-      ObjectInspectorCopyOption copyOption) {
-    result.clear();
-    for (int i = 0; i < keys.length; i++) {
-      result.add(ObjectInspectorUtils.copyToStandardObject(keys[i],
-          keyObjectInspectors[i], copyOption));
-    }
-  }
-
-  class KeyWrapper {
-    int hashcode;
-    ArrayList<Object> keys;
-    // decide whether this is already in hashmap (keys in hashmap are deepcopied
-    // version, and we need to use 'currentKeyObjectInspector').
-    boolean copy = false;
-
-    KeyWrapper() {
-    }
-
-    public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys) {
-      this(hashcode, copiedKeys, false);
-    }
-
-    public KeyWrapper(int hashcode, ArrayList<Object> copiedKeys,
-        boolean inHashMap) {
-      super();
-      this.hashcode = hashcode;
-      keys = copiedKeys;
-      copy = inHashMap;
-    }
-
-    @Override
-    public int hashCode() {
-      return hashcode;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      ArrayList<Object> copied_in_hashmap = ((KeyWrapper) obj).keys;
-      if (!copy) {
-        return newKeyStructEqualComparer.areEqual(copied_in_hashmap, keys);
-      } else {
-        return currentStructEqualComparer.areEqual(copied_in_hashmap, keys);
-      }
-    }
-  }
-
-
-
-  KeyWrapper keyProber = new KeyWrapper();
-
   private void processHashAggr(Object row, ObjectInspector rowInspector,
-      ArrayList<Object> newKeys) throws HiveException {
+      KeyWrapper newKeys) throws HiveException {
     // Prepare aggs for updating
     AggregationBuffer[] aggs = null;
     boolean newEntryForHashAggr = false;
 
-    keyProber.hashcode = newKeys.hashCode();
-    // use this to probe the hashmap
-    keyProber.keys = newKeys;
-    // hash-based aggregations
-    aggs = hashAggregations.get(keyProber);
-    ArrayList<Object> newDefaultKeys = null;
+    aggs = hashAggregations.get(newKeys);
     if (aggs == null) {
-      newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors,
-          ObjectInspectorCopyOption.WRITABLE);
-      KeyWrapper newKeyProber = new KeyWrapper(keyProber.hashcode,
-          newDefaultKeys, true);
+      KeyWrapper newKeyProber = newKeys.copyKey();
       aggs = newAggregations();
       hashAggregations.put(newKeyProber, aggs);
      newEntryForHashAggr = true;
@@ -833,11 +753,7 @@
     // Peek into the set to find out if a new grouping key is seen for the given
     // reduction key
     if (groupKeyIsNotReduceKey) {
-      if (newDefaultKeys == null) {
-        newDefaultKeys = deepCopyElements(keyObjects, keyObjectInspectors,
-            ObjectInspectorCopyOption.WRITABLE);
-      }
-      newEntryForHashAggr = keysCurrentGroup.add(newDefaultKeys);
+      newEntryForHashAggr = keysCurrentGroup.add(newKeys.copyKey());
     }
 
     // Update the aggs
@@ -859,27 +775,30 @@
   // Non-hash aggregation
   private void processAggr(Object row, ObjectInspector rowInspector,
-      ArrayList<Object> newKeys) throws HiveException {
+      KeyWrapper newKeys) throws HiveException {
 
     // Prepare aggs for updating
     AggregationBuffer[] aggs = null;
     Object[][] lastInvoke = null;
 
+    //boolean keysAreEqual = (currentKeys != null && newKeys != null)?
+    //    newKeyStructEqualComparer.areEqual(currentKeys, newKeys) : false;
+
     boolean keysAreEqual = (currentKeys != null && newKeys != null)?
-        newKeyStructEqualComparer.areEqual(currentKeys, newKeys) : false;
+        newKeys.equals(currentKeys) : false;
 
     // Forward the current keys if needed for sort-based aggregation
     if (currentKeys != null && !keysAreEqual) {
-      forward(currentKeys, aggregations);
+      forward(currentKeys.getKeyArray(), aggregations);
       countAfterReport = 0;
     }
 
     // Need to update the keys?
     if (currentKeys == null || !keysAreEqual) {
       if (currentKeys == null) {
-        currentKeys = new ArrayList<Object>(keyFields.length);
+        currentKeys = newKeys.copyKey();
+      } else {
+        currentKeys.copyKey(newKeys);
       }
-      deepCopyElements(keyObjects, keyObjectInspectors, currentKeys,
-          ObjectInspectorCopyOption.WRITABLE);
 
       // Reset the aggregations
       resetAggregations(aggregations);
@@ -904,14 +823,14 @@
    * @param newKeys
    *          keys for the row under consideration
    **/
-  private boolean shouldBeFlushed(ArrayList<Object> newKeys) {
+  private boolean shouldBeFlushed(KeyWrapper newKeys) {
     int numEntries = hashAggregations.size();
 
     // The fixed size for the aggregation class is already known. Get the
     // variable portion of the size every NUMROWSESTIMATESIZE rows.
     if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
       for (Integer pos : keyPositionsSize) {
-        Object key = newKeys.get(pos.intValue());
+        Object key = newKeys.getKeyArray()[pos.intValue()];
         // Ignore nulls
         if (key != null) {
           if (key instanceof LazyPrimitive) {
@@ -928,8 +847,7 @@
 
     AggregationBuffer[] aggs = null;
     if (aggrPositions.size() > 0) {
-      KeyWrapper newKeyProber = new KeyWrapper(
-          newKeys.hashCode(), newKeys);
+      KeyWrapper newKeyProber = newKeys.copyKey();
       aggs = hashAggregations.get(newKeyProber);
     }
@@ -975,7 +893,7 @@
         .entrySet().iterator();
     while (iter.hasNext()) {
       Map.Entry<KeyWrapper, AggregationBuffer[]> m = iter.next();
-      forward(m.getKey().keys, m.getValue());
+      forward(m.getKey().getKeyArray(), m.getValue());
     }
     hashAggregations.clear();
     hashAggregations = null;
@@ -990,7 +908,7 @@
     int numDel = 0;
     while (iter.hasNext()) {
       Map.Entry<KeyWrapper, AggregationBuffer[]> m = iter.next();
-      forward(m.getKey().keys, m.getValue());
+      forward(m.getKey().getKeyArray(), m.getValue());
       iter.remove();
       numDel++;
       if (numDel * 10 >= oldSize) {
@@ -1009,19 +927,20 @@
    *          The keys in the record
    * @throws HiveException
    */
-  protected void forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
+  protected void forward(Object[] keys, AggregationBuffer[] aggs)
       throws HiveException {
-    int totalFields = keys.size() + aggs.length;
+    int totalFields = keys.length + aggs.length;
     if (forwardCache == null) {
       forwardCache = new Object[totalFields];
     }
-    for (int i = 0; i < keys.size(); i++) {
-      forwardCache[i] = keys.get(i);
+    for (int i = 0; i < keys.length; i++) {
+      forwardCache[i] = keys[i];
     }
     for (int i = 0; i < aggs.length; i++) {
-      forwardCache[keys.size() + i] = aggregationEvaluators[i]
+      forwardCache[keys.length + i] = aggregationEvaluators[i]
           .evaluate(aggs[i]);
     }
+
     forward(forwardCache, outputObjInspector);
   }
@@ -1058,7 +977,7 @@
       }
 
       // create dummy keys - size 0
-      forward(new ArrayList<Object>(0), aggregations);
+      forward(new Object[0], aggregations);
     } else {
       if (hashAggregations != null) {
         LOG.warn("Begin Hash Table flush at close: size = "
@@ -1067,14 +986,15 @@
         while (iter.hasNext()) {
           Map.Entry<KeyWrapper, AggregationBuffer[]> m = (Map.Entry) iter
               .next();
-          forward(m.getKey().keys, m.getValue());
+
+          forward(m.getKey().getKeyArray(), m.getValue());
           iter.remove();
         }
         hashAggregations.clear();
       } else if (aggregations != null) {
         // sort-based aggregations
         if (currentKeys != null) {
-          forward(currentKeys, aggregations);
+          forward(currentKeys.getKeyArray(), aggregations);
         }
         currentKeys = null;
       } else {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java (revision 0)
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectsEqualComparer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+
+public class KeyWrapperFactory {
+  public KeyWrapperFactory(ExprNodeEvaluator[] keyFields, ObjectInspector[] keyObjectInspectors,
+      ObjectInspector[] currentKeyObjectInspectors) {
+    this.keyFields = keyFields;
+    this.keyObjectInspectors = keyObjectInspectors;
+    this.currentKeyObjectInspectors = currentKeyObjectInspectors;
+
+  }
+
+  public KeyWrapper getKeyWrapper() {
+    if (keyFields.length == 1
+        && TypeInfoUtils.getTypeInfoFromObjectInspector(keyObjectInspectors[0]).equals(
+            TypeInfoFactory.stringTypeInfo)) {
+      assert(TypeInfoUtils.getTypeInfoFromObjectInspector(currentKeyObjectInspectors[0]).equals(
+          TypeInfoFactory.stringTypeInfo));
+      soi_new = (StringObjectInspector) keyObjectInspectors[0];
+      soi_copy = (StringObjectInspector) currentKeyObjectInspectors[0];
+      return new TextKeyWrapper(false);
+    } else {
+      currentStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, currentKeyObjectInspectors);
+      newKeyStructEqualComparer = new ListObjectsEqualComparer(currentKeyObjectInspectors, keyObjectInspectors);
+      return new ListKeyWrapper(false);
+    }
+  }
+
+  transient ExprNodeEvaluator[] keyFields;
+  transient ObjectInspector[] keyObjectInspectors;
+  transient ObjectInspector[] currentKeyObjectInspectors;
+
+
+  transient ListObjectsEqualComparer currentStructEqualComparer;
+  transient ListObjectsEqualComparer newKeyStructEqualComparer;
+
+  class ListKeyWrapper extends KeyWrapper {
+    int hashcode;
+    Object[] keys;
+    // decide whether this is already in hashmap (keys in hashmap are deepcopied
+    // version, and we need to use 'currentKeyObjectInspector').
+    ListObjectsEqualComparer equalComparer;
+
+    public ListKeyWrapper(boolean isCopy) {
+      this(-1, new Object[keyFields.length], isCopy);
+    }
+
+    private ListKeyWrapper(int hashcode, Object[] copiedKeys,
+        boolean isCopy) {
+      super();
+      this.hashcode = hashcode;
+      keys = copiedKeys;
+      setEqualComparer(isCopy);
+    }
+
+    private void setEqualComparer(boolean copy) {
+      if (!copy) {
+        equalComparer = newKeyStructEqualComparer;
+      } else {
+        equalComparer = currentStructEqualComparer;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return hashcode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      Object[] copied_in_hashmap = ((ListKeyWrapper) obj).keys;
+      return equalComparer.areEqual(copied_in_hashmap, keys);
+    }
+
+    @Override
+    public void setHashKey() {
+      hashcode = Arrays.hashCode(keys);
+    }
+
+    @Override
+    public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+      // Compute the keys
+      for (int i = 0; i < keyFields.length; i++) {
+        keys[i] = keyFields[i].evaluate(row);
+      }
+    }
+
+    @Override
+    public KeyWrapper copyKey() {
+      Object[] newDefaultKeys = deepCopyElements(keys, keyObjectInspectors,
+          ObjectInspectorCopyOption.WRITABLE);
+      return new ListKeyWrapper(hashcode, newDefaultKeys, true);
+    }
+
+    @Override
+    public void copyKey(KeyWrapper oldWrapper) {
+      ListKeyWrapper listWrapper = (ListKeyWrapper) oldWrapper;
+      hashcode = listWrapper.hashcode;
+      equalComparer = currentStructEqualComparer;
+      deepCopyElements(listWrapper.keys, keyObjectInspectors, keys,
+          ObjectInspectorCopyOption.WRITABLE);
+    }
+
+    @Override
+    public Object[] getKeyArray() {
+      return keys;
+    }
+
+    private Object[] deepCopyElements(Object[] keys,
+        ObjectInspector[] keyObjectInspectors,
+        ObjectInspectorCopyOption copyOption) {
+      Object[] result = new Object[keys.length];
+      deepCopyElements(keys, keyObjectInspectors, result, copyOption);
+      return result;
+    }
+
+    private void deepCopyElements(Object[] keys,
+        ObjectInspector[] keyObjectInspectors, Object[] result,
+        ObjectInspectorCopyOption copyOption) {
+      for (int i = 0; i < keys.length; i++) {
+        result[i] = ObjectInspectorUtils.copyToStandardObject(keys[i],
+            keyObjectInspectors[i], copyOption);
+      }
+    }
+  }
+
+  transient Object[] singleEleArray = new Object[1];
+  transient StringObjectInspector soi_new, soi_copy;
+
+  class TextKeyWrapper extends KeyWrapper {
+    int hashcode;
+    Object key;
+    boolean isCopy;
+
+    public TextKeyWrapper(boolean isCopy) {
+      this(-1, null, isCopy);
+    }
+
+    private TextKeyWrapper(int hashcode, Object key,
+        boolean isCopy) {
+      super();
+      this.hashcode = hashcode;
+      this.key = key;
+      this.isCopy = isCopy;
+    }
+
+    @Override
+    public int hashCode() {
+      return hashcode;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      Object obj = ((TextKeyWrapper) other).key;
+      Text t1;
+      Text t2;
+      if (isCopy) {
+        t1 = soi_copy.getPrimitiveWritableObject(key);
+        t2 = soi_copy.getPrimitiveWritableObject(obj);
+      } else {
+        t1 = soi_new.getPrimitiveWritableObject(key);
+        t2 = soi_copy.getPrimitiveWritableObject(obj);
+      }
+      if (t1 == null && t2 == null) {
+        return true;
+      } else if (t1 == null || t2 == null) {
+        return false;
+      } else {
+        return t1.equals(t2);
+      }
+    }
+
+    @Override
+    public void setHashKey() {
+      if (key == null) {
+        hashcode = 0;
+      } else {
+        hashcode = key.hashCode();
+      }
+    }
+
+    @Override
+    public void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+      // Compute the keys
+      key = keyFields[0].evaluate(row);
+    }
+
+    @Override
+    public KeyWrapper copyKey() {
+      return new TextKeyWrapper(hashcode, ObjectInspectorUtils.copyToStandardObject(key,
+          soi_new, ObjectInspectorCopyOption.WRITABLE), true);
+    }
+
+    @Override
+    public void copyKey(KeyWrapper oldWrapper) {
+      TextKeyWrapper textWrapper = (TextKeyWrapper) oldWrapper;
+      hashcode = textWrapper.hashcode;
+      isCopy = true;
+      key = ObjectInspectorUtils.copyToStandardObject(textWrapper.key,
+          soi_new, ObjectInspectorCopyOption.WRITABLE);
+    }
+
+    @Override
+    public Object[] getKeyArray() {
+      singleEleArray[0] = key;
+      return singleEleArray;
+    }
+  }
+}
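
Reviewer note (illustrative, not part of the patch): the probe-then-copy flow that GroupByOperator.processHashAggr() relies on after this change can be hard to see across hunks: evaluate the key into one reusable wrapper, cache its hash code with setHashKey(), probe the hash map with the wrapper itself, and only call copyKey() when a new entry is actually inserted. The self-contained sketch below reproduces that flow with plain Java stand-ins; ArrayKey, getNewKey, setHashKey and copyKey are hypothetical names that mirror the KeyWrapper contract, not Hive APIs.

// Illustrative sketch only. It shows why hashCode() returns a precomputed value
// and why a snapshot of the key is taken before it is stored in the map, while
// the probe object keeps being reused for the next row.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class KeyWrapperSketch {

  static class ArrayKey {
    Object[] keys;
    int hashcode;

    ArrayKey(int size) {
      keys = new Object[size];
    }

    // Re-evaluate the grouping columns of the current row into the reusable array.
    void getNewKey(Object[] row, int[] keyColumns) {
      for (int i = 0; i < keyColumns.length; i++) {
        keys[i] = row[keyColumns[i]];
      }
    }

    // Cache the hash code once per row, analogous to KeyWrapper.setHashKey().
    void setHashKey() {
      hashcode = Arrays.hashCode(keys);
    }

    // Snapshot the key values before they are stored in the map
    // (Hive copies them to standard writable objects via ObjectInspectorUtils).
    ArrayKey copyKey() {
      ArrayKey copy = new ArrayKey(keys.length);
      copy.keys = keys.clone();
      copy.hashcode = hashcode;
      return copy;
    }

    @Override
    public int hashCode() {
      return hashcode;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof ArrayKey && Arrays.equals(keys, ((ArrayKey) o).keys);
    }
  }

  public static void main(String[] args) {
    Map<ArrayKey, long[]> aggregations = new HashMap<ArrayKey, long[]>();
    ArrayKey probe = new ArrayKey(1); // one reusable wrapper, like newKeys in GroupByOperator
    Object[][] rows = { { "a", 1 }, { "b", 2 }, { "a", 3 } };
    for (Object[] row : rows) {
      probe.getNewKey(row, new int[] { 0 });
      probe.setHashKey();
      long[] agg = aggregations.get(probe); // probe without copying
      if (agg == null) {
        agg = new long[1];
        aggregations.put(probe.copyKey(), agg); // copy only when a new group appears
      }
      agg[0] += ((Integer) row[1]).longValue();
    }
    System.out.println(aggregations.size() + " groups"); // prints "2 groups"
  }
}

The same reasoning applies to both wrapper flavors in the patch: ListKeyWrapper and TextKeyWrapper cache the hash code in setHashKey() because hashCode() is invoked on every map probe, and they switch to the "copied" comparer or inspector after copyKey()/copyKey(KeyWrapper) because stored keys are deep-copied writable objects rather than the lazily deserialized row objects.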