Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (revision 1358048) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (working copy) @@ -108,9 +108,7 @@ // interrupted or has encountered a transient error and when it has // encountered a bad non-retry-able persistent error. try { - String tmpname = - ZKSplitLog.getSplitLogDirTmpComponent(serverName.toString(), filename); - if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname, + if (HLogSplitter.splitLogFileToTemp(rootdir, null, fs.getFileStatus(new Path(filename)), fs, conf, p) == false) { return Status.PREEMPTED; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (revision 1358048) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (working copy) @@ -139,10 +139,8 @@ this(zkw, conf, stopper, serverName, new TaskFinisher() { @Override public Status finish(ServerName workerName, String logfile) { - String tmpname = - ZKSplitLog.getSplitLogDirTmpComponent(workerName.toString(), logfile); try { - HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf); + HLogSplitter.archiveProcessedLogs(logfile, conf); } catch (IOException e) { LOG.warn("Could not finish splitting of log file " + logfile); return Status.ERR; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (revision 1358048) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (working copy) @@ -116,9 
+116,9 @@ return worker + "_" + ZKSplitLog.encode(file); } - public static void markCorrupted(Path rootdir, String tmpname, + public static void markCorrupted(Path rootdir, String logFileName, FileSystem fs) { - Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt"); + Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt"); try { fs.createNewFile(file); } catch (IOException e) { @@ -127,9 +127,9 @@ } } - public static boolean isCorrupted(Path rootdir, String tmpname, + public static boolean isCorrupted(Path rootdir, String logFileName, FileSystem fs) throws IOException { - Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt"); + Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt"); boolean isCorrupt; isCorrupt = fs.exists(file); return isCorrupt; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1358048) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy) @@ -354,7 +354,7 @@ * produced. *

* @param rootDir - * @param tmpname + * @param tmpname null if split log file to region's dir * @param logfile * @param fs * @param conf @@ -389,7 +389,7 @@ in = getReader(fs, logfile, conf, skipErrors); } catch (CorruptedLogFileException e) { LOG.warn("Could not get reader, corrupted log file " + logPath, e); - ZKSplitLog.markCorrupted(rootDir, tmpname, fs); + ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); isCorrupted = true; } if (in == null) { @@ -431,7 +431,7 @@ throw iie; } catch (CorruptedLogFileException e) { LOG.warn("Could not parse, corrupted log file " + logPath, e); - ZKSplitLog.markCorrupted(rootDir, tmpname, fs); + ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); isCorrupted = true; } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); @@ -538,7 +538,27 @@ return fset; } + public static void archiveProcessedLogs(String logfile, Configuration conf) + throws IOException { + Path rootdir = FSUtils.getRootDir(conf); + Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + List<Path> processedLogs = new ArrayList<Path>(); + List<Path> corruptedLogs = new ArrayList<Path>(); + FileSystem fs; + fs = rootdir.getFileSystem(conf); + Path logPath = new Path(logfile); + if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) { + corruptedLogs.add(logPath); + } else { + processedLogs.add(logPath); + } + archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf); + Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName()); + fs.delete(stagingDir, true); + return; + } + /** * Moves processed logs to a oldLogDir after successful processing Moves * corrupted logs (any log that couldn't be successfully parsed to corruptDir