From fa26c7f24954183fdace34d205244c8f33fe23f7 Mon Sep 17 00:00:00 2001
From: Vincent
Date: Tue, 20 Dec 2016 17:49:05 -0800
Subject: [PATCH] HBASE-17328 Properly dispose of looped replication peers

---
 .../regionserver/ReplicationSource.java           |   2 +
 .../regionserver/ReplicationSourceManager.java    |  14 +++
 .../hbase/replication/TestMasterReplication.java  | 140 +++++++++++++++++++++
 3 files changed, 156 insertions(+)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 9b0dfff..1e257db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -268,6 +268,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b9d7807..574666e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -465,6 +465,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.hlogsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
    * Thie method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index b50439a..d4c7918 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,24 +19,35 @@ package org.apache.hadoop.hbase.replication;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -49,10 +60,14 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -269,6 +284,46 @@ public class TestMasterReplication {
     }
   }
 
+  /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false,
+   * so the ReplicationSource should terminate, and no further logs should get
+   * enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName =
+        utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus =
+            utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    HTable[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch = ZKUtil.listChildrenNoWatch(zkw,
+        ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
   @After
   public void tearDown() throws IOException {
     configurations = null;
@@ -434,6 +489,91 @@ public class TestMasterReplication {
     }
   }
 
+  private void rollWALAndWait(final HBaseTestingUtility utility,
+      final TableName table, final byte[] row) throws IOException {
+    final HBaseAdmin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull(
+        "Couldn't find the region for row '" + Arrays.toString(row) + "'",
+        region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener() {
+
+      @Override
+      public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+
+      }
+
+      @Override
+      public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+        latch.countDown();
+      }
+
+      @Override
+      public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+
+      }
+
+      @Override
+      public void postLogArchive(Path oldPath, Path newPath)
+          throws IOException {
+
+      }
+
+      @Override
+      public void logRollRequested(boolean tooFewReplicas) {
+
+      }
+
+      @Override
+      public void logCloseRequested() {
+
+      }
+
+      @Override
+      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+          WALEdit logEdit) {
+
+      }
+
+      @Override
+      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
+          WALEdit logEdit) {
+
+      }
+
+    };
+    region.getLog().registerWALActionsListener(listener);
+
+    // request a roll
+    admin.rollHLogWriter(
+        cluster.getServerHoldingRegion(region.getRegionInfo().getRegionName())
+            .getServerName());
+
+    // wait
+    try {
+      latch.await();
+    } catch (InterruptedException exception) {
+      LOG.warn("Interrupted while waiting for the wal of '" + region
+          + "' to roll. If later "
+          + "replication tests fail, it's probably because we should still be waiting.");
+      Thread.currentThread().interrupt();
+    }
+    region.getLog().unregisterWALActionsListener(listener);
+  }
+
   /**
    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
    * timestamp there is otherwise no way to count them.
-- 
2.7.4