From 22f361c3910e63f64ded28570a396e292333c117 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 5 Nov 2018 17:18:57 +0800 Subject: [PATCH] HBASE-21420 Use procedure event to wake up the SyncReplicationReplayWALProcedures which wait for worker --- .../AbstractProcedureScheduler.java | 5 +- .../hbase/procedure2/ProcedureEvent.java | 3 +- .../src/main/protobuf/MasterProcedure.proto | 1 + .../replication/ZKReplicationStorageBase.java | 2 +- .../replication/RecoverStandbyProcedure.java | 43 ++-- .../SyncReplicationReplayWALManager.java | 196 ++++++++++++------ .../SyncReplicationReplayWALProcedure.java | 79 ++++--- ...ncReplicationReplayWALRemoteProcedure.java | 28 +-- ...SyncReplicationReplayWALWorkerStorage.java | 108 ---------- .../TestSyncReplicationReplayWALManager.java | 179 ++++++++++++++++ ...eerSyncReplicationStateProcedureRetry.java | 94 +++++++++ 11 files changed, 493 insertions(+), 245 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestSyncReplicationReplayWALManager.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureRetry.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index 7ab1329b32..d5a0260dd5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -27,6 +27,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + 
@InterfaceAudience.Private public abstract class AbstractProcedureScheduler implements ProcedureScheduler { private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class); @@ -245,7 +247,8 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { * Access should remain package-private. Use ProcedureEvent class to wake/suspend events. * @param events the list of events to wake */ - void wakeEvents(ProcedureEvent[] events) { + @VisibleForTesting + public void wakeEvents(ProcedureEvent[] events) { schedLock(); try { for (ProcedureEvent event : events) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java index be21f77faf..93d7b1153b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureEvent.java @@ -91,7 +91,8 @@ public class ProcedureEvent { * when waking up multiple events. * Access should remain package-private. */ - synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) { + @VisibleForTesting + public synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) { if (ready && !suspendedProcedures.isEmpty()) { LOG.warn("Found procedures suspended in a ready event! 
Size=" + suspendedProcedures.size()); } diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 07d1a55af5..7a1c1d3062 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -507,6 +507,7 @@ message RecoverStandbyStateData { message SyncReplicationReplayWALStateData { required string peer_id = 1; repeated string wal = 2; + optional ServerName worker = 3; } message SyncReplicationReplayWALRemoteStateData { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java index d6e692aef3..596167f9ab 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; * zookeeper. 
*/ @InterfaceAudience.Private -public class ZKReplicationStorageBase { +public abstract class ZKReplicationStorageBase { public static final String REPLICATION_ZNODE = "zookeeper.znode.replication"; public static final String REPLICATION_ZNODE_DEFAULT = "replication"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java index a1e2400fc9..73ff822189 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java @@ -20,13 +20,11 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.Arrays; import java.util.List; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +32,10 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData; +/** + * The procedure for replaying all the remote wals for transitting a sync replication peer from + * STANDBY to DOWNGRADE_ACTIVE. 
+ */ @InterfaceAudience.Private public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure { @@ -53,7 +55,7 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId); - addChildProcedure(wals.stream().map(wal -> new SyncReplicationReplayWALProcedure(peerId, + addChildProcedure(wals.stream() + .map(wal -> new SyncReplicationReplayWALProcedure(peerId, Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal)))) - .toArray(SyncReplicationReplayWALProcedure[]::new)); + .toArray(SyncReplicationReplayWALProcedure[]::new)); } catch (IOException e) { LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e); throw new ProcedureYieldException(); @@ -147,4 +139,19 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure + * First, it will be used to balance the replay work across all the region servers. We will record + * the region servers which have already been used for replaying wal, and prevent sending new replay + * work to them, until the previous replay work has been done, where we will remove the region server + * from the used worker set. See the comment for {@code UsedReplayWorkersForPeer} for more details. + *

+ * Second, the logic for managing the remote wal directory is kept here. Before replaying the wals, + * we will rename the remote wal directory, the new name is called 'replay' directory, see + * {@link #renameToPeerReplayWALDir(String)}. This is used to prevent further writing of remote + * wals, which is very important for keeping consistency. And then we will start replaying all the + * wals, once a wal has been replayed, we will truncate the file, so that if a crash + * happens, we do not need to replay all the wals again, see {@link #finishReplayWAL(String)} and + * {@link #isReplayWALFinished(String)}. After replaying all the wals, we will rename the 'replay' + * directory, the new name is called 'snapshot' directory. In the directory, we will keep all the + * names for the wals being replayed, since all the files should have been truncated. When we + * transit the original ACTIVE cluster to STANDBY later, and there are region server crashes, we + * will check the wals in this directory to determine whether a wal should be split and replayed or + * not. You can see the code in {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} for more + * details. + */ @InterfaceAudience.Private public class SyncReplicationReplayWALManager { - private static final Logger LOG = - LoggerFactory.getLogger(SyncReplicationReplayWALManager.class); + private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationReplayWALManager.class); - private final MasterServices services; + private final ServerManager serverManager; private final FileSystem fs; @@ -59,67 +84,128 @@ public class SyncReplicationReplayWALManager { private final Path remoteWALDir; - private final ZKSyncReplicationReplayWALWorkerStorage workerStorage; + /** + * This class is used to record the used workers(region servers) for a replication peer. For + * balancing the replaying remote wal job, we will only schedule one remote replay procedure each + * time. 
So when acquiring a worker, we will first get all the region servers for this cluster, + * and then filter out the used ones. + *

+ * The {@link ProcedureEvent} is used for notifying procedures that there are available workers + * now. We used to use sleeping and retrying before, but if the interval is too large, for + * example, exponential backoff, then it is not effective, but if the interval is too small, then + * we will flood the procedure wal. + *

+ * The states are only kept in memory, so when restarting, we need to reconstruct this + * information, using the information stored in related procedures. See the {@code afterReplay} + * method in {@link RecoverStandbyProcedure} and {@link SyncReplicationReplayWALProcedure} for + * more details. + */ + private static final class UsedReplayWorkersForPeer { + + private final Set usedWorkers = new HashSet(); + + private final ProcedureEvent event; + + public UsedReplayWorkersForPeer(String peerId) { + this.event = new ProcedureEvent<>(peerId); + } + + public void used(ServerName worker) { + usedWorkers.add(worker); + } + + public Optional acquire(ServerManager serverManager) { + Optional worker = serverManager.getOnlineServers().keySet().stream() + .filter(server -> !usedWorkers.contains(server)).findAny(); + worker.ifPresent(usedWorkers::add); + return worker; + } + + public void release(ServerName worker) { + usedWorkers.remove(worker); + } + + public void suspend(Procedure proc) { + event.suspend(); + event.suspendIfNotReady(proc); + } - private final Map> workers = new HashMap<>(); + public void wake(MasterProcedureScheduler scheduler) { + if (!event.isReady()) { + event.wake(scheduler); + } + } + } - private final Object workerLock = new Object(); + private final ConcurrentMap usedWorkersByPeer = + new ConcurrentHashMap<>(); public SyncReplicationReplayWALManager(MasterServices services) throws IOException, ReplicationException { - this.services = services; + this.serverManager = services.getServerManager(); this.fs = services.getMasterFileSystem().getWALFileSystem(); this.walRootDir = services.getMasterFileSystem().getWALRootDir(); this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); - this.workerStorage = new ZKSyncReplicationReplayWALWorkerStorage(services.getZooKeeper(), - services.getConfiguration()); - checkReplayingWALDir(); - } - - private void checkReplayingWALDir() throws IOException, ReplicationException { - 
FileStatus[] files = fs.listStatus(remoteWALDir); - for (FileStatus file : files) { - String name = file.getPath().getName(); - if (name.endsWith(REMOTE_WAL_REPLAY_SUFFIX)) { - String peerId = name.substring(0, name.length() - REMOTE_WAL_REPLAY_SUFFIX.length()); - workers.put(peerId, workerStorage.getPeerWorkers(peerId)); + MasterProcedureScheduler scheduler = + services.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler(); + serverManager.registerListener(new ServerListener() { + + @Override + public void serverAdded(ServerName serverName) { + for (UsedReplayWorkersForPeer usedWorkers : usedWorkersByPeer.values()) { + synchronized (usedWorkers) { + usedWorkers.wake(scheduler); + } + } } - } + }); } - public void registerPeer(String peerId) throws ReplicationException { - workers.put(peerId, new HashSet<>()); - workerStorage.addPeer(peerId); + public void registerPeer(String peerId) { + usedWorkersByPeer.put(peerId, new UsedReplayWorkersForPeer(peerId)); } - public void unregisterPeer(String peerId) throws ReplicationException { - workers.remove(peerId); - workerStorage.removePeer(peerId); + public void unregisterPeer(String peerId) { + usedWorkersByPeer.remove(peerId); } - public ServerName getPeerWorker(String peerId) throws ReplicationException { - Optional worker = Optional.empty(); - ServerName workerServer = null; - synchronized (workerLock) { - worker = services.getServerManager().getOnlineServers().keySet().stream() - .filter(server -> !workers.get(peerId).contains(server)).findFirst(); + /** + * Get a worker for replaying remote wal for a given peer. If no worker is available, i.e., all + * the region servers have been used by others, a {@link ProcedureSuspendedException} will be + * thrown to suspend the procedure. It will be woken up later when a worker becomes available, + * either because another procedure releases one or because a new region server joins the cluster. 
+ */ + public ServerName acquirePeerWorker(String peerId, Procedure proc) + throws ProcedureSuspendedException { + UsedReplayWorkersForPeer usedWorkers = usedWorkersByPeer.get(peerId); + synchronized (usedWorkers) { + Optional worker = usedWorkers.acquire(serverManager); if (worker.isPresent()) { - workerServer = worker.get(); - workers.get(peerId).add(workerServer); + return worker.get(); } + // no worker available right now, suspend the procedure + usedWorkers.suspend(proc); } - if (workerServer != null) { - workerStorage.addPeerWorker(peerId, workerServer); - } - return workerServer; + throw new ProcedureSuspendedException(); } - public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException { - synchronized (workerLock) { - workers.get(peerId).remove(worker); + public void releasePeerWorker(String peerId, ServerName worker, + MasterProcedureScheduler scheduler) { + UsedReplayWorkersForPeer usedWorkers = usedWorkersByPeer.get(peerId); + synchronized (usedWorkers) { + usedWorkers.release(worker); + usedWorkers.wake(scheduler); } - workerStorage.removePeerWorker(peerId, worker); } + + /** + * Will only be called when loading procedures, where we need to construct the used worker set for + * each peer. 
+ */ + public void addUsedPeerWorker(String peerId, ServerName worker) { + usedWorkersByPeer.get(peerId).used(worker); + } + public void createPeerRemoteWALDir(String peerId) throws IOException { Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId); if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) { @@ -132,23 +218,23 @@ public class SyncReplicationReplayWALManager { deleteDir(dst, peerId); if (!fs.rename(src, dst)) { throw new IOException( - "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); + "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); } LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId); } else if (!fs.exists(dst)) { throw new IOException( - "Want to rename from " + src + " to " + dst + ", but they both do not exist"); + "Want to rename from " + src + " to " + dst + ", but they both do not exist"); } } public void renameToPeerReplayWALDir(String peerId) throws IOException { rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId), - peerId); + peerId); } public void renameToPeerSnapshotWALDir(String peerId) throws IOException { rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId), - peerId); + peerId); } public List getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException { @@ -158,7 +244,7 @@ public class SyncReplicationReplayWALManager { Path src = status.getPath(); String srcName = src.getName(); String dstName = - srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); + srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName)); } List wals = new ArrayList<>(); @@ -175,14 +261,6 @@ public class SyncReplicationReplayWALManager { return wals; } - public void snapshotPeerReplayWALDir(String peerId) throws IOException { - Path peerReplayWALDir = 
getPeerReplayWALDir(remoteWALDir, peerId); - if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) { - throw new IOException( - "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId); - } - } - private void deleteDir(Path dir, String peerId) throws IOException { if (!fs.delete(dir, true) && fs.exists(dir)) { throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java index 653b766926..19f7aea58c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java @@ -18,28 +18,31 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; -import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData; +/** + * The procedure for replaying a set of remote wals. 
It will get an available region server and + * schedule a {@link SyncReplicationReplayWALRemoteProcedure} to actually send the request to region + * server. + */ @InterfaceAudience.Private public class SyncReplicationReplayWALProcedure extends AbstractPeerNoLockProcedure { private static final Logger LOG = - LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class); + LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class); private ServerName worker = null; @@ -57,24 +60,10 @@ public class SyncReplicationReplayWALProcedure protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWALState state) throws ProcedureSuspendedException { SyncReplicationReplayWALManager syncReplicationReplayWALManager = - env.getMasterServices().getSyncReplicationReplayWALManager(); + env.getMasterServices().getSyncReplicationReplayWALManager(); switch (state) { case ASSIGN_WORKER: - try { - worker = syncReplicationReplayWALManager.getPeerWorker(peerId); - } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn("Failed to get worker to replay wals {} for peer id={}, sleep {} secs and retry", - wals, peerId, backoff / 1000, e); - throw suspend(backoff); - } - if (worker == null) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.info("No worker to replay wals {} for peer id={}, sleep {} secs and retry", wals, - peerId, backoff / 1000); - throw suspend(backoff); - } - attempts = 0; + worker = syncReplicationReplayWALManager.acquirePeerWorker(peerId, this); setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER); return Flow.HAS_MORE_STATE; case DISPATCH_WALS_TO_WORKER: @@ -88,19 +77,11 @@ public class SyncReplicationReplayWALProcedure } catch (IOException e) { long backoff = ProcedureUtil.getBackoffTimeMs(attempts); LOG.warn("Failed to check whether replay wals {} finished for peer id={}" + - ", sleep {} secs and retry", - wals, peerId, backoff / 1000, e); + ", sleep {} secs and 
retry", wals, peerId, backoff / 1000, e); throw suspend(backoff); } - try { - syncReplicationReplayWALManager.removePeerWorker(peerId, worker); - } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn("Failed to remove worker {} for peer id={}, sleep {} secs and retry", worker, - peerId, backoff / 1000, e); - throw suspend(backoff); - } - attempts = 0; + syncReplicationReplayWALManager.releasePeerWorker(peerId, worker, + env.getProcedureScheduler()); if (!finished) { LOG.warn("Failed to replay wals {} for peer id={}, retry", wals, peerId); setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); @@ -113,8 +94,7 @@ public class SyncReplicationReplayWALProcedure } @Override - protected void rollbackState(MasterProcedureEnv env, - SyncReplicationReplayWALState state) + protected void rollbackState(MasterProcedureEnv env, SyncReplicationReplayWALState state) throws IOException, InterruptedException { if (state == getInitialState()) { return; @@ -128,8 +108,7 @@ public class SyncReplicationReplayWALProcedure } @Override - protected int getStateId( - SyncReplicationReplayWALState state) { + protected int getStateId(SyncReplicationReplayWALState state) { return state.getNumber(); } @@ -139,26 +118,40 @@ public class SyncReplicationReplayWALProcedure } @Override - protected void serializeStateData(ProcedureStateSerializer serializer) - throws IOException { + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); SyncReplicationReplayWALStateData.Builder builder = - SyncReplicationReplayWALStateData.newBuilder(); - builder.setPeerId(peerId); - wals.stream().forEach(builder::addWal); + SyncReplicationReplayWALStateData.newBuilder().setPeerId(peerId).addAllWal(wals); + if (worker != null) { + builder.setWorker(ProtobufUtil.toServerName(worker)); + } serializer.serialize(builder.build()); } @Override protected void 
deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); SyncReplicationReplayWALStateData data = - serializer.deserialize(SyncReplicationReplayWALStateData.class); + serializer.deserialize(SyncReplicationReplayWALStateData.class); peerId = data.getPeerId(); - wals = new ArrayList<>(); - data.getWalList().forEach(wals::add); + wals = data.getWalList(); + if (data.hasWorker()) { + worker = ProtobufUtil.toServerName(data.getWorker()); + } } @Override public PeerOperationType getPeerOperationType() { return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL; } + + @Override + protected void afterReplay(MasterProcedureEnv env) { + // If the procedure is not finished and the worker is not null, we should add it to the used + // worker set, to prevent the worker being used by others. + if (worker != null && !isFinished()) { + env.getMasterServices().getSyncReplicationReplayWALManager().addUsedPeerWorker(peerId, + worker); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index ba8dd78185..8e6d411ec7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -43,12 +43,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData; +/** + * A remote procedure which is used to send replaying remote wal work to region server. 
+ */ @InterfaceAudience.Private public class SyncReplicationReplayWALRemoteProcedure extends Procedure implements RemoteProcedure, PeerProcedureInterface { private static final Logger LOG = - LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class); + LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class); private String peerId; @@ -75,11 +78,11 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure(); data.getWalList().forEach(wals::add); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java deleted file mode 100644 index 5991cf048e..0000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.master.replication; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -@InterfaceAudience.Private -public class ZKSyncReplicationReplayWALWorkerStorage extends ZKReplicationStorageBase { - - public static final String WORKERS_ZNODE = "zookeeper.znode.sync.replication.replaywal.workers"; - - public static final String WORKERS_ZNODE_DEFAULT = "replaywal-workers"; - - /** - * The name of the znode that contains a list of workers to replay wal. 
- */ - private final String workersZNode; - - public ZKSyncReplicationReplayWALWorkerStorage(ZKWatcher zookeeper, Configuration conf) { - super(zookeeper, conf); - String workersZNodeName = conf.get(WORKERS_ZNODE, WORKERS_ZNODE_DEFAULT); - workersZNode = ZNodePaths.joinZNode(replicationZNode, workersZNodeName); - } - - private String getPeerNode(String peerId) { - return ZNodePaths.joinZNode(workersZNode, peerId); - } - - public void addPeer(String peerId) throws ReplicationException { - try { - ZKUtil.createWithParents(zookeeper, getPeerNode(peerId)); - } catch (KeeperException e) { - throw new ReplicationException( - "Failed to add peer id=" + peerId + " to replaywal-workers storage", e); - } - } - - public void removePeer(String peerId) throws ReplicationException { - try { - ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId)); - } catch (KeeperException e) { - throw new ReplicationException( - "Failed to remove peer id=" + peerId + " to replaywal-workers storage", e); - } - } - - private String getPeerWorkerNode(String peerId, ServerName worker) { - return ZNodePaths.joinZNode(getPeerNode(peerId), worker.getServerName()); - } - - public void addPeerWorker(String peerId, ServerName worker) throws ReplicationException { - try { - ZKUtil.createWithParents(zookeeper, getPeerWorkerNode(peerId, worker)); - } catch (KeeperException e) { - throw new ReplicationException("Failed to add worker=" + worker + " for peer id=" + peerId, - e); - } - } - - public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException { - try { - ZKUtil.deleteNode(zookeeper, getPeerWorkerNode(peerId, worker)); - } catch (KeeperException e) { - throw new ReplicationException("Failed to remove worker=" + worker + " for peer id=" + peerId, - e); - } - } - - public Set getPeerWorkers(String peerId) throws ReplicationException { - try { - List children = ZKUtil.listChildrenNoWatch(zookeeper, getPeerNode(peerId)); - if (children == null) { - return new 
HashSet<>(); - } - return children.stream().map(ServerName::valueOf).collect(Collectors.toSet()); - } catch (KeeperException e) { - throw new ReplicationException("Failed to list workers for peer id=" + peerId, e); - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestSyncReplicationReplayWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestSyncReplicationReplayWALManager.java new file mode 100644 index 0000000000..a6290df155 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestSyncReplicationReplayWALManager.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.ServerListener; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + 
+/**
+ * Exercises the peer-worker accounting of {@link SyncReplicationReplayWALManager} against mocked
+ * master services. The test checks that a server already handed out for a peer is not handed out
+ * again for that peer until it is released, that the accounting of one peer does not affect
+ * another peer, and that procedures which could not get a worker are woken when a worker is
+ * released or when a new server is reported to the registered listener.
+ */
+@Category({ MasterTests.class, SmallTests.class })
+public class TestSyncReplicationReplayWALManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationReplayWALManager.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private SyncReplicationReplayWALManager manager;
+
+  private MasterProcedureScheduler scheduler;
+
+  // Servers which the mocked ServerManager reports as online.
+  private Set<ServerName> onlineServers;
+
+  // Listeners captured from the calls to ServerManager.registerListener.
+  private List<ServerListener> listeners;
+
+  // Procedures pushed to the scheduler when an event is woken, in the order they were received.
+  private Queue<Procedure<?>> wokenProcedures;
+
+  /**
+   * Builds a {@link SyncReplicationReplayWALManager} on top of mocks. The mocked
+   * {@link ServerManager} records every registered listener in {@code listeners} and reports the
+   * content of {@code onlineServers} as the online servers. The mocked scheduler's
+   * {@code wakeEvents} wakes the first event of the array it is given and records the procedures
+   * released by that event in {@code wokenProcedures}.
+   */
+  @Before
+  public void setUp() throws IOException, ReplicationException {
+    wokenProcedures = new ArrayDeque<>();
+    onlineServers = new HashSet<>();
+    listeners = new ArrayList<>();
+    ServerManager serverManager = mock(ServerManager.class);
+    doAnswer(inv -> listeners.add(inv.getArgument(0))).when(serverManager)
+      .registerListener(any(ServerListener.class));
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    // Evaluated on every call, so servers added to onlineServers later are visible to the manager.
+    doAnswer(inv -> onlineServers.stream()
+      .collect(Collectors.toMap(Function.identity(), k -> serverMetrics))).when(serverManager)
+        .getOnlineServers();
+
+    MasterFileSystem mfs = mock(MasterFileSystem.class);
+    when(mfs.getFileSystem()).thenReturn(UTIL.getTestFileSystem());
+    when(mfs.getWALRootDir()).thenReturn(new Path("/"));
+
+    scheduler = mock(MasterProcedureScheduler.class);
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        // Only the first element of the passed array is woken here.
+        ProcedureEvent<?> event = ((ProcedureEvent<?>[]) invocation.getArgument(0))[0];
+        // Wake through a throwaway scheduler whose addFront captures the released procedures
+        // instead of queueing them, so the test can assert on what has been woken.
+        event.wakeInternal(new MasterProcedureScheduler(pid -> null) {
+
+          @Override
+          public void addFront(Iterator<Procedure> procedureIterator) {
+            procedureIterator.forEachRemaining(wokenProcedures::add);
+          }
+        });
+        return null;
+      }
+    }).when(scheduler).wakeEvents(any(ProcedureEvent[].class));
+    MasterProcedureEnv env = mock(MasterProcedureEnv.class);
+    when(env.getProcedureScheduler()).thenReturn(scheduler);
+    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
+    when(procExec.getEnvironment()).thenReturn(env);
+
+    MasterServices services = mock(MasterServices.class);
+    when(services.getServerManager()).thenReturn(serverManager);
+    when(services.getMasterFileSystem()).thenReturn(mfs);
+    when(services.getMasterProcedureExecutor()).thenReturn(procExec);
+    manager = new SyncReplicationReplayWALManager(services);
+    // Constructing the manager is expected to register exactly one server listener.
+    assertEquals(1, listeners.size());
+  }
+
+  /**
+   * Walks through acquiring and releasing peer workers: each online server can be acquired once
+   * per peer, a further acquire throws {@link ProcedureSuspendedException}, and the suspended
+   * procedures show up in {@code wokenProcedures} after a release or after a server is added.
+   */
+  @Test
+  public void testUsedWorkers() throws ProcedureSuspendedException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    ServerName sn1 = ServerName.valueOf("host1", 123, 12345);
+    ServerName sn2 = ServerName.valueOf("host2", 234, 23456);
+    ServerName sn3 = ServerName.valueOf("host3", 345, 34567);
+    onlineServers.add(sn1);
+    manager.registerPeer(peerId1);
+    manager.registerPeer(peerId2);
+    // confirm that different peer ids do not affect each other
+    assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
+    assertEquals(sn1, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
+    onlineServers.add(sn2);
+    assertEquals(sn2, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
+    assertEquals(sn2, manager.acquirePeerWorker(peerId2, new NoopProcedure<>()));
+
+    // both online servers are now in use for peer 1, so the next acquire must suspend
+    NoopProcedure<?> proc = new NoopProcedure<>();
+    try {
+      manager.acquirePeerWorker(peerId1, proc);
+      fail("Should suspend");
+    } catch (ProcedureSuspendedException e) {
+      // expected
+    }
+    // releasing a worker of peer 1 wakes the procedure which is waiting on peer 1
+    manager.releasePeerWorker(peerId1, sn1, scheduler);
+    assertEquals(1, wokenProcedures.size());
+    assertSame(proc, wokenProcedures.poll());
+
+    // sn1 can be acquired for peer 1 again after the release
+    assertEquals(sn1, manager.acquirePeerWorker(peerId1, new NoopProcedure<>()));
+
+    // let two procedures wait for a worker of the same peer
+    NoopProcedure<?> proc1 = new NoopProcedure<>();
+    NoopProcedure<?> proc2 = new NoopProcedure<>();
+    try {
+      manager.acquirePeerWorker(peerId1, proc1);
+      fail("Should suspend");
+    } catch (ProcedureSuspendedException e) {
+      // expected
+    }
+    try {
+      manager.acquirePeerWorker(peerId1, proc2);
+      fail("Should suspend");
+    } catch (ProcedureSuspendedException e) {
+      // expected
+    }
+
+    // reporting a new server wakes both waiters; proc2 is expected to be handed over before proc1
+    listeners.get(0).serverAdded(sn3);
+    assertEquals(2, wokenProcedures.size());
+    assertSame(proc2, wokenProcedures.poll());
+    assertSame(proc1, wokenProcedures.poll());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureRetry.java
new file mode 100644
index 0000000000..1c4a81982a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureRetry.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs a {@link TransitPeerSyncReplicationStateProcedure} on the master of the second cluster with
+ * the kill-and-toggle-before-store-update test flag enabled, and drives it to completion through
+ * {@link MasterProcedureTestingUtility#testRecoveryAndDoubleExecution}.
+ */
+@Category({ MasterTests.class, LargeTests.class })
+public class TestTransitPeerSyncReplicationStateProcedureRetry extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestTransitPeerSyncReplicationStateProcedureRetry.class);
+
+  /**
+   * Sets the number of master procedure threads of the second cluster to 1 and then delegates to
+   * the set up of the base class.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL2.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+    SyncReplicationTestBase.setUp();
+  }
+
+  /**
+   * Transits the peer on cluster 2 to DOWNGRADE_ACTIVE while the procedure executor of its master
+   * is in kill-and-toggle mode, and lets the testing utility re-execute the procedure.
+   */
+  @Test
+  public void testRecoveryAndDoubleExecution() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, 100);
+    // presumably gives replication a chance to run before checking that nothing has arrived
+    Thread.sleep(2000);
+    // the peer is disabled, so no data should have been replicated
+    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
+
+    // transit cluster 1 from ACTIVE (A) to DOWNGRADE_ACTIVE (DA) first to avoid too many error
+    // logs.
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    HMaster master = UTIL2.getHBaseCluster().getMaster();
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    // Enable test flags and then queue the procedure.
+    ProcedureTestingUtility.waitNoProcedureRunning(procExec);
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+    // The admin call is issued from a separate thread; the main thread drives the procedure below.
+    // NOTE(review): the thread is never joined, so a failure of the admin call is not surfaced to
+    // the test.
+    Thread t = new Thread() {
+
+      @Override
+      public void run() {
+        try {
+          UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+            SyncReplicationState.DOWNGRADE_ACTIVE);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    };
+    t.start();
+    // wait until the procedure submitted by the thread above shows up in the executor
+    UTIL2.waitFor(30000, () -> procExec.getProcedures().stream()
+      .anyMatch(p -> p instanceof TransitPeerSyncReplicationStateProcedure && !p.isFinished()));
+    // pick the smallest procedure id among the unfinished transit procedures
+    long procId = procExec.getProcedures().stream()
+      .filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure && !p.isFinished())
+      .mapToLong(Procedure::getProcId).min().getAsLong();
+    MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId);
+    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
+  }
+}
-- 
2.17.1