diff --git src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java new file mode 100644 index 0000000..bf1f7c7 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +/** + * Exception indicating that the remote host making this IPC lost its + * IPC connection. This will never be returned back to a client, + * but is only used for logging on the server side, etc. + */ +public class CallerDisconnectedException extends IOException { + public CallerDisconnectedException(String msg) { + super(msg); + } + + private static final long serialVersionUID = 1L; + + +} diff --git src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 1f3598b..c78cdd7 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -217,7 +217,7 @@ public abstract class HBaseServer { } /** A call queued for handling. */ - private static class Call { + private static class Call implements RpcCallContext { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client @@ -241,6 +241,16 @@ public abstract class HBaseServer { public void setResponse(ByteBuffer response) { this.response = response; } + + @Override + public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException { + if (!connection.channel.isOpen()) { + long afterTime = System.currentTimeMillis() - timestamp; + throw new CallerDisconnectedException( + "Aborting call " + this + " after " + afterTime + " ms, since " + + "caller disconnected"); + } + } } /** Listens on the socket. Creates jobs for the handler threads*/ @@ -1400,4 +1410,13 @@ public abstract class HBaseServer { int nBytes = initialRemaining - buf.remaining(); return (nBytes > 0) ? nBytes : ret; } + + /** + * Needed for delayed calls. We need to be able to store the current call + * so that we can complete it later. + * @return Call the server is currently handling. + */ + public static RpcCallContext getCurrentCall() { + return CurCall.get(); + } } diff --git src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java new file mode 100644 index 0000000..1283345 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +public interface RpcCallContext { + + /** + * Throw an exception if the caller who made this IPC call has disconnected. + * If called from outside the context of IPC, this does nothing. + * @throws CallerDisconnectedException + */ + void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException; + +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 88abd82..a256b3c 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -77,6 +77,8 @@ import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.ipc.HBaseServer; +import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -2449,7 +2451,16 @@ public class HRegion implements HeapSize { // , Writable{ } private boolean nextInternal(int limit) throws IOException { + RpcCallContext rpcCall = HBaseServer.getCurrentCall(); while (true) { + if (rpcCall != null) { + // If a user specifies a too-restrictive or too-slow scanner, the + // client might time out and disconnect while the server side + // is still processing the request. We should abort aggressively + // in that case. + rpcCall.throwExceptionIfCallerDisconnected(); + } + byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { if (filter != null && filter.hasFilterRow()) { diff --git src/main/ruby/hbase/table.rb src/main/ruby/hbase/table.rb index f4c2637..f711517 100644 --- src/main/ruby/hbase/table.rb +++ src/main/ruby/hbase/table.rb @@ -219,7 +219,8 @@ module Hbase stoprow = args["STOPROW"] timestamp = args["TIMESTAMP"] columns = args["COLUMNS"] || args["COLUMN"] || get_all_columns - cache = args["CACHE_BLOCKS"] || true + cache_blocks = args["CACHE_BLOCKS"] || true + cache = args["CACHE"] || 0 versions = args["VERSIONS"] || 1 timerange = args[TIMERANGE] @@ -238,7 +239,8 @@ module Hbase columns.each { |c| scan.addColumns(c) } scan.setFilter(filter) if filter scan.setTimeStamp(timestamp) if timestamp - scan.setCacheBlocks(cache) + scan.setCacheBlocks(cache_blocks) + scan.setCaching(cache) if cache > 0 scan.setMaxVersions(versions) if versions > 1 scan.setTimeRange(timerange[0], timerange[1]) if timerange else diff --git src/main/ruby/shell/commands/scan.rb src/main/ruby/shell/commands/scan.rb index c68c6f1..db4bb60 100644 --- src/main/ruby/shell/commands/scan.rb +++ src/main/ruby/shell/commands/scan.rb @@ -26,7 +26,7 @@ module Shell Scan a table; pass table name and optionally a dictionary of scanner specifications. Scanner specifications may include one or more of: TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH, -or COLUMNS. If no columns are specified, all columns will be scanned. +or COLUMNS, CACHE. If no columns are specified, all columns will be scanned. To scan all members of a column family, leave the qualifier empty as in 'col_family:'. diff --git src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java index 3444785..ca74246 100644 --- src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java +++ src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.filter; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -44,11 +46,13 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.base.Throwables; + /** * Test filters at the HRegion doorstep. */ public class TestFilter extends HBaseTestCase { - private final Log LOG = LogFactory.getLog(this.getClass()); + private final static Log LOG = LogFactory.getLog(TestFilter.class); private HRegion region; // @@ -1511,4 +1515,38 @@ public class TestFilter extends HBaseTestCase { verifyScanFullNoValues(s, expectedKVs, useLen); } } + + /** + * Filter which makes sleeps for a second between each row of a scan. + * This can be useful for manual testing of bugs like HBASE-5973. For example: + * + * create 't1', 'f1' + * 1.upto(100) { |x| put 't1', 'r' + x.to_s, 'f1:q1', 'hi' } + * import org.apache.hadoop.hbase.filter.TestFilter + * scan 't1', { FILTER => TestFilter::SlowScanFilter.new(), CACHE => 50 } + * + */ + public static class SlowScanFilter extends FilterBase { + private static Thread ipcHandlerThread = null; + + @Override + public void readFields(DataInput arg0) throws IOException { + } + + @Override + public void write(DataOutput arg0) throws IOException { + } + + @Override + public boolean filterRow() { + ipcHandlerThread = Thread.currentThread(); + try { + LOG.info("Handler thread " + ipcHandlerThread + " sleeping in filter..."); + Thread.sleep(1000); + } catch (InterruptedException e) { + Throwables.propagate(e); + } + return super.filterRow(); + } + } }