### Eclipse Workspace Patch 1.0
#P HBaseTrunkMyPatch
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggFunctions.java (revision 0)
@@ -0,0 +1,139 @@
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;

import org.junit.Assert;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for the aggregate functions (max/min/sum/row-count/avg/std) that are
 * implemented as a region-side coprocessor endpoint and invoked through
 * {@link AggregationClient}.
 */
public class TestAggFunctions {
  protected static Log myLog = LogFactory.getLog(TestAggFunctions.class);

  // Test fixture: one table with a single family/qualifier, ROWSIZE rows,
  // pre-split into three regions at rowSeperator1 and rowSeperator2.
  private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static final int ROWSIZE = 20;
  private static final int rowSeperator1 = 5;
  private static final int rowSeperator2 = 12;
  private static byte[][] ROWS = makeN(ROW, ROWSIZE);

  private static HBaseTestingUtility util = new HBaseTestingUtility();
  private static MiniHBaseCluster cluster = null;

  /**
   * Starts a two-node mini cluster, creates and pre-splits the test table,
   * writes one long value (the row index) per row, and loads the
   * AggregateProtocolImpl endpoint on every online region.
   *
   * @throws Exception on any cluster/table setup failure
   */
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.set("hbase.coprocessor.default.classes",
        "org.apache.hadoop.hbase.coprocessor.AggregateProtocolImpl");

    util.startMiniCluster(2);
    cluster = util.getMiniHBaseCluster();
    HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
    util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
        new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],
            ROWS[rowSeperator2] });

    // Each row stores its index as an 8-byte long; aggregates over 0..19 are
    // therefore easy to compute by hand in the test methods.
    for (int i = 0; i < ROWSIZE; i++) {
      Put put = new Put(ROWS[i]);
      Long l = new Long(i);
      put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(l));
      table.put(put);
    }

    // sleep here is an ugly hack to allow region transitions to finish
    Thread.sleep(5000);
    for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
      for (HRegionInfo r : t.getRegionServer().getOnlineRegions()) {
        t.getRegionServer().getOnlineRegion(r.getRegionName())
            .getCoprocessorHost()
            .load(AggregateProtocolImpl.class, Coprocessor.Priority.USER);
      }
    }
  }

  /**
   * Shuts down the mini cluster.
   *
   * @throws Exception if shutdown fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Builds {@code n} distinct row keys by suffixing {@code base} with 0..n-1.
   *
   * NOTE(review): the loop body was swallowed in this paste (text between
   * '&lt;' and '&gt;' was stripped); reconstructed from the standard HBase
   * test idiom — confirm against the original patch.
   *
   * @param base common key prefix
   * @param n    number of keys to generate
   * @return n row keys in ascending order
   */
  private static byte[][] makeN(byte[] base, int n) {
    byte[][] ret = new byte[n][];
    for (int i = 0; i < n; i++) {
      ret[i] = Bytes.add(base, Bytes.toBytes(i));
    }
    return ret;
  }

  // NOTE(review): the @Test methods of this class were lost in the paste
  // (everything between makeN and the next file's getMax body is missing);
  // they must be restored from the original patch.
}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateProtocolImpl.java
===================================================================
// NOTE(review): the patch hunk header and the class declaration below were
// swallowed by the paste; the declaration is reconstructed from the visible
// method bodies (they call getEnvironment() and implement the
// AggregateCpProtocol methods) — confirm against the original patch.
package org.apache.hadoop.hbase.coprocessor;

/**
 * Region-side implementation of {@link AggregateCpProtocol}. Each method
 * scans the hosting region for the given column family/qualifier, interprets
 * the cell values as 8-byte longs, and returns a region-local partial
 * aggregate; {@link org.apache.hadoop.hbase.client.coprocessor.AggregationClient}
 * combines the partials into the global result.
 */
