Index: src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java	(revision 1336785)
+++ src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java	(working copy)
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.hbase.filter;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,12 +43,14 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.base.Throwables;
+
 /**
  * Test filters at the HRegion doorstep.
  */
 @Category(SmallTests.class)
 public class TestFilter extends HBaseTestCase {
-  private final Log LOG = LogFactory.getLog(this.getClass());
+  private static final Log LOG = LogFactory.getLog(TestFilter.class);
   private HRegion region;
 
   //
@@ -1616,7 +1620,44 @@
       verifyScanFullNoValues(s, expectedKVs, useLen);
     }
   }
+
+  /**
+   * Filter which sleeps for a second between each row of a scan.
+   * This can be useful for manual testing of bugs like HBASE-5973, where the
+   * client times out and disconnects while the server is still scanning.
+   * For example, from the HBase shell:
+   * <pre>{@code
+   * create 't1', 'f1'
+   * 1.upto(100) { |x| put 't1', 'r' + x.to_s, 'f1:q1', 'hi' }
+   * import org.apache.hadoop.hbase.filter.TestFilter
+   * scan 't1', { FILTER => TestFilter::SlowScanFilter.new(), CACHE => 50 }
+   * }</pre>
+   */
+  public static class SlowScanFilter extends FilterBase {
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      // Stateless filter: nothing to deserialize.
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      // Stateless filter: nothing to serialize.
+    }
+
+    @Override
+    public boolean filterRow() {
+      LOG.info("Handler thread " + Thread.currentThread() + " sleeping in filter...");
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so it is not lost, then propagate.
+        Thread.currentThread().interrupt();
+        throw Throwables.propagate(e);
+      }
+      return super.filterRow();
+    }
+  }
 
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
     new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java	(revision 1336785)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java	(working copy)
@@ -186,7 +186,7 @@
       if (!delay) {
         return UNDELAYED;
       }
-      final Delayable call = rpcServer.getCurrentCall();
+      final Delayable call = HBaseServer.getCurrentCall();
       call.startDelay(delayReturnValue);
       new Thread() {
         public void run() {
@@ -288,7 +288,7 @@
     public int test(boolean delay) {
       if (!delay)
         return UNDELAYED;
-      Delayable call = rpcServer.getCurrentCall();
+      Delayable call = HBaseServer.getCurrentCall();
       call.startDelay(true);
       try {
         call.endDelayThrowing(new Exception("Something went wrong"));
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1336785)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -102,6 +102,8 @@
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -3355,7 +3357,16 @@
     }
 
     private boolean nextInternal(int limit) throws IOException {
+      RpcCallContext rpcCall = HBaseServer.getCurrentCall();
       while (true) {
+        if (rpcCall != null) {
+          // If a user specifies a too-restrictive or too-slow scanner, the
+          // client might time out and disconnect while the server side
+          // is still processing the request. We should abort aggressively
+          // in that case.
+          rpcCall.throwExceptionIfCallerDisconnected();
+        }
+
         byte [] currentRow = peekRow();
         if (isStopRow(currentRow)) {
           if (filter != null && filter.hasFilterRow()) {
Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(revision 1336785)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(working copy)
@@ -256,7 +256,7 @@
   }
 
   /** A call queued for handling. */
-  protected class Call implements Delayable {
+  protected class Call implements RpcCallContext {
     protected int id; // the client's call id
     protected Writable param; // the parameter passed
     protected Connection connection; // connection to client
@@ -412,6 +412,16 @@
     public synchronized boolean isReturnValueDelayed() {
       return this.delayReturnValue;
     }
+
+    @Override
+    public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
+      if (!connection.channel.isOpen()) {
+        long afterTime = System.currentTimeMillis() - timestamp;
+        throw new CallerDisconnectedException(
+            "Aborting call " + this + " after " + afterTime + " ms, since " +
+            "caller disconnected");
+      }
+    }
 
     public long getSize() {
       return this.size;
@@ -1777,7 +1787,14 @@
     return (nBytes > 0) ? nBytes : ret;
   }
 
-  public Delayable getCurrentCall() {
+  /**
+   * Returns the call being handled by the current thread. Needed for delayed
+   * calls, which store the current call so that they can complete it later,
+   * and for aborting work whose caller has already disconnected.
+   * @return the call the current handler thread is processing; callers should
+   *         be prepared for null (e.g. when not running on an RPC handler)
+   */
+  public static RpcCallContext getCurrentCall() {
     return CurCall.get();
   }
 }
Index: src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java	(revision 1336785)
+++ src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java	(working copy)
@@ -60,12 +60,6 @@
 
   void startThreads();
 
-  /**
-   * Needed for delayed calls. We need to be able to store the current call
-   * so that we can complete it later.
-   * @return Call the server is currently handling.
-   */
-  Delayable getCurrentCall();
 
   /**
    * Returns the metrics instance for reporting RPC call statistics
Index: src/main/ruby/hbase/table.rb
===================================================================
--- src/main/ruby/hbase/table.rb	(revision 1336785)
+++ src/main/ruby/hbase/table.rb	(working copy)
@@ -220,7 +220,9 @@
         stoprow = args["STOPROW"]
         timestamp = args["TIMESTAMP"]
         columns = args["COLUMNS"] || args["COLUMN"] || []
-        cache = args["CACHE_BLOCKS"] || true
+        # Not "|| true": that would silently ignore an explicit CACHE_BLOCKS => false.
+        cache_blocks = args["CACHE_BLOCKS"].nil? ? true : args["CACHE_BLOCKS"]
+        cache = args["CACHE"] || 0
         versions = args["VERSIONS"] || 1
         timerange = args[TIMERANGE]
         raw = args["RAW"] || false
@@ -253,7 +255,8 @@
         end
 
         scan.setTimeStamp(timestamp) if timestamp
-        scan.setCacheBlocks(cache)
+        scan.setCacheBlocks(cache_blocks)
+        scan.setCaching(cache) if cache > 0
         scan.setMaxVersions(versions) if versions > 1
         scan.setTimeRange(timerange[0], timerange[1]) if timerange
         scan.setRaw(raw)
Index: src/main/ruby/shell/commands/scan.rb
===================================================================
--- src/main/ruby/shell/commands/scan.rb	(revision 1336785)
+++ src/main/ruby/shell/commands/scan.rb	(working copy)
@@ -26,7 +26,7 @@
 Scan a table; pass table name and optionally a dictionary of scanner
 specifications. Scanner specifications may include one or more of:
 TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH,
-or COLUMNS.
+COLUMNS, or CACHE.
 
 If no columns are specified, all columns will be scanned.
 To scan all members of a column family, leave the qualifier empty as in