From 7757a509cf2deaf0fb0831f86440452f238ee2a1 Mon Sep 17 00:00:00 2001 From: Sean Busbey Date: Tue, 7 Jun 2016 16:00:46 -0500 Subject: [PATCH] HBASE-15984 Handle premature EOF treatment of WALs in replication. In some particular deployments, the Replication code believes it has reached EOF for a WAL prior to successfully parsing all bytes known to exist in a cleanly closed file. Consistently this failure happens due to an InvalidProtobufException after some number of seeks during our attempts to tail the in-progress RegionServer WAL. As a work-around, this patch treats cleanly closed files differently than other execution paths. If an EOF is detected due to parsing or other errors while there are still unparsed bytes before the end-of-file trailer, we now reset the WAL to the very beginning and attempt a clean read-through. In current testing, a single such reset is sufficient to work around observed data loss. However, the above change will retry a given WAL file indefinitely. On each such attempt, a log message like the one below will be emitted at the WARN level: Processing end of WAL file '{}'. At position {}, which is too far away from reported file length {}. Restarting WAL reading (see HBASE-15983 for details). Additionally, this patch adds more log detail at the TRACE level about file offsets seen while handling recoverable errors. 
--- .../hbase/regionserver/wal/ProtobufLogReader.java | 33 +++++++++++++++++----- .../regionserver/ReplicationSource.java | 31 +++++++++++++++----- .../regionserver/ReplicationWALReaderManager.java | 10 +++++++ 3 files changed, 60 insertions(+), 14 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 0755358..6dbee7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -97,6 +97,21 @@ public class ProtobufLogReader extends ReaderBase { // cell codec classname private String codecClsName = null; + @InterfaceAudience.Private + public long trailerSize() { + if (trailerPresent) { + // sizeof PB_WAL_COMPLETE_MAGIC + sizof trailerSize + trailer + final long calculatedSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize(); + final long expectedSize = fileLength - walEditsStopOffset; + if (expectedSize != calculatedSize) { + LOG.warn("After parsing the trailer, we expect the total footer to be "+ expectedSize +" bytes, but we calculate it as being " + calculatedSize); + } + return expectedSize; + } else { + return -1L; + } + } + enum WALHdrResult { EOF, // stream is at EOF when method starts SUCCESS, @@ -217,7 +232,7 @@ public class ProtobufLogReader extends ReaderBase { this.seekOnFs(currentPosition); if (LOG.isTraceEnabled()) { LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset - + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent); + + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? 
"true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition); } codecClsName = hdrCtxt.getCellCodecClsName(); @@ -313,6 +328,7 @@ public class ProtobufLogReader extends ReaderBase { // OriginalPosition might be < 0 on local fs; if so, it is useless to us. long originalPosition = this.inputStream.getPos(); if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) { + LOG.trace("Reached end of expected edits area at offset " + originalPosition); return false; } WALKey.Builder builder = WALKey.newBuilder(); @@ -322,7 +338,7 @@ public class ProtobufLogReader extends ReaderBase { try { int firstByte = this.inputStream.read(); if (firstByte == -1) { - throw new EOFException("First byte is negative"); + throw new EOFException("First byte is negative at offset " + originalPosition); } size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); // available may be < 0 on local fs for instance. If so, can't depend on it. @@ -330,7 +346,7 @@ public class ProtobufLogReader extends ReaderBase { if (available > 0 && available < size) { throw new EOFException("Available stream not enough for edit, " + "inputStream.available()= " + this.inputStream.available() + ", " + - "entry size= " + size); + "entry size= " + size + " at offset = " + this.inputStream.getPos()); } ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size), (int)size); @@ -343,12 +359,12 @@ public class ProtobufLogReader extends ReaderBase { // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. // If we can get the KV count, we could, theoretically, try to get next record. throw new EOFException("Partial PB while reading WAL, " + - "probably an unexpected EOF, ignoring"); + "probably an unexpected EOF, ignoring. 
current offset=" + this.inputStream.getPos()); } WALKey walKey = builder.build(); entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { - LOG.trace("WALKey has no KVs that follow it; trying the next one"); + LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos()); continue; } int expectedCells = walKey.getFollowingKvCount(); @@ -378,11 +394,14 @@ public class ProtobufLogReader extends ReaderBase { throw new EOFException("Read WALTrailer while reading WALEdits"); } } catch (EOFException eof) { - LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof); // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs) - if (originalPosition < 0) throw eof; + if (originalPosition < 0) { + LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof); + throw eof; + } // Else restore our position to original location in hope that next time through we will // read successfully. 
+ LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof); seekOnFs(originalPosition); return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 84e0787..222d73d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -1086,16 +1086,33 @@ public class ReplicationSource extends Thread @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE", justification = "Yeah, this is how it works") protected boolean processEndOfFile() { + // We presume this means the file we're reading is closed. if (this.queue.size() != 0) { - if (LOG.isTraceEnabled()) { - String filesize = "N/A"; - try { - FileStatus stat = fs.getFileStatus(this.currentPath); - filesize = stat.getLen() + ""; - } catch (IOException ex) { + // -1 means the wal wasn't closed cleanly. + final long trailerSize = this.repLogReader.currentTrailerSize(); + final long currentPosition = this.repLogReader.getPosition(); + FileStatus stat = null; + try { + stat = fs.getFileStatus(this.currentPath); + } catch (IOException exception) { + LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly" + + ", stats: " + getStats()); + } + if (stat != null) { + if (trailerSize < 0) { + if (currentPosition < stat.getLen()) { + LOG.info("Reached the end of WAL file '" + currentPath + "'. 
It was not closed cleanly, so we did not parse " + (stat.getLen() - currentPosition) + " bytes of data."); + } + } else if (currentPosition + trailerSize < stat.getLen()){ + LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() + + ". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats()); + repLogReader.setPosition(0); + return false; } + } + if (LOG.isTraceEnabled()) { LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats() - + ", and the length of the file is " + filesize); + + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen())); } this.currentPath = null; this.repLogReader.finishCurrentFile(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java index b63f66b..3558d08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; @@ -119,6 +120,15 @@ public class ReplicationWALReaderManager { this.position = pos; } + public long currentTrailerSize() { + long size = -1L; + if (reader instanceof ProtobufLogReader) { + final ProtobufLogReader pblr = (ProtobufLogReader)reader; + size = pblr.trailerSize(); + } + return size; + } + /** * 
Close the current reader * @throws IOException -- 2.7.2