.../apache/hadoop/hbase/ipc/CellBlockBuilder.java | 8 +- .../hbase/io/ByteBufferListOutputStream.java | 100 ++--------- .../org/apache/hadoop/hbase/io/ByteBufferPool.java | 2 +- .../hadoop/hbase/io/ByteBufferPoolManager.java | 188 +++++++++++++++++++++ .../hbase/io/TestByteBufferListOutputStream.java | 8 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 81 +++++++-- 6 files changed, 278 insertions(+), 109 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java index fb2cafa..e306526 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferPoolManager; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -212,15 +212,15 @@ class CellBlockBuilder { * @throws IOException if encoding the cells fail */ public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor, - CellScanner cellScanner, ByteBufferPool pool) throws IOException { + CellScanner cellScanner, ByteBufferPoolManager poolManager) throws IOException { if (cellScanner == null) { return null; } if (codec == null) { throw new CellScannerButNoCodecException(); } - assert pool != null; - ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool); + assert poolManager != null; + ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(poolManager); encodeCellsTo(bbos, cellScanner, codec, compressor); if (bbos.size() == 0) { bbos.releaseResources(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java index b4c00c6..3a69eb7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java @@ -20,13 +20,10 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteBufferUtils; /** * An OutputStream which writes data into ByteBuffers. It will try to get ByteBuffer, as and when @@ -39,52 +36,16 @@ import org.apache.hadoop.hbase.util.ByteBufferUtils; public class ByteBufferListOutputStream extends ByteBufferOutputStream { private static final Log LOG = LogFactory.getLog(ByteBufferListOutputStream.class); - private ByteBufferPool pool; - // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If - // it is not available will make a new one our own and keep writing to that. We keep track of all - // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure - // to return back all of them to pool - protected List allBufs = new ArrayList(); - protected List bufsFromPool = new ArrayList(); + private ByteBufferPoolManager poolManager; - private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already - - public ByteBufferListOutputStream(ByteBufferPool pool) { - this.pool = pool; - allocateNewBuffer(); - } - - private void allocateNewBuffer() { - if (this.curBuf != null) { - this.curBuf.flip();// On the current buf set limit = pos and pos = 0. - } - // Get an initial BB to work with from the pool - this.curBuf = this.pool.getBuffer(); - if (this.curBuf == null) { - // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off - // heap BB on demand. It is difficult to account for all such and so proper sizing of Max - // direct heap size. See HBASE-15525 also for more details. - // Make BB with same size of pool's buffer size. - this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize()); - } else { - this.bufsFromPool.add(this.curBuf); - } - this.allBufs.add(this.curBuf); + public ByteBufferListOutputStream(ByteBufferPoolManager poolManager) { + this.poolManager = poolManager; + this.curBuf = this.poolManager.getCurrentBuffer(); } @Override public int size() { - int s = 0; - for (int i = 0; i < this.allBufs.size() - 1; i++) { - s += this.allBufs.get(i).remaining(); - } - // On the last BB, it might not be flipped yet if getByteBuffers is not yet called - if (this.lastBufFlipped) { - s += this.curBuf.remaining(); - } else { - s += this.curBuf.position(); - } - return s; + return this.poolManager.size(); } @Override @@ -94,10 +55,9 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { @Override protected void checkSizeAndGrow(int extra) { - long capacityNeeded = curBuf.position() + (long) extra; - if (capacityNeeded > curBuf.limit()) { - allocateNewBuffer(); - } + this.poolManager.checkSizeAndGrow(extra); + // update the curBuf reference in case a new buffer was allocated + this.curBuf = this.poolManager.getCurrentBuffer(); } @Override @@ -118,14 +78,7 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { LOG.debug(e); } // Return back all the BBs to pool - if (this.bufsFromPool != null) { - for (int i = 0; i < this.bufsFromPool.size(); i++) { - this.pool.putbackBuffer(this.bufsFromPool.get(i)); - } - this.bufsFromPool = null; - } - this.allBufs = null; - this.curBuf = null; + this.poolManager.releaseResources(); } @Override @@ -134,40 +87,17 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { throw new UnsupportedOperationException(); } - public List getByteBuffers() { - if (!this.lastBufFlipped) { - this.lastBufFlipped = true; - // All the other BBs are already flipped while moving to the new BB. - curBuf.flip(); - } - return this.allBufs; - } - @Override public void write(byte[] b, int off, int len) throws IOException { - int toWrite = 0; - while (len > 0) { - toWrite = Math.min(len, this.curBuf.remaining()); - ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite); - off += toWrite; - len -= toWrite; - if (len > 0) { - allocateNewBuffer();// The curBuf is over. Let us move to the next one - } - } + poolManager.write(b, off, len); + // update the current buffer + this.curBuf = this.poolManager.getCurrentBuffer(); } @Override public void write(ByteBuffer b, int off, int len) throws IOException { - int toWrite = 0; - while (len > 0) { - toWrite = Math.min(len, this.curBuf.remaining()); - ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite); - off += toWrite; - len -= toWrite; - if (len > 0) { - allocateNewBuffer();// The curBuf is over. Let us move to the next one - } - } + poolManager.write(b, off, len); + // update the current buffer + this.curBuf = this.poolManager.getCurrentBuffer(); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java index e528f02..971c42c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java @@ -140,7 +140,7 @@ public class ByteBufferPool { buffers.offer(buf); } - int getBufferSize() { + public int getBufferSize() { return this.bufferSize; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPoolManager.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPoolManager.java new file mode 100644 index 0000000..8d76188 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPoolManager.java @@ -0,0 +1,188 @@ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +@InterfaceAudience.Private +/** + * Manages the byteBuffer pool and helps creating and releasing back the buffers to the pool + */ +public class ByteBufferPoolManager { + private static final Log LOG = LogFactory.getLog(ByteBufferPoolManager.class); + + private ByteBufferPool pool; + + // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If + // it is not available will make a new one our own and keep writing to that. We keep track of all + // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure + // to return back all of them to pool + protected List allBufs = new ArrayList(); + protected List bufsFromPool = new ArrayList(); + + private boolean lastBufFlipped = false; + + private ByteBuffer curBuf; + + /** + * @param pool the ByteBuffer pool that has to be managed + */ + public ByteBufferPoolManager(ByteBufferPool pool) { + this.pool = pool; + getNewBuffer(); + } + + public int getBufferSize() { + return this.pool.getBufferSize(); + } + + /** + * Creates a new byte buffer + * @param length if length is non negative allocates an onheap BB of the specified length + * @return + */ + private ByteBuffer getNewBuffer() { + // Get an initial BB to work with from the pool + if (curBuf != null) { + curBuf.flip(); + } + ByteBuffer buffer; + buffer = this.pool.getBuffer(); + if (buffer == null) { + // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off + // heap BB on demand. It is difficult to account for all such and so proper sizing of Max + // direct heap size. See HBASE-15525 also for more details. + // Make BB with same size of pool's buffer size. + buffer = ByteBuffer.allocate(getBufferSize()); + } else { + this.bufsFromPool.add(buffer); + } + this.allBufs.add(buffer); + this.curBuf = buffer; + return buffer; + } + + /** + * Returns the current active byte buffer + * @return current active bytebuffer + */ + ByteBuffer getCurrentBuffer() { + return this.curBuf; + } + + void checkSizeAndGrow(int extra) { + // this also to go inside Pool manager + long capacityNeeded = curBuf.position() + (long) extra; + if (capacityNeeded > curBuf.limit()) { + getNewBuffer(); + } + } + + /** + * Writes the given buffer's data to the buffers managed by this buffer pool + * @param b + * @param off + * @param len + */ + void write(byte[] b, int off, int len) { + int toWrite = 0; + while (len > 0) { + toWrite = Math.min(len, this.curBuf.remaining()); + ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite); + off += toWrite; + len -= toWrite; + if (len > 0) { + getNewBuffer();// The curBuf is over. Let us move to the next one + } + } + } + + int size() { + int s = 0; + for (int i = 0; i < allBufs.size() - 1; i++) { + s += allBufs.get(i).remaining(); + } + // On the last BB, it might not be flipped yet if getByteBuffers is not yet called + if (this.lastBufFlipped) { + s += this.curBuf.remaining(); + } else { + s += this.curBuf.position(); + } + return s; + } + + /** + * Writes the given buffer's data to the buffers managed by this buffer pool + * @param b + * @param off + * @param len + * @throws IOException + */ + void write(ByteBuffer b, int off, int len) throws IOException { + int toWrite = 0; + while (len > 0) { + toWrite = Math.min(len, this.curBuf.remaining()); + ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite); + off += toWrite; + len -= toWrite; + if (len > 0) { + getNewBuffer();// The curBuf is over. Let us move to the next one + } + } + } + + /** + * Release the resources it uses (The ByteBuffers) which are obtained from pool. Call this only + * when all the data is fully used. And it must be called at the end of usage else we will leak + * ByteBuffers from pool. + */ + void releaseResources() { + // Return back all the BBs to pool + if (this.bufsFromPool != null) { + for (int i = 0; i < this.bufsFromPool.size(); i++) { + this.pool.putbackBuffer(this.bufsFromPool.get(i)); + } + this.bufsFromPool = null; + } + this.allBufs = null; + } + + /** + * Returns the set of buffers that were allocated + * @return + */ + public List getByteBuffers() { + if (!this.lastBufFlipped) { + this.lastBufFlipped = true; + // All the other BBs are already flipped while moving to the new BB. + curBuf.flip(); + } + return this.allBufs; + } + + /** + * Sees if the currentBuffer can accommodate the given length. If so return the current buffer, if + * not null. + * @param length to be written + * @return the curBuf if the currentBuffer can accommodate the length if not return null; + */ + public ByteBuffer canWriteLenInCurrentBuffer(int length) { + // TODO : Add test case + if (curBuf != null) { + // in case getByteBuffers was not called. In the current usage we don't have this case + // of calling this method before getByteBuffers + int remain = lastBufFlipped ? curBuf.remaining() : curBuf.position(); + if (remain + length <= getBufferSize()) { + return curBuf; + } + } + return null; + } + //TODO : add API to create the list of buffers (ByteBuffs) for write side pool. +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java index e1d1e04..4ca74b2 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java @@ -35,7 +35,9 @@ public class TestByteBufferListOutputStream { @Test public void testWrites() throws Exception { ByteBufferPool pool = new ByteBufferPool(10, 3); - ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool); + ByteBufferPoolManager poolManager = new ByteBufferPoolManager(pool); + ByteBufferListOutputStream bbos = + new ByteBufferListOutputStream(poolManager); bbos.write(2);// Write a byte bbos.writeInt(100);// Write an int byte[] b = Bytes.toBytes("row123");// 6 bytes @@ -48,9 +50,9 @@ public class TestByteBufferListOutputStream { bbos.writeInt(123); bbos.writeInt(124); assertEquals(0, pool.getQueueSize()); - List allBufs = bbos.getByteBuffers(); + List allBufs = poolManager.getByteBuffers(); assertEquals(4, allBufs.size()); - assertEquals(3, bbos.bufsFromPool.size()); + assertEquals(3, poolManager.bufsFromPool.size()); ByteBuffer b1 = allBufs.get(0); assertEquals(10, b1.remaining()); assertEquals(2, b1.get()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index dd9bb01..c917898 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferPoolManager; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -110,6 +111,7 @@ import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -436,12 +438,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // high when we can avoid a big buffer allocation on each rpc. List cellBlock = null; int cellBlockSize = 0; + ByteBufferPoolManager poolManager = null; if (reservoir != null) { + poolManager = new ByteBufferPoolManager(reservoir); this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec, - this.connection.compressionCodec, cells, reservoir); + this.connection.compressionCodec, cells, poolManager); if (this.cellBlockStream != null) { - cellBlock = this.cellBlockStream.getByteBuffers(); - cellBlockSize = this.cellBlockStream.size(); + cellBlock = poolManager.getByteBuffers(); + cellBlockSize = cellBlockStream.size(); } } else { ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec, @@ -460,7 +464,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } Message header = headerBuilder.build(); - byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize); + ByteBuffer headerBuf = + createHeaderAndMessageBytes(result, header, cellBlockSize, poolManager); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { @@ -469,7 +474,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } else { responseBufs = new ByteBuffer[1]; } - responseBufs[0] = ByteBuffer.wrap(b); + responseBufs[0] = headerBuf; if (cellBlock != null) { for (int i = 0; i < cellBlockBufferSize; i++) { responseBufs[i + 1] = cellBlock.get(i); @@ -513,8 +518,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setException(exceptionBuilder.build()); } - private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize) - throws IOException { + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize, + ByteBufferPoolManager poolManager) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, @@ -531,15 +536,58 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { int totalSize = headerSerializedSize + headerVintSize + (resultSerializedSize + resultVintSize) + cellBlockSize; - // The byte[] should also hold the totalSize of the header, message and the cellblock - byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize - + resultVintSize + Bytes.SIZEOF_INT]; - // The RpcClient expects the int to be in a format that code be decoded by - // the DataInputStream#readInt(). Hence going with the Bytes.toBytes(int) - // form of writing int. - Bytes.putInt(b, 0, totalSize); - CodedOutputStream cos = CodedOutputStream.newInstance(b, Bytes.SIZEOF_INT, - b.length - Bytes.SIZEOF_INT); + int headerSize = headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT; + if (cellBlockSize > 0 && cellBlockStream != null) { + // Only if the current buffer has enough space for header use it. Else allocate + // a new buffer + ByteBuffer headerBuf = poolManager.canWriteLenInCurrentBuffer(headerSize); + int limit = -1; + if (headerBuf == null) { + // allocate new one because the pool will have to allocate a new curBuf of 64K + // to accommodate this header + return createHeaderAndMessageWithOnheapBuffer(result, header, totalSize, headerSize); + } else { + headerBuf.mark(); + // the current limit + limit = headerBuf.limit(); + // Position such that we write the header to the end of the curBuf + headerBuf.position(limit); + // limit to the header size + headerBuf.limit(headerSize + limit); + ByteBufferUtils.putInt(headerBuf, totalSize); + // create COS that works on BB + CodedOutputStream cos = CodedOutputStream.newInstance(headerBuf); + if (header != null) { + cos.writeMessageNoTag(header); + } + if (result != null) { + cos.writeMessageNoTag(result); + } + cos.flush(); + cos.checkNoSpaceLeft(); + // move back to the starting position + headerBuf.position(limit); + // one object creation can't be avoided!!!! + // this duplicate() ensures that only the header portion is used as an individual BB + ByteBuffer res = headerBuf.duplicate(); + headerBuf.reset(); + // limit the headerBuf to the original limit such that the header is not read + headerBuf.limit(limit); + return res; + } + } else { + // TODO : Use the pool for non PB cases also + // The byte[] should also hold the totalSize of the header, message and the cellblock + return createHeaderAndMessageWithOnheapBuffer(result, header, totalSize, headerSize); + } + } + + private ByteBuffer createHeaderAndMessageWithOnheapBuffer(Message result, Message header, + int totalSize, int headerSize) throws IOException { + ByteBuffer b = ByteBuffer.allocate(headerSize); + ByteBufferUtils.putInt(b, totalSize); + CodedOutputStream cos = CodedOutputStream.newInstance(b); if (header != null) { cos.writeMessageNoTag(header); } @@ -548,6 +596,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } cos.flush(); cos.checkNoSpaceLeft(); + b.flip(); return b; }