From 2aee3cf0720e036cbda425e900f302e4fb70d8ea Mon Sep 17 00:00:00 2001 From: Vivek Date: Fri, 22 Jul 2016 10:33:57 -0700 Subject: [PATCH] HBASE-15866:Split hbase.rpc.timeout into *.read.timeout and *.write.timeout --- .../apache/hadoop/hbase/client/AsyncProcess.java | 5 ++-- .../hadoop/hbase/client/BufferedMutatorImpl.java | 8 ++++++- .../hbase/client/ConnectionImplementation.java | 3 ++- .../org/apache/hadoop/hbase/client/HTable.java | 28 ++++++++++++++-------- .../hadoop/hbase/client/HTableMultiplexer.java | 6 ++++- .../hadoop/hbase/client/TestAsyncProcess.java | 11 +++++---- .../java/org/apache/hadoop/hbase/HConstants.java | 10 ++++++++ 7 files changed, 51 insertions(+), 20 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 812e4bf..0576f09 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -281,7 +281,7 @@ class AsyncProcess { public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory) { + RpcControllerFactory rpcFactory, int rpcTimeout) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } @@ -297,8 +297,7 @@ class AsyncProcess { // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.timeout = rpcTimeout; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index e98ad4e..305cefd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -72,6 +73,7 @@ public class BufferedMutatorImpl implements BufferedMutator { private final int maxKeyValueSize; private boolean closed = false; private final ExecutorService pool; + private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -94,8 +96,12 @@ public class BufferedMutatorImpl implements BufferedMutator { this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 1fe29c8..6a716f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1823,7 +1823,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory); + int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fbd9f51..cb39e8e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -113,6 +113,8 @@ public class HTable implements Table { private ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc private int rpcTimeout; // timeout for each rpc request + private int readRpcTimeout; // timeout for each read rpc request + private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; @@ -214,6 +216,12 @@ public class HTable implements Table { connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { @@ -430,7 +438,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.newCaller(readRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -528,7 +536,7 @@ public class HTable implements Table { } } }; - rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -654,7 +662,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -686,7 +694,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -742,7 +750,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -772,7 +780,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -803,7 +811,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -833,7 +841,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -864,7 +872,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory. newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1282,7 +1290,7 @@ public class HTable implements Table { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration)); + true, RpcControllerFactory.instantiate(configuration), rpcTimeout); AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index f1bbcb3..2a1f1e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -442,6 +442,7 @@ public class HTableMultiplexer { private final ScheduledExecutorService executor; private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); + private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, @@ -451,7 +452,10 @@ public class HTableMultiplexer { this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory); + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d943316..0aa9704 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -138,6 +138,7 @@ public class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List allReqs = new ArrayList(); public AtomicInteger callsCt = new AtomicInteger(); + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @Override protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, @@ -157,14 +158,14 @@ public class TestAsyncProcess { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, @@ -176,7 +177,7 @@ public class TestAsyncProcess { throw new RejectedExecutionException("test under failure"); } }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } @Override @@ -1111,10 +1112,12 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, ExecutorService pool) { super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf)); + conf), rpcTimeout); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 256c374..afd1256 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -819,6 +819,16 @@ public final class HConstants { public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; /** + * timeout for each read RPC + */ + public static final String HBASE_RPC_READ_TIMEOUT_KEY = "hbase.rpc.read.timeout"; + + /** + * timeout for each write RPC + */ + public static final String HBASE_RPC_WRITE_TIMEOUT_KEY = "hbase.rpc.write.timeout"; + + /** * Default value of {@link #HBASE_RPC_TIMEOUT_KEY} */ public static final int DEFAULT_HBASE_RPC_TIMEOUT = 60000; -- 2.9.0