Index: ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/UDAFPercentile.java	(revision 0)
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Aggregate function that returns the exact percentile of a column of
+ * integral values.
+ *
+ * <p>The distinct input values are counted in memory; the requested percentile
+ * is then located at the 0-based rank (n - 1) * pc among the n sorted
+ * inputs, using linear interpolation when that rank is fractional.
+ */
+@Description(name = "percentile",
+    value = "_FUNC_(expr, pc) - Returns the percentile of expr at pc (range: [0,1])")
+public class UDAFPercentile extends UDAF {
+
+  /**
+   * Evaluator for Long inputs.
+   *
+   * NOTE(review): the name looks carried over from a max() aggregate; it is
+   * kept unchanged here because it is part of the public class layout.
+   */
+  public static class MaxLongEvaluator implements UDAFEvaluator {
+
+    /** State returned by terminatePartial() and consumed by merge(). */
+    public static class State {
+      /** Occurrence count per distinct input value; null until the first value is seen. */
+      HashMap<Long, LongWritable> counts;
+      /** Requested percentile, as a fraction in [0,1]. */
+      double percent;
+    }
+
+    /** Orders map entries by ascending key. */
+    public static class MyComparator implements Comparator<Map.Entry<Long, LongWritable>> {
+      @Override
+      public int compare(Map.Entry<Long, LongWritable> o1, Map.Entry<Long, LongWritable> o2) {
+        return o1.getKey().compareTo(o2.getKey());
+      }
+    }
+
+    State state;
+
+    public MaxLongEvaluator() {
+      state = new State();
+    }
+
+    /** Discards any counts accumulated so far. */
+    public void init() {
+      if (state.counts != null) {
+        state.counts.clear();
+      }
+    }
+
+    /**
+     * Rejects a percentile outside [0,1].
+     *
+     * @throws IllegalArgumentException if percent is NaN, negative or greater than 1
+     */
+    private static void checkPercent(double percent) {
+      // Negated conjunction so that NaN is rejected as well.
+      if (!(percent >= 0.0 && percent <= 1.0)) {
+        throw new IllegalArgumentException(
+            "Percentile value must be within the range of 0 to 1, got: " + percent);
+      }
+    }
+
+    /** Adds i to the count of value o in s, creating the map and the counter on demand. */
+    private static void increment(State s, Long o, long i) {
+      if (s.counts == null) {
+        s.counts = new HashMap<Long, LongWritable>();
+      }
+      LongWritable count = s.counts.get(o);
+      if (count == null) {
+        s.counts.put(o, new LongWritable(i));
+      } else {
+        count.set(count.get() + i);
+      }
+    }
+
+    /**
+     * Consumes one input row.
+     *
+     * @param o the value to count; a null value is ignored
+     * @param percent the requested percentile, in [0,1]
+     * @return always true
+     * @throws IllegalArgumentException if percent is outside [0,1]
+     */
+    public boolean iterate(Long o, double percent) {
+      checkPercent(percent);
+      state.percent = percent;
+      // A null key could not be ordered when the values are sorted in terminate().
+      if (o != null) {
+        increment(state, o, 1);
+      }
+      return true;
+    }
+
+    /** Returns the live state object (not a copy) as the partial result. */
+    public State terminatePartial() {
+      return state;
+    }
+
+    /**
+     * Folds a partial result into this evaluator.
+     *
+     * @param other partial state; ignored when it is null or holds no counts
+     * @return always true
+     * @throws IllegalArgumentException if other.percent is outside [0,1]
+     */
+    public boolean merge(State other) {
+      // A partial without data contributes nothing and may carry an unset percent.
+      if (other == null || other.counts == null || other.counts.isEmpty()) {
+        return true;
+      }
+      checkPercent(other.percent);
+      state.percent = other.percent;
+      for (Map.Entry<Long, LongWritable> e : other.counts.entrySet()) {
+        increment(state, e.getKey(), e.getValue().get());
+      }
+      return true;
+    }
+
+    /**
+     * Looks up the value at a (possibly fractional) 0-based rank.
+     *
+     * @param keys distinct values in ascending order
+     * @param cumulative cumulative[i] is the number of inputs less than or equal to keys[i]
+     * @param position rank in [0, total - 1], where total is the last element of cumulative
+     */
+    private static double getPercentile(long[] keys, long[] cumulative, double position) {
+      // We may need to do linear interpolation to get the exact percentile
+      long lower = (long) Math.floor(position);
+      long higher = (long) Math.ceil(position);
+
+      // Linear search since this won't take much time from the total execution anyway.
+      // The first entry whose cumulative count reaches lower + 1 holds rank lower.
+      int i = 0;
+      while (cumulative[i] < lower + 1) {
+        i++;
+      }
+      long lowerKey = keys[i];
+      if (higher == lower) {
+        // no interpolation needed because position does not have a fraction
+        return lowerKey;
+      }
+
+      // Advance to the entry that holds rank higher (same entry or a later one).
+      while (cumulative[i] < higher + 1) {
+        i++;
+      }
+      long higherKey = keys[i];
+      if (higherKey == lowerKey) {
+        // no interpolation needed because both ranks fall on the same key
+        return lowerKey;
+      }
+
+      // Linear interpolation to get the exact percentile
+      return (higher - position) * lowerKey + (position - lower) * higherKey;
+    }
+
+    /**
+     * Computes the final percentile without modifying the accumulated state.
+     *
+     * @return the percentile, or null when no non-null input was seen
+     */
+    public Double terminate() {
+      // No input data (never populated, or cleared by init())
+      if (state.counts == null || state.counts.isEmpty()) {
+        return null;
+      }
+
+      // Get all items into a list and sort them by value
+      Set<Map.Entry<Long, LongWritable>> entries = state.counts.entrySet();
+      List<Map.Entry<Long, LongWritable>> entriesList =
+          new ArrayList<Map.Entry<Long, LongWritable>>(entries);
+      Collections.sort(entriesList, new MyComparator());
+
+      // Accumulate the counts into separate arrays so the state stays intact
+      int size = entriesList.size();
+      long[] keys = new long[size];
+      long[] cumulative = new long[size];
+      long total = 0;
+      for (int i = 0; i < size; i++) {
+        Map.Entry<Long, LongWritable> entry = entriesList.get(i);
+        keys[i] = entry.getKey();
+        total += entry.getValue().get();
+        cumulative[i] = total;
+      }
+      if (total <= 0) {
+        return null;
+      }
+
+      // max is the 1.0 percentile
+      long max = total - 1;
+      double position = max * state.percent;
+      return getPercentile(keys, cumulative, position);
+    }
+  }
+
+}