Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java	(revision 1358897)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java	(working copy)
@@ -1016,10 +1016,9 @@
     generateHLogs(1, 10, -1);
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.moveSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
@@ -1046,10 +1045,9 @@
     LOG.info("Region directory is" + regiondir);
     fs.delete(regiondir, true);
 
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.moveSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
 
     assertTrue(!fs.exists(regiondir));
     assertTrue(true);
@@ -1065,10 +1063,9 @@
 
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.moveSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
 
     Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
     assertFalse(fs.exists(tdir));
@@ -1082,10 +1079,9 @@
     FileStatus logfile = fs.listStatus(hlogDir)[0];
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.moveSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
     for (String region : regions) {
       Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region);
       assertEquals(10, countHLog(recovered, fs, conf));
@@ -1103,10 +1099,9 @@
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
-        conf, reporter);
-    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
-        logfile.getPath().toString(), conf);
+    HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
+    HLogSplitter.moveSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
+        .toString(), conf);
 
     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
         "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java	(revision 1358897)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java	(working copy)
@@ -660,10 +660,10 @@
     wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
-    HLogSplitter.splitLogFileToTemp(hbaseRootDir, hbaseRootDir + "/temp", listStatus[0], this.fs,
-        this.conf, null);
-    FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/temp/" + tableNameStr
-        + "/" + hri.getEncodedName() + "/recovered.edits"));
+    HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
+        null);
+    FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/"
+        + tableNameStr + "/" + hri.getEncodedName() + "/recovered.edits"));
     int editCount = 0;
     for (FileStatus fileStatus : listStatus1) {
       editCount = Integer.parseInt(fileStatus.getPath().getName());
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java	(revision 1358897)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java	(working copy)
@@ -108,9 +108,7 @@
         // interrupted or has encountered a transient error and when it has
         // encountered a bad non-retry-able persistent error.
         try {
-          String tmpname =
-            ZKSplitLog.getSplitLogDirTmpComponent(serverName.toString(), filename);
-          if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
+          if (HLogSplitter.splitLogFile(rootdir,
               fs.getFileStatus(new Path(filename)), fs, conf, p) == false) {
             return Status.PREEMPTED;
           }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java	(revision 1358897)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java	(working copy)
@@ -139,10 +139,8 @@
     this(zkw, conf, stopper, serverName, new TaskFinisher() {
       @Override
       public Status finish(ServerName workerName, String logfile) {
-        String tmpname =
-          ZKSplitLog.getSplitLogDirTmpComponent(workerName.toString(), logfile);
         try {
-          HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
+          HLogSplitter.moveSplitLogFile(logfile, conf);
         } catch (IOException e) {
           LOG.warn("Could not finish splitting of log file " + logfile);
           return Status.ERR;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java	(revision 1358897)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java	(working copy)
@@ -98,27 +98,14 @@
     return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
   }
 
-  public static Path stripSplitLogTempDir(Path rootdir, Path file) {
-    int skipDepth = rootdir.depth() + 2;
-    List<String> components = new ArrayList<String>(10);
-    do {
-      components.add(file.getName());
-      file = file.getParent();
-    } while (file.depth() > skipDepth);
-    Path ret = rootdir;
-    for (int i = components.size() - 1; i >= 0; i--) {
-      ret = new Path(ret, components.get(i));
-    }
-    return ret;
-  }
-
   public static String getSplitLogDirTmpComponent(final String worker, String file) {
     return worker + "_" + ZKSplitLog.encode(file);
   }
 
-  public static void markCorrupted(Path rootdir, String tmpname,
+  public static void markCorrupted(Path rootdir, String logFileName,
       FileSystem fs) {
-    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
     try {
       fs.createNewFile(file);
     } catch (IOException e) {
@@ -127,15 +114,12 @@
     }
   }
 
-  public static boolean isCorrupted(Path rootdir, String tmpname,
+  public static boolean isCorrupted(Path rootdir, String logFileName,
       FileSystem fs) throws IOException {
-    Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
+    Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
     boolean isCorrupt;
     isCorrupt = fs.exists(file);
     return isCorrupt;
   }
-
-  public static boolean isCorruptFlagFile(Path file) {
-    return file.getName().equals("corrupt");
-  }
 }
\ No newline at end of file
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(revision 1358897)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(working copy)
@@ -346,15 +346,12 @@
   }
 
   /**
-   * Splits a HLog file into a temporary staging area. tmpname is used to build
-   * the name of the staging area where the recovered-edits will be separated
-   * out by region and stored.
+   * Splits an HLog file into each region's recovered.edits directory.
    * <p>
    * If the log file has N regions then N recovered.edits files will be
    * produced.
    * <p>
   * @param rootDir
-   * @param tmpname
    * @param logfile
    * @param fs
    * @param conf
@@ -362,16 +359,15 @@
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
-  static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
-      FileStatus logfile, FileSystem fs,
-      Configuration conf, CancelableProgressable reporter) throws IOException {
+  static public boolean splitLogFile(Path rootDir, FileStatus logfile,
+      FileSystem fs, Configuration conf, CancelableProgressable reporter)
+      throws IOException {
     HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs);
-    return s.splitLogFileToTemp(logfile, tmpname, reporter);
+    return s.splitLogFile(logfile, reporter);
   }
 
-  public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
-      CancelableProgressable reporter)
-      throws IOException {
+  public boolean splitLogFile(FileStatus logfile,
+      CancelableProgressable reporter) throws IOException {
     boolean isCorrupted = false;
     Preconditions.checkState(status == null);
     status = TaskMonitor.get().createStatus(
@@ -389,7 +385,7 @@
       in = getReader(fs, logfile, conf, skipErrors);
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not get reader, corrupted log file " + logPath, e);
-      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
       isCorrupted = true;
     }
     if (in == null) {
@@ -397,8 +393,7 @@
       LOG.warn("Nothing to split in log file " + logPath);
       return true;
     }
-    this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(
-        reporter, tmpname));
+    this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(reporter));
     if (!reportProgressIfIsDistributedLogSplitting()) {
       return false;
     }
@@ -431,7 +426,7 @@
       throw iie;
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not parse, corrupted log file " + logPath, e);
-      ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
+      ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
       isCorrupted = true;
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
@@ -451,94 +446,40 @@
   }
 
   /**
-   * Completes the work done by splitLogFileToTemp by moving the
-   * recovered.edits from the staging area to the respective region server's
-   * directories.
+   * Completes the work done by splitLogFile by archiving the split log file.
    * <p>
    * It is invoked by SplitLogManager once it knows that one of the
-   * SplitLogWorkers have completed the splitLogFileToTemp() part. If the
-   * master crashes then this function might get called multiple times.
+   * SplitLogWorkers has completed the splitLogFile() part. If the master
+   * crashes then this function might get called multiple times.
    * <p>
-   * @param tmpname
+   * @param logfile
    * @param conf
    * @throws IOException
    */
-  public static void moveRecoveredEditsFromTemp(String tmpname,
-      String logfile, Configuration conf)
-      throws IOException{
+  public static void moveSplitLogFile(String logfile, Configuration conf)
+      throws IOException {
     Path rootdir = FSUtils.getRootDir(conf);
     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
-    moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf);
+    moveSplitLogFile(rootdir, oldLogDir, logfile, conf);
   }
 
-  public static void moveRecoveredEditsFromTemp(String tmpname,
-      Path rootdir, Path oldLogDir,
-      String logfile, Configuration conf)
-      throws IOException{
+  public static void moveSplitLogFile(Path rootdir, Path oldLogDir,
+      String logfile, Configuration conf) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
     FileSystem fs;
     fs = rootdir.getFileSystem(conf);
     Path logPath = new Path(logfile);
-    if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
+    if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
       corruptedLogs.add(logPath);
     } else {
       processedLogs.add(logPath);
     }
-    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
-    List<FileStatus> files = listAll(fs, stagingDir);
-    for (FileStatus f : files) {
-      Path src = f.getPath();
-      Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
-      if (ZKSplitLog.isCorruptFlagFile(dst)) {
-        continue;
-      }
-      if (fs.exists(src)) {
-        if (fs.exists(dst)) {
-          fs.delete(dst, false);
-        } else {
-          Path regionDir = dst.getParent().getParent();
-          if (!fs.exists(regionDir)) {
-            // See HBASE-6050.
-            LOG.warn("Could not move recovered edits from " + src +
-                " to destination " + regionDir + " as it doesn't exist.");
-            continue;
-          }
-          Path dstdir = dst.getParent();
-          if (!fs.exists(dstdir)) {
-            if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
-          }
-        }
-        fs.rename(src, dst);
-        LOG.debug(" moved " + src + " => " + dst);
-      } else {
-        LOG.debug("Could not move recovered edits from " + src +
-            " as it doesn't exist");
-      }
-    }
-    archiveLogs(null, corruptedLogs, processedLogs,
-        oldLogDir, fs, conf);
+    archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
     fs.delete(stagingDir, true);
-    return;
   }
 
-  private static List<FileStatus> listAll(FileSystem fs, Path dir)
-      throws IOException {
-    List<FileStatus> fset = new ArrayList<FileStatus>(100);
-    FileStatus [] files = fs.exists(dir)? fs.listStatus(dir): null;
-    if (files != null) {
-      for (FileStatus f : files) {
-        if (f.isDir()) {
-          fset.addAll(listAll(fs, f.getPath()));
-        } else {
-          fset.add(f);
-        }
-      }
-    }
-    return fset;
-  }
-
   /**
    * Moves processed logs to a oldLogDir after successful processing Moves
    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
@@ -1027,14 +968,14 @@
     }
   }
 
-  private WriterAndPath createWAP(byte[] region, Entry entry,
-      Path rootdir, String tmpname, FileSystem fs, Configuration conf)
+  private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
+      FileSystem fs, Configuration conf)
       throws IOException {
-    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, tmpname==null);
+    Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
     if (regionedits == null) {
       return null;
     }
-    if ((tmpname == null) && fs.exists(regionedits)) {
+    if (fs.exists(regionedits)) {
       LOG.warn("Found existing old edits file. It could be the "
           + "result of a previous failed split attempt. Deleting "
          + regionedits + ", length="
@@ -1043,18 +984,10 @@
         LOG.warn("Failed delete of old " + regionedits);
       }
     }
-    Path editsfile;
-    if (tmpname != null) {
-      // During distributed log splitting the output by each
-      // SplitLogWorker is written to a temporary area.
-      editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
-    } else {
-      editsfile = regionedits;
-    }
-    Writer w = createWriter(fs, editsfile, conf);
-    LOG.debug("Creating writer path=" + editsfile + " region="
+    Writer w = createWriter(fs, regionedits, conf);
+    LOG.debug("Creating writer path=" + regionedits + " region="
         + Bytes.toStringBinary(region));
-    return (new WriterAndPath(editsfile, w));
+    return (new WriterAndPath(regionedits, w));
   }
 
   Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
@@ -1110,12 +1043,9 @@
     // How often to send a progress report (default 1/2 master timeout)
     private final int report_period;
     private long last_report_at = 0;
-    private final String tmpDirName;
 
-    public DistributedLogSplittingHelper(CancelableProgressable reporter,
-        String tmpName) {
+    public DistributedLogSplittingHelper(CancelableProgressable reporter) {
       this.splitReporter = reporter;
-      this.tmpDirName = tmpName;
       report_period = conf.getInt("hbase.splitlog.report.period",
           conf.getInt("hbase.splitlog.manager.timeout",
               SplitLogManager.DEFAULT_TIMEOUT) / 2);
@@ -1139,10 +1069,6 @@
         return true;
       }
     }
-
-    String getTmpDirName() {
-      return this.tmpDirName;
-    }
   }
 
   /**
@@ -1380,9 +1306,7 @@
     if (blacklistedRegions.contains(region)) {
       return null;
     }
-    String tmpName = distributedLogSplittingHelper == null ? null
-        : distributedLogSplittingHelper.getTmpDirName();
-    ret = createWAP(region, entry, rootDir, tmpName, fs, conf);
+    ret = createWAP(region, entry, rootDir, fs, conf);
    if (ret == null) {
      blacklistedRegions.add(region);
      return null;
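
For reviewers, a minimal sketch (not part of the patch) of how the renamed entry points fit together, mirroring the calls in the TestHLogSplit and TestWALReplay hunks above. Everything in it is a placeholder: the class name SplitLogFileSketch is hypothetical, the WAL is picked arbitrarily out of the log directory (assumed here to be named by HConstants.HREGION_LOGDIR_NAME), and the null reporter follows the TestWALReplay usage.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.FSUtils;

/** Illustrative only: drives the renamed split/archive entry points. */
public class SplitLogFileSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);

    // Pick some WAL out of the log directory; placeholder choice, and
    // listStatus() is assumed non-empty for the sake of the sketch.
    FileStatus walFile =
        fs.listStatus(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME))[0];

    // Worker side (SplitLogWorker): recovered.edits now land directly in
    // each region's recovered.edits directory; no per-worker tmp staging.
    boolean notPreempted =
        HLogSplitter.splitLogFile(rootDir, walFile, fs, conf, null);

    // Master side (SplitLogManager's TaskFinisher): archive the source WAL
    // to oldLogDir (or the corrupt dir) and delete the per-log marker
    // directory. The javadoc notes this may be called multiple times if
    // the master crashes and retries.
    if (notPreempted) {
      HLogSplitter.moveSplitLogFile(rootDir, oldLogDir,
          walFile.getPath().toString(), conf);
    }
  }
}

The point of the patch is visible in those two calls: splitLogFile() writes recovered.edits straight into each region directory, so moveSplitLogFile() only has to archive the source log and delete the marker directory, instead of walking and renaming an entire per-worker tmp staging tree as moveRecoveredEditsFromTemp() did.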