From 500baabd6c8691daba6a272b004e31c2789b50fe Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 28 Dec 2017 17:56:19 +0800 Subject: [PATCH] HBASE-19633 Clean up the replication queues in the postPeerModification stage when removing a peer --- .../hbase/master/replication/AddPeerProcedure.java | 4 +-- .../master/replication/DisablePeerProcedure.java | 2 +- .../master/replication/EnablePeerProcedure.java | 2 +- .../master/replication/ModifyPeerProcedure.java | 33 +++++++++++++--------- .../master/replication/RefreshPeerProcedure.java | 16 +++++------ .../master/replication/RemovePeerProcedure.java | 6 ++-- .../master/replication/ReplicationPeerManager.java | 14 +++++++++ .../replication/UpdatePeerConfigProcedure.java | 2 +- 8 files changed, 49 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index a4f9b32..fc9a047 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -74,8 +74,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure { @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " peer " + peerId + - ", config " + peerConfig); + LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId, + peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java index 10e35a8..c32509e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java @@ -62,7 +62,7 @@ public class DisablePeerProcedure extends ModifyPeerProcedure { @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully disabled peer " + peerId); + LOG.info("Successfully disabled peer {}", peerId); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.postDisableReplicationPeer(peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java index f2a9f01..1b265d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java @@ -62,7 +62,7 @@ public class EnablePeerProcedure extends ModifyPeerProcedure { @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully enabled peer " + peerId); + LOG.info("Successfully enabled peer {}", peerId); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.postEnableReplicationPeer(peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index a682606..cc1aa89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -84,10 +84,13 @@ public abstract class ModifyPeerProcedure * Called before we finish the procedure. The implementation can do some logging work, and also * call the coprocessor hook if any. *

- * Notice that, since we have already done the actual work, throwing exception here will not fail - * this procedure, we will just ignore it and finish the procedure as suceeded. + * Notice that, since we have already done the actual work, throwing {@code IOException} here will + * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If + * {@code ReplicationException} is thrown we will retry since this usually means we fails to + * update the peer storage. */ - protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException; + protected abstract void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException; private void releaseLatch() { ProcedurePrepareLatch.releaseLatch(latch, this); @@ -101,16 +104,14 @@ public abstract class ModifyPeerProcedure try { prePeerModification(env); } catch (IOException e) { - LOG.warn( - getClass().getName() + " failed to call CP hook or the pre check is failed for peer " + - peerId + ", mark the procedure as failure and give up", - e); + LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " + + "mark the procedure as failure and give up", getClass().getName(), peerId, e); setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); releaseLatch(); return Flow.NO_MORE_STATE; } catch (ReplicationException e) { - LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + - ", retry", e); + LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(), + peerId, e); throw new ProcedureYieldException(); } setNextState(PeerModificationState.UPDATE_PEER_STORAGE); @@ -119,8 +120,8 @@ public abstract class ModifyPeerProcedure try { updatePeerStorage(env); } catch (ReplicationException e) { - LOG.warn( - getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e); + LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId, + e); throw new ProcedureYieldException(); } setNextState(PeerModificationState.REFRESH_PEER_ON_RS); @@ -134,9 +135,13 @@ public abstract class ModifyPeerProcedure case POST_PEER_MODIFICATION: try { postPeerModification(env); + } catch (ReplicationException e) { + LOG.warn("{} failed to call postPeerModification for peer {}, retry", + getClass().getName(), peerId, e); + throw new ProcedureYieldException(); } catch (IOException e) { - LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + - ", ignore since the procedure has already done", e); + LOG.warn("{} failed to call post CP hook for peer {}, " + + "ignore since the procedure has already done", getClass().getName(), peerId, e); } releaseLatch(); return Flow.NO_MORE_STATE; @@ -175,7 +180,7 @@ public abstract class ModifyPeerProcedure throws IOException, InterruptedException { if (state == PeerModificationState.PRE_PEER_MODIFICATION) { // actually the peer related operations has no rollback, but if we haven't done any - // modifications on the peer storage, we can just return. + // modifications on the peer storage yet, we can just return. return; } throw new UnsupportedOperationException(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index ba4285f..891ec02 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -122,17 +122,15 @@ public class RefreshPeerProcedure extends Procedure private void complete(MasterProcedureEnv env, Throwable error) { if (event == null) { - LOG.warn("procedure event for " + getProcId() + - " is null, maybe the procedure is created when recovery", - new Exception()); + LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); return; } if (error != null) { - LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed", - error); + LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error); this.succ = false; } else { - LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded"); + LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer); this.succ = true; } @@ -168,9 +166,9 @@ public class RefreshPeerProcedure extends Procedure dispatched = false; } if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) { - LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type + - " to " + targetServer + ", this usually because the server is already dead," + - " give up and mark the procedure as complete"); + LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " + + "this usually because the server is already dead, " + + "give up and mark the procedure as complete", peerId, type, targetServer); return null; } dispatched = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 6e9c384..601d8a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -61,8 +61,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { } @Override - protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully removed peer " + peerId); + protected void postPeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { + env.getReplicationPeerManager().removeAllQueues(peerId); + LOG.info("Successfully removed peer {}", peerId); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.postRemoveReplicationPeer(peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index f4ccce8..a24a5f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -217,6 +217,20 @@ public class ReplicationPeerManager { return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); } + public void removeAllQueues(String peerId) throws ReplicationException { + for (ServerName replicator : queueStorage.getListOfReplicators()) { + List queueIds = queueStorage.getAllQueues(replicator); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (queueInfo.getPeerId().equals(peerId)) { + queueStorage.removeQueue(replicator, queueId); + queueStorage.removeReplicatorIfQueueIsEmpty(replicator); + } + } + } + queueStorage.removePeerFromHFileRefs(peerId); + } + private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { checkClusterKey(peerConfig.getClusterKey()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java index a43532d..83d6be4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java @@ -70,7 +70,7 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure { @Override protected void postPeerModification(MasterProcedureEnv env) throws IOException { - LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig); + LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig); MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig); -- 2.7.4