diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 734227c..33f276d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -26,6 +26,7 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.ThresholdingOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.GatheringBuffer;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -81,6 +83,47 @@ public class IPCUtil {
*/
public static class CellScannerButNoCodecException extends HBaseIOException {};
+  // Small enough that a single huge cellblock cannot starve other requests of memory.
+ private static final int CELLBLOCK_BYTE_COUNT_THRESHOLD = Integer.MAX_VALUE / 16;
+
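+  /**
+   * Wraps an output stream and fails the write once the running byte count
+   * passes {@link #CELLBLOCK_BYTE_COUNT_THRESHOLD}, so a runaway cellblock
+   * aborts early instead of exhausting memory.
+   */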
+ private static class ThresholdingOutputStreamAdapter extends ThresholdingOutputStream {
+ final OutputStream out;
+
+ ThresholdingOutputStreamAdapter(OutputStream out) {
+ super(CELLBLOCK_BYTE_COUNT_THRESHOLD);
+ this.out = out;
+ }
+
+ @Override
+ protected OutputStream getStream() throws IOException {
+ return out;
+ }
+
+ @Override
+ protected void thresholdReached() throws IOException {
+ throw new IOException(
+ "The written byte count exceeded the threshold: " +
+ "count=" + getByteCount() + ", threshold=" + getThreshold());
+ }
+ }
+
+ /**
+   * @return true if at least one cell has been written to {@code os}
+ */
+ private static boolean encode(CellScanner cellScanner, Codec codec, OutputStream os)
+ throws IOException {
+ if (! cellScanner.advance()) {
+ return false;
+ }
+
+ Codec.Encoder encoder = codec.getEncoder(os);
+ do {
+ encoder.write(cellScanner.current());
+ } while (cellScanner.advance());
+ encoder.flush();
+ return true;
+ }
+
/**
* Puts CellScanner Cells into a cell block using passed in codec and/or
* compressor.
@@ -96,7 +139,58 @@ public class IPCUtil {
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner)
throws IOException {
- return buildCellBlock(codec, compressor, cellScanner, null);
+ if (cellScanner == null) return null;
+ if (codec == null) throw new CellScannerButNoCodecException();
+
+    // No pool in this path, so size a buffer of our own, using the scanner's heap size as a hint.
+ int bufferSize;
+ if (cellScanner instanceof HeapSize) {
+ long longSize = ((HeapSize)cellScanner).heapSize();
+ // Just make sure we don't have a size bigger than an int.
+ if (longSize > CELLBLOCK_BYTE_COUNT_THRESHOLD) {
+ throw new IOException("Size " + longSize + " > " + CELLBLOCK_BYTE_COUNT_THRESHOLD);
+ }
+ bufferSize = ClassSize.align((int)longSize);
+ } else {
+ bufferSize = this.cellBlockBuildingInitialBufferSize;
+ }
+
+ ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+ ThresholdingOutputStream tos = new ThresholdingOutputStreamAdapter(baos);
+ OutputStream os = tos;
+ Compressor poolCompressor = null;
+ boolean written;
+ try {
+ if (compressor != null) {
+ if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
+ poolCompressor = CodecPool.getCompressor(compressor);
+ os = compressor.createOutputStream(os, poolCompressor);
+ }
+
+ written = encode(cellScanner, codec, os);
+ os.close();
+
+ } catch (BufferOverflowException e) {
+ throw new DoNotRetryIOException(e);
+
+ } catch (IOException e) {
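+      // If the cellblock outgrew the threshold, the response is simply too big;
+      // retrying cannot help, so fail the call permanently.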
+ throw tos.isThresholdExceeded() ? new DoNotRetryIOException(e) : e;
+
+ } finally {
+ try { os.close(); } catch (IOException ignore) {}
+ if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ if (bufferSize < baos.size()) {
+ LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
+ "; up hbase.ipc.cellblock.building.initial.buffersize?");
+ }
+ }
+
+ // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+ // gets or something -- stuff that does not return a cell).
+ return written ? baos.getByteBuffer() : null;
}
/**
@@ -105,70 +199,55 @@ public class IPCUtil {
* @param codec
* @param compressor
* @param cellScanner
- * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
- * our own ByteBuffer.
- * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
- * passed in codec and/or compressor; the returned buffer has been
- * flipped and is ready for reading. Use limit to find total size. If pool was not
- * null, then this returned ByteBuffer came from there and should be returned to the pool when
- * done.
+ * @param pool Pool of ByteBuffers to make use of. Must be non-null.
+ * @return Null or a buffer filled with a cellblock filled with passed-in Cells encoded using
+ * passed in codec and/or compressor.
* @throws IOException
*/
@SuppressWarnings("resource")
- public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+ public GatheringBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner, final BoundedByteBufferPool pool)
throws IOException {
if (cellScanner == null) return null;
if (codec == null) throw new CellScannerButNoCodecException();
- int bufferSize = this.cellBlockBuildingInitialBufferSize;
- ByteBufferOutputStream baos = null;
- if (pool != null) {
- ByteBuffer bb = pool.getBuffer();
- bufferSize = bb.capacity();
- baos = new ByteBufferOutputStream(bb);
- } else {
- // Then we need to make our own to return.
- if (cellScanner instanceof HeapSize) {
- long longSize = ((HeapSize)cellScanner).heapSize();
- // Just make sure we don't have a size bigger than an int.
- if (longSize > Integer.MAX_VALUE) {
- throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
- }
- bufferSize = ClassSize.align((int)longSize);
- }
- baos = new ByteBufferOutputStream(bufferSize);
- }
- OutputStream os = baos;
+
+ GatheringBuffer buffer = new GatheringBuffer(pool);
+ ThresholdingOutputStream tos = new ThresholdingOutputStreamAdapter(
+ buffer.adaptOutputStream());
+ OutputStream os = tos;
Compressor poolCompressor = null;
+ boolean written;
try {
if (compressor != null) {
if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
poolCompressor = CodecPool.getCompressor(compressor);
os = compressor.createOutputStream(os, poolCompressor);
}
- Codec.Encoder encoder = codec.getEncoder(os);
- int count = 0;
- while (cellScanner.advance()) {
- encoder.write(cellScanner.current());
- count++;
- }
- encoder.flush();
- // If no cells, don't mess around. Just return null (could be a bunch of existence checking
- // gets or something -- stuff that does not return a cell).
- if (count == 0) return null;
- } catch (BufferOverflowException e) {
- throw new DoNotRetryIOException(e);
- } finally {
+
+ written = encode(cellScanner, codec, os);
os.close();
+
+ } catch (IOException e) {
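+      // A threshold overrun is permanent; surface it as DoNotRetryIOException.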
+ buffer.dispose();
+ throw tos.isThresholdExceeded() ? new DoNotRetryIOException(e) : e;
+
+ } catch (Exception e) {
+ buffer.dispose();
+ throw e;
+
+ } finally {
+ try { os.close(); } catch (IOException ignore) {}
if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
}
- if (LOG.isTraceEnabled()) {
- if (bufferSize < baos.size()) {
- LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
- "; up hbase.ipc.cellblock.building.initial.buffersize?");
- }
+
+ if (written) {
+ return buffer;
}
- return baos.getByteBuffer();
+
+ // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+ // gets or something -- stuff that does not return a cell).
+ buffer.dispose();
+ return null;
}
/**
@@ -234,6 +313,20 @@ public class IPCUtil {
*/
public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
if (m == null) return null;
+ return ByteBuffer.wrap(getDelimitedMessageAsByteArray(m));
+ }
+
+ private static final byte[] EMPTY_BYTE_ARRAY = {};
+
+ /**
+   * @param m Message to serialize delimited; i.e. w/ a vint of its size preceding its
+ * serialization.
+ * @return The passed in Message serialized with delimiter.
+   *   Returns an empty array if {@code m} is null.
+ * @throws IOException
+ */
+ public static byte[] getDelimitedMessageAsByteArray(final Message m) throws IOException {
+ if (m == null) return EMPTY_BYTE_ARRAY;
int serializedSize = m.getSerializedSize();
int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
byte [] buffer = new byte[serializedSize + vintSize];
@@ -243,7 +336,7 @@ public class IPCUtil {
cos.writeMessageNoTag(m);
cos.flush();
cos.checkNoSpaceLeft();
- return ByteBuffer.wrap(buffer);
+ return buffer;
}
/**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/GatheringBuffer.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/GatheringBuffer.java
new file mode 100644
index 0000000..569b8ce
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/GatheringBuffer.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
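+/**
+ * An append-only buffer backed by {@link ByteBuffer}s drawn from a
+ * {@link BoundedByteBufferPool}, built for vectored ("gathering") writes to a
+ * {@link GatheringByteChannel}. A minimal usage sketch ({@code pool},
+ * {@code bytes} and {@code channel} are placeholders):
+ * <pre>
+ * GatheringBuffer buf = new GatheringBuffer(pool);
+ * buf.put(bytes);
+ * long n = buf.writeTo(channel); // one gathering write over all pooled buffers
+ * buf.dispose();                 // return the buffers to the pool
+ * </pre>
+ */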
+@InterfaceAudience.Private
+public class GatheringBuffer {
+ private final BoundedByteBufferPool pool;
+  private final Queue<ByteBuffer> bufferQueue = new LinkedList<ByteBuffer>();
+ private ByteBuffer busyBuffer;
+
+ /**
+ * @param pool expected to provide non-zero capacity direct buffers
+ * @throws NullPointerException if {@code pool} is null
+ */
+  // Ideally the pool would be exposed through a more abstract type.
+ public GatheringBuffer(BoundedByteBufferPool pool) {
+ if (pool == null) {
+ throw new NullPointerException();
+ }
+ this.pool = pool;
+ }
+
+ public GatheringBuffer put(byte b) {
+ prepareBusyBuffer();
+ busyBuffer.put(b);
+ return this;
+ }
+
+ public GatheringBuffer put(byte[] src) {
+ return put(src, 0, src.length);
+ }
+
+ public GatheringBuffer put(byte[] src, int offset, int length) {
+ if (length == 0) {
+ return this;
+ }
+
+ while (true) {
+ prepareBusyBuffer();
+ int remaining = busyBuffer.remaining();
+ if (remaining >= length) {
+ busyBuffer.put(src, offset, length);
+ return this;
+ }
+
+ busyBuffer.put(src, offset, remaining);
+ offset += remaining;
+ length -= remaining;
+ }
+ }
+
+ public GatheringBuffer put(ByteBuffer src) {
+ if (! src.hasRemaining()) {
+ return this;
+ }
+
+ while (true) {
+ prepareBusyBuffer();
+ int remaining = busyBuffer.remaining();
+ if (remaining >= src.remaining()) {
+ busyBuffer.put(src);
+ return this;
+ }
+
+ ByteBuffer dup = src.duplicate();
+ dup.limit(dup.position() + remaining);
+ busyBuffer.put(dup);
+ src.position(src.position() + remaining);
+ }
+ }
+
+ /**
+   * The given {@code src} must have been created with the same pool. Its contents
+   * are moved into this buffer, leaving {@code src} empty.
+ */
+ public GatheringBuffer put(GatheringBuffer src) {
+ assert src.pool == pool;
+
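+    // Seal our in-progress buffer first so the appended buffers keep their order.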
+ if (busyBuffer != null) {
+ busyBuffer.flip();
+ bufferQueue.offer(busyBuffer);
+ busyBuffer = null;
+ }
+
+ bufferQueue.addAll(src.bufferQueue);
+ src.bufferQueue.clear();
+
+ busyBuffer = src.busyBuffer;
+ src.busyBuffer = null;
+
+ return this;
+ }
+
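+  /**
+   * Ensures {@code busyBuffer} has room to accept bytes: queues a filled buffer
+   * (flipped for reading) and draws a fresh one from the pool.
+   */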
+ private void prepareBusyBuffer() {
+ if (busyBuffer == null) {
+ busyBuffer = pool.getBuffer();
+
+ } else if (! busyBuffer.hasRemaining()) {
+ busyBuffer.flip();
+ bufferQueue.offer(busyBuffer);
+ busyBuffer = pool.getBuffer();
+ }
+ }
+
+ /**
+   * Writes the buffered content to {@code target} in a single gathering call.
+   * @return the number of bytes written
+   * @throws IOException thrown by {@code target}
+ */
+ public long writeTo(GatheringByteChannel target) throws IOException {
+ if (bufferQueue.isEmpty()) {
+ if (busyBuffer == null) {
+ return 0;
+ }
+
+ busyBuffer.flip();
+ try {
+ return target.write(busyBuffer);
+ } finally {
+ if (busyBuffer.hasRemaining()) {
+ busyBuffer.compact();
+ } else {
+ pool.putBuffer(busyBuffer);
+ busyBuffer = null;
+ }
+ }
+ }
+
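+    // Gather every queued buffer, plus the in-progress one, into a single
+    // vectored write.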
+ ByteBuffer[] buffers;
+ if (busyBuffer != null) {
+ busyBuffer.flip();
+ buffers = bufferQueue.toArray(new ByteBuffer[bufferQueue.size() + 1]);
+ buffers[buffers.length - 1] = busyBuffer;
+ } else {
+ buffers = bufferQueue.toArray(new ByteBuffer[bufferQueue.size()]);
+ }
+
+ try {
+ return target.write(buffers);
+
+ } finally {
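+      // Recycle fully-drained buffers; compact a partially-written busy buffer
+      // so a later writeTo can resume where this one stopped.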
+ ByteBuffer buf;
+ while ((buf = bufferQueue.peek()) != null && ! buf.hasRemaining()) {
+ bufferQueue.poll();
+ pool.putBuffer(buf);
+ }
+
+      if (busyBuffer != null) {
+ if (busyBuffer.hasRemaining()) {
+ busyBuffer.compact();
+ } else {
+ pool.putBuffer(busyBuffer);
+ busyBuffer = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * The given {@code target} should not be read-only and should have sufficient space
+ * (in other words, {@code target.remaining() >= this.length()}).
+ */
+ public void writeTo(ByteBuffer target) {
+ ByteBuffer buf;
+ while ((buf = bufferQueue.poll()) != null) {
+ target.put(buf);
+ pool.putBuffer(buf);
+ }
+
+ if (busyBuffer != null) {
+ busyBuffer.flip();
+ target.put(busyBuffer);
+ pool.putBuffer(busyBuffer);
+ busyBuffer = null;
+ }
+ }
+
+ public boolean isEmpty() {
+ if (busyBuffer != null && busyBuffer.position() > 0) {
+ return false;
+ }
+
+ for (ByteBuffer buffer : bufferQueue) {
+ if (buffer.hasRemaining()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public long length() {
+ long count;
+ if (busyBuffer != null) {
+ count = busyBuffer.position();
+ } else {
+ count = 0;
+ }
+
+ for (ByteBuffer buffer : bufferQueue) {
+ count += buffer.remaining();
+ }
+
+ return count;
+ }
+
+ public void clear() {
+ ByteBuffer buf;
+ while ((buf = bufferQueue.poll()) != null) {
+ pool.putBuffer(buf);
+ }
+
+ if (busyBuffer != null) {
+ pool.putBuffer(busyBuffer);
+ busyBuffer = null;
+ }
+ }
+
+ /**
+   * Call this method before discarding this instance so its buffers return to the pool.
+ */
+ public void dispose() {
+ clear();
+ }
+
+ /**
+   * Creates an output stream that writes through to this buffer.
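+   *
+   * <p>A sketch of the intended use ({@code payload} and {@code channel} are
+   * placeholders):
+   * <pre>
+   * OutputStream os = buffer.adaptOutputStream();
+   * os.write(payload);       // bytes land in pooled ByteBuffers
+   * buffer.writeTo(channel); // gathered write of everything appended
+   * </pre>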
+ */
+ public OutputStream adaptOutputStream() {
+ return new OutputStreamImpl();
+ }
+
+ private class OutputStreamImpl extends OutputStream {
+ @Override
+ public void write(int b) {
+ put((byte)b);
+ }
+
+ @Override
+ public void write(byte[] b) {
+ put(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) {
+ put(b, off, len);
+ }
+ }
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestGatheringBuffer.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestGatheringBuffer.java
new file mode 100644
index 0000000..f828720
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestGatheringBuffer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
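+/**
+ * The test class itself serves as the {@link GatheringByteChannel}: every write
+ * is recorded into {@code log} so assertions can check exactly what the buffer
+ * emitted.
+ */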
+@Category({ IOTests.class, SmallTests.class })
+public class TestGatheringBuffer implements GatheringByteChannel {
+ private BoundedByteBufferPool pool;
+ private GatheringBuffer buffer;
+ private ByteBuffer log;
+
+ @Before
+ public void setUp() {
+ pool = new BoundedByteBufferPool(1, 1, 10);
+ buffer = new GatheringBuffer(pool);
+ log = ByteBuffer.allocate(100);
+ }
+
+ @After
+ public void tearDown() {
+ buffer.dispose();
+ }
+
+ @Test
+ public void testPutByte() throws IOException {
+ buffer.put((byte)12);
+ Assert.assertEquals(1, buffer.writeTo(this));
+ Assert.assertEquals(ByteBuffer.wrap(new byte[] { 12 }), log.flip());
+ }
+
+ @Test
+ public void testPutByteArray() throws IOException {
+ byte[] src = new byte[] { 1, 2, 3 };
+ buffer.put(src);
+ Assert.assertEquals(3, buffer.writeTo(this));
+ Assert.assertEquals(ByteBuffer.wrap(src), log.flip());
+ }
+
+ @Test
+ public void testPutByteArrayWithRange() throws IOException {
+ byte[] src = new byte[] { 1, 2, 3, 4, 5 };
+ buffer.put(src, 1, 3);
+ Assert.assertEquals(3, buffer.writeTo(this));
+ Assert.assertEquals(ByteBuffer.wrap(new byte[] { 2, 3, 4 }), log.flip());
+ }
+
+ @Test
+ public void testPutByteBuffer() throws IOException {
+ ByteBuffer src = ByteBuffer.wrap(new byte[] { 1, 2, 3 });
+ buffer.put(src.duplicate());
+ Assert.assertEquals(3, buffer.writeTo(this));
+ Assert.assertEquals(src, log.flip());
+ }
+
+ @Test
+ public void testIsEmptyAndLength() throws IOException {
+ Assert.assertTrue(buffer.isEmpty());
+ Assert.assertEquals(0, buffer.length());
+
+ byte[] src = new byte[] { 1, 2, 3 };
+ buffer.put(src);
+
+ Assert.assertFalse(buffer.isEmpty());
+ Assert.assertEquals(3, buffer.length());
+
+ buffer.writeTo(this);
+
+ Assert.assertTrue(buffer.isEmpty());
+ Assert.assertEquals(0, buffer.length());
+ }
+
+ @Test
+ public void testPutGatheringBuffer() throws IOException {
+ byte[] src = new byte[] { 1, 2, 3 };
+ buffer.put(src);
+ GatheringBuffer buffer2 = new GatheringBuffer(pool);
+ buffer2.put(buffer);
+ Assert.assertTrue(buffer.isEmpty());
+ Assert.assertFalse(buffer2.isEmpty());
+ Assert.assertEquals(3, buffer2.writeTo(this));
+ Assert.assertEquals(ByteBuffer.wrap(src), log.flip());
+ }
+
+ @Test
+ public void testWriteToByteBuffer() throws IOException {
+ byte[] src = new byte[] { 1, 2, 3 };
+ buffer.put(src);
+ byte[] target = new byte[(int)buffer.length()];
+ buffer.writeTo(ByteBuffer.wrap(target));
+ Assert.assertTrue(buffer.isEmpty());
+ Assert.assertArrayEquals(src, target);
+ }
+
+ @Test
+ public void testClear() throws IOException {
+ byte[] src = new byte[] { 1, 2, 3 };
+ buffer.put(src);
+
+ buffer.clear();
+
+ Assert.assertTrue(buffer.isEmpty());
+ }
+
+ @Test
+ public void testAdaptOutputStream() throws IOException {
+ OutputStream out = buffer.adaptOutputStream();
+ out.write(new byte[] { 1, 2, 3 });
+ buffer.writeTo(this);
+ Assert.assertEquals(ByteBuffer.wrap(new byte[] { 1, 2, 3 }), log.flip());
+ }
+
+ @Override
+ public int write(ByteBuffer src) {
+ int count = src.remaining();
+ log.put(src);
+ return count;
+ }
+
+ @Override
+  public long write(ByteBuffer[] srcs, int offset, int length) {
+    long count = 0;
+    for (int i = offset; i < offset + length; i++) {
+      count += write(srcs[i]);
+    }
+    return count;
+  }
+
+  @Override
+  public long write(ByteBuffer[] srcs) {
+    return write(srcs, 0, srcs.length);
+  }
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void close() {
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ ... @@
if (count > 0) this.metrics.sentBytes(count);
return count;
}
@@ -2423,9 +2428,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
/**
- * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
- * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
- * one of readCh or writeCh should be non-null.
+ * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}.
+ * Only one of readCh or writeCh should be non-null.
*
* @param readCh read channel
* @param writeCh write channel
@@ -2433,7 +2437,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* @return bytes written
* @throws java.io.IOException e
* @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
- * @see #channelWrite(GatheringByteChannel, BufferChain)
*/
private static int channelIO(ReadableByteChannel readCh,
WritableByteChannel writeCh,