Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1202523)
+++ conf/hive-default.xml (working copy)
@@ -281,6 +281,12 @@
+ hive.external.map.groupby.impl
+ org.apache.hadoop.hive.ql.util.ExternalJavaHashMap
+ Name of the class that implements rg.apache.hadoop.hive.ql.util.ExternalMap interface. Used to speed up GroupByOperator
+
+
+
hive.default.fileformat
TextFile
Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override
Index: lib/hppc-0.4.1.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: lib/hppc-0.4.1.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: lib/patricia-trie-0.6.jar
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: lib/patricia-trie-0.6.jar
___________________________________________________________________
Added: svn:mime-type
+ application/octet-stream
Index: build-common.xml
===================================================================
--- build-common.xml (revision 1202523)
+++ build-common.xml (working copy)
@@ -352,7 +352,7 @@
errorProperty="tests.failed" failureProperty="tests.failed" filtertrace="off">
-
+
Index: serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java
===================================================================
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java (revision 1202523)
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java (working copy)
@@ -39,7 +39,7 @@
*
*/
public class ListObjectsEqualComparer {
- enum CompareType {
+ public enum CompareType {
// Now only string, text, int, long, byte and boolean comparisons
// are treated as special cases.
// For other types, we reuse ObjectInspectorUtils.compare()
@@ -47,7 +47,7 @@
COMPARE_BOOL, OTHER
}
- class FieldComparer {
+ public class FieldComparer {
protected ObjectInspector oi0, oi1;
protected ObjectInspector compareOI;
protected CompareType compareType;
@@ -128,6 +128,18 @@
o0, oi0, o1, oi1) == 0);
}
}
+
+ public CompareType getCompareType(){
+ return compareType;
+ }
+
+ //can throw class cast exception, assumes the ones with 1 are the proper ones
+ public PrimitiveObjectInspector getMainObjectInspector(){
+ return (PrimitiveObjectInspector) oi1;
+ }
+ public StringObjectInspector getMainStringObjectInspector(){
+ return soi1;
+ }
}
FieldComparer[] fieldComparers;
@@ -158,9 +170,9 @@
if (ol0.length != ol1.length) {
return false;
}
+
assert (ol0.length <= numFields);
- assert (ol1.length <= numFields);
- for (int i = 0; i < Math.min(ol0.length, ol1.length); i++) {
+ for (int i = 0; i < ol0.length; i++) {
if (!fieldComparers[i].areEqual(ol0[i], ol1[i])) {
return false;
}
@@ -175,4 +187,12 @@
}
return true;
}
+
+ public FieldComparer[] getFieldComparers(){
+ return fieldComparers;
+ }
+
+ public int getNumFields(){
+ return numFields;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (revision 1202523)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (working copy)
@@ -22,11 +22,11 @@
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,6 +34,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -44,17 +45,18 @@
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import org.apache.hadoop.hive.ql.util.ExternalMap;
import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -71,6 +73,7 @@
private static final long serialVersionUID = 1L;
private static final int NUMROWSESTIMATESIZE = 1000;
+ private static final int HASH_AGGR_INITIAL_SIZE = 2048;
protected transient ExprNodeEvaluator[] keyFields;
protected transient ObjectInspector[] keyObjectInspectors;
@@ -108,8 +111,9 @@
protected transient Object[][] aggregationsParametersLastInvoke;
// Used by hash-based GroupBy: Mode = HASH, PARTIALS
- protected transient HashMap hashAggregations;
+ protected transient ExternalMap hashAggregations;
+
// Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
protected transient HashSet keysCurrentGroup;
@@ -177,6 +181,31 @@
transient int countAfterReport;
transient int heartbeatInterval;
+ @SuppressWarnings("unchecked")
+ private ExternalMap getExternalMap(Configuration hconf)
+ throws HiveException {
+ try {
+ return (ExternalMap) Class.forName(
+ hconf.get("hive.external.map.groupby.impl"), true, JavaUtils.getClassLoader())
+ .getDeclaredConstructor(int.class, Object.class)
+ .newInstance(HASH_AGGR_INITIAL_SIZE, keyWrapperFactory);
+ } catch (InstantiationException e) {
+ throw new HiveException(e);
+ } catch (IllegalAccessException e) {
+ throw new HiveException(e);
+ } catch (ClassNotFoundException e) {
+ throw new HiveException(e);
+ } catch (IllegalArgumentException e) {
+ throw new HiveException(e);
+ } catch (SecurityException e) {
+ throw new HiveException(e);
+ } catch (InvocationTargetException e) {
+ throw new HiveException(e);
+ } catch (NoSuchMethodException e) {
+ throw new HiveException(e);
+ }
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
totalMemory = Runtime.getRuntime().totalMemory();
@@ -328,7 +357,6 @@
aggregations = newAggregations();
hashAggr = false;
} else {
- hashAggregations = new HashMap(256);
aggregations = newAggregations();
hashAggr = true;
keyPositionsSize = new ArrayList();
@@ -371,6 +399,10 @@
newKeys = keyWrapperFactory.getKeyWrapper();
+ if (!(conf.getMode() != GroupByDesc.Mode.HASH || bucketGroup)){
+ hashAggregations = getExternalMap(hconf); //sometimes needs keyWrapperFactory to be ready
+ }
+
firstRow = true;
// estimate the number of hash table entries based on the size of each
// entry. Since the size of a entry
@@ -838,7 +870,7 @@
// The fixed size for the aggregation class is already known. Get the
// variable portion of the size every NUMROWSESTIMATESIZE rows.
if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
- //check how much memory left memory
+ //check how much memory is left
usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
rate = (float) usedMemory / (float) maxMemory;
if(rate > memoryThreshold){
@@ -904,33 +936,15 @@
// changed in the future
if (complete) {
- Iterator> iter = hashAggregations
- .entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry m = iter.next();
- forward(m.getKey().getKeyArray(), m.getValue());
- }
- hashAggregations.clear();
+ hashAggregations.removeAndForwardAll(this);
hashAggregations = null;
LOG.warn("Hash Table completed flushed");
return;
}
- int oldSize = hashAggregations.size();
- LOG.warn("Hash Tbl flush: #hash table = " + oldSize);
- Iterator> iter = hashAggregations
- .entrySet().iterator();
- int numDel = 0;
- while (iter.hasNext()) {
- Map.Entry m = iter.next();
- forward(m.getKey().getKeyArray(), m.getValue());
- iter.remove();
- numDel++;
- if (numDel * 10 >= oldSize) {
- LOG.warn("Hash Table flushed: new size = " + hashAggregations.size());
- return;
- }
- }
+ LOG.warn("Hash Tbl flush: #hash table = " + hashAggregations.size());
+ hashAggregations.removeAndForwardSome(0.1f, this);
+ LOG.warn("Hash Table flushed: new size = " + hashAggregations.size());
}
transient Object[] forwardCache;
@@ -942,9 +956,9 @@
* The keys in the record
* @throws HiveException
*/
- protected void forward(Object[] keys, AggregationBuffer[] aggs)
+ protected void forward(Object[] keys, AggregationBuffer[] aggs)
throws HiveException {
- int totalFields = keys.length+ aggs.length;
+ int totalFields = keys.length + aggs.length;
if (forwardCache == null) {
forwardCache = new Object[totalFields];
}
@@ -959,6 +973,11 @@
forward(forwardCache, outputObjInspector);
}
+ public void forward(KeyWrapper keyWrapper, AggregationBuffer[] aggregationBuffers)
+ throws HiveException {
+ forward(keyWrapper.getKeyArray(), aggregationBuffers);
+ }
+
/**
* We need to forward all the aggregations to children.
*
@@ -995,17 +1014,8 @@
forward(new Object[0], aggregations);
} else {
if (hashAggregations != null) {
- LOG.warn("Begin Hash Table flush at close: size = "
- + hashAggregations.size());
- Iterator iter = hashAggregations.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry m = (Map.Entry) iter
- .next();
-
- forward(m.getKey().getKeyArray(), m.getValue());
- iter.remove();
- }
- hashAggregations.clear();
+ LOG.warn("Begin Hash Table flush at close: size = " + hashAggregations.size());
+ hashAggregations.removeAndForwardAll(this);
} else if (aggregations != null) {
// sort-based aggregations
if (currentKeys != null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java (revision 1202523)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java (working copy)
@@ -55,6 +55,16 @@
}
}
+ public Class> getKeyWrappersClass(){
+ if (keyFields.length == 1
+ && TypeInfoUtils.getTypeInfoFromObjectInspector(keyObjectInspectors[0]).equals(
+ TypeInfoFactory.stringTypeInfo)) {
+ return TextKeyWrapper.class;
+ } else {
+ return ListKeyWrapper.class;
+ }
+ }
+
transient ExprNodeEvaluator[] keyFields;
transient ObjectInspector[] keyObjectInspectors;
transient ObjectInspector[] currentKeyObjectInspectors;
@@ -63,9 +73,11 @@
transient ListObjectsEqualComparer currentStructEqualComparer;
transient ListObjectsEqualComparer newKeyStructEqualComparer;
- class ListKeyWrapper extends KeyWrapper {
+ public class ListKeyWrapper extends KeyWrapper {
int hashcode;
Object[] keys;
+ int[] keyOffsets; //number of 'bits' at which subsequent keys start and end.
+
// decide whether this is already in hashmap (keys in hashmap are deepcopied
// version, and we need to use 'currentKeyObjectInspector').
ListObjectsEqualComparer equalComparer;
@@ -151,12 +163,71 @@
keyObjectInspectors[i], copyOption);
}
}
+
+ public ListObjectsEqualComparer getEqualComparer(){
+ return equalComparer;
+ }
+
+ public int[] getKeyOffsets() {
+ if (keyOffsets == null) {
+ CountKeyOffsets(equalComparer);
+ }
+ return keyOffsets;
+ }
+
+ public int getBitLength() {
+ if (keyOffsets == null) {
+ CountKeyOffsets(equalComparer);
+ }
+ return keyOffsets[keyOffsets.length - 1];
+ }
+
+ // Currently only supports String, int, long, byte and bool
+ // Only used by Patricia-Trie implementation
+ // Assumes keys don't change after insertion into the Trie and are not null
+ private void CountKeyOffsets(ListObjectsEqualComparer currentStructEqualComparer) {
+ assert (keys.length <= currentStructEqualComparer.getFieldComparers().length);
+ keyOffsets = new int[keys.length + 1];
+ for (int i = 0; i < keys.length; ++i) {
+ switch (currentStructEqualComparer.getFieldComparers()[i].getCompareType()) {
+ case COMPARE_TEXT:
+ case COMPARE_STRING:
+ keyOffsets[i + 1] = keyOffsets[i]
+ + currentStructEqualComparer.getFieldComparers()[i].getMainStringObjectInspector()
+ .getPrimitiveJavaObject(keys[i]).length() * Character.SIZE;
+ break;
+ case COMPARE_INT:
+ keyOffsets[i + 1] = keyOffsets[i] + Integer.SIZE;
+ break;
+ case COMPARE_LONG:
+ keyOffsets[i + 1] = keyOffsets[i] + Long.SIZE;
+ break;
+ case COMPARE_BYTE:
+ keyOffsets[i + 1] = keyOffsets[i] + Byte.SIZE;
+ break;
+ case COMPARE_BOOL:
+ keyOffsets[i + 1] = keyOffsets[i] + 1;
+ break;
+ default:
+ throw new RuntimeException("Patricie Trie key type currently not supported: "
+ + currentStructEqualComparer.getFieldComparers()[i].getCompareType());
+ }
+ }
+ }
}
transient Object[] singleEleArray = new Object[1];
transient StringObjectInspector soi_new, soi_copy;
- class TextKeyWrapper extends KeyWrapper {
+ public StringObjectInspector getSoi_new(){
+ return soi_new;
+ }
+
+ public StringObjectInspector getSoi_copy(){
+ return soi_copy;
+ }
+
+ public class TextKeyWrapper extends KeyWrapper {
int hashcode;
Object key;
boolean isCopy;
@@ -234,5 +305,12 @@
singleEleArray[0] = key;
return singleEleArray;
}
+
+ public boolean isCopy(){
+ return isCopy;
+ }
+ public Object getKey(){
+ return key;
+ }
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java (revision 0)
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+
+public class ExternalHPPCObjectObjectOpenHashMap implements ExternalMap{
+
+ final ObjectObjectExpandedOpenHashMap innerHashMap;
+
+ public ExternalHPPCObjectObjectOpenHashMap(int initialCapacity){
+ innerHashMap = new ObjectObjectExpandedOpenHashMap(initialCapacity);
+ }
+
+ public ExternalHPPCObjectObjectOpenHashMap(int initialCapacity, float loadFactor){
+ innerHashMap = new ObjectObjectExpandedOpenHashMap(initialCapacity, loadFactor);
+ }
+
+ @Override
+ public int size() {
+ return innerHashMap.size();
+ }
+
+ @Override
+ public void clear() {
+ innerHashMap.clear();
+ }
+
+ @Override
+ public V get(K key) {
+ return innerHashMap.get(key);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ return innerHashMap.put(key, value);
+ }
+
+ @Override
+ public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException {
+ final ObjectObjectExpandedOpenHashMap groupByMap = getGroupByMap();
+ final Object[] keys = groupByMap.keys;
+ final Object[] values = groupByMap.values;
+ final boolean[] states = groupByMap.allocated;
+ final int toLeave = Math.round(groupByMap.assigned * (1-part));
+
+ while(true){
+ for (int i = 0; i < states.length; ++i){
+ if(states[i]){
+ groupByOperator.forward((KeyWrapper) keys[i], (AggregationBuffer[]) values[i]);
+ groupByMap.remove(i);
+ if (toLeave > groupByMap.assigned) {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException{
+ final ObjectObjectExpandedOpenHashMap groupByMap = getGroupByMap();
+ final Object[] keys = groupByMap.keys;
+ final Object[] values = groupByMap.values;
+ final boolean[] states = groupByMap.allocated;
+
+ for (int i = 0; i < states.length; ++i){
+ if(states[i]){
+ groupByOperator.forward((KeyWrapper) keys[i], (AggregationBuffer[]) values[i]);
+ }
+ }
+ groupByMap.clear();
+ }
+
+ @SuppressWarnings("unchecked")
+ private ObjectObjectExpandedOpenHashMap getGroupByMap() throws HiveException{
+ try{
+ return (ObjectObjectExpandedOpenHashMap) innerHashMap;
+ } catch (ClassCastException e){
+ throw new HiveException(e);
+ }
+ }
+}
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java (revision 0)
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public interface ExternalMap {
+
+ public int size();
+ public void clear();
+ public V get(K key);
+ public V put(K key, V Value);
+ public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException;
+ public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException;
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java (revision 0)
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+
+public abstract class ExternalJavaMap implements ExternalMap {
+
+ protected Map innerMap;
+
+ @Override
+ public int size() {
+ return innerMap.size();
+ }
+
+ @Override
+ public void clear() {
+ innerMap.clear();
+ }
+
+ @Override
+ public V get(K key) {
+ return innerMap.get(key);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ return innerMap.put(key, value);
+ }
+
+ @Override
+ public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException {
+ final Map groupByMap = getGroupByMap();
+ final Iterator> iter = groupByMap.entrySet().iterator();
+ final int toLeave = Math.round(groupByMap.size() * (1-part));
+
+ while (iter.hasNext() && toLeave < groupByMap.size()) {
+ Map.Entry entry = iter.next();
+ groupByOperator.forward(entry.getKey(), entry.getValue());
+ iter.remove();
+ }
+ }
+
+ @Override
+ public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException{
+ final Map groupByMap = getGroupByMap();
+
+ for (Map.Entry entry : groupByMap.entrySet()){
+ groupByOperator.forward(entry.getKey(), entry.getValue());
+ }
+ groupByMap.clear();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map getGroupByMap() throws HiveException{
+ try{
+ return (Map) innerMap;
+ } catch (ClassCastException e){
+ throw new HiveException(e);
+ }
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalPatriciaTrie.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalPatriciaTrie.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalPatriciaTrie.java (revision 0)
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory.ListKeyWrapper;
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory.TextKeyWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.ardverk.collection.KeyAnalyzer;
+import org.ardverk.collection.PatriciaTrie;
+
+public class ExternalPatriciaTrie extends ExternalJavaMap {
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public ExternalPatriciaTrie(int ignored, Object inputKeyWrapperFactory) throws HiveException {
+ KeyWrapperFactory keyWraperFactory = (KeyWrapperFactory) inputKeyWrapperFactory;
+
+ if (keyWraperFactory.getKeyWrappersClass().equals(TextKeyWrapper.class)) {
+ KeyAnalyzer keyAnalyzer = new TextKeyWrapperAnalyzer(keyWraperFactory);
+ innerMap = new PatriciaTrie(keyAnalyzer);
+ } else {
+ // assume ListKeyWrapper
+ KeyAnalyzer keyAnalyzer = new ListKeyWrapperAnalyzer(keyWraperFactory);
+ innerMap = new PatriciaTrie(keyAnalyzer);
+ }
+ }
+
+ public ExternalPatriciaTrie(int ignored1, float ignored2, Object inputKeyWrapperFactory)
+ throws HiveException {
+ this(ignored1, inputKeyWrapperFactory);
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/TextKeyWrapperAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/TextKeyWrapperAnalyzer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/TextKeyWrapperAnalyzer.java (revision 0)
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory.TextKeyWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.ardverk.collection.KeyAnalyzer;
+import org.ardverk.collection.StringKeyAnalyzer;
+
+public class TextKeyWrapperAnalyzer implements KeyAnalyzer{
+
+ final StringObjectInspector soi_new, soi_copy;
+ final StringKeyAnalyzer stringKeyAnalyzer;
+
+ public TextKeyWrapperAnalyzer(KeyWrapperFactory keyWrapperFactory) throws HiveException{
+ soi_new = keyWrapperFactory.getSoi_new();
+ soi_copy = keyWrapperFactory.getSoi_copy();
+ Class[] clazz = {int.class};
+ try {
+ stringKeyAnalyzer = PrivateInstantiator.getInstance(StringKeyAnalyzer.class, clazz, Character.SIZE);
+ } catch (SecurityException e) {
+ throw new HiveException(e);
+ } catch (IllegalArgumentException e) {
+ throw new HiveException(e);
+ } catch (NoSuchMethodException e) {
+ throw new HiveException(e);
+ } catch (InstantiationException e) {
+ throw new HiveException(e);
+ } catch (IllegalAccessException e) {
+ throw new HiveException(e);
+ } catch (InvocationTargetException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ private String getString(TextKeyWrapper o){
+ if (o.isCopy()){
+ return soi_copy.getPrimitiveJavaObject(o.getKey());
+ } else {
+ return soi_new.getPrimitiveJavaObject(o.getKey());
+ }
+ }
+
+ @Override
+ public int compare(TextKeyWrapper o1, TextKeyWrapper o2) {
+ return stringKeyAnalyzer.compare(getString(o1), getString(o2));
+ }
+
+ @Override
+ public int bitIndex(TextKeyWrapper o1, TextKeyWrapper o2) {
+ return stringKeyAnalyzer.bitIndex(getString(o1), getString(o2));
+ }
+
+ @Override
+ public boolean isBitSet(TextKeyWrapper o, int bitIndex) {
+ return stringKeyAnalyzer.isBitSet(getString(o), bitIndex);
+ }
+
+ @Override
+ public boolean isPrefix(TextKeyWrapper o1, TextKeyWrapper o2) {
+ return stringKeyAnalyzer.isPrefix(getString(o1), getString(o2));
+ }
+
+ @Override
+ public int lengthInBits(TextKeyWrapper o) {
+ return stringKeyAnalyzer.lengthInBits(getString(o));
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java (revision 0)
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.util.HashMap;
+
+public class ExternalJavaHashMap extends ExternalJavaMap{
+
+ public ExternalJavaHashMap(int initialCapacity, Object ignored){
+ innerMap = new HashMap(initialCapacity);
+ }
+
+ public ExternalJavaHashMap(int initialCapacity, float loadFactor, Object ignored){
+ innerMap = new HashMap(initialCapacity, loadFactor);
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java (revision 0)
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
+
+public class ObjectObjectExpandedOpenHashMap extends ObjectObjectOpenHashMap{
+
+ public ObjectObjectExpandedOpenHashMap (int initialCapacity){
+ super(initialCapacity);
+ }
+
+ public ObjectObjectExpandedOpenHashMap (int initialCapacity, float loadFactor){
+ super(initialCapacity, loadFactor);
+ }
+
+ public void remove(int slot){
+ --assigned;
+ shiftConflictingKeys(slot);
+ }
+}
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/util/PrivateInstantiator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/PrivateInstantiator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/PrivateInstantiator.java (revision 0)
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.ql.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+public class PrivateInstantiator {
+
+ private PrivateInstantiator() {
+ }; // oh the irony
+
+ public static T getInstance(Class clazz,
+ @SuppressWarnings("rawtypes") Class[] constructorArgumentsClasses,
+ Object... constructorArguments)
+ throws SecurityException, NoSuchMethodException, IllegalArgumentException,
+ InstantiationException, IllegalAccessException, InvocationTargetException {
+
+ Constructor cons = clazz.getDeclaredConstructor(constructorArgumentsClasses);
+ cons.setAccessible(true);
+ return cons.newInstance(constructorArguments);
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/util/ListKeyWrapperAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/util/ListKeyWrapperAnalyzer.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/util/ListKeyWrapperAnalyzer.java (revision 0)
@@ -0,0 +1,221 @@
+package org.apache.hadoop.hive.ql.util;
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory.ListKeyWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectsEqualComparer.FieldComparer;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.ardverk.collection.ByteKeyAnalyzer;
+import org.ardverk.collection.IntegerKeyAnalyzer;
+import org.ardverk.collection.KeyAnalyzer;
+import org.ardverk.collection.LongKeyAnalyzer;
+import org.ardverk.collection.StringKeyAnalyzer;
+
+// now there is only a support for primitive types (those that have PrimitiveObjectInspector defined
+// for them. If not primitive type inserted, a class cast exception will be thrown.
+public class ListKeyWrapperAnalyzer implements KeyAnalyzer {
+
+ final StringObjectInspector soi_new, soi_copy;
+ final StringKeyAnalyzer stringKeyAnalyzer;
+ final IntegerKeyAnalyzer integerKeyAnalyzer;
+ final ByteKeyAnalyzer byteKeyAnalyzer;
+ final LongKeyAnalyzer longKeyAnalyzer;
+
+ public ListKeyWrapperAnalyzer(KeyWrapperFactory keyWrapperFactory) throws HiveException {
+ soi_new = keyWrapperFactory.getSoi_new();
+ soi_copy = keyWrapperFactory.getSoi_copy();
+ integerKeyAnalyzer = new IntegerKeyAnalyzer();
+ byteKeyAnalyzer = new ByteKeyAnalyzer();
+ longKeyAnalyzer = new LongKeyAnalyzer();
+ @SuppressWarnings("rawtypes")
+ Class[] clazz = {int.class};
+ try {
+ stringKeyAnalyzer = PrivateInstantiator.getInstance(StringKeyAnalyzer.class, clazz,
+ Character.SIZE);
+ } catch (SecurityException e) {
+ throw new HiveException(e);
+ } catch (IllegalArgumentException e) {
+ throw new HiveException(e);
+ } catch (NoSuchMethodException e) {
+ throw new HiveException(e);
+ } catch (InstantiationException e) {
+ throw new HiveException(e);
+ } catch (IllegalAccessException e) {
+ throw new HiveException(e);
+ } catch (InvocationTargetException e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private int compareKeys(Object key1, Object key2, FieldComparer fieldComparer1,
+ FieldComparer fieldComparer2) {
+ return ((Comparable) fieldComparer1.getMainObjectInspector()
+ .getPrimitiveJavaObject(key1)).compareTo(fieldComparer2.getMainObjectInspector()
+ .getPrimitiveJavaObject(key2));
+ }
+
+ private boolean areKeysEqual(Object key1, Object key2, FieldComparer fieldComparer1,
+ FieldComparer fieldComparer2) {
+ return (fieldComparer1.getMainObjectInspector()
+ .getPrimitiveJavaObject(key1)).equals(fieldComparer2.getMainObjectInspector()
+ .getPrimitiveJavaObject(key2));
+ }
+
+
+ @Override
+ public int compare(ListKeyWrapper o1, ListKeyWrapper o2) {
+ final Object[] keys1 = o1.getKeyArray();
+ final Object[] keys2 = o2.getKeyArray();
+
+ if (keys1.length != keys2.length) {
+ return keys1.length - keys2.length;
+ }
+
+ final FieldComparer[] fieldComparers1 = o1.getEqualComparer().getFieldComparers();
+ final FieldComparer[] fieldComparers2 = o2.getEqualComparer().getFieldComparers();
+
+ for (int i = 0; i < keys1.length; ++i) {
+ int compareResult = compareKeys(keys1[i], keys2[i], fieldComparers1[i], fieldComparers2[i]);
+ if (compareResult != 0) {
+ return compareResult;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public int bitIndex(ListKeyWrapper o1, ListKeyWrapper o2) {
+ final Object[] keys1 = o1.getKeyArray();
+ final Object[] keys2 = o2.getKeyArray();
+ final FieldComparer[] fieldComparers1 = o1.getEqualComparer().getFieldComparers();
+ final FieldComparer[] fieldComparers2 = o2.getEqualComparer().getFieldComparers();
+
+
+ for (int i = 0; i < Math.min(keys1.length, keys2.length); ++i) {
+ if (!areKeysEqual(keys1[i], keys2[i], fieldComparers1[i], fieldComparers2[i])) {
+ switch (fieldComparers1[i].getCompareType()) {
+ case COMPARE_TEXT:
+ case COMPARE_STRING:
+ String s1 = (String) fieldComparers1[i].getMainObjectInspector().getPrimitiveJavaObject(
+ keys1[i]);
+ String s2 = (String) fieldComparers2[i].getMainObjectInspector().getPrimitiveJavaObject(
+ keys2[i]);
+ if (s1.length() != s2.length()) {
+ return Math.abs((s1.length() - s2.length()) * Character.SIZE);
+ }
+ return stringKeyAnalyzer.bitIndex((String) fieldComparers1[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys1[i]), (String) fieldComparers2[i]
+ .getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i]))
+ + o1.getKeyOffsets()[i];
+ case COMPARE_INT:
+ return integerKeyAnalyzer.bitIndex((Integer) fieldComparers1[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys1[i]), (Integer) fieldComparers2[i]
+ .getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i]))
+ + o1.getKeyOffsets()[i];
+ case COMPARE_LONG:
+ return longKeyAnalyzer.bitIndex((Long) fieldComparers1[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys1[i]), (Long) fieldComparers2[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i])) + o1.getKeyOffsets()[i];
+ case COMPARE_BYTE:
+ return byteKeyAnalyzer.bitIndex((Byte) fieldComparers1[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys1[i]), (Byte) fieldComparers2[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i])) + o1.getKeyOffsets()[i];
+ case COMPARE_BOOL:
+ return o1.getKeyOffsets()[i];
+ default:
+ // Not Supported!
+ throw new RuntimeException("Patricia Trie unsupported key type");
+ }
+ }
+ }
+ return KeyAnalyzer.EQUAL_BIT_KEY;
+ }
+
+ @Override
+ public boolean isBitSet(ListKeyWrapper o, int index) {
+ for (int i = 1; i < o.getKeyOffsets().length; ++i) {
+ if (index < o.getKeyOffsets()[i]) {
+ index -= o.getKeyOffsets()[i - 1];
+ final FieldComparer fieldComparer = o.getEqualComparer().getFieldComparers()[i - 1];
+ final Object key = o.getKeyArray()[i - 1];
+ switch (fieldComparer.getCompareType()) {
+ case COMPARE_TEXT:
+ case COMPARE_STRING:
+ return stringKeyAnalyzer.isBitSet(((String) fieldComparer.getMainObjectInspector()
+ .getPrimitiveJavaObject(key)), index);
+ case COMPARE_INT:
+ return integerKeyAnalyzer.isBitSet(((Integer) fieldComparer.getMainObjectInspector()
+ .getPrimitiveJavaObject(key)), index);
+ case COMPARE_LONG:
+ return longKeyAnalyzer.isBitSet(((Long) fieldComparer.getMainObjectInspector()
+ .getPrimitiveJavaObject(key)), index);
+ case COMPARE_BYTE:
+ return byteKeyAnalyzer.isBitSet(((Byte) fieldComparer.getMainObjectInspector()
+ .getPrimitiveJavaObject(key)), index);
+ case COMPARE_BOOL:
+ return (Boolean) fieldComparer.getMainObjectInspector().getPrimitiveJavaObject(key);
+ default:
+ // Not Supported!
+ throw new RuntimeException("Patricia Trie unsupported key type");
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isPrefix(ListKeyWrapper key, ListKeyWrapper prefix) {
+ if (key.getBitLength() < prefix.getBitLength()) {
+ return false;
+ }
+
+ final Object[] keys1 = key.getKeyArray();
+ final Object[] keys2 = prefix.getKeyArray();
+ final FieldComparer[] fieldComparers1 = key.getEqualComparer().getFieldComparers();
+ final FieldComparer[] fieldComparers2 = prefix.getEqualComparer().getFieldComparers();
+
+ for (int i = 0; i < prefix.getKeyArray().length; ++i) {
+ if (!areKeysEqual(keys1[i], keys2[i], fieldComparers1[i], fieldComparers2[i])) {
+ switch (fieldComparers1[i].getCompareType()) {
+ case COMPARE_TEXT:
+ case COMPARE_STRING:
+ return stringKeyAnalyzer.isPrefix((String) fieldComparers1[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys1[i]), (String) fieldComparers2[i]
+ .getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i]));
+ case COMPARE_INT:
+ return integerKeyAnalyzer.isPrefix((Integer) fieldComparers1[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys1[i]), (Integer) fieldComparers2[i]
+ .getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i]));
+ case COMPARE_LONG:
+ return longKeyAnalyzer.isPrefix((Long) fieldComparers1[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys1[i]), (Long) fieldComparers2[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i]));
+ case COMPARE_BYTE:
+ return byteKeyAnalyzer.isPrefix((Byte) fieldComparers1[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys1[i]), (Byte) fieldComparers2[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i]));
+ case COMPARE_BOOL:
+ return fieldComparers1[i].getMainObjectInspector().getPrimitiveJavaObject(
+ keys1[i]).equals(fieldComparers2[i].getMainObjectInspector()
+ .getPrimitiveJavaObject(keys2[i]));
+ default:
+ // Not Supported!
+ throw new RuntimeException("Patricia Trie unsupported key type");
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int lengthInBits(ListKeyWrapper o) {
+ return o.getBitLength();
+ }
+}
Index: ql/build.xml
===================================================================
--- ql/build.xml (revision 1202523)
+++ ql/build.xml (working copy)
@@ -216,6 +216,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
@@ -226,6 +238,8 @@
+
+