diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
--- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -180,7 +181,8 @@ public class MasterFileSystem {
this.splitLogLock.lock();
Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
try {
- HLog.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+ HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
+ splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
} catch (IOException e) {
LOG.error("Failed splitting " + logDir.toString(), e);
} finally {
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -51,12 +51,12 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -2435,7 +2435,7 @@ public class HRegion implements HeapSize { // , Writable{
* @return Returns this
* @throws IOException
*/
- HRegion openHRegion(final Progressable reporter)
+ protected HRegion openHRegion(final Progressable reporter)
throws IOException {
long seqid = initialize(reporter);
if (this.log != null) {
@@ -3050,6 +3050,13 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
+ * Give the region a chance to prepare before it is split.
+ */
+ protected void prepareToSplit() {
+ // nothing
+ }
+
+ /**
* Checks every store to see if one has too many
* store files
* @return true if any store has too many store files
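The two HRegion changes above (the now-protected openHRegion and the new prepareToSplit hook) are extension points for derived region implementations; SplitTransaction.prepare() invokes prepareToSplit() just before checking the split row (see the SplitTransaction hunk below). A minimal sketch of such a subclass, assuming HRegion's no-argument constructor is usable as it is in tests; IndexedRegion and its behavior are hypothetical:

import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.util.Progressable;

public class IndexedRegion extends HRegion {
  @Override
  protected void prepareToSplit() {
    // Called from SplitTransaction.prepare(): flush or invalidate any
    // auxiliary per-region state before the split is attempted.
  }

  @Override
  protected HRegion openHRegion(final Progressable reporter) throws IOException {
    // Overridable thanks to the widened visibility; do any custom setup,
    // then delegate to the default open path.
    return super.openHRegion(reporter);
  }
}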
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -164,4 +164,9 @@ class LogRoller extends Thread implements WALObserver {
WALEdit logEdit) {
// Not interested.
}
+
+ @Override
+ public void logCloseRequested() {
+ // not interested
+ }
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -134,6 +134,7 @@ class SplitTransaction {
public boolean prepare() {
if (this.parent.isClosed() || this.parent.isClosing()) return false;
HRegionInfo hri = this.parent.getRegionInfo();
+ parent.prepareToSplit();
// Check splitrow.
byte [] startKey = hri.getStartKey();
byte [] endKey = hri.getEndKey();
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -19,11 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
-
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
@@ -33,23 +30,14 @@ import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -70,10 +58,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -81,8 +66,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/**
* HLog stores all the edits to the HStore. Its the hbase write-ahead-log
* implementation.
@@ -289,7 +272,7 @@ public class HLog implements Syncable {
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
final Configuration conf)
throws IOException {
- this(fs, dir, oldLogDir, conf, null, null);
+ this(fs, dir, oldLogDir, conf, null, true, null);
}
/**
@@ -312,9 +295,35 @@ public class HLog implements Syncable {
* @throws IOException
*/
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
- final Configuration conf, final List<WALObserver> listeners,
- final String prefix)
- throws IOException {
+ final Configuration conf, final List<WALObserver> listeners,
+ final String prefix) throws IOException {
+ this(fs, dir, oldLogDir, conf, listeners, true, prefix);
+ }
+
+ /**
+ * Create an edit log at the given dir location.
+ *
+ * You should never have to load an existing log. If there is a log at
+ * startup, it should have already been processed and deleted by the time the
+ * HLog object is started up.
+ *
+ * @param fs filesystem handle
+ * @param dir path to where hlogs are stored
+ * @param oldLogDir path to where hlogs are archived
+ * @param conf configuration to use
+ * @param listeners Listeners on WAL events. Listeners passed here will
+ * be registered before we do anything else; e.g. the
+ * Constructor {@link #rollWriter()}.
+ * @param failIfLogDirExists If true, IOException will be thrown if dir already exists.
+ * @param prefix should always be hostname and port in distributed env and
+ * it will be URL encoded before being used.
+ * If prefix is null, "hlog" will be used
+ * @throws IOException
+ */
+ public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
+ final Configuration conf, final List<WALObserver> listeners,
+ final boolean failIfLogDirExists, final String prefix)
+ throws IOException {
super();
this.fs = fs;
this.dir = dir;
@@ -333,7 +342,7 @@ public class HLog implements Syncable {
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
- if (fs.exists(dir)) {
+ if (failIfLogDirExists && fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
}
fs.mkdirs(dir);
@@ -464,7 +473,8 @@ public class HLog implements Syncable {
long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
- HLog.Writer nextWriter = createWriter(fs, newPath, HBaseConfiguration.create(conf));
+ HLog.Writer nextWriter = this.createWriterInstance(fs, newPath,
+ HBaseConfiguration.create(conf));
int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
@@ -517,6 +527,21 @@ public class HLog implements Syncable {
}
/**
+ * This method allows subclasses to inject different writers without having to
+ * extend other methods like rollWriter().
+ *
+ * @param fs
+ * @param path
+ * @param conf
+ * @return a new Writer instance
+ * @throws IOException
+ */
+ protected Writer createWriterInstance(final FileSystem fs, final Path path,
+ final Configuration conf) throws IOException {
+ return createWriter(fs, path, conf);
+ }
+
+ /**
* Get a reader for the WAL.
* @param fs
* @param path
@@ -529,8 +554,8 @@ public class HLog implements Syncable {
throws IOException {
try {
if (logReaderClass == null) {
- logReaderClass =conf.getClass("hbase.regionserver.hlog.reader.impl",
- SequenceFileLogReader.class, Reader.class);
+ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+ SequenceFileLogReader.class, Reader.class);
}
HLog.Reader reader = logReaderClass.newInstance();
@@ -557,7 +582,7 @@ public class HLog implements Syncable {
try {
if (logWriterClass == null) {
logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
- SequenceFileLogWriter.class, Writer.class);
+ SequenceFileLogWriter.class, Writer.class);
}
HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
writer.init(fs, path, conf);
@@ -757,6 +782,12 @@ public class HLog implements Syncable {
cacheFlushLock.lock();
try {
+ // Tell our listeners that the log is closing
+ if (!this.listeners.isEmpty()) {
+ for (WALObserver i : this.listeners) {
+ i.logCloseRequested();
+ }
+ }
synchronized (updateLock) {
this.closed = true;
if (LOG.isDebugEnabled()) {
@@ -1200,71 +1231,6 @@ public class HLog implements Syncable {
return Bytes.equals(METAFAMILY, family);
}
- /**
- * Split up a bunch of regionserver commit log files that are no longer
- * being written to, into new files, one per region for region to replay on
- * startup. Delete the old log files when finished.
- *
- * @param rootDir qualified root directory of the HBase instance
- * @param srcDir Directory of log files to split: e.g.
- * ${ROOTDIR}/log_HOST_PORT
- * @param oldLogDir directory where processed (split) logs will be archived to
- * @param fs FileSystem
- * @param conf Configuration
- * @throws IOException will throw if corrupted hlogs aren't tolerated
- * @return the list of splits
- */
- public static List<Path> splitLog(final Path rootDir, final Path srcDir,
- Path oldLogDir, final FileSystem fs, final Configuration conf)
- throws IOException {
-
- long millis = System.currentTimeMillis();
- List<Path> splits = null;
- if (!fs.exists(srcDir)) {
- // Nothing to do
- return splits;
- }
- FileStatus [] logfiles = fs.listStatus(srcDir);
- if (logfiles == null || logfiles.length == 0) {
- // Nothing to do
- return splits;
- }
- LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
- srcDir.toString());
- splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
- try {
- FileStatus[] files = fs.listStatus(srcDir);
- for(FileStatus file : files) {
- Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
- LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
- FSUtils.getPath(newPath));
- fs.rename(file.getPath(), newPath);
- }
- LOG.debug("Moved " + files.length + " log files to " +
- FSUtils.getPath(oldLogDir));
- fs.delete(srcDir, true);
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- IOException io = new IOException("Cannot delete: " + srcDir);
- io.initCause(e);
- throw io;
- }
- long endMillis = System.currentTimeMillis();
- LOG.info("hlog file splitting completed in " + (endMillis - millis) +
- " millis for " + srcDir.toString());
- return splits;
- }
-
- // Private immutable datastructure to hold Writer and its Path.
- private final static class WriterAndPath {
- final Path p;
- final Writer w;
- WriterAndPath(final Path p, final Writer w) {
- this.p = p;
- this.w = w;
- }
- }
-
@SuppressWarnings("unchecked")
public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
return (Class<? extends HLogKey>)
@@ -1283,103 +1249,6 @@ public class HLog implements Syncable {
}
/**
- * Sorts the HLog edits in the given list of logfiles (that are a mix of edits on multiple regions)
- * by region and then splits them per region directories, in batches of (hbase.hlog.split.batch.size)
- *
- * A batch consists of a set of log files that will be sorted in a single map of edits indexed by region
- * the resulting map will be concurrently written by multiple threads to their corresponding regions
- *
- * Each batch consists of more more log files that are
- * - recovered (files is opened for append then closed to ensure no process is writing into it)
- * - parsed (each edit in the log is appended to a list of edits indexed by region
- * see {@link #parseHLog} for more details)
- * - marked as either processed or corrupt depending on parsing outcome
- * - the resulting edits indexed by region are concurrently written to their corresponding region
- * region directories
- * - original files are then archived to a different directory
- *
- *
- *
- * @param rootDir hbase directory
- * @param srcDir logs directory
- * @param oldLogDir directory where processed logs are archived to
- * @param logfiles the list of log files to split
- * @param fs
- * @param conf
- * @return
- * @throws IOException
- */
- private static List<Path> splitLog(final Path rootDir, final Path srcDir,
- Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
- final Configuration conf)
- throws IOException {
- List<Path> processedLogs = new ArrayList<Path>();
- List<Path> corruptedLogs = new ArrayList<Path>();
- final Map<byte [], WriterAndPath> logWriters =
- Collections.synchronizedMap(
- new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
- List<Path> splits = null;
-
- // Number of logs in a read batch
- // More means faster but bigger mem consumption
- //TODO make a note on the conf rename and update hbase-site.xml if needed
- int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
- boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
-
-
- try {
- int i = -1;
- while (i < logfiles.length) {
- final Map<byte[], LinkedList<Entry>> editsByRegion =
- new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
- for (int j = 0; j < logFilesPerStep; j++) {
- i++;
- if (i == logfiles.length) {
- break;
- }
- FileStatus log = logfiles[i];
- Path logPath = log.getPath();
- long logLength = log.getLen();
- LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
- ": " + logPath + ", length=" + logLength );
- try {
- recoverFileLease(fs, logPath, conf);
- parseHLog(log, editsByRegion, fs, conf);
- processedLogs.add(logPath);
- } catch (EOFException eof) {
- // truncated files are expected if a RS crashes (see HBASE-2643)
- LOG.info("EOF from hlog " + logPath + ". continuing");
- processedLogs.add(logPath);
- } catch (IOException e) {
- if (skipErrors) {
- LOG.warn("Got while parsing hlog " + logPath +
- ". Marking as corrupted", e);
- corruptedLogs.add(logPath);
- } else {
- throw e;
- }
- }
- }
- writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
- }
- if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
- throw new IOException("Discovered orphan hlog after split. Maybe " +
- "HRegionServer was not dead when we started");
- }
- archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
- } finally {
- splits = new ArrayList<Path>(logWriters.size());
- for (WriterAndPath wap : logWriters.values()) {
- wap.w.close();
- splits.add(wap.p);
- LOG.debug("Closed " + wap.p);
- }
- }
- return splits;
- }
-
-
- /**
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
*/
@@ -1474,257 +1343,27 @@ public class HLog implements Syncable {
return dirName.toString();
}
- public static boolean validateHLogFilename(String filename) {
- return pattern.matcher(filename).matches();
- }
-
- private static Path getHLogArchivePath(Path oldLogDir, Path p) {
- return new Path(oldLogDir, p.getName());
- }
-
/**
- * Takes splitLogsMap and concurrently writes them to region directories using a thread pool
- *
- * @param splitLogsMap map that contains the log splitting result indexed by region
- * @param logWriters map that contains a writer per region
- * @param rootDir hbase root dir
- * @param fs
- * @param conf
- * @throws IOException
- */
- private static void writeEditsBatchToRegions(
- final Map<byte[], LinkedList<Entry>> splitLogsMap,
- final Map<byte[], WriterAndPath> logWriters,
- final Path rootDir, final FileSystem fs, final Configuration conf)
- throws IOException {
- // Number of threads to use when log splitting to rewrite the logs.
- // More means faster but bigger mem consumption.
- int logWriterThreads =
- conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
- boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
- HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat("SplitWriter-%1$d");
- ThreadFactory factory = builder.build();
- ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
- for (final byte [] region : splitLogsMap.keySet()) {
- Callable<Void> splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
- writeFutureResult.put(region, threadPool.submit(splitter));
- }
-
- threadPool.shutdown();
- // Wait for all threads to terminate
- try {
- for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
- String message = "Waiting for hlog writers to terminate, elapsed " + j * 5 + " seconds";
- if (j < 30) {
- LOG.debug(message);
- } else {
- LOG.info(message);
- }
-
- }
- } catch(InterruptedException ex) {
- LOG.warn("Hlog writers were interrupted, possible data loss!");
- if (!skipErrors) {
- throw new IOException("Could not finish writing log entries", ex);
- //TODO maybe we should fail here regardless if skipErrors is active or not
- }
- }
-
- for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
- try {
- entry.getValue().get();
- } catch (ExecutionException e) {
- throw (new IOException(e.getCause()));
- } catch (InterruptedException e1) {
- LOG.warn("Writer for region " + Bytes.toString(entry.getKey()) +
- " was interrupted, however the write process should have " +
- "finished. Throwing up ", e1);
- throw (new IOException(e1.getCause()));
- }
- }
- }
-
- /*
- * Parse a single hlog and put the edits in @splitLogsMap
- *
- * @param logfile to split
- * @param splitLogsMap output parameter: a map with region names as keys and a
- * list of edits as values
- * @param fs the filesystem
- * @param conf the configuration
- * @throws IOException if hlog is corrupted, or can't be open
+ * Get the directory we are making logs in.
+ *
+ * @return dir
*/
- private static void parseHLog(final FileStatus logfile,
- final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
- final Configuration conf)
- throws IOException {
- // Check for possibly empty file. With appends, currently Hadoop reports a
- // zero length even if the file has been sync'd. Revisit if HDFS-376 or
- // HDFS-878 is committed.
- long length = logfile.getLen();
- if (length <= 0) {
- LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
- }
- Path path = logfile.getPath();
- Reader in;
- int editsCount = 0;
- try {
- in = HLog.getReader(fs, path, conf);
- } catch (EOFException e) {
- if (length <= 0) {
- //TODO should we ignore an empty, not-last log file if skip.errors is false?
- //Either way, the caller should decide what to do. E.g. ignore if this is the last
- //log in sequence.
- //TODO is this scenario still possible if the log has been recovered (i.e. closed)
- LOG.warn("Could not open " + path + " for reading. File is empty" + e);
- return;
- } else {
- throw e;
- }
- }
- try {
- Entry entry;
- while ((entry = in.next()) != null) {
- byte[] region = entry.getKey().getEncodedRegionName();
- LinkedList<Entry> queue = splitLogsMap.get(region);
- if (queue == null) {
- queue = new LinkedList<Entry>();
- splitLogsMap.put(region, queue);
- }
- queue.addLast(entry);
- editsCount++;
- }
- } finally {
- LOG.debug("Pushed=" + editsCount + " entries from " + path);
- try {
- if (in != null) {
- in.close();
- }
- } catch (IOException e) {
- LOG.warn("Close log reader in finally threw exception -- continuing", e);
- }
- }
+ protected Path getDir() {
+ return dir;
}
-
- private static Callable<Void> createNewSplitter(final Path rootDir,
- final Map<byte[], WriterAndPath> logWriters,
- final Map<byte[], LinkedList<Entry>> logEntries,
- final byte[] region, final FileSystem fs, final Configuration conf) {
- return new Callable<Void>() {
- public String getName() {
- return "Split writer thread for region " + Bytes.toStringBinary(region);
- }
-
- @Override
- public Void call() throws IOException {
- LinkedList<Entry> entries = logEntries.get(region);
- LOG.debug(this.getName()+" got " + entries.size() + " to process");
- long threadTime = System.currentTimeMillis();
- try {
- int editsCount = 0;
- WriterAndPath wap = logWriters.get(region);
- for (Entry logEntry: entries) {
- if (wap == null) {
- Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
- if (fs.exists(regionedits)) {
- LOG.warn("Found existing old edits file. It could be the " +
- "result of a previous failed split attempt. Deleting " +
- regionedits + ", length=" + fs.getFileStatus(regionedits).getLen());
- if (!fs.delete(regionedits, false)) {
- LOG.warn("Failed delete of old " + regionedits);
- }
- }
- Writer w = createWriter(fs, regionedits, conf);
- wap = new WriterAndPath(regionedits, w);
- logWriters.put(region, wap);
- LOG.debug("Creating writer path=" + regionedits +
- " region=" + Bytes.toStringBinary(region));
- }
- wap.w.append(logEntry);
- editsCount++;
- }
- LOG.debug(this.getName() + " Applied " + editsCount +
- " total edits to " + Bytes.toStringBinary(region) +
- " in " + (System.currentTimeMillis() - threadTime) + "ms");
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.fatal(this.getName() + " Got while writing log entry to log", e);
- throw e;
- }
- return null;
- }
- };
+
+ public static boolean validateHLogFilename(String filename) {
+ return pattern.matcher(filename).matches();
}
- /**
- * Moves processed logs to a oldLogDir after successful processing
- * Moves corrupted logs (any log that couldn't be successfully parsed
- * to corruptDir (.corrupt) for later investigation
- *
- * @param corruptedLogs
- * @param processedLogs
- * @param oldLogDir
- * @param fs
- * @param conf
- * @throws IOException
- */
- private static void archiveLogs(final List corruptedLogs,
- final List processedLogs, final Path oldLogDir,
- final FileSystem fs, final Configuration conf)
- throws IOException{
- final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR),
- conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
-
- fs.mkdirs(corruptDir);
- fs.mkdirs(oldLogDir);
-
- for (Path corrupted: corruptedLogs) {
- Path p = new Path(corruptDir, corrupted.getName());
- LOG.info("Moving corrupted log " + corrupted + " to " + p);
- fs.rename(corrupted, p);
- }
-
- for (Path p: processedLogs) {
- Path newPath = getHLogArchivePath(oldLogDir, p);
- fs.rename(p, newPath);
- LOG.info("Archived processed log " + p + " to " + newPath);
- }
+ static Path getHLogArchivePath(Path oldLogDir, Path p) {
+ return new Path(oldLogDir, p.getName());
}
- /*
- * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
- * logEntry named for the sequenceid in the passed
- * logEntry: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
- * This method also ensures existence of RECOVERED_EDITS_DIR under the region
- * creating it if necessary.
- * @param fs
- * @param logEntry
- * @param rootDir HBase root dir.
- * @return Path to file into which to dump split log edits.
- * @throws IOException
- */
- private static Path getRegionSplitEditsPath(final FileSystem fs,
- final Entry logEntry, final Path rootDir)
- throws IOException {
- Path tableDir = HTableDescriptor.getTableDir(rootDir,
- logEntry.getKey().getTablename());
- Path regiondir = HRegion.getRegionDir(tableDir,
- Bytes.toString(logEntry.getKey().getEncodedRegionName()));
- Path dir = getRegionDirRecoveredEditsDir(regiondir);
- if (!fs.exists(dir)) {
- if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
- }
- return new Path(dir,
- formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
- }
-
static String formatRecoveredEditsFileName(final long seqid) {
return String.format("%019d", seqid);
}
-
/**
* Returns sorted set of edit files made by wal-log splitter.
* @param fs
@@ -1736,7 +1375,7 @@ public class HLog implements Syncable {
final Path regiondir)
throws IOException {
Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
- FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
+ FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
boolean result = false;
@@ -1835,7 +1474,9 @@ public class HLog implements Syncable {
if (!fs.getFileStatus(p).isDir()) {
throw new IOException(p + " is not a directory");
}
- splitLog(baseDir, p, oldLogDir, fs, conf);
+
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(baseDir, p, oldLogDir, fs, conf);
}
/**
@@ -1874,4 +1515,4 @@ public class HLog implements Syncable {
}
}
}
-}
\ No newline at end of file
+}
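Together, the new seven-argument constructor (with failIfLogDirExists) and the createWriterInstance() hook let a WAL subclass reuse an existing log directory and swap in its own writer without overriding rollWriter(). A minimal sketch under those assumptions; CustomHLog is a hypothetical name:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;

public class CustomHLog extends HLog {
  public CustomHLog(FileSystem fs, Path dir, Path oldLogDir, Configuration conf)
      throws IOException {
    // failIfLogDirExists=false: tolerate a pre-existing log directory instead
    // of throwing "Target HLog directory already exists".
    super(fs, dir, oldLogDir, conf, null, false, null);
  }

  @Override
  protected Writer createWriterInstance(FileSystem fs, Path path,
      Configuration conf) throws IOException {
    // Inject a writer without touching rollWriter(); the writer must be
    // initialized before it is handed back.
    SequenceFileLogWriter writer = new SequenceFileLogWriter();
    writer.init(fs, path, conf);
    return writer;
  }
}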
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -0,0 +1,548 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import com.google.common.util.concurrent.NamingThreadFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class is responsible for splitting up a bunch of regionserver commit log
+ * files that are no longer being written to, into new files, one per region for
+ * region to replay on startup. Delete the old log files when finished.
+ */
+public class HLogSplitter {
+
+ private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
+
+ static final Log LOG = LogFactory.getLog(HLogSplitter.class);
+
+ /**
+ * Name of file that holds recovered edits written by the wal log splitting
+ * code, one per region
+ */
+ public static final String RECOVERED_EDITS = "recovered.edits";
+
+ /**
+ * Create a new HLogSplitter using the given {@link Configuration} and the
+ * hbase.hlog.splitter.impl property to derive the instance
+ * class to use.
+ *
+ * @param conf
+ * @return New HLogSplitter instance
+ */
+ public static HLogSplitter createLogSplitter(Configuration conf) {
+ @SuppressWarnings("unchecked")
+ Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
+ .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
+ try {
+ return splitterClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+
+ // Private immutable datastructure to hold Writer and its Path.
+ private final static class WriterAndPath {
+ final Path p;
+ final Writer w;
+
+ WriterAndPath(final Path p, final Writer w) {
+ this.p = p;
+ this.w = w;
+ }
+ }
+
+ /**
+ * Split up a bunch of regionserver commit log files that are no longer being
+ * written to, into new files, one per region for region to replay on startup.
+ * Delete the old log files when finished.
+ *
+ * @param rootDir
+ * qualified root directory of the HBase instance
+ * @param srcDir
+ * Directory of log files to split: e.g.
+ * ${ROOTDIR}/log_HOST_PORT
+ * @param oldLogDir
+ * directory where processed (split) logs will be archived to
+ * @param fs
+ * FileSystem
+ * @param conf
+ * Configuration
+ * @throws IOException
+ * will throw if corrupted hlogs aren't tolerated
+ * @return the list of splits
+ */
+ public List<Path> splitLog(final Path rootDir, final Path srcDir,
+ Path oldLogDir, final FileSystem fs, final Configuration conf)
+ throws IOException {
+
+ long millis = System.currentTimeMillis();
+ List<Path> splits = null;
+ if (!fs.exists(srcDir)) {
+ // Nothing to do
+ return splits;
+ }
+ FileStatus[] logfiles = fs.listStatus(srcDir);
+ if (logfiles == null || logfiles.length == 0) {
+ // Nothing to do
+ return splits;
+ }
+ LOG.info("Splitting " + logfiles.length + " hlog(s) in "
+ + srcDir.toString());
+ splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
+ try {
+ FileStatus[] files = fs.listStatus(srcDir);
+ for (FileStatus file : files) {
+ Path newPath = HLog.getHLogArchivePath(oldLogDir, file.getPath());
+ LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to "
+ + FSUtils.getPath(newPath));
+ fs.rename(file.getPath(), newPath);
+ }
+ LOG.debug("Moved " + files.length + " log files to "
+ + FSUtils.getPath(oldLogDir));
+ fs.delete(srcDir, true);
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ IOException io = new IOException("Cannot delete: " + srcDir);
+ io.initCause(e);
+ throw io;
+ }
+ long endMillis = System.currentTimeMillis();
+ LOG.info("hlog file splitting completed in " + (endMillis - millis)
+ + " millis for " + srcDir.toString());
+ return splits;
+ }
+
+ /**
+ * Sorts the HLog edits in the given list of logfiles (that are a mix of edits
+ * on multiple regions) by region and then splits them per region directories,
+ * in batches of (hbase.hlog.split.batch.size)
+ *
+ * A batch consists of a set of log files that will be sorted in a single map
+ * of edits indexed by region; the resulting map will be concurrently written
+ * by multiple threads to their corresponding regions.
+ *
+ * Each batch consists of one or more log files that are
+ * - recovered (file is opened for append then closed to ensure no process
+ * is writing into it)
+ * - parsed (each edit in the log is appended to a list of edits indexed by
+ * region; see {@link #parseHLog} for more details)
+ * - marked as either processed or corrupt depending on parsing outcome
+ * - the resulting edits indexed by region are concurrently written to their
+ * corresponding region directories
+ * - original files are then archived to a different directory
+ *
+ * @param rootDir
+ * hbase directory
+ * @param srcDir
+ * logs directory
+ * @param oldLogDir
+ * directory where processed logs are archived to
+ * @param logfiles
+ * the list of log files to split
+ * @param fs
+ * @param conf
+ * @return the list of splits
+ * @throws IOException
+ */
+ private List<Path> splitLog(final Path rootDir, final Path srcDir,
+ Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
+ final Configuration conf) throws IOException {
+ List<Path> processedLogs = new ArrayList<Path>();
+ List<Path> corruptedLogs = new ArrayList<Path>();
+ final Map<byte[], WriterAndPath> logWriters = Collections
+ .synchronizedMap(new TreeMap<byte[], WriterAndPath>(
+ Bytes.BYTES_COMPARATOR));
+ List<Path> splits = null;
+
+ // Number of logs in a read batch
+ // More means faster but bigger mem consumption
+ // TODO make a note on the conf rename and update hbase-site.xml if needed
+ int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
+ boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
+
+ try {
+ int i = -1;
+ while (i < logfiles.length) {
+ final Map<byte[], LinkedList<Entry>> editsByRegion = new TreeMap<byte[], LinkedList<Entry>>(
+ Bytes.BYTES_COMPARATOR);
+ for (int j = 0; j < logFilesPerStep; j++) {
+ i++;
+ if (i == logfiles.length) {
+ break;
+ }
+ FileStatus log = logfiles[i];
+ Path logPath = log.getPath();
+ long logLength = log.getLen();
+ LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length
+ + ": " + logPath + ", length=" + logLength);
+ try {
+ recoverFileLease(fs, logPath, conf);
+ parseHLog(log, editsByRegion, fs, conf);
+ processedLogs.add(logPath);
+ } catch (EOFException eof) {
+ // truncated files are expected if a RS crashes (see HBASE-2643)
+ LOG.info("EOF from hlog " + logPath + ". continuing");
+ processedLogs.add(logPath);
+ } catch (IOException e) {
+ if (skipErrors) {
+ LOG.warn("Got while parsing hlog " + logPath
+ + ". Marking as corrupted", e);
+ corruptedLogs.add(logPath);
+ } else {
+ throw e;
+ }
+ }
+ }
+ writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
+ }
+ if (fs.listStatus(srcDir).length > processedLogs.size()
+ + corruptedLogs.size()) {
+ throw new IOException("Discovered orphan hlog after split. Maybe "
+ + "HRegionServer was not dead when we started");
+ }
+ archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+ } finally {
+ splits = new ArrayList<Path>(logWriters.size());
+ for (WriterAndPath wap : logWriters.values()) {
+ wap.w.close();
+ splits.add(wap.p);
+ LOG.debug("Closed " + wap.p);
+ }
+ }
+ return splits;
+ }
+
+
+ /**
+ * Takes splitLogsMap and concurrently writes them to region directories using a thread pool
+ *
+ * @param splitLogsMap map that contains the log splitting result indexed by region
+ * @param logWriters map that contains a writer per region
+ * @param rootDir hbase root dir
+ * @param fs
+ * @param conf
+ * @throws IOException
+ */
+ private void writeEditsBatchToRegions(
+ final Map<byte[], LinkedList<Entry>> splitLogsMap,
+ final Map<byte[], WriterAndPath> logWriters, final Path rootDir,
+ final FileSystem fs, final Configuration conf)
+ throws IOException {
+ // Number of threads to use when log splitting to rewrite the logs.
+ // More means faster but bigger mem consumption.
+ int logWriterThreads = conf.getInt(
+ "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+ boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+ HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ builder.setNameFormat("SplitWriter-%1$d");
+ ThreadFactory factory = builder.build();
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
+ for (final byte[] region : splitLogsMap.keySet()) {
+ Callable<Void> splitter = createNewSplitter(rootDir, logWriters, splitLogsMap,
+ region, fs, conf);
+ writeFutureResult.put(region, threadPool.submit(splitter));
+ }
+
+ threadPool.shutdown();
+ // Wait for all threads to terminate
+ try {
+ for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
+ String message = "Waiting for hlog writers to terminate, elapsed " + j
+ * 5 + " seconds";
+ if (j < 30) {
+ LOG.debug(message);
+ } else {
+ LOG.info(message);
+ }
+
+ }
+ } catch (InterruptedException ex) {
+ LOG.warn("Hlog writers were interrupted, possible data loss!");
+ if (!skipErrors) {
+ throw new IOException("Could not finish writing log entries", ex);
+ // TODO maybe we should fail here regardless if skipErrors is active or not
+ }
+ }
+
+ for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
+ try {
+ entry.getValue().get();
+ } catch (ExecutionException e) {
+ throw (new IOException(e.getCause()));
+ } catch (InterruptedException e1) {
+ LOG.warn("Writer for region " + Bytes.toString(entry.getKey())
+ + " was interrupted, however the write process should have "
+ + "finished. Throwing up ", e1);
+ throw (new IOException(e1.getCause()));
+ }
+ }
+ }
+
+ /**
+ * Moves processed logs to oldLogDir after successful processing. Moves
+ * corrupted logs (any log that couldn't be successfully parsed) to
+ * corruptDir (.corrupt) for later investigation.
+ *
+ * @param corruptedLogs
+ * @param processedLogs
+ * @param oldLogDir
+ * @param fs
+ * @param conf
+ * @throws IOException
+ */
+ private static void archiveLogs(final List corruptedLogs,
+ final List processedLogs, final Path oldLogDir,
+ final FileSystem fs, final Configuration conf) throws IOException {
+ final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
+ "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+
+ fs.mkdirs(corruptDir);
+ fs.mkdirs(oldLogDir);
+
+ for (Path corrupted : corruptedLogs) {
+ Path p = new Path(corruptDir, corrupted.getName());
+ LOG.info("Moving corrupted log " + corrupted + " to " + p);
+ fs.rename(corrupted, p);
+ }
+
+ for (Path p : processedLogs) {
+ Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
+ fs.rename(p, newPath);
+ LOG.info("Archived processed log " + p + " to " + newPath);
+ }
+ }
+
+ /**
+ * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
+ * logEntry named for the sequenceid in the passed
+ * logEntry: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
+ * This method also ensures existence of RECOVERED_EDITS_DIR under the region
+ * creating it if necessary.
+ * @param fs
+ * @param logEntry
+ * @param rootDir HBase root dir.
+ * @return Path to file into which to dump split log edits.
+ * @throws IOException
+ */
+ static Path getRegionSplitEditsPath(final FileSystem fs,
+ final Entry logEntry, final Path rootDir) throws IOException {
+ Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
+ .getTablename());
+ Path regiondir = HRegion.getRegionDir(tableDir,
+ Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+ Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
+ if (!fs.exists(dir)) {
+ if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+ }
+ return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
+ .getLogSeqNum()));
+ }
+
+ static String formatRecoveredEditsFileName(final long seqid) {
+ return String.format("%019d", seqid);
+ }
+
+ /*
+ * Parse a single hlog and put the edits in @splitLogsMap
+ *
+ * @param logfile to split
+ * @param splitLogsMap output parameter: a map with region names as keys and a
+ * list of edits as values
+ * @param fs the filesystem
+ * @param conf the configuration
+ * @throws IOException if hlog is corrupted, or can't be open
+ */
+ private void parseHLog(final FileStatus logfile,
+ final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
+ final Configuration conf)
+ throws IOException {
+ // Check for possibly empty file. With appends, currently Hadoop reports a
+ // zero length even if the file has been sync'd. Revisit if HDFS-376 or
+ // HDFS-878 is committed.
+ long length = logfile.getLen();
+ if (length <= 0) {
+ LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
+ }
+ Path path = logfile.getPath();
+ Reader in;
+ int editsCount = 0;
+ try {
+ in = getReader(fs, path, conf);
+ } catch (EOFException e) {
+ if (length <= 0) {
+ //TODO should we ignore an empty, not-last log file if skip.errors is false?
+ //Either way, the caller should decide what to do. E.g. ignore if this is the last
+ //log in sequence.
+ //TODO is this scenario still possible if the log has been recovered (i.e. closed)
+ LOG.warn("Could not open " + path + " for reading. File is empty" + e);
+ return;
+ } else {
+ throw e;
+ }
+ }
+ try {
+ Entry entry;
+ while ((entry = in.next()) != null) {
+ byte[] region = entry.getKey().getEncodedRegionName();
+ LinkedList<Entry> queue = splitLogsMap.get(region);
+ if (queue == null) {
+ queue = new LinkedList<Entry>();
+ splitLogsMap.put(region, queue);
+ }
+ queue.addLast(entry);
+ editsCount++;
+ }
+ } finally {
+ LOG.debug("Pushed=" + editsCount + " entries from " + path);
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } catch (IOException e) {
+ LOG.warn("Close log reader in finally threw exception -- continuing", e);
+ }
+ }
+ }
+
+ private Callable<Void> createNewSplitter(final Path rootDir,
+ final Map<byte[], WriterAndPath> logWriters,
+ final Map<byte[], LinkedList<Entry>> logEntries, final byte[] region,
+ final FileSystem fs, final Configuration conf) {
+ return new Callable<Void>() {
+ public String getName() {
+ return "Split writer thread for region " + Bytes.toStringBinary(region);
+ }
+
+ @Override
+ public Void call() throws IOException {
+ LinkedList<Entry> entries = logEntries.get(region);
+ LOG.debug(this.getName() + " got " + entries.size() + " to process");
+ long threadTime = System.currentTimeMillis();
+ try {
+ int editsCount = 0;
+ WriterAndPath wap = logWriters.get(region);
+ for (Entry logEntry : entries) {
+ if (wap == null) {
+ Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+ if (fs.exists(regionedits)) {
+ LOG.warn("Found existing old edits file. It could be the "
+ + "result of a previous failed split attempt. Deleting "
+ + regionedits + ", length="
+ + fs.getFileStatus(regionedits).getLen());
+ if (!fs.delete(regionedits, false)) {
+ LOG.warn("Failed delete of old " + regionedits);
+ }
+ }
+ Writer w = createWriter(fs, regionedits, conf);
+ wap = new WriterAndPath(regionedits, w);
+ logWriters.put(region, wap);
+ LOG.debug("Creating writer path=" + regionedits + " region="
+ + Bytes.toStringBinary(region));
+ }
+ wap.w.append(logEntry);
+ editsCount++;
+ }
+ LOG.debug(this.getName() + " Applied " + editsCount
+ + " total edits to " + Bytes.toStringBinary(region) + " in "
+ + (System.currentTimeMillis() - threadTime) + "ms");
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.fatal(this.getName() + " Got while writing log entry to log", e);
+ throw e;
+ }
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Create a new {@link Writer} for writing log splits.
+ *
+ * @param fs
+ * @param logfile
+ * @param conf
+ * @return A new Writer instance
+ * @throws IOException
+ */
+ protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+ throws IOException {
+ return HLog.createWriter(fs, logfile, conf);
+ }
+
+ /**
+ * Create a new {@link Reader} for reading logs to split.
+ *
+ * @param fs
+ * @param curLogFile
+ * @param conf
+ * @return A new Reader instance
+ * @throws IOException
+ */
+ protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
+ throws IOException {
+ return HLog.getReader(fs, curLogFile, conf);
+ }
+
+}
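Since createLogSplitter() resolves hbase.hlog.splitter.impl reflectively, a deployment can substitute its own splitter without modifying MasterFileSystem. A minimal sketch; VerboseHLogSplitter is hypothetical and only needs a public no-argument constructor:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;

public class SplitterPluginExample {
  // Hypothetical subclass; createLogSplitter() instantiates it reflectively,
  // so it must be public with a no-argument constructor.
  public static class VerboseHLogSplitter extends HLogSplitter {
  }

  public static List<Path> split(Path rootDir, Path srcDir, Path oldLogDir,
      FileSystem fs) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Point the factory at the custom implementation.
    conf.setClass("hbase.hlog.splitter.impl",
        VerboseHLogSplitter.class, HLogSplitter.class);
    HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
    return splitter.splitLog(rootDir, srcDir, oldLogDir, fs, conf);
  }
}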
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.io.SequenceFile;
import org.mortbay.log.Log;
@@ -99,7 +97,24 @@ public class SequenceFileLogReader implements HLog.Reader {
Path path;
int edit = 0;
- public SequenceFileLogReader() { }
+ private Class<? extends HLogKey> keyClass;
+
+ /**
+ * Default constructor.
+ */
+ public SequenceFileLogReader() {
+ }
+
+ /**
+ * This constructor allows a specific HLogKey implementation to override that
+ * which would otherwise be chosen via configuration property.
+ *
+ * @param keyClass
+ */
+ public SequenceFileLogReader(Class<? extends HLogKey> keyClass) {
+ this.keyClass = keyClass;
+ }
+
@Override
public void init(FileSystem fs, Path path, Configuration conf)
@@ -127,7 +142,19 @@ public class SequenceFileLogReader implements HLog.Reader {
public HLog.Entry next(HLog.Entry reuse) throws IOException {
HLog.Entry e = reuse;
if (e == null) {
- HLogKey key = HLog.newKey(conf);
+ HLogKey key;
+ if (keyClass == null) {
+ key = HLog.newKey(conf);
+ } else {
+ try {
+ key = keyClass.newInstance();
+ } catch (InstantiationException ie) {
+ throw new IOException(ie);
+ } catch (IllegalAccessException iae) {
+ throw new IOException(iae);
+ }
+ }
+
WALEdit val = new WALEdit();
e = new HLog.Entry(key, val);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -48,21 +48,41 @@ public class SequenceFileLogWriter implements HLog.Writer {
// The syncFs method from hdfs-200 or null if not available.
private Method syncFs;
+ private Class<? extends HLogKey> keyClass;
+
+ /**
+ * Default constructor.
+ */
public SequenceFileLogWriter() {
super();
}
+ /**
+ * This constructor allows a specific HLogKey implementation to override that
+ * which would otherwise be chosen via configuration property.
+ *
+ * @param keyClass
+ */
+ public SequenceFileLogWriter(Class<? extends HLogKey> keyClass) {
+ this.keyClass = keyClass;
+ }
+
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
+
+ if (null == keyClass) {
+ keyClass = HLog.getKeyClass(conf);
+ }
+
// Create a SF.Writer instance.
this.writer = SequenceFile.createWriter(fs, conf, path,
- HLog.getKeyClass(conf), WALEdit.class,
+ keyClass, WALEdit.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
- fs.getDefaultReplication()),
+ fs.getDefaultReplication()),
conf.getLong("hbase.regionserver.hlog.blocksize",
- fs.getDefaultBlockSize()),
+ fs.getDefaultBlockSize()),
SequenceFile.CompressionType.NONE,
new DefaultCodec(),
null,
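The paired reader and writer constructors let a caller pin a single HLogKey implementation on both sides of a log, bypassing whatever key class the configuration would otherwise choose. A minimal round-trip sketch; CustomHLogKey is hypothetical and, like any key class used here, must be instantiable with no arguments:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;

public class KeyClassRoundTrip {
  // Hypothetical key subclass; both constructors only require that it can be
  // created with no arguments.
  public static class CustomHLogKey extends HLogKey {
    public CustomHLogKey() {
      super();
    }
  }

  static void roundTrip(FileSystem fs, Path path, Configuration conf)
      throws IOException {
    // Write with the custom key class instead of the configured one...
    SequenceFileLogWriter writer = new SequenceFileLogWriter(CustomHLogKey.class);
    writer.init(fs, path, conf);
    writer.close();

    // ...and read it back; next() instantiates CustomHLogKey for each entry.
    SequenceFileLogReader reader = new SequenceFileLogReader(CustomHLogKey.class);
    reader.init(fs, path, conf);
    for (HLog.Entry e = reader.next(); e != null; e = reader.next()) {
      // e.getKey() is a CustomHLogKey; e.getEdit() is the matching WALEdit.
    }
    reader.close();
  }
}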
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALObserver.java
@@ -39,6 +39,11 @@ public interface WALObserver {
public void logRollRequested();
/**
+ * The WAL is about to close.
+ */
+ public void logCloseRequested();
+
+ /**
* Called before each write.
* @param info
* @param logKey
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -162,4 +162,9 @@ public class Replication implements WALObserver {
public void logRollRequested() {
// Not interested
}
+
+ @Override
+ public void logCloseRequested() {
+ // not interested
+ }
}
\ No newline at end of file
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.hbase.util.Bytes;
public class InstrumentedSequenceFileLogWriter extends SequenceFileLogWriter {
+ public InstrumentedSequenceFileLogWriter() {
+ super(HLogKey.class);
+ }
+
public static boolean activateFailure = false;
@Override
public void append(HLog.Entry entry) throws IOException {
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -160,9 +160,11 @@ public class TestHLog {
log.rollWriter();
}
log.close();
- Path splitsdir = new Path(dir, "splits");
+ Path splitsdir = new Path(this.dir, "splits");
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
List<Path> splits =
- HLog.splitLog(splitsdir, logdir, oldLogDir, fs, conf);
+ logSplitter.splitLog(splitsdir, logdir,
+ this.oldLogDir, this.fs, conf);
verifySplits(splits, howmany);
log = null;
} finally {
@@ -313,11 +315,11 @@ public class TestHLog {
}
}
- // For this test to pass, requires:
+ // For this test to pass, requires:
// 1. HDFS-200 (append support)
- // 2. HDFS-988 (SafeMode should freeze file operations
+ // 2. HDFS-988 (SafeMode should freeze file operations
// [FSNamesystem.nextGenerationStampForBlock])
- // 3. HDFS-142 (on restart, maintain pendingCreates)
+ // 3. HDFS-142 (on restart, maintain pendingCreates)
@Test
public void testAppendClose() throws Exception {
byte [] tableName = Bytes.toBytes(getName());
@@ -344,7 +346,7 @@ public class TestHLog {
cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.shutdown();
try {
- // wal.writer.close() will throw an exception,
+ // wal.writer.close() will throw an exception,
// but still call this since it closes the LogSyncer thread first
wal.close();
} catch (IOException e) {
@@ -369,7 +371,7 @@ public class TestHLog {
Method setLeasePeriod = cluster.getClass()
.getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
setLeasePeriod.setAccessible(true);
- setLeasePeriod.invoke(cluster,
+ setLeasePeriod.invoke(cluster,
new Object[]{new Long(1000), new Long(1000)});
try {
Thread.sleep(1000);
@@ -405,8 +407,8 @@ public class TestHLog {
throw t.exception;
// Make sure you can read all the content
- SequenceFile.Reader reader
- = new SequenceFile.Reader(fs, walPath, conf);
+ SequenceFile.Reader reader
+ = new SequenceFile.Reader(this.fs, walPath, this.conf);
int count = 0;
HLogKey key = HLog.newKey(conf);
WALEdit val = new WALEdit();
@@ -606,5 +608,10 @@ public class TestHLog {
// TODO Auto-generated method stub
}
+
+ @Override
+ public void logCloseRequested() {
+ // not interested
+ }
}
}
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -47,14 +47,16 @@ public class TestHLogMethods {
fs.delete(regiondir, true);
fs.mkdirs(regiondir);
Path recoverededits = HLog.getRegionDirRecoveredEditsDir(regiondir);
- String first = HLog.formatRecoveredEditsFileName(-1);
+ String first = HLogSplitter.formatRecoveredEditsFileName(-1);
createFile(fs, recoverededits, first);
- createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(0));
- createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(1));
- createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(11));
- createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(2));
- createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(50));
- String last = HLog.formatRecoveredEditsFileName(Long.MAX_VALUE);
+ createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(0));
+ createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(1));
+ createFile(fs, recoverededits, HLogSplitter
+ .formatRecoveredEditsFileName(11));
+ createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(2));
+ createFile(fs, recoverededits, HLogSplitter
+ .formatRecoveredEditsFileName(50));
+ String last = HLogSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE);
createFile(fs, recoverededits, last);
createFile(fs, recoverededits,
Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
@@ -63,13 +65,17 @@ public class TestHLogMethods {
assertEquals(files.pollFirst().getName(), first);
assertEquals(files.pollLast().getName(), last);
assertEquals(files.pollFirst().getName(),
- HLog.formatRecoveredEditsFileName(0));
+ HLogSplitter
+ .formatRecoveredEditsFileName(0));
assertEquals(files.pollFirst().getName(),
- HLog.formatRecoveredEditsFileName(1));
+ HLogSplitter
+ .formatRecoveredEditsFileName(1));
assertEquals(files.pollFirst().getName(),
- HLog.formatRecoveredEditsFileName(2));
+ HLogSplitter
+ .formatRecoveredEditsFileName(2));
assertEquals(files.pollFirst().getName(),
- HLog.formatRecoveredEditsFileName(11));
+ HLogSplitter
+ .formatRecoveredEditsFileName(11));
}
private void createFile(final FileSystem fs, final Path testdir,
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -81,7 +82,6 @@ public class TestHLogSplit {
private static List regions;
private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
-
static enum Corruptions {
INSERT_GARBAGE_ON_FIRST_LINE,
INSERT_GARBAGE_IN_THE_MIDDLE,
@@ -129,6 +129,22 @@ public class TestHLogSplit {
public void tearDown() throws Exception {
}
+ /**
+ * @throws IOException
+ * @see https://issues.apache.org/jira/browse/HBASE-3020
+ */
+ @Test public void testRecoveredEditsPathForMeta() throws IOException {
+ FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+ byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
+ long now = System.currentTimeMillis();
+ HLog.Entry entry =
+ new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now),
+ new WALEdit());
+ Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, new Path("/"));
+ assertEquals(p.getParent().getParent().getName(),
+ HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+ }
+
@Test(expected = IOException.class)
public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
throws IOException {
@@ -137,7 +153,8 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
try {
(new ZombieNewLogWriterRegionServer(stop)).start();
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
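+ // createLogSplitter consults the passed configuration (assumption: it can
+ // select a custom splitter implementation), so tests exercise the same
+ // splitter the master would instantiate.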
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} finally {
stop.set(true);
}
@@ -152,7 +169,8 @@ public class TestHLogSplit {
generateHLogs(1, 10, -1);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -172,7 +190,8 @@ public class TestHLogSplit {
// initialize will create a new DFSClient with a new client ID
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
@@ -193,7 +212,8 @@ public class TestHLogSplit {
// initialize will create a new DFSClient with a new client ID
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -208,7 +228,8 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -226,7 +247,9 @@ public class TestHLogSplit {
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -243,7 +266,9 @@ public class TestHLogSplit {
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -261,7 +286,8 @@ public class TestHLogSplit {
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -289,7 +315,8 @@ public class TestHLogSplit {
corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
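+ // The corrupted logs should have been moved aside into the corrupt-log
+ // directory (corruptDir) rather than deleted.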
FileStatus[] archivedLogs = fs.listStatus(corruptDir);
@@ -314,7 +341,8 @@ public class TestHLogSplit {
corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -337,7 +365,8 @@ public class TestHLogSplit {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
@@ -355,7 +384,8 @@ public class TestHLogSplit {
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
}
// TODO: fix this test (HBASE-2935)
@@ -367,7 +397,8 @@ public class TestHLogSplit {
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
try {
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException e) {/* expected */}
assertEquals("if skip.errors is false all files should remain in place",
@@ -379,7 +410,8 @@ public class TestHLogSplit {
public void testSplit() throws IOException {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
@@ -395,7 +427,8 @@ public class TestHLogSplit {
throws IOException {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus [] statuses = null;
try {
statuses = fs.listStatus(hlogDir);
@@ -404,36 +437,34 @@ public class TestHLogSplit {
// hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
}
}
-/* DISABLED for now. TODO: HBASE-2645
- @Test
- public void testLogCannotBeWrittenOnceParsed() throws IOException {
- AtomicLong counter = new AtomicLong(0);
- AtomicBoolean stop = new AtomicBoolean(false);
- generateHLogs(9);
- fs.initialize(fs.getUri(), conf);
-
- Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter, stop);
-
-
- try {
- zombie.start();
-
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
-
- Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
-
- // It's possible that the writer got an error while appending and didn't count it
- // however the entry will in fact be written to file and split with the rest
- long numberOfEditsInRegion = countHLog(logfile, fs, conf);
- assertTrue("The log file could have at most 1 extra log entry, but " +
- "can't have less. Zombie could write "+counter.get() +" and logfile had only"+ numberOfEditsInRegion+" " + logfile, counter.get() == numberOfEditsInRegion ||
- counter.get() + 1 == numberOfEditsInRegion);
- } finally {
- stop.set(true);
- }
- }
-*/
+ /*
+ * DISABLED for now. TODO: HBASE-2645
+ *
+ * @Test
+ * public void testLogCannotBeWrittenOnceParsed() throws IOException {
+ *   AtomicLong counter = new AtomicLong(0);
+ *   AtomicBoolean stop = new AtomicBoolean(false);
+ *   generateHLogs(9);
+ *   fs.initialize(fs.getUri(), conf);
+ *
+ *   Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter, stop);
+ *   try {
+ *     zombie.start();
+ *
+ *     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ *     logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ *
+ *     Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
+ *
+ *     // It's possible that the writer got an error while appending and didn't
+ *     // count it; however the entry will in fact be written to file and split
+ *     // with the rest
+ *     long numberOfEditsInRegion = countHLog(logfile, fs, conf);
+ *     assertTrue("The log file could have at most 1 extra log entry, but " +
+ *       "can't have less. Zombie could write " + counter.get() +
+ *       " and logfile had only " + numberOfEditsInRegion + " " + logfile,
+ *       counter.get() == numberOfEditsInRegion ||
+ *       counter.get() + 1 == numberOfEditsInRegion);
+ *   } finally {
+ *     stop.set(true);
+ *   }
+ * }
+ */
@Test
public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
@@ -446,7 +477,8 @@ public class TestHLogSplit {
try {
zombie.start();
try {
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException ex) {/* expected */}
int logFilesNumber = fs.listStatus(hlogDir).length;
@@ -458,8 +490,6 @@ public class TestHLogSplit {
}
-
-
@Test(expected = IOException.class)
public void testSplitWillFailIfWritingToRegionFails() throws Exception {
//leave 5th log open so we could append the "trap"
@@ -474,7 +504,8 @@ public class TestHLogSplit {
try {
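+ // activateFailure makes the instrumented writer throw on append, so the
+ // split below should fail with the instrumented IOException.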
InstrumentedSequenceFileLogWriter.activateFailure = true;
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException e) {
assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage());
@@ -496,7 +527,8 @@ public class TestHLogSplit {
generateHLogs(1, 100, -1);
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
fs.rename(oldLogDir, hlogDir);
Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
@@ -505,7 +537,7 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
- HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
}
@@ -763,14 +795,14 @@ public class TestHLogSplit {
FileStatus[] f1 = fs.listStatus(p1);
FileStatus[] f2 = fs.listStatus(p2);
- for (int i=0; i<f1.length; i++) {
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
- List<Path> splits = HLog.splitLog(this.hbaseRootDir, this.logDir,
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c);
+ List<Path> splits = logSplitter.splitLog(this.hbaseRootDir, this.logDir,
this.oldLogDir, fs, c);
// Split should generate only 1 file since there's only 1 region
assertEquals(1, splits.size());