From ff9927b99d4b24f1ef61866a260883c8c61cca9f Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Mon, 18 Jul 2016 17:59:02 -0700 Subject: [PATCH] HBASE-16138 Large number of changes that affect how cluster startup occurs. We now lazily register WALs in Replication; they are only registered if they are being used to open a Replicated Region. TableBasedReplication will not fail fast on calls to addLog before Replication is up. This will allow clusters to start up with Replication Table hosted on any region. Also includes changes to update default wait/retry values for a more realistic setup. --- .../org/apache/hadoop/hbase/HTableDescriptor.java | 13 ++ .../hbase/replication/ReplicationQueues.java | 10 +- .../hbase/replication/ReplicationQueuesClient.java | 2 +- .../hbase/replication/ReplicationQueuesZKImpl.java | 5 + .../hbase/replication/ReplicationTableBase.java | 230 ++++++++++++++++----- .../TableBasedReplicationQueuesClientImpl.java | 2 +- .../TableBasedReplicationQueuesImpl.java | 71 +++++-- .../hadoop/hbase/regionserver/HRegionServer.java | 7 + .../regionserver/ReplicationSourceService.java | 7 +- .../hbase/regionserver/wal/AbstractFSWAL.java | 12 ++ .../hadoop/hbase/regionserver/wal/FSHLog.java | 3 +- .../replication/regionserver/Replication.java | 7 + .../regionserver/ReplicationSourceManager.java | 72 ++++++- .../hadoop/hbase/util/hbck/ReplicationChecker.java | 5 +- .../hadoop/hbase/wal/AbstractFSWALProvider.java | 4 + .../hadoop/hbase/wal/DisabledWALProvider.java | 10 + .../main/java/org/apache/hadoop/hbase/wal/WAL.java | 4 + .../replication/TestReplicationStateHBaseImpl.java | 5 + .../replication/TestReplicationTableBase.java | 14 +- .../regionserver/TestReplicationSourceManager.java | 30 +-- .../TestReplicationSourceManagerZkImpl.java | 26 +++ ...TestTableBasedReplicationSourceManagerImpl.java | 64 +++++- 22 files changed, 497 insertions(+), 106 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index ccad414..8aa87d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -1555,4 +1555,17 @@ public class HTableDescriptor implements Comparable { public void removeConfiguration(final String key) { configuration.remove(key); } + + /** + * Checks if any of the column families within this table are replicated + * @return whether any of the column families within this table are replicated + */ + public boolean checkAnyReplicatedFamilies() { + for (HColumnDescriptor family : families.values()) { + if (family.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL) { + return true; + } + } + return false; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 0de0cc8..f02a09f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -53,6 +53,14 @@ public interface ReplicationQueues { void addLog(String queueId, String filename) throws ReplicationException; /** + * Add a new WAL file to the given queue. If the queue does not exist it is created. If + * Replication is not initialized yet, this call will throw an exception immediately + * @param queueId a String that identifies the queue. + * @param filename name of the WAL + */ + void addLogFailFast(String queueId, String filename) throws ReplicationException; + + /** * Remove an WAL file from the given queue. * @param queueId a String that identifies the queue. * @param filename name of the WAL @@ -106,7 +114,7 @@ public interface ReplicationQueues { * be alive, dead or from a previous run of the cluster. 
* @return a list of server names */ - List getListOfReplicators(); + List getListOfReplicators() throws ReplicationException; /** * Checks if the provided znode is the same as this region server's diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 6d8900e..160a9d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -43,7 +43,7 @@ public interface ReplicationQueuesClient { * @return a list of server names * @throws KeeperException zookeeper exception */ - List getListOfReplicators() throws KeeperException; + List getListOfReplicators() throws KeeperException, ReplicationException; /** * Get a list of all WALs in the given queue on the given region server. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 655aaae..7726bb5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -124,6 +124,11 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override + public void addLogFailFast(String queueId, String filename) throws ReplicationException { + addLog(queueId, filename); + } + + @Override public void removeLog(String queueId, String filename) { try { String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java index 61bb041..4219e88 100644 
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTableBase.java @@ -72,7 +72,7 @@ import java.util.concurrent.TimeUnit; * to the caller to close the returned table. */ @InterfaceAudience.Private -abstract class ReplicationTableBase { +public abstract class ReplicationTableBase { /** Name of the HBase Table used for tracking replication*/ public static final TableName REPLICATION_TABLE_NAME = @@ -88,7 +88,6 @@ abstract class ReplicationTableBase { new HColumnDescriptor(CF_QUEUE).setMaxVersions(1) .setInMemory(true) .setScope(HConstants.REPLICATION_SCOPE_LOCAL) - // TODO: Figure out which bloom filter to use .setBloomFilterType(BloomType.NONE); // The value used to delimit the queueId and server name inside of a queue's row key. Currently a @@ -100,29 +99,71 @@ abstract class ReplicationTableBase { public static final String QUEUE_HISTORY_DELIMITER = "|"; /* - * Make sure that HBase table operations for replication have a high number of retries. This is - * because the server is aborted if any HBase table operation fails. Each RPC will be attempted - * 3600 times before exiting. This provides each operation with 2 hours of retries - * before the server is aborted. + * Make sure that normal HBase Replication Table operations for replication have a high number of + * retries. This is because the server is aborted if any HBase table operation fails. Each RPC will + * be attempted 240 times before exiting. */ - private static final int CLIENT_RETRIES = 3600; - private static final int RPC_TIMEOUT = 2000; - private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; + private static final int DEFAULT_CLIENT_RETRIES = 240; + private static final int DEFAULT_CLIENT_PAUSE = 5000; + private static final int DEFAULT_RPC_TIMEOUT = 120000; + + /* + * Make sure that the HBase Replication Table initialization has the proper timeouts. 
Because + * HBase servers can come up a lot sooner than the cluster is ready to create tables and this + * is a one time operation, we can accept longer pauses than normal. + */ + private static final int DEFAULT_INIT_RETRIES = 240; + private static final int DEFAULT_INIT_PAUSE = 60000; + private static final int DEFAULT_INIT_RPC_TIMEOUT = 120000; + + /* + * Used for fast fail table operations. Primarily ReplicationQueues.addLog(), which blocks + * during region opening, but is supposed to fail quickly if Replication is not up yet. Increasing + * these retry values will slow down cluster initialization + */ + private static final int DEFAULT_FAST_FAIL_RETRIES = 1; + private static final int DEFAULT_FAST_FAIL_PAUSE = 0; + private static final int DEFAULT_FAST_FAIL_TIMEOUT = 60000; + + /* + * Determine the polling frequency used to check when the Replication Table comes up. With the + * default options we will poll in intervals of 100 ms forever. + */ + private static final int DEFAULT_WAIT_TABLE_RETRIES = Integer.MAX_VALUE; + private static final int DEFAULT_WAIT_TABLE_PAUSE = 100; // We only need a single thread to initialize the Replication Table private static final int NUM_INITIALIZE_WORKERS = 1; protected final Configuration conf; + protected final Configuration fastFailConf; protected final Abortable abortable; private final Connection connection; + private final Connection fastFailConnection; private final Executor executor; private volatile CountDownLatch replicationTableInitialized; + private int clientRetries; + private int clientPause; + private int rpcTimeout; + private int operationTimeout; + private int initRetries; + private int initPause; + private int initRpcTimeout; + private int initOperationTimeout; + private int fastFailTimeout; + private int fastFailPause; + private int fastFailRetries; + private int fastFailOperationTimeout; + public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException { this.conf = new 
Configuration(conf); + this.fastFailConf = new Configuration(conf); this.abortable = abort; - decorateConf(); + readTimeoutConf(); + decorateTimeoutConf(); this.connection = ConnectionFactory.createConnection(this.conf); + this.fastFailConnection = ConnectionFactory.createConnection(this.fastFailConf); this.executor = setUpExecutor(); this.replicationTableInitialized = new CountDownLatch(1); createReplicationTableInBackground(); @@ -132,8 +173,48 @@ abstract class ReplicationTableBase { * Modify the connection's config so that operations run on the Replication Table have longer and * a larger number of retries */ - private void decorateConf() { - this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); + private void readTimeoutConf() { + clientRetries = conf.getInt("hbase.replication.table.client.retries", DEFAULT_CLIENT_RETRIES); + clientPause = conf.getInt("hbase.replication.table.client.pause", DEFAULT_CLIENT_PAUSE); + rpcTimeout = conf.getInt("hbase.replication.table.rpc.timeout", DEFAULT_RPC_TIMEOUT); + initRetries = conf.getInt("hbase.replication.table.init.retries", DEFAULT_INIT_RETRIES); + initPause = conf.getInt("hbase.replication.table.init.pause", DEFAULT_INIT_PAUSE); + initRpcTimeout = conf.getInt("hbase.replication.table.init.timeout", DEFAULT_INIT_RPC_TIMEOUT); + fastFailTimeout= conf.getInt("hbase.replication.table.fastfail.timeout", + DEFAULT_FAST_FAIL_TIMEOUT); + fastFailRetries = conf.getInt("hbase.replication.table.fastfail.retries", + DEFAULT_FAST_FAIL_RETRIES); + fastFailPause = conf.getInt("hbase.replication.table.fastfail.pause", DEFAULT_FAST_FAIL_PAUSE); + fastFailOperationTimeout = getOperationTimeout(fastFailRetries, fastFailPause, fastFailTimeout); + operationTimeout = getOperationTimeout(clientRetries, clientPause, rpcTimeout); + initOperationTimeout = getOperationTimeout(initRetries, initPause, initRpcTimeout); + } + + /** + * Calculate the operation timeout for a retried RPC request + * @param retries times we retry a 
request + * @param pause pause between failed requests + * @param rpcTimeout timeout of a RPC request + * @return the operation timeout for a retried RPC request + */ + private int getOperationTimeout(int retries, int pause, int rpcTimeout) { + return retries * (pause + rpcTimeout); + } + + /** + * Set up the configuration values for normal Replication Table operations + */ + private void decorateTimeoutConf() { + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, clientPause); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, clientRetries); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, operationTimeout); + conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, operationTimeout); + fastFailConf.setInt(HConstants.HBASE_CLIENT_PAUSE, fastFailPause); + fastFailConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, fastFailRetries); + fastFailConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, fastFailTimeout); + fastFailConf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, fastFailOperationTimeout); + fastFailConf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, fastFailOperationTimeout); } /** @@ -159,15 +240,6 @@ abstract class ReplicationTableBase { } /** - * Increases the RPC and operations timeouts for the Replication Table - */ - private Table setReplicationTableTimeOuts(Table replicationTable) { - replicationTable.setRpcTimeout(RPC_TIMEOUT); - replicationTable.setOperationTimeout(OPERATION_TIMEOUT); - return replicationTable; - } - - /** * Build the row key for the given queueId. This will uniquely identify it from all other queues * in the cluster. 
* @param serverName The owner of the queue @@ -217,11 +289,20 @@ abstract class ReplicationTableBase { } /** + * Blocks until the Replication Table is available + * + * @throws InterruptedException + */ + public void blockUntilReplicationIsAvailable() throws InterruptedException { + replicationTableInitialized.await(); + } + + /** * Get a list of all region servers that have outstanding replication queues. These servers could * be alive, dead or from a previous run of the cluster. * @return a list of server names */ - protected List getListOfReplicators() { + protected List getListOfReplicators() throws ReplicationException{ // scan all of the queues and return a list of all unique OWNER values Set peerServers = new HashSet(); ResultScanner allQueuesInCluster = null; @@ -233,8 +314,7 @@ abstract class ReplicationTableBase { peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER))); } } catch (IOException e) { - String errMsg = "Failed getting list of replicators"; - abortable.abort(errMsg, e); + throw new ReplicationException(e); } finally { if (allQueuesInCluster != null) { allQueuesInCluster.close(); @@ -351,7 +431,29 @@ abstract class ReplicationTableBase { e.getMessage(); throw new InterruptedIOException(errMsg); } - return getAndSetUpReplicationTable(); + return getAndSetUpReplicationTable(connection); + } + + /** + * Attempts to acquire the Replication Table. 
This operation will immediately throw an exception + * if the Replication Table is not up yet + * + * @return the Replication Table + * @throws IOException + */ + protected Table getOrFailFastReplication() throws IOException { + if (replicationTableInitialized.getCount() != 0) { + throw new IOException("getOrFailFastReplication() failed because replication is not " + + "available yet"); + } + try { + replicationTableInitialized.await(); + } catch (InterruptedException e) { + String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " + + e.getMessage(); + throw new InterruptedIOException(errMsg); + } + return getAndSetUpReplicationTable(fastFailConnection); } /** @@ -360,13 +462,37 @@ abstract class ReplicationTableBase { * @return the Replication Table * @throws IOException */ - private Table getAndSetUpReplicationTable() throws IOException { + private Table getAndSetUpReplicationTable(Connection connection) throws IOException { Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME); setReplicationTableTimeOuts(replicationTable); return replicationTable; } /** + * Increases the RPC and operations timeouts for the Replication Table + */ + private Table setReplicationTableTimeOuts(Table replicationTable) { + replicationTable.setRpcTimeout(rpcTimeout); + replicationTable.setOperationTimeout(operationTimeout); + return replicationTable; + } + + /* + * Checks whether the Replication Table exists yet + * + * @return whether the Replication Table exists + * @throws IOException + */ + private boolean replicationTableAvailable() { + try (Admin tempAdmin = connection.getAdmin()){ + return tempAdmin.tableExists(REPLICATION_TABLE_NAME) && + tempAdmin.isTableAvailable(REPLICATION_TABLE_NAME); + } catch (IOException e) { + return false; + } + } + + /** * Builds the Replication Table in a background thread. 
Any method accessing the Replication Table * should do so through getOrBlockOnReplicationTable() * @@ -383,20 +509,24 @@ abstract class ReplicationTableBase { */ private class CreateReplicationTableWorker implements Runnable { - private Admin admin; + private Configuration initConf; + private Connection initConnection; + private Admin initAdmin; @Override public void run() { try { - admin = connection.getAdmin(); - if (!replicationTableExists()) { - createReplicationTable(); - } - int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number", - CLIENT_RETRIES); - RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT); + initConf = buildTableInitConf(); + initConnection = ConnectionFactory.createConnection(initConf); + initAdmin = initConnection.getAdmin(); + createReplicationTable(); + int maxRetries = conf.getInt("hbase.replication.waittable.retries", + DEFAULT_WAIT_TABLE_RETRIES); + int pause = conf.getInt("hbase.replication.waittable.retries.pause", + DEFAULT_WAIT_TABLE_PAUSE); + RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, pause); RetryCounter retryCounter = counterFactory.create(); - while (!replicationTableExists()) { + while (!replicationTableAvailable()) { retryCounter.sleepUntilNextRetry(); if (!retryCounter.shouldRetry()) { throw new IOException("Unable to acquire the Replication Table"); @@ -418,24 +548,28 @@ abstract class ReplicationTableBase { HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); try { - admin.createTable(replicationTableDescriptor); + if (initAdmin.tableExists(REPLICATION_TABLE_NAME)) { + return; + } + } catch (Exception e) { + // If this tableExists is called too early admin will throw a null pointer exception. In + // this case proceed to create the Replication Table as we normally would. 
+ } + try { + initAdmin.createTableAsync(replicationTableDescriptor, null); } catch (TableExistsException e) { // In this case we can just continue as normal } } - /** - * Checks whether the Replication Table exists yet - * - * @return whether the Replication Table exists - * @throws IOException - */ - private boolean replicationTableExists() { - try { - return admin.tableExists(REPLICATION_TABLE_NAME); - } catch (IOException e) { - return false; - } + private Configuration buildTableInitConf() { + Configuration initConfiguration = new Configuration(conf); + initConfiguration.setInt(HConstants.HBASE_CLIENT_PAUSE, initPause); + initConfiguration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, initRetries); + initConfiguration.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, initRpcTimeout); + initConfiguration.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, initOperationTimeout); + initConfiguration.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, initOperationTimeout); + return initConfiguration; } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java index dcbed7a..130827b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java @@ -56,7 +56,7 @@ public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase } @Override - public List getListOfReplicators() { + public List getListOfReplicators() throws ReplicationException { return super.getListOfReplicators(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java index 3ee6fde..8ab05d2 100644 
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -87,7 +87,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } @Override - public List getListOfReplicators() { + public List getListOfReplicators() throws ReplicationException { return super.getListOfReplicators(); } @@ -108,28 +108,53 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } } + // addLog() blocks until the Replication Table is up and throws a ReplicationException if the + // write fails; use addLogFailFast() to fail immediately when the table is not available yet @Override public void addLog(String queueId, String filename) throws ReplicationException { try (Table replicationTable = getOrBlockOnReplicationTable()) { - if (!checkQueueExists(queueId)) { - // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values - Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); - putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - replicationTable.put(putNewQueue); - } else { - // Otherwise simply add the new log and offset as a new column - Put putNewLog = new Put(queueIdToRowKey(queueId)); - putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - safeQueueUpdate(putNewLog); - } + addLog(queueId, filename, replicationTable, checkQueueExists(queueId)); } catch (IOException | ReplicationException e) { String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); + throw new ReplicationException(errMsg, e); + } + } + + @Override + public void addLogFailFast(String queueId, String filename) throws ReplicationException { 
+ // The following line will throw an exception if it fails to read Replication Table with the + // fastFail config options + try (Table replicationTable = getOrFailFastReplication()) { + addLog(queueId, filename, replicationTable, checkQueueExistsFailFast(queueId)); + } catch (IOException | ReplicationException e) { + String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; + throw new ReplicationException(errMsg, e); + } + } + + private void addLog(String queueId, String filename, Table replicationTable, boolean queueExists) + throws IOException, ReplicationException{ + if (!queueExists) { + // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values + Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); + putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); + putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); + putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); + replicationTable.put(putNewQueue); + } else { + // Otherwise simply add the new log and offset as a new column + Put putNewLog = new Put(queueIdToRowKey(queueId)); + putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); + boolean updateSuccess = replicationTable.checkAndPut(putNewLog.getRow(), + CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, putNewLog); + if (!updateSuccess) { + throw new ReplicationException("Failed to add new log queueId=" + queueId + + " filename=" + filename); + } } } + @Override public void removeLog(String queueId, String filename) { try { @@ -181,7 +206,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename))); } catch (IOException e) { throw new ReplicationException("Could not get position in log for queueId=" + queueId + - ", filename=" + filename); + ", filename=" + filename, e); } } @@ -351,15 
+376,23 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } } + private boolean checkQueueExists(String queueId) throws IOException{ + try (Table replicationTable = getOrBlockOnReplicationTable()) { + byte[] rowKey = queueIdToRowKey(queueId); + return replicationTable.exists(new Get(rowKey)); + } + } + /** - * Check if the queue specified by queueId is stored in HBase + * Check if the queue specified by queueId is stored in HBase. This method will throw an exception + * if it fails to read the Replication Table after the options set in fastFailConfig * * @param queueId Either raw or reclaimed format of the queueId * @return Whether the queue is stored in HBase * @throws IOException */ - private boolean checkQueueExists(String queueId) throws IOException { - try (Table replicationTable = getOrBlockOnReplicationTable()) { + private boolean checkQueueExistsFailFast(String queueId) throws IOException { + try (Table replicationTable = getOrFailFastReplication()) { byte[] rowKey = queueIdToRowKey(queueId); return replicationTable.exists(new Get(rowKey)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e03993f..364a05a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1899,11 +1899,18 @@ public class HRegionServer extends HasThread implements } else { byte[] namespace = regionInfo.getTable().getNamespace(); wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes(), namespace); + if (getReplicationSourceService() != null && checkReplication(regionInfo)) { + getReplicationSourceService().registerWal(wal); + } } roller.addWAL(wal); return wal; } + public boolean checkReplication(HRegionInfo regionInfo) throws IOException{ + return 
tableDescriptors.get(regionInfo.getTable()).checkAnyReplicatedFamilies(); + } + @Override public Connection getConnection() { return getClusterConnection(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 13b502b..381aaf0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.wal.WAL; + +import java.io.IOException; /** * A source for a replication stream has to expose this service. @@ -29,8 +32,10 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @InterfaceAudience.Private public interface ReplicationSourceService extends ReplicationService { /** - * Returns a WALObserver for the service. This is needed to + * Returns a WALObserver for the service. This is needed to * observe log rolls and log archival events. 
*/ WALActionsListener getWALActionsListener(); + + void registerWal(WAL wal) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 3aafc23..d04533a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -422,6 +422,14 @@ public abstract class AbstractFSWAL implements WAL { return rollWriter(false); } + public void lockRollWriter() { + rollWriterLock.lock(); + } + + public void unlockRollWriter() { + rollWriterLock.unlock(); + } + /** * This is a convenience method that computes a new filename with a given file-number. * @param filenum to use @@ -893,6 +901,10 @@ public abstract class AbstractFSWAL implements WAL { } } + public String getWalPrefix() { + return walFilePrefix; + } + @Override public String toString() { return getClass().getSimpleName() + " " + walFilePrefix + ":" + walFileSuffix + "(num " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index cdf5757..54afd4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -282,8 +282,7 @@ public class FSHLog extends AbstractFSWAL { LOG.warn("pre-sync failed but an optimization so keep going", e); } } - - /** + /* * This method allows subclasses to inject different writers without having to extend other * methods like rollWriter(). 
* @return Writer instance diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 4f518bb..ad9fb92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.zookeeper.KeeperException; @@ -167,6 +168,7 @@ public class Replication extends WALActionsListener.Base implements public WALActionsListener getWALActionsListener() { return this; } + /** * Stops replication service. 
*/ @@ -306,6 +308,11 @@ public class Replication extends WALActionsListener.Base implements getReplicationManager().postLogRoll(newPath); } + @Override + public void registerWal(WAL wal) throws IOException { + replicationManager.registerWal(wal); + } + /** * This method modifies the master's configuration in order to inject replication-related features * @param conf diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 586aace..d81532a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -64,6 +64,9 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.hbase.wal.WAL; /** * This class is responsible to manage all the replication @@ -117,6 +120,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final Random rand; private final boolean replicationForBulkLoadDataEnabled; + private Set registeredWALs; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -170,6 +174,7 @@ public class ReplicationSourceManager implements ReplicationListener { replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); + this.registeredWALs = Collections.synchronizedSet(new HashSet()); } /** @@ -351,9 +356,17 @@ public class ReplicationSourceManager 
implements ReplicationListener { } void preLogRoll(Path newLog) throws IOException { - recordLog(newLog); String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); + if (registeredWALs.contains(logPrefix)) { + recordLogAndLatestPath(newLog, false); + } + } + + private void recordLogAndLatestPath(Path newLog, boolean failFast) throws IOException { + String logName = newLog.getName(); + String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); + recordLog(newLog, failFast); synchronized (latestPaths) { Iterator iterator = latestPaths.iterator(); while (iterator.hasNext()) { @@ -373,7 +386,7 @@ public class ReplicationSourceManager implements ReplicationListener { * @param logPath the log path to check and enqueue * @throws IOException */ - private void recordLog(Path logPath) throws IOException { + private void recordLog(Path logPath, boolean failFast) throws IOException { String logName = logPath.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); // update replication queues on ZK @@ -381,7 +394,11 @@ public class ReplicationSourceManager implements ReplicationListener { synchronized (replicationPeers) { for (String id : replicationPeers.getConnectedPeerIds()) { try { - this.replicationQueues.addLog(id, logName); + if (failFast) { + this.replicationQueues.addLogFailFast(id, logName); + } else { + this.replicationQueues.addLog(id, logName); + } } catch (ReplicationException e) { throw new IOException("Cannot add log to replication queue" + " when creating a new source, queueId=" + id + ", filename=" + logName, e); @@ -419,11 +436,38 @@ public class ReplicationSourceManager implements ReplicationListener { void postLogRoll(Path newLog) throws IOException { // This only updates the sources we own, not the recovered ones + String logName = newLog.getName(); + String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); + if 
(registeredWALs.contains(logPrefix)) { + enqueueNewLog(newLog); + } + } + + void enqueueNewLog(Path newLog) { for (ReplicationSourceInterface source : this.sources) { source.enqueueLog(newLog); } } + void registerWal(WAL wal) throws IOException { + wal.lockRollWriter(); + try { + if (registeredWALs.contains(AbstractFSWALProvider.getWalFilePrefix(wal))) { + return; + } + // Perform the prelog and postlog roll actions + // The roll writer lock taken above must be held while we do this + recordLogAndLatestPath(AbstractFSWALProvider.getCurrentFileName(wal), true); + enqueueNewLog(AbstractFSWALProvider.getCurrentFileName(wal)); + // recordLogAndLatestPath will throw an exception if it fails and the WAL will not be registered + registeredWALs.add(AbstractFSWALProvider.getWalFilePrefix(wal)); + } finally { + wal.unlockRollWriter(); + } + } + + + /** * Factory method to create a replication source * @param conf the configuration to use @@ -723,7 +767,29 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void run() { - List currentReplicators = replicationQueues.getListOfReplicators(); + List currentReplicators = null; + RetryCounterFactory retryCounterFactory = new RetryCounterFactory(Integer.MAX_VALUE, 1000); + RetryCounter retryCounter = retryCounterFactory.create(); + while (currentReplicators == null) { + try { + // Table based replication will throw an exception if the Replication Table is not up yet. + // In that case we just sleep and retry later. + currentReplicators = replicationQueues.getListOfReplicators(); + } catch (ReplicationException e) { + try { + LOG.warn("AdoptAbandonedQueuesWorker failed to get list of replicators retrying. " + + "retries=" + retryCounter.getAttemptTimes()); + retryCounter.useRetry(); + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException ie) { + // Restore the interrupt status so code further up the stack can see the interruption + Thread.currentThread().interrupt(); + LOG.error("AdoptAbandonedQueuesWorker received an InterruptedException while sleeping" + + " between retries.
No queues were adopted."); + return; + } + } + } if (currentReplicators == null || currentReplicators.size() == 0) { return; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 8d66c8f..2dcc0f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; @@ -114,8 +115,8 @@ public class ReplicationChecker { } } } - } catch (KeeperException ke) { - throw new IOException(ke); + } catch (KeeperException | ReplicationException e) { + throw new IOException(e); } checkUnDeletedHFileRefsQueues(peerIds); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index e495e99..6a2e7d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -181,6 +181,10 @@ public abstract class AbstractFSWALProvider> implemen return ((AbstractFSWAL) wal).getCurrentFileName(); } + public static String getWalFilePrefix(final WAL wal) { + return ((AbstractFSWAL) wal).getWalPrefix(); + } + /** * request a log roll, but don't actually do it. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index b5ddd00..b43b271 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -153,6 +153,16 @@ class DisabledWALProvider implements WALProvider { } @Override + public void lockRollWriter() { + // no-op + } + + @Override + public void unlockRollWriter() { + // no-op + } + + @Override public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { if (!this.listeners.isEmpty()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index af63b0b..e3ead79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -99,6 +99,10 @@ public interface WAL { */ void close() throws IOException; + void lockRollWriter(); + + void unlockRollWriter(); + /** * Append a set of edits to the WAL. 
The WAL is not flushed/sync'd after this transaction * completes BUT on return this edit must have its region edit/sequence id assigned diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java index 7ec6df8..b0ed89a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -85,6 +85,8 @@ public class TestReplicationStateHBaseImpl { TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); utility.startMiniCluster(); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); @@ -108,6 +110,9 @@ public class TestReplicationStateHBaseImpl { rq3.init(server3); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); rp.init(); + ((TableBasedReplicationQueuesImpl) rq1).blockUntilReplicationIsAvailable(); + ((TableBasedReplicationQueuesImpl) rq2).blockUntilReplicationIsAvailable(); + ((TableBasedReplicationQueuesImpl) rq3).blockUntilReplicationIsAvailable(); } catch (Exception e) { fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java index 83fdad7..06d08aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java 
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization @@ -58,6 +59,8 @@ public class TestReplicationTableBase { TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate() { @Override @@ -79,8 +82,7 @@ public class TestReplicationTableBase { final RequestReplicationQueueData async = new RequestReplicationQueueData(); async.start(); Thread.sleep(SLEEP_MILLIS); - // Test that the Replication Table has not been assigned and the methods are blocking - assertFalse(rb.getInitializationStatus()); + // Check that the replication table operation is indeed blocking assertFalse(asyncRequestSuccess); utility.startMiniCluster(); // Test that the methods do return the correct results after getting the table @@ -102,8 +104,12 @@ public class TestReplicationTableBase { public class RequestReplicationQueueData extends Thread { @Override public void run() { - assertEquals(0, rq.getListOfReplicators().size()); - asyncRequestSuccess = true; + try { + assertEquals(0, rq.getListOfReplicators().size()); + asyncRequestSuccess = true; + } catch (ReplicationException e) { + fail(e.getMessage()); + } } } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 7696e95..8d9b10a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -225,6 +225,7 @@ public abstract class TestReplicationSourceManager { URLEncoder.encode("regionserver:60020", "UTF8")); final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); manager.init(); + manager.registerWal(wal); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); htd.addFamily(new HColumnDescriptor(f1)); NavigableMap scopes = new TreeMap( @@ -294,6 +295,7 @@ public abstract class TestReplicationSourceManager { ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); rq.init(server.getServerName().toString()); + waitUntilReplicationEnabled(rq); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); @@ -335,6 +337,7 @@ public abstract class TestReplicationSourceManager { ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); rq.init(server.getServerName().toString()); + waitUntilReplicationEnabled(rq); // populate some znodes in the peer znode SortedSet files = new TreeSet(); String group = "testgroup"; @@ -350,6 +353,7 @@ public abstract class TestReplicationSourceManager { ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); + waitUntilReplicationEnabled(rq1); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(),
s1.getConfiguration(), s1); rp1.init(); @@ -366,27 +370,6 @@ public abstract class TestReplicationSourceManager { } @Test - public void testCleanupUnknownPeerZNode() throws Exception { - final Server server = new DummyServer("hostname2.example.org"); - ReplicationQueues rq = ReplicationFactory.getReplicationQueues( - new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); - rq.init(server.getServerName().toString()); - // populate some znodes in the peer znode - // add log to an unknown peer - String group = "testgroup"; - rq.addLog("2", group + ".log1"); - rq.addLog("2", group + ".log2"); - - NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName()); - w1.run(); - - // The log of the unknown peer should be removed from zk - for (String peer : manager.getAllQueues()) { - assertTrue(peer.startsWith("1")); - } - } - - @Test public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { NavigableMap scope = new TreeMap(Bytes.BYTES_COMPARATOR); // 1. 
Get the bulk load wal edit event @@ -499,7 +482,9 @@ public abstract class TestReplicationSourceManager { return logEdit; } - static class DummyNodeFailoverWorker extends Thread { + abstract void waitUntilReplicationEnabled(ReplicationQueues rq) throws InterruptedException; + + class DummyNodeFailoverWorker extends Thread { private Map> logZnodesMap; Server server; private String deadRsZnode; @@ -512,6 +497,7 @@ public abstract class TestReplicationSourceManager { ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); this.rq.init(this.server.getServerName().toString()); + waitUntilReplicationEnabled(rq); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java index 75ed835..a809583 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java @@ -144,4 +144,30 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan s0.stop(""); } + + @Test + public void testCleanupUnknownPeerZNode() throws Exception { + final Server server = new DummyServer("hostname2.example.org"); + ReplicationQueues rq = ReplicationFactory.getReplicationQueues( + new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper())); + rq.init(server.getServerName().toString()); + // populate some znodes in the peer znode + // add log to an unknown peer + String group = "testgroup"; + rq.addLog("2", group + ".log1"); + rq.addLog("2", group + ".log2"); + + ReplicationSourceManager.NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName()); + w1.run(); + + 
// The log of the unknown peer should be removed from zk + for (String peer : manager.getAllQueues()) { + assertTrue(peer.startsWith("1")); + } + } + + @Override + void waitUntilReplicationEnabled(ReplicationQueues rq) throws InterruptedException { + // no-op + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java index e606257..411bc5b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java @@ -20,16 +20,32 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.replication.ReplicationTableBase; import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl; import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.BeforeClass; 
+import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.fail; + /** * Tests the ReplicationSourceManager with TableBasedReplicationQueue's and * TableBasedReplicationQueuesClient @@ -43,14 +59,60 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS ReplicationSourceDummy.class.getCanonicalName()); conf.setLong("replication.sleep.before.failover", 2000); conf.setInt("replication.source.maxretriesmultiplier", 10); - conf.setClass("hbase.region.replica.replication.replicationQueues.class", TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); + // Set the table initialization pause lower to speed up the test + conf.setInt("hbase.replication.table.init.pause", 100); utility = new HBaseTestingUtility(conf); utility.startMiniCluster(); setupZkAndReplication(); } + /** + * Test the prelog roll procedure for when Replication is not up. This simulates the cluster + * initialization process. 
+ */ + @Test + public void TestPrelogRoll() throws Exception { + ReplicationPeers peers = replication.getReplicationManager().getReplicationPeers(); + peers.registerPeer("peer", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); + peers.peerConnected("peer"); + try { + // Wait until the Replication Table is available before taking it offline below + TableBasedReplicationQueuesImpl rq = new TableBasedReplicationQueuesImpl( + new ReplicationQueuesArguments(conf, zkw, zkw)); + List listeners = new ArrayList<>(); + listeners.add(replication); + waitUntilReplicationEnabled(rq); + utility.getAdmin().disableTable(ReplicationTableBase.REPLICATION_TABLE_NAME); + final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners, + URLEncoder.encode("regionserver:60020", "UTF8")); + try { + WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); + replication.registerWal(wal); + fail("RegisteringWal should fail while replication is not available"); + } catch (IOException expected) { + } + final WAL nonReplicatedWal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); + utility.getAdmin().enableTable(ReplicationTableBase.REPLICATION_TABLE_NAME); + replication.registerWal(nonReplicatedWal); + utility.getAdmin().disableTable(ReplicationTableBase.REPLICATION_TABLE_NAME); + try { + replication.preLogRoll(null, AbstractFSWALProvider.getCurrentFileName(nonReplicatedWal)); + fail("Prelog roll should have attempted to register the log and thrown an exception"); + } catch (IOException expected) { + } + } finally { + utility.getAdmin().enableTable(ReplicationTableBase.REPLICATION_TABLE_NAME); + peers.unregisterPeer("peer"); + peers.peerDisconnected("peer"); + } + } + + @Override + void waitUntilReplicationEnabled(ReplicationQueues rq) throws InterruptedException { + ((TableBasedReplicationQueuesImpl) rq).blockUntilReplicationIsAvailable(); + } } -- 2.8.0-rc2