Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1202523) +++ conf/hive-default.xml (working copy) @@ -281,6 +281,12 @@ + hive.external.map.groupby.impl + org.apache.hadoop.hive.ql.util.ExternalJavaHashMap + Name of the class that implements rg.apache.hadoop.hive.ql.util.ExternalMap interface. Used to speed up GroupByOperator + + + hive.default.fileformat TextFile Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override Index: lib/hppc-0.4.1.jar =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: lib/hppc-0.4.1.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: build-common.xml =================================================================== --- build-common.xml (revision 1202523) +++ build-common.xml (working copy) @@ -352,7 +352,7 @@ errorProperty="tests.failed" failureProperty="tests.failed" filtertrace="off"> - + Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (revision 1202523) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (working copy) @@ -22,11 +22,11 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.OpParseContext; @@ -44,17 +45,18 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.util.ExternalMap; import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyStringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObject; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -71,6 +73,7 @@ private static final long serialVersionUID = 1L; private static final int NUMROWSESTIMATESIZE = 1000; + private static final int HASH_AGGR_INITIAL_SIZE = 2048; protected transient ExprNodeEvaluator[] keyFields; protected transient ObjectInspector[] keyObjectInspectors; @@ -108,8 +111,9 @@ protected transient Object[][] aggregationsParametersLastInvoke; // Used by hash-based GroupBy: Mode = HASH, PARTIALS - protected transient HashMap hashAggregations; + protected transient ExternalMap hashAggregations; + // Used by hash distinct aggregations when hashGrpKeyNotRedKey is true protected transient HashSet keysCurrentGroup; @@ -177,6 +181,30 @@ transient int countAfterReport; transient int heartbeatInterval; + @SuppressWarnings("unchecked") + private ExternalMap getExternalMap(Configuration hconf) + throws HiveException { + try { + return (ExternalMap) Class.forName( + hconf.get("hive.external.map.groupby.impl"), true, JavaUtils.getClassLoader()) + .getDeclaredConstructor(int.class).newInstance(HASH_AGGR_INITIAL_SIZE); + } catch (InstantiationException e) { + throw new HiveException(e); + } catch (IllegalAccessException e) { + throw new HiveException(e); + } catch (ClassNotFoundException e) { + throw new HiveException(e); + } catch (IllegalArgumentException e) { + throw new HiveException(e); + } catch (SecurityException e) { + throw new HiveException(e); + } catch (InvocationTargetException e) { + throw new HiveException(e); + } catch (NoSuchMethodException e) { + throw new HiveException(e); + } + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { totalMemory = Runtime.getRuntime().totalMemory(); @@ -328,7 +356,7 @@ aggregations = newAggregations(); hashAggr = false; } else { - hashAggregations = new HashMap(256); + hashAggregations = getExternalMap(hconf); aggregations = newAggregations(); hashAggr = true; keyPositionsSize = new ArrayList(); @@ -838,7 +866,7 @@ // The fixed size for the aggregation class is already known. Get the // variable portion of the size every NUMROWSESTIMATESIZE rows. if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) { - //check how much memory left memory + //check how much memory is left usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); rate = (float) usedMemory / (float) maxMemory; if(rate > memoryThreshold){ @@ -904,33 +932,15 @@ // changed in the future if (complete) { - Iterator> iter = hashAggregations - .entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry m = iter.next(); - forward(m.getKey().getKeyArray(), m.getValue()); - } - hashAggregations.clear(); + hashAggregations.removeAndForwardAll(this); hashAggregations = null; LOG.warn("Hash Table completed flushed"); return; } - int oldSize = hashAggregations.size(); - LOG.warn("Hash Tbl flush: #hash table = " + oldSize); - Iterator> iter = hashAggregations - .entrySet().iterator(); - int numDel = 0; - while (iter.hasNext()) { - Map.Entry m = iter.next(); - forward(m.getKey().getKeyArray(), m.getValue()); - iter.remove(); - numDel++; - if (numDel * 10 >= oldSize) { - LOG.warn("Hash Table flushed: new size = " + hashAggregations.size()); - return; - } - } + LOG.warn("Hash Tbl flush: #hash table = " + hashAggregations.size()); + hashAggregations.removeAndForwardSome(0.1f, this); + LOG.warn("Hash Table flushed: new size = " + hashAggregations.size()); } transient Object[] forwardCache; @@ -942,9 +952,9 @@ * The keys in the record * @throws HiveException */ - protected void forward(Object[] keys, AggregationBuffer[] aggs) + protected void forward(Object[] keys, AggregationBuffer[] aggs) throws HiveException { - int totalFields = keys.length+ aggs.length; + int totalFields = keys.length + aggs.length; if (forwardCache == null) { forwardCache = new Object[totalFields]; } @@ -959,6 +969,11 @@ forward(forwardCache, outputObjInspector); } + public void forward(KeyWrapper keyWrapper, AggregationBuffer[] aggregationBuffers) + throws HiveException { + forward(keyWrapper.getKeyArray(), aggregationBuffers); + } + /** * We need to forward all the aggregations to children. * @@ -995,17 +1010,8 @@ forward(new Object[0], aggregations); } else { if (hashAggregations != null) { - LOG.warn("Begin Hash Table flush at close: size = " - + hashAggregations.size()); - Iterator iter = hashAggregations.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry m = (Map.Entry) iter - .next(); - - forward(m.getKey().getKeyArray(), m.getValue()); - iter.remove(); - } - hashAggregations.clear(); + LOG.warn("Begin Hash Table flush at close: size = " + hashAggregations.size()); + hashAggregations.removeAndForwardAll(this); } else if (aggregations != null) { // sort-based aggregations if (currentKeys != null) { Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java (revision 0) @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.KeyWrapper; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; + +public class ExternalHPPCObjectObjectOpenHashMap implements ExternalMap{ + + final ObjectObjectExpandedOpenHashMap innerHashMap; + + public ExternalHPPCObjectObjectOpenHashMap(int initialCapacity){ + innerHashMap = new ObjectObjectExpandedOpenHashMap(initialCapacity); + } + + public ExternalHPPCObjectObjectOpenHashMap(int initialCapacity, float loadFactor){ + innerHashMap = new ObjectObjectExpandedOpenHashMap(initialCapacity, loadFactor); + } + + @Override + public int size() { + return innerHashMap.size(); + } + + @Override + public void clear() { + innerHashMap.clear(); + } + + @Override + public V get(K key) { + return innerHashMap.get(key); + } + + @Override + public V put(K key, V value) { + return innerHashMap.put(key, value); + } + + @Override + public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException { + final ObjectObjectExpandedOpenHashMap groupByMap = getGroupByMap(); + final Object[] keys = groupByMap.keys; + final Object[] values = groupByMap.values; + final boolean[] states = groupByMap.allocated; + final int toLeave = Math.round(groupByMap.assigned * (1-part)); + + while(true){ + for (int i = 0; i < states.length; ++i){ + if(states[i]){ + groupByOperator.forward((KeyWrapper) keys[i], (AggregationBuffer[]) values[i]); + groupByMap.remove(i); + if (toLeave > groupByMap.assigned) { + return; + } + } + } + } + } + + @Override + public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException{ + final ObjectObjectExpandedOpenHashMap groupByMap = getGroupByMap(); + final Object[] keys = groupByMap.keys; + final Object[] values = groupByMap.values; + final boolean[] states = groupByMap.allocated; + + for (int i = 0; i < states.length; ++i){ + if(states[i]){ + groupByOperator.forward((KeyWrapper) keys[i], (AggregationBuffer[]) values[i]); + } + } + groupByMap.clear(); + } + + @SuppressWarnings("unchecked") + private ObjectObjectExpandedOpenHashMap getGroupByMap() throws HiveException{ + try{ + return (ObjectObjectExpandedOpenHashMap) innerHashMap; + } catch (ClassCastException e){ + throw new HiveException(e); + } + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java (revision 0) @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.metadata.HiveException; + + +public interface ExternalMap { + + public int size(); + public void clear(); + public V get(K key); + public V put(K key, V Value); + public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException; + public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException; +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java (revision 0) @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.KeyWrapper; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; + +public abstract class ExternalJavaMap implements ExternalMap { + + protected Map innerHashMap; + + @Override + public int size() { + return innerHashMap.size(); + } + + @Override + public void clear() { + innerHashMap.clear(); + } + + @Override + public V get(K key) { + return innerHashMap.get(key); + } + + @Override + public V put(K key, V value) { + return innerHashMap.put(key, value); + } + + @Override + public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException { + final Map groupByMap = getGroupByMap(); + final Iterator> iter = groupByMap.entrySet().iterator(); + final int toLeave = Math.round(groupByMap.size() * (1-part)); + + while (iter.hasNext() && toLeave < groupByMap.size()) { + Map.Entry entry = iter.next(); + groupByOperator.forward(entry.getKey(), entry.getValue()); + iter.remove(); + } + } + + @Override + public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException{ + final Map groupByMap = getGroupByMap(); + + for (Map.Entry entry : groupByMap.entrySet()){ + groupByOperator.forward(entry.getKey(), entry.getValue()); + } + groupByMap.clear(); + } + + @SuppressWarnings("unchecked") + private Map getGroupByMap() throws HiveException{ + try{ + return (Map) innerHashMap; + } catch (ClassCastException e){ + throw new HiveException(e); + } + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java (revision 0) @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import java.util.HashMap; + +public class ExternalJavaHashMap extends ExternalJavaMap{ + + public ExternalJavaHashMap(int initialCapacity, float loadFactor){ + innerHashMap = new HashMap(initialCapacity, loadFactor); + } + + public ExternalJavaHashMap(int initialCapacity){ + innerHashMap = new HashMap(initialCapacity); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java (revision 0) @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import com.carrotsearch.hppc.ObjectObjectOpenHashMap; + +public class ObjectObjectExpandedOpenHashMap extends ObjectObjectOpenHashMap{ + + public ObjectObjectExpandedOpenHashMap (int initialCapacity){ + super(initialCapacity); + } + + public ObjectObjectExpandedOpenHashMap (int initialCapacity, float loadFactor){ + super(initialCapacity, loadFactor); + } + + public void remove(int slot){ + --assigned; + shiftConflictingKeys(slot); + } +}