diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 2a5f31ef86..deb4c5cf2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -929,7 +929,18 @@ public class AssignmentManager {
       // shared lock for table all the time. So here we will unset it and when it is actually
       // executed, it will find that the attach procedure is not itself and quit immediately.
       if (regionNode.getProcedure() != null) {
-        regionNode.unsetProcedure(regionNode.getProcedure());
+        // A DisableTableProcedure or ModifyTableProcedure may depend on another TRSP's assign
+        // completing. That produces a deadlock, because the table procedure holds the xlock for
+        // the table and the other TRSP will never be executed.
+        TransitRegionStateProcedure existsProcedure = regionNode.getProcedure();
+        if (TransitRegionStateProcedure.TransitionType.ASSIGN == existsProcedure.getType()) {
+          try {
+            regionClosedAbnormally(regionNode);
+          } catch (IOException e) {
+            LOG.error("regionClosedAbnormally fail by exception, region: {}", regionNode, e);
+          }
+        }
+        regionNode.unsetProcedure(existsProcedure);
       }
       return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
         regionNode.getRegionInfo()));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
index df4afaebc1..25cf86e941 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
@@ -336,6 +336,13 @@ public class TransitRegionStateProcedure
   protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionState state)
       throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
     RegionStateNode regionNode = getRegionStateNode(env);
+    // Another procedure may have been attached to the region node in the meantime; if so this
+    // one has nothing left to do and finishes without running any state.
+    if (this != regionNode.getProcedure()) {
+      LOG.warn("the regionNode: {} not process by current procedure: {}", regionNode, this);
+      return Flow.NO_MORE_STATE;
+    }
+
     try {
       switch (state) {
         case REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE:
@@ -554,6 +561,13 @@ public class TransitRegionStateProcedure
     }
   }
 
+  /**
+   * @return the transition type of this procedure
+   */
+  public TransitionType getType() {
+    return this.type;
+  }
+
   private static TransitRegionStateProcedure setOwner(MasterProcedureEnv env,
       TransitRegionStateProcedure proc) {
     proc.setOwner(env.getRequestUser().getShortName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndDTPOnPreDisable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndDTPOnPreDisable.java
new file mode 100644
index 0000000000..70d18bd07b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndDTPOnPreDisable.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pauses a {@link DisableTableProcedure} in its pre-disable coprocessor hook, stops the region
+ * server hosting the table's region, and waits for both the {@link TransitRegionStateProcedure}
+ * child of a {@link ServerCrashProcedure} and the disable procedure itself to finish.
+ */
+@Category({ MasterTests.class, LargeTests.class })
+public class TestRaceBetweenSCPAndDTPOnPreDisable {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestRaceBetweenSCPAndDTPOnPreDisable.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRaceBetweenSCPAndDTPOnPreDisable.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName NAME = TableName.valueOf("RacePreDisabling");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  /** Timeout, in milliseconds, applied to every polling wait in this test. */
+  private static final int WAIT_TIMEOUT_MS = 60000;
+
+  // The latches are set by the test thread and read inside the coprocessor hook, so the
+  // references are volatile to make the hand-off visible across threads.
+  private static volatile CountDownLatch ARRIVE_PRE_DISABLE_TABLE;
+
+  private static volatile CountDownLatch RESUME_PRE_DISABLE_TABLE;
+
+  /** Master observer that parks the first pre-disable hook until the test releases it. */
+  public static class MasterObserverForRaceTest implements MasterCoprocessor, MasterObserver {
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preDisableTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
+      TableName tableName) throws IOException {
+      // Read the field once so the null check and countDown() operate on the same latch.
+      CountDownLatch arrive = ARRIVE_PRE_DISABLE_TABLE;
+      if (arrive == null) {
+        return;
+      }
+      LOG.info("start arrive pre disable");
+      arrive.countDown();
+      ARRIVE_PRE_DISABLE_TABLE = null;
+      try {
+        LOG.info("start resume pre disable wait");
+        RESUME_PRE_DISABLE_TABLE.await();
+        LOG.info("start resume pre disable wait end");
+      } catch (InterruptedException e) {
+        // Restore the interrupt status instead of silently swallowing the interruption.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting to resume pre disable", e);
+      }
+    }
+  }
+
+  /** Trivial {@link HMaster} subclass installed through {@link HConstants#MASTER_IMPL}. */
+  public static final class HMasterForTest extends HMaster {
+
+    public HMasterForTest(Configuration conf) throws IOException, KeeperException {
+      super(conf);
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
+    UTIL.getConfiguration().set(MasterCoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+      MasterObserverForRaceTest.class.getName());
+    UTIL.startMiniCluster(2);
+    UTIL.createTable(NAME, CF);
+    UTIL.waitTableAvailable(NAME);
+    UTIL.getAdmin().balancerSwitch(false, true);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    ServerName sn = am.getRegionStates().getRegionState(region).getServerName();
+    LOG.info("ServerName={}, region={}", sn, region);
+
+    // Publish the resume latch before the arrive latch: the hook only waits on the former
+    // after it has observed the latter.
+    CountDownLatch arrived = new CountDownLatch(1);
+    RESUME_PRE_DISABLE_TABLE = new CountDownLatch(1);
+    ARRIVE_PRE_DISABLE_TABLE = arrived;
+    UTIL.getAdmin().disableTableAsync(NAME);
+    arrived.await();
+
+    ProcedureExecutor<?> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    UTIL.getMiniHBaseCluster().stopRegionServer(sn);
+
+    // Poll with a timeout instead of spinning forever if a procedure never shows up.
+    UTIL.waitFor(WAIT_TIMEOUT_MS, () -> getSCPPID(procExec) != Procedure.NO_PROC_ID);
+    long scpPid = getSCPPID(procExec);
+    UTIL.waitFor(WAIT_TIMEOUT_MS,
+      () -> getSCPChildTransitRegionForTableId(procExec, scpPid, NAME) != Procedure.NO_PROC_ID);
+    long transitPid = getSCPChildTransitRegionForTableId(procExec, scpPid, NAME);
+
+    RESUME_PRE_DISABLE_TABLE.countDown();
+
+    UTIL.waitFor(WAIT_TIMEOUT_MS, () -> procExec.isFinished(transitPid));
+
+    long dtpProcId = procExec.getProcedures().stream()
+      .filter(p -> p instanceof DisableTableProcedure).map(Procedure::getProcId).findAny()
+      .orElseThrow(() -> new AssertionError("No DisableTableProcedure found"));
+    UTIL.waitFor(WAIT_TIMEOUT_MS, () -> procExec.isFinished(dtpProcId));
+  }
+
+  /**
+   * @return Returns {@link Procedure#NO_PROC_ID} if no SCP found else actual pid.
+   */
+  private static long getSCPPID(ProcedureExecutor<?> e) {
+    return e.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure)
+      .map(Procedure::getProcId).findAny().orElse(Procedure.NO_PROC_ID);
+  }
+
+  /**
+   * @return the pid of a {@link TransitRegionStateProcedure} for {@code tableName} whose parent
+   *         is {@code scpID}, or {@link Procedure#NO_PROC_ID} if there is none.
+   */
+  private static long getSCPChildTransitRegionForTableId(ProcedureExecutor<?> e, long scpID,
+    TableName tableName) {
+    return e.getProcedures().stream()
+      .filter(p -> p instanceof TransitRegionStateProcedure && scpID == p.getParentProcId()
+        && tableName.equals(((TransitRegionStateProcedure) p).getTableName()))
+      .map(Procedure::getProcId).findAny().orElse(Procedure.NO_PROC_ID);
+  }
+}