Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (revision 1358048) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (working copy) @@ -108,9 +108,7 @@ // interrupted or has encountered a transient error and when it has // encountered a bad non-retry-able persistent error. try { - String tmpname = - ZKSplitLog.getSplitLogDirTmpComponent(serverName.toString(), filename); - if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname, + if (HLogSplitter.splitLogFileToTemp(rootdir, null, fs.getFileStatus(new Path(filename)), fs, conf, p) == false) { return Status.PREEMPTED; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (revision 1358048) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (working copy) @@ -139,10 +139,8 @@ this(zkw, conf, stopper, serverName, new TaskFinisher() { @Override public Status finish(ServerName workerName, String logfile) { - String tmpname = - ZKSplitLog.getSplitLogDirTmpComponent(workerName.toString(), logfile); try { - HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf); + HLogSplitter.archiveProcessedLogs(logfile, conf); } catch (IOException e) { LOG.warn("Could not finish splitting of log file " + logfile); return Status.ERR; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (revision 1358048) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (working copy) @@ -116,9 +116,9 @@ return worker + "_" + ZKSplitLog.encode(file); } - public static void markCorrupted(Path rootdir, String tmpname, + public static void markCorrupted(Path rootdir, String logFileName, FileSystem fs) { - Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt"); + Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt"); try { fs.createNewFile(file); } catch (IOException e) { @@ -127,9 +127,9 @@ } } - public static boolean isCorrupted(Path rootdir, String tmpname, + public static boolean isCorrupted(Path rootdir, String logFileName, FileSystem fs) throws IOException { - Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt"); + Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt"); boolean isCorrupt; isCorrupt = fs.exists(file); return isCorrupt; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1358048) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy) @@ -354,7 +354,7 @@ * produced. *
* @param rootDir
- * @param tmpname
+ * @param tmpname null if split log file to region's dir
* @param logfile
* @param fs
* @param conf
@@ -389,7 +389,7 @@
in = getReader(fs, logfile, conf, skipErrors);
} catch (CorruptedLogFileException e) {
LOG.warn("Could not get reader, corrupted log file " + logPath, e);
- ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+ ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
isCorrupted = true;
}
if (in == null) {
@@ -431,7 +431,7 @@
throw iie;
} catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupted log file " + logPath, e);
- ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+ ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
isCorrupted = true;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
@@ -538,7 +538,27 @@
return fset;
}
+ public static void archiveProcessedLogs(String logfile, Configuration conf)
+ throws IOException {
+ Path rootdir = FSUtils.getRootDir(conf);
+ Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+ List