From a717f2501e8d7831db63f8175f6bb15ad14ae6a0 Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Fri, 22 Feb 2019 11:11:43 +0800 Subject: [PATCH] HBASE-21934 SplitWALProcedure get stuck during ITBLL --- .../procedure2/RemoteProcedureDispatcher.java | 21 +++ .../assignment/RegionRemoteProcedureBase.java | 5 +- .../procedure/ServerRemoteProcedure.java | 34 ++++ .../procedure/SplitWALRemoteProcedure.java | 17 +- .../SwitchRpcThrottleRemoteProcedure.java | 3 +- .../replication/RefreshPeerProcedure.java | 4 +- ...ncReplicationReplayWALRemoteProcedure.java | 5 +- .../procedure/TestRemoteServerProcedure.java | 149 ++++++++++++++++++ 8 files changed, 219 insertions(+), 19 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestRemoteServerProcedure.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 958b071404..00fc9e228e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.DelayQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -172,6 +173,16 @@ public abstract class RemoteProcedureDispatcher implements RemoteNode { private Set operations; + private Set dispatchedOperations = new ConcurrentSkipListSet<>(); protected BufferNode(final TRemote key) { super(key, 0); @@ -358,6 +370,7 @@ public abstract class RemoteProcedureDispatcher +public abstract class RegionRemoteProcedureBase extends ServerRemoteProcedure implements TableProcedureInterface, RemoteProcedure { private static final Logger LOG = LoggerFactory.getLogger(RegionRemoteProcedureBase.class); protected RegionInfo region; - private ServerName targetServer; - private boolean dispatched; protected RegionRemoteProcedureBase() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java new file mode 100644 index 0000000000..3308c85a3f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; + +public abstract class ServerRemoteProcedure extends Procedure { + protected ServerName targetServer; + + @Override + protected void completionCleanup(MasterProcedureEnv env) { + if (this instanceof RemoteProcedureDispatcher.RemoteProcedure) { + env.getRemoteDispatcher().removeFinishedOperation(targetServer, + (RemoteProcedureDispatcher.RemoteProcedure) this); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java index fb2dbd7926..d55e12c50c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -48,12 +48,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; * DoNotRetryIOException. Otherwise it will retry until succeed. */ @InterfaceAudience.Private -public class SplitWALRemoteProcedure extends Procedure +public class SplitWALRemoteProcedure extends ServerRemoteProcedure implements RemoteProcedureDispatcher.RemoteProcedure, ServerProcedureInterface { private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class); private String walPath; - private ServerName worker; private ServerName crashedServer; private boolean dispatched; private ProcedureEvent event; @@ -63,7 +62,7 @@ public class SplitWALRemoteProcedure extends Procedure } public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) { - this.worker = worker; + this.targetServer = worker; this.crashedServer = crashedServer; this.walPath = wal; } @@ -78,12 +77,12 @@ public class SplitWALRemoteProcedure extends Procedure dispatched = false; } try { - env.getRemoteDispatcher().addOperationToNode(worker, this); + env.getRemoteDispatcher().addOperationToNode(targetServer, this); } catch (NoNodeDispatchException | NullTargetServerDispatchException | NoServerDispatchException e) { // When send to a wrong target server, it need construct a new SplitWALRemoteProcedure. // Thus return null for this procedure and let SplitWALProcedure to handle this. - LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, worker, e); + LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, targetServer, e); return null; } dispatched = true; @@ -106,7 +105,7 @@ public class SplitWALRemoteProcedure extends Procedure protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { MasterProcedureProtos.SplitWALRemoteData.Builder builder = MasterProcedureProtos.SplitWALRemoteData.newBuilder(); - builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker)) + builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer)) .setCrashedServer(ProtobufUtil.toServerName(crashedServer)); serializer.serialize(builder.build()); } @@ -116,7 +115,7 @@ public class SplitWALRemoteProcedure extends Procedure MasterProcedureProtos.SplitWALRemoteData data = serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class); walPath = data.getWalPath(); - worker = ProtobufUtil.toServerName(data.getWorker()); + targetServer = ProtobufUtil.toServerName(data.getWorker()); crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); } @@ -146,7 +145,7 @@ public class SplitWALRemoteProcedure extends Procedure return; } if (error == null) { - LOG.info("split WAL {} on {} succeeded", walPath, worker); + LOG.info("split WAL {} on {} succeeded", walPath, targetServer); try { env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath); } catch (IOException e){ @@ -156,7 +155,7 @@ public class SplitWALRemoteProcedure extends Procedure } else { if (error instanceof DoNotRetryIOException) { LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server", - walPath, worker, error); + walPath, targetServer, error); success = true; } else { LOG.warn("split WAL {} failed, retry...", walPath, error); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java index 9a56ddc328..f4925b3a49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -40,11 +40,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S * The procedure to switch rpc throttle on region server */ @InterfaceAudience.Private -public class SwitchRpcThrottleRemoteProcedure extends Procedure +public class SwitchRpcThrottleRemoteProcedure extends ServerRemoteProcedure implements RemoteProcedure, ServerProcedureInterface { private static final Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleRemoteProcedure.class); - private ServerName targetServer; private boolean rpcThrottleEnabled; public SwitchRpcThrottleRemoteProcedure() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 2f43ae9282..ee6bd7784b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure; import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; @@ -42,7 +43,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData; @InterfaceAudience.Private -public class RefreshPeerProcedure extends Procedure +public class RefreshPeerProcedure extends ServerRemoteProcedure implements PeerProcedureInterface, RemoteProcedure { private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerProcedure.class); @@ -53,7 +54,6 @@ public class RefreshPeerProcedure extends Procedure @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Will never change after construction") - private ServerName targetServer; private int stage; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index 8e6d411ec7..6ad279f30c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure; import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; @@ -47,7 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.S * A remote procedure which is used to send replaying remote wal work to region server. */ @InterfaceAudience.Private -public class SyncReplicationReplayWALRemoteProcedure extends Procedure +public class SyncReplicationReplayWALRemoteProcedure extends ServerRemoteProcedure implements RemoteProcedure, PeerProcedureInterface { private static final Logger LOG = @@ -55,8 +56,6 @@ public class SyncReplicationReplayWALRemoteProcedure extends Procedure wals; private boolean dispatched; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestRemoteServerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestRemoteServerProcedure.java new file mode 100644 index 0000000000..fa5e53da1c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestRemoteServerProcedure.java @@ -0,0 +1,149 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.MockMasterServices; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; + +@Category({MasterTests.class, MediumTests.class}) +public class TestRemoteServerProcedure { + private static final Logger LOG = LoggerFactory.getLogger(TestRemoteServerProcedure.class); + @Rule + public TestName name = new TestName(); + @Rule + public final ExpectedException exception = ExpectedException.none(); + protected HBaseTestingUtility util; + protected MockRSProcedureDispatcher rsDispatcher; + protected MockMasterServices master; + protected AssignmentManager am; + protected NavigableMap> regionsToRegionServers = + new ConcurrentSkipListMap<>(); + // Simple executor to run some simple tasks. + protected ScheduledExecutorService executor; + + @Before + public void setUp() throws Exception { + util = new HBaseTestingUtility(); + this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); + master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers); + rsDispatcher = new MockRSProcedureDispatcher(master); + rsDispatcher.setMockRsExecutor(new NoopRSExecutor()); + master.start(2, rsDispatcher); + am = master.getAssignmentManager(); + master.getServerManager().getOnlineServersList().stream() + .forEach(serverName -> am.getRegionStates().getOrCreateServer(serverName)); + } + + @After + public void tearDown() throws Exception { + master.stop("tearDown"); + this.executor.shutdownNow(); + } + + @Test + public void testSplitWALAndCrashBeforeResponse() throws Exception { + ServerName worker = master.getServerManager().getOnlineServersList().get(0); + ServerName crashedWorker = master.getServerManager().getOnlineServersList().get(1); + ServerRemoteProcedure splitWALRemoteProcedure = + new SplitWALRemoteProcedure(worker, crashedWorker, "test"); + submitProcedure(splitWALRemoteProcedure); + Thread.sleep(5000); + master.getServerManager().expireServer(worker); + // if remoteCallFailed is called for this procedure, this procedure should be finished. + util.waitFor(5000, () -> splitWALRemoteProcedure.isSuccess()); + Assert.assertTrue(splitWALRemoteProcedure.isSuccess()); + } + + private Future submitProcedure(final Procedure proc) { + return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); + } + + protected interface MockRSExecutor { + AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException; + } + + protected class NoopRSExecutor implements MockRSExecutor { + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + return AdminProtos.ExecuteProceduresResponse.getDefaultInstance(); + } + } + + protected class MockRSProcedureDispatcher extends RSProcedureDispatcher { + private MockRSExecutor mockRsExec; + + public MockRSProcedureDispatcher(final MasterServices master) { + super(master); + } + + public void setMockRsExecutor(final MockRSExecutor mockRsExec) { + this.mockRsExec = mockRsExec; + } + + @Override + protected void remoteDispatch(ServerName serverName, + @SuppressWarnings("rawtypes") Set remoteProcedures) { + submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures)); + } + + private class MockRemoteCall extends ExecuteProceduresRemoteCall { + public MockRemoteCall(final ServerName serverName, + @SuppressWarnings("rawtypes") final Set operations) { + super(serverName, operations); + } + + @Override + protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, + final AdminProtos.ExecuteProceduresRequest request) throws IOException { + return mockRsExec.sendRequest(serverName, request); + } + } + } +} -- 2.17.1