From 0fb073ce55128568fbf1f88911c384b84c46e6e9 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Tue, 2 Aug 2016 10:47:39 -0700 Subject: [PATCH] HBASE-15937 Modified retries and polling options for the Replication Table so that they are more realistic. This setup handles situations where regionservers come up much earlier than the master, or where the Replication Table goes down for a period of time. --- .../hbase/replication/ReplicationQueues.java | 2 +- .../hbase/replication/ReplicationQueuesClient.java | 2 +- .../hbase/replication/ReplicationTableBase.java | 149 +++++++++++++++------ .../TableBasedReplicationQueuesClientImpl.java | 2 +- .../TableBasedReplicationQueuesImpl.java | 4 +- .../regionserver/ReplicationSourceManager.java | 25 +++- .../hadoop/hbase/util/hbck/ReplicationChecker.java | 6 +- .../replication/TestReplicationStateHBaseImpl.java | 2 + .../replication/TestReplicationTableBase.java | 11 +- ...TestTableBasedReplicationSourceManagerImpl.java | 2 + 10 files changed, 150 insertions(+), 55 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 0de0cc8..d31ac34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -106,7 +106,7 @@ public interface ReplicationQueues { * be alive, dead or from a previous run of the cluster. 
* @return a list of server names */ - List getListOfReplicators(); + List getListOfReplicators() throws ReplicationException; /** * Checks if the provided znode is the same as this region server's diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 6d8900e..160a9d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -43,7 +43,7 @@ public interface ReplicationQueuesClient { * @return a list of server names * @throws KeeperException zookeeper exception */ - List getListOfReplicators() throws KeeperException; + List getListOfReplicators() throws KeeperException, ReplicationException; /** * Get a list of all WALs in the given queue on the given region server. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java index 61bb041..e09d7d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java @@ -76,7 +76,7 @@ abstract class ReplicationTableBase { /** Name of the HBase Table used for tracking replication*/ public static final TableName REPLICATION_TABLE_NAME = - TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); // Column family and column names for Queues in the Replication Table public static final byte[] CF_QUEUE = Bytes.toBytes("q"); @@ -85,11 +85,10 @@ abstract class ReplicationTableBase { // Column Descriptor for the Replication Table private static final HColumnDescriptor 
REPLICATION_COL_DESCRIPTOR = - new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) - .setInMemory(true) - .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // TODO: Figure out which bloom filter to use - .setBloomFilterType(BloomType.NONE); + new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + .setBloomFilterType(BloomType.NONE); // The value used to delimit the queueId and server name inside of a queue's row key. Currently a // hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens. @@ -100,14 +99,26 @@ abstract class ReplicationTableBase { public static final String QUEUE_HISTORY_DELIMITER = "|"; /* - * Make sure that HBase table operations for replication have a high number of retries. This is - * because the server is aborted if any HBase table operation fails. Each RPC will be attempted - * 3600 times before exiting. This provides each operation with 2 hours of retries - * before the server is aborted. + * Because many Replication Table operations abort the server on failure, we have to make sure that + * ReplicationTable operations can handle longer periods of Replication Table region unavailability + * Because RPC requests fail almost immediately on region unavailability, the amount of time we can + * tolerate region unavailability is largely determined by (retries * pause). With the default + * setup we can tolerate at least 10 minutes of region unavailability before aborting. 
*/ - private static final int CLIENT_RETRIES = 3600; - private static final int RPC_TIMEOUT = 2000; - private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; + private static final int DEFAULT_CLIENT_RETRIES = 100; + private static final int DEFAULT_CLIENT_PAUSE = 6000; + private static final int DEFAULT_RPC_TIMEOUT = 60000; + private static final int DEFAULT_SCANNER_TIMEOUT = + DEFAULT_CLIENT_RETRIES * (DEFAULT_CLIENT_PAUSE); + + /* + * Make sure that the HBase Replication Table initialization has the proper timeouts. Because + * HBase servers can come up a lot sooner than the cluster is ready to create tables and this + * is a one time operation, we can accept longer pauses than normal. + */ + private static final int DEFAULT_INIT_RETRIES = 60; + private static final int DEFAULT_INIT_PAUSE = 60000; + private static final int DEFAULT_INIT_RPC_TIMEOUT = 60000; // We only need a single thread to initialize the Replication Table private static final int NUM_INITIALIZE_WORKERS = 1; @@ -117,11 +128,21 @@ abstract class ReplicationTableBase { private final Connection connection; private final Executor executor; private volatile CountDownLatch replicationTableInitialized; + private int clientRetries; + private int clientPause; + private int rpcTimeout; + private int scannerTimeout; + private int operationTimeout; + private int initRetries; + private int initPause; + private int initRpcTimeout; + private int initOperationTimeout; public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { this.conf = new Configuration(conf); this.abortable = abort; - decorateConf(); + readTimeoutConf(); + decorateTimeoutConf(); this.connection = ConnectionFactory.createConnection(this.conf); this.executor = setUpExecutor(); this.replicationTableInitialized = new CountDownLatch(1); @@ -129,11 +150,36 @@ abstract class ReplicationTableBase { } /** - * Modify the connection's config so that operations run on the Replication Table have longer and - 
* a larger number of retries + * Read in the configuration values that set up the Replication Table operation timeouts and + * retries + */ + private void readTimeoutConf() { + clientRetries = conf.getInt("hbase.replication.table.client.retries", DEFAULT_CLIENT_RETRIES); + clientPause = conf.getInt("hbase.replication.table.client.pause", DEFAULT_CLIENT_PAUSE); + rpcTimeout = conf.getInt("hbase.replication.table.rpc.timeout", DEFAULT_RPC_TIMEOUT); + scannerTimeout = conf.getInt("hbase.replication.table.scanner.timeout", + DEFAULT_SCANNER_TIMEOUT); + initRetries = conf.getInt("hbase.replication.table.init.retries", DEFAULT_INIT_RETRIES); + initPause = conf.getInt("hbase.replication.table.init.pause", DEFAULT_INIT_PAUSE); + initRpcTimeout = conf.getInt("hbase.replication.table.init.timeout", DEFAULT_INIT_RPC_TIMEOUT); + operationTimeout = calculateTimeout(clientRetries, clientPause, rpcTimeout); + initOperationTimeout = calculateTimeout(initRetries, initPause, initRpcTimeout); + } + + private int calculateTimeout(int retries, int pause, int rpcTimeout) { + return retries * (pause + rpcTimeout); + } + + /** + * Set up the configuration values for normal Replication Table operations */ - private void decorateConf() { - this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); + private void decorateTimeoutConf() { + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, clientPause); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, clientRetries); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scannerTimeout); + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, operationTimeout); + conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, operationTimeout); } /** @@ -159,15 +205,6 @@ abstract class ReplicationTableBase { } /** - * Increases the RPC and operations timeouts for the Replication Table - */ - private Table setReplicationTableTimeOuts(Table replicationTable) { - 
replicationTable.setRpcTimeout(RPC_TIMEOUT); - replicationTable.setOperationTimeout(OPERATION_TIMEOUT); - return replicationTable; - } - - /** * Build the row key for the given queueId. This will uniquely identify it from all other queues * in the cluster. * @param serverName The owner of the queue @@ -221,7 +258,7 @@ abstract class ReplicationTableBase { * be alive, dead or from a previous run of the cluster. * @return a list of server names */ - protected List getListOfReplicators() { + protected List getListOfReplicators() throws ReplicationException { // scan all of the queues and return a list of all unique OWNER values Set peerServers = new HashSet(); ResultScanner allQueuesInCluster = null; @@ -234,7 +271,7 @@ abstract class ReplicationTableBase { } } catch (IOException e) { String errMsg = "Failed getting list of replicators"; - abortable.abort(errMsg, e); + throw new ReplicationException(errMsg, e); } finally { if (allQueuesInCluster != null) { allQueuesInCluster.close(); @@ -325,7 +362,7 @@ abstract class ReplicationTableBase { protected ResultScanner getQueuesBelongingToServer(String server) throws IOException { Scan scan = new Scan(); SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, - CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); scan.setFilter(filterMyQueues); scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY); @@ -367,6 +404,15 @@ abstract class ReplicationTableBase { } /** + * Increases the RPC and operations timeouts for the Replication Table + */ + private Table setReplicationTableTimeOuts(Table replicationTable) { + replicationTable.setRpcTimeout(rpcTimeout); + replicationTable.setOperationTimeout(operationTimeout); + return replicationTable; + } + + /** * Builds the Replication Table in a background thread. 
Any method accessing the Replication Table * should do so through getOrBlockOnReplicationTable() * @@ -383,31 +429,43 @@ abstract class ReplicationTableBase { */ private class CreateReplicationTableWorker implements Runnable { - private Admin admin; + private Configuration initConf; + private Connection initConnection; + private Admin initAdmin; @Override public void run() { try { - admin = connection.getAdmin(); - if (!replicationTableExists()) { - createReplicationTable(); - } - int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number", - CLIENT_RETRIES); - RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT); + initConf = buildTableInitConf(); + initConnection = ConnectionFactory.createConnection(initConf); + initAdmin = initConnection.getAdmin(); + createReplicationTable(); + int maxRetries = conf.getInt("hbase.replication.createtable.retries.number", + DEFAULT_CLIENT_RETRIES); + RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, DEFAULT_RPC_TIMEOUT); RetryCounter retryCounter = counterFactory.create(); - while (!replicationTableExists()) { + while (!replicationTableAvailable()) { retryCounter.sleepUntilNextRetry(); if (!retryCounter.shouldRetry()) { throw new IOException("Unable to acquire the Replication Table"); } } replicationTableInitialized.countDown(); + initAdmin.close(); } catch (IOException | InterruptedException e) { abortable.abort("Failed building Replication Table", e); } } + private Configuration buildTableInitConf() { + Configuration tempConf = new Configuration(conf); + tempConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, initRetries); + tempConf.setInt(HConstants.HBASE_CLIENT_PAUSE, initPause); + tempConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, initRpcTimeout); + tempConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, initOperationTimeout); + return tempConf; + } + /** * Create the replication table with the provided HColumnDescriptor 
REPLICATION_COL_DESCRIPTOR * in TableBasedReplicationQueuesImpl @@ -418,7 +476,7 @@ abstract class ReplicationTableBase { HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); try { - admin.createTable(replicationTableDescriptor); + initAdmin.createTable(replicationTableDescriptor); } catch (TableExistsException e) { // In this case we can just continue as normal } @@ -430,12 +488,15 @@ abstract class ReplicationTableBase { * @return whether the Replication Table exists * @throws IOException */ - private boolean replicationTableExists() { + private boolean replicationTableAvailable() { try { - return admin.tableExists(REPLICATION_TABLE_NAME); - } catch (IOException e) { + return initAdmin.tableExists(REPLICATION_TABLE_NAME) && + initAdmin.isTableAvailable(REPLICATION_TABLE_NAME); + } catch (Exception e) { + // Catches a null pointer exception that is thrown when admin.tableExists() is called too + // early. In this case assume the Table is not available yet. 
return false; } } } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java index dcbed7a..130827b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java @@ -56,7 +56,7 @@ public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase } @Override - public List getListOfReplicators() { + public List getListOfReplicators() throws ReplicationException { return super.getListOfReplicators(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java index 3ee6fde..31fb602 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -87,7 +87,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } @Override - public List getListOfReplicators() { + public List getListOfReplicators() throws ReplicationException { return super.getListOfReplicators(); } @@ -126,7 +126,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } } catch (IOException | ReplicationException e) { String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); + throw new ReplicationException(errMsg, e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 586aace..3906c7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; /** @@ -723,7 +725,28 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void run() { - List currentReplicators = replicationQueues.getListOfReplicators(); + List currentReplicators = null; + boolean success = false; + // Retry for as long as we can with 1 second sleeps + RetryCounterFactory retryCounterFactory = new RetryCounterFactory(Integer.MAX_VALUE, 1000); + RetryCounter retries = retryCounterFactory.create(); + while (!success && retries.shouldRetry()) { + try { + // getListOfReplicators() can throw an exception if Table based replication is being used + // and Replication Table is not up yet. It can also return null if no un-adopted queues + // exist. 
+ currentReplicators = replicationQueues.getListOfReplicators(); + success = true; + } catch (ReplicationException e) { + try { + retries.sleepUntilNextRetry(); + LOG.warn("AdoptAbandonedQueuesWorker has failed getting list of replicators attempt=" + + retries.getAttemptTimes()); + } catch (InterruptedException ie) { + return; + } + } + } if (currentReplicators == null || currentReplicators.size() == 0) { return; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 8d66c8f..596e4c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -105,7 +106,6 @@ public class ReplicationChecker { undeletedQueueIds.put(replicator, new ArrayList()); } undeletedQueueIds.get(replicator).add(queueId); - String msg = "Undeleted replication queue for removed peer found: " + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(), replicator, queueId); @@ -114,8 +114,8 @@ public class ReplicationChecker { } } } - } catch (KeeperException ke) { - throw new IOException(ke); + } catch (KeeperException | ReplicationException e) { + throw new IOException(e); } checkUnDeletedHFileRefsQueues(peerIds); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java index 7ec6df8..e34635b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -85,6 +85,8 @@ public class TestReplicationStateHBaseImpl { TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); utility.startMiniCluster(); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java index 83fdad7..ed2a3de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -58,6 +59,8 @@ public class TestReplicationTableBase { 
TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate() { @Override @@ -102,8 +105,12 @@ public class TestReplicationTableBase { public class RequestReplicationQueueData extends Thread { @Override public void run() { - assertEquals(0, rq.getListOfReplicators().size()); - asyncRequestSuccess = true; + try { + assertEquals(0, rq.getListOfReplicators().size()); + asyncRequestSuccess = true; + } catch (ReplicationException e) { + + } } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java index e606257..7d2fc61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java @@ -44,6 +44,8 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS conf.setLong("replication.sleep.before.failover", 2000); conf.setInt("replication.source.maxretriesmultiplier", 10); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); conf.setClass("hbase.region.replica.replication.replicationQueues.class", TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", -- 2.8.0-rc2