diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 3630e9b..7534e9d 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.Bytes.len;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -2400,8 +2401,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * Create a KeyValue reading from the raw InputStream.
    * Named iscreate so doesn't clash with {@link #create(DataInput)}
    * @param in
-   * @return Created KeyValue OR if we find a length of zero, we will return null which
-   *         can be useful marking a stream as done.
+   * @return Created KeyValue or throws an exception
    * @throws IOException
    * {@link Deprecated} As of 1.2. Use {@link KeyValueUtil#iscreate(InputStream, boolean)} instead.
    */
@@ -2412,7 +2412,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     while (bytesRead < intBytes.length) {
       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
       if (n < 0) {
-        if (bytesRead == 0) return null; // EOF at start is ok
+        if (bytesRead == 0) {
+          throw new EOFException();
+        }
         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
       }
       bytesRead += n;
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index 24d88b3..98e2205 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -187,7 +188,7 @@ public class KeyValueUtil {
    * position to the start of the next KeyValue. Does not allocate a new array or copy data.
    * @param bb
    * @param includesMvccVersion
-   * @param includesTags 
+   * @param includesTags
    */
   public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean includesMvccVersion,
       boolean includesTags) {
@@ -231,7 +232,7 @@ public class KeyValueUtil {
     return createFirstOnRow(CellUtil.cloneRow(in), CellUtil.cloneFamily(in),
         CellUtil.cloneQualifier(in), in.getTimestamp() - 1);
   }
-  
+
   /**
    * Create a KeyValue for the specified row, family and qualifier that would be
@@ -449,6 +450,7 @@ public class KeyValueUtil {
   @Deprecated
   public static List<KeyValue> ensureKeyValues(List<Cell> cells) {
     List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, KeyValue>() {
+      @Override
       public KeyValue apply(Cell arg0) {
         return KeyValueUtil.ensureKeyValue(arg0);
       }
@@ -491,8 +493,9 @@ public class KeyValueUtil {
     while (bytesRead < intBytes.length) {
       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
       if (n < 0) {
-        if (bytesRead == 0)
-          return null; // EOF at start is ok
+        if (bytesRead == 0) {
+          throw new EOFException();
+        }
         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
       }
       bytesRead += n;
@@ -555,7 +558,7 @@ public class KeyValueUtil {
 
   /**
    * Create a KeyValue reading length from in
-   * 
+   *
    * @param length
    * @param in
    * @return Created KeyValue OR if we find a length of zero, we will return
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
index 51801a8..09dc37f 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.codec;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PushbackInputStream;
+
+import javax.annotation.Nonnull;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,27 +35,41 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public abstract class BaseDecoder implements Codec.Decoder {
   protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
-  protected final InputStream in;
-  private boolean hasNext = true;
+
+  protected final PBIS in;
   private Cell current = null;
 
+  protected static class PBIS extends PushbackInputStream {
+    public PBIS(InputStream in, int size) {
+      super(in, size);
+    }
+
+    public void resetBuf(int size) {
+      this.buf = new byte[size];
+      this.pos = size;
+    }
+  }
+
   public BaseDecoder(final InputStream in) {
-    this.in = in;
+    this.in = new PBIS(in, 1);
   }
 
   @Override
   public boolean advance() throws IOException {
-    if (!this.hasNext) return this.hasNext;
-    if (this.in.available() == 0) {
-      this.hasNext = false;
-      return this.hasNext;
+    int firstByte = in.read();
+    if (firstByte == -1) {
+      return false;
+    } else {
+      in.unread(firstByte);
     }
+
     try {
       this.current = parseCell();
     } catch (IOException ioEx) {
+      in.resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers
       rethrowEofException(ioEx);
     }
-    return this.hasNext;
+    return true;
   }
 
   private void rethrowEofException(IOException ioEx) throws IOException {
@@ -72,9 +89,12 @@ public abstract class BaseDecoder implements Codec.Decoder {
   }
 
   /**
-   * @return extract a Cell
+   * Extract a Cell.
+   * @return a parsed Cell or throws an Exception. EOFException or a generic IOException may be
+   *         thrown if EOF is reached prematurely. Does not return null.
    * @throws IOException
    */
+  @Nonnull
   protected abstract Cell parseCell() throws IOException;
 
   @Override
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
index a54c76e..666f440 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
@@ -79,6 +79,7 @@ public class CellCodec implements Codec {
       super(in);
     }
 
+    @Override
     protected Cell parseCell() throws IOException {
       byte [] row = readByteArray(this.in);
       byte [] family = readByteArray(in);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
index 46f3b88..69181e5 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -84,12 +83,8 @@ public class SecureWALCellCodec extends WALCellCodec {
         return super.parseCell();
       }
       int ivLength = 0;
-      try {
-        ivLength = StreamUtils.readRawVarint32(in);
-      } catch (EOFException e) {
-        // EOF at start is OK
-        return null;
-      }
+
+      ivLength = StreamUtils.readRawVarint32(in);
 
       // TODO: An IV length of 0 could signify an unwrapped cell, when the
       // encoder supports that just read the remainder in directly
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3fa2ed7..3d99523 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -855,9 +855,10 @@ public class ReplicationSource extends Thread
     int distinctRowKeys = 1;
     Cell lastCell = cells.get(0);
     for (int i = 0; i < edit.size(); i++) {
-      if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
+      if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
         distinctRowKeys++;
       }
+      lastCell = cells.get(i);
     }
     return distinctRowKeys;
   }
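
Note on calling code (not part of the patch above): because a zero-byte read at the start of a KeyValue now raises EOFException instead of returning null, callers that used the null return as an end-of-stream marker have to catch the exception instead. The sketch below illustrates that pattern against KeyValueUtil#iscreate(InputStream, boolean), the replacement named in the deprecation note; the KeyValueStreamReader class, the readAll method, and the reading of the boolean flag as "with tags" are illustrative assumptions, not code from this patch.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;

// Hypothetical caller, for illustration only.
public class KeyValueStreamReader {
  /**
   * Drains serialized KeyValues from the stream. Before this patch a null return marked
   * end-of-stream; with it, EOF at a KeyValue boundary surfaces as an EOFException.
   */
  public static List<KeyValue> readAll(InputStream in, boolean withTags) throws IOException {
    List<KeyValue> kvs = new ArrayList<KeyValue>();
    while (true) {
      try {
        kvs.add(KeyValueUtil.iscreate(in, withTags));
      } catch (EOFException eof) {
        break; // clean end of stream at a KeyValue boundary
      }
    }
    return kvs;
  }
}

A premature EOF in the middle of a KeyValue still propagates as a plain IOException, matching the behavior of the modified read loops above.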