Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 1481258) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (working copy) @@ -98,8 +98,34 @@ * The caller is supposed to handle the exception as they are thrown * & propagated to it. */ + public R max( + final byte[] tableName, final ColumnInterpreter ci, final Scan scan) + throws Throwable { + HTable table = null; + try { + table = new HTable(conf, tableName); + return max(table, ci, scan); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * It gives the maximum value of a column for a given column family for the + * given range. In case qualifier is null, a max of all values for the given + * family is returned. + * @param table + * @param ci + * @param scan + * @return max val + * @throws Throwable + * The caller is supposed to handle the exception as they are thrown + * & propagated to it. + */ public - R max(final byte[] tableName, final ColumnInterpreter ci, + R max(final HTable table, final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateArgument requestArg = validateArgAndGetPB(scan, ci); class MaxCallBack implements Batch.Callback { @@ -115,34 +141,26 @@ } } MaxCallBack aMaxCallBack = new MaxCallBack(); - HTable table = null; - try { - table = new HTable(conf, tableName); - table.coprocessorService(AggregateService.class, scan.getStartRow(), - scan.getStopRow(), new Batch.Call() { - @Override - public R call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.getMax(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - if (response.getFirstPartCount() > 0) { - ByteString b = response.getFirstPart(0); - Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b); - return ci.getCellValueFromProto(q); - } - return null; + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call() { + @Override + public R call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.getMax(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } - }, aMaxCallBack); - } finally { - if (table != null) { - table.close(); - } - } + if (response.getFirstPartCount() > 0) { + ByteString b = response.getFirstPart(0); + Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b); + return ci.getCellValueFromProto(q); + } + return null; + } + }, aMaxCallBack); return aMaxCallBack.getMax(); } @@ -169,8 +187,32 @@ * @return min val * @throws Throwable */ + public R min( + final byte[] tableName, final ColumnInterpreter ci, final Scan scan) + throws Throwable { + HTable table = null; + try { + table = new HTable(conf, tableName); + return min(table, ci, scan); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * It gives the minimum value of a column for a given column family for the + * given range. In case qualifier is null, a min of all values for the given + * family is returned. + * @param table + * @param ci + * @param scan + * @return min val + * @throws Throwable + */ public - R min(final byte[] tableName, final ColumnInterpreter ci, + R min(final HTable table, final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateArgument requestArg = validateArgAndGetPB(scan, ci); class MinCallBack implements Batch.Callback { @@ -187,37 +229,56 @@ } } MinCallBack minCallBack = new MinCallBack(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call() { + + @Override + public R call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.getMin(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + if (response.getFirstPartCount() > 0) { + ByteString b = response.getFirstPart(0); + Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b); + return ci.getCellValueFromProto(q); + } + return null; + } + }, minCallBack); + log.debug("Min fom all regions is: " + minCallBack.getMinimum()); + return minCallBack.getMinimum(); + } + + /** + * It gives the row count, by summing up the individual results obtained from + * regions. In case the qualifier is null, FirstKeyValueFilter is used to + * optimised the operation. In case qualifier is provided, I can't use the + * filter as it may set the flag to skip to next row, but the value read is + * not of the given filter: in this case, this particular row will not be + * counted ==> an error. + * @param tableName + * @param ci + * @param scan + * @return + * @throws Throwable + */ + public long rowCount( + final byte[] tableName, final ColumnInterpreter ci, final Scan scan) + throws Throwable { HTable table = null; try { table = new HTable(conf, tableName); - table.coprocessorService(AggregateService.class, scan.getStartRow(), - scan.getStopRow(), new Batch.Call() { - - @Override - public R call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.getMin(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - if (response.getFirstPartCount() > 0) { - ByteString b = response.getFirstPart(0); - Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b); - return ci.getCellValueFromProto(q); - } - return null; - } - }, minCallBack); + return rowCount(table, ci, scan); } finally { if (table != null) { table.close(); } } - log.debug("Min fom all regions is: " + minCallBack.getMinimum()); - return minCallBack.getMinimum(); } /** @@ -227,14 +288,14 @@ * filter as it may set the flag to skip to next row, but the value read is * not of the given filter: in this case, this particular row will not be * counted ==> an error. - * @param tableName + * @param table * @param ci * @param scan * @return * @throws Throwable */ public - long rowCount(final byte[] tableName, + long rowCount(final HTable table, final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateArgument requestArg = validateArgAndGetPB(scan, ci); class RowNumCallback implements Batch.Callback { @@ -250,46 +311,61 @@ } } RowNumCallback rowNum = new RowNumCallback(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call() { + @Override + public Long call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.getRowNum(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); + ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); + bb.rewind(); + return bb.getLong(); + } + }, rowNum); + return rowNum.getRowNumCount(); + } + + /** + * It sums up the value returned from various regions. In case qualifier is + * null, summation of all the column qualifiers in the given family is done. + * @param tableName + * @param ci + * @param scan + * @return sum + * @throws Throwable + */ + public S sum( + final byte[] tableName, final ColumnInterpreter ci, final Scan scan) + throws Throwable { HTable table = null; try { table = new HTable(conf, tableName); - table.coprocessorService(AggregateService.class, scan.getStartRow(), - scan.getStopRow(), new Batch.Call() { - @Override - public Long call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.getRowNum(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); - ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); - bb.rewind(); - return bb.getLong(); - } - }, rowNum); + return sum(table, ci, scan); } finally { if (table != null) { table.close(); } } - return rowNum.getRowNumCount(); } /** * It sums up the value returned from various regions. In case qualifier is * null, summation of all the column qualifiers in the given family is done. - * @param tableName + * @param table * @param ci * @param scan * @return sum * @throws Throwable */ public - S sum(final byte[] tableName, final ColumnInterpreter ci, + S sum(final HTable table, final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateArgument requestArg = validateArgAndGetPB(scan, ci); @@ -306,48 +382,62 @@ } } SumCallBack sumCallBack = new SumCallBack(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call() { + @Override + public S call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.getSum(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + if (response.getFirstPartCount() == 0) { + return null; + } + ByteString b = response.getFirstPart(0); + T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); + S s = ci.getPromotedValueFromProto(t); + return s; + } + }, sumCallBack); + return sumCallBack.getSumResult(); + } + + /** + * It computes average while fetching sum and row count from all the + * corresponding regions. Approach is to compute a global sum of region level + * sum and rowcount and then compute the average. + * @param tableName + * @param scan + * @throws Throwable + */ + private Pair getAvgArgs( + final byte[] tableName, final ColumnInterpreter ci, final Scan scan) + throws Throwable { HTable table = null; try { table = new HTable(conf, tableName); - table.coprocessorService(AggregateService.class, scan.getStartRow(), - scan.getStopRow(), new Batch.Call() { - @Override - public S call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.getSum(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - if (response.getFirstPartCount() == 0) { - return null; - } - ByteString b = response.getFirstPart(0); - T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); - S s = ci.getPromotedValueFromProto(t); - return s; - } - }, sumCallBack); + return getAvgArgs(table, ci, scan); } finally { if (table != null) { table.close(); } } - return sumCallBack.getSumResult(); } /** * It computes average while fetching sum and row count from all the * corresponding regions. Approach is to compute a global sum of region level * sum and rowcount and then compute the average. - * @param tableName + * @param table * @param scan * @throws Throwable */ private - Pair getAvgArgs(final byte[] tableName, + Pair getAvgArgs(final HTable table, final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateArgument requestArg = validateArgAndGetPB(scan, ci); class AvgCallBack implements Batch.Callback> { @@ -365,43 +455,33 @@ } } AvgCallBack avgCallBack = new AvgCallBack(); - HTable table = null; - try { - table = new HTable(conf, tableName); - table.coprocessorService(AggregateService.class, scan.getStartRow(), - scan.getStopRow(), - new Batch.Call>() { - @Override - public Pair call(AggregateService instance) - throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.getAvg(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - Pair pair = new Pair(null, 0L); - if (response.getFirstPartCount() == 0) { - return pair; - } - ByteString b = response.getFirstPart(0); - T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); - S s = ci.getPromotedValueFromProto(t); - pair.setFirst(s); - ByteBuffer bb = ByteBuffer.allocate(8).put( - getBytesFromResponse(response.getSecondPart())); - bb.rewind(); - pair.setSecond(bb.getLong()); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call>() { + @Override + public Pair call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.getAvg(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + Pair pair = new Pair(null, 0L); + if (response.getFirstPartCount() == 0) { return pair; } - }, avgCallBack); - } finally { - if (table != null) { - table.close(); - } - } + ByteString b = response.getFirstPart(0); + T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); + S s = ci.getPromotedValueFromProto(t); + pair.setFirst(s); + ByteBuffer bb = ByteBuffer.allocate(8).put( + getBytesFromResponse(response.getSecondPart())); + bb.rewind(); + pair.setSecond(bb.getLong()); + return pair; + } + }, avgCallBack); return avgCallBack.getAvgArgs(); } @@ -425,18 +505,36 @@ } /** + * This is the client side interface/handle for calling the average method for + * a given cf-cq combination. It was necessary to add one more call stack as + * its return type should be a decimal value, irrespective of what + * columninterpreter says. So, this methods collects the necessary parameters + * to compute the average and returs the double value. + * @param table + * @param ci + * @param scan + * @return + * @throws Throwable + */ + public double avg( + final HTable table, final ColumnInterpreter ci, Scan scan) throws Throwable { + Pair p = getAvgArgs(table, ci, scan); + return ci.divideForAvg(p.getFirst(), p.getSecond()); + } + + /** * It computes a global standard deviation for a given column and its value. * Standard deviation is square root of (average of squares - * average*average). From individual regions, it obtains sum, square sum and * number of rows. With these, the above values are computed to get the global * std. - * @param tableName + * @param table * @param scan * @return * @throws Throwable */ private - Pair, Long> getStdArgs(final byte[] tableName, + Pair, Long> getStdArgs(final HTable table, final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateArgument requestArg = validateArgAndGetPB(scan, ci); class StdCallback implements Batch.Callback, Long>> { @@ -461,49 +559,64 @@ } } StdCallback stdCallback = new StdCallback(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call, Long>>() { + @Override + public Pair, Long> call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.getStd(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + Pair, Long> pair = new Pair, Long>(new ArrayList(), 0L); + if (response.getFirstPartCount() == 0) { + return pair; + } + List list = new ArrayList(); + for (int i = 0; i < response.getFirstPartCount(); i++) { + ByteString b = response.getFirstPart(i); + T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); + S s = ci.getPromotedValueFromProto(t); + list.add(s); + } + pair.setFirst(list); + ByteBuffer bb = ByteBuffer.allocate(8).put( + getBytesFromResponse(response.getSecondPart())); + bb.rewind(); + pair.setSecond(bb.getLong()); + return pair; + } + }, stdCallback); + return stdCallback.getStdParams(); + } + + /** + * This is the client side interface/handle for calling the std method for a + * given cf-cq combination. It was necessary to add one more call stack as its + * return type should be a decimal value, irrespective of what + * columninterpreter says. So, this methods collects the necessary parameters + * to compute the std and returns the double value. + * @param tableName + * @param ci + * @param scan + * @return + * @throws Throwable + */ + public + double std(final byte[] tableName, ColumnInterpreter ci, + Scan scan) throws Throwable { HTable table = null; try { table = new HTable(conf, tableName); - table.coprocessorService(AggregateService.class, scan.getStartRow(), - scan.getStopRow(), - new Batch.Call, Long>>() { - @Override - public Pair, Long> call(AggregateService instance) - throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.getStd(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - Pair,Long> pair = - new Pair, Long>(new ArrayList(), 0L); - if (response.getFirstPartCount() == 0) { - return pair; - } - List list = new ArrayList(); - for (int i = 0; i < response.getFirstPartCount(); i++) { - ByteString b = response.getFirstPart(i); - T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); - S s = ci.getPromotedValueFromProto(t); - list.add(s); - } - pair.setFirst(list); - ByteBuffer bb = ByteBuffer.allocate(8).put( - getBytesFromResponse(response.getSecondPart())); - bb.rewind(); - pair.setSecond(bb.getLong()); - return pair; - } - }, stdCallback); + return std(table, ci, scan); } finally { if (table != null) { table.close(); } } - return stdCallback.getStdParams(); } /** @@ -512,16 +625,15 @@ * return type should be a decimal value, irrespective of what * columninterpreter says. So, this methods collects the necessary parameters * to compute the std and returns the double value. - * @param tableName + * @param table * @param ci * @param scan * @return * @throws Throwable */ - public - double std(final byte[] tableName, ColumnInterpreter ci, - Scan scan) throws Throwable { - Pair, Long> p = getStdArgs(tableName, ci, scan); + public double std( + final HTable table, ColumnInterpreter ci, Scan scan) throws Throwable { + Pair, Long> p = getStdArgs(table, ci, scan); double res = 0d; double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond()); double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond()); @@ -534,7 +646,7 @@ * It helps locate the region with median for a given column whose weight * is specified in an optional column. * From individual regions, it obtains sum of values and sum of weights. - * @param tableName + * @param table * @param ci * @param scan * @return pair whose first element is a map between start row of the region @@ -544,7 +656,7 @@ */ private Pair>, List> - getMedianArgs(final byte[] tableName, + getMedianArgs(final HTable table, final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateArgument requestArg = validateArgAndGetPB(scan, ci); final NavigableMap> map = @@ -569,55 +681,71 @@ } } StdCallback stdCallback = new StdCallback(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call>() { + @Override + public List call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback(); + instance.getMedian(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + + List list = new ArrayList(); + for (int i = 0; i < response.getFirstPartCount(); i++) { + ByteString b = response.getFirstPart(i); + T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); + S s = ci.getPromotedValueFromProto(t); + list.add(s); + } + return list; + } + + }, stdCallback); + return stdCallback.getMedianParams(); + } + + /** + * This is the client side interface/handler for calling the median method for a + * given cf-cq combination. This method collects the necessary parameters + * to compute the median and returns the median. + * @param tableName + * @param ci + * @param scan + * @return R the median + * @throws Throwable + */ + public + R median(final byte[] tableName, ColumnInterpreter ci, + Scan scan) throws Throwable { HTable table = null; try { table = new HTable(conf, tableName); - table.coprocessorService(AggregateService.class, scan.getStartRow(), - scan.getStopRow(), new Batch.Call>() { - @Override - public List call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback rpcCallback = - new BlockingRpcCallback(); - instance.getMedian(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - - List list = new ArrayList(); - for (int i = 0; i < response.getFirstPartCount(); i++) { - ByteString b = response.getFirstPart(i); - T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); - S s = ci.getPromotedValueFromProto(t); - list.add(s); - } - return list; - } - - }, stdCallback); + return median(table, ci, scan); } finally { if (table != null) { table.close(); } } - return stdCallback.getMedianParams(); } /** * This is the client side interface/handler for calling the median method for a * given cf-cq combination. This method collects the necessary parameters * to compute the median and returns the median. - * @param tableName + * @param table * @param ci * @param scan * @return R the median * @throws Throwable */ public - R median(final byte[] tableName, ColumnInterpreter ci, + R median(final HTable table, ColumnInterpreter ci, Scan scan) throws Throwable { - Pair>, List> p = getMedianArgs(tableName, ci, scan); + Pair>, List> p = getMedianArgs(table, ci, scan); byte[] startRow = null; byte[] colFamily = scan.getFamilies()[0]; NavigableSet quals = scan.getFamilyMap().get(colFamily); @@ -643,10 +771,8 @@ Scan scan2 = new Scan(scan); // inherit stop row from method parameter if (startRow != null) scan2.setStartRow(startRow); - HTable table = null; ResultScanner scanner = null; try { - table = new HTable(conf, tableName); int cacheSize = scan2.getCaching(); if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) { scan2.setCacheBlocks(true); @@ -683,9 +809,6 @@ if (scanner != null) { scanner.close(); } - if (table != null) { - table.close(); - } } return null; }