Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(revision 1206043)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(working copy)
@@ -43,10 +43,10 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
@@ -378,10 +378,33 @@
     if (!fs.exists(dir)) {
       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
     }
+    // Append RECOVERED_LOG_TMPFILE_SUFFIX to the file name so that the
+    // region's replayRecoveredEdits will not delete the file while it is open
-    return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
-        .getLogSeqNum()));
+    String fileName = formatRecoveredEditsFileName(logEntry.getKey()
+        .getLogSeqNum());
+    fileName = getTmpRecoveredEditsFileName(fileName);
+    return new Path(dir, fileName);
   }
 
+  static String getTmpRecoveredEditsFileName(String fileName) {
+    return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
+  }
+
+  /**
+   * Convert the path of an in-progress edits file under RECOVERED_EDITS_DIR
+   * into its final path, with the RECOVERED_LOG_TMPFILE_SUFFIX stripped.
+   * @param srcPath path of the '.temp' recovered edits file
+   * @return dstPath without RECOVERED_LOG_TMPFILE_SUFFIX
+   */
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath) {
+    String fileName = srcPath.getName();
+    if (fileName.endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
+      fileName = fileName.substring(0,
+          fileName.length() - HLog.RECOVERED_LOG_TMPFILE_SUFFIX.length());
+    }
+    return new Path(srcPath.getParent(), fileName);
+  }
+
   static String formatRecoveredEditsFileName(final long seqid) {
     return String.format("%019d", seqid);
   }
@@ -771,9 +794,33 @@
         thrown.add(ioe);
         continue;
       }
-      paths.add(wap.p);
       LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten
           + " edits in " + (wap.nanosSpent / 1000/ 1000) + "ms)");
+      Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+      try {
+        if (!dst.equals(wap.p) && fs.exists(dst)) {
+          LOG.warn("Found existing old edits file. It could be the "
+              + "result of a previous failed split attempt. Deleting " + dst
+              + ", length=" + fs.getFileStatus(dst).getLen());
+          if (!fs.delete(dst, false)) {
+            LOG.warn("Failed delete of old " + dst);
+            throw new IOException("Failed delete of old " + dst);
+          }
+        }
+        // The temp file may not exist: some unit tests create a splitter
+        // that reads and writes without touching disk.
+        // TestHLogSplit#testThreading is an example.
+        if (fs.exists(wap.p)) {
+          if (!fs.rename(wap.p, dst)) {
+            throw new IOException("Failed renaming " + wap.p + " to " + dst);
+          }
+        }
+      } catch (IOException ioe) {
+        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+        thrown.add(ioe);
+        continue;
+      }
+      paths.add(dst);
     }
     if (!thrown.isEmpty()) {
       throw MultipleIOException.createIOException(thrown);
Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java	(revision 1206043)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java	(working copy)
@@ -19,7 +19,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -27,6 +31,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -39,14 +44,14 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
@@ -698,12 +703,64 @@
       fail("There shouldn't be any exception but: " + e.toString());
     }
   }
-  
+
   /**
-   * Test log split process with fake data and lots of edits to trigger threading
-   * issues.
+   * @throws IOException
+   * @see <a href="https://issues.apache.org/jira/browse/HBASE-4862">HBASE-4862</a>
    */
   @Test
+  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
+    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
+    // Generate hlogs for our destination region
+    String regionName = "r0";
+    final Path regiondir = new Path(tabledir, regionName);
+    regions = new ArrayList<String>();
+    regions.add(regionName);
+    generateHLogs(-1);
+
+    HLogSplitter logSplitter = new HLogSplitter(conf, hbaseDir, hlogDir,
+        oldLogDir, fs) {
+      protected HLog.Writer createWriter(FileSystem fs, Path logfile,
+          Configuration conf) throws IOException {
+        HLog.Writer writer = HLog.createWriter(fs, logfile, conf);
+        // After creating the writer, simulate the region's
+        // replayRecoveredEditsIfAny(), which lists this region's split edit
+        // files and deletes them, excluding files with the '.temp' suffix.
+        NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs,
+            regiondir);
+        if (files != null && !files.isEmpty()) {
+          for (Path file : files) {
+            if (!this.fs.delete(file, false)) {
+              LOG.error("Failed delete of " + file);
+            } else {
+              LOG.debug("Deleted recovered.edits file=" + file);
+            }
+          }
+        }
+        return writer;
+      }
+    };
+    try {
+      logSplitter.splitLog();
+    } catch (IOException e) {
+      LOG.info(e);
+      fail("Split should not throw an IOException here; it most likely "
+          + "means the file being written was deleted by a concurrent "
+          + "replayRecoveredEditsIfAny()");
+    }
+    if (fs.exists(corruptDir)) {
+      if (fs.listStatus(corruptDir).length > 0) {
+        fail("Found corrupt logs; most likely caused by a concurrent "
+            + "replayRecoveredEditsIfAny()");
+      }
+    }
+  }
+
+  /**
+   * Test log split process with fake data and lots of edits to trigger
+   * threading issues.
+   */
+  @Test
   public void testThreading() throws Exception {
     doTestThreading(20000, 128*1024*1024, 0);
   }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(revision 1206043)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(working copy)
@@ -117,6 +117,7 @@
   private static final String RECOVERED_EDITS_DIR = "recovered.edits";
   private static final Pattern EDITFILES_NAME_PATTERN =
     Pattern.compile("-?[0-9]+");
+  static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
 
   private final FileSystem fs;
   private final Path dir;
@@ -1446,7 +1447,8 @@
   }
 
   /**
-   * Returns sorted set of edit files made by wal-log splitter.
+   * Returns sorted set of edit files made by wal-log splitter, excluding
+   * files with the '.temp' suffix.
    * @param fs
    * @param regiondir
    * @return Files in passed regiondir as a sorted set.
@@ -1470,6 +1472,11 @@
           // it a timestamp suffix. See moveAsideBadEditsFile.
           Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
           result = fs.isFile(p) && m.matches();
+          // Skip files whose names end with RECOVERED_LOG_TMPFILE_SUFFIX:
+          // a split-log thread is still writing such a file.
+          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
+            result = false;
+          }
         } catch (IOException e) {
           LOG.warn("Failed isFile check on " + p);
         }
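For reference, the core idea of the fix is a write-to-temp-then-rename protocol: the splitter writes recovered edits to a file ending in '.temp', which replayRecoveredEditsIfAny() ignores, and only renames it to its final sequence-id name after a successful close. The sketch below is a minimal standalone illustration of that protocol against the plain Hadoop FileSystem API; it is not HBase code, and the names TempRenameSketch, tmpPath, publish, isReplayable, and SUFFIX are hypothetical stand-ins for the patch's RECOVERED_LOG_TMPFILE_SUFFIX machinery.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TempRenameSketch {
  // Mirrors HLog.RECOVERED_LOG_TMPFILE_SUFFIX in the patch above.
  static final String SUFFIX = ".temp";

  /** Writer side: create the edits file under a name readers will skip. */
  static Path tmpPath(Path recoveredEditsDir, long seqId) {
    return new Path(recoveredEditsDir, String.format("%019d", seqId) + SUFFIX);
  }

  /** After a clean close: drop any stale final file, then rename into place. */
  static Path publish(FileSystem fs, Path tmp) throws IOException {
    String name = tmp.getName();
    Path dst = new Path(tmp.getParent(),
        name.substring(0, name.length() - SUFFIX.length()));
    if (fs.exists(dst) && !fs.delete(dst, false)) {
      throw new IOException("Failed delete of stale " + dst);
    }
    if (!fs.rename(tmp, dst)) {
      throw new IOException("Failed rename of " + tmp + " to " + dst);
    }
    return dst;
  }

  /** Reader side: replay must ignore files the splitter is still writing. */
  static boolean isReplayable(FileStatus stat) {
    return !stat.getPath().getName().endsWith(SUFFIX);
  }
}

Because a rename within an HDFS directory is atomic, a concurrent replayRecoveredEditsIfAny() observes either no final file or a completely written one, never a half-written file that it could replay or delete mid-split.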