From e2584156192e894f98591232b9a9b2c07145441a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 4 Nov 2018 15:32:16 +0800 Subject: [PATCH] HBASE-21420 Use procedure event to wake up the SyncReplicationReplayWALProcedures which wait for worker --- .../src/main/protobuf/MasterProcedure.proto | 1 + .../replication/ZKReplicationStorageBase.java | 2 +- .../replication/RecoverStandbyProcedure.java | 39 ++-- .../SyncReplicationReplayWALManager.java | 171 ++++++++++++------ .../SyncReplicationReplayWALProcedure.java | 62 +++---- ...SyncReplicationReplayWALWorkerStorage.java | 108 ----------- ...eerSyncReplicationStateProcedureRetry.java | 94 ++++++++++ 7 files changed, 261 insertions(+), 216 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureRetry.java diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index 07d1a55af5..7a1c1d3062 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -507,6 +507,7 @@ message RecoverStandbyStateData { message SyncReplicationReplayWALStateData { required string peer_id = 1; repeated string wal = 2; + optional ServerName worker = 3; } message SyncReplicationReplayWALRemoteStateData { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java index d6e692aef3..596167f9ab 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; * zookeeper. 
*/ @InterfaceAudience.Private -public class ZKReplicationStorageBase { +public abstract class ZKReplicationStorageBase { public static final String REPLICATION_ZNODE = "zookeeper.znode.replication"; public static final String REPLICATION_ZNODE_DEFAULT = "replication"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java index a1e2400fc9..8ec6bdd13a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java @@ -20,13 +20,11 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.Arrays; import java.util.List; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +51,7 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId); - addChildProcedure(wals.stream().map(wal -> new SyncReplicationReplayWALProcedure(peerId, + addChildProcedure(wals.stream() + .map(wal -> new SyncReplicationReplayWALProcedure(peerId, Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal)))) - .toArray(SyncReplicationReplayWALProcedure[]::new)); + .toArray(SyncReplicationReplayWALProcedure[]::new)); } catch (IOException e) { LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e); throw new ProcedureYieldException(); @@ -147,4 +135,19 @@ public class RecoverStandbyProcedure extends AbstractPeerNoLockProcedure + * The {@link ProcedureEvent} is used for notifying procedures that there are available workers + * now. We used to use sleeping and retrying before, but if the interval is too large, for + * example, exponential backoff, then it is not effective, but if the interval is too small, then + * we will flood the procedure wal. + *

+ * The states are only kept in memory, so when restarting, we need to reconstruct these + * information, using the information stored in related procedures. See the {@code afterReplay} + * method in {@link RecoverStandbyProcedure} and {@link SyncReplicationReplayWALProcedure} for + * more details. + */ + private static final class UsedReplayWorkersForPeer { + + private final Set usedWorkers = new HashSet(); - private final Map> workers = new HashMap<>(); + private final ProcedureEvent event; - private final Object workerLock = new Object(); + public UsedReplayWorkersForPeer(String peerId) { + this.event = new ProcedureEvent<>(peerId); + } + + public void used(ServerName worker) { + usedWorkers.add(worker); + } + + public Optional acquire(ServerManager serverManager) { + Optional worker = serverManager.getOnlineServers().keySet().stream() + .filter(server -> !usedWorkers.contains(server)).findAny(); + worker.ifPresent(usedWorkers::add); + return worker; + } + + public void release(ServerName worker) { + usedWorkers.remove(worker); + } + + public void suspend(Procedure proc) { + event.suspend(); + event.suspendIfNotReady(proc); + } + + public void wake(MasterProcedureScheduler scheduler) { + if (!event.isReady()) { + event.wake(scheduler); + } + } + } + + private final ConcurrentMap usedWorkersByPeer = + new ConcurrentHashMap<>(); public SyncReplicationReplayWALManager(MasterServices services) throws IOException, ReplicationException { - this.services = services; + this.serverManager = services.getServerManager(); this.fs = services.getMasterFileSystem().getWALFileSystem(); this.walRootDir = services.getMasterFileSystem().getWALRootDir(); this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); - this.workerStorage = new ZKSyncReplicationReplayWALWorkerStorage(services.getZooKeeper(), - services.getConfiguration()); - checkReplayingWALDir(); - } - - private void checkReplayingWALDir() throws IOException, ReplicationException { - FileStatus[] files = fs.listStatus(remoteWALDir); - for (FileStatus file : files) { - String name = file.getPath().getName(); - if (name.endsWith(REMOTE_WAL_REPLAY_SUFFIX)) { - String peerId = name.substring(0, name.length() - REMOTE_WAL_REPLAY_SUFFIX.length()); - workers.put(peerId, workerStorage.getPeerWorkers(peerId)); + MasterProcedureScheduler scheduler = + services.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler(); + serverManager.registerListener(new ServerListener() { + + @Override + public void serverAdded(ServerName serverName) { + for (UsedReplayWorkersForPeer usedWorkers : usedWorkersByPeer.values()) { + synchronized (usedWorkers) { + usedWorkers.wake(scheduler); + } + } } - } + }); } - public void registerPeer(String peerId) throws ReplicationException { - workers.put(peerId, new HashSet<>()); - workerStorage.addPeer(peerId); + public void registerPeer(String peerId) { + usedWorkersByPeer.put(peerId, new UsedReplayWorkersForPeer(peerId)); } - public void unregisterPeer(String peerId) throws ReplicationException { - workers.remove(peerId); - workerStorage.removePeer(peerId); + public void unregisterPeer(String peerId) { + usedWorkersByPeer.remove(peerId); } - public ServerName getPeerWorker(String peerId) throws ReplicationException { - Optional worker = Optional.empty(); - ServerName workerServer = null; - synchronized (workerLock) { - worker = services.getServerManager().getOnlineServers().keySet().stream() - .filter(server -> !workers.get(peerId).contains(server)).findFirst(); + /** + * Get a worker for 
replaying remote wal for a give peer. If no worker available, i.e, all the + * region servers have been used by others, a {@link ProcedureSuspendedException} will be thrown + * to suspend the procedure. And it will be woken up later when there are available workers, + * either by others release a worker, or there is a new region server joins the cluster. + */ + public ServerName acquirePeerWorker(String peerId, Procedure proc) + throws ProcedureSuspendedException { + UsedReplayWorkersForPeer usedWorkers = usedWorkersByPeer.get(peerId); + synchronized (usedWorkers) { + Optional worker = usedWorkers.acquire(serverManager); if (worker.isPresent()) { - workerServer = worker.get(); - workers.get(peerId).add(workerServer); + return worker.get(); } + // no worker available right now, suspend the procedure + usedWorkers.suspend(proc); } - if (workerServer != null) { - workerStorage.addPeerWorker(peerId, workerServer); - } - return workerServer; + throw new ProcedureSuspendedException(); } - public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException { - synchronized (workerLock) { - workers.get(peerId).remove(worker); + public void releasePeerWorker(String peerId, ServerName worker, + MasterProcedureScheduler scheduler) { + UsedReplayWorkersForPeer usedWorkers = usedWorkersByPeer.get(peerId); + synchronized (usedWorkers) { + usedWorkers.release(worker); + usedWorkers.wake(scheduler); } - workerStorage.removePeerWorker(peerId, worker); } + + /** + * Will only be called when loading procedures, where we need to construct the used worker set for + * each peer. + */ + public void addUsedPeerWorker(String peerId, ServerName worker) { + usedWorkersByPeer.get(peerId).used(worker); + } + public void createPeerRemoteWALDir(String peerId) throws IOException { Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId); if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) { @@ -132,23 +199,23 @@ public class SyncReplicationReplayWALManager { deleteDir(dst, peerId); if (!fs.rename(src, dst)) { throw new IOException( - "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); + "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId); } LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId); } else if (!fs.exists(dst)) { throw new IOException( - "Want to rename from " + src + " to " + dst + ", but they both do not exist"); + "Want to rename from " + src + " to " + dst + ", but they both do not exist"); } } public void renameToPeerReplayWALDir(String peerId) throws IOException { rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId), - peerId); + peerId); } public void renameToPeerSnapshotWALDir(String peerId) throws IOException { rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId), - peerId); + peerId); } public List getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException { @@ -158,7 +225,7 @@ public class SyncReplicationReplayWALManager { Path src = status.getPath(); String srcName = src.getName(); String dstName = - srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); + srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length()); FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName)); } List wals = new ArrayList<>(); @@ -179,7 +246,7 @@ public class SyncReplicationReplayWALManager { Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, 
peerId); if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) { throw new IOException( - "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId); + "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java index 653b766926..bf643932e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java @@ -18,19 +18,17 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; -import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData; @@ -60,21 +58,7 @@ public class SyncReplicationReplayWALProcedure env.getMasterServices().getSyncReplicationReplayWALManager(); switch (state) { case ASSIGN_WORKER: - try { - worker = syncReplicationReplayWALManager.getPeerWorker(peerId); - } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn("Failed to get worker to replay wals {} for peer id={}, sleep {} secs and retry", - wals, peerId, backoff / 1000, e); - throw suspend(backoff); - } - if (worker == null) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.info("No worker to replay wals {} for peer id={}, sleep {} secs and retry", wals, - peerId, backoff / 1000); - throw suspend(backoff); - } - attempts = 0; + worker = syncReplicationReplayWALManager.acquirePeerWorker(peerId, this); setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER); return Flow.HAS_MORE_STATE; case DISPATCH_WALS_TO_WORKER: @@ -88,19 +72,11 @@ public class SyncReplicationReplayWALProcedure } catch (IOException e) { long backoff = ProcedureUtil.getBackoffTimeMs(attempts); LOG.warn("Failed to check whether replay wals {} finished for peer id={}" + - ", sleep {} secs and retry", - wals, peerId, backoff / 1000, e); - throw suspend(backoff); - } - try { - syncReplicationReplayWALManager.removePeerWorker(peerId, worker); - } catch (ReplicationException e) { - long backoff = ProcedureUtil.getBackoffTimeMs(attempts); - LOG.warn("Failed to remove worker {} for peer id={}, sleep {} secs and retry", worker, - peerId, backoff / 1000, e); + ", sleep {} secs and retry", wals, peerId, backoff / 1000, e); throw suspend(backoff); } - attempts = 0; + syncReplicationReplayWALManager.releasePeerWorker(peerId, worker, + env.getProcedureScheduler()); if (!finished) { LOG.warn("Failed to replay wals {} for peer id={}, retry", wals, peerId); 
setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER); @@ -139,26 +115,38 @@ public class SyncReplicationReplayWALProcedure } @Override - protected void serializeStateData(ProcedureStateSerializer serializer) - throws IOException { + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); SyncReplicationReplayWALStateData.Builder builder = - SyncReplicationReplayWALStateData.newBuilder(); - builder.setPeerId(peerId); - wals.stream().forEach(builder::addWal); + SyncReplicationReplayWALStateData.newBuilder().setPeerId(peerId).addAllWal(wals); + if (worker != null) { + builder.setWorker(ProtobufUtil.toServerName(worker)); + } serializer.serialize(builder.build()); } @Override protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); SyncReplicationReplayWALStateData data = - serializer.deserialize(SyncReplicationReplayWALStateData.class); + serializer.deserialize(SyncReplicationReplayWALStateData.class); peerId = data.getPeerId(); - wals = new ArrayList<>(); - data.getWalList().forEach(wals::add); + wals = data.getWalList(); + if (data.hasWorker()) { + worker = ProtobufUtil.toServerName(data.getWorker()); + } } @Override public PeerOperationType getPeerOperationType() { return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL; } + + @Override + protected void afterReplay(MasterProcedureEnv env) { + if (worker != null) { + env.getMasterServices().getSyncReplicationReplayWALManager().addUsedPeerWorker(peerId, + worker); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java deleted file mode 100644 index 5991cf048e..0000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.master.replication; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -@InterfaceAudience.Private -public class ZKSyncReplicationReplayWALWorkerStorage extends ZKReplicationStorageBase { - - public static final String WORKERS_ZNODE = "zookeeper.znode.sync.replication.replaywal.workers"; - - public static final String WORKERS_ZNODE_DEFAULT = "replaywal-workers"; - - /** - * The name of the znode that contains a list of workers to replay wal. - */ - private final String workersZNode; - - public ZKSyncReplicationReplayWALWorkerStorage(ZKWatcher zookeeper, Configuration conf) { - super(zookeeper, conf); - String workersZNodeName = conf.get(WORKERS_ZNODE, WORKERS_ZNODE_DEFAULT); - workersZNode = ZNodePaths.joinZNode(replicationZNode, workersZNodeName); - } - - private String getPeerNode(String peerId) { - return ZNodePaths.joinZNode(workersZNode, peerId); - } - - public void addPeer(String peerId) throws ReplicationException { - try { - ZKUtil.createWithParents(zookeeper, getPeerNode(peerId)); - } catch (KeeperException e) { - throw new ReplicationException( - "Failed to add peer id=" + peerId + " to replaywal-workers storage", e); - } - } - - public void removePeer(String peerId) throws ReplicationException { - try { - ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId)); - } catch (KeeperException e) { - throw new ReplicationException( - "Failed to remove peer id=" + peerId + " to replaywal-workers storage", e); - } - } - - private String getPeerWorkerNode(String peerId, ServerName worker) { - return ZNodePaths.joinZNode(getPeerNode(peerId), worker.getServerName()); - } - - public void addPeerWorker(String peerId, ServerName worker) throws ReplicationException { - try { - ZKUtil.createWithParents(zookeeper, getPeerWorkerNode(peerId, worker)); - } catch (KeeperException e) { - throw new ReplicationException("Failed to add worker=" + worker + " for peer id=" + peerId, - e); - } - } - - public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException { - try { - ZKUtil.deleteNode(zookeeper, getPeerWorkerNode(peerId, worker)); - } catch (KeeperException e) { - throw new ReplicationException("Failed to remove worker=" + worker + " for peer id=" + peerId, - e); - } - } - - public Set getPeerWorkers(String peerId) throws ReplicationException { - try { - List children = ZKUtil.listChildrenNoWatch(zookeeper, getPeerNode(peerId)); - if (children == null) { - return new HashSet<>(); - } - return children.stream().map(ServerName::valueOf).collect(Collectors.toSet()); - } catch (KeeperException e) { - throw new ReplicationException("Failed to list workers for peer id=" + peerId, e); - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureRetry.java new file mode 100644 index 
0000000000..1c4a81982a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestTransitPeerSyncReplicationStateProcedureRetry.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.SyncReplicationTestBase; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestTransitPeerSyncReplicationStateProcedureRetry extends SyncReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestTransitPeerSyncReplicationStateProcedureRetry.class); + + @BeforeClass + public static void setUp() throws Exception { + UTIL2.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + SyncReplicationTestBase.setUp(); + } + + @Test + public void testRecoveryAndDoubleExecution() throws Exception { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + + UTIL1.getAdmin().disableReplicationPeer(PEER_ID); + write(UTIL1, 0, 100); + Thread.sleep(2000); + // peer is disabled so no data have been replicated + verifyNotReplicatedThroughRegion(UTIL2, 0, 100); + + // transit the A to DA first to avoid too many error logs. + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + HMaster master = UTIL2.getHBaseCluster().getMaster(); + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + // Enable test flags and then queue the procedure. 
+ ProcedureTestingUtility.waitNoProcedureRunning(procExec); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); + Thread t = new Thread() { + + @Override + public void run() { + try { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }; + t.start(); + UTIL2.waitFor(30000, () -> procExec.getProcedures().stream() + .anyMatch(p -> p instanceof TransitPeerSyncReplicationStateProcedure && !p.isFinished())); + long procId = procExec.getProcedures().stream() + .filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure && !p.isFinished()) + .mapToLong(Procedure::getProcId).min().getAsLong(); + MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); + } +} -- 2.17.1
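The heart of this change is replacing the sleep-and-retry loop in SyncReplicationReplayWALProcedure with a per-peer ProcedureEvent: a procedure that cannot get a free region server is suspended via ProcedureSuspendedException and woken only when another procedure releases a worker or a new region server checks in. The stand-alone Java sketch below models that bookkeeping; it is not part of the patch and does not use the HBase API — the pool, the Runnable wake callback, and every name in it are hypothetical stand-ins for UsedReplayWorkersForPeer, the ProcedureEvent, and the MasterProcedureScheduler.

// Illustrative only -- not part of the patch and not the HBase API. A minimal,
// self-contained model of the per-peer "used worker" bookkeeping with an
// event-style wake-up; a Runnable stands in for resubmitting a suspended
// procedure to the scheduler. All names here are hypothetical.
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;

public class ReplayWorkerPoolSketch {

  /** Tracks which region servers are already replaying WALs for one peer. */
  static final class UsedWorkersForPeer {
    private final Set<String> usedWorkers = new HashSet<>();
    // Stand-in for the ProcedureEvent: callbacks to run when a worker frees up.
    private final Queue<Runnable> suspended = new ArrayDeque<>();

    /** Try to grab an online server that is not already used for this peer. */
    synchronized Optional<String> acquire(Set<String> onlineServers) {
      Optional<String> worker =
          onlineServers.stream().filter(s -> !usedWorkers.contains(s)).findAny();
      worker.ifPresent(usedWorkers::add);
      return worker;
    }

    /** Remember a waiter to be woken instead of polling with backoff. */
    synchronized void suspend(Runnable wakeCallback) {
      suspended.add(wakeCallback);
    }

    /** Give a worker back and wake every waiter so it can retry acquire(). */
    synchronized void release(String worker) {
      usedWorkers.remove(worker);
      wake();
    }

    /** Also called when a new region server joins the cluster. */
    synchronized void wake() {
      Runnable r;
      while ((r = suspended.poll()) != null) {
        r.run();
      }
    }
  }

  public static void main(String[] args) {
    Set<String> online = Set.of("rs1,16020", "rs2,16020");
    UsedWorkersForPeer peer = new UsedWorkersForPeer();

    String w1 = peer.acquire(online).orElseThrow();
    String w2 = peer.acquire(online).orElseThrow();
    // No free server left: the "procedure" suspends instead of sleep/retry.
    peer.suspend(() -> System.out.println(
        "woken, retrying acquire: " + peer.acquire(online)));
    System.out.println("acquired " + w1 + " and " + w2 + ", third waiter suspended");

    // Releasing a worker wakes the waiter, which can now acquire it.
    peer.release(w1);
  }
}

Waking every waiter and letting each one retry acquire() mirrors the patch's wake(scheduler) call, which only marks the event ready so the suspended procedures re-run the ASSIGN_WORKER state; it does not hand a specific worker to a specific procedure.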
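The new optional worker field in SyncReplicationReplayWALStateData exists so that this in-memory bookkeeping survives a master restart: afterReplay() feeds the persisted worker back through addUsedPeerWorker(). The sketch below is again a hypothetical, self-contained illustration rather than the HBase implementation, assuming the loaded procedure states are available as plain records.

// Illustrative only -- hypothetical types, not the HBase classes. Shows why the
// acquired worker is persisted in the procedure state: after a master restart
// the in-memory "used" set is empty, and replaying the stored procedures must
// re-register each worker so the same server is not handed out twice.
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class RebuildUsedWorkersSketch {

  /** Minimal stand-in for the persisted SyncReplicationReplayWALStateData. */
  record ReplayProcState(String peerId, List<String> wals, Optional<String> worker) {}

  public static void main(String[] args) {
    // Pretend these were deserialized from the procedure store on restart.
    List<ReplayProcState> loaded = List.of(
        new ReplayProcState("peer_1", List.of("wal.1"), Optional.of("rs1,16020")),
        new ReplayProcState("peer_1", List.of("wal.2"), Optional.empty()));

    // Rough equivalent of addUsedPeerWorker(), invoked from afterReplay() for
    // every procedure that already holds a worker.
    Map<String, Set<String>> usedWorkersByPeer = new HashMap<>();
    for (ReplayProcState state : loaded) {
      state.worker().ifPresent(w ->
          usedWorkersByPeer.computeIfAbsent(state.peerId(), k -> new HashSet<>()).add(w));
    }

    // peer_1 now owns rs1 again, so a newly resumed procedure cannot reuse it.
    System.out.println(usedWorkersByPeer);
  }
}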