From cb4a78cc9bb765bbe1018fb9ba3fb998358e1536 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Mon, 18 Apr 2016 16:37:37 +0800 Subject: [PATCH] HBASE-15645 hbase.rpc.timeout is not used in operations of HTable --- .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 18 +++++---- .../org/apache/hadoop/hbase/client/HTable.java | 46 ++++++++++++++++------ .../hbase/client/RpcRetryingCallerFactory.java | 18 ++++++++- .../hadoop/hbase/client/RpcRetryingCallerImpl.java | 16 ++++++-- .../java/org/apache/hadoop/hbase/client/Table.java | 32 +++++++++++++++ .../java/org/apache/hadoop/hbase/HConstants.java | 4 +- hbase-common/src/main/resources/hbase-default.xml | 11 +++++- .../hadoop/hbase/rest/client/RemoteHTable.java | 20 ++++++++++ .../apache/hadoop/hbase/client/HTableWrapper.java | 20 ++++++++++ .../org/apache/hadoop/hbase/client/TestHCM.java | 30 ++++++++++++-- .../hadoop/hbase/regionserver/RegionAsTable.java | 20 ++++++++++ 11 files changed, 207 insertions(+), 28 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 9541967..df64129 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -215,6 +215,7 @@ public class HBaseAdmin implements Admin { private final int syncWaitTimeout; private boolean aborted; private int operationTimeout; + private int rpcTimeout; private RpcRetryingCallerFactory rpcCallerFactory; private RpcControllerFactory rpcControllerFactory; @@ -239,6 +240,8 @@ public class HBaseAdmin implements Admin { "hbase.client.retries.longer.multiplier", 10); this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.syncWaitTimeout = this.conf.getInt( "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min @@ -402,12 +405,12 @@ public class HBaseAdmin implements Admin { @Override public HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException { return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory, - operationTimeout); + operationTimeout, rpcTimeout); } static HTableDescriptor getTableDescriptor(final TableName tableName, HConnection connection, RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, - int operationTimeout) throws IOException { + int operationTimeout, int rpcTimeout) throws IOException { if (tableName == null) return null; HTableDescriptor htd = executeCallable(new MasterCallable(connection) { @Override @@ -424,7 +427,7 @@ public class HBaseAdmin implements Admin { } return null; } - }, rpcCallerFactory, operationTimeout); + }, rpcCallerFactory, operationTimeout, rpcTimeout); if (htd != null) { return htd; } @@ -2841,12 +2844,13 @@ public class HBaseAdmin implements Admin { private & Closeable, V> V executeCallable(C callable) throws IOException { - return executeCallable(callable, rpcCallerFactory, operationTimeout); + return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout); } - private static & Closeable, V> V executeCallable(C callable, - RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout) throws IOException { - RpcRetryingCaller caller = rpcCallerFactory.newCaller(); + static private & Closeable, V> V executeCallable(C callable, + RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, + int rpcTimeout) throws IOException { + RpcRetryingCaller caller = rpcCallerFactory.newCaller(rpcTimeout); try { return caller.callWithRetries(callable, operationTimeout); } finally { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index befc671..bf9ec22 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -112,7 +112,8 @@ public class HTable implements HTableInterface { protected int scannerCaching; protected long scannerMaxResultSize; private ExecutorService pool; // For Multi & Scan - private int operationTimeout; + private int operationTimeout; // global timeout for each blocking method with retrying rpc + private int rpcTimeout; // timeout for each rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; @@ -212,6 +213,8 @@ public class HTable implements HTableInterface { this.operationTimeout = tableName.isSystemTable() ? connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); + this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { @@ -266,7 +269,7 @@ public class HTable implements HTableInterface { @Override public HTableDescriptor getTableDescriptor() throws IOException { HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, - rpcControllerFactory, operationTimeout); + rpcControllerFactory, operationTimeout, rpcTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } @@ -439,7 +442,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } // Call that takes into account the replica @@ -525,7 +529,8 @@ public class HTable implements HTableInterface { } } }; - rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -654,7 +659,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -685,7 +691,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -740,7 +747,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -769,7 +777,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -799,7 +808,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -828,7 +838,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -858,7 +869,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory. newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1203,15 +1215,27 @@ public class HTable implements HTableInterface { return getKeysAndRegionsInRange(start, end, true).getFirst(); } + @Override public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; } + @Override public int getOperationTimeout() { return operationTimeout; } @Override + public int getRpcTimeout() { + return rpcTimeout; + } + + @Override + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + @Override public String toString() { return tableName + ";" + connection; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 550812f..1c723c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -33,6 +33,7 @@ public class RpcRetryingCallerFactory { protected final Configuration conf; private final long pause; private final int retries; + private final int rpcTimeout; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; private final boolean enableBackPressure; @@ -53,6 +54,7 @@ public class RpcRetryingCallerFactory { this.interceptor = interceptor; enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT); } /** @@ -62,11 +64,25 @@ public class RpcRetryingCallerFactory { this.stats = statisticTracker; } + /** + * Create a new RetryingCaller with specific rpc timeout. + */ + public RpcRetryingCaller newCaller(int rpcTimeout) { + // We store the values in the factory instance. This way, constructing new objects + // is cheap as it does not require parsing a complex structure. + RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, + startLogErrorsCnt, rpcTimeout); + return caller; + } + + /** + * Create a new RetryingCaller with configured rpc timeout. + */ public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, - startLogErrorsCnt); + startLogErrorsCnt, rpcTimeout); return caller; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 6ce4956..ccd0d2c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -57,23 +57,25 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final long pause; private final int maxAttempts;// how many times to try + private final int rpcTimeout;// timeout for each rpc request private final AtomicBoolean cancelled = new AtomicBoolean(false); private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptorContext context; private final RetryingTimeTracker tracker; public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) { - this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt); + this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); } public RpcRetryingCallerImpl(long pause, int retries, - RetryingCallerInterceptor interceptor, int startLogErrorsCnt) { + RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; this.maxAttempts = retries + 1; this.interceptor = interceptor; context = interceptor.createEmptyContext(); this.startLogErrorsCnt = startLogErrorsCnt; this.tracker = new RetryingTimeTracker(); + this.rpcTimeout = rpcTimeout; } @Override @@ -96,7 +98,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { try { callable.prepare(tries != 0); // if called with false, check table status on ZK interceptor.intercept(context.prepare(callable, tries)); - return callable.call(tracker.getRemainingTime(callTimeout)); + return callable.call(getTimeout(callTimeout)); } catch (PreemptiveFastFailException e) { throw e; } catch (Throwable t) { @@ -209,6 +211,14 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { return t; } + private int getTimeout(int callTimeout){ + int timeout = tracker.getRemainingTime(callTimeout); + if (timeout <= 0 || rpcTimeout > 0 && rpcTimeout < timeout){ + timeout = rpcTimeout; + } + return timeout; + } + @Override public String toString() { return "RpcRetryingCaller{" + "globalStartTime=" + tracker.getStartTime() + diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 3e9db00..1b0f387 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -566,4 +566,36 @@ public interface Table extends Closeable { */ boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException; + + /** + * Set timeout (millisecond) of each operation in this Table instance, will override the value + * of hbase.client.operation.timeout in configuration. + * Operation timeout is a top-level restriction that makes sure a blocking method will not be + * blocked more than this. In each operation, if rpc request fails because of timeout or + * other reason, it will retry until success or throw a RetriesExhaustedException. But if the + * total time being blocking reach the operation timeout before retries exhausted, it will break + * early and throw SocketTimeoutException. + * @param operationTimeout the total timeout of each operation in millisecond. + */ + public void setOperationTimeout(int operationTimeout); + + /** + * Get timeout (millisecond) of each operation for in Table instance. + */ + public int getOperationTimeout(); + + /** + * Set timeout (millisecond) of each rpc request in operations of this Table instance, will + * override the value of hbase.rpc.timeout in configuration. + * If a rpc request waiting too long, it will stop waiting and send a new request to retry until + * retries exhausted or operation timeout reached. + * @param rpcTimeout the timeout of each rpc request in millisecond. + */ + public void setRpcTimeout(int rpcTimeout); + + /** + * Get timeout (millisecond) of each rpc request in this Table instance. + */ + public int getRpcTimeout(); + } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 0c6244f..88cfa9a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -286,10 +286,10 @@ public final class HConstants { /** Parameter name for HBase client IPC pool size */ public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size"; - /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ + /** Parameter name for HBase client operation timeout. */ public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout"; - /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ + /** Parameter name for HBase client operation timeout. */ public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT = "hbase.client.meta.operation.timeout"; diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index e50e89e..d3f601c 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -918,11 +918,20 @@ possible configurations would overwhelm and obscure the important. hbase.rpc.timeout 60000 - This is for the RPC layer to define how long HBase client applications + This is for the RPC layer to define how long (millisecond) HBase client applications take for a remote call to time out. It uses pings to check connections but will eventually throw a TimeoutException. + hbase.client.operation.timeout + 1200000 + Operation timeout is a top-level restriction (millisecond) that makes sure a + blocking operation in Table will not be blocked more than this. In each operation, if rpc + request fails because of timeout or other reason, it will retry until success or throw + RetriesExhaustedException. But if the total time being blocking reach the operation timeout + before retries exhausted, it will break early and throw SocketTimeoutException. + + hbase.cells.scanned.per.heartbeat.check 10000 The number of cells scanned in between heartbeat checks. Heartbeat diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 2a30e99..b9e393e 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -848,4 +848,24 @@ public class RemoteHTable implements Table { CompareOp compareOp, byte[] value, RowMutations rm) throws IOException { throw new UnsupportedOperationException("checkAndMutate not implemented"); } + + @Override + public void setOperationTimeout(int operationTimeout) { + throw new UnsupportedOperationException(); + } + + @Override + public int getOperationTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setRpcTimeout(int rpcTimeout) { + throw new UnsupportedOperationException(); + } + + @Override + public int getRpcTimeout() { + throw new UnsupportedOperationException(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index 7865cc0..292a935 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -294,4 +294,24 @@ public final class HTableWrapper implements Table { CompareOp compareOp, byte[] value, RowMutations rm) throws IOException { return table.checkAndMutate(row, family, qualifier, compareOp, value, rm); } + + @Override + public void setOperationTimeout(int operationTimeout) { + table.setOperationTimeout(operationTimeout); + } + + @Override + public int getOperationTimeout() { + return table.getOperationTimeout(); + } + + @Override + public void setRpcTimeout(int rpcTimeout) { + table.setRpcTimeout(rpcTimeout); + } + + @Override + public int getRpcTimeout() { + return table.getRpcTimeout(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 6370127..a0f91f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -128,11 +128,21 @@ public class TestHCM { } } + public static class SleepCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 5000; + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + Threads.sleep(SLEEP_TIME); + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); // Up the handlers; this test needs more than usual. TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); TEST_UTIL.startMiniCluster(2); } @@ -300,11 +310,10 @@ public class TestHCM { public void testOperationTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); - Table t = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, null); - + Table t = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}); if (t instanceof HTable) { HTable table = (HTable) t; - + table.setRpcTimeout(Integer.MAX_VALUE); // Check that it works if the timeout is big enough table.setOperationTimeout(120 * 1000); table.get(new Get(FAM_NAM)); @@ -328,6 +337,21 @@ public class TestHCM { } } + @Test(expected = RetriesExhaustedException.class) + public void testRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + assert t instanceof HTable; + HTable table = (HTable) t; + table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + table.get(new Get(FAM_NAM)); + } + } + private void testConnectionClose(boolean allowsInterrupt) throws Exception { TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java index f65bc5d..770c39b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java @@ -321,4 +321,24 @@ public class RegionAsTable implements Table { throws IOException { throw new UnsupportedOperationException(); } + + @Override + public void setOperationTimeout(int operationTimeout) { + throw new UnsupportedOperationException(); + } + + @Override + public int getOperationTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setRpcTimeout(int rpcTimeout) { + throw new UnsupportedOperationException(); + } + + @Override + public int getRpcTimeout() { + throw new UnsupportedOperationException(); + } } \ No newline at end of file -- 2.6.4 (Apple Git-63)