diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 40205c4..ce15554 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -1868,7 +1869,9 @@ public class HRegion implements HeapSize { // , Writable{ protected long replayRecoveredEditsIfAny(final Path regiondir, final long minSeqId, final Progressable reporter) throws UnsupportedEncodingException, IOException { - Path edits = new Path(regiondir, HLog.RECOVERED_EDITS); + NavigableSet<Path> files = getFilesSorted(this.fs, regiondir); + if (files == null || files.isEmpty()) return -1; + for (Path edits: files) { if (edits == null || !this.fs.exists(edits)) return -1; if (isZeroLengthThenDelete(this.fs, edits)) return -1; long maxSeqIdInLog = -1; @@ -1892,10 +1895,42 @@ public class HRegion implements HeapSize { // , Writable{ throw e; } } + } return maxSeqIdInLog; } /* + * @param fs + * @param regiondir + * @return Files in passed regiondir as a sorted set. + * @throws IOException + */ + static NavigableSet<Path> getFilesSorted(final FileSystem fs, + final Path regiondir) + throws IOException { + // TODO: Add a test for this method. 
+ Path editsdir = new Path(regiondir, HLog.RECOVERED_EDITS); + FileStatus [] files = fs.listStatus(editsdir, new PathFilter () { + @Override + public boolean accept(Path p) { + boolean result = false; + try { + result = fs.isFile(p); + } catch (IOException e) { + LOG.warn("Failed isFile check on " + p); + } + return result; + } + }); + NavigableSet<Path> filesSorted = new TreeSet<Path>(); + if (files == null) return filesSorted; + for (FileStatus status: files) { + filesSorted.add(status.getPath()); + } + return filesSorted; + } + + /* + * @param edits File of recovered edits. + * @param minSeqId Minimum sequenceid found in a store file. Edits in log + * must be larger than this to be replayed. diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 7044891..08054cc 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -36,7 +36,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; @@ -119,7 +118,6 @@ import com.google.common.util.concurrent.NamingThreadFactory; */ public class HLog implements Syncable { static final Log LOG = LogFactory.getLog(HLog.class); - private static final String HLOG_DATFILE = "hlog.dat."; public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY"); static final byte [] METAROW = Bytes.toBytes("METAROW"); private final FileSystem fs; @@ -144,8 +142,9 @@ public class HLog implements Syncable { private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas final static Object [] NO_ARGS = new Object []{}; - /** Name of file that holds recovered edits written by the wal log splitting * code, one per region + /** + * Name of directory that holds recovered edits written by the 
wal log + * splitting code, one per region */ public static final String RECOVERED_EDITS = "recovered.edits"; @@ -1459,7 +1458,7 @@ public class HLog implements Syncable { NamingThreadFactory f = new NamingThreadFactory( "SplitWriter-%1$d", Executors.defaultThreadFactory()); ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f); - for (final byte[] region : splitLogsMap.keySet()) { + for (final byte [] region : splitLogsMap.keySet()) { Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf); writeFutureResult.put(region, threadPool.submit(splitter)); } @@ -1581,11 +1580,12 @@ public class HLog implements Syncable { if (wap == null) { Path logFile = getRegionLogPath(logEntry, rootDir); if (fs.exists(logFile)) { - LOG.warn("Found existing old hlog file. It could be the result of a previous" + - "failed split attempt. Deleting " + logFile + - ", length=" + fs.getFileStatus(logFile).getLen()); + LOG.warn("Found existing old edits file. It could be the " + + "result of a previous failed split attempt. Deleting " + + logFile + ", length=" + fs.getFileStatus(logFile).getLen()); fs.delete(logFile, false); } + // TODO(review): decide when the RECOVERED_EDITS directory gets created (here before createWriter, or by the caller). Writer w = createWriter(fs, logFile, conf); wap = new WriterAndPath(logFile, w); logWriters.put(region, wap); @@ -1643,12 +1643,20 @@ public class HLog implements Syncable { } } - private static Path getRegionLogPath(Entry logEntry, Path rootDir) { - Path tableDir = - HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename()); - Path regionDir = - HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName())); - return new Path(regionDir, RECOVERED_EDITS); + /* + * @param logEntry + * @param rootDir HBase root dir. + * @return A file in the RECOVERED_EDITS directory of the logEntry + * region named for the sequenceid in the passed logEntry. 
+ */ + private static Path getRegionLogPath(final Entry logEntry, + final Path rootDir) { + Path tableDir = HTableDescriptor.getTableDir(rootDir, + logEntry.getKey().getTablename()); + Path regionDir = HRegion.getRegionDir(tableDir, + HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName())); + Path dir = new Path(regionDir, RECOVERED_EDITS); + return new Path(dir, Long.toString(logEntry.getKey().getLogSeqNum())); } /**