diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index 9dd71a3..0a498d4 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -58,6 +58,7 @@ public class TestSplitLogWorker { private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems) { + assert ctr.get() == oldval; long curt = System.currentTimeMillis(); long endt = curt + timems; while (curt < endt) { @@ -72,7 +73,8 @@ public class TestSplitLogWorker { return; } } - assertTrue(false); + assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval, + false); } @BeforeClass @@ -85,7 +87,6 @@ public class TestSplitLogWorker { @Before public void setup() throws Exception { - slw = null; TEST_UTIL.startMiniZKCluster(); zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "split-log-worker-tests", null); @@ -102,13 +103,6 @@ public class TestSplitLogWorker { @After public void teardown() throws Exception { - if (slw != null) { - slw.stop(); - slw.worker.join(3000); - if (slw.worker.isAlive()) { - assertTrue("could not stop the worker thread" == null); - } - } TEST_UTIL.shutdownMiniZKCluster(); } @@ -139,12 +133,27 @@ public class TestSplitLogWorker { TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs", - neverEndingTask); + SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "rs", neverEndingTask); slw.start(); - waitForCounter(tot_wkr_task_acquired, 0, 1, 100); - assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + try { + waitForCounter(tot_wkr_task_acquired, 0, 1, 100); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tatas")), "rs")); + } finally { + stopSplitLogWorker(slw); + } + } + + private void stopSplitLogWorker(final SplitLogWorker slw) + throws InterruptedException { + if (slw != null) { + slw.stop(); + slw.worker.join(3000); + if (slw.worker.isAlive()) { + assertTrue(("Could not stop the worker thread slw=" + slw) == null); + } + } } @Test @@ -167,67 +176,73 @@ public class TestSplitLogWorker { ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr1") || TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr2")); - slw1.stop(); - slw2.stop(); - slw1.worker.join(); - slw2.worker.join(); + stopSplitLogWorker(slw1); + stopSplitLogWorker(slw2); } @Test public void testPreemptTask() throws Exception { LOG.info("testPreemptTask"); - slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "tpt_svr", neverEndingTask); slw.start(); - Thread.yield(); // let the worker start - Thread.sleep(100); + try { + Thread.yield(); // let the worker start + Thread.sleep(100); - // this time create a task node after starting the splitLogWorker - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), + // this time create a task node after starting the splitLogWorker + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); - assertEquals(1, slw.taskReadySeq); - assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + assertEquals(1, slw.taskReadySeq); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tpt_task")), "tpt_svr")); - ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), + ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"), TaskState.TASK_UNASSIGNED.get("manager")); - waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + } finally { + stopSplitLogWorker(slw); + } } @Test public void testMultipleTasks() throws Exception { LOG.info("testMultipleTasks"); - slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "tmt_svr", neverEndingTask); slw.start(); - Thread.yield(); // let the worker start - Thread.sleep(100); + try { + Thread.yield(); // let the worker start + Thread.sleep(100); - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); - // now the worker is busy doing the above task + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + // now the worker is busy doing the above task - // create another task - zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"), + // create another task + zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"), TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - // preempt the first task, have it owned by another worker - ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), + // preempt the first task, have it owned by another worker + ZKUtil.setData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"), TaskState.TASK_OWNED.get("another-worker")); - waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); - waitForCounter(tot_wkr_task_acquired, 1, 2, 1000); - assertEquals(2, slw.taskReadySeq); - assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + waitForCounter(tot_wkr_task_acquired, 1, 2, 1000); + assertEquals(2, slw.taskReadySeq); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2")), "tmt_svr")); + } finally { + stopSplitLogWorker(slw); + } } @Test