diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index c1f82b9..a454f0e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -724,6 +724,11 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } @VisibleForTesting + public long getCompletedMergeTaskCount() { + return mergePool.getCompletedTaskCount(); + } + + @VisibleForTesting /** * Shutdown the long compaction thread pool. * Should only be used in unit test to prevent long compaction thread pool from stealing job diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDispatchMergingRegionsProcedure.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDispatchMergingRegionsProcedure.java index 601f22f..1f617af 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDispatchMergingRegionsProcedure.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDispatchMergingRegionsProcedure.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.master.procedure; +import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.D import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -125,20 +128,21 @@ public class TestDispatchMergingRegionsProcedure { regionsToMerge[0] = tableRegions.get(0); regionsToMerge[1] = tableRegions.get(1); + final int initCompletedTaskCount = countOfCompletedMergeTaskCount(); long procId = procExec.submitProcedure(new DispatchMergingRegionsProcedure( procExec.getEnvironment(), tableName, regionsToMerge, true)); ProcedureTestingUtility.waitProcedure(procExec, procId); ProcedureTestingUtility.assertProcNotFailed(procExec, procId); - assertRegionCount(tableName, 2); + assertRegionCount(tableName, 2, 1, initCompletedTaskCount); } /** * This tests two concurrent region merges */ - @Test(timeout=90000) + @Test(timeout=60000) public void testMergeRegionsConcurrently() throws Exception { - final TableName tableName = TableName.valueOf("testMergeTwoRegions"); + final TableName tableName = TableName.valueOf("testMergeRegionsConcurrently"); final ProcedureExecutor procExec = getMasterProcedureExecutor(); List tableRegions = createTable(tableName, 4); @@ -150,6 +154,7 @@ public class TestDispatchMergingRegionsProcedure { regionsToMerge2[0] = tableRegions.get(2); regionsToMerge2[1] = tableRegions.get(3); + final int initCompletedTaskCount = countOfCompletedMergeTaskCount(); long procId1 = procExec.submitProcedure(new DispatchMergingRegionsProcedure( procExec.getEnvironment(), tableName, regionsToMerge1, true)); long procId2 = procExec.submitProcedure(new DispatchMergingRegionsProcedure( @@ -158,10 +163,25 @@ public class TestDispatchMergingRegionsProcedure { ProcedureTestingUtility.waitProcedure(procExec, procId2); ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); ProcedureTestingUtility.assertProcNotFailed(procExec, procId2); - - assertRegionCount(tableName, 2); + assertRegionCount(tableName, 2, 2, initCompletedTaskCount); + } + private void waitForCompletedMergeTask(int expectedTaskCount, int initCompletedTaskCount) throws IOException, InterruptedException { + while (true) { + long currentCompletedTaskCount = countOfCompletedMergeTaskCount() - initCompletedTaskCount; + if (currentCompletedTaskCount == expectedTaskCount) { + return; + } + LOG.info("There are " + (expectedTaskCount - currentCompletedTaskCount) + " merge requests are not completed, wait 100 ms"); + TimeUnit.MILLISECONDS.sleep(100); + } + } + private static int countOfCompletedMergeTaskCount() { + int completedTaskCount = 0; + for (RegionServerThread server : UTIL.getMiniHBaseCluster().getRegionServerThreads()) { + completedTaskCount += server.getRegionServer().getCompactSplitThread().getCompletedMergeTaskCount(); + } + return completedTaskCount; } - @Test(timeout=60000) public void testMergeRegionsTwiceWithSameNonce() throws Exception { final TableName tableName = TableName.valueOf("testMergeRegionsTwiceWithSameNonce"); @@ -173,6 +193,7 @@ public class TestDispatchMergingRegionsProcedure { regionsToMerge[0] = tableRegions.get(0); regionsToMerge[1] = tableRegions.get(1); + final int initCompletedTaskCount = countOfCompletedMergeTaskCount(); long procId1 = procExec.submitProcedure(new DispatchMergingRegionsProcedure( procExec.getEnvironment(), tableName, regionsToMerge, true), nonceGroup, nonce); long procId2 = procExec.submitProcedure(new DispatchMergingRegionsProcedure( @@ -185,7 +206,7 @@ public class TestDispatchMergingRegionsProcedure { ProcedureTestingUtility.waitProcedure(procExec, procId2); ProcedureTestingUtility.assertProcNotFailed(procExec, procId2); - assertRegionCount(tableName, 2); + assertRegionCount(tableName, 2, 1, initCompletedTaskCount); } @Test(timeout=60000) @@ -202,6 +223,7 @@ public class TestDispatchMergingRegionsProcedure { regionsToMerge[0] = tableRegions.get(0); regionsToMerge[1] = tableRegions.get(1); + final int initCompletedTaskCount = countOfCompletedMergeTaskCount(); long procId = procExec.submitProcedure( new DispatchMergingRegionsProcedure( procExec.getEnvironment(), tableName, regionsToMerge, true)); @@ -211,7 +233,7 @@ public class TestDispatchMergingRegionsProcedure { MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, numberOfSteps); ProcedureTestingUtility.assertProcNotFailed(procExec, procId); - assertRegionCount(tableName, 2); + assertRegionCount(tableName, 2, 1, initCompletedTaskCount); } @Test(timeout = 60000) @@ -248,14 +270,19 @@ public class TestDispatchMergingRegionsProcedure { return assertRegionCount(tableName, nregions); } - public List assertRegionCount(final TableName tableName, final int nregions) - throws Exception { + public List assertRegionCount(final TableName tableName, final int nregions) throws Exception { UTIL.waitUntilNoRegionsInTransition(); List tableRegions = admin.getTableRegions(tableName); assertEquals(nregions, tableRegions.size()); return tableRegions; } + public List assertRegionCount(final TableName tableName, final int nregions, + int expectedTaskCount, int initCompletedTaskCount) throws Exception { + waitForCompletedMergeTask(expectedTaskCount, initCompletedTaskCount); + return assertRegionCount(tableName, nregions); + } + private ProcedureExecutor getMasterProcedureExecutor() { return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); }