diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 623acd5..6061b5b 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -20,10 +20,7 @@ package org.apache.hadoop.hbase.protobuf;
 import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
     .RegionSpecifierType.REGION_NAME;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
+import java.io.*;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -46,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -3162,19 +3160,21 @@ public final class ProtobufUtil {
    * @param in Inputsream with delimited protobuf data
    * @throws IOException
    */
-  public static void mergeDelimitedFrom(Message.Builder builder, InputStream in)
+  public static int mergeDelimitedFrom(Message.Builder builder, InputStream in)
       throws IOException {
     // This used to be builder.mergeDelimitedFrom(in);
     // but is replaced to allow us to bump the protobuf size limit.
+    int size = 0;
     final int firstByte = in.read();
     if (firstByte != -1) {
-      final int size = CodedInputStream.readRawVarint32(firstByte, in);
+      size = CodedInputStream.readRawVarint32(firstByte, in);
       final InputStream limitedInput = new LimitInputStream(in, size);
       final CodedInputStream codedInput = CodedInputStream.newInstance(limitedInput);
       codedInput.setSizeLimit(size);
       builder.mergeFrom(codedInput);
       codedInput.checkLastTagWas(0);
     }
+    return size;
   }
 
   /**
@@ -3191,6 +3191,12 @@
     codedInput.setSizeLimit(size);
     builder.mergeFrom(codedInput);
     codedInput.checkLastTagWas(0);
+    if (codedInput.getTotalBytesRead() < size) {
+      throw new EOFException("Available stream not enough for the PB message, " +
+          "inputStream.available()= " + in.available() + ", " +
+          "entry size= " + size + " at offset =" +
+          (in instanceof Seekable? ((Seekable) in).getPos() : 0));
+    }
   }
 
   /**
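Note on the EOFException check above: InputStream.available() only says how many bytes can be read without blocking, not how many bytes remain, so it cannot tell a caller whether the whole protobuf message is still readable; comparing CodedInputStream.getTotalBytesRead() against the declared message size is the reliable check. The following standalone, JDK-only snippet (illustration only, not part of this patch; the class name is made up) shows how misleading available() can be on a wrapping stream:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Illustration only: available() reports bytes readable without blocking, not bytes
// remaining. A wrapping stream such as GZIPInputStream typically reports a tiny number
// regardless of how much data will still be decoded.
public class AvailableIsNotLength {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
      gz.write(new byte[1 << 20]); // 1 MB of zeros still to be produced on the read side
    }
    try (GZIPInputStream in =
        new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
      // Prints a small value (typically 1), nowhere near the 1 MB the stream will yield.
      System.out.println("available() = " + in.available());
    }
  }
}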
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
index 2609398..1ff994a 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
@@ -135,9 +135,6 @@ public class KeyValueCodec implements Codec {
     }
   }
 
-  /**
-   * Implementation depends on {@link InputStream#available()}
-   */
   @Override
   public Decoder getDecoder(final InputStream is) {
     return new KeyValueDecoder(is);
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
index 63c02e8..45a5ae3 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
@@ -90,9 +90,6 @@ public class KeyValueCodecWithTags implements Codec {
     }
   }
 
-  /**
-   * Implementation depends on {@link InputStream#available()}
-   */
   @Override
   public Decoder getDecoder(final InputStream is) {
     return new KeyValueDecoder(is);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
index ef6370e..8803ff0 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.CellComparator.MetaCellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -259,10 +260,9 @@ public class FixedFileTrailer {
    */
   void deserializeFromPB(DataInputStream inputStream) throws IOException {
     // read PB and skip padding
-    int start = inputStream.available();
-    HFileProtos.FileTrailerProto trailerProto =
-        HFileProtos.FileTrailerProto.PARSER.parseDelimitedFrom(inputStream);
-    int size = start - inputStream.available();
+    HFileProtos.FileTrailerProto.Builder builder = HFileProtos.FileTrailerProto.newBuilder();
+    int size = ProtobufUtil.mergeDelimitedFrom(builder, inputStream);
+    HFileProtos.FileTrailerProto trailerProto = builder.build();
     inputStream.skip(getTrailerSize() - NOT_PB_SIZE - size);
 
     // process the PB
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 15bff8b..6d194f5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -336,26 +336,21 @@ public class ProtobufLogReader extends ReaderBase {
       WALKey.Builder builder = WALKey.newBuilder();
       long size = 0;
       try {
-        long available = -1;
         try {
           int firstByte = this.inputStream.read();
           if (firstByte == -1) {
             throw new EOFException("First byte is negative at offset " + originalPosition);
           }
+          // Implementation note: NEVER rely on InputStream.available() to return the remaining
+          // bytes in the stream. Not all IS's do that. Read the javadoc.
           size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
-          // available may be < 0 on local fs for instance. If so, can't depend on it.
-          available = this.inputStream.available();
-          if (available > 0 && available < size) {
-            throw new EOFException("Available stream not enough for edit, " +
-                "inputStream.available()= " + this.inputStream.available() + ", " +
-                "entry size= " + size + " at offset = " + this.inputStream.getPos());
-          }
           ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
             (int)size);
         } catch (InvalidProtocolBufferException ipbe) {
           throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
             originalPosition + ", currentPosition=" + this.inputStream.getPos() +
-            ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
+            ", messageSize=" + size + ", currentAvailable=" + inputStream.available())
+              .initCause(ipbe);
         }
         if (!builder.isInitialized()) {
           // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
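The WAL reader hunk above drops the available()-based pre-check and instead reads exactly the number of bytes the varint length prefix declares, letting a short read surface as an EOF/invalid-PB failure after the fact. A JDK-only analogue of that length-prefixed pattern (illustration only; not the actual WAL entry format, and the class name is made up):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Read the declared size, then read exactly that many bytes; a truncated stream shows up
// as EOFException at read time rather than via a pre-check on available().
public class LengthPrefixedRead {
  public static void main(String[] args) throws IOException {
    byte[] truncated = {0, 0, 0, 10, 1, 2, 3}; // header claims 10 payload bytes, only 3 present
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(truncated));
    int size = in.readInt();
    byte[] payload = new byte[size];
    try {
      in.readFully(payload); // throws EOFException on the short read
    } catch (EOFException e) {
      System.out.println("Truncated entry, declared size " + size + ": " + e);
    }
  }
}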
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
index a6f70c3..727621f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
@@ -425,7 +425,7 @@ public class RegionSplitter {
   private static byte [] readFile(final FileSystem fs, final Path path) throws IOException {
     FSDataInputStream tmpIn = fs.open(path);
     try {
-      byte [] rawData = new byte[tmpIn.available()];
+      byte [] rawData = new byte[(int) fs.getFileStatus(path).getLen()];
       tmpIn.readFully(rawData);
       return rawData;
     } finally {
@@ -838,9 +838,11 @@
 
       // parse split file and process remaining splits
       FSDataInputStream tmpIn = fs.open(splitFile);
-      StringBuilder sb = new StringBuilder(tmpIn.available());
-      while (tmpIn.available() > 0) {
+      long len = fs.getFileStatus(splitFile).getLen(), offset = 0;
+      StringBuilder sb = new StringBuilder((int)len);
+      while (offset < len) {
         sb.append(tmpIn.readChar());
+        offset += 2;
       }
       tmpIn.close();
       for (String line : sb.toString().split("\n")) {
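In both RegionSplitter hunks the buffer and loop bounds now come from FileStatus.getLen() rather than available(), and the char-reading loop advances the byte offset by 2 per readChar() call since DataInputStream.readChar() consumes two bytes. A standalone sketch of the "size the buffer from file metadata" idiom, run against the local filesystem (the path and class name are placeholders, not part of the patch):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// FileStatus.getLen() is the authoritative file length, while available() only reflects
// whatever the client happens to have buffered.
public class ReadWholeFile {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/example-split-file"); // placeholder path
    byte[] data = new byte[(int) fs.getFileStatus(path).getLen()];
    try (FSDataInputStream in = fs.open(path)) {
      in.readFully(data); // fails with EOFException if the file is shorter than reported
    }
    System.out.println("Read " + data.length + " bytes");
  }
}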