getByteBuffers() {
+ if (!this.lastBufFlipped) {
+ this.lastBufFlipped = true;
+ // All the other BBs are already flipped while moving to the new BB.
+ curBuf.flip();
+ }
+ return this.allBufs;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ int toWrite = 0;
+ while (len > 0) {
+ toWrite = Math.min(len, this.curBuf.remaining());
+ ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite);
+ off += toWrite;
+ len -= toWrite;
+ if (len > 0) {
+ allocateNewBuffer();// The curBuf is over. Let us move to the next one
+ }
+ }
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ int toWrite = 0;
+ while (len > 0) {
+ toWrite = Math.min(len, this.curBuf.remaining());
+ ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite);
+ off += toWrite;
+ len -= toWrite;
+ if (len > 0) {
+ allocateNewBuffer();// The curBuf is over. Let us move to the next one
+ }
+ }
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index d4bda18..f77092d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -44,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
- protected ByteBuffer buf;
+ protected ByteBuffer curBuf = null;
+
+ ByteBufferOutputStream() {
+
+ }
public ByteBufferOutputStream(int capacity) {
this(capacity, false);
@@ -66,12 +70,12 @@ public class ByteBufferOutputStream extends OutputStream
*/
public ByteBufferOutputStream(final ByteBuffer bb) {
assert bb.order() == ByteOrder.BIG_ENDIAN;
- this.buf = bb;
- this.buf.clear();
+ this.curBuf = bb;
+ this.curBuf.clear();
}
public int size() {
- return buf.position();
+ return curBuf.position();
}
private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
@@ -86,25 +90,25 @@ public class ByteBufferOutputStream extends OutputStream
* @return ByteBuffer
*/
public ByteBuffer getByteBuffer() {
- buf.flip();
- return buf;
+ curBuf.flip();
+ return curBuf;
}
- private void checkSizeAndGrow(int extra) {
- long capacityNeeded = buf.position() + (long) extra;
- if (capacityNeeded > buf.limit()) {
+ protected void checkSizeAndGrow(int extra) {
+ long capacityNeeded = curBuf.position() + (long) extra;
+ if (capacityNeeded > curBuf.limit()) {
// guarantee it's possible to fit
if (capacityNeeded > MAX_ARRAY_SIZE) {
throw new BufferOverflowException();
}
// double until hit the cap
- long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
+ long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE);
// but make sure there is enough if twice the existing capacity is still too small
nextCapacity = Math.max(nextCapacity, capacityNeeded);
- ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
- buf.flip();
- ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
- buf = newBuf;
+ ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect());
+ curBuf.flip();
+ ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf);
+ curBuf = newBuf;
}
}
@@ -112,7 +116,7 @@ public class ByteBufferOutputStream extends OutputStream
@Override
public void write(int b) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_BYTE);
- buf.put((byte)b);
+ curBuf.put((byte)b);
}
/**
@@ -122,9 +126,9 @@ public class ByteBufferOutputStream extends OutputStream
* @param out the output stream to which to write the data.
* @exception IOException if an I/O error occurs.
*/
- public synchronized void writeTo(OutputStream out) throws IOException {
+ public void writeTo(OutputStream out) throws IOException {
WritableByteChannel channel = Channels.newChannel(out);
- ByteBuffer bb = buf.duplicate();
+ ByteBuffer bb = curBuf.duplicate();
bb.flip();
channel.write(bb);
}
@@ -137,12 +141,12 @@ public class ByteBufferOutputStream extends OutputStream
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkSizeAndGrow(len);
- ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
+ ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len);
}
public void write(ByteBuffer b, int off, int len) throws IOException {
checkSizeAndGrow(len);
- ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
+ ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len);
}
/**
@@ -153,7 +157,7 @@ public class ByteBufferOutputStream extends OutputStream
*/
public void writeInt(int i) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_INT);
- ByteBufferUtils.putInt(this.buf, i);
+ ByteBufferUtils.putInt(this.curBuf, i);
}
@Override
@@ -167,7 +171,7 @@ public class ByteBufferOutputStream extends OutputStream
}
public byte[] toByteArray(int offset, int length) {
- ByteBuffer bb = buf.duplicate();
+ ByteBuffer bb = curBuf.duplicate();
bb.flip();
byte[] chunk = new byte[length];
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
new file mode 100644
index 0000000..a4dfd55
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This
+ * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer
+ * that it will create. When requested, if a free ByteBuffer is already present, it will return
+ * that. And when no free ByteBuffer available and we are below the max count, it will create a new
+ * one and return that.
+ *
+ *
+ * Note: This pool returns off heap ByteBuffers.
+ *
+ * This class is thread safe.
+ * @see ByteBufferListOutputStream
+ */
+@InterfaceAudience.Private
+public class ByteBufferPool {
+ private static final Log LOG = LogFactory.getLog(ByteBufferPool.class);
+ // TODO better config names?
+ // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count
+ // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size
+ public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max";
+ public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size";
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size
+ // what we will write/read to/from the
+ // socket channel.
+ private final Queue buffers = new ConcurrentLinkedQueue();
+
+ private final int bufferSize;
+ private final int maxPoolSize;
+ private AtomicInteger count; // Count of the BBs created already for this pool.
+ private boolean maxPoolSizeInfoLevelLogged = false;
+
+ public ByteBufferPool(final int bufferSize, final int maxPoolSize) {
+ this.bufferSize = bufferSize;
+ this.maxPoolSize = maxPoolSize;
+ // TODO can add initialPoolSize config also and make those many BBs ready for use.
+ LOG.info("Created ByteBufferPool with bufferSize : " + bufferSize + " and maxPoolSize : "
+ + maxPoolSize);
+ this.count = new AtomicInteger(0);
+ }
+
+ /**
+ * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the
+ * maximum pool size, it will create a new one and return. In case of max pool size also
+ * reached, will return null. When pool returned a ByteBuffer, make sure to return it back
+ * to pool after use.
+ * @see #putbackBuffer(ByteBuffer)
+ */
+ public ByteBuffer getBuffer() {
+ ByteBuffer bb = buffers.poll();
+ if (bb != null) {
+ // Clear sets limit == capacity. Position == 0.
+ bb.clear();
+ return bb;
+ }
+ while (true) {
+ int c = this.count.intValue();
+ if (c >= this.maxPoolSize) {
+ if (maxPoolSizeInfoLevelLogged) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize
+ + " and no free buffers now. Consider increasing the value for '"
+ + MAX_POOL_SIZE_KEY + "' ?");
+ }
+ } else {
+ LOG.info("Pool already reached its max capacity : " + this.maxPoolSize
+ + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY
+ + "' ?");
+ maxPoolSizeInfoLevelLogged = true;
+ }
+ return null;
+ }
+ if (!this.count.compareAndSet(c, c + 1)) {
+ continue;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize);
+ }
+ return ByteBuffer.allocateDirect(this.bufferSize);
+ }
+ }
+
+ /**
+ * Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not
+ * obtained from this pool.
+ * @param buf ByteBuffer to return.
+ */
+ public void putbackBuffer(ByteBuffer buf) {
+ if (buf.capacity() != this.bufferSize || !buf.isDirect()) {
+ LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
+ return;
+ }
+ buffers.offer(buf);
+ }
+
+ int getBufferSize() {
+ return this.bufferSize;
+ }
+
+ /**
+ * @return Number of free buffers
+ */
+ @VisibleForTesting
+ int getQueueSize() {
+ return buffers.size();
+ }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
new file mode 100644
index 0000000..3579256
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestByteBufferListOutputStream {
+
+ @Test
+ public void testWrites() throws Exception {
+ ByteBufferPool pool = new ByteBufferPool(10, 3);
+ ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+ bbos.write(2);// Write a byte
+ bbos.writeInt(100);// Write an int
+ byte[] b = Bytes.toBytes("row123");// 6 bytes
+ bbos.write(b);
+ // Just use the 3rd BB from pool so that pabos, on request, wont get one
+ ByteBuffer bb1 = pool.getBuffer();
+ ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes
+ bbos.write(bb, 0, bb.capacity());
+ pool.putbackBuffer(bb1);
+ bbos.writeInt(123);
+ bbos.writeInt(124);
+ assertEquals(0, pool.getQueueSize());
+ List allBufs = bbos.getByteBuffers();
+ assertEquals(4, allBufs.size());
+ assertEquals(3, bbos.bufsFromPool.size());
+ ByteBuffer b1 = allBufs.get(0);
+ assertEquals(10, b1.remaining());
+ assertEquals(2, b1.get());
+ assertEquals(100, b1.getInt());
+ byte[] bActual = new byte[b.length];
+ b1.get(bActual, 0, 5);//5 bytes in 1st BB
+ ByteBuffer b2 = allBufs.get(1);
+ assertEquals(10, b2.remaining());
+ b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB
+ assertTrue(Bytes.equals(b, bActual));
+ bActual = new byte[bb.capacity()];
+ b2.get(bActual, 0, 9);
+ ByteBuffer b3 = allBufs.get(2);
+ assertEquals(8, b3.remaining());
+ b3.get(bActual, 9, 4);
+ assertTrue(ByteBufferUtils.equals(bb, 0, bb.capacity(), bActual, 0, bActual.length));
+ assertEquals(123, b3.getInt());
+ ByteBuffer b4 = allBufs.get(3);
+ assertEquals(4, b4.remaining());
+ assertEquals(124, b4.getInt());
+ bbos.closeAndPutbackBuffers();
+ assertEquals(3, pool.getQueueSize());
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index babd2f8..39efa40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -46,6 +46,13 @@ class BufferChain {
this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
}
+ BufferChain(List buffers) {
+ for (ByteBuffer b : buffers) {
+ this.remaining += b.remaining();
+ }
+ this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
+ }
+
/**
* Expensive. Makes a new buffer to hold a copy of what is in contained ByteBuffers. This
* call drains this instance; it cannot be used subsequent to the call.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 1087c42..f244e16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -85,9 +85,10 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -289,7 +290,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private UserProvider userProvider;
- private final BoundedByteBufferPool reservoir;
+ private final ByteBufferPool reservoir;
private volatile boolean allowFallbackToSimpleAuth;
@@ -320,7 +321,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected long size; // size of current call
protected boolean isError;
protected TraceInfo tinfo;
- private ByteBuffer cellBlock = null;
+ private ByteBufferListOutputStream cellBlockStream = null;
private User user;
private InetAddress remoteAddress;
@@ -362,10 +363,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="Presume the lock on processing request held by caller is protection enough")
void done() {
- if (this.cellBlock != null && reservoir != null) {
- // Return buffer to reservoir now we are done with it.
- reservoir.putBuffer(this.cellBlock);
- this.cellBlock = null;
+ if (this.cellBlockStream != null) {
+ this.cellBlockStream.closeAndPutbackBuffers();// The close will return back the BBs which we
+ // got from pool.
+ this.cellBlockStream = null;
}
this.connection.decRpcCount(); // Say that we're done with this call.
}
@@ -425,38 +426,43 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Call id.
headerBuilder.setCallId(this.id);
if (t != null) {
- ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
- exceptionBuilder.setExceptionClassName(t.getClass().getName());
- exceptionBuilder.setStackTrace(errorMsg);
- exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
- if (t instanceof RegionMovedException) {
- // Special casing for this exception. This is only one carrying a payload.
- // Do this instead of build a generic system for allowing exceptions carry
- // any kind of payload.
- RegionMovedException rme = (RegionMovedException)t;
- exceptionBuilder.setHostname(rme.getHostname());
- exceptionBuilder.setPort(rme.getPort());
- }
- // Set the exception as the result of the method invocation.
- headerBuilder.setException(exceptionBuilder.build());
+ setExceptionResponse(t, errorMsg, headerBuilder);
}
// Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
// reservoir when finished. This is hacky and the hack is not contained but benefits are
// high when we can avoid a big buffer allocation on each rpc.
- this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
- this.connection.compressionCodec, cells, reservoir);
- if (this.cellBlock != null) {
+ List cellBlock = null;
+ int cellBlockSize = 0;
+ if (reservoir != null) {
+ this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec,
+ this.connection.compressionCodec, cells, reservoir);
+ if (this.cellBlockStream != null) {
+ cellBlock = this.cellBlockStream.getByteBuffers();
+ cellBlockSize = this.cellBlockStream.size();
+ }
+ } else {
+ ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec,
+ this.connection.compressionCodec, cells);
+ if (b != null) {
+ cellBlockSize = b.remaining();
+ cellBlock = new ArrayList(1);
+ cellBlock.add(b);
+ }
+ }
+
+ if (cellBlockSize > 0) {
CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
// Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
- cellBlockBuilder.setLength(this.cellBlock.limit());
+ cellBlockBuilder.setLength(cellBlockSize);
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
-
- byte[] b = createHeaderAndMessageBytes(result, header);
-
- bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
-
+ byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
+ List responseBufs = new ArrayList(
+ (cellBlock == null ? 1 : cellBlock.size()) + 1);
+ responseBufs.add(ByteBuffer.wrap(b));
+ if (cellBlock != null) responseBufs.addAll(cellBlock);
+ bc = new BufferChain(responseBufs);
if (connection.useWrap) {
bc = wrapWithSasl(bc);
}
@@ -476,7 +482,25 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
- private byte[] createHeaderAndMessageBytes(Message result, Message header)
+ private void setExceptionResponse(Throwable t, String errorMsg,
+ ResponseHeader.Builder headerBuilder) {
+ ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+ exceptionBuilder.setExceptionClassName(t.getClass().getName());
+ exceptionBuilder.setStackTrace(errorMsg);
+ exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+ if (t instanceof RegionMovedException) {
+ // Special casing for this exception. This is only one carrying a payload.
+ // Do this instead of build a generic system for allowing exceptions carry
+ // any kind of payload.
+ RegionMovedException rme = (RegionMovedException)t;
+ exceptionBuilder.setHostname(rme.getHostname());
+ exceptionBuilder.setPort(rme.getPort());
+ }
+ // Set the exception as the result of the method invocation.
+ headerBuilder.setException(exceptionBuilder.build());
+ }
+
+ private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
@@ -493,7 +517,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// calculate the total size
int totalSize = headerSerializedSize + headerVintSize
+ (resultSerializedSize + resultVintSize)
- + (this.cellBlock == null ? 0 : this.cellBlock.limit());
+ + cellBlockSize;
// The byte[] should also hold the totalSize of the header, message and the cellblock
byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT];
@@ -1084,6 +1108,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} finally {
if (error) {
LOG.debug(getName() + call.toShortString() + ": output error -- closing");
+ // We will be closing this connection itself. Mark this call as done so that all the
+ // buffer(s) it got from pool can get released
+ call.done();
closeConnection(call.connection);
}
}
@@ -1998,11 +2025,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
RpcScheduler scheduler)
throws IOException {
if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
- this.reservoir = new BoundedByteBufferPool(
- conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
- conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
- // Make the max twice the number of handlers to be safe.
- conf.getInt("hbase.ipc.server.reservoir.initial.max",
+ this.reservoir = new ByteBufferPool(
+ conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, ByteBufferPool.DEFAULT_BUFFER_SIZE),
+ conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY,
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
} else {