Index: src/main/java/org/apache/hadoop/hbase/client/CursorCallable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/CursorCallable.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/CursorCallable.java	(revision 0)
@@ -0,0 +1,88 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.NotServingRegionException;
+
+/**
+ * Captures a cursor call from the client to the server. It carries the location of the
+ * region server, the row, and the region info for that row.
+ *
+ * It is similar to ScannerCallable: it holds a cursor id that is mapped to a fat object
+ * (CursorResultSetCp) on the server side. That object is created by the coprocessor
+ * endpoint and provides support for navigating/pulling rows from a result set.
+ * There can be two types of result sets: an in-memory one and an incremental one.
+ */
+public class CursorCallable extends ServerCallable<Result[]> {
+
+  private long cursorId = -1;
+  private boolean instantiated = false;
+  private boolean closed = false;
+  private int cache = 1;
+
+  private static final Log LOG = LogFactory.getLog(CursorCallable.class);
+
+  public void setCache(int cache) {
+    this.cache = cache;
+  }
+
+  @Override
+  public void instantiateServer(boolean reload) throws IOException {
+    if (!instantiated) {
+      super.instantiateServer(reload);
+      instantiated = true;
+    }
+  }
+
+  public CursorCallable(HConnection connection, byte[] tableName, byte[] row) {
+    super(connection, tableName, row);
+  }
+
+  public void setCursorId(long cursorId) {
+    this.cursorId = cursorId;
+  }
+
+  @Override
+  public Result[] call() throws IOException {
+    Result[] res;
+    if (cursorId == -1) {
+      return null; // cursor is not set/registered.
+    } else if (closed) { // go and close the cursor on the HRS
+      this.close();
+    } else {
+      try {
+        res = this.server.nextCp(cursorId, cache);
+        if (res == null || res.length != this.cache) {
+          // all results have been fetched; set the close flag so the cursor is closed on the next call.
+          closed = true;
+        }
+        return res;
+      } catch (Exception e) {
+        if (e instanceof NotServingRegionException) {
+          LOG.error("Got a NSRE from the region server at " + this.location.toString());
+          // at this point it will abort the call (sad but true); resending the request is not supported yet.
+          throw new IOException("Aborting the process due to NSRE");
+        }
+        throw new IOException(e.getCause());
+      }
+    }
+    return null;
+  }
+
+  public void setClosed(boolean closed) {
+    this.closed = closed;
+  }
+
+  public void close() {
+    if (this.cursorId == -1) {
+      return;
+    }
+    try {
+      this.server.closeCp(cursorId);
+    } catch (IOException ioe) {
+      LOG.error("Got an exception while closing the cursor resource", ioe);
+    }
+    this.cursorId = -1;
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/client/CursorCp.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/CursorCp.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/CursorCp.java	(revision 0)
@@ -0,0 +1,115 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class represents the client-side interface of the cursor server-client plumbing.
+ */
+public class CursorCp implements ResultScanner {
+  // starting with essential fields.
+  private CursorCallable cursorCallable = null;
+  private HTable htable;
+  private final LinkedList<Result> cache = new LinkedList<Result>();
+  private boolean closed = false;
+  private static final Log LOG = LogFactory.getLog(CursorCp.class);
+
+  public CursorCp(CursorCallable callable, HTable htable) {
+    this.cursorCallable = callable;
+    this.htable = htable;
+  }
+
+  @Override
+  public void close() {
+    if (this.cursorCallable != null) {
+      this.cursorCallable.setClosed(true);
+      try {
+        this.htable.getConnection().getRegionServerWithRetries(this.cursorCallable);
+      } catch (IOException e) {
+        LOG.error("Got an exception while closing the cursor", e);
+      }
+    }
+    this.closed = true;
+  }
+
+  @Override
+  public Result next() throws IOException {
+    if (cache.size() == 0 && this.closed) {
+      return null;
+    }
+    if (cache.size() == 0) { // do an rpc and fetch results
+      Result[] res = this.htable.getConnection().getRegionServerWithRetries(cursorCallable);
+      if (res != null && res.length > 0) {
+        for (Result r : res) {
+          cache.add(r);
+        }
+      }
+    }
+    if (cache.size() > 0) {
+      return cache.poll();
+    }
+    return null;
+  }
+
+  @Override
+  public Result[] next(int nbRows) throws IOException {
+    List<Result> res = new ArrayList<Result>();
+    for (int i = 0; i < nbRows; i++) {
+      Result r = next();
+      if (r != null) {
+        res.add(r);
+      } else {
+        break;
+      }
+    }
+    return res.toArray(new Result[0]);
+  }
+
+  /*
+   * Iterator impl for the cursor (similar to the scanner).
+   * This iterator is not used in the test method yet.
+   * (non-Javadoc)
+   * @see java.lang.Iterable#iterator()
+   */
+  @Override
+  public Iterator<Result> iterator() {
+    return new Iterator<Result>() {
+      Result res;
+
+      @Override
+      public Result next() {
+        if (!hasNext()) {
+          return null; // done
+        }
+        Result tmp = res; // res has the latest row.
+        res = null;
+        return tmp;
+      }
+
+      @Override
+      public boolean hasNext() {
+        if (res == null) {
+          try {
+            res = CursorCp.this.next();
+            return res != null;
+          } catch (IOException e) {
+            LOG.error("Exception while getting next element: " + e.getMessage());
+            throw new RuntimeException(e);
+          }
+        }
+        return true;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CursorResultSetCp.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/CursorResultSetCp.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/CursorResultSetCp.java	(revision 0)
@@ -0,0 +1,62 @@
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * This is the in-memory result set generated by the CP impl. It also has a cursor interface
+ * for navigating the result set. It is registered on the RS, and the associated key is
+ * handed to the client. The client can invoke "next, hasNext"-like methods on the result set
+ * (they are converted into next() calls by the time they reach here).
+ * The intent was to make this generic in T, the type of result element (e.g. KeyValue or
+ * Result); at present one Result object is associated with one row.
+ *
+ * This serves both kinds of result sets: incremental (as listed here), or fully in-memory
+ * (like a topK). For topK, the producer impl sets hasMoreRows to false in the very first
+ * call to the CP impl method.
+ */
+public abstract class CursorResultSetCp {
+  protected Queue<Result> resultList = new LinkedList<Result>();
+  protected boolean hasMoreRows = true;
+  // was thinking about a resource-release thread that calls closeResources() after some idle interval
+  private long timeToLastAccess = 0L;
+//  private int resultBatchSize = 100; // default 100
+
+//  public CursorResultSetCp(int productionBatchSize) {
+//    this.resultBatchSize = productionBatchSize;
+//  }
+
+  /**
+   * Returns up to nbRows results from the result set, or fewer if not enough remain.
+   * @param nbRows
+   * @return the next batch of results, or null if the result set is exhausted
+   * @throws IOException
+   */
+  public Result[] next(int nbRows) throws IOException {
+    timeToLastAccess = System.currentTimeMillis();
+    List<Result> res = new ArrayList<Result>();
+    if (!hasMoreRows && resultList.size() == 0) {
+      return null;
+    }
+    while (hasMoreRows && resultList.size() < nbRows) {
+      produceResult();
+    }
+    while (!resultList.isEmpty() && res.size() < nbRows) {
+      res.add(resultList.poll());
+    }
+    return res.toArray(new Result[0]);
+  }
+
+  /**
+   * Produces results and stores them into resultList. Implementations may fill it in one
+   * shot (in-memory) or incrementally, and must set hasMoreRows to false when done.
+   */
+  public abstract void produceResult() throws IOException;
+
+  /**
+   * Releases the region-level resources held by this result set.
+   */
+  public abstract void closeResources() throws IOException;
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1076347)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ ... @@
   protected final Map<String, InternalScanner> scanners = new ConcurrentHashMap<String, InternalScanner>();
+  final Map<String, CursorResultSetCp> cursors = new ConcurrentHashMap<String, CursorResultSetCp>();
+
   // zookeeper connection and watcher
   private ZooKeeperWatcher zooKeeper;
@@ -2819,4 +2823,64 @@
     new HRegionServerCommandLine(regionServerClass).doMain(args);
   }
+
+  /**
+   * Registers the CursorResultSetCp object created by a CP impl in the RS. The client side
+   * uses the returned key id to navigate this object's result set
+   * (similar to the current Scanner flow).
+   */
+  @Override
+  public long registerCursorResultSetCp(CursorResultSetCp resultSet) throws IOException {
+    checkOpen();
+    long cursorId = rand.nextLong();
+    String cursorName = String.valueOf(cursorId);
+    requestCount.incrementAndGet();
+    try {
+      cursors.put(cursorName, resultSet);
+      LOG.debug("Registering the object in hrs: " + resultSet + " with cursor id " + cursorId
+          + " and hrs hashcode " + this.hashCode());
+      return cursorId;
+    } catch (Exception e) {
+      LOG.error("Exception occurred while registering the cursor object");
+      throw new IOException(e.getCause());
+      // TODO: some kind of lease management needs to be pulled in here
+    }
+  }
+
+  /**
+   * Navigates the CP result set object, with a cache argument (i.e. the number of result
+   * rows to be sent back in this one rpc).
+   */
+  @Override
+  public Result[] nextCp(long cursorId, int cache) throws IOException {
+    checkOpen();
+    try {
+      String cursorName = String.valueOf(cursorId);
+      CursorResultSetCp res = this.cursors.get(cursorName);
+      LOG.debug("Fetching object from the cursor map: " + res + ", this hrs is " + this.hashCode());
+      Result[] r = res.next(cache);
+      return r;
+    } catch (Exception e) {
+      LOG.error("Exception while fetching the next batch from a region's cursor.");
+      if (e instanceof NotServingRegionException) {
+        closeCp(cursorId); // close the resource as the region is going away; the client will retry.
+      }
+      throw new IOException(e.getCause());
+    }
+  }
+
+  /**
+   * Unregisters the cursor object from the RS and releases the region-level resources.
+   * @param cursorId
+   * @throws IOException
+   */
+  public void closeCp(long cursorId) throws IOException {
+    checkOpen();
+    try {
+      requestCount.incrementAndGet();
+      LOG.debug("Closing/releasing the cursor object");
+      CursorResultSetCp cp = this.cursors.remove(String.valueOf(cursorId));
+      cp.closeResources();
+    } catch (Exception e) {
+      LOG.error("Exception while releasing the cursor object.");
+      throw new IOException(e.getCause());
+    }
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java	(revision 1076347)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java	(working copy)
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.coprocessor.CursorResultSetCp;
 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -83,4 +84,12 @@
    * Returns a reference to the RPC server metrics.
    */
   public HBaseRpcMetrics getRpcMetrics();
+
+  /**
+   * Registers a CursorResultSetCp created by a coprocessor and returns its cursor id.
+   * @throws IOException
+   */
+  public long registerCursorResultSetCp(CursorResultSetCp resultSet) throws IOException;
+
+
 }
\ No newline at end of file
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/CountGroupByImpl.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/CountGroupByImpl.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/CountGroupByImpl.java	(revision 0)
@@ -0,0 +1,117 @@
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.TestCursorCp.CountGroupBy;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Given rows such as:
+ * 'aaa-123'
+ * 'aaa-456'
+ * 'abc-111'
+ * 'abd-111'
+ * 'abd-222'
+ *
+ * this endpoint returns:
+ * ('aaa', 2)
+ * ('abc', 1)
+ * ('abd', 2)
+ */
+public class CountGroupByImpl extends BaseEndpointCoprocessor implements CountGroupBy {
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+
+  private static final Log LOG = LogFactory.getLog(CountGroupByImpl.class);
+
+  /**
+   * A bunch of steps need to happen here. The target for now is to produce the
+   * CursorResultSetCp object, register it against a region-level mapping (preferably inside
+   * the HRS), and send the key back to the client.
+   */
+  @Override
+  public Long getRowCountByPrefix(final String prefix) throws IOException {
+    HRegion reg = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
+    LOG.debug("start:: " + Bytes.toString(reg.getStartKey()) + " end :: " + Bytes.toString(reg.getEndKey()));
+//    ((RegionCoprocessorEnvironment) getEnvironment()).setCursorCacheSize(100);
+
+    CursorResultSetCp rs = new CursorResultSetCp() {
+      Scan scan = new Scan();
+      InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan);
+      List<KeyValue> list = new ArrayList<KeyValue>();
+      String currentRow = null;
+      int count = 0;
+
+      /**
+       * Produces the results and stores them in the cursor result set object that is
+       * eventually registered in the RS. It can be implemented as a complete in-memory
+       * result or in an incremental mode; below is a sample incremental implementation.
+       * @throws IOException
+       */
+      @Override
+      public void produceResult() throws IOException {
+        do {
+          hasMoreRows = scanner.next(list);
+          if (list.isEmpty()) {
+            break; // guard against an empty batch at the end of the region
+          }
+          byte[] row = this.list.get(0).getRow();
+          String str = Bytes.toString(row); // str = aaa-01, etc.
+          str = str.substring(0, str.indexOf(prefix));
+          if (currentRow == null) {
+            currentRow = str;
+            count++;
+            list.clear();
+          } else {
+            // check whether they are equal: if yes, increment the count; if not, we have
+            // found another key, so save the existing one to the result.
+            if (str.equals(currentRow)) {
+              count++;
+              list.clear();
+            } else {
+              // emit the keyvalue for the finished key and its count
+              KeyValue kv = new KeyValue(Bytes.toBytes(currentRow), CountGroupByImpl.TEST_FAMILY,
+                  CountGroupByImpl.TEST_QUALIFIER, System.currentTimeMillis(), Bytes.toBytes(count));
+              list.clear();
+              list.add(kv);
+              Result res = new Result(list);
+              this.resultList.add(res);
+              LOG.debug("ROW is: " + Bytes.toString(res.getRow()) + " row and counter:: "
+                  + currentRow + ", " + count); // TODO: delete this line
+              count = 1;
+              currentRow = str;
+              list.clear();
+              break;
+            }
+          }
+        } while (hasMoreRows);
+        if (!hasMoreRows) {
+          KeyValue kv = new KeyValue(Bytes.toBytes(currentRow), CountGroupByImpl.TEST_FAMILY,
+              CountGroupByImpl.TEST_QUALIFIER, System.currentTimeMillis(), Bytes.toBytes(count));
+          list.clear();
+          list.add(kv);
+          Result res = new Result(list);
+          this.resultList.add(res);
+        }
+      }
+
+      /**
+       * Release the resources.
+       */
+      @Override
+      public void closeResources() throws IOException {
+        this.scanner.close();
+        this.scan = null;
+        this.list = null;
+        this.resultList = null;
+      }
+    };
+    // register the cursorResult object
+    return ((RegionCoprocessorEnvironment) getEnvironment()).getRegionServerServices()
+        .registerCursorResultSetCp(rs);
+  }
+}
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestCursorCp.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestCursorCp.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestCursorCp.java	(revision 0)
@@ -0,0 +1,136 @@
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.CursorCallable;
+import org.apache.hadoop.hbase.client.CursorCp;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test class for the cursor CP: a coprocessor that returns a cursor from its core method
+ * and iterates, like the current Scanner, over a raw table.
+ *
+ * The scenario is that the test table has these rows:
+ * 'aaa-123'
+ * 'aaa-456'
+ * 'abc-111'
+ * 'abd-111'
+ * 'abd-222'
+ *
+ * and we want to return:
+ * ('aaa', 2)
+ * ('abc', 1)
+ * ('abd', 2)
+ */
+public class TestCursorCp {
+
+  public interface CountGroupBy extends CoprocessorProtocol {
+    Long getRowCountByPrefix(String prefix) throws IOException;
+  }
+
+  private static final Log log = LogFactory.getLog(TestCursorCp.class);
+
+  private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+  private static byte[][] ROW = {Bytes.toBytes("aaa-"), Bytes.toBytes("bbb-"), Bytes.toBytes("ccc-"),
+      Bytes.toBytes("ddd-"), Bytes.toBytes("eee-"), Bytes.toBytes("fff-"), Bytes.toBytes("ggg-"),
+      Bytes.toBytes("iii-")};
+
+  private static final int ROWSIZE = 40;
+  private static final int rowSeperator1 = 3;
+  private static final int rowSeperator2 = 6; // Region 3 (the last region) will have rows from bbb-01 (inclusive) onwards
+  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+  private static HBaseTestingUtility util = new HBaseTestingUtility();
+  private static MiniHBaseCluster cluster = null;
+
+  // kept as a field so test cases can look at the active threads in the table's pool.
+  private static final int slaves = 2;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // configure which cp should be loaded
+    Configuration conf = util.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hadoop.hbase.coprocessor.CountGroupByImpl");
+
+    util.startMiniCluster(slaves);
+    cluster = util.getMiniHBaseCluster();
+
+    HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
+    util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
+        new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1], ROWS[rowSeperator2] });
+
+    for (int i = 0; i < ROWSIZE; i++) {
+      Put put = new Put(ROWS[i]);
+      put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+      table.put(put);
+    }
+
+    // the sleep here is an ugly hack to allow region transitions to finish
+    Thread.sleep(5000);
+  }
+
+  @Test
+  public void testCursor() throws IOException {
+    HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
+    CountGroupBy c = table.coprocessorProxy(CountGroupBy.class, "ddd".getBytes());
+    log.debug("************** Starting the CP at one region *******************************");
+    log.debug("************** START KEY:: "
+        + Bytes.toString(table.getRegionLocation("ddd".getBytes()).getRegionInfo().getStartKey())
+        + " *************");
+
+    /**
+     * The cursor instantiation logic below should probably live inside HTable, perhaps as a
+     * new "get a cursor" method; the CP-specific (business logic) calls such as
+     * getRowCountByPrefix(..) still need to be invoked on the returned proxy.
+     */
+    Long cursorId = c.getRowCountByPrefix("-"); // ideally this would be a full cursor object (i.e., with a mapping at the server side)
+    log.debug("CURSOR ID:: " + cursorId);
+    ///////////////// Setting up the cursor object at the client side /////////////////////////
+    CursorCallable callable = new CursorCallable(table.getConnection(), TEST_TABLE, "ddd".getBytes());
+    callable.setCursorId(cursorId);
+    callable.instantiateServer(false);
+    callable.setCache(2);
+    c = null;
+    CursorCp cursor = new CursorCp(callable, table);
+    /////////// now this cursor is all set with its respective callable object. //////////////
+
+    for (Result r = cursor.next(); r != null; r = cursor.next()) {
+      log.debug("IS THERE A ROW :: " + Bytes.toString(r.getRow()) + " val is:: "
+          + Bytes.toInt(r.getValue(TEST_FAMILY, TEST_QUALIFIER)));
+    }
+    cursor.close();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Creates rows like aaa-00, aaa-01, ..., aaa-04, bbb-00, ...
+   * @param base
+   * @param n
+   * @return
+   */
+  private static byte[][] makeN(byte[][] base, int n) {
+    byte[][] ret = new byte[n][];
+    int index = 0;
+    for (int i = 0; i < base.length; i++) {
+      for (int j = 0; j < 5; j++) {
+        ret[index++] = Bytes.add(base[i], Bytes.toBytes(String.format("%02d", j)));
+      }
+    }
+    return ret;
+  }
+}
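
Note (not part of the patch): the CursorResultSetCp javadoc above also mentions a fully in-memory flavour (e.g. a topK result) whose producer sets hasMoreRows to false in its very first produceResult() call, but the patch only ships the incremental CountGroupByImpl. The sketch below illustrates what such an in-memory implementation could look like; the class name InMemoryResultSetCp and its constructor are made up for illustration, while produceResult(), closeResources(), resultList and hasMoreRows come from CursorResultSetCp as defined in this patch.

package org.apache.hadoop.hbase.coprocessor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.Result;

/**
 * Illustrative sketch only (not part of the patch): an in-memory CursorResultSetCp.
 * The endpoint computes the whole answer up front and hands it to this object; the first
 * produceResult() call drains it into resultList and flips hasMoreRows to false, so the
 * region server's nextCp() simply pages through resultList afterwards.
 */
public class InMemoryResultSetCp extends CursorResultSetCp {

  private final List<Result> precomputed; // e.g. a topK list built by the endpoint

  public InMemoryResultSetCp(List<Result> precomputed) {
    this.precomputed = precomputed;
  }

  @Override
  public void produceResult() throws IOException {
    // Everything is available already, so produce it in one shot and mark the set exhausted.
    resultList.addAll(precomputed);
    hasMoreRows = false;
  }

  @Override
  public void closeResources() throws IOException {
    // No scanner to close for the in-memory case; just drop the cached results.
    resultList.clear();
  }
}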