From d0346caa15267d6b670d750957e5f82e1eb7262e Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sat, 29 Sep 2018 09:51:57 +0800 Subject: [PATCH] HBASE-21245 Add exponential backoff when retrying for sync replication related procedures --- .../master/replication/AbstractPeerProcedure.java | 20 +++++++ .../master/replication/ModifyPeerProcedure.java | 40 ++++---------- .../SyncReplicationReplayWALProcedure.java | 41 +++++++------- .../TransitPeerSyncReplicationStateProcedure.java | 63 ++++++++++++++-------- 4 files changed, 93 insertions(+), 71 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java index e133a65..496d535 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java @@ -18,14 +18,17 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; + import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * The base class for all replication peer related procedure. @@ -39,6 +42,8 @@ public abstract class AbstractPeerProcedure // used to keep compatible with old client where we can only returns after updateStorage. protected ProcedurePrepareLatch latch; + protected int attempts; + protected AbstractPeerProcedure() { } @@ -106,4 +111,19 @@ public abstract class AbstractPeerProcedure addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new)); } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + protected ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException { + attempts++; + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 7690c96..271b0dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * The base class for all replication peer related procedure except sync replication state @@ -61,8 +60,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure - implements PeerProcedureInterface { + extends AbstractPeerProcedure { private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class); - private String peerId; - private ServerName worker = null; private List wals; @@ -58,9 +53,9 @@ public class SyncReplicationReplayWALProcedure this.wals = wals; } - @Override protected Flow executeFromState(MasterProcedureEnv env, - SyncReplicationReplayWALState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + @Override + protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWALState state) + throws ProcedureSuspendedException { SyncReplicationReplayWALManager syncReplicationReplayWALManager = env.getMasterServices().getSyncReplicationReplayWALManager(); switch (state) { @@ -68,9 +63,12 @@ public class SyncReplicationReplayWALProcedure try { worker = syncReplicationReplayWALManager.getPeerWorker(peerId); } catch (ReplicationException e) { - LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Failed to get worker to replay wals {} for peer id={}, sleep {} secs and retry", + wals, peerId, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; if (worker == null) { LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId); setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); @@ -87,15 +85,21 @@ public class SyncReplicationReplayWALProcedure try { finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0)); } catch (IOException e) { - LOG.info("Failed to check whether replay wals {} finished for peer id={}", wals, peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Failed to check whether replay wals {} finished for peer id={}" + + ", sleep {} secs and retry", + wals, peerId, backoff / 1000, e); + throw suspend(backoff); } try { syncReplicationReplayWALManager.removePeerWorker(peerId, worker); } catch (ReplicationException e) { - LOG.info("Failed to remove worker for peer id={}, retry", peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Failed to remove worker {} for peer id={}, sleep {} secs and retry", worker, + peerId, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; if (!finished) { LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId); setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); @@ -153,11 +157,6 @@ public class SyncReplicationReplayWALProcedure } @Override - public String getPeerId() { - return peerId; - } - - @Override public PeerOperationType getPeerOperationType() { return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index c650974..5c8f2e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -146,7 +146,7 @@ public class TransitPeerSyncReplicationStateProcedure } private void createDirForRemoteWAL(MasterProcedureEnv env) - throws ProcedureYieldException, IOException { + throws IOException, ProcedureSuspendedException { MasterFileSystem mfs = env.getMasterFileSystem(); Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); @@ -155,8 +155,10 @@ public class TransitPeerSyncReplicationStateProcedure LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway", remoteWALDirForPeer); } else if (!walFs.mkdirs(remoteWALDirForPeer)) { - LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Can not create remote wal dir {}, sleep {} secs and retry", backoff / 1000, + remoteWALDirForPeer); + throw suspend(backoff); } } @@ -192,8 +194,7 @@ public class TransitPeerSyncReplicationStateProcedure @Override protected Flow executeFromState(MasterProcedureEnv env, - PeerSyncReplicationStateTransitionState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException { switch (state) { case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION: try { @@ -218,10 +219,14 @@ public class TransitPeerSyncReplicationStateProcedure env.getReplicationPeerManager().disablePeer(peerId); } } catch (ReplicationException e) { - LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " + - "replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to update peer storage for peer {} when starting transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN); return Flow.HAS_MORE_STATE; @@ -240,10 +245,14 @@ public class TransitPeerSyncReplicationStateProcedure try { env.getReplicationPeerManager().removeAllQueues(peerId); } catch (ReplicationException e) { - LOG.warn("Failed to remove all replication queues peer {} when starting transiting" + - " sync replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to remove all replication queues peer {} when starting transiting" + + " sync replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState(fromState.equals(SyncReplicationState.ACTIVE) ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); @@ -257,10 +266,14 @@ public class TransitPeerSyncReplicationStateProcedure try { env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState); } catch (ReplicationException e) { - LOG.warn("Failed to update peer storage for peer {} when ending transiting sync " + - "replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to update peer storage for peer {} when ending transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END); return Flow.HAS_MORE_STATE; @@ -274,10 +287,14 @@ public class TransitPeerSyncReplicationStateProcedure try { env.getReplicationPeerManager().enablePeer(peerId); } catch (ReplicationException e) { - LOG.warn("Failed to set peer enabled for peer {} when transiting sync replication peer " + - "state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to set peer enabled for peer {} when transiting sync replication peer " + + "state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS); return Flow.HAS_MORE_STATE; @@ -289,10 +306,14 @@ public class TransitPeerSyncReplicationStateProcedure try { createDirForRemoteWAL(env); } catch (IOException e) { - LOG.warn("Failed to create remote wal dir for peer {} when transiting sync replication " + - "peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to create remote wal dir for peer {} when transiting sync replication " + + "peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); return Flow.HAS_MORE_STATE; -- 2.7.4