Index: contrib/src/test/results/clientpositive/udaf_example_min_n.q.out =================================================================== --- contrib/src/test/results/clientpositive/udaf_example_min_n.q.out (revision 0) +++ contrib/src/test/results/clientpositive/udaf_example_min_n.q.out (revision 0) @@ -0,0 +1,92 @@ +PREHOOK: query: CREATE TEMPORARY FUNCTION example_min_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMinN' +PREHOOK: type: CREATEFUNCTION +POSTHOOK: query: CREATE TEMPORARY FUNCTION example_min_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMinN' +POSTHOOK: type: CREATEFUNCTION +PREHOOK: query: EXPLAIN +SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION example_min_n (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5) 10)) (TOK_SELEXPR (TOK_FUNCTION example_min_n (TOK_FUNCTION IF (< (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5) 250) TOK_NULL (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5)) 10))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: value + type: string + outputColumnNames: value + Group By Operator + aggregations: + expr: example_min_n(substr(value, 5), 10) + expr: example_min_n(if((substr(value, 5) < 250), null, substr(value, 5)), 10) + bucketGroup: false + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: struct,n:int> + expr: _col1 + type: struct,n:int> + Reduce Operator Tree: + Group By Operator + aggregations: + expr: example_min_n(VALUE._col0) + expr: example_min_n(VALUE._col1) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: array + expr: _col1 + type: array + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/build/contrib/scratchdir/hive_2010-02-03_21-19-38_782_886141785917668830/10000 +POSTHOOK: query: SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/build/contrib/scratchdir/hive_2010-02-03_21-19-38_782_886141785917668830/10000 +[0.0,0.0,0.0,2.0,4.0,5.0,5.0,5.0,8.0,9.0] [252.0,255.0,255.0,256.0,256.0,257.0,258.0,260.0,262.0,263.0] +PREHOOK: query: DROP TEMPORARY FUNCTION example_min_n +PREHOOK: type: DROPFUNCTION +POSTHOOK: query: DROP TEMPORARY FUNCTION example_min_n +POSTHOOK: type: DROPFUNCTION Index: contrib/src/test/results/clientpositive/udaf_example_max_n.q.out =================================================================== --- contrib/src/test/results/clientpositive/udaf_example_max_n.q.out (revision 0) +++ contrib/src/test/results/clientpositive/udaf_example_max_n.q.out (revision 0) @@ -0,0 +1,92 @@ +PREHOOK: query: CREATE TEMPORARY FUNCTION example_max_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMaxN' +PREHOOK: type: CREATEFUNCTION +POSTHOOK: query: CREATE TEMPORARY FUNCTION example_max_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMaxN' +POSTHOOK: type: CREATEFUNCTION +PREHOOK: query: EXPLAIN +SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION example_max_n (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5) 10)) (TOK_SELEXPR (TOK_FUNCTION example_max_n (TOK_FUNCTION IF (> (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5) 250) TOK_NULL (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5)) 10))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: value + type: string + outputColumnNames: value + Group By Operator + aggregations: + expr: example_max_n(substr(value, 5), 10) + expr: example_max_n(if((substr(value, 5) > 250), null, substr(value, 5)), 10) + bucketGroup: false + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: struct,n:int> + expr: _col1 + type: struct,n:int> + Reduce Operator Tree: + Group By Operator + aggregations: + expr: example_max_n(VALUE._col0) + expr: example_max_n(VALUE._col1) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: array + expr: _col1 + type: array + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/build/contrib/scratchdir/hive_2010-02-03_21-18-51_159_789721602553735643/10000 +POSTHOOK: query: SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/build/contrib/scratchdir/hive_2010-02-03_21-18-51_159_789721602553735643/10000 +[498.0,498.0,498.0,497.0,496.0,495.0,494.0,493.0,492.0,492.0] [249.0,248.0,247.0,244.0,242.0,242.0,241.0,239.0,239.0,238.0] +PREHOOK: query: DROP TEMPORARY FUNCTION example_max_n +PREHOOK: type: DROPFUNCTION +POSTHOOK: query: DROP TEMPORARY FUNCTION example_max_n +POSTHOOK: type: DROPFUNCTION Index: contrib/src/test/queries/clientpositive/udaf_example_min_n.q =================================================================== --- contrib/src/test/queries/clientpositive/udaf_example_min_n.q (revision 0) +++ contrib/src/test/queries/clientpositive/udaf_example_min_n.q (revision 0) @@ -0,0 +1,13 @@ +add jar ../build/contrib/hive_contrib.jar; +CREATE TEMPORARY FUNCTION example_min_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMinN'; + +EXPLAIN +SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src; + +SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src; + +DROP TEMPORARY FUNCTION example_min_n; Index: contrib/src/test/queries/clientpositive/udaf_example_max_n.q =================================================================== --- contrib/src/test/queries/clientpositive/udaf_example_max_n.q (revision 0) +++ contrib/src/test/queries/clientpositive/udaf_example_max_n.q (revision 0) @@ -0,0 +1,14 @@ +add jar ../build/contrib/hive_contrib.jar; + +CREATE TEMPORARY FUNCTION example_max_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMaxN'; + +EXPLAIN +SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src; + +SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src; + +DROP TEMPORARY FUNCTION example_max_n; Index: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMaxN.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMaxN.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMaxN.java (revision 0) @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.contrib.udaf.example; + +import org.apache.hadoop.hive.ql.exec.UDAF; + +/** + * Returns the max N double values. + */ +public class UDAFExampleMaxN extends UDAF { + + /** + * The evaluator for getting max N double values. + */ + public static class UDAFMaxNEvaluator extends UDAFExampleMaxMinNUtil.Evaluator { + + @Override + protected boolean getAscending() { + return false; + } + } + +} + Property changes on: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMaxN.java ___________________________________________________________________ Added: svn:mergeinfo Index: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMaxMinNUtil.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMaxMinNUtil.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMaxMinNUtil.java (revision 0) @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.contrib.udaf.example; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; + +/** + * The utility class for UDAFMaxN and UDAFMinN. + */ +public final class UDAFExampleMaxMinNUtil { + + /** + * This class stores the information during an aggregation. + * + * Note that this class has to have a public constructor, so that Hive can + * serialize/deserialize this class using reflection. + */ + public static class State { + ArrayList a; // This ArrayList holds the max/min N + int n; // This is the N + } + + /** + * The base class of the UDAFEvaluator for UDAFMaxN and UDAFMinN. + * We just need to override the getAscending function to make it work. + */ + public abstract static class Evaluator implements UDAFEvaluator { + + private State state; + + public Evaluator() { + state = new State(); + init(); + } + + /** + * Reset the state. + */ + public void init() { + state.a = new ArrayList(); + state.n = 0; + } + + /** + * Returns true in UDAFMaxN, and false in UDAFMinN. + */ + protected abstract boolean getAscending(); + + /** + * Iterate through one row of original data. + * This function will update the internal max/min buffer if the internal buffer is not full, + * or the new row is larger/smaller than the current max/min n. + */ + public boolean iterate(Double o, int n) { + boolean ascending = getAscending(); + state.n = n; + if (o != null) { + boolean doInsert = state.a.size() < n; + if (!doInsert) { + Double last = state.a.get(state.a.size()-1); + if (ascending) { + doInsert = o < last; + } else { + doInsert = o > last; + } + } + if (doInsert) { + binaryInsert(state.a, o, ascending); + if (state.a.size() > n) { + state.a.remove(state.a.size()-1); + } + } + } + return true; + } + + /** + * Get partial aggregation results. + */ + public State terminatePartial() { + // This is SQL standard - max_n of zero items should be null. + return state.a.size() == 0 ? null : state; + } + + /** Two pointers are created to track the maximal elements in both o and MaxNArray. + * The smallest element is added into tempArrayList + * Consider the sizes of o and MaxNArray may be different. + */ + public boolean merge(State o) { + if (o != null) { + state.n = o.n; + state.a = sortedMerge(o.a, state.a, getAscending(), o.n); + } + return true; + } + + /** + * Terminates the max N lookup and return the final result. + */ + public ArrayList terminate() { + // This is SQL standard - return state.MaxNArray, or null if the size is zero. + return state.a.size() == 0 ? null : state.a; + } + } + + + /** + * Returns a comparator based on whether the order is ascending or not. + * Has a dummy parameter to make sure generics can infer the type correctly. + */ + static > Comparator getComparator(boolean ascending, T dummy) { + Comparator comp; + if (ascending) { + comp = new Comparator() { + public int compare(T o1, T o2) { + return o1.compareTo(o2); + } + }; + } else { + comp = new Comparator() { + public int compare(T o1, T o2) { + return o2.compareTo(o1); + } + }; + } + return comp; + } + + /** + * Insert an element into an ascending/descending array, and keep the order. + * @param ascending + * if true, the array is sorted in ascending order, + * otherwise it is in descending order. + * + */ + static > void binaryInsert(List list, T value, boolean ascending) { + + int position = Collections.binarySearch(list, value, getComparator(ascending, (T)null)); + if (position < 0) { + position = (-position) - 1; + } + list.add(position, value); + } + + /** + * Merge two ascending/descending array and keep the first n elements. + * @param ascending + * if true, the array is sorted in ascending order, + * otherwise it is in descending order. + */ + static > ArrayList sortedMerge(List a1, List a2, + boolean ascending, int n) { + + Comparator comparator = getComparator(ascending, (T)null); + + int n1 = a1.size(); + int n2 = a2.size(); + int p1 = 0; // The current element in a1 + int p2 = 0; // The current element in a2 + + ArrayList output = new ArrayList(n); + + while (output.size() < n && (p1 < n1 || p2 < n2)) { + if (p1 < n1) { + if (p2 == n2 || comparator.compare(a1.get(p1), a2.get(p2)) < 0) { + output.add(a1.get(p1++)); + } + } + if (output.size() == n) { + break; + } + if (p2 < n2) { + if (p1 == n1 || comparator.compare(a2.get(p2), a1.get(p1)) < 0) { + output.add(a2.get(p2++)); + } + } + } + + return output; + } + + // No instantiation. + private UDAFExampleMaxMinNUtil() { + } + +} + Property changes on: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMaxMinNUtil.java ___________________________________________________________________ Added: svn:mergeinfo Index: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMinN.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMinN.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMinN.java (revision 0) @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.contrib.udaf.example; + +import org.apache.hadoop.hive.ql.exec.UDAF; + +/** + * Returns the min N double values. + */ +public class UDAFExampleMinN extends UDAF{ + + /** + * The evaluator for getting min N double values. + */ + public static class UDAFMinNEvaluator extends UDAFExampleMaxMinNUtil.Evaluator { + + @Override + protected boolean getAscending() { + return true; + } + } + +} + Property changes on: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleMinN.java ___________________________________________________________________ Added: svn:mergeinfo Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultUDAFEvaluatorResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultUDAFEvaluatorResolver.java (revision 906315) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultUDAFEvaluatorResolver.java (working copy) @@ -61,19 +61,20 @@ // Add all the public member classes that implement an evaluator for (Class enclClass : udafClass.getClasses()) { - for (Class iface : enclClass.getInterfaces()) { - if (iface == UDAFEvaluator.class) { - classList.add((Class) enclClass); - } + if (UDAFEvaluator.class.isAssignableFrom(enclClass)) { + classList.add((Class) enclClass); } } // Next we locate all the iterate methods for each of these classes. ArrayList mList = new ArrayList(); + ArrayList> cList = + new ArrayList>(); for (Class evaluator : classList) { for (Method m : evaluator.getMethods()) { if (m.getName().equalsIgnoreCase("iterate")) { mList.add(m); + cList.add(evaluator); } } } @@ -83,7 +84,22 @@ throw new AmbiguousMethodException(udafClass, argClasses); } - return (Class) m.getDeclaringClass(); + // Find the class that has this method. + // Note that Method.getDeclaringClass() may not work here because the method + // can be inherited from a base class. + int found = -1; + for (int i = 0; i < mList.size(); i++) { + if (mList.get(i) == m) { + if (found == -1) { + found = i; + } else { + throw new AmbiguousMethodException(udafClass, argClasses); + } + } + } + assert (found != -1); + + return cList.get(found); } } Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java (revision 906315) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFBridge.java (working copy) @@ -154,6 +154,7 @@ @Override public AggregationBuffer getNewAggregationBuffer() { + System.err.println("udafEvaluator = " + udafEvaluator); return new UDAFAgg((UDAFEvaluator) ReflectionUtils.newInstance( udafEvaluator, null)); }