Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (revision 1519015) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (working copy) @@ -21,6 +21,7 @@ import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; @@ -34,6 +35,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.io.LimitInputStream; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.InvalidProtocolBufferException; /** @@ -147,7 +150,7 @@ ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length); this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity()); if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) { - LOG.warn("No trailer found."); + LOG.trace("No trailer found."); return false; } if (trailerSize < 0) { @@ -190,58 +193,74 @@ @Override protected boolean readNext(HLog.Entry entry) throws IOException { while (true) { - if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false; + long originalPosition = this.inputStream.getPos(); + if (trailerPresent && originalPosition == this.walEditsStopOffset) return false; WALKey.Builder builder = WALKey.newBuilder(); - boolean hasNext = false; + int size = 0; try { - hasNext = builder.mergeDelimitedFrom(inputStream); - } catch (InvalidProtocolBufferException ipbe) { - LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe); - } - if (!hasNext) return false; - if (!builder.isInitialized()) { - // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. - // If we can get the KV count, we could, theoretically, try to get next record. - LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring"); - return false; - } - WALKey walKey = builder.build(); - entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); - if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { - LOG.warn("WALKey has no KVs that follow it; trying the next one"); - continue; - } - int expectedCells = walKey.getFollowingKvCount(); - long posBefore = this.inputStream.getPos(); - try { - int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); - if (expectedCells != actualCells) { - throw new EOFException("Only read " + actualCells); // other info added in catch + int originalAvailable = this.inputStream.available(); + try { + int firstByte = this.inputStream.read(); + if (firstByte == -1) { + throw new EOFException("First byte is negative"); + } + size = CodedInputStream.readRawVarint32(firstByte, this.inputStream); + if (this.inputStream.available() < size) { + throw new EOFException("Available stream not enough for edit, " + + "inputStream.available()= " + this.inputStream.available() + ", " + + "entry size= " + size); + } + final InputStream limitedInput = new LimitInputStream(this.inputStream, size); + builder.mergeFrom(limitedInput); + } catch (InvalidProtocolBufferException ipbe) { + throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" + + originalPosition + ", currentPosition=" + this.inputStream.getPos() + + ", messageSize=" + size + ", originalAvailable=" + originalAvailable + + ", currentAvailable=" + this.inputStream.available()).initCause(ipbe); } - } catch (Exception ex) { - String posAfterStr = ""; + if (!builder.isInitialized()) { + // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit. + // If we can get the KV count, we could, theoretically, try to get next record. + throw new EOFException("Partial PB while reading WAL, " + + "probably an unexpected EOF, ignoring"); + } + WALKey walKey = builder.build(); + entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); + if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { + LOG.trace("WALKey has no KVs that follow it; trying the next one"); + continue; + } + int expectedCells = walKey.getFollowingKvCount(); + long posBefore = this.inputStream.getPos(); try { - posAfterStr = this.inputStream.getPos() + ""; - } catch (Throwable t) { - LOG.trace("Error getting pos for error message - ignoring", t); + int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); + if (expectedCells != actualCells) { + throw new EOFException("Only read " + actualCells); // other info added in catch + } + } catch (Exception ex) { + String posAfterStr = ""; + try { + posAfterStr = this.inputStream.getPos() + ""; + } catch (Throwable t) { + LOG.trace("Error getting pos for error message - ignoring", t); + } + String message = " while reading " + expectedCells + " WAL KVs; started reading at " + + posBefore + " and read up to " + posAfterStr; + IOException realEofEx = extractHiddenEof(ex); + throw (EOFException) new EOFException("EOF " + message). + initCause(realEofEx != null ? ex : realEofEx); } - String message = " while reading " + expectedCells + " WAL KVs; started reading at " - + posBefore + " and read up to " + posAfterStr; - IOException realEofEx = extractHiddenEof(ex); - if (realEofEx != null) { - LOG.error("EOF " + message, realEofEx); - return false; + if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) { + LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path + + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: " + + this.walEditsStopOffset); + throw new EOFException("Read WALTrailer while reading WALEdits"); } - message = "Error " + message; - LOG.error(message); - throw new IOException(message, ex); + } catch (EOFException eof) { + LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof); + seekOnFs(originalPosition); + return false; } - if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) { - LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path - + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: " - + this.walEditsStopOffset); - throw new IOException("Read WALTrailer while reading WALEdits"); - } return true; } }