Index: src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java (revision 1336797) @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +/** + * Exception indicating that the remote host making this IPC lost its + * IPC connection. This will never be returned back to a client, + * but is only used for logging on the server side, etc. + */ +public class CallerDisconnectedException extends IOException { + public CallerDisconnectedException(String msg) { + super(msg); + } + + private static final long serialVersionUID = 1L; + + +} Index: src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java (revision 1336797) @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +public interface RpcCallContext extends Delayable { + + /** + * Throw an exception if the caller who made this IPC call has disconnected. + * If called from outside the context of IPC, this does nothing. + * @throws CallerDisconnectedException + */ + void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException; + +} Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1336787) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -256,7 +256,7 @@ } /** A call queued for handling. */ - protected class Call implements Delayable { + protected class Call implements RpcCallContext { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client @@ -412,6 +412,16 @@ public synchronized boolean isReturnValueDelayed() { return this.delayReturnValue; } + + @Override + public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException { + if (!connection.channel.isOpen()) { + long afterTime = System.currentTimeMillis() - timestamp; + throw new CallerDisconnectedException( + "Aborting call " + this + " after " + afterTime + " ms, since " + + "caller disconnected"); + } + } public long getSize() { return this.size; @@ -1777,7 +1787,12 @@ return (nBytes > 0) ? nBytes : ret; } - public Delayable getCurrentCall() { + /** + * Needed for delayed calls. We need to be able to store the current call + * so that we can complete it later. + * @return Call the server is currently handling. + */ + public static RpcCallContext getCurrentCall() { return CurCall.get(); } } Index: src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (revision 1336787) +++ src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (working copy) @@ -60,12 +60,6 @@ void startThreads(); - /** - * Needed for delayed calls. We need to be able to store the current call - * so that we can complete it later. - * @return Call the server is currently handling. - */ - Delayable getCurrentCall(); /** * Returns the metrics instance for reporting RPC call statistics Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1336787) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -102,6 +102,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.HBaseServer; +import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -3355,7 +3357,16 @@ } private boolean nextInternal(int limit) throws IOException { + RpcCallContext rpcCall = HBaseServer.getCurrentCall(); while (true) { + if (rpcCall != null) { + // If a user specifies a too-restrictive or too-slow scanner, the + // client might time out and disconnect while the server side + // is still processing the request. We should abort aggressively + // in that case. + rpcCall.throwExceptionIfCallerDisconnected(); + } + byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { if (filter != null && filter.hasFilterRow()) { Index: src/main/ruby/hbase/table.rb =================================================================== --- src/main/ruby/hbase/table.rb (revision 1336787) +++ src/main/ruby/hbase/table.rb (working copy) @@ -220,7 +220,8 @@ stoprow = args["STOPROW"] timestamp = args["TIMESTAMP"] columns = args["COLUMNS"] || args["COLUMN"] || [] - cache = args["CACHE_BLOCKS"] || true + cache_blocks = args["CACHE_BLOCKS"] || true + cache = args["CACHE"] || 0 versions = args["VERSIONS"] || 1 timerange = args[TIMERANGE] raw = args["RAW"] || false @@ -253,7 +254,8 @@ end scan.setTimeStamp(timestamp) if timestamp - scan.setCacheBlocks(cache) + scan.setCacheBlocks(cache_blocks) + scan.setCaching(cache) if cache > 0 scan.setMaxVersions(versions) if versions > 1 scan.setTimeRange(timerange[0], timerange[1]) if timerange scan.setRaw(raw) Index: src/main/ruby/shell/commands/scan.rb =================================================================== --- src/main/ruby/shell/commands/scan.rb (revision 1336787) +++ src/main/ruby/shell/commands/scan.rb (working copy) @@ -26,7 +26,7 @@ Scan a table; pass table name and optionally a dictionary of scanner specifications. Scanner specifications may include one or more of: TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH, -or COLUMNS. +or COLUMNS, CACHE If no columns are specified, all columns will be scanned. To scan all members of a column family, leave the qualifier empty as in Index: src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (revision 1336787) +++ src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java (working copy) @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.filter; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -41,12 +43,14 @@ import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; +import com.google.common.base.Throwables; + /** * Test filters at the HRegion doorstep. */ @Category(SmallTests.class) public class TestFilter extends HBaseTestCase { - private final Log LOG = LogFactory.getLog(this.getClass()); + private final static Log LOG = LogFactory.getLog(TestFilter.class); private HRegion region; // @@ -1616,7 +1620,42 @@ verifyScanFullNoValues(s, expectedKVs, useLen); } } + + /** + * Filter which makes sleeps for a second between each row of a scan. + * This can be useful for manual testing of bugs like HBASE-5973. For example: + * + * create 't1', 'f1' + * 1.upto(100) { |x| put 't1', 'r' + x.to_s, 'f1:q1', 'hi' } + * import org.apache.hadoop.hbase.filter.TestFilter + * scan 't1', { FILTER => TestFilter::SlowScanFilter.new(), CACHE => 50 } + * + */ + public static class SlowScanFilter extends FilterBase { + private static Thread ipcHandlerThread = null; + + @Override + public void readFields(DataInput arg0) throws IOException { + } + @Override + public void write(DataOutput arg0) throws IOException { + } + + @Override + public boolean filterRow() { + ipcHandlerThread = Thread.currentThread(); + try { + LOG.info("Handler thread " + ipcHandlerThread + " sleeping in filter..."); + Thread.sleep(1000); + } catch (InterruptedException e) { + Throwables.propagate(e); + } + return super.filterRow(); + } + } + + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Index: src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (revision 1336787) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (working copy) @@ -186,7 +186,7 @@ if (!delay) { return UNDELAYED; } - final Delayable call = rpcServer.getCurrentCall(); + final Delayable call = HBaseServer.getCurrentCall(); call.startDelay(delayReturnValue); new Thread() { public void run() { @@ -288,7 +288,7 @@ public int test(boolean delay) { if (!delay) return UNDELAYED; - Delayable call = rpcServer.getCurrentCall(); + Delayable call = HBaseServer.getCurrentCall(); call.startDelay(true); try { call.endDelayThrowing(new Exception("Something went wrong"));