From 75de17c7b1ba690c86b12a21f15faef27b0f9693 Mon Sep 17 00:00:00 2001 From: Ashu Pachauri Date: Fri, 23 Sep 2016 16:04:08 -0700 Subject: [PATCH] HBASE-16681: Flaky TestReplicationSourceManagerZkImpl --- .../regionserver/TestReplicationSourceManager.java | 59 +++++++++++++++++----- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 2e80b2d..71ff6e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -56,18 +57,22 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -437,28 +442,45 @@ public abstract class TestReplicationSourceManager { String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); try { DummyServer server = new DummyServer(); - ReplicationQueues rq = + final ReplicationQueues rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( server.getConfiguration(), server, server.getZooKeeper())); rq.init(server.getServerName().toString()); // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface // initialization to throw an exception. - conf.set("replication.replicationsource.implementation", "fakeReplicationSourceImpl"); - ReplicationPeers rp = manager.getReplicationPeers(); + conf.set("replication.replicationsource.implementation", + FailInitializeDummyReplicationSource.class.getName()); + final ReplicationPeers rp = manager.getReplicationPeers(); // Set up the znode and ReplicationPeer for the fake peer rp.registerPeer("FakePeer", new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase")); - rp.peerConnected("FakePeer"); - // Have ReplicationSourceManager add the fake peer. It should fail to initialize a - // ReplicationSourceInterface. - List fakePeers = new ArrayList<>(); - fakePeers.add("FakePeer"); - manager.peerListChanged(fakePeers); + // Wait for the peer to get created and connected + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (rp.getConnectedPeer("FakePeer") != null); + } + }); + + // Make sure that the replication source was not initialized + List sources = manager.getSources(); + for (ReplicationSourceInterface source : sources) { + assertNotEquals("FakePeer", source.getPeerClusterId()); + } + // Create a replication queue for the fake peer rq.addLog("FakePeer", "FakeFile"); - // Removing the peer should remove both the replication queue and the ReplicationPeer - manager.removePeer("FakePeer"); - assertFalse(rq.getAllQueues().contains("FakePeer")); - assertNull(rp.getConnectedPeer("FakePeer")); + // Unregister peer, this should remove the peer and clear all queues associated with it + // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. + rp.unregisterPeer("FakePeer"); + Waiter.waitFor(conf, 20000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + List peers = rp.getAllPeerIds(); + return (!rq.getAllQueues().contains("FakePeer")) + && (rp.getConnectedPeer("FakePeer") == null) + && (!peers.contains("FakePeer")); + } + }); } finally { conf.set("replication.replicationsource.implementation", replicationSourceImplName); } @@ -553,6 +575,17 @@ public abstract class TestReplicationSourceManager { } } + static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { + + @Override + public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId, + UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) + throws IOException { + throw new IOException("Failing deliberately"); + } + } + static class DummyServer implements Server { String hostname; -- 2.8.0-rc2