From c4e45dcb59990ccfd70e21716eb5c7c1200ef1cd Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 24 Nov 2018 15:03:27 +0800 Subject: [PATCH] HBASE-21508 Ignore the reportRegionStateTransition call from a dead server --- .../apache/hadoop/hbase/master/HMaster.java | 3 +- .../master/assignment/AssignmentManager.java | 135 +++++++----- .../hbase/master/assignment/RegionStates.java | 16 -- .../hbase/master/assignment/ServerState.java | 5 + .../master/assignment/ServerStateNode.java | 11 +- .../TransitRegionStateProcedure.java | 8 +- .../master/procedure/ProcedureSyncWait.java | 92 ++++---- .../procedure/ServerCrashProcedure.java | 2 +- ...rtRegionStateTransitionFromDeadServer.java | 201 ++++++++++++++++++ 9 files changed, 354 insertions(+), 119 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index e1d374090e..ae04283386 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -3800,8 +3800,7 @@ public class HMaster extends HRegionServer implements MasterServices { if (offload) { final List destServers = this.serverManager.createDestinationServersList(); for (ServerName server : serversAdded) { - final List regionsOnServer = - this.assignmentManager.getRegionStates().getServerRegionInfoSet(server); + final List regionsOnServer = this.assignmentManager.getRegionsOnServer(server); for (RegionInfo hri : regionsOnServer) { ServerName dest = balancer.randomAssignment(hri, destServers); if (dest == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 
37e5f0c93e..3e4084d022 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -319,6 +319,17 @@ public class AssignmentManager implements ServerListener {
     return regionStates;
   }
 
+  /**
+   * Returns the regions hosted by the given server, or an empty list if it has no state node.
+   */
+  public List getRegionsOnServer(ServerName serverName) {
+    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
+    if (serverInfo == null) {
+      return Collections.emptyList();
+    }
+    return serverInfo.getRegionInfoList();
+  }
+
   public RegionStateStore getRegionStateStore() {
     return regionStateStore;
   }
@@ -817,54 +828,70 @@ public class AssignmentManager implements ServerListener {
   // ============================================================================================
   // RS Region Transition Report helpers
   // ============================================================================================
-  // TODO: Move this code in MasterRpcServices and call on specific event?
+  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
+      ServerName serverName, List transitionList) throws IOException {
+    for (RegionStateTransition transition : transitionList) {
+      switch (transition.getTransitionCode()) {
+        case OPENED:
+        case FAILED_OPEN:
+        case CLOSED:
+          assert transition.getRegionInfoCount() == 1 : transition;
+          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
+          updateRegionTransition(serverName, transition.getTransitionCode(), hri,
+            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
+          break;
+        case READY_TO_SPLIT:
+        case SPLIT:
+        case SPLIT_REVERTED:
+          assert transition.getRegionInfoCount() == 3 : transition;
+          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
+          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
+          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
+          updateRegionSplitTransition(serverName, transition.getTransitionCode(), parent, splitA,
+            splitB);
+          break;
+        case READY_TO_MERGE:
+        case MERGED:
+        case MERGE_REVERTED:
+          assert transition.getRegionInfoCount() == 3 : transition;
+          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
+          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
+          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
+          updateRegionMergeTransition(serverName, transition.getTransitionCode(), merged, mergeA,
+            mergeB);
+          break;
+      }
+    }
+  }
+
   public ReportRegionStateTransitionResponse reportRegionStateTransition(
       final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
-    final ReportRegionStateTransitionResponse.Builder builder =
+    ReportRegionStateTransitionResponse.Builder builder =
         ReportRegionStateTransitionResponse.newBuilder();
-    final ServerName serverName = ProtobufUtil.toServerName(req.getServer());
-    try {
-      for (RegionStateTransition transition: req.getTransitionList()) {
-        switch (transition.getTransitionCode()) {
-          case OPENED:
-          case FAILED_OPEN:
-          case CLOSED:
-            assert transition.getRegionInfoCount() == 1 : transition;
-            final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
-            updateRegionTransition(serverName, transition.getTransitionCode(), hri,
-                transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
-            break;
-          case READY_TO_SPLIT:
-          case SPLIT:
-          case SPLIT_REVERTED:
-            assert transition.getRegionInfoCount() == 3 : transition;
-            final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
-            final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
-            final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
-            updateRegionSplitTransition(serverName, transition.getTransitionCode(),
-              parent, splitA, splitB);
-            break;
-          case READY_TO_MERGE:
-          case MERGED:
-          case MERGE_REVERTED:
-            assert transition.getRegionInfoCount() == 3 : transition;
-            final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
-            final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
-            final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
-            updateRegionMergeTransition(serverName, transition.getTransitionCode(),
-              merged, mergeA, mergeB);
-            break;
+    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
+    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+    synchronized (serverNode) {
+      // Only accept reportRegionStateTransition while the region server is ONLINE; see the comment
+      // in the submitServerCrash method and HBASE-21508 for the fencing rationale.
+      if (serverNode.isInState(ServerState.ONLINE)) {
+        try {
+          reportRegionStateTransition(builder, serverName, req.getTransitionList());
+        } catch (PleaseHoldException e) {
+          LOG.trace("Failed transition ", e);
+          throw e;
+        } catch (UnsupportedOperationException | IOException e) {
+          // TODO: at the moment we have a single error message and the RS will abort
+          // if the master says that one of the region transitions failed.
+          LOG.warn("Failed transition", e);
+          builder.setErrorMessage("Failed transition " + e.getMessage());
         }
+      } else {
+        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
+          serverName);
+        builder.setErrorMessage("You are dead");
       }
-    } catch (PleaseHoldException e) {
-      LOG.trace("Failed transition ", e);
-      throw e;
-    } catch (UnsupportedOperationException|IOException e) {
-      // TODO: at the moment we have a single error message and the RS will abort
-      // if the master says that one of the region transitions failed.
-      LOG.warn("Failed transition", e);
-      builder.setErrorMessage("Failed transition " + e.getMessage());
     }
+
     return builder.build();
   }
 
@@ -1321,13 +1348,23 @@ public class AssignmentManager implements ServerListener {
     return 0;
   }
 
-  public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
-    boolean carryingMeta = isCarryingMeta(serverName);
-    ProcedureExecutor procExec = this.master.getMasterProcedureExecutor();
-    long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
-      serverName, shouldSplitWal, carryingMeta));
-    LOG.debug("Added=" + serverName
-      + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
+    boolean carryingMeta;
+    long pid;
+    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+    // Hold the server node lock to fence against reportRegionStateTransition. Once the server
+    // state is set to CRASHED, reportRegionStateTransition calls from this server are no longer
+    // accepted. This simplifies the implementation of TRSP and SCP, since it guarantees that
+    // the region list fetched by SCP will not change any more.
+    synchronized (serverNode) {
+      serverNode.setState(ServerState.CRASHED);
+      carryingMeta = isCarryingMeta(serverName);
+      ProcedureExecutor procExec = this.master.getMasterProcedureExecutor();
+      pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
+        shouldSplitWal, carryingMeta));
+    }
+    LOG.debug("Added={} to dead servers, submitted shutdown handler to be executed meta={}",
+      serverName, carryingMeta);
     return pid;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
index 2b9c0bdccc..7b854095e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -357,22 +357,6 @@ public class RegionStates {
         ((hri.isOffline() || hri.isSplit()) && offline);
   }
 
-  /**
-   * Returns the set of regions hosted by the specified server
-   * @param serverName the server we are interested in
-   * @return set of RegionInfo hosted by the specified server
-   */
-  public List getServerRegionInfoSet(final ServerName serverName) {
-    ServerStateNode serverInfo = getServerNode(serverName);
-    if (serverInfo == null) {
-      return Collections.emptyList();
-    }
-
-    synchronized (serverInfo) {
-      return serverInfo.getRegionInfoList();
-    }
-  }
-
   // ============================================================================================
   // Split helpers
   // These methods will only be called in ServerCrashProcedure, and at the end of SCP we will remove
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
index 6925c42307..3efe6e2266 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
@@ -29,6 +29,11 @@ enum ServerState {
    */
   ONLINE,
 
+  /**
+   * Indicates that the server has crashed, i.e., a ServerCrashProcedure has been scheduled for it.
+   */
+  CRASHED,
+
   /**
    * Only server which carries meta can have this state. We will split wal for meta and then
    * assign meta first before splitting other wals.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
index 204221427e..0755fd0306 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
@@ -17,9 +17,10 @@
  */
 package org.apache.hadoop.hbase.master.assignment;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
@@ -84,12 +85,8 @@ class ServerStateNode implements Comparable {
     return regions.size();
   }
 
-  public ArrayList getRegionInfoList() {
-    ArrayList hris = new ArrayList(regions.size());
-    for (RegionStateNode region : regions) {
-      hris.add(region.getRegionInfo());
-    }
-    return hris;
+  public List getRegionInfoList() {
+    return regions.stream().map(RegionStateNode::getRegionInfo).collect(Collectors.toList());
   }
 
   public void addRegion(final RegionStateNode regionNode) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
index 90ebf7b1a5..0885a6a9a8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
@@ -500,11 +500,9 @@ public class TransitRegionStateProcedure
       case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
       case REGION_STATE_TRANSITION_CONFIRM_OPENED:
         // for these 3 states, the region may still be online on the crashed server
-        if (serverName.equals(regionNode.getRegionLocation())) {
-          env.getAssignmentManager().regionClosed(regionNode, false);
-          if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) {
-            regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
-          }
+        env.getAssignmentManager().regionClosed(regionNode, false);
+        if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) {
+          regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
         }
         break;
       default:
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index c8ff9f8045..3ac998abe3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -61,51 +61,64 @@ public final class ProcedureSyncWait {
   }
 
   private static class ProcedureFuture implements Future {
-      private final ProcedureExecutor procExec;
-      private final Procedure proc;
+    private final ProcedureExecutor procExec;
+    private final Procedure proc;
 
-      private boolean hasResult = false;
-      private byte[] result = null;
+    private boolean hasResult = false;
+    private byte[] result = null;
 
-      public ProcedureFuture(ProcedureExecutor procExec, Procedure proc) {
-        this.procExec = procExec;
-        this.proc = proc;
-      }
+    public ProcedureFuture(ProcedureExecutor procExec, Procedure proc) {
+      this.procExec = procExec;
+      this.proc = proc;
+    }
 
-      @Override
-      public boolean cancel(boolean mayInterruptIfRunning) { return false; }
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return false;
+    }
 
-      @Override
-      public boolean isCancelled() { return false; }
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
 
-      @Override
-      public boolean isDone() { return hasResult; }
+    @Override
+    public boolean isDone() {
+      return hasResult;
+    }
 
-      @Override
-      public byte[] get() throws InterruptedException, ExecutionException {
-        if (hasResult) return result;
-        try {
-          return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
-        } catch (Exception e) {
-          throw new ExecutionException(e);
-        }
+    @Override
+    public byte[] get() throws InterruptedException, ExecutionException {
+      if (hasResult) {
+        return result;
       }
+      try {
+        // Cache the outcome, as get(long, TimeUnit) does, so isDone() reflects completion.
+        result = waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
+        hasResult = true;
+        return result;
+      } catch (Exception e) {
+        throw new ExecutionException(e);
+      }
+    }
 
-      @Override
-      public byte[] get(long timeout, TimeUnit unit)
-          throws InterruptedException, ExecutionException, TimeoutException {
-        if (hasResult) return result;
-        try {
-          result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
-          hasResult = true;
-          return result;
-        } catch (TimeoutIOException e) {
-          throw new TimeoutException(e.getMessage());
-        } catch (Exception e) {
-          throw new ExecutionException(e);
-        }
+    @Override
+    public byte[] get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if (hasResult) {
+        return result;
+      }
+      try {
+        result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
+        hasResult = true;
+        return result;
+      } catch (TimeoutIOException e) {
+        throw new TimeoutException(e.getMessage());
+      } catch (Exception e) {
+        throw new ExecutionException(e);
       }
     }
+  }
 
   public static Future submitProcedure(final ProcedureExecutor procExec,
       final Procedure proc) {
@@ -139,7 +152,7 @@ public final class ProcedureSyncWait {
   public static byte[] waitForProcedureToComplete(
       final ProcedureExecutor procExec, final Procedure proc, final long timeout)
       throws IOException {
-    waitFor(procExec.getEnvironment(), "pid=" + proc.getProcId(),
+    waitFor(procExec.getEnvironment(), timeout, "pid=" + proc.getProcId(),
       new ProcedureSyncWait.Predicate() {
         @Override
         public Boolean evaluate() throws IOException {
@@ -171,9 +184,19 @@ public final class ProcedureSyncWait {
 
   public static T waitFor(MasterProcedureEnv env, String purpose, Predicate predicate)
       throws IOException {
-    final Configuration conf = env.getMasterConfiguration();
-    final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
-    final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
+    Configuration conf = env.getMasterConfiguration();
+    long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
+    return waitFor(env, waitTime, purpose, predicate);
+  }
+
+  /**
+   * Waits for {@code predicate} with the given {@code waitTime}, reading the event waiting time
+   * from the {@code hbase.master.event.waiting.time} configuration (default 1000).
+   */
+  public static T waitFor(MasterProcedureEnv env, long waitTime, String purpose,
+      Predicate predicate) throws IOException {
+    Configuration conf = env.getMasterConfiguration();
+    long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
     return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 1fcc6eb6b9..048bca8002 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -146,7 +146,7 @@ public class ServerCrashProcedure
           break;
         case SERVER_CRASH_GET_REGIONS:
           this.regionsOnCrashedServer =
-            services.getAssignmentManager().getRegionStates().getServerRegionInfoSet(serverName);
+
services.getAssignmentManager().getRegionsOnServer(serverName);
           // Where to go next? Depends on whether we should split logs at all or
           // if we should do distributed log splitting.
           if (!this.shouldSplitWal) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java
new file mode 100644
index 0000000000..6c9e5eb34b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportRegionStateTransitionFromDeadServer.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+
+/**
+ * Verifies that the master ignores a reportRegionStateTransition call from a region server for
+ * which a ServerCrashProcedure has already been scheduled (HBASE-21508), so that a region moved
+ * off a dying server cannot end up online on two region servers at the same time.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReportRegionStateTransitionFromDeadServer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReportRegionStateTransitionFromDeadServer.class);
+
+  private static final List EXCLUDE_SERVERS = new ArrayList<>();
+
+  // Latches are published by the test thread and read by master RPC/procedure threads, so they
+  // must be volatile to guarantee those threads observe the references once they are assigned.
+  private static volatile CountDownLatch ARRIVE_GET_REGIONS;
+  private static volatile CountDownLatch RESUME_GET_REGIONS;
+  private static volatile CountDownLatch ARRIVE_REPORT;
+  private static volatile CountDownLatch RESUME_REPORT;
+
+  private static final class ServerManagerForTest extends ServerManager {
+
+    public ServerManagerForTest(MasterServices master) {
+      super(master);
+    }
+
+    @Override
+    public List createDestinationServersList() {
+      return super.createDestinationServersList(EXCLUDE_SERVERS);
+    }
+  }
+
+  private static final class AssignmentManagerForTest extends AssignmentManager {
+
+    public AssignmentManagerForTest(MasterServices master) {
+      super(master);
+    }
+
+    @Override
+    public List getRegionsOnServer(ServerName serverName) {
+      List regions = super.getRegionsOnServer(serverName);
+      if (ARRIVE_GET_REGIONS != null) {
+        ARRIVE_GET_REGIONS.countDown();
+        try {
+          RESUME_GET_REGIONS.await();
+        } catch (InterruptedException e) {
+          // Preserve the interrupt status for the caller instead of silently swallowing it.
+          Thread.currentThread().interrupt();
+        }
+      }
+      return regions;
+    }
+
+    @Override
+    public ReportRegionStateTransitionResponse reportRegionStateTransition(
+        ReportRegionStateTransitionRequest req) throws PleaseHoldException {
+      if (ARRIVE_REPORT != null && req.getTransitionList().stream()
+        .allMatch(t -> !ProtobufUtil.toRegionInfo(t.getRegionInfo(0)).isMetaRegion())) {
+        ARRIVE_REPORT.countDown();
+        try {
+          RESUME_REPORT.await();
+        } catch (InterruptedException e) {
+          // Preserve the interrupt status for the caller instead of silently swallowing it.
+          Thread.currentThread().interrupt();
+        }
+      }
+      return super.reportRegionStateTransition(req);
+    }
+  }
+
+  public static final class HMasterForTest extends HMaster {
+
+    public HMasterForTest(Configuration conf) throws IOException, KeeperException {
+      super(conf);
+    }
+
+    @Override
+    protected AssignmentManager createAssignmentManager(MasterServices master) {
+      return new AssignmentManagerForTest(master);
+    }
+
+    @Override
+    protected ServerManager createServerManager(MasterServices master) throws IOException {
+      setupClusterConnection();
+      return new ServerManagerForTest(master);
+    }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName NAME = TableName.valueOf("Report");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
+    UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000);
+    UTIL.startMiniCluster(3);
+    UTIL.getAdmin().balancerSwitch(false, true);
+    UTIL.createTable(NAME, CF);
+    UTIL.waitTableAvailable(NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws HBaseIOException, InterruptedException, ExecutionException {
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    RegionStateNode rsn = am.getRegionStates().getRegionStateNode(region);
+
+    // move from rs0 to rs1, and then kill rs0. Later add rs1 to exclude servers, and at last verify
+    // that the region should not be on rs1 and rs2 both.
+    HRegionServer rs0 = UTIL.getMiniHBaseCluster().getRegionServer(rsn.getRegionLocation());
+    HRegionServer rs1 = UTIL.getOtherRegionServer(rs0);
+    HRegionServer rs2 = UTIL.getMiniHBaseCluster().getRegionServerThreads().stream()
+      .map(t -> t.getRegionServer()).filter(rs -> rs != rs0 && rs != rs1).findAny().get();
+
+    RESUME_REPORT = new CountDownLatch(1);
+    ARRIVE_REPORT = new CountDownLatch(1);
+    Future future =
+      am.moveAsync(new RegionPlan(region, rs0.getServerName(), rs1.getServerName()));
+    ARRIVE_REPORT.await();
+
+    RESUME_GET_REGIONS = new CountDownLatch(1);
+    ARRIVE_GET_REGIONS = new CountDownLatch(1);
+    rs0.abort("For testing!");
+
+    ARRIVE_GET_REGIONS.await();
+    RESUME_REPORT.countDown();
+
+    try {
+      future.get(15, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      // after the fix in HBASE-21508 we will get this exception as the TRSP can not be finished any
+      // more before SCP interrupts it. It's OK.
+    }
+
+    EXCLUDE_SERVERS.add(rs1.getServerName());
+    RESUME_GET_REGIONS.countDown();
+    // wait until there are no running procedures, no SCP and no TRSP
+    UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor()
+      .getActiveProcIds().isEmpty());
+    boolean onRS1 = !rs1.getRegions(NAME).isEmpty();
+    boolean onRS2 = !rs2.getRegions(NAME).isEmpty();
+    assertNotEquals(
+      "should either be on rs1 or rs2, but onRS1 is " + onRS1 + " and on RS2 is " + onRS2, onRS1,
+      onRS2);
+  }
+}
-- 
2.17.1