From 3b26e0a20bd37e527450faaa9916905e8a009e75 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Tue, 11 Oct 2016 17:12:54 +0800 Subject: [PATCH] HBASE-16664 Timeout logic in AsyncProcess is broken --- .../apache/hadoop/hbase/client/AsyncProcess.java | 75 +++++---- .../hadoop/hbase/client/BufferedMutatorImpl.java | 21 ++- .../hadoop/hbase/client/ConnectionManager.java | 3 +- .../org/apache/hadoop/hbase/client/HTable.java | 16 +- .../hadoop/hbase/client/HTableMultiplexer.java | 5 +- .../hadoop/hbase/client/MultiServerCallable.java | 16 +- .../hadoop/hbase/client/TestAsyncProcess.java | 22 ++- .../hbase/client/HConnectionTestingUtility.java | 5 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 172 ++++++++++++++++++--- 9 files changed, 273 insertions(+), 62 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index cdcb1b2..ac04767 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -232,7 +232,8 @@ class AsyncProcess { protected final long pause; protected int numTries; protected int serverTrackerTimeout; - protected int timeout; + protected int rpcTimeout; + protected int operationTimeout; protected long primaryCallTimeoutMicroseconds; // End configuration settings. @@ -275,7 +276,8 @@ class AsyncProcess { } public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, - RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) { + RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory, + int rpcTimeout) { if (hc == null) { throw new IllegalArgumentException("HConnection cannot be null."); } @@ -290,8 +292,9 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.rpcTimeout = rpcTimeout; + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -336,6 +339,14 @@ class AsyncProcess { DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); } + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + } + /** * @return pool if non null, otherwise returns this.pool if non null, otherwise throws * RuntimeException @@ -561,12 +572,12 @@ class AsyncProcess { */ public AsyncRequestFuture submitAll(TableName tableName, List rows, Batch.Callback callback, Object[] results) { - return submitAll(null, tableName, rows, callback, results, null, timeout); + return submitAll(null, tableName, rows, callback, results, null, operationTimeout, rpcTimeout); } public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results) { - return submitAll(pool, tableName, rows, callback, results, null, timeout); + return submitAll(pool, tableName, rows, callback, results, null, operationTimeout, rpcTimeout); } /** * Submit immediately the list of rows, whatever the server status. Kept for backward @@ -580,7 +591,7 @@ class AsyncProcess { */ public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, - PayloadCarryingServerCallable callable, int curTimeout) { + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { List> actions = new ArrayList>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -600,7 +611,7 @@ class AsyncProcess { } AsyncRequestFutureImpl ars = createAsyncRequestFuture( tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, - callable, curTimeout); + callable, operationTimeout, rpcTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -752,12 +763,12 @@ class AsyncProcess { if (callable == null) { callable = createCallable(server, tableName, multiAction); } - RpcRetryingCaller caller = createCaller(callable); + RpcRetryingCaller caller = createCaller(callable, rpcTimeout); try { if (callsInProgress != null) { callsInProgress.add(callable); } - res = caller.callWithoutRetries(callable, currentCallTotalTimeout); + res = caller.callWithoutRetries(callable, operationTimeout); if (res == null) { // Cancelled return; @@ -823,11 +834,14 @@ class AsyncProcess { private final boolean hasAnyReplicaGets; private final long nonceGroup; private PayloadCarryingServerCallable currentCallable; - private int currentCallTotalTimeout; + private int operationTimeout; + private int rpcTimeout; + private RetryingTimeTracker tracker; public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, PayloadCarryingServerCallable callable, int timeout) { + Batch.Callback callback, PayloadCarryingServerCallable callable, + int operationTimeout, int rpcTimeout) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -897,7 +911,8 @@ class AsyncProcess { this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); this.currentCallable = callable; - this.currentCallTotalTimeout = timeout; + this.operationTimeout = operationTimeout; + this.rpcTimeout = rpcTimeout; } public Set getCallsInProgress() { @@ -1717,6 +1732,19 @@ class AsyncProcess { waitUntilDone(); return results; } + + /** + * Create a callable. Isolated to be easily overridden in the tests. + */ + @VisibleForTesting + protected MultiServerCallable createCallable(final ServerName server, + TableName tableName, final MultiAction multi) { + if (tracker == null) { + tracker = new RetryingTimeTracker(); + } + return new MultiServerCallable(connection, tableName, server, + AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker); + } } private void updateStats(ServerName server, Map results) { @@ -1738,10 +1766,10 @@ class AsyncProcess { protected AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, - PayloadCarryingServerCallable callable, int curTimeout) { + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { return new AsyncRequestFutureImpl( tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout); + results, callback, callable, operationTimeout, rpcTimeout); } @VisibleForTesting @@ -1750,24 +1778,17 @@ class AsyncProcess { TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults) { return createAsyncRequestFuture( - tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout); - } - - /** - * Create a callable. Isolated to be easily overridden in the tests. - */ - @VisibleForTesting - protected MultiServerCallable createCallable(final ServerName server, - TableName tableName, final MultiAction multi) { - return new MultiServerCallable(connection, tableName, server, this.rpcFactory, multi); + tableName, actions, nonceGroup, pool, callback, results, needResults, null, + operationTimeout, rpcTimeout); } /** * Create a caller. Isolated to be easily overridden in the tests. */ @VisibleForTesting - protected RpcRetryingCaller createCaller(PayloadCarryingServerCallable callable) { - return rpcCallerFactory. newCaller(); + protected RpcRetryingCaller createCaller(PayloadCarryingServerCallable callable, + int rpcTimeout) { + return rpcCallerFactory. newCaller(rpcTimeout); } @VisibleForTesting diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 273f2e4..d722821 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -71,6 +72,8 @@ public class BufferedMutatorImpl implements BufferedMutator { private final int maxKeyValueSize; private boolean closed = false; private final ExecutorService pool; + private int rpcTimeout; + private int operationTimeout; @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -92,9 +95,13 @@ public class BufferedMutatorImpl implements BufferedMutator { params.getWriteBufferSize() : tableConf.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); - + this.rpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.operationTimeout = conn.getConfiguration().getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, rpcTimeout); } @Override @@ -279,6 +286,16 @@ public class BufferedMutatorImpl implements BufferedMutator { return this.writeBufferSize; } + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + this.ap.setRpcTimeout(rpcTimeout); + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + this.ap.setOperationTimeout(operationTimeout); + } + /** * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not beÓ * called from production uses. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index b055884..4e9d208a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -2333,7 +2333,8 @@ class ConnectionManager { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, + rpcTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index efa03c6..6e7c584 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1071,7 +1071,7 @@ public class HTable implements HTableInterface, RegionLocator { } }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, null, callable, operationTimeout); + null, null, callable, operationTimeout, rpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1384,7 +1384,7 @@ public class HTable implements HTableInterface, RegionLocator { * */ Object[] results = new Object[rm.getMutations().size()]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, results, callable, operationTimeout); + null, results, callable, operationTimeout, rpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1800,6 +1800,10 @@ public class HTable implements HTableInterface, RegionLocator { public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; + if (mutator != null) { + mutator.setOperationTimeout(operationTimeout); + } + multiAp.setOperationTimeout(operationTimeout); } public int getOperationTimeout() { @@ -1808,6 +1812,10 @@ public class HTable implements HTableInterface, RegionLocator { @Override public void setRpcTimeout(int rpcTimeout) { this.rpcTimeout = rpcTimeout; + if (mutator != null) { + mutator.setRpcTimeout(rpcTimeout); + } + multiAp.setRpcTimeout(rpcTimeout); } @Override public int getRpcTimeout() { @@ -1891,7 +1899,7 @@ public class HTable implements HTableInterface, RegionLocator { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration)); + true, RpcControllerFactory.instantiate(configuration), rpcTimeout); AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback() { @@ -1941,6 +1949,8 @@ public class HTable implements HTableInterface, RegionLocator { .writeBufferSize(connConfiguration.getWriteBufferSize()) .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) ); + mutator.setRpcTimeout(rpcTimeout); + mutator.setOperationTimeout(operationTimeout); } return mutator; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index dfb0104..6863eab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -450,7 +450,10 @@ public class HTableMultiplexer { this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory); + int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, + rpcTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index d0b4c81..82a156f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -52,9 +52,12 @@ import com.google.protobuf.ServiceException; class MultiServerCallable extends PayloadCarryingServerCallable { private final MultiAction multiAction; private final boolean cellBlock; + private final RetryingTimeTracker tracker; + private final int rpcTimeout; MultiServerCallable(final ClusterConnection connection, final TableName tableName, - final ServerName location, RpcControllerFactory rpcFactory, final MultiAction multi) { + final ServerName location, RpcControllerFactory rpcFactory, final MultiAction multi, + int rpcTimeout, RetryingTimeTracker tracker) { super(connection, tableName, null, rpcFactory); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. @@ -62,6 +65,8 @@ class MultiServerCallable extends PayloadCarryingServerCallable extends PayloadCarryingServerCallable(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, @@ -174,7 +178,9 @@ public class TestAsyncProcess { throw new RejectedExecutionException("test under failure"); } }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } @Override @@ -187,7 +193,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( - PayloadCarryingServerCallable callable) { + PayloadCarryingServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); MultiServerCallable callable1 = (MultiServerCallable) callable; final MultiResponse mr = createMultiResponse( @@ -250,7 +256,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( - PayloadCarryingServerCallable callable) { + PayloadCarryingServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); return new CallerWithFailure(ioe); } @@ -287,7 +293,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( - PayloadCarryingServerCallable payloadCallable) { + PayloadCarryingServerCallable payloadCallable, int rpcTimeout) { MultiServerCallable callable = (MultiServerCallable) payloadCallable; final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @@ -1118,7 +1124,9 @@ public class TestAsyncProcess { public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, ExecutorService pool) { super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf)); + conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 7b22ba4..dcac58f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HRegionInfo; @@ -159,7 +160,9 @@ public class HConnectionTestingUtility { Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getAsyncProcess()).thenReturn( new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, - RpcControllerFactory.instantiate(conf))); + RpcControllerFactory.instantiate(conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT))); Mockito.doNothing().when(c).incCount(); Mockito.doNothing().when(c).decCount(); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 3307d42..c6482fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -67,12 +67,14 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -127,14 +129,14 @@ public class TestHCM { * This copro sleeps 20 second. The first call it fails. The second time, it works. */ public static class SleepAndFailFirstTime extends BaseRegionObserver { - static final AtomicLong ct = new AtomicLong(0); - static final String SLEEP_TIME_CONF_KEY = - "hbase.coprocessor.SleepAndFailFirstTime.sleepTime"; - static final long DEFAULT_SLEEP_TIME = 20000; - static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME); + static final AtomicLong ct = new AtomicLong(0); + static final String SLEEP_TIME_CONF_KEY = + "hbase.coprocessor.SleepAndFailFirstTime.sleepTime"; + static final long DEFAULT_SLEEP_TIME = 20000; + static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME); - public SleepAndFailFirstTime() { - } + public SleepAndFailFirstTime() { + } @Override public void postOpen(ObserverContext c) { @@ -145,12 +147,42 @@ public class TestHCM { @Override public void preGetOp(final ObserverContext e, - final Get get, final List results) throws IOException { + final Get get, final List results) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public void preDelete(final ObserverContext e, + final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { Threads.sleep(sleepTime.get()); - if (ct.incrementAndGet() == 1){ + if (ct.incrementAndGet() == 1) { throw new IOException("first call I fail"); } } + + @Override + public Result preIncrement(final ObserverContext e, + final Increment increment) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + return super.preIncrement(e, increment); + } + } public static class SleepCoprocessor extends BaseRegionObserver { @@ -160,16 +192,26 @@ public class TestHCM { final Get get, final List results) throws IOException { Threads.sleep(SLEEP_TIME); } - } - public static class SleepWriteCoprocessor extends BaseRegionObserver { - public static final int SLEEP_TIME = 5000; @Override public Result preIncrement(final ObserverContext e, - final Increment increment) throws IOException { + final Increment increment) throws IOException { Threads.sleep(SLEEP_TIME); return super.preIncrement(e, increment); } + + @Override + public void preDelete(final ObserverContext e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } + } public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver { @@ -358,11 +400,12 @@ public class TestHCM { * timeouted when the server answers. */ @Test - public void testOperationTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); + public void testGetOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout"); hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); - HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); + Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); // Check that it works if the timeout is big enough table.setOperationTimeout(120 * 1000); table.get(new Get(FAM_NAM)); @@ -385,6 +428,99 @@ public class TestHCM { } } + @Test + public void testPutOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + + @Test + public void testDeleteOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.delete(new Delete(FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.delete(new Delete(FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (IOException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } finally { + table.close(); + } + } + @Test + public void testDeleteRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Delete d = new Delete(FAM_NAM); + d.addColumn(FAM_NAM, FAM_NAM, 1); + t.delete(d); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testPutRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Put p = new Put(FAM_NAM); + p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); + t.put(p); + fail("Write should not have succeeded"); + } catch (IOException e) { + // expected + } + + } + @Test(expected = RetriesExhaustedException.class) public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); @@ -426,6 +562,7 @@ public class TestHCM { TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close(); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + SleepAndFailFirstTime.ct.set(0); c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000); c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000); @@ -932,8 +1069,7 @@ public class TestHCM { curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); table.close(); } -- 2.8.4 (Apple Git-73)