Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (revision 1518708) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (working copy) @@ -21,6 +21,7 @@ import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; @@ -34,6 +35,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.io.LimitInputStream; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.InvalidProtocolBufferException; /** @@ -147,7 +150,7 @@ ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length); this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity()); if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) { - LOG.warn("No trailer found."); + LOG.trace("No trailer found."); return false; } if (trailerSize < 0) { @@ -190,19 +193,40 @@ @Override protected boolean readNext(HLog.Entry entry) throws IOException { while (true) { - if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false; + long originalPosition = this.inputStream.getPos(); + if (trailerPresent && originalPosition == this.walEditsStopOffset) return false; WALKey.Builder builder = WALKey.newBuilder(); - boolean hasNext = false; + int size = 0; + // See if available is any good to us. Record before we start reading. My guess is that it + // does not change once reader has been opened but check see. + int originalAvailable = this.inputStream.available(); try { - hasNext = builder.mergeDelimitedFrom(inputStream); + int firstByte = this.inputStream.read(); + if (firstByte == -1) { + seekOnFs(originalPosition); + return false; + } + size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); + if (this.inputStream.available() < size) { + seekOnFs(originalPosition); + return false; + } + final InputStream limitedInput = new LimitInputStream(this.inputStream, size); + builder.mergeFrom(limitedInput); } catch (InvalidProtocolBufferException ipbe) { - LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe); + LOG.warn("Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + + ", currentPosition=" + this.inputStream.getPos() + ", messageSize=" + size + + ", originalAvailable=" + originalAvailable + ", currentAvailable=" + + this.inputStream.available(), ipbe); + // Return here since we've already dumped out log on problem pb parsing. + seekOnFs(originalPosition); + return false; } - if (!hasNext) return false; if (!builder.isInitialized()) { // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. // If we can get the KV count, we could, theoretically, try to get next record. - LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring"); + LOG.warn("Partial PB while reading WAL, probably an unexpected EOF, ignoring"); + seekOnFs(originalPosition); return false; } WALKey walKey = builder.build(); @@ -230,6 +254,7 @@ IOException realEofEx = extractHiddenEof(ex); if (realEofEx != null) { LOG.error("EOF " + message, realEofEx); + seekOnFs(originalPosition); return false; } message = "Error " + message;