From 4fa70d63c3e6bfd97ffb2afeb0a80aa46c7b034f Mon Sep 17 00:00:00 2001
From: khemani
Date: Thu, 5 Jan 2012 14:32:59 -0800
Subject: [PATCH] HBASE-5081 [jira] Distributed log splitting deleteNode races
 against splitLog retry

Summary:
HBASE-5081 Distributed log splitting deleteNode races against splitLog retry

Recently, during 0.92 rc testing, we found that distributed log splitting can
hang forever. Please see the attached screen shot. I looked into it; here is
what I believe happened:

1. One region server died; the ServerShutdownHandler noticed and started
   distributed log splitting.
2. All three tasks failed, so the three task znodes were deleted,
   asynchronously.
3. The ServerShutdownHandler retried the log splitting.
4. During the retry, the three tasks were created again and put into the
   in-memory task map (tasks).
5. The asynchronous deletion from step 2 finally completed for one task, and
   its callback removed that task from the map.
6. The zookeeper watcher of one of the newly submitted tasks saw that the
   task was unassigned and no longer in the map, so it created a new orphan
   task.
7. All three tasks failed again, but the task created in step 6 was an
   orphan, so the batch.error counter came up one short; the log splitting
   hangs, waiting forever for a last task that will never finish.

So the problem is step 2. The fix is to make the deletion synchronous with
respect to the retry, so that every retry starts from a clean slate.

An asynchronous deleteNode interferes with splitLog retries: in the extreme
case, if the asynchronous deleteNode fires late enough, it can delete a node
that was created during the retry.

deleteNode should be synchronous.

Test Plan: EMPTY

Reviewers: JIRA
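The heart of the change can be sketched as follows. This is a simplified
illustration of the new hand-off in SplitLogManager's installTask logic
(Task, TaskBatch, the tasks map and the TerminationStatus values are the
patch's own; error paths and counters are elided): a retry that finds a
previously failed task now blocks until the pending asynchronous znode
delete calls back and marks the task DELETED, and only then reinstalls it.

    // Simplified sketch of the patched task hand-off. A task now moves
    // through an explicit lifecycle: IN_PROGRESS -> SUCCESS/FAILURE ->
    // DELETED, and deleteNodeSuccess() notify()s threads waiting on it.
    private Task installTaskSketch(String path, TaskBatch batch)
        throws InterruptedException {
      Task newtask = new Task();
      newtask.batch = batch;
      Task oldtask = tasks.putIfAbsent(path, newtask);
      if (oldtask == null) {
        batch.installed++;
        return null;                       // fresh install, nothing to race
      }
      synchronized (oldtask) {
        while (oldtask.status == FAILURE) {
          oldtask.wait();                  // wait for the delete callback
        }
        if (oldtask.status != DELETED) {
          return oldtask;                  // caller treats this as failure
        }
        tasks.putIfAbsent(path, newtask);  // clean slate, reinstall
        batch.installed++;
        return null;
      }
    }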
---
 .../hadoop/hbase/master/MasterFileSystem.java      |    7 +-
 .../hadoop/hbase/master/SplitLogManager.java       |  341 +++++++++++---------
 .../hbase/regionserver/wal/HLogSplitter.java       |   45 ++-
 .../apache/hadoop/hbase/zookeeper/ZKSplitLog.java  |    2 +
 .../hbase/master/TestDistributedLogSplitting.java  |   85 +++---
 .../hadoop/hbase/master/TestSplitLogManager.java   |  100 +++++-
 6 files changed, 343 insertions(+), 239 deletions(-)

diff --git src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 3938fa7..548a5be 100644
--- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -270,12 +270,7 @@ public class MasterFileSystem {
     if (distributedLogSplitting) {
       splitLogManager.handleDeadWorkers(serverNames);
       splitTime = EnvironmentEdgeManager.currentTimeMillis();
-      try {
-        splitLogSize = splitLogManager.splitLogDistributed(logDirs);
-      } catch (OrphanHLogAfterSplitException e) {
-        LOG.warn("Retrying distributed splitting for " + serverNames, e);
-        splitLogManager.splitLogDistributed(logDirs);
-      }
+      splitLogSize = splitLogManager.splitLogDistributed(logDirs);
       splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
     } else {
       for(Path logDir: logDirs){
diff --git src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 667a8b1..0ef0e33 100644
--- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -107,6 +107,7 @@ public class SplitLogManager extends ZooKeeperListener {
   private long timeout;
   private long unassignedTimeout;
   private long lastNodeCreateTime = Long.MAX_VALUE;
+  public boolean ignoreZKDeleteForTesting = false;
 
   private ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
@@ -116,10 +117,12 @@ public class SplitLogManager extends ZooKeeperListener {
   private Object deadWorkersLock = new Object();
 
   /**
-   * Its OK to construct this object even when region-servers are not online. It
-   * does lookup the orphan tasks in zk but it doesn't block for them to be
-   * done.
-   *
+   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
+   * Stoppable, String, TaskFinisher)} that provides a task finisher for
+   * copying recovered edits to their final destination. The task finisher
+   * has to be robust because it can be arbitrarily restarted or called
+   * multiple times.
+   *
    * @param zkw
    * @param conf
    * @param stopper
@@ -142,6 +145,18 @@ public class SplitLogManager extends ZooKeeperListener {
       }
     });
   }
+
+  /**
+   * Its OK to construct this object even when region-servers are not online. It
+   * does lookup the orphan tasks in zk but it doesn't block waiting for them
+   * to be done.
+   *
+   * @param zkw
+   * @param conf
+   * @param stopper
+   * @param serverName
+   * @param tf task finisher
+   */
   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
       Stoppable stopper, String serverName, TaskFinisher tf) {
     super(zkw);
@@ -194,8 +209,6 @@ public class SplitLogManager extends ZooKeeperListener {
         fileStatus.add(status);
       }
     }
-    if (fileStatus.isEmpty())
-      return null;
     FileStatus[] a = new FileStatus[fileStatus.size()];
     return fileStatus.toArray(a);
   }
@@ -228,8 +241,6 @@ public class SplitLogManager extends ZooKeeperListener {
     MonitoredTask status = TaskMonitor.get().createStatus(
           "Doing distributed log split in " + logDirs);
     FileStatus[] logfiles = getFileList(logDirs);
-    if(logfiles == null)
-      return 0;
     status.setStatus("Checking directory contents...");
     LOG.debug("Scheduling batch of logs to split");
     tot_mgr_log_split_batch_start.incrementAndGet();
@@ -251,7 +262,7 @@ public class SplitLogManager extends ZooKeeperListener {
     }
     waitForSplittingCompletion(batch, status);
     if (batch.done != batch.installed) {
-      stopTrackingTasks(batch);
+      batch.isDead = true;
       tot_mgr_log_split_batch_err.incrementAndGet();
       LOG.warn("error while splitting logs in " + logDirs +
           " installed = " + batch.installed + " but only " + batch.done + " done");
       throw new IOException("error or interrupted while splitting logs in "
           + logDirs + " Task = " + batch);
     }
     for(Path logDir: logDirs){
-      if (anyNewLogFiles(logDir, logfiles)) {
-        tot_mgr_new_unexpected_hlogs.incrementAndGet();
-        LOG.warn("new hlogs were produced while logs in " + logDir +
-            " were being split");
-        throw new OrphanHLogAfterSplitException();
-      }
-      tot_mgr_log_split_batch_success.incrementAndGet();
       status.setStatus("Cleaning up log directory...");
-      if (!fs.delete(logDir, true)) {
-        throw new IOException("Unable to delete src dir: " + logDir);
+      try {
+        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
+          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
+        }
+      } catch (IOException ioe) {
+        FileStatus[] files = fs.listStatus(logDir);
+        if (files != null && files.length > 0) {
+          LOG.warn("returning success without actually splitting and " +
+              "deleting all the log files in path " + logDir);
+        } else {
+          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
+        }
       }
+      tot_mgr_log_split_batch_success.incrementAndGet();
     }
     String msg = "finished splitting (more than or equal to) " + totalSize +
         " bytes in " + batch.installed + " log files in " + logDirs + " in " +
@@ -295,8 +310,6 @@ public class SplitLogManager extends ZooKeeperListener {
       createNode(path, zkretries);
       return true;
     }
-    LOG.warn(path + "is already being split. " +
-        "Two threads cannot wait for the same task");
     return false;
   }
@@ -323,15 +336,6 @@ public class SplitLogManager extends ZooKeeperListener {
   }
 
   private void setDone(String path, TerminationStatus status) {
-    if (!ZKSplitLog.isRescanNode(watcher, path)) {
-      if (status == SUCCESS) {
-        tot_mgr_log_split_success.incrementAndGet();
-        LOG.info("Done splitting " + path);
-      } else {
-        tot_mgr_log_split_err.incrementAndGet();
-        LOG.warn("Error splitting " + path);
-      }
-    }
     Task task = tasks.get(path);
     if (task == null) {
       if (!ZKSplitLog.isRescanNode(watcher, path)) {
@@ -340,18 +344,24 @@ public class SplitLogManager extends ZooKeeperListener {
       }
     } else {
       synchronized (task) {
-        task.deleted = true;
-        // if in stopTrackingTasks() we were to make tasks orphan instead of
-        // forgetting about them then we will have to handle the race when
-        // accessing task.batch here.
-        if (!task.isOrphan()) {
-          synchronized (task.batch) {
-            if (status == SUCCESS) {
-              task.batch.done++;
-            } else {
-              task.batch.error++;
+        if (task.status == IN_PROGRESS) {
+          if (status == SUCCESS) {
+            tot_mgr_log_split_success.incrementAndGet();
+            LOG.info("Done splitting " + path);
+          } else {
+            tot_mgr_log_split_err.incrementAndGet();
+            LOG.warn("Error splitting " + path);
+          }
+          task.status = status;
+          if (task.batch != null) {
+            synchronized (task.batch) {
+              if (status == SUCCESS) {
+                task.batch.done++;
+              } else {
+                task.batch.error++;
+              }
+              task.batch.notify();
            }
-            task.batch.notify();
          }
        }
      }
@@ -394,6 +404,11 @@
 
   private void getDataSetWatchSuccess(String path, byte[] data, int version) {
     if (data == null) {
+      if (version == Integer.MIN_VALUE) {
+        // assume all done. The task znode suddenly disappeared.
+        setDone(path, SUCCESS);
+        return;
+      }
       tot_mgr_null_data.incrementAndGet();
       LOG.fatal("logic error - got null data " + path);
       setDone(path, FAILURE);
@@ -480,7 +495,7 @@
       ResubmitDirective directive) {
     // its ok if this thread misses the update to task.deleted. It will
     // fail later
-    if (task.deleted) {
+    if (task.status != IN_PROGRESS) {
       return false;
     }
     int version;
@@ -490,7 +505,8 @@
       return false;
     }
     if (task.unforcedResubmits >= resubmit_threshold) {
-      if (task.unforcedResubmits == resubmit_threshold) {
+      if (!task.resubmitThresholdReached) {
+        task.resubmitThresholdReached = true;
         tot_mgr_resubmit_threshold_reached.incrementAndGet();
         LOG.info("Skipping resubmissions of task " + path +
             " because threshold " + resubmit_threshold + " reached");
@@ -514,7 +530,9 @@
         return false;
       }
     } catch (NoNodeException e) {
-      LOG.debug("failed to resubmit " + path + " task done");
+      LOG.warn("failed to resubmit because znode doesn't exist " + path +
+          " task done (or forced done by removing the znode)");
+      getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
       return false;
     } catch (KeeperException e) {
       tot_mgr_resubmit_failed.incrementAndGet();
@@ -539,12 +557,18 @@
 
   private void deleteNode(String path, Long retries) {
     tot_mgr_node_delete_queued.incrementAndGet();
+    // Once a task znode is ready for delete, that is it is in the TASK_DONE
+    // state, then no one should be writing to it anymore. That is no one
+    // will be updating the znode version any more.
     this.watcher.getRecoverableZooKeeper().getZooKeeper().
       delete(path, -1, new DeleteAsyncCallback(), retries);
   }
 
   private void deleteNodeSuccess(String path) {
+    if (ignoreZKDeleteForTesting) {
+      return;
+    }
     Task task;
     task = tasks.remove(path);
     if (task == null) {
@@ -555,6 +579,10 @@
       LOG.debug("deleted task without in memory state " + path);
       return;
     }
+    synchronized (task) {
+      task.status = DELETED;
+      task.notify();
+    }
     tot_mgr_task_deleted.incrementAndGet();
   }
@@ -603,59 +631,67 @@
     Task oldtask;
     // batch.installed is only changed via this function and
     // a single thread touches batch.installed.
-    oldtask = tasks.putIfAbsent(path, new Task(batch));
-    if (oldtask != null) {
-      // new task was not used.
-      batch.installed--;
-      synchronized (oldtask) {
-        if (oldtask.isOrphan()) {
-          if (oldtask.deleted) {
-            // The task is already done. Do not install the batch for this
-            // task because it might be too late for setDone() to update
-            // batch.done. There is no need for the batch creator to wait for
-            // this task to complete.
-            return (null);
+    Task newtask = new Task();
+    newtask.batch = batch;
+    oldtask = tasks.putIfAbsent(path, newtask);
+    if (oldtask == null) {
+      batch.installed++;
+      return null;
+    }
+    // new task was not used.
+    synchronized (oldtask) {
+      if (oldtask.isOrphan()) {
+        if (oldtask.status == SUCCESS) {
+          // The task is already done. Do not install the batch for this
+          // task because it might be too late for setDone() to update
+          // batch.done. There is no need for the batch creator to wait for
+          // this task to complete.
+          return (null);
+        }
+        if (oldtask.status == IN_PROGRESS) {
+          oldtask.batch = batch;
+          batch.installed++;
+          LOG.debug("Previously orphan task " + path +
+              " is now being waited upon");
+          return null;
+        }
+        while (oldtask.status == FAILURE) {
+          LOG.debug("wait for status of task " + path +
+              " to change to DELETED");
+          tot_mgr_wait_for_zk_delete.incrementAndGet();
+          try {
+            oldtask.wait();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted when waiting for znode delete callback");
+            // fall through to return failure
+            break;
           }
-          oldtask.setBatch(batch);
         }
+        if (oldtask.status != DELETED) {
+          LOG.warn("Failure because previously failed task" +
+              " state still present. Waiting for znode delete callback" +
+              " path=" + path);
+          return oldtask;
+        }
+        // reinsert the newTask and it must succeed this time
+        Task t = tasks.putIfAbsent(path, newtask);
+        if (t == null) {
+          batch.installed++;
+          return null;
+        }
+        LOG.fatal("Logic error. Deleted task still present in tasks map");
+        assert false : "Deleted task still present in tasks map";
+        return t;
       }
-      LOG.info("Previously orphan task " + path +
-          " is now being waited upon");
-      return (null);
-    }
-    return oldtask;
-  }
-
-  /**
-   * This function removes any knowledge of this batch's tasks from the
-   * manager. It doesn't actually stop the active tasks. If the tasks are
-   * resubmitted then the active tasks will be reacquired and monitored by the
-   * manager. It is important to call this function when batch processing
-   * terminates prematurely, otherwise if the tasks are re-submitted
-   * then they might fail.
-   * <p>
-   * there is a slight race here. even after a task has been removed from
-   * {@link #tasks} someone who had acquired a reference to it will continue to
-   * process the task. That is OK since we don't actually change the task and
-   * the batch objects.
-   * <p>
-   * TODO Its probably better to convert these to orphan tasks but then we
-   * have to deal with race conditions as we nullify Task's batch pointer etc.
-   * <p>
-   * @param batch
-   */
-  void stopTrackingTasks(TaskBatch batch) {
-    for (Map.Entry<String, Task> e : tasks.entrySet()) {
-      String path = e.getKey();
-      Task t = e.getValue();
-      if (t.batch == batch) { // == is correct. equals not necessary.
-        tasks.remove(path);
-      }
+      LOG.warn("Failure because two threads can't wait for the same task. " +
+          " path=" + path);
+      return oldtask;
     }
   }
 
   Task findOrCreateOrphanTask(String path) {
-    Task orphanTask = new Task(null);
+    Task orphanTask = new Task();
     Task task;
     task = tasks.putIfAbsent(path, orphanTask);
     if (task == null) {
@@ -716,9 +752,10 @@
    * All access is synchronized.
    */
   static class TaskBatch {
-    int installed;
-    int done;
-    int error;
+    int installed = 0;
+    int done = 0;
+    int error = 0;
+    volatile boolean isDead = false;
 
     @Override
     public String toString() {
@@ -731,45 +768,35 @@
    * in memory state of an active task.
    */
   static class Task {
-    long last_update;
-    int last_version;
-    String cur_worker_name;
+    volatile long last_update;
+    volatile int last_version;
+    volatile String cur_worker_name;
     TaskBatch batch;
-    boolean deleted;
-    int incarnation;
-    int unforcedResubmits;
+    volatile TerminationStatus status;
+    volatile int incarnation;
+    volatile int unforcedResubmits;
+    volatile boolean resubmitThresholdReached;
 
     @Override
     public String toString() {
       return ("last_update = " + last_update +
           " last_version = " + last_version +
           " cur_worker_name = " + cur_worker_name +
-          " deleted = " + deleted +
+          " status = " + status +
           " incarnation = " + incarnation +
           " resubmits = " + unforcedResubmits +
           " batch = " + batch);
     }
 
-    Task(TaskBatch tb) {
+    Task() {
       incarnation = 0;
       last_version = -1;
-      deleted = false;
-      setBatch(tb);
+      status = IN_PROGRESS;
       setUnassigned();
     }
 
-    public void setBatch(TaskBatch batch) {
-      if (batch != null && this.batch != null) {
-        LOG.fatal("logic error - batch being overwritten");
-      }
-      this.batch = batch;
-      if (batch != null) {
-        batch.installed++;
-      }
-    }
-
     public boolean isOrphan() {
-      return (batch == null);
+      return (batch == null || batch.isDead);
     }
 
     public boolean isUnassigned() {
@@ -882,6 +909,16 @@
       if (tot > 0 && !found_assigned_task &&
           ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
           unassignedTimeout)) {
+        for (Map.Entry<String, Task> e : tasks.entrySet()) {
+          String path = e.getKey();
+          Task task = e.getValue();
+          // we have to do this check again because tasks might have
+          // been asynchronously assigned.
+          if (task.isUnassigned()) {
+            // We just touch the znode to make sure its still there
+            getDataSetWatch(path, zkretries);
+          }
+        }
         createRescanNode(Long.MAX_VALUE);
         tot_mgr_resubmit_unassigned.incrementAndGet();
         LOG.debug("resubmitting unassigned task(s) after timeout");
@@ -901,6 +938,12 @@
       tot_mgr_node_create_result.incrementAndGet();
       if (rc != 0) {
         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+          // What if there is a delete pending against this pre-existing
+          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
+          // state. Only operations that will be carried out on this node by
+          // this manager are get-znode-data, task-finisher and delete-znode.
+          // And all code pieces correctly handle the case of suddenly
+          // disappearing task-znode.
           LOG.debug("found pre-existing znode " + path);
           tot_mgr_node_already_exists.incrementAndGet();
         } else {
@@ -933,6 +976,15 @@
         Stat stat) {
       tot_mgr_get_data_result.incrementAndGet();
       if (rc != 0) {
+        if (rc == KeeperException.Code.NONODE.intValue()) {
+          tot_mgr_get_data_nonode.incrementAndGet();
+          // The task znode has been deleted. Must be some pending delete
+          // that deleted the task. Assume success because a task-znode is
+          // only deleted after TaskFinisher is successful.
+          LOG.warn("task znode " + path + " vanished.");
+          getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
+          return;
+        }
         Long retry_count = (Long) ctx;
         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path +
             " remaining retries=" + retry_count);
@@ -974,9 +1026,10 @@
           }
           return;
         } else {
-          LOG.debug(path
-              + " does not exist, either was never created or was deleted"
-              + " in earlier rounds, zkretries = " + (Long) ctx);
+          LOG.debug(path +
+              " does not exist. Either was created but deleted behind our" +
+              " back by another pending delete OR was deleted" +
+              " in earlier retry rounds. zkretries = " + (Long) ctx);
         }
       } else {
         LOG.debug("deleted " + path);
@@ -1014,46 +1067,10 @@
   }
 
   /**
-   * checks whether any new files have appeared in logDir which were
-   * not present in the original logfiles set
-   * @param logdir
-   * @param logfiles
-   * @return True if a new log file is found
-   * @throws IOException
-   */
-  public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles)
-      throws IOException {
-    if (logdir == null) {
-      return false;
-    }
-    LOG.debug("re-listing " + logdir);
-    tot_mgr_relist_logdir.incrementAndGet();
-    FileStatus[] newfiles = FSUtils.listStatus(fs, logdir, null);
-    if (newfiles == null) {
-      return false;
-    }
-    boolean matched;
-    for (FileStatus newfile : newfiles) {
-      matched = false;
-      for (FileStatus origfile : logfiles) {
-        if (origfile.equals(newfile)) {
-          matched = true;
-          break;
-        }
-      }
-      if (matched == false) {
-        LOG.warn("Discovered orphan hlog " + newfile + " after split." +
-            " Maybe HRegionServer was not dead when we started");
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * {@link SplitLogManager} can use objects implementing this interface to
    * finish off a partially done task by {@link SplitLogWorker}. This provides
-   * a serialization point at the end of the task processing.
+   * a serialization point at the end of the task processing. Must be
+   * restartable and idempotent.
    */
   static public interface TaskFinisher {
     /**
@@ -1085,7 +1102,19 @@
     FORCE();
   }
   enum TerminationStatus {
-    SUCCESS(),
-    FAILURE();
+    IN_PROGRESS("in_progress"),
+    SUCCESS("success"),
+    FAILURE("failure"),
+    DELETED("deleted");
+
+    String statusMsg;
+    TerminationStatus(String msg) {
+      statusMsg = msg;
+    }
+
+    @Override
+    public String toString() {
+      return statusMsg;
+    }
   }
 }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 43bfba0..6f9417a 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -541,16 +541,21 @@ public class HLogSplitter {
       if (ZKSplitLog.isCorruptFlagFile(dst)) {
         continue;
       }
-      if (fs.exists(dst)) {
-        fs.delete(dst, false);
-      } else {
-        Path dstdir = dst.getParent();
-        if (!fs.exists(dstdir)) {
-          if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+      if (fs.exists(src)) {
+        if (fs.exists(dst)) {
+          fs.delete(dst, false);
+        } else {
+          Path dstdir = dst.getParent();
+          if (!fs.exists(dstdir)) {
+            if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
+          }
         }
+        fs.rename(src, dst);
+        LOG.debug(" moved " + src + " => " + dst);
+      } else {
+        LOG.debug("Could not move recovered edits from " + src +
+            " as it doesn't exist");
       }
-      fs.rename(src, dst);
-      LOG.debug(" moved " + src + " => " + dst);
     }
     archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
@@ -600,24 +605,32 @@ public class HLogSplitter {
     }
     fs.mkdirs(oldLogDir);
 
+    // this method can get restarted or called multiple times for archiving
+    // the same log files.
     for (Path corrupted : corruptedLogs) {
       Path p = new Path(corruptDir, corrupted.getName());
-      if (!fs.rename(corrupted, p)) {
-        LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
-      } else {
-        LOG.info("Moving corrupted log " + corrupted + " to " + p);
+      if (fs.exists(corrupted)) {
+        if (!fs.rename(corrupted, p)) {
+          LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
+        } else {
+          LOG.warn("Moving corrupted log " + corrupted + " to " + p);
+        }
       }
     }
 
     for (Path p : processedLogs) {
       Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
-      if (!fs.rename(p, newPath)) {
-        LOG.info("Unable to move " + p + " to " + newPath);
-      } else {
-        LOG.info("Archived processed log " + p + " to " + newPath);
+      if (fs.exists(p)) {
+        if (!fs.rename(p, newPath)) {
+          LOG.warn("Unable to move " + p + " to " + newPath);
+        } else {
+          LOG.debug("Archived processed log " + p + " to " + newPath);
+        }
       }
     }
 
+    // distributed log splitting removes the srcDir (region's log dir) later
+    // when all the log files in that srcDir have been successfully processed
     if (srcDir != null && !fs.delete(srcDir, true)) {
       throw new IOException("Unable to delete src dir: " + srcDir);
     }
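Every file operation in the HLogSplitter changes above follows the same
pattern: the task finisher can be re-run at any point, so each move must
tolerate an earlier partial or completed run. Reduced to a standalone
helper (moveIdempotently is a hypothetical name for illustration, not a
method in the patch; it assumes org.apache.hadoop.fs.FileSystem, Path, and
a commons-logging LOG in scope):

    /**
     * Move src to dst, tolerating re-execution: if src is already gone,
     * assume an earlier attempt completed the move.
     */
    static void moveIdempotently(FileSystem fs, Path src, Path dst)
        throws IOException {
      if (!fs.exists(src)) {
        return;                    // an earlier attempt already moved it
      }
      if (fs.exists(dst)) {
        fs.delete(dst, false);     // clobber a half-written destination
      } else if (!fs.exists(dst.getParent())) {
        if (!fs.mkdirs(dst.getParent())) {
          LOG.warn("mkdir failed on " + dst.getParent());
        }
      }
      if (!fs.rename(src, dst)) {
        throw new IOException("rename failed: " + src + " -> " + dst);
      }
    }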
diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index 9b83840..02034dc 100644
--- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -215,6 +215,7 @@ public class ZKSplitLog {
     public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
+    public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
     public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
     public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
@@ -224,6 +225,7 @@ public class ZKSplitLog {
     public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
     public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
     public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
+    public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
     public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
     public static AtomicLong tot_mgr_resubmit_threshold_reached =
       new AtomicLong(0);
diff --git src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index a348f0c..b0487f1 100644
--- src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -142,51 +143,6 @@ public class TestDistributedLogSplitting {
     ht.close();
   }
 
-  @Test(expected=OrphanHLogAfterSplitException.class, timeout=300000)
-  public void testOrphanLogCreation() throws Exception {
-    LOG.info("testOrphanLogCreation");
-    startCluster(NUM_RS);
-    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
-    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
-
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-    HRegionServer hrs = rsts.get(0).getRegionServer();
-    Path rootdir = FSUtils.getRootDir(conf);
-    final Path logDir = new Path(rootdir,
-        HLog.getHLogDirectoryName(hrs.getServerName().toString()));
-
-    installTable(new ZooKeeperWatcher(conf, "table-creation", null),
-        "table", "family", 40);
-
-    makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table",
-        1000, 100);
-
-    new Thread() {
-      public void run() {
-        while (true) {
-          int i = 0;
-          try {
-            while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() ==
-                0) {
-              Thread.yield();
-            }
-            fs.createNewFile(new Path(logDir, "foo" + i++));
-          } catch (Exception e) {
-            LOG.debug("file creation failed", e);
-            return;
-          }
-        }
-      }
-    }.start();
-    slm.splitLogDistributed(logDir);
-    FileStatus[] files = fs.listStatus(logDir);
-    if (files != null) {
-      for (FileStatus file : files) {
-        LOG.debug("file still there " + file.getPath());
-      }
-    }
-  }
-
   @Test (timeout=300000)
   public void testRecoveredEdits() throws Exception {
     LOG.info("testRecoveredEdits");
@@ -309,6 +265,45 @@ public class TestDistributedLogSplitting {
         "tot_wkr_preempt_task");
   }
 
+  @Test
+  public void testDelayedDeleteOnFailure() throws Exception {
+    LOG.info("testDelayedDeleteOnFailure");
+    startCluster(1);
+    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
+    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
+    final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
+    fs.mkdirs(logDir);
+    final Path corruptedLogFile = new Path(logDir, "x");
+    FSDataOutputStream out;
+    out = fs.create(corruptedLogFile);
+    out.write(0);
+    out.write(Bytes.toBytes("corrupted bytes"));
+    out.close();
+    slm.ignoreZKDeleteForTesting = true;
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          slm.splitLogDistributed(logDir);
+        } catch (IOException ioe) {
+          try {
+            assertTrue(fs.exists(corruptedLogFile));
+            slm.splitLogDistributed(logDir);
+          } catch (IOException e) {
+            assertTrue(Thread.currentThread().isInterrupted());
+            return;
+          }
+          fail("did not get the expected IOException from the 2nd call");
+        }
+        fail("did not get the expected IOException from the 1st call");
+      }
+    };
+    t.start();
+    waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
+    t.interrupt();
+    t.join();
+  }
+
   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname,
       int nrs ) throws Exception {
     // Create a table with regions
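testDelayedDeleteOnFailure above synchronizes with the manager by polling a
ZKSplitLog counter (tot_mgr_wait_for_zk_delete) rather than sleeping for a
fixed interval; TestDistributedLogSplitting's own waitForCounter helper is
pre-existing and therefore not part of this diff. Reduced to a sketch, the
idea is:

    // Illustrative polling helper in the spirit of the tests' waitForCounter:
    // block until ctr leaves oldval, then assert it landed on newval.
    private static void waitForCounter(AtomicLong ctr, long oldval,
        long newval, long timems) throws InterruptedException {
      long endt = System.currentTimeMillis() + timems;
      while (System.currentTimeMillis() < endt) {
        if (ctr.get() == oldval) {
          Thread.sleep(10);        // still at the old value, keep polling
        } else {
          assertEquals(newval, ctr.get());
          return;
        }
      }
      fail("waited too long for counter to move off " + oldval);
    }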
bytes")); + out.close(); + slm.ignoreZKDeleteForTesting = true; + Thread t = new Thread() { + @Override + public void run() { + try { + slm.splitLogDistributed(logDir); + } catch (IOException ioe) { + try { + assertTrue(fs.exists(corruptedLogFile)); + slm.splitLogDistributed(logDir); + } catch (IOException e) { + assertTrue(Thread.currentThread().isInterrupted()); + return; + } + fail("did not get the expected IOException from the 2nd call"); + } + fail("did not get the expected IOException from the 1st call"); + } + }; + t.start(); + waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); + t.interrupt(); + t.join(); + } + HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs ) throws Exception { // Create a table with regions diff --git src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 32ad7e8..0974b56 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.*; @@ -31,6 +32,7 @@ import static org.junit.Assert.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.master.SplitLogManager.Task; @@ -112,19 +114,34 @@ public class TestSplitLogManager { TEST_UTIL.shutdownMiniZKCluster(); } - private void waitForCounter(AtomicLong ctr, long oldval, long newval, + private interface Expr { + public long eval(); + } + + private void waitForCounter(final AtomicLong ctr, long oldval, long newval, + long timems) { + Expr e = new Expr() { + public long eval() { + return ctr.get(); + } + }; + waitForCounter(e, oldval, newval, timems); + return; + } + + private void waitForCounter(Expr e, long oldval, long newval, long timems) { long curt = System.currentTimeMillis(); long endt = curt + timems; while (curt < endt) { - if (ctr.get() == oldval) { + if (e.eval() == oldval) { try { Thread.sleep(10); - } catch (InterruptedException e) { + } catch (InterruptedException eintr) { } curt = System.currentTimeMillis(); } else { - assertEquals(newval, ctr.get()); + assertEquals(newval, e.eval()); return; } } @@ -267,10 +284,8 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - int to = 1000; - conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.timeout", 1000); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); - to = to + 2 * 100; slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -280,14 +295,23 @@ public class TestSplitLogManager { ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); - waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); - int version1 = ZKUtil.checkExists(zkw, tasknode); - assertTrue(version1 > version); - byte[] taskstate = ZKUtil.getData(zkw, tasknode); - 
assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), - taskstate)); - - waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); + waitForCounter(new Expr() { + @Override + public long eval() { + return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get()); + } + }, 0, 1, 5*60000); // wait long enough + if (tot_mgr_resubmit_failed.get() == 0) { + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + byte[] taskstate = ZKUtil.getData(zkw, tasknode); + assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), + taskstate)); + + waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); + } else { + LOG.warn("Could not run test. Lost ZK connection?"); + } return; } @@ -419,6 +443,52 @@ public class TestSplitLogManager { return; } + @Test + public void testEmptyLogDir() throws Exception { + LOG.info("testEmptyLogDir"); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(emptyLogDirPath); + slm.splitLogDistributed(emptyLogDirPath); + assertFalse(fs.exists(emptyLogDirPath)); + } + + @Test + public void testVanishingTaskZNode() throws Exception { + LOG.info("testVanishingTaskZNode"); + conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + final Path logDir = new Path(fs.getWorkingDirectory(), + UUID.randomUUID().toString()); + fs.mkdirs(logDir); + Path logFile = new Path(logDir, UUID.randomUUID().toString()); + fs.createNewFile(logFile); + new Thread() { + public void run() { + try { + // this call will block because there are no SplitLogWorkers + slm.splitLogDistributed(logDir); + } catch (Exception e) { + LOG.warn("splitLogDistributed failed", e); + fail(); + } + } + }.start(); + waitForCounter(tot_mgr_node_create_result, 0, 1, 10000); + String znode = ZKSplitLog.getEncodedNodeName(zkw, logFile.toString()); + // remove the task znode + ZKUtil.deleteNode(zkw, znode); + waitForCounter(tot_mgr_get_data_nonode, 0, 1, 30000); + waitForCounter(tot_mgr_log_split_batch_success, 0, 1, 1000); + assertTrue(fs.exists(logFile)); + fs.delete(logDir, true); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -- 1.7.7.2