From b6f19fec641368825f07fc4901921955256a10be Mon Sep 17 00:00:00 2001 From: khemani Date: Mon, 2 Jan 2012 17:11:13 -0800 Subject: [PATCH] HBASE-5081 [jira] Distributed log splitting deleteNode races againsth splitLog retry Summary: HBASE-5081 Distributed log splitting deleteNode races againsth splitLog retry Recently, during 0.92 rc testing, we found distributed log splitting hangs there forever. Please see attached screen shot. I looked into it and here is what happened I think: 1. One rs died, the servershutdownhandler found it out and started the distributed log splitting; 2. All three tasks failed, so the three tasks were deleted, asynchronously; 3. Servershutdownhandler retried the log splitting; 4. During the retrial, it created these three tasks again, and put them in a hashmap (tasks); 5. The asynchronously deletion in step 2 finally happened for one task, in the callback, it removed one task in the hashmap; 6. One of the newly submitted tasks' zookeeper watcher found out that task is unassigned, and it is not in the hashmap, so it created a new orphan task. 7. All three tasks failed, but that task created in step 6 is an orphan so the batch.err counter was one short, so the log splitting hangs there and keeps waiting for the last task to finish which is never going to happen. So I think the problem is step 2. The fix is to make deletion sync, instead of async, so that the retry will have a clean start. Async deleteNode will mess up with split log retrial. In extreme situation, if async deleteNode doesn't happen soon enough, some node created during the retrial could be deleted. deleteNode should be sync. Test Plan: EMPTY Reviewers: JIRA --- .../hadoop/hbase/master/SplitLogManager.java | 299 ++++++++++---------- .../hbase/regionserver/wal/HLogSplitter.java | 45 ++- .../apache/hadoop/hbase/zookeeper/ZKSplitLog.java | 1 + .../hbase/master/TestDistributedLogSplitting.java | 45 --- .../hadoop/hbase/master/TestSplitLogManager.java | 48 +++ 5 files changed, 222 insertions(+), 216 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 667a8b1..7f802a8 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -116,14 +116,11 @@ public class SplitLogManager extends ZooKeeperListener { private Object deadWorkersLock = new Object(); /** - * Its OK to construct this object even when region-servers are not online. It - * does lookup the orphan tasks in zk but it doesn't block for them to be - * done. - * - * @param zkw - * @param conf - * @param stopper - * @param serverName + * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration, + * Stoppable, String, TaskFinisher)} that provides a task finisher for + * copying recovered edits to their final destination. The task finisher + * has to be robust because it can be arbitrarily restarted or called + * multiple times. */ public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, Stoppable stopper, String serverName) { @@ -142,6 +139,18 @@ public class SplitLogManager extends ZooKeeperListener { } }); } + + /** + * Its OK to construct this object even when region-servers are not online. It + * does lookup the orphan tasks in zk but it doesn't block waiting for them + * to be done. 
+ * + * @param zkw + * @param conf + * @param stopper + * @param serverName + * @param tf task finisher + */ public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper, String serverName, TaskFinisher tf) { super(zkw); @@ -194,8 +203,6 @@ public class SplitLogManager extends ZooKeeperListener { fileStatus.add(status); } } - if (fileStatus.isEmpty()) - return null; FileStatus[] a = new FileStatus[fileStatus.size()]; return fileStatus.toArray(a); } @@ -228,8 +235,6 @@ public class SplitLogManager extends ZooKeeperListener { MonitoredTask status = TaskMonitor.get().createStatus( "Doing distributed log split in " + logDirs); FileStatus[] logfiles = getFileList(logDirs); - if(logfiles == null) - return 0; status.setStatus("Checking directory contents..."); LOG.debug("Scheduling batch of logs to split"); tot_mgr_log_split_batch_start.incrementAndGet(); @@ -251,7 +256,7 @@ public class SplitLogManager extends ZooKeeperListener { } waitForSplittingCompletion(batch, status); if (batch.done != batch.installed) { - stopTrackingTasks(batch); + batch.isDead = true; tot_mgr_log_split_batch_err.incrementAndGet(); LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + " but only " + batch.done + " done"); @@ -259,17 +264,21 @@ public class SplitLogManager extends ZooKeeperListener { + logDirs + " Task = " + batch); } for(Path logDir: logDirs){ - if (anyNewLogFiles(logDir, logfiles)) { - tot_mgr_new_unexpected_hlogs.incrementAndGet(); - LOG.warn("new hlogs were produced while logs in " + logDir + - " were being split"); - throw new OrphanHLogAfterSplitException(); - } - tot_mgr_log_split_batch_success.incrementAndGet(); status.setStatus("Cleaning up log directory..."); - if (!fs.delete(logDir, true)) { - throw new IOException("Unable to delete src dir: " + logDir); + try { + if (fs.exists(logDir) && !fs.delete(logDir, false)) { + LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); + } + } catch (IOException ioe) { + FileStatus[] files = fs.listStatus(logDir); + if (files != null && files.length > 0) { + LOG.warn("returning success without actually splitting and " + + "deleting all the log files in path " + logDir); + } else { + LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); + } } + tot_mgr_log_split_batch_success.incrementAndGet(); } String msg = "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed + " log files in " + logDirs + " in " + @@ -295,8 +304,6 @@ public class SplitLogManager extends ZooKeeperListener { createNode(path, zkretries); return true; } - LOG.warn(path + "is already being split. " + - "Two threads cannot wait for the same task"); return false; } @@ -323,15 +330,6 @@ public class SplitLogManager extends ZooKeeperListener { } private void setDone(String path, TerminationStatus status) { - if (!ZKSplitLog.isRescanNode(watcher, path)) { - if (status == SUCCESS) { - tot_mgr_log_split_success.incrementAndGet(); - LOG.info("Done splitting " + path); - } else { - tot_mgr_log_split_err.incrementAndGet(); - LOG.warn("Error splitting " + path); - } - } Task task = tasks.get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { @@ -340,18 +338,24 @@ public class SplitLogManager extends ZooKeeperListener { } } else { synchronized (task) { - task.deleted = true; - // if in stopTrackingTasks() we were to make tasks orphan instead of - // forgetting about them then we will have to handle the race when - // accessing task.batch here. 
- if (!task.isOrphan()) { - synchronized (task.batch) { - if (status == SUCCESS) { - task.batch.done++; - } else { - task.batch.error++; + if (task.status == IN_PROGRESS) { + if (status == SUCCESS) { + tot_mgr_log_split_success.incrementAndGet(); + LOG.info("Done splitting " + path); + } else { + tot_mgr_log_split_err.incrementAndGet(); + LOG.warn("Error splitting " + path); + } + task.status = status; + if (task.batch != null) { + synchronized (task.batch) { + if (status == SUCCESS) { + task.batch.done++; + } else { + task.batch.error++; + } + task.batch.notify(); } - task.batch.notify(); } } } @@ -394,6 +398,11 @@ public class SplitLogManager extends ZooKeeperListener { private void getDataSetWatchSuccess(String path, byte[] data, int version) { if (data == null) { + if (version == Integer.MIN_VALUE) { + // assume all done. The task znode suddenly disappeared. + setDone(path, SUCCESS); + return; + } tot_mgr_null_data.incrementAndGet(); LOG.fatal("logic error - got null data " + path); setDone(path, FAILURE); @@ -480,7 +489,7 @@ public class SplitLogManager extends ZooKeeperListener { ResubmitDirective directive) { // its ok if this thread misses the update to task.deleted. It will // fail later - if (task.deleted) { + if (task.status != IN_PROGRESS) { return false; } int version; @@ -514,7 +523,9 @@ public class SplitLogManager extends ZooKeeperListener { return false; } } catch (NoNodeException e) { - LOG.debug("failed to resubmit " + path + " task done"); + LOG.warn("failed to resubmit because znode doesn't exist " + path + + " task done (or forced done by removing the znode)"); + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); return false; } catch (KeeperException e) { tot_mgr_resubmit_failed.incrementAndGet(); @@ -539,6 +550,9 @@ public class SplitLogManager extends ZooKeeperListener { private void deleteNode(String path, Long retries) { tot_mgr_node_delete_queued.incrementAndGet(); + // Once a task znode is ready for delete, that is it is in the TASK_DONE + // state, then no one should be writing to it anymore. That is no one + // will be updating the znode version any more. this.watcher.getRecoverableZooKeeper().getZooKeeper(). delete(path, -1, new DeleteAsyncCallback(), retries); @@ -603,59 +617,43 @@ public class SplitLogManager extends ZooKeeperListener { Task oldtask; // batch.installed is only changed via this function and // a single thread touches batch.installed. - oldtask = tasks.putIfAbsent(path, new Task(batch)); - if (oldtask != null) { - // new task was not used. - batch.installed--; - synchronized (oldtask) { - if (oldtask.isOrphan()) { - if (oldtask.deleted) { - // The task is already done. Do not install the batch for this - // task because it might be too late for setDone() to update - // batch.done. There is no need for the batch creator to wait for - // this task to complete. - return (null); - } - oldtask.setBatch(batch); - } - } - LOG.info("Previously orphan task " + path + - " is now being waited upon"); - return (null); + Task newtask = new Task(); + newtask.batch = batch; + oldtask = tasks.putIfAbsent(path, newtask); + if (oldtask == null) { + batch.installed++; + return null; } - return oldtask; - } - - /** - * This function removes any knowledge of this batch's tasks from the - * manager. It doesn't actually stop the active tasks. If the tasks are - * resubmitted then the active tasks will be reacquired and monitored by the - * manager. 
It is important to call this function when batch processing - * terminates prematurely, otherwise if the tasks are re-submitted - * then they might fail. - *
- * there is a slight race here. even after a task has been removed from - * {@link #tasks} someone who had acquired a reference to it will continue to - * process the task. That is OK since we don't actually change the task and - * the batch objects. - *
- * TODO Its probably better to convert these to orphan tasks but then we - * have to deal with race conditions as we nullify Task's batch pointer etc. - *
- * @param batch - */ - void stopTrackingTasks(TaskBatch batch) { - for (Map.Entry e : tasks.entrySet()) { - String path = e.getKey(); - Task t = e.getValue(); - if (t.batch == batch) { // == is correct. equals not necessary. - tasks.remove(path); + // new task was not used. + synchronized (oldtask) { + if (oldtask.isOrphan()) { + if (oldtask.status == SUCCESS) { + // The task is already done. Do not install the batch for this + // task because it might be too late for setDone() to update + // batch.done. There is no need for the batch creator to wait for + // this task to complete. + return (null); + } + if (oldtask.status == IN_PROGRESS) { + oldtask.batch = batch; + batch.installed++; + LOG.debug("Previously orphan task " + path + + " is now being waited upon"); + return null; + } + LOG.warn("Transient problem. Failure because previously failed task" + + " state still present. Waiting for znode delete callback" + + " path=" + path); + return oldtask; } + LOG.warn("Failure because two threads can't wait for the same task. " + + " path=" + path); + return oldtask; } } Task findOrCreateOrphanTask(String path) { - Task orphanTask = new Task(null); + Task orphanTask = new Task(); Task task; task = tasks.putIfAbsent(path, orphanTask); if (task == null) { @@ -716,9 +714,10 @@ public class SplitLogManager extends ZooKeeperListener { * All access is synchronized. */ static class TaskBatch { - int installed; - int done; - int error; + int installed = 0; + int done = 0; + int error = 0; + volatile boolean isDead = false; @Override public String toString() { @@ -735,7 +734,7 @@ public class SplitLogManager extends ZooKeeperListener { int last_version; String cur_worker_name; TaskBatch batch; - boolean deleted; + TerminationStatus status; int incarnation; int unforcedResubmits; @@ -744,32 +743,21 @@ public class SplitLogManager extends ZooKeeperListener { return ("last_update = " + last_update + " last_version = " + last_version + " cur_worker_name = " + cur_worker_name + - " deleted = " + deleted + + " status = " + status + " incarnation = " + incarnation + " resubmits = " + unforcedResubmits + " batch = " + batch); } - Task(TaskBatch tb) { + Task() { incarnation = 0; last_version = -1; - deleted = false; - setBatch(tb); + status = IN_PROGRESS; setUnassigned(); } - public void setBatch(TaskBatch batch) { - if (batch != null && this.batch != null) { - LOG.fatal("logic error - batch being overwritten"); - } - this.batch = batch; - if (batch != null) { - batch.installed++; - } - } - public boolean isOrphan() { - return (batch == null); + return (batch == null || batch.isDead); } public boolean isUnassigned() { @@ -882,6 +870,16 @@ public class SplitLogManager extends ZooKeeperListener { if (tot > 0 && !found_assigned_task && ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) > unassignedTimeout)) { + for (Map.Entry e : tasks.entrySet()) { + String path = e.getKey(); + Task task = e.getValue(); + // we have to do this check again because tasks might have + // been asynchronously assigned. 
+ if (task.isUnassigned()) { + // We just touch the znode to make sure its still there + getDataSetWatch(path, zkretries); + } + } createRescanNode(Long.MAX_VALUE); tot_mgr_resubmit_unassigned.incrementAndGet(); LOG.debug("resubmitting unassigned task(s) after timeout"); @@ -901,6 +899,12 @@ public class SplitLogManager extends ZooKeeperListener { tot_mgr_node_create_result.incrementAndGet(); if (rc != 0) { if (rc == KeeperException.Code.NODEEXISTS.intValue()) { + // What if there is a delete pending against this pre-existing + // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE + // state. Only operations that will be carried out on this node by + // this manager are get-znode-data, task-finisher and delete-znode. + // And all code pieces correctly handle the case of suddenly + // disappearing task-znode. LOG.debug("found pre-existing znode " + path); tot_mgr_node_already_exists.incrementAndGet(); } else { @@ -933,6 +937,15 @@ public class SplitLogManager extends ZooKeeperListener { Stat stat) { tot_mgr_get_data_result.incrementAndGet(); if (rc != 0) { + if (rc == KeeperException.Code.NONODE.intValue()) { + tot_mgr_get_data_nonode.incrementAndGet(); + // The task znode has been deleted. Must be some pending delete + // that deleted the task. Assume success because a task-znode is + // is only deleted after TaskFinisher is successful. + LOG.warn("task znode " + path + " vanished."); + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + return; + } Long retry_count = (Long) ctx; LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path + " remaining retries=" + retry_count); @@ -974,9 +987,10 @@ public class SplitLogManager extends ZooKeeperListener { } return; } else { - LOG.debug(path - + " does not exist, either was never created or was deleted" - + " in earlier rounds, zkretries = " + (Long) ctx); + LOG.debug(path + + " does not exist. Either was created but deleted behind our" + + " back by another pending delete OR was deleted" + + " in earlier retry rounds. zkretries = " + (Long) ctx); } } else { LOG.debug("deleted " + path); @@ -1014,46 +1028,10 @@ public class SplitLogManager extends ZooKeeperListener { } /** - * checks whether any new files have appeared in logDir which were - * not present in the original logfiles set - * @param logdir - * @param logfiles - * @return True if a new log file is found - * @throws IOException - */ - public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles) - throws IOException { - if (logdir == null) { - return false; - } - LOG.debug("re-listing " + logdir); - tot_mgr_relist_logdir.incrementAndGet(); - FileStatus[] newfiles = FSUtils.listStatus(fs, logdir, null); - if (newfiles == null) { - return false; - } - boolean matched; - for (FileStatus newfile : newfiles) { - matched = false; - for (FileStatus origfile : logfiles) { - if (origfile.equals(newfile)) { - matched = true; - break; - } - } - if (matched == false) { - LOG.warn("Discovered orphan hlog " + newfile + " after split." + - " Maybe HRegionServer was not dead when we started"); - return true; - } - } - return false; - } - - /** * {@link SplitLogManager} can use objects implementing this interface to * finish off a partially done task by {@link SplitLogWorker}. This provides - * a serialization point at the end of the task processing. + * a serialization point at the end of the task processing. Must be + * restartable and idempotent. 
*/ static public interface TaskFinisher { /** @@ -1085,7 +1063,18 @@ public class SplitLogManager extends ZooKeeperListener { FORCE(); } enum TerminationStatus { - SUCCESS(), - FAILURE(); + IN_PROGRESS("in_progress"), + SUCCESS("success"), + FAILURE("failure"); + + String statusMsg; + TerminationStatus(String msg) { + statusMsg = msg; + } + + @Override + public String toString() { + return statusMsg; + } } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 43bfba0..6f9417a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -541,16 +541,21 @@ public class HLogSplitter { if (ZKSplitLog.isCorruptFlagFile(dst)) { continue; } - if (fs.exists(dst)) { - fs.delete(dst, false); - } else { - Path dstdir = dst.getParent(); - if (!fs.exists(dstdir)) { - if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir); + if (fs.exists(src)) { + if (fs.exists(dst)) { + fs.delete(dst, false); + } else { + Path dstdir = dst.getParent(); + if (!fs.exists(dstdir)) { + if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir); + } } + fs.rename(src, dst); + LOG.debug(" moved " + src + " => " + dst); + } else { + LOG.debug("Could not move recovered edits from " + src + + " as it doesn't exist"); } - fs.rename(src, dst); - LOG.debug(" moved " + src + " => " + dst); } archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf); @@ -600,24 +605,32 @@ public class HLogSplitter { } fs.mkdirs(oldLogDir); + // this method can get restarted or called multiple times for archiving + // the same log files. for (Path corrupted : corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); - if (!fs.rename(corrupted, p)) { - LOG.info("Unable to move corrupted log " + corrupted + " to " + p); - } else { - LOG.info("Moving corrupted log " + corrupted + " to " + p); + if (fs.exists(corrupted)) { + if (!fs.rename(corrupted, p)) { + LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); + } else { + LOG.warn("Moving corrupted log " + corrupted + " to " + p); + } } } for (Path p : processedLogs) { Path newPath = HLog.getHLogArchivePath(oldLogDir, p); - if (!fs.rename(p, newPath)) { - LOG.info("Unable to move " + p + " to " + newPath); - } else { - LOG.info("Archived processed log " + p + " to " + newPath); + if (fs.exists(p)) { + if (!fs.rename(p, newPath)) { + LOG.warn("Unable to move " + p + " to " + newPath); + } else { + LOG.debug("Archived processed log " + p + " to " + newPath); + } } } + // distributed log splitting removes the srcDir (region's log dir) later + // when all the log files in that srcDir have been successfully processed if (srcDir != null && !fs.delete(srcDir, true)) { throw new IOException("Unable to delete src dir: " + srcDir); } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index 9b83840..07b8fc6 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -215,6 +215,7 @@ public class ZKSplitLog { public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0); + public static AtomicLong tot_mgr_get_data_nonode = 
new AtomicLong(0); public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0); public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0); public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0); diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index a348f0c..b51ef31 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -142,51 +142,6 @@ public class TestDistributedLogSplitting { ht.close(); } - @Test(expected=OrphanHLogAfterSplitException.class, timeout=300000) - public void testOrphanLogCreation() throws Exception { - LOG.info("testOrphanLogCreation"); - startCluster(NUM_RS); - final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; - final FileSystem fs = master.getMasterFileSystem().getFileSystem(); - - List rsts = cluster.getLiveRegionServerThreads(); - HRegionServer hrs = rsts.get(0).getRegionServer(); - Path rootdir = FSUtils.getRootDir(conf); - final Path logDir = new Path(rootdir, - HLog.getHLogDirectoryName(hrs.getServerName().toString())); - - installTable(new ZooKeeperWatcher(conf, "table-creation", null), - "table", "family", 40); - - makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table", - 1000, 100); - - new Thread() { - public void run() { - while (true) { - int i = 0; - try { - while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() == - 0) { - Thread.yield(); - } - fs.createNewFile(new Path(logDir, "foo" + i++)); - } catch (Exception e) { - LOG.debug("file creation failed", e); - return; - } - } - } - }.start(); - slm.splitLogDistributed(logDir); - FileStatus[] files = fs.listStatus(logDir); - if (files != null) { - for (FileStatus file : files) { - LOG.debug("file still there " + file.getPath()); - } - } - } - @Test (timeout=300000) public void testRecoveredEdits() throws Exception { LOG.info("testRecoveredEdits"); diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 32ad7e8..2d23388 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.*; @@ -31,6 +32,7 @@ import static org.junit.Assert.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.master.SplitLogManager.Task; @@ -419,6 +421,52 @@ public class TestSplitLogManager { return; } + @Test + public void testEmptyLogDir() throws Exception { + LOG.info("testEmptyLogDir"); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(emptyLogDirPath); + slm.splitLogDistributed(emptyLogDirPath); + assertFalse(fs.exists(emptyLogDirPath)); + } + + @Test 
+ public void testVanishingTaskZNode() throws Exception { + LOG.info("testVanishingTaskZNode"); + conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + final Path logDir = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(logDir); + Path logFile = new Path(logDir, UUID.randomUUID().toString()); + fs.createNewFile(logFile); + new Thread() { + public void run() { + try { + // this call will block because there are no SplitLogWorkers + slm.splitLogDistributed(logDir); + } catch (Exception e) { + LOG.warn("splitLogDistributed failed", e); + fail(); + } + } + }.start(); + waitForCounter(tot_mgr_node_create_result, 0, 1, 10000); + String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString()); + // remove the task znode + ZKUtil.deleteNode(zkw, znode); + waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000); + waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000); + assertTrue(fs.exists(logFile)); + fs.delete(logDir, true); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -- 1.7.7.2
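The race described in the summary comes down to the ordering of an asynchronous ZooKeeper delete against a retry that re-creates the same task path. The following minimal sketch is illustrative only; it is not code from this patch, and the class TaskNodeCleanup and the fields tasks and zk are hypothetical names. It contrasts the async-delete pattern, whose completion callback can remove a map entry that a retry has already re-created (step 5 of the scenario above), with a synchronous delete that finishes before the retry starts.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

// Illustrative sketch only -- simplified names, not the SplitLogManager code.
class TaskNodeCleanup {
  private final ZooKeeper zk;
  private final ConcurrentMap<String, Object> tasks =
      new ConcurrentHashMap<String, Object>();

  TaskNodeCleanup(ZooKeeper zk) {
    this.zk = zk;
  }

  // Race-prone pattern: the delete completes at some arbitrary later time and
  // its callback may remove a task entry that a retry has already re-created
  // under the same path.
  void deleteTaskNodeAsync(final String path) {
    zk.delete(path, -1, new AsyncCallback.VoidCallback() {
      public void processResult(int rc, String p, Object ctx) {
        tasks.remove(p); // may clobber the retry's fresh entry
      }
    }, null);
  }

  // Synchronous alternative: both the znode and the in-memory entry are gone
  // before this method returns, so a subsequent retry starts from a clean
  // slate.
  void deleteTaskNodeSync(String path)
      throws KeeperException, InterruptedException {
    try {
      zk.delete(path, -1);
    } catch (KeeperException.NoNodeException e) {
      // already gone; treat as done
    }
    tasks.remove(path);
  }
}

Note that the patch itself keeps the asynchronous DeleteAsyncCallback and instead makes the manager tolerant of the race: each Task carries a TerminationStatus rather than a deleted flag, a vanished task znode (NONODE, surfaced as version == Integer.MIN_VALUE) is treated as success, and a batch that terminates prematurely is marked isDead so its stale tasks are recognized as orphans.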