diff --git b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 19c9b54..e4822e4 100644
--- b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -402,10 +403,11 @@ public class HLog implements HConstants, Syncable {
       HLog.Reader reader = (HLog.Reader) c.newInstance();
       reader.init(fs, path, conf);
       return reader;
-    } catch (Exception e) {
-      IOException ie = new IOException("cannot get log reader");
-      ie.initCause(e);
-      throw ie;
+    } catch (IOException e) {
+      throw e;
+    }
+    catch (Exception e) {
+      throw new IOException("Cannot get log reader", e);
     }
   }
 
@@ -1011,13 +1013,15 @@ public class HLog implements HConstants, Syncable {
    * @param rootDir qualified root directory of the HBase instance
    * @param srcDir Directory of log files to split: e.g.
    *                ${ROOTDIR}/log_HOST_PORT
-   * @param oldLogDir
+   * @param oldLogDir directory where processed (split) logs will be archived to
    * @param fs FileSystem
    * @param conf HBaseConfiguration
-   * @throws IOException
+   * @throws IOException if corrupted hlogs aren't tolerated
+   * @return the list of splits
    */
   public static List splitLog(final Path rootDir, final Path srcDir,
-    Path oldLogDir, final FileSystem fs, final Configuration conf) throws IOException {
+    Path oldLogDir, final FileSystem fs, final Configuration conf)
+    throws IOException {
 
     long millis = System.currentTimeMillis();
     List splits = null;
@@ -1032,18 +1036,11 @@ public class HLog implements HConstants, Syncable {
     }
     LOG.info("Splitting " + logfiles.length + " hlog(s) in " + srcDir.toString());
-    splits = splitLog(rootDir, oldLogDir, logfiles, fs, conf);
+    splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
+
     try {
-      FileStatus[] files = fs.listStatus(srcDir);
-      for(FileStatus file : files) {
-        Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
-        LOG.debug("Moving " + FSUtils.getPath(file.getPath()) + " to " +
-          FSUtils.getPath(newPath));
-        fs.rename(file.getPath(), newPath);
-      }
-      LOG.debug("Moved " + files.length + " log files to " +
-        FSUtils.getPath(oldLogDir));
-      fs.delete(srcDir, true);
+      LOG.info("Splitting is done. Removing old log dir " + srcDir);
+      fs.delete(srcDir, false);
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
       IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1083,201 +1080,317 @@ public class HLog implements HConstants, Syncable {
     }
   }
 
-  /*
-   * @param rootDir
-   * @param logfiles
-   * @param fs
-   * @param conf
-   * @throws IOException
-   * @return List of splits made.
- */ - private static List splitLog(final Path rootDir, - Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs, + private static List splitLog(final Path rootDir, final Path srcDir, + Path oldLogDir, final FileStatus[] logFiles, final FileSystem fs, final Configuration conf) throws IOException { + + ArrayList processedLogs = new ArrayList(); + ArrayList corruptedLogs = new ArrayList(); final Map logWriters = - Collections.synchronizedMap( - new TreeMap(Bytes.BYTES_COMPARATOR)); - List splits = null; - - // Number of threads to use when log splitting to rewrite the logs. - // More means faster but bigger mem consumption. - int logWriterThreads = - conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - - // Number of logs to read concurrently when log splitting. - // More means faster but bigger mem consumption */ - int concurrentLogReads = + Collections.synchronizedMap( + new TreeMap(Bytes.BYTES_COMPARATOR)); + + List splits; + + // Number of logs in a read batch + // More means faster but bigger mem consumption + int logFilesPerStep = conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3); - // Is append supported? + boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); + try { - int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) / - concurrentLogReads)).intValue(); - for (int step = 0; step < maxSteps; step++) { - final Map> logEntries = - new TreeMap>(Bytes.BYTES_COMPARATOR); - // Stop at logfiles.length when it's the last step - int endIndex = step == maxSteps - 1? logfiles.length: - step * concurrentLogReads + concurrentLogReads; - for (int i = (step * concurrentLogReads); i < endIndex; i++) { - // Check for possibly empty file. With appends, currently Hadoop - // reports a zero length even if the file has been sync'd. Revisit if - // HADOOP-4751 is committed. - long length = logfiles[i].getLen(); - if (LOG.isDebugEnabled()) { - LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length + - ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen()); + int i = -1; + while (i < logFiles.length) { + final Map> editsByRegion = + new TreeMap>(Bytes.BYTES_COMPARATOR); + + for (int j = 0; j < logFilesPerStep; j++) { + i++; + if (i == logFiles.length) { + break; } - Reader in = null; - int count = 0; + + FileStatus log = logFiles[i]; + Path logPath = log.getPath(); + long logLength = log.getLen(); + + LOG.debug("Splitting hlog " + (i + 1) + " of " + logFiles.length + + ": " + logPath + ", length=" + logLength ); try { - in = HLog.getReader(fs, logfiles[i].getPath(), conf); - try { - HLog.Entry entry; - while ((entry = in.next()) != null) { - byte [] regionName = entry.getKey().getRegionName(); - LinkedList queue = logEntries.get(regionName); - if (queue == null) { - queue = new LinkedList(); - LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName)); - logEntries.put(regionName, queue); - } - queue.push(entry); - count++; - } - LOG.debug("Pushed=" + count + " entries from " + - logfiles[i].getPath()); - } catch (IOException e) { - LOG.debug("IOE Pushed=" + count + " entries from " + - logfiles[i].getPath()); - e = RemoteExceptionHandler.checkIOException(e); - if (!(e instanceof EOFException)) { - LOG.warn("Exception processing " + logfiles[i].getPath() + - " -- continuing. 
Possible DATA LOSS!", e); - } - } + + recoverLog(true, fs, logPath); + parseHLog(log, editsByRegion, fs, conf); + processedLogs.add(logPath); + } catch (IOException e) { - if (length <= 0) { - LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e); - continue; + if (skipErrors) { + LOG.warn("Got "+ e +" while parsing hlog " + logPath + + ". Marking as corrupted"); + corruptedLogs.add(logPath); + } else { + throw e; } - throw e; - } finally { - try { - if (in != null) { - in.close(); - } - } catch (IOException e) { - LOG.warn("Close in finally threw exception -- continuing", e); - } - // Archive the input file now so we do not replay edits. We could - // have gotten here because of an exception. If so, probably - // nothing we can do about it. Replaying it, it could work but we - // could be stuck replaying for ever. Just continue though we - // could have lost some edits. - fs.rename(logfiles[i].getPath(), - getHLogArchivePath(oldLogDir, logfiles[i].getPath())); - } - } - ExecutorService threadPool = - Executors.newFixedThreadPool(logWriterThreads); - for (final byte[] key : logEntries.keySet()) { - Thread thread = new Thread(Bytes.toStringBinary(key)) { - @Override - public void run() { - LinkedList entries = logEntries.get(key); - LOG.debug("Thread got " + entries.size() + " to process"); - long threadTime = System.currentTimeMillis(); - try { - int count = 0; - // Items were added to the linkedlist oldest first. Pull them - // out in that order. - for (ListIterator i = - entries.listIterator(entries.size()); - i.hasPrevious();) { - HLog.Entry logEntry = i.previous(); - WriterAndPath wap = logWriters.get(key); - if (wap == null) { - Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor - .getTableDir(rootDir, logEntry.getKey().getTablename()), - HRegionInfo.encodeRegionName(key)), - HREGION_OLDLOGFILE_NAME); - Path oldlogfile = null; - Reader old = null; - if (fs.exists(logfile)) { - FileStatus stat = fs.getFileStatus(logfile); - if (stat.getLen() <= 0) { - LOG.warn("Old hlog file " + logfile + " is zero " + - "length. Deleting existing file"); - fs.delete(logfile, false); - } else { - LOG.warn("Old hlog file " + logfile + " already " + - "exists. 
Copying existing file to new file"); - oldlogfile = new Path(logfile.toString() + ".old"); - fs.rename(logfile, oldlogfile); - old = getReader(fs, oldlogfile, conf); - } - } - Writer w = createWriter(fs, logfile, conf); - wap = new WriterAndPath(logfile, w); - logWriters.put(key, wap); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new hlog file writer for path " - + logfile + " and region " + Bytes.toStringBinary(key)); - } - - if (old != null) { - // Copy from existing log file - HLog.Entry entry; - for (; (entry = old.next()) != null; count++) { - if (LOG.isDebugEnabled() && count > 0 - && count % 10000 == 0) { - LOG.debug("Copied " + count + " edits"); - } - w.append(entry); - } - old.close(); - fs.delete(oldlogfile, true); - } - } - wap.w.append(logEntry); - count++; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Applied " + count + " total edits to " - + Bytes.toStringBinary(key) + " in " - + (System.currentTimeMillis() - threadTime) + "ms"); - } - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.warn("Got while writing region " + Bytes.toStringBinary(key) - + " log " + e); - e.printStackTrace(); - } - } - }; - threadPool.execute(thread); - } - threadPool.shutdown(); - // Wait for all threads to terminate - try { - for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) { - LOG.debug("Waiting for hlog writers to terminate, iteration #" + i); } - }catch(InterruptedException ex) { - LOG.warn("Hlog writers were interrupted, possible data loss!"); } + + writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf); + } + + if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) { + throw new IOException("Discovered orphan hlog after split. Maybe HRegionServer was not dead when we started"); } + archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); + } finally { splits = new ArrayList(logWriters.size()); for (WriterAndPath wap : logWriters.values()) { wap.w.close(); - LOG.debug("Closed " + wap.p); splits.add(wap.p); + LOG.debug("Closed " + wap.p); } } return splits; } + private static void writeEditsBatchToRegions( + Map> splitLogsMap, + Map logWriters, + Path rootDir, + FileSystem fs, + Configuration conf) throws IOException { + + // Number of threads to use when log splitting to rewrite the logs. + // More means faster but bigger mem consumption. 
+ int logWriterThreads = + conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); + + boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); + ExecutorService threadPool = + Executors.newFixedThreadPool(logWriterThreads); + for (final byte[] key : splitLogsMap.keySet()) { + Thread thread = createNewSplitThread( + rootDir, logWriters, splitLogsMap, key, fs, conf); + threadPool.execute(thread); + } + threadPool.shutdown(); + // Wait for all threads to terminate + try { + for(int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) { + LOG.debug("Waiting for hlog writers to terminate, iteration #" + j); + } + } catch(InterruptedException ex) { + if (skipErrors) { + LOG.warn("Hlog writers were interrupted, possible data loss!"); + } else { + throw new IOException("Could not finish writing log entries", ex); + } + } + } + + + /** + * Parse a single hlog and put the edits in @splitLogsMap + * + * @param logfile to split + * @param splitLogsMap output parameter: a map with region names as keys and a + * list of edits as values + * @param fs the filesystem + * @param conf the configuration + * @throws IOException if hlog is corrupted, or can't be open + */ + private static void parseHLog( + FileStatus logfile, + final Map> splitLogsMap, + FileSystem fs, + Configuration conf) throws IOException { + + // Check for possibly empty file. With appends, currently Hadoop + // reports a zero length even if the file has been sync'd. Revisit if + // HDFS-376 or HDFS-878 is committed. + long length = logfile.getLen(); + Path path = logfile.getPath(); + + Reader in; + int editsCount = 0; + + if (length <= 0) { + LOG.warn("File " + path + " might be still open, length is 0"); + } + try { + in = HLog.getReader(fs, path, conf); + } catch (EOFException e) { + if (length <= 0) { + //TODO should we ignore an empty file if skip.errors is false? + //The caller should decide what to do. E.g. ignore if this is the last + //log in sequence. + //TODO is this scenario still possible if the log has been recovered (i.e. closed) + LOG.warn("Could not open " + path + " for reading. File is empty" + e); + return; + } + else { + throw e; + } + } + + try { + Entry entry; + while ((entry = in.next()) != null) { + byte[] region = entry.getKey().getRegionName(); + LinkedList queue = splitLogsMap.get(region); + if (queue == null) { + queue = new LinkedList(); + splitLogsMap.put(region, queue); + } + queue.push(entry); + editsCount++; + } + LOG.debug("Pushed=" + editsCount + " entries from " + path); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + LOG.warn("Close log reader in finally threw exception -- continuing", e); + } + } + } + + + private static Thread createNewSplitThread( + final Path rootDir, + final Map logWriters, + final Map> logEntries, + final byte[] key, + final FileSystem fs, + final Configuration conf + ) { + + return new Thread(Bytes.toStringBinary(key)) { + @Override + public void run() { + LinkedList entries = logEntries.get(key); + LOG.debug("Thread " + this.getId() + " got " + entries.size() + " to process"); + long threadTime = System.currentTimeMillis(); + try { + int editsCount = 0; + // Items were added to the linkedlist oldest first. Pull them + // out in that order. 
+          for (ListIterator iterator = entries.listIterator(entries.size());
+              iterator.hasPrevious();) {
+            Entry logEntry = iterator.previous();
+            WriterAndPath wap = logWriters.get(key);
+            if (wap == null) {
+              Path logFile = getRegionLogPath(logEntry, rootDir, key);
+              if (fs.exists(logFile)) {
+                LOG.warn("Deleting old hlog file: " + logFile);
+                fs.delete(logFile, false);
+              }
+              Writer w = createWriter(fs, logFile, conf);
+              wap = new WriterAndPath(logFile, w);
+              logWriters.put(key, wap);
+              LOG.debug("Creating writer path=" + logFile +
+                " region=" + Bytes.toStringBinary(key));
+            }
+            wap.w.append(logEntry);
+            wap.w.sync();
+            editsCount++;
+          }
+          LOG.debug(this.getId() + " Applied " + editsCount + " total edits to " +
+            Bytes.toStringBinary(key) + " in " +
+            (System.currentTimeMillis() - threadTime) + "ms");
+
+        } catch (IOException e) {
+          e = RemoteExceptionHandler.checkIOException(e);
+          LOG.warn(this.getId() + " Got " + e + " while writing log entry for region " +
+            Bytes.toStringBinary(key));
+        }
+      }
+    };
+  }
+
+
+  private static void archiveLogs(ArrayList corruptedLogs,
+    ArrayList processedLogs,
+    Path oldLogDir,
+    FileSystem fs,
+    Configuration conf) throws IOException {
+    final Path corruptDir =
+      new Path(conf.get(HBASE_DIR),
+        conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+
+    if (!fs.exists(corruptDir)) {
+      fs.mkdirs(corruptDir);
+    }
+    if (!fs.exists(oldLogDir)) {
+      fs.mkdirs(oldLogDir);
+    }
+
+    for (Path corrupted: corruptedLogs) {
+      LOG.info("Moving corrupted log " + corrupted + " to " +
+        new Path(corruptDir, corrupted.getName()));
+      fs.rename(corrupted, new Path(corruptDir, corrupted.getName()));
+    }
+
+    for (Path p: processedLogs) {
+      Path newPath = getHLogArchivePath(oldLogDir, p);
+      fs.rename(p, newPath);
+      LOG.info("Archived processed log " + p + " to " + newPath);
+    }
+
+  }
+
+  private static Path getRegionLogPath(Entry logEntry, Path rootDir, byte[] key) {
+    return new Path(HRegion.getRegionDir(HTableDescriptor
+      .getTableDir(rootDir, logEntry.getKey().getTablename()),
+      HRegionInfo.encodeRegionName(key)),
+      HREGION_OLDLOGFILE_NAME);
+  }
+
+  /*
+   * Recover log.
+   * Try and open the log in append mode.
+   * Doing this, we get a hold of the file that the crashed writer
+   * was writing to. Once we have it, close it. This will
+   * allow a subsequent reader to see up to the last sync.
+   * @param fs
+   * @param p
+   * @param append
+   */
+  public static void recoverLog(final boolean append, final FileSystem fs, final Path p
+  ) throws IOException {
+    if (!append) {
+      return;
+    }
+    LOG.info("Recovering hlog " + p);
+    // lease recovery not needed for local file system case.
+    // currently, local file system doesn't implement append either.
+    if (!(fs instanceof DistributedFileSystem)) {
+      return;
+    }
+
+    // Trying recovery
+    boolean recovered = false;
+    while (!recovered) {
+      try {
+//        HLog.Writer out = HLog.createWriter(fs, p, conf);
+        FSDataOutputStream out = fs.append(p);
+        out.close();
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, got " + e + "; waiting on lease recovery: " + p);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore it and try again
+        }
+      }
+    }
+    LOG.info("Finished lease recovery of " + p);
+  }
+
+
+  /**
+   * Utility class that lets us keep track of the edit with its key
+   * Only used when splitting logs
diff --git b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 2e5c142..63adf46 100644
--- b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -114,6 +114,18 @@ public class HBaseTestingUtility {
     startMiniCluster(1);
   }
 
+  public void startMiniDFSCluster(int servers) throws Exception {
+    this.dfsCluster = new MiniDFSCluster(12345, this.conf, servers, true,
+      true, true, null, null, null, null);
+  }
+
+  public void shutdownMiniDFSCluster() throws Exception {
+    if (this.dfsCluster != null) {
+      // The below throws an exception per dn, AsynchronousCloseException.
+      this.dfsCluster.shutdown();
+    }
+  }
+
   public void startMiniZKCluster() throws Exception {
     // Note that this is done before we create the MiniHBaseCluster because we
     // need to edit the config to add the ZooKeeper servers.
@@ -162,7 +174,7 @@ public class HBaseTestingUtility {
     // Bring up mini dfs cluster. This spews a bunch of warnings about missing
     // scheme. TODO: fix.
     // Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
-    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true,
+    this.dfsCluster = new MiniDFSCluster(12345, this.conf, servers, true,
       true, true, null, null, null, null);
     // Restore System property.
minidfscluster accesses content of // the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using, diff --git b/core/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java a/core/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java new file mode 100644 index 0000000..e0bea20 --- /dev/null +++ a/core/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java @@ -0,0 +1,577 @@ +package org.apache.hadoop.hbase.master; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.ipc.RemoteException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestHLogSplit { + + private Configuration conf; + private FileSystem fs; + + private final static HBaseTestingUtility + TEST_UTIL = new HBaseTestingUtility(); + + + private static final Path hbaseDir = new Path("/hbase"); + private static final Path hlogDir = new Path(hbaseDir, "hlog"); + private static final Path oldLogDir = new Path(hbaseDir, "hlog.old"); + private static final Path corruptDir = new Path(hbaseDir, ".corrupt"); + + private static final int NUM_WRITERS = 10; + private static final int ENTRIES = 10; // entries per writer per region + + private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS]; + private long seq = 0; + private static final byte[] TABLE_NAME = "t1".getBytes(); + private static final byte[] FAMILY = "f1".getBytes(); + private static final byte[] QUALIFIER = "q1".getBytes(); + private static final byte[] VALUE = "v1".getBytes(); + private static final String HLOG_FILE_PREFIX = "hlog.dat."; + private static List regions; + + + static enum Corruptions { + INSERT_GARBAGE_ON_FIRST_LINE, + INSERT_GARBAGE_IN_THE_MIDDLE, + APPEND_GARBAGE, + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration(). + setInt("hbase.regionserver.flushlogentries", 1); + TEST_UTIL.getConfiguration(). + setBoolean("dfs.support.append", true); + TEST_UTIL.getConfiguration(). 
+ setStrings("hbase.rootdir", hbaseDir.toString()); + + TEST_UTIL.startMiniDFSCluster(2); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniDFSCluster(); + } + + @Before + public void setUp() throws Exception { + + conf = TEST_UTIL.getConfiguration(); + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries){ + fs.delete(dir.getPath(), true); + } + seq = 0; + regions = new ArrayList(); + Collections.addAll(regions, "bbb", "ccc"); + TEST_UTIL.getDFSCluster().getNamesystem().leaseManager.setLeasePeriod(100, 50000); + + } + + @After + public void tearDown() throws Exception { + } + + + //TODO: check the edits order is respected (scenarios) + //TODO: test the split of a large (lots of regions > 500 file). In my tests it seems without hflush + // we're losing entries while writing to regions + + @Test + public void testEmptyLogFiles() throws IOException { + + injectEmptyFile(".empty", true); + generateHLogs(Integer.MAX_VALUE); + injectEmptyFile("empty", true); + + // make fs act as a different client now + // initialize will create a new DFSClient with a new client ID + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + } + + + @Test + public void testEmptyOpenLogFiles() throws IOException { + injectEmptyFile(".empty", false); + generateHLogs(Integer.MAX_VALUE); + injectEmptyFile("empty", false); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + @Test + public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { + // generate logs but leave hlog.dat.5 open. 
+ generateHLogs(5); + + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + + @Test + public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + @Test + public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + + @Test + public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + // the entries in the original logs are alternating regions + // considering the sequence file header, the middle corruption should + // affect at least half of the entries + int goodEntries = (NUM_WRITERS - 1) * ENTRIES; + int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; + assertTrue("The file up to the corrupted area hasn't been parsed", + goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf)); + } + } + + @Test + public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + + Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0"); + Path c2 = new Path(hlogDir, HLOG_FILE_PREFIX + "5"); + Path c3 = new Path(hlogDir, HLOG_FILE_PREFIX + (NUM_WRITERS - 1)); + generateHLogs(-1); + corruptHLog(c1, Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); + corruptHLog(c2, Corruptions.APPEND_GARBAGE, true, fs); + corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] archivedLogs = fs.listStatus(corruptDir); + + assertEquals("expected a different file", c1.getName(), archivedLogs[0].getPath().getName()); + assertEquals("expected a different file", c2.getName(), archivedLogs[1].getPath().getName()); + assertEquals("expected a different file", c3.getName(), archivedLogs[2].getPath().getName()); + assertEquals(archivedLogs.length, 3); + + } + + @Test + public void testLogsGetArchivedAfterSplit() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + + generateHLogs(-1); + + fs.initialize(fs.getUri(), conf); + 
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] archivedLogs = fs.listStatus(oldLogDir); + + assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); + } + + + + @Test(expected = IOException.class) + public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } + + @Test + public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + generateHLogs(-1); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + fs.initialize(fs.getUri(), conf); + try { + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } catch (IOException e) {/* expected */} + + assertEquals("if skip.errors is false all files should remain in place", + NUM_WRITERS, fs.listStatus(hlogDir).length); + } + + + @Test + public void testSplit() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + + } + } + + @Test(expected = FileNotFoundException.class) + public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + fs.listStatus(hlogDir); + + } + + @Test + public void testLogCannotBeWrittenOnceParsed() throws IOException { + generateHLogs(9); + fs.initialize(fs.getUri(), conf); + + + AtomicLong counter = new AtomicLong(0); + (new ZombieLastLogWriterRegionServer(counter)).start(); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet"); + + // It's possible that the writer got an error while appending and didn't count it + // however the entry will in fact be written to file and split with the rest + long numberOfEditsInRegion = countHLog(logfile, fs, conf); + assertTrue("The log file could have at most 1 extra log entry, but " + + "can't have less" + logfile, counter.get() == numberOfEditsInRegion || + counter.get() + 1 == numberOfEditsInRegion); + } + + + @Test(expected = IOException.class) + public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + (new ZombieNewLogWriterRegionServer()).start(); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } + + @Test + public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + (new ZombieNewLogWriterRegionServer()).start(); + try { + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } catch (IOException ex) {/* expected */} + int logFilesNumber = fs.listStatus(hlogDir).length; + + assertEquals("Log files should not be archived if there's an extra file after split", + NUM_WRITERS + 1, logFilesNumber); + return; + + } + + /** + * This thread will keep writing to the file after the split process has started + * It simulates a region server that was 
considered dead but woke up and wrote + * some more to he last log entry + */ + class ZombieLastLogWriterRegionServer extends Thread { + AtomicLong editsCount; + Path log; + HLog.Writer lastLogWriter = writer[NUM_WRITERS - 1]; + public ZombieLastLogWriterRegionServer(AtomicLong counter) { + this.editsCount = counter; + } + + @Override + public void run() { + flushToConsole("starting"); + while (true) { + try { + appendEntry(lastLogWriter, TABLE_NAME, "juliet".getBytes(), + ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0); + lastLogWriter.sync(); + editsCount.incrementAndGet(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // + } + + + } catch (IOException ex) { + if (ex instanceof RemoteException) { + flushToConsole("Juliet: got RemoteException " + + ex.getMessage() + " while writing " + (editsCount.get() + 1)); + break; + } else { + assertTrue("Failed to write " + editsCount.get(), false); + } + + } + } + + + } + } + + /** + * This thread will keep adding new log files + * It simulates a region server that was considered dead but woke up and wrote + * some more to a new hlog + */ + class ZombieNewLogWriterRegionServer extends Thread { + + + @Override + public void run() { + + boolean splitStarted = false; + while (!splitStarted) { + try { + splitStarted = fs.listStatus(new Path(hbaseDir, new String(TABLE_NAME))).length > 0; + } catch (FileNotFoundException e) { + try { + flushToConsole("Juliet: split not started, sleeping a bit..."); + Thread.sleep(100); + } catch (InterruptedException e1) { + // + } + } catch (IOException e1) { + assertTrue("Failed to list status ", false); + } + } + + Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet"); + try { + HLog.Writer writer = HLog.createWriter(fs, + julietLog, conf); + appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(), + ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); + writer.close(); + flushToConsole("Juliet file creator: created file " + julietLog); + } catch (IOException e1) { + assertTrue("Failed to create file " + julietLog, false); + } + + } + + } + + private void flushToConsole(String s) { + System.out.println(s); + System.out.flush(); + } + + + private void generateHLogs(int leaveOpen) throws IOException { + for (int i = 0; i < NUM_WRITERS; i++) { + writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf); + for (int j = 0; j < ENTRIES; j++) { + int prefix = 0; + for (String region : regions) { + String row_key = region + prefix++ + i + j; + appendEntry(writer[i], TABLE_NAME, region.getBytes(), + row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq); + } + } + if (i != leaveOpen) { + writer[i].close(); + flushToConsole("Closing writer " + i); + } + } + } + + private Path getLogForRegion(Path rootdir, byte[] table, String region) { + return new Path(HRegion.getRegionDir(HTableDescriptor + .getTableDir(rootdir, table), + HRegionInfo.encodeRegionName(region.getBytes())), + "oldlogfile.log"); + } + + private void corruptHLog(Path path, Corruptions corruption, boolean close, + FileSystem fs) throws IOException { + + FSDataOutputStream out; + int fileSize = (int) fs.listStatus(path)[0].getLen(); + + FSDataInputStream in = fs.open(path); + byte[] corrupted_bytes = new byte[fileSize]; + in.readFully(0, corrupted_bytes, 0, fileSize); + in.close(); + + switch (corruption) { + case APPEND_GARBAGE: + out = fs.append(path); + out.write("-----".getBytes()); + closeOrFlush(close, out); + break; + + case INSERT_GARBAGE_ON_FIRST_LINE: + fs.delete(path, false); + out = 
fs.create(path); + out.write(0); + out.write(corrupted_bytes); + closeOrFlush(close, out); + break; + + case INSERT_GARBAGE_IN_THE_MIDDLE: + fs.delete(path, false); + out = fs.create(path); + int middle = (int) Math.floor(corrupted_bytes.length / 2); + out.write(corrupted_bytes, 0, middle); + out.write(0); + out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); + closeOrFlush(close, out); + break; + } + + + } + + private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException { + if (close) { + out.close(); + } else { + out.sync(); + out.hflush(); + } + } + + @SuppressWarnings("unused") + private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException { + HLog.Entry entry; + HLog.Reader in = HLog.getReader(fs, log, conf); + while ((entry = in.next()) != null) { + System.out.println(entry); + } + } + + private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException { + int count = 0; + HLog.Reader in = HLog.getReader(fs, log, conf); + while (in.next() != null) { + count++; + } + return count; + } + + + public long appendEntry(HLog.Writer writer, byte[] table, byte[] region, + byte[] row, byte[] family, byte[] qualifier, + byte[] value, long seq) + throws IOException { + + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + seq++; + edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value)); + writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit)); + writer.sync(); + return seq; + + } + + private void injectEmptyFile(String suffix, boolean closeFile) + throws IOException { + HLog.Writer writer = HLog.createWriter( + fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf); + if (closeFile) writer.close(); + } + + @SuppressWarnings("unused") + private void listLogs(FileSystem fs, Path dir) throws IOException { + for (FileStatus file : fs.listStatus(dir)) { + System.out.println(file.getPath()); + } + + } + +}
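
Reviewer note: for anyone exercising the patch by hand, the new split entry point is driven the same way TestHLogSplit drives it. The sketch below is hypothetical (the /hbase paths and the hbase.skip.errors toggle are illustrative, not part of the patch); it only mirrors the HLog.splitLog(rootDir, srcDir, oldLogDir, fs, conf) call used throughout the test:

    // Minimal driver sketch, assuming hbase.rootdir points at /hbase and the
    // region server's hlogs live under /hbase/hlog. splitLog() rewrites the
    // edits into per-region oldlogfile.log files, archives processed hlogs to
    // oldLogDir, and moves corrupted ones to ${hbase.rootdir}/.corrupt when
    // hbase.skip.errors is true.
    Configuration conf = ...;                    // illustrative; must carry hbase.rootdir
    conf.setBoolean("hbase.skip.errors", true);  // tolerate (and archive) corrupted hlogs
    FileSystem fs = FileSystem.get(conf);
    Path hbaseDir = new Path("/hbase");
    Path hlogDir = new Path(hbaseDir, "hlog");
    Path oldLogDir = new Path(hbaseDir, "hlog.old");
    List splits = HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
    // splits now holds the paths of the per-region logs that were written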