From 7868a612bc667f249921a6ed8c3ada289d3b810c Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 27 Dec 2017 20:37:03 +0800 Subject: [PATCH] HBASE-19633 Clean up the replication queues in the postPeerModification stage when removing a peer --- .../hbase/master/replication/ModifyPeerProcedure.java | 19 +++++++++++++------ .../hbase/master/replication/RemovePeerProcedure.java | 4 +++- .../master/replication/ReplicationPeerManager.java | 14 ++++++++++++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 279fbc7..ef893a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -82,10 +82,13 @@ public abstract class ModifyPeerProcedure * Called before we finish the procedure. The implementation can do some logging work, and also * call the coprocessor hook if any. *

- * Notice that, since we have already done the actual work, throwing exception here will not fail - * this procedure, we will just ignore it and finish the procedure as suceeded. + * Notice that, since we have already done the actual work, throwing {@code IOException} here will + * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If + * {@code ReplicationException} is thrown we will retry since this usually means we fails to + * update the peer storage. */ - protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException; + protected abstract void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException; private void releaseLatch() { ProcedurePrepareLatch.releaseLatch(latch, this); @@ -132,9 +135,13 @@ public abstract class ModifyPeerProcedure case POST_PEER_MODIFICATION: try { postPeerModification(env); + } catch (ReplicationException e) { + LOG.warn(getClass().getName() + " failed to call postPeerModification for peer " + + peerId + ", retry", e); + throw new ProcedureYieldException(); } catch (IOException e) { - LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + - ", ignore since the procedure has already done", e); + LOG.warn(getClass().getName() + " failed to call postPeerModification for peer " + + peerId + ", ignore since the procedure has already done", e); } releaseLatch(); return Flow.NO_MORE_STATE; @@ -160,7 +167,7 @@ public abstract class ModifyPeerProcedure throws IOException, InterruptedException { if (state == PeerModificationState.PRE_PEER_MODIFICATION) { // actually the peer related operations has no rollback, but if we haven't done any - // modifications on the peer storage, we can just return. + // modifications on the peer storage yet, we can just return. return; } throw new UnsupportedOperationException(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 6e9c384..049a781 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -61,7 +61,9 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { } @Override - protected void postPeerModification(MasterProcedureEnv env) throws IOException { + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + env.getReplicationPeerManager().removeAllQueues(peerId); LOG.info("Successfully removed peer " + peerId); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index f4ccce8..a24a5f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -217,6 +217,20 @@ public class ReplicationPeerManager { return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); } + public void removeAllQueues(String peerId) throws ReplicationException { + for (ServerName replicator : queueStorage.getListOfReplicators()) { + List queueIds = queueStorage.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (queueInfo.getPeerId().equals(peerId)) { + queueStorage.removeQueue(replicator, queueId); + queueStorage.removeReplicatorIfQueueIsEmpty(replicator); + } + } + } + queueStorage.removePeerFromHFileRefs(peerId); + } + private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { checkClusterKey(peerConfig.getClusterKey()); -- 2.7.4