diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 2ffb2e3..2844b56 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -218,8 +218,8 @@ class AsyncProcess { // End configuration settings. public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, - RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory, int rpcTimeout) { + RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, + RpcControllerFactory rpcFactory, int rpcTimeout) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } @@ -963,4 +963,8 @@ class AsyncProcess { return data.iterator(); } } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index c6b2a53..de20385 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -1284,7 +1284,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { private MultiServerCallable createCallable(final ServerName server, TableName tableName, final MultiAction multi) { return new MultiServerCallable(asyncProcess.connection, tableName, server, - asyncProcess.rpcFactory, multi); + asyncProcess.rpcFactory, multi, asyncProcess.timeout); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java index 0a6e10f..0cf4d20 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java @@ -32,10 +32,12 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; abstract class CancellableRegionServerCallable extends RegionServerCallable implements Cancellable { private final RetryingTimeTracker tracker = new RetryingTimeTracker(); + private int rpcTimeout; CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row, - RpcControllerFactory rpcControllerFactory) { + RpcControllerFactory rpcControllerFactory, int rpcTimeout) { super(connection, rpcControllerFactory, tableName, row); + this.rpcTimeout = rpcTimeout; } /* Override so can mess with the callTimeout. @@ -51,7 +53,7 @@ Cancellable { if (remainingTime == 0) { throw new DoNotRetryIOException("Timeout for mutate row"); } - return super.call(remainingTime); + return super.call(Math.min(rpcTimeout, remainingTime)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1d1db3a..d59f236 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -524,7 +524,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable( - connection, getName(), delete.getRow(), this.rpcControllerFactory) { + connection, getName(), delete.getRow(), this.rpcControllerFactory, writeRpcTimeout) { @Override protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -594,7 +594,7 @@ public class HTable implements Table { public void mutateRow(final RowMutations rm) throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(this.connection, getName(), rm.getRow(), - rpcControllerFactory) { + rpcControllerFactory, writeRpcTimeout) { @Override protected MultiResponse rpcCall() throws Exception { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( @@ -636,7 +636,7 @@ public class HTable implements Table { checkHasFamilies(append); NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, - getName(), append.getRow()) { + getName(), append.getRow(), writeRpcTimeout) { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -658,7 +658,7 @@ public class HTable implements Table { checkHasFamilies(increment); NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, - this.rpcControllerFactory, getName(), increment.getRow()) { + this.rpcControllerFactory, getName(), increment.getRow(), writeRpcTimeout) { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -704,7 +704,7 @@ public class HTable implements Table { NoncedRegionServerCallable callable = new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), - row) { + row, writeRpcTimeout) { @Override protected Long rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( @@ -775,7 +775,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable( - this.connection, getName(), row, this.rpcControllerFactory) { + this.connection, getName(), row, this.rpcControllerFactory, writeRpcTimeout) { @Override protected SingleResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -808,7 +808,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable callable = new CancellableRegionServerCallable(connection, getName(), rm.getRow(), - rpcControllerFactory) { + rpcControllerFactory, writeRpcTimeout) { @Override protected MultiResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -1123,6 +1123,15 @@ public class HTable implements Table { @Override public void setWriteRpcTimeout(int writeRpcTimeout) { this.writeRpcTimeout = writeRpcTimeout; + multiAp.setTimeout(writeRpcTimeout); + try { + BufferedMutator bufferedMutator = getBufferedMutator(); + if (bufferedMutator instanceof BufferedMutatorImpl) { + ((BufferedMutatorImpl)bufferedMutator).ap.setTimeout(writeRpcTimeout); + } + } catch (IOException e) { + + } } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 3ef97e78..01b7c2e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -55,8 +55,9 @@ class MultiServerCallable extends CancellableRegionServerCallable multi) { - super(connection, tableName, null, rpcFactory); + final ServerName location, RpcControllerFactory rpcFactory, + final MultiAction multi, int rpcTimeout) { + super(connection, tableName, null, rpcFactory, rpcTimeout); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index 7c01e21..ad0a67d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -48,8 +48,8 @@ public abstract class NoncedRegionServerCallable extends CancellableRegionSer */ public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, - TableName tableName, byte [] row) { - super(connection, tableName, row, rpcControllerFactory); + TableName tableName, byte [] row, int rpcTimeout) { + super(connection, tableName, row, rpcControllerFactory, rpcTimeout); this.nonce = getConnection().getNonceGenerator().newNonce(); }