commit 29b1d7afcde7780e19dd62bca72555835fb84297 Author: Todd Lipcon Date: Wed May 9 12:08:05 2012 -0700 HBASE-5973. add ability to detect caller disconnection diff --git src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java new file mode 100644 index 0000000..f2b7a60 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/ipc/CallerDisconnectedException.java @@ -0,0 +1,37 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +/** + * Exception indicating that the remote host making this IPC lost its + * IPC connection. This will never be returned back to a client, + * but is only used for logging on the server side, etc. 
+ */ +public class CallerDisconnectedException extends IOException { + public CallerDisconnectedException(String msg) { + super(msg); + } + + private static final long serialVersionUID = 1L; + + +} diff --git src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 973c7cb..bcafd01 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -266,7 +266,7 @@ public abstract class HBaseServer implements RpcServer { } /** A call queued for handling. */ - protected class Call implements Delayable { + protected class Call implements RpcCallContext { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client @@ -409,6 +409,16 @@ public abstract class HBaseServer implements RpcServer { public synchronized boolean isReturnValueDelayed() { return this.delayReturnValue; } + + @Override + public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException { + if (!connection.channel.isOpen()) { + long afterTime = System.currentTimeMillis() - timestamp; + throw new CallerDisconnectedException( + "Aborting call " + this + " after " + afterTime + " ms, since " + + "caller disconnected"); + } + } public long getSize() { return this.size; @@ -1763,7 +1773,12 @@ public abstract class HBaseServer implements RpcServer { return (nBytes > 0) ? nBytes : ret; } - public Delayable getCurrentCall() { + /** + * Needed for delayed calls. We need to be able to store the current call + * so that we can complete it later. + * @return Call the server is currently handling. 
+ */ + public static RpcCallContext getCurrentCall() { return CurCall.get(); } } diff --git src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java new file mode 100644 index 0000000..d9138f5 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -0,0 +1,31 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +public interface RpcCallContext extends Delayable { + + /** + * Throw an exception if the caller who made this IPC call has disconnected. + * If called from outside the context of IPC, this does nothing. + * @throws CallerDisconnectedException + */ + void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException; + +} diff --git src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index ce2cb4e..ddc89d7 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -62,12 +62,6 @@ public interface RpcServer { void startThreads(); - /** - * Needed for delayed calls. 
We need to be able to store the current call - * so that we can complete it later. - * @return Call the server is currently handling. - */ - Delayable getCurrentCall(); /** * Returns the metrics instance for reporting RPC call statistics diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6e1b475..df0e1b9 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -106,6 +106,8 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.HBaseServer; +import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -3421,7 +3423,16 @@ public class HRegion implements HeapSize { // , Writable{ } private boolean nextInternal(int limit, String metric) throws IOException { + RpcCallContext rpcCall = HBaseServer.getCurrentCall(); while (true) { + if (rpcCall != null) { + // If a user specifies a too-restrictive or too-slow scanner, the + // client might time out and disconnect while the server side + // is still processing the request. We should abort aggressively + // in that case. 
+ rpcCall.throwExceptionIfCallerDisconnected(); + } + byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { if (filter != null && filter.hasFilterRow()) { diff --git src/main/ruby/hbase/table.rb src/main/ruby/hbase/table.rb index 0560d33..0757db0 100644 --- src/main/ruby/hbase/table.rb +++ src/main/ruby/hbase/table.rb @@ -307,7 +307,8 @@ EOF stoprow = args["STOPROW"] timestamp = args["TIMESTAMP"] columns = args["COLUMNS"] || args["COLUMN"] || [] - cache = args["CACHE_BLOCKS"] || true + cache_blocks = args["CACHE_BLOCKS"] || true + cache = args["CACHE"] || 0 versions = args["VERSIONS"] || 1 timerange = args[TIMERANGE] raw = args["RAW"] || false @@ -340,7 +341,8 @@ EOF end scan.setTimeStamp(timestamp) if timestamp - scan.setCacheBlocks(cache) + scan.setCacheBlocks(cache_blocks) + scan.setCaching(cache) if cache > 0 scan.setMaxVersions(versions) if versions > 1 scan.setTimeRange(timerange[0], timerange[1]) if timerange scan.setRaw(raw) diff --git src/main/ruby/shell/commands/scan.rb src/main/ruby/shell/commands/scan.rb index 869d274..74d5ba4 100644 --- src/main/ruby/shell/commands/scan.rb +++ src/main/ruby/shell/commands/scan.rb @@ -26,7 +26,7 @@ module Shell Scan a table; pass table name and optionally a dictionary of scanner specifications. Scanner specifications may include one or more of: TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH, -or COLUMNS. +or COLUMNS, CACHE If no columns are specified, all columns will be scanned. 
To scan all members of a column family, leave the qualifier empty as in diff --git src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java index 396bba3..130d84d 100644 --- src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java +++ src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.filter; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -41,12 +43,14 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; +import com.google.common.base.Throwables; + /** * Test filters at the HRegion doorstep. */ @Category(SmallTests.class) public class TestFilter extends HBaseTestCase { - private final Log LOG = LogFactory.getLog(this.getClass()); + private final static Log LOG = LogFactory.getLog(TestFilter.class); private HRegion region; // @@ -1616,6 +1620,41 @@ public class TestFilter extends HBaseTestCase { verifyScanFullNoValues(s, expectedKVs, useLen); } } + + /** + * Filter which sleeps for a second between each row of a scan. + * This can be useful for manual testing of bugs like HBASE-5973. 
For example: + * + * create 't1', 'f1' + * 1.upto(100) { |x| put 't1', 'r' + x.to_s, 'f1:q1', 'hi' } + * import org.apache.hadoop.hbase.filter.TestFilter + * scan 't1', { FILTER => TestFilter::SlowScanFilter.new(), CACHE => 50 } + * + */ + public static class SlowScanFilter extends FilterBase { + private static Thread ipcHandlerThread = null; + + @Override + public void readFields(DataInput arg0) throws IOException { + } + + @Override + public void write(DataOutput arg0) throws IOException { + } + + @Override + public boolean filterRow() { + ipcHandlerThread = Thread.currentThread(); + try { + LOG.info("Handler thread " + ipcHandlerThread + " sleeping in filter..."); + Thread.sleep(1000); + } catch (InterruptedException e) { + Throwables.propagate(e); + } + return super.filterRow(); + } + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = diff --git src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index 220e133..977db98 100644 --- src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -187,7 +187,7 @@ public class TestDelayedRpc { if (!delay) { return UNDELAYED; } - final Delayable call = rpcServer.getCurrentCall(); + final Delayable call = HBaseServer.getCurrentCall(); call.startDelay(delayReturnValue); new Thread() { public void run() { @@ -289,7 +289,7 @@ public class TestDelayedRpc { public int test(boolean delay) { if (!delay) return UNDELAYED; - Delayable call = rpcServer.getCurrentCall(); + Delayable call = HBaseServer.getCurrentCall(); call.startDelay(true); try { call.endDelayThrowing(new Exception("Something went wrong"));