diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 161a731..31981e7 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -111,7 +111,7 @@ public class SplitLogManager extends ZooKeeperListener { private long resubmit_threshold; private long timeout; private long unassignedTimeout; - private long lastNodeCreateTime = Long.MAX_VALUE; + private volatile long lastTaskCreateTime = Long.MAX_VALUE; // volatile: set on task/rescan creation, read by the unassigned-timeout check public boolean ignoreZKDeleteForTesting = false; private final ConcurrentMap tasks = @@ -229,7 +229,7 @@ public class SplitLogManager extends ZooKeeperListener { * @throws IOException * if there was an error while splitting any log file * @return cumulative size of the logfiles split - * @throws IOException + * @throws IOException */ public long splitLogDistributed(final Path logDir) throws IOException { List logDirs = new ArrayList(); @@ -307,7 +307,7 @@ public class SplitLogManager extends ZooKeeperListener { } catch (IOException ioe) { FileStatus[] files = fs.listStatus(logDir); if (files != null && files.length > 0) { - LOG.warn("returning success without actually splitting and " + + LOG.warn("returning success without actually splitting and " + "deleting all the log files in path " + logDir); } else { LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); @@ -325,7 +325,7 @@ public class SplitLogManager extends ZooKeeperListener { /** * Add a task entry to splitlog znode if it is not already there. - * + * * @param taskname the path of the log to be split * @param batch the batch this task belongs to * @return true if a new entry is created, false if it is already there.
@@ -333,6 +333,7 @@ public class SplitLogManager extends ZooKeeperListener { boolean enqueueSplitTask(String taskname, TaskBatch batch) { tot_mgr_log_split_start.incrementAndGet(); String path = ZKSplitLog.getEncodedNodeName(watcher, taskname); + lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); Task oldtask = createTaskIfAbsent(path, batch); if (oldtask == null) { // publish the task in zk @@ -461,7 +462,6 @@ public class SplitLogManager extends ZooKeeperListener { } private void createNodeSuccess(String path) { - lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); LOG.debug("put up splitlog task at znode " + path); getDataSetWatch(path, zkretries); } @@ -476,7 +476,7 @@ public class SplitLogManager extends ZooKeeperListener { private void getDataSetWatch(String path, Long retry_count) { this.watcher.getRecoverableZooKeeper().getZooKeeper(). getData(path, this.watcher, - new GetDataAsyncCallback(), retry_count); + new GetDataAsyncCallback(true), retry_count); tot_mgr_get_data_queued.incrementAndGet(); } @@ -484,7 +484,7 @@ public class SplitLogManager extends ZooKeeperListener { // A negative retry count will lead to ignoring all error processing. this.watcher.getRecoverableZooKeeper().getZooKeeper(). getData(path, this.watcher, - new GetDataAsyncCallback(), new Long(-1) /* retry count */); + new GetDataAsyncCallback(false), new Long(-1) /* retry count */); tot_mgr_get_data_queued.incrementAndGet(); } @@ -726,7 +726,7 @@ public class SplitLogManager extends ZooKeeperListener { /** * signal the workers that a task was resubmitted by creating the * RESCAN node. - * @throws KeeperException + * @throws KeeperException */ private void createRescanNode(long retries) { // The RESCAN node will be deleted almost immediately by the @@ -736,6 +736,7 @@ public class SplitLogManager extends ZooKeeperListener { // might miss the watch-trigger that creation of RESCAN node provides.
// Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. + lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); this.watcher.getRecoverableZooKeeper().getZooKeeper(). create(ZKSplitLog.getRescanNode(watcher), TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE, @@ -744,7 +745,6 @@ public class SplitLogManager extends ZooKeeperListener { } private void createRescanSuccess(String path) { - lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); tot_mgr_rescan.incrementAndGet(); getDataSetWatch(path, zkretries); } @@ -1047,7 +1047,7 @@ public class SplitLogManager extends ZooKeeperListener { // master should spawn both a manager and a worker thread to guarantee // that there is always one worker in the system if (tot > 0 && !found_assigned_task && - ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) > + ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) > unassignedTimeout)) { for (Map.Entry e : tasks.entrySet()) { String path = e.getKey(); @@ -1126,6 +1126,17 @@ public class SplitLogManager extends ZooKeeperListener { */ class GetDataAsyncCallback implements AsyncCallback.DataCallback { private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + private final boolean completeTaskOnNoNode; + + /** + * @param completeTaskOnNoNode if true, a missing (NONODE) task znode is treated as task success. + * Since in-memory task creation and znode creation are not atomic, there might be + * a race where the TimeoutMonitor sees a task in memory whose znode is not created yet. + * In this case completeTaskOnNoNode should be set to false. See HBASE-11217.
+ */ + public GetDataAsyncCallback(boolean completeTaskOnNoNode) { + this.completeTaskOnNoNode = completeTaskOnNoNode; + } @Override public void processResult(int rc, String path, Object ctx, byte[] data, @@ -1137,11 +1148,13 @@ public class SplitLogManager extends ZooKeeperListener { } if (rc == KeeperException.Code.NONODE.intValue()) { tot_mgr_get_data_nonode.incrementAndGet(); - // The task znode has been deleted. Must be some pending delete - // that deleted the task. Assume success because a task-znode is - // is only deleted after TaskFinisher is successful. LOG.warn("task znode " + path + " vanished."); - getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + if (completeTaskOnNoNode) { + // The task znode has been deleted. Must be some pending delete + // that deleted the task. Assume success because a task-znode + // is only deleted after TaskFinisher is successful. + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + } return; } Long retry_count = (Long) ctx; @@ -1284,7 +1297,7 @@ public class SplitLogManager extends ZooKeeperListener { TerminationStatus(String msg) { statusMsg = msg; } - + @Override public String toString() { return statusMsg; diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index ff87e3c..a60bc95 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -439,51 +439,6 @@ public class TestSplitLogManager { assertFalse(fs.exists(emptyLogDirPath)); } - @Test(timeout=45000) - public void testVanishingTaskZNode() throws Exception { - LOG.info("testVanishingTaskZNode"); - conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); - conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000); - slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); - slm.finishInitialization(); - FileSystem fs =
TEST_UTIL.getTestFileSystem(); - final Path logDir = new Path(fs.getWorkingDirectory(), - UUID.randomUUID().toString()); - fs.mkdirs(logDir); - Thread thread = null; - try { - Path logFile = new Path(logDir, UUID.randomUUID().toString()); - fs.createNewFile(logFile); - thread = new Thread() { - public void run() { - try { - // this call will block because there are no SplitLogWorkers, - // until the task znode is deleted below. Then the call will - // complete successfully, assuming the log is split. - slm.splitLogDistributed(logDir); - } catch (Exception e) { - LOG.warn("splitLogDistributed failed", e); - } - } - }; - thread.start(); - waitForCounter(tot_mgr_node_create_result, 0, 1, 10000); - String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString()); - // remove the task znode, to finish the distributed log splitting - ZKUtil.deleteNode(zkw, znode); - waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000); - waitForCounter(tot_mgr_log_split_batch_success, 0, 1, to/2); - assertTrue(fs.exists(logFile)); - } finally { - if (thread != null) { - // interrupt the thread in case the test fails in the middle. - // it has no effect if the thread is already terminated. - thread.interrupt(); - } - fs.delete(logDir, true); - } - } - @Test public void testWorkerCrash() throws Exception { conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);