Index: ql/src/test/results/clientpositive/udaf_percentile_approx.q.out =================================================================== --- ql/src/test/results/clientpositive/udaf_percentile_approx.q.out (revision 0) +++ ql/src/test/results/clientpositive/udaf_percentile_approx.q.out (revision 0) @@ -0,0 +1,108 @@ +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-37_073_6076210560386322054/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-37_073_6076210560386322054/10000 +255.5 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 100) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-41_860_8275933713801449697/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 100) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-41_860_8275933713801449697/10000 +252.77777777777777 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 1000) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-46_334_9017578099479193081/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 1000) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-46_334_9017578099479193081/10000 +255.5 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-51_723_1654864352248105322/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-51_723_1654864352248105322/10000 +255.5 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 100) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-56_385_7799592520279056727/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 100) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-43-56_385_7799592520279056727/10000 +252.77777777777777 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 1000) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-00_946_3923666334878603072/10000 +POSTHOOK: query: SELECT 
percentile_approx(cast(substr(src.value,5) AS int), 0.5, 1000) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-00_946_3923666334878603072/10000 +255.5 +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98)) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-05_574_6679873308435909842/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98)) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-05_574_6679873308435909842/10000 +[26.0,255.5,479.0,491.0] +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 100) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-10_350_1432232531700471217/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 100) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-10_350_1432232531700471217/10000 +[24.07,252.77777777777777,476.9444444444444,487.82] +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 1000) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-15_355_3332753751548479422/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 1000) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-15_355_3332753751548479422/10000 +[26.0,255.5,479.0,491.0] +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98)) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-19_897_625588204506780432/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98)) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-19_897_625588204506780432/10000 +[26.0,255.5,479.0,491.0] +PREHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 100) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-24_513_3382890303984991141/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 100) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-24_513_3382890303984991141/10000 +[24.07,252.77777777777777,476.9444444444444,487.82] +PREHOOK: query: SELECT 
percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 1000) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-29_036_9025181836738993015/10000 +POSTHOOK: query: SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 1000) FROM src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: file:/var/folders/7i/7iCDbWRkGHOcgJgX0zscimPXXts/-Tmp-/mlahiri/hive_2010-06-23_12-44-29_036_9025181836738993015/10000 +[26.0,255.5,479.0,491.0] Index: ql/src/test/queries/clientpositive/udaf_percentile_approx.q =================================================================== --- ql/src/test/queries/clientpositive/udaf_percentile_approx.q (revision 0) +++ ql/src/test/queries/clientpositive/udaf_percentile_approx.q (revision 0) @@ -0,0 +1,16 @@ + +SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 100) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS double), 0.5, 1000) FROM src; + +SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 100) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS int), 0.5, 1000) FROM src; + +SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98)) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 100) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS double), array(0.05,0.5,0.95,0.98), 1000) FROM src; + +SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98)) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 100) FROM src; +SELECT percentile_approx(cast(substr(src.value,5) AS int), array(0.05,0.5,0.95,0.98), 1000) FROM src; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 957296) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy) @@ -137,6 +137,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStd; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStdSample; @@ -360,6 +361,7 @@ registerGenericUDAF("var_samp", new GenericUDAFVarianceSample()); registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric()); + registerGenericUDAF("percentile_approx", new GenericUDAFPercentileApprox()); registerUDAF("percentile", UDAFPercentile.class); Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java (revision 957296) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java (working copy) @@ -102,7 +102,7 @@ } /** - * Construct a histogram using the algorithm 
described by Ben-Haim and Tom-Tov. + * Construct a histogram using an algorithm described by Ben-Haim and Tom-Tov. * * The algorithm is a heuristic adapted from the following paper: * Yael Ben-Haim and Elad Tom-Tov, "A streaming parallel decision tree algorithm", @@ -119,18 +119,11 @@ // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles) private StandardListObjectInspector loi; - // A PRNG for breaking ties in histogram bin merging - Random prng; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); - // init the RNG for breaking ties in histogram merging. A fixed seed is specified here - // to aid testing, but can be eliminated to use a time-based seed (and have non-deterministic - // results). - prng = new Random(31183); - // init input object inspectors if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { assert(parameters.length == 2); @@ -166,20 +159,10 @@ @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { - ArrayList partialResult = new ArrayList(); - StdAgg myagg = (StdAgg) agg; - // Return a single ArrayList where the first element is the number of histogram bins, // and subsequent elements represent histogram (x,y) pairs. - partialResult.add(new DoubleWritable(myagg.nbins)); - if(myagg.hist != null) { - for(int i = 0; i < myagg.nusedbins; i++) { - partialResult.add(new DoubleWritable(myagg.hist[i].x)); - partialResult.add(new DoubleWritable(myagg.hist[i].y)); - } - } - - return partialResult; + StdAgg myagg = (StdAgg) agg; + return myagg.histogram.serialize(); } @@ -187,14 +170,14 @@ public Object terminate(AggregationBuffer agg) throws HiveException { StdAgg myagg = (StdAgg) agg; - if (myagg.nusedbins < 1) { // SQL standard - return null for zero elements + if (myagg.histogram.nusedbins < 1) { // SQL standard - return null for zero elements return null; } else { ArrayList result = new ArrayList(); - for(int i = 0; i < myagg.nusedbins; i++) { + for(int i = 0; i < myagg.histogram.nusedbins; i++) { DoubleWritable[] bin = new DoubleWritable[2]; - bin[0] = new DoubleWritable(myagg.hist[i].x); - bin[1] = new DoubleWritable(myagg.hist[i].y); + bin[0] = new DoubleWritable(myagg.histogram.bins[i].x); + bin[1] = new DoubleWritable(myagg.histogram.bins[i].y); result.add(bin); } return result; @@ -208,63 +191,171 @@ } ArrayList partialHistogram = (ArrayList) loi.getList(partial); StdAgg myagg = (StdAgg) agg; + myagg.histogram.merge(partialHistogram); + } + + private boolean warned = false; + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { + assert (parameters.length == 2); + if(parameters[0] == null || parameters[1] == null) { + return; + } + StdAgg myagg = (StdAgg) agg; + + // Parse out the number of histogram bins only once, if we haven't already done + // so before. We need at least 2 bins; otherwise, there is no point in creating + // a histogram. 
+      if(myagg.histogram.nbins == 0) {
+        myagg.histogram.nbins = PrimitiveObjectInspectorUtils.getInt(parameters[1], nbinsOI);
+        if(myagg.histogram.nbins < 2) {
+          throw new HiveException(getClass().getSimpleName() + " needs nbins to be at least 2,"
+              + " but you supplied " + myagg.histogram.nbins + ".");
+        }
+
+        // allocate memory for the histogram bins
+        myagg.histogram.allocate();
+      }
+
+      // Process the current data point
+      double v = PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI);
+      myagg.histogram.add(v);
+    }
+
+
+    // Aggregation buffer definition and manipulation methods
+    static protected class StdAgg implements AggregationBuffer {
+      NumericHistogram histogram; // the histogram object
+    };
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      StdAgg result = new StdAgg();
+      reset(result);
+      return result;
+    }
+
+    @Override
+    public void reset(AggregationBuffer agg) throws HiveException {
+      StdAgg myagg = (StdAgg) agg;
+      myagg.histogram = new NumericHistogram();
+      myagg.histogram.reset();
+    }
+  }
+
+
+  // A general re-usable histogram class.
+  static class NumericHistogram {
+    // Coord defines a histogram bin
+    static protected class Coord implements Comparable {
+      double x;
+      double y;
+
+      public int compareTo(Object other) {
+        Coord o = (Coord) other;
+        if(x < o.x) {
+          return -1;
+        }
+        if(x > o.x) {
+          return 1;
+        }
+        return 0;
+      }
+    };
+
+    // Class variables
+    public int nbins;
+    public int nusedbins;
+    public Coord[] bins;
+    Random prng;
+
+    // Create a new histogram object
+    public NumericHistogram() {
+      nbins = 0;
+      nusedbins = 0;
+      bins = null;
+
+      // init the RNG for breaking ties in histogram merging. A fixed seed is specified here
+      // to aid testing, but can be eliminated to use a time-based seed (which would
+      // make the algorithm non-deterministic).
+      prng = new Random(31183);
+    }
+
+    public void reset() {
+      bins = null;
+      nbins = nusedbins = 0;
+    }
+
+    // Allocates memory for 'nbins' bins
+    public void allocate() {
+      bins = new Coord[nbins+1];
+      for(int i = 0; i < nbins+1; i++) {
+        bins[i] = new Coord();
+      }
+      nusedbins = 0;
+    }
+
+    // Takes a serialized histogram and merges it with the current histogram
+    public void merge(ArrayList other) {
+      if(other == null) {
+        return;
+      }
-      if(myagg.nbins == 0 || myagg.nusedbins == 0) {
-        // The aggregation buffer has nothing in it, so just copy over 'partial' into myagg
+      if(nbins == 0 || nusedbins == 0) {
+        // Our aggregation buffer has nothing in it, so just copy over 'other'
         // by deserializing the ArrayList of (x,y) pairs into an array of Coord objects
-        myagg.nbins = (int) ((DoubleWritable) partialHistogram.get(0)).get();
-        myagg.nusedbins = (partialHistogram.size()-1)/2;
-        myagg.hist = new StdAgg.Coord[myagg.nbins+1]; // +1 to hold a temporary bin for insert()
-        for(int i = 1; i < partialHistogram.size(); i+=2) {
-          myagg.hist[(i-1)/2] = new StdAgg.Coord();
-          myagg.hist[(i-1)/2].x = ((DoubleWritable) partialHistogram.get(i)).get();
-          myagg.hist[(i-1)/2].y = ((DoubleWritable) partialHistogram.get(i+1)).get();
+        nbins = (int) (other.get(0).get());
+        nusedbins = (other.size()-1)/2;
+        bins = new Coord[nbins+1]; // +1 to hold a temporary bin for insert()
+        for(int i = 1; i < other.size(); i+=2) {
+          bins[(i-1)/2] = new Coord();
+          bins[(i-1)/2].x = other.get(i).get();
+          bins[(i-1)/2].y = other.get(i+1).get();
         }
       } else {
         // The aggregation buffer already contains a partial histogram. Therefore, we need
         // to merge histograms using Algorithm #2 from the Ben-Haim and Tom-Tov paper.
-        StdAgg.Coord[] tmp_histogram = new StdAgg.Coord[myagg.nusedbins
-            + (partialHistogram.size()-1)/2];
-        for(int j = 0; j < tmp_histogram.length; j++) {
-          tmp_histogram[j] = new StdAgg.Coord();
+        Coord[] tmp_bins = new Coord[nusedbins + (other.size()-1)/2];
+        for(int j = 0; j < tmp_bins.length; j++) {
+          tmp_bins[j] = new Coord();
         }
-        // Copy all the histogram bins from 'myagg' and 'partial' into an overstuffed histogram
+        // Copy all the histogram bins from us and 'other' into an overstuffed histogram
         int i;
-        for(i = 0; i < myagg.nusedbins; i++) {
-          tmp_histogram[i].x = myagg.hist[i].x;
-          tmp_histogram[i].y = myagg.hist[i].y;
+        for(i = 0; i < nusedbins; i++) {
+          tmp_bins[i].x = bins[i].x;
+          tmp_bins[i].y = bins[i].y;
         }
-        for(int j = 1; j < partialHistogram.size(); j+=2, i++) {
-          tmp_histogram[i].x = ((DoubleWritable) partialHistogram.get(j)).get();
-          tmp_histogram[i].y = ((DoubleWritable) partialHistogram.get(j+1)).get();
+        for(int j = 1; j < other.size(); j+=2, i++) {
+          tmp_bins[i].x = other.get(j).get();
+          tmp_bins[i].y = other.get(j+1).get();
         }
-        Arrays.sort(tmp_histogram);
+        Arrays.sort(tmp_bins);
         // Now trim the overstuffed histogram down to the correct number of bins
-        myagg.hist = tmp_histogram;
-        myagg.nusedbins += (partialHistogram.size()-1)/2;
-        trim(myagg);
+        bins = tmp_bins;
+        nusedbins += (other.size()-1)/2;
+        trim();
       }
     }
+    // Algorithm #1 from the Ben-Haim and Tom-Tov paper: histogram update procedure for a single
     // new data point 'v'.
-    private void insert(StdAgg myagg, double v) {
-      StdAgg.Coord[] histogram = myagg.hist;
-
+    public void add(double v) {
      // Binary search to find the closest bucket that v should go into.
      // 'bin' should be interpreted as the bin to shift right in order to accomodate
      // v. As a result, bin is in the range [0,N], where N means that the value v is
      // greater than all the N bins currently in the histogram. It is also possible that
      // a bucket centered at 'v' already exists, so this must be checked in the next step.
      int bin = 0;
-      for(int l=0, r=myagg.nusedbins; l < r; ) {
+      for(int l=0, r=nusedbins; l < r; ) {
        bin = (l+r)/2;
-        if(histogram[bin].x > v) {
+        if(bins[bin].x > v) {
          r = bin;
        } else {
-          if(histogram[bin].x < v) {
+          if(bins[bin].x < v) {
            l = ++bin;
          } else {
            break; // break loop on equal comparator
@@ -275,21 +366,22 @@
      // If we found an exact bin match for value v, then just increment that bin's count.
      // Otherwise, we need to insert a new bin and trim the resulting histogram back to size.
      // A possible optimization here might be to set some threshold under which 'v' is just
-      // assumed to be equal to the closest bin -- if fabs(v-histogram[bin].x) < THRESHOLD, then
-      // just increment 'bin'
-      if(bin < myagg.nusedbins && histogram[bin].x == v) {
-        histogram[bin].y++;
+      // assumed to be equal to the closest bin -- if fabs(v-bins[bin].x) < THRESHOLD, then
+      // just increment 'bin'. This is not done now because we don't want to make any
+      // assumptions about the range of numeric data being analyzed.
+      if(bin < nusedbins && bins[bin].x == v) {
+        bins[bin].y++;
      } else {
-        for(int i = myagg.nusedbins; i > bin; i--) {
-          myagg.hist[i].x = myagg.hist[i-1].x;
-          myagg.hist[i].y = myagg.hist[i-1].y;
+        for(int i = nusedbins; i > bin; i--) {
+          bins[i].x = bins[i-1].x;
+          bins[i].y = bins[i-1].y;
         }
-        myagg.hist[bin].x = v; // new histogram bin for value 'v'
-        myagg.hist[bin].y = 1; // of height 1 unit
+        bins[bin].x = v; // new bin for value 'v'
+        bins[bin].y = 1; // of height 1 unit
-        // Trim the histogram down to the correct number of bins.
-        if(++myagg.nusedbins > myagg.nbins) {
-          trim(myagg);
+        // Trim the bins down to the correct number of bins.
+        if(++nusedbins > nbins) {
+          trim();
         }
       }
     }
@@ -297,19 +389,13 @@
     // Trims a histogram down to 'nbins' bins by iteratively merging the closest bins.
     // If two pairs of bins are equally close to each other, decide uniformly at random which
     // pair to merge, based on a PRNG.
-    private void trim(StdAgg myagg) {
-      // Ensure that there are at least 3 histogram bins (because nbins>=2).
-      if(myagg.nusedbins <= myagg.nbins) {
-        return;
-      }
-      StdAgg.Coord[] histogram = myagg.hist;
-
-      while(myagg.nusedbins > myagg.nbins) {
-        // Find the closest histogram bins in terms of x coordinates. Break ties randomly.
-        double smallestdiff = histogram[1].x - histogram[0].x;
+    private void trim() {
+      while(nusedbins > nbins) {
+        // Find the closest pair of bins in terms of x coordinates. Break ties randomly.
+        double smallestdiff = bins[1].x - bins[0].x;
         int smallestdiffloc = 0, smallestdiffcount = 1;
-        for(int i = 1; i < myagg.nusedbins-1; i++) {
-          double diff = histogram[i+1].x - histogram[i].x;
+        for(int i = 1; i < nusedbins-1; i++) {
+          double diff = bins[i+1].x - bins[i].x;
          if(diff < smallestdiff) {
            smallestdiff = diff;
            smallestdiffloc = i;
@@ -323,99 +409,60 @@
        // Merge the two closest bins into their average x location, weighted by their heights.
        // The height of the new bin is the sum of the heights of the old bins.
-        double d = histogram[smallestdiffloc].y + histogram[smallestdiffloc+1].y;
-        histogram[smallestdiffloc].x *= histogram[smallestdiffloc].y / d;
-        histogram[smallestdiffloc].x += histogram[smallestdiffloc+1].x / d *
-            histogram[smallestdiffloc+1].y;
-        histogram[smallestdiffloc].y = d;
+        double d = bins[smallestdiffloc].y + bins[smallestdiffloc+1].y;
+        bins[smallestdiffloc].x *= bins[smallestdiffloc].y / d;
+        bins[smallestdiffloc].x += bins[smallestdiffloc+1].x / d *
+            bins[smallestdiffloc+1].y;
+        bins[smallestdiffloc].y = d;
        // Shift the remaining bins left one position
-        for(int i = smallestdiffloc+1; i < myagg.nusedbins-1; i++) {
-          histogram[i].x = histogram[i+1].x;
-          histogram[i].y = histogram[i+1].y;
+        for(int i = smallestdiffloc+1; i < nusedbins-1; i++) {
+          bins[i].x = bins[i+1].x;
+          bins[i].y = bins[i+1].y;
         }
-        myagg.nusedbins--;
+        nusedbins--;
       }
     }
-    private boolean warned = false;
-
-    @Override
-    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
-      assert (parameters.length == 2);
-      if(parameters[0] == null || parameters[1] == null) {
-        return;
+    // Gets an approximate quantile value from the current histogram.
+    public double quantile(double q) {
+      assert(bins != null && nusedbins > 0 && nbins > 0);
+      double sum = 0, csum = 0;
+      int b;
+      for(b = 0; b < nusedbins; b++) {
+        sum += bins[b].y;
      }
-      StdAgg myagg = (StdAgg) agg;
-
-      // Parse out the number of histogram bins only once, if we haven't already done
-      // so before. We need at least 2 bins; otherwise, there is no point in creating
-      // a histogram.
-      if(myagg.nbins == 0) {
-        try {
-          myagg.nbins = PrimitiveObjectInspectorUtils.getInt(parameters[1], nbinsOI);
-        } catch(NumberFormatException e) {
-          throw new HiveException(getClass().getSimpleName() + " " +
-              StringUtils.stringifyException(e));
+      for(b = 0; b < nusedbins; b++) {
+        csum += bins[b].y;
+        if(csum / sum >= q) {
+          if(b == 0) {
+            return bins[b].x;
+          }
+          csum -= bins[b].y;
+          double r = bins[b-1].x + (q*sum - csum) * (bins[b].x-bins[b-1].x)/(bins[b].y);
+          return r;
        }
-        if(myagg.nbins < 2) {
-          throw new HiveException(getClass().getSimpleName() + " needs nbins to be at least 2,"
-              + " but you supplied " + myagg.nbins + ".");
-        }
-
-        // allocate memory for the histogram
-        myagg.hist = new StdAgg.Coord[myagg.nbins+1]; // +1 is used for holding a temporary bin
-        for(int i = 0; i < myagg.nbins+1; i++) {
-          myagg.hist[i] = new StdAgg.Coord();
-        }
-        myagg.nusedbins = 0;
      }
-
-      // Process the current data point
-      Object p = parameters[0];
-      if (p != null) {
-        double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
-        insert(myagg, v);
-      }
+      return -1; // for Xlint, code will never reach here
    }
+    // In preparation for a Hive merge() call, serialize the current histogram object into an
+    // ArrayList of DoubleWritable objects. This list is deserialized and merged by the
+    // NumericHistogram.merge() method
+    public ArrayList serialize() {
+      ArrayList result = new ArrayList();
-
-    // Aggregation buffer definition and manipulation methods
-    static class StdAgg implements AggregationBuffer {
-      static class Coord implements Comparable {
-        double x;
-        double y;
-
-        public int compareTo(Object other) {
-          Coord o = (Coord) other;
-          if(x < o.x) {
-            return -1;
-          }
-          if(x > o.x) {
-            return 1;
-          }
-          return 0;
+      // Return a single ArrayList where the first element is the number of histogram bins,
+      // and subsequent elements represent bin (x,y) pairs.
+      result.add(new DoubleWritable(nbins));
+      if(bins != null) {
+        for(int i = 0; i < nusedbins; i++) {
+          result.add(new DoubleWritable(bins[i].x));
+          result.add(new DoubleWritable(bins[i].y));
        }
-      };
+      }
-      int nbins;     // maximum number of histogram bins
-      int nusedbins; // number of histogram bins actually used
-      Coord[] hist;  // histogram coordinates
-    };
-
-    @Override
-    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
-      StdAgg result = new StdAgg();
-      reset(result);
      return result;
    }
-
-    @Override
-    public void reset(AggregationBuffer agg) throws HiveException {
-      StdAgg myagg = (StdAgg) agg;
-      myagg.nbins = 0;
-      myagg.nusedbins = 0;
-      myagg.hist = null;
-    }
  }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.java (revision 0)
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric.NumericHistogram;
+
+/**
+ * Computes an approximate percentile (quantile) from an approximate histogram, for very
+ * large numbers of rows where the regular percentile() UDAF might run out of memory.
+ *
+ * The input is a single double value or an array of double values representing the quantiles
+ * requested. The output, corresponding to the input, is either a single double value or an
+ * array of doubles that are the quantile values.
+ */
+@Description(name = "percentile_approx",
+    value = "_FUNC_(expr, pc, [nb]) - For very large data, computes an approximate percentile "
+    + "value from a histogram, using the optional argument [nb] as the number of histogram"
+    + " bins to use. A higher value of nb results in a more accurate approximation, at "
+    + "the cost of higher memory usage.")
+public class GenericUDAFPercentileApprox implements GenericUDAFResolver {
+  static final Log LOG = LogFactory.getLog(GenericUDAFPercentileApprox.class.getName());
+
+  @Override
+  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+    if (parameters.length != 2 && parameters.length != 3) {
+      throw new UDFArgumentTypeException(parameters.length - 1,
+          "Please specify either two or three arguments.");
+    }
+
+    // Validate the first parameter, which is the expression to compute over. This should be a
+    // numeric primitive type.
+ if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { + throw new UDFArgumentTypeException(0, + "Only primitive type arguments are accepted but " + + parameters[0].getTypeName() + " was passed as parameter 1."); + } + switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + break; + default: + throw new UDFArgumentTypeException(0, + "Only numeric type arguments are accepted but " + + parameters[0].getTypeName() + " was passed as parameter 1."); + } + + // Validate the second parameter, which is either a solitary double or an array of doubles. + switch(parameters[1].getCategory()) { + case PRIMITIVE: + // Only a single double was passed as parameter 2, a single quantile is being requested + switch(((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()) { + case FLOAT: + case DOUBLE: + break; + default: + throw new UDFArgumentTypeException(1, + "Only a float/double or float/double array argument is accepted as parameter 2, but " + + parameters[1].getTypeName() + " was passed instead."); + } + break; + + case LIST: + // An array was passed as parameter 2, make sure it's an array of primitives + if(((ListTypeInfo) parameters[1]).getListElementTypeInfo().getCategory() != + ObjectInspector.Category.PRIMITIVE) { + throw new UDFArgumentTypeException(1, + "A float/double array argument may be passed as parameter 2, but " + + parameters[1].getTypeName() + " was passed instead."); + } + // Now make sure it's an array of doubles or floats. We don't allow integer types here + // because percentile (really, quantile) values should generally be strictly between 0 and 1. + switch(((PrimitiveTypeInfo)((ListTypeInfo) parameters[1]).getListElementTypeInfo()). + getPrimitiveCategory()) { + case FLOAT: + case DOUBLE: + break; + default: + throw new UDFArgumentTypeException(1, + "A float/double array argument may be passed as parameter 2, but " + + parameters[1].getTypeName() + " was passed instead."); + } + break; + + default: + throw new UDFArgumentTypeException(1, + "Only a float/double or float/double array argument is accepted as parameter 2, but " + + parameters[1].getTypeName() + " was passed instead."); + } + + // If a third parameter has been specified, it should be an integer that specifies the number + // of histogram bins to use in the percentile approximation. + if(parameters.length == 3) { + if(parameters[2].getCategory() != ObjectInspector.Category.PRIMITIVE) { + throw new UDFArgumentTypeException(2, "Only a primitive argument is accepted as " + + "parameter 3, but " + parameters[2].getTypeName() + " was passed instead."); + } + switch(((PrimitiveTypeInfo) parameters[2]).getPrimitiveCategory()) { + case BYTE: + case SHORT: + case INT: + case LONG: + break; + default: + throw new UDFArgumentTypeException(2, "Only an integer argument is accepted as " + + "parameter 3, but " + parameters[2].getTypeName() + " was passed instead."); + } + } + + return new GenericUDAFPercentileApproxEvaluator(); + } + + /** + * Construct a histogram using the algorithm described by Ben-Haim and Tom-Tov, and then + * use it to compute an approximate percentile value. 
+ */ + public static class GenericUDAFPercentileApproxEvaluator extends GenericUDAFEvaluator { + // For PARTIAL1 and COMPLETE: ObjectInspectors for original data + private PrimitiveObjectInspector inputOI; + private ObjectInspector quantilesOI; + private PrimitiveObjectInspector nbinsOI; + + // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles) + private StandardListObjectInspector loi; + + // General error output control variable + private boolean warned = false; + + @Override + public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { + super.init(m, parameters); + + // init input object inspectors + if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { + inputOI = (PrimitiveObjectInspector) parameters[0]; + quantilesOI = parameters[1]; + if(parameters.length > 2) { + nbinsOI = (PrimitiveObjectInspector) parameters[2]; + } + } else { + loi = (StandardListObjectInspector) parameters[0]; + } + + // Init output object inspectors. + // + // The return type for a partial aggregation is still a list of doubles, as in + // GenericUDAFHistogramNumeric, but we add on the percentile values requested to the + // end, and handle serializing/deserializing before we pass things on to the parent + // method. + // The return type for FINAL and COMPLETE is a full aggregation result, which is also a + // list of DoubleWritables with the requested quantile values. The only exception is + // when a single double, as opposed to an array of doubles, is passed as a parameter. In + // that case, just return a single double value. + if (m == Mode.PARTIAL1 || m == Mode.COMPLETE || + quantilesOI.getCategory() == ObjectInspector.Category.LIST) { + return ObjectInspectorFactory.getStandardListObjectInspector( + PrimitiveObjectInspectorFactory.writableDoubleObjectInspector); + } else { + return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + } + } + + @Override + public void merge(AggregationBuffer agg, Object partial) throws HiveException { + if(partial == null) { + return; + } + PercentileAggBuf myagg = (PercentileAggBuf) agg; + ArrayList partialHistogram = (ArrayList) loi.getList(partial); + + // remove requested quantiles from the head of the list + int nquantiles = (int) partialHistogram.get(0).get(); + if(nquantiles > 0) { + myagg.quantiles = new double[nquantiles]; + for(int i = 1; i <= nquantiles; i++) { + myagg.quantiles[i-1] = partialHistogram.get(i).get(); + } + partialHistogram.subList(0, nquantiles+1).clear(); + } + + // merge histograms + myagg.histogram.merge(partialHistogram); + } + + @Override + public Object terminatePartial(AggregationBuffer agg) throws HiveException { + PercentileAggBuf myagg = (PercentileAggBuf) agg; + ArrayList result = new ArrayList(); + + if(myagg.quantiles != null) { + result.add(new DoubleWritable(myagg.quantiles.length)); + for(int i = 0; i < myagg.quantiles.length; i++) { + result.add(new DoubleWritable(myagg.quantiles[i])); + } + } else { + result.add(new DoubleWritable(0)); + } + result.addAll(myagg.histogram.serialize()); + + return result; + } + + @Override + public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { + assert (parameters.length == 2 || parameters.length == 3); + if(parameters[0] == null || parameters[1] == null) { + return; + } + PercentileAggBuf myagg = (PercentileAggBuf) agg; + + // Parse out the requested quantiles just once, if we haven't already done so before. 
+ if(myagg.quantiles == null) { + if(quantilesOI.getCategory() == ObjectInspector.Category.LIST) { + // Multiple quantiles are requested + int nquantiles = ((StandardListObjectInspector) quantilesOI).getListLength(parameters[1]); + assert(nquantiles >= 1); + myagg.quantiles = new double[nquantiles]; + StandardListObjectInspector sloi = (StandardListObjectInspector) quantilesOI; + for(int i = 0; i < nquantiles; i++) { + myagg.quantiles[i] = PrimitiveObjectInspectorUtils.getDouble( + sloi.getListElement(parameters[1], i), + (PrimitiveObjectInspector) sloi.getListElementObjectInspector()); + } + } else { + // Just one quantile is requested + myagg.quantiles = new double[1]; + myagg.quantiles[0] = PrimitiveObjectInspectorUtils.getDouble(parameters[1], + (PrimitiveObjectInspector) quantilesOI); + } + + + // Validate requested quantiles, make sure they're in [0,1] + for(int i = 0; i < myagg.quantiles.length; i++) { + if(myagg.quantiles[i] <= 0 || myagg.quantiles[i] >= 1) { + throw new HiveException(getClass().getSimpleName() + " requires percentile values to " + + "lie strictly between 0 and 1, but you supplied " + + myagg.quantiles[i]); + + } + } + } + + // Parse out the number of histogram bins just once, if we haven't done so before. + if(myagg.histogram.nbins == 0) { + if(parameters.length == 3 && nbinsOI != null) { + // User has specified the number of histogram bins to use + myagg.histogram.nbins = PrimitiveObjectInspectorUtils.getInt(parameters[2], nbinsOI); + } else { + // Choose a nice default value. + myagg.histogram.nbins = 10000; + } + myagg.histogram.allocate(); + } + + // Get and process the current datum + double v = PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI); + myagg.histogram.add(v); + } + + @Override + public Object terminate(AggregationBuffer agg) throws HiveException { + PercentileAggBuf myagg = (PercentileAggBuf) agg; + + if (myagg.histogram.nusedbins < 1) { // SQL standard - return null for zero elements + return null; + } else { + ArrayList result = new ArrayList(); + assert(myagg.quantiles != null); + for(int i = 0; i < myagg.quantiles.length; i++) { + result.add(new DoubleWritable(myagg.histogram.quantile(myagg.quantiles[i]))); + } + if(myagg.quantiles.length == 1) { + return result.get(0); + } else { + return result; + } + } + } + + + // Aggregation buffer methods. We wrap GenericUDAFHistogramNumeric's aggregation buffer + // inside our own, so that we can also store requested quantile values between calls + static protected class PercentileAggBuf implements AggregationBuffer { + NumericHistogram histogram; + double[] quantiles; // the quantiles requested + }; + + @Override + public AggregationBuffer getNewAggregationBuffer() throws HiveException { + PercentileAggBuf result = new PercentileAggBuf(); + result.histogram = new NumericHistogram(); + reset(result); + return result; + } + + @Override + public void reset(AggregationBuffer agg) throws HiveException { + PercentileAggBuf result = (PercentileAggBuf) agg; + result.histogram.reset(); + result.quantiles = null; + } + } +}
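Note (illustration, not part of the patch): NumericHistogram.quantile(q) reads an approximate percentile off the merged histogram by accumulating bin heights until the running fraction of the total reaches q, then linearly interpolating between the previous and current bin centers. The standalone sketch below mirrors that interpolation on a hand-built set of (x, y) bins; the class name QuantileSketch and the toy bin values are invented for the example.

// QuantileSketch.java -- minimal sketch of the interpolation used by quantile(double q)
public class QuantileSketch {
  public static void main(String[] args) {
    double[] x = {10.0, 20.0, 40.0, 80.0};   // bin centers
    double[] y = { 4.0,  6.0,  8.0,  2.0};   // bin heights (counts)
    System.out.println(quantile(x, y, 0.5)); // median estimate
  }

  // Walk the bins, accumulating heights until csum/sum reaches q, then interpolate
  // between the previous and current bin centers (same shape as the patch's quantile()).
  static double quantile(double[] x, double[] y, double q) {
    double sum = 0, csum = 0;
    for (double h : y) {
      sum += h;
    }
    for (int b = 0; b < y.length; b++) {
      csum += y[b];
      if (csum / sum >= q) {
        if (b == 0) {
          return x[0];
        }
        csum -= y[b];
        return x[b - 1] + (q * sum - csum) * (x[b] - x[b - 1]) / y[b];
      }
    }
    return -1; // unreachable for q strictly between 0 and 1
  }
}

With the toy bins above, quantile(x, y, 0.5) prints 20.0: the cumulative height reaches exactly half of the total at the second bin center.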
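Note (also illustration only): GenericUDAFPercentileApproxEvaluator.terminatePartial() prepends the requested quantiles to the serialized histogram, and merge() strips them back off before calling NumericHistogram.merge(), so the partial aggregation is a flat list of doubles laid out as [k, q_1..q_k, nbins, x_1, y_1, ..., x_m, y_m]. A minimal sketch of that packing with plain doubles; the patch itself uses ArrayList of DoubleWritable, and PartialLayoutSketch/pack() are invented names.

// PartialLayoutSketch.java -- shows the flat partial-aggregation layout
import java.util.ArrayList;
import java.util.List;

public class PartialLayoutSketch {
  static List<Double> pack(double[] quantiles, double nbins, double[][] bins) {
    List<Double> out = new ArrayList<Double>();
    out.add((double) quantiles.length);  // k: number of requested quantiles
    for (double q : quantiles) {
      out.add(q);                        // q_1 .. q_k
    }
    out.add(nbins);                      // histogram capacity
    for (double[] bin : bins) {
      out.add(bin[0]);                   // x (bin center)
      out.add(bin[1]);                   // y (bin height)
    }
    return out;
  }

  public static void main(String[] args) {
    double[][] bins = {{10.0, 4.0}, {20.0, 6.0}};
    System.out.println(pack(new double[]{0.5, 0.95}, 100.0, bins));
    // prints [2.0, 0.5, 0.95, 100.0, 10.0, 4.0, 20.0, 6.0]
  }
}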