From 95bceafa48f54ac96d305c4c136e3a64d596936a Mon Sep 17 00:00:00 2001 From: khemani Date: Wed, 24 Aug 2011 11:15:39 -0700 Subject: [PATCH] HBASE-4007 distributed log splitting can get indefinitely stuck --- .../hadoop/hbase/master/MasterFileSystem.java | 3 + .../hadoop/hbase/master/SplitLogManager.java | 65 +++++++++++++++++--- .../apache/hadoop/hbase/zookeeper/ZKSplitLog.java | 2 + .../hadoop/hbase/master/TestSplitLogManager.java | 65 ++++++++------------ 4 files changed, 86 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 9a71fdf..b760ce2 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -233,6 +233,9 @@ public class MasterFileSystem { } if (distributedLogSplitting) { + for (ServerName serverName : serverNames) { + splitLogManager.handleDeadWorker(serverName.toString()); + } splitTime = EnvironmentEdgeManager.currentTimeMillis(); try { try { diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 54b6d45..215314e 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -105,6 +107,9 @@ public class SplitLogManager extends ZooKeeperListener { new ConcurrentHashMap(); private TimeoutMonitor timeoutMonitor; + private Set deadWorkers = null; + private Object deadWorkersLock = new Object(); + /** * Its OK to construct this object even when region-servers 
are not online. It * does lookup the orphan tasks in zk but it doesn't block for them to be @@ -402,7 +407,7 @@ public class SplitLogManager extends ZooKeeperListener { ZKSplitLog.getFileName(path)) == Status.DONE) { setDone(path, false); // success } else { - resubmit(path, false); // err + resubmit(path, true /* force */); // err } } else { setDone(path, false); // success @@ -451,12 +456,8 @@ public class SplitLogManager extends ZooKeeperListener { if (task.isUnassigned()) { LOG.info("task " + path + " acquired by " + workerName); } - // very noisy - //LOG.debug("heartbeat for " + path + " last_version=" + task.last_version + - // " last_update=" + task.last_update + " new_version=" + - // new_version); - task.last_update = EnvironmentEdgeManager.currentTimeMillis(); - task.last_version = new_version; + task.registerHeartbeat(EnvironmentEdgeManager.currentTimeMillis(), + new_version, workerName); tot_mgr_heartbeat.incrementAndGet(); } else { assert false; @@ -558,14 +559,22 @@ public class SplitLogManager extends ZooKeeperListener { * @throws KeeperException */ private void createRescanNode(long retries) { + // The RESCAN node will be deleted almost immediately by the + // SplitLogManager as soon as it is created because it is being + // created in the DONE state. This behavior prevents a buildup + // of RESCAN nodes. But there is also a chance that a SplitLogWorker + // might miss the watch-trigger that creation of RESCAN node provides. + // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks, + // this behavior is safe. this.watcher.getRecoverableZooKeeper().getZooKeeper(). 
create(ZKSplitLog.getRescanNode(watcher), - TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT_SEQUENTIAL, + TaskState.TASK_DONE.get(serverName), Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), new Long(retries)); } private void createRescanSuccess(String path) { + lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); tot_mgr_rescan.incrementAndGet(); getDataSetWatch(path, zkretries); } @@ -698,6 +707,7 @@ public class SplitLogManager extends ZooKeeperListener { static class Task { long last_update; int last_version; + String cur_worker_name; TaskBatch batch; boolean deleted; int incarnation; @@ -707,6 +717,7 @@ public class SplitLogManager extends ZooKeeperListener { public String toString() { return ("last_update = " + last_update + " last_version = " + last_version + + " cur_worker_name = " + cur_worker_name + " deleted = " + deleted + " incarnation = " + incarnation + " resubmits = " + unforcedResubmits + @@ -739,11 +750,29 @@ public class SplitLogManager extends ZooKeeperListener { return (last_update == -1); } + public void registerHeartbeat(long time, int version, String worker) { + last_version = version; + last_update = time; + cur_worker_name = worker; + } + public void setUnassigned() { + cur_worker_name = null; last_update = -1; } } + public void handleDeadWorker(String worker_name) { + // resubmit the tasks on the TimeoutMonitor thread. Makes it easier + // to reason about concurrency. Makes it easier to retry. 
+ synchronized (deadWorkersLock) { + if (deadWorkers == null) { + deadWorkers = new HashSet(100); + } + deadWorkers.add(worker_name); + } + } + /** * Periodically checks all active tasks and resubmits the ones that have timed * out @@ -759,10 +788,17 @@ public class SplitLogManager extends ZooKeeperListener { int unassigned = 0; int tot = 0; boolean found_assigned_task = false; + Set localDeadWorkers; + + synchronized (deadWorkersLock) { + localDeadWorkers = deadWorkers; + deadWorkers = null; + } for (Map.Entry e : tasks.entrySet()) { String path = e.getKey(); Task task = e.getValue(); + String cur_worker = task.cur_worker_name; tot++; // don't easily resubmit a task which hasn't been picked up yet. It // might be a long while before a SplitLogWorker is free to pick up a @@ -774,7 +810,16 @@ public class SplitLogManager extends ZooKeeperListener { continue; } found_assigned_task = true; - if (resubmit(path, task, false)) { + if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { + tot_mgr_resubmit_dead_server_task.incrementAndGet(); + if (resubmit(path, task, true /* force */)) { + resubmitted++; + } else { + handleDeadWorker(cur_worker); + LOG.warn("Failed to resubmit task " + path + " owned by dead " + + cur_worker + ", will retry."); + } + } else if (resubmit(path, task, false /* force */)) { resubmitted++; } } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index 61e5c65..9b83840 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -235,6 +235,8 @@ public class ZKSplitLog { public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0); public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0); public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0); + public static AtomicLong tot_mgr_resubmit_dead_server_task = + new 
AtomicLong(0); diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index 9a88855..5c9d7dd 100644 --- a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -131,31 +131,6 @@ public class TestSplitLogManager { assertTrue(false); } - private int numRescanPresent() throws KeeperException { - int num = 0; - List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); - for (String node : nodes) { - if (ZKSplitLog.isRescanNode(zkw, - ZKUtil.joinZNode(zkw.splitLogZNode, node))) { - num++; - } - } - return num; - } - - private void setRescanNodeDone(int count) throws KeeperException { - List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); - for (String node : nodes) { - String nodepath = ZKUtil.joinZNode(zkw.splitLogZNode, node); - if (ZKSplitLog.isRescanNode(zkw, nodepath)) { - ZKUtil.setData(zkw, nodepath, - TaskState.TASK_DONE.get("some-worker")); - count--; - } - } - assertEquals(0, count); - } - private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, InterruptedException { String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); @@ -222,7 +197,6 @@ public class TestSplitLogManager { waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); assertTrue(task.isUnassigned()); waitForCounter(tot_mgr_rescan, 0, 1, to + 100); - assertEquals(1, numRescanPresent()); } @Test @@ -253,7 +227,6 @@ public class TestSplitLogManager { assertTrue(task.isOrphan()); assertTrue(task.isUnassigned()); assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); - assertEquals(1, numRescanPresent()); } @Test @@ -286,7 +259,6 @@ public class TestSplitLogManager { ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3")); waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100); - 
assertEquals(2, numRescanPresent()); Thread.sleep(to + 100); assertEquals(2L, tot_mgr_resubmit.get()); } @@ -311,16 +283,12 @@ public class TestSplitLogManager { waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); - assertEquals(1, numRescanPresent()); byte[] taskstate = ZKUtil.getData(zkw, tasknode); assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), taskstate)); - setRescanNodeDone(1); - waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); - assertEquals(0, numRescanPresent()); return; } @@ -377,7 +345,6 @@ public class TestSplitLogManager { waitForCounter(tot_mgr_resubmit, 0, 1, 1000); int version1 = ZKUtil.checkExists(zkw, tasknode); assertTrue(version1 > version); - assertEquals(1, numRescanPresent()); byte[] taskstate = ZKUtil.getData(zkw, tasknode); assertTrue(Arrays.equals(taskstate, @@ -417,18 +384,38 @@ public class TestSplitLogManager { TaskState.TASK_OWNED.get("dummy-worker")); } - // since all the nodes in the system are not unassigned the - // unassigned_timeout must not have kicked in - assertEquals(0, numRescanPresent()); - // since we have stopped heartbeating the owned node therefore it should // get resubmitted LOG.info("waiting for manager to resubmit the orphan task"); waitForCounter(tot_mgr_resubmit, 0, 1, to + 500); - assertEquals(1, numRescanPresent()); // now all the nodes are unassigned. 
manager should post another rescan waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500); - assertEquals(2, numRescanPresent()); + } + + @Test + public void testDeadWorker() throws Exception { + LOG.info("testDeadWorker"); + + conf.setLong("hbase.splitlog.max.resubmit", 0); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + + String tasknode = submitTaskAndWait(batch, "foo/1"); + int version = ZKUtil.checkExists(zkw, tasknode); + + ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); + waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + slm.handleDeadWorker("worker1"); + waitForCounter(tot_mgr_resubmit, 0, 1, 1000); + waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 1000); + + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + byte[] taskstate = ZKUtil.getData(zkw, tasknode); + assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), + taskstate)); + return; } } -- 1.7.4