Index: ql/src/test/results/clientpositive/udaf_histogram_numeric.q.out
===================================================================
--- ql/src/test/results/clientpositive/udaf_histogram_numeric.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/udaf_histogram_numeric.q.out	(revision 0)
@@ -0,0 +1,36 @@
+PREHOOK: query: SELECT histogram_numeric(cast(substr(src.value,5) AS double), 2) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/Users/mlahiri/hive/build/ql/scratchdir/hive_2010-06-15_14-57-44_755_8104146030986077342/10000
+POSTHOOK: query: SELECT histogram_numeric(cast(substr(src.value,5) AS double), 2) FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/Users/mlahiri/hive/build/ql/scratchdir/hive_2010-06-15_14-57-44_755_8104146030986077342/10000
+[{"x":135.0284552845532,"y":246.0},{"x":381.39370078740143,"y":254.0}]
+PREHOOK: query: SELECT histogram_numeric(cast(substr(src.value,5) AS double), 3) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/Users/mlahiri/hive/build/ql/scratchdir/hive_2010-06-15_14-57-49_030_4264110202846137295/10000
+POSTHOOK: query: SELECT histogram_numeric(cast(substr(src.value,5) AS double), 3) FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/Users/mlahiri/hive/build/ql/scratchdir/hive_2010-06-15_14-57-49_030_4264110202846137295/10000
+[{"x":96.7349397590361,"y":166.0},{"x":257.14970059880255,"y":167.0},{"x":425.6826347305388,"y":167.0}]
+PREHOOK: query: SELECT histogram_numeric(cast(substr(src.value,5) AS double), 20) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/Users/mlahiri/hive/build/ql/scratchdir/hive_2010-06-15_14-57-52_881_7930370768216690364/10000
+POSTHOOK: query: SELECT histogram_numeric(cast(substr(src.value,5) AS double), 20) FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/Users/mlahiri/hive/build/ql/scratchdir/hive_2010-06-15_14-57-52_881_7930370768216690364/10000
+[{"x":9.761904761904763,"y":21.0},{"x":33.84210526315789,"y":19.0},{"x":62.75000000000001,"y":20.0},{"x":90.90322580645162,"y":31.0},{"x":122.91666666666667,"y":24.0},{"x":146.33333333333334,"y":21.0},{"x":170.70967741935485,"y":31.0},{"x":194.3571428571428,"y":28.0},{"x":214.84615384615384,"y":26.0},{"x":235.08695652173907,"y":23.0},{"x":257.80000000000007,"y":15.0},{"x":281.0333333333333,"y":30.0},{"x":298.0,"y":1.0},{"x":313.0000000000001,"y":29.0},{"x":339.5925925925926,"y":27.0},{"x":372.49999999999983,"y":24.0},{"x":402.23684210526324,"y":38.0},{"x":430.6896551724138,"y":29.0},{"x":462.32352941176464,"y":34.0},{"x":487.72413793103453,"y":29.0}]
+PREHOOK: query: SELECT histogram_numeric(cast(substr(src.value,5) AS double), 200) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/Users/mlahiri/hive/build/ql/scratchdir/hive_2010-06-15_14-57-58_020_390223944446262914/10000
+POSTHOOK: query: SELECT histogram_numeric(cast(substr(src.value,5) AS double), 200) FROM src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/Users/mlahiri/hive/build/ql/scratchdir/hive_2010-06-15_14-57-58_020_390223944446262914/10000
+[{"x":0.0,"y":3.0},{"x":2.0,"y":1.0},{"x":4.75,"y":4.0},{"x":8.0,"y":1.0},{"x":9.5,"y":2.0},{"x":11.666666666666666,"y":3.0},{"x":15.0,"y":2.0},{"x":17.666666666666664,"y":3.0},{"x":19.5,"y":2.0},{"x":24.0,"y":2.0},{"x":26.333333333333336,"y":3.0},{"x":28.0,"y":1.0},{"x":30.0,"y":1.0},{"x":33.0,"y":1.0},{"x":34.75,"y":4.0},{"x":37.0,"y":2.0},{"x":41.666666666666664,"y":3.0},{"x":43.5,"y":2.0},{"x":47.0,"y":1.0},{"x":51.0,"y":2.0},{"x":53.5,"y":2.0},{"x":57.666666666666664,"y":3.0},{"x":64.5,"y":2.0},{"x":66.66666666666666,"y":3.0},{"x":69.75,"y":4.0},{"x":72.0,"y":2.0},{"x":74.0,"y":1.0},{"x":76.33333333333333,"y":3.0},{"x":78.0,"y":1.0},{"x":80.0,"y":1.0},{"x":82.0,"y":1.0},{"x":83.5,"y":4.0},{"x":85.5,"y":2.0},{"x":87.0,"y":1.0},{"x":90.0,"y":3.0},{"x":92.0,"y":1.0},{"x":95.33333333333333,"y":3.0},{"x":97.5,"y":4.0},{"x":100.0,"y":2.0},{"x":103.5,"y":4.0},{"x":105.0,"y":1.0},{"x":111.0,"y":1.0},{"x":113.33333333333333,"y":3.0},{"x":116.0,"y":1.0},{"x":118.0,"y":2.0},{"x":119.4,"y":5.0},{"x":125.33333333333333,"y":3.0},{"x":128.4,"y":5.0},{"x":131.0,"y":1.0},{"x":133.66666666666666,"y":3.0},{"x":136.66666666666666,"y":3.0},{"x":138.0,"y":4.0},{"x":143.0,"y":1.0},{"x":145.66666666666666,"y":3.0},{"x":149.33333333333331,"y":3.0},{"x":152.33333333333334,"y":3.0},{"x":155.5,"y":2.0},{"x":157.5,"y":2.0},{"x":160.0,"y":1.0},{"x":162.5,"y":2.0},{"x":164.5,"y":4.0},{"x":166.75,"y":4.0},{"x":168.8,"y":5.0},{"x":170.0,"y":1.0},{"x":172.0,"y":2.0},{"x":174.5,"y":4.0},{"x":176.33333333333331,"y":3.0},{"x":178.0,"y":1.0},{"x":179.33333333333331,"y":3.0},{"x":181.0,"y":1.0},{"x":183.0,"y":1.0},{"x":186.75,"y":4.0},{"x":189.0,"y":1.0},{"x":190.66666666666666,"y":3.0},{"x":192.75,"y":4.0},{"x":194.0,"y":1.0},{"x":195.33333333333331,"y":3.0},{"x":197.0,"y":2.0},{"x":199.4,"y":5.0},{"x":201.0,"y":1.0},{"x":202.66666666666669,"y":3.0},{"x":205.0,"y":2.0},{"x":207.0,"y":2.0},{"x":208.40000000000003,"y":5.0},{"x":213.33333333333331,"y":3.0},{"x":216.0,"y":2.0},{"x":217.33333333333331,"y":3.0},{"x":219.0,"y":2.0},{"x":221.33333333333331,"y":3.0},{"x":223.5,"y":4.0},{"x":226.0,"y":1.0},{"x":228.66666666666663,"y":3.0},{"x":230.0,"y":5.0},{"x":233.0,"y":2.0},{"x":235.0,"y":1.0},{"x":237.5,"y":4.0},{"x":239.0,"y":2.0},{"x":241.66666666666669,"y":3.0},{"x":244.0,"y":1.0},{"x":247.5,"y":2.0},{"x":249.0,"y":1.0},{"x":252.0,"y":1.0},{"x":255.5,"y":4.0},{"x":257.5,"y":2.0},{"x":260.0,"y":1.0},{"x":262.5,"y":2.0},{"x":265.3333333333333,"y":3.0},{"x":272.6,"y":5.0},{"x":274.5,"y":2.0},{"x":277.3333333333333,"y":6.0},{"x":280.0,"y":2.0},{"x":281.5,"y":4.0},{"x":283.5,"y":2.0},{"x":285.0,"y":1.0},{"x":286.5,"y":2.0},{"x":288.3333333333333,"y":3.0},{"x":291.5,"y":2.0},{"x":296.0,"y":1.0},{"x":298.0,"y":3.0},{"x":302.0,"y":1.0},{"x":305.5,"y":2.0},{"x":307.3333333333333,"y":3.0},{"x":309.0,"y":2.0},{"x":310.75,"y":4.0},{"x":315.75,"y":4.0},{"x":317.6,"y":5.0},{"x":321.5,"y":4.0},{"x":323.0,"y":1.0},{"x":325.0,"y":2.0},{"x":327.0,"y":3.0},{"x":331.3333333333333,"y":3.0},{"x":333.0,"y":2.0},{"x":335.5,"y":2.0},{"x":338.5,"y":2.0},{"x":341.66666666666663,"y":3.0},{"x":344.3333333333333,"y":3.0},{"x":348.0,"y":5.0},{"x":351.0,"y":1.0},{"x":353.0,"y":2.0},{"x":356.0,"y":1.0},{"x":360.0,"y":1.0},{"x":362.0,"y":1.0},{"x":364.5,"y":2.0},{"x":366.66666666666663,"y":3.0},{"x":368.75,"y":4.0},{"x":373.5,"y":2.0},{"x":375.0,"y":1.0},{"x":377.5,"y":2.0},{"x":379.0,"y":1.0},{"x":382.0,"y":2.0},{"x":384.0,"y":3.0},{"x":386.0,"y":1.0},{"x":389.0,"y":1.0},{"x":392.0,"y":1.0},{"x":393.5,"y":2.0},{"x":395.6,"y":5.0},{"x":397.0,"y":2.0},{"x":3
99.0,"y":2.0},{"x":400.0,"y":1.0},{"x":401.16666666666663,"y":6.0},{"x":403.40000000000003,"y":5.0},{"x":406.20000000000005,"y":5.0},{"x":409.0,"y":3.0},{"x":411.0,"y":1.0},{"x":413.5,"y":4.0},{"x":417.0,"y":3.0},{"x":418.5,"y":2.0},{"x":421.0,"y":1.0},{"x":424.0,"y":2.0},{"x":427.0,"y":1.0},{"x":429.6,"y":5.0},{"x":431.25,"y":4.0},{"x":435.5,"y":2.0},{"x":437.75,"y":4.0},{"x":439.0,"y":2.0},{"x":443.5,"y":2.0},{"x":446.0,"y":1.0},{"x":448.5,"y":2.0},{"x":452.5,"y":2.0},{"x":454.24999999999994,"y":4.0},{"x":457.66666666666663,"y":3.0},{"x":459.33333333333337,"y":3.0},{"x":462.5,"y":4.0},{"x":466.0,"y":3.0},{"x":467.80000000000007,"y":5.0},{"x":469.16666666666663,"y":6.0},{"x":472.0,"y":1.0},{"x":475.0,"y":1.0},{"x":477.0,"y":1.0},{"x":478.33333333333326,"y":3.0},{"x":480.25,"y":4.0},{"x":482.5,"y":2.0},{"x":484.5,"y":2.0},{"x":487.0,"y":1.0},{"x":489.2,"y":5.0},{"x":491.66666666666663,"y":3.0},{"x":493.0,"y":1.0},{"x":494.5,"y":2.0},{"x":496.0,"y":1.0},{"x":497.75,"y":4.0}] Index: ql/src/test/results/clientpositive/show_functions.q.out =================================================================== --- ql/src/test/results/clientpositive/show_functions.q.out (revision 953449) +++ ql/src/test/results/clientpositive/show_functions.q.out (working copy) @@ -55,6 +55,7 @@ get_json_object hash hex +histogram_numeric hour if in Index: ql/src/test/queries/clientpositive/udaf_histogram_numeric.q =================================================================== --- ql/src/test/queries/clientpositive/udaf_histogram_numeric.q (revision 0) +++ ql/src/test/queries/clientpositive/udaf_histogram_numeric.q (revision 0) @@ -0,0 +1,5 @@ + +SELECT histogram_numeric(cast(substr(src.value,5) AS double), 2) FROM src; +SELECT histogram_numeric(cast(substr(src.value,5) AS double), 3) FROM src; +SELECT histogram_numeric(cast(substr(src.value,5) AS double), 20) FROM src; +SELECT histogram_numeric(cast(substr(src.value,5) AS double), 200) FROM src; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 953449) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy) @@ -127,6 +127,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; @@ -342,6 +343,8 @@ registerGenericUDAF("var_pop", new GenericUDAFVariance()); registerGenericUDAF("var_samp", new GenericUDAFVarianceSample()); + registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric()); + registerUDAF("percentile", UDAFPercentile.class); // Generic UDFs Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFHistogramNumeric.java (revision 0) @@ -0,0 +1,421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ * See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Computes an approximate histogram of a numerical column using a user-specified number of bins.
+ *
+ * The output is an array of (x,y) pairs as Hive struct objects that represents the histogram's
+ * bin centers and heights.
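+ *
+ * For example, "SELECT histogram_numeric(val, 10) FROM src" returns an array of up to 10
+ * such structs; udaf_histogram_numeric.q exercises nbins = 2, 3, 20 and 200 on the src table.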
+ */
+@Description(name = "histogram_numeric",
+    value = "_FUNC_(expr, nb) - Computes a histogram on numeric 'expr' using nb bins.")
+public class GenericUDAFHistogramNumeric implements GenericUDAFResolver {
+  // class static variables
+  static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName());
+
+  @Override
+  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
+    if (parameters.length != 2) {
+      throw new UDFArgumentTypeException(parameters.length - 1,
+          "Please specify exactly two arguments.");
+    }
+
+    // validate the first parameter, which is the expression to compute over
+    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      throw new UDFArgumentTypeException(0,
+          "Only primitive type arguments are accepted but "
+          + parameters[0].getTypeName() + " was passed as parameter 1.");
+    }
+    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
+    case BYTE:
+    case SHORT:
+    case INT:
+    case LONG:
+    case FLOAT:
+    case DOUBLE:
+      break;
+    case STRING:
+    case BOOLEAN:
+    default:
+      throw new UDFArgumentTypeException(0,
+          "Only numeric type arguments are accepted but "
+          + parameters[0].getTypeName() + " was passed as parameter 1.");
+    }
+
+    // validate the second parameter, which is the number of histogram bins
+    if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      throw new UDFArgumentTypeException(1,
+          "Only primitive type arguments are accepted but "
+          + parameters[1].getTypeName() + " was passed as parameter 2.");
+    }
+    if (((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()
+        != PrimitiveObjectInspector.PrimitiveCategory.INT) {
+      throw new UDFArgumentTypeException(1,
+          "Only an integer argument is accepted as parameter 2, but "
+          + parameters[1].getTypeName() + " was passed instead.");
+    }
+
+    return new GenericUDAFHistogramNumericEvaluator();
+  }
+
+  /**
+   * Constructs a histogram using the algorithm described by Ben-Haim and Tom-Tov.
+   *
+   * The algorithm is a heuristic adapted from the following paper:
+   * Yael Ben-Haim and Elad Tom-Tov, "A streaming parallel decision tree algorithm",
+   * J. Machine Learning Research 11 (2010), pp. 849--872. Although there are no approximation
+   * guarantees, it appears to work well with adequate data and a large (e.g., 20-80) number
+   * of histogram bins.
+   */
+  public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
+
+    // For PARTIAL1 and COMPLETE: ObjectInspectors for original data
+    private PrimitiveObjectInspector inputOI;
+    private PrimitiveObjectInspector nbinsOI;
+
+    // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list of doubles)
+    private StandardListObjectInspector loi;
+
+    // A PRNG for breaking ties in histogram bin merging
+    private Random prng;
+
+    @Override
+    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
+      super.init(m, parameters);
+
+      // Init the RNG for breaking ties in histogram merging. A fixed seed is specified here
+      // to aid testing; it can be replaced with a time-based seed at the cost of
+      // non-deterministic results.
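+      // (A minimal illustrative alternative, not part of this patch: prng = new Random(),
+      // which seeds itself from the system clock.)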
+      prng = new Random(31183);
+
+      // init input object inspectors
+      if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
+        assert (parameters.length == 2);
+        inputOI = (PrimitiveObjectInspector) parameters[0];
+        nbinsOI = (PrimitiveObjectInspector) parameters[1];
+      } else {
+        loi = (StandardListObjectInspector) parameters[0];
+      }
+
+      // init output object inspectors
+      if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
+        // The output of a partial aggregation is a list of doubles representing the
+        // histogram being constructed. The first element in the list is the user-specified
+        // number of bins in the histogram, and the histogram itself is represented as (x,y)
+        // pairs following the first element, so the list length should *always* be odd.
+        return ObjectInspectorFactory.getStandardListObjectInspector(
+            PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+      } else {
+        // The output of FINAL and COMPLETE is a full aggregation: a list of structs with
+        // two DoubleWritable fields that represent the final histogram as (x,y) pairs of
+        // bin centers and heights.
+        ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+        foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+        foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+        ArrayList<String> fname = new ArrayList<String>();
+        fname.add("x");
+        fname.add("y");
+
+        return ObjectInspectorFactory.getStandardListObjectInspector(
+            ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi));
+      }
+    }
+
+    @Override
+    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
+      ArrayList<DoubleWritable> partialResult = new ArrayList<DoubleWritable>();
+      StdAgg myagg = (StdAgg) agg;
+
+      // Return a single ArrayList where the first element is the number of histogram bins,
+      // and subsequent elements represent histogram (x,y) pairs.
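+      // For example, a partial with nbins=2 holding bins (x=1.5,y=3) and (x=4.0,y=2) is
+      // serialized as [2.0, 1.5, 3.0, 4.0, 2.0], a list of length 5 (always odd).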
+      partialResult.add(new DoubleWritable(myagg.nbins));
+      if (myagg.hist != null) {
+        for (int i = 0; i < myagg.nusedbins; i++) {
+          partialResult.add(new DoubleWritable(myagg.hist[i].x));
+          partialResult.add(new DoubleWritable(myagg.hist[i].y));
+        }
+      }
+
+      return partialResult;
+    }
+
+    @Override
+    public Object terminate(AggregationBuffer agg) throws HiveException {
+      StdAgg myagg = (StdAgg) agg;
+
+      if (myagg.nusedbins < 1) { // SQL standard - return null for zero elements
+        return null;
+      } else {
+        ArrayList<DoubleWritable[]> result = new ArrayList<DoubleWritable[]>();
+        for (int i = 0; i < myagg.nusedbins; i++) {
+          DoubleWritable[] bin = new DoubleWritable[2];
+          bin[0] = new DoubleWritable(myagg.hist[i].x);
+          bin[1] = new DoubleWritable(myagg.hist[i].y);
+          result.add(bin);
+        }
+        return result;
+      }
+    }
+
+    @Override
+    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
+      if (partial == null) {
+        return;
+      }
+      ArrayList partialHistogram = (ArrayList) loi.getList(partial);
+      StdAgg myagg = (StdAgg) agg;
+
+      if (myagg.nbins == 0 || myagg.nusedbins == 0) {
+        // The aggregation buffer has nothing in it, so just copy over 'partial' into myagg
+        // by deserializing the ArrayList of (x,y) pairs into an array of Coord objects.
+        myagg.nbins = (int) ((DoubleWritable) partialHistogram.get(0)).get();
+        myagg.nusedbins = (partialHistogram.size() - 1) / 2;
+        myagg.hist = new StdAgg.Coord[myagg.nbins + 1]; // +1 to hold a temporary bin for insert()
+        for (int i = 1; i < partialHistogram.size(); i += 2) {
+          myagg.hist[(i - 1) / 2] = new StdAgg.Coord();
+          myagg.hist[(i - 1) / 2].x = ((DoubleWritable) partialHistogram.get(i)).get();
+          myagg.hist[(i - 1) / 2].y = ((DoubleWritable) partialHistogram.get(i + 1)).get();
+        }
+      } else {
+        // The aggregation buffer already contains a partial histogram. Therefore, we need
+        // to merge histograms using Algorithm #2 from the Ben-Haim and Tom-Tov paper.
+        StdAgg.Coord[] tmp_histogram = new StdAgg.Coord[myagg.nusedbins
+            + (partialHistogram.size() - 1) / 2];
+        for (int j = 0; j < tmp_histogram.length; j++) {
+          tmp_histogram[j] = new StdAgg.Coord();
+        }
+
+        // Copy all the histogram bins from 'myagg' and 'partial' into an overstuffed histogram
+        int i;
+        for (i = 0; i < myagg.nusedbins; i++) {
+          tmp_histogram[i].x = myagg.hist[i].x;
+          tmp_histogram[i].y = myagg.hist[i].y;
+        }
+        for (int j = 1; j < partialHistogram.size(); j += 2, i++) {
+          tmp_histogram[i].x = ((DoubleWritable) partialHistogram.get(j)).get();
+          tmp_histogram[i].y = ((DoubleWritable) partialHistogram.get(j + 1)).get();
+        }
+        Arrays.sort(tmp_histogram);
+
+        // Now trim the overstuffed histogram down to the correct number of bins
+        myagg.hist = tmp_histogram;
+        myagg.nusedbins += (partialHistogram.size() - 1) / 2;
+        trim(myagg);
+      }
+    }
+
+    // Algorithm #1 from the Ben-Haim and Tom-Tov paper: histogram update procedure for a
+    // single new data point 'v'.
+    private void insert(StdAgg myagg, double v) {
+      StdAgg.Coord[] histogram = myagg.hist;
+
+      // Binary search to find the closest bucket that v should go into.
+      // 'bin' should be interpreted as the bin to shift right in order to accommodate
+      // v. As a result, bin is in the range [0,N], where N means that the value v is
+      // greater than all the N bins currently in the histogram. It is also possible that
+      // a bucket centered at 'v' already exists, so this must be checked in the next step.
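+      // For example, with bin centers {2.0, 5.0, 9.0} (N=3): v=6.0 yields bin=2 (shift the
+      // {9.0} bin right), v=10.0 yields bin=3 (append at the end), and v=5.0 breaks out of
+      // the loop early with an exact match at bin=1.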
+      int bin = 0;
+      for (int l = 0, r = myagg.nusedbins; l < r; ) {
+        bin = (l + r) / 2;
+        if (histogram[bin].x > v) {
+          r = bin;
+        } else {
+          if (histogram[bin].x < v) {
+            l = ++bin;
+          } else {
+            break; // break loop on an exact match
+          }
+        }
+      }
+
+      // If we found an exact bin match for value v, then just increment that bin's count.
+      // Otherwise, we need to insert a new bin and trim the resulting histogram back to size.
+      // A possible optimization here might be to set some threshold under which 'v' is just
+      // assumed to be equal to the closest bin -- if fabs(v-histogram[bin].x) < THRESHOLD,
+      // then just increment that bin instead of inserting a new one.
+      if (bin < myagg.nusedbins && histogram[bin].x == v) {
+        histogram[bin].y++;
+      } else {
+        for (int i = myagg.nusedbins; i > bin; i--) {
+          myagg.hist[i].x = myagg.hist[i - 1].x;
+          myagg.hist[i].y = myagg.hist[i - 1].y;
+        }
+        myagg.hist[bin].x = v; // new histogram bin for value 'v'
+        myagg.hist[bin].y = 1; // of height 1 unit
+
+        // Trim the histogram down to the correct number of bins.
+        if (++myagg.nusedbins > myagg.nbins) {
+          trim(myagg);
+        }
+      }
+    }
+
+    // Trims a histogram down to 'nbins' bins by iteratively merging the closest bins.
+    // If two pairs of bins are equally close to each other, the pair to merge is chosen
+    // uniformly at random using the PRNG.
+    private void trim(StdAgg myagg) {
+      // Nothing to do if the histogram is already at or below the requested size.
+      if (myagg.nusedbins <= myagg.nbins) {
+        return;
+      }
+      StdAgg.Coord[] histogram = myagg.hist;
+
+      while (myagg.nusedbins > myagg.nbins) {
+        // Find the closest histogram bins in terms of x coordinates. Break ties randomly.
+        double smallestdiff = histogram[1].x - histogram[0].x;
+        int smallestdiffloc = 0, smallestdiffcount = 1;
+        for (int i = 1; i < myagg.nusedbins - 1; i++) {
+          double diff = histogram[i + 1].x - histogram[i].x;
+          if (diff < smallestdiff) {
+            smallestdiff = diff;
+            smallestdiffloc = i;
+            smallestdiffcount = 1;
+          } else {
+            if (diff == smallestdiff && prng.nextDouble() <= (1.0 / ++smallestdiffcount)) {
+              smallestdiffloc = i;
+            }
+          }
+        }
+
+        // Merge the two closest bins into their average x location, weighted by their heights.
+        // The height of the new bin is the sum of the heights of the old bins.
+        double d = histogram[smallestdiffloc].y + histogram[smallestdiffloc + 1].y;
+        histogram[smallestdiffloc].x *= histogram[smallestdiffloc].y / d;
+        histogram[smallestdiffloc].x += histogram[smallestdiffloc + 1].x / d
+            * histogram[smallestdiffloc + 1].y;
+        histogram[smallestdiffloc].y = d;
+
+        // Shift the remaining bins left one position
+        for (int i = smallestdiffloc + 1; i < myagg.nusedbins - 1; i++) {
+          histogram[i].x = histogram[i + 1].x;
+          histogram[i].y = histogram[i + 1].y;
+        }
+        myagg.nusedbins--;
+      }
+    }
+
+    private boolean warned = false;
+
+    @Override
+    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
+      assert (parameters.length == 2);
+      if (parameters[0] == null || parameters[1] == null) {
+        return;
+      }
+      StdAgg myagg = (StdAgg) agg;
+
+      // Parse out the number of histogram bins only once, if we haven't already done
+      // so before. We need at least 2 bins; otherwise, there is no point in creating
+      // a histogram.
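+      // (nbins == 0 occurs only in the freshly reset state -- any value accepted below is
+      // at least 2 -- so it doubles as the "not yet parsed" flag.)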
+      if (myagg.nbins == 0) {
+        try {
+          myagg.nbins = PrimitiveObjectInspectorUtils.getInt(parameters[1], nbinsOI);
+        } catch (NumberFormatException e) {
+          throw new HiveException(getClass().getSimpleName() + " "
+              + StringUtils.stringifyException(e));
+        }
+        if (myagg.nbins < 2) {
+          throw new HiveException(getClass().getSimpleName() + " needs nbins to be at least 2,"
+              + " but you supplied " + myagg.nbins + ".");
+        }
+
+        // allocate memory for the histogram
+        myagg.hist = new StdAgg.Coord[myagg.nbins + 1]; // +1 is used for holding a temporary bin
+        for (int i = 0; i < myagg.nbins + 1; i++) {
+          myagg.hist[i] = new StdAgg.Coord();
+        }
+        myagg.nusedbins = 0;
+      }
+
+      // Process the current data point
+      Object p = parameters[0];
+      if (p != null) {
+        double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
+        insert(myagg, v);
+      }
+    }
+
+    // Aggregation buffer definition and manipulation methods
+    static class StdAgg implements AggregationBuffer {
+      static class Coord implements Comparable {
+        double x;
+        double y;
+
+        public int compareTo(Object other) {
+          Coord o = (Coord) other;
+          if (x < o.x) {
+            return -1;
+          }
+          if (x > o.x) {
+            return 1;
+          }
+          return 0;
+        }
+      };
+
+      int nbins;     // maximum number of histogram bins
+      int nusedbins; // number of histogram bins actually used
+      Coord[] hist;  // histogram coordinates
+    };
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      StdAgg result = new StdAgg();
+      reset(result);
+      return result;
+    }
+
+    @Override
+    public void reset(AggregationBuffer agg) throws HiveException {
+      StdAgg myagg = (StdAgg) agg;
+      myagg.nbins = 0;
+      myagg.nusedbins = 0;
+      myagg.hist = null;
+    }
+  }
+}