diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index bc1a686..5bef075 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -243,9 +243,11 @@ public class HTable implements HTableInterface {
    */
   private void finishSetup() throws IOException {
     this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
-    this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
-        : this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-            HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+    this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ?
+      this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT) :
+      this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     this.writeBufferSize = this.configuration.getLong(
         "hbase.client.write.buffer", 2097152);
     this.clearBufferOnFail = true;
@@ -547,8 +549,7 @@ public class HTable implements HTableInterface {
     if (scan.getCaching() <= 0) {
       scan.setCaching(getScannerCaching());
     }
-    return new ClientScanner(getConfiguration(), scan, getTableName(),
-        this.connection);
+    return new ClientScanner(getConfiguration(), scan, getTableName(), this.connection);
   }
 
   /**
@@ -888,6 +889,7 @@ public class HTable implements HTableInterface {
     try {
       GetRequest request = RequestConverter.buildGetRequest(
           location.getRegionInfo().getRegionName(), get, true);
+
       GetResponse response = stub.get(null, request);
       return response.getExists();
     } catch (ServiceException se) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
index 994efa6..d454231 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
@@ -206,13 +206,13 @@ public abstract class ServerCallable implements Callable {
         }
         // If, after the planned sleep, there won't be enough time left, we stop now.
-        if (((this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep) >
-            this.callTimeout) {
+        long duration = duration(expectedSleep);
+        if (duration > this.callTimeout) {
           throw (SocketTimeoutException) new SocketTimeoutException(
               "Call to access row '" + Bytes.toString(row) +
               "' on table '" + Bytes.toString(tableName) +
               "' failed on timeout. " +
               " callTimeout=" + this.callTimeout +
-              ", time=" + (this.endTime - this.startTime)).initCause(t);
+              ", duration=" + duration).initCause(t);
         }
       } finally {
         afterCall();
@@ -227,6 +227,14 @@
   }
 
   /**
+   * @param expectedSleep the sleep planned before the next retry, in milliseconds
+   * @return how long the call will have run once the planned sleep and the minimum rpc timeout are added
+   */
+  private long duration(final long expectedSleep) {
+    return (this.endTime - this.globalStartTime) + MIN_RPC_TIMEOUT + expectedSleep;
+  }
+
+  /**
    * Run this instance against the server once.
    * @return an object of type T
    * @throws IOException if a remote or network exception occurs
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 6ff45d7..8c25b99 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
 import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -1492,6 +1491,14 @@
     return rpcTimeout.get();
   }
 
+  /**
+   * Returns the lower of the thread-local RPC timeout set via {@link #setRpcTimeout(int)} and the
+   * given default timeout.
+   */
+  public static int getRpcTimeout(int defaultTimeout) {
+    return Math.min(defaultTimeout, rpcTimeout.get());
+  }
+
   public static void resetRpcTimeout() {
     rpcTimeout.remove();
   }
@@ -1576,7 +1583,9 @@
         final User ticket, final int rpcTimeout) {
       this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
       this.rpcClient = rpcClient;
-      this.rpcTimeout = rpcTimeout;
+      // Set the rpc timeout to be the minimum of the configured timeout and whatever the current
+      // thread-local setting is.
+      this.rpcTimeout = getRpcTimeout(rpcTimeout);
       this.ticket = ticket;
     }
 
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
index fec02be..5936c91 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -87,6 +91,40 @@ public class TestClientNoCluster {
     }
   }
 
+  /**
+   * Test that the operation timeout prevails over the default rpc timeout, retries, etc.
+   * @throws IOException
+   */
+  @Test
+  public void testRpcTimeout() throws IOException {
+    Configuration localConfig = HBaseConfiguration.create(this.conf);
+    // This override mocks up our exists/get call to throw a RegionServerStoppedException.
+    localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
+    int pause = 10;
+    localConfig.setInt("hbase.client.pause", pause);
+    localConfig.setInt("hbase.client.retries.number", 10);
+    // Set the operation timeout to be < the pause.  Expectation is that after the first pause,
+    // we will fail out of the rpc because the rpc timeout will have been set to the operation
+    // timeout and it has expired.  Otherwise, if this functionality is broken, all retries will
+    // be run -- all ten of them -- and we'll get a RetriesExhaustedException instead.
+    localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
+    HTable table = new HTable(localConfig, HConstants.META_TABLE_NAME);
+    Throwable t = null;
+    try {
+      // An exists call turns into a get w/ a flag.
+      table.exists(new Get(Bytes.toBytes("abc")));
+    } catch (SocketTimeoutException e) {
+      // This is the exception we expect.
+      LOG.info("Got expected exception", e);
+      t = e;
+    } catch (RetriesExhaustedException e) {
+      throw e;
+    } finally {
+      table.close();
+    }
+    assertTrue(t != null);
+  }
+
   // Disabled.  Used looking at timeouts and retries.
   @Test
   public void testTimeout() throws IOException {
     this.conf.set("hbase.client.connection.impl", TimeoutConnection.class.getName());
@@ -236,4 +274,31 @@
       return this.stub;
     }
   }
+
+  /**
+   * Override to check that we are setting the rpc timeout correctly.
+   */
+  static class RpcTimeoutConnection
+  extends HConnectionManager.HConnectionImplementation {
+    final ClientService.BlockingInterface stub;
+
+    RpcTimeoutConnection(Configuration conf, boolean managed)
+    throws IOException {
+      super(conf, managed);
+      // Mock up the stub so that an exists call -- which turns into a get -- throws an exception.
+      this.stub = Mockito.mock(ClientService.BlockingInterface.class);
+      try {
+        Mockito.when(stub.get((RpcController)Mockito.any(),
+            (ClientProtos.GetRequest)Mockito.any())).
+          thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
+      } catch (ServiceException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public BlockingInterface getClient(ServerName sn) throws IOException {
+      return this.stub;
+    }
+  }
 }
\ No newline at end of file
diff --git a/hbase-client/src/test/resources/log4j.properties b/hbase-client/src/test/resources/log4j.properties
new file mode 100644
index 0000000..6ee91ef
--- /dev/null
+++ b/hbase-client/src/test/resources/log4j.properties
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+hbase.root.logger=INFO,console
+hbase.log.dir=.
+hbase.log.file=hbase.log
+
+# Define the root logger to the system property "hbase.root.logger".
+log4j.rootLogger=${hbase.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.hadoop.hbase=DEBUG
+
+# These two settings are workarounds against spurious logs from the minicluster.
+# See HBASE-4709
+log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
+log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
+# Enable this to get detailed connection error/retry logging.
+# log4j.logger.org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation=TRACE
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 07ec3b6..6af47d7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -264,6 +264,10 @@
   /** Parameter name for HBase client operation timeout, which overrides RPC timeout */
   public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";
 
+  /** Parameter name for HBase client meta operation timeout, which overrides RPC timeout */
+  public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT =
+    "hbase.client.meta.operation.timeout";
+
   /** Default HBase client operation timeout, which is tantamount to a blocking call */
   public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
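
Usage sketch (not part of the patch): the new hbase.client.meta.operation.timeout setting is read in HTable.finishSetup() above whenever the table is a meta table, falling back to DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT. A client that wants meta lookups to fail fast rather than block indefinitely might configure it as below. The class name, timeout values, and row key are made up for illustration; the snippet assumes the 0.95-era client API touched by this patch (HTable, byte[] table names via HConstants.META_TABLE_NAME).

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical example class, not part of the patch.
    public class MetaTimeoutExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Bound operations against the meta table to 30 seconds instead of the effectively
        // unbounded DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT (illustrative value).
        conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 30000);
        // Ordinary user-table operations keep their own, separate budget (illustrative value).
        conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60000);
        HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
        try {
          // As in the test above, exists() turns into a get with a flag; because this table is a
          // meta table, HTable.finishSetup() has already picked up the meta operation timeout.
          boolean hasRow = meta.exists(new Get(Bytes.toBytes("exampleRow")));
          System.out.println("row present: " + hasRow);
        } finally {
          meta.close();
        }
      }
    }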