Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (revision 1229670)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (working copy)
@@ -124,6 +124,23 @@
   }
 
   /**
+   * ****************** Test cases for Median **********************
+   */
+  /**
+   * Runs an unweighted median (single qualifier) over a scan with no row bounds.
+   * @throws Throwable
+   */
+  @Test
+  public void testMedianWithValidRange() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    long median = aClient.median(TEST_TABLE, ci, scan);
+    assertEquals(8L, median);
+  }
+
+  /**
    * **************************** ROW COUNT Test cases *******************
    */
 
Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 1229670)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (working copy)
@@ -23,13 +23,20 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
@@ -362,4 +369,148 @@
     return res;
   }
 
+  /**
+   * It helps locate the region with median for a given column whose weight
+   * is specified in an optional column.
+   * From individual regions, it obtains sum of values and sum of weights.
+   * @param tableName name of the table to query
+   * @param ci interpreter used to add up the per-region sums
+   * @param scan scan forwarded to each region; its start and stop rows bound the call
+   * @return pair whose first element is a map between start row of the region
+   *  and (sum of values, sum of weights) for the region, the second element is
+   *  (sum of values, sum of weights) for all the regions chosen
+   * @throws Throwable
+   */
+  private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
+  getMedianArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+    validateParameters(scan);
+    final NavigableMap<byte[], List<S>> map =
+      new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
+    class MedianCallback implements Batch.Callback<List<S>> {
+      S sumVal = null, sumWeights = null;
+
+      public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
+        List<S> l = new ArrayList<S>();
+        l.add(sumVal);
+        l.add(sumWeights);
+        return new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
+      }
+
+      @Override
+      public synchronized void update(byte[] region, byte[] row, List<S> result) {
+        map.put(row, result);
+        sumVal = ci.add(sumVal, result.get(0));
+        sumWeights = ci.add(sumWeights, result.get(1));
+      }
+    }
+    MedianCallback medianCallback = new MedianCallback();
+    HTable table = new HTable(conf, tableName);
+    try {
+      table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
+          scan.getStopRow(),
+          new Batch.Call<AggregateProtocol, List<S>>() {
+            @Override
+            public List<S> call(AggregateProtocol instance)
+                throws IOException {
+              return instance.getMedian(ci, scan);
+            }
+          }, medianCallback);
+    } finally {
+      // always release the table, even when the coprocessor call fails
+      table.close();
+    }
+    return medianCallback.getMedianParams();
+  }
+
+  /**
+   * This is the client side interface/handler for calling the median method for a
+   * given cf-cq combination. This method collects the necessary parameters
+   * to compute the median and returns the median.
+   * @param tableName name of the table to query
+   * @param ci interpreter used to read cell values and to add them up
+   * @param scan scan with one column family; the first qualifier is the value
+   *  column and, when more than one qualifier is given, the last is the weight
+   * @return R the median, may be null (e.g. when the scan yields no rows)
+   * @throws Throwable
+   */
+  public <R, S> R median(final byte[] tableName, ColumnInterpreter<R, S> ci,
+      Scan scan) throws Throwable {
+    Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
+    byte[] startRow = null;
+    byte[] colFamily = scan.getFamilies()[0];
+    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+    NavigableMap<byte[], List<S>> map = p.getFirst();
+    S sumVal = p.getSecond().get(0);
+    S sumWeights = p.getSecond().get(1);
+    double halfSumVal = ci.divideForAvg(sumVal, 2L);
+    double movingSumVal = 0;
+    boolean weighted = false;
+    if (quals.size() > 1) {
+      weighted = true;
+      halfSumVal = ci.divideForAvg(sumWeights, 2L);
+    }
+
+    // NOTE(review): movingSumVal ends up including the region that starts at
+    // startRow, and the scan below starts at that same row -- confirm that this
+    // region is not meant to be counted only once.
+    for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
+      S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
+      double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+      if (newSumVal > halfSumVal) break;  // we found the region with the median
+      movingSumVal = newSumVal;
+      startRow = entry.getKey();
+    }
+    // scan the region with median and find it
+    Scan scan2 = new Scan(scan);
+    // inherit stop row from method parameter; the start row is only moved when
+    // at least one region was skipped, otherwise startRow is still null
+    if (startRow != null) {
+      scan2.setStartRow(startRow);
+    }
+    int cacheSize = scan2.getCaching();
+    if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
+      scan2.setCacheBlocks(true);
+      cacheSize = 5;
+      scan2.setCaching(cacheSize);
+    }
+    // read the qualifiers without polling so the caller's Scan keeps them
+    byte[] qualifier = quals.isEmpty() ? null : quals.first();
+    // qualifier for the weight column
+    byte[] weightQualifier = weighted ? quals.last() : qualifier;
+    HTable table = new HTable(conf, tableName);
+    try {
+      ResultScanner scanner = table.getScanner(scan2);
+      try {
+        R value = null;
+        Result[] results = null;
+        do {
+          results = scanner.next(cacheSize);
+          if (results != null && results.length > 0) {
+            for (int i = 0; i < results.length; i++) {
+              Result r = results[i];
+              // retrieve weight
+              KeyValue kv = r.getColumnLatest(colFamily, weightQualifier);
+              R newValue = ci.getValue(colFamily, weightQualifier, kv);
+              S s = ci.castToReturnType(newValue);
+              double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+              // see if we have moved past the median
+              if (newSumVal > halfSumVal) {
+                return value;
+              }
+              movingSumVal = newSumVal;
+              kv = r.getColumnLatest(colFamily, qualifier);
+              value = ci.getValue(colFamily, qualifier, kv);
+            }
+          }
+        } while (results != null && results.length > 0);
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      // release the table on every exit path, including the early return
+      table.close();
+    }
+    return null;
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (revision 1229670)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (working copy)
@@ -126,4 +126,19 @@
   <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
       throws IOException;
 
+  /**
+   * Gives a List containing sum of values and sum of weights.
+   * It is computed for the combination of column
+   * family and column qualifier(s) in the given row range as defined in the
+   * Scan object. In its current implementation, it takes one column family and
+   * two column qualifiers. The first qualifier is for values column and
+   * the second qualifier (optional) is for weight column.
+   * @param ci
+   * @param scan
+   * @return List holding the sum of values followed by the sum of weights
+   * @throws IOException
+   */
+  <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
+  throws IOException;
+
 }
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (revision 1229670)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (working copy)
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NavigableSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -232,4 +233,57 @@
     return p;
   }
 
