Index: contrib/src/test/results/clientpositive/udaf_example_min_n.q.out =================================================================== --- contrib/src/test/results/clientpositive/udaf_example_min_n.q.out (revision 0) +++ contrib/src/test/results/clientpositive/udaf_example_min_n.q.out (revision 0) @@ -0,0 +1,92 @@ +PREHOOK: query: CREATE TEMPORARY FUNCTION example_min_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFMinN' +PREHOOK: type: CREATEFUNCTION +POSTHOOK: query: CREATE TEMPORARY FUNCTION example_min_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFMinN' +POSTHOOK: type: CREATEFUNCTION +PREHOOK: query: EXPLAIN +SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION example_min_n (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5) 10)) (TOK_SELEXPR (TOK_FUNCTION example_min_n (TOK_FUNCTION IF (< (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5) 250) TOK_NULL (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5)) 10))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: value + type: string + outputColumnNames: value + Group By Operator + aggregations: + expr: example_min_n(substr(value, 5), 10) + expr: example_min_n(if((substr(value, 5) < 250), null, substr(value, 5)), 10) + bucketGroup: false + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: struct,n:int> + expr: _col1 + type: struct,n:int> + Reduce Operator Tree: + Group By Operator + aggregations: + expr: example_min_n(VALUE._col0) + expr: example_min_n(VALUE._col1) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: array + expr: _col1 + type: array + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/build/contrib/scratchdir/hive_2010-02-03_17-07-42_200_4917023766072696699/10000 +POSTHOOK: query: SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/build/contrib/scratchdir/hive_2010-02-03_17-07-42_200_4917023766072696699/10000 +[0.0,0.0,0.0,2.0,4.0,5.0,5.0,5.0,8.0,9.0] [252.0,255.0,255.0,256.0,256.0,257.0,258.0,260.0,262.0,263.0] +PREHOOK: query: DROP TEMPORARY FUNCTION example_min_n +PREHOOK: type: DROPFUNCTION +POSTHOOK: query: DROP TEMPORARY FUNCTION example_min_n +POSTHOOK: type: DROPFUNCTION Index: contrib/src/test/results/clientpositive/udaf_example_max_n.q.out =================================================================== --- contrib/src/test/results/clientpositive/udaf_example_max_n.q.out (revision 0) +++ contrib/src/test/results/clientpositive/udaf_example_max_n.q.out (revision 0) @@ -0,0 +1,92 @@ +PREHOOK: query: CREATE TEMPORARY FUNCTION example_max_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFMaxN' +PREHOOK: type: CREATEFUNCTION +POSTHOOK: query: CREATE TEMPORARY FUNCTION example_max_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFMaxN' +POSTHOOK: type: CREATEFUNCTION +PREHOOK: query: EXPLAIN +SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION example_max_n (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5) 10)) (TOK_SELEXPR (TOK_FUNCTION example_max_n (TOK_FUNCTION IF (> (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5) 250) TOK_NULL (TOK_FUNCTION substr (TOK_TABLE_OR_COL value) 5)) 10))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: value + type: string + outputColumnNames: value + Group By Operator + aggregations: + expr: example_max_n(substr(value, 5), 10) + expr: example_max_n(if((substr(value, 5) > 250), null, substr(value, 5)), 10) + bucketGroup: false + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: struct,n:int> + expr: _col1 + type: struct,n:int> + Reduce Operator Tree: + Group By Operator + aggregations: + expr: example_max_n(VALUE._col0) + expr: example_max_n(VALUE._col1) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: array + expr: _col1 + type: array + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/home/bjia/trunk/VENDOR.hive/trunk/build/ql/tmp/1462992517/10000 +POSTHOOK: query: SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/home/bjia/trunk/VENDOR.hive/trunk/build/ql/tmp/1462992517/10000 +[498.0,498.0,498.0,497.0,496.0,495.0,494.0,493.0,492.0,492.0] [249.0,248.0,247.0,244.0,242.0,242.0,241.0,239.0,239.0,238.0] +PREHOOK: query: DROP TEMPORARY FUNCTION example_max_n +PREHOOK: type: DROPFUNCTION +POSTHOOK: query: DROP TEMPORARY FUNCTION example_max_n +POSTHOOK: type: DROPFUNCTION Index: contrib/src/test/queries/clientpositive/udaf_example_min_n.q =================================================================== --- contrib/src/test/queries/clientpositive/udaf_example_min_n.q (revision 0) +++ contrib/src/test/queries/clientpositive/udaf_example_min_n.q (revision 0) @@ -0,0 +1,13 @@ +add jar ../build/contrib/hive_contrib.jar; +CREATE TEMPORARY FUNCTION example_min_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFMinN'; + +EXPLAIN +SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src; + +SELECT example_min_n(substr(value,5),10), + example_min_n(IF(substr(value,5) < 250, NULL, substr(value,5)),10) +FROM src; + +DROP TEMPORARY FUNCTION example_min_n; Index: contrib/src/test/queries/clientpositive/udaf_example_max_n.q =================================================================== --- contrib/src/test/queries/clientpositive/udaf_example_max_n.q (revision 0) +++ contrib/src/test/queries/clientpositive/udaf_example_max_n.q (revision 0) @@ -0,0 +1,14 @@ +add jar ../build/contrib/hive_contrib.jar; + +CREATE TEMPORARY FUNCTION example_max_n AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFMaxN'; + +EXPLAIN +SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src; + +SELECT example_max_n(substr(value,5),10), + example_max_n(IF(substr(value,5) > 250, NULL, substr(value,5)),10) +FROM src; + +DROP TEMPORARY FUNCTION example_max_n; Index: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/SortedArrayUtils.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/SortedArrayUtils.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/SortedArrayUtils.java (revision 0) @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.contrib.udaf.example; +import java.util.*; + +public class SortedArrayUtils { + + static void binaryInsert(ArrayList array, Double value, boolean ascending) { + + int position = 0; + + if (ascending) { + position = Collections.binarySearch(array, value); + } else { + position = Collections.binarySearch(array, value, + new Comparator() { + public int compare(Double o1, Double o2) { + return Double.compare(o2.doubleValue(), o1.doubleValue()); + } + } + ); + } + + if (position < 0) { + position = (-position) - 1; + } + + array.add(position, value); + } + + static ArrayList sortedMerge(ArrayList a1, ArrayList a2, final boolean ascending, int n) { + + Comparator comparator = new Comparator() { + public int compare(Double o1, Double o2) { + if (ascending) { + return Double.compare(o1.doubleValue(), o2.doubleValue()); + } else { + return Double.compare(o2.doubleValue(), o1.doubleValue()); + } + } + }; + + int n1 = a1.size(); + int n2 = a2.size(); + int p1 = 0; + int p2 = 0; + + ArrayList output = new ArrayList(n); + + while (output.size() < n && (p1 < n1 || p2 < n2)) { + if (p1 < n1) { + if (p2 == n2 || comparator.compare(a1.get(p1), a2.get(p2)) < 0) { + output.add(a1.get(p1++)); + } + } + if (output.size() == n) { + break; + } + if (p2 < n2) { + if (p1 == n1 || comparator.compare(a2.get(p2), a1.get(p1)) < 0) { + output.add(a2.get(p2++)); + } + } + } + + return output; + } +} + Index: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFMinN.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFMinN.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFMinN.java (revision 0) @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.contrib.udaf.example; + +import org.apache.hadoop.hive.ql.exec.UDAF; +import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; +import java.util.*; + +public class UDAFMinN extends UDAF{ + + /** + * Hive will automatically look for all internal classes of the UDAF + * that implements UDAFEvaluator. + */ + public static class UDAFMinNState { + private ArrayList minNArray; //This ArrrayList holds the max N + private int n; // This is the N + } + + public static class UDAFMinNEvaluator implements UDAFEvaluator { + + UDAFMinNState state; //Define a state + + public UDAFMinNEvaluator() { + super(); + state = new UDAFMinNState (); + init(); + } + + /** + * Reset the state. + */ + public void init() { + state.minNArray = new ArrayList (); + state.n = 0; + } + + /** + * Iterate through one row of original data. + * + * The number and type of arguments need to the same as we call + * this UDAF from Hive command line. + * + * This function should always return true. + * This function fills MinNArray with one element and then adds following elements by binary insertion. + * When MinNArray has a size of N and the following element is greater than the smallest element in MinNArray, insert it and remove the smallest element in MinNArray. + */ + public boolean iterate(Double o, int n) { + state.n = n; + if (o != null) { + if (state.minNArray.size() < n || o < state.minNArray.get(state.minNArray.size()-1)) { + SortedArrayUtils.binaryInsert(state.minNArray, o, true); + if (state.minNArray.size() > n) { + state.minNArray.remove(state.minNArray.size()-1); + } + } + } + return true; + } + + public UDAFMinNState terminatePartial() { + // This is SQL standard - min_n of zero items should be null. + return state.minNArray.size() == 0 ? null : state; + } + + /** Two pointers are created to track the minimal elements in both o and MinNArray + * The smallest element is added into tempArrayList + * Consider the sizes of o and MinNArray may be different. + */ + public boolean merge(UDAFMinNState o) { + if (o != null) { + state.n = o.n; + state.minNArray = SortedArrayUtils.sortedMerge(o.minNArray, state.minNArray, true, o.n); + } + return true; + } + + /** + * Terminates the min N lookup and return the final result. + */ + public ArrayList terminate() { + // This is SQL standard - return state.MinNArray, or null if the size is zero. + return state.minNArray.size() == 0 ? null : state.minNArray; + } + } + +} + Index: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFMaxN.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFMaxN.java (revision 0) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFMaxN.java (revision 0) @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.contrib.udaf.example; + +import org.apache.hadoop.hive.ql.exec.UDAF; +import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; +import java.util.*; + +public class UDAFMaxN extends UDAF{ + + /** + * Hive will automatically look for all internal classes of the UDAF + * that implements UDAFEvaluator. + */ + public static class UDAFMaxNState { + private ArrayList maxNArray; //This ArrrayList holds the max N + private int n; // This is the N + } + + public static class UDAFMaxNEvaluator implements UDAFEvaluator { + + UDAFMaxNState state; //Define a state + + public UDAFMaxNEvaluator() { + super(); + state = new UDAFMaxNState (); + init(); + } + + /** + * Reset the state. + */ + public void init() { + state.maxNArray = new ArrayList (); + state.n = 0; + } + + /** + * Iterate through one row of original data. + * + * The number and type of arguments need to the same as we call + * this UDAF from Hive command line. + * + * This function should always return true. + * This function fills MaxNArray with one element and then adds following elements by binary insertion. + * When MaxNArray has a size of N and the following element is greater than the smallest element in MaxNArray, insert it and remove the smallest element in MaxNArray. + */ + public boolean iterate(Double o, int n) { + state.n = n; + if (o != null) { + if (state.maxNArray.size() < n || o > state.maxNArray.get(state.maxNArray.size()-1)) { + SortedArrayUtils.binaryInsert(state.maxNArray, o, false); + if (state.maxNArray.size() > n) { + state.maxNArray.remove(state.maxNArray.size()-1); + } + } + } + return true; + } + + public UDAFMaxNState terminatePartial() { + // This is SQL standard - max_n of zero items should be null. + return state.maxNArray.size() == 0 ? null : state; + } + + /** Two pointers are created to track the maximal elements in both o and MaxNArray. + * The smallest element is added into tempArrayList + * Consider the sizes of o and MaxNArray may be different. + */ + public boolean merge(UDAFMaxNState o) { + if (o != null) { + state.n = o.n; + state.maxNArray = SortedArrayUtils.sortedMerge(o.maxNArray, state.maxNArray, false, o.n); + } + return true; + } + + /** + * Terminates the max N lookup and return the final result. + */ + public ArrayList terminate() { + // This is SQL standard - return state.MaxNArray, or null if the size is zero. + return state.maxNArray.size() == 0 ? null : state.maxNArray; + } + } + +} +