diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index e479eae..2fb4897 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -128,7 +128,9 @@ public class HLog implements HConstants, Syncable { private final List actionListeners = Collections.synchronizedList(new ArrayList()); + @SuppressWarnings("unchecked") private static Class logWriterClass; + @SuppressWarnings("unchecked") private static Class logReaderClass; private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
@@ -470,7 +472,6 @@ public class HLog implements HConstants, Syncable { * @return A WAL reader. Close when done with it. * @throws IOException */ - @SuppressWarnings("unchecked") public static Reader getReader(final FileSystem fs, final Path path, Configuration conf) throws IOException {
@@ -483,10 +484,11 @@ public class HLog implements HConstants, Syncable { HLog.Reader reader = (HLog.Reader) logReaderClass.newInstance(); reader.init(fs, path, conf); return reader; - } catch (Exception e) { - IOException ie = new IOException("cannot get log reader"); - ie.initCause(e); - throw ie; + } catch (IOException e) { + throw e; + } + catch (Exception e) { + throw new IOException("Cannot get log reader", e); } }
@@ -497,7 +499,6 @@ public class HLog implements HConstants, Syncable { * @return A WAL writer. Close when done with it. * @throws IOException */ - @SuppressWarnings("unchecked") public static Writer createWriter(final FileSystem fs, final Path path, Configuration conf) throws IOException {
@@ -1130,10 +1131,11 @@ public class HLog implements HConstants, Syncable { * @param rootDir qualified root directory of the HBase instance * @param srcDir Directory of log files to split: e.g. * ${ROOTDIR}/log_HOST_PORT - * @param oldLogDir + * @param oldLogDir directory where processed (split) logs will be archived * @param fs FileSystem * @param conf Configuration - * @throws IOException + * @throws IOException will be thrown if corrupted hlogs aren't tolerated + * @return the list of splits */ public static List splitLog(final Path rootDir, final Path srcDir, Path oldLogDir, final FileSystem fs, final Configuration conf)
@@ -1152,18 +1154,10 @@ public class HLog implements HConstants, Syncable { } LOG.info("Splitting " + logfiles.length + " hlog(s) in " + srcDir.toString()); - splits = splitLog(rootDir, oldLogDir, logfiles, fs, conf); + splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf); try { - FileStatus[] files = fs.listStatus(srcDir); - for(FileStatus file : files) { - Path newPath = getHLogArchivePath(oldLogDir, file.getPath()); - LOG.debug("Moving " + FSUtils.getPath(file.getPath()) + " to " + - FSUtils.getPath(newPath)); - fs.rename(file.getPath(), newPath); - } - LOG.debug("Moved " + files.length + " log files to " + - FSUtils.getPath(oldLogDir)); - fs.delete(srcDir, true); + LOG.info("Splitting is done. Removing old log dir " + srcDir); + fs.delete(srcDir, false); } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1203,296 +1197,77 @@ public class HLog implements HConstants, Syncable { } } - /* - * @param rootDir - * @param logfiles - * @param fs - * @param conf - * @throws IOException - * @return List of splits made.
- */ - private static List splitLog(final Path rootDir, + private static List splitLog(final Path rootDir, final Path srcDir, Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs, final Configuration conf) throws IOException { + List processedLogs = new ArrayList(); + List corruptedLogs = new ArrayList(); final Map logWriters = Collections.synchronizedMap( new TreeMap(Bytes.BYTES_COMPARATOR)); List splits = null; - // Number of threads to use when log splitting to rewrite the logs. - // More means faster but bigger mem consumption. - int logWriterThreads = - conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - - // Number of logs to read into memory before writing to their appropriate - // regions when log splitting. More means faster but bigger mem consumption + // Number of logs in a read batch + // More means faster but bigger mem consumption int logFilesPerStep = conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3); + boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); // append support = we can avoid data loss (yay) // we open for append, then close to recover the correct file length final boolean appendSupport = isAppend(conf); - // store corrupt logs for post-mortem analysis (empty string = discard) - final String corruptDir = - conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"); - - List finishedFiles = new LinkedList(); - List corruptFiles = new LinkedList(); - try { - int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) / - logFilesPerStep)).intValue(); - for (int step = 0; step < maxSteps; step++) { - - // Step 1: read N log files into memory - final Map> logEntries = - new TreeMap>(Bytes.BYTES_COMPARATOR); - int endIndex = step == maxSteps - 1? logfiles.length: - step * logFilesPerStep + logFilesPerStep; - for (int i = (step * logFilesPerStep); i < endIndex; i++) { - Path curLogFile = logfiles[i].getPath(); - - // make sure we get the right file length before opening for read - recoverLog(fs, curLogFile, appendSupport); - - long length = fs.getFileStatus(curLogFile).getLen(); - if (LOG.isDebugEnabled()) { - LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length + - ": " + curLogFile + ", length=" + length); + int i = -1; + while (i < logfiles.length) { + final Map> editsByRegion = + new TreeMap>(Bytes.BYTES_COMPARATOR); + for (int j = 0; j < logFilesPerStep; j++) { + i++; + if (i == logfiles.length) { + break; } - - Reader in = null; - boolean cleanRead = false; - int count = 0; + FileStatus log = logfiles[i]; + Path logPath = log.getPath(); + long logLength = log.getLen(); + LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length + + ": " + logPath + ", length=" + logLength ); try { - in = HLog.getReader(fs, curLogFile, conf); - try { - Entry entry; - while ((entry = in.next()) != null) { - byte [] regionName = entry.getKey().getRegionName(); - LinkedList queue = logEntries.get(regionName); - if (queue == null) { - queue = new LinkedList(); - LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName)); - logEntries.put(regionName, queue); - } - queue.push(entry); - count++; - } - LOG.debug("Pushed=" + count + " entries from " + curLogFile); - cleanRead = true; - } catch (IOException e) { - LOG.debug("IOE Pushed=" + count + " entries from " + curLogFile); - e = RemoteExceptionHandler.checkIOException(e); - if (!(e instanceof EOFException)) { - String msg = "Exception processing " + curLogFile + - " -- continuing. 
Possible DATA LOSS!"; - if (corruptDir.length() > 0) { - msg += " Storing in hlog corruption directory."; - } - LOG.warn(msg, e); - } - } - } catch (IOException e) { - if (length <= 0) { - LOG.warn("Empty hlog, continuing: " + logfiles[i]); - cleanRead = true; - continue; - } - throw e; - } finally { - try { - if (in != null) { - in.close(); - } - } catch (IOException e) { - LOG.warn("File.close() threw exception -- continuing, " - + "but marking file as corrupt.", e); - cleanRead = false; - } - if (cleanRead) { - finishedFiles.add(curLogFile); - } else { - corruptFiles.add(curLogFile); - } - /* TODO FOR J-D REVIEW - // Archive the input file now so we do not replay edits. We could - // have gotten here because of an exception. If so, probably - // nothing we can do about it. Replaying it, it could work but we - // could be stuck replaying for ever. Just continue though we - // could have lost some edits. - fs.rename(logfiles[i].getPath(), - getHLogArchivePath(oldLogDir, logfiles[i].getPath())); - */ - } - } - - // Step 2: Some regionserver log files have been read into memory. - // Assign them to the appropriate region directory. - class ThreadWithException extends Thread { - ThreadWithException(String name) { super(name); } - public IOException exception = null; - } - List threadList = - new ArrayList(logEntries.size()); - ExecutorService threadPool = - Executors.newFixedThreadPool(logWriterThreads); - for (final byte [] region: logEntries.keySet()) { - ThreadWithException thread = - new ThreadWithException(Bytes.toStringBinary(region)) { - @Override - public void run() { - LinkedList entries = logEntries.get(region); - LOG.debug("Thread got " + entries.size() + " to process"); - if(entries.size() <= 0) { - LOG.warn("Got a region with no entries to process."); - return; - } - long threadTime = System.currentTimeMillis(); - try { - int count = 0; - // get the logfile associated with this region. 2 logs often - // write to the same region, so persist this info across logs - WriterAndPath wap = logWriters.get(region); - if (wap == null) { - // first write to this region, make new logfile - assert entries.size() > 0; - Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor - .getTableDir(rootDir, - entries.getFirst().getKey().getTablename()), - HRegionInfo.encodeRegionName(region)), - HREGION_OLDLOGFILE_NAME); - - // If splitLog() was running when the user restarted his - // cluster, then we could already have a 'logfile'. - // Since we don't delete logs until everything is written to - // their respective regions, we can safely remove this tmp. - if (fs.exists(logfile)) { - LOG.warn("Deleting old hlog file: " + logfile); - // TODO: Archive? - fs.delete(logfile, true); - } - - // associate an OutputStream with this logfile - Writer w = createWriter(fs, logfile, conf); - wap = new WriterAndPath(logfile, w); - logWriters.put(region, wap); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new hlog file writer for path " - + logfile + " and region " + Bytes.toStringBinary(region)); - } - } - - // Items were added to the linkedlist oldest first. Pull them - // out in that order. 
- for (ListIterator i = entries.listIterator(entries.size()); - i.hasPrevious();) { - wap.w.append(i.previous()); - count++; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Applied " + count + " total edits to " - + Bytes.toStringBinary(region) + " in " - + (System.currentTimeMillis() - threadTime) + "ms"); - } - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.warn("Got while writing region " - + Bytes.toStringBinary(region) + " log " + e); - e.printStackTrace(); - exception = e; - } - } - }; - threadList.add(thread); - threadPool.execute(thread); - } - threadPool.shutdown(); - // Wait for all threads to terminate - try { - for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) { - LOG.debug("Waiting for hlog writers to terminate, iteration #" + i); - } - } catch (InterruptedException ex) { - LOG.warn("Hlog writers were interrupted during splitLog(). " - +"Retaining log files to avoid data loss."); - throw new IOException(ex.getMessage(), ex.getCause()); - } - // throw an exception if one of the threads reported one - for (ThreadWithException t : threadList) { - if (t.exception != null) { - throw t.exception; + recoverLog(appendSupport, fs, logPath); + parseHLog(log, editsByRegion, fs, conf); + processedLogs.add(logPath); + } catch (IOException e) { + if (skipErrors) { + LOG.warn("Got "+ e +" while parsing hlog " + logPath + + ". Marking as corrupted"); + corruptedLogs.add(logPath); + } else { + throw e; + } } } - - // End of for loop. Rinse and repeat + writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf); } + if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) { + throw new IOException("Discovered orphan hlog after split. Maybe " + + "HRegionServer was not dead when we started"); + } + archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); } finally { splits = new ArrayList(logWriters.size()); for (WriterAndPath wap : logWriters.values()) { wap.w.close(); - LOG.debug("Closed " + wap.p); splits.add(wap.p); + LOG.debug("Closed " + wap.p); } } - - // Step 3: All writes succeeded! Get rid of the now-unnecessary logs - for(Path p : finishedFiles) { - if (LOG.isDebugEnabled()) { - LOG.debug("Successfully split Hlog file. Deleting " + p); - } - try { - if (!fs.delete(p, true) && LOG.isDebugEnabled()) { - LOG.debug("Delete of split Hlog (" + p + ") failed."); - } - } catch (IOException e) { - // don't throw error here. worst case = double-read - LOG.warn("Error deleting successfully split Hlog (" + p + ") -- " + e); - } - } - for (Path p : corruptFiles) { - if (corruptDir.length() > 0) { - // store any corrupt logs for later analysis - Path cp = new Path(conf.get(HBASE_DIR), corruptDir); - if(!fs.exists(cp)) { - fs.mkdirs(cp); - } - Path newp = new Path(cp, p.getName()); - if (!fs.exists(newp)) { - if (!fs.rename(p, newp)) { - LOG.warn("Rename of " + p + " to " + newp + " failed."); - } else { - LOG.warn("Corrupt Hlog (" + p + ") moved to " + newp); - } - } else { - LOG.warn("Corrupt Hlog (" + p + ") already moved to " + newp + - ". 
Ignoring"); - } - } else { - // data loss is less important than disk space, delete - try { - if (!fs.delete(p, true) ) { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete of split Hlog " + p + " failed."); - } - } else { - LOG.warn("Corrupt Hlog (" + p + ") deleted!"); - } - } catch (IOException e) { - LOG.warn("Error deleting corrupt Hlog (" + p + ") -- " + e); - } - } - } - return splits; } /* * @param conf - * @return True if append enabled and we have the syncFs in our path. + * @return True if append support */ static boolean isAppend(final Configuration conf) { boolean append = conf.getBoolean("dfs.support.append", false); @@ -1507,6 +1282,8 @@ public class HLog implements HConstants, Syncable { } catch (NoSuchMethodException e) { append = false; } + } else { + // TODO: Test that was are on hadoop 0.21 w/ hdfs-265 support. How? } return append; } @@ -1567,64 +1344,6 @@ public class HLog implements HConstants, Syncable { } } - /* - * Recover log. - * Try and open log in append mode. - * Doing this, we get a hold of the file that crashed writer - * was writing to. Once we have it, close it. This will - * allow subsequent reader to see up to last sync. - * @param fs - * @param p - * @param append - */ - public static void recoverLog(final FileSystem fs, final Path p, - final boolean append) throws IOException { - if (!append) { - return; - } - - // lease recovery not needed for local file system case. - // currently, local file system doesn't implement append either. - if (!(fs instanceof DistributedFileSystem)) { - return; - } - - LOG.debug("Recovering DFS lease for path " + p); - long startWaiting = System.currentTimeMillis(); - - // Trying recovery - boolean recovered = false; - while (!recovered) { - try { - FSDataOutputStream out = fs.append(p); - out.close(); - recovered = true; - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - if (e instanceof AlreadyBeingCreatedException) { - // We expect that we'll get this message while the lease is still - // within its soft limit, but if we get it past that, it means - // that the RS is holding onto the file even though it lost its - // znode. We could potentially abort after some time here. - long waitedFor = System.currentTimeMillis() - startWaiting; - - if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) { - LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p - + ":" + e.getMessage()); - } - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - // ignore it and try again - } - } else { - throw new IOException("Failed to open " + p + " for append", e); - } - } - } - LOG.info("Past out lease recovery"); - } - /** * Construct the HLog directory name * @@ -1668,28 +1387,244 @@ public class HLog implements HConstants, Syncable { return new Path(oldLogDir, System.currentTimeMillis() + "." + p.getName()); } - private static void usage() { - System.err.println("Usage: java org.apache.hbase.HLog" + - " {--dump ... | --split ...}"); + private static void writeEditsBatchToRegions( + final Map> splitLogsMap, + final Map logWriters, + final Path rootDir, final FileSystem fs, final Configuration conf) + throws IOException { + // Number of threads to use when log splitting to rewrite the logs. + // More means faster but bigger mem consumption. 
int logWriterThreads = + conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); + boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); + ExecutorService threadPool = Executors.newFixedThreadPool(logWriterThreads); + for (final byte[] key : splitLogsMap.keySet()) { + Thread thread = createNewSplitThread(rootDir, logWriters, splitLogsMap, key, fs, conf); + threadPool.execute(thread); + } + threadPool.shutdown(); + // Wait for all threads to terminate + try { + for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) { + LOG.debug("Waiting for hlog writers to terminate, iteration #" + j); + } + } catch (InterruptedException ex) { + if (skipErrors) { + LOG.warn("Hlog writers were interrupted, possible data loss!"); + } else { + throw new IOException("Could not finish writing log entries", ex); + } + } + } - /** + /* + * Parse a single hlog and put the edits in @splitLogsMap * - * @param list + * @param logfile to split + * @param splitLogsMap output parameter: a map with region names as keys and a + * list of edits as values + * @param fs the filesystem + * @param conf the configuration + * @throws IOException if hlog is corrupted, or can't be opened */
+ private static void parseHLog(final FileStatus logfile, + final Map> splitLogsMap, final FileSystem fs, + final Configuration conf) + throws IOException { + // Check for possibly empty file. With appends, currently Hadoop reports a + // zero length even if the file has been sync'd. Revisit if HDFS-376 or + // HDFS-878 is committed. + long length = logfile.getLen(); + if (length <= 0) { + LOG.warn("File " + logfile + " might still be open, length is 0"); + } + Path path = logfile.getPath(); + Reader in; + int editsCount = 0; + try { + in = HLog.getReader(fs, path, conf); + } catch (EOFException e) { + if (length <= 0) { + //TODO should we ignore an empty file if skip.errors is false? + //The caller should decide what to do. E.g. ignore if this is the last + //log in sequence. + //TODO is this scenario still possible if the log has been recovered (i.e. closed) + LOG.warn("Could not open " + path + " for reading. File is empty: " + e); + return; + } else { + throw e; + } + } + try { + Entry entry; + while ((entry = in.next()) != null) { + byte[] region = entry.getKey().getRegionName(); + LinkedList queue = splitLogsMap.get(region); + if (queue == null) { + queue = new LinkedList(); + splitLogsMap.put(region, queue); + } + queue.push(entry); + editsCount++; + } + LOG.debug("Pushed=" + editsCount + " entries from " + path); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + LOG.warn("Close log reader in finally threw exception -- continuing", e); + } + } + }
+ + private static Thread createNewSplitThread(final Path rootDir, + final Map logWriters, + final Map> logEntries, + final byte[] key, final FileSystem fs, final Configuration conf) { + return new Thread(Bytes.toStringBinary(key)) { + @Override + public void run() { + LinkedList entries = logEntries.get(key); + LOG.debug("Thread " + this.getId() + " got " + entries.size() + " to process"); + long threadTime = System.currentTimeMillis(); + try { + int editsCount = 0; + // Items were added to the linkedlist oldest first. Pull them + // out in that order.
+ for (ListIterator iterator = entries.listIterator(entries.size()); + iterator.hasPrevious();) { + Entry logEntry = iterator.previous(); + WriterAndPath wap = logWriters.get(key); + if (wap == null) { + Path logFile = getRegionLogPath(logEntry, rootDir, key); + if (fs.exists(logFile)) { + LOG.warn("Deleting old hlog file " + logFile + + ", length=" + fs.getFileStatus(logFile).getLen()); + fs.delete(logFile, false); + } + Writer w = createWriter(fs, logFile, conf); + wap = new WriterAndPath(logFile, w); + logWriters.put(key, wap); + LOG.debug("Creating writer path=" + logFile + + " region=" + Bytes.toStringBinary(key)); + } + wap.w.append(logEntry); + wap.w.sync(); + editsCount++; + } + LOG.debug(this.getId() + " Applied " + editsCount + + " total edits to " + Bytes.toStringBinary(key) + + " in " + (System.currentTimeMillis() - threadTime) + "ms"); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.warn(this.getId() + " Got error while writing log entry " + + Bytes.toStringBinary(key) + " to log", e); + } + } + }; + }
+ + private static void archiveLogs(final List corruptedLogs, + final List processedLogs, final Path oldLogDir, + final FileSystem fs, final Configuration conf) + throws IOException { + final Path corruptDir = new Path(conf.get(HBASE_DIR), + conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt")); + if (!fs.exists(corruptDir)) { + fs.mkdirs(corruptDir); + } + if (!fs.exists(oldLogDir)) { + fs.mkdirs(oldLogDir); + } + for (Path corrupted: corruptedLogs) { + Path p = new Path(corruptDir, corrupted.getName()); + LOG.info("Moving corrupted log " + corrupted + " to " + p); + fs.rename(corrupted, p); + } + + for (Path p: processedLogs) { + Path newPath = getHLogArchivePath(oldLogDir, p); + fs.rename(p, newPath); + LOG.info("Archived processed log " + p + " to " + newPath); + } + }
+ + private static Path getRegionLogPath(Entry logEntry, Path rootDir, byte[] key) { + Path tableDir = + HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename()); + Path regionDir = + HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(key)); + return new Path(regionDir, HREGION_OLDLOGFILE_NAME); + }
+ + /* + * Recover the log at path p + * @param fs + * @param p + * @param append True if append is supported + * @throws IOException + */ + public static void recoverLog(final boolean append, final FileSystem fs, final Path p) + throws IOException { + if (!append) { + return; + } + // lease recovery not needed for local file system case. + // currently, local file system doesn't implement append either. + if (!(fs instanceof DistributedFileSystem)) { + return; + } + LOG.info("Recovering hlog " + p); + long startWaiting = System.currentTimeMillis(); + + // Trying recovery + boolean recovered = false; + while (!recovered) { + try { + FSDataOutputStream out = fs.append(p); + out.close(); + recovered = true; + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + if (e instanceof AlreadyBeingCreatedException) { + // We expect that we'll get this message while the lease is still + // within its soft limit, but if we get it past that, it means + // that the RS is holding onto the file even though it lost its + // znode. We could potentially abort after some time here.
+ long waitedFor = System.currentTimeMillis() - startWaiting; + if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) { + LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p + + ":" + e.getMessage()); + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + // ignore it and try again + } + } else { + throw new IOException("Failed to open " + p + " for append", e); + } + } + } + LOG.info("Past out lease recovery off " + p); + } + public void addLogActionsListerner(LogActionsListener list) { LOG.info("Adding a listener"); this.actionListeners.add(list); } - /** - * - * @param list - */ public boolean removeLogActionsListener(LogActionsListener list) { return this.actionListeners.remove(list); } + private static void usage() { + System.err.println("Usage: java org.apache.hbase.HLog" + + " {--dump ... | --split ...}"); + } + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java new file mode 100644 index 0000000..7c596e9 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java @@ -0,0 +1,600 @@ +package org.apache.hadoop.hbase.master; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.RemoteException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestHLogSplit { + + private Configuration conf; + private FileSystem fs; + + private final static HBaseTestingUtility + TEST_UTIL = new HBaseTestingUtility(); + + + private static final Path hbaseDir = new Path("/hbase"); + private static final Path hlogDir = new Path(hbaseDir, "hlog"); + private static final Path oldLogDir = new Path(hbaseDir, "hlog.old"); + private static final Path corruptDir = new Path(hbaseDir, ".corrupt"); + + private static final int NUM_WRITERS = 10; + private static final int ENTRIES = 10; // entries per writer per region + + private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS]; + private long seq = 0; + private static final byte[] TABLE_NAME = "t1".getBytes(); + private static final byte[] FAMILY = "f1".getBytes(); + private static final byte[] QUALIFIER = "q1".getBytes(); + private static final byte[] VALUE = "v1".getBytes(); + private 
static final String HLOG_FILE_PREFIX = "hlog.dat."; + private static List regions; + + + static enum Corruptions { + INSERT_GARBAGE_ON_FIRST_LINE, + INSERT_GARBAGE_IN_THE_MIDDLE, + APPEND_GARBAGE, + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration(). + setInt("hbase.regionserver.flushlogentries", 1); + TEST_UTIL.getConfiguration(). + setBoolean("dfs.support.append", true); + TEST_UTIL.getConfiguration(). + setStrings("hbase.rootdir", hbaseDir.toString()); + TEST_UTIL.startMiniDFSCluster(2); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniDFSCluster(); + } + + @Before + public void setUp() throws Exception { + conf = TEST_UTIL.getConfiguration(); + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries){ + fs.delete(dir.getPath(), true); + } + seq = 0; + regions = new ArrayList(); + Collections.addAll(regions, "bbb", "ccc"); + + // Set the soft lease for hdfs to be down from default of 5 minutes or so. + // TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another. + // Not available in 0.20 hdfs + // TEST_UTIL.getDFSCluster().getNamesystem().leaseManager. + // setLeasePeriod(100, 50000); + // Use reflection to get at the 0.20 version of above. + MiniDFSCluster dfsCluster = TEST_UTIL.getDFSCluster(); + // private NameNode nameNode; + Field field = dfsCluster.getClass().getDeclaredField("nameNode"); + field.setAccessible(true); + NameNode nn = (NameNode)field.get(dfsCluster); + nn.namesystem.leaseManager.setLeasePeriod(100, 50000); + } + + @After + public void tearDown() throws Exception { + } + + + //TODO: check the edits order is respected (scenarios) + //TODO: test the split of a large (lots of regions > 500 file). In my tests it seems without hflush + // we're losing entries while writing to regions + + @Test + public void testEmptyLogFiles() throws IOException { + + injectEmptyFile(".empty", true); + generateHLogs(Integer.MAX_VALUE); + injectEmptyFile("empty", true); + + // make fs act as a different client now + // initialize will create a new DFSClient with a new client ID + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + } + + + @Test + public void testEmptyOpenLogFiles() throws IOException { + injectEmptyFile(".empty", false); + generateHLogs(Integer.MAX_VALUE); + injectEmptyFile("empty", false); + + + // make fs act as a different client now + // initialize will create a new DFSClient with a new client ID + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + @Test + public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { + // generate logs but leave hlog.dat.5 open. 
+ generateHLogs(5); + + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + + @Test + public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + @Test + public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + + @Test + public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + // the entries in the original logs are alternating regions + // considering the sequence file header, the middle corruption should + // affect at least half of the entries + int goodEntries = (NUM_WRITERS - 1) * ENTRIES; + int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; + assertTrue("The file up to the corrupted area hasn't been parsed", + goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf)); + } + } + + @Test + public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { + conf.setBoolean("hbase.skip.errors", true); + + Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0"); + Path c2 = new Path(hlogDir, HLOG_FILE_PREFIX + "5"); + Path c3 = new Path(hlogDir, HLOG_FILE_PREFIX + (NUM_WRITERS - 1)); + generateHLogs(-1); + corruptHLog(c1, Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); + corruptHLog(c2, Corruptions.APPEND_GARBAGE, true, fs); + corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] archivedLogs = fs.listStatus(corruptDir); + + assertEquals("expected a different file", c1.getName(), archivedLogs[0].getPath().getName()); + assertEquals("expected a different file", c2.getName(), archivedLogs[1].getPath().getName()); + assertEquals("expected a different file", c3.getName(), archivedLogs[2].getPath().getName()); + assertEquals(archivedLogs.length, 3); + + } + + @Test + public void testLogsGetArchivedAfterSplit() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + + generateHLogs(-1); + + fs.initialize(fs.getUri(), conf); + 
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] archivedLogs = fs.listStatus(oldLogDir); + + assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); + } + + + + @Test(expected = IOException.class) + public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } + + @Test + public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException { + conf.setBoolean("hbase.skip.errors", false); + generateHLogs(-1); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + fs.initialize(fs.getUri(), conf); + try { + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } catch (IOException e) {/* expected */} + + assertEquals("if skip.errors is false all files should remain in place", + NUM_WRITERS, fs.listStatus(hlogDir).length); + } + + + @Test + public void testSplit() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + + } + } + + @Test + public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() + throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + // TODO: listStatus throws FileNotFound if hadoop 0.21 but returns null + // if hadoop 0.20. 
+ FileStatus [] statuses = fs.listStatus(hlogDir); + assertNull(statuses); + } + + @Test + public void testLogCannotBeWrittenOnceParsed() throws IOException { + generateHLogs(9); + fs.initialize(fs.getUri(), conf); + + + AtomicLong counter = new AtomicLong(0); + (new ZombieLastLogWriterRegionServer(counter)).start(); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet"); + + // It's possible that the writer got an error while appending and didn't count it + // however the entry will in fact be written to file and split with the rest + long numberOfEditsInRegion = countHLog(logfile, fs, conf); + assertTrue("The log file could have at most 1 extra log entry, but " + + "can't have less" + logfile, counter.get() == numberOfEditsInRegion || + counter.get() + 1 == numberOfEditsInRegion); + } + + + @Test(expected = IOException.class) + public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted() + throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + (new ZombieNewLogWriterRegionServer()).start(); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } + + @Test + public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted() + throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + (new ZombieNewLogWriterRegionServer()).start(); + try { + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } catch (IOException ex) {/* expected */} + int logFilesNumber = fs.listStatus(hlogDir).length; + + assertEquals("Log files should not be archived if there's an extra file after split", + NUM_WRITERS + 1, logFilesNumber); + return; + + } + + /** + * This thread will keep writing to the file after the split process has started + * It simulates a region server that was considered dead but woke up and wrote + * some more to he last log entry + */ + class ZombieLastLogWriterRegionServer extends Thread { + AtomicLong editsCount; + Path log; + HLog.Writer lastLogWriter = writer[NUM_WRITERS - 1]; + public ZombieLastLogWriterRegionServer(AtomicLong counter) { + this.editsCount = counter; + } + + @Override + public void run() { + flushToConsole("starting"); + while (true) { + try { + appendEntry(lastLogWriter, TABLE_NAME, "juliet".getBytes(), + ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0); + lastLogWriter.sync(); + editsCount.incrementAndGet(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // + } + + + } catch (IOException ex) { + if (ex instanceof RemoteException) { + flushToConsole("Juliet: got RemoteException " + + ex.getMessage() + " while writing " + (editsCount.get() + 1)); + break; + } else { + assertTrue("Failed to write " + editsCount.get(), false); + } + + } + } + + + } + } + + /** + * This thread will keep adding new log files + * It simulates a region server that was considered dead but woke up and wrote + * some more to a new hlog + */ + class ZombieNewLogWriterRegionServer extends Thread { + + + @Override + public void run() { + + boolean splitStarted = false; + while (!splitStarted) { + try { + splitStarted = fs.listStatus(new Path(hbaseDir, new String(TABLE_NAME))).length > 0; + } catch (FileNotFoundException e) { + try { + flushToConsole("Juliet: split not started, sleeping a bit..."); + Thread.sleep(100); + } catch (InterruptedException e1) { + // + } + } catch (IOException e1) { + assertTrue("Failed to list status ", false); + } + } + + Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet"); + try { + 
HLog.Writer writer = HLog.createWriter(fs, + julietLog, conf); + appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(), + ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); + writer.close(); + flushToConsole("Juliet file creator: created file " + julietLog); + } catch (IOException e1) { + assertTrue("Failed to create file " + julietLog, false); + } + + } + + } + + private void flushToConsole(String s) { + System.out.println(s); + System.out.flush(); + } + + + private void generateHLogs(int leaveOpen) throws IOException { + for (int i = 0; i < NUM_WRITERS; i++) { + writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf); + for (int j = 0; j < ENTRIES; j++) { + int prefix = 0; + for (String region : regions) { + String row_key = region + prefix++ + i + j; + appendEntry(writer[i], TABLE_NAME, region.getBytes(), + row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq); + } + } + if (i != leaveOpen) { + writer[i].close(); + flushToConsole("Closing writer " + i); + } + } + } + + private Path getLogForRegion(Path rootdir, byte[] table, String region) { + return new Path(HRegion.getRegionDir(HTableDescriptor + .getTableDir(rootdir, table), + HRegionInfo.encodeRegionName(region.getBytes())), + "oldlogfile.log"); + } + + private void corruptHLog(Path path, Corruptions corruption, boolean close, + FileSystem fs) throws IOException { + + FSDataOutputStream out; + int fileSize = (int) fs.listStatus(path)[0].getLen(); + + FSDataInputStream in = fs.open(path); + byte[] corrupted_bytes = new byte[fileSize]; + in.readFully(0, corrupted_bytes, 0, fileSize); + in.close(); + + switch (corruption) { + case APPEND_GARBAGE: + out = fs.append(path); + out.write("-----".getBytes()); + closeOrFlush(close, out); + break; + + case INSERT_GARBAGE_ON_FIRST_LINE: + fs.delete(path, false); + out = fs.create(path); + out.write(0); + out.write(corrupted_bytes); + closeOrFlush(close, out); + break; + + case INSERT_GARBAGE_IN_THE_MIDDLE: + fs.delete(path, false); + out = fs.create(path); + int middle = (int) Math.floor(corrupted_bytes.length / 2); + out.write(corrupted_bytes, 0, middle); + out.write(0); + out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); + closeOrFlush(close, out); + break; + } + + + } + + private void closeOrFlush(boolean close, FSDataOutputStream out) + throws IOException { + if (close) { + out.close(); + } else { + out.sync(); + // Not in 0out.hflush(); + } + } + + @SuppressWarnings("unused") + private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException { + HLog.Entry entry; + HLog.Reader in = HLog.getReader(fs, log, conf); + while ((entry = in.next()) != null) { + System.out.println(entry); + } + } + + private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException { + int count = 0; + HLog.Reader in = HLog.getReader(fs, log, conf); + while (in.next() != null) { + count++; + } + return count; + } + + + public long appendEntry(HLog.Writer writer, byte[] table, byte[] region, + byte[] row, byte[] family, byte[] qualifier, + byte[] value, long seq) + throws IOException { + + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + seq++; + edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value)); + writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit)); + writer.sync(); + return seq; + + } + + private void injectEmptyFile(String suffix, boolean closeFile) + throws IOException { + HLog.Writer writer = HLog.createWriter( + fs, new Path(hlogDir, HLOG_FILE_PREFIX + 
suffix), conf); + if (closeFile) writer.close(); + } + + @SuppressWarnings("unused") + private void listLogs(FileSystem fs, Path dir) throws IOException { + for (FileStatus file : fs.listStatus(dir)) { + System.out.println(file.getPath()); + } + + } + +}
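
Note (not part of the patch): a minimal usage sketch of the new split path, assuming an already-initialized Configuration conf and FileSystem fs, plus hbaseDir, hlogDir and oldLogDir paths like the ones used in the test above.

    // Tolerate corrupted hlogs: they are moved to the ".corrupt" directory instead of failing the split.
    conf.setBoolean("hbase.skip.errors", true);
    // Rewrites every edit under hlogDir into per-region oldlogfile.log files under hbaseDir,
    // archives the processed hlogs to oldLogDir, and returns the per-region files it wrote.
    List splits = HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);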