diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 20de326..b9ed8dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -110,6 +110,13 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { LOG.error("Error while configuring " + this.getClass().getName(), e); } } + @VisibleForTesting + public void setConf(Configuration conf, ZKWatcher zk, + ReplicationQueueStorage replicationQueueStorage) { + super.setConf(conf); + this.zkw = zk; + this.queueStorage = replicationQueueStorage; + } @Override public void stop(String why) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 2f518c7..5eda2d1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -22,12 +22,15 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doAnswer; import java.io.IOException; import java.net.URLEncoder; import java.util.Iterator; import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -46,6 +49,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; @@ -61,6 +65,8 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,12 +201,10 @@ public class TestLogsCleaner { } } - /** - * ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting. - */ - @Test - public void testZooKeeperAbort() throws Exception { + @Test(timeout=10000) + public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); List dummyFiles = Lists.newArrayList( @@ -208,20 +212,56 @@ public class TestLogsCleaner { new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) ); - try (FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf, - "testZooKeeperAbort-faulty", null)) { + FaultyZooKeeperWatcher faultyZK = + new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); + final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false); + + try { faultyZK.init(); - cleaner.setConf(conf, faultyZK); - cleaner.preClean(); + ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory + .getReplicationQueueStorage(faultyZK, conf)); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + try { + return invocation.callRealMethod(); + } catch (ReplicationException e) { + LOG.debug("caught " + e); + getListOfReplicatorsFailed.set(true); + throw e; + } + } + }).when(queueStorage).getAllWALs(); + + cleaner.setConf(conf, faultyZK, queueStorage); // should keep all files due to a ConnectionLossException getting the queues znodes + cleaner.preClean(); Iterable toDelete = cleaner.getDeletableFiles(dummyFiles); + + assertTrue(getListOfReplicatorsFailed.get()); assertFalse(toDelete.iterator().hasNext()); assertFalse(cleaner.isStopped()); + } finally { + faultyZK.close(); } + } + + /** + * When zk is working both files should be returned + * @throws Exception + */ + @Test(timeout=10000) + public void testZooKeeperNormal() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - // when zk is working both files should be returned - cleaner = new ReplicationLogCleaner(); - try (ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null)) { + List dummyFiles = Lists.newArrayList( + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) + ); + + ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); + try { cleaner.setConf(conf, zkw); cleaner.preClean(); Iterable filesToDelete = cleaner.getDeletableFiles(dummyFiles); @@ -231,6 +271,8 @@ public class TestLogsCleaner { assertTrue(iter.hasNext()); assertEquals(new Path("log2"), iter.next().getPath()); assertFalse(iter.hasNext()); + } finally { + zkw.close(); } } @@ -371,7 +413,7 @@ public class TestLogsCleaner { public void init() throws Exception { this.zk = spy(super.getRecoverableZooKeeper()); doThrow(new KeeperException.ConnectionLossException()) - .when(zk).getData("/hbase/replication/rs", null, new Stat()); + .when(zk).getChildren("/hbase/replication/rs", null); } @Override