diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 22c5cc1..542f23c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
@@ -191,11 +190,9 @@ public class IPCUtil {
    * @throws IOException
    */
   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
-      final ByteBuffer cellBlock)
-  throws IOException {
+      ByteBuffer cellBlock) throws IOException {
     // If compressed, decompress it first before passing it on else we will leak compression
     // resources if the stream is not closed properly after we let it out.
-    InputStream is = null;
     if (compressor != null) {
       // GZIPCodec fails w/ NPE if no configuration.
       if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
@@ -210,18 +207,13 @@
           this.cellBlockDecompressionMultiplier);
         IOUtils.copy(cis, bbos);
         bbos.close();
-        ByteBuffer bb = bbos.getByteBuffer();
-        is = new ByteBufferInputStream(bb);
+        cellBlock = bbos.getByteBuffer();
       } finally {
-        if (is != null) is.close();
         if (bbos != null) bbos.close();
-        CodecPool.returnDecompressor(poolDecompressor);
       }
-    } else {
-      is = new ByteBufferInputStream(cellBlock);
     }
-    return codec.getDecoder(is);
+    return codec.getDecoder(cellBlock);
   }
 
   /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
index 666f440..d6b64f6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -116,6 +118,11 @@
   }
 
   @Override
+  public Decoder getDecoder(ByteBuffer buf) {
+    return getDecoder(new ByteBufferInputStream(buf));
+  }
+
+  @Override
   public Encoder getEncoder(OutputStream os) {
     return new CellEncoder(os);
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
index d79be17..7326884 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -117,6 +119,11 @@
   }
 
   @Override
+  public Decoder getDecoder(ByteBuffer buf) {
+    return getDecoder(new ByteBufferInputStream(buf));
+  }
+
+  @Override
   public Encoder getEncoder(OutputStream os) {
     return new CellEncoder(os);
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
index de44ec6..c8a4cdc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.codec;
 
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -50,5 +51,6 @@ public interface Codec {
   interface Decoder extends CellScanner {};
 
   Decoder getDecoder(InputStream is);
+  Decoder getDecoder(ByteBuffer buf);
   Encoder getEncoder(OutputStream os);
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
index f99bfcb..31be5e5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NoTagsKeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 
 /**
  * Codec that does KeyValue version 1 serialization.
@@ -69,6 +72,35 @@ public class KeyValueCodec implements Codec {
     }
   }
 
+  public static class ByteBufferedKeyValueDecoder implements Codec.Decoder {
+
+    protected final ByteBuffer buf;
+    protected Cell current = null;
+
+    public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+      this.buf = buf;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (this.buf.remaining() <= 0) return false;
+      int len = ByteBufferUtils.toInt(buf);
+      assert buf.hasArray();
+      this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len);
+      buf.position(buf.position() + len);
+      return true;
+    }
+
+    @Override
+    public Cell current() {
+      return this.current;
+    }
+
+    protected Cell createCell(byte[] buf, int offset, int len) {
+      return new NoTagsKeyValue(buf, offset, len);
+    }
+  }
+
   /**
    * Implementation depends on {@link InputStream#available()}
    */
@@ -78,6 +110,11 @@ public class KeyValueCodec implements Codec {
   }
 
   @Override
+  public Decoder getDecoder(ByteBuffer buf) {
+    return new ByteBufferedKeyValueDecoder(buf);
+  }
+
+  @Override
   public Encoder getEncoder(OutputStream os) {
     return new KeyValueEncoder(os);
   }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
index ad762b4..714cc38 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -75,6 +77,18 @@ public class KeyValueCodecWithTags implements Codec {
     }
   }
 
+  public static class ByteBufferedKeyValueDecoder
+      extends KeyValueCodec.ByteBufferedKeyValueDecoder {
+
+    public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+      super(buf);
+    }
+
+    protected Cell createCell(byte[] buf, int offset, int len) {
+      return new KeyValue(buf, offset, len);
+    }
+  }
+
   /**
    * Implementation depends on {@link InputStream#available()}
    */
@@ -87,4 +101,9 @@
   public Encoder getEncoder(OutputStream os) {
     return new KeyValueEncoder(os);
   }
+
+  @Override
+  public Decoder getDecoder(ByteBuffer buf) {
+    return new ByteBufferedKeyValueDecoder(buf);
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
index 0e1c3ae..dba2a77 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.io.util;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -25,6 +26,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.base.Preconditions;
@@ -251,4 +253,24 @@
     }
     return result;
   }
+
+  public static int readInt(InputStream in) throws IOException {
+    return readInt(in, new byte[Bytes.SIZEOF_INT]);
+  }
+
+  public static int readInt(InputStream in, byte[] readInBuf) throws IOException {
+    assert readInBuf.length >= Bytes.SIZEOF_INT;
+    int bytesRead = 0;
+    while (bytesRead < readInBuf.length) {
+      int n = in.read(readInBuf, bytesRead, readInBuf.length - bytesRead);
+      if (n < 0) {
+        if (bytesRead == 0) {
+          throw new EOFException();
+        }
+        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+      }
+      bytesRead += n;
+    }
+    return Bytes.toInt(readInBuf);
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 7f3d777..6b13951 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -782,6 +782,19 @@
   }
 
   /**
+   * Reads an int value at the given buffer's current position. Also advances the buffer's position
+   */
+  public static int toInt(ByteBuffer buffer) {
+    if (UNSAFE_UNALIGNED) {
+      int i = UnsafeAccess.toInt(buffer, buffer.position());
+      buffer.position(buffer.position() + Bytes.SIZEOF_INT);
+      return i;
+    } else {
+      return buffer.getInt();
+    }
+  }
+
+  /**
    * Reads an int value at the given buffer's offset.
    * @param buffer
    * @param offset
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
index 6c894a5..8f08539 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.codec;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -80,6 +82,11 @@
   }
 
   @Override
+  public Decoder getDecoder(ByteBuffer buf) {
+    return getDecoder(new ByteBufferInputStream(buf));
+  }
+
+  @Override
   public Encoder getEncoder(OutputStream os) {
     return new MessageEncoder(os);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 05929fa..6b89e89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
 import org.apache.hadoop.hbase.codec.BaseEncoder;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -348,6 +350,11 @@
   }
 
   @Override
+  public Decoder getDecoder(ByteBuffer buf) {
+    return getDecoder(new ByteBufferInputStream(buf));
+  }
+
+  @Override
   public Encoder getEncoder(OutputStream os) {
     return (compression == null) ? new EnsureKvEncoder(os)
         : new CompressedKvEncoder(os, compression);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index d99643d..3a9ace2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -221,7 +221,8 @@
         CellScanner cellScanner = result.cellScanner();
         cellScanner.advance();
         KeyValue current = (KeyValue) cellScanner.current();
-        assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
+        assertTrue(current.getValueOffset() + current.getValueLength() == current.getOffset()
+            + current.getLength());
       }
     } finally {
      if (scanner != null)
@@ -239,7 +240,8 @@
         CellScanner cellScanner = result.cellScanner();
         cellScanner.advance();
         KeyValue current = (KeyValue) cellScanner.current();
-        assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
+        assertTrue(current.getValueOffset() + current.getValueLength() == current.getOffset()
+            + current.getLength());
       }
     } finally {
       if (scanner != null) {
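
Note (reviewer sketch, not part of the patch): the decode loop that ByteBufferedKeyValueDecoder.advance() implements walks a cellblock framed as repeated [4-byte length][serialized KeyValue] records. It reads the length with ByteBufferUtils.toInt (an unsafe unaligned read where available, ByteBuffer.getInt otherwise), materializes a cell over the backing array at arrayOffset() + position(), and advances the position past the payload, so no InputStream wrapper or per-cell copy is needed. This is also why the TestTags assertions change: a cell decoded this way points into the shared cellblock array at a nonzero offset, so the end of the value must be compared against getOffset() + getLength() rather than getLength() alone. The self-contained sketch below illustrates the framing; the class name and ASCII payloads are hypothetical stand-ins for real KeyValues, and plain getInt() replaces ByteBufferUtils.toInt to stay dependency-free.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the cellblock walk in ByteBufferedKeyValueDecoder.advance().
public class CellBlockWalkSketch {
  public static void main(String[] args) {
    // Build a toy "cellblock": each record is a 4-byte big-endian length
    // followed by that many payload bytes (real cellblocks carry KeyValues).
    ByteBuffer cellBlock = ByteBuffer.allocate(64);
    for (String payload : new String[] { "kv-one", "kv-two" }) {
      byte[] b = payload.getBytes(StandardCharsets.UTF_8);
      cellBlock.putInt(b.length).put(b);
    }
    cellBlock.flip();

    // The decode loop: read the length, locate the payload in the backing
    // array via arrayOffset() + position(), then advance past the payload.
    while (cellBlock.remaining() > 0) {
      int len = cellBlock.getInt();
      int offset = cellBlock.arrayOffset() + cellBlock.position();
      System.out.println(new String(cellBlock.array(), offset, len, StandardCharsets.UTF_8));
      cellBlock.position(cellBlock.position() + len);
    }
  }
}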