+  /**
+   * Sums, for the rows of this region matched by the scan, the values and
+   * (when more than one qualifier is given) the weights.
+   * @return list with the sum of values first and the sum of weights second
+   */
+  @Override
+  public <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
+  throws IOException {
+    S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
+
+    byte[] colFamily = scan.getFamilies()[0];
+    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+    // look at the qualifiers without polling: polling the first one shrinks the
+    // set, so a later size() > 1 test would miss a two-qualifier (weighted) scan
+    byte[] valQualifier = quals.isEmpty() ? null : quals.first();
+    // if weighted median is requested, get qualifier for the weight column
+    byte[] weightQualifier = quals.size() > 1 ? quals.last() : null;
+    List<KeyValue> results = new ArrayList<KeyValue>();
+
+    // open the scanner only after the lookups above, so that a failure there
+    // cannot leave an unclosed scanner behind
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+        .getRegion().getScanner(scan);
+    boolean hasMoreRows = false;
+    try {
+      do {
+        tempVal = null;
+        tempWeight = null;
+        hasMoreRows = scanner.next(results);
+        // NOTE(review): every KeyValue of the row is passed to ci.getValue for
+        // both qualifiers -- presumably the interpreter skips cells of the
+        // other column; confirm.
+        for (KeyValue kv : results) {
+          tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
+              valQualifier, kv)));
+          if (weightQualifier != null) {
+            tempWeight = ci.add(tempWeight,
+                ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
+          }
+        }
+        results.clear();
+        sumVal = ci.add(sumVal, tempVal);
+        sumWeights = ci.add(sumWeights, tempWeight);
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    List<S> l = new ArrayList<S>();
+    l.add(sumVal);
+    l.add(sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights);
+    return l;
+  }
+
 }