diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 787aa47..7ec1545 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -427,7 +427,8 @@ public class AsyncRpcChannel {
       try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
         call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
       }
-
+      // release cellBlock
+      client.releaseCellBlock(cellBlock);
       channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
     } catch (IOException e) {
       close(e);
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index c2bd457..b4060fd 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -45,7 +45,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -53,6 +52,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVM;
@@ -96,6 +96,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
   private final PoolMap connections;
 
   final FailedServers failedServers;
+
+  private final BoundedByteBufferPool reservoir;
 
   @VisibleForTesting
   final Bootstrap bootstrap;
@@ -175,6 +177,17 @@ public class AsyncRpcClient extends AbstractRpcClient {
     this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
     this.failedServers = new FailedServers(configuration);
 
+    if (conf.getBoolean("hbase.ipc.client.reservoir.enabled", true)) {
+      this.reservoir = new BoundedByteBufferPool(
+          conf.getInt("hbase.ipc.client.reservoir.max.buffer.size", 1024 * 1024),
+          conf.getInt("hbase.ipc.client.reservoir.initial.buffer.size", 16 * 1024),
+          conf.getInt("hbase.ipc.client.reservoir.initial.max",
+              conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)), false);
+    } else {
+      reservoir = null;
+    }
+
     int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
 
@@ -354,7 +367,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
    * @throws java.io.IOException if block creation fails
    */
   public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
-    return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
+    return ipcUtil.buildCellBlock(this.codec, this.compressor, cells, reservoir);
   }
 
   /**
@@ -500,4 +513,10 @@ public class AsyncRpcClient extends AbstractRpcClient {
   Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
     return WHEEL_TIMER.newTimeout(task, delay, unit);
   }
+
+  public void releaseCellBlock(ByteBuffer cellBlock) {
+    if (reservoir != null && cellBlock != null) {
+      reservoir.putBuffer(cellBlock);
+    }
+  }
 }
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index d98d81d..fa36094 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -154,7 +154,13 @@ public class IPCUtil {
       encoder.flush();
       // If no cells, don't mess around. Just return null (could be a bunch of existence checking
       // gets or something -- stuff that does not return a cell).
-      if (count == 0) return null;
+      if (count == 0) {
+        if (pool != null && baos != null) {
+          // Put back to pool
+          pool.putBuffer(baos.getByteBuffer());
+        }
+        return null;
+      }
     } catch (BufferOverflowException e) {
       throw new DoNotRetryIOException(e);
     } finally {
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
index deddb51..0ff3e50 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -99,18 +99,31 @@ public class BoundedByteBufferPool {
   // For reporting, only used in the log
   private final AtomicLong allocationsRef = new AtomicLong();
 
+  private boolean isDirect = true;
   /**
    * @param maxByteBufferSizeToCache
    * @param initialByteBufferSize
    * @param maxToCache
    */
   public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
-      final int maxToCache) {
+      final int maxToCache, boolean isDirect) {
     this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
     this.runningAverageRef = new AtomicInteger(initialByteBufferSize);
     this.maxToCache = maxToCache;
+    this.isDirect = isDirect;
   }
 
+  /**
+   * @param maxByteBufferSizeToCache
+   * @param initialByteBufferSize
+   * @param maxToCache
+   */
+
+  public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
+      final int maxToCache) {
+    this(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache, true);
+  }
+
   public ByteBuffer getBuffer() {
     ByteBuffer bb = buffers.poll();
     if (bb != null) {
@@ -134,7 +147,7 @@ public class BoundedByteBufferPool {
     }
 
     int runningAverage = runningAverageRef.get();
-    bb = ByteBuffer.allocateDirect(runningAverage);
+    bb = isDirect ? ByteBuffer.allocateDirect(runningAverage) : ByteBuffer.allocate(runningAverage);
     if (LOG.isTraceEnabled()) {
       long allocations = allocationsRef.incrementAndGet();