From ed59c65a7ae7a3612a4ba0d8f338006a4e77c1f3 Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Thu, 21 Feb 2019 19:25:06 +0800 Subject: [PATCH] HBASE-21934 SplitWALProcedure get stuck during ITBLL --- .../procedure2/RemoteProcedureDispatcher.java | 20 +++ .../assignment/RegionRemoteProcedureBase.java | 1 + .../procedure/SplitWALRemoteProcedure.java | 1 + .../SwitchRpcThrottleRemoteProcedure.java | 1 + .../replication/RefreshPeerProcedure.java | 1 + ...ncReplicationReplayWALRemoteProcedure.java | 1 + .../master/procedure/TestRemoteProcedure.java | 143 ++++++++++++++++++ 7 files changed, 168 insertions(+) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestRemoteProcedure.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 958b071404..11eb38b30f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.DelayQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -172,6 +173,15 @@ public abstract class RemoteProcedureDispatcher implements RemoteNode { private Set operations; + private Set dispatchedOperations = new ConcurrentSkipListSet<>(); protected BufferNode(final TRemote key) { super(key, 0); @@ -358,6 +369,7 @@ public abstract class RemoteProcedureDispatcher throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { if (dispatched) { if (success) { + env.getRemoteDispatcher().removeFinishedOperation(worker, this); return null; } dispatched = false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java index 9a56ddc328..303995858c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -64,6 +64,7 @@ public class SwitchRpcThrottleRemoteProcedure extends Procedure throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { if (dispatched) { if (succ) { + env.getRemoteDispatcher().removeFinishedOperation(targetServer, this); return null; } // retry diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index 8e6d411ec7..ae3f9972fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -152,6 +152,7 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure> regionsToRegionServers = + new ConcurrentSkipListMap<>(); + // Simple executor to run some simple tasks. + protected ScheduledExecutorService executor; + + @Before + public void setUp() throws Exception { + util = new HBaseTestingUtility(); + this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); + master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers); + rsDispatcher = new MockRSProcedureDispatcher(master); + rsDispatcher.setMockRsExecutor(new NoopRSExecutor()); + master.start(2, rsDispatcher); + am = master.getAssignmentManager(); + master.getServerManager().getOnlineServersList().stream() + .forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName)); + } + + @After + public void tearDown() throws Exception { + master.stop("tearDown"); + this.executor.shutdownNow(); + } + + @Test + public void testSplitWALAndCrashBeforeResponse() throws Exception { + ServerName worker = master.getServerManager().getOnlineServersList().get(0); + ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1); + SplitWALRemoteProcedure splitWALRemoteProcedure = + new SplitWALRemoteProcedure(worker, crashedWorker, "test"); + submitProcedure(splitWALRemoteProcedure); + Thread.sleep(2000); + master.getServerManager().expireServer(worker); + // if remoteCallFailed is called for this procedure, this procedure should be finished. + util.waitFor(5000, () -> splitWALRemoteProcedure.isSuccess()); + } + + private Future submitProcedure(final Procedure proc) { + return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); + } + + protected interface MockRSExecutor { + AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException; + } + + protected class NoopRSExecutor implements MockRSExecutor { + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + return AdminProtos.ExecuteProceduresResponse.getDefaultInstance(); + } + } + + protected class MockRSProcedureDispatcher extends RSProcedureDispatcher { + private MockRSExecutor mockRsExec; + + public MockRSProcedureDispatcher(final MasterServices master) { + super(master); + } + + public void setMockRsExecutor(final MockRSExecutor mockRsExec) { + this.mockRsExec = mockRsExec; + } + + @Override + protected void remoteDispatch(ServerName serverName, + @SuppressWarnings("rawtypes") Set remoteProcedures) { + submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures)); + } + + private class MockRemoteCall extends ExecuteProceduresRemoteCall { + public MockRemoteCall(final ServerName serverName, + @SuppressWarnings("rawtypes") final Set operations) { + super(serverName, operations); + } + + @Override + protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, + final AdminProtos.ExecuteProceduresRequest request) throws IOException { + return mockRsExec.sendRequest(serverName, request); + } + } + } +} -- 2.17.1