diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
--- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -180,7 +181,8 @@ public class MasterFileSystem {
     this.splitLogLock.lock();
     Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
     try {
-      HLog.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+      HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
+      splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
     } catch (IOException e) {
       LOG.error("Failed splitting " + logDir.toString(), e);
     } finally {
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -51,12 +51,12 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -2435,7 +2435,7 @@ public class HRegion implements HeapSize { // , Writable{
    * @return Returns this
    * @throws IOException
    */
-  HRegion openHRegion(final Progressable reporter)
+  protected HRegion openHRegion(final Progressable reporter)
   throws IOException {
     long seqid = initialize(reporter);
     if (this.log != null) {
@@ -3050,6 +3050,13 @@ public class HRegion implements HeapSize { // , Writable{
   }

   /**
+   * Give the region a chance to prepare before it is split.
+   */
+  protected void prepareToSplit() {
+    // nothing
+  }
+
+  /**
    * Checks every store to see if one has too many
    * store files
    * @return true if any store has too many store files
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -164,4 +164,9 @@ class LogRoller extends Thread implements WALObserver {
       WALEdit logEdit) {
     // Not interested.
   }
+
+  @Override
+  public void logCloseRequested() {
+    // not interested
+  }
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -134,6 +134,7 @@ class SplitTransaction {
   public boolean prepare() {
     if (this.parent.isClosed() || this.parent.isClosing()) return false;
     HRegionInfo hri = this.parent.getRegionInfo();
+    parent.prepareToSplit();
     // Check splitrow.
     byte [] startKey = hri.getStartKey();
     byte [] endKey = hri.getEndKey();
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -19,11 +19,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

-import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
-
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -33,23 +30,14 @@ import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -70,10 +58,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -81,8 +66,6 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;

-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 /**
  * HLog stores all the edits to the HStore. Its the hbase write-ahead-log
  * implementation.
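The master now obtains a pluggable splitter via HLogSplitter.createLogSplitter(conf) instead of calling the static HLog.splitLog. A minimal sketch of the resulting call pattern, assuming a hypothetical helper class (the class and method names below are illustrative, not part of this patch):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;

    // Hypothetical helper: how callers drive log splitting after this patch.
    public class SplitDeadServerLogs {
      static List<Path> split(Configuration conf, FileSystem fs, Path rootdir,
          Path logDir, Path oldLogDir) throws IOException {
        // The concrete splitter class is resolved from hbase.hlog.splitter.impl,
        // defaulting to HLogSplitter itself (see createLogSplitter below).
        HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
        return splitter.splitLog(rootdir, logDir, oldLogDir, fs, conf);
      }
    }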
@@ -289,7 +272,7 @@ public class HLog implements Syncable {
   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
               final Configuration conf)
   throws IOException {
-    this(fs, dir, oldLogDir, conf, null, null);
+    this(fs, dir, oldLogDir, conf, null, true, null);
   }

   /**
@@ -312,9 +295,35 @@ public class HLog implements Syncable {
    * @throws IOException
    */
   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
-      final Configuration conf, final List<WALObserver> listeners,
-      final String prefix)
-  throws IOException {
+      final Configuration conf, final List<WALObserver> listeners,
+      final String prefix) throws IOException {
+    this(fs, dir, oldLogDir, conf, listeners, true, prefix);
+  }
+
+  /**
+   * Create an edit log at the given dir location.
+   *
+   * You should never have to load an existing log. If there is a log at
+   * startup, it should have already been processed and deleted by the time the
+   * HLog object is started up.
+   *
+   * @param fs filesystem handle
+   * @param dir path to where hlogs are stored
+   * @param oldLogDir path to where hlogs are archived
+   * @param conf configuration to use
+   * @param listeners Listeners on WAL events. Listeners passed here will
+   * be registered before we do anything else; e.g. the
+   * constructor {@link #rollWriter()}.
+   * @param failIfLogDirExists If true, an IOException will be thrown if dir already exists.
+   * @param prefix should always be hostname and port in distributed env and
+   * it will be URL encoded before being used.
+   * If prefix is null, "hlog" will be used
+   * @throws IOException
+   */
+  public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
+      final Configuration conf, final List<WALObserver> listeners,
+      final boolean failIfLogDirExists, final String prefix)
+  throws IOException {
     super();
     this.fs = fs;
     this.dir = dir;
@@ -333,7 +342,7 @@ public class HLog implements Syncable {
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
-    if (fs.exists(dir)) {
+    if (failIfLogDirExists && fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
     fs.mkdirs(dir);
@@ -464,7 +473,8 @@ public class HLog implements Syncable {
       long currentFilenum = this.filenum;
       this.filenum = System.currentTimeMillis();
       Path newPath = computeFilename();
-      HLog.Writer nextWriter = createWriter(fs, newPath, HBaseConfiguration.create(conf));
+      HLog.Writer nextWriter = this.createWriterInstance(fs, newPath,
+          HBaseConfiguration.create(conf));
       int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
       // Can we get at the dfsclient outputstream? If an instance of
       // SFLW, it'll have done the necessary reflection to get at the
@@ -517,6 +527,21 @@ public class HLog implements Syncable {
   }

   /**
+   * This method allows subclasses to inject different writers without having to
+   * extend other methods like rollWriter().
+   *
+   * @param fs
+   * @param path
+   * @param conf
+   * @return Writer instance
+   * @throws IOException
+   */
+  protected Writer createWriterInstance(final FileSystem fs, final Path path,
+      final Configuration conf) throws IOException {
+    return createWriter(fs, path, conf);
+  }
+
+  /**
    * Get a reader for the WAL.
    * @param fs
    * @param path
@@ -529,8 +554,8 @@
     throws IOException {
     try {
       if (logReaderClass == null) {
-        logReaderClass =conf.getClass("hbase.regionserver.hlog.reader.impl",
-            SequenceFileLogReader.class, Reader.class);
+        logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+            SequenceFileLogReader.class, Reader.class);
       }
       HLog.Reader reader = logReaderClass.newInstance();
@@ -557,7 +582,7 @@ public class HLog implements Syncable {
     try {
       if (logWriterClass == null) {
         logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
-                SequenceFileLogWriter.class, Writer.class);
+            SequenceFileLogWriter.class, Writer.class);
       }
       HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
       writer.init(fs, path, conf);
@@ -757,6 +782,12 @@ public class HLog implements Syncable {
     cacheFlushLock.lock();
     try {
+      // Tell our listeners that the log is closing
+      if (!this.listeners.isEmpty()) {
+        for (WALObserver i : this.listeners) {
+          i.logCloseRequested();
+        }
+      }
       synchronized (updateLock) {
         this.closed = true;
         if (LOG.isDebugEnabled()) {
@@ -1200,71 +1231,6 @@ public class HLog implements Syncable {
     return Bytes.equals(METAFAMILY, family);
   }

-  /**
-   * Split up a bunch of regionserver commit log files that are no longer
-   * being written to, into new files, one per region for region to replay on
-   * startup. Delete the old log files when finished.
-   *
-   * @param rootDir qualified root directory of the HBase instance
-   * @param srcDir Directory of log files to split: e.g.
-   *                ${ROOTDIR}/log_HOST_PORT
-   * @param oldLogDir directory where processed (split) logs will be archived to
-   * @param fs FileSystem
-   * @param conf Configuration
-   * @throws IOException will throw if corrupted hlogs aren't tolerated
-   * @return the list of splits
-   */
-  public static List<Path> splitLog(final Path rootDir, final Path srcDir,
-    Path oldLogDir, final FileSystem fs, final Configuration conf)
-  throws IOException {
-
-    long millis = System.currentTimeMillis();
-    List<Path> splits = null;
-    if (!fs.exists(srcDir)) {
-      // Nothing to do
-      return splits;
-    }
-    FileStatus [] logfiles = fs.listStatus(srcDir);
-    if (logfiles == null || logfiles.length == 0) {
-      // Nothing to do
-      return splits;
-    }
-    LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
-      srcDir.toString());
-    splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
-    try {
-      FileStatus[] files = fs.listStatus(srcDir);
-      for(FileStatus file : files) {
-        Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
-        LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
-          FSUtils.getPath(newPath));
-        fs.rename(file.getPath(), newPath);
-      }
-      LOG.debug("Moved " + files.length + " log files to " +
-        FSUtils.getPath(oldLogDir));
-      fs.delete(srcDir, true);
-    } catch (IOException e) {
-      e = RemoteExceptionHandler.checkIOException(e);
-      IOException io = new IOException("Cannot delete: " + srcDir);
-      io.initCause(e);
-      throw io;
-    }
-    long endMillis = System.currentTimeMillis();
-    LOG.info("hlog file splitting completed in " + (endMillis - millis) +
-        " millis for " + srcDir.toString());
-    return splits;
-  }
-
-  // Private immutable datastructure to hold Writer and its Path.
-  private final static class WriterAndPath {
-    final Path p;
-    final Writer w;
-    WriterAndPath(final Path p, final Writer w) {
-      this.p = p;
-      this.w = w;
-    }
-  }
-
   @SuppressWarnings("unchecked")
   public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
     return (Class<? extends HLogKey>)
@@ -1283,103 +1249,6 @@ public class HLog implements Syncable {
   }

   /**
-   * Sorts the HLog edits in the given list of logfiles (that are a mix of edits on multiple regions)
-   * by region and then splits them per region directories, in batches of (hbase.hlog.split.batch.size)
-   *
-   * A batch consists of a set of log files that will be sorted in a single map of edits indexed by region
-   * the resulting map will be concurrently written by multiple threads to their corresponding regions
-   *
-   * Each batch consists of more more log files that are
-   *  - recovered (files is opened for append then closed to ensure no process is writing into it)
-   *  - parsed (each edit in the log is appended to a list of edits indexed by region
-   *    see {@link #parseHLog} for more details)
-   *  - marked as either processed or corrupt depending on parsing outcome
-   *  - the resulting edits indexed by region are concurrently written to their corresponding region
-   *    region directories
-   *  - original files are then archived to a different directory
-   *
-   *
-   *
-   * @param rootDir hbase directory
-   * @param srcDir logs directory
-   * @param oldLogDir directory where processed logs are archived to
-   * @param logfiles the list of log files to split
-   * @param fs
-   * @param conf
-   * @return
-   * @throws IOException
-   */
-  private static List<Path> splitLog(final Path rootDir, final Path srcDir,
-    Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
-    final Configuration conf)
-  throws IOException {
-    List<Path> processedLogs = new ArrayList<Path>();
-    List<Path> corruptedLogs = new ArrayList<Path>();
-    final Map<byte [], WriterAndPath> logWriters =
-      Collections.synchronizedMap(
-        new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
-    List<Path> splits = null;
-
-    // Number of logs in a read batch
-    // More means faster but bigger mem consumption
-    //TODO make a note on the conf rename and update hbase-site.xml if needed
-    int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
-    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
-
-
-    try {
-      int i = -1;
-      while (i < logfiles.length) {
-        final Map<byte[], LinkedList<Entry>> editsByRegion =
-          new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
-        for (int j = 0; j < logFilesPerStep; j++) {
-          i++;
-          if (i == logfiles.length) {
-            break;
-          }
-          FileStatus log = logfiles[i];
-          Path logPath = log.getPath();
-          long logLength = log.getLen();
-          LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
-            ": " + logPath + ", length=" + logLength );
-          try {
-            recoverFileLease(fs, logPath, conf);
-            parseHLog(log, editsByRegion, fs, conf);
-            processedLogs.add(logPath);
-          } catch (EOFException eof) {
-            // truncated files are expected if a RS crashes (see HBASE-2643)
-            LOG.info("EOF from hlog " + logPath + ". continuing");
-            processedLogs.add(logPath);
-          } catch (IOException e) {
-            if (skipErrors) {
-              LOG.warn("Got while parsing hlog " + logPath +
-                ". Marking as corrupted", e);
-              corruptedLogs.add(logPath);
-            } else {
-              throw e;
-            }
-          }
-        }
-        writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
-      }
-      if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
-        throw new IOException("Discovered orphan hlog after split. Maybe " +
Maybe " + - "HRegionServer was not dead when we started"); - } - archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf); - } finally { - splits = new ArrayList(logWriters.size()); - for (WriterAndPath wap : logWriters.values()) { - wap.w.close(); - splits.add(wap.p); - LOG.debug("Closed " + wap.p); - } - } - return splits; - } - - - /** * Utility class that lets us keep track of the edit with it's key * Only used when splitting logs */ @@ -1474,257 +1343,27 @@ public class HLog implements Syncable { return dirName.toString(); } - public static boolean validateHLogFilename(String filename) { - return pattern.matcher(filename).matches(); - } - - private static Path getHLogArchivePath(Path oldLogDir, Path p) { - return new Path(oldLogDir, p.getName()); - } - /** - * Takes splitLogsMap and concurrently writes them to region directories using a thread pool - * - * @param splitLogsMap map that contains the log splitting result indexed by region - * @param logWriters map that contains a writer per region - * @param rootDir hbase root dir - * @param fs - * @param conf - * @throws IOException - */ - private static void writeEditsBatchToRegions( - final Map> splitLogsMap, - final Map logWriters, - final Path rootDir, final FileSystem fs, final Configuration conf) - throws IOException { - // Number of threads to use when log splitting to rewrite the logs. - // More means faster but bigger mem consumption. - int logWriterThreads = - conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); - HashMap writeFutureResult = new HashMap(); - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setNameFormat("SplitWriter-%1$d"); - ThreadFactory factory = builder.build(); - ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory); - for (final byte [] region : splitLogsMap.keySet()) { - Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf); - writeFutureResult.put(region, threadPool.submit(splitter)); - } - - threadPool.shutdown(); - // Wait for all threads to terminate - try { - for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) { - String message = "Waiting for hlog writers to terminate, elapsed " + j * 5 + " seconds"; - if (j < 30) { - LOG.debug(message); - } else { - LOG.info(message); - } - - } - } catch(InterruptedException ex) { - LOG.warn("Hlog writers were interrupted, possible data loss!"); - if (!skipErrors) { - throw new IOException("Could not finish writing log entries", ex); - //TODO maybe we should fail here regardless if skipErrors is active or not - } - } - - for (Map.Entry entry : writeFutureResult.entrySet()) { - try { - entry.getValue().get(); - } catch (ExecutionException e) { - throw (new IOException(e.getCause())); - } catch (InterruptedException e1) { - LOG.warn("Writer for region " + Bytes.toString(entry.getKey()) + - " was interrupted, however the write process should have " + - "finished. Throwing up ", e1); - throw (new IOException(e1.getCause())); - } - } - } - - /* - * Parse a single hlog and put the edits in @splitLogsMap - * - * @param logfile to split - * @param splitLogsMap output parameter: a map with region names as keys and a - * list of edits as values - * @param fs the filesystem - * @param conf the configuration - * @throws IOException if hlog is corrupted, or can't be open + * Get the directory we are making logs in. 
+   *
+   * @return dir
    */
-  private static void parseHLog(final FileStatus logfile,
-    final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
-    final Configuration conf)
-  throws IOException {
-    // Check for possibly empty file. With appends, currently Hadoop reports a
-    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
-    // HDFS-878 is committed.
-    long length = logfile.getLen();
-    if (length <= 0) {
-      LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
-    }
-    Path path = logfile.getPath();
-    Reader in;
-    int editsCount = 0;
-    try {
-      in = HLog.getReader(fs, path, conf);
-    } catch (EOFException e) {
-      if (length <= 0) {
-        //TODO should we ignore an empty, not-last log file if skip.errors is false?
-        //Either way, the caller should decide what to do. E.g. ignore if this is the last
-        //log in sequence.
-        //TODO is this scenario still possible if the log has been recovered (i.e. closed)
-        LOG.warn("Could not open " + path + " for reading. File is empty" + e);
-        return;
-      } else {
-        throw e;
-      }
-    }
-    try {
-      Entry entry;
-      while ((entry = in.next()) != null) {
-        byte[] region = entry.getKey().getEncodedRegionName();
-        LinkedList<Entry> queue = splitLogsMap.get(region);
-        if (queue == null) {
-          queue = new LinkedList<Entry>();
-          splitLogsMap.put(region, queue);
-        }
-        queue.addLast(entry);
-        editsCount++;
-      }
-    } finally {
-      LOG.debug("Pushed=" + editsCount + " entries from " + path);
-      try {
-        if (in != null) {
-          in.close();
-        }
-      } catch (IOException e) {
-        LOG.warn("Close log reader in finally threw exception -- continuing", e);
-      }
-    }
+  protected Path getDir() {
+    return dir;
   }
-
-  private static Callable<Void> createNewSplitter(final Path rootDir,
-    final Map<byte[], WriterAndPath> logWriters,
-    final Map<byte[], LinkedList<Entry>> logEntries,
-    final byte[] region, final FileSystem fs, final Configuration conf) {
-    return new Callable<Void>() {
-      public String getName() {
-        return "Split writer thread for region " + Bytes.toStringBinary(region);
-      }
-
-      @Override
-      public Void call() throws IOException {
-        LinkedList<Entry> entries = logEntries.get(region);
-        LOG.debug(this.getName() + " got " + entries.size() + " to process");
-        long threadTime = System.currentTimeMillis();
-        try {
-          int editsCount = 0;
-          WriterAndPath wap = logWriters.get(region);
-          for (Entry logEntry: entries) {
-            if (wap == null) {
-              Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
-              if (fs.exists(regionedits)) {
-                LOG.warn("Found existing old edits file. It could be the " +
Deleting " + - regionedits + ", length=" + fs.getFileStatus(regionedits).getLen()); - if (!fs.delete(regionedits, false)) { - LOG.warn("Failed delete of old " + regionedits); - } - } - Writer w = createWriter(fs, regionedits, conf); - wap = new WriterAndPath(regionedits, w); - logWriters.put(region, wap); - LOG.debug("Creating writer path=" + regionedits + - " region=" + Bytes.toStringBinary(region)); - } - wap.w.append(logEntry); - editsCount++; - } - LOG.debug(this.getName() + " Applied " + editsCount + - " total edits to " + Bytes.toStringBinary(region) + - " in " + (System.currentTimeMillis() - threadTime) + "ms"); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.fatal(this.getName() + " Got while writing log entry to log", e); - throw e; - } - return null; - } - }; + + public static boolean validateHLogFilename(String filename) { + return pattern.matcher(filename).matches(); } - /** - * Moves processed logs to a oldLogDir after successful processing - * Moves corrupted logs (any log that couldn't be successfully parsed - * to corruptDir (.corrupt) for later investigation - * - * @param corruptedLogs - * @param processedLogs - * @param oldLogDir - * @param fs - * @param conf - * @throws IOException - */ - private static void archiveLogs(final List corruptedLogs, - final List processedLogs, final Path oldLogDir, - final FileSystem fs, final Configuration conf) - throws IOException{ - final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), - conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt")); - - fs.mkdirs(corruptDir); - fs.mkdirs(oldLogDir); - - for (Path corrupted: corruptedLogs) { - Path p = new Path(corruptDir, corrupted.getName()); - LOG.info("Moving corrupted log " + corrupted + " to " + p); - fs.rename(corrupted, p); - } - - for (Path p: processedLogs) { - Path newPath = getHLogArchivePath(oldLogDir, p); - fs.rename(p, newPath); - LOG.info("Archived processed log " + p + " to " + newPath); - } + static Path getHLogArchivePath(Path oldLogDir, Path p) { + return new Path(oldLogDir, p.getName()); } - /* - * Path to a file under RECOVERED_EDITS_DIR directory of the region found in - * logEntry named for the sequenceid in the passed - * logEntry: e.g. /hbase/some_table/2323432434/recovered.edits/2332. - * This method also ensures existence of RECOVERED_EDITS_DIR under the region - * creating it if necessary. - * @param fs - * @param logEntry - * @param rootDir HBase root dir. - * @return Path to file into which to dump split log edits. - * @throws IOException - */ - private static Path getRegionSplitEditsPath(final FileSystem fs, - final Entry logEntry, final Path rootDir) - throws IOException { - Path tableDir = HTableDescriptor.getTableDir(rootDir, - logEntry.getKey().getTablename()); - Path regiondir = HRegion.getRegionDir(tableDir, - Bytes.toString(logEntry.getKey().getEncodedRegionName())); - Path dir = getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(dir)) { - if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir); - } - return new Path(dir, - formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum())); - } - static String formatRecoveredEditsFileName(final long seqid) { return String.format("%019d", seqid); } - /** * Returns sorted set of edit files made by wal-log splitter. 
    * @param fs
@@ -1736,7 +1375,7 @@
       final Path regiondir)
   throws IOException {
     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
-    FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
+    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
         boolean result = false;
@@ -1835,7 +1474,9 @@
     if (!fs.getFileStatus(p).isDir()) {
       throw new IOException(p + " is not a directory");
     }
-    splitLog(baseDir, p, oldLogDir, fs, conf);
+
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(baseDir, p, oldLogDir, fs, conf);
   }

   /**
@@ -1874,4 +1515,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -0,0 +1,548 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import com.google.common.util.concurrent.NamingThreadFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class is responsible for splitting up a bunch of regionserver commit log
+ * files that are no longer being written to, into new files, one per region,
+ * for the regions to replay on startup. Delete the old log files when finished.
+ */
+public class HLogSplitter {
+
+  private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
+
+  static final Log LOG = LogFactory.getLog(HLogSplitter.class);
+
+  /**
+   * Name of file that holds recovered edits written by the wal log splitting
+   * code, one per region
+   */
+  public static final String RECOVERED_EDITS = "recovered.edits";
+
+  /**
+   * Create a new HLogSplitter using the given {@link Configuration} and the
+   * hbase.hlog.splitter.impl property to derive the instance
+   * class to use.
+   *
+   * @param conf
+   * @return New HLogSplitter instance
+   */
+  public static HLogSplitter createLogSplitter(Configuration conf) {
+    @SuppressWarnings("unchecked")
+    Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
+        .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
+    try {
+      return splitterClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  // Private immutable datastructure to hold Writer and its Path.
+  private final static class WriterAndPath {
+    final Path p;
+    final Writer w;
+
+    WriterAndPath(final Path p, final Writer w) {
+      this.p = p;
+      this.w = w;
+    }
+  }
+
+  /**
+   * Split up a bunch of regionserver commit log files that are no longer being
+   * written to, into new files, one per region, for the regions to replay on
+   * startup. Delete the old log files when finished.
+   *
+   * @param rootDir
+   *          qualified root directory of the HBase instance
+   * @param srcDir
+   *          Directory of log files to split: e.g.
+   *          ${ROOTDIR}/log_HOST_PORT
+   * @param oldLogDir
+   *          directory where processed (split) logs will be archived to
+   * @param fs
+   *          FileSystem
+   * @param conf
+   *          Configuration
+   * @throws IOException
+   *           will throw if corrupted hlogs aren't tolerated
+   * @return the list of splits
+   */
+  public List<Path> splitLog(final Path rootDir, final Path srcDir,
+      Path oldLogDir, final FileSystem fs, final Configuration conf)
+      throws IOException {
+
+    long millis = System.currentTimeMillis();
+    List<Path> splits = null;
+    if (!fs.exists(srcDir)) {
+      // Nothing to do
+      return splits;
+    }
+    FileStatus[] logfiles = fs.listStatus(srcDir);
+    if (logfiles == null || logfiles.length == 0) {
+      // Nothing to do
+      return splits;
+    }
+    LOG.info("Splitting " + logfiles.length + " hlog(s) in "
+        + srcDir.toString());
+    splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
+    try {
+      FileStatus[] files = fs.listStatus(srcDir);
+      for (FileStatus file : files) {
+        Path newPath = HLog.getHLogArchivePath(oldLogDir, file.getPath());
+        LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to "
+            + FSUtils.getPath(newPath));
+        fs.rename(file.getPath(), newPath);
+      }
+      LOG.debug("Moved " + files.length + " log files to "
+          + FSUtils.getPath(oldLogDir));
+      fs.delete(srcDir, true);
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      IOException io = new IOException("Cannot delete: " + srcDir);
+      io.initCause(e);
+      throw io;
+    }
+    long endMillis = System.currentTimeMillis();
+    LOG.info("hlog file splitting completed in " + (endMillis - millis)
+        + " millis for " + srcDir.toString());
+    return splits;
+  }
+
+  /**
+   * Sorts the HLog edits in the given list of logfiles (that are a mix of edits
+   * on multiple regions) by region and then splits them per region directories,
+   * in batches of (hbase.hlog.split.batch.size).
+   *
+   * A batch consists of a set of log files that will be sorted in a single map
+   * of edits indexed by region; the resulting map will be concurrently written
+   * by multiple threads to their corresponding regions.
+   *
+   * Each batch consists of one or more log files that are
+   *  - recovered (the file is opened for append then closed to ensure no
+   *    process is writing into it)
+   *  - parsed (each edit in the log is appended to a list of edits indexed by
+   *    region; see {@link #parseHLog} for more details)
+   *  - marked as either processed or corrupt depending on parsing outcome
+   *  - the resulting edits indexed by region are concurrently written to their
+   *    corresponding region directories
+   *  - original files are then archived to a different directory
+   *
+   * @param rootDir
+   *          hbase directory
+   * @param srcDir
+   *          logs directory
+   * @param oldLogDir
+   *          directory where processed logs are archived to
+   * @param logfiles
+   *          the list of log files to split
+   * @param fs
+   * @param conf
+   * @return the list of splits
+   * @throws IOException
+   */
+  private List<Path> splitLog(final Path rootDir, final Path srcDir,
+      Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
+      final Configuration conf) throws IOException {
+    List<Path> processedLogs = new ArrayList<Path>();
+    List<Path> corruptedLogs = new ArrayList<Path>();
+    final Map<byte[], WriterAndPath> logWriters = Collections
+        .synchronizedMap(new TreeMap<byte[], WriterAndPath>(
+            Bytes.BYTES_COMPARATOR));
+    List<Path> splits = null;
+
+    // Number of logs in a read batch
+    // More means faster but bigger mem consumption
+    // TODO make a note on the conf rename and update hbase-site.xml if needed
+    int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
+    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
+
+    try {
+      int i = -1;
+      while (i < logfiles.length) {
+        final Map<byte[], LinkedList<Entry>> editsByRegion =
+            new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
+        for (int j = 0; j < logFilesPerStep; j++) {
+          i++;
+          if (i == logfiles.length) {
+            break;
+          }
+          FileStatus log = logfiles[i];
+          Path logPath = log.getPath();
+          long logLength = log.getLen();
+          LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length
+              + ": " + logPath + ", length=" + logLength);
+          try {
+            recoverFileLease(fs, logPath, conf);
+            parseHLog(log, editsByRegion, fs, conf);
+            processedLogs.add(logPath);
+          } catch (EOFException eof) {
+            // truncated files are expected if a RS crashes (see HBASE-2643)
+            LOG.info("EOF from hlog " + logPath + ". continuing");
+            processedLogs.add(logPath);
+          } catch (IOException e) {
+            if (skipErrors) {
+              LOG.warn("Got while parsing hlog " + logPath
+                  + ". Marking as corrupted", e);
+              corruptedLogs.add(logPath);
+            } else {
+              throw e;
+            }
+          }
+        }
+        writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
+      }
+      if (fs.listStatus(srcDir).length > processedLogs.size()
+          + corruptedLogs.size()) {
+        throw new IOException("Discovered orphan hlog after split. Maybe "
+            + "HRegionServer was not dead when we started");
+      }
+      archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    } finally {
+      splits = new ArrayList<Path>(logWriters.size());
+      for (WriterAndPath wap : logWriters.values()) {
+        wap.w.close();
+        splits.add(wap.p);
+        LOG.debug("Closed " + wap.p);
+      }
+    }
+    return splits;
+  }
+
+
+  /**
+   * Takes splitLogsMap and concurrently writes them to region directories using a thread pool
+   *
+   * @param splitLogsMap map that contains the log splitting result indexed by region
+   * @param logWriters map that contains a writer per region
+   * @param rootDir hbase root dir
+   * @param fs
+   * @param conf
+   * @throws IOException
+   */
+  private void writeEditsBatchToRegions(
+      final Map<byte[], LinkedList<Entry>> splitLogsMap,
+      final Map<byte[], WriterAndPath> logWriters, final Path rootDir,
+      final FileSystem fs, final Configuration conf)
+      throws IOException {
+    // Number of threads to use when log splitting to rewrite the logs.
+    // More means faster but bigger mem consumption.
+    int logWriterThreads = conf.getInt(
+        "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+    HashMap<byte[], Future<Void>> writeFutureResult = new HashMap<byte[], Future<Void>>();
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("SplitWriter-%1$d");
+    ThreadFactory factory = builder.build();
+    ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
+    for (final byte[] region : splitLogsMap.keySet()) {
+      Callable<Void> splitter = createNewSplitter(rootDir, logWriters, splitLogsMap,
+          region, fs, conf);
+      writeFutureResult.put(region, threadPool.submit(splitter));
+    }
+
+    threadPool.shutdown();
+    // Wait for all threads to terminate
+    try {
+      for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
+        String message = "Waiting for hlog writers to terminate, elapsed " + j
+            * 5 + " seconds";
+        if (j < 30) {
+          LOG.debug(message);
+        } else {
+          LOG.info(message);
+        }
+
+      }
+    } catch (InterruptedException ex) {
+      LOG.warn("Hlog writers were interrupted, possible data loss!");
+      if (!skipErrors) {
+        throw new IOException("Could not finish writing log entries", ex);
+        // TODO maybe we should fail here regardless if skipErrors is active or not
+      }
+    }
+
+    for (Map.Entry<byte[], Future<Void>> entry : writeFutureResult.entrySet()) {
+      try {
+        entry.getValue().get();
+      } catch (ExecutionException e) {
+        throw (new IOException(e.getCause()));
+      } catch (InterruptedException e1) {
+        LOG.warn("Writer for region " + Bytes.toString(entry.getKey())
+            + " was interrupted, however the write process should have "
+            + "finished. Throwing up ", e1);
+        throw (new IOException(e1.getCause()));
+      }
+    }
+  }
+
+  /**
+   * Moves processed logs to the oldLogDir after successful processing. Moves
+   * corrupted logs (any log that couldn't be successfully parsed) to the
+   * corruptDir (.corrupt) for later investigation.
+   *
+   * @param corruptedLogs
+   * @param processedLogs
+   * @param oldLogDir
+   * @param fs
+   * @param conf
+   * @throws IOException
+   */
+  private static void archiveLogs(final List<Path> corruptedLogs,
+      final List<Path> processedLogs, final Path oldLogDir,
+      final FileSystem fs, final Configuration conf) throws IOException {
+    final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
+        "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+
+    fs.mkdirs(corruptDir);
+    fs.mkdirs(oldLogDir);
+
+    for (Path corrupted : corruptedLogs) {
+      Path p = new Path(corruptDir, corrupted.getName());
+      LOG.info("Moving corrupted log " + corrupted + " to " + p);
+      fs.rename(corrupted, p);
+    }
+
+    for (Path p : processedLogs) {
+      Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
+      fs.rename(p, newPath);
+      LOG.info("Archived processed log " + p + " to " + newPath);
+    }
+  }
+
+  /**
+   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
+   * logEntry named for the sequenceid in the passed
+   * logEntry: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
+   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
+   * creating it if necessary.
+   * @param fs
+   * @param logEntry
+   * @param rootDir HBase root dir.
+   * @return Path to file into which to dump split log edits.
+   * @throws IOException
+   */
+  static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir) throws IOException {
+    Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
+        .getTablename());
+    Path regiondir = HRegion.getRegionDir(tableDir,
+        Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+    Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
+    if (!fs.exists(dir)) {
+      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+    }
+    return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
+        .getLogSeqNum()));
+  }
+
+  static String formatRecoveredEditsFileName(final long seqid) {
+    return String.format("%019d", seqid);
+  }
+
+  /*
+   * Parse a single hlog and put the edits in @splitLogsMap
+   *
+   * @param logfile to split
+   * @param splitLogsMap output parameter: a map with region names as keys and a
+   * list of edits as values
+   * @param fs the filesystem
+   * @param conf the configuration
+   * @throws IOException if hlog is corrupted, or can't be open
+   */
+  private void parseHLog(final FileStatus logfile,
+      final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
+      final Configuration conf)
+      throws IOException {
+    // Check for possibly empty file. With appends, currently Hadoop reports a
+    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+    // HDFS-878 is committed.
+    long length = logfile.getLen();
+    if (length <= 0) {
+      LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
+    }
+    Path path = logfile.getPath();
+    Reader in;
+    int editsCount = 0;
+    try {
+      in = getReader(fs, path, conf);
+    } catch (EOFException e) {
+      if (length <= 0) {
+        //TODO should we ignore an empty, not-last log file if skip.errors is false?
+        //Either way, the caller should decide what to do. E.g. ignore if this is the last
+        //log in sequence.
+        //TODO is this scenario still possible if the log has been recovered (i.e. closed)
+        LOG.warn("Could not open " + path + " for reading. File is empty" + e);
+        return;
+      } else {
+        throw e;
+      }
+    }
+    try {
+      Entry entry;
+      while ((entry = in.next()) != null) {
+        byte[] region = entry.getKey().getEncodedRegionName();
+        LinkedList<Entry> queue = splitLogsMap.get(region);
+        if (queue == null) {
+          queue = new LinkedList<Entry>();
+          splitLogsMap.put(region, queue);
+        }
+        queue.addLast(entry);
+        editsCount++;
+      }
+    } finally {
+      LOG.debug("Pushed=" + editsCount + " entries from " + path);
+      try {
+        if (in != null) {
+          in.close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Close log reader in finally threw exception -- continuing",
+            e);
+      }
+    }
+  }
+
+  private Callable<Void> createNewSplitter(final Path rootDir,
+      final Map<byte[], WriterAndPath> logWriters,
+      final Map<byte[], LinkedList<Entry>> logEntries, final byte[] region,
+      final FileSystem fs, final Configuration conf) {
+    return new Callable<Void>() {
+      public String getName() {
+        return "Split writer thread for region " + Bytes.toStringBinary(region);
+      }
+
+      @Override
+      public Void call() throws IOException {
+        LinkedList<Entry> entries = logEntries.get(region);
+        LOG.debug(this.getName() + " got " + entries.size() + " to process");
+        long threadTime = System.currentTimeMillis();
+        try {
+          int editsCount = 0;
+          WriterAndPath wap = logWriters.get(region);
+          for (Entry logEntry : entries) {
+            if (wap == null) {
+              Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+              if (fs.exists(regionedits)) {
+                LOG.warn("Found existing old edits file. It could be the " +
Deleting " + + regionedits + ", length=" + + fs.getFileStatus(regionedits).getLen()); + if (!fs.delete(regionedits, false)) { + LOG.warn("Failed delete of old " + regionedits); + } + } + Writer w = createWriter(fs, regionedits, conf); + wap = new WriterAndPath(regionedits, w); + logWriters.put(region, wap); + LOG.debug("Creating writer path=" + regionedits + " region=" + + Bytes.toStringBinary(region)); + } + wap.w.append(logEntry); + editsCount++; + } + LOG.debug(this.getName() + " Applied " + editsCount + + " total edits to " + Bytes.toStringBinary(region) + " in " + + (System.currentTimeMillis() - threadTime) + "ms"); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.fatal(this.getName() + " Got while writing log entry to log", e); + throw e; + } + return null; + } + }; + } + + /** + * Create a new {@link Writer} for writing log splits. + * + * @param fs + * @param logfile + * @param conf + * @return A new Writer instance + * @throws IOException + */ + protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf) + throws IOException { + return HLog.createWriter(fs, logfile, conf); + } + + /** + * Create a new {@link Reader} for reading logs to split. + * + * @param fs + * @param curLogFile + * @param conf + * @return A new Reader instance + * @throws IOException + */ + protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) + throws IOException { + return HLog.getReader(fs, curLogFile, conf); + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -27,8 +27,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.io.SequenceFile; import org.mortbay.log.Log; @@ -99,7 +97,24 @@ public class SequenceFileLogReader implements HLog.Reader { Path path; int edit = 0; - public SequenceFileLogReader() { } + private Class keyClass; + + /** + * Default constructor. + */ + public SequenceFileLogReader() { + } + + /** + * This constructor allows a specific HLogKey implementation to override that + * which would otherwise be chosen via configuration property. 
+   *
+   * @param keyClass
+   */
+  public SequenceFileLogReader(Class<? extends HLogKey> keyClass) {
+    this.keyClass = keyClass;
+  }
+
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
@@ -127,7 +142,19 @@ public class SequenceFileLogReader implements HLog.Reader {
   public HLog.Entry next(HLog.Entry reuse) throws IOException {
     HLog.Entry e = reuse;
     if (e == null) {
-      HLogKey key = HLog.newKey(conf);
+      HLogKey key;
+      if (keyClass == null) {
+        key = HLog.newKey(conf);
+      } else {
+        try {
+          key = keyClass.newInstance();
+        } catch (InstantiationException ie) {
+          throw new IOException(ie);
+        } catch (IllegalAccessException iae) {
+          throw new IOException(iae);
+        }
+      }
+
       WALEdit val = new WALEdit();
       e = new HLog.Entry(key, val);
     }
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -48,21 +48,41 @@ public class SequenceFileLogWriter implements HLog.Writer {
   // The syncFs method from hdfs-200 or null if not available.
   private Method syncFs;

+  private Class<? extends HLogKey> keyClass;
+
+  /**
+   * Default constructor.
+   */
   public SequenceFileLogWriter() {
     super();
   }

+  /**
+   * This constructor allows a specific HLogKey implementation to override that
+   * which would otherwise be chosen via configuration property.
+   *
+   * @param keyClass
+   */
+  public SequenceFileLogWriter(Class<? extends HLogKey> keyClass) {
+    this.keyClass = keyClass;
+  }
+
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
   throws IOException {
+
+    if (null == keyClass) {
+      keyClass = HLog.getKeyClass(conf);
+    }
+
     // Create a SF.Writer instance.
     this.writer = SequenceFile.createWriter(fs, conf, path,
-      HLog.getKeyClass(conf), WALEdit.class,
+      keyClass, WALEdit.class,
       fs.getConf().getInt("io.file.buffer.size", 4096),
       (short) conf.getInt("hbase.regionserver.hlog.replication",
-        fs.getDefaultReplication()),
+          fs.getDefaultReplication()),
       conf.getLong("hbase.regionserver.hlog.blocksize",
-        fs.getDefaultBlockSize()),
+          fs.getDefaultBlockSize()),
       SequenceFile.CompressionType.NONE,
       new DefaultCodec(),
       null,
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
@@ -39,6 +39,11 @@ public interface WALObserver {
   public void logRollRequested();

   /**
+   * The WAL is about to close.
+   */
+  public void logCloseRequested();
+
+  /**
    * Called before each write.
    * @param info
    * @param logKey
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -162,4 +162,9 @@ public class Replication implements WALObserver {
   public void logRollRequested() {
     // Not interested
   }
+
+  @Override
+  public void logCloseRequested() {
+    // not interested
+  }
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.hbase.util.Bytes;

 public class InstrumentedSequenceFileLogWriter extends SequenceFileLogWriter {

+  public InstrumentedSequenceFileLogWriter() {
+    super(HLogKey.class);
+  }
+
   public static boolean activateFailure = false;
   @Override
   public void append(HLog.Entry entry) throws IOException {
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -160,9 +160,11 @@ public class TestHLog {
         log.rollWriter();
       }
       log.close();
-      Path splitsdir = new Path(dir, "splits");
+      Path splitsdir = new Path(this.dir, "splits");
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
       List<Path> splits =
-        HLog.splitLog(splitsdir, logdir, oldLogDir, fs, conf);
+        logSplitter.splitLog(splitsdir, logdir,
+            this.oldLogDir, this.fs, conf);
       verifySplits(splits, howmany);
       log = null;
     } finally {
@@ -313,11 +315,11 @@ public class TestHLog {
     }
   }

-  // For this test to pass, requires:
+  // For this test to pass, requires:
   // 1. HDFS-200 (append support)
-  // 2. HDFS-988 (SafeMode should freeze file operations
+  // 2. HDFS-988 (SafeMode should freeze file operations
   //              [FSNamesystem.nextGenerationStampForBlock])
-  // 3. HDFS-142 (on restart, maintain pendingCreates)
+  // 3. HDFS-142 (on restart, maintain pendingCreates)
   @Test
   public void testAppendClose() throws Exception {
     byte [] tableName = Bytes.toBytes(getName());
@@ -344,7 +346,7 @@ public class TestHLog {
     cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
     cluster.shutdown();
     try {
-      // wal.writer.close() will throw an exception,
+      // wal.writer.close() will throw an exception,
      // but still call this since it closes the LogSyncer thread first
      wal.close();
    } catch (IOException e) {
@@ -369,7 +371,7 @@ public class TestHLog {
    Method setLeasePeriod = cluster.getClass()
      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
    setLeasePeriod.setAccessible(true);
-    setLeasePeriod.invoke(cluster,
+    setLeasePeriod.invoke(cluster,
      new Object[]{new Long(1000), new Long(1000)});
    try {
      Thread.sleep(1000);
@@ -405,8 +407,8 @@ public class TestHLog {
      throw t.exception;

    // Make sure you can read all the content
-    SequenceFile.Reader reader
-      = new SequenceFile.Reader(fs, walPath, conf);
+    SequenceFile.Reader reader
+      = new SequenceFile.Reader(this.fs, walPath, this.conf);
    int count = 0;
    HLogKey key = HLog.newKey(conf);
    WALEdit val = new WALEdit();
@@ -606,5 +608,10 @@ public class TestHLog {
      // TODO Auto-generated method stub

    }
+
+    @Override
+    public void logCloseRequested() {
+      // not interested
+    }
  }
 }
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -47,14 +47,16 @@ public class TestHLogMethods {
     fs.delete(regiondir, true);
     fs.mkdirs(regiondir);
     Path recoverededits = HLog.getRegionDirRecoveredEditsDir(regiondir);
-    String first = HLog.formatRecoveredEditsFileName(-1);
+    String first = HLogSplitter.formatRecoveredEditsFileName(-1);
     createFile(fs, recoverededits, first);
-    createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(0));
-    createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(1));
-    createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(11));
-    createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(2));
-    createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(50));
-    String last = HLog.formatRecoveredEditsFileName(Long.MAX_VALUE);
+    createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(0));
+    createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(1));
+    createFile(fs, recoverededits, HLogSplitter
+        .formatRecoveredEditsFileName(11));
+    createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(2));
+    createFile(fs, recoverededits, HLogSplitter
+        .formatRecoveredEditsFileName(50));
+    String last = HLogSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE);
     createFile(fs, recoverededits, last);
     createFile(fs, recoverededits, Long.toString(Long.MAX_VALUE) + "."
       + System.currentTimeMillis());
@@ -63,13 +65,17 @@ public class TestHLogMethods {
     assertEquals(files.pollFirst().getName(), first);
     assertEquals(files.pollLast().getName(), last);
     assertEquals(files.pollFirst().getName(),
-      HLog.formatRecoveredEditsFileName(0));
+      HLogSplitter
+        .formatRecoveredEditsFileName(0));
     assertEquals(files.pollFirst().getName(),
-      HLog.formatRecoveredEditsFileName(1));
+      HLogSplitter
+        .formatRecoveredEditsFileName(1));
     assertEquals(files.pollFirst().getName(),
-      HLog.formatRecoveredEditsFileName(2));
+      HLogSplitter
+        .formatRecoveredEditsFileName(2));
     assertEquals(files.pollFirst().getName(),
-      HLog.formatRecoveredEditsFileName(11));
+      HLogSplitter
+        .formatRecoveredEditsFileName(11));
   }

   private void createFile(final FileSystem fs, final Path testdir,
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -81,7 +82,6 @@ public class TestHLogSplit {
   private static List<String> regions;
   private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
-
   static enum Corruptions {
     INSERT_GARBAGE_ON_FIRST_LINE,
     INSERT_GARBAGE_IN_THE_MIDDLE,
@@ -129,6 +129,22 @@ public class TestHLogSplit {
   public void tearDown() throws Exception {
   }

+  /**
+   * @throws IOException
+   * @see https://issues.apache.org/jira/browse/HBASE-3020
+   */
+  @Test public void testRecoveredEditsPathForMeta() throws IOException {
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
+    long now = System.currentTimeMillis();
+    HLog.Entry entry =
+      new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now),
+        new WALEdit());
+    Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, new Path("/"));
+    assertEquals(p.getParent().getParent().getName(),
+      HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+  }
+
   @Test(expected = IOException.class)
   public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
   throws IOException {
@@ -137,7 +153,8 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
     try {
       (new ZombieNewLogWriterRegionServer(stop)).start();
-      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
     } finally {
       stop.set(true);
     }
@@ -152,7 +169,8 @@ public class TestHLogSplit {
     generateHLogs(1, 10, -1);
     fs.initialize(fs.getUri(), conf);

-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -172,7 +190,8 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);

-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -81,7 +82,6 @@ public class TestHLogSplit {

   private static List<String> regions;
   private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
-
   static enum Corruptions {
     INSERT_GARBAGE_ON_FIRST_LINE,
     INSERT_GARBAGE_IN_THE_MIDDLE,
@@ -129,6 +129,22 @@ public class TestHLogSplit {
   public void tearDown() throws Exception {
   }

+  /**
+   * @throws IOException
+   * @see https://issues.apache.org/jira/browse/HBASE-3020
+   */
+  @Test public void testRecoveredEditsPathForMeta() throws IOException {
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
+    long now = System.currentTimeMillis();
+    HLog.Entry entry =
+      new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now),
+        new WALEdit());
+    Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, new Path("/"));
+    assertEquals(p.getParent().getParent().getName(),
+      HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+  }
+
   @Test(expected = IOException.class)
   public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
   throws IOException {
@@ -137,7 +153,8 @@ public class TestHLogSplit {
     fs.initialize(fs.getUri(), conf);
     try {
       (new ZombieNewLogWriterRegionServer(stop)).start();
-      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
     } finally {
       stop.set(true);
     }
@@ -152,7 +169,8 @@ public class TestHLogSplit {
     generateHLogs(1, 10, -1);

     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -172,7 +190,8 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);

-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     for (String region : regions) {
@@ -193,7 +212,8 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);

-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -208,7 +228,8 @@ public class TestHLogSplit {

     fs.initialize(fs.getUri(), conf);

-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -226,7 +247,9 @@ public class TestHLogSplit {
     corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
       Corruptions.APPEND_GARBAGE, true, fs);
     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -243,7 +266,9 @@ public class TestHLogSplit {
     corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
       Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -261,7 +286,8 @@ public class TestHLogSplit {
     corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
       Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -289,7 +315,8 @@ public class TestHLogSplit {
     corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
     fs.initialize(fs.getUri(), conf);

-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     FileStatus[] archivedLogs = fs.listStatus(corruptDir);
@@ -314,7 +341,8 @@ public class TestHLogSplit {
     corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
     fs.initialize(fs.getUri(), conf);

-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -337,7 +365,8 @@ public class TestHLogSplit {
     generateHLogs(-1);

     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
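Reviewer note: the corruption tests above and below all pivot on
hbase.hlog.split.skip.errors. Reproduced outside the harness it would look
roughly like this (sketch; variable names are borrowed from the test class,
and the behavior comments restate the surrounding assertions):

  Configuration conf = HBaseConfiguration.create();
  // true: unparseable logs are moved aside (see corruptDir above) and the
  // split continues; false: the split fails and all files remain in place.
  conf.setBoolean("hbase.hlog.split.skip.errors", true);
  HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
  splitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
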
@@ -355,7 +384,8 @@ public class TestHLogSplit {
       Corruptions.APPEND_GARBAGE, true, fs);

     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
   }

   // TODO: fix this test (HBASE-2935)
@@ -367,7 +397,8 @@ public class TestHLogSplit {
       Corruptions.APPEND_GARBAGE, true, fs);
     fs.initialize(fs.getUri(), conf);
     try {
-      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
     } catch (IOException e) {/* expected */}

     assertEquals("if skip.errors is false all files should remain in place",
@@ -379,7 +410,8 @@ public class TestHLogSplit {
   public void testSplit() throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);



@@ -395,7 +427,8 @@ public class TestHLogSplit {
   throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
     FileStatus [] statuses = null;
     try {
       statuses = fs.listStatus(hlogDir);
@@ -404,36 +437,34 @@ public class TestHLogSplit {
       // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
     }
   }
-/* DISABLED for now.  TODO: HBASE-2645
-  @Test
-  public void testLogCannotBeWrittenOnceParsed() throws IOException {
-    AtomicLong counter = new AtomicLong(0);
-    AtomicBoolean stop = new AtomicBoolean(false);
-    generateHLogs(9);
-    fs.initialize(fs.getUri(), conf);
-
-    Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter, stop);
-
-
-    try {
-      zombie.start();
-
-      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
-
-      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
-
-      // It's possible that the writer got an error while appending and didn't count it
-      // however the entry will in fact be written to file and split with the rest
-      long numberOfEditsInRegion = countHLog(logfile, fs, conf);
-      assertTrue("The log file could have at most 1 extra log entry, but " +
-        "can't have less. Zombie could write "+counter.get() +" and logfile had only"+ numberOfEditsInRegion+" " + logfile, counter.get() == numberOfEditsInRegion ||
-        counter.get() + 1 == numberOfEditsInRegion);
-    } finally {
-      stop.set(true);
-    }
-  }
-*/
+  /*
+   * DISABLED for now. TODO: HBASE-2645
+   *
+   * @Test public void testLogCannotBeWrittenOnceParsed() throws IOException {
+   * AtomicLong counter = new AtomicLong(0); AtomicBoolean stop = new
+   * AtomicBoolean(false); generateHLogs(9); fs.initialize(fs.getUri(), conf);
+   *
+   * Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter,
+   * stop);
+   *
+   *
+   *
+   * try { zombie.start();
+   *
+   * logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+   *
+   * Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
+   *
+   * // It's possible that the writer got an error while appending and didn't
+   * count it // however the entry will in fact be written to file and split
+   * with the rest long numberOfEditsInRegion = countHLog(logfile, fs, conf);
+   * assertTrue("The log file could have at most 1 extra log entry, but " +
+   * "can't have less. Zombie could write "+counter.get()
+   * +" and logfile had only"+ numberOfEditsInRegion+" " + logfile,
+   * counter.get() == numberOfEditsInRegion || counter.get() + 1 ==
+   * numberOfEditsInRegion); } finally { stop.set(true); } }
+   */

   @Test
   public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
@@ -446,7 +477,8 @@ public class TestHLogSplit {
     try {
       zombie.start();
       try {
-        HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+        logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
       } catch (IOException ex) {/* expected */}
       int logFilesNumber = fs.listStatus(hlogDir).length;

@@ -458,8 +490,6 @@ public class TestHLogSplit {

   }

-
-
   @Test(expected = IOException.class)
   public void testSplitWillFailIfWritingToRegionFails() throws Exception {
     //leave 5th log open so we could append the "trap"
@@ -474,7 +504,8 @@ public class TestHLogSplit {

     try {
       InstrumentedSequenceFileLogWriter.activateFailure = true;
-      HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
     } catch (IOException e) {
       assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing",
         e.getMessage());
@@ -496,7 +527,8 @@ public class TestHLogSplit {
     generateHLogs(1, 100, -1);
     fs.initialize(fs.getUri(), conf);

-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
     fs.rename(oldLogDir, hlogDir);
     Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
     Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
@@ -505,7 +537,7 @@ public class TestHLogSplit {

     fs.initialize(fs.getUri(), conf);
-    HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);

     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
   }
@@ -763,14 +795,14 @@ public class TestHLogSplit {
     FileStatus[] f1 = fs.listStatus(p1);
     FileStatus[] f2 = fs.listStatus(p2);

-    for (int i=0; i<f1.length; i++) {
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
-    List<Path> splits = HLog.splitLog(this.hbaseRootDir, this.logDir,
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c);
+    List<Path> splits = logSplitter.splitLog(this.hbaseRootDir, this.logDir,
       this.oldLogDir, fs, c);
     // Split should generate only 1 file since there's only 1 region
     assertEquals(1, splits.size());
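Reviewer note: taken together, TestHLogMethods and TestWALReplay pin down the
recovered-edits layout that replay depends on. A sketch using the two helpers
exercised above (the concrete region path is made up; the directory and file
name come entirely from the helpers):

  // <rootdir>/<table>/<encoded region name>/<recovered-edits dir>/<padded seqid>
  Path regiondir = new Path("/hbase/t1/668a583f0f4c8b61fb787b5e2874fa45"); // made up
  Path editsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);
  String name = HLogSplitter.formatRecoveredEditsFileName(1234);
  Path recoveredEdits = new Path(editsDir, name);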