diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index d98d81d..c08feb8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -33,10 +33,10 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.PoolAwareByteBuffersOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CodecPool;
@@ -95,48 +95,26 @@ public class IPCUtil {
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner)
throws IOException {
- return buildCellBlock(codec, compressor, cellScanner, null);
- }
-
- /**
- * Puts CellScanner Cells into a cell block using passed in codec and/or
- * compressor.
- * @param codec
- * @param compressor
- * @param cellScanner
- * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
- * our own ByteBuffer.
- * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
- * passed in codec and/or compressor; the returned buffer has been
- * flipped and is ready for reading. Use limit to find total size. If pool was not
- * null, then this returned ByteBuffer came from there and should be returned to the pool when
- * done.
- * @throws IOException
- */
- @SuppressWarnings("resource")
- public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
- final CellScanner cellScanner, final BoundedByteBufferPool pool)
- throws IOException {
if (cellScanner == null) return null;
if (codec == null) throw new CellScannerButNoCodecException();
int bufferSize = this.cellBlockBuildingInitialBufferSize;
- ByteBufferOutputStream baos = null;
- if (pool != null) {
- ByteBuffer bb = pool.getBuffer();
- bufferSize = bb.capacity();
- baos = new ByteBufferOutputStream(bb);
- } else {
- // Then we need to make our own to return.
- if (cellScanner instanceof HeapSize) {
- long longSize = ((HeapSize)cellScanner).heapSize();
- // Just make sure we don't have a size bigger than an int.
- if (longSize > Integer.MAX_VALUE) {
- throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
- }
- bufferSize = ClassSize.align((int)longSize);
+ ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+ encodeCellsTo(baos, cellScanner, codec, compressor);
+ if (LOG.isTraceEnabled()) {
+ if (bufferSize < baos.size()) {
+ LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
+ "; up hbase.ipc.cellblock.building.initial.buffersize?");
}
- baos = new ByteBufferOutputStream(bufferSize);
}
+ ByteBuffer bb = baos.getByteBuffer();
+ // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+ // gets or something -- stuff that does not return a cell).
+ if (!bb.hasRemaining()) return null;
+ return bb;
+ }
+
+ private void encodeCellsTo(ByteBufferOutputStream baos, CellScanner cellScanner, Codec codec,
+ CompressionCodec compressor) throws IOException {
OutputStream os = baos;
Compressor poolCompressor = null;
try {
@@ -146,28 +124,43 @@ public class IPCUtil {
os = compressor.createOutputStream(os, poolCompressor);
}
Codec.Encoder encoder = codec.getEncoder(os);
- int count = 0;
while (cellScanner.advance()) {
encoder.write(cellScanner.current());
- count++;
}
encoder.flush();
- // If no cells, don't mess around. Just return null (could be a bunch of existence checking
- // gets or something -- stuff that does not return a cell).
- if (count == 0) return null;
} catch (BufferOverflowException e) {
throw new DoNotRetryIOException(e);
} finally {
- os.close();
if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
}
- if (LOG.isTraceEnabled()) {
- if (bufferSize < baos.size()) {
- LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
- "; up hbase.ipc.cellblock.building.initial.buffersize?");
- }
- }
- return baos.getByteBuffer();
+ }
+
+ /**
+ * Puts CellScanner Cells into a cell block using passed in codec and/or
+ * compressor.
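+ * <p>A rough server-side usage sketch (the stream must be closed so pooled buffers go back):
+ * <pre>
+ *   PoolAwareByteBuffersOutputStream stream =
+ *       ipcUtil.buildCellBlockStream(codec, compressor, cells, reservoir);
+ *   if (stream != null) {
+ *     List&lt;ByteBuffer&gt; cellBlock = stream.getByteBuffers(); // flipped, ready for reading
+ *     // ... write the buffers out ...
+ *     stream.close(); // returns the pooled ByteBuffers to the reservoir
+ *   }
+ * </pre>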
+ * @param codec codec to encode the Cells with
+ * @param compressor compressor to apply on top of the codec, if any
+ * @param cellScanner the Cells to encode
+ * @param pool Pool of ByteBuffers for the stream to draw on. Must not be null.
+ * @return Null or a stream filled with a cellblock of the passed-in Cells encoded using the
+ * passed-in codec and/or compressor. The ByteBuffers backing the stream came from the
+ * pool (or were allocated on heap when the pool ran dry) and are handed back to the
+ * pool when the stream is closed.
+ * @throws IOException if encoding the Cells fails
+ */
+ @SuppressWarnings("resource")
+ public PoolAwareByteBuffersOutputStream buildCellBlockStream(final Codec codec,
+ final CompressionCodec compressor, final CellScanner cellScanner, final ByteBufferPool pool)
+ throws IOException {
+ if (cellScanner == null) return null;
+ if (codec == null) throw new CellScannerButNoCodecException();
+ assert pool != null;
+ PoolAwareByteBuffersOutputStream pbbos = new PoolAwareByteBuffersOutputStream(pool);
+ encodeCellsTo(pbbos, cellScanner, codec, compressor);
+ if (pbbos.size() == 0) return null;
+ return pbbos;
}
/**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index d4bda18..f77092d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -44,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
- protected ByteBuffer buf;
+ protected ByteBuffer curBuf = null;
+
+ // Package-private: for subclasses (like the pool-backed stream) that manage curBuf themselves.
+ ByteBufferOutputStream() {
+ }
public ByteBufferOutputStream(int capacity) {
this(capacity, false);
@@ -66,12 +70,12 @@ public class ByteBufferOutputStream extends OutputStream
*/
public ByteBufferOutputStream(final ByteBuffer bb) {
assert bb.order() == ByteOrder.BIG_ENDIAN;
- this.buf = bb;
- this.buf.clear();
+ this.curBuf = bb;
+ this.curBuf.clear();
}
public int size() {
- return buf.position();
+ return curBuf.position();
}
private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
@@ -86,25 +90,25 @@ public class ByteBufferOutputStream extends OutputStream
* @return ByteBuffer
*/
public ByteBuffer getByteBuffer() {
- buf.flip();
- return buf;
+ curBuf.flip();
+ return curBuf;
}
- private void checkSizeAndGrow(int extra) {
- long capacityNeeded = buf.position() + (long) extra;
- if (capacityNeeded > buf.limit()) {
+ protected void checkSizeAndGrow(int extra) {
+ long capacityNeeded = curBuf.position() + (long) extra;
+ if (capacityNeeded > curBuf.limit()) {
// guarantee it's possible to fit
if (capacityNeeded > MAX_ARRAY_SIZE) {
throw new BufferOverflowException();
}
// double until hit the cap
- long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
+ long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE);
// but make sure there is enough if twice the existing capacity is still too small
nextCapacity = Math.max(nextCapacity, capacityNeeded);
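+ // e.g. a 16KB buffer that needs 40KB in total grows straight to 40KB rather than to 32KB.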
- ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
- buf.flip();
- ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
- buf = newBuf;
+ ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect());
+ curBuf.flip();
+ ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf);
+ curBuf = newBuf;
}
}
@@ -112,7 +116,7 @@ public class ByteBufferOutputStream extends OutputStream
@Override
public void write(int b) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_BYTE);
- buf.put((byte)b);
+ curBuf.put((byte)b);
}
/**
@@ -122,9 +126,9 @@ public class ByteBufferOutputStream extends OutputStream
* @param out the output stream to which to write the data.
* @exception IOException if an I/O error occurs.
*/
- public synchronized void writeTo(OutputStream out) throws IOException {
+ public void writeTo(OutputStream out) throws IOException {
WritableByteChannel channel = Channels.newChannel(out);
- ByteBuffer bb = buf.duplicate();
+ ByteBuffer bb = curBuf.duplicate();
bb.flip();
channel.write(bb);
}
@@ -137,12 +141,12 @@ public class ByteBufferOutputStream extends OutputStream
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkSizeAndGrow(len);
- ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
+ ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len);
}
public void write(ByteBuffer b, int off, int len) throws IOException {
checkSizeAndGrow(len);
- ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
+ ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len);
}
/**
@@ -153,7 +157,7 @@ public class ByteBufferOutputStream extends OutputStream
*/
public void writeInt(int i) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_INT);
- ByteBufferUtils.putInt(this.buf, i);
+ ByteBufferUtils.putInt(this.curBuf, i);
}
@Override
@@ -167,7 +171,7 @@ public class ByteBufferOutputStream extends OutputStream
}
public byte[] toByteArray(int offset, int length) {
- ByteBuffer bb = buf.duplicate();
+ ByteBuffer bb = curBuf.duplicate();
bb.flip();
byte[] chunk = new byte[length];
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
new file mode 100644
index 0000000..4b7e16a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Like Hadoop's ByteBufferPool, except you do not specify a desired size when getting a
+ * ByteBuffer. This pool keeps an upper bound on the count of ByteBuffers it will hold and a fixed
+ * size for the ByteBuffers it creates. When one is requested, a free ByteBuffer is returned if
+ * already present; when none is free and we are below the max count, a new one is created and
+ * returned.
+ *
+ * Note: This pool returns off heap ByteBuffers.
+ *
+ * This class is thread safe.
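+ *
+ * <p>A rough usage sketch (the sizes here are arbitrary):
+ * <pre>
+ *   ByteBufferPool pool = new ByteBufferPool(64 * 1024, 64); // at most 64 off heap 64KB BBs
+ *   ByteBuffer bb = pool.getBuffer(); // null once the cap is hit and no BB is free
+ *   if (bb != null) {
+ *     try {
+ *       // fill bb, flip it and drain it to its destination...
+ *     } finally {
+ *       pool.putBuffer(bb); // always hand pooled BBs back
+ *     }
+ *   }
+ * </pre>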
+ */
+@InterfaceAudience.Private
+public class ByteBufferPool {
+ private static final Log LOG = LogFactory.getLog(ByteBufferPool.class);
+ // TODO better config names?
+ // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
+ // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
+ public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
+ public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it the same as the
+                                                         // chunk size we read/write to/from
+                                                         // the socket channel.
+ private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ private final int bufferSize;
+ private final int maxPoolSize;
+ private AtomicInteger count; // Count of the BBs created already for this pool.
+
+ /**
+ * @param bufferSize fixed size of the ByteBuffers that this pool creates
+ * @param maxPoolSize maximum number of ByteBuffers the pool will hold
+ */
+ public ByteBufferPool(final int bufferSize, final int maxPoolSize) {
+ this.bufferSize = bufferSize;
+ this.maxPoolSize = maxPoolSize;
+ // TODO can add initialPoolSize config also and make those many BBs ready for use.
+ LOG.info("Created ByteBufferPool with bufferSize : " + bufferSize + " and maxPoolSize : "
+ + maxPoolSize);
+ this.count = new AtomicInteger(0);
+ }
+
+ /**
+ * @return One free ByteBuffer from the pool. If none is free and we have not reached the
+ * maximum pool size, a new one will be created and returned. If the max pool size has
+ * also been reached, null is returned. When the pool hands out a ByteBuffer, make sure
+ * to return it back to the pool after use.
+ * @see #putBuffer(ByteBuffer)
+ */
+ public ByteBuffer getBuffer() {
+ ByteBuffer bb = buffers.poll();
+ if (bb != null) {
+ // Clear sets limit == capacity. Position == 0.
+ bb.clear();
+ return bb;
+ }
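+ // CAS on the created-count below, so racing threads never create more than maxPoolSize BBs.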
+ while (true) {
+ int c = this.count.intValue();
+ if (c >= this.maxPoolSize) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
+ + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
+ + "' ?");
+ }
+ return null;
+ }
+ if (!this.count.compareAndSet(c, c + 1)) {
+ continue;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
+ }
+ return ByteBuffer.allocateDirect(this.bufferSize);
+ }
+ }
+
+ /**
+ * Return a ByteBuffer back to the pool after use. Do not try to put back a ByteBuffer that was
+ * not obtained from this pool.
+ * @param buf ByteBuffer to return.
+ */
+ public void putBuffer(ByteBuffer buf) {
+ if (buf.capacity() != this.bufferSize || !buf.isDirect()
+ || this.buffers.size() >= this.maxPoolSize) {
+ LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
+ return;
+ }
+ buffers.offer(buf);
+ }
+
+ int getBufferSize() {
+ return this.bufferSize;
+ }
+
+ /**
+ * @return Number of free buffers
+ */
+ @VisibleForTesting
+ int getQueueSize() {
+ return buffers.size();
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/PoolAwareByteBuffersOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/PoolAwareByteBuffersOutputStream.java
new file mode 100644
index 0000000..bc800ed
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/PoolAwareByteBuffersOutputStream.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+/**
+ * An OutputStream which writes data into ByteBuffers. It will try to get a ByteBuffer, as and
+ * when needed, from the passed pool. When the pool cannot supply one, it will create one on heap.
+ * Make sure to call {@link #close()} once the stream usage is over and the data has been
+ * transferred to the wanted destination.
+ * Not thread safe!
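+ *
+ * <p>A minimal usage sketch, assuming a pool created elsewhere:
+ * <pre>
+ *   PoolAwareByteBuffersOutputStream out = new PoolAwareByteBuffersOutputStream(pool);
+ *   out.write(someBytes); // spills into a fresh BB whenever the current one fills up
+ *   List&lt;ByteBuffer&gt; bufs = out.getByteBuffers(); // all flipped and ready for reading
+ *   // ... transfer the buffers to their destination ...
+ *   out.close(); // hands the pool-sourced BBs back to the pool
+ * </pre>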
+ */
+@InterfaceAudience.Private
+public class PoolAwareByteBuffersOutputStream extends ByteBufferOutputStream {
+
+ private final ByteBufferPool pool;
+ // Keep track of the BBs that bytes are written to. We first try to get a BB from the pool; if
+ // none is available we make a new one of our own and keep writing to that. The BBs that we got
+ // from the pool are tracked separately, so that on close we can make sure to return all of them
+ // back to the pool.
+ protected List<ByteBuffer> allBufs = new ArrayList<ByteBuffer>();
+ protected List<ByteBuffer> bufsFromPool = new ArrayList<ByteBuffer>();
+
+ private boolean lastBufFlipped = false;// Indicates whether the curBuf/lastBuf is flipped already
+
+ public PoolAwareByteBuffersOutputStream(ByteBufferPool pool) {
+ this.pool = pool;
+ allocateNewBuffer();
+ }
+
+ private void allocateNewBuffer() {
+ if (this.curBuf != null) {
+ this.curBuf.flip();// On the current buf set limit = pos and pos = 0.
+ }
+ // Get an initial BB to work with from the pool
+ this.curBuf = this.pool.getBuffer();
+ if (this.curBuf == null) {
+ // No free BB at this moment. Make a new one on heap. The pool hands out off heap BBs, but we
+ // should not create off heap BBs on demand: it is hard to account for all of them when sizing
+ // the max direct memory. See HBASE-15525 for more details.
+ // Make the BB the same size as the pool's buffer size.
+ this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
+ } else {
+ this.bufsFromPool.add(this.curBuf);
+ }
+ this.allBufs.add(this.curBuf);
+ }
+
+ @Override
+ public int size() {
+ int s = 0;
+ for (int i = 0; i < this.allBufs.size() - 1; i++) {
+ s += this.allBufs.get(i).remaining();
+ }
+ // The last BB might not have been flipped yet, if getByteBuffers() has not been called.
+ if (this.lastBufFlipped) {
+ s += this.curBuf.remaining();
+ } else {
+ s += this.curBuf.position();
+ }
+ return s;
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer() {
+ throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer");
+ }
+
+ @Override
+ protected void checkSizeAndGrow(int extra) {
+ long capacityNeeded = curBuf.position() + (long) extra;
+ if (capacityNeeded > curBuf.limit()) {
+ allocateNewBuffer();
+ }
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ // No usage of this API in the code. Just mark it as an unsupported operation for now.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ // Return back all the BBs to pool
+ if (this.bufsFromPool != null) {
+ for (int i = 0; i < this.bufsFromPool.size(); i++) {
+ this.pool.putBuffer(this.bufsFromPool.get(i));
+ }
+ this.bufsFromPool = null;
+ }
+ this.allBufs = null;
+ this.curBuf = null;
+ }
+
+ @Override
+ public byte[] toByteArray(int offset, int length) {
+ // No usage of this API in the code. Just mark it as an unsupported operation for now.
+ throw new UnsupportedOperationException();
+ }
+
+ public List<ByteBuffer> getByteBuffers() {
+ if (!this.lastBufFlipped) {
+ this.lastBufFlipped = true;
+ // All the other BBs are already flipped while moving to the new BB.
+ curBuf.flip();
+ }
+ return this.allBufs;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ int toWrite = 0;
+ while (len > 0) {
+ toWrite = Math.min(len, this.curBuf.remaining());
+ ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite);
+ off += toWrite;
+ len -= toWrite;
+ if (len > 0) {
+ allocateNewBuffer();// The curBuf is over. Let us move to the next one
+ }
+ }
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ int toWrite = 0;
+ while (len > 0) {
+ toWrite = Math.min(len, this.curBuf.remaining());
+ ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite);
+ off += toWrite;
+ len -= toWrite;
+ if (len > 0) {
+ allocateNewBuffer();// The curBuf is over. Let us move to the next one
+ }
+ }
+ }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestPoolAwareByteBuffersOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestPoolAwareByteBuffersOutputStream.java
new file mode 100644
index 0000000..44a4d8f
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestPoolAwareByteBuffersOutputStream.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestPoolAwareByteBuffersOutputStream {
+
+ @Test
+ public void testWrites() throws Exception {
+ ByteBufferPool pool = new ByteBufferPool(10, 3);
+ PoolAwareByteBuffersOutputStream pabos = new PoolAwareByteBuffersOutputStream(pool);
+ pabos.write(2);// Write a byte
+ pabos.writeInt(100);// Write an int
+ byte[] b = Bytes.toBytes("row123");// 6 bytes
+ pabos.write(b);
+ // Use up the 3rd BB from the pool so that pabos, on request, won't get one
+ ByteBuffer bb1 = pool.getBuffer();
+ ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
+ pabos.write(bb, 0, bb.capacity());
+ pool.putBuffer(bb1);
+ pabos.writeInt(123);
+ pabos.writeInt(124);
+ assertEquals(0, pool.getQueueSize());
+ List<ByteBuffer> allBufs = pabos.getByteBuffers();
+ assertEquals(4, allBufs.size());
+ assertEquals(3, pabos.bufsFromPool.size());
+ ByteBuffer b1 = allBufs.get(0);
+ assertEquals(10, b1.remaining());
+ assertEquals(2, b1.get());
+ assertEquals(100, b1.getInt());
+ byte[] bActual = new byte[b.length];
+ b1.get(bActual, 0, 5);//5 bytes in 1st BB
+ ByteBuffer b2 = allBufs.get(1);
+ assertEquals(10, b2.remaining());
+ b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
+ assertTrue(Bytes.equals(b, bActual));
+ bActual = new byte[bb.capacity()];
+ b2.get(bActual, 0, 9);
+ ByteBuffer b3 = allBufs.get(2);
+ assertEquals(8, b3.remaining());
+ b3.get(bActual, 9, 4);
+ assertTrue(ByteBufferUtils.equals(bb, 0, bb.capacity(), bActual, 0, bActual.length));
+ assertEquals(123, b3.getInt());
+ ByteBuffer b4 = allBufs.get(3);
+ assertEquals(4, b4.remaining());
+ assertEquals(124, b4.getInt());
+ pabos.close();
+ assertEquals(3, pool.getQueueSize());
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index babd2f8..39efa40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -46,6 +46,13 @@ class BufferChain {
this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
}
+ BufferChain(List<ByteBuffer> buffers) {
+ for (ByteBuffer b : buffers) {
+ this.remaining += b.remaining();
+ }
+ this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
+ }
+
/**
* Expensive. Makes a new buffer to hold a copy of what is in contained ByteBuffers. This
* call drains this instance; it cannot be used subsequent to the call.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f0aed2e..7f86149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -83,9 +83,10 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.PoolAwareByteBuffersOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -281,7 +282,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private UserProvider userProvider;
- private final BoundedByteBufferPool reservoir;
+ private final ByteBufferPool reservoir;
private volatile boolean allowFallbackToSimpleAuth;
@@ -311,7 +312,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected long size; // size of current call
protected boolean isError;
protected TraceInfo tinfo;
- private ByteBuffer cellBlock = null;
+ private PoolAwareByteBuffersOutputStream cellBlockStream = null;
private User user;
private InetAddress remoteAddress;
@@ -352,10 +353,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="Presume the lock on processing request held by caller is protection enough")
void done() {
- if (this.cellBlock != null && reservoir != null) {
- // Return buffer to reservoir now we are done with it.
- reservoir.putBuffer(this.cellBlock);
- this.cellBlock = null;
+ if (this.cellBlockStream != null) {
+ this.cellBlockStream.close();// The close returns the BBs we got from the pool.
+ this.cellBlockStream = null;
}
this.connection.decRpcCount(); // Say that we're done with this call.
}
@@ -415,38 +415,43 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Call id.
headerBuilder.setCallId(this.id);
if (t != null) {
- ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
- exceptionBuilder.setExceptionClassName(t.getClass().getName());
- exceptionBuilder.setStackTrace(errorMsg);
- exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
- if (t instanceof RegionMovedException) {
- // Special casing for this exception. This is only one carrying a payload.
- // Do this instead of build a generic system for allowing exceptions carry
- // any kind of payload.
- RegionMovedException rme = (RegionMovedException)t;
- exceptionBuilder.setHostname(rme.getHostname());
- exceptionBuilder.setPort(rme.getPort());
- }
- // Set the exception as the result of the method invocation.
- headerBuilder.setException(exceptionBuilder.build());
+ setExceptionResponse(t, errorMsg, headerBuilder);
}
- // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
- // reservoir when finished. This is hacky and the hack is not contained but benefits are
- // high when we can avoid a big buffer allocation on each rpc.
+ // Pass the reservoir to buildCellBlockStream. The stream keeps references to the ByteBuffers
+ // it takes from the reservoir and hands them back on close; this saves a big buffer
+ // allocation on each rpc.
- this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
- this.connection.compressionCodec, cells, reservoir);
- if (this.cellBlock != null) {
+ List<ByteBuffer> cellBlock = null;
+ int cellBlockSize = 0;
+ if (reservoir != null) {
+ this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec,
+ this.connection.compressionCodec, cells, reservoir);
+ if (this.cellBlockStream != null) {
+ cellBlock = this.cellBlockStream.getByteBuffers();
+ cellBlockSize = this.cellBlockStream.size();
+ }
+ } else {
+ ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec,
+ this.connection.compressionCodec, cells);
+ if (b != null) {
+ cellBlockSize = b.remaining();
+ cellBlock = new ArrayList<ByteBuffer>(1);
+ cellBlock.add(b);
+ }
+ }
+
+ if (cellBlockSize > 0) {
CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
// Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
- cellBlockBuilder.setLength(this.cellBlock.limit());
+ cellBlockBuilder.setLength(cellBlockSize);
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
-
- byte[] b = createHeaderAndMessageBytes(result, header);
-
- bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
-
+ byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
+ List<ByteBuffer> responseBufs = new ArrayList<ByteBuffer>(
+ (cellBlock == null ? 1 : cellBlock.size()) + 1);
+ responseBufs.add(ByteBuffer.wrap(b));
+ if (cellBlock != null) responseBufs.addAll(cellBlock);
+ bc = new BufferChain(responseBufs);
if (connection.useWrap) {
bc = wrapWithSasl(bc);
}
@@ -466,7 +471,25 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
- private byte[] createHeaderAndMessageBytes(Message result, Message header)
+ private void setExceptionResponse(Throwable t, String errorMsg,
+ ResponseHeader.Builder headerBuilder) {
+ ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+ exceptionBuilder.setExceptionClassName(t.getClass().getName());
+ exceptionBuilder.setStackTrace(errorMsg);
+ exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+ if (t instanceof RegionMovedException) {
+ // Special casing for this exception. This is only one carrying a payload.
+ // Do this instead of build a generic system for allowing exceptions carry
+ // any kind of payload.
+ RegionMovedException rme = (RegionMovedException)t;
+ exceptionBuilder.setHostname(rme.getHostname());
+ exceptionBuilder.setPort(rme.getPort());
+ }
+ // Set the exception as the result of the method invocation.
+ headerBuilder.setException(exceptionBuilder.build());
+ }
+
+ private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
@@ -483,7 +506,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// calculate the total size
int totalSize = headerSerializedSize + headerVintSize
+ (resultSerializedSize + resultVintSize)
- + (this.cellBlock == null ? 0 : this.cellBlock.limit());
+ + cellBlockSize;
// The byte[] should also hold the totalSize of the header, message and the cellblock
byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT];
@@ -2064,11 +2087,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
RpcScheduler scheduler)
throws IOException {
if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
- this.reservoir = new BoundedByteBufferPool(
- conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
- conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
- // Make the max twice the number of handlers to be safe.
- conf.getInt("hbase.ipc.server.reservoir.initial.max",
+ this.reservoir = new ByteBufferPool(
+ conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, ByteBufferPool.DEFAULT_BUFFER_SIZE),
+ conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
} else {