Index: ql/src/test/results/clientpositive/udaf_percentile_approx.q.out =================================================================== --- ql/src/test/results/clientpositive/udaf_percentile_approx.q.out (revision 0) +++ ql/src/test/results/clientpositive/udaf_percentile_approx.q.out (revision 0) @@ -0,0 +1,108 @@ +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-37_073_6076210560386322054/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-37_073_6076210560386322054/10000 +255.5 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 100) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-41_860_8275933713801449697/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 100) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-41_860_8275933713801449697/10000 +252.77777777777777 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 1000) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-46_334_9017578099479193081/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 1000) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-46_334_9017578099479193081/10000 +255.5 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-51_723_1654864352248105322/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-51_723_1654864352248105322/10000 +255.5 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 100) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-56_385_7799592520279056727/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 100) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-56_385_7799592520279056727/10000 +252.77777777777777 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 1000) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-00_946_3923666334878603072/10000 +POSTHOOK: query: SELECT 
percentile_approx(cast(substr(src.value,5) AS int), 0.5, 1000) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-00_946_3923666334878603072/10000 +255.5 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98)) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-05_574_6679873308435909842/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98)) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-05_574_6679873308435909842/10000 +[26.0,255.5,479.0,491.0] +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 100) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-10_350_1432232531700471217/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 100) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-10_350_1432232531700471217/10000 +[24.07,252.77777777777777,476.9444444444444,487.82] +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 1000) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-15_355_3332753751548479422/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 1000) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-15_355_3332753751548479422/10000 +[26.0,255.5,479.0,491.0] +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98)) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-19_897_625588204506780432/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98)) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-19_897_625588204506780432/10000 +[26.0,255.5,479.0,491.0] +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 100) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-24_513_3382890303984991141/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 100) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-24_513_3382890303984991141/10000 +[24.07,252.77777777777777,476.9444444444444,487.82] +PREHOOK: query: SELECT 
percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 1000) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-29_036_9025181836738993015/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 1000) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-29_036_9025181836738993015/10000 +[26.0,255.5,479.0,491.0] Index: ql/src/test/results/clientpositive/show_functions.q.out =================================================================== --- ql/src/test/results/clientpositive/show_functions.q.out (revision 958071) +++ ql/src/test/results/clientpositive/show_functions.q.out (working copy) @@ -89,6 +89,7 @@ or parse_url percentile +percentile_approx pi pmod positive Index: ql/src/test/queries/clientpositive/udaf_percentile_approx.q =================================================================== --- ql/src/test/queries/clientpositive/udaf_percentile_approx.q (revision 0) +++ ql/src/test/queries/clientpositive/udaf_percentile_approx.q (revision 0) @@ -0,0 +1,16 @@ + +SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 100) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 1000) FROM src; + +SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 100) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 1000) FROM src; + +SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98)) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 100) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 1000) FROM src; + +SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98)) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 100) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 1000) FROM src; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 958071) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy) @@ -137,6 +137,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStdSample; @@ -360,6 +361,7 @@ registerGenericUDAF("var_samp", new GenericUDAFVarianceSample()); registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric()); + registerGenericUDAF("percentile_approx", new GenericUDAFPercentileApprox()); registerUDAF("percentile", UDAFPercentile.class); Index: 
ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java (revision 958071) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java (working copy) @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.udf.generic; import java.util.ArrayList; -import java.util.HashMap; import java.util.Arrays; import java.util.Random; @@ -51,7 +50,18 @@ * bin centers and heights. */ @Description(name = "histogram_numeric", - value = "_FUNC_(expr, nb) - Computes a histogram on numeric 'expr' using nb bins.") + value = "_FUNC_(expr, nb) - Computes a histogram on numeric 'expr' using nb bins.", + extended = "Example:\n" + + "> SELECT histogram_numeric(val, 3) FROM src;\n" + + "[{\"x\":100,\"y\":14.0},{\"x\":200,\"y\":22.0},{\"x\":290.5,\"y\":11.0}]\n" + + "The return value is an array of (x,y) pairs representing the centers of the " + + "histogram's bins. As the value of 'nb' is increased, the histogram approximation " + + "gets finer-grained, but may yield artifacts around outliers. In practice, 20-40 " + + "histogram bins appear to work well, with more bins being required for skewed or " + + "smaller datasets. Note that this function creates a histogram with non-uniform " + + "bin widths. It offers no guarantees in terms of the mean-squared-error of the " + + "histogram, but in practice is comparable to the histograms produced by the R/S-Plus " + + "statistical computing packages." ) public class GenericUDAFHistogramNumeric implements GenericUDAFResolver { // class static variables static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName()); @@ -102,7 +112,7 @@ } /** - * Construct a histogram using the algorithm described by Ben-Haim and Tom-Tov. + * Construct a histogram using an algorithm described by Ben-Haim and Tom-Tov. * * The algorithm is a heuristic adapted from the following paper: * Yael Ben-Haim and Elad Tom-Tov, "A streaming parallel decision tree algorithm", @@ -119,18 +129,11 @@ // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles) private StandardListObjectInspector loi; - // A PRNG for breaking ties in histogram bin merging - Random prng; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); - // init the RNG for breaking ties in histogram merging. A fixed seed is specified here - // to aid testing, but can be eliminated to use a time-based seed (and have non-deterministic - // results). - prng = new Random(31183); - // init input object inspectors if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { assert(parameters.length == 2); @@ -166,20 +169,10 @@ @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { - ArrayList partialResult = new ArrayList(); - StdAgg myagg = (StdAgg) agg; - // Return a single ArrayList where the first element is the number of histogram bins, // and subsequent elements represent histogram (x,y) pairs.
- partialResult.add(new DoubleWritable(myagg.nbins)); - if(myagg.hist != null) { - for(int i = 0; i < myagg.nusedbins; i++) { - partialResult.add(new DoubleWritable(myagg.hist[i].x)); - partialResult.add(new DoubleWritable(myagg.hist[i].y)); - } - } - - return partialResult; + StdAgg myagg = (StdAgg) agg; + return myagg.histogram.serialize(); } @@ -187,14 +180,14 @@ public Object terminate(AggregationBuffer agg) throws HiveException { StdAgg myagg = (StdAgg) agg; - if (myagg.nusedbins < 1) { // SQL standard - return null for zero elements + if (myagg.histogram.getUsedBins() < 1) { // SQL standard - return null for zero elements return null; } else { ArrayList result = new ArrayList(); - for(int i = 0; i < myagg.nusedbins; i++) { + for(int i = 0; i < myagg.histogram.getUsedBins(); i++) { DoubleWritable[] bin = new DoubleWritable[2]; - bin[0] = new DoubleWritable(myagg.hist[i].x); - bin[1] = new DoubleWritable(myagg.hist[i].y); + bin[0] = new DoubleWritable(myagg.histogram.getBin(i).x); + bin[1] = new DoubleWritable(myagg.histogram.getBin(i).y); result.add(bin); } return result; @@ -208,138 +201,9 @@ } ArrayList partialHistogram = (ArrayList) loi.getList(partial); StdAgg myagg = (StdAgg) agg; - - if(myagg.nbins == 0 || myagg.nusedbins == 0) { - // The aggregation buffer has nothing in it, so just copy over 'partial' into myagg - // by deserializing the ArrayList of (x,y) pairs into an array of Coord objects - myagg.nbins = (int) ((DoubleWritable) partialHistogram.get(0)).get(); - myagg.nusedbins = (partialHistogram.size()-1)/2; - myagg.hist = new StdAgg.Coord[myagg.nbins+1]; // +1 to hold a temporary bin for insert() - for(int i = 1; i < partialHistogram.size(); i+=2) { - myagg.hist[(i-1)/2] = new StdAgg.Coord(); - myagg.hist[(i-1)/2].x = ((DoubleWritable) partialHistogram.get(i)).get(); - myagg.hist[(i-1)/2].y = ((DoubleWritable) partialHistogram.get(i+1)).get(); - } - } else { - // The aggregation buffer already contains a partial histogram. Therefore, we need - // to merge histograms using Algorithm #2 from the Ben-Haim and Tom-Tov paper. - StdAgg.Coord[] tmp_histogram = new StdAgg.Coord[myagg.nusedbins - + (partialHistogram.size()-1)/2]; - for(int j = 0; j < tmp_histogram.length; j++) { - tmp_histogram[j] = new StdAgg.Coord(); - } - - // Copy all the histogram bins from 'myagg' and 'partial' into an overstuffed histogram - int i; - for(i = 0; i < myagg.nusedbins; i++) { - tmp_histogram[i].x = myagg.hist[i].x; - tmp_histogram[i].y = myagg.hist[i].y; - } - for(int j = 1; j < partialHistogram.size(); j+=2, i++) { - tmp_histogram[i].x = ((DoubleWritable) partialHistogram.get(j)).get(); - tmp_histogram[i].y = ((DoubleWritable) partialHistogram.get(j+1)).get(); - } - Arrays.sort(tmp_histogram); - - // Now trim the overstuffed histogram down to the correct number of bins - myagg.hist = tmp_histogram; - myagg.nusedbins += (partialHistogram.size()-1)/2; - trim(myagg); - } + myagg.histogram.merge(partialHistogram); } - // Algorithm #1 from the Ben-Haim and Tom-Tov paper: histogram update procedure for a single - // new data point 'v'. - private void insert(StdAgg myagg, double v) { - StdAgg.Coord[] histogram = myagg.hist; - - // Binary search to find the closest bucket that v should go into. - // 'bin' should be interpreted as the bin to shift right in order to accomodate - // v. As a result, bin is in the range [0,N], where N means that the value v is - // greater than all the N bins currently in the histogram. 
It is also possible that - // a bucket centered at 'v' already exists, so this must be checked in the next step. - int bin = 0; - for(int l=0, r=myagg.nusedbins; l < r; ) { - bin = (l+r)/2; - if(histogram[bin].x > v) { - r = bin; - } else { - if(histogram[bin].x < v) { - l = ++bin; - } else { - break; // break loop on equal comparator - } - } - } - - // If we found an exact bin match for value v, then just increment that bin's count. - // Otherwise, we need to insert a new bin and trim the resulting histogram back to size. - // A possible optimization here might be to set some threshold under which 'v' is just - // assumed to be equal to the closest bin -- if fabs(v-histogram[bin].x) < THRESHOLD, then - // just increment 'bin' - if(bin < myagg.nusedbins && histogram[bin].x == v) { - histogram[bin].y++; - } else { - for(int i = myagg.nusedbins; i > bin; i--) { - myagg.hist[i].x = myagg.hist[i-1].x; - myagg.hist[i].y = myagg.hist[i-1].y; - } - myagg.hist[bin].x = v; // new histogram bin for value 'v' - myagg.hist[bin].y = 1; // of height 1 unit - - // Trim the histogram down to the correct number of bins. - if(++myagg.nusedbins > myagg.nbins) { - trim(myagg); - } - } - } - - // Trims a histogram down to 'nbins' bins by iteratively merging the closest bins. - // If two pairs of bins are equally close to each other, decide uniformly at random which - // pair to merge, based on a PRNG. - private void trim(StdAgg myagg) { - // Ensure that there are at least 3 histogram bins (because nbins>=2). - if(myagg.nusedbins <= myagg.nbins) { - return; - } - StdAgg.Coord[] histogram = myagg.hist; - - while(myagg.nusedbins > myagg.nbins) { - // Find the closest histogram bins in terms of x coordinates. Break ties randomly. - double smallestdiff = histogram[1].x - histogram[0].x; - int smallestdiffloc = 0, smallestdiffcount = 1; - for(int i = 1; i < myagg.nusedbins-1; i++) { - double diff = histogram[i+1].x - histogram[i].x; - if(diff < smallestdiff) { - smallestdiff = diff; - smallestdiffloc = i; - smallestdiffcount = 1; - } else { - if(diff == smallestdiff && prng.nextDouble() <= (1.0/++smallestdiffcount) ) { - smallestdiffloc = i; - } - } - } - - // Merge the two closest bins into their average x location, weighted by their heights. - // The height of the new bin is the sum of the heights of the old bins. - double d = histogram[smallestdiffloc].y + histogram[smallestdiffloc+1].y; - histogram[smallestdiffloc].x *= histogram[smallestdiffloc].y / d; - histogram[smallestdiffloc].x += histogram[smallestdiffloc+1].x / d * - histogram[smallestdiffloc+1].y; - histogram[smallestdiffloc].y = d; - - // Shift the remaining bins left one position - for(int i = smallestdiffloc+1; i < myagg.nusedbins-1; i++) { - histogram[i].x = histogram[i+1].x; - histogram[i].y = histogram[i+1].y; - } - myagg.nusedbins--; - } - } - - private boolean warned = false; - @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { assert (parameters.length == 2); @@ -351,56 +215,26 @@ // Parse out the number of histogram bins only once, if we haven't already done // so before. We need at least 2 bins; otherwise, there is no point in creating // a histogram. 
- if(myagg.nbins == 0) { - try { - myagg.nbins = PrimitiveObjectInspectorUtils.getInt(parameters[1], nbinsOI); - } catch(NumberFormatException e) { - throw new HiveException(getClass().getSimpleName() + " " + - StringUtils.stringifyException(e)); - } - if(myagg.nbins < 2) { + if(!myagg.histogram.isReady()) { + int nbins = PrimitiveObjectInspectorUtils.getInt(parameters[1], nbinsOI); + if(nbins < 2) { throw new HiveException(getClass().getSimpleName() + " needs nbins to be at least 2," - + " but you supplied " + myagg.nbins + "."); + + " but you supplied " + nbins + "."); } - // allocate memory for the histogram - myagg.hist = new StdAgg.Coord[myagg.nbins+1]; // +1 is used for holding a temporary bin - for(int i = 0; i < myagg.nbins+1; i++) { - myagg.hist[i] = new StdAgg.Coord(); - } - myagg.nusedbins = 0; + // allocate memory for the histogram bins + myagg.histogram.allocate(nbins); } // Process the current data point - Object p = parameters[0]; - if (p != null) { - double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI); - insert(myagg, v); - } + double v = PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI); + myagg.histogram.add(v); } // Aggregation buffer definition and manipulation methods static class StdAgg implements AggregationBuffer { - static class Coord implements Comparable { - double x; - double y; - - public int compareTo(Object other) { - Coord o = (Coord) other; - if(x < o.x) { - return -1; - } - if(x > o.x) { - return 1; - } - return 0; - } - }; - - int nbins; // maximum number of histogram bins - int nusedbins; // number of histogram bins actually used - Coord[] hist; // histogram coordinates + NumericHistogram histogram; // the histogram object }; @Override @@ -413,9 +247,8 @@ @Override public void reset(AggregationBuffer agg) throws HiveException { StdAgg myagg = (StdAgg) agg; - myagg.nbins = 0; - myagg.nusedbins = 0; - myagg.hist = null; + myagg.histogram = new NumericHistogram(); + myagg.histogram.reset(); } } } Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/NumericHistogram.java (revision 0) @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Random; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; + + +/** + * A generic, re-usable histogram class that supports partial aggregations. 
+ * The algorithm is a heuristic adapted from the following paper: + * Yael Ben-Haim and Elad Tom-Tov, "A streaming parallel decision tree algorithm", + * J. Machine Learning Research 11 (2010), pp. 849--872. Although there are no approximation + * guarantees, it appears to work well with adequate data and a large (e.g., 20-80) number + * of histogram bins. + */ +public class NumericHistogram { + /** + * The Coord class defines a histogram bin, which is just an (x,y) pair. + */ + static class Coord implements Comparable { + double x; + double y; + + public int compareTo(Object other) { + Coord o = (Coord) other; + if(x < o.x) { + return -1; + } + if(x > o.x) { + return 1; + } + return 0; + } + }; + + // Class variables + private int nbins; + private int nusedbins; + private Coord[] bins; + private Random prng; + + /** + * Creates a new histogram object. Note that the allocate() or merge() + * method must be called before the histogram can be used. + */ + public NumericHistogram() { + nbins = 0; + nusedbins = 0; + bins = null; + + // init the RNG for breaking ties in histogram merging. A fixed seed is specified here + // to aid testing, but can be eliminated to use a time-based seed (which would + // make the algorithm non-deterministic). + prng = new Random(31183); + } + + /** + * Resets a histogram object to its initial state. allocate() or merge() must be + * called again before use. + */ + public void reset() { + bins = null; + nbins = nusedbins = 0; + } + + /** + * Returns the number of bins currently being used by the histogram. + */ + public int getUsedBins() { + return nusedbins; + } + + /** + * Returns true if this histogram object has been initialized by calling merge() + * or allocate(). + */ + public boolean isReady() { + return nbins != 0; + } + + /** + * Returns a particular histogram bin. + */ + public Coord getBin(int b) { + return bins[b]; + } + + /** + * Sets the number of histogram bins to use for approximating data. + * + * @param num_bins Number of non-uniform-width histogram bins to use + */ + public void allocate(int num_bins) { + nbins = num_bins; + bins = new Coord[nbins+1]; + for(int i = 0; i < nbins+1; i++) { + bins[i] = new Coord(); + } + nusedbins = 0; + } + + /** + * Takes a serialized histogram created by the serialize() method and merges + * it with the current histogram object. + * + * @param other A serialized histogram created by the serialize() method + * @see #serialize + */ + public void merge(ArrayList<DoubleWritable> other) { + if(other == null) { + return; + } + + if(nbins == 0 || nusedbins == 0) { + // Our aggregation buffer has nothing in it, so just copy over 'other' + // by deserializing the ArrayList of (x,y) pairs into an array of Coord objects + nbins = (int) (other.get(0).get()); + nusedbins = (other.size()-1)/2; + bins = new Coord[nbins+1]; // +1 to hold a temporary bin for add() + for(int i = 1; i < other.size(); i+=2) { + bins[(i-1)/2] = new Coord(); + bins[(i-1)/2].x = other.get(i).get(); + bins[(i-1)/2].y = other.get(i+1).get(); + } + } else { + // The aggregation buffer already contains a partial histogram. Therefore, we need + // to merge histograms using Algorithm #2 from the Ben-Haim and Tom-Tov paper.
+ Coord[] tmp_bins = new Coord[nusedbins + (other.size()-1)/2]; + for(int j = 0; j < tmp_bins.length; j++) { + tmp_bins[j] = new Coord(); + } + + // Copy all the histogram bins from us and 'other' into an overstuffed histogram + int i; + for(i = 0; i < nusedbins; i++) { + tmp_bins[i].x = bins[i].x; + tmp_bins[i].y = bins[i].y; + } + for(int j = 1; j < other.size(); j+=2, i++) { + tmp_bins[i].x = other.get(j).get(); + tmp_bins[i].y = other.get(j+1).get(); + } + Arrays.sort(tmp_bins); + + // Now trim the overstuffed histogram down to the correct number of bins + bins = tmp_bins; + nusedbins += (other.size()-1)/2; + trim(); + } + } + + + /** + * Adds a new data point to the histogram approximation. Make sure you have + * called either allocate() or merge() first. This method implements Algorithm #1 + * from Ben-Haim and Tom-Tov, "A Streaming Parallel Decision Tree Algorithm", JMLR 2010. + * + * @param v The data point to add to the histogram approximation. + */ + public void add(double v) { + // Binary search to find the closest bucket that v should go into. + // 'bin' should be interpreted as the bin to shift right in order to accommodate + // v. As a result, bin is in the range [0,N], where N means that the value v is + // greater than all the N bins currently in the histogram. It is also possible that + // a bucket centered at 'v' already exists, so this must be checked in the next step. + int bin = 0; + for(int l=0, r=nusedbins; l < r; ) { + bin = (l+r)/2; + if(bins[bin].x > v) { + r = bin; + } else { + if(bins[bin].x < v) { + l = ++bin; + } else { + break; // break loop on equal comparator + } + } + } + + // If we found an exact bin match for value v, then just increment that bin's count. + // Otherwise, we need to insert a new bin and trim the resulting histogram back to size. + // A possible optimization here might be to set some threshold under which 'v' is just + // assumed to be equal to the closest bin -- if fabs(v-bins[bin].x) < THRESHOLD, then + // just increment 'bin'. This is not done now because we don't want to make any + // assumptions about the range of numeric data being analyzed. + if(bin < nusedbins && bins[bin].x == v) { + bins[bin].y++; + } else { + for(int i = nusedbins; i > bin; i--) { + bins[i].x = bins[i-1].x; + bins[i].y = bins[i-1].y; + } + bins[bin].x = v; // new histogram bin for value 'v' + bins[bin].y = 1; // of height 1 unit + + // Trim the histogram down to the correct number of bins. + if(++nusedbins > nbins) { + trim(); + } + } + } + + /** + * Trims a histogram down to 'nbins' bins by iteratively merging the closest bins. + * If two pairs of bins are equally close to each other, decide uniformly at random which + * pair to merge, based on a PRNG. + */ + private void trim() { + while(nusedbins > nbins) { + // Find the closest pair of bins in terms of x coordinates. Break ties randomly. + double smallestdiff = bins[1].x - bins[0].x; + int smallestdiffloc = 0, smallestdiffcount = 1; + for(int i = 1; i < nusedbins-1; i++) { + double diff = bins[i+1].x - bins[i].x; + if(diff < smallestdiff) { + smallestdiff = diff; + smallestdiffloc = i; + smallestdiffcount = 1; + } else { + if(diff == smallestdiff && prng.nextDouble() <= (1.0/++smallestdiffcount) ) { + smallestdiffloc = i; + } + } + } + + // Merge the two closest bins into their average x location, weighted by their heights. + // The height of the new bin is the sum of the heights of the old bins.
+ double d = bins[smallestdiffloc].y + bins[smallestdiffloc+1].y; + bins[smallestdiffloc].x *= bins[smallestdiffloc].y / d; + bins[smallestdiffloc].x += bins[smallestdiffloc+1].x / d * + bins[smallestdiffloc+1].y; + bins[smallestdiffloc].y = d; + + // Shift the remaining bins left one position + for(int i = smallestdiffloc+1; i < nusedbins-1; i++) { + bins[i].x = bins[i+1].x; + bins[i].y = bins[i+1].y; + } + nusedbins--; + } + } + + /** + * Gets an approximate quantile value from the current histogram. Some popular + * quantiles are 0.5 (median), 0.95, and 0.98. + * + * @param q The requested quantile, must be strictly within the range (0,1). + * @return The quantile value. + */ + public double quantile(double q) { + assert(bins != null && nusedbins > 0 && nbins > 0); + double sum = 0, csum = 0; + int b; + for(b = 0; b < nusedbins; b++) { + sum += bins[b].y; + } + for(b = 0; b < nusedbins; b++) { + csum += bins[b].y; + if(csum / sum >= q) { + if(b == 0) { + return bins[b].x; + } + csum -= bins[b].y; + double r = bins[b-1].x + (q*sum - csum) * (bins[b].x-bins[b-1].x)/(bins[b].y); + return r; + } + } + return -1; // for Xlint, code will never reach here + } + + /** + * In preparation for a Hive merge() call, serializes the current histogram object into an + * ArrayList of DoubleWritable objects. This list is deserialized and merged by the + * merge method. + * + * @return An ArrayList of Hadoop DoubleWritable objects that represents the current + * histogram. + * @see #merge(ArrayList) + */ + public ArrayList<DoubleWritable> serialize() { + ArrayList<DoubleWritable> result = new ArrayList<DoubleWritable>(); + + // Return a single ArrayList where the first element is the number of histogram bins, + // and subsequent elements represent histogram (x,y) pairs. + result.add(new DoubleWritable(nbins)); + if(bins != null) { + for(int i = 0; i < nusedbins; i++) { + result.add(new DoubleWritable(bins[i].x)); + result.add(new DoubleWritable(bins[i].y)); + } + } + + return result; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.java (revision 0) @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.udf.generic; + +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.util.StringUtils; + +/** + * Computes an approximate percentile (quantile) from an approximate histogram, for very + * large numbers of rows where the regular percentile() UDAF might run out of memory. + * + * The input is a single double value or an array of double values representing the quantiles + * requested. The output, corresponding to the input, is either a single double value or an + * array of doubles that are the quantile values. + */ +@Description(name = "percentile_approx", + value = "_FUNC_(expr, pc, [nb]) - For very large data, computes an approximate percentile " + + "value from a histogram, using the optional argument [nb] as the number of histogram" + + " bins to use. A higher value of nb results in a more accurate approximation, at " + + "the cost of higher memory usage.", + extended = "'expr' can be any numeric column, including doubles and floats, and 'pc' is " + + "either a single double/float with a requested percentile, or an array of double/" + + "float with multiple percentiles. If 'nb' is not specified, the default " + + "approximation is done with 10,000 histogram bins, which means that if there are " + + "10,000 or fewer unique values in 'expr', you can expect an exact result.
The " + + "percentile() function always computes an exact percentile and can run out of " + + "memory if there are too many unique values in a column, which necessitates " + + "this function.\n" + + "Example (three percentiles requested using a finer histogram approximation):\n" + + "> SELECT percentile_approx(val, array(0.5, 0.95, 0.98), 100000) FROM somedata;\n" + + "[0.05,1.64,2.26]\n" ) +public class GenericUDAFPercentileApprox implements GenericUDAFResolver { + static final Log LOG = LogFactory.getLog(GenericUDAFPercentileApprox.class.getName()); + + @Override + public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException { + if (parameters.length != 2 && parameters.length != 3) { + throw new UDFArgumentTypeException(parameters.length - 1, + "Please specify either two or three arguments."); + } + + // Validate the first parameter, which is the expression to compute over. This should be a + // numeric primitive type. + if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { + throw new UDFArgumentTypeException(0, + "Only primitive type arguments are accepted but " + + parameters[0].getTypeName() + " was passed as parameter 1."); + } + switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + break; + default: + throw new UDFArgumentTypeException(0, + "Only numeric type arguments are accepted but " + + parameters[0].getTypeName() + " was passed as parameter 1."); + } + + // Validate the second parameter, which is either a solitary double or an array of doubles. + switch(parameters[1].getCategory()) { + case PRIMITIVE: + // Only a single double was passed as parameter 2, a single quantile is being requested + switch(((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()) { + case FLOAT: + case DOUBLE: + break; + default: + throw new UDFArgumentTypeException(1, + "Only a float/double or float/double array argument is accepted as parameter 2, but " + + parameters[1].getTypeName() + " was passed instead."); + } + break; + + case LIST: + // An array was passed as parameter 2, make sure it's an array of primitives + if(((ListTypeInfo) parameters[1]).getListElementTypeInfo().getCategory() != + ObjectInspector.Category.PRIMITIVE) { + throw new UDFArgumentTypeException(1, + "A float/double array argument may be passed as parameter 2, but " + + parameters[1].getTypeName() + " was passed instead."); + } + // Now make sure it's an array of doubles or floats. We don't allow integer types here + // because percentile (really, quantile) values should generally be strictly between 0 and 1. + switch(((PrimitiveTypeInfo)((ListTypeInfo) parameters[1]).getListElementTypeInfo()). + getPrimitiveCategory()) { + case FLOAT: + case DOUBLE: + break; + default: + throw new UDFArgumentTypeException(1, + "A float/double array argument may be passed as parameter 2, but " + + parameters[1].getTypeName() + " was passed instead."); + } + break; + + default: + throw new UDFArgumentTypeException(1, + "Only a float/double or float/double array argument is accepted as parameter 2, but " + + parameters[1].getTypeName() + " was passed instead."); + } + + // If a third parameter has been specified, it should be an integer that specifies the number + // of histogram bins to use in the percentile approximation. 
+ if(parameters.length == 3) { + if(parameters[2].getCategory() != ObjectInspector.Category.PRIMITIVE) { + throw new UDFArgumentTypeException(2, "Only a primitive argument is accepted as " + + "parameter 3, but " + parameters[2].getTypeName() + " was passed instead."); + } + switch(((PrimitiveTypeInfo) parameters[2]).getPrimitiveCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + break; + default: + throw new UDFArgumentTypeException(2, "Only an integer argument is accepted as " + + "parameter 3, but " + parameters[2].getTypeName() + " was passed instead."); + } + } + + return new GenericUDAFPercentileApproxEvaluator(); + } + + /** + * Construct a histogram using the algorithm described by Ben-Haim and Tom-Tov, and then + * use it to compute an approximate percentile value. + */ + public static class GenericUDAFPercentileApproxEvaluator extends GenericUDAFEvaluator { + // For PARTIAL1 and COMPLETE: ObjectInspectors for original data + private PrimitiveObjectInspector inputOI; + private ObjectInspector quantilesOI; + private PrimitiveObjectInspector nbinsOI; + + // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles) + private StandardListObjectInspector loi; + + @Override + public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { + super.init(m, parameters); + + // init input object inspectors + if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { + inputOI = (PrimitiveObjectInspector) parameters[0]; + quantilesOI = parameters[1]; + if(parameters.length > 2) { + nbinsOI = (PrimitiveObjectInspector) parameters[2]; + } + } else { + loi = (StandardListObjectInspector) parameters[0]; + } + + // Init output object inspectors. + // + // The return type for a partial aggregation is still a list of doubles, as in + // GenericUDAFHistogramNumeric, but we add on the percentile values requested to the + // end, and handle serializing/deserializing before we pass things on to the parent + // method. + // The return type for FINAL and COMPLETE is a full aggregation result, which is also a + // list of DoubleWritables with the requested quantile values. The only exception is + // when a single double, as opposed to an array of doubles, is passed as a parameter. In + // that case, just return a single double value. 
+ if (m == Mode.PARTIAL1 || m == Mode.COMPLETE || + quantilesOI.getCategory() == ObjectInspector.Category.LIST) { + return ObjectInspectorFactory.getStandardListObjectInspector( + PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); + } else { + return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + } + } + + @Override + public void merge(AggregationBuffer agg, Object partial) throws HiveException { + if(partial == null) { + return; + } + PercentileAggBuf myagg = (PercentileAggBuf) agg; + ArrayList<DoubleWritable> partialHistogram = (ArrayList<DoubleWritable>) loi.getList(partial); + + // remove requested quantiles from the head of the list + int nquantiles = (int) partialHistogram.get(0).get(); + if(nquantiles > 0) { + myagg.quantiles = new double[nquantiles]; + for(int i = 1; i <= nquantiles; i++) { + myagg.quantiles[i-1] = partialHistogram.get(i).get(); + } + partialHistogram.subList(0, nquantiles+1).clear(); + } + + // merge histograms + myagg.histogram.merge(partialHistogram); + } + + @Override + public Object terminatePartial(AggregationBuffer agg) throws HiveException { + PercentileAggBuf myagg = (PercentileAggBuf) agg; + ArrayList<DoubleWritable> result = new ArrayList<DoubleWritable>(); + + if(myagg.quantiles != null) { + result.add(new DoubleWritable(myagg.quantiles.length)); + for(int i = 0; i < myagg.quantiles.length; i++) { + result.add(new DoubleWritable(myagg.quantiles[i])); + } + } else { + result.add(new DoubleWritable(0)); + } + result.addAll(myagg.histogram.serialize()); + + return result; + } + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { + assert (parameters.length == 2 || parameters.length == 3); + if(parameters[0] == null || parameters[1] == null) { + return; + } + PercentileAggBuf myagg = (PercentileAggBuf) agg; + + // Parse out the requested quantiles just once, if we haven't already done so before. + if(myagg.quantiles == null) { + if(quantilesOI.getCategory() == ObjectInspector.Category.LIST) { + // Multiple quantiles are requested + int nquantiles = ((StandardListObjectInspector) quantilesOI).getListLength(parameters[1]); + assert(nquantiles >= 1); + myagg.quantiles = new double[nquantiles]; + StandardListObjectInspector sloi = (StandardListObjectInspector) quantilesOI; + for(int i = 0; i < nquantiles; i++) { + myagg.quantiles[i] = PrimitiveObjectInspectorUtils.getDouble( + sloi.getListElement(parameters[1], i), + (PrimitiveObjectInspector) sloi.getListElementObjectInspector()); + } + } else { + // Just one quantile is requested + myagg.quantiles = new double[1]; + myagg.quantiles[0] = PrimitiveObjectInspectorUtils.getDouble(parameters[1], + (PrimitiveObjectInspector) quantilesOI); + } + + + // Validate the requested quantiles, making sure they lie strictly between 0 and 1 + for(int i = 0; i < myagg.quantiles.length; i++) { + if(myagg.quantiles[i] <= 0 || myagg.quantiles[i] >= 1) { + throw new HiveException(getClass().getSimpleName() + " requires percentile values to " + + "lie strictly between 0 and 1, but you supplied " + + myagg.quantiles[i]); + + } + } + } + + // Parse out the number of histogram bins just once, if we haven't done so before. + if(!myagg.histogram.isReady()) { + if(parameters.length == 3 && nbinsOI != null) { + // User has specified the number of histogram bins to use + myagg.histogram.allocate(PrimitiveObjectInspectorUtils.getInt(parameters[2], nbinsOI)); + } else { + // Choose a nice default value.
+ myagg.histogram.allocate(10000); + } + } + + // Get and process the current datum + double v = PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI); + myagg.histogram.add(v); + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + PercentileAggBuf myagg = (PercentileAggBuf) agg; + + if (myagg.histogram.getUsedBins() < 1) { // SQL standard - return null for zero elements + return null; + } else { + ArrayList<DoubleWritable> result = new ArrayList<DoubleWritable>(); + assert(myagg.quantiles != null); + for(int i = 0; i < myagg.quantiles.length; i++) { + result.add(new DoubleWritable(myagg.histogram.quantile(myagg.quantiles[i]))); + } + if(myagg.quantiles.length == 1) { + return result.get(0); + } else { + return result; + } + } + } + + // Aggregation buffer definition and manipulation methods. We keep a NumericHistogram + // inside our own buffer, so that we can also store the requested quantile values + // between calls + static class PercentileAggBuf implements AggregationBuffer { + NumericHistogram histogram; // histogram used for quantile approximation + double[] quantiles; // the quantiles requested + }; + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + PercentileAggBuf result = new PercentileAggBuf(); + result.histogram = new NumericHistogram(); + reset(result); + return result; + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + PercentileAggBuf result = (PercentileAggBuf) agg; + result.histogram.reset(); + result.quantiles = null; + } + } +}
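
Usage sketch for reviewers (illustrative only; the demo class name below is made up, but every call is the API added by this patch). NumericHistogram is deliberately self-contained so both UDAFs can share it: a map-side evaluator builds a histogram with allocate()/add(), terminatePartial() ships it as the flat list [nbins, x1, y1, ..., xN, yN] via serialize(), and the reduce side folds such lists back together with merge() before answering quantile() queries. GenericUDAFPercentileApprox prepends [nquantiles, q1, ..., qn] to that same list and strips it off again in its own merge().

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.udf.generic.NumericHistogram;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;

public class NumericHistogramDemo {
  public static void main(String[] args) {
    // Two partial histograms, as two map tasks would build them.
    NumericHistogram left = new NumericHistogram();
    NumericHistogram right = new NumericHistogram();
    left.allocate(32);  // 32 variable-width bins each
    right.allocate(32);

    for (int i = 0; i < 500; i++) {
      left.add(i);        // values 0..499
      right.add(500 + i); // values 500..999
    }

    // The terminatePartial()/merge() round trip: flatten one histogram
    // into [nbins, x1, y1, ...] and fold it into the other.
    ArrayList<DoubleWritable> partial = right.serialize();
    left.merge(partial);

    // The merged histogram now approximates all 1000 points.
    System.out.println("bins in use: " + left.getUsedBins()); // at most 32
    System.out.println("median ~ " + left.quantile(0.5));     // near 499.5
    System.out.println("95th   ~ " + left.quantile(0.95));    // near 949.5
  }
}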