Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1202523) +++ conf/hive-default.xml (working copy) @@ -281,6 +281,12 @@ + hive.external.map.groupby.impl + org.apache.hadoop.hive.ql.util.ExternalJavaHashMap + Name of the class that implements the org.apache.hadoop.hive.ql.util.ExternalMap interface. Used to speed up GroupByOperator + + + hive.default.fileformat TextFile Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override Index: lib/hppc-0.4.1.jar =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: lib/hppc-0.4.1.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: lib/patricia-trie-0.6.jar =================================================================== Cannot display: file marked as a binary type. svn:mime-type = application/octet-stream Property changes on: lib/patricia-trie-0.6.jar ___________________________________________________________________ Added: svn:mime-type + application/octet-stream Index: build-common.xml =================================================================== --- build-common.xml (revision 1202523) +++ build-common.xml (working copy) @@ -352,7 +352,7 @@ errorProperty="tests.failed" failureProperty="tests.failed" filtertrace="off"> - + Index: serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java =================================================================== --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java (revision 1202523) +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ListObjectsEqualComparer.java (working copy) @@ -39,7 +39,7 @@ * */ public class ListObjectsEqualComparer { - enum CompareType { + public enum CompareType { // Now only string, text, int, long, byte and boolean comparisons // are treated as special cases.
// For other types, we reuse ObjectInspectorUtils.compare() @@ -47,7 +47,7 @@ COMPARE_BOOL, OTHER } - class FieldComparer { + public class FieldComparer { protected ObjectInspector oi0, oi1; protected ObjectInspector compareOI; protected CompareType compareType; @@ -128,6 +128,18 @@ o0, oi0, o1, oi1) == 0); } } + + public CompareType getCompareType(){ + return compareType; + } + + //can throw class cast exception, assumes the ones with 1 are the proper ones + public PrimitiveObjectInspector getMainObjectInspector(){ + return (PrimitiveObjectInspector) oi1; + } + public StringObjectInspector getMainStringObjectInspector(){ + return soi1; + } } FieldComparer[] fieldComparers; @@ -158,9 +170,9 @@ if (ol0.length != ol1.length) { return false; } + assert (ol0.length <= numFields); - assert (ol1.length <= numFields); - for (int i = 0; i < Math.min(ol0.length, ol1.length); i++) { + for (int i = 0; i < ol0.length; i++) { if (!fieldComparers[i].areEqual(ol0[i], ol1[i])) { return false; } @@ -175,4 +187,12 @@ } return true; } + + public FieldComparer[] getFieldComparers(){ + return fieldComparers; + } + + public int getNumFields(){ + return numFields; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (revision 1202523) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (working copy) @@ -22,11 +22,11 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.OpParseContext; @@ -44,17 +45,18 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.util.ExternalMap; import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyStringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObject; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -71,6 +73,7 @@ private static final long serialVersionUID = 1L; private static final int NUMROWSESTIMATESIZE = 1000; + private static final int HASH_AGGR_INITIAL_SIZE = 2048; protected transient ExprNodeEvaluator[] keyFields; protected transient ObjectInspector[] keyObjectInspectors; @@ -108,8 +111,9 @@ protected transient Object[][] aggregationsParametersLastInvoke; // Used by hash-based GroupBy: Mode = HASH, PARTIALS - protected transient HashMap hashAggregations; + protected transient ExternalMap hashAggregations; + // Used by hash distinct aggregations when hashGrpKeyNotRedKey is true protected transient HashSet keysCurrentGroup; @@ -177,6 +181,31 @@ transient int countAfterReport; transient int heartbeatInterval; + @SuppressWarnings("unchecked") + private ExternalMap getExternalMap(Configuration hconf) + throws HiveException { + try { + return (ExternalMap) Class.forName( + hconf.get("hive.external.map.groupby.impl"), true, JavaUtils.getClassLoader()) + .getDeclaredConstructor(int.class, Object.class) + .newInstance(HASH_AGGR_INITIAL_SIZE, keyWrapperFactory); + } catch (InstantiationException e) { + throw new HiveException(e); + } catch (IllegalAccessException e) { + throw new HiveException(e); + } catch (ClassNotFoundException e) { + throw new HiveException(e); + } catch (IllegalArgumentException e) { + throw new HiveException(e); + } catch (SecurityException e) { + throw new HiveException(e); + } catch (InvocationTargetException e) { + throw new HiveException(e); + } catch (NoSuchMethodException e) { + throw new HiveException(e); + } + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { totalMemory = Runtime.getRuntime().totalMemory(); @@ -328,7 +357,6 @@ aggregations = newAggregations(); hashAggr = false; } else { - hashAggregations = new HashMap(256); aggregations = newAggregations(); hashAggr = true; keyPositionsSize = new ArrayList(); @@ -371,6 +399,10 @@ newKeys = keyWrapperFactory.getKeyWrapper(); + if (!(conf.getMode() != GroupByDesc.Mode.HASH || bucketGroup)){ + hashAggregations = getExternalMap(hconf); //sometimes needs keyWrapperFactory to be ready + } + firstRow = true; // estimate the number of hash table entries based on the size of each // entry. Since the size of a entry @@ -838,7 +870,7 @@ // The fixed size for the aggregation class is already known. Get the // variable portion of the size every NUMROWSESTIMATESIZE rows. 
if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) { - //check how much memory left memory + //check how much memory is left usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); rate = (float) usedMemory / (float) maxMemory; if(rate > memoryThreshold){ @@ -904,33 +936,15 @@ // changed in the future if (complete) { - Iterator> iter = hashAggregations - .entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry m = iter.next(); - forward(m.getKey().getKeyArray(), m.getValue()); - } - hashAggregations.clear(); + hashAggregations.removeAndForwardAll(this); hashAggregations = null; LOG.warn("Hash Table completed flushed"); return; } - int oldSize = hashAggregations.size(); - LOG.warn("Hash Tbl flush: #hash table = " + oldSize); - Iterator> iter = hashAggregations - .entrySet().iterator(); - int numDel = 0; - while (iter.hasNext()) { - Map.Entry m = iter.next(); - forward(m.getKey().getKeyArray(), m.getValue()); - iter.remove(); - numDel++; - if (numDel * 10 >= oldSize) { - LOG.warn("Hash Table flushed: new size = " + hashAggregations.size()); - return; - } - } + LOG.warn("Hash Tbl flush: #hash table = " + hashAggregations.size()); + hashAggregations.removeAndForwardSome(0.1f, this); + LOG.warn("Hash Table flushed: new size = " + hashAggregations.size()); } transient Object[] forwardCache; @@ -942,9 +956,9 @@ * The keys in the record * @throws HiveException */ - protected void forward(Object[] keys, AggregationBuffer[] aggs) + protected void forward(Object[] keys, AggregationBuffer[] aggs) throws HiveException { - int totalFields = keys.length+ aggs.length; + int totalFields = keys.length + aggs.length; if (forwardCache == null) { forwardCache = new Object[totalFields]; } @@ -959,6 +973,11 @@ forward(forwardCache, outputObjInspector); } + public void forward(KeyWrapper keyWrapper, AggregationBuffer[] aggregationBuffers) + throws HiveException { + forward(keyWrapper.getKeyArray(), aggregationBuffers); + } + /** * We need to forward all the aggregations to children. 
* @@ -995,17 +1014,8 @@ forward(new Object[0], aggregations); } else { if (hashAggregations != null) { - LOG.warn("Begin Hash Table flush at close: size = " - + hashAggregations.size()); - Iterator iter = hashAggregations.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry m = (Map.Entry) iter - .next(); - - forward(m.getKey().getKeyArray(), m.getValue()); - iter.remove(); - } - hashAggregations.clear(); + LOG.warn("Begin Hash Table flush at close: size = " + hashAggregations.size()); + hashAggregations.removeAndForwardAll(this); } else if (aggregations != null) { // sort-based aggregations if (currentKeys != null) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java (revision 1202523) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapperFactory.java (working copy) @@ -55,6 +55,16 @@ } } + public Class getKeyWrappersClass(){ + if (keyFields.length == 1 + && TypeInfoUtils.getTypeInfoFromObjectInspector(keyObjectInspectors[0]).equals( + TypeInfoFactory.stringTypeInfo)) { + return TextKeyWrapper.class; + } else { + return ListKeyWrapper.class; + } + } + transient ExprNodeEvaluator[] keyFields; transient ObjectInspector[] keyObjectInspectors; transient ObjectInspector[] currentKeyObjectInspectors; @@ -63,9 +73,11 @@ transient ListObjectsEqualComparer currentStructEqualComparer; transient ListObjectsEqualComparer newKeyStructEqualComparer; - class ListKeyWrapper extends KeyWrapper { + public class ListKeyWrapper extends KeyWrapper { int hashcode; Object[] keys; + int[] keyOffsets; //number of 'bits' at which subsequent keys start and end. + // decide whether this is already in hashmap (keys in hashmap are deepcopied // version, and we need to use 'currentKeyObjectInspector'). 
ListObjectsEqualComparer equalComparer; @@ -151,12 +163,71 @@ keyObjectInspectors[i], copyOption); } } + + public ListObjectsEqualComparer getEqualComparer(){ + return equalComparer; + } + + public int[] getKeyOffsets() { + if (keyOffsets == null) { + countKeyOffsets(equalComparer); + } + return keyOffsets; + } + + public int getBitLength() { + if (keyOffsets == null) { + countKeyOffsets(equalComparer); + } + return keyOffsets[keyOffsets.length - 1]; + } + + // Currently only supports String, int, long, byte and bool + // Only used by Patricia-Trie implementation + // Assumes keys don't change after insertion into the Trie and are not null + private void countKeyOffsets(ListObjectsEqualComparer currentStructEqualComparer) { + assert (keys.length <= currentStructEqualComparer.getFieldComparers().length); + keyOffsets = new int[keys.length + 1]; + for (int i = 0; i < keys.length; ++i) { + switch (currentStructEqualComparer.getFieldComparers()[i].getCompareType()) { + case COMPARE_TEXT: + case COMPARE_STRING: + keyOffsets[i + 1] = keyOffsets[i] + + currentStructEqualComparer.getFieldComparers()[i].getMainStringObjectInspector() + .getPrimitiveJavaObject(keys[i]).length() * Character.SIZE; + break; + case COMPARE_INT: + keyOffsets[i + 1] = keyOffsets[i] + Integer.SIZE; + break; + case COMPARE_LONG: + keyOffsets[i + 1] = keyOffsets[i] + Long.SIZE; + break; + case COMPARE_BYTE: + keyOffsets[i + 1] = keyOffsets[i] + Byte.SIZE; + break; + case COMPARE_BOOL: + keyOffsets[i + 1] = keyOffsets[i] + 1; + break; + default: + throw new RuntimeException("Patricia Trie key type currently not supported: " + + currentStructEqualComparer.getFieldComparers()[i].getCompareType()); + } + } + } } transient Object[] singleEleArray = new Object[1]; transient StringObjectInspector soi_new, soi_copy; - class TextKeyWrapper extends KeyWrapper { + public StringObjectInspector getSoi_new(){ + return soi_new; + } + + public StringObjectInspector getSoi_copy(){ + return soi_copy; + } + + public class TextKeyWrapper extends KeyWrapper { int hashcode; Object key; boolean isCopy; @@ -234,5 +305,12 @@ singleEleArray[0] = key; return singleEleArray; } + + public boolean isCopy(){ + return isCopy; + } + public Object getKey(){ + return key; + } } } Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalHPPCObjectObjectOpenHashMap.java (revision 0) @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.KeyWrapper; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; + +public class ExternalHPPCObjectObjectOpenHashMap implements ExternalMap{ + + final ObjectObjectExpandedOpenHashMap innerHashMap; + + public ExternalHPPCObjectObjectOpenHashMap(int initialCapacity){ + innerHashMap = new ObjectObjectExpandedOpenHashMap(initialCapacity); + } + + public ExternalHPPCObjectObjectOpenHashMap(int initialCapacity, float loadFactor){ + innerHashMap = new ObjectObjectExpandedOpenHashMap(initialCapacity, loadFactor); + } + + @Override + public int size() { + return innerHashMap.size(); + } + + @Override + public void clear() { + innerHashMap.clear(); + } + + @Override + public V get(K key) { + return innerHashMap.get(key); + } + + @Override + public V put(K key, V value) { + return innerHashMap.put(key, value); + } + + @Override + public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException { + final ObjectObjectExpandedOpenHashMap groupByMap = getGroupByMap(); + final Object[] keys = groupByMap.keys; + final Object[] values = groupByMap.values; + final boolean[] states = groupByMap.allocated; + final int toLeave = Math.round(groupByMap.assigned * (1-part)); + + while(true){ + for (int i = 0; i < states.length; ++i){ + if(states[i]){ + groupByOperator.forward((KeyWrapper) keys[i], (AggregationBuffer[]) values[i]); + groupByMap.remove(i); + if (toLeave > groupByMap.assigned) { + return; + } + } + } + } + } + + @Override + public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException{ + final ObjectObjectExpandedOpenHashMap groupByMap = getGroupByMap(); + final Object[] keys = groupByMap.keys; + final Object[] values = groupByMap.values; + final boolean[] states = groupByMap.allocated; + + for (int i = 0; i < states.length; ++i){ + if(states[i]){ + groupByOperator.forward((KeyWrapper) keys[i], (AggregationBuffer[]) values[i]); + } + } + groupByMap.clear(); + } + + @SuppressWarnings("unchecked") + private ObjectObjectExpandedOpenHashMap getGroupByMap() throws HiveException{ + try{ + return (ObjectObjectExpandedOpenHashMap) innerHashMap; + } catch (ClassCastException e){ + throw new HiveException(e); + } + } +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalMap.java (revision 0) @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public interface ExternalMap { + + public int size(); + public void clear(); + public V get(K key); + public V put(K key, V Value); + public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException; + public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException; +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaMap.java (revision 0) @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.KeyWrapper; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; + +public abstract class ExternalJavaMap implements ExternalMap { + + protected Map innerMap; + + @Override + public int size() { + return innerMap.size(); + } + + @Override + public void clear() { + innerMap.clear(); + } + + @Override + public V get(K key) { + return innerMap.get(key); + } + + @Override + public V put(K key, V value) { + return innerMap.put(key, value); + } + + @Override + public void removeAndForwardSome(float part, GroupByOperator groupByOperator) throws HiveException { + final Map groupByMap = getGroupByMap(); + final Iterator> iter = groupByMap.entrySet().iterator(); + final int toLeave = Math.round(groupByMap.size() * (1-part)); + + while (iter.hasNext() && toLeave < groupByMap.size()) { + Map.Entry entry = iter.next(); + groupByOperator.forward(entry.getKey(), entry.getValue()); + iter.remove(); + } + } + + @Override + public void removeAndForwardAll(GroupByOperator groupByOperator) throws HiveException{ + final Map groupByMap = getGroupByMap(); + + for (Map.Entry entry : groupByMap.entrySet()){ + groupByOperator.forward(entry.getKey(), entry.getValue()); + } + groupByMap.clear(); + } + + @SuppressWarnings("unchecked") + private Map getGroupByMap() throws HiveException{ + try{ + return (Map) innerMap; + } catch (ClassCastException e){ + throw new HiveException(e); + } + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalPatriciaTrie.java =================================================================== --- 
ql/src/java/org/apache/hadoop/hive/ql/util/ExternalPatriciaTrie.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalPatriciaTrie.java (revision 0) @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory; +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory.ListKeyWrapper; +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory.TextKeyWrapper; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.ardverk.collection.KeyAnalyzer; +import org.ardverk.collection.PatriciaTrie; + +public class ExternalPatriciaTrie extends ExternalJavaMap { + + @SuppressWarnings({"rawtypes", "unchecked"}) + public ExternalPatriciaTrie(int ignored, Object inputKeyWrapperFactory) throws HiveException { + KeyWrapperFactory keyWrapperFactory = (KeyWrapperFactory) inputKeyWrapperFactory; + + if (keyWrapperFactory.getKeyWrappersClass().equals(TextKeyWrapper.class)) { + KeyAnalyzer keyAnalyzer = new TextKeyWrapperAnalyzer(keyWrapperFactory); + innerMap = new PatriciaTrie(keyAnalyzer); + } else { + // assume ListKeyWrapper + KeyAnalyzer keyAnalyzer = new ListKeyWrapperAnalyzer(keyWrapperFactory); + innerMap = new PatriciaTrie(keyAnalyzer); + } + } + + public ExternalPatriciaTrie(int ignored1, float ignored2, Object inputKeyWrapperFactory) + throws HiveException { + this(ignored1, inputKeyWrapperFactory); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/TextKeyWrapperAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/TextKeyWrapperAnalyzer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/TextKeyWrapperAnalyzer.java (revision 0) @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + +package org.apache.hadoop.hive.ql.util; + +import java.lang.reflect.InvocationTargetException; + +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory; +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory.TextKeyWrapper; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.ardverk.collection.KeyAnalyzer; +import org.ardverk.collection.StringKeyAnalyzer; + +public class TextKeyWrapperAnalyzer implements KeyAnalyzer{ + + final StringObjectInspector soi_new, soi_copy; + final StringKeyAnalyzer stringKeyAnalyzer; + + public TextKeyWrapperAnalyzer(KeyWrapperFactory keyWrapperFactory) throws HiveException{ + soi_new = keyWrapperFactory.getSoi_new(); + soi_copy = keyWrapperFactory.getSoi_copy(); + Class[] clazz = {int.class}; + try { + stringKeyAnalyzer = PrivateInstantiator.getInstance(StringKeyAnalyzer.class, clazz, Character.SIZE); + } catch (SecurityException e) { + throw new HiveException(e); + } catch (IllegalArgumentException e) { + throw new HiveException(e); + } catch (NoSuchMethodException e) { + throw new HiveException(e); + } catch (InstantiationException e) { + throw new HiveException(e); + } catch (IllegalAccessException e) { + throw new HiveException(e); + } catch (InvocationTargetException e) { + throw new HiveException(e); + } + } + + private String getString(TextKeyWrapper o){ + if (o.isCopy()){ + return soi_copy.getPrimitiveJavaObject(o.getKey()); + } else { + return soi_new.getPrimitiveJavaObject(o.getKey()); + } + } + + @Override + public int compare(TextKeyWrapper o1, TextKeyWrapper o2) { + return stringKeyAnalyzer.compare(getString(o1), getString(o2)); + } + + @Override + public int bitIndex(TextKeyWrapper o1, TextKeyWrapper o2) { + return stringKeyAnalyzer.bitIndex(getString(o1), getString(o2)); + } + + @Override + public boolean isBitSet(TextKeyWrapper o, int bitIndex) { + return stringKeyAnalyzer.isBitSet(getString(o), bitIndex); + } + + @Override + public boolean isPrefix(TextKeyWrapper o1, TextKeyWrapper o2) { + return stringKeyAnalyzer.isPrefix(getString(o1), getString(o2)); + } + + @Override + public int lengthInBits(TextKeyWrapper o) { + return stringKeyAnalyzer.lengthInBits(getString(o)); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ExternalJavaHashMap.java (revision 0) @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.util; + +import java.util.HashMap; + +public class ExternalJavaHashMap extends ExternalJavaMap{ + + public ExternalJavaHashMap(int initialCapacity, Object ignored){ + innerMap = new HashMap(initialCapacity); + } + + public ExternalJavaHashMap(int initialCapacity, float loadFactor, Object ignored){ + innerMap = new HashMap(initialCapacity, loadFactor); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ObjectObjectExpandedOpenHashMap.java (revision 0) @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import com.carrotsearch.hppc.ObjectObjectOpenHashMap; + +public class ObjectObjectExpandedOpenHashMap extends ObjectObjectOpenHashMap{ + + public ObjectObjectExpandedOpenHashMap (int initialCapacity){ + super(initialCapacity); + } + + public ObjectObjectExpandedOpenHashMap (int initialCapacity, float loadFactor){ + super(initialCapacity, loadFactor); + } + + public void remove(int slot){ + --assigned; + shiftConflictingKeys(slot); + } +} \ No newline at end of file Index: ql/src/java/org/apache/hadoop/hive/ql/util/PrivateInstantiator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/PrivateInstantiator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/PrivateInstantiator.java (revision 0) @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.hive.ql.util; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +public class PrivateInstantiator { + + private PrivateInstantiator() { + }; // oh the irony + + public static T getInstance(Class clazz, + @SuppressWarnings("rawtypes") Class[] constructorArgumentsClasses, + Object... constructorArguments) + throws SecurityException, NoSuchMethodException, IllegalArgumentException, + InstantiationException, IllegalAccessException, InvocationTargetException { + + Constructor cons = clazz.getDeclaredConstructor(constructorArgumentsClasses); + cons.setAccessible(true); + return cons.newInstance(constructorArguments); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/util/ListKeyWrapperAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/util/ListKeyWrapperAnalyzer.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/util/ListKeyWrapperAnalyzer.java (revision 0) @@ -0,0 +1,221 @@ +package org.apache.hadoop.hive.ql.util; + +import java.lang.reflect.InvocationTargetException; + +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory; +import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory.ListKeyWrapper; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectsEqualComparer.FieldComparer; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.ardverk.collection.ByteKeyAnalyzer; +import org.ardverk.collection.IntegerKeyAnalyzer; +import org.ardverk.collection.KeyAnalyzer; +import org.ardverk.collection.LongKeyAnalyzer; +import org.ardverk.collection.StringKeyAnalyzer; + +// Currently only primitive types (those that have a PrimitiveObjectInspector defined +// for them) are supported. If a non-primitive key is inserted, a ClassCastException will be thrown.
+public class ListKeyWrapperAnalyzer implements KeyAnalyzer { + + final StringObjectInspector soi_new, soi_copy; + final StringKeyAnalyzer stringKeyAnalyzer; + final IntegerKeyAnalyzer integerKeyAnalyzer; + final ByteKeyAnalyzer byteKeyAnalyzer; + final LongKeyAnalyzer longKeyAnalyzer; + + public ListKeyWrapperAnalyzer(KeyWrapperFactory keyWrapperFactory) throws HiveException { + soi_new = keyWrapperFactory.getSoi_new(); + soi_copy = keyWrapperFactory.getSoi_copy(); + integerKeyAnalyzer = new IntegerKeyAnalyzer(); + byteKeyAnalyzer = new ByteKeyAnalyzer(); + longKeyAnalyzer = new LongKeyAnalyzer(); + @SuppressWarnings("rawtypes") + Class[] clazz = {int.class}; + try { + stringKeyAnalyzer = PrivateInstantiator.getInstance(StringKeyAnalyzer.class, clazz, + Character.SIZE); + } catch (SecurityException e) { + throw new HiveException(e); + } catch (IllegalArgumentException e) { + throw new HiveException(e); + } catch (NoSuchMethodException e) { + throw new HiveException(e); + } catch (InstantiationException e) { + throw new HiveException(e); + } catch (IllegalAccessException e) { + throw new HiveException(e); + } catch (InvocationTargetException e) { + throw new HiveException(e); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private int compareKeys(Object key1, Object key2, FieldComparer fieldComparer1, + FieldComparer fieldComparer2) { + return ((Comparable) fieldComparer1.getMainObjectInspector() + .getPrimitiveJavaObject(key1)).compareTo(fieldComparer2.getMainObjectInspector() + .getPrimitiveJavaObject(key2)); + } + + private boolean areKeysEqual(Object key1, Object key2, FieldComparer fieldComparer1, + FieldComparer fieldComparer2) { + return (fieldComparer1.getMainObjectInspector() + .getPrimitiveJavaObject(key1)).equals(fieldComparer2.getMainObjectInspector() + .getPrimitiveJavaObject(key2)); + } + + + @Override + public int compare(ListKeyWrapper o1, ListKeyWrapper o2) { + final Object[] keys1 = o1.getKeyArray(); + final Object[] keys2 = o2.getKeyArray(); + + if (keys1.length != keys2.length) { + return keys1.length - keys2.length; + } + + final FieldComparer[] fieldComparers1 = o1.getEqualComparer().getFieldComparers(); + final FieldComparer[] fieldComparers2 = o2.getEqualComparer().getFieldComparers(); + + for (int i = 0; i < keys1.length; ++i) { + int compareResult = compareKeys(keys1[i], keys2[i], fieldComparers1[i], fieldComparers2[i]); + if (compareResult != 0) { + return compareResult; + } + } + return 0; + } + + @Override + public int bitIndex(ListKeyWrapper o1, ListKeyWrapper o2) { + final Object[] keys1 = o1.getKeyArray(); + final Object[] keys2 = o2.getKeyArray(); + final FieldComparer[] fieldComparers1 = o1.getEqualComparer().getFieldComparers(); + final FieldComparer[] fieldComparers2 = o2.getEqualComparer().getFieldComparers(); + + + for (int i = 0; i < Math.min(keys1.length, keys2.length); ++i) { + if (!areKeysEqual(keys1[i], keys2[i], fieldComparers1[i], fieldComparers2[i])) { + switch (fieldComparers1[i].getCompareType()) { + case COMPARE_TEXT: + case COMPARE_STRING: + String s1 = (String) fieldComparers1[i].getMainObjectInspector().getPrimitiveJavaObject( + keys1[i]); + String s2 = (String) fieldComparers2[i].getMainObjectInspector().getPrimitiveJavaObject( + keys2[i]); + if (s1.length() != s2.length()) { + return Math.abs((s1.length() - s2.length()) * Character.SIZE); + } + return stringKeyAnalyzer.bitIndex((String) fieldComparers1[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys1[i]), (String) fieldComparers2[i] + .getMainObjectInspector() + 
.getPrimitiveJavaObject(keys2[i])) + + o1.getKeyOffsets()[i]; + case COMPARE_INT: + return integerKeyAnalyzer.bitIndex((Integer) fieldComparers1[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys1[i]), (Integer) fieldComparers2[i] + .getMainObjectInspector() + .getPrimitiveJavaObject(keys2[i])) + + o1.getKeyOffsets()[i]; + case COMPARE_LONG: + return longKeyAnalyzer.bitIndex((Long) fieldComparers1[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys1[i]), (Long) fieldComparers2[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys2[i])) + o1.getKeyOffsets()[i]; + case COMPARE_BYTE: + return byteKeyAnalyzer.bitIndex((Byte) fieldComparers1[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys1[i]), (Byte) fieldComparers2[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys2[i])) + o1.getKeyOffsets()[i]; + case COMPARE_BOOL: + return o1.getKeyOffsets()[i]; + default: + // Not Supported! + throw new RuntimeException("Patricia Trie unsupported key type"); + } + } + } + return KeyAnalyzer.EQUAL_BIT_KEY; + } + + @Override + public boolean isBitSet(ListKeyWrapper o, int index) { + for (int i = 1; i < o.getKeyOffsets().length; ++i) { + if (index < o.getKeyOffsets()[i]) { + index -= o.getKeyOffsets()[i - 1]; + final FieldComparer fieldComparer = o.getEqualComparer().getFieldComparers()[i - 1]; + final Object key = o.getKeyArray()[i - 1]; + switch (fieldComparer.getCompareType()) { + case COMPARE_TEXT: + case COMPARE_STRING: + return stringKeyAnalyzer.isBitSet(((String) fieldComparer.getMainObjectInspector() + .getPrimitiveJavaObject(key)), index); + case COMPARE_INT: + return integerKeyAnalyzer.isBitSet(((Integer) fieldComparer.getMainObjectInspector() + .getPrimitiveJavaObject(key)), index); + case COMPARE_LONG: + return longKeyAnalyzer.isBitSet(((Long) fieldComparer.getMainObjectInspector() + .getPrimitiveJavaObject(key)), index); + case COMPARE_BYTE: + return byteKeyAnalyzer.isBitSet(((Byte) fieldComparer.getMainObjectInspector() + .getPrimitiveJavaObject(key)), index); + case COMPARE_BOOL: + return (Boolean) fieldComparer.getMainObjectInspector().getPrimitiveJavaObject(key); + default: + // Not Supported! 
+ throw new RuntimeException("Patricia Trie unsupported key type"); + } + } + } + return false; + } + + @Override + public boolean isPrefix(ListKeyWrapper key, ListKeyWrapper prefix) { + if (key.getBitLength() < prefix.getBitLength()) { + return false; + } + + final Object[] keys1 = key.getKeyArray(); + final Object[] keys2 = prefix.getKeyArray(); + final FieldComparer[] fieldComparers1 = key.getEqualComparer().getFieldComparers(); + final FieldComparer[] fieldComparers2 = prefix.getEqualComparer().getFieldComparers(); + + for (int i = 0; i < prefix.getKeyArray().length; ++i) { + if (!areKeysEqual(keys1[i], keys2[i], fieldComparers1[i], fieldComparers2[i])) { + switch (fieldComparers1[i].getCompareType()) { + case COMPARE_TEXT: + case COMPARE_STRING: + return stringKeyAnalyzer.isPrefix((String) fieldComparers1[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys1[i]), (String) fieldComparers2[i] + .getMainObjectInspector() + .getPrimitiveJavaObject(keys2[i])); + case COMPARE_INT: + return integerKeyAnalyzer.isPrefix((Integer) fieldComparers1[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys1[i]), (Integer) fieldComparers2[i] + .getMainObjectInspector() + .getPrimitiveJavaObject(keys2[i])); + case COMPARE_LONG: + return longKeyAnalyzer.isPrefix((Long) fieldComparers1[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys1[i]), (Long) fieldComparers2[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys2[i])); + case COMPARE_BYTE: + return byteKeyAnalyzer.isPrefix((Byte) fieldComparers1[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys1[i]), (Byte) fieldComparers2[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys2[i])); + case COMPARE_BOOL: + return fieldComparers1[i].getMainObjectInspector().getPrimitiveJavaObject( + keys1[i]).equals(fieldComparers2[i].getMainObjectInspector() + .getPrimitiveJavaObject(keys2[i])); + default: + // Not Supported! + throw new RuntimeException("Patricia Trie unsupported key type"); + } + } + } + return true; + } + + @Override + public int lengthInBits(ListKeyWrapper o) { + return o.getBitLength(); + } +} Index: ql/build.xml =================================================================== --- ql/build.xml (revision 1202523) +++ ql/build.xml (working copy) @@ -216,6 +216,18 @@ + + + + + + + + + + + + @@ -226,6 +238,8 @@ + +
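Usage note (illustrative, not part of the patch itself): the hash-aggregation map implementation is chosen through the new hive.external.map.groupby.impl property from the hive-default.xml hunk above; GroupByOperator.getExternalMap() simply reads it with hconf.get(). A minimal sketch of reading and overriding the property from a Hadoop Configuration; the class name ExternalMapConfigExample is hypothetical, while the property name and implementation classes come from this patch:

import org.apache.hadoop.conf.Configuration;

public class ExternalMapConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Default value shipped in hive-default.xml: a thin wrapper around java.util.HashMap.
    String impl = conf.get("hive.external.map.groupby.impl",
        "org.apache.hadoop.hive.ql.util.ExternalJavaHashMap");
    System.out.println("GroupBy external map implementation: " + impl);

    // Switch to the Patricia-Trie backed map (string and primitive group-by keys only).
    conf.set("hive.external.map.groupby.impl",
        "org.apache.hadoop.hive.ql.util.ExternalPatriciaTrie");
  }
}

The same property should also be settable per session with SET hive.external.map.groupby.impl=... before running the GROUP BY query.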
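Plugging in another implementation (a sketch, assuming only the ExternalJavaMap base class and its protected innerMap field shown above): GroupByOperator.getExternalMap() loads the configured class reflectively through an (int, Object) constructor, passing HASH_AGGR_INITIAL_SIZE and the operator's KeyWrapperFactory, so any implementation must provide that constructor plus the six ExternalMap methods. The class below, ExternalJavaLinkedHashMap, is hypothetical and only illustrates the contract; type parameters are omitted to match the style of ExternalJavaHashMap above:

package org.apache.hadoop.hive.ql.util;

import java.util.LinkedHashMap;

// Illustrative only: an insertion-ordered variant, so the partial flush in
// removeAndForwardSome() forwards the oldest aggregation groups first.
public class ExternalJavaLinkedHashMap extends ExternalJavaMap {

  // GroupByOperator.getExternalMap() looks up exactly this (int, Object) constructor;
  // the Object argument is the operator's KeyWrapperFactory, which is not needed here.
  public ExternalJavaLinkedHashMap(int initialCapacity, Object ignored) {
    innerMap = new LinkedHashMap(initialCapacity);
  }
}

It would then be selected by setting hive.external.map.groupby.impl to org.apache.hadoop.hive.ql.util.ExternalJavaLinkedHashMap; the flush and forward behaviour is inherited from ExternalJavaMap.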
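On the flush path: the old inline loop in GroupByOperator forwarded and removed entries until roughly 10% of the hash table was gone; the new code delegates that to removeAndForwardSome(0.1f, this), whose implementations keep Math.round(size * (1 - part)) entries. A quick check of that arithmetic with made-up numbers (the class name FlushFractionExample is hypothetical):

public class FlushFractionExample {
  public static void main(String[] args) {
    int size = 1000;                               // entries in the hash table before the flush
    float part = 0.1f;                             // fraction passed by the flush code
    int toLeave = Math.round(size * (1 - part));   // 900 entries survive the partial flush
    System.out.println("forwarded ~" + (size - toLeave) + ", kept " + toLeave);
  }
}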
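For the Patricia-Trie key layout: ListKeyWrapper.countKeyOffsets() concatenates the bit widths of the individual group-by key columns (length * Character.SIZE bits per string, Integer.SIZE for int, and so on, with the last entry being the total returned by getBitLength()), and ListKeyWrapperAnalyzer.bitIndex() adds the column's offset to the per-column analyzer result. A small worked example with made-up key values (the class name KeyOffsetExample is hypothetical):

// Bit-offset layout for a two-column key (String "ab", int 7), following the switch
// in countKeyOffsets().
public class KeyOffsetExample {
  public static void main(String[] args) {
    int[] keyOffsets = new int[3];
    keyOffsets[0] = 0;
    keyOffsets[1] = keyOffsets[0] + "ab".length() * Character.SIZE; // string column: 2 * 16 = 32
    keyOffsets[2] = keyOffsets[1] + Integer.SIZE;                   // int column: 32 + 32 = 64
    // getBitLength() would report 64; a differing bit in the int column is reported
    // at the underlying analyzer's index plus that column's 32-bit offset.
    System.out.println(java.util.Arrays.toString(keyOffsets));      // [0, 32, 64]
  }
}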