Index: src/main/java/org/apache/hadoop/hbase/client/CursorCallable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/CursorCallable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/CursorCallable.java (revision 0) @@ -0,0 +1,88 @@ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.NotServingRegionException; + +/** + * This captures the call of the cursor behavior from client to the server. + * It has the location of the regionserver, row, and the regioninfo for that row. + * + * it is almost similar to the scannercallable, so it should have a corresponding ID, + * that is mapped to a fat object (CursorResultSetCp) on the server side. this object is created by the + * coprocessor endpoint. That object provides support for navigating/pulling rows from a resultset. + * There can be two types of resultset: InMemoryResultSet and IncrementalResultSet. + */ +public class CursorCallable extends ServerCallable { + + private long cursorId = -1; + private boolean instantiated = false; + private boolean closed = false; + private int cache = 1; + + private static final Log LOG = LogFactory.getLog(CursorCallable.class); + + public void setCache(int cache) { + this.cache = cache; + } + + @Override + public void instantiateServer(boolean reload) throws IOException { + if(!instantiated){ + super.instantiateServer(reload); + instantiated = true; + } + } + + public CursorCallable(HConnection connection, byte[] tableName, byte[] row) { + super(connection, tableName, row); + } + + public void setCursorId(long cursorId) { + this.cursorId = cursorId; + } + + @Override + public Result[] call() throws IOException { + Result [] res; + if(cursorId == -1) + return null;// cursor is not set/registered. 
+ else if (closed && cursorId != -1){ // go and close the cursor on the HRS + this.close(); + }else{ + try { + res = this.server.nextCp(cursorId, cache); + if(res ==null || res.length !=this.cache){ //results are all fetch, set the close flag, so that on next call, the cursor is closed. + closed = true; + } + return res; + } catch (Exception e) { + if (e instanceof NotServingRegionException){ + LOG.error("got a NSRE from region server with location"+ this.location.toString()); + // at this point, it will abort the process! (sad but true). not supporting the logic of resending the request. + throw new IOException("Aborting the process due to NSRE"); + } + throw new IOException(e.getCause()); + } + } + return null; + } + + + public void setClosed(boolean closed) { + this.closed = closed; + } + + public void close(){ + if(this.cursorId ==-1) + return; + try{ + this.server.closeCp(cursorId); + }catch(IOException ioe){ + LOG.error("Got an exception while closing the cursor resource"); + } + this.cursorId = -1; + } +} Index: src/main/java/org/apache/hadoop/hbase/client/CursorCp.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/CursorCp.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/CursorCp.java (revision 0) @@ -0,0 +1,115 @@ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This class represnts the client side interface of the cursor-server-client plumbing. + * + */ +public class CursorCp implements ResultScanner{ + // starting with essential fields. 
+ private CursorCallable cursorCallable = null; + private HTable htable; + private final LinkedList cache = new LinkedList(); + private boolean closed = false; + private static final Log LOG = LogFactory.getLog(CursorCp.class); + + public CursorCp(CursorCallable callable, HTable htable) { + this.cursorCallable = callable; + this.htable = htable; + } + + @Override + public void close(){ + if(this.cursorCallable != null) + this.cursorCallable.setClosed(true); + try { + this.htable.getConnection().getRegionServerWithRetries(this.cursorCallable); + } catch (IOException e) { + e.printStackTrace(); + } + this.closed = true; + + } + + @Override + public Result next() throws IOException { + if(cache.size() ==0 && this.closed) + return null; + if(cache.size() ==0){//do a rpc and fetch results + Result[] res = this.htable.getConnection().getRegionServerWithRetries(cursorCallable); + if(res != null && res.length >0){ + for (Result r: res){ + cache.add(r); + } + } + } + if(cache.size() > 0){ + return cache.poll(); + } + return null; + } + + @Override + public Result[] next(int nbRows) throws IOException { + List res = new ArrayList(); + for (int i = 0; i < nbRows; i++){ + Result r = next(); + if (r != null){ + res.add(r); + }else{ + break; + } + } + return res.toArray(new Result[0]); + } + + /* + * Iterator impl for the cursor (similar to scanner) + * I haven't used this iterator in the test method yet. + * (non-Javadoc) + * @see java.lang.Iterable#iterator() + */ + @Override + public Iterator iterator() { + // TODO Auto-generated method stub + return new Iterator() { + Result res; + @Override + public Result next() { + if(!hasNext()){ + return null; // done + } + Result tmp = res; //res has the latest row. 
+ res = null; + return tmp; + } + + @Override + public boolean hasNext() { + if(res == null){ + try { + res = CursorCp.this.next(); + return res != null; + } catch (IOException e) { + LOG.error("Exception while getting next element"+ e.getMessage()); + throw new RuntimeException(e); + } + } + return true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1088894) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -1002,7 +1002,7 @@ for(int tries = 0; tries < numRetries; tries++) { try { callable.instantiateServer(tries != 0); - return callable.call(); + return callable.call(); // HV: this is giving the null pointer error. } catch (Throwable t) { t = translateException(t); exceptions.add(t); @@ -1127,17 +1127,18 @@ Map> futures = new TreeMap>(Bytes.BYTES_COMPARATOR); for (final byte[] r : rows) { - final ExecRPCInvoker invoker = - new ExecRPCInvoker(conf, this, protocol, tableName, r); + final ExecRPCInvoker invoker = + new ExecRPCInvoker(conf, this, protocol, tableName, r); // r is starting row for each target region: first region, it is the target row Future future = pool.submit( new Callable() { public R call() throws Exception { T instance = (T)Proxy.newProxyInstance(conf.getClassLoader(), new Class[]{protocol}, invoker); - R result = callable.call(instance); + R result = callable.call(instance); // local jvm will call this. 
byte[] region = invoker.getRegionName(); if (callback != null) { + LOG.debug("RESULT IN HCM: "+result); callback.update(region, r, result); } return result; Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1088894) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -1352,6 +1352,7 @@ this.connection.clearRegionCache(); } + @SuppressWarnings("unchecked") @Override public T coprocessorProxy( Class protocol, byte[] row) { @@ -1398,13 +1399,18 @@ Pair startEndKeys = getStartEndKeys(); byte[][] startKeys = startEndKeys.getFirst(); byte[][] endKeys = startEndKeys.getSecond(); - if (start == null) { start = HConstants.EMPTY_START_ROW; } if (end == null) { end = HConstants.EMPTY_END_ROW; } + /** + * A check in case start row > end row (and both are non null) + */ + if(Bytes.compareTo(start, end) > 0) { + throw new IOException("Start row is larger than Stop row"); + } List rangeKeys = new ArrayList(); for (int i=0; i protocol, byte[] startKey, byte[] endKey, Batch.Call callable, Batch.Callback callback) throws IOException, Throwable; + } Index: src/main/java/org/apache/hadoop/hbase/client/Scan.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Scan.java (revision 1088894) +++ src/main/java/org/apache/hadoop/hbase/client/Scan.java (working copy) @@ -196,7 +196,7 @@ * @return this */ public Scan addColumn(byte [] family, byte [] qualifier) { - NavigableSet set = familyMap.get(family); + NavigableSet set = familyMap.get(family); // it has a mao with a family name to set of wualifier s to be selected. 
if(set == null) { set = new TreeSet(Bytes.BYTES_COMPARATOR); } Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 0) @@ -0,0 +1,362 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.AggregateProtocol; +import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * This client class is for invoking the aggregate functions deployed on the + * Region Server side via the AggregateProtocol. This class will implement the + * supporting functionality for summing/processing the individual results + * obtained from the AggregateProtocol for each region. + *

+ * This will serve as the client side handler for invoking the aggregate + * functions. + *

    + * For all aggregate functions, + *
  • start row < end row is an essential condition (if they are not + * {@link HConstants#EMPTY_BYTE_ARRAY}) + *
  • Column family can't be null. In case where multiple families are + * provided, an IOException will be thrown. An optional column qualifier can + * also be defined. + *
  • For methods to find maximum, minimum, sum, rowcount, it returns the + * parameter type. For average and std, it returns a double value. For row + * count, it returns a long value. + */ +public class AggregationClient { + + private static final Log log = LogFactory.getLog(AggregationClient.class); + Configuration conf; + + /** + * Constructor with Conf object + * @param cfg + */ + public AggregationClient(Configuration cfg) { + this.conf = cfg; + } + + /** + * It gives the maximum value of a column for a given column family for the + * given range. In case qualifier is null, a max of all values for the given + * family is returned. + * @param tableName + * @param ci + * @param scan + * @return max val + * @throws Throwable + * The caller is supposed to handle the exception as they are thrown + * & propagated to it. + */ + public R max(final byte[] tableName, final ColumnInterpreter ci, + final Scan scan) throws Throwable { + validateParameters(scan); + HTable table = new HTable(conf, tableName); + + class MaxCallBack implements Batch.Callback { + R max = null; + + R getMax() { + return max; + } + + @Override + public void update(byte[] region, byte[] row, R result) { + max = ci.compare(max, result) < 0 ? 
result : max; + } + } + MaxCallBack aMaxCallBack = new MaxCallBack(); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan + .getStopRow(), new Batch.Call() { + @Override + public R call(AggregateProtocol instance) throws IOException { + return instance.getMax(ci, scan); + } + }, aMaxCallBack); + return aMaxCallBack.getMax(); + } + + private void validateParameters(Scan scan) throws IOException { + if (scan == null || + (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes + .equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))){ + throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow"); + }else if(scan.getFamilyMap().size() != 1) { + throw new IOException("Family can't be null"); + } + } + + /** + * It gives the minimum value of a column for a given column family for the + * given range. In case qualifier is null, a min of all values for the given + * family is returned. + * @param tableName + * @param ci + * @param scan + * @return min val + * @throws Throwable + */ + public R min(final byte[] tableName, final ColumnInterpreter ci, + final Scan scan) throws Throwable { + validateParameters(scan); + class MinCallBack implements Batch.Callback { + + private R min = null; + + public R getMinimum() { + return min; + } + + @Override + public void update(byte[] region, byte[] row, R result) { + min = (min == null || ci.compare(result, min) < 0) ? result : min; + } + } + HTable table = new HTable(conf, tableName); + MinCallBack minCallBack = new MinCallBack(); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan + .getStopRow(), new Batch.Call() { + + @Override + public R call(AggregateProtocol instance) throws IOException { + return instance.getMin(ci, scan); + } + }, minCallBack); + log.debug("Min fom all regions is: " + minCallBack.getMinimum()); + return minCallBack.getMinimum(); + } + + /** + * It gives the row count, by summing up the individual results obtained from + * regions. 
In case the qualifier is null, FirstKeyOnlyFilter is used to + * optimise the operation. In case qualifier is provided, I can't use the + * filter as it may set the flag to skip to next row, but the value read is + * not of the given filter: in this case, this particular row will not be + * counted ==> an error. + * @param tableName + * @param ci + * @param scan + * @return + * @throws Throwable + */ + public long rowCount(final byte[] tableName, + final ColumnInterpreter ci, final Scan scan) throws Throwable { + validateParameters(scan); + class RowNumCallback implements Batch.Callback { + private long rowCountL = 0l; + + public long getRowNumCount() { + return rowCountL; + } + + @Override + public void update(byte[] region, byte[] row, Long result) { + rowCountL += result.longValue(); + } + } + RowNumCallback rowNum = new RowNumCallback(); + HTable table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan + .getStopRow(), new Batch.Call() { + @Override + public Long call(AggregateProtocol instance) throws IOException { + return instance.getRowNum(ci, scan); + } + }, rowNum); + return rowNum.getRowNumCount(); + } + + /** + * It sums up the value returned from various regions. In case qualifier is + * null, summation of all the column qualifiers in the given family is done. 
+ * @param tableName + * @param ci + * @param scan + * @return sum + * @throws Throwable + */ + public S sum(final byte[] tableName, final ColumnInterpreter ci, + final Scan scan) throws Throwable { + validateParameters(scan); + class SumCallBack implements Batch.Callback { + S sumVal = null; + + public S getSumResult() { + return sumVal; + } + + @Override + public void update(byte[] region, byte[] row, S result) { + sumVal = ci.add(sumVal, result); + } + } + SumCallBack sumCallBack = new SumCallBack(); + HTable table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan + .getStopRow(), new Batch.Call() { + @Override + public S call(AggregateProtocol instance) throws IOException { + return instance.getSum(ci, scan); + } + }, sumCallBack); + return sumCallBack.getSumResult(); + } + + /** + * It computes average while fetching sum and row count from all the + * corresponding regions. Approach is to compute a global sum of region level + * sum and rowcount and then compute the average. 
+ * @param tableName + * @param scan + * @throws Throwable + */ + private Pair getAvgArgs(final byte[] tableName, + final ColumnInterpreter ci, final Scan scan) throws Throwable { + validateParameters(scan); + class AvgCallBack implements Batch.Callback> { + S sum = null; + Long rowCount = 0l; + + public Pair getAvgArgs() { + return new Pair(sum, rowCount); + } + + @Override + public void update(byte[] region, byte[] row, Pair result) { + sum = ci.add(sum, result.getFirst()); + rowCount += result.getSecond(); + } + } + AvgCallBack avgCallBack = new AvgCallBack(); + HTable table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan + .getStopRow(), new Batch.Call>() { + @Override + public Pair call(AggregateProtocol instance) throws IOException { + return instance.getAvg(ci, scan); + } + }, avgCallBack); + return avgCallBack.getAvgArgs(); + } + + /** + * This is the client side interface/handle for calling the average method for + * a given cf-cq combination. It was necessary to add one more call stack as + * its return type should be a decimal value, irrespective of what + * columninterpreter says. So, this methods collects the necessary parameters + * to compute the average and returs the double value. + * @param tableName + * @param ci + * @param scan + * @return + * @throws Throwable + */ + public double avg(final byte[] tableName, + final ColumnInterpreter ci, Scan scan) throws Throwable { + Pair p = getAvgArgs(tableName, ci, scan); + return ci.divideForAvg(p.getFirst(), p.getSecond()); + } + + /** + * It computes a global standard deviation for a given column and its value. + * Standard deviation is square root of (average of squares - + * average*average). From individual regions, it obtains sum, square sum and + * number of rows. With these, the above values are computed to get the global + * std. 
+ * @param tableName + * @param scan + * @return + * @throws Throwable + */ + private Pair, Long> getStdArgs(final byte[] tableName, + final ColumnInterpreter ci, final Scan scan) throws Throwable { + validateParameters(scan); + class StdCallback implements Batch.Callback, Long>> { + long rowCountVal = 0l; + S sumVal = null, sumSqVal = null; + + public Pair, Long> getStdParams() { + List l = new ArrayList(); + l.add(sumVal); + l.add(sumSqVal); + Pair, Long> p = new Pair, Long>(l, rowCountVal); + return p; + } + + @Override + public void update(byte[] region, byte[] row, Pair, Long> result) { + sumVal = ci.add(sumVal, result.getFirst().get(0)); + sumSqVal = ci.add(sumSqVal, result.getFirst().get(1)); + rowCountVal += result.getSecond(); + } + } + StdCallback stdCallback = new StdCallback(); + HTable table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan + .getStopRow(), + new Batch.Call, Long>>() { + @Override + public Pair, Long> call(AggregateProtocol instance) + throws IOException { + return instance.getStd(ci, scan); + } + + }, stdCallback); + return stdCallback.getStdParams(); + } + + /** + * This is the client side interface/handle for calling the std method for a + * given cf-cq combination. It was necessary to add one more call stack as its + * return type should be a decimal value, irrespective of what + * columninterpreter says. So, this methods collects the necessary parameters + * to compute the std and returns the double value. 
+ * @param tableName + * @param ci + * @param scan + * @return + * @throws Throwable + */ + public double std(final byte[] tableName, ColumnInterpreter ci, + Scan scan) throws Throwable { + Pair, Long> p = getStdArgs(tableName, ci, scan); + double res = 0d; + double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond()); + double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond()); + res = avgOfSumSq - (avg) * (avg); // variance + res = Math.pow(res, 0.5); + return res; + } + +} Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java (revision 0) @@ -0,0 +1,106 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * a concrete column interpreter implementation. The cell value is a Long value + * and its promoted data type is also a Long value. For computing aggregation + * function, this class is used to find the datatype of the cell value. Client + * is supposed to instantiate it and passed along as a parameter. See + * {@link TestAggregateProtocol} methods for its sample usage. + * Its methods handles null arguments gracefully. + */ +public class LongColumnInterpreter implements ColumnInterpreter { + + public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv) + throws IOException { + if (kv == null || kv.getValue().length != Bytes.SIZEOF_LONG) + return null; + return Bytes.toLong(kv.getValue()); + } + + @Override + public Long add(Long l1, Long l2) { + if (l1 == null ^ l2 == null) { + return (l1 == null) ? l2 : l1; // either of one is null. + } else if (l1 == null) // both are null + return null; + return l1 + l2; + } + + @Override + public int compare(final Long l1, final Long l2) { + if (l1 == null ^ l2 == null) { + return l1 == null ? -1 : 1; // either of one is null. + } else if (l1 == null) + return 0; // both are null + return l1.compareTo(l2); // natural ordering. + } + + @Override + public Long getMaxValue() { + return Long.MAX_VALUE; + } + + @Override + public Long increment(Long o) { + return o == null ? null : (o + 1l); + } + + @Override + public Long multiply(Long l1, Long l2) { + return (l1 == null || l2 == null) ? 
null : l1 * l2; + } + + @Override + public Long getMinValue() { + return Long.MIN_VALUE; + } + + @Override + public void readFields(DataInput arg0) throws IOException { + // nothing to serialize + } + + @Override + public void write(DataOutput arg0) throws IOException { + // nothing to serialize + } + + @Override + public double divideForAvg(Long l1, Long l2) { + return (l2 == null || l1 == null) ? Double.NaN : (l1.doubleValue() / l2 + .doubleValue()); + } + + @Override + public Long castToReturnType(Long o) { + return o; + } + +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (revision 0) @@ -0,0 +1,224 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Pair; + +/** + * A concrete AggregateProtocol implementation. Its system level coprocessor + * that computes the aggregate function at a region level. + */ +public class AggregateImplementation extends BaseEndpointCoprocessor implements + AggregateProtocol { + protected static Log log = LogFactory.getLog(AggregateImplementation.class); + + @Override + public T getMax(ColumnInterpreter ci, Scan scan) + throws IOException { + T temp; + T max = null; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) + .getRegion().getScanner(scan); + List results = new ArrayList(); + byte[] colFamily = scan.getFamilies()[0]; + byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst(); + // qualifier can be null. + try { + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + for (KeyValue kv : results) { + temp = ci.getValue(colFamily, qualifier, kv); + max = (max == null || ci.compare(temp, max) > 0) ? 
temp : max; + } + results.clear(); + } while (hasMoreRows); + } finally { + scanner.close(); + } + log.info("Maximum from this region is " + + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .getRegionNameAsString() + ": " + max); + return max; + } + + @Override + public T getMin(ColumnInterpreter ci, Scan scan) + throws IOException { + T min = null; + T temp; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) + .getRegion().getScanner(scan); + List results = new ArrayList(); + byte[] colFamily = scan.getFamilies()[0]; + byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst(); + try { + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + for (KeyValue kv : results) { + temp = ci.getValue(colFamily, qualifier, kv); + min = (min == null || ci.compare(temp, min) < 0) ? temp : min; + } + results.clear(); + } while (hasMoreRows); + } finally { + scanner.close(); + } + log.info("Minimum from this region is " + + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .getRegionNameAsString() + ": " + min); + return min; + } + + @Override + public S getSum(ColumnInterpreter ci, Scan scan) + throws IOException { + long sum = 0l; + S sumVal = null; + T temp; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) + .getRegion().getScanner(scan); + byte[] colFamily = scan.getFamilies()[0]; + byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst(); + List results = new ArrayList(); + try { + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + for (KeyValue kv : results) { + temp = ci.getValue(colFamily, qualifier, kv); + if (temp != null) + sumVal = ci.add(sumVal, ci.castToReturnType(temp)); + } + results.clear(); + } while (hasMoreRows); + } finally { + scanner.close(); + } + log.debug("Sum from this region is " + + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .getRegionNameAsString() + ": " + sum); + return sumVal; 
+ } + + @Override + public long getRowNum(ColumnInterpreter ci, Scan scan) + throws IOException { + long counter = 0l; + List results = new ArrayList(); + byte[] colFamily = scan.getFamilies()[0]; + byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst(); + if (scan.getFilter() == null && qualifier == null) + scan.setFilter(new FirstKeyOnlyFilter()); + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) + .getRegion().getScanner(scan); + try { + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + if (results.size() > 0) { + counter++; + } + results.clear(); + } while (hasMoreRows); + } finally { + scanner.close(); + } + log.info("Row counter from this region is " + + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .getRegionNameAsString() + ": " + counter); + return counter; + } + + @Override + public Pair getAvg(ColumnInterpreter ci, Scan scan) + throws IOException { + S sumVal = null; + Long rowCountVal = 0l; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) + .getRegion().getScanner(scan); + byte[] colFamily = scan.getFamilies()[0]; + byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst(); + List results = new ArrayList(); + boolean hasMoreRows = false; + try { + do { + results.clear(); + hasMoreRows = scanner.next(results); + for (KeyValue kv : results) { + sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily, + qualifier, kv))); + } + rowCountVal++; + } while (hasMoreRows); + } finally { + scanner.close(); + } + Pair pair = new Pair(sumVal, rowCountVal); + return pair; + } + + @Override + public Pair, Long> getStd(ColumnInterpreter ci, Scan scan) + throws IOException { + S sumVal = null, sumSqVal = null, tempVal = null; + long rowCountVal = 0l; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) + .getRegion().getScanner(scan); + byte[] colFamily = scan.getFamilies()[0]; + byte[] qualifier = 
scan.getFamilyMap().get(colFamily).pollFirst(); + List results = new ArrayList(); + + boolean hasMoreRows = false; + try { + do { + tempVal = null; + hasMoreRows = scanner.next(results); + for (KeyValue kv : results) { + tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily, + qualifier, kv))); + } + results.clear(); + sumVal = ci.add(sumVal, tempVal); + sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal)); + rowCountVal++; + } while (hasMoreRows); + } finally { + scanner.close(); + } + List l = new ArrayList(); + l.add(sumVal); + l.add(sumSqVal); + Pair, Long> p = new Pair, Long>(l, rowCountVal); + return p; + } + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java (revision 0) @@ -0,0 +1,129 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.util.Pair; + +/** + * Defines the aggregation functions that are to be supported in this + * Coprocessor. For each method, it takes a Scan object and a columnInterpreter. + * The scan object should have a column family (else an exception will be + * thrown), and an optional column qualifier. In the current implementation + * {@link AggregateImplementation}, only one column family and column qualifier + * combination is served. In case there are more than one, only first one will + * be picked. Refer to {@link AggregationClient} for some general conditions on + * input parameters. + */ +public interface AggregateProtocol extends CoprocessorProtocol { + + /** + * Gives the maximum for a given combination of column qualifier and column + * family, in the given row range as defined in the Scan object. In its + * current implementation, it takes one column family and one column qualifier + * (if provided). In case of null column qualifier, maximum value for the + * entire column family will be returned. + * @param ci + * @param scan + * @return max value as mentioned above + * @throws IOException + */ + T getMax(ColumnInterpreter ci, Scan scan) throws IOException; + + /** + * Gives the minimum for a given combination of column qualifier and column + * family, in the given row range as defined in the Scan object. In its + * current implementation, it takes one column family and one column qualifier + * (if provided). In case of null column qualifier, minimum value for the + * entire column family will be returned. 
+ * @param ci + * @param scan + * @return min as mentioned above + * @throws IOException + */ + T getMin(ColumnInterpreter ci, Scan scan) throws IOException; + + /** + * Gives the sum for a given combination of column qualifier and column + * family, in the given row range as defined in the Scan object. In its + * current implementation, it takes one column family and one column qualifier + * (if provided). In case of null column qualifier, sum for the entire column + * family will be returned. + * @param ci + * @param scan + * @return sum of values as defined by the column interpreter + * @throws IOException + */ + S getSum(ColumnInterpreter ci, Scan scan) throws IOException; + + /** + * Gives the row count for the given column family and column qualifier, in + * the given row range as defined in the Scan object. + * @param ci + * @param scan + * @return + * @throws IOException + */ + long getRowNum(ColumnInterpreter ci, Scan scan) + throws IOException; + + /** + * Gives a Pair with first object as Sum and second object as row count, + * computed for a given combination of column qualifier and column family in + * the given row range as defined in the Scan object. In its current + * implementation, it takes one column family and one column qualifier (if + * provided). In case of null column qualifier, an aggregate sum over all the + * entire column family will be returned. + *

    + * The average is computed in + * {@link AggregationClient#avg(byte[], ColumnInterpreter, Scan)} by + * processing results from all regions, so its "ok" to pass sum and a Long + * type. + * @param ci + * @param scan + * @return + * @throws IOException + */ + Pair getAvg(ColumnInterpreter ci, Scan scan) + throws IOException; + + /** + * Gives a Pair with first object a List containing Sum and sum of squares, + * and the second object as row count. It is computed for a given combination of + * column qualifier and column family in the given row range as defined in the + * Scan object. In its current implementation, it takes one column family and + * one column qualifier (if provided). The idea is get the value of variance first: + * the average of the squares less the square of the average a standard + * deviation is square root of variance. + * @param ci + * @param scan + * @return + * @throws IOException + */ + Pair, Long> getStd(ColumnInterpreter ci, Scan scan) + throws IOException; + +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (revision 0) @@ -0,0 +1,118 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; +import org.apache.hadoop.io.Writable; + +/** + * Defines how value for specific column is interpreted and provides utility + * methods like compare, add, multiply etc for them. Takes column family, column + * qualifier and return the cell value. Its concrete implementation should + * handle null case gracefully. Refer to {@link LongColumnInterpreter} for an + * example. + *

    + * Takes two generic parameters. The cell value type of the interpreter is . + * During some computations like sum, average, the return type can be different + * than the cell value data type, for eg, sum of int cell values might overflow + * in case of a int result, we should use Long for its result. Therefore, this + * class mandates to use a different (promoted) data type for result of these + * computations . All computations are performed on the promoted data type + * . There is a conversion method + * {@link ColumnInterpreter#castToReturnType(Object)} which takes a type and + * returns a type. + * @param : T - cell value data type, S - promoted data type + */ +public interface ColumnInterpreter extends Writable { + + /** + * @param colFamily + * @param colQualifier + * @param value + * @return value of type T + * @throws IOException + */ + T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv) + throws IOException; + + /** + * returns sum or non null value among (if either of them is null); otherwise + * returns a null. + * @param l1 + * @param l2 + * @return + */ + public S add(S l1, S l2); + + /** + * returns the maximum value for this type T + * @return + */ + + T getMaxValue(); + + /** + * @return + */ + + T getMinValue(); + + /** + * @param o1 + * @param o2 + * @return + */ + S multiply(S o1, S o2); + + /** + * @param o + * @return + */ + S increment(S o); + + /** + * provides casting opportunity between the data types. + * @param o + * @return + */ + S castToReturnType(T o); + + /** + * This takes care if either of arguments are null. returns 0 if they are + * equal or both are null; + *

      + *
    <li>>0 if l1 > l2 or l1 is not null and l2 is null. + *
    • < 0 if l1 < l2 or l1 is null and l2 is not null. + */ + int compare(final T l1, final T l2); + + /** + * used for computing average of data values. Not providing the divide + * method that takes two values as it si not needed as of now. + * @param o + * @param l + * @return + */ + double divideForAvg(S o, Long l); +} \ No newline at end of file Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (revision 0) @@ -0,0 +1,786 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; +import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * A test class to cover aggregate functions, that can be implemented using + * Coprocessors. + */ +public class TestAggregateProtocol { + protected static Log myLog = LogFactory.getLog(TestAggregateProtocol.class); + + /** + * Creating the test infrastructure. + */ + private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable"); + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier"); + private static final byte[] TEST_MULTI_CQ = Bytes.toBytes("TestMultiCQ"); + + private static byte[] ROW = Bytes.toBytes("testRow"); + private static final int ROWSIZE = 20; + private static final int rowSeperator1 = 5; + private static final int rowSeperator2 = 12; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + private static MiniHBaseCluster cluster = null; + private static Configuration conf = util.getConfiguration(); + + /** + * A set up method to start the test cluster. 
AggregateProtocolImpl is + * registered and will be loaded during region startup. + * @throws Exception + */ + @BeforeClass + public static void setupBeforeClass() throws Exception { + + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.coprocessor.AggregateImplementation"); + + util.startMiniCluster(2); + cluster = util.getMiniHBaseCluster(); + HTable table = util.createTable(TEST_TABLE, TEST_FAMILY); + util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY, + new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1], + ROWS[rowSeperator2] }); + /** + * The testtable has one CQ which is always populated and one variable CQ + * for each row rowkey1: CF:CQ CF:CQ1 rowKey2: CF:CQ CF:CQ2 + */ + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + Long l = new Long(i); + put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(l)); + table.put(put); + Put p2 = new Put(ROWS[i]); + p2.add(TEST_FAMILY, Bytes.add(TEST_MULTI_CQ, Bytes.toBytes(l)), Bytes + .toBytes(l * 10)); + table.put(p2); + } + } + + /** + * Shutting down the cluster + * @throws Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + /** + * an infrastructure method to prepare rows for the testtable. + * @param base + * @param n + * @return + */ + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(i)); + } + return ret; + } + + /** + * **************************** ROW COUNT Test cases ******************* + */ + + /** + * This will test rowcount with a valid range, i.e., a subset of rows. It will + * be the most common use case. 
+ * @throws Throwable + */ + @Test + public void testRowCountWithValidRange() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + scan.setStartRow(ROWS[2]); + scan.setStopRow(ROWS[14]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long rowCount = aClient.rowCount(TEST_TABLE, ci, scan); + assertEquals(12, rowCount); + } + + /** + * This will test the row count on the entire table. Startrow and endrow will + * be null. + * @throws Throwable + */ + @Test + public void testRowCountAllTable() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long rowCount = aClient.rowCount(TEST_TABLE, ci, + scan); + assertEquals(ROWSIZE, rowCount); + } + + /** + * This will test the row count with startrow > endrow. The result should be + * -1. + * @throws Throwable + */ + @Test + public void testRowCountWithInvalidRange1() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[2]); + + final ColumnInterpreter ci = new LongColumnInterpreter(); + long rowCount = -1; + try { + rowCount = aClient.rowCount(TEST_TABLE, ci, scan); + } catch (Throwable e) { + myLog.error("Exception thrown in the invalidRange method" + + e.getStackTrace()); + } + assertEquals(-1, rowCount); + } + + /** + * This will test the row count with startrow = endrow and they will be + * non-null. The result should be 0, as it assumes a non-get query. 
+ * @throws Throwable + */ + @Test + public void testRowCountWithInvalidRange2() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[5]); + + final ColumnInterpreter ci = new LongColumnInterpreter(); + long rowCount = -1; + try { + rowCount = aClient.rowCount(TEST_TABLE, ci, scan); + } catch (Throwable e) { + rowCount = 0; + } + assertEquals(0, rowCount); + } + + /** + * This should return a 0 + */ + @Test + public void testRowCountWithNullCF() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[15]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long rowCount = -1; + try { + rowCount = aClient.rowCount(TEST_TABLE, ci, scan); + } catch (Throwable e) { + rowCount = 0; + } + assertEquals(0, rowCount); + } + + @Test + public void testRowCountWithNullCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long rowCount = aClient.rowCount(TEST_TABLE, ci, + scan); + assertEquals(20, rowCount); + } + + @Test + public void testRowCountWithPrefixFilter() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Filter f = new PrefixFilter(Bytes.toBytes("foo:bar")); + scan.setFilter(f); + long rowCount = aClient.rowCount(TEST_TABLE, ci, + scan); + assertEquals(0, rowCount); + } + + /** + * ***************Test cases for Maximum ******************* + */ + + /** + * give max for the entire table. 
+ * @throws Throwable + */ + @Test + public void testMaxWithValidRange() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long maximum = aClient.max(TEST_TABLE, ci, scan); + assertEquals(19, maximum); + } + + /** + * @throws Throwable + */ + @Test + public void testMaxWithValidRange2() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[15]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long max = aClient.max(TEST_TABLE, ci, scan); + assertEquals(14, max); + } + + @Test + public void testMaxWithValidRangeWithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long maximum = aClient.max(TEST_TABLE, ci, scan); + assertEquals(190, maximum); + } + + @Test + public void testMaxWithValidRange2WithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[7]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long max = aClient.max(TEST_TABLE, ci, scan); + assertEquals(60, max); + } + + @Test + public void testMaxWithValidRangeWithNullCF() { + AggregationClient aClient = new AggregationClient(conf); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Scan scan = new Scan(); + Long max = null; + try { + max = aClient.max(TEST_TABLE, ci, scan); + } catch (Throwable e) { + max = null; + } + assertEquals(null, max);// CP will throw an IOException about the + // null column family, and max will be set to 0 + } + + @Test + public void 
testMaxWithInvalidRange() { + AggregationClient aClient = new AggregationClient(conf); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Scan scan = new Scan(); + scan.setStartRow(ROWS[4]); + scan.setStopRow(ROWS[2]); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + long max = Long.MIN_VALUE; + try { + max = aClient.max(TEST_TABLE, ci, scan); + } catch (Throwable e) { + max = 0; + } + assertEquals(0, max);// control should go to the catch block + } + + @Test + public void testMaxWithInvalidRange2() throws Throwable { + long max = Long.MIN_VALUE; + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + scan.setStartRow(ROWS[4]); + scan.setStopRow(ROWS[4]); + try { + AggregationClient aClient = new AggregationClient(conf); + final ColumnInterpreter ci = new LongColumnInterpreter(); + max = aClient.max(TEST_TABLE, ci, scan); + } catch (Exception e) { + max = 0; + } + assertEquals(0, max);// control should go to the catch block + } + + @Test + public void testMaxWithFilter() throws Throwable { + Long max = 0l; + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + Filter f = new PrefixFilter(Bytes.toBytes("foo:bar")); + scan.setFilter(f); + final ColumnInterpreter ci = new LongColumnInterpreter(); + max = aClient.max(TEST_TABLE, ci, scan); + assertEquals(null, max); + } + + /** + * **************************Test cases for Minimum *********************** + */ + + /** + * @throws Throwable + */ + @Test + public void testMinWithValidRange() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + scan.setStartRow(HConstants.EMPTY_START_ROW); + scan.setStopRow(HConstants.EMPTY_END_ROW); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Long min = aClient.min(TEST_TABLE, ci, + scan); + assertEquals(0l, min.longValue()); + } + + /** + * @throws Throwable + */ + 
@Test + public void testMinWithValidRange2() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[15]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long min = aClient.min(TEST_TABLE, ci, scan); + assertEquals(5, min); + } + + @Test + public void testMinWithValidRangeWithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(HConstants.EMPTY_START_ROW); + scan.setStopRow(HConstants.EMPTY_END_ROW); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long min = aClient.min(TEST_TABLE, ci, + scan); + assertEquals(0, min); + } + + @Test + public void testMinWithValidRange2WithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[7]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long min = aClient.min(TEST_TABLE, ci, scan); + assertEquals(6, min); + } + + @Test + public void testMinWithValidRangeWithNullCF() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[15]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Long min = null; + try { + min = aClient.min(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, min);// CP will throw an IOException about the + // null column family, and max will be set to 0 + } + + @Test + public void testMinWithInvalidRange() { + AggregationClient aClient = new AggregationClient(conf); + Long min = null; + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[4]); + scan.setStopRow(ROWS[2]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + try { + min 
= aClient.min(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, min);// control should go to the catch block + } + + @Test + public void testMinWithInvalidRange2() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[6]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Long min = null; + try { + min = aClient.min(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, min);// control should go to the catch block + } + + @Test + public void testMinWithFilter() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY, TEST_QUALIFIER); + Filter f = new PrefixFilter(Bytes.toBytes("foo:bar")); + scan.setFilter(f); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Long min = null; + min = aClient.min(TEST_TABLE, ci, scan); + assertEquals(null, min); + } + + /** + * *************** Test cases for Sum ********************* + */ + /** + * @throws Throwable + */ + @Test + public void testSumWithValidRange() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long sum = aClient.sum(TEST_TABLE, ci, + scan); + assertEquals(190, sum); + } + + /** + * @throws Throwable + */ + @Test + public void testSumWithValidRange2() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[15]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long sum = aClient.sum(TEST_TABLE, ci, scan); + assertEquals(95, sum); + } + + @Test + public void testSumWithValidRangeWithNoCQ() throws Throwable { + AggregationClient aClient = new 
AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long sum = aClient.sum(TEST_TABLE, ci, + scan); + assertEquals(190 + 1900, sum); + } + + @Test + public void testSumWithValidRange2WithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[7]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + long sum = aClient.sum(TEST_TABLE, ci, scan); + assertEquals(6 + 60, sum); + } + + @Test + public void testSumWithValidRangeWithNullCF() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[7]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Long sum = null; + try { + sum = aClient.sum(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, sum);// CP will throw an IOException about the + // null column family, and max will be set to 0 + } + + @Test + public void testSumWithInvalidRange() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[2]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Long sum = null; + try { + sum = aClient.sum(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, sum);// control should go to the catch block + } + + @Test + public void testSumWithFilter() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Filter f = new PrefixFilter(Bytes.toBytes("foo:bar")); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setFilter(f); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Long sum = null; + sum = aClient.sum(TEST_TABLE, ci, scan); + assertEquals(null, sum); + } + + /** + * 
****************************** Test Cases for Avg ************** + */ + /** + * @throws Throwable + */ + @Test + public void testAvgWithValidRange() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); + final ColumnInterpreter ci = new LongColumnInterpreter(); + double avg = aClient.avg(TEST_TABLE, ci, + scan); + assertEquals(9.5, avg, 0); + } + + /** + * @throws Throwable + */ + @Test + public void testAvgWithValidRange2() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[15]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + double avg = aClient.avg(TEST_TABLE, ci, scan); + assertEquals(9.5, avg, 0); + } + + @Test + public void testAvgWithValidRangeWithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + final ColumnInterpreter ci = new LongColumnInterpreter(); + double avg = aClient.avg(TEST_TABLE, ci, + scan); + assertEquals(104.5, avg, 0); + } + + @Test + public void testAvgWithValidRange2WithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[7]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + double avg = aClient.avg(TEST_TABLE, ci, scan); + assertEquals(6 + 60, avg, 0); + } + + @Test + public void testAvgWithValidRangeWithNullCF() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Double avg = null; + try { + avg = aClient.avg(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, avg);// CP will throw an IOException about the + // null 
column family, and max will be set to 0 + } + + @Test + public void testAvgWithInvalidRange() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[1]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Double avg = null; + try { + avg = aClient.avg(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, avg);// control should go to the catch block + } + + @Test + public void testAvgWithFilter() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); + Filter f = new PrefixFilter(Bytes.toBytes("foo:bar")); + scan.setFilter(f); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Double avg = null; + avg = aClient.avg(TEST_TABLE, ci, scan); + assertEquals(Double.NaN, avg, 0); + } + + /** + * ****************** Test cases for STD ********************** + */ + /** + * @throws Throwable + */ + @Test + public void testStdWithValidRange() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); + final ColumnInterpreter ci = new LongColumnInterpreter(); + double std = aClient.std(TEST_TABLE, ci, + scan); + assertEquals(5.766, std, 0.05d); + } + + /** + * @throws Throwable + */ + @Test + public void testStdWithValidRange2() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addColumn(TEST_FAMILY,TEST_QUALIFIER); + scan.setStartRow(ROWS[5]); + scan.setStopRow(ROWS[15]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + double std = aClient.std(TEST_TABLE, ci, scan); + assertEquals(2.87, std, 0.05d); + } + + @Test + public void testStdWithValidRangeWithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new 
Scan(); + scan.addFamily(TEST_FAMILY); + final ColumnInterpreter ci = new LongColumnInterpreter(); + double std = aClient.std(TEST_TABLE, ci, + scan); + assertEquals(63.42, std, 0.05d); + } + + @Test + public void testStdWithValidRange2WithNoCQ() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[7]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + double std = aClient.std(TEST_TABLE, ci, scan); + assertEquals(0, std, 0); + } + + @Test + public void testStdWithValidRangeWithNullCF() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[17]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Double std = null; + try { + std = aClient.std(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, std);// CP will throw an IOException about the + // null column family, and max will be set to 0 + } + + @Test + public void testStdWithInvalidRange() { + AggregationClient aClient = new AggregationClient(conf); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setStartRow(ROWS[6]); + scan.setStopRow(ROWS[1]); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Double std = null; + try { + std = aClient.std(TEST_TABLE, ci, scan); + } catch (Throwable e) { + } + assertEquals(null, std);// control should go to the catch block + } + + @Test + public void testStdWithFilter() throws Throwable { + AggregationClient aClient = new AggregationClient(conf); + Filter f = new PrefixFilter(Bytes.toBytes("foo:bar")); + Scan scan = new Scan(); + scan.addFamily(TEST_FAMILY); + scan.setFilter(f); + final ColumnInterpreter ci = new LongColumnInterpreter(); + Double std = null; + std = aClient.std(TEST_TABLE, ci, scan); + assertEquals(Double.NaN, std, 0); + } +}