From fceb096b874f84e434cea19855da39da94b88617 Mon Sep 17 00:00:00 2001
From: Ashu Pachauri
Date: Fri, 2 Jun 2017 16:00:37 -0700
Subject: [PATCH] HBASE-18192: Replication drops recovered queues on region server shutdown

---
 .../hadoop/hbase/regionserver/HRegionServer.java  |   3 +-
 .../regionserver/ReplicationSource.java           |  43 +++++--
 .../hbase/replication/TestReplicationSource.java  | 128 ++++++++++++++++++++-
 3 files changed, 162 insertions(+), 12 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3329fd689f..d2f0b04d57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2240,7 +2240,8 @@ public class HRegionServer extends HasThread implements
    * @return Return the object that implements the replication
    * source service.
    */
-  ReplicationSourceService getReplicationSourceService() {
+  @VisibleForTesting
+  public ReplicationSourceService getReplicationSourceService() {
     return replicationSourceHandler;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index bf422aae50..65f581a421 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -140,6 +140,12 @@ public class ReplicationSource extends Thread
   private ReplicationThrottler throttler;
   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
+  // Hold the state of a replication worker thread
+  public enum WorkerState {
+    RUNNING,
+    STOPPED,
+    FINISHED // The worker is done processing a recovered queue
+  }
 
   /**
    * Instantiation method used by region servers
@@ -392,7 +398,7 @@ public class ReplicationSource extends Thread
     this.sourceRunning = false;
     Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
     for (ReplicationSourceWorkerThread worker : workers) {
-      worker.setWorkerRunning(false);
+      worker.setWorkerState(WorkerState.STOPPED);
       worker.interrupt();
     }
     ListenableFuture<Service.State> future = null;
@@ -504,11 +510,12 @@ public class ReplicationSource extends Thread
     private int currentNbOperations = 0;
     // Current size of data we need to replicate
     private int currentSize = 0;
-    // Indicates whether this particular worker is running
-    private boolean workerRunning = true;
+    // Current state of the worker thread
+    private WorkerState state;
     // Current number of hfiles that we need to replicate
     private long currentNbHFiles = 0;
 
+
     public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
         ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
       this.walGroupId = walGroupId;
@@ -520,6 +527,7 @@ public class ReplicationSource extends Thread
 
     @Override
     public void run() {
+      setWorkerState(WorkerState.RUNNING);
       // If this is recovered, the queue is already full and the first log
       // normally has a position (unless the RS failed between 2 logs)
       if (this.replicationQueueInfo.isQueueRecovered()) {
@@ -653,13 +661,13 @@ public class ReplicationSource extends Thread
           sleepMultiplier = 1;
           shipEdits(currentWALisBeingWrittenTo, entries);
         }
-        if (replicationQueueInfo.isQueueRecovered()) {
+        if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
          // use synchronize to make sure one last thread will clean the queue
          synchronized (workerThreads) {
            Threads.sleep(100);// wait a short while for other worker thread to fully exit
            boolean allOtherTaskDone = true;
            for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
-              if (!worker.equals(this) && worker.isAlive()) {
+              if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
                allOtherTaskDone = false;
                break;
              }
@@ -671,6 +679,10 @@ public class ReplicationSource extends Thread
           }
         }
       }
+      // If the worker exits the run loop without finishing its task, mark it as stopped.
+      if (state != WorkerState.FINISHED) {
+        setWorkerState(WorkerState.STOPPED);
+      }
     }
 
     /**
@@ -1132,7 +1144,7 @@ public class ReplicationSource extends Thread
        LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
            + peerClusterZnode);
        metrics.incrCompletedRecoveryQueue();
-        workerRunning = false;
+        setWorkerState(WorkerState.FINISHED);
        return true;
      }
      return false;
@@ -1163,7 +1175,7 @@ public class ReplicationSource extends Thread
     }
 
     private boolean isWorkerActive() {
-      return !stopper.isStopped() && workerRunning && !isInterrupted();
+      return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
     }
 
     private void terminate(String reason, Exception cause) {
@@ -1174,13 +1186,26 @@ public class ReplicationSource extends Thread
      } else {
        LOG.error("Closing worker for wal group " + this.walGroupId
            + " because an error occurred: " + reason, cause);
      }
+      setWorkerState(WorkerState.STOPPED);
      this.interrupt();
      Threads.shutdown(this, sleepForRetries);
      LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
    }
-    public void setWorkerRunning(boolean workerRunning) {
-      this.workerRunning = workerRunning;
-    }
+    /**
+     * Set the worker state
+     * @param state
+     */
+    public void setWorkerState(WorkerState state) {
+      this.state = state;
+    }
+
+    /**
+     * Get the current state of this worker.
+     * @return WorkerState
+     */
+    public WorkerState getWorkerState() {
+      return state;
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 9bf0e9338c..8c597fa82c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -18,9 +18,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -30,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.Predicate;
@@ -37,6 +41,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALProvider;
@@ -46,7 +54,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -60,10 +69,12 @@ public class TestReplicationSource {
       LogFactory.getLog(TestReplicationSource.class);
   private final static HBaseTestingUtility TEST_UTIL =
       new HBaseTestingUtility();
+  private final static HBaseTestingUtility TEST_UTIL_PEER =
+      new HBaseTestingUtility();
   private static FileSystem FS;
   private static Path oldLogDir;
   private static Path logDir;
-  private static Configuration conf = HBaseConfiguration.create();
+  private static Configuration conf = TEST_UTIL.getConfiguration();
   /**
    * @throws java.lang.Exception
    */
@@ -79,6 +90,13 @@ public class TestReplicationSource {
     if (FS.exists(logDir)) FS.delete(logDir, true);
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniHBaseCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
   /**
    * Sanity check that we can move logs around while we are reading
    * from them. Should this test fail, ReplicationSource would have a hard
@@ -165,5 +183,111 @@
   }
 
 
+  /**
+   * Tests that recovered queues are preserved on a regionserver shutdown.
+   * See HBASE-18192
+   * @throws Exception
+   */
+  @Test
+  public void testServerShutdownRecoveredQueue() throws Exception {
+    try {
+      // Ensure single-threaded WAL
+      conf.set("hbase.wal.provider", "defaultProvider");
+      conf.setInt("replication.sleep.before.failover", 2000);
+      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
+      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
+      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
+      TEST_UTIL_PEER.startMiniCluster(1);
+
+      HRegionServer serverA = cluster.getRegionServer(0);
+      final ReplicationSourceManager managerA =
+          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
+      HRegionServer serverB = cluster.getRegionServer(1);
+      final ReplicationSourceManager managerB =
+          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
+      final ReplicationAdmin replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
+
+      final String peerId = "TestPeer";
+      replicationAdmin.addPeer(peerId,
+          new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()), null);
+      // Wait for replication sources to come up
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
+        }
+      });
+      // Disabling peer makes sure there is at least one log to claim when the server dies
+      // The recovered queue will also stay there until the peer is disabled even if the
+      // WALs it contains have no data.
+      replicationAdmin.disablePeer(peerId);
+
+      // Stopping serverA
+      // Its queues should be claimed by the only other alive server, i.e. serverB
+      cluster.stopRegionServer(serverA.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerB.getOldSources().size() == 1;
+        }
+      });
+
+      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
+      serverC.waitForServerOnline();
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return serverC.getReplicationSourceService() != null;
+        }
+      });
+      final ReplicationSourceManager managerC =
+          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
+      // Sanity check
+      assertEquals(0, managerC.getOldSources().size());
+
+      // Stopping serverB
+      // Now serverC should have two recovered queues:
+      // 1. serverB's normal queue
+      // 2. serverA's recovered queue on serverB
+      cluster.stopRegionServer(serverB.getServerName());
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 2;
+        }
+      });
+      replicationAdmin.enablePeer(peerId);
+      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+        @Override public boolean evaluate() throws Exception {
+          return managerC.getOldSources().size() == 0;
+        }
+      });
+    } finally {
+      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
+    }
+  }
+
+  /**
+   * Regionserver implementation that adds a delay on the graceful shutdown.
+   */
+  public static class ShutdownDelayRegionServer extends HRegionServer {
+    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
+        throws IOException, InterruptedException {
+      super(conf, csm);
+    }
+
+    @Override
+    protected void stopServiceThreads() {
+      // Add a delay before service threads are shut down.
+      // This will keep the zookeeper connection alive for the duration of the delay.
+      LOG.info("Adding a delay to the regionserver shutdown");
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ex) {
+        LOG.error("Interrupted while sleeping");
+      }
+      super.stopServiceThreads();
+    }
+  }
 
 }
-- 
2.13.0