Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java	(revision 1459623)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java	(working copy)
@@ -34,6 +34,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -137,6 +138,8 @@
     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), HBASEDIR);
     TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
         InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
     System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
     // Create fake maping user to group and set it to the conf.
@@ -253,9 +256,9 @@
       try {
         doWriting();
       } catch (IOException e) {
-        flushToConsole(getName() + " Writer exiting " + e);
+        LOG.warn(getName() + " Writer exiting " + e);
       } catch (InterruptedException e) {
-        flushToConsole(getName() + " Writer exiting " + e);
+        LOG.warn(getName() + " Writer exiting " + e);
       }
     }
 
@@ -378,11 +381,12 @@
 
     generateHLogs(-1);
 
+    CountDownLatch latch = new CountDownLatch(1);
     try {
-      (new ZombieNewLogWriterRegionServer(stop)).start();
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+      (new ZombieNewLogWriterRegionServer(latch, stop)).start();
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
         HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-      logSplitter.splitLog();
+      logSplitter.splitLog(latch);
     } finally {
       stop.set(true);
     }
@@ -713,16 +717,23 @@
     AtomicBoolean stop = new AtomicBoolean(false);
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    Thread zombie = new ZombieNewLogWriterRegionServer(stop);
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread zombie = new ZombieNewLogWriterRegionServer(latch, stop);
+    List<Path> splits = null;
     try {
       zombie.start();
       try {
         HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
           HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-        logSplitter.splitLog();
-      } catch (IOException ex) {/* expected */}
-      int logFilesNumber = fs.listStatus(HLOGDIR).length;
+        splits = logSplitter.splitLog();
+      } catch (IOException ex) {
+        /* expected */
+        LOG.warn("testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted", ex);
+      }
+      FileStatus[] files = fs.listStatus(HLOGDIR);
+      if (files == null) fail("no files in " + HLOGDIR + " with splits " + splits);
+      int logFilesNumber = files.length;
 
       assertEquals("Log files should not be archived if there's an extra file after split",
               NUM_WRITERS + 1, logFilesNumber);
@@ -1066,8 +1077,10 @@
    */
   class ZombieNewLogWriterRegionServer extends Thread {
     AtomicBoolean stop;
-    public ZombieNewLogWriterRegionServer(AtomicBoolean stop) {
+    CountDownLatch latch;
+    public ZombieNewLogWriterRegionServer(CountDownLatch latch, AtomicBoolean stop) {
       super("ZombieNewLogWriterRegionServer");
+      this.latch = latch;
       this.stop = stop;
     }
 
@@ -1084,7 +1097,7 @@
 
       try {
         while (!fs.exists(recoveredEdits) && !stop.get()) {
-          flushToConsole("Juliet: split not started, sleeping a bit...");
+          LOG.info("Juliet: split not started, sleeping a bit...");
           Threads.sleep(10);
         }
 
@@ -1094,8 +1107,10 @@
         appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
             ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
         writer.close();
-        flushToConsole("Juliet file creator: created file " + julietLog);
+        LOG.info("Juliet file creator: created file " + julietLog);
+        latch.countDown();
       } catch (IOException e1) {
+        LOG.error("Failed to create file " + julietLog, e1);
         assertTrue("Failed to create file " + julietLog, false);
       }
     }
@@ -1306,7 +1321,7 @@
       }
       if (i != leaveOpen) {
         ws[i].close();
-        flushToConsole("Closing writer " + i);
+        LOG.info("Closing writer " + i);
       }
     }
     return ws;
@@ -1418,9 +1433,9 @@
                           byte[] row, byte[] family, byte[] qualifier,
                           byte[] value, long seq)
           throws IOException {
-    flushToConsole(Thread.currentThread().getName() + " append");
+    LOG.info(Thread.currentThread().getName() + " append");
     writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
-    flushToConsole(Thread.currentThread().getName() + " sync");
+    LOG.info(Thread.currentThread().getName() + " sync");
     writer.sync();
     return seq;
   }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java	(revision 1459623)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java	(working copy)
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -49,7 +50,6 @@
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -79,7 +79,7 @@
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Path hbaseDir;
   private static Path oldLogDir;
-  
+
   @Before
   public void setUp() throws Exception {
 
@@ -99,6 +99,7 @@
     // Make block sizes small.
     TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
     // needed for testAppendClose()
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     // quicker heartbeat interval for faster DN death notification
     TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
@@ -370,18 +371,30 @@
     }
   }
 
-  // For this test to pass, requires:
-  // 1. HDFS-200 (append support)
-  // 2. HDFS-988 (SafeMode should freeze file operations
-  //    [FSNamesystem.nextGenerationStampForBlock])
-  // 3. HDFS-142 (on restart, maintain pendingCreates)
+  /*
+   * We pass different values to recoverFileLease() so that different code paths are covered
+   *
+   * For this test to pass, requires:
+   * 1. HDFS-200 (append support)
+   * 2. HDFS-988 (SafeMode should freeze file operations
+   *    [FSNamesystem.nextGenerationStampForBlock])
+   * 3. HDFS-142 (on restart, maintain pendingCreates)
+   */
   @Test
   public void testAppendClose() throws Exception {
+    testAppendClose(true);
+    testAppendClose(false);
+  }
+
+  /*
+   * @param triggerDirectAppend whether to trigger direct call of fs.append()
+   */
+  public void testAppendClose(final boolean triggerDirectAppend) throws Exception {
     byte [] tableName = Bytes.toBytes(getName());
     HRegionInfo regioninfo = new HRegionInfo(tableName,
              HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
-    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
+    HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir" + triggerDirectAppend,
         "hlogdir_archive", conf);
     final int total = 20;
 
@@ -456,6 +469,7 @@
       public Exception exception = null;
       public void run() {
           try {
+            rlConf.setBoolean(FSHDFSUtils.TEST_TRIGGER_DFS_APPEND, triggerDirectAppend);
             FSUtils.getInstance(fs, rlConf)
               .recoverFileLease(recoveredFs, walPath, rlConf);
           } catch (IOException e) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(revision 1459623)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(working copy)
@@ -26,6 +26,7 @@
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +35,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
@@ -187,6 +189,20 @@
    */
   public List<Path> splitLog() throws IOException {
+    return splitLog((CountDownLatch) null);
+  }
+
+  /**
+   * Split up a bunch of regionserver commit log files that are no longer being
+   * written to, into new files, one per region for region to replay on startup.
+   * Delete the old log files when finished.
+   *
+   * @param latch
+   * @throws IOException will throw if corrupted hlogs aren't tolerated
+   * @return the list of splits
+   */
+  public List<Path> splitLog(CountDownLatch latch)
+      throws IOException {
     Preconditions.checkState(!hasSplit,
       "An HLogSplitter instance may only be used once");
     hasSplit = true;
@@ -210,7 +226,7 @@
     }
     logAndReport("Splitting " + logfiles.length + " hlog(s) in "
       + srcDir.toString());
-    splits = splitLog(logfiles);
+    splits = splitLog(logfiles, latch);
 
     splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     String msg = "hlog file splitting completed in " + splitTime +
@@ -274,7 +290,8 @@
    * After the process is complete, the log files are archived to a separate
    * directory.
    */
-  private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
+  private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
+      throws IOException {
     List<Path> processedLogs = new ArrayList<Path>(logfiles.length);
     List<Path> corruptedLogs = new ArrayList<Path>(logfiles.length);
     List<Path> splits;
@@ -322,10 +339,19 @@
       }
       status.setStatus("Log splits complete. Checking for orphaned logs.");
 
-      if (fs.listStatus(srcDir).length > processedLogs.size()
+      if (latch != null) {
+        try {
+          latch.await();
+        } catch (InterruptedException ie) {
+          LOG.warn("wait for latch interrupted");
+          Thread.currentThread().interrupt();
+        }
+      }
+      FileStatus[] currFiles = fs.listStatus(srcDir);
+      if (currFiles.length > processedLogs.size()
           + corruptedLogs.size()) {
         throw new OrphanHLogAfterSplitException(
-            "Discovered orphan hlog after split. Maybe the "
+            "Discovered orphan hlog after split. Maybe the "
            + "HRegionServer was not dead when we started");
       }
     } finally {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java	(revision 1459623)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java	(working copy)
@@ -54,6 +54,8 @@
    * in o.a.h.hdfs.protocol.HdfsConstants cause of HDFS-1620.
    */
   public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
+
+  public static final String TEST_TRIGGER_DFS_APPEND = "hbase.test.trigger.dfs.append";
 
   @Override
   public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
@@ -72,22 +74,37 @@
 
     // Trying recovery
     boolean recovered = false;
+    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000);
+    // conf parameter passed from unit test, indicating whether fs.append() should be triggered
+    boolean triggerAppend = conf.getBoolean(TEST_TRIGGER_DFS_APPEND, false);
+    Exception ex = null;
     while (!recovered) {
       try {
         try {
           DistributedFileSystem dfs = (DistributedFileSystem) fs;
-          DistributedFileSystem.class.getMethod("recoverLease", new Class[] { Path.class }).invoke(
-            dfs, p);
-        } catch (InvocationTargetException ite) {
-          // function was properly called, but threw it's own exception
-          throw (IOException) ite.getCause();
+          if (triggerAppend) throw new IOException();
+          try {
+            recovered = (Boolean) DistributedFileSystem.class.getMethod(
+              "recoverLease", new Class[] { Path.class }).invoke(dfs, p);
+            if (!recovered) LOG.debug("recoverLease returned false");
+          } catch (InvocationTargetException ite) {
+            // function was properly called, but threw its own exception
+            throw (IOException) ite.getCause();
+          }
        } catch (Exception e) {
          LOG.debug("Failed fs.recoverLease invocation, " + e.toString() +
-            ", trying fs.append instead");
+              ", trying fs.append instead");
+          ex = e;
+        }
+        if (ex != null || System.currentTimeMillis() - startWaiting > recoveryTimeout) {
+          LOG.debug("trying fs.append for " + p + " with " + ex);
+          ex = null; // assume the following append() call would succeed
           FSDataOutputStream out = fs.append(p);
           out.close();
+          recovered = true;
+          LOG.debug("fs.append passed");
        }
-        recovered = true;
+        if (recovered) break;
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        if (e instanceof AlreadyBeingCreatedException) {
@@ -111,9 +128,9 @@
       }
       try {
         Thread.sleep(1000);
-      } catch (InterruptedException ex) {
+      } catch (InterruptedException ie) {
         InterruptedIOException iioe = new InterruptedIOException();
-        iioe.initCause(ex);
+        iioe.initCause(ie);
         throw iioe;
       }
     }
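
For review purposes, the latch handshake the patch sets up between the zombie log writer and splitLog(CountDownLatch) can be read outside diff form as follows. This is a minimal, self-contained sketch in plain JDK code, not HBase code; the class and method names (LatchHandshakeSketch, splitThenCountOrphans) are illustrative only. The point it shows is the one the patch relies on: the splitter only re-lists the log directory after the writer has created its extra file and counted down the latch, so the orphan-log check in the test is deterministic.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;

// Illustrative only: a stripped-down stand-in for the splitter / zombie-writer pair.
public class LatchHandshakeSketch {

  // Stand-in for HLogSplitter.splitLog(CountDownLatch): after "splitting" the
  // pre-existing logs, wait on the latch before re-listing the log directory so
  // that a file created concurrently by the zombie writer is guaranteed to be visible.
  static int splitThenCountOrphans(Path logDir, int processed, CountDownLatch latch)
      throws IOException, InterruptedException {
    if (latch != null) {
      latch.await(); // test hook: do not look for orphans until the extra log exists
    }
    long current;
    try (Stream<Path> listing = Files.list(logDir)) {
      current = listing.count();
    }
    return (int) current - processed;
  }

  public static void main(String[] args) throws Exception {
    Path logDir = Files.createTempDirectory("hlogs");
    Files.createFile(logDir.resolve("log.0")); // pre-existing log that gets "split"
    CountDownLatch latch = new CountDownLatch(1);

    // Stand-in for ZombieNewLogWriterRegionServer: creates a new log mid-split,
    // then releases the latch so the splitter can safely re-list the directory.
    Thread zombie = new Thread(() -> {
      try {
        Files.createFile(logDir.resolve("juliet.log"));
      } catch (IOException ignored) {
      }
      latch.countDown();
    });
    zombie.start();

    int orphans = splitThenCountOrphans(logDir, 1, latch);
    System.out.println("orphan logs discovered: " + orphans); // expect 1
    zombie.join();
  }
}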
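
The FSHDFSUtils change is easier to follow as straight-line code. Below is a simplified sketch of the patched recovery loop under the same assumptions the patch makes (the "hbase.lease.recovery.timeout" and "hbase.test.trigger.dfs.append" conf keys, and a reflective lookup of DistributedFileSystem.recoverLease); the AlreadyBeingCreatedException handling, lease-wait logging, and the distinction between reflection failures and recoverLease's own exceptions are omitted, so treat it as an outline rather than the actual implementation.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Simplified sketch of the patched recovery loop; error handling from the real
// FSHDFSUtils.recoverFileLease() is intentionally left out.
public class LeaseRecoverySketch {

  public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
      throws IOException, InterruptedException {
    long startWaiting = System.currentTimeMillis();
    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000);
    boolean triggerAppend = conf.getBoolean("hbase.test.trigger.dfs.append", false);
    boolean recovered = false;

    while (!recovered) {
      Exception ex = null;
      try {
        if (triggerAppend) throw new IOException("test hook: skip recoverLease");
        // Preferred path: DistributedFileSystem.recoverLease(Path), looked up
        // reflectively so the code still compiles against Hadoop versions that
        // do not expose it.
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        recovered = (Boolean) DistributedFileSystem.class
            .getMethod("recoverLease", Path.class).invoke(dfs, p);
      } catch (Exception e) {
        ex = e; // recoverLease unavailable or failed; fall through to append
      }

      // Fallback path: an append() + close() cycle makes the NameNode recover the
      // lease; used when recoverLease threw or the recovery timeout has elapsed.
      if (ex != null || System.currentTimeMillis() - startWaiting > recoveryTimeout) {
        FSDataOutputStream out = fs.append(p);
        out.close();
        recovered = true;
      }
      if (!recovered) Thread.sleep(1000); // pause before retrying recoverLease
    }
  }
}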