diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index 581c6bb5e2..1bf2a9ea58 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -513,51 +513,61 @@ public class KeyValueUtil {
       + ", length=" + length;
   }
 
+  public static class InvalidKvBufferException extends IllegalArgumentException {
+    public InvalidKvBufferException(String message) {
+      super(message);
+    }
+
+    public InvalidKvBufferException(String message, Exception ex) {
+      super(message, ex);
+    }
+  }
+
   static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
     int pos = offset, endOffset = offset + length;
     // check the key
     if (pos + Bytes.SIZEOF_INT > endOffset) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
         "Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length));
     }
     int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
     pos += Bytes.SIZEOF_INT;
     if (keyLen <= 0 || pos + keyLen > endOffset) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
        "Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length));
     }
     // check the value
     if (pos + Bytes.SIZEOF_INT > endOffset) {
-      throw new IllegalArgumentException("Overflow when reading value length at position=" + pos
+      throw new InvalidKvBufferException("Overflow when reading value length at position=" + pos
         + bytesToHex(buf, offset, length));
     }
     int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
     pos += Bytes.SIZEOF_INT;
     if (valLen < 0 || pos + valLen > endOffset) {
-      throw new IllegalArgumentException("Invalid value length in KeyValue, valueLength=" + valLen
+      throw new InvalidKvBufferException("Invalid value length in KeyValue, valueLength=" + valLen
         + bytesToHex(buf, offset, length));
     }
     // check the row
     if (pos + Bytes.SIZEOF_SHORT > endOffset) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
        "Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length));
     }
     short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
     pos += Bytes.SIZEOF_SHORT;
     if (rowLen < 0 || pos + rowLen > endOffset) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
        "Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length));
     }
     pos += rowLen;
     // check the family
     if (pos + Bytes.SIZEOF_BYTE > endOffset) {
-      throw new IllegalArgumentException("Overflow when reading family length at position=" + pos
+      throw new InvalidKvBufferException("Overflow when reading family length at position=" + pos
        + bytesToHex(buf, offset, length));
     }
     int familyLen = buf[pos];
     pos += Bytes.SIZEOF_BYTE;
     if (familyLen < 0 || pos + familyLen > endOffset) {
-      throw new IllegalArgumentException("Invalid family length in KeyValue, familyLength="
+      throw new InvalidKvBufferException("Invalid family length in KeyValue, familyLength="
        + familyLen + bytesToHex(buf, offset, length));
     }
     pos += familyLen;
@@ -565,35 +575,35 @@ public class KeyValueUtil {
     int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
         - Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
     if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
-      throw new IllegalArgumentException("Invalid qualifier length in KeyValue, qualifierLen="
+      throw new InvalidKvBufferException("Invalid qualifier length in KeyValue, qualifierLen="
          + qualifierLen + bytesToHex(buf, offset, length));
     }
     pos += qualifierLen;
     // check the timestamp
     if (pos + Bytes.SIZEOF_LONG > endOffset) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
         "Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length));
     }
     long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
     if (timestamp < 0) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
         "Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length));
     }
     pos += Bytes.SIZEOF_LONG;
     // check the type
     if (pos + Bytes.SIZEOF_BYTE > endOffset) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
         "Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length));
     }
     byte type = buf[pos];
     if (!Type.isValidType(type)) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
         "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length));
     }
     pos += Bytes.SIZEOF_BYTE;
     // check the value
     if (pos + valLen > endOffset) {
-      throw new IllegalArgumentException(
+      throw new InvalidKvBufferException(
         "Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length));
     }
     pos += valLen;
@@ -604,26 +614,26 @@ public class KeyValueUtil {
       return;
     }
     if (pos + Bytes.SIZEOF_SHORT > endOffset) {
-      throw new IllegalArgumentException("Overflow when reading tags length at position=" + pos
+      throw new InvalidKvBufferException("Overflow when reading tags length at position=" + pos
        + bytesToHex(buf, offset, length));
     }
     short tagsLen = Bytes.toShort(buf, pos);
     pos += Bytes.SIZEOF_SHORT;
     if (tagsLen < 0 || pos + tagsLen > endOffset) {
-      throw new IllegalArgumentException("Invalid tags length in KeyValue at position="
+      throw new InvalidKvBufferException("Invalid tags length in KeyValue at position="
        + (pos - Bytes.SIZEOF_SHORT) + bytesToHex(buf, offset, length));
     }
     int tagsEndOffset = pos + tagsLen;
     for (; pos < tagsEndOffset;) {
       if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
-        throw new IllegalArgumentException("Overflow when reading tag length at position=" + pos
+        throw new InvalidKvBufferException("Overflow when reading tag length at position=" + pos
          + bytesToHex(buf, offset, length));
       }
       short tagLen = Bytes.toShort(buf, pos);
       pos += Tag.TAG_LENGTH_SIZE;
       // tagLen contains one byte tag type, so must be not less than 1.
       if (tagLen < 1 || pos + tagLen > endOffset) {
-        throw new IllegalArgumentException(
+        throw new InvalidKvBufferException(
           "Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength=" + tagLen
             + bytesToHex(buf, offset, length));
       }
@@ -631,7 +641,7 @@ public class KeyValueUtil {
       }
     }
     if (pos != endOffset) {
-      throw new IllegalArgumentException("Some redundant bytes in KeyValue's buffer, startOffset="
+      throw new InvalidKvBufferException("Some redundant bytes in KeyValue's buffer, startOffset="
        + pos + ", endOffset=" + endOffset + bytesToHex(buf, offset, length));
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 494cce53ff..a226573c49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.KeyValueUtil;import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.wal.WALSplitter;import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,6 +98,8 @@ public class ProtobufLogReader extends ReaderBase {
   // cell codec classname
   private String codecClsName = null;
 
+  private boolean allowSkippingCorruptCells;
+
   @InterfaceAudience.Private
   public long trailerSize() {
     if (trailerPresent) {
@@ -162,10 +164,12 @@ public class ProtobufLogReader extends ReaderBase {
   }
 
   @Override
-  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
-      throws IOException {
+  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream,
+      boolean isSplitReader) throws IOException {
     this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
-    super.init(fs, path, conf, stream);
+    this.allowSkippingCorruptCells = isSplitReader && conf.getBoolean(
+        WALSplitter.SPLIT_SKIP_ERRORS, WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
+    super.init(fs, path, conf, stream, isSplitReader);
   }
 
   @Override
@@ -349,6 +353,7 @@ public class ProtobufLogReader extends ReaderBase {
         throw new EOFException("First byte is negative at offset " + originalPosition);
       }
       size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
+      // available may be < 0 on local fs for instance. If so, can't depend on it.
       available = this.inputStream.available();
       if (available > 0 && available < size) {
@@ -399,6 +404,11 @@
         }
         String message = " while reading " + expectedCells + " WAL KVs; started reading at "
             + posBefore + " and read up to " + posAfterStr;
+        if (allowSkippingCorruptCells && ex instanceof KeyValueUtil.InvalidKvBufferException) {
+          // Throw and allow the caller to handle invalid KVs if they can.
+          throw new KeyValueUtil.InvalidKvBufferException(message, ex);
+        }
+
         IOException realEofEx = extractHiddenEof(ex);
         throw (EOFException) new EOFException("EOF " + message).
             initCause(realEofEx != null ? realEofEx : ex);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 27d40b25a2..a931017ee9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.KeyValueUtil;import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -56,8 +56,8 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
   }
 
   @Override
-  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
-      throws IOException {
+  public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream,
+      boolean isSplitReader) throws IOException {
     this.conf = conf;
     this.path = path;
     this.fs = fs;
@@ -96,6 +96,8 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
     boolean hasEntry = false;
     try {
       hasEntry = readNext(e);
+    } catch (KeyValueUtil.InvalidKvBufferException ex) {
+      throw ex; // Propagate this type of IAE as is.
     } catch (IllegalArgumentException iae) {
       TableName tableName = e.getKey().getTableName();
       if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index f7721e0934..cd12047fef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
 import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
@@ -159,7 +160,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     if (outputSink != null) {
       try {
         outputSink.finishWritingAndClose();
-      } catch (IOException ex) {
+      } catch (IOException | WALSplitter.CorruptedLogFileException ex) {
         LOG.warn("Got exception while trying to close OutputSink", ex);
       }
     }
@@ -247,8 +248,8 @@
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
-      } catch (IOException e) {
-        LOG.warn("Received IOException while trying to replicate"
+      } catch (IOException | WALSplitter.CorruptedLogFileException e) {
+        LOG.warn("Received exception while trying to replicate"
           + StringUtils.stringifyException(e));
       }
     }
@@ -307,6 +308,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         return;
       }
 
+      // Note: this can potentially write a corrupted cell undiscovered.
       sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
         CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
     }
@@ -325,7 +327,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     }
 
     @Override
-    public List<Path> finishWritingAndClose() throws IOException {
+    public List<Path> finishWritingAndClose()
+        throws IOException, WALSplitter.CorruptedLogFileException {
       finishWriting(true);
       return null;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 1f24548cb9..5cf488c651 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -71,12 +71,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   // Only public so classes back in regionserver.wal can access
   public interface Reader extends WAL.Reader {
     /**
-     * @param fs File system.
-     * @param path Path.
-     * @param c Configuration.
      * @param s Input stream that may have been pre-opened by the caller; may be null.
      */
-    void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
+    void init(FileSystem fs, Path path, Configuration c,
+        FSDataInputStream s, boolean isSplitReader) throws IOException;
   }
 
   protected volatile T wal;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
index e9f9af1059..77d961eb96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.KeyValueUtil;import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -206,7 +206,16 @@ public class WALEdit implements HeapSize {
     cells.clear();
     cells.ensureCapacity(expectedCount);
     while (cells.size() < expectedCount && cellDecoder.advance()) {
-      cells.add(cellDecoder.current());
+      try {
+        cells.add(cellDecoder.current());
+      } catch (KeyValueUtil.InvalidKvBufferException ex) {
+        // Advance decoder all the way so that the caller could skip these cells if possible.
+        expectedCount -= cells.size();
+        while (expectedCount > 0 && cellDecoder.advance()) {
+          --expectedCount;
+        }
+        throw ex;
+      }
     }
     return cells.size();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 8bde6d2001..063515cce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -309,11 +309,16 @@
    */
  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter)
      throws IOException {
-    return createReader(fs, path, reporter, true);
+    return createReader(fs, path, reporter, true, false);
  }

  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
      boolean allowCustom) throws IOException {
+    return createReader(fs, path, reporter, allowCustom, false);
+  }
+
+  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
+      boolean allowCustom, boolean isSplit) throws IOException {
    Class<? extends AbstractFSWALProvider.Reader> lrClass = allowCustom ?
        logReaderClass : ProtobufLogReader.class;
    try {
@@ -327,7 +332,7 @@
     while (true) {
       try {
         reader = lrClass.getDeclaredConstructor().newInstance();
-        reader.init(fs, path, conf, null);
+        reader.init(fs, path, conf, null, isSplit);
         return reader;
       } catch (IOException e) {
         if (reader != null) {
@@ -454,7 +459,7 @@
    */
   public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
       final Configuration configuration) throws IOException {
-    return getInstance(configuration).createReader(fs, path, null, false);
+    return getInstance(configuration).createReader(fs, path, null, false, false);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 66795978f2..f9d70a7c08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.KeyValueUtil;import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -114,6 +114,7 @@ public class WALSplitter {
   /** By default we retry errors in splitting, rather than skipping. */
   public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
+  public static final String SPLIT_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
 
   // Parameters for split process
   protected final Path walDir;
@@ -144,6 +145,7 @@ public class WALSplitter {
   // if we limit the number of writers opened for sinking recovered edits
   private final boolean splitWriterCreationBounded;
+  private final boolean skipErrors;
 
   public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
 
@@ -174,9 +176,10 @@
     if(splitWriterCreationBounded){
       outputSink = new BoundedLogWriterCreationOutputSink(
           controller, entryBuffers, numWriterThreads);
-    }else {
+    } else {
       outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
     }
+    this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS, SPLIT_SKIP_ERRORS_DEFAULT);
   }
 
   /**
@@ -233,8 +236,6 @@
     Preconditions.checkArgument(logfile.isFile(),
         "passed in file status is for something other than a regular file.");
     boolean isCorrupted = false;
-    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
-      SPLIT_SKIP_ERRORS_DEFAULT);
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
     Path logPath = logfile.getPath();
     boolean outputSinkStarted = false;
@@ -266,7 +267,20 @@
       outputSinkStarted = true;
       Entry entry;
       Long lastFlushedSequenceId = -1L;
-      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
+      while (true) {
+        try {
+          entry = getNextLogLine(logFileReader, logPath, skipErrors);
+        } catch (KeyValueUtil.InvalidKvBufferException ex) {
+          String error = logCorruptedEntry(null, ex, skipErrors ? "skipping" : "failing");
"skipping" : "failing"); + if (skipErrors) { + continue; + } + throw new CorruptedLogFileException(error); + } + + if (entry == null) { + break; + } byte[] region = entry.getKey().getEncodedRegionName(); String encodedRegionNameAsStr = Bytes.toString(region); lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); @@ -294,12 +308,23 @@ public class WALSplitter { editsSkipped++; continue; } - // Don't send Compaction/Close/Open region events to recovered edit type sinks. - if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { - editsSkipped++; - continue; + try { + // Don't send Compaction/Close/Open region events to recovered edit type sinks. + if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { + editsSkipped++; + continue; + } + entryBuffers.appendEntry(entry); + } catch (InterruptedException | IOException | CorruptedLogFileException ex) { + throw ex; + } catch (Exception ex) { + String error = logCorruptedEntry( + entry.getKey(), ex, skipErrors ? "skipping" : "failing"); + if (skipErrors) { + continue; + } + throw new CorruptedLogFileException(error); } - entryBuffers.appendEntry(entry); editsCount++; int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck; // If sufficient edits have passed, check if we should report progress. @@ -320,14 +345,7 @@ public class WALSplitter { iie.initCause(ie); throw iie; } catch (CorruptedLogFileException e) { - LOG.warn("Could not parse, corrupted WAL={}", logPath, e); - if (splitLogWorkerCoordination != null) { - // Some tests pass in a csm of null. - splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS); - } else { - // for tests only - ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS); - } + handleCorruptedWalException(e, logfile, logPath); isCorrupted = true; } catch (IOException e) { e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; @@ -348,6 +366,10 @@ public class WALSplitter { progress_failed = true; progress_failed = outputSink.finishWritingAndClose() == null; } + } catch (CorruptedLogFileException e) { + // It is possible to hit CLFE here because we don't parse entries until the write time. + handleCorruptedWalException(e, logfile, logPath); + isCorrupted = true; } finally { String msg = "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions() @@ -361,6 +383,26 @@ public class WALSplitter { return !progress_failed; } + private void handleCorruptedWalException( + CorruptedLogFileException e, FileStatus logfile, Path logPath) { + LOG.warn("Could not parse, corrupted WAL={}", logPath, e); + if (splitLogWorkerCoordination != null) { + // Some tests pass in a csm of null. + splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS); + } else { + // for tests only + ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS); + } + } + + private static String logCorruptedEntry(WALKeyImpl key, Throwable ex, String tail) { + // A WAL like this is sometimes created due to HBASE-21601 (root cause unknown for now). + String error = "Found an entry with intact structure and corrupted data for WAL key " + + key + "; " + tail; + LOG.error(error, ex); + return error; + } + /** * Completes the work done by splitLogFile by archiving logs *

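For reviewers, the new control flow added to splitLogFile() above can be condensed to the sketch below. This is an illustrative summary distilled from the hunks in this file, not additional patch content; it leaves out sequence-id filtering, progress reporting and the checked exceptions, and it assumes hbase.hlog.split.skip.errors has already been read into the skipErrors field as done in the constructor change.

    // Condensed view of the corrupt-cell handling in the split loop (illustration only).
    while (true) {
      Entry entry;
      try {
        entry = getNextLogLine(logFileReader, logPath, skipErrors);
      } catch (KeyValueUtil.InvalidKvBufferException ex) {
        // Cell-level corruption surfaced by the reader.
        if (skipErrors) {
          continue; // drop only the corrupt entry and keep splitting
        }
        throw new CorruptedLogFileException("corrupt cell data in " + logPath);
      }
      if (entry == null) {
        break; // clean end of the WAL
      }
      entryBuffers.appendEntry(entry); // may also surface corruption, handled the same way
    }
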
@@ -762,8 +804,9 @@ public class WALSplitter {
     return in;
   }
 
-  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
-      throws CorruptedLogFileException, IOException {
+  @VisibleForTesting
+  static Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+      throws CorruptedLogFileException, IOException {
     try {
       return in.next();
     } catch (EOFException eof) {
@@ -804,7 +847,7 @@
   * @return new Reader instance, caller should close
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
-    return walFactory.createReader(walFS, curLogFile, reporter);
+    return walFactory.createReader(walFS, curLogFile, reporter, true, true);
  }

  /**
@@ -837,11 +880,13 @@
     /**
      * Check for errors in the writer threads. If any is found, rethrow it.
      */
-    void checkForErrors() throws IOException {
+    void checkForErrors() throws IOException, CorruptedLogFileException {
       Throwable thrown = this.thrown.get();
       if (thrown == null) return;
       if (thrown instanceof IOException) {
-        throw new IOException(thrown);
+        throw (IOException)thrown;
+      } else if (thrown instanceof CorruptedLogFileException) {
+        throw (CorruptedLogFileException)thrown;
       } else {
         throw new RuntimeException(thrown);
       }
@@ -887,18 +932,22 @@
      * @throws InterruptedException
      * @throws IOException
      */
-    public void appendEntry(Entry entry) throws InterruptedException, IOException {
+    public void appendEntry(Entry entry)
+        throws InterruptedException, IOException, CorruptedLogFileException {
       WALKey key = entry.getKey();
       RegionEntryBuffer buffer;
       long incrHeap;
       synchronized (this) {
         buffer = buffers.get(key.getEncodedRegionName());
-        if (buffer == null) {
+        boolean isNew = buffer == null;
+        if (isNew) {
           buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
+        }
+        incrHeap = buffer.appendEntry(entry);
+        if (isNew) {
           buffers.put(key.getEncodedRegionName(), buffer);
         }
-        incrHeap= buffer.appendEntry(entry);
       }
 
       // If we crossed the chunk threshold, wait for more space to be available
@@ -1051,7 +1100,7 @@
       }
     }
 
-    private void doRun() throws IOException {
+    private void doRun() throws IOException, CorruptedLogFileException {
       LOG.trace("Writer thread starting");
       while (true) {
         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
@@ -1081,7 +1130,8 @@
       }
     }
 
-    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
+    private void writeBuffer(RegionEntryBuffer buffer)
+        throws IOException, CorruptedLogFileException {
       outputSink.append(buffer);
     }
 
@@ -1176,7 +1226,8 @@
      * @return true when there is no error
      * @throws IOException
      */
-    protected boolean finishWriting(boolean interrupt) throws IOException {
+    protected boolean finishWriting(boolean interrupt)
+        throws IOException, CorruptedLogFileException {
       LOG.debug("Waiting for split writer threads to finish");
       boolean progress_failed = false;
       for (WriterThread t : writerThreads) {
@@ -1205,7 +1256,8 @@
       return (!progress_failed);
     }
 
-    public abstract List<Path> finishWritingAndClose() throws IOException;
+    public abstract List<Path> finishWritingAndClose()
+        throws IOException, CorruptedLogFileException;
 
     /**
      * @return a map from encoded region ID to the number of edits written out for that region.
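A small behavioral note on the EntryBuffers.appendEntry() hunk above: a newly created RegionEntryBuffer is now published into the buffers map only after RegionEntryBuffer.appendEntry() has returned, presumably so that an entry which throws (for example through the new corrupt-cell path) does not leave an empty buffer registered for the writer threads. Reduced to its essentials, with names taken from that hunk, the ordering is:

    synchronized (this) {
      RegionEntryBuffer buffer = buffers.get(key.getEncodedRegionName());
      boolean isNew = buffer == null;
      if (isNew) {
        buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
      }
      // May throw for a bad entry; only register the buffer once the append has succeeded.
      incrHeap = buffer.appendEntry(entry);
      if (isNew) {
        buffers.put(key.getEncodedRegionName(), buffer);
      }
    }
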
@@ -1221,7 +1273,8 @@
      * @param buffer A WAL Edit Entry
      * @throws IOException
      */
-    public abstract void append(RegionEntryBuffer buffer) throws IOException;
+    public abstract void append(RegionEntryBuffer buffer)
+        throws IOException, CorruptedLogFileException;
 
     /**
      * WriterThread call this function to help flush internal remaining edits in buffer before close
@@ -1260,7 +1313,7 @@
      * @throws IOException
      */
     @Override
-    public List<Path> finishWritingAndClose() throws IOException {
+    public List<Path> finishWritingAndClose() throws IOException, CorruptedLogFileException {
       boolean isSuccessful = false;
       List<Path> result = null;
       try {
@@ -1282,7 +1335,7 @@
     private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
       long dstMinLogSeqNum = -1L;
-      try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
+      try (WAL.Reader reader = walFactory.createReader(walFS, dst, null, true, true)) {
         WAL.Entry entry = reader.next();
         if (entry != null) {
           dstMinLogSeqNum = entry.getKey().getSequenceId();
@@ -1556,11 +1609,12 @@
     }
 
     @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
+    public void append(RegionEntryBuffer buffer) throws IOException, CorruptedLogFileException {
       appendBuffer(buffer, true);
     }
 
-    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
+    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable)
+        throws IOException, CorruptedLogFileException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
         LOG.warn("got an empty buffer, skipping");
@@ -1574,16 +1628,29 @@
       int editsCount = 0;
 
       for (Entry logEntry : entries) {
-        if (wap == null) {
-          wap = getWriterAndPath(logEntry, reusable);
+        try {
           if (wap == null) {
-            // This log spews the full edit. Can be massive in the log. Enable only debugging
-            // WAL lost edit issues.
-            LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
-            return null;
+            wap = getWriterAndPath(logEntry, reusable);
+            if (wap == null) {
+              // This log spews the full edit. Can be massive in the log. Enable only debugging
+              // WAL lost edit issues.
+              LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
+              return null;
+            }
+          }
+          filterCellByStore(logEntry);
+        } catch (IOException ex) {
+          throw ex;
+        } catch (Exception ex) {
+          String error = logCorruptedEntry(
+              logEntry.getKey(), ex, skipErrors ? "skipping" : "failing");
+          if (skipErrors) {
+            continue;
           }
+          throw new CorruptedLogFileException(error + ": " + ex);
         }
-        filterCellByStore(logEntry);
+        // Do not handle corrupted entries during append; it could be in the middle of the write.
+        // Also ExtendedCell impls generally dump the buffer as is so they won't fail like this.
         if (!logEntry.getEdit().isEmpty()) {
           wap.w.append(logEntry);
           this.updateRegionMaximumEditLogSeqNum(logEntry);
@@ -1646,7 +1713,7 @@
     }
 
     @Override
-    public List<Path> finishWritingAndClose() throws IOException {
+    public List<Path> finishWritingAndClose() throws IOException, CorruptedLogFileException {
       boolean isSuccessful;
       List<Path> result;
       try {
@@ -1716,11 +1783,12 @@
      * @throws IOException when closeWriter failed
      */
     @Override
-    public void append(RegionEntryBuffer buffer) throws IOException {
+    public void append(RegionEntryBuffer buffer) throws IOException, CorruptedLogFileException {
       writeThenClose(buffer);
     }
 
-    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
+    private Path writeThenClose(RegionEntryBuffer buffer)
+        throws IOException, CorruptedLogFileException {
       WriterAndPath wap = appendBuffer(buffer, false);
       if(wap != null) {
         String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
@@ -1783,7 +1851,7 @@
     }
   }
 
-  static class CorruptedLogFileException extends Exception {
+  public static class CorruptedLogFileException extends Exception {
     private static final long serialVersionUID = 1L;
 
     CorruptedLogFileException(String s) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index f2fd5916f7..6023a9dd9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -963,7 +963,7 @@ public abstract class AbstractTestWALReplay {
         conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
           AbstractFSWALProvider.Reader.class);
     AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance();
-    reader.init(this.fs, editFile, conf, stream);
+    reader.init(this.fs, editFile, conf, stream, false);
     final long headerLength = stream.getPos();
     reader.close();
     FileSystem spyFs = spy(this.fs);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index e6644f07dd..9eb9ecc471 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.wal;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.FileNotFoundException;
+import java.io.EOFException;import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -77,6 +78,7 @@ import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -96,6 +98,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -521,6 +524,20 @@ public class TestWALSplit {
     return result;
   }
 
+  private void splitAndCount(String region, final int expectedEntries)
+      throws IOException {
+    useDifferentDFSClient();
+    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+    Path[] logfiles = getLogForRegion(TABLE_NAME, region);
+    int count = 0;
+    for (Path logfile: logfiles) {
+      count += countWAL(logfile);
+    }
+    if (-1 != expectedEntries) {
+      assertEquals(expectedEntries, count);
+    }
+  }
+
   @Test
   public void testEmptyLogFiles() throws IOException {
     testEmptyLogFiles(true);
@@ -582,6 +599,85 @@
         REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
   }
 
+
+  @Test
+  public void testCellCorruptionSkipErrorsReadsOtherCells() throws Exception {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    // Generate a WAL with 3 records.
+    // We'll corrupt the middle record and ensure the other two are read.
+    makeRegionDirs(REGIONS);
+    fs.mkdirs(WALDIR);
+    final String region = REGIONS.get(0);
+    int seq = 0;
+    Path walPath = new Path(WALDIR, WAL_FILE_PREFIX + 0);
+    Writer w = wals.createWALWriter(fs, walPath);
+    appendEntry(w, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(region + seq),
+      FAMILY, QUALIFIER, VALUE, seq++);
+    byte[] value = new byte[1024];
+    new Random(0).nextBytes(value);
+    appendEntry(w, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(region + seq),
+      FAMILY, QUALIFIER, value, seq++);
+    long entryStartOffset = w.getLength();
+    appendEntry(w, TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(region + seq),
+      FAMILY, QUALIFIER, VALUE, seq++);
+    w.close();
+
+    // Find where the 2nd record starts.
+    WALFactory wf = WALFactory.getInstance(conf);
+    FileSystem walFs = walPath.getFileSystem(conf);
+    Reader logFileReader = wf.createReader(walFs, walPath, null, false);
+    int recordPosition, recordEndPos;
+    try {
+      Object o = WALSplitter.getNextLogLine(logFileReader, walPath, false);
+      assertNotNull(o);
+      recordPosition = (int)logFileReader.getPosition();
+      LOG.info("Found the 2nd record at {}", recordPosition);
+
+      o = WALSplitter.getNextLogLine(logFileReader, walPath, false);
+      assertNotNull(o);
+      recordEndPos = (int)logFileReader.getPosition();
+      LOG.info("Found the 3rd record at {}", recordEndPos);
+    } finally {
+      logFileReader.close();
+    }
+
+    // Now find the protobuf data size.
+    int fileSize = (int) walFs.listStatus(walPath)[0].getLen();
+    byte[] fileContents = new byte[fileSize];
+    FSDataInputStream fdis = walFs.open(walPath);
+    try {
+      // WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
+      fdis.seek(recordPosition);
+      int firstByte = fdis.read();
+      assertFalse(firstByte == -1);
+      int pbLen = CodedInputStream.readRawVarint32(firstByte, fdis);
+      recordPosition = (int)fdis.getPos() + pbLen;
+      LOG.info("Found the 2nd record entries at {}", recordPosition);
+      // ProtobufUtil.mergeFrom(builder, ByteStreams.limit(fdis, size), (int)size); builder.build();
+      fdis.seek(0);
+      fdis.readFully(0, fileContents, 0, fileSize);
+    } finally {
+      fdis.close();
+    }
+
+    recordPosition += 4;
+    LOG.info("Corrupting the file {} {}", walPath, fileSize);
+    byte[] badData = new byte[recordEndPos - recordPosition];
+    Arrays.fill(badData, (byte)255);
+    walFs.delete(walPath, false);
+    FSDataOutputStream out = walFs.create(walPath);
+    try {
+      out.write(fileContents, 0, recordPosition);
+      out.write(badData);
+      out.write(fileContents, recordEndPos, fileContents.length - recordEndPos);
+    } finally {
+      out.close();
+    }
+
+    LOG.info("Splitting the file {}", walPath);
+    splitAndCount(region, 2);
+  }
+
   @Test
   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, true);