public class AggregateProtocolImpl extends BaseEndpointCoprocessor
    implements AggregateCpProtocol {
  protected static Log log = LogFactory.getLog(AggregateProtocolImpl.class);

  /**
   * For a given column family and qualifier, returns the maximum value at the
   * region level.
   *
   * @return the region-local maximum, or Long.MIN_VALUE when the region has
   *         no matching cells
   * @throws IOException if the scan fails
   */
  @Override
  public long getMax(byte[] colFamily, byte[] colQualifier) throws IOException {
    long maximum = Long.MIN_VALUE;
    InternalScanner scanner = getScanWithColAndQualifier(colFamily, colQualifier);
    List<KeyValue> results = new ArrayList<KeyValue>();
    try {
      boolean done;
      do {
        results.clear();
        done = scanner.next(results);
        // The final batch may be empty; get(0) would throw
        // IndexOutOfBoundsException without this guard.
        if (results.isEmpty()) {
          continue;
        }
        long temp = Bytes.toLong(results.get(0).getValue());
        log.debug("val read in the region is: " + temp);
        if (temp > maximum) {
          maximum = temp;
        }
      } while (done);
    } catch (IOException ie) {
      log.error("Some error occurred. Aborting the computation" + ie.getMessage());
      // Preserve the original cause instead of discarding it.
      throw new IOException("Aborting the Max aggregate computation", ie);
    } finally {
      scanner.close(); // was leaked on the error path in the original
    }
    log.debug("Maximum from this region is "
        + getEnvironment().getRegion().hashCode() + ": " + maximum);
    return maximum;
  }

  /**
   * For a given column family and qualifier, returns the minimum value at the
   * region level.
   *
   * @return the region-local minimum, or Long.MAX_VALUE when the region has
   *         no matching cells
   * @throws IOException if the scan fails
   */
  @Override
  public long getMin(byte[] colFamily, byte[] colQualifier) throws IOException {
    long min = Long.MAX_VALUE;
    InternalScanner scanner = getScanWithColAndQualifier(colFamily, colQualifier);
    List<KeyValue> results = new ArrayList<KeyValue>();
    try {
      boolean done;
      do {
        results.clear();
        done = scanner.next(results);
        if (results.isEmpty()) {
          continue; // guard against an empty final batch
        }
        long temp = Bytes.toLong(results.get(0).getValue());
        if (temp < min) {
          min = temp;
        }
      } while (done);
    } catch (IOException ie) {
      log.error("Some error occurred. Aborting the computation" + ie.getMessage());
      throw new IOException("Aborting the Min aggregate computation", ie);
    } finally {
      scanner.close();
    }
    log.debug("Minimum from this region is "
        + getEnvironment().getRegion().hashCode() + ": " + min);
    return min;
  }

  /**
   * For a given column family and qualifier, returns the sum of all values at
   * the region level.
   *
   * @return the region-local sum (0 when the region has no matching cells)
   * @throws IOException if the scan fails
   */
  @Override
  public long getSum(byte[] colFamily, byte[] colQualifier) throws IOException {
    long sum = 0L;
    InternalScanner scanner = getScanWithColAndQualifier(colFamily, colQualifier);
    List<KeyValue> results = new ArrayList<KeyValue>();
    try {
      boolean done;
      do {
        results.clear();
        done = scanner.next(results);
        // The original checked results.get(0) == null, but get(0) on an empty
        // list throws rather than returning null — test emptiness instead.
        if (results.isEmpty()) {
          continue;
        }
        sum += Bytes.toLong(results.get(0).getValue());
      } while (done);
    } catch (IOException ie) {
      log.error("Some error occurred. Aborting the computation" + ie.getMessage());
      throw new IOException("Aborting the Sum aggregate computation", ie);
    } finally {
      scanner.close();
    }
    log.debug("Sum from this region is "
        + getEnvironment().getRegion().hashCode() + ": " + sum);
    return sum;
  }

  /**
   * For a given column family and qualifier, returns the number of rows at
   * the region level.
   *
   * @return the region-local row count
   * @throws IOException if the scan fails
   */
  @Override
  public long getRowNum(byte[] colFamily, byte[] colQualifier) throws IOException {
    long counter = 0L;
    InternalScanner scanner = getScanWithColAndQualifier(colFamily, colQualifier);
    List<KeyValue> results = new ArrayList<KeyValue>();
    try {
      boolean done;
      do {
        results.clear();
        done = scanner.next(results);
        // TODO: Or shall it only cater to the row, and ignore whether a
        // specific column is null or not?
        if (!results.isEmpty()) {
          counter++;
        }
      } while (done);
    } finally {
      scanner.close();
    }
    log.debug("Row counter from this region is "
        + getEnvironment().getRegion().hashCode() + ": " + counter);
    return counter;
  }

  /**
   * Returns a scanner over this region restricted to the given column family
   * and qualifier.
   *
   * @param colFamily    column family (must not be null)
   * @param colQualifier column qualifier (must not be null)
   * @return an InternalScanner for the restricted scan
   * @throws IOException if either argument is null or the scanner cannot be
   *                     created
   */
  private InternalScanner getScanWithColAndQualifier(byte[] colFamily,
      byte[] colQualifier) throws IOException {
    Scan scan = new Scan();
    try {
      scan.addColumn(colFamily, colQualifier);
      HRegion region = getEnvironment().getRegion();
      return region.getScanner(scan);
    } catch (Exception e) {
      log.error("Exception occurred while creating the scanner! "
          + "Either column family or qualifier is null.");
      throw new IOException("Exception occurred while creating the scanner", e);
    }
  }

  /**
   * Returns the region-local sum and element count for the given column, as a
   * two-element list [sum, count] of LongWritables. The client sums both
   * across regions and divides to get the global average.
   *
   * @throws IOException if the scan fails
   */
  @Override
  public List<LongWritable> getAvg(byte[] colFamily, byte[] colQualifier)
      throws IOException {
    long sum = 0L, rowNum = 0L;
    InternalScanner scanner = getScanWithColAndQualifier(colFamily, colQualifier);
    List<KeyValue> kvList = new ArrayList<KeyValue>();
    try {
      boolean done;
      do {
        kvList.clear();
        done = scanner.next(kvList);
        if (kvList.isEmpty()) {
          continue; // guard against an empty final batch
        }
        sum += Bytes.toLong(kvList.get(0).getValue());
        rowNum++;
      } while (done);
    } catch (IOException ie) {
      log.error("Some error occurred. Aborting the computation" + ie.getMessage());
      throw new IOException("Aborting the Avg aggregate computation", ie);
    } finally {
      scanner.close(); // was never closed in the original
    }
    List<LongWritable> sumAndRowcount = new ArrayList<LongWritable>();
    sumAndRowcount.add(new LongWritable(sum));
    sumAndRowcount.add(new LongWritable(rowNum));
    return sumAndRowcount;
  }

  /**
   * Returns the three region-local partials needed for a global standard
   * deviation: [sum, sum of squares, row count] as LongWritables. The client
   * computes variance = E[x^2] - E[x]^2 and takes the square root.
   *
   * @throws IOException if the scan fails
   */
  @Override
  public List<LongWritable> getStd(byte[] colFamily, byte[] colQualifier)
      throws IOException {
    long sum = 0L, rowNum = 0L, sumSq = 0L;
    InternalScanner scanner = getScanWithColAndQualifier(colFamily, colQualifier);
    List<KeyValue> kvList = new ArrayList<KeyValue>();
    try {
      boolean done;
      do {
        kvList.clear();
        done = scanner.next(kvList);
        if (kvList.isEmpty()) {
          continue; // guard against an empty final batch
        }
        long temp = Bytes.toLong(kvList.get(0).getValue());
        sum += temp;
        sumSq += temp * temp;
        rowNum++;
      } while (done);
    } catch (IOException ie) {
      log.error("Some error occurred. Aborting the computation" + ie.getMessage());
      throw new IOException("Aborting the Std aggregate computation", ie);
    } finally {
      scanner.close(); // was never closed in the original
    }
    List<LongWritable> result = new ArrayList<LongWritable>();
    result.add(new LongWritable(sum));
    result.add(new LongWritable(sumSq));
    result.add(new LongWritable(rowNum));
    return result;
  }
}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateCpProtocol.java (revision 0)
@@ -0,0 +1,24 @@
package org.apache.hadoop.hbase.coprocessor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.io.LongWritable;
+*/ +public interface AggregateCpProtocol extends CoprocessorProtocol{ + long getMax(byte[] colFamily, byte[] colQualifier) throws IOException; + long getMin(byte[] colFamily, byte[] colQualifier) throws IOException; + + long getSum(byte[] colFamily, byte[] colQualifier) throws IOException; + long getRowNum(byte[] colFamily, byte[] colQualifier) throws IOException; + + List getAvg(byte[] colFamily, byte[] colQualifier) throws IOException; + List getStd(byte[] colFamily, byte[] colQualifier) throws IOException; + + + +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 0) @@ -0,0 +1,261 @@ +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.IOException; +import java.text.NumberFormat; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.coprocessor.AggregateCpProtocol; +import org.apache.hadoop.hbase.coprocessor.AggregateProtocolImpl; +import org.apache.hadoop.io.LongWritable; + +/** + * + * This client class is for invoking the agg functions deployed on the RS side via the cp impls. This class will implement the + * supporting functionality for summing/processing the individual results obtained from the cp impl for each region server. + * Note: there is one to one mapping between the client and server operations now. 
+ * + * This will serve as the client side handle for invoking the agg funcitons, like AggrationClient#sum(table, start, end, colFamily, colQualifier); + * + * + */ +public class AggregationClient { + + private static Log log = LogFactory.getLog(AggregationClient.class); + + /** + * It gives the global maximum value of a column for a given column family. + * @param tableName + * @param startKey + * @param endKey + * @param colFamily + * @param colQualifier + * @return the maximum value as a long. + * @throws Throwable The caller is supposed to handle the exception as they are thrown & propagated to it. + */ + public long getMaximum(final byte[] tableName, final byte[] startKey, final byte[] endKey, + final byte[] colFamily, final byte[] colQualifier) throws Throwable { + HTable table = new HTable(tableName); + class MaxCallBack implements Batch.Callback{ + long maximum = Long.MIN_VALUE; + @Override + public void update(byte[] region, byte[] row, Long result) { + if(maximum() { + @Override + public Long call(AggregateCpProtocol instance) throws IOException { + return instance.getMax(colFamily, colQualifier); + } + }, aMaxCallBack); + + log.debug("Maximum from all the regions is: "+ aMaxCallBack.getMax()); + return aMaxCallBack.getMax(); + } + + /** + * It gives the global minimum value, gathered over all regions. 
+ * @param tableName + * @param startKey + * @param endKey + * @param colFamily + * @param colQualifier + * @return + * @throws Throwable + */ + public Long getMinimum(final byte[] tableName, final byte[] startKey, final byte[] endKey, + final byte[] colFamily, final byte[] colQualifier) throws Throwable { + + class MinCallBack implements Batch.Callback { + + private Long minimum = Long.MAX_VALUE; + @Override + public void update(byte[] region, byte[] row, Long result) { + if (minimum > result) + minimum = result; + } + public Long getMinimum() { + return minimum; + } + } + HTable table = new HTable(tableName); + MinCallBack minCallBack = new MinCallBack(); + table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey, + new Batch.Call() { + + @Override + public Long call(AggregateCpProtocol instance) throws IOException { + return instance.getMin(colFamily, colQualifier); + } + }, minCallBack); + log.debug("Min fom all regions is: "+minCallBack.getMinimum()); + return minCallBack.getMinimum(); + } + +/** + * It gives the row num, by summing up the individual results obtained from regions. 
+ * @param tableName + * @param startKey + * @param endKey + * @param colFamily + * @param colQualifier + * @return + * @throws Throwable + */ + public long getRowNum(final byte[] tableName, final byte[] startKey, final byte[] endKey, + final byte[] colFamily, final byte[] colQualifier) throws Throwable{ + + class RowNumCallback implements Batch.Callback{ + private Long rowNumCount = 0l; + @Override + public void update(byte[] region, byte[] row, Long result) { + rowNumCount += result; + } + public Long getRowNumCount() { + return rowNumCount; + } + } + RowNumCallback rowNum = new RowNumCallback(); + HTable table = new HTable(tableName); + table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey, + new Batch.Call() { + @Override + public Long call(AggregateCpProtocol instance) throws IOException { + return instance.getRowNum(colFamily, colQualifier); + } + }, rowNum); + + return rowNum.getRowNumCount(); + } + + /** + * It sums up the value returned from various regions. + * @param tableName + * @param startKey + * @param endKey + * @param colFamily + * @param colQualifier + * @return + * @throws Throwable + */ + public long getSum(final byte[] tableName, final byte[] startKey, final byte[] endKey, + final byte[] colFamily, final byte[] colQualifier) throws Throwable{ + class SumCallBack implements Batch.Callback{ + Long sumResult = 0l; + @Override + public void update(byte[] region, byte[] row, Long result) { + sumResult += result; + } + public Long getSumResult() { + return sumResult; + } + } + SumCallBack sumCallBack = new SumCallBack(); + HTable table = new HTable(tableName); + table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey, + new Batch.Call() { + @Override + public Long call(AggregateCpProtocol instance) throws IOException { + return instance.getSum(colFamily, colQualifier); + } + }, sumCallBack); + return sumCallBack.getSumResult(); + } + + /** + * It computes average while fetching sum and row count from all the corresponding 
regions. Idea is compute a global + * sum of region level sum and rowcounts and compute a global average. + * @param tableName + * @param startKey + * @param endKey + * @param colFamily + * @param colQualifier + * @return TODO: number formatting? + * @throws Throwable + */ + public double getAvg( final byte[] tableName, final byte[] startKey, final byte[] endKey, + final byte[] colFamily, final byte[] colQualifier) throws Throwable{ + + class AvgCallBack implements Batch.Callback>{ + long sum = 0l; + long rsCount = 0; + @Override + public void update(byte[] region, byte[] row, List result) { + sum += result.get(0).get(); + rsCount += result.get(1).get(); + } + public double getAvg(){ + double avg = (rsCount!=0)? (double) sum/rsCount: 0; + return avg; + } + } + AvgCallBack avgCallBack = new AvgCallBack(); + HTable table = new HTable(tableName); + table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey, + new Batch.Call>() { + @Override + public List call(AggregateCpProtocol instance) throws IOException { + // TODO Auto-generated method stub + return instance.getAvg(colFamily, colQualifier); + } + }, avgCallBack); + log.debug("Avg of all the region values is: "+ avgCallBack.getAvg()); + return avgCallBack.getAvg(); + } + + /** + * It computes a global standard deviation for a given column and its value. + * Standard deviation is square root of (average of squares - average*average). From individual regions, it obtains sum, square sum and + * number of rows. With these, the above values are computed to get the global std. 
+ * @param tableName + * @param startKey + * @param endKey + * @param colFamily + * @param colQualifier + * @return TODO: number formatting + * @throws Throwable + */ + public double getStd(final byte[] tableName, final byte[] startKey, final byte[] endKey, + final byte[] colFamily, final byte[] colQualifier) throws Throwable{ + + class StdCallback implements Batch.Callback>{ + long rowNum = 0l, sum = 0l, sumSq = 0l; + @Override + public void update(byte[] region, byte[] row, List result) { + sum += result.get(0).get(); + sumSq += result.get(1).get(); + rowNum += result.get(2).get(); + } + public double getStd(){ + double res = 0d; + double avg = (rowNum != 0)? (double)sum/rowNum: 0; + double avgOfSumSq = (rowNum != 0)? (double) sumSq/rowNum:0; + res = avgOfSumSq - (avg)*(avg); //variance + res = Math.pow(res, 0.5); + return res; + } + } + StdCallback stdCallback = new StdCallback(); + HTable table = new HTable(tableName); + table.coprocessorExec(AggregateCpProtocol.class, startKey, endKey, + new Batch.Call>() { + @Override + public List call(AggregateCpProtocol instance) + throws IOException { + return instance.getStd(colFamily, colQualifier); + } + + }, stdCallback); + return stdCallback.getStd(); + } +}