From 717b6481ab64ab0e767b0338ffca9464ba32ff24 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Thu, 17 Dec 2015 14:20:39 +0530 Subject: [PATCH] HBASE-14937 Make rpc call timeout for replication adaptive --- .../hbase/protobuf/ReplicationProtbufUtil.java | 6 ++-- .../HBaseInterClusterReplicationEndpoint.java | 39 ++++++++++++++++++++-- .../hbase/replication/TestReplicationEndpoint.java | 4 ++- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 91185af..3ddff76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -54,6 +54,7 @@ public class ReplicationProtbufUtil { * A helper to replicate a list of WAL entries using admin protocol. * @param admin Admin service * @param entries Array of WAL entries to be replicated + * @param callTimeout RPC timeout for replication request * @param replicationClusterId Id which will uniquely identify source cluster FS client * configurations in the replication configuration directory * @param sourceBaseNamespaceDir Path to source cluster base namespace directory @@ -61,12 +62,13 @@ public class ReplicationProtbufUtil { * @throws java.io.IOException */ public static void replicateWALEntry(final AdminService.BlockingInterface admin, - final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir, - Path sourceHFileArchiveDir) throws IOException { + final Entry[] entries, final int callTimeout, String replicationClusterId, + Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) throws IOException { Pair p = buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); PayloadCarryingRpcController controller = new 
PayloadCarryingRpcController(p.getSecond()); + controller.setCallTimeout(callTimeout); try { admin.replicateWALEntry(controller, p.getFirst()); } catch (ServiceException se) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 70cc420..5723126 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; @@ -70,6 +71,7 @@ import org.apache.hadoop.ipc.RemoteException; public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint { private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class); + public static final String REPLICATION_RPC_TIMEOUT_KEY = "hbase.replication.rpc.timeout"; private HConnection conn; private Configuration conf; @@ -92,6 +94,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private Path baseNamespaceDir; private Path hfileArchiveDir; private boolean replicationBulkLoadDataEnabled; + private int callTimeout; @Override public void init(Context context) throws IOException { @@ -117,6 +120,13 @@ public class HBaseInterClusterReplicationEndpoint 
extends HBaseReplicationEndpoi new LinkedBlockingQueue()); this.exec.allowCoreThreadTimeOut(true); + this.callTimeout = + this.conf.getInt(REPLICATION_RPC_TIMEOUT_KEY, + this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + if (this.callTimeout < 0) { + throw new IllegalArgumentException(REPLICATION_RPC_TIMEOUT_KEY + " cannot be negative."); + } + this.replicationBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); @@ -205,6 +215,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e); } } + int callTimeoutRetryCounter = 1; while (this.isRunning()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -278,6 +289,26 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } else if (ioe instanceof ConnectException) { LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe); replicationSinkMgr.chooseSinks(); + } else if (ioe.getCause() instanceof CallTimeoutException) { + // Exception means request was not processed in the configured replication rpc + // timeout. It is possible that retrying the same request with same timeout may + // fail again. So increase the replication rpc timeout and try again + if (this.callTimeout == 0 || this.callTimeout == Integer.MAX_VALUE) { + LOG.info("Replication RPC request call timed out, retrying again. " + "Retry count=" + + callTimeoutRetryCounter + ", timeout=" + this.callTimeout); + } else { + long scaled = (long) this.callTimeout * callTimeoutRetryCounter * 2; + if (scaled > Integer.MAX_VALUE) { + // Widened to long: a wrapped int product is not guaranteed to be negative + LOG.debug("Replication RPC request call timeout " + scaled + + " overflows integer value. 
Setting it to integer max value."); + } + this.callTimeout = (int) Math.min(scaled, Integer.MAX_VALUE); + LOG.info("Replication RPC request call timed out, retrying again with higher " + + "timeout value. Retry count=" + callTimeoutRetryCounter + ", timeout=" + + this.callTimeout); + } + callTimeoutRetryCounter++; } else { LOG.warn("Can't replicate because of a local or network error: ", ioe); } @@ -318,16 +349,18 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi @VisibleForTesting protected Replicator createReplicator(List entries, int ordinal) { - return new Replicator(entries, ordinal); + return new Replicator(entries, ordinal, this.callTimeout); } @VisibleForTesting protected class Replicator implements Callable { private List entries; private int ordinal; - public Replicator(List entries, int ordinal) { + private int callTimeout; + public Replicator(List entries, int ordinal, int callTimeOut) { this.entries = entries; this.ordinal = ordinal; + this.callTimeout = callTimeOut; } @Override @@ -337,7 +370,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi sinkPeer = replicationSinkMgr.getReplicationSink(); BlockingInterface rrs = sinkPeer.getRegionServer(); ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]), - replicationClusterId, baseNamespaceDir, hfileArchiveDir); + this.callTimeout, replicationClusterId, baseNamespaceDir, hfileArchiveDir); replicationSinkMgr.reportSinkSuccess(sinkPeer); return ordinal; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index a5a4e73..5e6c2c3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -29,6 +29,7 @@ import 
java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Connection; @@ -348,7 +349,8 @@ public class TestReplicationEndpoint extends TestReplicationBase { private int ordinal; public DummyReplicator(List entries, int ordinal) { - super(entries, ordinal); + super(entries, ordinal, conf1.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.ordinal = ordinal; } -- 1.9.2.msysgit.0