From 07545453e049fbba21588cab535e45a95b31699b Mon Sep 17 00:00:00 2001 From: Esteban Gutierrez Date: Fri, 18 May 2018 15:11:13 -0500 Subject: [PATCH] HBASE-20604 ProtobufLogReader#readNext can incorrectly loop to the same position in the stream until the the WAL is rolled --- .../hbase/regionserver/wal/ProtobufLogReader.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 5d8d8c00ab..29cebdf757 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -338,6 +338,7 @@ public class ProtobufLogReader extends ReaderBase { } WALKey.Builder builder = WALKey.newBuilder(); long size = 0; + boolean resetPosition = false; try { long available = -1; try { @@ -356,6 +357,7 @@ public class ProtobufLogReader extends ReaderBase { ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int)size); } catch (InvalidProtocolBufferException ipbe) { + resetPosition = true; throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe); @@ -380,6 +382,7 @@ public class ProtobufLogReader extends ReaderBase { try { int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); if (expectedCells != actualCells) { + resetPosition = true; throw new EOFException("Only read " + actualCells); // other info added in catch } } catch (Exception ex) { @@ -416,7 +419,15 @@ public class ProtobufLogReader extends ReaderBase { if (LOG.isTraceEnabled()) { LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof); } - seekOnFs(originalPosition); + // If stuck at the same place and we got and exception, lets go back at the beginning. + if (inputStream.getPos() == originalPosition && resetPosition) { + if (LOG.isTraceEnabled()) { + LOG.trace("Seeking to the beginning of the WAL, current position " + originalPosition + " is the same as the original position."); + } + seekOnFs(0); + } else { + seekOnFs(originalPosition); + } return false; } return true; -- 2.17.0