diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index b18e70f..eadea6e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
@@ -34,6 +35,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import com.google.common.io.LimitInputStream;
+import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
@@ -190,19 +193,41 @@ public class ProtobufLogReader extends ReaderBase {
   @Override
   protected boolean readNext(HLog.Entry entry) throws IOException {
     while (true) {
-      if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false;
+      long originalPosition = this.inputStream.getPos();
+      if (trailerPresent && originalPosition == this.walEditsStopOffset) return false;
       WALKey.Builder builder = WALKey.newBuilder();
-      boolean hasNext = false;
+      int size = 0;
+      int available = -1;
       try {
-        hasNext = builder.mergeDelimitedFrom(inputStream);
+        // Read the varint length prefix by hand so the entry size is known before the
+        // entry body is parsed.
+        int firstByte = this.inputStream.read();
+        if (firstByte == -1) return false;
+        size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
+        if (size < 0) {
+          // A negative size means the length prefix is corrupt. LimitInputStream would reject
+          // it with an IllegalArgumentException that the catch below does not handle.
+          LOG.warn("Invalid PB, negative messageSize=" + size + ", originalPosition=" +
+            originalPosition + "; corrupt length prefix? Ignoring");
+          return false;
+        }
+        // InputStream.available() is only an estimate and may be 0 even when more bytes
+        // follow, so only trust it as a truncation signal when it is positive.
+        available = this.inputStream.available();
+        if (available > 0 && available < size) return false;
+        final InputStream limitedInput = new LimitInputStream(this.inputStream, size);
+        builder.mergeFrom(limitedInput);
       } catch (InvalidProtocolBufferException ipbe) {
-        LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
+        LOG.warn("Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition +
+          ", currentPosition=" + this.inputStream.getPos() + ", messageSize=" + size +
+          ", available=" + available, ipbe);
+        // Return here since we've already dumped out log on problem pb parsing.
+        return false;
       }
-      if (!hasNext) return false;
       if (!builder.isInitialized()) {
         // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
         // If we can get the KV count, we could, theoretically, try to get next record.
-        LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
+        LOG.warn("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
         return false;
       }
       WALKey walKey = builder.build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 199c8d7..5259d22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -567,7 +567,9 @@ public class ReplicationSource extends Thread
    */
   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
     try {
-      LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+      }
       Thread.sleep(this.sleepForRetries * sleepMultiplier);
     } catch (InterruptedException e) {
       LOG.debug("Interrupted while sleeping between retries");