diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index a52b548..5fb1d62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1398,10 +1398,26 @@ public class AssignmentManager implements ServerListener {
   public void submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) {
     boolean carryingMeta = isCarryingMeta(serverName);
     ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
-    procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
-      shouldSplitWal, carryingMeta));
-    LOG.debug("Added=" + serverName +
-      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+    // Only submit a new SCP when there is no unfinished/unsuccessful SCP already
+    // running for this server, so the same crash is never handled twice.
+    List<ServerCrashProcedure> previousSCPs = procExec.getProcedures().stream()
+      .filter(p -> p instanceof ServerCrashProcedure)
+      .map(p -> (ServerCrashProcedure) p)
+      .filter(p -> p.getServerName().equals(serverName) &&
+        (!p.isFinished() || !p.isSuccess()))
+      .collect(Collectors.toList());
+    if (previousSCPs.isEmpty()) {
+      procExec.submitProcedure(
+        new ServerCrashProcedure(procExec.getEnvironment(), serverName,
+          shouldSplitWal, carryingMeta));
+      LOG.debug("Added=" + serverName +
+        " to dead servers, submitted shutdown handler to be executed meta=" +
+        carryingMeta);
+    } else {
+      LOG.debug("Skip to add SCP for " + serverName + " with meta=" +
+        carryingMeta + " , since there are SCP(s) executing for it: " +
+        previousSCPs);
+    }
   }
 
   public void offlineRegion(final RegionInfo regionInfo) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestServerCrashProcedureAfterMasterRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestServerCrashProcedureAfterMasterRestart.java
new file mode 100644
index 0000000..dbec719
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestServerCrashProcedureAfterMasterRestart.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({MasterTests.class, MediumTests.class})
+public class TestServerCrashProcedureAfterMasterRestart {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestServerCrashProcedureAfterMasterRestart.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestServerCrashProcedureAfterMasterRestart.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final TableName TABLE_NAME = TableName.valueOf("test");
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    UTIL.startMiniCluster(2);
+    byte[][] splitKeys = new byte[10][];
+    for (int i = 0; i < 10; i++) {
+      splitKeys[i] = Bytes.toBytes(i);
+    }
+    UTIL.createTable(TABLE_NAME, CF, splitKeys);
+  }
+
+  @AfterClass
+  public static void cleanupTest() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  /**
+   * Test that there is always at most one ServerCrashProcedure for a given
+   * RegionServer, even across an active master restart.
+   */
+  @Test
+  public void test() throws Exception {
+    ServerName rs1 = UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
+    ServerName rs2 = UTIL.getMiniHBaseCluster().getRegionServer(1).getServerName();
+    List<HRegion> metaRegions = UTIL.getMiniHBaseCluster().getRegionServer(0)
+        .getRegions(TableName.META_TABLE_NAME);
+    // Move away any meta regions from the server we are about to kill.
+    for (HRegion region : metaRegions) {
+      UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(),
+          Bytes.toBytes(rs2.getServerName()));
+    }
+    UTIL.getMiniHBaseCluster().killRegionServer(rs1);
+    LOG.info("Stop rs: " + rs1);
+    final ProcedureExecutor<MasterProcedureEnv> executor1 =
+        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    // Wait until the SCP for the killed server has been scheduled.
+    UTIL.waitFor(30000, () -> executor1.getProcedures().stream()
+        .filter(p -> p instanceof ServerCrashProcedure)
+        .map(p -> (ServerCrashProcedure) p)
+        .anyMatch(p -> p.getServerName().equals(rs1)));
+    List<Procedure<MasterProcedureEnv>> scps = executor1.getProcedures().stream()
+        .filter(p -> p instanceof ServerCrashProcedure)
+        .collect(Collectors.toList());
+    long procedureID = scps.get(0).getProcId();
+    ServerName master = UTIL.getMiniHBaseCluster().getMaster().getServerName();
+    UTIL.getMiniHBaseCluster().killMaster(master);
+    UTIL.getMiniHBaseCluster().waitForMasterToStop(master, 10000);
+    LOG.info("Stop master: " + master);
+    ServerName newMaster = UTIL.getMiniHBaseCluster().startMaster().getMaster().getServerName();
+    LOG.info("new master started: " + newMaster);
+    ServerName newRS = UTIL.getMiniHBaseCluster().startRegionServer()
+        .getRegionServer().getServerName();
+    LOG.info("new rs started: " + newRS);
+    ProcedureExecutor<MasterProcedureEnv> executor =
+        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    UTIL.waitFor(300000, () -> executor.isFinished(procedureID));
+    // The restarted master must reuse the existing SCP instead of scheduling
+    // a second one for the same server.
+    int numberOfSCPs = (int) executor.getProcedures().stream()
+        .filter(p -> p instanceof ServerCrashProcedure)
+        .count();
+    Assert.assertEquals("There should be only one SCP", 1, numberOfSCPs);
+  }
+}