diff --git src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 31c1e9b..32abd23 100644 --- src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -122,7 +122,7 @@ public class MasterFileSystem { conf.getBoolean("hbase.master.distributed.log.splitting", true); if (this.distributedLogSplitting) { this.splitLogManager = new SplitLogManager(master.getZooKeeper(), - master.getConfiguration(), master, master.getServerName().toString()); + master.getConfiguration(), master, this.services, master.getServerName().toString()); this.splitLogManager.finishInitialization(masterRecovery); } else { this.splitLogManager = null; diff --git src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index c74659e..2d6881d 100644 --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -63,6 +63,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; +import com.google.common.base.Strings; + import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.*; import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.*; @@ -99,6 +101,7 @@ public class SplitLogManager extends ZooKeeperListener { private static final Log LOG = LogFactory.getLog(SplitLogManager.class); private final Stoppable stopper; + private final MasterServices master; private final String serverName; private final TaskFinisher taskFinisher; private FileSystem fs; @@ -111,12 +114,12 @@ public class SplitLogManager extends ZooKeeperListener { private long lastNodeCreateTime = Long.MAX_VALUE; public boolean ignoreZKDeleteForTesting = false; - private ConcurrentMap tasks = + private final ConcurrentMap tasks = new ConcurrentHashMap(); private TimeoutMonitor timeoutMonitor; private Set deadWorkers = null; - private Object deadWorkersLock = new Object(); + private final Object deadWorkersLock = new Object(); private Set failedDeletions = null; @@ -133,8 +136,8 @@ public class SplitLogManager extends ZooKeeperListener { * @param serverName */ public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, - Stoppable stopper, String serverName) { - this(zkw, conf, stopper, serverName, new TaskFinisher() { + Stoppable stopper, MasterServices master, String serverName) { + this(zkw, conf, stopper, master, serverName, new TaskFinisher() { @Override public Status finish(String workerName, String logfile) { try { @@ -160,11 +163,12 @@ public class SplitLogManager extends ZooKeeperListener { * @param tf task finisher */ public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, - Stoppable stopper, String serverName, TaskFinisher tf) { + Stoppable stopper, MasterServices master, String serverName, TaskFinisher tf) { super(zkw); this.taskFinisher = tf; this.conf = conf; this.stopper = stopper; + this.master = master; this.zkretries = conf.getLong("hbase.splitlog.zk.retries", ZKSplitLog.DEFAULT_ZK_RETRIES); this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", @@ -174,8 +178,9 @@ public class SplitLogManager extends ZooKeeperListener { this.unassignedTimeout = conf.getInt("hbase.splitlog.manager.unassigned.timeout", ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT); - LOG.debug("timeout = " + timeout); - LOG.debug("unassigned timeout = " + unassignedTimeout); + LOG.info("timeout = " + timeout); + LOG.info("unassigned timeout = " + unassignedTimeout); + LOG.info("resubmit threshold = " + this.resubmit_threshold); this.serverName = serverName; this.timeoutMonitor = new TimeoutMonitor( @@ -598,8 +603,31 @@ public class SplitLogManager extends ZooKeeperListener { } int version; if (directive != FORCE) { - if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) < - timeout) { + // We're going to resubmit: + // 1) immediately if the worker server is now marked as dead + // 2) after a configurable timeout if the server is not marked as dead but has still not + // finished the task. This allows to continue if the worker cannot actually handle it, + // for any reason. + final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update; + ServerName curWorker = null; + if (!Strings.isNullOrEmpty(task.cur_worker_name)) { + try { + curWorker = ServerName.parseServerName(task.cur_worker_name); + } catch (IllegalArgumentException ie) { + LOG.error("Got invalid server name:" + task.cur_worker_name + " - task for path:" + path + + " won't be resubmitted before timeout"); + } + } else { + LOG.error("Got empty/null server name:" + task.cur_worker_name + " - task for path:" + path + + " won't be resubmitted before timeout"); + } + final boolean alive = + (master.getServerManager() != null && curWorker != null) ? master.getServerManager() + .isServerOnline(curWorker) : true; + if (alive && time < timeout) { + LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " + + task.cur_worker_name + " is not marked as dead, we waited for " + time + + " while the timeout is " + timeout); return false; } if (task.unforcedResubmits >= resubmit_threshold) { diff --git src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index f5d5f85..526c4cc 100644 --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.util.Bytes; public class ZKSplitLog { private static final Log LOG = LogFactory.getLog(ZKSplitLog.class); - public static final int DEFAULT_TIMEOUT = 25000; // 25 sec + public static final int DEFAULT_TIMEOUT = 300000; // 5 mins public static final int DEFAULT_ZK_RETRIES = 3; public static final int DEFAULT_MAX_RESUBMIT = 3; public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min diff --git src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java index d766018..cb1d3e8 100644 --- src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java +++ src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -23,10 +23,11 @@ import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; import java.io.IOException; import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import junit.framework.Assert; + import static org.junit.Assert.*; import org.apache.commons.logging.Log; @@ -48,11 +49,10 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; @Category(MediumTests.class) public class TestSplitLogManager { @@ -66,6 +66,8 @@ public class TestSplitLogManager { private SplitLogManager slm; private Configuration conf; private int to; + private final ServerManager sm = Mockito.mock(ServerManager.class); + private final MasterServices master = Mockito.mock(MasterServices.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -83,14 +85,6 @@ public class TestSplitLogManager { }; - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - @Before public void setup() throws Exception { TEST_UTIL.startMiniZKCluster(); @@ -111,6 +105,11 @@ public class TestSplitLogManager { conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); to = to + 4 * 100; + + // By default, we let the test manage the error as before, so the server + // does not appear as dead from the master point of view, only from the split log pov. + Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true); + Mockito.when(master.getServerManager()).thenReturn(sm); } @After @@ -180,7 +179,7 @@ public class TestSplitLogManager { public void testTaskCreation() throws Exception { LOG.info("TestTaskCreation - test the creation of a task in zk"); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -200,7 +199,7 @@ public class TestSplitLogManager { TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); @@ -227,7 +226,7 @@ public class TestSplitLogManager { CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); Task task = slm.findOrCreateOrphanTask(tasknode); @@ -251,7 +250,7 @@ public class TestSplitLogManager { LOG.info("TestMultipleResbmits - no indefinite resubmissions"); conf.setInt("hbase.splitlog.max.resubmit", 2); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -279,7 +278,7 @@ public class TestSplitLogManager { public void testRescanCleanup() throws Exception { LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -294,26 +293,21 @@ public class TestSplitLogManager { return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get()); } }, 0, 1, 5*60000); // wait long enough - if (tot_mgr_resubmit_failed.get() == 0) { - int version1 = ZKUtil.checkExists(zkw, tasknode); - assertTrue(version1 > version); - byte[] taskstate = ZKUtil.getData(zkw, tasknode); - assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), - taskstate)); - - waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2); - } else { - LOG.warn("Could not run test. Lost ZK connection?"); - } + Assert + .assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get()); + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + byte[] taskstate = ZKUtil.getData(zkw, tasknode); + assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), taskstate)); - return; + waitForCounter(tot_mgr_rescan_deleted, 0, 1, to / 2); } @Test public void testTaskDone() throws Exception { LOG.info("TestTaskDone - cleanup task node once in DONE state"); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -332,7 +326,7 @@ public class TestSplitLogManager { LOG.info("TestTaskErr - cleanup task node once in ERR state"); conf.setInt("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -352,7 +346,7 @@ public class TestSplitLogManager { public void testTaskResigned() throws Exception { LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); String tasknode = submitTaskAndWait(batch, "foo/1"); @@ -379,7 +373,7 @@ public class TestSplitLogManager { TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); @@ -409,7 +403,7 @@ public class TestSplitLogManager { LOG.info("testDeadWorker"); conf.setLong("hbase.splitlog.max.resubmit", 0); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); TaskBatch batch = new TaskBatch(); @@ -433,7 +427,7 @@ public class TestSplitLogManager { @Test public void testEmptyLogDir() throws Exception { LOG.info("testEmptyLogDir"); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); FileSystem fs = TEST_UTIL.getTestFileSystem(); Path emptyLogDirPath = new Path(fs.getWorkingDirectory(), @@ -448,7 +442,7 @@ public class TestSplitLogManager { LOG.info("testVanishingTaskZNode"); conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0); conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000); - slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); slm.finishInitialization(); FileSystem fs = TEST_UTIL.getTestFileSystem(); final Path logDir = new Path(fs.getWorkingDirectory(), @@ -488,6 +482,31 @@ public class TestSplitLogManager { } } + @Test + public void testWorkerCrash() throws Exception { + conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); + slm = new SplitLogManager(zkw, conf, stopper, master, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + + String tasknode = submitTaskAndWait(batch, "foo/1"); + final ServerName worker1 = new ServerName("worker1,1,1"); + + ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get(worker1.getServerName())); + if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to / 2); + + // Not yet resubmitted. + Assert.assertEquals(0, tot_mgr_resubmit.get()); + + // This server becomes dead + Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); + + Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). + + // It has been resubmitted + Assert.assertEquals(1, tot_mgr_resubmit.get()); + } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();