commit 7092dc3ec4f29f280ae92eea6e1f01313d998596 Author: Ashu Pachauri Date: Fri Sep 23 16:04:08 2016 -0700 HBASE-16681: Flaky TestReplicationSourceManagerZkImpl Change-Id: I6bf31eb2f3815079d346963ad78045f67e0f44b7 diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index b74a5e6..7614b0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -57,13 +58,16 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; @@ -521,28 +525,47 @@ 
public class TestReplicationSourceManager { String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); try { DummyServer server = new DummyServer(); - ReplicationQueues rq = + final ReplicationQueues rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), server); rq.init(server.getServerName().toString()); // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface // initialization to throw an exception. - conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl"); - ReplicationPeers rp = manager.getReplicationPeers(); + conf.set("replication.replicationsource.implementation", + FailInitializeDummyReplicationSource.class.getName()); + final ReplicationPeers rp = manager.getReplicationPeers(); // Set up the znode and ReplicationPeer for the fake peer rp.addPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase"), null); - rp.peerAdded("FakePeer"); - // Have ReplicationSourceManager add the fake peer. It should fail to initialize a - // ReplicationSourceInterface. 
- List<String> fakePeers = new ArrayList<>(); - fakePeers.add("FakePeer"); - manager.peerListChanged(fakePeers); - // Create a replication queue for the fake peer - rq.addLog("FakePeer", "FakeFile"); + // Wait for the peer to get created and connected + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + return (rp.getPeer("FakePeer") != null); + } + }); + + // Make sure that the replication source was not initialized + List<ReplicationSourceInterface> sources = manager.getSources(); + for (ReplicationSourceInterface source : sources) { + assertNotEquals("FakePeer", source.getPeerClusterId()); + } + // Removing the peer should remove both the replication queue and the ReplicationPeer manager.removePeer("FakePeer"); assertFalse(rq.getAllQueues().contains("FakePeer")); assertNull(rp.getPeer("FakePeer")); + // Unregister peer, this should remove the peer and clear all queues associated with it + // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. + rp.removePeer("FakePeer"); + Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { + List<String> peers = rp.getAllPeerIds(); + return (!rq.getAllQueues().contains("FakePeer")) + && (rp.getPeer("FakePeer") == null) + && (!peers.contains("FakePeer")); + } + }); } finally { conf.set("replication.replicationsource.implementation", replicationSourceImplName); } @@ -638,6 +661,17 @@ public class TestReplicationSourceManager { } } + static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { + + @Override + public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, + UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) + throws IOException { + throw new IOException("Failing deliberately"); + } + } + static class DummyServer implements Server { String hostname;