diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 12a3cd8..a764068 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -30,6 +30,8 @@ import java.lang.reflect.Method; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; @@ -37,9 +39,12 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -73,6 +78,9 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; +import com.google.common.util.concurrent.NamingThreadFactory; + +import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease; /** * HLog stores all the edits to the HStore. Its the hbase write-ahead-log @@ -131,8 +139,9 @@ public class HLog implements HConstants, Syncable { private final List actionListeners = Collections.synchronizedList(new ArrayList()); - private static Class logWriterClass; - private static Class logReaderClass; + + private static Class logWriterClass; + private static Class logReaderClass; private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer private int initialReplication; // initial replication factor of SequenceFile.writer @@ -484,23 +493,23 @@ public class HLog implements HConstants, Syncable { * @return A WAL reader. Close when done with it. * @throws IOException */ - @SuppressWarnings("unchecked") public static Reader getReader(final FileSystem fs, final Path path, Configuration conf) throws IOException { try { if (logReaderClass == null) { - logReaderClass = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl", - SequenceFileLogReader.class.getCanonicalName())); + logReaderClass =conf.getClass("hbase.regionserver.hlog.reader.impl", + SequenceFileLogReader.class, Reader.class); } - HLog.Reader reader = (HLog.Reader) logReaderClass.newInstance(); + HLog.Reader reader = logReaderClass.newInstance(); reader.init(fs, path, conf); return reader; - } catch (Exception e) { - IOException ie = new IOException("cannot get log reader"); - ie.initCause(e); - throw ie; + } catch (IOException e) { + throw e; + } + catch (Exception e) { + throw new IOException("Cannot get log reader", e); } } @@ -511,14 +520,13 @@ public class HLog implements HConstants, Syncable { * @return A WAL writer. Close when done with it. 
* @throws IOException */ - @SuppressWarnings("unchecked") public static Writer createWriter(final FileSystem fs, final Path path, Configuration conf) throws IOException { try { if (logWriterClass == null) { - logWriterClass = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl", - SequenceFileLogWriter.class.getCanonicalName())); + logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl", + SequenceFileLogWriter.class, Writer.class); } HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance(); writer.init(fs, path, conf); @@ -1145,10 +1153,11 @@ public class HLog implements HConstants, Syncable { * @param rootDir qualified root directory of the HBase instance * @param srcDir Directory of log files to split: e.g. * ${ROOTDIR}/log_HOST_PORT - * @param oldLogDir + * @param oldLogDir directory where processed (split) logs will be archived to * @param fs FileSystem * @param conf Configuration - * @throws IOException + * @throws IOException will throw if corrupted hlogs aren't tolerated + * @return the list of splits */ public static List splitLog(final Path rootDir, final Path srcDir, Path oldLogDir, final FileSystem fs, final Configuration conf) @@ -1167,18 +1176,10 @@ public class HLog implements HConstants, Syncable { } LOG.info("Splitting " + logfiles.length + " hlog(s) in " + srcDir.toString()); - splits = splitLog(rootDir, oldLogDir, logfiles, fs, conf); + splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf); try { - FileStatus[] files = fs.listStatus(srcDir); - for(FileStatus file : files) { - Path newPath = getHLogArchivePath(oldLogDir, file.getPath()); - LOG.debug("Moving " + FSUtils.getPath(file.getPath()) + " to " + - FSUtils.getPath(newPath)); - fs.rename(file.getPath(), newPath); - } - LOG.debug("Moved " + files.length + " log files to " + - FSUtils.getPath(oldLogDir)); - fs.delete(srcDir, true); + LOG.info("Spliting is done. Removing old log dir "+srcDir); + fs.delete(srcDir, false); } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); IOException io = new IOException("Cannot delete: " + srcDir); @@ -1218,313 +1219,99 @@ public class HLog implements HConstants, Syncable { } } - /* - * @param rootDir - * @param logfiles + /** + * Sorts the HLog edits in the given list of logfiles (that are a mix of edits on multiple regions) + * by region and then splits them per region directories, in batches of (hbase.hlog.split.batch.size) + * + * A batch consists of a set of log files that will be sorted in a single map of edits indexed by region + * the resulting map will be concurrently written by multiple threads to their corresponding regions + * + * Each batch consists of more more log files that are + * - recovered (files is opened for append then closed to ensure no process is writing into it) + * - parsed (each edit in the log is appended to a list of edits indexed by region + * see {@link #parseHLog} for more details) + * - marked as either processed or corrupt depending on parsing outcome + * - the resulting edits indexed by region are concurrently written to their corresponding region + * region directories + * - original files are then archived to a different directory + * + * + * + * @param rootDir hbase directory + * @param srcDir logs directory + * @param oldLogDir directory where processed logs are archived to + * @param logfiles the list of log files to split * @param fs * @param conf + * @return * @throws IOException - * @return List of splits made. 
*/ - private static List splitLog(final Path rootDir, + private static List splitLog(final Path rootDir, final Path srcDir, Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs, final Configuration conf) throws IOException { + List processedLogs = new ArrayList(); + List corruptedLogs = new ArrayList(); final Map logWriters = Collections.synchronizedMap( new TreeMap(Bytes.BYTES_COMPARATOR)); List splits = null; - // Number of threads to use when log splitting to rewrite the logs. - // More means faster but bigger mem consumption. - int logWriterThreads = - conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - - // Number of logs to read into memory before writing to their appropriate - // regions when log splitting. More means faster but bigger mem consumption + // Number of logs in a read batch + // More means faster but bigger mem consumption + //TODO make a note on the conf rename and update hbase-site.xml if needed int logFilesPerStep = - conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3); - - // append support = we can avoid data loss (yay) - // we open for append, then close to recover the correct file length - final boolean appendSupport = isAppend(conf); - - // store corrupt logs for post-mortem analysis (empty string = discard) - final String corruptDir = - conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"); + conf.getInt("hbase.hlog.split.batch.size", 3); + boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false); - List finishedFiles = new LinkedList(); - List corruptFiles = new LinkedList(); try { - int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) / - logFilesPerStep)).intValue(); - for (int step = 0; step < maxSteps; step++) { - - // Step 1: read N log files into memory - final Map> logEntries = - new TreeMap>(Bytes.BYTES_COMPARATOR); - int endIndex = step == maxSteps - 1? 
logfiles.length: - step * logFilesPerStep + logFilesPerStep; - for (int i = (step * logFilesPerStep); i < endIndex; i++) { - Path curLogFile = logfiles[i].getPath(); - - // make sure we get the right file length before opening for read - recoverLog(fs, curLogFile, appendSupport); - - long length = fs.getFileStatus(curLogFile).getLen(); - if (LOG.isDebugEnabled()) { - LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length + - ": " + curLogFile + ", length=" + length); + int i = -1; + while (i < logfiles.length) { + final Map> editsByRegion = + new TreeMap>(Bytes.BYTES_COMPARATOR); + for (int j = 0; j < logFilesPerStep; j++) { + i++; + if (i == logfiles.length) { + break; } - - Reader in = null; - boolean cleanRead = false; - int count = 0; + FileStatus log = logfiles[i]; + Path logPath = log.getPath(); + long logLength = log.getLen(); + LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length + + ": " + logPath + ", length=" + logLength ); try { - in = HLog.getReader(fs, curLogFile, conf); - try { - Entry entry; - while ((entry = in.next()) != null) { - byte [] regionName = entry.getKey().getRegionName(); - LinkedList queue = logEntries.get(regionName); - if (queue == null) { - queue = new LinkedList(); - LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName)); - logEntries.put(regionName, queue); - } - queue.push(entry); - count++; - } - LOG.debug("Pushed=" + count + " entries from " + curLogFile); - cleanRead = true; - } catch (IOException e) { - LOG.debug("IOE Pushed=" + count + " entries from " + curLogFile); - e = RemoteExceptionHandler.checkIOException(e); - if (!(e instanceof EOFException)) { - String msg = "Exception processing " + curLogFile + - " -- continuing. Possible DATA LOSS!"; - if (corruptDir.length() > 0) { - msg += " Storing in hlog corruption directory."; - } - LOG.warn(msg, e); - } - } - } catch (IOException e) { - if (length <= 0) { - LOG.warn("Empty hlog, continuing: " + logfiles[i]); - cleanRead = true; - continue; - } - throw e; - } finally { - try { - if (in != null) { - in.close(); - } - } catch (IOException e) { - LOG.warn("File.close() threw exception -- continuing, " - + "but marking file as corrupt.", e); - cleanRead = false; - } - if (cleanRead) { - finishedFiles.add(curLogFile); - } else { - corruptFiles.add(curLogFile); - } - /* TODO FOR J-D REVIEW - // Archive the input file now so we do not replay edits. We could - // have gotten here because of an exception. If so, probably - // nothing we can do about it. Replaying it, it could work but we - // could be stuck replaying for ever. Just continue though we - // could have lost some edits. - fs.rename(logfiles[i].getPath(), - getHLogArchivePath(oldLogDir, logfiles[i].getPath())); - */ - } - } - - // Step 2: Some regionserver log files have been read into memory. - // Assign them to the appropriate region directory. 
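
For readers following the new splitLog flow described in its javadoc above, a minimal self-contained sketch of the batch loop it implements is below. The names recoverLease, parse, writeBatch and archive are hypothetical stand-ins for the patch's recoverFileLease, parseHLog, writeEditsBatchToRegions and archiveLogs, and plain String handles stand in for FileStatus/Path.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class BatchSplitSketch {
      // Hypothetical stand-ins for the patch's helpers; they only fix the shape of the loop.
      static void recoverLease(String log) throws IOException { /* fs.append(p).close() */ }
      static void parse(String log, Map<String, LinkedList<String>> editsByRegion) throws IOException { }
      static void writeBatch(Map<String, LinkedList<String>> editsByRegion) throws IOException { }
      static void archive(List<String> corrupted, List<String> processed) throws IOException { }

      static void split(List<String> logs, int batchSize, boolean skipErrors) throws IOException {
        List<String> processed = new ArrayList<String>();
        List<String> corrupted = new ArrayList<String>();
        int i = 0;
        while (i < logs.size()) {
          Map<String, LinkedList<String>> editsByRegion = new TreeMap<String, LinkedList<String>>();
          for (int j = 0; j < batchSize && i < logs.size(); j++, i++) {
            String log = logs.get(i);
            try {
              recoverLease(log);                  // make the reported file length trustworthy
              parse(log, editsByRegion);          // group this log's edits by region
              processed.add(log);
            } catch (IOException e) {
              if (skipErrors) corrupted.add(log); // hbase.hlog.split.skip.errors behaviour
              else throw e;
            }
          }
          writeBatch(editsByRegion);              // flush the batch before reading more logs
        }
        archive(corrupted, processed);            // .corrupt dir vs. the old-logs dir
      }
    }
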
- class ThreadWithException extends Thread { - ThreadWithException(String name) { super(name); } - public IOException exception = null; - } - List threadList = - new ArrayList(logEntries.size()); - ExecutorService threadPool = - Executors.newFixedThreadPool(logWriterThreads); - for (final byte [] region: logEntries.keySet()) { - ThreadWithException thread = - new ThreadWithException(Bytes.toStringBinary(region)) { - @Override - public void run() { - LinkedList entries = logEntries.get(region); - LOG.debug("Thread got " + entries.size() + " to process"); - if(entries.size() <= 0) { - LOG.warn("Got a region with no entries to process."); - return; - } - long threadTime = System.currentTimeMillis(); - try { - int count = 0; - // get the logfile associated with this region. 2 logs often - // write to the same region, so persist this info across logs - WriterAndPath wap = logWriters.get(region); - if (wap == null) { - // first write to this region, make new logfile - assert entries.size() > 0; - Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor - .getTableDir(rootDir, - entries.getFirst().getKey().getTablename()), - HRegionInfo.encodeRegionName(region)), - HREGION_OLDLOGFILE_NAME); - - // If splitLog() was running when the user restarted his - // cluster, then we could already have a 'logfile'. - // Since we don't delete logs until everything is written to - // their respective regions, we can safely remove this tmp. - if (fs.exists(logfile)) { - LOG.warn("Deleting old hlog file: " + logfile); - // TODO: Archive? - fs.delete(logfile, true); - } - - // associate an OutputStream with this logfile - Writer w = createWriter(fs, logfile, conf); - wap = new WriterAndPath(logfile, w); - logWriters.put(region, wap); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating new hlog file writer for path " - + logfile + " and region " + Bytes.toStringBinary(region)); - } - } - - // Items were added to the linkedlist oldest first. Pull them - // out in that order. - for (ListIterator i = entries.listIterator(entries.size()); - i.hasPrevious();) { - wap.w.append(i.previous()); - count++; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Applied " + count + " total edits to " - + Bytes.toStringBinary(region) + " in " - + (System.currentTimeMillis() - threadTime) + "ms"); - } - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.warn("Got while writing region " - + Bytes.toStringBinary(region) + " log " + e); - e.printStackTrace(); - exception = e; - } - } - }; - threadList.add(thread); - threadPool.execute(thread); - } - threadPool.shutdown(); - // Wait for all threads to terminate - try { - for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) { - LOG.debug("Waiting for hlog writers to terminate, iteration #" + i); - } - } catch (InterruptedException ex) { - LOG.warn("Hlog writers were interrupted during splitLog(). " - +"Retaining log files to avoid data loss."); - throw new IOException(ex.getMessage(), ex.getCause()); - } - // throw an exception if one of the threads reported one - for (ThreadWithException t : threadList) { - if (t.exception != null) { - throw t.exception; + recoverFileLease(fs, logPath, conf); + parseHLog(log, editsByRegion, fs, conf); + processedLogs.add(logPath); + } catch (IOException e) { + if (skipErrors) { + LOG.warn("Got while parsing hlog " + logPath + + ". Marking as corrupted", e); + corruptedLogs.add(logPath); + } else { + throw e; + } } } - - // End of for loop. 
Rinse and repeat + writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf); + } + if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) { + throw new IOException("Discovered orphan hlog after split. Maybe " + + "HRegionServer was not dead when we started"); } + archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); } finally { splits = new ArrayList(logWriters.size()); for (WriterAndPath wap : logWriters.values()) { wap.w.close(); - LOG.debug("Closed " + wap.p); splits.add(wap.p); + LOG.debug("Closed " + wap.p); } } - - // Step 3: All writes succeeded! Get rid of the now-unnecessary logs - for(Path p : finishedFiles) { - if (LOG.isDebugEnabled()) { - LOG.debug("Successfully split Hlog file. Deleting " + p); - } - try { - if (!fs.delete(p, true) && LOG.isDebugEnabled()) { - LOG.debug("Delete of split Hlog (" + p + ") failed."); - } - } catch (IOException e) { - // don't throw error here. worst case = double-read - LOG.warn("Error deleting successfully split Hlog (" + p + ") -- " + e); - } - } - for (Path p : corruptFiles) { - if (corruptDir.length() > 0) { - // store any corrupt logs for later analysis - Path cp = new Path(conf.get(HBASE_DIR), corruptDir); - if(!fs.exists(cp)) { - fs.mkdirs(cp); - } - Path newp = new Path(cp, p.getName()); - if (!fs.exists(newp)) { - if (!fs.rename(p, newp)) { - LOG.warn("Rename of " + p + " to " + newp + " failed."); - } else { - LOG.warn("Corrupt Hlog (" + p + ") moved to " + newp); - } - } else { - LOG.warn("Corrupt Hlog (" + p + ") already moved to " + newp + - ". Ignoring"); - } - } else { - // data loss is less important than disk space, delete - try { - if (!fs.delete(p, true) ) { - if (LOG.isDebugEnabled()) { - LOG.debug("Delete of split Hlog " + p + " failed."); - } - } else { - LOG.warn("Corrupt Hlog (" + p + ") deleted!"); - } - } catch (IOException e) { - LOG.warn("Error deleting corrupt Hlog (" + p + ") -- " + e); - } - } - } - return splits; } - /* - * @param conf - * @return True if append enabled and we have the syncFs in our path. - */ - static boolean isAppend(final Configuration conf) { - boolean append = conf.getBoolean("dfs.support.append", false); - if (append) { - try { - // TODO: The implementation that comes back when we do a createWriter - // may not be using SequenceFile so the below is not a definitive test. - // Will do for now (hdfs-200). - SequenceFile.Writer.class.getMethod("syncFs", new Class []{}); - append = true; - } catch (SecurityException e) { - } catch (NoSuchMethodException e) { - append = false; - } - } - return append; - } /** * Utility class that lets us keep track of the edit with it's key @@ -1582,64 +1369,6 @@ public class HLog implements HConstants, Syncable { } } - /* - * Recover log. - * Try and open log in append mode. - * Doing this, we get a hold of the file that crashed writer - * was writing to. Once we have it, close it. This will - * allow subsequent reader to see up to last sync. - * @param fs - * @param p - * @param append - */ - public static void recoverLog(final FileSystem fs, final Path p, - final boolean append) throws IOException { - if (!append) { - return; - } - - // lease recovery not needed for local file system case. - // currently, local file system doesn't implement append either. 
- if (!(fs instanceof DistributedFileSystem)) { - return; - } - - LOG.debug("Recovering DFS lease for path " + p); - long startWaiting = System.currentTimeMillis(); - - // Trying recovery - boolean recovered = false; - while (!recovered) { - try { - FSDataOutputStream out = fs.append(p); - out.close(); - recovered = true; - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - if (e instanceof AlreadyBeingCreatedException) { - // We expect that we'll get this message while the lease is still - // within its soft limit, but if we get it past that, it means - // that the RS is holding onto the file even though it lost its - // znode. We could potentially abort after some time here. - long waitedFor = System.currentTimeMillis() - startWaiting; - - if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) { - LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p - + ":" + e.getMessage()); - } - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - // ignore it and try again - } - } else { - throw new IOException("Failed to open " + p + " for append", e); - } - } - } - LOG.info("Past out lease recovery"); - } - /** * Construct the HLog directory name * @@ -1687,28 +1416,247 @@ public class HLog implements HConstants, Syncable { return new Path(oldLogDir, p.getName()); } - private static void usage() { - System.err.println("Usage: java org.apache.hbase.HLog" + - " {--dump ... | --split ...}"); + /** + * Takes splitLogsMap and concurrently writes them to region directories using a thread pool + * + * @param splitLogsMap map that contains the log splitting result indexed by region + * @param logWriters map that contains a writer per region + * @param rootDir hbase root dir + * @param fs + * @param conf + * @throws IOException + */ + private static void writeEditsBatchToRegions( + final Map> splitLogsMap, + final Map logWriters, + final Path rootDir, final FileSystem fs, final Configuration conf) + throws IOException { + // Number of threads to use when log splitting to rewrite the logs. + // More means faster but bigger mem consumption. 
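
writeEditsBatchToRegions (continued just below) fans the per-region edit lists out to a fixed thread pool and then drains each Future so that a writer failure is rethrown to the caller. A minimal JDK-only sketch of that submit/await/get pattern, with the per-region write body reduced to a comment and stand-in types in place of the patch's WriterAndPath plumbing:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    class RegionWriterPoolSketch {
      static void writeAll(Map<String, LinkedList<String>> editsByRegion, int threads)
          throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Map<String, Future<Void>> results = new HashMap<String, Future<Void>>();
        for (final Map.Entry<String, LinkedList<String>> e : editsByRegion.entrySet()) {
          results.put(e.getKey(), pool.submit(new Callable<Void>() {
            public Void call() {
              // real code: append e.getValue() to the region's oldlogfile.log via a cached writer
              return null;
            }
          }));
        }
        pool.shutdown();
        try {
          while (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            // still writing; the patch logs progress here
          }
        } catch (InterruptedException ie) {
          throw new IOException("Could not finish writing log entries", ie);
        }
        // Surface the first failure from any writer as an IOException for the caller.
        for (Map.Entry<String, Future<Void>> r : results.entrySet()) {
          try {
            r.getValue().get();
          } catch (ExecutionException ee) {
            throw new IOException(ee.getCause());
          } catch (InterruptedException ie) {
            throw new IOException(ie);
          }
        }
      }
    }
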
+ int logWriterThreads = + conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); + boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); + HashMap writeFutureResult = new HashMap(); + NamingThreadFactory f = new NamingThreadFactory( + "SplitWriter-%1$d", Executors.defaultThreadFactory()); + ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f); + for (final byte[] region : splitLogsMap.keySet()) { + Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf); + writeFutureResult.put(region, threadPool.submit(splitter)); + } + + threadPool.shutdown(); + // Wait for all threads to terminate + try { + for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) { + String message = "Waiting for hlog writers to terminate, elapsed " + j * 5 + " seconds"; + if (j < 30) { + LOG.debug(message); + } else { + LOG.info(message); + } + + } + } catch(InterruptedException ex) { + LOG.warn("Hlog writers were interrupted, possible data loss!"); + if (!skipErrors) { + throw new IOException("Could not finish writing log entries", ex); + //TODO maybe we should fail here regardless if skipErrors is active or not + } + } + + for (Map.Entry entry : writeFutureResult.entrySet()) { + try { + entry.getValue().get(); + } catch (ExecutionException e) { + throw (new IOException(e.getCause())); + } catch (InterruptedException e1) { + LOG.warn("Writer for region " + Bytes.toString(entry.getKey()) + + " was interrupted, however the write process should have " + + "finished. Throwing up ", e1); + throw (new IOException(e1.getCause())); + } + } + } + + /* + * Parse a single hlog and put the edits in @splitLogsMap + * + * @param logfile to split + * @param splitLogsMap output parameter: a map with region names as keys and a + * list of edits as values + * @param fs the filesystem + * @param conf the configuration + * @throws IOException if hlog is corrupted, or can't be open + */ + private static void parseHLog(final FileStatus logfile, + final Map> splitLogsMap, final FileSystem fs, + final Configuration conf) + throws IOException { + // Check for possibly empty file. With appends, currently Hadoop reports a + // zero length even if the file has been sync'd. Revisit if HDFS-376 or + // HDFS-878 is committed. + long length = logfile.getLen(); + if (length <= 0) { + LOG.warn("File " + logfile.getPath() + " might be still open, length is 0"); + } + Path path = logfile.getPath(); + Reader in; + int editsCount = 0; + try { + in = HLog.getReader(fs, path, conf); + } catch (EOFException e) { + if (length <= 0) { + //TODO should we ignore an empty, not-last log file if skip.errors is false? + //Either way, the caller should decide what to do. E.g. ignore if this is the last + //log in sequence. + //TODO is this scenario still possible if the log has been recovered (i.e. closed) + LOG.warn("Could not open " + path + " for reading. 
File is empty" + e); + return; + } else { + throw e; + } + } + try { + Entry entry; + while ((entry = in.next()) != null) { + byte[] region = entry.getKey().getRegionName(); + LinkedList queue = splitLogsMap.get(region); + if (queue == null) { + queue = new LinkedList(); + splitLogsMap.put(region, queue); + } + queue.addFirst(entry); + editsCount++; + } + LOG.debug("Pushed=" + editsCount + " entries from " + path); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + LOG.warn("Close log reader in finally threw exception -- continuing", e); + } + } + } + + private static Callable createNewSplitter(final Path rootDir, + final Map logWriters, + final Map> logEntries, + final byte[] region, final FileSystem fs, final Configuration conf) { + return new Callable() { + public String getName() { + return "Split writer thread for region " + Bytes.toStringBinary(region); + } + + @Override + public Void call() throws IOException { + LinkedList entries = logEntries.get(region); + LOG.debug(this.getName()+" got " + entries.size() + " to process"); + long threadTime = System.currentTimeMillis(); + try { + int editsCount = 0; + WriterAndPath wap = logWriters.get(region); + for (ListIterator iterator = entries.listIterator(); + iterator.hasNext();) { + Entry logEntry = iterator.next(); + + if (wap == null) { + Path logFile = getRegionLogPath(logEntry, rootDir); + if (fs.exists(logFile)) { + LOG.warn("Found existing old hlog file. It could be the result of a previous" + + "failed split attempt. Deleting " + logFile + + ", length=" + fs.getFileStatus(logFile).getLen()); + fs.delete(logFile, false); + } + Writer w = createWriter(fs, logFile, conf); + wap = new WriterAndPath(logFile, w); + logWriters.put(region, wap); + LOG.debug("Creating writer path=" + logFile + + " region=" + Bytes.toStringBinary(region)); + } + + wap.w.append(logEntry); + editsCount++; + } + LOG.debug(this.getName() + " Applied " + editsCount + + " total edits to " + Bytes.toStringBinary(region) + + " in " + (System.currentTimeMillis() - threadTime) + "ms"); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal(this.getName() + " Got while writing log entry to log", e); + throw e; + } + return null; + } + }; } /** + * Moves processed logs to a oldLogDir after successful processing + * Moves corrupted logs (any log that couldn't be successfully parsed + * to corruptDir (.corrupt) for later investigation * - * @param list + * @param corruptedLogs + * @param processedLogs + * @param oldLogDir + * @param fs + * @param conf + * @throws IOException */ + private static void archiveLogs(final List corruptedLogs, + final List processedLogs, final Path oldLogDir, + final FileSystem fs, final Configuration conf) + throws IOException{ + final Path corruptDir = new Path(conf.get(HBASE_DIR), + conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt")); + + fs.mkdirs(corruptDir); + fs.mkdirs(oldLogDir); + + for (Path corrupted: corruptedLogs) { + Path p = new Path(corruptDir, corrupted.getName()); + LOG.info("Moving corrupted log " + corrupted + " to " + p); + fs.rename(corrupted, p); + } + + for (Path p: processedLogs) { + Path newPath = getHLogArchivePath(oldLogDir, p); + fs.rename(p, newPath); + LOG.info("Archived processed log " + p + " to " + newPath); + } + } + + private static Path getRegionLogPath(Entry logEntry, Path rootDir) { + Path tableDir = + HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename()); + Path regionDir = + 
HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName())); + return new Path(regionDir, HREGION_OLDLOGFILE_NAME); + } + + + + + + + + public void addLogActionsListerner(LogActionsListener list) { LOG.info("Adding a listener"); this.actionListeners.add(list); } - /** - * - * @param list - */ public boolean removeLogActionsListener(LogActionsListener list) { return this.actionListeners.remove(list); } + private static void usage() { + System.err.println("Usage: java org.apache.hbase.HLog" + + " {--dump ... | --split ...}"); + } + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. diff --git a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 9f6bfa1..0feb1e3 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.io.SequenceFile; import java.io.DataInputStream; import java.io.IOException; @@ -557,4 +559,88 @@ public class FSUtils { return isdir; } } + + /** + * Heuristic to determine whether is safe or not to open a file for append + * Looks both for dfs.support.append and use reflection to search + * for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush() + * @param conf + * @return True if append support + */ + public static boolean isAppendSupported(final Configuration conf) { + boolean append = conf.getBoolean("dfs.support.append", false); + if (append) { + try { + // TODO: The implementation that comes back when we do a createWriter + // may not be using SequenceFile so the below is not a definitive test. + // Will do for now (hdfs-200). + SequenceFile.Writer.class.getMethod("syncFs", new Class []{}); + append = true; + } catch (SecurityException e) { + } catch (NoSuchMethodException e) { + append = false; + } + } else { + try { + FSDataOutputStream.class.getMethod("hflush", new Class []{}); + } catch (NoSuchMethodException e) { + append = false; + } + } + return append; + } + + + /* + * Recover file lease. Used when a file might be suspect to be had been left open by another process. p + * @param fs + * @param p + * @param append True if append supported + * @throws IOException + */ + public static void recoverFileLease(final FileSystem fs, final Path p, Configuration conf) + throws IOException{ + if (!isAppendSupported(conf)) { + return; + } + // lease recovery not needed for local file system case. + // currently, local file system doesn't implement append either. 
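
isAppendSupported above decides whether append is usable by probing for SequenceFile.Writer.syncFs() and FSDataOutputStream.hflush() through reflection rather than compiling against one Hadoop version. A small generic sketch of that probe, with the classes resolved by name so it compiles and runs even without Hadoop on the classpath:

    class AppendProbeSketch {
      // True if the named class exposes a public zero-argument method with this name.
      static boolean hasMethod(String className, String methodName) {
        try {
          Class.forName(className).getMethod(methodName, new Class[] {});
          return true;
        } catch (Exception e) {   // ClassNotFoundException, NoSuchMethodException, SecurityException
          return false;
        }
      }

      public static void main(String[] args) {
        // The same two capabilities the patch probes for, just looked up by name here.
        boolean canSyncFs = hasMethod("org.apache.hadoop.io.SequenceFile$Writer", "syncFs");
        boolean canHflush = hasMethod("org.apache.hadoop.fs.FSDataOutputStream", "hflush");
        System.out.println("syncFs=" + canSyncFs + ", hflush=" + canHflush);
      }
    }
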
+ if (!(fs instanceof DistributedFileSystem)) { + return; + } + LOG.info("Recovering file" + p); + long startWaiting = System.currentTimeMillis(); + + // Trying recovery + boolean recovered = false; + while (!recovered) { + try { + FSDataOutputStream out = fs.append(p); + out.close(); + recovered = true; + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + if (e instanceof AlreadyBeingCreatedException) { + // We expect that we'll get this message while the lease is still + // within its soft limit, but if we get it past that, it means + // that the RS is holding onto the file even though it lost its + // znode. We could potentially abort after some time here. + long waitedFor = System.currentTimeMillis() - startWaiting; + if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) { + LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p + + ":" + e.getMessage()); + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + // ignore it and try again + } + } else { + throw new IOException("Failed to open " + p + " for append", e); + } + } + } + LOG.info("Finished lease recover attempt for " + p); + } + } \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java new file mode 100644 index 0000000..7c740d3 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java @@ -0,0 +1,37 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; + +import org.apache.hadoop.hbase.util.Bytes; + +public class InstrumentedSequenceFileLogWriter extends SequenceFileLogWriter { + + public static boolean activateFailure = false; + @Override + public void append(HLog.Entry entry) throws IOException { + super.append(entry); + if (activateFailure && Bytes.equals(entry.getKey().getRegionName(), "break".getBytes())) { + System.out.println(getClass().getName() + ": I will throw an exception now..."); + throw(new IOException("This exception is instrumented and should only be thrown for testing")); + } + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java new file mode 100644 index 0000000..29b9fdc --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -0,0 +1,734 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.naming.InsufficientResourcesException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.RemoteException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Testing {@link HLog} splitting code. 
+ */ +public class TestHLogSplit { + + private Configuration conf; + private FileSystem fs; + + private final static HBaseTestingUtility + TEST_UTIL = new HBaseTestingUtility(); + + + private static final Path hbaseDir = new Path("/hbase"); + private static final Path hlogDir = new Path(hbaseDir, "hlog"); + private static final Path oldLogDir = new Path(hbaseDir, "hlog.old"); + private static final Path corruptDir = new Path(hbaseDir, ".corrupt"); + + private static final int NUM_WRITERS = 10; + private static final int ENTRIES = 10; // entries per writer per region + + private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS]; + private long seq = 0; + private static final byte[] TABLE_NAME = "t1".getBytes(); + private static final byte[] FAMILY = "f1".getBytes(); + private static final byte[] QUALIFIER = "q1".getBytes(); + private static final byte[] VALUE = "v1".getBytes(); + private static final String HLOG_FILE_PREFIX = "hlog.dat."; + private static List regions; + private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors"; + + + static enum Corruptions { + INSERT_GARBAGE_ON_FIRST_LINE, + INSERT_GARBAGE_IN_THE_MIDDLE, + APPEND_GARBAGE, + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration(). + setInt("hbase.regionserver.flushlogentries", 1); + TEST_UTIL.getConfiguration(). + setBoolean("dfs.support.append", true); + TEST_UTIL.getConfiguration(). + setStrings("hbase.rootdir", hbaseDir.toString()); + TEST_UTIL.getConfiguration(). + setClass("hbase.regionserver.hlog.writer.impl", + InstrumentedSequenceFileLogWriter.class, HLog.Writer.class); + + TEST_UTIL.startMiniDFSCluster(2); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniDFSCluster(); + } + + @Before + public void setUp() throws Exception { + conf = TEST_UTIL.getConfiguration(); + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries){ + fs.delete(dir.getPath(), true); + } + seq = 0; + regions = new ArrayList(); + Collections.addAll(regions, "bbb", "ccc"); + InstrumentedSequenceFileLogWriter.activateFailure = false; + // Set the soft lease for hdfs to be down from default of 5 minutes or so. + // TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another. + // Not available in 0.20 hdfs + // TEST_UTIL.getDFSCluster().getNamesystem().leaseManager. + // setLeasePeriod(100, 50000); + // Use reflection to get at the 0.20 version of above. 
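
The setUp() lines that follow reach into MiniDFSCluster's private nameNode field because the 0.20 API exposes no accessor for it. The same reflection idiom on a throwaway class, runnable without Hadoop:

    import java.lang.reflect.Field;

    class PrivateFieldSketch {
      static class Holder { private String secret = "lease-period"; }

      public static void main(String[] args) throws Exception {
        Holder h = new Holder();
        Field field = h.getClass().getDeclaredField("secret"); // same lookup the test does for "nameNode"
        field.setAccessible(true);                             // bypass the private modifier
        System.out.println(field.get(h));                      // prints: lease-period
      }
    }
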
+ MiniDFSCluster dfsCluster = TEST_UTIL.getDFSCluster(); + // private NameNode nameNode; + Field field = dfsCluster.getClass().getDeclaredField("nameNode"); + field.setAccessible(true); + NameNode nn = (NameNode)field.get(dfsCluster); + nn.namesystem.leaseManager.setLeasePeriod(100, 50000); + } + + @After + public void tearDown() throws Exception { + } + + @Test(expected = IOException.class) + public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted() + throws IOException { + AtomicBoolean stop = new AtomicBoolean(false); + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + try { + (new ZombieNewLogWriterRegionServer(stop)).start(); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } finally { + stop.set(true); + } + } + + //TODO: check the edits order is respected (scenarios) + + @Test + public void testEmptyLogFiles() throws IOException { + + injectEmptyFile(".empty", true); + generateHLogs(Integer.MAX_VALUE); + injectEmptyFile("empty", true); + + // make fs act as a different client now + // initialize will create a new DFSClient with a new client ID + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + } + + + @Test + public void testEmptyOpenLogFiles() throws IOException { + injectEmptyFile(".empty", false); + generateHLogs(Integer.MAX_VALUE); + injectEmptyFile("empty", false); + + // make fs act as a different client now + // initialize will create a new DFSClient with a new client ID + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + } + + @Test + public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException { + // generate logs but leave hlog.dat.5 open. 
+ generateHLogs(5); + + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + + @Test + public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + @Test + public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf)); + } + + + } + + + @Test + public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, true); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + // the entries in the original logs are alternating regions + // considering the sequence file header, the middle corruption should + // affect at least half of the entries + int goodEntries = (NUM_WRITERS - 1) * ENTRIES; + int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1; + assertTrue("The file up to the corrupted area hasn't been parsed", + goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf)); + } + } + + @Test + public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, true); + + Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0"); + Path c2 = new Path(hlogDir, HLOG_FILE_PREFIX + "5"); + Path c3 = new Path(hlogDir, HLOG_FILE_PREFIX + (NUM_WRITERS - 1)); + generateHLogs(-1); + corruptHLog(c1, Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs); + corruptHLog(c2, Corruptions.APPEND_GARBAGE, true, fs); + corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] archivedLogs = fs.listStatus(corruptDir); + + assertEquals("expected a different file", c1.getName(), archivedLogs[0].getPath().getName()); + assertEquals("expected a different file", c2.getName(), archivedLogs[1].getPath().getName()); + assertEquals("expected a different file", c3.getName(), archivedLogs[2].getPath().getName()); + assertEquals(archivedLogs.length, 3); + + } + + @Test + public void testLogsGetArchivedAfterSplit() throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, false); + + generateHLogs(-1); + + fs.initialize(fs.getUri(), conf); + 
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] archivedLogs = fs.listStatus(oldLogDir); + + assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length); + } + + + + @Test(expected = IOException.class) + public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, false); + generateHLogs(Integer.MAX_VALUE); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } + + @Test + public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, false); + generateHLogs(-1); + corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"), + Corruptions.APPEND_GARBAGE, true, fs); + fs.initialize(fs.getUri(), conf); + try { + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } catch (IOException e) {/* expected */} + + assertEquals("if skip.errors is false all files should remain in place", + NUM_WRITERS, fs.listStatus(hlogDir).length); + } + + + @Test + public void testSplit() throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + + + for (String region : regions) { + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf)); + + } + } + + @Test + public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() + throws IOException { + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + FileStatus [] statuses = null; + try { + statuses = fs.listStatus(hlogDir); + assertNull(statuses); + } catch (FileNotFoundException e) { + // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null + } + } + + @Test + public void testLogCannotBeWrittenOnceParsed() throws IOException { + AtomicLong counter = new AtomicLong(0); + AtomicBoolean stop = new AtomicBoolean(false); + generateHLogs(9); + fs.initialize(fs.getUri(), conf); + + Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter, stop); + + + + try { + zombie.start(); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet"); + + // It's possible that the writer got an error while appending and didn't count it + // however the entry will in fact be written to file and split with the rest + long numberOfEditsInRegion = countHLog(logfile, fs, conf); + assertTrue("The log file could have at most 1 extra log entry, but " + + "can't have less. 
Zombie could write "+counter.get() +" and logfile had only"+ numberOfEditsInRegion+" " + logfile, counter.get() == numberOfEditsInRegion || + counter.get() + 1 == numberOfEditsInRegion); + } finally { + stop.set(true); + } + } + + @Test + public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted() + throws IOException { + AtomicBoolean stop = new AtomicBoolean(false); + generateHLogs(-1); + fs.initialize(fs.getUri(), conf); + Thread zombie = new ZombieNewLogWriterRegionServer(stop); + + try { + zombie.start(); + try { + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + } catch (IOException ex) {/* expected */} + int logFilesNumber = fs.listStatus(hlogDir).length; + + assertEquals("Log files should not be archived if there's an extra file after split", + NUM_WRITERS + 1, logFilesNumber); + } finally { + stop.set(true); + } + + } + + + + @Test(expected = IOException.class) + public void testSplitWillFailIfWritingToRegionFails() throws Exception { + //leave 5th log open so we could append the "trap" + generateHLogs(4); + + fs.initialize(fs.getUri(), conf); + + InstrumentedSequenceFileLogWriter.activateFailure = false; + appendEntry(writer[4], TABLE_NAME, Bytes.toBytes("break"), ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0); + writer[4].close(); + + + try { + InstrumentedSequenceFileLogWriter.activateFailure = true; + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + } catch (IOException e) { + assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage()); + throw e; + } finally { + InstrumentedSequenceFileLogWriter.activateFailure = false; + } + } + + +// @Test + public void testSplittingLargeNumberOfRegionsConsistency() throws IOException { + + regions.removeAll(regions); + for (int i=0; i<500; i++) { + regions.add("region__"+i); + } + + generateHLogs(1, 100, -1); + fs.initialize(fs.getUri(), conf); + + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + fs.rename(oldLogDir, hlogDir); + Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first"); + Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME)); + fs.rename(splitPath, + firstSplitPath); + + + fs.initialize(fs.getUri(), conf); + HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf); + + FileStatus[] f1 = fs.listStatus(firstSplitPath); + FileStatus[] f2 = fs.listStatus(splitPath); + + for (int i=0; i 0) { + // Done. 
+ break; + } + } catch (FileNotFoundException e) { + // Expected in hadoop 0.21 + } catch (IOException e1) { + assertTrue("Failed to list status ", false); + } + flushToConsole("Juliet: split not started, sleeping a bit..."); + Threads.sleep(100); + } + + Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet"); + try { + HLog.Writer writer = HLog.createWriter(fs, + julietLog, conf); + appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(), + ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); + writer.close(); + flushToConsole("Juliet file creator: created file " + julietLog); + } catch (IOException e1) { + assertTrue("Failed to create file " + julietLog, false); + } + } + } + + private void flushToConsole(String s) { + System.out.println(s); + System.out.flush(); + } + + + private void generateHLogs(int leaveOpen) throws IOException { + generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen); + } + + private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException { + for (int i = 0; i < writers; i++) { + writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf); + for (int j = 0; j < entries; j++) { + int prefix = 0; + for (String region : regions) { + String row_key = region + prefix++ + i + j; + appendEntry(writer[i], TABLE_NAME, region.getBytes(), + row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq); + } + } + if (i != leaveOpen) { + writer[i].close(); + flushToConsole("Closing writer " + i); + } + } + } + + private Path getLogForRegion(Path rootdir, byte[] table, String region) { + return new Path(HRegion.getRegionDir(HTableDescriptor + .getTableDir(rootdir, table), + HRegionInfo.encodeRegionName(region.getBytes())), + "oldlogfile.log"); + } + + private void corruptHLog(Path path, Corruptions corruption, boolean close, + FileSystem fs) throws IOException { + + FSDataOutputStream out; + int fileSize = (int) fs.listStatus(path)[0].getLen(); + + FSDataInputStream in = fs.open(path); + byte[] corrupted_bytes = new byte[fileSize]; + in.readFully(0, corrupted_bytes, 0, fileSize); + in.close(); + + switch (corruption) { + case APPEND_GARBAGE: + out = fs.append(path); + out.write("-----".getBytes()); + closeOrFlush(close, out); + break; + + case INSERT_GARBAGE_ON_FIRST_LINE: + fs.delete(path, false); + out = fs.create(path); + out.write(0); + out.write(corrupted_bytes); + closeOrFlush(close, out); + break; + + case INSERT_GARBAGE_IN_THE_MIDDLE: + fs.delete(path, false); + out = fs.create(path); + int middle = (int) Math.floor(corrupted_bytes.length / 2); + out.write(corrupted_bytes, 0, middle); + out.write(0); + out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); + closeOrFlush(close, out); + break; + } + + + } + + private void closeOrFlush(boolean close, FSDataOutputStream out) + throws IOException { + if (close) { + out.close(); + } else { + out.sync(); + // Not in 0out.hflush(); + } + } + + @SuppressWarnings("unused") + private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException { + HLog.Entry entry; + HLog.Reader in = HLog.getReader(fs, log, conf); + while ((entry = in.next()) != null) { + System.out.println(entry); + } + } + + private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException { + int count = 0; + HLog.Reader in = HLog.getReader(fs, log, conf); + while (in.next() != null) { + count++; + } + return count; + } + + + public long appendEntry(HLog.Writer writer, byte[] table, byte[] region, + byte[] row, byte[] family, byte[] qualifier, + byte[] value, long seq) + 
throws IOException { + + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + seq++; + edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value)); + writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit)); + writer.sync(); + return seq; + + } + + + private void injectEmptyFile(String suffix, boolean closeFile) + throws IOException { + HLog.Writer writer = HLog.createWriter( + fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf); + if (closeFile) writer.close(); + } + + @SuppressWarnings("unused") + private void listLogs(FileSystem fs, Path dir) throws IOException { + for (FileStatus file : fs.listStatus(dir)) { + System.out.println(file.getPath()); + } + + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index c2c15d5..9eae4b4 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -31,13 +31,13 @@ import org.apache.hadoop.hbase.HBaseClusterTestCase; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -220,7 +220,7 @@ public class TestLogRolling extends HBaseClusterTestCase { assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas()); // don't run this test without append support (HDFS-200 & HDFS-142) - assertTrue("Need append support for this test", HLog.isAppend(conf)); + assertTrue("Need append support for this test", FSUtils.isAppendSupported(conf)); // add up the datanode count, to ensure proper replication when we kill 1 dfsCluster.startDataNodes(conf, 1, true, null, null);
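
For reference, the configuration keys the new splitting code reads, set programmatically the same way the tests configure themselves; the values shown are the defaults the patch falls back to:

    import org.apache.hadoop.conf.Configuration;

    class SplitConfigSketch {
      static Configuration configureSplit(Configuration conf) {
        // Logs parsed into memory per batch (replaces hbase.regionserver.hlog.splitlog.reader.threads).
        conf.setInt("hbase.hlog.split.batch.size", 3);
        // Treat unreadable hlogs as corrupt and keep splitting instead of aborting.
        conf.setBoolean("hbase.hlog.split.skip.errors", false);
        // Threads used to write recovered edits out to region directories.
        conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
        // Directory (under hbase.rootdir) where corrupt logs are parked for post-mortem analysis.
        conf.set("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt");
        return conf;
      }
    }
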