From f1519e4ff443b60c9e51caac69e860454c90a0df Mon Sep 17 00:00:00 2001 From: Vincent Date: Mon, 19 Dec 2016 13:01:44 -0800 Subject: [PATCH] HBASE-17328 Properly dispose of looped replication peers --- .../regionserver/ReplicationSource.java | 2 ++ .../regionserver/ReplicationSourceManager.java | 12 ++++++++++- .../hbase/replication/TestMasterReplication.java | 25 ++++++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3049d9b..f7dd446 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -273,6 +273,8 @@ public class ReplicationSource extends Thread this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); + this.manager.closeQueue(this); + return; } LOG.info("Replicating "+clusterId + " -> " + peerClusterId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9062b87..18d8345 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -459,12 +459,22 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void closeRecoveredQueue(ReplicationSourceInterface src) { LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); + closeSource(src); + this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); + } + + public void closeQueue(ReplicationSourceInterface src) { + LOG.info("Done with the queue " + src.getPeerClusterZnode()); + closeSource(src); + this.walsById.remove(src.getPeerClusterZnode()); + } + + private void closeSource(ReplicationSourceInterface src) { if (src instanceof ReplicationSource) { ((ReplicationSource) src).getSourceMetrics().clear(); } this.oldsources.remove(src); deleteSource(src.getPeerClusterZnode(), false); - this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 0932bf2..4408962 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import org.junit.Before; @@ -250,6 +251,30 @@ public class TestMasterReplication { } } + /** + * Tests the replication scenario 0 -> 0 By default + * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the + * ReplicationSource should terminate, and no further logs should get enqueued + */ + @Test(timeout = 300000) + public void testLoopedReplication() throws Exception { + LOG.info("testLoopedReplication"); + startMiniClusters(1); + createTableOnClusters(table); + addPeer("1", 0, 0); + Thread.sleep(SLEEP_TIME); // wait for ReplicationSource to terminate + + Table[] htables = getHTablesOnClusters(tableName); + putAndWait(row, famName, htables[0], htables[0]); + ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher(); + String queuesZnode = ZKUtil.joinZNode(zkw.baseZNode, "replication/rs"); + String serverName = + utilities[0].getHBaseCluster().getRegionServer(0).getServerName().toString(); + List listChildrenNoWatch = + ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, serverName)); + assertEquals(0, listChildrenNoWatch.size()); + } + /* * Test RSRpcServices#replicateWALEntry when replication is disabled. This is to simulate * HBASE-14840 -- 2.7.4