Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -469,6 +469,16 @@ */ public static final float HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD = 0.2f; + /** + * Constants for Cursor + */ + public static final String REGION_CURSOR_LEASE_PERIOD_KEY = "hbase.region.cursor.period"; + + /** + * Default value of {@link REGION_CURSOR_LEASE_PERIOD_KEY} in ms + */ + public static final long REGION_CURSOR_DEFAULT_LEASE_PERIOD = 60000l; + private HConstants() { // Can't be instantiated with this ctor. } Index: src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -1524,7 +1524,7 @@ for (int i = 0; i < results.length; i++) { // if null (fail) or instanceof Throwable && not instanceof DNRIOE // then retry that row. else dont. - if (results[i] == null || + if (//results[i] == null || Himanshu : commenting it out as the results may be null! (results[i] instanceof Throwable && !(results[i] instanceof DoNotRetryIOException))) { @@ -1556,7 +1556,8 @@ List addresses = new ArrayList(actionCount); for (int i = 0 ; i < results.length; i++) { - if (results[i] == null || results[i] instanceof Throwable) { + if (/*results[i] == null || */results[i] instanceof Throwable) { + //Himanshu : commenting it out as the results may be null! 
exceptions.add((Throwable)results[i]); actions.add(list.get(i)); addresses.add(lastServers[i].getHostnamePort()); Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -26,6 +26,7 @@ import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -53,10 +54,11 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.coprocessor.CursorClient; +import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.ExecRPCInvoker; import org.apache.hadoop.hbase.util.Addressing; @@ -1468,5 +1470,29 @@ return rangeKeys; } + @Override + public void processBatchExecsForCursor(List rows, Batch.Callback callback) throws IOException, InterruptedException { + Object[] results= new Object[rows.size()]; + this.connection.processBatchCallback(rows, getTableName(), pool, results, callback); + + } + @Override + public CursorClient execCPAndGetCursor( + Class protocol, byte[] startKey, byte[] endKey, + Batch.Call callable, int cursorBatchSize) + throws IOException, Throwable { + List rows = getStartKeysInRange(startKey, endKey); + + Map cursorIdMap = new HashMap(); + CursorClient c = new CursorClient(protocol, this, cursorBatchSize); + for (byte[] r : 
rows){ + // initialize the map with region reference row with id = 0 + cursorIdMap.put(r, 0l); + } + c.setRegionToCursorId(cursorIdMap); + connection.processExecs(protocol, rows, tableName, pool, callable, + (Batch.Callback)c.getCursorCallbackLong()); + return c; + } } Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (working copy) @@ -21,25 +21,24 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.coprocessor.CursorClient; +import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; -import java.util.Map; - /** * Used to communicate with a single HBase table. - * * @since 0.21.0 */ public interface HTableInterface { /** * Gets the name of this table. - * * @return the table name. */ byte[] getTableName(); @@ -47,91 +46,99 @@ /** * Returns the {@link Configuration} object used by this instance. *

- * The reference returned is not a copy, so any change made to it will - * affect this instance. + * The reference returned is not a copy, so any change made to it will affect + * this instance. */ Configuration getConfiguration(); /** * Gets the {@link HTableDescriptor table descriptor} for this table. - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. */ HTableDescriptor getTableDescriptor() throws IOException; /** * Test for the existence of columns in the table, as specified in the Get. *

- * * This will return true if the Get matches one or more keys, false if not. *

- * * This is a server-side call so it prevents any data from being transfered to * the client. - * - * @param get the Get + * @param get + * the Get * @return true if the specified Get matches one or more keys, false if not - * @throws IOException e + * @throws IOException + * e */ boolean exists(Get get) throws IOException; /** * Method that does a batch call on Deletes, Gets and Puts. - * - * @param actions list of Get, Put, Delete objects - * @param results Empty Object[], same size as actions. Provides access to partial - * results, in case an exception is thrown. A null in the result array means that - * the call for that action failed, even after retries + * @param actions + * list of Get, Put, Delete objects + * @param results + * Empty Object[], same size as actions. Provides access to partial + * results, in case an exception is thrown. A null in the result + * array means that the call for that action failed, even after + * retries * @throws IOException * @since 0.90.0 */ - void batch(final List actions, final Object[] results) throws IOException, InterruptedException; + void batch(final List actions, final Object[] results) + throws IOException, InterruptedException; /** * Method that does a batch call on Deletes, Gets and Puts. - * - * - * @param actions list of Get, Put, Delete objects + * @param actions + * list of Get, Put, Delete objects * @return the results from the actions. A null in the return array means that * the call for that action failed, even after retries * @throws IOException * @since 0.90.0 */ - Object[] batch(final List actions) throws IOException, InterruptedException; + Object[] batch(final List actions) throws IOException, + InterruptedException; /** * Extracts certain cells from a given row. - * @param get The object that specifies what data to fetch and from which row. - * @return The data coming from the specified row, if it exists. 
If the row - * specified doesn't exist, the {@link Result} instance returned won't - * contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}. - * @throws IOException if a remote or network exception occurs. + * @param get + * The object that specifies what data to fetch and from which row. + * @return The data coming from the specified row, if it exists. If the row + * specified doesn't exist, the {@link Result} instance returned won't + * contain any {@link KeyValue}, as indicated by + * {@link Result#isEmpty()}. + * @throws IOException + * if a remote or network exception occurs. * @since 0.20.0 */ Result get(Get get) throws IOException; /** * Extracts certain cells from the given rows, in batch. - * - * @param gets The objects that specify what data to fetch and from which rows. - * - * @return The data coming from the specified rows, if it exists. If the row + * @param gets + * The objects that specify what data to fetch and from which rows. + * @return The data coming from the specified rows, if it exists. If the row * specified doesn't exist, the {@link Result} instance returned won't - * contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}. - * If there are any failures even after retries, there will be a null in - * the results array for those Gets, AND an exception will be thrown. - * @throws IOException if a remote or network exception occurs. - * + * contain any {@link KeyValue}, as indicated by + * {@link Result#isEmpty()}. If there are any failures even after + * retries, there will be a null in the results array for those Gets, + * AND an exception will be thrown. + * @throws IOException + * if a remote or network exception occurs. * @since 0.90.0 */ Result[] get(List gets) throws IOException; /** - * Return the row that matches row exactly, - * or the one that immediately precedes it. - * - * @param row A row key. - * @param family Column family to include in the {@link Result}. 
- * @throws IOException if a remote or network exception occurs. + * Return the row that matches row exactly, or the one that immediately + * precedes it. + * @param row + * A row key. + * @param family + * Column family to include in the {@link Result}. + * @throws IOException + * if a remote or network exception occurs. * @since 0.20.0 */ Result getRowOrBefore(byte[] row, byte[] family) throws IOException; @@ -139,43 +146,48 @@ /** * Returns a scanner on the current table as specified by the {@link Scan} * object. - * - * @param scan A configured {@link Scan} object. + * @param scan + * A configured {@link Scan} object. * @return A scanner. - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. * @since 0.20.0 */ ResultScanner getScanner(Scan scan) throws IOException; /** * Gets a scanner on the current table for the given family. - * - * @param family The column family to scan. + * @param family + * The column family to scan. * @return A scanner. - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. * @since 0.20.0 */ ResultScanner getScanner(byte[] family) throws IOException; /** * Gets a scanner on the current table for the given family and qualifier. - * - * @param family The column family to scan. - * @param qualifier The column qualifier to scan. + * @param family + * The column family to scan. + * @param qualifier + * The column qualifier to scan. * @return A scanner. - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. * @since 0.20.0 */ ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException; - /** * Puts some data in the table. *

- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered - * until the internal buffer is full. - * @param put The data to put. - * @throws IOException if a remote or network exception occurs. + * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered until + * the internal buffer is full. + * @param put + * The data to put. + * @throws IOException + * if a remote or network exception occurs. * @since 0.20.0 */ void put(Put put) throws IOException; @@ -183,27 +195,34 @@ /** * Puts some data in the table, in batch. *

- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered - * until the internal buffer is full. - * @param puts The list of mutations to apply. The batch put is done by - * aggregating the iteration of the Puts over the write buffer - * at the client-side for a single RPC call. - * @throws IOException if a remote or network exception occurs. + * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered until + * the internal buffer is full. + * @param puts + * The list of mutations to apply. The batch put is done by + * aggregating the iteration of the Puts over the write buffer at the + * client-side for a single RPC call. + * @throws IOException + * if a remote or network exception occurs. * @since 0.20.0 */ void put(List puts) throws IOException; /** * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the put. If the passed value is null, the check + * value. If it does, it adds the put. If the passed value is null, the check * is for the lack of column (ie: non-existance) - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param put data to put if check succeeds - * @throws IOException e + * @param row + * to check + * @param family + * column family to check + * @param qualifier + * column qualifier to check + * @param value + * the expected value + * @param put + * data to put if check succeeds + * @throws IOException + * e * @return true if the new put was executed, false otherwise */ boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, @@ -211,37 +230,45 @@ /** * Deletes the specified cells/row. - * - * @param delete The object that specifies what to delete. - * @throws IOException if a remote or network exception occurs. + * @param delete + * The object that specifies what to delete. + * @throws IOException + * if a remote or network exception occurs. 
* @since 0.20.0 */ void delete(Delete delete) throws IOException; /** * Deletes the specified cells/rows in bulk. - * @param deletes List of things to delete. List gets modified by this - * method (in particular it gets re-ordered, so the order in which the elements - * are inserted in the list gives no guarantee as to the order in which the - * {@link Delete}s are executed). - * @throws IOException if a remote or network exception occurs. In that case - * the {@code deletes} argument will contain the {@link Delete} instances - * that have not be successfully applied. + * @param deletes + * List of things to delete. List gets modified by this method (in + * particular it gets re-ordered, so the order in which the elements + * are inserted in the list gives no guarantee as to the order in + * which the {@link Delete}s are executed). + * @throws IOException + * if a remote or network exception occurs. In that case the {@code + * deletes} argument will contain the {@link Delete} instances that + * have not be successfully applied. * @since 0.20.1 */ void delete(List deletes) throws IOException; /** * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the delete. If the passed value is null, the + * value. If it does, it adds the delete. 
If the passed value is null, the * check is for the lack of column (ie: non-existance) - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param delete data to delete if check succeeds - * @throws IOException e + * @param row + * to check + * @param family + * column family to check + * @param qualifier + * column qualifier to check + * @param value + * the expected value + * @param delete + * data to delete if check succeeds + * @throws IOException + * e * @return true if the new delete was executed, false otherwise */ boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, @@ -250,14 +277,15 @@ /** * Increments one or more columns within a single row. *

- * This operation does not appear atomic to readers. Increments are done - * under a single row lock, so write operations to a row are synchronized, but + * This operation does not appear atomic to readers. Increments are done under + * a single row lock, so write operations to a row are synchronized, but * readers do not take row locks so get and scan operations can see this * operation partially completed. - * - * @param increment object that specifies the columns and amounts to be used - * for the increment operations - * @throws IOException e + * @param increment + * object that specifies the columns and amounts to be used for the + * increment operations + * @throws IOException + * e * @return values of columns after the increment */ public Result increment(final Increment increment) throws IOException; @@ -265,16 +293,21 @@ /** * Atomically increments a column value. *

- * Equivalent to {@code {@link #incrementColumnValue(byte[], byte[], byte[], - * long, boolean) incrementColumnValue}(row, family, qualifier, amount, - * true)} - * @param row The row that contains the cell to increment. - * @param family The column family of the cell to increment. - * @param qualifier The column qualifier of the cell to increment. - * @param amount The amount to increment the cell with (or decrement, if the - * amount is negative). + * Equivalent to {@code + * {@link #incrementColumnValue(byte[], byte[], byte[], long, boolean) + * incrementColumnValue}(row, family, qualifier, amount, true)} + * @param row + * The row that contains the cell to increment. + * @param family + * The column family of the cell to increment. + * @param qualifier + * The column qualifier of the cell to increment. + * @param amount + * The amount to increment the cell with (or decrement, if the amount + * is negative). * @return The new value, post increment. - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. */ long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException; @@ -284,33 +317,38 @@ * and is not a big-endian long, this could throw an exception. If the column * value does not yet exist it is initialized to amount and * written to the specified column. - * - *

Setting writeToWAL to false means that in a fail scenario, you will lose + *

+ * Setting writeToWAL to false means that in a fail scenario, you will lose * any increments that have not been flushed. - * @param row The row that contains the cell to increment. - * @param family The column family of the cell to increment. - * @param qualifier The column qualifier of the cell to increment. - * @param amount The amount to increment the cell with (or decrement, if the - * amount is negative). - * @param writeToWAL if {@code true}, the operation will be applied to the - * Write Ahead Log (WAL). This makes the operation slower but safer, as if - * the call returns successfully, it is guaranteed that the increment will - * be safely persisted. When set to {@code false}, the call may return - * successfully before the increment is safely persisted, so it's possible - * that the increment be lost in the event of a failure happening before the - * operation gets persisted. + * @param row + * The row that contains the cell to increment. + * @param family + * The column family of the cell to increment. + * @param qualifier + * The column qualifier of the cell to increment. + * @param amount + * The amount to increment the cell with (or decrement, if the amount + * is negative). + * @param writeToWAL + * if {@code true}, the operation will be applied to the Write Ahead + * Log (WAL). This makes the operation slower but safer, as if the + * call returns successfully, it is guaranteed that the increment + * will be safely persisted. When set to {@code false}, the call may + * return successfully before the increment is safely persisted, so + * it's possible that the increment be lost in the event of a failure + * happening before the operation gets persisted. * @return The new value, post increment. - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. 
*/ long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException; /** * Tells whether or not 'auto-flush' is turned on. - * * @return {@code true} if 'auto-flush' is enabled (default), meaning - * {@link Put} operations don't get buffered/delayed and are immediately - * executed. + * {@link Put} operations don't get buffered/delayed and are + * immediately executed. */ boolean isAutoFlush(); @@ -320,23 +358,25 @@ * This method gets called once automatically for every {@link Put} or batch * of {@link Put}s (when put(List) is used) when * {@link #isAutoFlush} is {@code true}. - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. */ void flushCommits() throws IOException; /** * Releases any resources help or pending changes in internal buffers. - * - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. */ void close() throws IOException; /** * Obtains a lock on a row. - * - * @param row The row to lock. + * @param row + * The row to lock. * @return A {@link RowLock} containing the row and lock id. - * @throws IOException if a remote or network exception occurs. + * @throws IOException + * if a remote or network exception occurs. * @see RowLock * @see #unlockRow */ @@ -344,90 +384,135 @@ /** * Releases a row lock. - * - * @param rl The row lock to release. - * @throws IOException if a remote or network exception occurs. + * @param rl + * The row lock to release. + * @throws IOException + * if a remote or network exception occurs. * @see RowLock * @see #unlockRow */ void unlockRow(RowLock rl) throws IOException; /** - * Creates and returns a proxy to the CoprocessorProtocol instance running in the - * region containing the specified row. The row given does not actually have - * to exist. 
Whichever region would contain the row based on start and end keys will - * be used. Note that the {@code row} parameter is also not passed to the - * coprocessor handler registered for this protocol, unless the {@code row} - * is separately passed as an argument in a proxy method call. The parameter - * here is just used to locate the region used to handle the call. - * - * @param protocol The class or interface defining the remote protocol - * @param row The row key used to identify the remote region location + * Creates and returns a proxy to the CoprocessorProtocol instance running in + * the region containing the specified row. The row given does not actually + * have to exist. Whichever region would contain the row based on start and + * end keys will be used. Note that the {@code row} parameter is also not + * passed to the coprocessor handler registered for this protocol, unless the + * {@code row} is separately passed as an argument in a proxy method call. The + * parameter here is just used to locate the region used to handle the call. + * @param protocol + * The class or interface defining the remote protocol + * @param row + * The row key used to identify the remote region location * @return */ - T coprocessorProxy(Class protocol, byte[] row); + T coprocessorProxy(Class protocol, + byte[] row); /** * Invoke the passed - * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against - * the {@link CoprocessorProtocol} instances running in the selected regions. - * All regions beginning with the region containing the startKey - * row, through to the region containing the endKey row (inclusive) - * will be used. If startKey or endKey is + * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against the + * {@link CoprocessorProtocol} instances running in the selected regions. All + * regions beginning with the region containing the startKey row, + * through to the region containing the endKey row (inclusive) + * will be used. 
If startKey or endKey is * null, the first and last regions in the table, respectively, * will be used in the range selection. - * - * @param protocol the CoprocessorProtocol implementation to call - * @param startKey start region selection with region containing this row - * @param endKey select regions up to and including the region containing - * this row - * @param callable wraps the CoprocessorProtocol implementation method calls - * made per-region - * @param CoprocessorProtocol subclass for the remote invocation - * @param Return type for the - * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} - * method + * @param protocol + * the CoprocessorProtocol implementation to call + * @param startKey + * start region selection with region containing this row + * @param endKey + * select regions up to and including the region containing this row + * @param callable + * wraps the CoprocessorProtocol implementation method calls made + * per-region + * @param + * CoprocessorProtocol subclass for the remote invocation + * @param + * Return type for the + * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} + * method * @return a Map of region names to - * {@link Batch.Call#call(Object)} return values + * {@link Batch.Call#call(Object)} return values */ - Map coprocessorExec( - Class protocol, byte[] startKey, byte[] endKey, Batch.Call callable) - throws IOException, Throwable; + Map coprocessorExec( + Class protocol, byte[] startKey, byte[] endKey, + Batch.Call callable) throws IOException, Throwable; /** * Invoke the passed - * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against - * the {@link CoprocessorProtocol} instances running in the selected regions. - * All regions beginning with the region containing the startKey - * row, through to the region containing the endKey row - * (inclusive) - * will be used. 
If startKey or endKey is + * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} against the + * {@link CoprocessorProtocol} instances running in the selected regions. All + * regions beginning with the region containing the startKey row, + * through to the region containing the endKey row (inclusive) + * will be used. If startKey or endKey is * null, the first and last regions in the table, respectively, * will be used in the range selection. - * *

* For each result, the given - * {@link Batch.Callback#update(byte[], byte[], Object)} - * method will be called. + * {@link Batch.Callback#update(byte[], byte[], Object)} method will be + * called. *

- * - * @param protocol the CoprocessorProtocol implementation to call - * @param startKey start region selection with region containing this row - * @param endKey select regions up to and including the region containing - * this row - * @param callable wraps the CoprocessorProtocol implementation method calls - * made per-region - * @param callback an instance upon which - * {@link Batch.Callback#update(byte[], byte[], Object)} with the - * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} - * return value for each region - * @param CoprocessorProtocol subclass for the remote invocation - * @param Return type for the - * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} - * method + * @param protocol + * the CoprocessorProtocol implementation to call + * @param startKey + * start region selection with region containing this row + * @param endKey + * select regions up to and including the region containing this row + * @param callable + * wraps the CoprocessorProtocol implementation method calls made + * per-region + * @param callback + * an instance upon which + * {@link Batch.Callback#update(byte[], byte[], Object)} with the + * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} + * return value for each region + * @param + * CoprocessorProtocol subclass for the remote invocation + * @param + * Return type for the + * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(Object)} + * method */ - void coprocessorExec( + void coprocessorExec(Class protocol, + byte[] startKey, byte[] endKey, Batch.Call callable, + Batch.Callback callback) throws IOException, Throwable; + + /** + * added for cursor to invoke different Exec objects on different regions. The + * client side "next" invocation on the cursor object results in this call. + * It eventually invokes the "multi" method on the Server side. 
+ * @param + * @param rows + * @param callback + * @throws IOException + * @throws InterruptedException + */ + + public void processBatchExecsForCursor(List rows, + Batch.Callback callback) throws IOException, InterruptedException; + +/** + * Returns a cursor after executing the CP method that should return a cursor. + * CursorCallback is used for this call, its main purpose is to create a data + * structure for region reference rows to cursorId on that region. + * @param + * @param + * @param + * @param protocol + * @param startKey + * @param endKey + * @param callable + * @param cursorBatchSize + * @return + * @throws IOException + * @throws Throwable + */ + CursorClient execCPAndGetCursor( Class protocol, byte[] startKey, byte[] endKey, - Batch.Call callable, Batch.Callback callback) - throws IOException, Throwable; + Batch.Call callable, int cursorBatchSize) throws IOException, + Throwable; + } Index: src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (working copy) @@ -114,8 +114,8 @@ } else { out.writeBoolean(false); // no exception - if (! (obj instanceof Writable)) - obj = null; // squash all non-writables to null. +// if (! (obj instanceof Writable)) // Himanshu: why this check? HBaseObjectWritable handles others too! +// obj = null; // squash all non-writables to null. HbaseObjectWritable.writeObject(out, r.getSecond(), obj != null ? 
obj.getClass() : Writable.class, null); } Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/CursorClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/CursorClient.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/CursorClient.java (revision 0) @@ -0,0 +1,141 @@ +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; + +/** + * A client side representing cursors at region server side. It has a list of + * active RScursor's id and use them in invoking next calls. Its a stateful + * object as per it has a list of active regions reference via the row but it + * doesn't cache previous call results. It just gives results back to the user + * (caller) + * @author hv + */ +public class CursorClient { + Map regionToCursorId = new HashMap(); + HTable table; + Class protocol; + private static final Log log = LogFactory.getLog(CursorClient.class); + int cursorBatchSize = 1; // default + boolean hasNext; + + public CursorClient(Class c, HTable table, + int cursorBatchSize) { + this.protocol = c; + this.table = table; + this.cursorBatchSize = cursorBatchSize; + } + + /** + * a generic CursorCallback. It will be invoked on ever "next" call. + */ + class CursorCallback implements Batch.Callback { + List resultsPerCall = new ArrayList(); + + @Override + public void update(byte[] region, byte[] row, T result) { + // check whether the region has some result or not, don't call it next + // time. 
+ if (result != null) { + log.debug("in Cursor Callback, adding result" + result); + resultsPerCall.add(result); + hasNext = true; + } else { + log.debug("in cursor callback, removing region:" + row); + regionToCursorId.remove(row); + } + } + + List getResult() { + List temp = new ArrayList(resultsPerCall); + resultsPerCall.clear(); + return temp; // would otherwise keep returning results from old calls, + // so hand back a clone and reset this attribute. + } + } + + CursorCallback callback = new CursorCallback(); + + /** + * Cursor-initialization callback, used when instantiating cursors on the + * regions; it builds the map from reference row to region cursorId. + * TODO: can these two callbacks be merged - can the class above be reused + * for this? + */ + class CursorCallbackLong implements Batch.Callback { + + @Override + public void update(byte[] region, byte[] row, Long result) { + // check whether the region has some result or not, don't call it next + // time. + if (result != null) { + regionToCursorId.put(row, result); + hasNext = true; + } else { + regionToCursorId.remove(row); + } + } + } + + CursorCallbackLong initialCallback = new CursorCallbackLong(); + + public CursorCallbackLong getCursorCallbackLong() { + return initialCallback; + } + + public void setRegionToCursorId(Map m) { + this.regionToCursorId = m; + } + + /** + * This involves invoking an Exec object on regions that have a cursor. There + * is no need to explicitly call close on the region cursors as they are + * closed when a next() invocation returns a null result. + *

+ * Per-region invocation is needed because the call to each individual + * region must carry that region's own cursorId value, so the standard + * coprocessorExec cannot be used. + * @return + * @throws IOException + * @throws InterruptedException + */ + public List next() throws IOException, InterruptedException { + List execList = new ArrayList(); + hasNext = false; + for (Map.Entry e : regionToCursorId.entrySet()) { + try { + // "next" is part of CursorProtocol, so will be there in CP. + Method m = protocol.getMethod("next", long.class, int.class); + Object[] args = new Object[] { e.getValue(), this.cursorBatchSize }; + Exec exec = new Exec(table.getConfiguration(), e.getKey(), protocol, m, + args); + execList.add(exec); + } catch (SecurityException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } catch (NoSuchMethodException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + this.table.processBatchExecsForCursor(execList, callback); + return callback.getResult(); + } + + /** + * Reports whether the cursor is exhausted or still live. 
+ * @return + */ + public boolean hasNext() { + return this.hasNext; + } +} Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/NGramClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/NGramClient.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/NGramClient.java (revision 0) @@ -0,0 +1,79 @@ +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; +import org.apache.hadoop.hbase.coprocessor.NGramProtocol; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * client side interface to the NGram Implementation. + */ +public class NGramClient { + + final static byte[] table_name = "TestTable".getBytes(); + Configuration conf; + HTable table; + + public NGramClient(Configuration conf) throws IOException { + this.conf = conf; + table = new HTable(conf, table_name); + } + + public List getWordHistory(final String word) throws IOException, + Throwable { + class NGramCallBack implements Batch.Callback> { + List result; + + @Override + public void update(byte[] region, byte[] row, List result) { + this.result = result; + } + + List getResult() { + return result; + } + } + + final Get g = new Get(word.getBytes()); + NGramCallBack callBack = new NGramCallBack(); + final ColumnInterpreter ci = new NGramColumnInterpreter(); + table.coprocessorExec(NGramProtocol.class, g.getRow(), g.getRow(), + new Batch.Call>() { + @Override + public List call(NGramProtocol instance) throws IOException { + return instance.getWordHistory(word, g, ci); + } + }, callBack); + return callBack.getResult(); + } + + /** + * 
returns a cursor that gives a List on invoking next call. + * @param scan + * @return + * @throws Throwable + * @throws IOException + */ + public CursorClient getDetailofSimilarWords( + final Scan scan, final ColumnInterpreter ci) throws IOException, Throwable { + + CursorClient cursor = table.execCPAndGetCursor(NGramProtocol.class, + scan.getStartRow(), scan.getStopRow(), new Batch.Call() { + + @Override + public Long call(NGramProtocol instance) throws IOException { + return instance.getDetailofSimilarWords(scan, ci); + } + }, 2);// initial fetch size, lets say 2. + + return cursor; + + } +} Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/NGramColumnInterpreter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/NGramColumnInterpreter.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/NGramColumnInterpreter.java (revision 0) @@ -0,0 +1,73 @@ +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Sample NGram column interpreter. At the moment, only the getValue is + * implemented. Will evolve as per usage. 
+ */ + +public class NGramColumnInterpreter implements + ColumnInterpreter { + + @Override + public String add(String l1, String l2) { + return null; + } + + @Override + public String castToReturnType(String o) { + return null; + } + + @Override + public int compare(String l1, String l2) { + return 0; + } + + @Override + public double divideForAvg(String o, Long l) { + return 0; + } + + @Override + public String getMaxValue() { + return null; + } + + @Override + public String getMinValue() { + return null; + } + + @Override + public String getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv) + throws IOException { + return Bytes.toString(kv.getValue()); + } + + @Override + public String increment(String o) { + return null; + } + + @Override + public String multiply(String o1, String o2) { + return null; + } + + @Override + public void readFields(DataInput arg0) throws IOException { + } + + @Override + public void write(DataOutput arg0) throws IOException { + } + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy) @@ -29,6 +29,10 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.coprocessor.CursorClient; +import org.apache.hadoop.hbase.client.coprocessor.Exec; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet; @@ -431,6 +435,20 @@ byte[] row) { return table.coprocessorProxy(protocol, row); } + + @Override + public 
CursorClient execCPAndGetCursor( + Class protocol, byte[] startKey, byte[] endKey, + Call callable, int cursorBatchSize) throws IOException, + Throwable { + return table.execCPAndGetCursor(protocol, startKey, endKey, callable, cursorBatchSize); + } + + @Override + public void processBatchExecsForCursor(List rows, + Callback callback) throws IOException, InterruptedException { + // do nothing + } } /** The coprocessor */ Index: src/main/java/org/apache/hadoop/hbase/coprocessor/Cursor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/Cursor.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/Cursor.java (revision 0) @@ -0,0 +1,34 @@ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; + +public interface Cursor { + + /** + * grab the next worth of rows of T type. + * @param num + * @return + * @throws IOException + */ + List next(int num) throws IOException; + + /** + * @param id + */ + void setCursorId(Long id); + + Long getCursorId(); + /** + * contains the logic of processing the entire row and computing the final result + * type from that row. + * its an important method. + * @param row + * @return + * @throws IOException + */ + T processRow(List row) throws IOException; + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CursorProtocol.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/CursorProtocol.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/CursorProtocol.java (revision 0) @@ -0,0 +1,22 @@ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; + +/** + * Cursor protocol. 
Defines the mandatory methods that must be implemented + * by a coprocessor (CP) that needs cursor (streaming-results) functionality. + */ +public interface CursorProtocol extends CoprocessorProtocol { + /** + * next invocation on a cursor + * @param cursorId + * @param resultSize batch size of the cursor + * @return + * @throws IOException + */ + List next(long cursorId, int resultSize) throws IOException; +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/NGramImplementation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/NGramImplementation.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/NGramImplementation.java (revision 0) @@ -0,0 +1,120 @@ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; + +public class NGramImplementation extends BaseEndpointCoprocessor implements + NGramProtocol { + + private static final Log log = LogFactory.getLog(NGramImplementation.class); + + /** + * It doesn't use streaming results, so there is no need for a cursor. It's + * a plain old single-result call. 
+ */ + @Override + public List getWordHistory(String word, Get g, + ColumnInterpreter ci) throws IOException { + Result r = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .get(g, null); + + String year, wordCount, pageCount, bookCount, temp; + List res = new ArrayList(); + for (KeyValue kv : r.raw()) { + year = Bytes.toString(kv.getQualifier()); + temp = ci.getValue(null, null, kv);// no need to pass anything as of now + // temp = "14#12#2" + res.add(word + ":" + year + "#" + temp); + } + return res; + } + + @Override + public Long getDetailofSimilarWords(final Scan scan, + final ColumnInterpreter ci) throws IOException { + log.debug("In the NGramImplementation with in region"); + + Cursor c = new Cursor() { + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()) + .getRegion().getScanner(scan); + List res = new ArrayList(); + Long cursorId = 0l; + + @Override + public List next(int num) throws IOException { + res.clear(); + List temp = new ArrayList(); + int counter = 0; + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(temp); + T val = processRow(temp); + temp.clear(); + if (val == null) + continue; + res.add(val); + counter++; + } while (hasMoreRows && counter < num); + return res.size() > 0 ? 
res : null; + } + + @Override + public void setCursorId(Long id) { + this.cursorId = id; + } + + @Override + public T processRow(List row) throws IOException { + // TODO fix it to make it generic + if (row != null && row.size() > 0) + return ci.getValue(null, null, row.get(0)); + + return null; + } + + @Override + public Long getCursorId() { + return this.cursorId; + } + + }; + c = ((RegionCoprocessorEnvironment) getEnvironment()).registerCursor(c); + return c.getCursorId(); + } + + @Override + public List next(long cursorId, int resultSize) throws IOException { + Cursor c = ((RegionCoprocessorEnvironment) getEnvironment()) + .getCursor(cursorId); + List res = null; + try { + res = c.next(resultSize); + String rName = ((RegionCoprocessorEnvironment) getEnvironment()) + .getRegion().getRegionNameAsString(); + log.debug("In the next method, return val is: " + rName + ": "); + return res; + } finally { + if (res != null) { + log.debug(res); + } else { + log.debug("null. Removing cursor"); + // seems the cursor is done here. + ((RegionCoprocessorEnvironment) getEnvironment()).closeCursor(cursorId); + + log.debug("null. Removing cursor"); + // seems the cursor is done here. 
+ ((RegionCoprocessorEnvironment) getEnvironment()).closeCursor(cursorId); + } + } + } +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/NGramProtocol.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/NGramProtocol.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/NGramProtocol.java (revision 0) @@ -0,0 +1,32 @@ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Scan; + +public interface NGramProtocol extends CursorProtocol { + + /** + * Individual rows in the table are like: Word Det:1981 = 12,12,1 Det:1983 = + * 15,12,3 etc It returns year and frequencies for the given word. + * @param word + * @return + * @throws IOException + */ + List getWordHistory(String word, Get g, + ColumnInterpreter ci) throws IOException; + + /** + * This method gives words that are similar to the target word as defined in + * the scan's start and end rows. 
+ * @param scan + * @param ci + * @param + * @return + * @throws IOException + */ + Long getDetailofSimilarWords(Scan scan, ColumnInterpreter ci) throws IOException; + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java (working copy) @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment { /** @return the region associated with this coprocessor */ @@ -30,4 +31,13 @@ /** @return reference to the region server services */ public RegionServerServices getRegionServerServices(); + /** + * Cursor methods cursor registry + * @throws LeaseStillHeldException + */ + Cursor registerCursor(Cursor c) throws LeaseStillHeldException; + + Cursor getCursor(long cursorId); + + void closeCursor(long cusorId); } Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.*; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; import 
org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; @@ -61,6 +63,79 @@ private HRegion region; private RegionServerServices rsServices; + // =========================== + + CursorRegistry registry; + private Leases leases; + private Random random = new Random(System.nanoTime()); + + public Leases getLeases() { + return leases; + } + + public Random getRandom() { + return random; + } + + class CursorRegistry { + public CursorRegistry() { + leases = new Leases((int) region.getConf().getLong( + HConstants.REGION_CURSOR_LEASE_PERIOD_KEY, + HConstants.REGION_CURSOR_DEFAULT_LEASE_PERIOD), region.getConf() + .getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000)); + } + + private Map> cursorMap = new HashMap>(); + + public Cursor register(Cursor c) throws LeaseStillHeldException { + Long l = random.nextLong(); + c.setCursorId(l); + cursorMap.put(l, c); + leases.createLease("CursorLease:" + l, new CursorLeaseListner(l)); + return c; + } + + public Cursor getCursor(Long id) { + return cursorMap.get(id); + } + + void closeCursor(long id) { + cursorMap.remove(id); + } + } + + private class CursorLeaseListner implements LeaseListener { + long cursorId; + + public CursorLeaseListner(long cursorId) { + this.cursorId = cursorId; + } + + @Override + public void leaseExpired() { + LOG.info("Lease expired for cursor with id: " + this.cursorId); + registry.closeCursor(this.cursorId); + } + } + + @Override + public Cursor registerCursor(Cursor c) + throws LeaseStillHeldException { + return registry.register(c); + // return null; + } + + @Override + public Cursor getCursor(long cursorId) { + return registry.getCursor(cursorId); + } + + @Override + public void closeCursor(long cursorId) { + 
LOG.debug("removing cursor: " + cursorId); + registry.closeCursor(cursorId); + } + /** * Constructor * @param impl the coprocessor instance @@ -72,6 +147,9 @@ super(impl, priority, seq); this.region = region; this.rsServices = services; + + registry = new CursorRegistry(); + } /** @return the region */ Index: src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (revision 1099357) +++ src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (working copy) @@ -34,6 +34,10 @@ import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.coprocessor.CursorClient; +import org.apache.hadoop.hbase.client.coprocessor.Exec; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.util.StringUtils; @@ -643,4 +647,16 @@ throws IOException, Throwable { throw new UnsupportedOperationException("coprocessorExec not implemented"); } + + @Override + public CursorClient execCPAndGetCursor( + Class protocol, byte[] startKey, byte[] endKey, Call callable, + int cursorBatchSize) throws IOException, Throwable { + return null; + } + + @Override + public void processBatchExecsForCursor(List rows, + Callback callback) throws IOException, InterruptedException { + } } Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestNGramsProtocol.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestNGramsProtocol.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestNGramsProtocol.java (revision 0) @@ -0,0 +1,103 @@ +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.conf.Configuration; 
+import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.CursorClient; +import org.apache.hadoop.hbase.client.coprocessor.NGramClient; +import org.apache.hadoop.hbase.client.coprocessor.NGramColumnInterpreter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestNGramsProtocol { + + private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable"); + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier"); + + private static final int ROWSIZE = 20; + private static final int rowSeperator1 = 2; + private static final int rowSeperator2 = 14; + private static byte[][] ROWS = makeN(ROWSIZE); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + private static MiniHBaseCluster cluster = null; + private static Configuration conf = util.getConfiguration(); + + /** + * A setup method to start the test cluster. NGramImplementation is + * registered and will be loaded during region startup. 
+ * @throws Exception + */ + @BeforeClass + public static void setupBeforeClass() throws Exception { + + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.coprocessor.NGramImplementation"); + + util.startMiniCluster(2); + cluster = util.getMiniHBaseCluster(); + HTable table = util.createTable(TEST_TABLE, TEST_FAMILY); + util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY, + new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1] + ,ROWS[rowSeperator2] + }); + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + String val = "Value#"+i; + put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(val)); + table.put(put); + } + } + + + private static byte[][] makeN(int rowsize) { + byte[][]rows = new byte[rowsize][]; + String val; + for(int i =0; i< rowsize; i++){ + val = "America"+i; + rows[i] = Bytes.toBytes(val); + } + + return rows; + } + + + @Test + public void testGetSimilarWords() throws Throwable { + String targetWord = "America"; + NGramClient client = new NGramClient(conf); + + final NGramColumnInterpreter nci = new NGramColumnInterpreter(); + final Scan scan = new Scan(); + byte[] startKey = Bytes.toBytes(targetWord); + int len = startKey.length; + byte[] stopKey = Bytes.toBytes(targetWord); + stopKey[len - 1] = (byte) (stopKey[len - 1] + 4); + scan.setStartRow(startKey); + scan.setStopRow(stopKey); + // make the call to create the cursor + CursorClient cursor = client.getDetailofSimilarWords(scan, nci); + int i = 1; + while(cursor.hasNext()) { + System.out.println(i++ +"th fetch: "+cursor.next()); + + } + + } + /** + * Shutting down the cluster + * @throws Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + +}