From 28b64cb9c7acb2c0a81575bd386d09974d9b1b1e Mon Sep 17 00:00:00 2001 From: Vincent Date: Sat, 17 Dec 2016 22:15:41 -0800 Subject: [PATCH] HBASE-17328 Properly dispose of looped replication peers --- .../regionserver/ReplicationSource.java | 2 ++ .../regionserver/ReplicationSourceManager.java | 12 ++++++++++- .../hbase/replication/TestMasterReplication.java | 25 ++++++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index a6fe0fb..c988f87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -327,6 +327,8 @@ public class ReplicationSource extends Thread this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); + this.manager.closeQueue(this); + return; } LOG.info("Replicating " + clusterId + " -> " + peerClusterId); // start workers diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index fa6f894..478e552 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -530,12 +530,22 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void closeRecoveredQueue(ReplicationSourceInterface src) { LOG.info("Done with the recovered queue " + src.getPeerClusterZnode()); + closeSource(src); + this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); + } + + public void closeQueue(ReplicationSourceInterface src) { + LOG.info("Done with the queue " + src.getPeerClusterZnode()); + closeSource(src); + this.walsById.remove(src.getPeerClusterZnode()); + } + + private void closeSource(ReplicationSourceInterface src) { if (src instanceof ReplicationSource) { ((ReplicationSource) src).getSourceMetrics().clear(); } this.oldsources.remove(src); deleteSource(src.getPeerClusterZnode(), false); - this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 5b8538b..3be7d9e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import org.junit.Before; @@ -169,6 +170,30 @@ public class TestMasterReplication { } /** + * Tests the replication scenario 0 -> 0 By default + * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the + * ReplicationSource should terminate, and no further logs should get enqueued + */ + @Test(timeout = 300000) + public void testLoopedReplication() throws Exception { + LOG.info("testLoopedReplication"); + startMiniClusters(1); + createTableOnClusters(table); + addPeer("1", 0, 0); + Thread.sleep(SLEEP_TIME); // wait for ReplicationSource to terminate + + Table[] htables = getHTablesOnClusters(tableName); + putAndWait(row, famName, htables[0], htables[0]); + ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher(); + String queuesZnode = ZKUtil.joinZNode(zkw.getZNodePaths().baseZNode, "replication/rs"); + String serverName = + utilities[0].getHBaseCluster().getRegionServer(0).getServerName().toString(); + List listChildrenNoWatch = + ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, serverName)); + assertEquals(0, listChildrenNoWatch.size()); + } + + /** * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of * HFiles to a table in each cluster, checking if it's replicated. */ -- 2.8.4 (Apple Git-73)