Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1456743) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -22,7 +22,12 @@ import java.net.URLEncoder; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -94,7 +99,10 @@ private static Path oldLogDir; private static Path logDir; + + private static CountDownLatch latch; + private static List files = new ArrayList(); @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -214,9 +222,108 @@ // TODO Need a case with only 2 HLogs and we only want to delete the first one } + + @Test + public void testNodeFailoverWorkerCopyQueuesFromRSUsingMulti() throws Exception { + LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); + conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); + final Server server = new DummyServer("hostname0.example.org"); + AtomicBoolean replicating = new AtomicBoolean(true); + ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating); + // populate some znodes in the peer znode + files.add("log1"); + files.add("log2"); + for (String file : files) { + rz.addLogToList(file, "1"); + } + // create 3 DummyServers + Server s1 = new DummyServer("dummyserver1.example.org"); + Server s2 = new DummyServer("dummyserver2.example.org"); + Server s3 = new DummyServer("dummyserver3.example.org"); + // create 3 DummyNodeFailoverWorkers + DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker( + server.getServerName().getServerName(), s1); + DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker( + server.getServerName().getServerName(), s2); + DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker( + server.getServerName().getServerName(), s3); + + latch = new CountDownLatch(3); + // start the threads + w1.start(); + w2.start(); + w3.start(); + // make sure only one is successful + int populatedMap = 0; + // wait for result now... till all the workers are done. + latch.await(); + populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() + + w3.isLogZnodesMapPopulated(); + assertEquals(1, populatedMap); + // close out the resources. + rz.close(); + server.abort("", null); + } + + static class DummyNodeFailoverWorker extends Thread { + private SortedMap> logZnodesMap; + Server server; + private String deadRsZnode; + ReplicationZookeeper rz; + + public DummyNodeFailoverWorker(String znode, Server s) throws Exception { + this.deadRsZnode = znode; + this.server = s; + rz = new ReplicationZookeeper(server, new AtomicBoolean(true)); + } + + @Override + public void run() { + try { + logZnodesMap = rz.copyQueuesFromRSUsingMulti(deadRsZnode); + rz.close(); + server.abort("Done with testing", null); + } catch (Exception e) { + LOG.error("Got exception while running NodeFailoverWorker", e); + } finally { + latch.countDown(); + } + } + + /** + * @return 1 when the map is not empty. + */ + private int isLogZnodesMapPopulated() { + Collection> sets = logZnodesMap.values(); + if (sets.size() > 1) { + throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size()); + } + if (sets.size() == 1) { + SortedSet s = sets.iterator().next(); + for (String file : files) { + // at least one file was missing + if (!s.contains(file)) { + return 0; + } + } + return 1; // we found all the files + } + return 0; + } + } + static class DummyServer implements Server { + String hostname; + DummyServer() { + hostname = "hostname.example.org"; + } + + DummyServer(String hostname) { + this.hostname = hostname; + } + @Override public Configuration getConfiguration() { return conf; @@ -229,19 +336,19 @@ @Override public CatalogTracker getCatalogTracker() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return null; // To change body of implemented methods use File | Settings | File Templates. } @Override public ServerName getServerName() { - return new ServerName("hostname.example.org", 1234, -1L); + return new ServerName(hostname, 1234, -1L); } @Override public void abort(String why, Throwable e) { - //To change body of implemented methods use File | Settings | File Templates. + // To change body of implemented methods use File | Settings | File Templates. } - + @Override public boolean isAborted() { return false; @@ -249,15 +356,14 @@ @Override public void stop(String why) { - //To change body of implemented methods use File | Settings | File Templates. + // To change body of implemented methods use File | Settings | File Templates. } @Override public boolean isStopped() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; // To change body of implemented methods use File | Settings | File Templates. } } - }