From c83ce0bf4668a39cc69bd509581b8be8b1867c97 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sat, 29 Sep 2018 09:51:57 +0800 Subject: [PATCH] HBASE-21245 Add exponential backoff when retrying for sync replication related procedures --- .../master/replication/AbstractPeerProcedure.java | 19 +++++++ .../SyncReplicationReplayWALProcedure.java | 41 +++++++------- .../TransitPeerSyncReplicationStateProcedure.java | 63 ++++++++++++++-------- 3 files changed, 81 insertions(+), 42 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java index e133a65..bb5654a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java @@ -22,9 +22,11 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData; /** @@ -39,6 +41,8 @@ public abstract class AbstractPeerProcedure // used to keep compatible with old client where we can only returns after updateStorage. 
protected ProcedurePrepareLatch latch; + protected int attempts; + protected AbstractPeerProcedure() { } @@ -106,4 +110,19 @@ public abstract class AbstractPeerProcedure addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new)); } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + protected ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException { + attempts++; + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java index 26d6a3f..6dde826 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java @@ -23,11 +23,9 @@ import java.util.List; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; -import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import 
org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -38,14 +36,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S @InterfaceAudience.Private public class SyncReplicationReplayWALProcedure - extends StateMachineProcedure - implements PeerProcedureInterface { + extends AbstractPeerProcedure { private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class); - private String peerId; - private ServerName worker = null; private List wals; @@ -58,9 +53,9 @@ public class SyncReplicationReplayWALProcedure this.wals = wals; } - @Override protected Flow executeFromState(MasterProcedureEnv env, - SyncReplicationReplayWALState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + @Override + protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWALState state) + throws ProcedureSuspendedException { SyncReplicationReplayWALManager syncReplicationReplayWALManager = env.getMasterServices().getSyncReplicationReplayWALManager(); switch (state) { @@ -68,9 +63,12 @@ public class SyncReplicationReplayWALProcedure try { worker = syncReplicationReplayWALManager.getPeerWorker(peerId); } catch (ReplicationException e) { - LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Failed to get worker to replay wals {} for peer id={}, sleep {} secs and retry", + wals, peerId, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; if (worker == null) { LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId); setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); @@ -87,15 +85,21 @@ public class SyncReplicationReplayWALProcedure try { finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0)); } catch (IOException e) { - LOG.info("Failed to check whether 
replay wals {} finished for peer id={}", wals, peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to check whether replay wals {} finished for peer id={}, sleep {} secs and retry", + wals, peerId, backoff / 1000, e); + throw suspend(backoff); } try { syncReplicationReplayWALManager.removePeerWorker(peerId, worker); } catch (ReplicationException e) { - LOG.info("Failed to remove worker for peer id={}, retry", peerId); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Failed to remove worker {} for peer id={}, sleep {} secs and retry", worker, + peerId, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; if (!finished) { LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId); setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); @@ -153,11 +157,6 @@ public class SyncReplicationReplayWALProcedure } @Override - public String getPeerId() { - return peerId; - } - - @Override public PeerOperationType getPeerOperationType() { return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index c650974..5c8f2e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import 
org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -146,7 +146,7 @@ public class TransitPeerSyncReplicationStateProcedure } private void createDirForRemoteWAL(MasterProcedureEnv env) - throws ProcedureYieldException, IOException { + throws IOException, ProcedureSuspendedException { MasterFileSystem mfs = env.getMasterFileSystem(); Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); @@ -155,8 +155,10 @@ public class TransitPeerSyncReplicationStateProcedure LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway", remoteWALDirForPeer); } else if (!walFs.mkdirs(remoteWALDirForPeer)) { - LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn("Can not create remote wal dir {}, sleep {} secs and retry", remoteWALDirForPeer, + backoff / 1000); + throw suspend(backoff); } } @@ -192,8 +194,7 @@ public class TransitPeerSyncReplicationStateProcedure @Override protected Flow executeFromState(MasterProcedureEnv env, - PeerSyncReplicationStateTransitionState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException { switch (state) { case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION: try { @@ -218,10 +219,14 @@ public class TransitPeerSyncReplicationStateProcedure env.getReplicationPeerManager().disablePeer(peerId); } } catch (ReplicationException e) { - LOG.warn("Failed to update peer storage for peer {} when starting transiting 
sync " + - "replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to update peer storage for peer {} when starting transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN); return Flow.HAS_MORE_STATE; @@ -240,10 +245,14 @@ public class TransitPeerSyncReplicationStateProcedure try { env.getReplicationPeerManager().removeAllQueues(peerId); } catch (ReplicationException e) { - LOG.warn("Failed to remove all replication queues peer {} when starting transiting" + - " sync replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to remove all replication queues peer {} when starting transiting" + + " sync replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState(fromState.equals(SyncReplicationState.ACTIVE) ? 
PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE); @@ -257,10 +266,14 @@ public class TransitPeerSyncReplicationStateProcedure try { env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState); } catch (ReplicationException e) { - LOG.warn("Failed to update peer storage for peer {} when ending transiting sync " + - "replication peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to update peer storage for peer {} when ending transiting sync " + + "replication peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END); return Flow.HAS_MORE_STATE; @@ -274,10 +287,14 @@ public class TransitPeerSyncReplicationStateProcedure try { env.getReplicationPeerManager().enablePeer(peerId); } catch (ReplicationException e) { - LOG.warn("Failed to set peer enabled for peer {} when transiting sync replication peer " + - "state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to set peer enabled for peer {} when transiting sync replication peer " + + "state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS); return Flow.HAS_MORE_STATE; @@ -289,10 +306,14 @@ public class TransitPeerSyncReplicationStateProcedure try { createDirForRemoteWAL(env); } catch (IOException e) { - LOG.warn("Failed to create remote wal dir for peer {} when transiting 
sync replication " + - "peer state from {} to {}, retry", peerId, fromState, toState, e); - throw new ProcedureYieldException(); + long backoff = ProcedureUtil.getBackoffTimeMs(attempts); + LOG.warn( + "Failed to create remote wal dir for peer {} when transiting sync replication " + + "peer state from {} to {}, sleep {} secs and retry", + peerId, fromState, toState, backoff / 1000, e); + throw suspend(backoff); } + attempts = 0; setNextState( PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); return Flow.HAS_MORE_STATE; -- 2.7.4