.../hadoop/hbase/io/ByteArrayOutputStream.java | 6 +
.../hadoop/hbase/io/ByteBuffOutputStream.java | 71 +++++++++
.../hbase/io/ByteBufferListOutputStream.java | 5 +
.../hadoop/hbase/io/ByteBufferOutputStream.java | 5 +
.../apache/hadoop/hbase/io/ByteBufferWriter.java | 8 ++
.../hbase/io/ByteBufferWriterDataOutputStream.java | 5 +
.../hbase/io/ByteBufferWriterOutputStream.java | 5 +
.../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 9 ++
.../org/apache/hadoop/hbase/nio/MultiByteBuff.java | 13 ++
.../apache/hadoop/hbase/nio/SingleByteBuff.java | 6 +
.../apache/hadoop/hbase/nio/TestMultiByteBuff.java | 18 +++
.../com/google/protobuf/CodedOutputStream.java | 1 +
.../apache/hadoop/hbase/ipc/RpcCallContext.java | 6 +
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 159 +++++++++++++++++----
.../regionserver/wal/AsyncProtobufLogWriter.java | 5 +
.../client/TestFromClientSideAsNonJavaClient.java | 54 +++++++
.../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 28 ++++
17 files changed, 373 insertions(+), 31 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index 22eb156..b441617 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io;
+import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
@@ -56,6 +57,11 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferWri
}
@Override
+ public void write(ByteBuffer b) throws IOException {
+ write(b, b.position(), b.remaining());
+ }
+
+ @Override
public void writeInt(int i) {
checkSizeAndGrow(Bytes.SIZEOF_INT);
Bytes.putInt(this.buf, this.pos, i);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java
new file mode 100644
index 0000000..4431d7d
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+
+@InterfaceAudience.Private
+public class ByteBuffOutputStream extends OutputStream implements ByteBufferWriter {
+
+ private ByteBuff buff;
+
+ public ByteBuffOutputStream(ByteBuff buff) {
+ this.buff = buff;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ this.buff.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ this.buff.put(b, off, len);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+    // Work on a duplicate so the caller's position and limit are untouched
+ ByteBuffer bb = b.duplicate();
+ bb.position(off);
+ bb.limit(off + len);
+ write(bb);
+ }
+
+ @Override
+ public void write(ByteBuffer b) throws IOException {
+ this.buff.put(b);
+ }
+
+ @Override
+ public void writeInt(int val) throws IOException {
+ this.buff.putInt(val);
+
+ }
+
+}
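
A minimal usage sketch of the new stream (hedged: the SingleByteBuff target, the sizes, and the class name below are illustrative, not part of this patch):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBuffOutputStream;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class ByteBuffOutputStreamSketch {
  public static void main(String[] args) throws IOException {
    // A target ByteBuff with room for everything written below.
    ByteBuff target = new SingleByteBuff(ByteBuffer.allocate(64));
    ByteBuffOutputStream out = new ByteBuffOutputStream(target);
    byte[] payload = { 1, 2, 3, 4 };
    out.writeInt(payload.length);        // 4-byte length prefix via ByteBuff.putInt
    out.write(payload);                  // byte[] overload
    out.write(ByteBuffer.wrap(payload)); // ByteBuffer overload, via ByteBuff.put
    out.close();
    assert target.position() == 12;      // 4 + 4 + 4 bytes written
  }
}

Note that this implementation's write(ByteBuffer) goes through ByteBuff.put(ByteBuffer), which advances the source buffer's position, unlike the delegating implementations elsewhere in this patch.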
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
index c334a5a..191add9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -174,4 +174,9 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream {
}
}
}
+
+ @Override
+ public void write(ByteBuffer b) throws IOException {
+ write(b, b.position(), b.remaining());
+ }
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index f6f7def..4faf6cf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -149,6 +149,11 @@ public class ByteBufferOutputStream extends OutputStream
ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len);
}
+ @Override
+ public void write(ByteBuffer b) throws IOException {
+ write(b, b.position(), b.remaining());
+ }
+
/**
* Writes an int to the underlying output stream as four
* bytes, high byte first.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
index 012080c..94da2a2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
@@ -31,6 +31,14 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
public interface ByteBufferWriter {
/**
+   * Writes all remaining bytes from the specified ByteBuffer, starting at its current position.
+   * Note that this method does not change the position of the given ByteBuffer.
+ * @param b the data.
+ * @exception IOException if an I/O error occurs.
+ */
+ void write(ByteBuffer b) throws IOException;
+
+ /**
* Writes len bytes from the specified ByteBuffer starting at offset off
*
* @param b the data.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
index 7ddb8a9..2ed394a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java
@@ -41,4 +41,9 @@ public class ByteBufferWriterDataOutputStream extends DataOutputStream
ByteBufferUtils.copyBufferToStream(out, b, off, len);
written += len;
}
+
+ @Override
+ public void write(ByteBuffer b) throws IOException {
+ write(b, b.position(), b.remaining());
+ }
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
index 56c6956..ae10a57 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java
@@ -65,6 +65,11 @@ public class ByteBufferWriterOutputStream extends OutputStream
}
@Override
+ public void write(ByteBuffer b) throws IOException {
+ write(b, b.position(), b.remaining());
+ }
+
+ @Override
public void writeInt(int i) throws IOException {
StreamUtils.writeInt(this.os, i);
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 60202a0..e12844a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -410,6 +410,15 @@ public abstract class ByteBuff {
*/
public abstract int read(ReadableByteChannel channel) throws IOException;
+ /**
+   * Copies the remaining bytes of the given ByteBuffer 'buff' into this ByteBuff. The position
+   * of this ByteBuff advances by the number of bytes copied, and the position of the given
+   * 'buff' ends up at its limit.
+   * @param buff the ByteBuffer to copy from
+   * @return this ByteBuff
+ */
+ public abstract ByteBuff put(ByteBuffer buff);
+
// static helper methods
public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
if (buf.remaining() <= NIO_BUFFER_LIMIT) {
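
A hedged sketch of the position semantics the contract above describes (the SingleByteBuff subtype, sizes, and class name are illustrative):

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class ByteBuffPutSketch {
  public static void main(String[] args) {
    ByteBuff dst = new SingleByteBuff(ByteBuffer.allocate(16));
    ByteBuffer src = ByteBuffer.allocate(8);
    src.putLong(42L);
    src.flip();                           // src: position 0, limit 8
    dst.put(src);                         // copies the 8 remaining bytes
    assert dst.position() == 8;           // dst advanced by the bytes copied
    assert src.position() == src.limit(); // src fully consumed
  }
}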
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 948321d..6655142 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -856,6 +856,19 @@ public class MultiByteBuff extends ByteBuff {
return this;
}
+ @Override
+ public ByteBuff put(ByteBuffer buff) {
+ if (this.curItem.remaining() >= buff.remaining()) {
+ ByteBufferUtils.copyFromBufferToBuffer(buff, this.curItem);
+ } else {
+      // Source spans beyond the current item; fall back to a byte-by-byte copy
+ while (buff.hasRemaining()) {
+ this.put(buff.get());
+ }
+ }
+ return this;
+ }
+
/**
* Writes a long to this MBB at its current position. Also advances the position by size of long
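
A hedged sketch of the slower boundary-crossing path (fragment sizes and the class name are illustrative), complementing the unit test added below in TestMultiByteBuff:

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.nio.MultiByteBuff;

public class MultiByteBuffPutSketch {
  public static void main(String[] args) {
    // Two 4-byte fragments: an 8-byte source cannot fit into the current
    // fragment, so put(ByteBuffer) copies byte by byte across the boundary.
    MultiByteBuff dst = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4));
    ByteBuffer src = ByteBuffer.allocate(8);
    src.putLong(0x0102030405060708L);
    src.flip();
    dst.put(src);
    assert dst.position() == 8;
    dst.position(0);
    assert dst.getLong() == 0x0102030405060708L;
  }
}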
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 0e45410..b1e591f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -219,6 +219,12 @@ public class SingleByteBuff extends ByteBuff {
}
@Override
+ public ByteBuff put(ByteBuffer buff) {
+ ByteBufferUtils.copyFromBufferToBuffer(buff, this.buf);
+ return this;
+ }
+
+ @Override
public SingleByteBuff put(byte[] src) {
return put(src, 0, src.length);
}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
index af4c464..9658544 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
@@ -403,4 +403,22 @@ public class TestMultiByteBuff {
mbb1.get(); // Now we have reached the limit
assertFalse(mbb1.hasRemaining());
}
+
+ @Test
+ public void testPutByteBuffer() {
+ ByteBuffer b1 = ByteBuffer.allocate(8);
+ ByteBuffer b2 = ByteBuffer.allocate(8);
+ ByteBuffer b3 = ByteBuffer.allocate(8);
+ ByteBuffer toPut = ByteBuffer.allocate(8);
+ toPut.putInt(100);
+ toPut.putInt(500);
+ toPut.flip();
+ MultiByteBuff mbb1 = new MultiByteBuff(b1, b2, b3);
+ mbb1.position(8);
+ mbb1.put(toPut);
+ assertEquals(16, mbb1.position());
+ mbb1.position(8);
+ assertEquals(100, mbb1.getInt());
+ assertEquals(500, mbb1.getInt());
+ }
}
diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java
index 03871c9..9e06140 100644
--- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java
+++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java
@@ -217,6 +217,7 @@ public abstract class CodedOutputStream extends ByteOutput {
* Setting this to {@code 0} will disable buffering, requiring an allocation for each encoded
* string.
*/
+  // TODO: We need this to be given public access when we push our changes to PB
static CodedOutputStream newInstance(ByteOutput byteOutput, int bufferSize) {
if (bufferSize < 0) {
throw new IllegalArgumentException("bufferSize must be positive");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index 9bc8ee7..a453273 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -100,4 +100,10 @@ public interface RpcCallContext {
* @return The system timestamp of deadline.
*/
long getDeadline();
+
+ /**
+   * Indicates whether the current RPC call context is for a read. The read APIs need to set this.
+   * @return true if the current RPC call context is for a read
+ */
+ boolean isReadRequest();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index a17310c..50eccdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
+import org.apache.hadoop.hbase.io.ByteBuffOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
@@ -353,6 +354,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected TraceInfo tinfo;
private ByteBufferListOutputStream cellBlockStream = null;
private CallCleanup reqCleanup = null;
+ private CallCleanup responseCleanup = null;
private User user;
private InetAddress remoteAddress;
@@ -402,6 +404,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// got from pool.
this.cellBlockStream = null;
}
+ if (responseCleanup != null) {
+ responseCleanup.run();
+ responseCleanup = null;
+ }
cleanup();// If the call was run successfuly, we might have already returned the
// BB back to pool. No worries..Then inputCellBlock will be null
this.connection.decRpcCount(); // Say that we're done with this call.
@@ -509,20 +515,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
- ByteBuffer headerBuf =
+ ByteBuffer[] hdrMsgBuffers =
createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock);
ByteBuffer[] responseBufs = null;
int cellBlockBufferSize = 0;
if (cellBlock != null) {
cellBlockBufferSize = cellBlock.size();
- responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
- } else {
- responseBufs = new ByteBuffer[1];
}
- responseBufs[0] = headerBuf;
+ responseBufs = new ByteBuffer[hdrMsgBuffers.length + cellBlockBufferSize];
+ int i = 0;
+ // add all the header/result buffers
+ for (; i < hdrMsgBuffers.length; i++) {
+ responseBufs[i] = hdrMsgBuffers[i];
+ }
if (cellBlock != null) {
- for (int i = 0; i < cellBlockBufferSize; i++) {
- responseBufs[i + 1] = cellBlock.get(i);
+ for (int j = 0; j < cellBlockBufferSize; j++) {
+ responseBufs[i++] = cellBlock.get(j);
}
}
bc = new BufferChain(responseBufs);
@@ -563,7 +571,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
headerBuilder.setException(exceptionBuilder.build());
}
- private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+ private ByteBuffer[] createHeaderAndMessageBytes(Message result, Message header,
int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
// Organize the response as a set of bytebuffers rather than collect it all together inside
// one big byte array; save on allocations.
@@ -572,8 +580,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// the last buffer in the cellblock. This applies to the cellblock created from the
// pool or even the onheap cellblock buffer in case there is no pool enabled.
// Possible reuse would avoid creating a temporary array for storing the header every time.
- ByteBuffer possiblePBBuf =
- (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
+ ByteBuffer[] buffs = null;
int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
resultVintSize = 0;
if (header != null) {
@@ -586,31 +593,73 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
// calculate the total size
int totalSize = headerSerializedSize + headerVintSize
- + (resultSerializedSize + resultVintSize)
- + cellBlockSize;
+ + (resultSerializedSize + resultVintSize) + cellBlockSize;
int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
+ resultVintSize + Bytes.SIZEOF_INT;
- // Only if the last buffer has enough space for header use it. Else allocate
- // a new buffer. Assume they are all flipped
- if (possiblePBBuf != null
- && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
- // duplicate the buffer. This is where the header is going to be written
- ByteBuffer pbBuf = possiblePBBuf.duplicate();
- // get the current limit
- int limit = pbBuf.limit();
- // Position such that we write the header to the end of the buffer
- pbBuf.position(limit);
- // limit to the header size
- pbBuf.limit(totalPBSize + limit);
- // mark the current position
- pbBuf.mark();
- writeToCOS(result, header, totalSize, pbBuf);
- // reset the buffer back to old position
- pbBuf.reset();
- return pbBuf;
+      // TODO: Should we check for totalPBSize >= minSizeForReservoirUse?
+ if (cellBlockStream == null && isReadRequest() && !isClientCellBlockSupported()) {
+        // We know the result being processed is for a scan/get and the client
+        // is a non-java client, so the data rides in the PB message itself.
+        // We build a ByteBuffer[] because that is what the BufferChain
+        // constructed below needs.
+        Pair<ByteBuffer[], CallCleanup> buffers =
+ allocateByteBufferArrayToReadInto(reservoir, minSizeForReservoirUse, totalPBSize);
+ buffs = buffers.getFirst();
+ responseCleanup = buffers.getSecond();
+ ByteBuff byteBuffs;
+ if (buffs.length > 1) {
+ byteBuffs = new MultiByteBuff(buffs);
+ } else {
+          // We are backed by a single BB
+ byteBuffs = new SingleByteBuff(buffs[0]);
+ }
+ byteBuffs.limit(totalPBSize);
+ CodedOutputStream bcos =
+ CodedOutputStream.newInstance(new ByteBuffOutputStream(byteBuffs), totalPBSize);
+ byteBuffs.putInt(totalSize);
+ if (header != null) {
+ bcos.writeMessageNoTag(header);
+ }
+ if (result != null) {
+ bcos.writeMessageNoTag(result);
+ }
+ bcos.flush();
+        if (byteBuffs.hasRemaining()) {
+          // TODO: Should bcos itself be doing this check, or is it ok to
+          // check on the ByteBuff directly?
+          throw new IllegalStateException("Response not fully written");
+ }
+ // flip all the buffers
+ for (ByteBuffer buf : buffs) {
+ buf.flip();
+ }
} else {
- return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
+ ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
+ // Only if the last buffer has enough space for header use it. Else allocate
+ // a new buffer. Assume they are all flipped
+ buffs = new ByteBuffer[1];
+ if (possiblePBBuf != null
+ && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
+ // duplicate the buffer. This is where the header is going to be written
+ ByteBuffer pbBuf = possiblePBBuf.duplicate();
+ // get the current limit
+ int limit = pbBuf.limit();
+ // Position such that we write the header to the end of the buffer
+ pbBuf.position(limit);
+ // limit to the header size
+ pbBuf.limit(totalPBSize + limit);
+ // mark the current position
+ pbBuf.mark();
+ writeToCOS(result, header, totalSize, pbBuf);
+ // reset the buffer back to old position
+ pbBuf.reset();
+ buffs[0] = pbBuf;
+ } else {
+ ByteBuffer res = createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
+ buffs[0] = res;
+ }
}
+ return buffs;
}
private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
@@ -725,6 +774,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
@Override
+ public synchronized boolean isReadRequest() {
+ return this.rpcCallback != null;
+ }
+
+ @Override
public String getRequestUserName() {
User user = getRequestUser();
return user == null? null: user.getShortName();
@@ -2906,6 +2960,49 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
/**
+   * This is extracted to a static method for better unit testing. We try to get buffer(s) from
+   * the pool as far as possible.
+   *
+   * @param pool The ByteBufferPool to use
+   * @param minSizeForPoolUse Only when the remaining byte count is at least this size do we take
+   *          a buffer from the pool; any smaller remainder gets an on-heap ByteBuffer.
+   * @param reqLen Total number of bytes needed
+ */
+ @VisibleForTesting
+  static Pair<ByteBuffer[], CallCleanup> allocateByteBufferArrayToReadInto(ByteBufferPool pool,
+      int minSizeForPoolUse, int reqLen) {
+    List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
+ int remain = reqLen;
+ ByteBuffer buf = null;
+ while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
+ bbs.add(buf);
+ remain -= pool.getBufferSize();
+ }
+ ByteBuffer[] bufsFromPool = null;
+ if (bbs.size() > 0) {
+ bufsFromPool = new ByteBuffer[bbs.size()];
+ bbs.toArray(bufsFromPool);
+ }
+ if (remain > 0) {
+ bbs.add(ByteBuffer.allocate(remain));
+ }
+ ByteBuffer[] items = new ByteBuffer[bbs.size()];
+ // convert to array
+ bbs.toArray(items);
+ if (bufsFromPool != null) {
+ final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
+ final ByteBuffer[] itemsFinal = items;
+      return new Pair<>(itemsFinal, () -> {
+        // Return all the buffers got from the pool
+ for (int i = 0; i < bufsFromPoolFinal.length; i++) {
+ pool.putbackBuffer(bufsFromPoolFinal[i]);
+ }
+ });
+ }
+    return new Pair<>(items, null);
+ }
+
+ /**
* Needed for features such as delayed calls. We need to be able to store the current call
* so that we can complete it later or ask questions of what is supported by the current ongoing
* call.
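
To make the pool-allocation math concrete, a worked sketch (the numbers and class name are made up; like TestRpcServer, it assumes package-private access to the static method, and it assumes the two-arg ByteBufferPool constructor):

package org.apache.hadoop.hbase.ipc;

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.util.Pair;

public class AllocateToReadIntoSketch {
  public static void main(String[] args) {
    // Pool of 64-byte buffers, minSizeForPoolUse = 32, reqLen = 100:
    //   remain = 100 >= 32 -> pool buffer #1, remain = 100 - 64 = 36
    //   remain = 36  >= 32 -> pool buffer #2, remain = 36 - 64 = -28
    //   remain <= 0        -> no on-heap tail buffer needed
    ByteBufferPool pool = new ByteBufferPool(64, 10);
    Pair<ByteBuffer[], CallCleanup> p =
        RpcServer.allocateByteBufferArrayToReadInto(pool, 32, 100);
    assert p.getFirst().length == 2; // two pooled buffers, no heap tail
    // The cleanup hands both pooled buffers back, as RpcServer does once
    // the response has been written out.
    p.getSecond().run();
  }
}

The overshoot (128 bytes of capacity for a 100-byte need) is fine because createHeaderAndMessageBytes() limits the wrapping ByteBuff to totalPBSize before writing.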
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index a0ac8a2..767ba6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -82,6 +82,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
}
@Override
+ public void write(ByteBuffer b) throws IOException {
+ write(b, b.position(), b.remaining());
+ }
+
+ @Override
public void writeInt(int i) throws IOException {
out.writeInt(i);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java
new file mode 100644
index 0000000..2ca0508
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Run tests that use the HBase clients; {@link Table}.
+ * Sets up the HBase mini cluster once at start and runs through all client tests.
+ * Each creates a table named for the method and does its stuff against that.
+ * This test ensures that the client acts as a non-java client.
+ */
+@Category({LargeTests.class, ClientTests.class})
+@SuppressWarnings ("deprecation")
+public class TestFromClientSideAsNonJavaClient extends TestFromClientSide {
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Uncomment the following lines if more verbosity is needed for
+ // debugging (see HBASE-12285 for details).
+ // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
+ // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
+ // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ MultiRowMutationEndpoint.class.getName());
+ conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
+    // Set no codec so that we imitate a non-java client
+ conf.set("hbase.client.default.rpc.codec", "");
+ // We need more than one region server in this test
+ TEST_UTIL.startMiniCluster(SLAVES);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
index 6fd65f2..19b6aef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
@@ -23,15 +23,22 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ByteBuffOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -125,6 +132,27 @@ public class TestRpcServer {
assertNull(pair.getSecond());
}
+ @Test
+ public void testByteBuffCOS() throws IOException {
+ KeyValue kv1 =
+ new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]);
+ ByteBuffer dbb1 = ByteBuffer.allocateDirect(3);
+ ByteBuffer dbb2 = ByteBuffer.allocateDirect(3);
+ ByteBuffer dbb3 = ByteBuffer.allocateDirect(3);
+ ByteBuffer dbb4 = ByteBuffer.allocateDirect(3);
+ ByteBuffer dbb5 = ByteBuffer.allocateDirect(50);
+ MultiByteBuff mbb = new MultiByteBuff(dbb1, dbb2, dbb3, dbb4, dbb5);
+ CellProtos.Cell cell = ProtobufUtil.toCell(kv1);
+ int serializedSize = cell.getSerializedSize();
+ int computeRawVarint32Size = CodedOutputStream.computeRawVarint32Size(serializedSize);
+ int totalSize = serializedSize + computeRawVarint32Size;
+ mbb.limit(totalSize);
+ CodedOutputStream cos = CodedOutputStream.newInstance(new ByteBuffOutputStream(mbb), totalSize);
+ cos.writeMessageNoTag(cell);
+ cos.flush();
+ assertEquals(totalSize, mbb.position());
+ }
+
private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
// Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back