diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 734227c..33f276d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -26,6 +26,7 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.ThresholdingOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; +import org.apache.hadoop.hbase.io.GatheringBuffer; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -81,6 +83,47 @@ public class IPCUtil { */ public static class CellScannerButNoCodecException extends HBaseIOException {}; + // Enough small to enable other succeed processing. + private static final int CELLBLOCK_BYTE_COUNT_THRESHOLD = Integer.MAX_VALUE / 16; + + private static class ThresholdingOutputStreamAdapter extends ThresholdingOutputStream { + final OutputStream out; + + ThresholdingOutputStreamAdapter(OutputStream out) { + super(CELLBLOCK_BYTE_COUNT_THRESHOLD); + this.out = out; + } + + @Override + protected OutputStream getStream() throws IOException { + return out; + } + + @Override + protected void thresholdReached() throws IOException { + throw new IOException( + "The written byte count exceeded the threshold: " + + "count=" + getByteCount() + ", threshold=" + getThreshold()); + } + } + + /** + * @return true at least one cell has been written to {@code os} + */ + private static boolean encode(CellScanner cellScanner, Codec codec, OutputStream os) + throws IOException { + if (! cellScanner.advance()) { + return false; + } + + Codec.Encoder encoder = codec.getEncoder(os); + do { + encoder.write(cellScanner.current()); + } while (cellScanner.advance()); + encoder.flush(); + return true; + } + /** * Puts CellScanner Cells into a cell block using passed in codec and/or * compressor. @@ -96,7 +139,58 @@ public class IPCUtil { public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, final CellScanner cellScanner) throws IOException { - return buildCellBlock(codec, compressor, cellScanner, null); + if (cellScanner == null) return null; + if (codec == null) throw new CellScannerButNoCodecException(); + + // Then we need to make our own to return. + int bufferSize; + if (cellScanner instanceof HeapSize) { + long longSize = ((HeapSize)cellScanner).heapSize(); + // Just make sure we don't have a size bigger than an int. + if (longSize > CELLBLOCK_BYTE_COUNT_THRESHOLD) { + throw new IOException("Size " + longSize + " > " + CELLBLOCK_BYTE_COUNT_THRESHOLD); + } + bufferSize = ClassSize.align((int)longSize); + } else { + bufferSize = this.cellBlockBuildingInitialBufferSize; + } + + ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize); + ThresholdingOutputStream tos = new ThresholdingOutputStreamAdapter(baos); + OutputStream os = tos; + Compressor poolCompressor = null; + boolean written; + try { + if (compressor != null) { + if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); + poolCompressor = CodecPool.getCompressor(compressor); + os = compressor.createOutputStream(os, poolCompressor); + } + + written = encode(cellScanner, codec, os); + os.close(); + + } catch (BufferOverflowException e) { + throw new DoNotRetryIOException(e); + + } catch (IOException e) { + throw tos.isThresholdExceeded() ? new DoNotRetryIOException(e) : e; + + } finally { + try { os.close(); } catch (IOException ignore) {} + if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor); + } + + if (LOG.isTraceEnabled()) { + if (bufferSize < baos.size()) { + LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() + + "; up hbase.ipc.cellblock.building.initial.buffersize?"); + } + } + + // If no cells, don't mess around. Just return null (could be a bunch of existence checking + // gets or something -- stuff that does not return a cell). + return written ? baos.getByteBuffer() : null; } /** @@ -105,70 +199,55 @@ public class IPCUtil { * @param codec * @param compressor * @param cellScanner - * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate - * our own ByteBuffer. - * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using - * passed in codec and/or compressor; the returned buffer has been - * flipped and is ready for reading. Use limit to find total size. If pool was not - * null, then this returned ByteBuffer came from there and should be returned to the pool when - * done. + * @param pool Pool of ByteBuffers to make use of. Must be non-null. + * @return Null or a buffer filled with a cellblock filled with passed-in Cells encoded using + * passed in codec and/or compressor. * @throws IOException */ @SuppressWarnings("resource") - public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, + public GatheringBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, final CellScanner cellScanner, final BoundedByteBufferPool pool) throws IOException { if (cellScanner == null) return null; if (codec == null) throw new CellScannerButNoCodecException(); - int bufferSize = this.cellBlockBuildingInitialBufferSize; - ByteBufferOutputStream baos = null; - if (pool != null) { - ByteBuffer bb = pool.getBuffer(); - bufferSize = bb.capacity(); - baos = new ByteBufferOutputStream(bb); - } else { - // Then we need to make our own to return. - if (cellScanner instanceof HeapSize) { - long longSize = ((HeapSize)cellScanner).heapSize(); - // Just make sure we don't have a size bigger than an int. - if (longSize > Integer.MAX_VALUE) { - throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE); - } - bufferSize = ClassSize.align((int)longSize); - } - baos = new ByteBufferOutputStream(bufferSize); - } - OutputStream os = baos; + + GatheringBuffer buffer = new GatheringBuffer(pool); + ThresholdingOutputStream tos = new ThresholdingOutputStreamAdapter( + buffer.adaptOutputStream()); + OutputStream os = tos; Compressor poolCompressor = null; + boolean written; try { if (compressor != null) { if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); poolCompressor = CodecPool.getCompressor(compressor); os = compressor.createOutputStream(os, poolCompressor); } - Codec.Encoder encoder = codec.getEncoder(os); - int count = 0; - while (cellScanner.advance()) { - encoder.write(cellScanner.current()); - count++; - } - encoder.flush(); - // If no cells, don't mess around. Just return null (could be a bunch of existence checking - // gets or something -- stuff that does not return a cell). - if (count == 0) return null; - } catch (BufferOverflowException e) { - throw new DoNotRetryIOException(e); - } finally { + + written = encode(cellScanner, codec, os); os.close(); + + } catch (IOException e) { + buffer.dispose(); + throw tos.isThresholdExceeded() ? new DoNotRetryIOException(e) : e; + + } catch (Exception e) { + buffer.dispose(); + throw e; + + } finally { + try { os.close(); } catch (IOException ignore) {} if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor); } - if (LOG.isTraceEnabled()) { - if (bufferSize < baos.size()) { - LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() + - "; up hbase.ipc.cellblock.building.initial.buffersize?"); - } + + if (written) { + return buffer; } - return baos.getByteBuffer(); + + // If no cells, don't mess around. Just return null (could be a bunch of existence checking + // gets or something -- stuff that does not return a cell). + buffer.dispose(); + return null; } /** @@ -234,6 +313,20 @@ public class IPCUtil { */ public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { if (m == null) return null; + return ByteBuffer.wrap(getDelimitedMessageAsByteArray(m)); + } + + private static final byte[] EMPTY_BYTE_ARRAY = {}; + + /** + * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its + * serialization. + * @return The passed in Message serialized with delimiter. + * Return an empty array if m is null + * @throws IOException + */ + public static byte[] getDelimitedMessageAsByteArray(final Message m) throws IOException { + if (m == null) return EMPTY_BYTE_ARRAY; int serializedSize = m.getSerializedSize(); int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize); byte [] buffer = new byte[serializedSize + vintSize]; @@ -243,7 +336,7 @@ public class IPCUtil { cos.writeMessageNoTag(m); cos.flush(); cos.checkNoSpaceLeft(); - return ByteBuffer.wrap(buffer); + return buffer; } /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/GatheringBuffer.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/GatheringBuffer.java new file mode 100644 index 0000000..569b8ce --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/GatheringBuffer.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class GatheringBuffer { + private final BoundedByteBufferPool pool; + private final Queue bufferQueue = new LinkedList(); + private ByteBuffer busyBuffer; + + /** + * @param pool expected to provide non-zero capacity direct buffers + * @throws NullPointerException if {@code pool} is null + */ + // I wish the pool should have a more abstract type. + public GatheringBuffer(BoundedByteBufferPool pool) { + if (pool == null) { + throw new NullPointerException(); + } + this.pool = pool; + } + + public GatheringBuffer put(byte b) { + prepareBusyBuffer(); + busyBuffer.put(b); + return this; + } + + public GatheringBuffer put(byte[] src) { + return put(src, 0, src.length); + } + + public GatheringBuffer put(byte[] src, int offset, int length) { + if (length == 0) { + return this; + } + + while (true) { + prepareBusyBuffer(); + int remaining = busyBuffer.remaining(); + if (remaining >= length) { + busyBuffer.put(src, offset, length); + return this; + } + + busyBuffer.put(src, offset, remaining); + offset += remaining; + length -= remaining; + } + } + + public GatheringBuffer put(ByteBuffer src) { + if (! src.hasRemaining()) { + return this; + } + + while (true) { + prepareBusyBuffer(); + int remaining = busyBuffer.remaining(); + if (remaining >= src.remaining()) { + busyBuffer.put(src); + return this; + } + + ByteBuffer dup = src.duplicate(); + dup.limit(dup.position() + remaining); + busyBuffer.put(dup); + src.position(src.position() + remaining); + } + } + + /** + * The given {@code src} should be created with the same pool. + */ + public GatheringBuffer put(GatheringBuffer src) { + assert src.pool == pool; + + if (busyBuffer != null) { + busyBuffer.flip(); + bufferQueue.offer(busyBuffer); + busyBuffer = null; + } + + bufferQueue.addAll(src.bufferQueue); + src.bufferQueue.clear(); + + busyBuffer = src.busyBuffer; + src.busyBuffer = null; + + return this; + } + + private void prepareBusyBuffer() { + if (busyBuffer == null) { + busyBuffer = pool.getBuffer(); + + } else if (! busyBuffer.hasRemaining()) { + busyBuffer.flip(); + bufferQueue.offer(busyBuffer); + busyBuffer = pool.getBuffer(); + } + } + + /** + * @throws IOException thrown by {@code target} + */ + public long writeTo(GatheringByteChannel target) throws IOException { + if (bufferQueue.isEmpty()) { + if (busyBuffer == null) { + return 0; + } + + busyBuffer.flip(); + try { + return target.write(busyBuffer); + } finally { + if (busyBuffer.hasRemaining()) { + busyBuffer.compact(); + } else { + pool.putBuffer(busyBuffer); + busyBuffer = null; + } + } + } + + ByteBuffer[] buffers; + if (busyBuffer != null) { + busyBuffer.flip(); + buffers = bufferQueue.toArray(new ByteBuffer[bufferQueue.size() + 1]); + buffers[buffers.length - 1] = busyBuffer; + } else { + buffers = bufferQueue.toArray(new ByteBuffer[bufferQueue.size()]); + } + + try { + return target.write(buffers); + + } finally { + ByteBuffer buf; + while ((buf = bufferQueue.peek()) != null && ! buf.hasRemaining()) { + bufferQueue.poll(); + pool.putBuffer(buf); + } + + if(busyBuffer != null) { + if (busyBuffer.hasRemaining()) { + busyBuffer.compact(); + } else { + pool.putBuffer(busyBuffer); + busyBuffer = null; + } + } + } + } + + /** + * The given {@code target} should not be read-only and should have sufficient space + * (in other words, {@code target.remaining() >= this.length()}). + */ + public void writeTo(ByteBuffer target) { + ByteBuffer buf; + while ((buf = bufferQueue.poll()) != null) { + target.put(buf); + pool.putBuffer(buf); + } + + if (busyBuffer != null) { + busyBuffer.flip(); + target.put(busyBuffer); + pool.putBuffer(busyBuffer); + busyBuffer = null; + } + } + + public boolean isEmpty() { + if (busyBuffer != null && busyBuffer.position() > 0) { + return false; + } + + for (ByteBuffer buffer : bufferQueue) { + if (buffer.hasRemaining()) { + return false; + } + } + + return true; + } + + public long length() { + long count; + if (busyBuffer != null) { + count = busyBuffer.position(); + } else { + count = 0; + } + + for (ByteBuffer buffer : bufferQueue) { + count += buffer.remaining(); + } + + return count; + } + + public void clear() { + ByteBuffer buf; + while ((buf = bufferQueue.poll()) != null) { + pool.putBuffer(buf); + } + + if (busyBuffer != null) { + pool.putBuffer(busyBuffer); + busyBuffer = null; + } + } + + /** + * Intended to call this method before discarding this instance. + */ + public void dispose() { + clear(); + } + + /** + * Creates an output stream which affects contents in this buffer. + */ + public OutputStream adaptOutputStream() { + return new OutputStreamImpl(); + } + + private class OutputStreamImpl extends OutputStream { + @Override + public void write(int b) { + put((byte)b); + } + + @Override + public void write(byte[] b) { + put(b); + } + + @Override + public void write(byte[] b, int off, int len) { + put(b, off, len); + } + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestGatheringBuffer.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestGatheringBuffer.java new file mode 100644 index 0000000..f828720 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestGatheringBuffer.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; + +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ IOTests.class, SmallTests.class }) +public class TestGatheringBuffer implements GatheringByteChannel { + private BoundedByteBufferPool pool; + private GatheringBuffer buffer; + private ByteBuffer log; + + @Before + public void setUp() { + pool = new BoundedByteBufferPool(1, 1, 10); + buffer = new GatheringBuffer(pool); + log = ByteBuffer.allocate(100); + } + + @After + public void tearDown() { + buffer.dispose(); + } + + @Test + public void testPutByte() throws IOException { + buffer.put((byte)12); + Assert.assertEquals(1, buffer.writeTo(this)); + Assert.assertEquals(ByteBuffer.wrap(new byte[] { 12 }), log.flip()); + } + + @Test + public void testPutByteArray() throws IOException { + byte[] src = new byte[] { 1, 2, 3 }; + buffer.put(src); + Assert.assertEquals(3, buffer.writeTo(this)); + Assert.assertEquals(ByteBuffer.wrap(src), log.flip()); + } + + @Test + public void testPutByteArrayWithRange() throws IOException { + byte[] src = new byte[] { 1, 2, 3, 4, 5 }; + buffer.put(src, 1, 3); + Assert.assertEquals(3, buffer.writeTo(this)); + Assert.assertEquals(ByteBuffer.wrap(new byte[] { 2, 3, 4 }), log.flip()); + } + + @Test + public void testPutByteBuffer() throws IOException { + ByteBuffer src = ByteBuffer.wrap(new byte[] { 1, 2, 3 }); + buffer.put(src.duplicate()); + Assert.assertEquals(3, buffer.writeTo(this)); + Assert.assertEquals(src, log.flip()); + } + + @Test + public void testIsEmptyAndLength() throws IOException { + Assert.assertTrue(buffer.isEmpty()); + Assert.assertEquals(0, buffer.length()); + + byte[] src = new byte[] { 1, 2, 3 }; + buffer.put(src); + + Assert.assertFalse(buffer.isEmpty()); + Assert.assertEquals(3, buffer.length()); + + buffer.writeTo(this); + + Assert.assertTrue(buffer.isEmpty()); + Assert.assertEquals(0, buffer.length()); + } + + @Test + public void testPutGatheringBuffer() throws IOException { + byte[] src = new byte[] { 1, 2, 3 }; + buffer.put(src); + GatheringBuffer buffer2 = new GatheringBuffer(pool); + buffer2.put(buffer); + Assert.assertTrue(buffer.isEmpty()); + Assert.assertFalse(buffer2.isEmpty()); + Assert.assertEquals(3, buffer2.writeTo(this)); + Assert.assertEquals(ByteBuffer.wrap(src), log.flip()); + } + + @Test + public void testWriteToByteBuffer() throws IOException { + byte[] src = new byte[] { 1, 2, 3 }; + buffer.put(src); + byte[] target = new byte[(int)buffer.length()]; + buffer.writeTo(ByteBuffer.wrap(target)); + Assert.assertTrue(buffer.isEmpty()); + Assert.assertArrayEquals(src, target); + } + + @Test + public void testClear() throws IOException { + byte[] src = new byte[] { 1, 2, 3 }; + buffer.put(src); + + buffer.clear(); + + Assert.assertTrue(buffer.isEmpty()); + } + + @Test + public void testAdaptOutputStream() throws IOException { + OutputStream out = buffer.adaptOutputStream(); + out.write(new byte[] { 1, 2, 3 }); + buffer.writeTo(this); + Assert.assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3 }), log.flip()); + } + + @Override + public int write(ByteBuffer src) { + int count = src.remaining(); + log.put(src); + return count; + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) { + long count = 0; + for (int i=0; i 0) this.metrics.sentBytes(count); return count; } @@ -2423,9 +2428,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } /** - * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)} - * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only - * one of readCh or writeCh should be non-null. + * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}. + * Only one of readCh or writeCh should be non-null. * * @param readCh read channel * @param writeCh write channel @@ -2433,7 +2437,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @return bytes written * @throws java.io.IOException e * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer) - * @see #channelWrite(GatheringByteChannel, BufferChain) */ private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh,