diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index f7721e0934..4a60d64607 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -307,6 +307,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { return; } + // Note: this can potentially write a corrupted cell undiscovered. sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 66795978f2..1cb048678b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -294,12 +294,19 @@ public class WALSplitter { editsSkipped++; continue; } - // Don't send Compaction/Close/Open region events to recovered edit type sinks. - if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { - editsSkipped++; + try { + // Don't send Compaction/Close/Open region events to recovered edit type sinks. + if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { + editsSkipped++; + continue; + } + entryBuffers.appendEntry(entry); + } catch (InterruptedException | IOException ex) { + throw ex; + } catch (Exception ex) { + logCorruptedEntry(entry, ex); continue; } - entryBuffers.appendEntry(entry); editsCount++; int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck; // If sufficient edits have passed, check if we should report progress. @@ -361,6 +368,15 @@ public class WALSplitter { return !progress_failed; } + private static void logCorruptedEntry(Entry entry, Exception ex) { + // A WAL like this is sometimes created due to HBASE-21601 (root cause unknown for now). + // In that case, skipping this entry is harmless. If this is a case of real data loss, + // it's better to lose one corrupted record and not lose the entire WAL and block a + // bunch of regions. + LOG.error("Skipping an entry with intact structure and corrupted data for WAL key {}", + entry.getKey(), ex); + } + /** * Completes the work done by splitLogFile by archiving logs *
@@ -894,11 +910,14 @@ public class WALSplitter { long incrHeap; synchronized (this) { buffer = buffers.get(key.getEncodedRegionName()); - if (buffer == null) { + boolean isNew = buffer == null; + if (isNew) { buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName()); + } + incrHeap = buffer.appendEntry(entry); + if (isNew) { buffers.put(key.getEncodedRegionName(), buffer); } - incrHeap= buffer.appendEntry(entry); } // If we crossed the chunk threshold, wait for more space to be available @@ -1574,16 +1593,25 @@ public class WALSplitter { int editsCount = 0; for (Entry logEntry : entries) { - if (wap == null) { - wap = getWriterAndPath(logEntry, reusable); + try { if (wap == null) { - // This log spews the full edit. Can be massive in the log. Enable only debugging - // WAL lost edit issues. - LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry); - return null; + wap = getWriterAndPath(logEntry, reusable); + if (wap == null) { + // This log spews the full edit. Can be massive in the log. Enable only debugging + // WAL lost edit issues. + LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry); + return null; + } } + filterCellByStore(logEntry); + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + logCorruptedEntry(logEntry, ex); + continue; } - filterCellByStore(logEntry); + // Do not handle corrupted entries during append; it could be in the middle of the write. + // Also ExtendedCell impls generally dump the buffer as is so they won't fail like this. if (!logEntry.getEdit().isEmpty()) { wap.w.append(logEntry); this.updateRegionMaximumEditLogSeqNum(logEntry); @@ -1911,4 +1939,28 @@ public class WALSplitter { return mutations; } + + /** Tries to replay a single WAL - useful for debugging. */ + public static void main(String[] args) throws Exception { + if (args.length != 1) { + System.err.println("File name missing"); + return; + } + Configuration conf = HBaseConfiguration.create(); + conf.set("fs.default.name", "file:///"); + + Path p = new Path(args[0]); + FileSystem fs = p.getFileSystem(conf); + + Path hbaseRootdir = new Path("rootdir"); + FSUtils.setRootDir(conf, hbaseRootdir); + fs.mkdirs(hbaseRootdir); + FSUtils.setVersion(fs, hbaseRootdir); + + + WALFactory wf = new WALFactory(conf, "main"); + WALSplitter logSplitter = new WALSplitter(wf, conf, hbaseRootdir, fs, null, null); + logSplitter.splitLogFile(fs.getFileStatus(p), null); + } + }