From 8cadb4b4240d7721a9bc84490a6157211570d993 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Thu, 30 Jun 2016 15:18:33 -0700 Subject: [PATCH] HBASE-16096 Properly remove the replication queue and peer znodes after calling ReplicationSourceManager.removePeer(). --- .../regionserver/ReplicationSourceManager.java | 15 +++++--- .../regionserver/TestReplicationSourceManager.java | 40 ++++++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index e2a232f..586aace 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -561,9 +561,10 @@ public class ReplicationSourceManager implements ReplicationListener { srcToRemove.add(src); } } - if (srcToRemove.size() == 0) { - LOG.error("The queue we wanted to close is missing " + id); - return; + if (srcToRemove.isEmpty()) { + LOG.error("The peer we wanted to remove is missing a ReplicationSourceInterface. " + + "This could mean that ReplicationSourceInterface initialization failed for this peer " + + "and that replication on this peer may not be caught up. peerId=" + id); } for (ReplicationSourceInterface toRemove : srcToRemove) { toRemove.terminate(terminateMessage); @@ -739,8 +740,6 @@ public class ReplicationSourceManager implements ReplicationListener { } } - - /** * Get the directory where wals are archived * @return the directory where wals are archived @@ -766,6 +765,12 @@ public class ReplicationSourceManager implements ReplicationListener { } /** + * Get the ReplicationPeers used by this ReplicationSourceManager + * @return the ReplicationPeers used by this ReplicationSourceManager + */ + public ReplicationPeers getReplicationPeers() {return this.replicationPeers;} + + /** * Get a string representation of all the sources' metrics */ public String getStats() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 4442bbb..7696e95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; @@ -424,6 +425,45 @@ public abstract class TestReplicationSourceManager { scopes.containsKey(f2)); } + /** + * Test whether calling removePeer() on a ReplicationSourceManager that failed on initializing the + * corresponding ReplicationSourceInterface correctly cleans up the corresponding + * replication queue and ReplicationPeer. + * See HBASE-16096. + * @throws Exception + */ + @Test + public void testPeerRemovalCleanup() throws Exception{ + String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); + try { + DummyServer server = new DummyServer(); + ReplicationQueues rq = + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( + server.getConfiguration(), server, server.getZooKeeper())); + rq.init(server.getServerName().toString()); + // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface + // initialization to throw an exception. + conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl"); + ReplicationPeers rp = manager.getReplicationPeers(); + // Set up the znode and ReplicationPeer for the fake peer + rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase")); + rp.peerConnected("FakePeer"); + // Have ReplicationSourceManager add the fake peer. It should fail to initialize a + // ReplicationSourceInterface. + List fakePeers = new ArrayList<>(); + fakePeers.add("FakePeer"); + manager.peerListChanged(fakePeers); + // Create a replication queue for the fake peer + rq.addLog("FakePeer", "FakeFile"); + // Removing the peer should remove both the replication queue and the ReplicationPeer + manager.removePeer("FakePeer"); + assertFalse(rq.getAllQueues().contains("FakePeer")); + assertNull(rp.getConnectedPeer("FakePeer")); + } finally { + conf.set("replication.replicationsource.implementation", replicationSourceImplName); + } + } + private WALEdit getBulkLoadWALEdit(NavigableMap scope) { // 1. Create store files for the families Map> storeFiles = new HashMap<>(1); -- 2.8.0-rc2