Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1234274) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy) @@ -735,8 +735,10 @@ private final Set blacklistedRegions = Collections.synchronizedSet( new TreeSet(Bytes.BYTES_COMPARATOR)); - private boolean hasClosed = false; + private boolean closeAndCleanCompleted = false; + + private boolean logWritersClosed = false; /** * Start the threads that will pump data from the entryBuffers * to the output files. @@ -760,20 +762,27 @@ List finishWritingAndClose() throws IOException { LOG.info("Waiting for split writer threads to finish"); - for (WriterThread t : writerThreads) { - t.finish(); - } - for (WriterThread t: writerThreads) { - try { - t.join(); - } catch (InterruptedException ie) { - throw new IOException(ie); + try { + for (WriterThread t : writerThreads) { + t.finish(); } - checkForErrors(); + for (WriterThread t : writerThreads) { + try { + t.join(); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + checkForErrors(); + } + LOG.info("Split writers finished"); + + return closeStreams(); + } finally { + List thrown = closeLogWriters(null); + if (thrown != null && !thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } } - LOG.info("Split writers finished"); - - return closeStreams(); } /** @@ -781,21 +790,15 @@ * @return the list of paths written. */ private List closeStreams() throws IOException { - Preconditions.checkState(!hasClosed); + Preconditions.checkState(!closeAndCleanCompleted); List paths = new ArrayList(); List thrown = Lists.newArrayList(); + closeLogWriters(thrown); for (WriterAndPath wap : logWriters.values()) { - try { - wap.w.close(); - } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); - thrown.add(ioe); - continue; - } - LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in " - + (wap.nanosSpent / 1000/ 1000) + "ms)"); + LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)"); Path dst = getCompletedRecoveredEditsFilePath(wap.p); try { if (!dst.equals(wap.p) && fs.exists(dst)) { @@ -822,13 +825,36 @@ } paths.add(dst); } + if (!thrown.isEmpty()) { throw MultipleIOException.createIOException(thrown); } - hasClosed = true; + closeAndCleanCompleted = true; return paths; } + + private List closeLogWriters(List thrown) + throws IOException { + // close the log writer streams only if they are not closed + // in closeStreams(). + if (!logWritersClosed) { + if (thrown == null) { + thrown = Lists.newArrayList(); + } + for (WriterAndPath wap : logWriters.values()) { + try { + wap.w.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at " + wap.p, ioe); + thrown.add(ioe); + continue; + } + } + logWritersClosed = true; + } + return thrown; + } /** * Get a writer and path for a log starting at the given entry.