From ae3f0578c556b80c30c309793aa2274601c7c32e Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Fri, 23 Sep 2016 20:32:42 +0800 Subject: [PATCH] HBASE-16664 Timeout logic in AsyncProcess is broken --- .../hbase/client/AbstractRegionServerCallable.java | 2 +- .../apache/hadoop/hbase/client/AsyncProcess.java | 86 ++++++--- .../hbase/client/AsyncRequestFutureImpl.java | 57 +++--- .../hadoop/hbase/client/BufferedMutator.java | 4 + .../hadoop/hbase/client/BufferedMutatorImpl.java | 13 +- .../client/CancellableRegionServerCallable.java | 22 ++- .../hbase/client/ConnectionImplementation.java | 7 +- .../org/apache/hadoop/hbase/client/HTable.java | 49 ++++-- .../hadoop/hbase/client/MultiServerCallable.java | 5 +- .../hbase/client/NoncedRegionServerCallable.java | 4 +- .../hadoop/hbase/client/TestAsyncProcess.java | 27 ++- .../org/apache/hadoop/hbase/client/TestHCM.java | 194 ++++++++++++++++----- 12 files changed, 325 insertions(+), 145 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java index 5a1f5cc..62032b4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java @@ -75,7 +75,7 @@ abstract class AbstractRegionServerCallable implements RetryingCallable { @Override public void throwable(Throwable t, boolean retrying) { - if (location != null) { + if (location != null && location.getRegionInfo() != null) { getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), row, t, location.getServerName()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 2ffb2e3..3759fad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -20,22 +20,6 @@ package org.apache.hadoop.hbase.client; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import java.io.IOException; import java.io.InterruptedIOException; @@ -56,6 +40,23 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + /** * This class allows a continuous flow of requests. It's written to be compatible with a * synchronous caller such as HTable. @@ -211,7 +212,8 @@ class AsyncProcess { protected final long pause; protected int numTries; protected int serverTrackerTimeout; - protected int timeout; + protected int rpcTimeout; + protected int operationTimeout; protected long primaryCallTimeoutMicroseconds; /** Whether to log details for batch errors */ protected final boolean logBatchErrorDetails; @@ -235,7 +237,9 @@ class AsyncProcess { // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.timeout = rpcTimeout; + this.rpcTimeout = rpcTimeout; + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -433,7 +437,8 @@ class AsyncProcess { List locationErrorRows, Map> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout); + tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, + getDeadline(), rpcTimeout); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -471,8 +476,13 @@ class AsyncProcess { } public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, - List rows, Batch.Callback callback, Object[] results) { - return submitAll(pool, tableName, rows, callback, results, null, timeout); + List rows, Batch.Callback callback, Object[] results){ + return submitAll(pool, tableName, rows, callback, results, null, -1); + } + + public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, + List rows, Batch.Callback callback, Object[] results, int rpcTimeout){ + return submitAll(pool, tableName, rows, callback, results, null, rpcTimeout); } /** * Submit immediately the list of rows, whatever the server status. Kept for backward @@ -483,10 +493,14 @@ class AsyncProcess { * @param rows the list of rows. * @param callback the callback. * @param results Optional array to return the results thru; backward compat. + * @param rpcTimeout -1 means use default value */ public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, - CancellableRegionServerCallable callable, int curTimeout) { + CancellableRegionServerCallable callable, int rpcTimeout) { + if (rpcTimeout < 0) { + rpcTimeout = this.rpcTimeout; + } List> actions = new ArrayList>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -506,7 +520,7 @@ class AsyncProcess { } AsyncRequestFutureImpl ars = createAsyncRequestFuture( tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, - callable, curTimeout); + callable, getDeadline(), rpcTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -519,10 +533,10 @@ class AsyncProcess { protected AsyncRequestFutureImpl createAsyncRequestFuture( TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, - CancellableRegionServerCallable callable, int curTimeout) { + CancellableRegionServerCallable callable, long deadline, int rpcTimeout) { return new AsyncRequestFutureImpl( tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout, this); + results, callback, callable, deadline, rpcTimeout, this); } /** Wait until the async does not have more than max tasks in progress. */ @@ -664,7 +678,7 @@ class AsyncProcess { @VisibleForTesting protected RpcRetryingCaller createCaller( CancellableRegionServerCallable callable) { - return rpcCallerFactory. newCaller(); + return rpcCallerFactory. newCaller(rpcTimeout); } @@ -963,4 +977,24 @@ class AsyncProcess { return data.iterator(); } } + + public int getRpcTimeout() { + return rpcTimeout; + } + + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + public int getOperationTimeout() { + return operationTimeout; + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + } + + protected long getDeadline() { + return System.currentTimeMillis() + this.operationTimeout; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index c6b2a53..fe973a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -20,6 +20,24 @@ package org.apache.hadoop.hbase.client; import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -39,23 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Trace; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - /** * The context, and return value, for a single submit/submitAll call. * Note on how this class (one AP submit) works. Initially, all requests are split into groups @@ -217,14 +218,14 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { try { // setup the callable based on the actions, if we don't have one already from the request if (callable == null) { - callable = createCallable(server, tableName, multiAction); + callable = createCallable(server, tableName, multiAction, operationDeadline); } RpcRetryingCaller caller = asyncProcess.createCaller(callable); try { if (callsInProgress != null) { callsInProgress.add(callable); } - res = caller.callWithoutRetries(callable, currentCallTotalTimeout); + res = caller.callWithoutRetries(callable, rpcTimeout); if (res == null) { // Cancelled return; @@ -297,7 +298,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { private final boolean hasAnyReplicaGets; private final long nonceGroup; private CancellableRegionServerCallable currentCallable; - private int currentCallTotalTimeout; + private int rpcTimeout; + private long operationDeadline; private final Map> heapSizesByServer = new HashMap<>(); protected AsyncProcess asyncProcess; @@ -339,8 +341,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, - CancellableRegionServerCallable callable, int timeout, - AsyncProcess asyncProcess) { + CancellableRegionServerCallable callable, long operationDeadline, + int rpcTimeout, AsyncProcess asyncProcess) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -411,8 +413,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { this.errors = (asyncProcess.globalErrors != null) ? asyncProcess.globalErrors : new BatchErrors(); this.currentCallable = callable; - this.currentCallTotalTimeout = timeout; - + this.operationDeadline = operationDeadline; + this.rpcTimeout = rpcTimeout; } @VisibleForTesting @@ -724,7 +726,6 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { errorsByServer.reportServerError(server); Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; - if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) { // tableName is null when we made a cross-table RPC call. asyncProcess.connection.clearCaches(server); @@ -1063,6 +1064,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { */ private void setError(int index, Row row, Throwable throwable, ServerName server) { ReplicaResultState state = null; + new Throwable().printStackTrace(); if (results == null) { // Note that we currently cannot have replica requests with null results. So it shouldn't // happen that multiple replica calls will call dAC for same actions with results == null. @@ -1132,6 +1134,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { */ private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError, Object result, ServerName server, boolean isFromReplica) { + new Throwable().printStackTrace(); Object resObj = null; if (!AsyncProcess.isReplicaGet(row)) { if (isFromReplica) { @@ -1282,9 +1285,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * Create a callable. Isolated to be easily overridden in the tests. */ private MultiServerCallable createCallable(final ServerName server, - TableName tableName, final MultiAction multi) { + TableName tableName, final MultiAction multi, long deadline) { return new MultiServerCallable(asyncProcess.connection, tableName, server, - asyncProcess.rpcFactory, multi); + asyncProcess.rpcFactory, multi, deadline); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index 5dc7fc3..dffd8a3e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -128,4 +128,8 @@ public interface BufferedMutator extends Closeable { public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException; } + + public void setWriteRpcTimeout(int timeout); + + public void setOperationTimeout(int timeout); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 2d4c8b3..5c0a708 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -109,7 +109,8 @@ public class BufferedMutatorImpl implements BufferedMutator { HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, + writeRpcTimeout); } @Override @@ -344,4 +345,14 @@ public class BufferedMutatorImpl implements BufferedMutator { return remainder <= 0; } } + + @Override + public void setWriteRpcTimeout(int timeout) { + ap.setRpcTimeout(timeout); + } + + @Override + public void setOperationTimeout(int timeout) { + ap.setOperationTimeout(timeout); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java index 0a6e10f..795ffb3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; /** @@ -31,11 +32,12 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @InterfaceAudience.Private abstract class CancellableRegionServerCallable extends RegionServerCallable implements Cancellable { - private final RetryingTimeTracker tracker = new RetryingTimeTracker(); + private final long deadline; CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row, - RpcControllerFactory rpcControllerFactory) { + RpcControllerFactory rpcControllerFactory, long deadline) { super(connection, rpcControllerFactory, tableName, row); + this.deadline = deadline; } /* Override so can mess with the callTimeout. @@ -44,14 +46,16 @@ Cancellable { */ @Override public T call(int callTimeout) throws IOException { - // It is expected (it seems) that tracker.start can be called multiple times (on each trip - // through the call when retrying). Also, we can call start and no need of a stop. - this.tracker.start(); - int remainingTime = tracker.getRemainingTime(callTimeout); - if (remainingTime == 0) { - throw new DoNotRetryIOException("Timeout for mutate row"); + long remaining = deadline - System.currentTimeMillis(); + int remainingTime; + if (remaining > Integer.MAX_VALUE) { + remainingTime = Integer.MAX_VALUE; + } else if (remaining <= 0) { + throw new DoNotRetryIOException(new TimeoutIOException()); + } else { + remainingTime = (int) remaining; } - return super.call(remainingTime); + return super.call(Math.min(remainingTime, callTimeout)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 38178b4..448700a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1830,8 +1830,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout); + int writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, + writeRpcTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1d1db3a..ab71f7c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -483,12 +483,7 @@ public class HTable implements Table { public void batch(final List actions, final Object[] results, int timeout) throws InterruptedException, IOException { AsyncRequestFuture ars = null; - if (timeout != -1) { - ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout); - } else { - // use default timeout in AP - ars = multiAp.submitAll(pool, tableName, actions, null, results); - } + ars = multiAp.submitAll(pool, tableName, actions, null, results, timeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -524,7 +519,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable( - connection, getName(), delete.getRow(), this.rpcControllerFactory) { + connection, getName(), delete.getRow(), this.rpcControllerFactory, getDeadline()) { @Override protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -536,7 +531,7 @@ public class HTable implements Table { List rows = new ArrayList(); rows.add(delete); AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, null, callable, operationTimeout); + null, null, callable, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -594,7 +589,7 @@ public class HTable implements Table { public void mutateRow(final RowMutations rm) throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(this.connection, getName(), rm.getRow(), - rpcControllerFactory) { + rpcControllerFactory, getDeadline()) { @Override protected MultiResponse rpcCall() throws Exception { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( @@ -615,7 +610,7 @@ public class HTable implements Table { } }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, null, callable, operationTimeout); + null, null, callable, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -636,7 +631,7 @@ public class HTable implements Table { checkHasFamilies(append); NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, - getName(), append.getRow()) { + getName(), append.getRow(), getDeadline()) { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -658,7 +653,7 @@ public class HTable implements Table { checkHasFamilies(increment); NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, - this.rpcControllerFactory, getName(), increment.getRow()) { + this.rpcControllerFactory, getName(), increment.getRow(), getDeadline()) { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -704,7 +699,7 @@ public class HTable implements Table { NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), - row) { + row, getDeadline()) { @Override protected Long rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( @@ -775,7 +770,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable( - this.connection, getName(), row, this.rpcControllerFactory) { + this.connection, getName(), row, this.rpcControllerFactory, getDeadline()) { @Override protected SingleResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -791,7 +786,7 @@ public class HTable implements Table { Object[] results = new Object[1]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, results, callable, operationTimeout); + null, results, callable, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -808,7 +803,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(connection, getName(), rm.getRow(), - rpcControllerFactory) { + rpcControllerFactory, getDeadline()) { @Override protected MultiResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -835,7 +830,7 @@ public class HTable implements Table { * */ Object[] results = new Object[rm.getMutations().size()]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, results, callable, operationTimeout); + null, results, callable, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1095,6 +1090,12 @@ public class HTable implements Table { @Override public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; + try { + getBufferedMutator().setOperationTimeout(operationTimeout); + } catch (IOException e) { + e.printStackTrace(); + } + multiAp.setOperationTimeout(operationTimeout); } @Override @@ -1111,8 +1112,8 @@ public class HTable implements Table { @Override @Deprecated public void setRpcTimeout(int rpcTimeout) { - this.readRpcTimeout = rpcTimeout; - this.writeRpcTimeout = rpcTimeout; + setReadRpcTimeout(rpcTimeout); + setWriteRpcTimeout(rpcTimeout); } @Override @@ -1123,6 +1124,12 @@ public class HTable implements Table { @Override public void setWriteRpcTimeout(int writeRpcTimeout) { this.writeRpcTimeout = writeRpcTimeout; + try { + getBufferedMutator().setWriteRpcTimeout(writeRpcTimeout); + } catch (IOException e) { + // Ignore the exception, we will throw it on operations. + } + multiAp.setRpcTimeout(writeRpcTimeout); } @Override @@ -1259,4 +1266,8 @@ public class HTable implements Table { } return mutator; } + + private long getDeadline() { + return System.currentTimeMillis() + this.operationTimeout; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 3ef97e78..4c09875 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -55,8 +55,9 @@ class MultiServerCallable extends CancellableRegionServerCallable multi) { - super(connection, tableName, null, rpcFactory); + final ServerName location, RpcControllerFactory rpcFactory, final MultiAction multi, + long deadline) { + super(connection, tableName, null, rpcFactory, deadline); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index 7c01e21..4f3ce0e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -48,8 +48,8 @@ public abstract class NoncedRegionServerCallable extends CancellableRegionSer */ public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, - TableName tableName, byte [] row) { - super(connection, tableName, row, rpcControllerFactory); + TableName tableName, byte [] row, long deadline) { + super(connection, tableName, row, rpcControllerFactory, deadline); this.nonce = getConnection().getNonceGenerator().newNonce(); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 0703e51..1372f54 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -60,10 +59,14 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess; -import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost; -import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; +import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -78,10 +81,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; import org.mockito.Mockito; -import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; -import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; -import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; -import org.apache.hadoop.hbase.client.backoff.ServerStatistics; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; @@ -158,11 +158,11 @@ public class TestAsyncProcess { protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, - CancellableRegionServerCallable callable, int curTimeout) { + CancellableRegionServerCallable callable, long deadline, int rpcTimeout) { // Test HTable has tableName of null, so pass DUMMY_TABLE AsyncRequestFutureImpl r = new MyAsyncRequestFutureImpl( DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout, this); + results, callback, callable, getDeadline(), rpcTimeout, this); allReqs.add(r); return r; } @@ -254,12 +254,11 @@ public class TestAsyncProcess { static class MyAsyncRequestFutureImpl extends AsyncRequestFutureImpl { public MyAsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, - CancellableRegionServerCallable callable, int timeout, - AsyncProcess asyncProcess) { + ExecutorService pool, boolean needResults, Object[] results, + Batch.Callback callback, CancellableRegionServerCallable callable, long deadline, + int rpcTimeout, AsyncProcess asyncProcess) { super(tableName, actions, nonceGroup, pool, needResults, - results, callback, callable, timeout, asyncProcess); + results, callback, callable, deadline, rpcTimeout, asyncProcess); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 786f570..0aa4e44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -93,7 +93,8 @@ import static org.junit.Assert.fail; */ @Category({LargeTests.class}) public class TestHCM { - @Rule public final TestRule timeout = CategoryBasedTimeout.builder() + @Rule + public final TestRule timeout = CategoryBasedTimeout.builder() .withTimeout(this.getClass()) .withLookingForStuckThread(true) .build(); @@ -113,9 +114,9 @@ public class TestHCM { private static Random _randy = new Random(); private static final int RPC_RETRY = 5; -/** -* This copro sleeps 20 second. The first call it fails. The second time, it works. -*/ + /** + * This copro sleeps 20 second. The first call it fails. The second time, it works. + */ public static class SleepAndFailFirstTime extends BaseRegionObserver { static final AtomicLong ct = new AtomicLong(0); static final String SLEEP_TIME_CONF_KEY = @@ -135,16 +136,46 @@ public class TestHCM { @Override public void preGetOp(final ObserverContext e, - final Get get, final List results) throws IOException { + final Get get, final List results) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { Threads.sleep(sleepTime.get()); - if (ct.incrementAndGet() == 1){ + if (ct.incrementAndGet() == 1) { throw new IOException("first call I fail"); } } + + @Override + public void preDelete(final ObserverContext e, + final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public Result preIncrement(final ObserverContext e, + final Increment increment) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + return super.preIncrement(e, increment); + } } public static class SleepCoprocessor extends BaseRegionObserver { public static final int SLEEP_TIME = 5000; + @Override public void preGetOp(final ObserverContext e, final Get get, final List results) throws IOException { @@ -156,16 +187,19 @@ public class TestHCM { final Put put, final WALEdit edit, final Durability durability) throws IOException { Threads.sleep(SLEEP_TIME); } - } - public static class SleepWriteCoprocessor extends BaseRegionObserver { - public static final int SLEEP_TIME = 5000; @Override public Result preIncrement(final ObserverContext e, - final Increment increment) throws IOException { + final Increment increment) throws IOException { Threads.sleep(SLEEP_TIME); return super.preIncrement(e, increment); } + + @Override + public void preDelete(final ObserverContext e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } } public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver { @@ -364,11 +398,12 @@ public class TestHCM { * timeouted when the server answers. */ @Test - public void testOperationTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); + public void testGetOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout"); hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); - Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}); + Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); // Check that it works if the timeout is big enough table.setOperationTimeout(120 * 1000); table.get(new Get(FAM_NAM)); @@ -392,6 +427,64 @@ public class TestHCM { } @Test + public void testPutOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + + @Test + public void testDeleteOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.delete(new Delete(FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.delete(new Delete(FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + + @Test public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); @@ -419,14 +512,14 @@ public class TestHCM { } @Test - public void testWriteRpcTimeout() throws Exception { + public void testIncrementRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout"); - hdt.addCoprocessor(SleepWriteCoprocessor.class.getName()); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { - t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2); - t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100); + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); Increment i = new Increment(FAM_NAM); i.addColumn(FAM_NAM, FAM_NAM, 1); t.increment(i); @@ -435,23 +528,49 @@ public class TestHCM { // expected } - // Again, with configuration based override - c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2); - try (Connection conn = ConnectionFactory.createConnection(c)) { - try (Table t = conn.getTable(hdt.getTableName())) { - Increment i = new Increment(FAM_NAM); - i.addColumn(FAM_NAM, FAM_NAM, 1); - t.increment(i); - fail("Write should not have succeeded"); - } catch (RetriesExhaustedException e) { - // expected - } + } + + @Test + public void testDeleteRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Delete d = new Delete(FAM_NAM); + d.addColumn(FAM_NAM, FAM_NAM, 1); + t.delete(d); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected } + } @Test - public void testReadRpcTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout"); + public void testPutRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Put p = new Put(FAM_NAM); + p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); + t.put(p); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testGetRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); @@ -464,16 +583,6 @@ public class TestHCM { // expected } - // Again, with configuration based override - c.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); - try (Connection conn = ConnectionFactory.createConnection(c)) { - try (Table t = conn.getTable(hdt.getTableName())) { - t.get(new Get(FAM_NAM)); - fail("Get should not have succeeded"); - } catch (RetriesExhaustedException e) { - // expected - } - } } @Test @@ -499,9 +608,10 @@ public class TestHCM { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcRetryingCallerSleep"); hdt.addCoprocessorWithSpec("|" + SleepAndFailFirstTime.class.getName() + "||" + SleepAndFailFirstTime.SLEEP_TIME_CONF_KEY + "=2000"); - TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close(); - Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c).close(); + SleepAndFailFirstTime.ct.set(0); + c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000); c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000); -- 2.8.4 (Apple Git-73)