diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index b33e64d..8bd1267 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -125,11 +125,12 @@ public class ReplicationAdmin implements Closeable { try { zkw = createZooKeeperWatcher(); try { - this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection); - this.replicationPeers.init(); this.replicationQueuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection); this.replicationQueuesClient.init(); + this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, + this.replicationQueuesClient, this.connection); + this.replicationPeers.init(); } catch (Exception exception) { if (zkw != null) { zkw.close(); @@ -187,7 +188,7 @@ public class ReplicationAdmin implements Closeable { this.replicationPeers.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs); } - + /** * Add a new remote slave cluster for replication. 
* @param id a short name that identifies the cluster diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index f115a39..91e77ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -42,7 +42,12 @@ public class ReplicationFactory { public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { - return new ReplicationPeersZKImpl(zk, conf, abortable); + return getReplicationPeers(zk, conf, null, abortable); + } + + public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, + final ReplicationQueuesClient queuesClient, Abortable abortable) { + return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable); } public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index a223531..c76a911 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -81,14 +81,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re // Map of peer clusters keyed by their id private Map peerClusters; private final String tableCFsNodeName; + private final ReplicationQueuesClient queuesClient; private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class); public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf, - Abortable abortable) { + final ReplicationQueuesClient queuesClient, 
Abortable abortable) { super(zk, conf, abortable); this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); this.peerClusters = new ConcurrentHashMap(); + this.queuesClient = queuesClient; } @Override @@ -116,6 +118,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re throw new IllegalArgumentException("Found invalid peer name:" + id); } + checkQueuesDeleted(id); + ZKUtil.createWithParents(this.zookeeper, this.peersZNode); List listOfOps = new ArrayList(); ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id), @@ -561,5 +565,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return ProtobufUtil.prependPBMagic(bytes); } - + private void checkQueuesDeleted(String peerId) throws ReplicationException { + try { + List replicators = queuesClient.getListOfReplicators(); + for (String replicator : replicators) { + List queueIds = queuesClient.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (queueInfo.getPeerId().equals(peerId)) { + throw new ReplicationException("undeleted queue for peerId: " + peerId + + ", replicator: " + replicator + ", queueId: " + queueId); + } + } + } + } catch (KeeperException e) { + throw new ReplicationException("Could not check queues deleted with id=" + peerId, e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 4988481..f66560f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE; import 
org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker; +import org.apache.hadoop.hbase.util.hbck.ReplicationChecker; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl; import org.apache.hadoop.hbase.util.hbck.TableLockChecker; @@ -246,6 +247,7 @@ public class HBaseFsck extends Configured implements Closeable { private boolean fixReferenceFiles = false; // fix lingering reference store file private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows private boolean fixTableLocks = false; // fix table locks which are expired + private boolean fixReplication = false; // fix undeleted replication queues for removed peer private boolean fixAny = false; // Set to true if any of the fix is required. // limit checking/fixes to listed tables, if empty attempt to check/fix all @@ -713,6 +715,8 @@ public class HBaseFsck extends Configured implements Closeable { checkAndFixTableLocks(); + checkAndFixReplication(); + // Remove the hbck lock unlockHbck(); @@ -3268,12 +3272,26 @@ public class HBaseFsck extends Configured implements Closeable { } private void checkAndFixTableLocks() throws IOException { - TableLockChecker checker = new TableLockChecker(createZooKeeperWatcher(), errors); + ZooKeeperWatcher zkw = createZooKeeperWatcher(); + TableLockChecker checker = new TableLockChecker(zkw, errors); checker.checkTableLocks(); if (this.fixTableLocks) { checker.fixExpiredTableLocks(); } + zkw.close(); + } + + private void checkAndFixReplication() throws IOException { + ZooKeeperWatcher zkw = createZooKeeperWatcher(); + ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors); + checker.checkUnDeletedQueues(); + + if (checker.hasUnDeletedQueues() && this.fixReplication) { + checker.fixUnDeletedQueues(); + setShouldRerun(); + } + zkw.close(); } /** @@ -3812,7 +3830,7 @@ public class HBaseFsck extends Configured implements Closeable 
{ HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION, ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, ORPHAN_TABLE_STATE, - NO_TABLE_STATE + NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE } void clear(); void report(String message); @@ -4213,6 +4231,14 @@ public class HBaseFsck extends Configured implements Closeable { fixTableLocks = shouldFix; fixAny |= shouldFix; } + + /** + * Set replication fix mode. + */ + public void setFixReplication(boolean shouldFix) { + fixReplication = shouldFix; + fixAny |= shouldFix; + } /** * Check if we should rerun fsck again. This checks if we've tried to @@ -4473,6 +4499,10 @@ public class HBaseFsck extends Configured implements Closeable { out.println(" Table lock options"); out.println(" -fixTableLocks Deletes table locks held for a long time (hbase.table.lock.expire.ms, 10min by default)"); + out.println(""); + out.println(" Replication options"); + out.println(" -fixReplication Deletes replication queues for removed peers"); + out.flush(); errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString()); @@ -4658,6 +4688,8 @@ public class HBaseFsck extends Configured implements Closeable { setRegionBoundariesCheck(); } else if (cmd.equals("-fixTableLocks")) { setFixTableLocks(true); + } else if (cmd.equals("-fixReplication")) { + setFixReplication(true); } else if (cmd.startsWith("-")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd); return printUsageAndExit(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index e126205..e187b9b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -26,8 +26,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -114,6 +117,37 @@ public class TestReplicationAdmin { admin.removePeer(ID_SECOND); assertEquals(0, admin.getPeersCount()); } + + @Test + public void testAddPeerWithUnDeletedQueues() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null); + ReplicationQueues repQueues = + ReplicationFactory.getReplicationQueues(zkw, conf, null); + repQueues.init("server1"); + + // add queue for ID_ONE + repQueues.addLog(ID_ONE, "file1"); + try { + admin.addPeer(ID_ONE, KEY_ONE); + fail(); + } catch (ReplicationException e) { + // OK! + } + repQueues.removeQueue(ID_ONE); + assertEquals(0, repQueues.getAllQueues().size()); + + // add recovered queue for ID_ONE + repQueues.addLog(ID_ONE + "-server2", "file1"); + try { + admin.addPeer(ID_ONE, KEY_ONE); + fail(); + } catch (ReplicationException e) { + // OK! 
+ } + repQueues.removeAllQueues(); + zkw.close(); + } /** * basic checks that when we add a peer that it is enabled, and that we can disable diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java index 3562a69..db389bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java @@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; @@ -107,6 +108,8 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory; import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl; import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; @@ -117,6 +120,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck.TableInfo; import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker; import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.Assert; @@ -1320,7 
+1324,7 @@ public class TestHBaseFsck { // fix hole assertErrors( - doFsck(conf, false, true, false, false, false, false, false, false, false, false, null), + doFsck(conf, false, true, false, false, false, false, false, false, false, false, false, null), new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.NOT_IN_META_OR_DEPLOYED }); @@ -1797,7 +1801,7 @@ public class TestHBaseFsck { // for some time until children references are deleted. HBCK erroneously sees this as // overlapping regions HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false, - false, null); + false, false, null); assertErrors(hbck, new ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported // assert that the split hbase:meta entry is still there. @@ -1876,7 +1880,7 @@ public class TestHBaseFsck { // now fix it. The fix should not revert the region split, but add daughters to META hbck = doFsck(conf, true, true, false, false, false, false, false, false, false, - false, null); + false, false, null); assertErrors(hbck, new ERROR_CODE[] { ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.NOT_IN_META_OR_DEPLOYED, @@ -2404,6 +2408,55 @@ public class TestHBaseFsck { doQuarantineTest(table, hbck, 3, 0, 0, 0, 1); hbck.close(); } + + @Test(timeout=60000) + public void testCheckReplication() throws Exception { + // check no errors + HBaseFsck hbck = doFsck(conf, false); + assertNoErrors(hbck); + + // create peer + ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf); + Assert.assertEquals(0, replicationAdmin.getPeersCount()); + String zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); + replicationAdmin.addPeer("1", "127.0.0.1:" + zkPort + ":/hbase"); + replicationAdmin.getPeersCount(); + Assert.assertEquals(1, replicationAdmin.getPeersCount()); + + // create replicator + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection); + ReplicationQueues repQueues = + ReplicationFactory.getReplicationQueues(zkw, conf, connection); 
+ repQueues.init("server1"); + // queues for current peer, no errors + repQueues.addLog("1", "file1"); + repQueues.addLog("1-server2", "file1"); + Assert.assertEquals(2, repQueues.getAllQueues().size()); + hbck = doFsck(conf, false); + assertNoErrors(hbck); + + // queues for removed peer + repQueues.addLog("2", "file1"); + repQueues.addLog("2-server2", "file1"); + Assert.assertEquals(4, repQueues.getAllQueues().size()); + hbck = doFsck(conf, false); + assertErrors(hbck, new ERROR_CODE[] { ERROR_CODE.UNDELETED_REPLICATION_QUEUE, + ERROR_CODE.UNDELETED_REPLICATION_QUEUE }); + + // fix the case + hbck = doFsck(conf, true); + hbck = doFsck(conf, false); + assertNoErrors(hbck); + // ensure only "2" is deleted + Assert.assertEquals(2, repQueues.getAllQueues().size()); + Assert.assertNull(repQueues.getLogsInQueue("2")); + Assert.assertNull(repQueues.getLogsInQueue("2-server2")); + + replicationAdmin.removePeer("1"); + repQueues.removeAllQueues(); + zkw.close(); + replicationAdmin.close(); + } /** * This creates a table and simulates the race situation where a concurrent compaction or split diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java index a28378e..6a8a68b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java @@ -40,14 +40,14 @@ public class HbckTestingUtil { public static HBaseFsck doFsck( Configuration conf, boolean fix, TableName table) throws Exception { - return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); + return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table); } - public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, - boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps, - boolean fixHdfsOrphans, boolean fixTableOrphans, 
boolean fixVersionFile, - boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, - TableName table) throws Exception { + public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta, + boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans, + boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, + boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication, TableName table) + throws Exception { HBaseFsck fsck = new HBaseFsck(conf, exec); try { HBaseFsck.setDisplayFullReport(); // i.e. -details @@ -62,6 +62,7 @@ public class HbckTestingUtil { fsck.setFixReferenceFiles(fixReferenceFiles); fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo); fsck.setFixTableLocks(fixTableLocks); + fsck.setFixReplication(fixReplication); if (table != null) { fsck.includeTable(table); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java new file mode 100644 index 0000000..cb71ee2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util.hbck; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.util.HBaseFsck; +import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/* + * Check and fix undeleted replication queues for removed peerId. 
+ */ +public class ReplicationChecker { + private static final Log LOG = LogFactory.getLog(ReplicationChecker.class); + private ErrorReporter errorReporter; + private ReplicationQueuesClient queuesClient; + private ReplicationPeers replicationPeers; + private ReplicationQueueDeletor queueDeletor; + // replicator with its queueIds for removed peers + private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>(); + + public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection, + ErrorReporter errorReporter) throws IOException { + try { + this.errorReporter = errorReporter; + this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection); + this.queuesClient.init(); + this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, + connection); + this.replicationPeers.init(); + this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection); + } catch (ReplicationException e) { + throw new IOException("construct ReplicationChecker fail", e); + } + } + + public boolean hasUnDeletedQueues() { + return errorReporter.getErrorList() + .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE); + } + + public void checkUnDeletedQueues() throws IOException { + Set<String> peerIds = new HashSet<String>(this.replicationPeers.getAllPeerIds()); + try { + List<String> replicators = this.queuesClient.getListOfReplicators(); + for (String replicator : replicators) { + List<String> queueIds = this.queuesClient.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (!peerIds.contains(queueInfo.getPeerId())) { + if (!undeletedQueueIds.containsKey(replicator)) { + undeletedQueueIds.put(replicator, new ArrayList<String>()); + } + undeletedQueueIds.get(replicator).add(queueId); + + String msg = "Undeleted replication queue for removed peer found: " + + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", + queueInfo.getPeerId(), replicator, 
queueId); + errorReporter.reportError( + HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg); + } + } + } + } catch (KeeperException ke) { + throw new IOException(ke); + } + } + + static class ReplicationQueueDeletor extends ReplicationStateZKBase { + public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { + super(zk, conf, abortable); + } + + public void removeQueue(String replicator, String queueId) throws IOException { + String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator), + queueId); + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath); + LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId); + } catch (KeeperException e) { + throw new IOException("fail to delete queue, replicator: " + replicator + ", queueId: " + + queueId, e); + } + } + } + + public void fixUnDeletedQueues() throws IOException { + for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { + String replicator = replicatorAndQueueIds.getKey(); + for (String queueId : replicatorAndQueueIds.getValue()) { + queueDeletor.removeQueue(replicator, queueId); + } + } + } +}