Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1202523)
+++ conf/hive-default.xml (working copy)
@@ -281,6 +281,12 @@
+ hive.external.map.groupby.impl
+ org.apache.hadoop.hive.ql.util.ExternalJavaHashMap
+ Name of the class that implements the org.apache.hadoop.hive.ql.util.ExternalMap interface. Used to speed up GroupByOperator.
+
+
+
hive.default.fileformat
TextFile
Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override
Index: lib/hppc-0.4.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: lib/hppc-0.4.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: build-common.xml
===================================================================
--- build-common.xml (revision 1202523)
+++ build-common.xml (working copy)
@@ -352,7 +352,7 @@
errorProperty="tests.failed" failureProperty="tests.failed" filtertrace="off">
-
+
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (revision 1202523)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (working copy)
@@ -22,11 +22,11 @@
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,6 +34,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -44,17 +45,18 @@
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.ExternalMap;
import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -71,6 +73,7 @@
private static final long serialVersionUID = 1L;
private static final int NUMROWSESTIMATESIZE = 1000;
+ private static final int HASH_AGGR_INITIAL_SIZE = 2048;
protected transient ExprNodeEvaluator[] keyFields;
protected transient ObjectInspector[] keyObjectInspectors;
@@ -108,8 +111,9 @@
protected transient Object[][] aggregationsParametersLastInvoke;
// Used by hash-based GroupBy: Mode = HASH, PARTIALS
- protected transient HashMap hashAggregations;
+ protected transient ExternalMap hashAggregations;
+
// Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
protected transient HashSet keysCurrentGroup;
@@ -177,6 +181,30 @@
transient int countAfterReport;
transient int heartbeatInterval;
+  /**
+   * Instantiates the hash-aggregation map implementation named by the
+   * {@code hive.external.map.groupby.impl} configuration property.
+   *
+   * The class is loaded through {@link JavaUtils#getClassLoader()} and created through
+   * its single-{@code int} constructor, which receives {@link #HASH_AGGR_INITIAL_SIZE}.
+   *
+   * @param hconf configuration the implementation class name is read from
+   * @return a newly constructed map instance
+   * @throws HiveException if the class cannot be loaded, does not implement
+   *           {@link ExternalMap}, has no usable {@code (int)} constructor, or its
+   *           constructor throws
+   */
+  @SuppressWarnings("unchecked")
+  private ExternalMap getExternalMap(Configuration hconf)
+      throws HiveException {
+    // Fall back to the default listed in hive-default.xml: without it an unset property
+    // would reach Class.forName as null and surface as a bare NullPointerException.
+    final String implName = hconf.get("hive.external.map.groupby.impl",
+        "org.apache.hadoop.hive.ql.util.ExternalJavaHashMap");
+    final String failure = "Unable to instantiate group-by hash map implementation " + implName;
+    try {
+      final Class<?> implClass = Class.forName(implName, true, JavaUtils.getClassLoader());
+      // Check the type up front instead of letting the cast below fail with an
+      // unchecked ClassCastException.
+      if (!ExternalMap.class.isAssignableFrom(implClass)) {
+        throw new HiveException(implName + " does not implement " + ExternalMap.class.getName());
+      }
+      return (ExternalMap) implClass.getDeclaredConstructor(int.class)
+          .newInstance(HASH_AGGR_INITIAL_SIZE);
+    } catch (InstantiationException e) {
+      throw new HiveException(failure, e);
+    } catch (IllegalAccessException e) {
+      throw new HiveException(failure, e);
+    } catch (ClassNotFoundException e) {
+      throw new HiveException(failure, e);
+    } catch (IllegalArgumentException e) {
+      throw new HiveException(failure, e);
+    } catch (SecurityException e) {
+      throw new HiveException(failure, e);
+    } catch (InvocationTargetException e) {
+      throw new HiveException(failure, e);
+    } catch (NoSuchMethodException e) {
+      throw new HiveException(failure, e);
+    }
+  }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
totalMemory = Runtime.getRuntime().totalMemory();
@@ -328,7 +356,7 @@
aggregations = newAggregations();
hashAggr = false;
} else {
- hashAggregations = new HashMap(256);
+ hashAggregations = getExternalMap(hconf);
aggregations = newAggregations();
hashAggr = true;
keyPositionsSize = new ArrayList();
@@ -838,7 +866,7 @@
// The fixed size for the aggregation class is already known. Get the
// variable portion of the size every NUMROWSESTIMATESIZE rows.
if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
- //check how much memory left memory
+ //check how much memory is left
usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
rate = (float) usedMemory / (float) maxMemory;
if(rate > memoryThreshold){
@@ -904,33 +932,15 @@
// changed in the future
if (complete) {
- Iterator> iter = hashAggregations
- .entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry m = iter.next();
- forward(m.getKey().getKeyArray(), m.getValue());
- }
- hashAggregations.clear();
+ hashAggregations.removeAndForwardAll(this);
hashAggregations = null;
LOG.warn("Hash Table completed flushed");
return;
}
- int oldSize = hashAggregations.size();
- LOG.warn("Hash Tbl flush: #hash table = " + oldSize);
- Iterator> iter = hashAggregations
- .entrySet().iterator();
- int numDel = 0;
- while (iter.hasNext()) {
- Map.Entry m = iter.next();
- forward(m.getKey().getKeyArray(), m.getValue());
- iter.remove();
- numDel++;
- if (numDel * 10 >= oldSize) {
- LOG.warn("Hash Table flushed: new size = " + hashAggregations.size());
- return;
- }
- }
+ LOG.warn("Hash Tbl flush: #hash table = " + hashAggregations.size());
+ hashAggregations.removeAndForwardSome(0.1f, this);
+ LOG.warn("Hash Table flushed: new size = " + hashAggregations.size());
}
transient Object[] forwardCache;
@@ -942,9 +952,9 @@
* The keys in the record
* @throws HiveException
*/
- protected void forward(Object[] keys, AggregationBuffer[] aggs)
+ protected void forward(Object[] keys, AggregationBuffer[] aggs)
throws HiveException {
- int totalFields = keys.length+ aggs.length;
+ int totalFields = keys.length + aggs.length;
if (forwardCache == null) {
forwardCache = new Object[totalFields];
}
@@ -959,6 +969,11 @@
forward(forwardCache, outputObjInspector);
}
+  /**
+   * Forwards one key/aggregation pair: unwraps the key array from the wrapper and
+   * delegates to {@link #forward(Object[], AggregationBuffer[])}.
+   *
+   * @param keyWrapper wrapper whose key array is forwarded
+   * @param aggregationBuffers aggregation buffers forwarded together with the keys
+   * @throws HiveException if the delegated forward fails
+   */
+  public void forward(KeyWrapper keyWrapper, AggregationBuffer[] aggregationBuffers)
+      throws HiveException {
+    final Object[] groupKeys = keyWrapper.getKeyArray();
+    forward(groupKeys, aggregationBuffers);
+  }
+
/**
* We need to forward all the aggregations to children.
*
@@ -995,17 +1010,8 @@
forward(new Object[0], aggregations);
} else {
if (hashAggregations != null) {
- LOG.warn("Begin Hash Table flush at close: size = "
- + hashAggregations.size());
- Iterator iter = hashAggregations.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry m = (Map.Entry) iter
- .next();
-
- forward(m.getKey().getKeyArray(), m.getValue());
- iter.remove();
- }
- hashAggregations.clear();
+ LOG.warn("Begin Hash Table flush at close: size = " + hashAggregations.size());
+ hashAggregations.removeAndForwardAll(this);
} else if (aggregations != null) {
// sort-based aggregations
if (currentKeys != null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java (revision 0)
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+
+/**
+ * {@link ExternalMap} implementation that delegates to an
+ * {@link ObjectObjectExpandedOpenHashMap} (a subclass of HPPC's ObjectObjectOpenHashMap).
+ *
+ * The forwarding methods read the backing {@code keys}, {@code values} and
+ * {@code allocated} arrays directly and cast each entry to {@link KeyWrapper} and
+ * {@link AggregationBuffer}{@code []}; they fail with a ClassCastException if the map
+ * holds anything else.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ExternalHPPCObjectObjectOpenHashMap<K, V> implements ExternalMap<K, V> {
+
+  final ObjectObjectExpandedOpenHashMap<K, V> innerHashMap;
+
+  /**
+   * @param initialCapacity initial capacity handed to the backing map
+   */
+  public ExternalHPPCObjectObjectOpenHashMap(int initialCapacity) {
+    innerHashMap = new ObjectObjectExpandedOpenHashMap<K, V>(initialCapacity);
+  }
+
+  /**
+   * @param initialCapacity initial capacity handed to the backing map
+   * @param loadFactor load factor handed to the backing map
+   */
+  public ExternalHPPCObjectObjectOpenHashMap(int initialCapacity, float loadFactor) {
+    innerHashMap = new ObjectObjectExpandedOpenHashMap<K, V>(initialCapacity, loadFactor);
+  }
+
+  @Override
+  public int size() {
+    return innerHashMap.size();
+  }
+
+  @Override
+  public void clear() {
+    innerHashMap.clear();
+  }
+
+  @Override
+  public V get(K key) {
+    return innerHashMap.get(key);
+  }
+
+  @Override
+  public V put(K key, V value) {
+    return innerHashMap.put(key, value);
+  }
+
+  /**
+   * Forwards entries to the operator and removes them until about {@code (1 - part)} of
+   * the entries present at call time are left.
+   *
+   * Unlike an unconditional retry loop, this makes a single bounded pass over the slots,
+   * so it returns immediately for an empty map and never spins once the map is drained.
+   *
+   * @param part fraction of the current entries to forward and remove
+   * @param groupByOperator operator every removed entry is forwarded to
+   * @throws HiveException if forwarding an entry fails
+   */
+  @Override
+  public void removeAndForwardSome(float part, GroupByOperator groupByOperator)
+      throws HiveException {
+    final Object[] keys = innerHashMap.keys;
+    final Object[] values = innerHashMap.values;
+    final boolean[] states = innerHashMap.allocated;
+    final int toLeave = Math.round(innerHashMap.assigned * (1 - part));
+
+    for (int i = 0; i < states.length && innerHashMap.assigned > toLeave; ++i) {
+      // remove(i) calls shiftConflictingKeys, which may move another entry into slot i,
+      // so keep draining the same slot until it is free or the target size is reached.
+      while (states[i] && innerHashMap.assigned > toLeave) {
+        groupByOperator.forward((KeyWrapper) keys[i], (AggregationBuffer[]) values[i]);
+        innerHashMap.remove(i);
+      }
+    }
+  }
+
+  /**
+   * Forwards every entry to the operator and then clears the map.
+   *
+   * @param groupByOperator operator every entry is forwarded to
+   * @throws HiveException if forwarding an entry fails; the map is not cleared in that case
+   */
+  @Override
+  public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException {
+    final Object[] keys = innerHashMap.keys;
+    final Object[] values = innerHashMap.values;
+    final boolean[] states = innerHashMap.allocated;
+
+    for (int i = 0; i < states.length; ++i) {
+      if (states[i]) {
+        groupByOperator.forward((KeyWrapper) keys[i], (AggregationBuffer[]) values[i]);
+      }
+    }
+    innerHashMap.clear();
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java (revision 0)
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+
+/**
+ * Minimal map abstraction used by {@link GroupByOperator} for hash aggregation, so the
+ * backing hash map implementation can be swapped.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public interface ExternalMap<K, V> {
+
+  /**
+   * @return the number of entries currently stored
+   */
+  int size();
+
+  /**
+   * Removes all entries without forwarding them.
+   */
+  void clear();
+
+  /**
+   * @param key key to look up
+   * @return the value stored for {@code key}, as reported by the backing map
+   */
+  V get(K key);
+
+  /**
+   * Stores {@code value} under {@code key}.
+   *
+   * @param key key to store under
+   * @param value value to store
+   * @return whatever the backing map returns for the put
+   */
+  V put(K key, V value);
+
+  /**
+   * Forwards a fraction of the entries to the operator and removes them from the map.
+   *
+   * @param part fraction of the current entries to forward and remove
+   * @param groupByOperator operator the removed entries are forwarded to
+   * @throws HiveException if forwarding fails
+   */
+  void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException;
+
+  /**
+   * Forwards all entries to the operator and empties the map.
+   *
+   * @param groupByOperator operator the entries are forwarded to
+   * @throws HiveException if forwarding fails
+   */
+  void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException;
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java (revision 0)
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+
+/**
+ * Base class for {@link ExternalMap} implementations that delegate to a
+ * {@link java.util.Map}. Subclasses assign {@link #innerHashMap} in their constructors.
+ *
+ * The forwarding methods treat the entries as {@link KeyWrapper} keys with
+ * {@link AggregationBuffer}{@code []} values; they fail with a ClassCastException when
+ * an entry of another type is read.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public abstract class ExternalJavaMap<K, V> implements ExternalMap<K, V> {
+
+  /** Backing map; set by the concrete subclass. */
+  protected Map<K, V> innerHashMap;
+
+  @Override
+  public int size() {
+    return innerHashMap.size();
+  }
+
+  @Override
+  public void clear() {
+    innerHashMap.clear();
+  }
+
+  @Override
+  public V get(K key) {
+    return innerHashMap.get(key);
+  }
+
+  @Override
+  public V put(K key, V value) {
+    return innerHashMap.put(key, value);
+  }
+
+  /**
+   * Forwards entries to the operator and removes them through the iterator until about
+   * {@code (1 - part)} of the entries present at call time are left.
+   *
+   * @param part fraction of the current entries to forward and remove
+   * @param groupByOperator operator every removed entry is forwarded to
+   * @throws HiveException if forwarding an entry fails
+   */
+  @Override
+  public void removeAndForwardSome(float part, GroupByOperator groupByOperator)
+      throws HiveException {
+    final Map<KeyWrapper, AggregationBuffer[]> groupByMap = getGroupByMap();
+    final Iterator<Map.Entry<KeyWrapper, AggregationBuffer[]>> iter =
+        groupByMap.entrySet().iterator();
+    final int toLeave = Math.round(groupByMap.size() * (1 - part));
+
+    while (iter.hasNext() && toLeave < groupByMap.size()) {
+      final Map.Entry<KeyWrapper, AggregationBuffer[]> entry = iter.next();
+      groupByOperator.forward(entry.getKey(), entry.getValue());
+      iter.remove();
+    }
+  }
+
+  /**
+   * Forwards every entry to the operator and then clears the map.
+   *
+   * @param groupByOperator operator every entry is forwarded to
+   * @throws HiveException if forwarding an entry fails; the map is not cleared in that case
+   */
+  @Override
+  public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException {
+    final Map<KeyWrapper, AggregationBuffer[]> groupByMap = getGroupByMap();
+
+    for (Map.Entry<KeyWrapper, AggregationBuffer[]> entry : groupByMap.entrySet()) {
+      groupByOperator.forward(entry.getKey(), entry.getValue());
+    }
+    groupByMap.clear();
+  }
+
+  /**
+   * Views the backing map as a group-by hash table.
+   *
+   * The cast is unchecked and cannot fail here, so there is nothing to catch at this
+   * point; a wrongly typed entry only shows up when it is read by the callers.
+   */
+  @SuppressWarnings("unchecked")
+  private Map<KeyWrapper, AggregationBuffer[]> getGroupByMap() {
+    return (Map<KeyWrapper, AggregationBuffer[]>) (Map<?, ?>) innerHashMap;
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java (revision 0)
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.HashMap;
+
+/**
+ * {@link ExternalJavaMap} whose backing map is a {@link java.util.HashMap}.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ExternalJavaHashMap<K, V> extends ExternalJavaMap<K, V> {
+
+  /**
+   * @param initialCapacity initial capacity of the backing {@code HashMap}
+   * @param loadFactor load factor of the backing {@code HashMap}
+   */
+  public ExternalJavaHashMap(int initialCapacity, float loadFactor) {
+    innerHashMap = new HashMap<K, V>(initialCapacity, loadFactor);
+  }
+
+  /**
+   * @param initialCapacity initial capacity of the backing {@code HashMap}
+   */
+  public ExternalJavaHashMap(int initialCapacity) {
+    innerHashMap = new HashMap<K, V>(initialCapacity);
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java (revision 0)
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
+
+/**
+ * HPPC {@code ObjectObjectOpenHashMap} extended with removal by slot index, so callers
+ * that scan the backing arrays can delete the entry they are looking at.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ObjectObjectExpandedOpenHashMap<K, V> extends ObjectObjectOpenHashMap<K, V> {
+
+  /**
+   * @param initialCapacity initial capacity passed to the HPPC map
+   */
+  public ObjectObjectExpandedOpenHashMap(int initialCapacity) {
+    super(initialCapacity);
+  }
+
+  /**
+   * @param initialCapacity initial capacity passed to the HPPC map
+   * @param loadFactor load factor passed to the HPPC map
+   */
+  public ObjectObjectExpandedOpenHashMap(int initialCapacity, float loadFactor) {
+    super(initialCapacity, loadFactor);
+  }
+
+  /**
+   * Removes the entry stored at the given index of the backing arrays.
+   *
+   * Does nothing when the slot holds no entry; decrementing {@code assigned} for a free
+   * slot would leave the size out of step with the table contents.
+   *
+   * @param slot index into the backing {@code keys}/{@code values}/{@code allocated} arrays
+   */
+  public void remove(int slot) {
+    if (!allocated[slot]) {
+      return;
+    }
+    --assigned;
+    // NOTE(review): presumably re-packs entries that collided with the removed one and
+    // frees the vacated slot - confirm against the HPPC 0.4.1 sources.
+    shiftConflictingKeys(slot);
+  }
+}