diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 22c5cc1..716271d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc; import java.io.DataInput; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; @@ -178,9 +177,17 @@ public class IPCUtil { * @throws IOException */ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte [] cellBlock) - throws IOException { - return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock)); + final byte[] cellBlock) throws IOException { + // Use this method from Client side to create the CellScanner + ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock); + if (compressor != null) { + cellBlockBuf = decompress(compressor, cellBlockBuf); + } + // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will + // make Cells directly over the passed BB. This method is called at client side and we don't + // want the Cells to share the same byte[] where the RPC response is being read. Caching of any + // of the Cells at user's app level will make it not possible to GC the response byte[] + return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf)); } /** @@ -191,37 +198,36 @@ public class IPCUtil { * @throws IOException */ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final ByteBuffer cellBlock) - throws IOException { + ByteBuffer cellBlock) throws IOException { + // Use this method from HRS to create the CellScanner // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. - InputStream is = null; if (compressor != null) { - // GZIPCodec fails w/ NPE if no configuration. - if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); - Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); - CompressionInputStream cis = - compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); - ByteBufferOutputStream bbos = null; - try { - // TODO: This is ugly. The buffer will be resized on us if we guess wrong. - // TODO: Reuse buffers. - bbos = new ByteBufferOutputStream(cellBlock.remaining() * - this.cellBlockDecompressionMultiplier); - IOUtils.copy(cis, bbos); - bbos.close(); - ByteBuffer bb = bbos.getByteBuffer(); - is = new ByteBufferInputStream(bb); - } finally { - if (is != null) is.close(); - if (bbos != null) bbos.close(); + cellBlock = decompress(compressor, cellBlock); + } + return codec.getDecoder(cellBlock); + } - CodecPool.returnDecompressor(poolDecompressor); - } - } else { - is = new ByteBufferInputStream(cellBlock); + private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock) + throws IOException { + // GZIPCodec fails w/ NPE if no configuration. + if (compressor instanceof Configurable) ((Configurable) compressor).setConf(this.conf); + Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); + CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock), + poolDecompressor); + ByteBufferOutputStream bbos = null; + try { + // TODO: This is ugly. The buffer will be resized on us if we guess wrong. + // TODO: Reuse buffers. + bbos = new ByteBufferOutputStream( + cellBlock.remaining() * this.cellBlockDecompressionMultiplier); + IOUtils.copy(cis, bbos); + bbos.close(); + cellBlock = bbos.getByteBuffer(); + } finally { + CodecPool.returnDecompressor(poolDecompressor); } - return codec.getDecoder(is); + return cellBlock; } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java index 666f440..d6b64f6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.util.Bytes; /** @@ -116,6 +118,11 @@ public class CellCodec implements Codec { } @Override + public Decoder getDecoder(ByteBuffer buf) { + return getDecoder(new ByteBufferInputStream(buf)); + } + + @Override public Encoder getEncoder(OutputStream os) { return new CellEncoder(os); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java index d79be17..7326884 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.util.Bytes; /** @@ -117,6 +119,11 @@ public class CellCodecWithTags implements Codec { } @Override + public Decoder getDecoder(ByteBuffer buf) { + return getDecoder(new ByteBufferInputStream(buf)); + } + + @Override public Encoder getEncoder(OutputStream os) { return new CellEncoder(os); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java index de44ec6..c8a4cdc 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.codec; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -50,5 +51,6 @@ public interface Codec { interface Decoder extends CellScanner {}; Decoder getDecoder(InputStream is); + Decoder getDecoder(ByteBuffer buf); Encoder getEncoder(OutputStream os); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java index f99bfcb..31be5e5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.NoTagsKeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; /** * Codec that does KeyValue version 1 serialization. @@ -69,6 +72,35 @@ public class KeyValueCodec implements Codec { } } + public static class ByteBufferedKeyValueDecoder implements Codec.Decoder { + + protected final ByteBuffer buf; + protected Cell current = null; + + public ByteBufferedKeyValueDecoder(ByteBuffer buf) { + this.buf = buf; + } + + @Override + public boolean advance() throws IOException { + if (this.buf.remaining() <= 0) return false; + int len = ByteBufferUtils.toInt(buf); + assert buf.hasArray(); + this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len); + buf.position(buf.position() + len); + return true; + } + + @Override + public Cell current() { + return this.current; + } + + protected Cell createCell(byte[] buf, int offset, int len) { + return new NoTagsKeyValue(buf, offset, len); + } + } + /** * Implementation depends on {@link InputStream#available()} */ @@ -78,6 +110,11 @@ public class KeyValueCodec implements Codec { } @Override + public Decoder getDecoder(ByteBuffer buf) { + return new ByteBufferedKeyValueDecoder(buf); + } + + @Override public Encoder getEncoder(OutputStream os) { return new KeyValueEncoder(os); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java index ad762b4..714cc38 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java @@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -75,6 +77,18 @@ public class KeyValueCodecWithTags implements Codec { } } + public static class ByteBufferedKeyValueDecoder + extends KeyValueCodec.ByteBufferedKeyValueDecoder { + + public ByteBufferedKeyValueDecoder(ByteBuffer buf) { + super(buf); + } + + protected Cell createCell(byte[] buf, int offset, int len) { + return new KeyValue(buf, offset, len); + } + } + /** * Implementation depends on {@link InputStream#available()} */ @@ -87,4 +101,9 @@ public class KeyValueCodecWithTags implements Codec { public Encoder getEncoder(OutputStream os) { return new KeyValueEncoder(os); } + + @Override + public Decoder getDecoder(ByteBuffer buf) { + return new ByteBufferedKeyValueDecoder(buf); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 7f3d777..6b13951 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -782,6 +782,19 @@ public final class ByteBufferUtils { } /** + * Reads an int value at the given buffer's current position. Also advances the buffer's position + */ + public static int toInt(ByteBuffer buffer) { + if (UNSAFE_UNALIGNED) { + int i = UnsafeAccess.toInt(buffer, buffer.position()); + buffer.position(buffer.position() + Bytes.SIZEOF_INT); + return i; + } else { + return buffer.getInt(); + } + } + + /** * Reads an int value at the given buffer's offset. * @param buffer * @param offset diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java index 6c894a5..8f08539 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java @@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -80,6 +82,11 @@ public class MessageCodec implements Codec { } @Override + public Decoder getDecoder(ByteBuffer buf) { + return getDecoder(new ByteBufferInputStream(buf)); + } + + @Override public Encoder getEncoder(OutputStream os) { return new MessageEncoder(os); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 05929fa..6b89e89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.codec.BaseDecoder; import org.apache.hadoop.hbase.codec.BaseEncoder; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -348,6 +350,11 @@ public class WALCellCodec implements Codec { } @Override + public Decoder getDecoder(ByteBuffer buf) { + return getDecoder(new ByteBufferInputStream(buf)); + } + + @Override public Encoder getEncoder(OutputStream os) { return (compression == null) ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java index d99643d..3a9ace2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java @@ -221,7 +221,8 @@ public class TestTags { CellScanner cellScanner = result.cellScanner(); cellScanner.advance(); KeyValue current = (KeyValue) cellScanner.current(); - assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength()); + assertTrue(current.getValueOffset() + current.getValueLength() == current.getOffset() + + current.getLength()); } } finally { if (scanner != null) @@ -239,7 +240,8 @@ public class TestTags { CellScanner cellScanner = result.cellScanner(); cellScanner.advance(); KeyValue current = (KeyValue) cellScanner.current(); - assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength()); + assertTrue(current.getValueOffset() + current.getValueLength() == current.getOffset() + + current.getLength()); } } finally { if (scanner != null) {