From 4560838e3313724e179a9c70c8fa4395fa33be94 Mon Sep 17 00:00:00 2001 From: Ben Lau Date: Wed, 14 Feb 2018 11:36:04 -0800 Subject: [PATCH] HBASE-18282 ReplicationLogCleaner can delete WALs not yet replicated in case of a KeeperException --- .../hbase/replication/ReplicationQueues.java | 3 +- .../replication/ReplicationQueuesClientZKImpl.java | 5 ++ .../hbase/replication/ReplicationQueuesZKImpl.java | 10 ++++ .../hbase/replication/ReplicationStateZKBase.java | 8 +++- .../cleaner/ReplicationZKLockCleanerChore.java | 4 +- .../replication/master/ReplicationLogCleaner.java | 10 +++- .../hbase/master/cleaner/TestLogsCleaner.java | 54 +++++++++++++++++----- 7 files changed, 79 insertions(+), 15 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 2409111939..ccc71723b6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -126,8 +126,9 @@ public interface ReplicationQueues { * Get a list of all region servers that have outstanding replication queues. These servers could * be alive, dead or from a previous run of the cluster. * @return a list of server names + * @throws ReplicationException */ - List getListOfReplicators(); + List getListOfReplicators() throws ReplicationException; /** * Checks if the provided znode is the same as this region server's diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index 9078e4026c..14b43348ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -48,6 +48,11 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem } } + @Override + public List getListOfReplicators() throws KeeperException { + return super.getListOfReplicatorsZK(); + } + @Override public List getLogsInQueue(String serverName, String queueId) throws KeeperException { String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index a1bd82962c..dda9adf7cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -102,6 +102,16 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } } + @Override + public List getListOfReplicators() throws ReplicationException { + try { + return super.getListOfReplicatorsZK(); + } catch (KeeperException e) { + LOG.warn("getListOfReplicators() from ZK failed", e); + throw new ReplicationException("getListOfReplicators() from ZK failed", e); + } + } + @Override public void removeQueue(String queueId) { try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index d0c3513f68..0713b99c3a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -90,12 +90,18 @@ public abstract class ReplicationStateZKBase { this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName); } - public List getListOfReplicators() { + /** + * Subclasses that use ZK explicitly can just call this directly while classes + * that are trying to hide internal details of storage can wrap the KeeperException + * into a ReplicationException or something else. + */ + protected List getListOfReplicatorsZK() throws KeeperException { List result = null; try { result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode); } catch (KeeperException e) { this.abortable.abort("Failed to get list of replicators", e); + throw e; } return result; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java index 3fa30bf6e4..7c50719a72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.ReplicationTracker; @@ -108,7 +109,8 @@ public class ReplicationZKLockCleanerChore extends ScheduledChore { } } catch (KeeperException e) { LOG.warn("zk operation interrupted", e); + } catch (ReplicationException e2) { + LOG.warn("replication exception", e2); } - } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 7731240cc4..42d66a5836 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -156,6 +156,14 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { } } + @VisibleForTesting + public void setConf(Configuration conf, ZooKeeperWatcher zk, + ReplicationQueuesClient replicationQueuesClient) { + super.setConf(conf); + this.zkw = zk; + this.replicationQueues = replicationQueuesClient; + } + @Override public void stop(String why) { if (this.stopped) return; @@ -171,7 +179,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { return this.stopped; } - private static class WarnOnlyAbortable implements Abortable { + public static class WarnOnlyAbortable implements Abortable { @Override public void abort(String why, Throwable e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 8efa75481b..df5916cc2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doAnswer; import java.io.IOException; import java.lang.reflect.Field; @@ -29,6 +30,7 @@ import java.net.URLEncoder; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; @@ -55,12 +57,13 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @Category(MediumTests.class) public class TestLogsCleaner { @@ -177,13 +180,10 @@ public class TestLogsCleaner { cleaner.getDeletableFiles(new LinkedList()); } - /** - * ReplicationLogCleaner should be able to ride over ZooKeeper errors without - * aborting. - */ - @Test - public void testZooKeeperAbort() throws Exception { + @Test(timeout=10000) + public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); List dummyFiles = Lists.newArrayList( @@ -193,19 +193,51 @@ public class TestLogsCleaner { FaultyZooKeeperWatcher faultyZK = new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); + final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false); + try { faultyZK.init(); - cleaner.setConf(conf, faultyZK); + ReplicationQueuesClient replicationQueuesClient = spy(ReplicationFactory.getReplicationQueuesClient( + faultyZK, conf, new ReplicationLogCleaner.WarnOnlyAbortable())); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + try { + return invocation.callRealMethod(); + } catch (KeeperException.ConnectionLossException e) { + getListOfReplicatorsFailed.set(true); + throw e; + } + } + }).when(replicationQueuesClient).getListOfReplicators(); + replicationQueuesClient.init(); + + cleaner.setConf(conf, faultyZK, replicationQueuesClient); // should keep all files due to a ConnectionLossException getting the queues znodes Iterable toDelete = cleaner.getDeletableFiles(dummyFiles); + + assertTrue(getListOfReplicatorsFailed.get()); assertFalse(toDelete.iterator().hasNext()); assertFalse(cleaner.isStopped()); } finally { faultyZK.close(); } + } + + /** + * When zk is working both files should be returned + * @throws Exception + */ + @Test(timeout=10000) + public void testZooKeeperNormal() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - // when zk is working both files should be returned - cleaner = new ReplicationLogCleaner(); + List dummyFiles = Lists.newArrayList( + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")), + new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2")) + ); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null); try { cleaner.setConf(conf, zkw); @@ -291,7 +323,7 @@ public class TestLogsCleaner { public void init() throws Exception { this.zk = spy(super.getRecoverableZooKeeper()); doThrow(new KeeperException.ConnectionLossException()) - .when(zk).getData("/hbase/replication/rs", null, new Stat()); + .when(zk).getChildren("/hbase/replication/rs", null); } public RecoverableZooKeeper getRecoverableZooKeeper() { -- 2.14.3 (Apple Git-98)