diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
index fb2cafa..cb9dc89 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -21,7 +21,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
@@ -35,10 +37,12 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -236,17 +240,16 @@ class CellBlockBuilder {
* @throws IOException if encoding fails
*/
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
- final byte[] cellBlock) throws IOException {
+ byte[] cellBlock) throws IOException {
// Use this method from Client side to create the CellScanner
- ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
if (compressor != null) {
- cellBlockBuf = decompress(compressor, cellBlockBuf);
+ cellBlock = decompress(compressor, cellBlock);
}
// Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
// make Cells directly over the passed BB. This method is called at client side and we don't
// want the Cells to share the same byte[] where the RPC response is being read. Caching of any
// of the Cells at user's app level will make it not possible to GC the response byte[]
- return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
+ return codec.getDecoder(new ByteArrayInputStream(cellBlock));
}
/**
@@ -258,7 +261,7 @@ class CellBlockBuilder {
* @throws IOException if cell encoding fails
*/
public CellScanner createCellScannerReusingBuffers(final Codec codec,
- final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
+ final CompressionCodec compressor, ByteBuff cellBlock) throws IOException {
// Use this method from HRS to create the CellScanner
// If compressed, decompress it first before passing it on else we will leak compression
// resources if the stream is not closed properly after we let it out.
@@ -268,27 +271,39 @@ class CellBlockBuilder {
return codec.getDecoder(cellBlock);
}
- private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
+ private byte[] decompress(CompressionCodec compressor, byte[] compressedCellBlock)
+ throws IOException {
+ ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock),
+ compressedCellBlock.length * this.cellBlockDecompressionMultiplier);
+ assert cellBlock.hasArray();
+ return cellBlock.array();
+ }
+
+ private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock)
throws IOException {
+ ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock),
+ compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+ return new SingleByteBuff(cellBlock);
+ }
+
+ private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream,
+ int osInitialSize) throws IOException {
// GZIPCodec fails w/ NPE if no configuration.
if (compressor instanceof Configurable) {
((Configurable) compressor).setConf(this.conf);
}
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
- CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
- poolDecompressor);
+ CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor);
ByteBufferOutputStream bbos;
try {
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
// TODO: Reuse buffers.
- bbos = new ByteBufferOutputStream(
- cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+ bbos = new ByteBufferOutputStream(osInitialSize);
IOUtils.copy(cis, bbos);
bbos.close();
- cellBlock = bbos.getByteBuffer();
+ return bbos.getByteBuffer();
} finally {
CodecPool.returnDecompressor(poolDecompressor);
}
- return cellBlock;
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
index ccabe66..9addaa5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -78,7 +79,8 @@ public class TestCellBlockBuilder {
CellScanner cellScanner = sized ? getSizedCellScanner(cells)
: CellUtil.createCellScanner(Arrays.asList(cells).iterator());
ByteBuffer bb = builder.buildCellBlock(codec, compressor, cellScanner);
- cellScanner = builder.createCellScannerReusingBuffers(codec, compressor, bb);
+ cellScanner = builder.createCellScannerReusingBuffers(codec, compressor,
+ new SingleByteBuff(bb));
int i = 0;
while (cellScanner.advance()) {
i++;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index 2165362..06a0ed6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -36,10 +36,10 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
protected final ByteBuffer buf;
protected final int offset;
protected final int length;
+ protected final boolean hasTags;
private final short rowLen;
private final int keyLen;
private long seqId = 0;
- private final boolean hasTags;
// TODO : See if famLen can be cached or not?
private static final int FIXED_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
@@ -57,6 +57,18 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
this.seqId = seqId;
}
+ public OffheapKeyValue(ByteBuffer buf, int offset, int length) {
+ assert buf.isDirect();
+ this.buf = buf;
+ this.offset = offset;
+ this.length = length;
+ rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET);
+ keyLen = ByteBufferUtils.toInt(this.buf, this.offset);
+ int tagsLen = this.length
+ - (this.keyLen + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+ this.hasTags = tagsLen > 0;
+ }
+
@Override
public byte[] getRowArray() {
return CellUtil.cloneRow(this);
@@ -265,16 +277,19 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
@Override
public void setTimestamp(long ts) throws IOException {
- // This Cell implementation is not yet used in write path.
- // TODO when doing HBASE-15179
- throw new UnsupportedOperationException();
+ ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), Bytes.toBytes(ts), 0,
+ Bytes.SIZEOF_LONG);
+ }
+
+ private int getTimestampOffset() {
+ return this.offset + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + this.keyLen
+ - KeyValue.TIMESTAMP_TYPE_SIZE;
}
@Override
public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
- // This Cell implementation is not yet used in write path.
- // TODO when doing HBASE-15179
- throw new UnsupportedOperationException();
+ ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), ts, tsOffset,
+ Bytes.SIZEOF_LONG);
}
@Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
index d6b64f6..ca2e3e8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -118,8 +118,8 @@ public class CellCodec implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return getDecoder(new ByteBufferInputStream(buf));
+ public Decoder getDecoder(ByteBuff buf) {
+ return getDecoder(new ByteBuffInputStream(buf));
}
@Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
index 7326884..2dca10a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -119,8 +119,8 @@ public class CellCodecWithTags implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return getDecoder(new ByteBufferInputStream(buf));
+ public Decoder getDecoder(ByteBuff buf) {
+ return getDecoder(new ByteBuffInputStream(buf));
}
@Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
index c8a4cdc..d1463ee 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java
@@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.codec;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
/**
* Encoder/Decoder for Cell.
@@ -51,6 +51,6 @@ public interface Codec {
interface Decoder extends CellScanner {};
Decoder getDecoder(InputStream is);
- Decoder getDecoder(ByteBuffer buf);
+ Decoder getDecoder(ByteBuff buf);
Encoder getEncoder(OutputStream os);
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
index 2609398..7cb3d16 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
@@ -27,8 +27,10 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NoTagsKeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
import org.apache.hadoop.hbase.ShareableMemory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -76,12 +78,12 @@ public class KeyValueCodec implements Codec {
}
}
- public static class ByteBufferedKeyValueDecoder implements Codec.Decoder {
+ public static class ByteBuffedKeyValueDecoder implements Codec.Decoder {
- protected final ByteBuffer buf;
+ protected final ByteBuff buf;
protected Cell current = null;
- public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+ public ByteBuffedKeyValueDecoder(ByteBuff buf) {
this.buf = buf;
}
@@ -90,10 +92,14 @@ public class KeyValueCodec implements Codec {
if (this.buf.remaining() <= 0) {
return false;
}
- int len = ByteBufferUtils.toInt(buf);
- assert buf.hasArray();
- this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len);
- buf.position(buf.position() + len);
+ int len = buf.getInt();
+ ByteBuffer bb = buf.asSubByteBuffer(len);
+ if (bb.isDirect()) {
+ this.current = createCell(bb, bb.position(), len);
+ } else {
+ this.current = createCell(bb.array(), bb.arrayOffset() + bb.position(), len);
+ }
+ buf.skip(len);
return true;
}
@@ -106,6 +112,11 @@ public class KeyValueCodec implements Codec {
return new ShareableMemoryNoTagsKeyValue(buf, offset, len);
}
+ protected Cell createCell(ByteBuffer bb, int pos, int len) {
+ // We know there are not going to be any tags.
+ return new ShareableMemoryOffheapKeyValue(bb, pos, len, false, 0);
+ }
+
static class ShareableMemoryKeyValue extends KeyValue implements ShareableMemory {
public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) {
super(bytes, offset, length);
@@ -133,6 +144,31 @@ public class KeyValueCodec implements Codec {
return kv;
}
}
+
+ static class ShareableMemoryOffheapKeyValue extends OffheapKeyValue implements ShareableMemory {
+ public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags,
+ long seqId) {
+ super(buf, offset, length, hasTags, seqId);
+ }
+
+ @Override
+ public Cell cloneToCell() {
+ byte[] copy = new byte[this.length];
+ ByteBufferUtils.copyFromBufferToArray(copy, this.buf, this.offset, 0, this.length);
+ KeyValue kv;
+ if (this.hasTags) {
+ kv = new KeyValue(copy, 0, copy.length);
+ } else {
+ kv = new NoTagsKeyValue(copy, 0, copy.length);
+ }
+ kv.setSequenceId(this.getSequenceId());
+ return kv;
+ }
+ }
}
/**
@@ -144,8 +180,8 @@ public class KeyValueCodec implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return new ByteBufferedKeyValueDecoder(buf);
+ public Decoder getDecoder(ByteBuff buf) {
+ return new ByteBuffedKeyValueDecoder(buf);
}
@Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
index 63c02e8..334adc0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
@@ -78,16 +79,21 @@ public class KeyValueCodecWithTags implements Codec {
}
}
- public static class ByteBufferedKeyValueDecoder
- extends KeyValueCodec.ByteBufferedKeyValueDecoder {
+ public static class ByteBuffedKeyValueDecoder
+ extends KeyValueCodec.ByteBuffedKeyValueDecoder {
- public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
+ public ByteBuffedKeyValueDecoder(ByteBuff buf) {
super(buf);
}
protected Cell createCell(byte[] buf, int offset, int len) {
return new ShareableMemoryKeyValue(buf, offset, len);
}
+
+ @Override
+ protected Cell createCell(ByteBuffer bb, int pos, int len) {
+ return new ShareableMemoryOffheapKeyValue(bb, pos, len);
+ }
}
/**
@@ -104,7 +110,7 @@ public class KeyValueCodecWithTags implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return new ByteBufferedKeyValueDecoder(buf);
+ public Decoder getDecoder(ByteBuff buf) {
+ return new ByteBuffedKeyValueDecoder(buf);
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
index e528f02..971c42c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
@@ -140,7 +140,7 @@ public class ByteBufferPool {
buffers.offer(buf);
}
- int getBufferSize() {
+ public int getBufferSize() {
return this.bufferSize;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java
new file mode 100644
index 0000000..9ffc7c3
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * When dealing with an OutputStream that is not of ByteBufferSupportOutputStream type, wrap it
+ * with this class. We will have to write offheap ByteBuffer (DBB) data into the OS. This class
+ * keeps a temp byte array into which we copy the DBB data before writing it to the OS.
+ *
+ * This is used while writing Cell data to the WAL. In case of AsyncWAL, the OS created there is a
+ * ByteBufferSupportOutputStream. But in case of FSHLog, the OS handed out by the DFS client is not
+ * of type ByteBufferSupportOutputStream. We will need this temp solution until the DFS client
+ * supports writing ByteBuffer directly to the OS it creates.
+ *
+ * Note that this class is not thread safe.
+ */
+@InterfaceAudience.Private
+public class ByteBufferSupportOutputStreamWrapper extends OutputStream
+ implements ByteBufferSupportOutputStream {
+
+ private static final int TEMP_BUF_LENGTH = 4 * 1024;
+ private final OutputStream os;
+ private final byte[] tempBuf = new byte[TEMP_BUF_LENGTH];
+
+ public ByteBufferSupportOutputStreamWrapper(OutputStream os) {
+ this.os = os;
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ byte[] buf = this.tempBuf;
+ if (len > TEMP_BUF_LENGTH) {
+ buf = new byte[len];
+ }
+ ByteBufferUtils.copyFromBufferToArray(buf, b, off, 0, len);
+ this.os.write(buf, 0, len);
+ }
+
+ @Override
+ public void writeInt(int i) throws IOException {
+ Bytes.putInt(this.tempBuf, 0, i);
+ this.os.write(this.tempBuf, 0, Bytes.SIZEOF_INT);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ this.os.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ this.os.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ this.os.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.os.close();
+ }
+}
\ No newline at end of file
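
An illustrative usage sketch for the wrapper class added above (not part of the patch; the class name WrapperSketch and the demo payload are made up). It wraps a plain OutputStream and pushes a direct ByteBuffer's content through the temp-array path:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStreamWrapper;

public class WrapperSketch {
  public static void main(String[] args) throws Exception {
    // A plain OutputStream that cannot accept ByteBuffers directly.
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    ByteBufferSupportOutputStreamWrapper wrapper = new ByteBufferSupportOutputStreamWrapper(os);
    // An offheap (direct) ByteBuffer holding some payload bytes.
    ByteBuffer dbb = ByteBuffer.allocateDirect(16);
    dbb.put(new byte[] { 1, 2, 3, 4 });
    dbb.flip();
    wrapper.writeInt(dbb.remaining());      // length prefix, written via the temp array
    wrapper.write(dbb, 0, dbb.remaining()); // DBB content copied through the temp array
    wrapper.flush();
    System.out.println("bytes written: " + os.size()); // 4 (int) + 4 (payload)
  }
}
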
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 183a031..f6a34f8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.nio;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -35,6 +37,8 @@ import org.apache.hadoop.io.WritableUtils;
*/
@InterfaceAudience.Private
public abstract class ByteBuff {
+ private static final int NIO_BUFFER_LIMIT = 64 * 1024; // Max bytes to read from a channel in one call; should not be more than 64KB.
+
/**
* @return this ByteBuff's current position
*/
@@ -356,6 +360,14 @@ public abstract class ByteBuff {
public abstract long getLongAfterPosition(int offset);
/**
+ * Copy the content from this ByteBuff to a byte[].
+ * @return byte[] with the copied contents from this ByteBuff.
+ */
+ public byte[] toBytes() {
+ return toBytes(0, this.limit());
+ }
+
+ /**
* Copy the content from this ByteBuff to a byte[] based on the given offset and
* length
*
@@ -389,7 +401,39 @@ public abstract class ByteBuff {
*/
public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length);
+ /**
+ * Reads bytes from the given channel into this ByteBuff
+ * @param channel the channel to read from
+ * @return The number of bytes read from the channel
+ * @throws IOException when an I/O error happens while reading from the channel
+ */
+ public abstract int read(ReadableByteChannel channel) throws IOException;
+
// static helper methods
+ public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+ if (buf.remaining() <= NIO_BUFFER_LIMIT) {
+ return channel.read(buf);
+ }
+ int originalLimit = buf.limit();
+ int initialRemaining = buf.remaining();
+ int ret = 0;
+
+ while (buf.remaining() > 0) {
+ try {
+ int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+ buf.limit(buf.position() + ioSize);
+ ret = channel.read(buf);
+ if (ret < ioSize) {
+ break;
+ }
+ } finally {
+ buf.limit(originalLimit);
+ }
+ }
+ int nBytes = initialRemaining - buf.remaining();
+ return (nBytes > 0) ? nBytes : ret;
+ }
+
/**
* Read integer from ByteBuff coded in 7 bits and increment position.
* @return Read integer.
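
The 64KB cap used by channelRead() above follows a chunked-read pattern: for a buffer larger than NIO_BUFFER_LIMIT, the limit is temporarily shrunk so each channel.read() moves at most 64KB, avoiding the oversized temporary buffers NIO would otherwise allocate for one huge read. A standalone sketch of the same pattern, using only JDK classes (not part of the patch; the class name and demo data are illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class ChunkedChannelReadSketch {
  private static final int NIO_BUFFER_LIMIT = 64 * 1024;

  // Same pattern as ByteBuff.channelRead(): cap each read at 64KB by temporarily shrinking the
  // buffer limit, then restore the original limit.
  static int chunkedRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
    if (buf.remaining() <= NIO_BUFFER_LIMIT) {
      return channel.read(buf);
    }
    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
    int ret = 0;
    while (buf.remaining() > 0) {
      try {
        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
        buf.limit(buf.position() + ioSize);
        ret = channel.read(buf);
        if (ret < ioSize) {
          break; // the channel had less data than one chunk; stop for now
        }
      } finally {
        buf.limit(originalLimit);
      }
    }
    int nBytes = initialRemaining - buf.remaining();
    return (nBytes > 0) ? nBytes : ret;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = new byte[200 * 1024];
    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(data));
    ByteBuffer buf = ByteBuffer.allocate(data.length);
    int total = 0;
    // A single call may return a partial read, so callers loop until the buffer is full.
    while (buf.hasRemaining()) {
      int n = chunkedRead(ch, buf);
      if (n < 0) {
        break;
      }
      total += n;
    }
    System.out.println("read " + total + " bytes"); // 204800
  }
}
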
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 107bb3f..399644d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.nio;
+import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
+import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -1071,6 +1073,28 @@ public class MultiByteBuff extends ByteBuff {
}
@Override
+ public int read(ReadableByteChannel channel) throws IOException {
+ int total = 0;
+ while (true) {
+ // Read as much as possible into the current BB.
+ int len = channelRead(channel, this.curItem);
+ if (len > 0) {
+ total += len;
+ }
+ if (this.curItem.hasRemaining()) {
+ // We could not fill the current BB, so there is no point in doing more reads from the
+ // channel; this is all the data available for now.
+ break;
+ } else {
+ if (this.curItemIndex >= this.limitedItemIndex) {
+ break;
+ }
+ this.curItemIndex++;
+ this.curItem = this.items[this.curItemIndex];
+ }
+ }
+ return total;
+ }
+
+ @Override
public boolean equals(Object obj) {
if (!(obj instanceof MultiByteBuff)) return false;
if (this == obj) return true;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledByteBuff.java
new file mode 100644
index 0000000..3e5bb87
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledByteBuff.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This interface marks {@link ByteBuff}s which work with {@link java.nio.ByteBuffer}s obtained from a pool.
+ */
+@InterfaceAudience.Private
+public interface PooledByteBuff {
+
+ /**
+ * Release the individual {@link java.nio.ByteBuffer}s back to the pool they were obtained from.
+ */
+ void release();
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledMultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledMultiByteBuff.java
new file mode 100644
index 0000000..62df78d
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledMultiByteBuff.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+
+@InterfaceAudience.Private
+public class PooledMultiByteBuff extends MultiByteBuff implements PooledByteBuff {
+
+ private final ByteBuffer[] bbsFromPool;
+ private final ByteBufferPool pool;
+
+ public PooledMultiByteBuff(ByteBuffer[] bbs, ByteBuffer[] bbsFromPool, ByteBufferPool pool) {
+ super(bbs);
+ this.bbsFromPool = bbsFromPool;
+ this.pool = pool;
+ }
+
+ @Override
+ public void release() {
+ // Return all the BBs back to the pool
+ if (this.bbsFromPool != null) {
+ for (int i = 0; i < this.bbsFromPool.length; i++) {
+ this.pool.putbackBuffer(this.bbsFromPool[i]);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledSingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledSingleByteBuff.java
new file mode 100644
index 0000000..8626c2c
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledSingleByteBuff.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.nio;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+
+@InterfaceAudience.Private
+public class PooledSingleByteBuff extends SingleByteBuff implements PooledByteBuff {
+
+ private final ByteBuffer bbFromPool;
+ private final ByteBufferPool pool;
+
+ public PooledSingleByteBuff(ByteBuffer bbFromPool, ByteBufferPool pool) {
+ super(bbFromPool);
+ this.bbFromPool = bbFromPool;
+ this.pool = pool;
+ }
+
+ @Override
+ public void release() {
+ // Return the BB back to the pool
+ this.pool.putbackBuffer(this.bbFromPool);
+ }
+}
\ No newline at end of file
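
A minimal lifecycle sketch for the pooled variants added above (illustrative only, not part of the patch; it assumes ByteBufferPool exposes a (bufferSize, maxPoolSize) constructor in addition to the getBuffer()/putbackBuffer() calls used elsewhere in this patch):

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.nio.PooledSingleByteBuff;

public class PooledBuffLifecycleSketch {
  public static void main(String[] args) {
    // Pool handing out 64KB buffers, keeping at most 10 of them around (constructor args assumed).
    ByteBufferPool pool = new ByteBufferPool(64 * 1024, 10);
    ByteBuffer bb = pool.getBuffer();
    PooledSingleByteBuff buff = new PooledSingleByteBuff(bb, pool);
    buff.limit(1024); // only the first 1KB of the pooled buffer holds the payload
    // ... read the request bytes into 'buff' and decode them ...
    buff.release();   // returns the underlying ByteBuffer to the pool for reuse
  }
}
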
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 9ad28dc..89e5010 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.nio;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -313,6 +315,11 @@ public class SingleByteBuff extends ByteBuff {
}
@Override
+ public int read(ReadableByteChannel channel) throws IOException {
+ return channelRead(channel, buf);
+ }
+
+ @Override
public boolean equals(Object obj) {
if(!(obj instanceof SingleByteBuff)) return false;
return this.buf.equals(((SingleByteBuff)obj).buf);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
index ea162fc..41dc387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -83,8 +83,8 @@ public class MessageCodec implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return getDecoder(new ByteBufferInputStream(buf));
+ public Decoder getDecoder(ByteBuff buf) {
+ return getDecoder(new ByteBuffInputStream(buf));
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 34140a9..114e585 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CallDroppedException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class CallRunner {
- private static final Log LOG = LogFactory.getLog(CallRunner.class);
private static final CallDroppedException CALL_DROPPED_EXCEPTION
= new CallDroppedException();
@@ -143,6 +140,8 @@ public class CallRunner {
sucessful = true;
}
}
+ // Release the request-read BB back to the pool here; the request has been fully processed by now.
+ call.releaseRequestBuffer();
// Set the response
Message param = resultPair != null ? resultPair.getFirst() : null;
CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 7bcf3a7..81e9fcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
@@ -99,6 +100,11 @@ import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.PooledByteBuff;
+import org.apache.hadoop.hbase.nio.PooledMultiByteBuff;
+import org.apache.hadoop.hbase.nio.PooledSingleByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@@ -144,6 +150,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -302,6 +309,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private UserProvider userProvider;
private final ByteBufferPool reservoir;
+ // Requests and responses will use buffers from the ByteBufferPool when the size of the
+ // request/response is at least this value.
+ // We make this 1/6th of the pool buffer size.
+ private final int minSizeForReservoirUse;
private volatile boolean allowFallbackToSimpleAuth;
@@ -336,6 +347,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected boolean isError;
protected TraceInfo tinfo;
private ByteBufferListOutputStream cellBlockStream = null;
+ private PooledByteBuff reqBuff = null;
private User user;
private InetAddress remoteAddress;
@@ -349,7 +361,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
justification="Can't figure why this complaint is happening... see below")
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
- long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) {
+ long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+ PooledByteBuff reqBuff) {
this.id = id;
this.service = service;
this.md = md;
@@ -369,6 +382,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
connection == null? null: connection.retryImmediatelySupported;
this.timeout = timeout;
this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
+ this.reqBuff = reqBuff;
}
/**
@@ -383,9 +397,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// got from pool.
this.cellBlockStream = null;
}
+ releaseRequestBuffer(); // If the call ran successfully, we might have already returned the
+ // BB back to the pool. No worries; in that case reqBuff will be null and this is a no-op.
this.connection.decRpcCount(); // Say that we're done with this call.
}
+ protected void releaseRequestBuffer() {
+ if (this.reqBuff != null) {
+ this.reqBuff.release();
+ this.reqBuff = null;
+ }
+ }
+
@Override
public String toString() {
return toShortString() + " param: " +
@@ -1281,7 +1304,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// If the connection header has been read or not.
private boolean connectionHeaderRead = false;
protected SocketChannel channel;
- private ByteBuffer data;
+ private ByteBuff data;
private ByteBuffer dataLengthBuffer;
protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
private final Lock responseWriteLock = new ReentrantLock();
@@ -1319,17 +1342,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
- null, null, this, null, 0, null, null, 0);
+ null, null, this, null, 0, null, null, 0, null);
private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
- 0, null, null, 0);
+ 0, null, null, 0, null);
// Fake 'call' for connection header response
private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID,
- null, null, null, null, null, this, null, 0, null, null, 0);
+ null, null, null, null, null, this, null, 0, null, null, 0, null);
// was authentication allowed with a fallback to simple auth
private boolean authenticatedWithFallback;
@@ -1429,7 +1452,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return authorizedUgi;
}
- private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
+ private void saslReadAndProcess(ByteBuff saslToken) throws IOException,
InterruptedException {
if (saslContextEstablished) {
if (LOG.isTraceEnabled())
@@ -1439,13 +1462,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (!useWrap) {
processOneRpc(saslToken);
} else {
- byte[] b = saslToken.array();
+ byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
byte [] plaintextData;
if (useCryptoAesWrap) {
// unwrap with CryptoAES
- plaintextData = cryptoAES.unwrap(b, saslToken.position(), saslToken.limit());
+ plaintextData = cryptoAES.unwrap(b, 0, b.length);
} else {
- plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
+ plaintextData = saslServer.unwrap(b, 0, b.length);
}
processUnwrappedData(plaintextData);
}
@@ -1498,7 +1521,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
LOG.debug("Have read input token of size " + saslToken.limit()
+ " for processing by saslServer.evaluateResponse()");
}
- replyToken = saslServer.evaluateResponse(saslToken.array());
+ replyToken = saslServer
+ .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
} catch (IOException e) {
IOException sendToClient = e;
Throwable cause = e;
@@ -1751,7 +1775,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Notify the client about the offending request
Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null,
- null, this, responder, 0, null, this.addr,0);
+ null, this, responder, 0, null, this.addr,0, null);
metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
@@ -1771,7 +1795,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return -1;
}
- data = ByteBuffer.allocate(dataLength);
+ data = allocateByteBuffToReadInto(dataLength);
// Increment the rpc count. This counter will be decreased when we write
// the response. If we want the connection to be detected as idle properly, we
@@ -1779,7 +1803,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
incRpcCount();
}
- count = channelRead(channel, data);
+ count = channelDataRead(channel, data);
if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
process();
@@ -1788,11 +1812,71 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return count;
}
+ private ByteBuff allocateByteBuffToReadInto(int length) {
+ // We create plain on-heap buffers and read into those when
+ // 1. There is no ByteBufferPool.
+ // 2. The size of the request is very small. Using a large (64 KB) buffer from the pool is
+ // wasteful then. Also, if all the requests were of this size, we would end up creating larger
+ // sized buffers and pooling them permanently. This includes Scan/Get requests and DDL kind of
+ // requests like RegionOpen.
+ // 3. It is an initial handshake signal or initial connection request. Anyway, condition 2
+ // itself will match in that case.
+ // 4. SASL is in use.
+ ByteBuff resultBuf;
+ if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl
+ || length < minSizeForReservoirUse) {
+ resultBuf = new SingleByteBuff(ByteBuffer.allocate(length));
+ } else {
+ List<ByteBuffer> bufsFromPool = new ArrayList<>(
+ (length / reservoir.getBufferSize()) + 1);
+ int remain = length;
+ ByteBuffer buf = null;
+ while (remain >= minSizeForReservoirUse && (buf = reservoir.getBuffer()) != null) {
+ bufsFromPool.add(buf);
+ remain -= reservoir.getBufferSize();
+ }
+ buf = null;
+ ByteBuffer[] itemsFromPool = new ByteBuffer[bufsFromPool.size()];
+ bufsFromPool.toArray(itemsFromPool);
+ if (remain > 0) {
+ buf = ByteBuffer.allocate(remain);
+ }
+ ByteBuffer[] items = new ByteBuffer[buf == null ? itemsFromPool.length
+ : itemsFromPool.length + 1];
+ System.arraycopy(itemsFromPool, 0, items, 0, itemsFromPool.length);
+ if (buf != null) {
+ items[items.length - 1] = buf;
+ }
+ assert items.length > 0;
+ if (items.length > 1) {
+ resultBuf = new PooledMultiByteBuff(items, itemsFromPool, reservoir);
+ } else {
+ // We are backed by a single BB
+ assert itemsFromPool.length <= 1;
+ if (itemsFromPool.length == 0) {
+ resultBuf = new SingleByteBuff(items[0]);
+ } else {
+ resultBuf = new PooledSingleByteBuff(itemsFromPool[0], reservoir);
+ }
+ }
+ }
+ resultBuf.limit(length);
+ return resultBuf;
+ }
+
+ protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
+ int count = buf.read(channel);
+ if (count > 0) {
+ metrics.receivedBytes(count);
+ }
+ return count;
+ }
+
/**
* Process the data buffer and clean the connection state for the next call.
*/
private void process() throws IOException, InterruptedException {
- data.flip();
+ data.rewind();
try {
if (skipInitialSaslHandshake) {
skipInitialSaslHandshake = false;
@@ -1823,7 +1907,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
- Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0);
+ Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null, 0,
+ null);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
@@ -1831,9 +1916,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
// Reads the connection header following version
- private void processConnectionHeader(ByteBuffer buf) throws IOException {
- this.connectionHeader = ConnectionHeader.parseFrom(
- new ByteBufferInputStream(buf));
+ private void processConnectionHeader(ByteBuff buf) throws IOException {
+ if (buf.hasArray()) {
+ this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
+ } else {
+ CodedInputStream cis = CodedInputStream
+ .newInstance(new ByteBuffedByteInput(buf, 0, buf.limit()), true);
+ cis.enableAliasing(true);
+ this.connectionHeader = ConnectionHeader.parseFrom(cis);
+ }
String serviceName = connectionHeader.getServiceName();
if (serviceName == null) throw new EmptyServiceNameException();
this.service = getService(services, serviceName);
@@ -2035,13 +2126,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear();
unwrappedData.flip();
- processOneRpc(unwrappedData);
+ processOneRpc(new SingleByteBuff(unwrappedData));
unwrappedData = null;
}
}
}
- private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
+ private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException {
if (connectionHeaderRead) {
processRequest(buf);
} else {
@@ -2063,12 +2154,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* @throws IOException
* @throws InterruptedException
*/
- protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+ protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
long totalRequestSize = buf.limit();
int offset = 0;
// Here we read in the header. We avoid having pb
// do its default 4k allocation for CodedInputStream. We force it to use backing array.
- CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
+ CodedInputStream cis;
+ if (buf.hasArray()) {
+ cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
+ } else {
+ cis = CodedInputStream.newInstance(new ByteBuffedByteInput(buf, 0, buf.limit()), true);
+ cis.enableAliasing(true);
+ }
int headerSize = cis.readRawVarint32();
offset = cis.getTotalBytesRead();
Message.Builder builder = RequestHeader.newBuilder();
@@ -2085,7 +2182,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
final Call callTooBig =
new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null, 0);
+ responder, totalRequestSize, null, null, 0, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
@@ -2119,7 +2216,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
if (header.hasCellBlockMeta()) {
buf.position(offset);
- cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
+ cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
+ this.compressionCodec, buf);
}
} catch (Throwable t) {
InetSocketAddress address = getListenerAddress();
@@ -2140,7 +2238,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
final Call readParamsFailedCall =
new Call(id, this.service, null, null, null, null, this,
- responder, totalRequestSize, null, null, 0);
+ responder, totalRequestSize, null, null, 0, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage());
@@ -2156,7 +2254,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
timeout = Math.max(minClientRequestTimeout, header.getTimeout());
}
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
- totalRequestSize, traceInfo, this.addr, timeout);
+ totalRequestSize, traceInfo, this.addr, timeout,
+ buf instanceof PooledByteBuff ? (PooledByteBuff) buf : null);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
callQueueSizeInBytes.add(-1 * call.getSize());
@@ -2293,8 +2392,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
+ this.minSizeForReservoirUse = this.reservoir.getBufferSize() / 6;
} else {
reservoir = null;
+ this.minSizeForReservoirUse = Integer.MAX_VALUE; // The reservoir itself is not in place.
}
this.server = server;
this.services = services;
@@ -3031,4 +3132,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
idleScanTimer.schedule(idleScanTask, idleScanInterval);
}
}
-}
+
+ private static class ByteBuffedByteInput extends ByteInput {
+
+ private final ByteBuff buf;
+ private final int offset;
+ private final int length;
+
+ ByteBuffedByteInput(ByteBuff buf, int offset, int length) {
+ this.buf = buf;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public byte read(int offset) {
+ return this.buf.get(getAbsoluteOffset(offset));
+ }
+
+ private int getAbsoluteOffset(int offset) {
+ return this.offset + offset;
+ }
+
+ @Override
+ public int read(int offset, byte[] out, int outOffset, int len) {
+ this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
+ return len;
+ }
+
+ @Override
+ public int read(int offset, ByteBuffer out) {
+ int len = out.remaining();
+ this.buf.get(out, getAbsoluteOffset(offset), len);
+ return len;
+ }
+
+ @Override
+ public int size() {
+ return this.length;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 52dfae0..9e80bbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStreamWrapper;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -356,14 +358,18 @@ public class WALCellCodec implements Codec {
}
@Override
- public Decoder getDecoder(ByteBuffer buf) {
- return getDecoder(new ByteBufferInputStream(buf));
+ public Decoder getDecoder(ByteBuff buf) {
+ return getDecoder(new ByteBuffInputStream(buf));
}
@Override
public Encoder getEncoder(OutputStream os) {
- return (compression == null)
- ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
+ if (compression == null) {
+ os = (os instanceof ByteBufferSupportOutputStream) ? os
+ : new ByteBufferSupportOutputStreamWrapper(os);
+ return new EnsureKvEncoder(os);
+ }
+ return new CompressedKvEncoder(os, compression);
}
public ByteStringCompressor getByteStringCompressor() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 2211e8f..5a9178a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -33,7 +33,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
@@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -315,7 +315,7 @@ public abstract class AbstractTestIPC {
}
@Override
- protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+ protected void processRequest(ByteBuff buf) throws IOException, InterruptedException {
// this will throw exception after the connection header is read, and an RPC is sent
// from client
throw new DoNotRetryIOException("Failing for test");