Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 0)
@@ -0,0 +1,445 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.coprocessor.AggregateCpProtocol;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ *
+ * This client class is for invoking the aggregate functions deployed on the
+ * RegionServer side via the coprocessor implementations. This class will
+ * implement the supporting functionality for summing/processing the
+ * individual results obtained from the coprocessor implementation for each
+ * region.
+ *
+ * This will serve as the client side handle for invoking the aggregate
+ * functions, like AggregationClient#sum(table, start, end, colFamily,
+ * colQualifier).
+ *
+ * For all aggregate functions,
+ *
+ * - start row < end row is an essential condition (if they are not
+ * {@link HConstants#EMPTY_BYTE_ARRAY})
+ *
+ * - Column family can't be null.
+ *
+ * - For all mathematical computations like avg, std etc, a long cell value (8
+ * bytes) is assumed; otherwise that value is skipped.
+ *
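+ * A minimal usage sketch, mirroring the tests shipped with this patch (which
+ * use the LongColumnInterpreter for 8-byte long cell values); conf, tableName
+ * and the row/column arguments are assumed to be defined by the caller:
+ *
+ * <pre>
+ * AggregationClient aClient = new AggregationClient(conf);
+ * ColumnInterpreter ci = new LongColumnInterpreter();
+ * long sum = aClient.getSum(tableName, startRow, stopRow, colFamily,
+ *     colQualifier, ci, null);
+ * double avg = aClient.getAvg(tableName, startRow, stopRow, colFamily,
+ *     colQualifier, ci, null);
+ * </pre>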
+ */
+public class AggregationClient {
+
+ private static final Log log = LogFactory.getLog(AggregationClient.class);
+ Configuration conf;
+
+ /**
+ * no arg constructor
+ */
+ public AggregationClient() {
+
+ }
+
+ /**
+ * Constructor with Conf object
+ *
+ * @param cfg
+ */
+ public AggregationClient(Configuration cfg) {
+ this.conf = cfg;
+ }
+
+ /**
+ * It gives the maximum value of a column for a given column family for the
+ * given range. In case qualifier is null, a max of all values for the given
+ * family is returned.
+ *
+   * @param <R>
+ *
+ * @param tableName
+ * @param startKey
+ * @param endKey
+ * @param colFamily
+ * @param colQualifier
+ * @param ci
+ * @param f
+ * @return the maximum value as a long.
+ * @throws Throwable
+ * The caller is supposed to handle the exception as they are thrown
+ * & propagated to it.
+ */
+ public R getMaximum(final byte[] tableName, final byte[] startKey,
+ final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,
+ final ColumnInterpreter ci, final Filter f) throws Throwable {
+ validateStartEndRows(startKey, endKey);
+ HTable table = new HTable(tableName);
+
+ class MaxCallBack implements Batch.Callback {
+ R max = ci.getMinValue();
+
+ R getMax() {
+ return max;
+ }
+
+ @Override
+ public void update(byte[] region, byte[] row, R result) {
+ if (ci.compare(max, result) < 0) {
+ max = result;
+ }
+ }
+ }
+ MaxCallBack aMaxCallBack = new MaxCallBack();
+ table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,
+ new Batch.Call() {
+ @Override
+ public R call(AggregateCpProtocol instance) throws IOException {
+ return instance.getMax(colFamily, colQualifier, startKey, endKey,
+ ci, f);
+ }
+ }, aMaxCallBack);
+ return aMaxCallBack.getMax();
+ }
+
+ private void validateStartEndRows(final byte[] startKey, final byte[] endKey)
+ throws IOException {
+ if ((Bytes.equals(startKey, endKey) && (!Bytes.equals(startKey,
+ HConstants.EMPTY_START_ROW)))
+ || Bytes.compareTo(startKey, endKey) > 0) {
+ throw new IOException(
+ "Agg client Exception: Startrow can't be equal or greater than Stoprow");
+ }
+ }
+
+ /**
+ * It gives the minimum value of a column for a given column family for the
+ * given range. In case qualifier is null, a min of all values for the given
+ * family is returned.
+ *
+   * @param <R>
+ *
+ * @param tableName
+ * @param startKey
+ * @param endKey
+ * @param colFamily
+ * @param colQualifier
+ * @param ci
+ * @param f
+ * @return
+ * @throws Throwable
+ */
+ public R getMinimum(final byte[] tableName, final byte[] startKey,
+ final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,
+ final ColumnInterpreter ci, final Filter f) throws Throwable {
+ validateStartEndRows(startKey, endKey);
+ class MinCallBack implements Batch.Callback {
+
+ private R min = ci.getMaxValue();
+
+ public R getMinimum() {
+ return min;
+ }
+
+ @Override
+ public void update(byte[] region, byte[] row, R result) {
+ if (ci.compare(min, result) > 0) {
+ min = result;
+ }
+ }
+ }
+ HTable table = new HTable(tableName);
+ MinCallBack minCallBack = new MinCallBack();
+ table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,
+ new Batch.Call() {
+
+ @Override
+ public R call(AggregateCpProtocol instance) throws IOException {
+ return instance.getMin(colFamily, colQualifier, startKey, endKey,
+ ci, f);
+ }
+ }, minCallBack);
+    log.debug("Min from all regions is: " + minCallBack.getMinimum());
+ return minCallBack.getMinimum();
+ }
+
+ /**
+   * It gives the row count, by summing up the individual results obtained from
+   * regions. In case the qualifier is null, a FirstKeyOnlyFilter is used to
+   * optimise the operation. In case a qualifier is provided, the filter can't
+   * be used as it may set the flag to skip to the next row even though the
+   * value read is not for the given qualifier; in that case the row would not
+   * be counted, which would be an error.
+ *
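+   * For instance (mirroring the row count tests in this patch, with aClient
+   * and ci as in the class level example), counting all rows of a table with a
+   * null qualifier uses the optimized path:
+   *
+   * <pre>
+   * long rows = aClient.getRowNum(tableName, HConstants.EMPTY_START_ROW,
+   *     HConstants.EMPTY_END_ROW, colFamily, null, ci, null);
+   * </pre>
+   *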
+   * @param <R>
+ *
+ * @param tableName
+ * @param startKey
+ * @param endKey
+ * @param colFamily
+ * @param colQualifier
+ * @param ci
+ * @param f
+ * @return
+ * @throws Throwable
+ */
+ public R getRowNum(final byte[] tableName, final byte[] startKey,
+ final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,
+ final ColumnInterpreter ci, final Filter f) throws Throwable {
+ validateStartEndRows(startKey, endKey);
+ class RowNumCallback implements Batch.Callback {
+ private R rowCount = ci.getInitialValue();
+
+ public R getRowNumCount() {
+ return rowCount;
+ }
+
+ @Override
+ public void update(byte[] region, byte[] row, R result) {
+ rowCount = ci.add(rowCount, result);
+ }
+ }
+ RowNumCallback rowNum = new RowNumCallback();
+ HTable table = new HTable(tableName);
+ table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,
+ new Batch.Call() {
+ @Override
+ public R call(AggregateCpProtocol instance) throws IOException {
+ return instance.getRowNum(colFamily, colQualifier, startKey,
+ endKey, ci, f);
+ }
+ }, rowNum);
+
+ return rowNum.getRowNumCount();
+ }
+
+ /**
+ * It sums up the value returned from various regions. In case qualifier is
+ * null, summation of all the column qualifiers in the given family is done.
+ *
+   * @param <R>
+ *
+ * @param tableName
+ * @param startKey
+ * @param endKey
+ * @param colFamily
+ * @param colQualifier
+ * @param ci
+ * @param f
+ * @return
+ * @throws Throwable
+ */
+ public R getSum(final byte[] tableName, final byte[] startKey,
+ final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,
+ final ColumnInterpreter ci, final Filter f) throws Throwable {
+ validateStartEndRows(startKey, endKey);
+ class SumCallBack implements Batch.Callback {
+ R sumVal = ci.getInitialValue();
+
+ public R getSumResult() {
+ return sumVal;
+ }
+
+ @Override
+ public void update(byte[] region, byte[] row, R result) {
+ sumVal = ci.add(sumVal, result);
+ }
+ }
+ SumCallBack sumCallBack = new SumCallBack();
+ HTable table = new HTable(tableName);
+ table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,
+ new Batch.Call() {
+ @Override
+ public R call(AggregateCpProtocol instance) throws IOException {
+ return instance.getSum(colFamily, colQualifier, startKey, endKey,
+ ci, f);
+ }
+ }, sumCallBack);
+ return sumCallBack.getSumResult();
+ }
+
+ /**
+   * It computes the average while fetching sum and row count from all the
+   * corresponding regions. The approach is to compute a global sum of the
+   * region-level sums and row counts, and then compute the average.
+ *
+ * @param tableName
+ * @param startKey
+ * @param endKey
+ * @param colFamily
+ * @param colQualifier
+ * @throws Throwable
+ */
+ private List getAvgArgs(final byte[] tableName, final byte[] startKey,
+ final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,
+ final ColumnInterpreter ci, final Filter f) throws Throwable {
+ validateStartEndRows(startKey, endKey);
+    class AvgCallBack implements Batch.Callback {
+ R sumVal = ci.getInitialValue();
+ R rowVal = ci.getInitialValue();
+
+ public List getAvgArgs() {
+ List l = new ArrayList();
+ l.add(sumVal);
+ l.add(rowVal);
+ return l;
+ }
+
+ @Override
+ public void update(byte[] region, byte[] row, List result) {
+ sumVal = ci.add(sumVal, result.get(0));
+ rowVal = ci.add(rowVal, result.get(1));
+ }
+ }
+ AvgCallBack avgCallBack = new AvgCallBack();
+ HTable table = new HTable(tableName);
+ table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,
+        new Batch.Call() {
+ @Override
+ public List call(AggregateCpProtocol instance) throws IOException {
+ return instance.getAvg(colFamily, colQualifier, startKey, endKey,
+ ci, f);
+ }
+ }, avgCallBack);
+ return avgCallBack.getAvgArgs();
+ }
+
+ /**
+   * This is the client side interface/handle for calling the average method
+   * for a given cf-cq combination. It was necessary to add one more call stack
+   * as its return type should be a decimal value, irrespective of what the
+   * ColumnInterpreter says. So, this method collects the necessary parameters
+   * to compute the average and returns the double value.
+ *
+ * @param tableName
+ * @param startKey
+ * @param endKey
+ * @param colFamily
+ * @param colQualifier
+ * @param ci
+   * @param <R>
+ * @param f
+ * @return
+ * @throws Throwable
+ *
+ */
+ public double getAvg(final byte[] tableName, final byte[] startKey,
+ final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,
+ final ColumnInterpreter ci, final Filter f) throws Throwable {
+ List l = getAvgArgs(tableName, startKey, endKey, colFamily,
+ colQualifier, ci, f);
+ R q = l.get(0), d = l.get(1);
+ return ci.divide(q, d);
+ }
+
+ /**
+ * It computes a global standard deviation for a given column and its value.
+ * Standard deviation is square root of (average of squares -
+ * average*average). From individual regions, it obtains sum, square sum and
+ * number of rows. With these, the above values are computed to get the global
+ * std.
+ *
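+   * Concretely, once the per-region partial results have been summed up on the
+   * client side, {@link #getStd} computes:
+   *
+   * <pre>
+   * avg        = sum / rowCount
+   * avgOfSumSq = sumOfSquares / rowCount
+   * std        = sqrt(avgOfSumSq - avg * avg)
+   * </pre>
+   *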
+ * @param tableName
+ * @param startKey
+ * @param endKey
+ * @param colFamily
+ * @param colQualifier
+ * @return TODO: number formatting
+ * @throws Throwable
+ */
+ private List getStdArgs(final byte[] tableName, final byte[] startKey,
+ final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,
+ final ColumnInterpreter ci, final Filter f) throws Throwable {
+ validateStartEndRows(startKey, endKey);
+    class StdCallback implements Batch.Callback {
+ R rowCountVal = ci.getInitialValue();
+ R sumVal = ci.getInitialValue(), sumSqVal = ci.getInitialValue();
+
+ public List getStdParams() {
+ List l = new ArrayList();
+ l.add(sumVal);
+ l.add(sumSqVal);
+ l.add(rowCountVal);
+ return l;
+ }
+
+ @Override
+ public void update(byte[] region, byte[] row, List result) {
+ sumVal = ci.add(sumVal, result.get(0));
+ sumSqVal = ci.add(sumSqVal, result.get(1));
+ rowCountVal = ci.add(rowCountVal, result.get(2));
+ }
+ }
+ StdCallback stdCallback = new StdCallback();
+ HTable table = new HTable(tableName);
+ table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey,
+        new Batch.Call() {
+ @Override
+ public List call(AggregateCpProtocol instance) throws IOException {
+ return instance.getStd(colFamily, colQualifier, startKey, endKey,
+ ci, f);
+ }
+
+ }, stdCallback);
+ return stdCallback.getStdParams();
+ }
+
+ /**
+   * This is the client side interface/handle for calling the std method for a
+   * given cf-cq combination. It was necessary to add one more call stack as its
+   * return type should be a decimal value, irrespective of what the
+   * ColumnInterpreter says. So, this method collects the necessary parameters
+   * to compute the std and returns the double value.
+ *
+   * @param <R>
+ *
+ * @param tableName
+ * @param startKey
+ * @param endKey
+ * @param colFamily
+ * @param colQualifier
+ * @param ci
+ * @param f
+ * @return
+ * @throws Throwable
+ */
+ public double getStd(final byte[] tableName, final byte[] startKey,
+ final byte[] endKey, final byte[] colFamily, final byte[] colQualifier,
+ ColumnInterpreter ci, final Filter f) throws Throwable {
+ List l = getStdArgs(tableName, startKey, endKey, colFamily,
+ colQualifier, ci, f);
+
+ double res = 0d;
+ double avg = ci.divide(l.get(0), l.get(2));
+
+ double avgOfSumSq = ci.divide(l.get(1), l.get(2));
+ res = avgOfSumSq - (avg) * (avg); // variance
+ res = Math.pow(res, 0.5);
+
+ return res;
+ }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java (revision 0)
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * ColumnInterpreter implementations:
+ */
+/**
+ * A Long Column interpreter.
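+ *
+ * It interprets an 8-byte cell value as a Long and skips cells of any other
+ * length. A usage sketch (assuming the table, row range and column arguments
+ * are defined by the caller):
+ *
+ * <pre>
+ * ColumnInterpreter ci = new LongColumnInterpreter();
+ * long max = new AggregationClient(conf).getMaximum(tableName, startRow,
+ *     stopRow, colFamily, colQualifier, ci, null);
+ * </pre>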
+ */
+public class LongColumnInterpreter implements ColumnInterpreter {
+
+ public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
+ throws IOException {
+ if (kv == null || kv.getValue().length != 8)
+ return null;
+ return new Long(Bytes.toLong(kv.getValue()));
+ }
+
+ @Override
+ public Long add(Long l1, Long l2) {
+ return l1 + l2;
+ }
+
+ @Override
+ public int compare(Long l1, Long l2) {
+ return l1.compareTo(l2);
+ }
+
+ @Override
+ public Long getInitialValue() {
+ return 0l;
+ }
+
+ @Override
+ public Long getMaxValue() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public Long increment(Long o) {
+ return o + 1l;
+ }
+
+ @Override
+ public Long mult(Long o1, Long o2) {
+ return o1.longValue() * o2.longValue();
+ }
+
+ @Override
+ public Long getMinValue() {
+ return Long.MIN_VALUE;
+ }
+
+ @Override
+ public void readFields(DataInput arg0) throws IOException {
+
+ }
+
+ @Override
+ public void write(DataOutput arg0) throws IOException {
+
+ }
+
+ @Override
+ public double divide(Long l1, Long l2) {
+ if (l2 == null || l1 == null)
+ return Double.NaN;
+
+ return (l1.doubleValue() / l2.doubleValue());
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java (revision 0)
@@ -0,0 +1,159 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+/**
+ * It defines the aggregation functions that are to be supported in this
+ * Coprocessor. Each method takes a column family, a column qualifier, a start
+ * row and an end row [startRow, endRow). Refer to {@link AggregationClient}
+ * for some general conditions on the input parameters.
+ *
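+ * To make an implementation such as AggregateProtocolImpl available on the
+ * region servers, it can be registered through the coprocessor configuration,
+ * as the test in this patch does:
+ *
+ * <pre>
+ * conf.set("hbase.coprocessor.default.classes",
+ *     "org.apache.hadoop.hbase.coprocessor.AggregateProtocolImpl");
+ * </pre>
+ *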
+ * @param <T>
+ */
+public interface AggregateCpProtocol extends CoprocessorProtocol {
+
+ /**
+ * It gives the maximum for a given combination of column qualifier and column
+ * family, in the given row range. In case of null column qualifier, maximum
+ * value for the entire column family will be returned.
+ *
+   * @param <T>
+ *
+ * @param colFamily
+ * @param colQualifier
+ * @param startRow
+ * @param endRow
+ * @param ci
+ * @param f
+ * @return max value as mentioned above
+ * @throws IOException
+ *
+ */
+ T getMax(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException;
+
+ /**
+ *
+ * For a given column family and column Qualifier for a table, it gives the
+ * minimum at the region level.
+ *
+   * @param <T>
+ *
+ * @param colFamily
+ * @param colQualifier
+ * @param startRow
+ * @param endRow
+ * @param ci
+ * @param f
+ * @return
+ * @throws IOException
+ */
+ T getMin(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException;
+
+ /**
+   * For a given column family and column qualifier for a table, it gives the
+   * sum of all its values at the region level.
+ *
+   * @param <T>
+ *
+ * @param colFamily
+ * @param colQualifier
+ * @param startRow
+ * @param endRow
+ * @param ci
+ * @return
+ * @throws IOException
+ */
+
+ T getSum(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException;
+
+ /**
+   * It gives the row count for the given column family and column qualifier.
+   * If the user has provided a filter, that filter will be used; otherwise, in
+   * case of a null qualifier, a FirstKeyOnlyFilter is used for optimization.
+ *
+   * @param <T>
+ *
+ * @param colFamily
+ * @param colQualifier
+ * @param startRow
+ * @param endRow
+ * @param ci
+ * @param f
+ * @return
+ * @throws IOException
+ */
+ T getRowNum(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException;
+
+ /**
+   * It returns the sum and the number of elements for a given qualifier and
+   * family. This is returned as a list containing the sum and the total count.
+   * The idea is that at the client side, we just do a grand sum of both of
+   * these and compute the overall average.
+ *
+   * @param <T>
+ *
+ * @param colFamily
+ * @param colQualifier
+ * @param startRow
+ * @param endRow
+ * @param ci
+ * @param f
+ * @return
+ * @throws IOException
+ */
+ List getAvg(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException;
+
+ /**
+   * This returns three values in order to be able to calculate a global
+   * standard deviation value. It returns the sum, the sum of squares, and the
+   * row count for a given column qualifier and family. The idea is to get the
+   * variance first (the average of the squares less the square of the
+   * average); the standard deviation is the square root of the variance.
+ *
+   * @param <T>
+ *
+ * @param colFamily
+ * @param colQualifier
+ * @param startRow
+ * @param endRow
+ * @param ci
+ * @param f
+ * @return
+ * @throws IOException
+ */
+ List getStd(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException;
+
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java (revision 0)
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A concrete {@link AggregateCpProtocol} implementation that computes the
+ * requested aggregate (max, min, sum, row count, and the avg and std
+ * parameters) over the rows of the region it is deployed on.
+ */
+public class AggregateProtocolImpl extends BaseEndpointCoprocessor implements
+ AggregateCpProtocol {
+ protected static Log log = LogFactory.getLog(AggregateProtocolImpl.class);
+
+ @Override
+ public T getMax(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException {
+ T temp;
+ T max = ci.getMinValue();
+ InternalScanner scanner = getScanWithColAndQualifier(colFamily,
+ colQualifier, endRow, f);
+ List results = new ArrayList();
+ try {
+ boolean hasMoreRows = false;
+ do {
+        results.clear();
+ hasMoreRows = scanner.next(results);
+ while (hasMoreRows
+ && Bytes.compareTo(startRow, HConstants.EMPTY_START_ROW) != 0
+ && Bytes.compareTo(results.get(0).getRow(), startRow) < 0) {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ }
+ for (KeyValue kv : results) {
+ temp = ci.getValue(colFamily, colQualifier, kv);
+ if (temp != null && (ci.compare(temp, max) > 0)) {
+ max = temp;
+ }
+ }
+ } while (hasMoreRows);
+ } finally {
+ scanner.close();
+ }
+ log.info("Maximum from this region is "
+ + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
+ .hashCode() + ": " + max);
+ return max;
+ }
+
+ @Override
+ public T getMin(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException {
+ T min = ci.getMaxValue();
+ T temp;
+ InternalScanner scanner = getScanWithColAndQualifier(colFamily,
+ colQualifier, endRow, f);
+ List results = new ArrayList();
+ try {
+ boolean hasMoreRows = false;
+ do {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ while (hasMoreRows
+ && Bytes.compareTo(startRow, HConstants.EMPTY_START_ROW) != 0
+ && Bytes.compareTo(results.get(0).getRow(), startRow) < 0) {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ }
+ for (KeyValue kv : results) {
+ temp = ci.getValue(colFamily, colQualifier, kv);
+ if (temp != null && (ci.compare(temp, min) < 0)) {
+ min = temp;
+ }
+ }
+ } while (hasMoreRows);
+ } finally {
+ scanner.close();
+ }
+ log.info("Minimum from this region is "
+ + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
+ .hashCode() + ": " + min);
+ return min;
+ }
+
+ @Override
+ public T getSum(byte[] colFamily, byte[] colQualifier, byte[] startRow,
+ byte[] endRow, ColumnInterpreter ci, Filter f) throws IOException {
+    T sumVal = ci.getInitialValue();
+ T temp;
+ InternalScanner scanner = getScanWithColAndQualifier(colFamily,
+ colQualifier, endRow, f);
+ List results = new ArrayList();
+ try {
+ boolean hasMoreRows = false;
+ do {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ while (hasMoreRows
+ && Bytes.compareTo(startRow, HConstants.EMPTY_START_ROW) != 0
+ && Bytes.compareTo(results.get(0).getRow(), startRow) < 0) {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ }
+ for (KeyValue kv : results) {
+ temp = ci.getValue(colFamily, colQualifier, kv);
+ if (temp != null)
+ sumVal = ci.add(sumVal, temp);
+ }
+ } while (hasMoreRows);
+ } finally {
+ scanner.close();
+ }
+    log.debug("Sum from this region is "
+        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
+        .hashCode() + ": " + sumVal);
+ return sumVal;
+ }
+
+ @Override
+ public T getRowNum(byte[] colFamily, byte[] colQualifier,
+ byte[] startRow, byte[] endRow, ColumnInterpreter ci, Filter f)
+ throws IOException {
+    T rowCount = ci.getInitialValue();
+ InternalScanner scanner = getScanWithColAndQualifier(colFamily,
+ colQualifier, endRow, f == null ? ((colQualifier != null) ? null
+ : new FirstKeyOnlyFilter()) : f);
+ List results = new ArrayList();
+ try {
+ boolean hasMoreRows = false;
+ do {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ while (hasMoreRows
+ && Bytes.compareTo(startRow, HConstants.EMPTY_START_ROW) != 0
+ && Bytes.compareTo(results.get(0).getRow(), startRow) < 0) {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ }
+ if (results.size() > 0) {
+ rowCount = ci.increment(rowCount);
+ }
+ } while (hasMoreRows);
+ } finally {
+ scanner.close();
+ }
+    log.debug("Row count from this region is "
+        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
+        .hashCode() + ": " + rowCount);
+ return rowCount;
+ }
+
+ /**
+   * Returns a scanner for a given column family and qualifier. Throws an
+   * IOException in case both of them are null.
+ *
+ * @param colFamily
+ * @param colQualifier
+ * @param stopRow
+ * @param filter
+ * @return
+ * @throws IOException
+ */
+
+ private InternalScanner getScanWithColAndQualifier(byte[] colFamily,
+ byte[] colQualifier, byte[] stopRow, Filter filter) throws IOException {
+ if (colFamily == null && colQualifier == null) { // abort the process
+ throw new IOException("Family can't be null");
+ }
+ try {
+ Scan scan = new Scan();
+ scan.setStopRow(stopRow);
+ scan.setFilter(filter);
+ if (colQualifier == null)
+ scan.addFamily(colFamily);
+ else {
+ scan.addColumn(colFamily, colQualifier);
+ }
+ HRegion region = ((RegionCoprocessorEnvironment) getEnvironment())
+ .getRegion();
+ return region.getScanner(scan);
+ } catch (Exception e) {
+ log.error("Exception occurred while creating the scanner!");
+ throw new IOException("Exception occurred while creating the scanner");
+ }
+ }
+
+ @Override
+ public List getAvg(byte[] colFamily, byte[] colQualifier,
+ byte[] startRow, byte[] endRow, ColumnInterpreter ci, Filter f)
+ throws IOException {
+ // long sum = 0l, rowNum = 0l;
+ T sumVal = ci.getInitialValue();
+ T rowCountVal = ci.getInitialValue();
+ InternalScanner scanner = getScanWithColAndQualifier(colFamily,
+ colQualifier, endRow, f);
+ boolean hasMoreRows = false;
+ List results = new ArrayList();
+ try {
+ do {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ while (hasMoreRows
+ && Bytes.compareTo(startRow, HConstants.EMPTY_START_ROW) != 0
+ && Bytes.compareTo(results.get(0).getRow(), startRow) < 0) {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ }
+ for (KeyValue kv : results) {
+ sumVal = ci.add(sumVal, ci.getValue(colFamily, colQualifier, kv));
+ }
+ rowCountVal = ci.increment(rowCountVal);
+ } while (hasMoreRows);
+ } finally {
+ scanner.close();
+ }
+ List sumAndRowcount = new ArrayList();
+ sumAndRowcount.add(sumVal);
+ sumAndRowcount.add(rowCountVal);
+ return sumAndRowcount;
+ }
+
+ @Override
+ public List getStd(byte[] colFamily, byte[] colQualifier,
+ byte[] startRow, byte[] endRow, ColumnInterpreter ci, Filter f)
+ throws IOException {
+ T sumVal = ci.getInitialValue(), rowCountVal = ci.getInitialValue(), sumSqVal = ci
+ .getInitialValue(), tempVal = ci.getInitialValue();
+ InternalScanner scanner = getScanWithColAndQualifier(colFamily,
+ colQualifier, endRow, f);
+ boolean hasMoreRows = false;
+ List results = new ArrayList();
+ try {
+ do {
+ results.clear();
+ tempVal = ci.getInitialValue();
+ hasMoreRows = scanner.next(results);
+ while (hasMoreRows
+ && Bytes.compareTo(startRow, HConstants.EMPTY_START_ROW) != 0
+ && Bytes.compareTo(results.get(0).getRow(), startRow) < 0) {
+ results.clear();
+ hasMoreRows = scanner.next(results);
+ }
+ for (KeyValue kv : results) {
+ tempVal = ci.add(tempVal, ci.getValue(colFamily, colQualifier, kv));
+ }
+ sumVal = ci.add(sumVal, tempVal);
+ sumSqVal = ci.add(sumSqVal, ci.mult(tempVal, tempVal));
+ rowCountVal = ci.increment(rowCountVal);
+ } while (hasMoreRows);
+ } finally {
+ scanner.close();
+ }
+ List result = new ArrayList();
+ result.add(sumVal);
+ result.add(sumSqVal);
+ result.add(rowCountVal);
+ log.info("STD results are: " + result);
+ return result;
+ }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (revision 0)
@@ -0,0 +1,105 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * It defines how the value for a specific column is interpreted. It takes the
+ * column family, the column qualifier and the value. Refer to
+ * {@link AggregationClient} for an example.
+ *
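+ * A minimal example is the LongColumnInterpreter shipped with this patch,
+ * whose getValue simply decodes an 8-byte cell value:
+ *
+ * <pre>
+ * public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
+ *     throws IOException {
+ *   if (kv == null || kv.getValue().length != 8)
+ *     return null;
+ *   return Bytes.toLong(kv.getValue());
+ * }
+ * </pre>
+ *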
+ * @param <T>
+ */
+public interface ColumnInterpreter extends Writable {
+
+ /**
+ * @param colFamily
+ * @param colQualifier
+   * @param kv
+ * @return value of type T
+ * @throws IOException
+ */
+ T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
+ throws IOException;
+
+ /**
+ * returns the maximum value for this type T
+ *
+ * @return
+ */
+
+ T getMaxValue();
+
+ /**
+ *
+ * @return
+ */
+
+ T getMinValue();
+
+ T getInitialValue();
+
+ /**
+ *
+ * @param l1
+ * @param l2
+ * @return
+ */
+ T add(T l1, T l2);
+
+ /**
+ *
+ * @param o1
+ * @param o2
+ * @return
+ */
+ T mult(T o1, T o2);
+
+ /**
+ *
+ * @param o
+ * @return
+ */
+ T increment(T o);
+
+ /**
+ * returns > 0 if l1 > l2; < 0 if l1 < l2
+ *
+ * @param l1
+ * @param l2
+ * @return
+ */
+ int compare(T l1, T l2);
+
+ /**
+   * The returned value is assumed to always be a double.
+ *
+ * @param l1
+ * @param l2
+ * @return
+ */
+ double divide(T l1, T l2);
+}
\ No newline at end of file
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java (revision 0)
@@ -0,0 +1,715 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A test class to cover the aggregate functions that are implemented using
+ * coprocessors.
+ */
+public class TestAggFunctions {
+ protected static Log myLog = LogFactory.getLog(TestAggFunctions.class);
+
+ /**
+ * Creating the test infrastructure.
+ */
+ private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
+ private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+ private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+ private static final byte[] TEST_MULTI_CQ = Bytes.toBytes("TestMultiCQ");
+
+ private static byte[] ROW = Bytes.toBytes("testRow");
+ private static final int ROWSIZE = 20;
+ private static final int rowSeperator1 = 5;
+ private static final int rowSeperator2 = 12;
+ private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+ private static HBaseTestingUtility util = new HBaseTestingUtility();
+ private static MiniHBaseCluster cluster = null;
+
+ /**
+ * A set up method to start the test cluster. AggregateProtocolImpl is
+ * registered and will be loaded during region startup.
+ *
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration conf = util.getConfiguration();
+ conf.set("hbase.coprocessor.default.classes",
+ "org.apache.hadoop.hbase.coprocessor.AggregateProtocolImpl");
+
+ util.startMiniCluster(2);
+ cluster = util.getMiniHBaseCluster();
+ HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
+ util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
+ new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],
+ ROWS[rowSeperator2] });
+    /**
+     * The test table has one CQ which is always populated and one variable CQ
+     * per row:
+     *   rowKey1: CF:CQ, CF:CQ1
+     *   rowKey2: CF:CQ, CF:CQ2
+     */
+ for (int i = 0; i < ROWSIZE; i++) {
+ Put put = new Put(ROWS[i]);
+ Long l = new Long(i);
+ put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(l));
+ table.put(put);
+ Put p2 = new Put(ROWS[i]);
+ p2.add(TEST_FAMILY, Bytes.add(TEST_MULTI_CQ, Bytes.toBytes(l)), Bytes
+ .toBytes(l * 10));
+ table.put(p2);
+ }
+
+ // sleep here is an ugly hack to allow region transitions to finish
+ Thread.sleep(5000); // TODO: Is this sleep really necessary to do...
+ for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
+ for (HRegionInfo r : t.getRegionServer().getOnlineRegions()) {
+ t.getRegionServer().getOnlineRegion(r.getRegionName())
+ .getCoprocessorHost().load(AggregateProtocolImpl.class,
+ Coprocessor.Priority.USER);
+
+ }
+ }
+ }
+
+ /**
+ * Shutting down the cluster
+ *
+ * @throws Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ /**
+ * an infrastructure method to prepare rows for the testtable.
+ *
+ * @param base
+ * @param n
+ * @return
+ */
+ private static byte[][] makeN(byte[] base, int n) {
+ byte[][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ ret[i] = Bytes.add(base, Bytes.toBytes(i));
+ }
+ return ret;
+ }
+
+ /**
+ * **************************** ROW COUNT Test cases *******************
+ */
+
+ /**
+ *
+ * This will test rowcount with a valid range, i.e., a subset of rows. It will
+ * be the most common use case.
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testRowCountWithValidRange() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long rowCount = aClient.getRowNum(TEST_TABLE, ROWS[2], ROWS[14],
+ TEST_FAMILY, TEST_QUALIFIER, ci, null);
+ assertEquals(12, rowCount);
+
+ }
+
+ /**
+ * This will test the row count on the entire table. Startrow and endrow will
+ * be null.
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testRowCountAllTable() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long rowCount = aClient.getRowNum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, TEST_QUALIFIER, ci, null);
+ System.out.printf("Val for rowNum should be %d", rowCount);
+ assertEquals(ROWSIZE, rowCount);
+ }
+
+ /**
+ * This will test the row count with startrow > endrow. The result should be
+ * -1.
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testRowCountWithInvalidRange1() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long rowCount = -1;
+ try {
+ rowCount = aClient.getRowNum(TEST_TABLE, ROWS[5], ROWS[2], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Throwable e) {
+      myLog.error("Exception thrown in the invalidRange method", e);
+ }
+ assertEquals(-1, rowCount);
+ }
+
+ /**
+ * This will test the row count with startrow = endrow and they will be
+ * non-null. The result should be 0, as it assumes a non-get query.
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testRowCountWithInvalidRange2() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long rowCount = -1;
+ try {
+ rowCount = aClient.getRowNum(TEST_TABLE, ROWS[5], ROWS[5], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Throwable e) {
+ rowCount = 0;
+ }
+ assertEquals(0, rowCount);
+ }
+
+ /**
+ * This should return a 0
+ */
+ @Test
+ public void testRowCountWithNullCF() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long rowCount = -1;
+ try {
+ rowCount = aClient.getRowNum(TEST_TABLE, ROWS[5], ROWS[5], null, null,
+ ci, null);
+ } catch (Throwable e) {
+ rowCount = 0;
+ }
+ assertEquals(0, rowCount);
+ }
+
+ @Test
+ public void testRowCountWithNullCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long rowCount = aClient.getRowNum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, null, ci, null);
+ assertEquals(20, rowCount);
+ }
+
+ @Test
+ public void testRowCountWithPrefixFilter() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+ long rowCount = aClient.getRowNum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, null, ci, f);
+ assertEquals(0, rowCount);
+ }
+
+ /**
+ * ***************Test cases for Maximum *******************
+ */
+
+ /**
+ * give max for the entire table.
+ *
+ * @throws Throwable
+ *
+ */
+ @Test
+ public void testMaxWithValidRange() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long maximum = aClient.getMaximum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, TEST_QUALIFIER, ci, null);
+ assertEquals(19, maximum);
+ }
+
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testMaxWithValidRange2() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long max = aClient.getMaximum(TEST_TABLE, ROWS[5], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ assertEquals(14, max);
+ }
+
+ @Test
+ public void testMaxWithValidRangeWithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long maximum = aClient.getMaximum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, null, ci, null);
+ assertEquals(190, maximum);
+ }
+
+ @Test
+ public void testMaxWithValidRange2WithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long max = aClient.getMaximum(TEST_TABLE, ROWS[6], ROWS[7], TEST_FAMILY,
+ null, ci, null);
+ assertEquals(60, max);
+ }
+
+ @Test
+ public void testMaxWithValidRangeWithNullCF() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long max = Long.MIN_VALUE;
+ try {
+ max = aClient.getMaximum(TEST_TABLE, ROWS[2], ROWS[5], null, null, ci,
+ null);
+ } catch (Throwable e) {
+ max = 0;
+ }
+ assertEquals(0, max);// CP will throw an IOException about the
+ // null column family, and max will be set to 0
+ }
+
+ @Test
+ public void testMaxWithInvalidRange() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long max = Long.MIN_VALUE;
+ try {
+ max = aClient.getMaximum(TEST_TABLE, ROWS[4], ROWS[2], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Throwable e) {
+ max = 0;
+ }
+ assertEquals(0, max);// control should go to the catch block
+ }
+
+ @Test
+ public void testMaxWithInvalidRange2() throws Throwable {
+ long max = Long.MIN_VALUE;
+ try {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ max = aClient.getMaximum(TEST_TABLE, ROWS[6], ROWS[6], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Exception e) {
+ max = 0;
+ }
+ assertEquals(0, max);// control should go to the catch block
+ }
+
+ @Test
+ public void testMaxWithFilter() throws Throwable {
+ long max = 0l;
+ AggregationClient aClient = new AggregationClient();
+ Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ max = aClient.getMaximum(TEST_TABLE, ROWS[1], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, f);
+
+ assertEquals(Long.MIN_VALUE, max);
+ }
+
+ /**
+ * **************************Test cases for Minimum ***********************
+ */
+
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testMinWithValidRange() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long min = aClient.getMinimum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, TEST_QUALIFIER, ci, null);
+ assertEquals(0, min);
+ }
+
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testMinWithValidRange2() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long min = aClient.getMinimum(TEST_TABLE, ROWS[5], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ assertEquals(5, min);
+ }
+
+ @Test
+ public void testMinWithValidRangeWithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long min = aClient.getMinimum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, null, ci, null);
+ assertEquals(0, min);
+ }
+
+ @Test
+ public void testMinWithValidRange2WithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long min = aClient.getMinimum(TEST_TABLE, ROWS[6], ROWS[7], TEST_FAMILY,
+ null, ci, null);
+ assertEquals(6, min);
+ }
+
+ @Test
+ public void testMinWithValidRangeWithNullCF() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long min = Long.MAX_VALUE;
+ try {
+ min = aClient.getMinimum(TEST_TABLE, ROWS[4], ROWS[8], null, null, ci,
+ null);
+ } catch (Throwable e) {
+ min = 0;
+ }
+    assertEquals(0, min);// CP will throw an IOException about the
+    // null column family, and min will be set to 0
+ }
+
+ @Test
+ public void testMinWithInvalidRange() {
+ AggregationClient aClient = new AggregationClient();
+ long min = Long.MAX_VALUE;
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ try {
+ min = aClient.getMinimum(TEST_TABLE, ROWS[4], ROWS[2], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Throwable e) {
+ min = 0;
+ }
+ assertEquals(0, min);// control should go to the catch block
+ }
+
+ @Test
+ public void testMinWithInvalidRange2() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long min = Long.MAX_VALUE;
+ try {
+ min = aClient.getMinimum(TEST_TABLE, ROWS[4], ROWS[4], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Throwable e) {
+ min = 0;
+ }
+ assertEquals(0, min);// control should go to the catch block
+ }
+
+ @Test
+ public void testMinWithFilter() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long min = 0l;
+ min = aClient.getMinimum(TEST_TABLE, ROWS[4], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, f);
+ assertEquals(Long.MAX_VALUE, min);
+ }
+
+ /**
+ * *************** Test cases for Sum *********************
+ */
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testSumWithValidRange() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long sum = aClient.getSum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, TEST_QUALIFIER, ci, null);
+ assertEquals(190, sum);
+ }
+
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testSumWithValidRange2() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long sum = aClient.getSum(TEST_TABLE, ROWS[5], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ assertEquals(95, sum);
+ }
+
+ @Test
+ public void testSumWithValidRangeWithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long sum = aClient.getSum(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, null, ci, null);
+ assertEquals(190 + 1900, sum);
+ }
+
+ @Test
+ public void testSumWithValidRange2WithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long sum = aClient.getSum(TEST_TABLE, ROWS[6], ROWS[7], TEST_FAMILY, null,
+ ci, null);
+ assertEquals(6 + 60, sum);
+ }
+
+ @Test
+ public void testSumWithValidRangeWithNullCF() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long sum = Long.MAX_VALUE;
+ try {
+ sum = aClient.getSum(TEST_TABLE, ROWS[4], ROWS[4], null, null, ci, null);
+ } catch (Throwable e) {
+ sum = 0;
+ }
+    assertEquals(0, sum);// CP will throw an IOException about the
+    // null column family, and sum will be set to 0
+ }
+
+ @Test
+ public void testSumWithInvalidRange() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long sum = Long.MAX_VALUE;
+ try {
+ sum = aClient.getSum(TEST_TABLE, ROWS[4], ROWS[2], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Throwable e) {
+ sum = 0;
+ }
+ assertEquals(0, sum);// control should go to the catch block
+ }
+
+ @Test
+ public void testSumWithFilter() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ long sum = 0l;
+ sum = aClient.getSum(TEST_TABLE, ROWS[1], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, f);
+ assertEquals(0l, sum);
+ }
+
+ /**
+ * ****************************** Test Cases for Avg **************
+ */
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testAvgWithValidRange() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double avg = aClient.getAvg(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, TEST_QUALIFIER, ci, null);
+ assertEquals(9.5, avg, 0);
+ }
+
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testAvgWithValidRange2() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double avg = aClient.getAvg(TEST_TABLE, ROWS[5], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ assertEquals(9.5, avg, 0);
+ }
+
+ @Test
+ public void testAvgWithValidRangeWithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double avg = aClient.getAvg(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, null, ci, null);
+ assertEquals(104.5, avg, 0);
+ }
+
+ @Test
+ public void testAvgWithValidRange2WithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double avg = aClient.getAvg(TEST_TABLE, ROWS[6], ROWS[7], TEST_FAMILY,
+ null, ci, null);
+ assertEquals(6 + 60, avg, 0);
+ }
+
+ @Test
+ public void testAvgWithValidRangeWithNullCF() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double avg = Double.MAX_VALUE;
+ try {
+ avg = aClient.getAvg(TEST_TABLE, ROWS[4], ROWS[4], null, null, ci, null);
+ } catch (Throwable e) {
+ avg = 0;
+ }
+    assertEquals(0, avg, 0);// CP will throw an IOException about the
+    // null column family, and avg will be set to 0
+ }
+
+ @Test
+ public void testAvgWithInvalidRange() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double avg = Double.MAX_VALUE;
+ try {
+ avg = aClient.getAvg(TEST_TABLE, ROWS[4], ROWS[2], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Throwable e) {
+ avg = 0;
+ }
+ assertEquals(0, avg, 0);// control should go to the catch block
+ }
+
+ @Test
+ public void testAvgWithFilter() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double avg = 0d;
+ avg = aClient.getAvg(TEST_TABLE, ROWS[1], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, f);
+ assertEquals(0d, avg, 0);
+ }
+
+ /**
+ * ****************** Test cases for STD **********************
+ */
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testStdWithValidRange() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double std = aClient.getStd(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, TEST_QUALIFIER, ci, null);
+ assertEquals(5.766, std, 0.05d);
+ }
+
+ /**
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testStdWithValidRange2() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double std = aClient.getStd(TEST_TABLE, ROWS[5], ROWS[15], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ assertEquals(2.87, std, 0.05d);
+ }
+
+ @Test
+ public void testStdWithValidRangeWithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double std = aClient.getStd(TEST_TABLE, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW, TEST_FAMILY, null, ci, null);
+ assertEquals(63.42, std, 0.05d);
+ }
+
+ @Test
+ public void testStdWithValidRange2WithNoCQ() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double std = aClient.getStd(TEST_TABLE, ROWS[6], ROWS[7], TEST_FAMILY,
+ null, ci, null);
+ assertEquals(0, std, 0);
+ }
+
+ @Test
+ public void testStdWithValidRangeWithNullCF() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double std = Double.MAX_VALUE;
+ try {
+ std = aClient.getStd(TEST_TABLE, ROWS[4], ROWS[4], null, null, ci, null);
+ } catch (Throwable e) {
+ std = 0;
+ }
+    assertEquals(0, std, 0);// CP will throw an IOException about the
+    // null column family, and std will be set to 0
+ }
+
+ @Test
+ public void testStdWithInvalidRange() {
+ AggregationClient aClient = new AggregationClient();
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+ double std = Double.MAX_VALUE;
+ try {
+ std = aClient.getStd(TEST_TABLE, ROWS[4], ROWS[2], TEST_FAMILY,
+ TEST_QUALIFIER, ci, null);
+ } catch (Throwable e) {
+ std = 0;
+ }
+ assertEquals(0, std, 0);// control should go to the catch block
+ }
+
+ @Test
+ public void testStdWithFilter() throws Throwable {
+ AggregationClient aClient = new AggregationClient();
+ Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
+ final ColumnInterpreter ci = new LongColumnInterpreter();
+    double std = 0d;
+    std = aClient.getStd(TEST_TABLE, ROWS[1], ROWS[15], TEST_FAMILY,
+        TEST_QUALIFIER, ci, f);
+    assertEquals(0d, std, 0);
+ }
+}