From a5cd7079f2932f5da2f6527e336e9c6bcadaff27 Mon Sep 17 00:00:00 2001 From: Sukumar Maddineni Date: Thu, 8 Jun 2017 17:00:30 -0700 Subject: [PATCH] HBASE-16138 Large number of changes that affects how cluster startup occurs. We now lazily register WAL's in Replication, they are only registered if they are being used to open a Replicated Region. TableBasedReplication will not fail fast on calls to addLog before Replication is up. This will allow clusters to start up with Replication Table hosted on any region. Also included changes to update default wait/retry values for a more realistic setup. --- .../org/apache/hadoop/hbase/HTableDescriptor.java | 13 ++ .../hbase/replication/ReplicationQueues.java | 10 +- .../hbase/replication/ReplicationQueuesClient.java | 2 +- .../hbase/replication/ReplicationQueuesZKImpl.java | 5 + .../hbase/replication/ReplicationTableBase.java | 233 ++++++++++++++++----- .../TableBasedReplicationQueuesClientImpl.java | 2 +- .../TableBasedReplicationQueuesImpl.java | 71 +++++-- .../master/cleaner/ReplicationZKNodeCleaner.java | 3 +- .../hadoop/hbase/regionserver/HRegionServer.java | 7 + .../regionserver/ReplicationSourceService.java | 5 + .../hbase/regionserver/wal/AbstractFSWAL.java | 12 ++ .../replication/regionserver/Replication.java | 7 + .../regionserver/ReplicationSourceManager.java | 73 ++++++- .../hadoop/hbase/wal/AbstractFSWALProvider.java | 4 + .../hadoop/hbase/wal/DisabledWALProvider.java | 10 + .../main/java/org/apache/hadoop/hbase/wal/WAL.java | 4 + .../replication/TestReplicationStateHBaseImpl.java | 5 + .../replication/TestReplicationTableBase.java | 14 +- .../regionserver/TestReplicationSourceManager.java | 8 +- .../TestReplicationSourceManagerZkImpl.java | 26 +++ ...TestTableBasedReplicationSourceManagerImpl.java | 64 +++++- 21 files changed, 496 insertions(+), 82 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index bf58d73941..4a35e40e05 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -837,4 +837,17 @@ public class HTableDescriptor implements TableDescriptor, Comparable getListOfReplicators(); + List getListOfReplicators() throws ReplicationException; /** * Checks if the provided znode is the same as this region server's diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 6d8900e60b..160a9d2f12 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -43,7 +43,7 @@ public interface ReplicationQueuesClient { * @return a list of server names * @throws KeeperException zookeeper exception */ - List getListOfReplicators() throws KeeperException; + List getListOfReplicators() throws KeeperException, ReplicationException; /** * Get a list of all WALs in the given queue on the given region server. 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 4733706c3b..5dd7bc1251 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -125,6 +125,11 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override + public void addLogFailFast(String queueId, String filename) throws ReplicationException { + addLog(queueId, filename); + } + + @Override public void removeLog(String queueId, String filename) { try { String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java index 4606e22fc7..f4dd489643 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java @@ -72,7 +72,7 @@ import java.util.concurrent.TimeUnit; * to the caller to close the returned table. */ @InterfaceAudience.Private -abstract class ReplicationTableBase { +public abstract class ReplicationTableBase { /** Name of the HBase Table used for tracking replication*/ public static final TableName REPLICATION_TABLE_NAME = @@ -88,7 +88,6 @@ abstract class ReplicationTableBase { new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) .setInMemory(true) .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // TODO: Figure out which bloom filter to use .setBloomFilterType(BloomType.NONE); // The value used to delimit the queueId and server name inside of a queue's row key. 
Currently a @@ -100,29 +99,71 @@ abstract class ReplicationTableBase { public static final String QUEUE_HISTORY_DELIMITER = "|"; /* - * Make sure that HBase table operations for replication have a high number of retries. This is - * because the server is aborted if any HBase table operation fails. Each RPC will be attempted - * 3600 times before exiting. This provides each operation with 2 hours of retries - * before the server is aborted. + * Make sure that normal HBase Replication Table operations for replication have a high number of + * retries. This is because the server is aborted if any HBase table operation fails. Each RPC will + * be attempted 240 times before exiting. */ - private static final int CLIENT_RETRIES = 3600; - private static final int RPC_TIMEOUT = 2000; - private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; + private static final int DEFAULT_CLIENT_RETRIES = 240; + private static final int DEFAULT_CLIENT_PAUSE = 5000; + private static final int DEFAULT_RPC_TIMEOUT = 120000; + + /* + * Make sure that the HBase Replication Table initialization has the proper timeouts. Because + * HBase servers can come up a lot sooner than the cluster is ready to create tables and this + * is a one time operation, we can accept longer pauses than normal. + */ + private static final int DEFAULT_INIT_RETRIES = 240; + private static final int DEFAULT_INIT_PAUSE = 60000; + private static final int DEFAULT_INIT_RPC_TIMEOUT = 120000; + + /* + * Used for fast fail table operations. Primarily ReplicationQueues.addLog(), which blocks + * during region opening, but is supposed to fail quickly if Replication is not up yet. 
Increasing + * these retry values will slow down cluster initialization + */ + private static final int DEFAULT_FAST_FAIL_RETRIES = 1; + private static final int DEFAULT_FAST_FAIL_PAUSE = 0; + private static final int DEFAULT_FAST_FAIL_TIMEOUT = 60000; + + /* + * Determine the polling frequency used to check when the Replication Table comes up. With the + * default options we will poll in intervals of 100 ms forever. + */ + private static final int DEFAULT_WAIT_TABLE_RETRIES = Integer.MAX_VALUE; + private static final int DEFAULT_WAIT_TABLE_PAUSE = 100; // We only need a single thread to initialize the Replication Table private static final int NUM_INITIALIZE_WORKERS = 1; protected final Configuration conf; + protected final Configuration fastFailConf; protected final Abortable abortable; private final Connection connection; + private final Connection fastFailConnection; private final Executor executor; private volatile CountDownLatch replicationTableInitialized; + private int clientRetries; + private int clientPause; + private int rpcTimeout; + private int operationTimeout; + private int initRetries; + private int initPause; + private int initRpcTimeout; + private int initOperationTimeout; + private int fastFailTimeout; + private int fastFailPause; + private int fastFailRetries; + private int fastFailOperationTimeout; + public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { this.conf = new Configuration(conf); + this.fastFailConf = new Configuration(conf); this.abortable = abort; - decorateConf(); + readTimeoutConf(); + decorateTimeoutConf(); this.connection = ConnectionFactory.createConnection(this.conf); + this.fastFailConnection = ConnectionFactory.createConnection(this.fastFailConf); this.executor = setUpExecutor(); this.replicationTableInitialized = new CountDownLatch(1); createReplicationTableInBackground(); @@ -132,8 +173,48 @@ abstract class ReplicationTableBase { * Modify the connection's config so that operations run on the 
Replication Table have longer and * a larger number of retries */ - private void decorateConf() { - this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); + private void readTimeoutConf() { + clientRetries = conf.getInt("hbase.replication.table.client.retries", DEFAULT_CLIENT_RETRIES); + clientPause = conf.getInt("hbase.replication.table.client.pause", DEFAULT_CLIENT_PAUSE); + rpcTimeout = conf.getInt("hbase.replication.table.rpc.timeout", DEFAULT_RPC_TIMEOUT); + initRetries = conf.getInt("hbase.replication.table.init.retries", DEFAULT_INIT_RETRIES); + initPause = conf.getInt("hbase.replication.table.init.pause", DEFAULT_INIT_PAUSE); + initRpcTimeout = conf.getInt("hbase.replication.table.init.timeout", DEFAULT_INIT_RPC_TIMEOUT); + fastFailTimeout= conf.getInt("hbase.replication.table.fastfail.timeout", + DEFAULT_FAST_FAIL_TIMEOUT); + fastFailRetries = conf.getInt("hbase.replication.table.fastfail.retries", + DEFAULT_FAST_FAIL_RETRIES); + fastFailPause = conf.getInt("hbase.replication.table.fastfail.pause", DEFAULT_FAST_FAIL_PAUSE); + fastFailOperationTimeout = getOperationTimeout(fastFailRetries, fastFailPause, fastFailTimeout); + operationTimeout = getOperationTimeout(clientRetries, clientPause, rpcTimeout); + initOperationTimeout = getOperationTimeout(initRetries, initPause, initRpcTimeout); + } + + /** + * Calculate the operation timeout for a retried RPC request + * @param retries times we retry a request + * @param pause pause between failed requests + * @param rpcTimeout timeout of a RPC request + * @return the operation timeout for a retried RPC request + */ + private int getOperationTimeout(int retries, int pause, int rpcTimeout) { + return retries * (pause + rpcTimeout); + } + + /** + * Set up the configuration values for normal Replication Table operations + */ + private void decorateTimeoutConf() { + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, clientPause); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, clientRetries); + 
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, operationTimeout); + conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, operationTimeout); + fastFailConf.setInt(HConstants.HBASE_CLIENT_PAUSE, fastFailPause); + fastFailConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, fastFailRetries); + fastFailConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, fastFailTimeout); + fastFailConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, fastFailOperationTimeout); + fastFailConf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, fastFailOperationTimeout); } /** @@ -159,15 +240,6 @@ abstract class ReplicationTableBase { } /** - * Increases the RPC and operations timeouts for the Replication Table - */ - private Table setReplicationTableTimeOuts(Table replicationTable) { - replicationTable.setRpcTimeout(RPC_TIMEOUT); - replicationTable.setOperationTimeout(OPERATION_TIMEOUT); - return replicationTable; - } - - /** * Build the row key for the given queueId. This will uniquely identify it from all other queues * in the cluster. * @param serverName The owner of the queue @@ -221,7 +293,7 @@ abstract class ReplicationTableBase { * be alive, dead or from a previous run of the cluster. 
* @return a list of server names */ - protected List getListOfReplicators() { + protected List getListOfReplicators() throws ReplicationException{ // scan all of the queues and return a list of all unique OWNER values Set peerServers = new HashSet<>(); ResultScanner allQueuesInCluster = null; @@ -233,8 +305,7 @@ abstract class ReplicationTableBase { peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); } } catch (IOException e) { - String errMsg = "Failed getting list of replicators"; - abortable.abort(errMsg, e); + throw new ReplicationException(e); } finally { if (allQueuesInCluster != null) { allQueuesInCluster.close(); @@ -351,7 +422,29 @@ abstract class ReplicationTableBase { e.getMessage(); throw new InterruptedIOException(errMsg); } - return getAndSetUpReplicationTable(); + return getReplicationTable(connection, true); + } + + /** + * Attempts to acquire the Replication Table. This operation will immediately throw an exception + * if the Replication Table is not up yet + * + * @return the Replication Table + * @throws IOException + */ + protected Table getOrFailFastReplication() throws IOException { + if (replicationTableInitialized.getCount() != 0) { + throw new IOException("getOrFailFastReplication() failed because replication is not " + + "available yet"); + } + try { + replicationTableInitialized.await(); + } catch (InterruptedException e) { + String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " + + e.getMessage(); + throw new InterruptedIOException(errMsg); + } + return getReplicationTable(fastFailConnection, false); } /** @@ -360,13 +453,47 @@ abstract class ReplicationTableBase { * @return the Replication Table * @throws IOException */ - private Table getAndSetUpReplicationTable() throws IOException { + private Table getReplicationTable(Connection connection, boolean setTimeout) throws IOException { Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME); - 
setReplicationTableTimeOuts(replicationTable); + if(setTimeout){ + setReplicationTableTimeOuts(replicationTable); + } return replicationTable; } /** + * Blocks until the Replication Table is available + * @throws InterruptedException + */ + public void blockUntilReplicationIsAvailable() throws InterruptedException { + replicationTableInitialized.await(); + } + + /** + * Increases the RPC and operations timeouts for the Replication Table + */ + private Table setReplicationTableTimeOuts(Table replicationTable) { + replicationTable.setRpcTimeout(rpcTimeout); + replicationTable.setOperationTimeout(operationTimeout); + return replicationTable; + } + + /* + * Checks whether the Replication Table exists yet + * + * @return whether the Replication Table exists + * @throws IOException + */ + private boolean replicationTableAvailable() { + try (Admin tempAdmin = connection.getAdmin()){ + return tempAdmin.tableExists(REPLICATION_TABLE_NAME) && + tempAdmin.isTableAvailable(REPLICATION_TABLE_NAME); + } catch (IOException e) { + return false; + } + } + + /** * Builds the Replication Table in a background thread. 
Any method accessing the Replication Table * should do so through getOrBlockOnReplicationTable() * * @@ -383,20 +510,24 @@ */ private class CreateReplicationTableWorker implements Runnable { - private Admin admin; + private Configuration initConf; + private Connection initConnection; + private Admin initAdmin; @Override public void run() { try { - admin = connection.getAdmin(); - if (!replicationTableExists()) { - createReplicationTable(); - } - int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number", - CLIENT_RETRIES); - RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT); + initConf = buildTableInitConf(); + initConnection = ConnectionFactory.createConnection(initConf); + initAdmin = initConnection.getAdmin(); + createReplicationTable(); + int maxRetries = conf.getInt("hbase.replication.waittable.retries", + DEFAULT_WAIT_TABLE_RETRIES); + int pause = conf.getInt("hbase.replication.waittable.retries.pause", + DEFAULT_WAIT_TABLE_PAUSE); + RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, pause); RetryCounter retryCounter = counterFactory.create(); - while (!replicationTableExists()) { + while (!replicationTableAvailable()) { retryCounter.sleepUntilNextRetry(); if (!retryCounter.shouldRetry()) { throw new IOException("Unable to acquire the Replication Table"); @@ -418,24 +549,28 @@ HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); try { - admin.createTable(replicationTableDescriptor); + if (initAdmin.tableExists(REPLICATION_TABLE_NAME)) { + return; + } + } catch (Exception e) { + // If this tableExists is called too early admin will throw a null pointer exception. In this + // case proceed to create the Replication Table as we normally would.
+ } + try { + initAdmin.createTableAsync(replicationTableDescriptor, null); } catch (TableExistsException e) { // In this case we can just continue as normal } } - /** - * Checks whether the Replication Table exists yet - * - * @return whether the Replication Table exists - * @throws IOException - */ - private boolean replicationTableExists() { - try { - return admin.tableExists(REPLICATION_TABLE_NAME); - } catch (IOException e) { - return false; - } + private Configuration buildTableInitConf() { + Configuration initConfiguration = new Configuration(conf); + initConfiguration.setInt(HConstants.HBASE_CLIENT_PAUSE, initPause); + initConfiguration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, initRetries); + initConfiguration.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, initRpcTimeout); + initConfiguration.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, initOperationTimeout); + initConfiguration.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, initOperationTimeout); + return initConfiguration; } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java index 35075473bd..68d3e0e406 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java @@ -56,7 +56,7 @@ public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase } @Override - public List getListOfReplicators() { + public List getListOfReplicators() throws ReplicationException { return super.getListOfReplicators(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java index 
bf55e8cf61..4a0b909405 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -91,7 +91,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } @Override - public List getListOfReplicators() { + public List getListOfReplicators() throws ReplicationException { return super.getListOfReplicators(); } @@ -112,28 +112,53 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } } + // addLog() will throw a ReplicationException immediately if the Replication Table is not up. It + // will not wait and block on Replication to come up like the other methods @Override public void addLog(String queueId, String filename) throws ReplicationException { try (Table replicationTable = getOrBlockOnReplicationTable()) { - if (!checkQueueExists(queueId)) { - // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values - Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); - putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - replicationTable.put(putNewQueue); - } else { - // Otherwise simply add the new log and offset as a new column - Put putNewLog = new Put(queueIdToRowKey(queueId)); - putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - safeQueueUpdate(putNewLog); - } + // The following line will throw an exception if it fails to read Replication Table with the + // fastFail config options + addLog(queueId, filename, replicationTable, checkQueueExists(queueId)); } catch (IOException | ReplicationException e) { String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); + 
throw new ReplicationException(errMsg, e); + } + } + + @Override + public void addLogFailFast(String queueId, String filename) throws ReplicationException { + try (Table replicationTable = getOrFailFastReplication()) { + addLog(queueId, filename, replicationTable, checkQueueExistsFailFast(queueId)); + } catch (IOException | ReplicationException e) { + String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; + throw new ReplicationException(errMsg, e); + } + } + + private void addLog(String queueId, String filename, Table replicationTable, boolean queueExists) + throws IOException, ReplicationException{ + if (!queueExists) { + // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values + Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); + putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); + putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); + putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); + replicationTable.put(putNewQueue); + } else { + // Otherwise simply add the new log and offset as a new column + Put putNewLog = new Put(queueIdToRowKey(queueId)); + putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); + boolean updateSuccess = replicationTable.checkAndPut(putNewLog.getRow(), + CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, putNewLog); + if (!updateSuccess) { + throw new ReplicationException("Failed to add new log queueId=" + queueId + + " filename=" + filename); + } } } + @Override public void removeLog(String queueId, String filename) { try { @@ -185,7 +210,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename))); } catch (IOException e) { throw new ReplicationException("Could not get position in log for queueId=" + queueId + - ", filename=" + filename); + ", 
filename=" + filename, e); } } @@ -374,15 +399,23 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } } + private boolean checkQueueExists(String queueId) throws IOException{ + try (Table replicationTable = getOrBlockOnReplicationTable()) { + byte[] rowKey = queueIdToRowKey(queueId); + return replicationTable.exists(new Get(rowKey)); + } + } + /** - * Check if the queue specified by queueId is stored in HBase + * Check if the queue specified by queueId is stored in HBase. This method will throw an exception + * if it fails to read the Replication Table after the options set in fastFailConfig * * @param queueId Either raw or reclaimed format of the queueId * @return Whether the queue is stored in HBase * @throws IOException */ - private boolean checkQueueExists(String queueId) throws IOException { - try (Table replicationTable = getOrBlockOnReplicationTable()) { + private boolean checkQueueExistsFailFast(String queueId) throws IOException { + try (Table replicationTable = getOrFailFastReplication()) { byte[] rowKey = queueIdToRowKey(queueId); return replicationTable.exists(new Get(rowKey)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java index dafc4f8f82..2b2a440d4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import 
org.apache.hadoop.hbase.replication.ReplicationQueueInfo; @@ -92,7 +93,7 @@ public class ReplicationZKNodeCleaner { } } } - } catch (KeeperException ke) { + } catch (KeeperException | ReplicationException ke) { throw new IOException("Failed to get the replication queues of all replicators", ke); } return undeletedQueues; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index ef99e47e17..bdc260cf8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2034,6 +2034,9 @@ public class HRegionServer extends HasThread implements } else { byte[] namespace = regionInfo.getTable().getNamespace(); wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace); + if (getReplicationSourceService() != null && checkReplication(regionInfo)) { + getReplicationSourceService().registerWal(wal); + } } walRoller.addWAL(wal); return wal; @@ -3629,6 +3632,10 @@ public class HRegionServer extends HasThread implements configurationManager.notifyAllObservers(conf); } + public boolean checkReplication(HRegionInfo regionInfo) throws IOException { + return tableDescriptors.get(regionInfo.getTable()).checkAnyReplicatedFamilies(); + } + @Override public double getCompactionPressure() { double max = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 13b502b9d5..9f98b905c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.wal.WAL; + +import java.io.IOException; /** * A source for a replication stream has to expose this service. @@ -33,4 +36,6 @@ public interface ReplicationSourceService extends ReplicationService { * observe log rolls and log archival events. */ WALActionsListener getWALActionsListener(); + + void registerWal(WAL wal) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index f32d0edc66..bb3493f746 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -486,6 +486,14 @@ public abstract class AbstractFSWAL implements WAL { return rollWriter(false); } + public void lockRollWriter() { + rollWriterLock.lock(); + } + + public void unlockRollWriter() { + rollWriterLock.unlock(); + } + /** * This is a convenience method that computes a new filename with a given file-number. 
* @param filenum to use @@ -976,6 +984,10 @@ public abstract class AbstractFSWAL implements WAL { return txid; } + public String getWalPrefix() { + return walFilePrefix; + } + @Override public String toString() { return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 9cc9c7cfd7..2da8fc2535 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.zookeeper.KeeperException; @@ -167,6 +168,7 @@ public class Replication extends WALActionsListener.Base implements public WALActionsListener getWALActionsListener() { return this; } + /** * Stops replication service. 
*/ @@ -289,6 +291,11 @@ public class Replication extends WALActionsListener.Base implements getReplicationManager().postLogRoll(newPath); } + @Override + public void registerWal(WAL wal) throws IOException { + replicationManager.registerWal(wal); + } + /** * This method modifies the master's configuration in order to inject replication-related features * @param conf diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index cb631c11cc..887043ce79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -69,8 +69,11 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL; /** * This class is responsible to manage all the replication @@ -124,6 +127,8 @@ public class ReplicationSourceManager implements ReplicationListener { private final Random rand; private final boolean replicationForBulkLoadDataEnabled; + private Set registeredWALs; + private Connection connection; private long replicationWaitTime; @@ -183,6 +188,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT); connection = ConnectionFactory.createConnection(conf); + this.registeredWALs = 
Collections.synchronizedSet(new HashSet()); } /** @@ -364,9 +370,17 @@ public class ReplicationSourceManager implements ReplicationListener { } void preLogRoll(Path newLog) throws IOException { - recordLog(newLog); String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); + if (registeredWALs.contains(logPrefix)) { + recordLogAndLatestPath(newLog, false); + } + } + + private void recordLogAndLatestPath(Path newLog, boolean failFast) throws IOException { + String logName = newLog.getName(); + String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); + recordLog(newLog, failFast); synchronized (latestPaths) { Iterator iterator = latestPaths.iterator(); while (iterator.hasNext()) { @@ -386,7 +400,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param logPath the log path to check and enqueue * @throws IOException */ - private void recordLog(Path logPath) throws IOException { + private void recordLog(Path logPath, boolean failFast) throws IOException { String logName = logPath.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); // update replication queues on ZK @@ -394,7 +408,11 @@ public class ReplicationSourceManager implements ReplicationListener { synchronized (replicationPeers) { for (String id : replicationPeers.getConnectedPeerIds()) { try { - this.replicationQueues.addLog(id, logName); + if (failFast) { + this.replicationQueues.addLogFailFast(id, logName); + } else { + this.replicationQueues.addLog(id, logName); + } } catch (ReplicationException e) { throw new IOException("Cannot add log to replication queue" + " when creating a new source, queueId=" + id + ", filename=" + logName, e); @@ -431,12 +449,38 @@ public class ReplicationSourceManager implements ReplicationListener { } void postLogRoll(Path newLog) throws IOException { + String logName = newLog.getName(); + String logPrefix = 
AbstractFSWALProvider.getWALPrefixFromWALName(logName); + if (registeredWALs.contains(logPrefix)) { + enqueueNewLog(newLog); + } + } + + void enqueueNewLog(Path newLog) { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources) { source.enqueueLog(newLog); } } + void registerWal(WAL wal) throws IOException { + try { + wal.lockRollWriter(); + if (registeredWALs.contains(AbstractFSWALProvider.getWalFilePrefix(wal))) { + return; + } + // Perform the prelog and postlog roll actions + // We have to lock on the rollwriter right here + recordLogAndLatestPath(AbstractFSWALProvider.getCurrentFileName(wal), true); + enqueueNewLog(AbstractFSWALProvider.getCurrentFileName(wal)); + // recordLogAndLatestPath will throw an exception if it fails and the WAL will not be + // registered + registeredWALs.add(AbstractFSWALProvider.getWalFilePrefix(wal)); + } finally { + wal.unlockRollWriter(); + } + } + @VisibleForTesting public AtomicLong getTotalBufferUsed() { return totalBufferUsed; @@ -768,7 +812,28 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void run() { - List currentReplicators = replicationQueues.getListOfReplicators(); + List currentReplicators = null; + RetryCounterFactory retryCounterFactory = new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000); + RetryCounter retryCounter = retryCounterFactory.create(); + while (currentReplicators == null) { + try { + // Table based replication will throw an exception if the Replication Table is not up yet. + // In that case we just sleep and retry later. + currentReplicators = replicationQueues.getListOfReplicators(); + } catch (ReplicationException e) { + try { + LOG.warn("AdoptAbandonedQueuesWorker failed to get list of replicators retrying. 
" + + "retries=" + retryCounter.getAttemptTimes()); + retryCounter.useRetry(); + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException ie) { + LOG.error("AdoptAbandonedQueuesWorker received an InterruptedException while sleeping" + + " between retries. No queues were adopted."); + return; + } + } + } + if (currentReplicators == null || currentReplicators.isEmpty()) { return; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 28b7fda285..77268b1435 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -205,6 +205,10 @@ public abstract class AbstractFSWALProvider> implemen return ((AbstractFSWAL) wal).getCurrentFileName(); } + public static String getWalFilePrefix(final WAL wal) { + return ((AbstractFSWAL) wal).getWalPrefix(); + } + /** * request a log roll, but don't actually do it. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index b442f07c1e..9f7e9aea66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -161,6 +161,16 @@ class DisabledWALProvider implements WALProvider { } @Override + public void lockRollWriter() { + // no-op + } + + @Override + public void unlockRollWriter() { + // no-op + } + + @Override public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { if (!this.listeners.isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 2ae20cf680..e6dcc7f131 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -100,6 +100,10 @@ public interface WAL extends Closeable { */ void close() throws IOException; + void lockRollWriter(); + + void unlockRollWriter(); + /** * Append a set of edits to the WAL. 
The WAL is not flushed/sync'd after this transaction * completes BUT on return this edit must have its region edit/sequence id assigned diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java index 3685d6daaf..3e79444f91 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -84,6 +84,8 @@ public class TestReplicationStateHBaseImpl { TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); utility.startMiniCluster(); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); @@ -107,6 +109,9 @@ public class TestReplicationStateHBaseImpl { rq3.init(server3); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); rp.init(); + ((TableBasedReplicationQueuesImpl) rq1).blockUntilReplicationIsAvailable(); + ((TableBasedReplicationQueuesImpl) rq2).blockUntilReplicationIsAvailable(); + ((TableBasedReplicationQueuesImpl) rq3).blockUntilReplicationIsAvailable(); } catch (Exception e) { fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java index 83fdad7516..06d08aaddc 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization @@ -58,6 +59,8 @@ public class TestReplicationTableBase { TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate() { @Override @@ -79,8 +82,7 @@ public class TestReplicationTableBase { final RequestReplicationQueueData async = new RequestReplicationQueueData(); async.start(); Thread.sleep(SLEEP_MILLIS); - // Test that the Replication Table has not been assigned and the methods are blocking - assertFalse(rb.getInitializationStatus()); + // Check that the replication table operation is indeed blocking assertFalse(asyncRequestSuccess); utility.startMiniCluster(); // Test that the methods do return the correct results after getting the table @@ -102,8 +104,12 @@ public class TestReplicationTableBase { public class RequestReplicationQueueData extends Thread { @Override public void run() { - assertEquals(0, rq.getListOfReplicators().size()); - asyncRequestSuccess = true; + try { + assertEquals(0, rq.getListOfReplicators().size()); + asyncRequestSuccess = true; + } catch (ReplicationException e) { + fail(e.getMessage()); + } } } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 26aee6d52a..05df25885e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -230,6 +230,7 @@ public abstract class TestReplicationSourceManager { URLEncoder.encode("regionserver:60020", "UTF8")); final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); manager.init(); + manager.registerWal(wal); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); htd.addFamily(new HColumnDescriptor(f1)); NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -297,6 +298,7 @@ public abstract class TestReplicationSourceManager { ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); rq.init(server.getServerName().toString()); + waitUntilReplicationEnabled(rq); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); @@ -353,6 +355,7 @@ public abstract class TestReplicationSourceManager { ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); + waitUntilReplicationEnabled(rq); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); rp1.init(); @@ -519,7 +522,9 @@ public abstract class TestReplicationSourceManager { return logEdit; } - static class DummyNodeFailoverWorker extends Thread { + abstract void waitUntilReplicationEnabled(ReplicationQueues rq) throws InterruptedException; + + class DummyNodeFailoverWorker extends Thread { private Map> logZnodesMap; 
Server server; private String deadRsZnode; @@ -532,6 +537,7 @@ public abstract class TestReplicationSourceManager { ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); this.rq.init(this.server.getServerName().toString()); + waitUntilReplicationEnabled(rq); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index 945d9f4e8c..02fa1fe5b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -150,4 +150,30 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan s0.stop(""); } + + @Test + public void testCleanupUnknownPeerZNode() throws Exception { + final Server server = new DummyServer("hostname2.example.org"); + ReplicationQueues rq = ReplicationFactory.getReplicationQueues( + new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); + rq.init(server.getServerName().toString()); + // populate some znodes in the peer znode + // add log to an unknown peer + String group = "testgroup"; + rq.addLog("2", group + ".log1"); + rq.addLog("2", group + ".log2"); + + ReplicationSourceManager.NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName()); + w1.run(); + + // The log of the unknown peer should be removed from zk + for (String peer : manager.getAllQueues()) { + assertTrue(peer.startsWith("1")); + } + } + + @Override + void waitUntilReplicationEnabled(ReplicationQueues rq) throws InterruptedException { + // no-op + } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java index e606257af6..411bc5b093 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java @@ -20,16 +20,32 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.replication.ReplicationTableBase; import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl; import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.BeforeClass; +import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.fail; + /** * Tests the ReplicationSourceManager 
with TableBasedReplicationQueue's and * TableBasedReplicationQueuesClient @@ -43,14 +59,60 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS ReplicationSourceDummy.class.getCanonicalName()); conf.setLong("replication.sleep.before.failover", 2000); conf.setInt("replication.source.maxretriesmultiplier", 10); - conf.setClass("hbase.region.replica.replication.replicationQueues.class", TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); utility = new HBaseTestingUtility(conf); utility.startMiniCluster(); setupZkAndReplication(); } + /** + * Test the prelog roll procedure for when Replication is not up. This simulates the cluster + * initialization process. + */ + @Test + public void TestPrelogRoll() throws Exception { + ReplicationPeers peers = replication.getReplicationManager().getReplicationPeers(); + peers.registerPeer("peer", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); + peers.peerConnected("peer"); + try { + // Check that the hardcoded WAL name that we use is valid + TableBasedReplicationQueuesImpl rq = new TableBasedReplicationQueuesImpl( + new ReplicationQueuesArguments(conf, zkw, zkw)); + List listeners = new ArrayList<>(); + listeners.add(replication); + waitUntilReplicationEnabled(rq); + utility.getAdmin().disableTable(ReplicationTableBase.REPLICATION_TABLE_NAME); + final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners, + URLEncoder.encode("regionserver:60020", "UTF8")); + try { + WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); + replication.registerWal(wal); + fail("RegisteringWal should fail while replication is not available"); + } catch (IOException 
e) { + } + final WAL nonReplicatedWal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); + utility.getAdmin().enableTable(ReplicationTableBase.REPLICATION_TABLE_NAME); + replication.registerWal(nonReplicatedWal); + utility.getAdmin().disableTable(ReplicationTableBase.REPLICATION_TABLE_NAME); + try { + replication.preLogRoll(null, AbstractFSWALProvider.getCurrentFileName(nonReplicatedWal)); + fail("Prelog roll should have attempted to register the log and thrown an exception"); + } catch (IOException e) { + } + } finally { + utility.getAdmin().enableTable(ReplicationTableBase.REPLICATION_TABLE_NAME); + peers.unregisterPeer("peer"); + peers.peerDisconnected("peer"); + } + } + + @Override + void waitUntilReplicationEnabled(ReplicationQueues rq) throws InterruptedException{ + ((TableBasedReplicationQueuesImpl) rq).blockUntilReplicationIsAvailable(); + } } -- 2.11.0 (Apple Git-81)