Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java	(revision 1344525)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java	(working copy)
@@ -397,6 +397,9 @@
       List<HRegionInfo> hris, String tname,
       int num_edits, int edit_size) throws IOException {
 
+    // remove root and meta region
+    hris.remove(HRegionInfo.ROOT_REGIONINFO);
+    hris.remove(HRegionInfo.FIRST_META_REGIONINFO);
     byte[] table = Bytes.toBytes(tname);
     HTableDescriptor htd = new HTableDescriptor(tname);
     byte[] value = new byte[edit_size];
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(revision 1344525)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(working copy)
@@ -33,6 +33,14 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
@@ -58,6 +66,7 @@
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
 
@@ -106,10 +115,13 @@
   // Wait/notify for when data has been produced by the reader thread,
   // consumed by the reader thread, or an exception occurred
   Object dataAvailable = new Object();
-  
+
   private MonitoredTask status;
 
+  // Used in distributed log splitting
+  private DistributedLogSplittingHelper distributedLogSplittingHelper = null;
+
   /**
    * Create a new HLogSplitter using the given {@link Configuration} and the
    * hbase.hlog.splitter.impl property to derived the instance
@@ -238,6 +250,10 @@
     return outputSink.getOutputCounts();
   }
 
+  void setDistributedLogSplittingHelper(DistributedLogSplittingHelper helper) {
+    this.distributedLogSplittingHelper = helper;
+  }
+
   /**
    * Splits the HLog edits in the given list of logfiles (that are a mix of edits
    * on multiple regions) by region and then splits them per region directories,
@@ -270,7 +286,7 @@
     countTotalBytes(logfiles);
     splitSize = 0;
 
-    outputSink.startWriterThreads(entryBuffers);
+    outputSink.startWriterThreads();
 
     try {
       int i = 0;
@@ -335,8 +351,7 @@
    * out by region and stored.
    * <p>
    * If the log file has N regions then N recovered.edits files will be
-   * produced. There is no buffering in this code. Instead it relies on the
-   * buffering in the SequenceFileWriter.
+   * produced.
    * <p>
   * @param rootDir
   * @param tmpname
@@ -357,27 +372,14 @@
   public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
       CancelableProgressable reporter) throws IOException {
-    final Map<byte[], Object> logWriters = Collections.
-    synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
     boolean isCorrupted = false;
 
     Preconditions.checkState(status == null);
     status = TaskMonitor.get().createStatus(
         "Splitting log file " + logfile.getPath() +
         "into a temporary staging area.");
-
-    Object BAD_WRITER = new Object();
-
-    boolean progress_failed = false;
-
     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
         HLog.SPLIT_SKIP_ERRORS_DEFAULT);
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
-    // How often to send a progress report (default 1/2 the zookeeper session
-    // timeout of if that not set, the split log DEFAULT_TIMEOUT)
-    int period = conf.getInt("hbase.splitlog.report.period",
-        conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 2);
-    int numOpenedFilesBeforeReporting =
-      conf.getInt("hbase.splitlog.report.openedfiles", 3);
     Path logPath = logfile.getPath();
     long logLength = logfile.getLen();
     LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
@@ -395,59 +397,34 @@
       LOG.warn("Nothing to split in log file " + logPath);
       return true;
     }
-    long t = EnvironmentEdgeManager.currentTimeMillis();
-    long last_report_at = t;
-    if (reporter != null && reporter.progress() == false) {
-      status.markComplete("Failed: reporter.progress asked us to terminate");
+    this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(
+        reporter, tmpname));
+    if (!reportProgressIfIsDistributedLogSplitting()) {
       return false;
     }
+    boolean progress_failed = false;
+    outputSink.startWriterThreads();
     // Report progress every so many edits and/or files opened (opening a file
     // takes a bit of time).
     int editsCount = 0;
-    int numNewlyOpenedFiles = 0;
     Entry entry;
     try {
       while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
-        byte[] region = entry.getKey().getEncodedRegionName();
-        Object o = logWriters.get(region);
-        if (o == BAD_WRITER) {
-          continue;
-        }
-        WriterAndPath wap = (WriterAndPath)o;
-        if (wap == null) {
-          wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
-          numNewlyOpenedFiles++;
-          if (wap == null) {
-            // ignore edits from this region. It doesn't exist anymore.
-            // It was probably already split.
-            logWriters.put(region, BAD_WRITER);
-            continue;
-          } else {
-            logWriters.put(region, wap);
-          }
-        }
-        wap.w.append(entry);
-        outputSink.updateRegionMaximumEditLogSeqNum(entry);
+        entryBuffers.appendEntry(entry);
         editsCount++;
-        // If sufficient edits have passed OR we've opened a few files, check if
-        // we should report progress.
-        if (editsCount % interval == 0 ||
-            (numNewlyOpenedFiles > numOpenedFilesBeforeReporting)) {
-          // Zero out files counter each time we fall in here.
-          numNewlyOpenedFiles = 0;
-          String countsStr = "edits=" + editsCount + ", files=" + logWriters.size();
+        // If sufficient edits have passed, check if we should report progress.
+        if (editsCount % interval == 0) {
+          String countsStr = "edits=" + editsCount;
           status.setStatus("Split " + countsStr);
-          long t1 = EnvironmentEdgeManager.currentTimeMillis();
-          if ((t1 - last_report_at) > period) {
-            last_report_at = t;
-            if (reporter != null && reporter.progress() == false) {
-              status.markComplete("Failed: reporter.progress asked us to terminate; " + countsStr);
-              progress_failed = true;
-              return false;
-            }
+          if (!reportProgressIfIsDistributedLogSplitting()) {
+            return false;
           }
         }
       }
+    } catch (InterruptedException ie) {
+      IOException iie = new InterruptedIOException();
+      iie.initCause(ie);
+      throw iie;
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not parse, corrupted log file " + logPath, e);
       ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
@@ -456,79 +433,15 @@
       e = RemoteExceptionHandler.checkIOException(e);
       throw e;
     } finally {
-      boolean allWritersClosed = false;
-      try {
-        int n = 0;
-        for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
-          Object o = logWritersEntry.getValue();
-          long t1 = EnvironmentEdgeManager.currentTimeMillis();
-          if ((t1 - last_report_at) > period) {
-            last_report_at = t;
-            if ((progress_failed == false) && (reporter != null)
-                && (reporter.progress() == false)) {
-              progress_failed = true;
-            }
-          }
-          if (o == BAD_WRITER) {
-            continue;
-          }
-          n++;
-          WriterAndPath wap = (WriterAndPath) o;
-          try {
-            wap.writerClosed = true;
-            wap.w.close();
-            LOG.debug("Closed " + wap.p);
-          } catch (IOException e) {
-            LOG.debug("Exception while closing the writer :", e);
-          }
-          Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
-              .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
-          if (!dst.equals(wap.p) && fs.exists(dst)) {
-            LOG.warn("Found existing old edits file. It could be the "
-                + "result of a previous failed split attempt. Deleting " + dst
-                + ", length=" + fs.getFileStatus(dst).getLen());
-            if (!fs.delete(dst, false)) {
-              LOG.warn("Failed deleting of old " + dst);
-              throw new IOException("Failed deleting of old " + dst);
-            }
-          }
-          // Skip the unit tests which create a splitter that reads and writes
-          // the
-          // data without touching disk. TestHLogSplit#testThreading is an
-          // example.
-          if (fs.exists(wap.p)) {
-            if (!fs.rename(wap.p, dst)) {
-              throw new IOException("Failed renaming " + wap.p + " to " + dst);
-            }
-            LOG.debug("Rename " + wap.p + " to " + dst);
-          }
-        }
-        allWritersClosed = true;
-        String msg = "Processed " + editsCount + " edits across " + n
-            + " regions" + " threw away edits for " + (logWriters.size() - n)
-            + " regions" + "; log file=" + logPath + " is corrupted = "
-            + isCorrupted + " progress failed = " + progress_failed;
-        LOG.info(msg);
-        status.markComplete(msg);
-      } finally {
-        if (!allWritersClosed) {
-          for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
-            Object o = logWritersEntry.getValue();
-            if (o != BAD_WRITER) {
-              WriterAndPath wap = (WriterAndPath) o;
-              try {
-                if (!wap.writerClosed) {
-                  wap.writerClosed = true;
-                  wap.w.close();
-                }
-              } catch (IOException e) {
-                LOG.debug("Exception while closing the writer :", e);
-              }
-            }
-          }
-        }
-        in.close();
-      }
+      LOG.info("Finishing writing output logs and closing down.");
+      progress_failed = outputSink.finishWritingAndClose() == null;
+      String msg = "Processed " + editsCount + " edits across "
+          + outputSink.getOutputCounts().size() + " regions; log file="
+          + logPath + " is corrupted = " + isCorrupted + " progress failed = "
+          + progress_failed;
+      ;
+      LOG.info(msg);
+      status.markComplete(msg);
     }
     return !progress_failed;
   }
@@ -1171,7 +1084,64 @@
     return ret;
   }
 
+  /***
+   * @return false if it is a distributed log splitting and it failed to report
+   *         progress
+   */
+  private boolean reportProgressIfIsDistributedLogSplitting() {
+    if (this.distributedLogSplittingHelper != null) {
+      return distributedLogSplittingHelper.reportProgress();
+    } else {
+      return true;
+    }
+  }
+
   /**
+   * A class used in distributed log splitting
+   *
+   */
+  class DistributedLogSplittingHelper {
+    // Report progress, only used in distributed log splitting
+    private final CancelableProgressable splitReporter;
+    // How often to send a progress report (default 1/2 master timeout)
+    private final int report_period;
+    private long last_report_at = 0;
+    private final String tmpDirName;
+
+    public DistributedLogSplittingHelper(CancelableProgressable reporter,
+        String tmpName) {
+      this.splitReporter = reporter;
+      this.tmpDirName = tmpName;
+      report_period = conf.getInt("hbase.splitlog.report.period",
+          conf.getInt("hbase.splitlog.manager.timeout",
+              SplitLogManager.DEFAULT_TIMEOUT) / 2);
+    }
+
+    /***
+     * @return false if reporter failed progressing
+     */
+    private boolean reportProgress() {
+      if (splitReporter == null) {
+        return true;
+      } else {
+        long t = EnvironmentEdgeManager.currentTimeMillis();
+        if ((t - last_report_at) > report_period) {
+          last_report_at = t;
+          if (this.splitReporter.progress() == false) {
+            LOG.warn("Failed: reporter.progress asked us to terminate");
+            return false;
+          }
+        }
+        return true;
+      }
+    }
+
+    String getTmpDirName() {
+      return this.tmpDirName;
+    }
+  }
+
+  /**
    * Class that manages the output streams from the log splitting process.
    */
   class OutputSink {
@@ -1189,20 +1159,23 @@
 
     private boolean logWritersClosed = false;
 
-    /**
-     * Start the threads that will pump data from the entryBuffers
-     * to the output files.
-     * @return the list of started threads
-     */
-    synchronized void startWriterThreads(EntryBuffers entryBuffers) {
+    private final int numThreads;
+
+    public OutputSink() {
       // More threads could potentially write faster at the expense
       // of causing more disk seeks as the logs are split.
      // <p>
      // After a certain setting (probably around 3) the
      // process will be bound on the reader in the current
      // implementation anyway.
-      int numThreads = conf.getInt(
+      numThreads = conf.getInt(
          "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    }
+
+    /**
+     * Start the threads that will pump data from the entryBuffers
+     * to the output files.
+     */
+    synchronized void startWriterThreads() {
       for (int i = 0; i < numThreads; i++) {
         WriterThread t = new WriterThread(i);
         t.start();
@@ -1210,22 +1183,35 @@
       }
     }
 
+    /**
+     *
+     * @return null if failed to report progress
+     * @throws IOException
+     */
     List<Path> finishWritingAndClose() throws IOException {
       LOG.info("Waiting for split writer threads to finish");
+      boolean progress_failed = false;
       try {
         for (WriterThread t : writerThreads) {
           t.finish();
         }
         for (WriterThread t : writerThreads) {
+          if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
+            progress_failed = true;
+          }
           try {
             t.join();
           } catch (InterruptedException ie) {
-            throw new IOException(ie);
+            IOException iie = new InterruptedIOException();
+            iie.initCause(ie);
+            throw iie;
           }
           checkForErrors();
         }
         LOG.info("Split writers finished");
-        
+        if (progress_failed) {
+          return null;
+        }
         return closeStreams();
       } finally {
         List<IOException> thrown = closeLogWriters(null);
@@ -1242,45 +1228,92 @@
     private List<Path> closeStreams() throws IOException {
       Preconditions.checkState(!closeAndCleanCompleted);
 
-      List<Path> paths = new ArrayList<Path>();
-      List<IOException> thrown = Lists.newArrayList();
-      closeLogWriters(thrown);
-      for (Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
+      final List<Path> paths = new ArrayList<Path>();
+      final List<IOException> thrown = Lists.newArrayList();
+      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(
+          numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
+            private int count = 1;
+            public Thread newThread(Runnable r) {
+              Thread t = new Thread(r, "split-log-closeStream-" + count++);
+              return t;
+            }
+          });
+      CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
+          closeThreadPool);
+      for (final Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
           .entrySet()) {
-        WriterAndPath wap = logWritersEntry.getValue();
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p,
-            regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
-        try {
-          if (!dst.equals(wap.p) && fs.exists(dst)) {
-            LOG.warn("Found existing old edits file. It could be the "
-                + "result of a previous failed split attempt. Deleting " + dst
-                + ", length=" + fs.getFileStatus(dst).getLen());
-            if (!fs.delete(dst, false)) {
-              LOG.warn("Failed deleting of old " + dst);
-              throw new IOException("Failed deleting of old " + dst);
+        completionService.submit(new Callable<Void>() {
+          public Void call() throws Exception {
+            WriterAndPath wap = logWritersEntry.getValue();
+            try {
+              wap.w.close();
+            } catch (IOException ioe) {
+              LOG.error("Couldn't close log at " + wap.p, ioe);
+              thrown.add(ioe);
+              return null;
             }
-          }
-          // Skip the unit tests which create a splitter that reads and writes
-          // the data without touching disk. TestHLogSplit#testThreading is an
-          // example.
-          if (fs.exists(wap.p)) {
-            if (!fs.rename(wap.p, dst)) {
-              throw new IOException("Failed renaming " + wap.p + " to " + dst);
+            LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
+                + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+            Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+                regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
+            try {
+              if (!dst.equals(wap.p) && fs.exists(dst)) {
+                LOG.warn("Found existing old edits file. It could be the "
+                    + "result of a previous failed split attempt. Deleting "
+                    + dst + ", length=" + fs.getFileStatus(dst).getLen());
+                if (!fs.delete(dst, false)) {
+                  LOG.warn("Failed deleting of old " + dst);
+                  throw new IOException("Failed deleting of old " + dst);
+                }
+              }
+              // Skip the unit tests which create a splitter that reads and
+              // writes the data without touching disk.
+              // TestHLogSplit#testThreading is an example.
+              if (fs.exists(wap.p)) {
+                if (!fs.rename(wap.p, dst)) {
+                  throw new IOException("Failed renaming " + wap.p + " to "
+                      + dst);
+                }
+                LOG.debug("Rename " + wap.p + " to " + dst);
+              }
+            } catch (IOException ioe) {
+              LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+              thrown.add(ioe);
+              return null;
             }
-            LOG.debug("Rename " + wap.p + " to " + dst);
+            paths.add(dst);
+            return null;
           }
-        } catch (IOException ioe) {
-          LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
-          thrown.add(ioe);
-          continue;
+        });
+      }
+
+      boolean progress_failed = false;
+      try {
+        for (int i = 0; i < logWriters.size(); i++) {
+          Future<Void> future = completionService.take();
+          future.get();
+          if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
+            progress_failed = true;
+          }
        }
-        paths.add(dst);
+      } catch (InterruptedException e) {
+        IOException iie = new InterruptedIOException();
+        iie.initCause(e);
+        throw iie;
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      } finally {
+        closeThreadPool.shutdownNow();
      }
+
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
-      
+      logWritersClosed = true;
      closeAndCleanCompleted = true;
+      if (progress_failed) {
+        return null;
+      }
      return paths;
    }
@@ -1325,7 +1358,9 @@
      if (blacklistedRegions.contains(region)) {
        return null;
      }
-      ret = createWAP(region, entry, rootDir, null, fs, conf);
+      String tmpName = distributedLogSplittingHelper == null ? null
+          : distributedLogSplittingHelper.getTmpDirName();
+      ret = createWAP(region, entry, rootDir, tmpName, fs, conf);
      if (ret == null) {
        blacklistedRegions.add(region);
        return null;
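
For context on the closeStreams() hunk above: the patch fans the close/rename of each region's recovered-edits writer out to a bounded thread pool and drains the results through an ExecutorCompletionService, so one slow or failing close no longer serializes the rest. The standalone sketch below illustrates only that pattern; EditsWriter, closeAll, and the fixed-size pool are hypothetical stand-ins for illustration, not code from the patch (which uses HBase's Threads.getBoundedCachedThreadPool and WriterAndPath).

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Standalone sketch of the fan-out close pattern used in closeStreams(). */
public class ParallelCloseSketch {

  /** Stand-in for one recovered-edits writer that must be closed and finalized. */
  interface EditsWriter {
    void closeAndRename() throws IOException;
  }

  static List<IOException> closeAll(List<EditsWriter> writers, int numThreads)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    CompletionService<IOException> completionService =
        new ExecutorCompletionService<IOException>(pool);
    try {
      // Submit one close task per writer; each task returns null on success
      // or the IOException it hit, so the caller can aggregate failures.
      for (final EditsWriter writer : writers) {
        completionService.submit(new Callable<IOException>() {
          public IOException call() {
            try {
              writer.closeAndRename();
              return null;
            } catch (IOException ioe) {
              return ioe;
            }
          }
        });
      }
      List<IOException> thrown = new ArrayList<IOException>();
      // Drain one result per submitted task, in completion order.
      for (int i = 0; i < writers.size(); i++) {
        try {
          IOException failure = completionService.take().get();
          if (failure != null) {
            thrown.add(failure);
          }
        } catch (ExecutionException e) {
          thrown.add(new IOException(e.getCause()));
        }
      }
      return thrown;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    List<EditsWriter> writers = new ArrayList<EditsWriter>();
    for (int i = 0; i < 5; i++) {
      final int id = i;
      writers.add(new EditsWriter() {
        public void closeAndRename() {
          System.out.println("closed writer " + id);
        }
      });
    }
    System.out.println("failures: " + closeAll(writers, 3).size());
  }
}

Returning each task's IOException through its Future keeps failure aggregation in one place, mirroring how the patch collects failures into the thrown list before calling MultipleIOException.createIOException.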