From 1cdab7b00a0736f839ec6c7afeca106ae7059269 Mon Sep 17 00:00:00 2001 From: Vincent Date: Tue, 20 Dec 2016 17:20:14 -0800 Subject: [PATCH] HBASE-17328 Properly dispose of looped replication peers --- .../regionserver/ReplicationSource.java | 2 + .../regionserver/ReplicationSourceManager.java | 14 ++++ .../hbase/replication/TestMasterReplication.java | 91 ++++++++++++++++++++++ 3 files changed, 107 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3049d9b..f7dd446 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -273,6 +273,8 @@ public class ReplicationSource extends Thread this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId " + peerClusterId + " which is not allowed by ReplicationEndpoint:" + replicationEndpoint.getClass().getName(), null, false); + this.manager.closeQueue(this); + return; } LOG.info("Replicating "+clusterId + " -> " + peerClusterId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9062b87..81f06a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -468,6 +468,20 @@ public class ReplicationSourceManager implements ReplicationListener { } /** + * Clear the references to the specified old source + * @param src source to clear + */ + public void closeQueue(ReplicationSourceInterface src) { + LOG.info("Done with the queue " + src.getPeerClusterZnode()); + if (src instanceof ReplicationSource) { + ((ReplicationSource) src).getSourceMetrics().clear(); + } + this.sources.remove(src); + deleteSource(src.getPeerClusterZnode(), true); + this.walsById.remove(src.getPeerClusterZnode()); + } + + /** * Thie method first deletes all the recovered sources for the specified * id, then deletes the normal source (deleting all related data in ZK). * @param id The id of the peer cluster diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 0932bf2..b373886 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -19,24 +19,34 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -51,11 +61,14 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import org.junit.Before; @@ -273,6 +286,43 @@ public class TestMasterReplication { } } + /** + * Tests the replication scenario 0 -> 0. By default + * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the + * ReplicationSource should terminate, and no further logs should get enqueued + */ + @Test(timeout = 300000) + public void testLoopedReplication() throws Exception { + LOG.info("testLoopedReplication"); + startMiniClusters(1); + createTableOnClusters(table); + addPeer("1", 0, 0); + Thread.sleep(SLEEP_TIME); + + // wait for source to terminate + final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName(); + Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus(); + ServerLoad serverLoad = clusterStatus.getLoad(rsName); + List replicationLoadSourceList = + serverLoad.getReplicationLoadSourceList(); + return replicationLoadSourceList.size() == 0; + } + }); + + Table[] htables = getHTablesOnClusters(tableName); + putAndWait(row, famName, htables[0], htables[0]); + rollWALAndWait(utilities[0], table.getTableName(), row); + ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher(); + String queuesZnode = + ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs")); + List listChildrenNoWatch = + ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString())); + assertEquals(0, listChildrenNoWatch.size()); + } + @After public void tearDown() throws IOException { configurations = null; @@ -438,6 +488,47 @@ public class TestMasterReplication { } } + private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table, + final byte[] row) throws IOException { + final Admin admin = utility.getHBaseAdmin(); + final MiniHBaseCluster cluster = utility.getMiniHBaseCluster(); + + // find the region that corresponds to the given row. + HRegion region = null; + for (HRegion candidate : cluster.getRegions(table)) { + if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) { + region = candidate; + break; + } + } + assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region); + + final CountDownLatch latch = new CountDownLatch(1); + + // listen for successful log rolls + final WALActionsListener listener = new WALActionsListener.Base() { + @Override + public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + latch.countDown(); + } + }; + region.getWAL().registerWALActionsListener(listener); + + // request a roll + admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(), + region.getRegionInfo().getRegionName())); + + // wait + try { + latch.await(); + } catch (InterruptedException exception) { + LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " + + "replication tests fail, it's probably because we should still be waiting."); + Thread.currentThread().interrupt(); + } + region.getWAL().unregisterWALActionsListener(listener); + } + /** * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same * timestamp there is otherwise no way to count them. -- 2.7.4