diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index f7721e0934..cd12047fef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
 import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
@@ -159,7 +160,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     if (outputSink != null) {
       try {
         outputSink.finishWritingAndClose();
-      } catch (IOException ex) {
+      } catch (IOException | WALSplitter.CorruptedLogFileException ex) {
         LOG.warn("Got exception while trying to close OutputSink", ex);
       }
     }
@@ -247,8 +248,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
-      } catch (IOException e) {
-        LOG.warn("Received IOException while trying to replicate"
+      } catch (IOException | WALSplitter.CorruptedLogFileException e) {
+        LOG.warn("Received exception while trying to replicate"
             + StringUtils.stringifyException(e));
       }
     }
@@ -307,6 +308,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         return;
       }
 
+      // Note: this can potentially write a corrupted cell undiscovered.
       sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
         CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
     }
@@ -325,7 +327,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     }
 
     @Override
-    public List<Path> finishWritingAndClose() throws IOException {
+    public List<Path> finishWritingAndClose()
+        throws IOException, WALSplitter.CorruptedLogFileException {
       finishWriting(true);
       return null;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 66795978f2..7436dfe5f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -144,6 +144,7 @@ public class WALSplitter {
 
   // if we limit the number of writers opened for sinking recovered edits
   private final boolean splitWriterCreationBounded;
+  private final boolean skipErrors;
 
   public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
 
@@ -174,9 +175,10 @@ public class WALSplitter {
     if(splitWriterCreationBounded){
       outputSink = new BoundedLogWriterCreationOutputSink(
           controller, entryBuffers, numWriterThreads);
-    }else {
+    } else {
       outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
     }
+    this.skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", SPLIT_SKIP_ERRORS_DEFAULT);
   }
 
   /**
@@ -233,8 +235,6 @@ public class WALSplitter {
     Preconditions.checkArgument(logfile.isFile(),
         "passed in file status is for something other than a regular file.");
     boolean isCorrupted = false;
-    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
-        SPLIT_SKIP_ERRORS_DEFAULT);
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
     Path logPath = logfile.getPath();
     boolean outputSinkStarted = false;
@@ -294,12 +294,22 @@ public class WALSplitter {
           editsSkipped++;
           continue;
         }
-        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
-        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
-          editsSkipped++;
-          continue;
+        try {
+          // Don't send Compaction/Close/Open region events to recovered edit type sinks.
+          if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
+            editsSkipped++;
+            continue;
+          }
+          entryBuffers.appendEntry(entry);
+        } catch (InterruptedException | IOException | CorruptedLogFileException ex) {
+          throw ex;
+        } catch (Exception ex) {
+          String error = logCorruptedEntry(entry, ex, skipErrors ? "skipping" : "failing");
+          if (skipErrors) {
+            continue;
+          }
+          throw new CorruptedLogFileException(error);
         }
-        entryBuffers.appendEntry(entry);
         editsCount++;
         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
         // If sufficient edits have passed, check if we should report progress.
@@ -320,14 +330,7 @@ public class WALSplitter {
       iie.initCause(ie);
       throw iie;
     } catch (CorruptedLogFileException e) {
-      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
-      if (splitLogWorkerCoordination != null) {
-        // Some tests pass in a csm of null.
-        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
-      } else {
-        // for tests only
-        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
-      }
+      handleCorruptedWalException(e, logfile, logPath);
       isCorrupted = true;
     } catch (IOException e) {
       e = e instanceof RemoteException ?
          ((RemoteException) e).unwrapRemoteException() : e;
@@ -348,6 +351,10 @@ public class WALSplitter {
         progress_failed = true;
         progress_failed = outputSink.finishWritingAndClose() == null;
       }
+    } catch (CorruptedLogFileException e) {
+      // It is possible to hit CLFE here because we don't parse entries until the write time.
+      handleCorruptedWalException(e, logfile, logPath);
+      isCorrupted = true;
     } finally {
       String msg =
           "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
@@ -361,6 +368,26 @@ public class WALSplitter {
     return !progress_failed;
   }
 
+  private void handleCorruptedWalException(
+      CorruptedLogFileException e, FileStatus logfile, Path logPath) {
+    LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
+    if (splitLogWorkerCoordination != null) {
+      // Some tests pass in a csm of null.
+      splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+    } else {
+      // for tests only
+      ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+    }
+  }
+
+  private static String logCorruptedEntry(Entry entry, Exception ex, String tail) {
+    // A WAL like this is sometimes created due to HBASE-21601 (root cause unknown for now).
+    String error = "Found an entry with intact structure and corrupted data for WAL key "
+        + entry.getKey() + "; " + tail;
+    LOG.error(error, ex);
+    return error;
+  }
+
   /**
    * Completes the work done by splitLogFile by archiving logs
    *
@@ -837,11 +864,13 @@ public class WALSplitter {
     /**
      * Check for errors in the writer threads. If any is found, rethrow it.
      */
-    void checkForErrors() throws IOException {
+    void checkForErrors() throws IOException, CorruptedLogFileException {
       Throwable thrown = this.thrown.get();
       if (thrown == null) return;
       if (thrown instanceof IOException) {
-        throw new IOException(thrown);
+        throw (IOException)thrown;
+      } else if (thrown instanceof CorruptedLogFileException) {
+        throw (CorruptedLogFileException)thrown;
       } else {
         throw new RuntimeException(thrown);
       }
@@ -887,18 +916,22 @@ public class WALSplitter {
      * @throws InterruptedException
      * @throws IOException
      */
-    public void appendEntry(Entry entry) throws InterruptedException, IOException {
+    public void appendEntry(Entry entry)
+        throws InterruptedException, IOException, CorruptedLogFileException {
       WALKey key = entry.getKey();
       RegionEntryBuffer buffer;
       long incrHeap;
       synchronized (this) {
         buffer = buffers.get(key.getEncodedRegionName());
-        if (buffer == null) {
+        boolean isNew = buffer == null;
+        if (isNew) {
           buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
+        }
+        incrHeap = buffer.appendEntry(entry);
+        if (isNew) {
           buffers.put(key.getEncodedRegionName(), buffer);
         }
-        incrHeap= buffer.appendEntry(entry);
       }
 
       // If we crossed the chunk threshold, wait for more space to be available
@@ -1051,7 +1084,7 @@ public class WALSplitter {
       }
     }
 
-    private void doRun() throws IOException {
+    private void doRun() throws IOException, CorruptedLogFileException {
       LOG.trace("Writer thread starting");
       while (true) {
         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
@@ -1081,7 +1114,8 @@ public class WALSplitter {
       }
     }
 
-    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
+    private void writeBuffer(RegionEntryBuffer buffer)
+        throws IOException, CorruptedLogFileException {
       outputSink.append(buffer);
     }
 
@@ -1176,7 +1210,8 @@ public class WALSplitter {
      * @return true when there is no error
      * @throws IOException
      */
-    protected boolean finishWriting(boolean interrupt) throws IOException {
+    protected boolean finishWriting(boolean interrupt)
+        throws IOException, CorruptedLogFileException {
       LOG.debug("Waiting for split writer threads to finish");
       boolean progress_failed = false;
       for (WriterThread t : writerThreads) {
@@ -1205,7 +1240,7 @@ public class WALSplitter {
       return (!progress_failed);
     }
 
-    public abstract List<Path> finishWritingAndClose() throws IOException;
+    public abstract List<Path> finishWritingAndClose() throws IOException, CorruptedLogFileException;
 
     /**
      * @return a map from encoded region ID to the number of edits written out for that region.
@@ -1221,7 +1256,7 @@ public class WALSplitter {
      * @param buffer A WAL Edit Entry
      * @throws IOException
      */
-    public abstract void append(RegionEntryBuffer buffer) throws IOException;
+    public abstract void append(RegionEntryBuffer buffer) throws IOException, CorruptedLogFileException;
 
     /**
      * WriterThread call this function to help flush internal remaining edits in buffer before close
@@ -1260,7 +1295,7 @@ public class WALSplitter {
      * @throws IOException
      */
     @Override
-    public List<Path> finishWritingAndClose() throws IOException {
+    public List<Path> finishWritingAndClose() throws IOException, CorruptedLogFileException {
       boolean isSuccessful = false;
       List<Path> result = null;
       try {
@@ -1556,11 +1591,12 @@ public class WALSplitter {
     }
 
     @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
+    public void append(RegionEntryBuffer buffer) throws IOException, CorruptedLogFileException {
       appendBuffer(buffer, true);
     }
 
-    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
+    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable)
+        throws IOException, CorruptedLogFileException {
      List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
         LOG.warn("got an empty buffer, skipping");
@@ -1574,16 +1610,28 @@ public class WALSplitter {
       int editsCount = 0;
       for (Entry logEntry : entries) {
-        if (wap == null) {
-          wap = getWriterAndPath(logEntry, reusable);
+        try {
           if (wap == null) {
-            // This log spews the full edit. Can be massive in the log. Enable only debugging
-            // WAL lost edit issues.
-            LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
-            return null;
+            wap = getWriterAndPath(logEntry, reusable);
+            if (wap == null) {
+              // This log spews the full edit. Can be massive in the log. Enable only debugging
+              // WAL lost edit issues.
+              LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
+              return null;
+            }
+          }
+          filterCellByStore(logEntry);
+        } catch (IOException ex) {
+          throw ex;
+        } catch (Exception ex) {
+          String error = logCorruptedEntry(logEntry, ex, skipErrors ? "skipping" : "failing");
+          if (skipErrors) {
+            continue;
           }
+          throw new CorruptedLogFileException(error + ": " + ex);
         }
-        filterCellByStore(logEntry);
+        // Do not handle corrupted entries during append; it could be in the middle of the write.
+        // Also ExtendedCell impls generally dump the buffer as is so they won't fail like this.
         if (!logEntry.getEdit().isEmpty()) {
           wap.w.append(logEntry);
           this.updateRegionMaximumEditLogSeqNum(logEntry);
@@ -1646,7 +1694,7 @@ public class WALSplitter {
     }
 
     @Override
-    public List<Path> finishWritingAndClose() throws IOException {
+    public List<Path> finishWritingAndClose() throws IOException, CorruptedLogFileException {
       boolean isSuccessful;
       List<Path> result;
       try {
@@ -1716,11 +1764,12 @@ public class WALSplitter {
      * @throws IOException when closeWriter failed
      */
     @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
+    public void append(RegionEntryBuffer buffer) throws IOException, CorruptedLogFileException {
       writeThenClose(buffer);
     }
 
-    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
+    private Path writeThenClose(RegionEntryBuffer buffer)
+        throws IOException, CorruptedLogFileException {
       WriterAndPath wap = appendBuffer(buffer, false);
       if(wap != null) {
         String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
@@ -1783,7 +1832,7 @@ public class WALSplitter {
     }
   }
 
-  static class CorruptedLogFileException extends Exception {
+  public static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;
 
     CorruptedLogFileException(String s) {
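
For reviewers, the behavioral core of this patch is that per-entry parse/filter failures no longer escape as unchecked exceptions: they are either skipped or converted into CorruptedLogFileException, gated by hbase.hlog.split.skip.errors (now cached in the skipErrors field). Below is a minimal, self-contained sketch of that skip-or-fail pattern; it is illustrative only, and the names used (SkipOrFailSketch, processEntry, the local CorruptedLogFileException) are stand-ins, not HBase APIs.

import java.util.List;

// Stand-in for WALSplitter.CorruptedLogFileException, used only for this sketch.
class CorruptedLogFileException extends Exception {
  CorruptedLogFileException(String s) {
    super(s);
  }
}

class SkipOrFailSketch {
  // Mirrors hbase.hlog.split.skip.errors: skip bad entries, or fail the whole file.
  private final boolean skipErrors;

  SkipOrFailSketch(boolean skipErrors) {
    this.skipErrors = skipErrors;
  }

  void processAll(List<String> entries) throws CorruptedLogFileException {
    for (String entry : entries) {
      try {
        processEntry(entry); // may throw an unchecked exception on corrupted data
      } catch (Exception ex) {
        String error = "Found an entry with corrupted data: " + entry + "; "
            + (skipErrors ? "skipping" : "failing");
        System.err.println(error + " (" + ex + ")");
        if (skipErrors) {
          continue; // drop this entry, keep processing the rest of the file
        }
        // The caller is expected to mark the whole file corrupted, as splitLogFile does.
        throw new CorruptedLogFileException(error);
      }
    }
  }

  private void processEntry(String entry) {
    if (entry.contains("corrupt")) {
      throw new IllegalArgumentException("bad cell data");
    }
  }
}

With skipErrors=true the loop keeps going and only logs the bad entry; with skipErrors=false the exception propagates, which in the patch surfaces in splitLogFile's new catch block and is handled by handleCorruptedWalException marking the WAL corrupted.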