diff --git src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 16d4f13..58d5371 100644 --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -338,7 +338,9 @@ public class SplitLogManager extends ZooKeeperListener { LOG.warn("No more task remaining (ZK or task map), splitting " + "should have completed. Remaining tasks in ZK " + remainingInZK + ", active tasks in map " + actual); - return; + if (remainingInZK == 0 && actual == 0) { + return; + } } batch.wait(100); if (stopper.isStopped()) { diff --git src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 582a802..649c26e 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -65,6 +65,7 @@ public class TestSplitLogManager { private static boolean stopped = false; private SplitLogManager slm; private Configuration conf; + private int to; private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -105,6 +106,11 @@ public class TestSplitLogManager { stopped = false; resetCounters(); + to = 4000; + conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); + conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); + to = to + 4 * 100; } @After @@ -194,26 +200,20 @@ public class TestSplitLogManager { TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - int to = 1000; - conf.setInt("hbase.splitlog.manager.timeout", to); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - to = to + 2 * 100; - - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); - waitForCounter(tot_mgr_heartbeat, 0, 1, 100); + waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); assertFalse(task.isUnassigned()); long curt = System.currentTimeMillis(); assertTrue((task.last_update <= curt) && (task.last_update > (curt - 1000))); LOG.info("waiting for manager to resubmit the orphan task"); - waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); + waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); assertTrue(task.isUnassigned()); - waitForCounter(tot_mgr_rescan, 0, 1, to + 100); + waitForCounter(tot_mgr_rescan, 0, 1, to + to/2); } @Test @@ -229,12 +229,12 @@ public class TestSplitLogManager { slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); slm.finishInitialization(); - waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); + waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); // wait for RESCAN node to be created - waitForCounter(tot_mgr_rescan, 0, 1, 500); + waitForCounter(tot_mgr_rescan, 0, 1, to/2); Task task2 = slm.findOrCreateOrphanTask(tasknode); assertTrue(task == task2); LOG.debug("task = " + task); @@ -250,11 +250,6 @@ public class TestSplitLogManager { public void testMultipleResubmits() throws Exception { LOG.info("TestMultipleResbmits - no indefinite resubmissions"); - int to = 1000; - conf.setInt("hbase.splitlog.manager.timeout", to); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - to = to + 2 * 100; - conf.setInt("hbase.splitlog.max.resubmit", 2); slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); slm.finishInitialization(); @@ -264,19 +259,19 @@ public class TestSplitLogManager { int version = ZKUtil.checkExists(zkw, tasknode); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); - waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); - waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); + waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); + waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker2")); - waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); - waitForCounter(tot_mgr_resubmit, 1, 2, to + 100); + waitForCounter(tot_mgr_heartbeat, 1, 2, to/2); + waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2); int version2 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version2 > version1); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3")); - waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); - waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100); - Thread.sleep(to + 100); + waitForCounter(tot_mgr_heartbeat, 1, 2, to/2); + waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2); + Thread.sleep(to + to/2); assertEquals(2L, tot_mgr_resubmit.get()); } @@ -284,8 +279,6 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - conf.setInt("hbase.splitlog.manager.timeout", 1000); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -294,7 +287,7 @@ public class TestSplitLogManager { int version = ZKUtil.checkExists(zkw, tasknode); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); - waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); waitForCounter(new Expr() { @Override public long eval() { @@ -308,7 +301,7 @@ public class TestSplitLogManager { assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), taskstate)); - waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); + waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2); } else { LOG.warn("Could not run test. Lost ZK connection?"); } @@ -330,7 +323,7 @@ public class TestSplitLogManager { batch.wait(); } } - waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); + waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); } @@ -350,7 +343,7 @@ public class TestSplitLogManager { batch.wait(); } } - waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); + waitForCounter(tot_mgr_task_deleted, 0, 1, to/2); assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); } @@ -366,7 +359,7 @@ public class TestSplitLogManager { ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker")); int version = ZKUtil.checkExists(zkw, tasknode); - waitForCounter(tot_mgr_resubmit, 0, 1, 1000); + waitForCounter(tot_mgr_resubmit, 0, 1, to/2); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); @@ -386,15 +379,9 @@ public class TestSplitLogManager { TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - int to = 4000; - conf.setInt("hbase.splitlog.manager.timeout", to); - conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); slm.finishInitialization(); - waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); + waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); // submit another task which will stay in unassigned mode @@ -430,10 +417,10 @@ public class TestSplitLogManager { int version = ZKUtil.checkExists(zkw, tasknode); ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); - waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); slm.handleDeadWorker("worker1"); - waitForCounter(tot_mgr_resubmit, 0, 1, 3000); - waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 3000); + waitForCounter(tot_mgr_resubmit, 0, 1, to/2); + waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version);