diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index c9da738..d65de48 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -502,20 +502,9 @@ public class KeyValueUtil {
    * @throws IOException
    */
   public static KeyValue iscreate(final InputStream in, boolean withTags) throws IOException {
-    byte[] intBytes = new byte[Bytes.SIZEOF_INT];
-    int bytesRead = 0;
-    while (bytesRead < intBytes.length) {
-      int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
-      if (n < 0) {
-        if (bytesRead == 0) {
-          throw new EOFException();
-        }
-        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
-      }
-      bytesRead += n;
-    }
+    int len = StreamUtils.readInt(in);
     // TODO: perhaps some sanity check is needed here.
-    byte[] bytes = new byte[Bytes.toInt(intBytes)];
+    byte[] bytes = new byte[len];
     IOUtils.readFully(in, bytes, 0, bytes.length);
     if (withTags) {
       return new KeyValue(bytes, 0, bytes.length);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
index 86f8678..a895d1c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 
 /**
  * TODO javadoc
@@ -39,6 +40,7 @@ public abstract class BaseDecoder implements Codec.Decoder {
 
   protected final InputStream in;
   private Cell current = null;
+  private final boolean remainAwareIS;
 
   protected static class PBIS extends PushbackInputStream {
     public PBIS(InputStream in, int size) {
@@ -52,22 +54,36 @@ public abstract class BaseDecoder implements Codec.Decoder {
   }
 
   public BaseDecoder(final InputStream in) {
-    this.in = new PBIS(in, 1);
+    if (in instanceof ByteBufferInputStream) {
+      this.in = in;
+      remainAwareIS = true;
+    } else {
+      this.in = new PBIS(in, 1);
+      remainAwareIS = false;
+    }
   }
 
   @Override
   public boolean advance() throws IOException {
-    int firstByte = in.read();
-    if (firstByte == -1) {
-      return false;
+    if (remainAwareIS) {
+      if (in.available() == 0) {
+        return false;
+      }
     } else {
-      ((PBIS)in).unread(firstByte);
+      int firstByte = in.read();
+      if (firstByte == -1) {
+        return false;
+      } else {
+        ((PBIS)in).unread(firstByte);
+      }
     }
-
     try {
       this.current = parseCell();
     } catch (IOException ioEx) {
-      ((PBIS)in).resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers
+      if (!remainAwareIS) {
+        ((PBIS) in).resetBuf(1); // reset the buffer in case the underlying stream is read from
+        // upper layers
+      }
       rethrowEofException(ioEx);
     }
     return true;
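
The BaseDecoder change keys the fast path off the concrete stream type: for a ByteBuffer-backed stream, end-of-data is simply available() == 0, so the per-cell read()/unread() peek through PushbackInputStream (and the PBIS wrapper allocation itself) goes away. A rough round-trip harness showing the two pieces together; it is not part of the patch, and the class name and sample data are invented:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.util.Bytes;

public class DecoderRoundTrip {
  public static void main(String[] args) throws Exception {
    Codec codec = new KeyValueCodec();

    // Serialize two cells with the codec's encoder.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Codec.Encoder encoder = codec.getEncoder(out);
    encoder.write(new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v1")));
    encoder.write(new KeyValue(Bytes.toBytes("r2"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v2")));
    encoder.flush();

    // Decode from a ByteBuffer-backed stream; with the patch, BaseDecoder
    // takes the remain-aware path and never wraps the stream in a PBIS.
    ByteBufferInputStream in =
        new ByteBufferInputStream(ByteBuffer.wrap(out.toByteArray()));
    Codec.Decoder decoder = codec.getDecoder(in);
    while (decoder.advance()) {
      Cell cell = decoder.current();
      System.out.println(cell);
    }
  }
}
```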
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
index f99bfcb..0c21263 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
@@ -20,11 +20,16 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NoTagsKeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * Codec that does KeyValue version 1 serialization.
@@ -64,8 +69,26 @@ public class KeyValueCodec implements Codec {
     }
 
     protected Cell parseCell() throws IOException {
-      // No tags here
-      return KeyValueUtil.iscreate(in, false);
+      if (in instanceof ByteBufferInputStream) {
+        // This stream is backed by a ByteBuffer. We can read the Cell length directly from the
+        // buffer and create the Cell instance right on top of it, with no need to read into a
+        // temp byte[] and create a Cell on top of that. This saves a lot of garbage.
+        ByteBufferInputStream bis = (ByteBufferInputStream) in;
+        int len = bis.readInt();
+        ByteBuffer buf = bis.getBuffer();
+        assert buf.hasArray();
+        Cell c = createCell(buf.array(), buf.arrayOffset() + buf.position(), len);
+        bis.skip(len);
+        return c;
+      }
+      int len = StreamUtils.readInt(in);
+      byte[] bytes = new byte[len];
+      IOUtils.readFully(in, bytes, 0, bytes.length);
+      return createCell(bytes, 0, len);
+    }
+
+    protected Cell createCell(byte[] buf, int offset, int len) {
+      return new NoTagsKeyValue(buf, offset, len);
     }
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
index ad762b4..4e8d720 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -64,14 +65,14 @@ public class KeyValueCodecWithTags implements Codec {
     }
   }
 
-  public static class KeyValueDecoder extends BaseDecoder {
+  public static class KeyValueDecoder extends KeyValueCodec.KeyValueDecoder {
     public KeyValueDecoder(final InputStream in) {
       super(in);
     }
 
-    protected Cell parseCell() throws IOException {
-      // create KeyValue with tags
-      return KeyValueUtil.iscreate(in, true);
+    @Override
+    protected Cell createCell(byte[] buf, int offset, int len) {
+      return new KeyValue(buf, offset, len);
     }
   }
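
With KeyValueCodecWithTags.KeyValueDecoder now extending KeyValueCodec.KeyValueDecoder, the buffer-aware parseCell() is written once and the only per-codec difference is which Cell implementation wraps the bytes (NoTagsKeyValue vs. KeyValue). A sketch of what that extension point allows; AuditingDecoder is a hypothetical subclass, not part of the patch:

```java
import java.io.InputStream;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.codec.KeyValueCodec;

// Shares the buffer-aware parseCell() from KeyValueCodec.KeyValueDecoder and
// only customizes Cell creation, the same way KeyValueCodecWithTags does.
public class AuditingDecoder extends KeyValueCodec.KeyValueDecoder {
  private long cellsDecoded = 0;

  public AuditingDecoder(InputStream in) {
    super(in);
  }

  @Override
  protected Cell createCell(byte[] buf, int offset, int len) {
    cellsDecoded++; // observe every decoded cell without copying it
    return super.createCell(buf, offset, len);
  }

  public long getCellsDecoded() {
    return cellsDecoded;
  }
}
```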
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
index 8aee07b..eb85655 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hbase.io;
 
+import java.io.EOFException;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Not thread safe!
@@ -52,6 +55,21 @@ public class ByteBufferInputStream extends InputStream {
   }
 
   /**
+   * Reads the next 4 bytes as an 'int' value from this input stream. If there are not enough
+   * bytes left in the stream, throws EOFException.
+   *
+   * @return Next 4 bytes of data read as int
+   * @throws IOException
+   */
+  public int readInt() throws IOException {
+    if (this.available() < Bytes.SIZEOF_INT) {
+      throw new EOFException();
+    }
+    // Read the int directly off the backing ByteBuffer; no temp byte[] needed.
+    return this.buf.getInt();
+  }
+
+  /**
    * Reads up to next <code>len</code> bytes of data from buffer into passed array (starting from
    * given offset).
    * @param b the array into which the data is read.
@@ -104,4 +122,8 @@ public class ByteBufferInputStream extends InputStream {
   public int available() {
     return this.buf.remaining();
   }
+
+  public ByteBuffer getBuffer() {
+    return this.buf;
+  }
 }
\ No newline at end of file
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
index 0e1c3ae..dba2a77 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.util;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -25,6 +26,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.base.Preconditions;
@@ -251,4 +253,24 @@ public class StreamUtils {
     }
     return result;
   }
+
+  public static int readInt(InputStream in) throws IOException {
+    return readInt(in, new byte[Bytes.SIZEOF_INT]);
+  }
+
+  public static int readInt(InputStream in, byte[] readInBuf) throws IOException {
+    assert readInBuf.length >= Bytes.SIZEOF_INT;
+    int bytesRead = 0;
+    while (bytesRead < Bytes.SIZEOF_INT) {
+      int n = in.read(readInBuf, bytesRead, Bytes.SIZEOF_INT - bytesRead);
+      if (n < 0) {
+        if (bytesRead == 0) {
+          throw new EOFException();
+        }
+        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+      }
+      bytesRead += n;
+    }
+    return Bytes.toInt(readInBuf);
+  }
 }
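
The byte[]-taking readInt overload exists so a hot loop can reuse one scratch buffer instead of allocating four bytes per int read, the same garbage-reduction theme as the rest of the patch (note the loop is bounded by Bytes.SIZEOF_INT, not the buffer length, so an oversized scratch buffer is safe). A minimal usage sketch, assuming the helpers added above; ReadIntDemo is an invented name:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadIntDemo {
  public static void main(String[] args) throws Exception {
    // Two serialized ints back to back.
    InputStream in =
        new ByteArrayInputStream(Bytes.add(Bytes.toBytes(42), Bytes.toBytes(7)));
    // One scratch buffer reused across calls; no per-int allocation.
    byte[] scratch = new byte[Bytes.SIZEOF_INT];
    System.out.println(StreamUtils.readInt(in, scratch)); // prints 42
    System.out.println(StreamUtils.readInt(in, scratch)); // prints 7
  }
}
```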