From d1f6dd8110726b1e9910f8907b34c458ec4ceb76 Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Fri, 1 Mar 2019 11:42:26 +0800 Subject: [PATCH] HBASE-21934 RemoteProcedureDispatcher should track the ongoing dispatched calls --- .../procedure2/RemoteProcedureDispatcher.java | 29 +++ .../procedure/ServerRemoteProcedure.java | 132 ++++++++++ .../replication/RefreshPeerProcedure.java | 72 +---- .../procedure/TestServerRemoteProcedure.java | 246 ++++++++++++++++++ 4 files changed, 411 insertions(+), 68 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 4c54c57b9e..fe34d80a13 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -172,6 +172,16 @@ public abstract class RemoteProcedureDispatcher implements RemoteNode { private Set operations; + private final Set dispatchedOperations = new HashSet<>(); protected BufferNode(final TRemote key) { super(key, 0); @@ -358,6 +379,8 @@ public abstract class RemoteProcedureDispatcher operation.storeInDispatchedQueue()) + .forEach(operation -> dispatchedOperations.add(operation)); this.operations = null; } } @@ -367,6 +390,12 @@ public abstract class RemoteProcedureDispatcher + implements RemoteProcedureDispatcher.RemoteProcedure { + protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class); + protected ProcedureEvent event; + protected ServerName targetServer; + protected boolean dispatched; + protected boolean succ; + + protected abstract void complete(MasterProcedureEnv env, Throwable error); + + @Override + protected synchronized Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + } catch (FailedRemoteDispatchException frde) { + LOG.warn("Can not send remote operation {} to {}, this operation will " + + "be retried to send to another server", + this.getProcId(), targetServer); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected synchronized void completionCleanup(MasterProcedureEnv env) { + env.getRemoteDispatcher().removeCompletedOperation(targetServer, this); + } + + @Override + public synchronized boolean remoteCallFailed(MasterProcedureEnv env, ServerName serverName, + IOException exception) { + remoteOperationDone(env, exception); + return false; + } + + @Override + public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + remoteOperationDone(env, null); + } + + @Override + public synchronized void remoteOperationFailed(MasterProcedureEnv env, + RemoteProcedureException error) { + remoteOperationDone(env, error); + } + + synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { + if (this.isFinished()) { + LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); + return; + } + if (event == null) { + LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + complete(env, error); + event.wake(env.getProcedureScheduler()); + event = null; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 6fe7662bdd..db0b3eacec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -22,15 +22,10 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; -import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; -import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; -import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -42,7 +37,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData; @InterfaceAudience.Private -public class RefreshPeerProcedure extends Procedure +public class RefreshPeerProcedure extends ServerRemoteProcedure implements PeerProcedureInterface, RemoteProcedure { private static final Logger LOG = LoggerFactory.getLogger(RefreshPeerProcedure.class); @@ -51,16 +46,6 @@ public class RefreshPeerProcedure extends Procedure private PeerOperationType type; - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", - justification = "Will never change after construction") - private ServerName targetServer; - - private boolean dispatched; - - private ProcedureEvent event; - - private boolean succ; - public RefreshPeerProcedure() { } @@ -122,12 +107,8 @@ public class RefreshPeerProcedure extends Procedure .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); } - private void complete(MasterProcedureEnv env, Throwable error) { - if (event == null) { - LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", - getProcId()); - return; - } + @Override + protected void complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error); this.succ = false; @@ -135,51 +116,6 @@ public class RefreshPeerProcedure extends Procedure LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer); this.succ = true; } - - event.wake(env.getProcedureScheduler()); - event = null; - } - - @Override - public synchronized boolean remoteCallFailed(MasterProcedureEnv env, ServerName remote, - IOException exception) { - complete(env, exception); - return true; - } - - @Override - public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { - complete(env, null); - } - - @Override - public synchronized void remoteOperationFailed(MasterProcedureEnv env, - RemoteProcedureException error) { - complete(env, error); - } - - @Override - protected synchronized Procedure[] execute(MasterProcedureEnv env) - throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - if (dispatched) { - if (succ) { - return null; - } - // retry - dispatched = false; - } - try { - env.getRemoteDispatcher().addOperationToNode(targetServer, this); - } catch (FailedRemoteDispatchException frde) { - LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " + - "this is usually because the server is already dead, " + - "give up and mark the procedure as complete", peerId, type, targetServer, frde); - return null; - } - dispatched = true; - event = new ProcedureEvent<>(this); - event.suspendIfNotReady(this); - throw new ProcedureSuspendedException(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java new file mode 100644 index 0000000000..c0311d3a91 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java @@ -0,0 +1,246 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.MockMasterServices; +import org.apache.hadoop.hbase.master.replication.RefreshPeerProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestServerRemoteProcedure { + private static final Logger LOG = LoggerFactory.getLogger(TestServerRemoteProcedure.class); + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestServerRemoteProcedure.class); + @Rule + public TestName name = new TestName(); + @Rule + public final ExpectedException exception = ExpectedException.none(); + protected HBaseTestingUtility util; + protected MockRSProcedureDispatcher rsDispatcher; + protected MockMasterServices master; + protected AssignmentManager am; + protected NavigableMap> regionsToRegionServers = + new ConcurrentSkipListMap<>(); + // Simple executor to run some simple tasks. + protected ScheduledExecutorService executor; + + @Before + public void setUp() throws Exception { + util = new HBaseTestingUtility(); + this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() + .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); + master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers); + rsDispatcher = new MockRSProcedureDispatcher(master); + rsDispatcher.setMockRsExecutor(new NoopRSExecutor()); + master.start(2, rsDispatcher); + am = master.getAssignmentManager(); + } + + @After + public void tearDown() throws Exception { + master.stop("tearDown"); + this.executor.shutdownNow(); + } + + @Test + public void testRemoteProcedureAndCrashBeforeResponse() throws Exception { + ServerName worker = master.getServerManager().getOnlineServersList().get(0); + ServerRemoteProcedure refreshPeerProcedure = + new RefreshPeerProcedure("test", PeerProcedureInterface.PeerOperationType.ADD, worker); + Future future = submitProcedure(refreshPeerProcedure); + Thread.sleep(2000); + master.getServerManager().expireServer(worker); + // if remoteCallFailed is called for this procedure, this procedure should be finished. + future.get(5000, TimeUnit.MILLISECONDS); + Assert.assertTrue(refreshPeerProcedure.isSuccess()); + } + + @Test + public void testRemoteCompleteAndFailedAtTheSameTime() throws Exception { + ServerName worker = master.getServerManager().getOnlineServersList().get(0); + ServerRemoteProcedure noopServerRemoteProcedure = new NoopServerRemoteProcedure(worker); + Future future = submitProcedure(noopServerRemoteProcedure); + Thread.sleep(2000); + // complete the process and fail the process at the same time + ExecutorService threadPool = Executors.newFixedThreadPool(2); + threadPool.execute(() -> noopServerRemoteProcedure + .remoteOperationDone(master.getMasterProcedureExecutor().getEnvironment(), null)); + threadPool.execute(() -> noopServerRemoteProcedure.remoteCallFailed( + master.getMasterProcedureExecutor().getEnvironment(), worker, new IOException())); + future.get(2000, TimeUnit.MILLISECONDS); + Assert.assertTrue(noopServerRemoteProcedure.isSuccess()); + } + + private Future submitProcedure(final Procedure proc) { + return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); + } + + private static class NoopServerRemoteProcedure extends ServerRemoteProcedure + implements PeerProcedureInterface { + + public NoopServerRemoteProcedure(ServerName targetServer) { + this.targetServer = targetServer; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + return; + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + return; + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + return; + } + + @Override + public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env, + ServerName serverName) { + return new RSProcedureDispatcher.ServerOperation(null, 0L, this.getClass(), new byte[0]); + } + + @Override + public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public synchronized void remoteOperationFailed(MasterProcedureEnv env, + RemoteProcedureException error) { + complete(env, error); + } + + @Override + public void complete(MasterProcedureEnv env, Throwable error) { + this.succ = true; + return; + } + + @Override + public String getPeerId() { + return "test"; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ADD; + } + } + + protected interface MockRSExecutor { + AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException; + } + + protected static class NoopRSExecutor implements MockRSExecutor { + @Override + public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server, + AdminProtos.ExecuteProceduresRequest req) throws IOException { + if (req.getOpenRegionCount() > 0) { + for (AdminProtos.OpenRegionRequest request : req.getOpenRegionList()) { + for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : request.getOpenInfoList()) { + execOpenRegion(server, openReq); + } + } + } + return AdminProtos.ExecuteProceduresResponse.getDefaultInstance(); + } + + protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server, + AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException { + return null; + } + } + + protected static class MockRSProcedureDispatcher extends RSProcedureDispatcher { + private MockRSExecutor mockRsExec; + + public MockRSProcedureDispatcher(final MasterServices master) { + super(master); + } + + public void setMockRsExecutor(final MockRSExecutor mockRsExec) { + this.mockRsExec = mockRsExec; + } + + @Override + protected void remoteDispatch(ServerName serverName, + @SuppressWarnings("rawtypes") Set remoteProcedures) { + submitTask(new MockRSProcedureDispatcher.MockRemoteCall(serverName, remoteProcedures)); + } + + private class MockRemoteCall extends ExecuteProceduresRemoteCall { + public MockRemoteCall(final ServerName serverName, + @SuppressWarnings("rawtypes") final Set operations) { + super(serverName, operations); + } + + @Override + protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName, + final AdminProtos.ExecuteProceduresRequest request) throws IOException { + return mockRsExec.sendRequest(serverName, request); + } + } + } +} -- 2.17.1