diff --git b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 19c9b54..e4822e4 100644
--- b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -402,10 +403,11 @@ public class HLog implements HConstants, Syncable {
HLog.Reader reader = (HLog.Reader) c.newInstance();
reader.init(fs, path, conf);
return reader;
- } catch (Exception e) {
- IOException ie = new IOException("cannot get log reader");
- ie.initCause(e);
- throw ie;
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Cannot get log reader", e);
}
}
@@ -1011,13 +1013,15 @@ public class HLog implements HConstants, Syncable {
* @param rootDir qualified root directory of the HBase instance
* @param srcDir Directory of log files to split: e.g.
* ${ROOTDIR}/log_HOST_PORT
- * @param oldLogDir
+ * @param oldLogDir directory where processed (split) logs will be archived to
* @param fs FileSystem
* @param conf HBaseConfiguration
- * @throws IOException
+   * @throws IOException if a corrupted hlog is found and hbase.skip.errors is false, or on other I/O errors
+ * @return the list of splits
*/
  public static List<Path> splitLog(final Path rootDir, final Path srcDir,
- Path oldLogDir, final FileSystem fs, final Configuration conf) throws IOException {
+ Path oldLogDir, final FileSystem fs, final Configuration conf)
+ throws IOException {
long millis = System.currentTimeMillis();
    List<Path> splits = null;
@@ -1032,18 +1036,11 @@ public class HLog implements HConstants, Syncable {
}
LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
srcDir.toString());
- splits = splitLog(rootDir, oldLogDir, logfiles, fs, conf);
+ splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
+
try {
- FileStatus[] files = fs.listStatus(srcDir);
- for(FileStatus file : files) {
- Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
- LOG.debug("Moving " + FSUtils.getPath(file.getPath()) + " to " +
- FSUtils.getPath(newPath));
- fs.rename(file.getPath(), newPath);
- }
- LOG.debug("Moved " + files.length + " log files to " +
- FSUtils.getPath(oldLogDir));
- fs.delete(srcDir, true);
+      LOG.info("Splitting is done. Removing old log dir " + srcDir);
+ fs.delete(srcDir, false);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1083,201 +1080,317 @@ public class HLog implements HConstants, Syncable {
}
}
- /*
- * @param rootDir
- * @param logfiles
- * @param fs
- * @param conf
- * @throws IOException
- * @return List of splits made.
- */
-  private static List<Path> splitLog(final Path rootDir,
-    Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
+  private static List<Path> splitLog(final Path rootDir, final Path srcDir,
+    Path oldLogDir, final FileStatus[] logFiles, final FileSystem fs,
final Configuration conf) throws IOException {
+
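+    // Logs that parsed cleanly and logs we could not read; both lists are
+    // used later to decide what to archive and whether any orphan hlogs
+    // remain in the source directory.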
+    ArrayList<Path> processedLogs = new ArrayList<Path>();
+    ArrayList<Path> corruptedLogs = new ArrayList<Path>();
    final Map<byte [], WriterAndPath> logWriters =
-      Collections.synchronizedMap(
-        new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
-    List<Path> splits = null;
-
- // Number of threads to use when log splitting to rewrite the logs.
- // More means faster but bigger mem consumption.
- int logWriterThreads =
- conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
-
- // Number of logs to read concurrently when log splitting.
- // More means faster but bigger mem consumption */
- int concurrentLogReads =
+ Collections.synchronizedMap(
+        new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+
+    List<Path> splits;
+
+ // Number of logs in a read batch
+ // More means faster but bigger mem consumption
+ int logFilesPerStep =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
- // Is append supported?
+ boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+
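+    // Overall strategy: read hlogs in batches, grouping their edits by region
+    // in memory, then flush each batch to the per-region log files via
+    // writeEditsBatchToRegions before reading the next batch.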
try {
- int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
- concurrentLogReads)).intValue();
- for (int step = 0; step < maxSteps; step++) {
-      final Map<byte [], LinkedList<HLog.Entry>> logEntries =
-        new TreeMap<byte [], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
- // Stop at logfiles.length when it's the last step
- int endIndex = step == maxSteps - 1? logfiles.length:
- step * concurrentLogReads + concurrentLogReads;
- for (int i = (step * concurrentLogReads); i < endIndex; i++) {
- // Check for possibly empty file. With appends, currently Hadoop
- // reports a zero length even if the file has been sync'd. Revisit if
- // HADOOP-4751 is committed.
- long length = logfiles[i].getLen();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
- ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
+ int i = -1;
+ while (i < logFiles.length) {
+      final Map<byte[], LinkedList<Entry>> editsByRegion =
+        new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
+
+ for (int j = 0; j < logFilesPerStep; j++) {
+ i++;
+ if (i == logFiles.length) {
+ break;
}
- Reader in = null;
- int count = 0;
+
+ FileStatus log = logFiles[i];
+ Path logPath = log.getPath();
+ long logLength = log.getLen();
+
+ LOG.debug("Splitting hlog " + (i + 1) + " of " + logFiles.length +
+ ": " + logPath + ", length=" + logLength );
try {
- in = HLog.getReader(fs, logfiles[i].getPath(), conf);
- try {
- HLog.Entry entry;
- while ((entry = in.next()) != null) {
- byte [] regionName = entry.getKey().getRegionName();
-            LinkedList<HLog.Entry> queue = logEntries.get(regionName);
-            if (queue == null) {
-              queue = new LinkedList<HLog.Entry>();
- LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName));
- logEntries.put(regionName, queue);
- }
- queue.push(entry);
- count++;
- }
- LOG.debug("Pushed=" + count + " entries from " +
- logfiles[i].getPath());
- } catch (IOException e) {
- LOG.debug("IOE Pushed=" + count + " entries from " +
- logfiles[i].getPath());
- e = RemoteExceptionHandler.checkIOException(e);
- if (!(e instanceof EOFException)) {
- LOG.warn("Exception processing " + logfiles[i].getPath() +
- " -- continuing. Possible DATA LOSS!", e);
- }
- }
+
+ recoverLog(true, fs, logPath);
+ parseHLog(log, editsByRegion, fs, conf);
+ processedLogs.add(logPath);
+
} catch (IOException e) {
- if (length <= 0) {
- LOG.warn("Empty hlog, continuing: " + logfiles[i] + " count=" + count, e);
- continue;
+ if (skipErrors) {
+            LOG.warn("Got " + e + " while parsing hlog " + logPath +
+              ". Marking as corrupted");
+ corruptedLogs.add(logPath);
+ } else {
+ throw e;
}
- throw e;
- } finally {
- try {
- if (in != null) {
- in.close();
- }
- } catch (IOException e) {
- LOG.warn("Close in finally threw exception -- continuing", e);
- }
- // Archive the input file now so we do not replay edits. We could
- // have gotten here because of an exception. If so, probably
- // nothing we can do about it. Replaying it, it could work but we
- // could be stuck replaying for ever. Just continue though we
- // could have lost some edits.
- fs.rename(logfiles[i].getPath(),
- getHLogArchivePath(oldLogDir, logfiles[i].getPath()));
- }
- }
- ExecutorService threadPool =
- Executors.newFixedThreadPool(logWriterThreads);
- for (final byte[] key : logEntries.keySet()) {
- Thread thread = new Thread(Bytes.toStringBinary(key)) {
- @Override
- public void run() {
-            LinkedList<HLog.Entry> entries = logEntries.get(key);
- LOG.debug("Thread got " + entries.size() + " to process");
- long threadTime = System.currentTimeMillis();
- try {
- int count = 0;
- // Items were added to the linkedlist oldest first. Pull them
- // out in that order.
-              for (ListIterator<HLog.Entry> i =
- entries.listIterator(entries.size());
- i.hasPrevious();) {
- HLog.Entry logEntry = i.previous();
- WriterAndPath wap = logWriters.get(key);
- if (wap == null) {
- Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
- .getTableDir(rootDir, logEntry.getKey().getTablename()),
- HRegionInfo.encodeRegionName(key)),
- HREGION_OLDLOGFILE_NAME);
- Path oldlogfile = null;
- Reader old = null;
- if (fs.exists(logfile)) {
- FileStatus stat = fs.getFileStatus(logfile);
- if (stat.getLen() <= 0) {
- LOG.warn("Old hlog file " + logfile + " is zero " +
- "length. Deleting existing file");
- fs.delete(logfile, false);
- } else {
- LOG.warn("Old hlog file " + logfile + " already " +
- "exists. Copying existing file to new file");
- oldlogfile = new Path(logfile.toString() + ".old");
- fs.rename(logfile, oldlogfile);
- old = getReader(fs, oldlogfile, conf);
- }
- }
- Writer w = createWriter(fs, logfile, conf);
- wap = new WriterAndPath(logfile, w);
- logWriters.put(key, wap);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating new hlog file writer for path "
- + logfile + " and region " + Bytes.toStringBinary(key));
- }
-
- if (old != null) {
- // Copy from existing log file
- HLog.Entry entry;
- for (; (entry = old.next()) != null; count++) {
- if (LOG.isDebugEnabled() && count > 0
- && count % 10000 == 0) {
- LOG.debug("Copied " + count + " edits");
- }
- w.append(entry);
- }
- old.close();
- fs.delete(oldlogfile, true);
- }
- }
- wap.w.append(logEntry);
- count++;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Applied " + count + " total edits to "
- + Bytes.toStringBinary(key) + " in "
- + (System.currentTimeMillis() - threadTime) + "ms");
- }
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.warn("Got while writing region " + Bytes.toStringBinary(key)
- + " log " + e);
- e.printStackTrace();
- }
- }
- };
- threadPool.execute(thread);
- }
- threadPool.shutdown();
- // Wait for all threads to terminate
- try {
- for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
- LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
}
- }catch(InterruptedException ex) {
- LOG.warn("Hlog writers were interrupted, possible data loss!");
}
+
+ writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
+ }
+
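+      // If the source dir holds more files than we processed plus those we
+      // marked corrupted, a new hlog appeared after the split started (the
+      // region server may not be dead after all), so abort the split.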
+      if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
+        throw new IOException("Discovered orphan hlog after split. Maybe the "
+          + "HRegionServer was not dead when we started");
}
+ archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+
} finally {
      splits = new ArrayList<Path>(logWriters.size());
for (WriterAndPath wap : logWriters.values()) {
wap.w.close();
- LOG.debug("Closed " + wap.p);
splits.add(wap.p);
+ LOG.debug("Closed " + wap.p);
}
}
return splits;
}
+ private static void writeEditsBatchToRegions(
+          Map<byte[], LinkedList<Entry>> splitLogsMap,
+          Map<byte[], WriterAndPath> logWriters,
+ Path rootDir,
+ FileSystem fs,
+ Configuration conf) throws IOException {
+
+ // Number of threads to use when log splitting to rewrite the logs.
+ // More means faster but bigger mem consumption.
+ int logWriterThreads =
+ conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+
+ boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
+ ExecutorService threadPool =
+ Executors.newFixedThreadPool(logWriterThreads);
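+    // Submit one task per region; the fixed pool bounds how many region
+    // writers run concurrently.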
+ for (final byte[] key : splitLogsMap.keySet()) {
+ Thread thread = createNewSplitThread(
+ rootDir, logWriters, splitLogsMap, key, fs, conf);
+ threadPool.execute(thread);
+ }
+ threadPool.shutdown();
+ // Wait for all threads to terminate
+ try {
+ for(int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
+ LOG.debug("Waiting for hlog writers to terminate, iteration #" + j);
+ }
+ } catch(InterruptedException ex) {
+ if (skipErrors) {
+ LOG.warn("Hlog writers were interrupted, possible data loss!");
+ } else {
+ throw new IOException("Could not finish writing log entries", ex);
+ }
+ }
+ }
+
+
+ /**
+   * Parse a single hlog and put the edits in splitLogsMap
+ *
+ * @param logfile to split
+ * @param splitLogsMap output parameter: a map with region names as keys and a
+ * list of edits as values
+ * @param fs the filesystem
+ * @param conf the configuration
+   * @throws IOException if the hlog is corrupted, or cannot be opened
+ */
+ private static void parseHLog(
+ FileStatus logfile,
+    final Map<byte[], LinkedList<Entry>> splitLogsMap,
+ FileSystem fs,
+ Configuration conf) throws IOException {
+
+ // Check for possibly empty file. With appends, currently Hadoop
+ // reports a zero length even if the file has been sync'd. Revisit if
+ // HDFS-376 or HDFS-878 is committed.
+ long length = logfile.getLen();
+ Path path = logfile.getPath();
+
+ Reader in;
+ int editsCount = 0;
+
+ if (length <= 0) {
+      LOG.warn("File " + path + " might still be open, length is 0");
+ }
+ try {
+ in = HLog.getReader(fs, path, conf);
+ } catch (EOFException e) {
+ if (length <= 0) {
+ //TODO should we ignore an empty file if skip.errors is false?
+ //The caller should decide what to do. E.g. ignore if this is the last
+ //log in sequence.
+ //TODO is this scenario still possible if the log has been recovered (i.e. closed)
+        LOG.warn("Could not open " + path + " for reading. File is empty: " + e);
+ return;
+ }
+ else {
+ throw e;
+ }
+ }
+
+ try {
+ Entry entry;
+ while ((entry = in.next()) != null) {
+ byte[] region = entry.getKey().getRegionName();
+        LinkedList<Entry> queue = splitLogsMap.get(region);
+        if (queue == null) {
+          queue = new LinkedList<Entry>();
+ splitLogsMap.put(region, queue);
+ }
+ queue.push(entry);
+ editsCount++;
+ }
+ LOG.debug("Pushed=" + editsCount + " entries from " + path);
+ } finally {
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } catch (IOException e) {
+ LOG.warn("Close log reader in finally threw exception -- continuing", e);
+ }
+ }
+ }
+
+
+ private static Thread createNewSplitThread(
+ final Path rootDir,
+    final Map<byte[], WriterAndPath> logWriters,
+    final Map<byte[], LinkedList<Entry>> logEntries,
+ final byte[] key,
+ final FileSystem fs,
+ final Configuration conf
+ ) {
+
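+  // Each returned thread replays one region's queued edits in oldest-first
+  // order, creating the region's oldlogfile.log writer on first use and
+  // syncing after every appended edit.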
+ return new Thread(Bytes.toStringBinary(key)) {
+ @Override
+ public void run() {
+        LinkedList<Entry> entries = logEntries.get(key);
+ LOG.debug("Thread " + this.getId() + " got " + entries.size() + " to process");
+ long threadTime = System.currentTimeMillis();
+ try {
+ int editsCount = 0;
+ // Items were added to the linkedlist oldest first. Pull them
+ // out in that order.
+          for (ListIterator<Entry> iterator = entries.listIterator(entries.size());
+ iterator.hasPrevious();) {
+ Entry logEntry = iterator.previous();
+ WriterAndPath wap = logWriters.get(key);
+ if (wap == null) {
+ Path logFile = getRegionLogPath(logEntry, rootDir, key);
+ if (fs.exists(logFile)) {
+ LOG.warn("Deleting old hlog file: " + logFile);
+ fs.delete(logFile, false);
+ }
+ Writer w = createWriter(fs, logFile, conf);
+ wap = new WriterAndPath(logFile, w);
+ logWriters.put(key, wap);
+ LOG.debug("Creating writer path=" + logFile +
+ " region=" + Bytes.toStringBinary(key));
+ }
+ wap.w.append(logEntry);
+ wap.w.sync();
+ editsCount++;
+ }
+ LOG.debug(this.getId() + " Applied " + editsCount + " total edits to "
+ + Bytes.toStringBinary(key) + " in "
+ + (System.currentTimeMillis() - threadTime) + "ms");
+
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+          LOG.warn(this.getId() + " Got error while writing region " +
+                  Bytes.toStringBinary(key) + " log " + e);
+ }
+ }
+ };
+ }
+
+
+  private static void archiveLogs(ArrayList<Path> corruptedLogs,
+    ArrayList<Path> processedLogs,
+ Path oldLogDir,
+ FileSystem fs,
+    Configuration conf) throws IOException {
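+    // Corrupted logs are moved to a separate directory for later inspection,
+    // while cleanly processed logs are archived to the old-log directory.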
+ final Path corruptDir =
+ new Path(conf.get(HBASE_DIR),
+ conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+
+ if (!fs.exists(corruptDir)) {
+ fs.mkdirs(corruptDir);
+ }
+ if (!fs.exists(oldLogDir)) {
+ fs.mkdirs(oldLogDir);
+ }
+
+ for (Path corrupted: corruptedLogs) {
+ LOG.info("Moving corrupted log " + corrupted + " to " +
+ new Path(corruptDir, corrupted.getName()));
+ fs.rename(corrupted, new Path(corruptDir, corrupted.getName()));
+ }
+
+ for (Path p: processedLogs) {
+ Path newPath = getHLogArchivePath(oldLogDir, p);
+ fs.rename(p, newPath);
+ LOG.info("Archived processed log " + p + " to " + newPath);
+ }
+
+ }
+
+ private static Path getRegionLogPath(Entry logEntry, Path rootDir, byte[] key) {
+ return new Path(HRegion.getRegionDir(HTableDescriptor
+ .getTableDir(rootDir, logEntry.getKey().getTablename()),
+ HRegionInfo.encodeRegionName(key)),
+ HREGION_OLDLOGFILE_NAME);
+ }
+
+  /*
+   * Recover log.
+   * Try to open the log in append mode.
+   * Doing this, we get a hold of the file that a crashed writer
+   * was writing to. Once we have it, close it. This will
+   * allow a subsequent reader to see up to the last sync.
+   * @param append true if the filesystem supports append; if false this is a no-op
+   * @param fs filesystem to operate on
+   * @param p path of the hlog to recover
+   */
+  public static void recoverLog(final boolean append, final FileSystem fs,
+      final Path p) throws IOException {
+ if (!append) {
+ return;
+ }
+    LOG.info("Recovering hlog " + p);
+ // lease recovery not needed for local file system case.
+ // currently, local file system doesn't implement append either.
+ if (!(fs instanceof DistributedFileSystem)) {
+ return;
+ }
+
+ // Trying recovery
+ boolean recovered = false;
+ while (!recovered) {
+ try {
+        FSDataOutputStream out = fs.append(p);
+        out.close();
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, got " + e +
+          ", waiting on lease recovery: " + p);
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+ // ignore it and try again
+ }
+ }
+ }
+    LOG.info("Past lease recovery of " + p);
+ }
+
+
+
/**
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
diff --git b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 2e5c142..63adf46 100644
--- b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -114,6 +114,18 @@ public class HBaseTestingUtility {
startMiniCluster(1);
}
+ public void startMiniDFSCluster(int servers) throws Exception {
+ this.dfsCluster = new MiniDFSCluster(12345, this.conf, servers, true,
+ true, true, null, null, null, null);
+ }
+
+ public void shutdownMiniDFSCluster() throws Exception {
+ if (this.dfsCluster != null) {
+ // The below throws an exception per dn, AsynchronousCloseException.
+ this.dfsCluster.shutdown();
+ }
+ }
+
public void startMiniZKCluster() throws Exception {
// Note that this is done before we create the MiniHBaseCluster because we
// need to edit the config to add the ZooKeeper servers.
@@ -162,7 +174,7 @@ public class HBaseTestingUtility {
// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. TODO: fix.
// Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
- this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true,
+ this.dfsCluster = new MiniDFSCluster(12345, this.conf, servers, true,
true, true, null, null, null, null);
// Restore System property. minidfscluster accesses content of
// the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using,
diff --git b/core/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java a/core/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java
new file mode 100644
index 0000000..e0bea20
--- /dev/null
+++ a/core/src/test/java/org/apache/hadoop/hbase/master/TestHLogSplit.java
@@ -0,0 +1,577 @@
+package org.apache.hadoop.hbase.master;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHLogSplit {
+
+ private Configuration conf;
+ private FileSystem fs;
+
+ private final static HBaseTestingUtility
+ TEST_UTIL = new HBaseTestingUtility();
+
+
+ private static final Path hbaseDir = new Path("/hbase");
+ private static final Path hlogDir = new Path(hbaseDir, "hlog");
+ private static final Path oldLogDir = new Path(hbaseDir, "hlog.old");
+ private static final Path corruptDir = new Path(hbaseDir, ".corrupt");
+
+ private static final int NUM_WRITERS = 10;
+ private static final int ENTRIES = 10; // entries per writer per region
+
+ private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS];
+ private long seq = 0;
+ private static final byte[] TABLE_NAME = "t1".getBytes();
+ private static final byte[] FAMILY = "f1".getBytes();
+ private static final byte[] QUALIFIER = "q1".getBytes();
+ private static final byte[] VALUE = "v1".getBytes();
+ private static final String HLOG_FILE_PREFIX = "hlog.dat.";
+  private static List<String> regions;
+
+
+ static enum Corruptions {
+ INSERT_GARBAGE_ON_FIRST_LINE,
+ INSERT_GARBAGE_IN_THE_MIDDLE,
+ APPEND_GARBAGE,
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().
+ setInt("hbase.regionserver.flushlogentries", 1);
+ TEST_UTIL.getConfiguration().
+ setBoolean("dfs.support.append", true);
+ TEST_UTIL.getConfiguration().
+ setStrings("hbase.rootdir", hbaseDir.toString());
+
+ TEST_UTIL.startMiniDFSCluster(2);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniDFSCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ conf = TEST_UTIL.getConfiguration();
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ FileStatus[] entries = fs.listStatus(new Path("/"));
+ for (FileStatus dir : entries){
+ fs.delete(dir.getPath(), true);
+ }
+ seq = 0;
+    regions = new ArrayList<String>();
+ Collections.addAll(regions, "bbb", "ccc");
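+    // Shorten the lease period so recoverLog() can take over the lease on
+    // files left open by the "dead" writer without waiting for the default
+    // lease timeouts.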
+ TEST_UTIL.getDFSCluster().getNamesystem().leaseManager.setLeasePeriod(100, 50000);
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+
+  //TODO: check that the edits order is respected (scenarios)
+  //TODO: test the split of a large file (lots of regions, > 500). In my tests it
+  // seems that without hflush we're losing entries while writing to regions
+
+ @Test
+ public void testEmptyLogFiles() throws IOException {
+
+ injectEmptyFile(".empty", true);
+ generateHLogs(Integer.MAX_VALUE);
+ injectEmptyFile("empty", true);
+
+ // make fs act as a different client now
+ // initialize will create a new DFSClient with a new client ID
+ fs.initialize(fs.getUri(), conf);
+
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+
+ for (String region : regions) {
+ Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+ assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+ }
+
+ }
+
+
+ @Test
+ public void testEmptyOpenLogFiles() throws IOException {
+ injectEmptyFile(".empty", false);
+ generateHLogs(Integer.MAX_VALUE);
+ injectEmptyFile("empty", false);
+
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+
+ for (String region : regions) {
+ Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+ assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+ }
+
+
+ }
+
+ @Test
+ public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
+ // generate logs but leave hlog.dat.5 open.
+ generateHLogs(5);
+
+ fs.initialize(fs.getUri(), conf);
+
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ for (String region : regions) {
+ Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+ assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+ }
+
+
+ }
+
+
+ @Test
+  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
+ conf.setBoolean("hbase.skip.errors", true);
+ generateHLogs(Integer.MAX_VALUE);
+ corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+ Corruptions.APPEND_GARBAGE, true, fs);
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ for (String region : regions) {
+ Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+ assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+ }
+
+
+ }
+
+ @Test
+ public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
+ conf.setBoolean("hbase.skip.errors", true);
+ generateHLogs(Integer.MAX_VALUE);
+ corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+ Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ for (String region : regions) {
+ Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+ assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
+ }
+
+
+ }
+
+
+ @Test
+ public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
+ conf.setBoolean("hbase.skip.errors", true);
+ generateHLogs(Integer.MAX_VALUE);
+ corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+ Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ for (String region : regions) {
+ Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+ // the entries in the original logs are alternating regions
+ // considering the sequence file header, the middle corruption should
+ // affect at least half of the entries
+ int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
+      int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
+ assertTrue("The file up to the corrupted area hasn't been parsed",
+ goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf));
+ }
+ }
+
+ @Test
+ public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
+ conf.setBoolean("hbase.skip.errors", true);
+
+ Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
+ Path c2 = new Path(hlogDir, HLOG_FILE_PREFIX + "5");
+ Path c3 = new Path(hlogDir, HLOG_FILE_PREFIX + (NUM_WRITERS - 1));
+ generateHLogs(-1);
+ corruptHLog(c1, Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
+ corruptHLog(c2, Corruptions.APPEND_GARBAGE, true, fs);
+ corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
+
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ FileStatus[] archivedLogs = fs.listStatus(corruptDir);
+
+ assertEquals("expected a different file", c1.getName(), archivedLogs[0].getPath().getName());
+ assertEquals("expected a different file", c2.getName(), archivedLogs[1].getPath().getName());
+ assertEquals("expected a different file", c3.getName(), archivedLogs[2].getPath().getName());
+    assertEquals(3, archivedLogs.length);
+
+ }
+
+ @Test
+ public void testLogsGetArchivedAfterSplit() throws IOException {
+ conf.setBoolean("hbase.skip.errors", false);
+
+ generateHLogs(-1);
+
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
+
+ assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
+ }
+
+
+
+ @Test(expected = IOException.class)
+ public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
+ conf.setBoolean("hbase.skip.errors", false);
+ generateHLogs(Integer.MAX_VALUE);
+ corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+ Corruptions.APPEND_GARBAGE, true, fs);
+
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ }
+
+ @Test
+ public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
+ conf.setBoolean("hbase.skip.errors", false);
+ generateHLogs(-1);
+ corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
+ Corruptions.APPEND_GARBAGE, true, fs);
+ fs.initialize(fs.getUri(), conf);
+ try {
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ } catch (IOException e) {/* expected */}
+
+ assertEquals("if skip.errors is false all files should remain in place",
+ NUM_WRITERS, fs.listStatus(hlogDir).length);
+ }
+
+
+ @Test
+ public void testSplit() throws IOException {
+ generateHLogs(-1);
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+
+
+ for (String region : regions) {
+ Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
+ assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+
+ }
+ }
+
+ @Test(expected = FileNotFoundException.class)
+ public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit() throws IOException {
+ generateHLogs(-1);
+ fs.initialize(fs.getUri(), conf);
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ fs.listStatus(hlogDir);
+
+ }
+
+ @Test
+ public void testLogCannotBeWrittenOnceParsed() throws IOException {
+ generateHLogs(9);
+ fs.initialize(fs.getUri(), conf);
+
+
+ AtomicLong counter = new AtomicLong(0);
+ (new ZombieLastLogWriterRegionServer(counter)).start();
+
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+
+ Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
+
+ // It's possible that the writer got an error while appending and didn't count it
+ // however the entry will in fact be written to file and split with the rest
+ long numberOfEditsInRegion = countHLog(logfile, fs, conf);
+    assertTrue("The log file could have at most 1 extra log entry, but " +
+      "can't have less. Log file: " + logfile,
+      counter.get() == numberOfEditsInRegion ||
+      counter.get() + 1 == numberOfEditsInRegion);
+ }
+
+
+ @Test(expected = IOException.class)
+ public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException {
+ generateHLogs(-1);
+ fs.initialize(fs.getUri(), conf);
+ (new ZombieNewLogWriterRegionServer()).start();
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ }
+
+ @Test
+ public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException {
+ generateHLogs(-1);
+ fs.initialize(fs.getUri(), conf);
+ (new ZombieNewLogWriterRegionServer()).start();
+ try {
+ HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+ } catch (IOException ex) {/* expected */}
+ int logFilesNumber = fs.listStatus(hlogDir).length;
+
+ assertEquals("Log files should not be archived if there's an extra file after split",
+ NUM_WRITERS + 1, logFilesNumber);
+ }
+
+ /**
+ * This thread will keep writing to the file after the split process has started
+ * It simulates a region server that was considered dead but woke up and wrote
+   * some more edits to the last log file
+ */
+ class ZombieLastLogWriterRegionServer extends Thread {
+ AtomicLong editsCount;
+ Path log;
+ HLog.Writer lastLogWriter = writer[NUM_WRITERS - 1];
+ public ZombieLastLogWriterRegionServer(AtomicLong counter) {
+ this.editsCount = counter;
+ }
+
+ @Override
+ public void run() {
+ flushToConsole("starting");
+ while (true) {
+ try {
+ appendEntry(lastLogWriter, TABLE_NAME, "juliet".getBytes(),
+ ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
+ lastLogWriter.sync();
+ editsCount.incrementAndGet();
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ //
+ }
+
+
+ } catch (IOException ex) {
+ if (ex instanceof RemoteException) {
+ flushToConsole("Juliet: got RemoteException " +
+ ex.getMessage() + " while writing " + (editsCount.get() + 1));
+ break;
+ } else {
+ assertTrue("Failed to write " + editsCount.get(), false);
+ }
+
+ }
+ }
+
+
+ }
+ }
+
+ /**
+ * This thread will keep adding new log files
+ * It simulates a region server that was considered dead but woke up and wrote
+ * some more to a new hlog
+ */
+ class ZombieNewLogWriterRegionServer extends Thread {
+
+
+ @Override
+ public void run() {
+
+ boolean splitStarted = false;
+ while (!splitStarted) {
+ try {
+ splitStarted = fs.listStatus(new Path(hbaseDir, new String(TABLE_NAME))).length > 0;
+ } catch (FileNotFoundException e) {
+ try {
+ flushToConsole("Juliet: split not started, sleeping a bit...");
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ //
+ }
+ } catch (IOException e1) {
+ assertTrue("Failed to list status ", false);
+ }
+ }
+
+ Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
+ try {
+ HLog.Writer writer = HLog.createWriter(fs,
+ julietLog, conf);
+ appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
+ ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
+ writer.close();
+ flushToConsole("Juliet file creator: created file " + julietLog);
+ } catch (IOException e1) {
+ assertTrue("Failed to create file " + julietLog, false);
+ }
+
+ }
+
+ }
+
+ private void flushToConsole(String s) {
+ System.out.println(s);
+ System.out.flush();
+ }
+
+
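+  // Writes NUM_WRITERS hlogs with ENTRIES edits per region in each; the
+  // writer whose index equals leaveOpen is left unclosed to simulate a log
+  // still held open by a crashed region server.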
+ private void generateHLogs(int leaveOpen) throws IOException {
+ for (int i = 0; i < NUM_WRITERS; i++) {
+ writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
+ for (int j = 0; j < ENTRIES; j++) {
+ int prefix = 0;
+ for (String region : regions) {
+ String row_key = region + prefix++ + i + j;
+ appendEntry(writer[i], TABLE_NAME, region.getBytes(),
+ row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq);
+ }
+ }
+ if (i != leaveOpen) {
+ writer[i].close();
+ flushToConsole("Closing writer " + i);
+ }
+ }
+ }
+
+ private Path getLogForRegion(Path rootdir, byte[] table, String region) {
+ return new Path(HRegion.getRegionDir(HTableDescriptor
+ .getTableDir(rootdir, table),
+ HRegionInfo.encodeRegionName(region.getBytes())),
+ "oldlogfile.log");
+ }
+
+ private void corruptHLog(Path path, Corruptions corruption, boolean close,
+ FileSystem fs) throws IOException {
+
+ FSDataOutputStream out;
+ int fileSize = (int) fs.listStatus(path)[0].getLen();
+
+ FSDataInputStream in = fs.open(path);
+ byte[] corrupted_bytes = new byte[fileSize];
+ in.readFully(0, corrupted_bytes, 0, fileSize);
+ in.close();
+
+ switch (corruption) {
+ case APPEND_GARBAGE:
+ out = fs.append(path);
+ out.write("-----".getBytes());
+ closeOrFlush(close, out);
+ break;
+
+ case INSERT_GARBAGE_ON_FIRST_LINE:
+ fs.delete(path, false);
+ out = fs.create(path);
+ out.write(0);
+ out.write(corrupted_bytes);
+ closeOrFlush(close, out);
+ break;
+
+ case INSERT_GARBAGE_IN_THE_MIDDLE:
+ fs.delete(path, false);
+ out = fs.create(path);
+        int middle = corrupted_bytes.length / 2;
+ out.write(corrupted_bytes, 0, middle);
+ out.write(0);
+ out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
+ closeOrFlush(close, out);
+ break;
+ }
+
+
+ }
+
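+  // Either closes the stream or only flushes it, so a corrupted file can be
+  // left "open" the way a crashed writer would leave it.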
+ private void closeOrFlush(boolean close, FSDataOutputStream out) throws IOException {
+ if (close) {
+ out.close();
+ } else {
+ out.sync();
+ out.hflush();
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
+ HLog.Entry entry;
+ HLog.Reader in = HLog.getReader(fs, log, conf);
+ while ((entry = in.next()) != null) {
+ System.out.println(entry);
+ }
+ }
+
+ private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
+ int count = 0;
+ HLog.Reader in = HLog.getReader(fs, log, conf);
+ while (in.next() != null) {
+ count++;
+ }
+ return count;
+ }
+
+
+ public long appendEntry(HLog.Writer writer, byte[] table, byte[] region,
+ byte[] row, byte[] family, byte[] qualifier,
+ byte[] value, long seq)
+ throws IOException {
+
+ long time = System.nanoTime();
+ WALEdit edit = new WALEdit();
+ seq++;
+ edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
+ writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit));
+ writer.sync();
+ return seq;
+
+ }
+
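+  // Creates an hlog with no entries; when closeFile is false the writer is
+  // left open so the file reports zero length, mimicking a log a dying
+  // region server had just created.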
+ private void injectEmptyFile(String suffix, boolean closeFile)
+ throws IOException {
+ HLog.Writer writer = HLog.createWriter(
+ fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf);
+ if (closeFile) writer.close();
+ }
+
+ @SuppressWarnings("unused")
+ private void listLogs(FileSystem fs, Path dir) throws IOException {
+ for (FileStatus file : fs.listStatus(dir)) {
+ System.out.println(file.getPath());
+ }
+
+ }
+
+}