diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index b9414cd..b0cb6f4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -78,6 +78,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; +import com.google.common.annotations.VisibleForTesting; + /** * Distributes the task of log splitting to the available region servers. * Coordination happens via zookeeper. For every log file that has to be split a @@ -210,7 +212,7 @@ public class SplitLogManager extends ZooKeeperListener { this.unassignedTimeout = conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); - // Determine recovery mode + // Determine recovery mode setRecoveryMode(true); LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout + @@ -437,6 +439,11 @@ public class SplitLogManager extends ZooKeeperListener { } } + @VisibleForTesting + ConcurrentMap getTasks() { + return tasks; + } + private int activeTasks(final TaskBatch batch) { int count = 0; for (Task t: tasks.values()) { @@ -1243,7 +1250,7 @@ public class SplitLogManager extends ZooKeeperListener { } return result; } - + /** * This function is to set recovery mode from outstanding split log tasks from before or * current configuration setting @@ -1267,7 +1274,7 @@ public class SplitLogManager extends ZooKeeperListener { boolean hasSplitLogTask = false; boolean hasRecoveringRegions = false; RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN; - RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? + RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING; // Firstly check if there are outstanding recovering regions @@ -1292,7 +1299,7 @@ public class SplitLogManager extends ZooKeeperListener { previousRecoveryMode = slt.getMode(); if (previousRecoveryMode == RecoveryMode.UNKNOWN) { // created by old code base where we don't set recovery mode in splitlogtask - // we can safely set to LOG_SPLITTING because we're in master initialization code + // we can safely set to LOG_SPLITTING because we're in master initialization code // before SSH is enabled & there is no outstanding recovering regions previousRecoveryMode = RecoveryMode.LOG_SPLITTING; } @@ -1319,7 +1326,7 @@ public class SplitLogManager extends ZooKeeperListener { // splitlogtask hasn't drained yet, keep existing recovery mode return; } - + if (previousRecoveryMode != RecoveryMode.UNKNOWN) { this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig); this.recoveryMode = previousRecoveryMode; @@ -1332,7 +1339,7 @@ public class SplitLogManager extends ZooKeeperListener { public RecoveryMode getRecoveryMode() { return this.recoveryMode; } - + /** * Returns if distributed log replay is turned on or not * @param conf diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 289b630..c2bc91e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -258,6 +258,10 @@ public class TestDistributedLogSplitting { } LOG.info(count + " edits in " + files.length + " recovered edits files."); } + + // check that the log file is moved + assertFalse(fs.exists(logDir)); + assertEquals(NUM_LOG_LINES, count); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index ceb6ada..7c3dee6 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -19,11 +19,8 @@ package org.apache.hadoop.hbase.master; import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_get_data_nonode; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_log_split_batch_success; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued; -import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_result; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted; @@ -40,6 +37,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -61,8 +59,6 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.SplitLogWorker; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -140,15 +136,15 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); to = to + 4 * 100; - - this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + + this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); } @After public void teardown() throws IOException, KeeperException { stopper.stop(""); - slm.stop(); + if (slm != null) slm.stop(); TEST_UTIL.shutdownMiniZKCluster(); } @@ -159,6 +155,7 @@ public class TestSplitLogManager { private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems) throws Exception { Expr e = new Expr() { + @Override public long eval() { return ctr.get(); } @@ -500,6 +497,46 @@ public class TestSplitLogManager { assertFalse(fs.exists(emptyLogDirPath)); } + @Test + public void testLogFilesAreArchived() throws Exception { + LOG.info("testLogFilesAreArchived"); + final SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, master, + DUMMY_MASTER); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + conf.set(HConstants.HBASE_DIR, + TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived").toString()); + Path logDirPath = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(logDirPath); + // create an empty log file + String logFile = ServerName.valueOf("foo", 1, 1).toString(); + fs.create(new Path(logDirPath, logFile)).close(); + + // spin up a thread mocking split done. + new Thread() { + @Override + public void run() { + boolean done = false; + while (!done) { + for (Map.Entry entry : slm.getTasks().entrySet()) { + final ServerName worker1 = ServerName.valueOf("worker1,1,1"); + SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING); + try { + ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray()); + } catch (KeeperException e) { + LOG.warn(e); + } + done = true; + } + } + }; + }.start(); + + slm.splitLogDistributed(logDirPath); + + assertFalse(fs.exists(logDirPath)); + } + /** * The following test case is aiming to test the situation when distributedLogReplay is turned off * and restart a cluster there should no recovery regions in ZK left. @@ -522,7 +559,7 @@ public class TestSplitLogManager { assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty()); } - + @Test(timeout=60000) public void testGetPreviousRecoveryMode() throws Exception { LOG.info("testGetPreviousRecoveryMode"); @@ -537,10 +574,10 @@ public class TestSplitLogManager { slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER); assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING); - + zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1); slm.setRecoveryMode(false); assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY); } - + }