diff --git src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 0ef0e33..5489402 100644 --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -485,7 +485,6 @@ public class SplitLogManager extends ZooKeeperListener { new_version, workerName); tot_mgr_heartbeat.incrementAndGet(); } else { - assert false; LOG.warn("got dup heartbeat for " + path + " ver = " + new_version); } return; diff --git src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index b0487f1..edebdaf 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -30,6 +30,11 @@ import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,7 +50,6 @@ import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -59,9 +63,7 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; import org.junit.After; 
-import org.junit.Before; import org.junit.Test; -import org.junit.Ignore; import org.junit.experimental.categories.Category; @Category(LargeTests.class) @@ -265,7 +267,7 @@ public class TestDistributedLogSplitting { "tot_wkr_preempt_task"); } - @Test + @Test(timeout=25000) public void testDelayedDeleteOnFailure() throws Exception { LOG.info("testDelayedDeleteOnFailure"); startCluster(1); @@ -273,35 +275,52 @@ public class TestDistributedLogSplitting { final FileSystem fs = master.getMasterFileSystem().getFileSystem(); final Path logDir = new Path(FSUtils.getRootDir(conf), "x"); fs.mkdirs(logDir); - final Path corruptedLogFile = new Path(logDir, "x"); - FSDataOutputStream out; - out = fs.create(corruptedLogFile); - out.write(0); - out.write(Bytes.toBytes("corrupted bytes")); - out.close(); - slm.ignoreZKDeleteForTesting = true; - Thread t = new Thread() { - @Override - public void run() { - try { - slm.splitLogDistributed(logDir); - } catch (IOException ioe) { + try { + final Path corruptedLogFile = new Path(logDir, "x"); + FSDataOutputStream out; + out = fs.create(corruptedLogFile); + out.write(0); + out.write(Bytes.toBytes("corrupted bytes")); + out.close(); + slm.ignoreZKDeleteForTesting = true; + ExecutorService executor = Executors.newSingleThreadExecutor(); + Runnable runnable = new Runnable() { + @Override + public void run() { try { - assertTrue(fs.exists(corruptedLogFile)); + // since the logDir is a fake, corrupted one, the split log worker + // will finish it quickly with an error, and this call will fail and throw + // an IOException. slm.splitLogDistributed(logDir); - } catch (IOException e) { - assertTrue(Thread.currentThread().isInterrupted()); - return; + } catch (IOException ioe) { + try { + assertTrue(fs.exists(corruptedLogFile)); + // this call will block waiting for the task to be removed from the + // tasks map, until it is interrupted. 
+ slm.splitLogDistributed(logDir); + } catch (IOException e) { + assertTrue(Thread.currentThread().isInterrupted()); + return; + } + fail("did not get the expected IOException from the 2nd call"); } - fail("did not get the expected IOException from the 2nd call"); + fail("did not get the expected IOException from the 1st call"); } - fail("did not get the expected IOException from the 1st call"); + }; + Future result = executor.submit(runnable); + try { + result.get(3000, TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + // it is ok, expected. } - }; - t.start(); - waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); - t.interrupt(); - t.join(); + waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); + executor.shutdownNow(); + + // make sure the runnable is finished with no exception thrown. + result.get(); + } finally { + fs.delete(logDir, true); + } } HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, diff --git src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 0974b56..552649b 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -456,7 +456,7 @@ public class TestSplitLogManager { assertFalse(fs.exists(emptyLogDirPath)); } - @Test + @Test(timeout=45000) public void testVanishingTaskZNode() throws Exception { LOG.info("testVanishingTaskZNode"); conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); @@ -466,27 +466,31 @@ public class TestSplitLogManager { final Path logDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString()); fs.mkdirs(logDir); - Path logFile = new Path(logDir, UUID.randomUUID().toString()); - fs.createNewFile(logFile); - new Thread() { - public void run() { - try { - // this call will block because there are no SplitLogWorkers - slm.splitLogDistributed(logDir); - } catch (Exception e) { - 
LOG.warn("splitLogDistributed failed", e); - fail(); + try { + Path logFile = new Path(logDir, UUID.randomUUID().toString()); + fs.createNewFile(logFile); + new Thread() { + public void run() { + try { + // this call will block because there are no SplitLogWorkers, + // until the task znode is deleted below. Then the call will + // complete successfully, assuming the log is split. + slm.splitLogDistributed(logDir); + } catch (Exception e) { + LOG.warn("splitLogDistributed failed", e); + } } - } - }.start(); - waitForCounter(tot_mgr_node_create_result, 0, 1, 10000); - String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString()); - // remove the task znode - ZKUtil.deleteNode(zkw, znode); - waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000); - waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000); - assertTrue(fs.exists(logFile)); - fs.delete(logDir, true); + }.start(); + waitForCounter(tot_mgr_node_create_result, 0, 1, 10000); + String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString()); + // remove the task znode, to finish the distributed log splitting + ZKUtil.deleteNode(zkw, znode); + waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000); + waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000); + assertTrue(fs.exists(logFile)); + } finally { + fs.delete(logDir, true); + } } @org.junit.Rule