.../hadoop/hbase/io/ByteArrayOutputStream.java | 6 + .../hadoop/hbase/io/ByteBuffOutputStream.java | 71 +++++++++ .../hbase/io/ByteBufferListOutputStream.java | 5 + .../hadoop/hbase/io/ByteBufferOutputStream.java | 5 + .../apache/hadoop/hbase/io/ByteBufferWriter.java | 8 ++ .../hbase/io/ByteBufferWriterDataOutputStream.java | 5 + .../hbase/io/ByteBufferWriterOutputStream.java | 5 + .../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 9 ++ .../org/apache/hadoop/hbase/nio/MultiByteBuff.java | 13 ++ .../apache/hadoop/hbase/nio/SingleByteBuff.java | 6 + .../apache/hadoop/hbase/nio/TestMultiByteBuff.java | 18 +++ .../com/google/protobuf/CodedOutputStream.java | 1 + .../apache/hadoop/hbase/ipc/RpcCallContext.java | 6 + .../org/apache/hadoop/hbase/ipc/RpcServer.java | 159 +++++++++++++++++---- .../regionserver/wal/AsyncProtobufLogWriter.java | 5 + .../client/TestFromClientSideAsNonJavaClient.java | 54 +++++++ .../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 28 ++++ 17 files changed, 373 insertions(+), 31 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java index 22eb156..b441617 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io; +import java.io.IOException; import java.io.OutputStream; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; @@ -56,6 +57,11 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferWri } @Override + public void write(ByteBuffer b) throws IOException { + write(b, b.position(), b.remaining()); + } + + @Override public void writeInt(int i) { checkSizeAndGrow(Bytes.SIZEOF_INT); Bytes.putInt(this.buf, this.pos, i); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java new file mode 100644 index 0000000..4431d7d --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; + +@InterfaceAudience.Private +public class ByteBuffOutputStream extends OutputStream implements ByteBufferWriter { + + private ByteBuff buff; + + public ByteBuffOutputStream(ByteBuff buff) { + this.buff = buff; + } + + @Override + public void write(int b) throws IOException { + this.buff.put((byte) b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + this.buff.put(b, off, len); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(ByteBuffer b, int off, int len) throws IOException { + // Take duplicate + ByteBuffer bb = b.duplicate(); + bb.position(off); + bb.limit(off + len); + write(bb); + } + + @Override + public void write(ByteBuffer b) throws IOException { + this.buff.put(b); + } + + @Override + public void writeInt(int val) throws IOException { + this.buff.putInt(val); + + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java index c334a5a..191add9 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java @@ -174,4 +174,9 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { } } } + + @Override + public void write(ByteBuffer b) throws IOException { + write(b, b.position(), b.remaining()); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java index f6f7def..4faf6cf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java @@ -149,6 +149,11 @@ public class ByteBufferOutputStream extends OutputStream ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len); } + @Override + public void write(ByteBuffer b) throws IOException { + write(b, b.position(), b.remaining()); + } + /** * Writes an int to the underlying output stream as four * bytes, high byte first. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java index 012080c..94da2a2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java @@ -31,6 +31,14 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public interface ByteBufferWriter { /** + * Writes len bytes from the specified ByteBuffer starting at the current position. + * Note that it does not change the position of the specified ByteBuffer + * @param b the data. + * @exception IOException if an I/O error occurs. + */ + void write(ByteBuffer b) throws IOException; + + /** * Writes len bytes from the specified ByteBuffer starting at offset off * * @param b the data. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java index 7ddb8a9..2ed394a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterDataOutputStream.java @@ -41,4 +41,9 @@ public class ByteBufferWriterDataOutputStream extends DataOutputStream ByteBufferUtils.copyBufferToStream(out, b, off, len); written += len; } + + @Override + public void write(ByteBuffer b) throws IOException { + write(b, b.position(), b.remaining()); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java index 56c6956..ae10a57 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriterOutputStream.java @@ -65,6 +65,11 @@ public class ByteBufferWriterOutputStream extends OutputStream } @Override + public void write(ByteBuffer b) throws IOException { + write(b, b.position(), b.remaining()); + } + + @Override public void writeInt(int i) throws IOException { StreamUtils.writeInt(this.os, i); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index 60202a0..e12844a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -410,6 +410,15 @@ public abstract class ByteBuff { */ public abstract int read(ReadableByteChannel channel) throws IOException; + /** + * Copies from the given bytebuffer 'buff' to this ByteBuff. The position of this ByteBuff will be + * incremented to the limit of the given buffer and the position of the given 'buff' will be now + * set to the limit. + * @param buff the bytebuffer to copy + * @return this ByteBuffer + */ + public abstract ByteBuff put(ByteBuffer buff); + // static helper methods public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException { if (buf.remaining() <= NIO_BUFFER_LIMIT) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java index 948321d..6655142 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java @@ -856,6 +856,19 @@ public class MultiByteBuff extends ByteBuff { return this; } + @Override + public ByteBuff put(ByteBuffer buff) { + if (this.curItem.remaining() >= buff.remaining()) { + ByteBufferUtils.copyFromBufferToBuffer(buff, this.curItem); + } else { + // a slower version + while (buff.hasRemaining()) { + this.put(buff.get()); + } + } + return this; + } + /** * Writes a long to this MBB at its current position. Also advances the position by size of long diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java index 0e45410..b1e591f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -219,6 +219,12 @@ public class SingleByteBuff extends ByteBuff { } @Override + public ByteBuff put(ByteBuffer buff) { + ByteBufferUtils.copyFromBufferToBuffer(buff, this.buf); + return this; + } + + @Override public SingleByteBuff put(byte[] src) { return put(src, 0, src.length); } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java index af4c464..9658544 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java @@ -403,4 +403,22 @@ public class TestMultiByteBuff { mbb1.get(); // Now we have reached the limit assertFalse(mbb1.hasRemaining()); } + + @Test + public void testPutByteBuffer() { + ByteBuffer b1 = ByteBuffer.allocate(8); + ByteBuffer b2 = ByteBuffer.allocate(8); + ByteBuffer b3 = ByteBuffer.allocate(8); + ByteBuffer toPut = ByteBuffer.allocate(8); + toPut.putInt(100); + toPut.putInt(500); + toPut.flip(); + MultiByteBuff mbb1 = new MultiByteBuff(b1, b2, b3); + mbb1.position(8); + mbb1.put(toPut); + assertEquals(16, mbb1.position()); + mbb1.position(8); + assertEquals(100, mbb1.getInt()); + assertEquals(500, mbb1.getInt()); + } } diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java index 03871c9..9e06140 100644 --- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java +++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java @@ -217,6 +217,7 @@ public abstract class CodedOutputStream extends ByteOutput { * Setting this to {@code 0} will disable buffering, requiring an allocation for each encoded * string. */ + // TODO : We need get this public access when we push our changes to PB static CodedOutputStream newInstance(ByteOutput byteOutput, int bufferSize) { if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize must be positive"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 9bc8ee7..a453273 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -100,4 +100,10 @@ public interface RpcCallContext { * @return The system timestamp of deadline. */ long getDeadline(); + + /** + * Indicates if the current rpc call context is for read. The read API need to set this + * @return true if the current rpc call context is for read + */ + boolean isReadRequest(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a17310c..50eccdd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -93,6 +93,7 @@ import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; +import org.apache.hadoop.hbase.io.ByteBuffOutputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; @@ -353,6 +354,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected TraceInfo tinfo; private ByteBufferListOutputStream cellBlockStream = null; private CallCleanup reqCleanup = null; + private CallCleanup responseCleanup = null; private User user; private InetAddress remoteAddress; @@ -402,6 +404,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // got from pool. this.cellBlockStream = null; } + if (responseCleanup != null) { + responseCleanup.run(); + responseCleanup = null; + } cleanup();// If the call was run successfuly, we might have already returned the // BB back to pool. No worries..Then inputCellBlock will be null this.connection.decRpcCount(); // Say that we're done with this call. @@ -509,20 +515,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } Message header = headerBuilder.build(); - ByteBuffer headerBuf = + ByteBuffer[] hdrMsgBuffers = createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { cellBlockBufferSize = cellBlock.size(); - responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; - } else { - responseBufs = new ByteBuffer[1]; } - responseBufs[0] = headerBuf; + responseBufs = new ByteBuffer[hdrMsgBuffers.length + cellBlockBufferSize]; + int i = 0; + // add all the header/result buffers + for (; i < hdrMsgBuffers.length; i++) { + responseBufs[i] = hdrMsgBuffers[i]; + } if (cellBlock != null) { - for (int i = 0; i < cellBlockBufferSize; i++) { - responseBufs[i + 1] = cellBlock.get(i); + for (int j = 0; j < cellBlockBufferSize; j++) { + responseBufs[i++] = cellBlock.get(j); } } bc = new BufferChain(responseBufs); @@ -563,7 +571,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setException(exceptionBuilder.build()); } - private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + private ByteBuffer[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize, List cellBlock) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. @@ -572,8 +580,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // the last buffer in the cellblock. This applies to the cellblock created from the // pool or even the onheap cellblock buffer in case there is no pool enabled. // Possible reuse would avoid creating a temporary array for storing the header every time. - ByteBuffer possiblePBBuf = - (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; + ByteBuffer[] buffs = null; int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0; if (header != null) { @@ -586,31 +593,73 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } // calculate the total size int totalSize = headerSerializedSize + headerVintSize - + (resultSerializedSize + resultVintSize) - + cellBlockSize; + + (resultSerializedSize + resultVintSize) + cellBlockSize; int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize + Bytes.SIZEOF_INT; - // Only if the last buffer has enough space for header use it. Else allocate - // a new buffer. Assume they are all flipped - if (possiblePBBuf != null - && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { - // duplicate the buffer. This is where the header is going to be written - ByteBuffer pbBuf = possiblePBBuf.duplicate(); - // get the current limit - int limit = pbBuf.limit(); - // Position such that we write the header to the end of the buffer - pbBuf.position(limit); - // limit to the header size - pbBuf.limit(totalPBSize + limit); - // mark the current position - pbBuf.mark(); - writeToCOS(result, header, totalSize, pbBuf); - // reset the buffer back to old position - pbBuf.reset(); - return pbBuf; + // TODO : Should we check for totalPBSize >= minSizeForReservoirUse + if (cellBlockStream == null && isReadRequest() && !isClientCellBlockSupported()) { + // we know that the result that we are processing is for scan/get and the client + // is a non java client. + // We get them as ByteBuffer[] because we need ByteBuffer[] to be returned for the + // BufferChain. + Pair buffers = + allocateByteBufferArrayToReadInto(reservoir, minSizeForReservoirUse, totalPBSize); + buffs = buffers.getFirst(); + responseCleanup = buffers.getSecond(); + ByteBuff byteBuffs; + if (buffs.length > 1) { + byteBuffs = new MultiByteBuff(buffs); + } else { + // We are backed by single BB + byteBuffs = new SingleByteBuff(buffs[0]); + } + byteBuffs.limit(totalPBSize); + CodedOutputStream bcos = + CodedOutputStream.newInstance(new ByteBuffOutputStream(byteBuffs), totalPBSize); + byteBuffs.putInt(totalSize); + if (header != null) { + bcos.writeMessageNoTag(header); + } + if (result != null) { + bcos.writeMessageNoTag(result); + } + bcos.flush(); + if(byteBuffs.hasRemaining()) { + // TODO : Actually bcos should be doing it?? + // or is it ok to check on ByteBuffs directly? + throw new IllegalArgumentException("Not fully written"); + } + // flip all the buffers + for (ByteBuffer buf : buffs) { + buf.flip(); + } } else { - return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); + ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; + // Only if the last buffer has enough space for header use it. Else allocate + // a new buffer. Assume they are all flipped + buffs = new ByteBuffer[1]; + if (possiblePBBuf != null + && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { + // duplicate the buffer. This is where the header is going to be written + ByteBuffer pbBuf = possiblePBBuf.duplicate(); + // get the current limit + int limit = pbBuf.limit(); + // Position such that we write the header to the end of the buffer + pbBuf.position(limit); + // limit to the header size + pbBuf.limit(totalPBSize + limit); + // mark the current position + pbBuf.mark(); + writeToCOS(result, header, totalSize, pbBuf); + // reset the buffer back to old position + pbBuf.reset(); + buffs[0] = pbBuf; + } else { + ByteBuffer res = createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); + buffs[0] = res; + } } + return buffs; } private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) @@ -725,6 +774,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } @Override + public synchronized boolean isReadRequest() { + return this.rpcCallback != null; + } + + @Override public String getRequestUserName() { User user = getRequestUser(); return user == null? null: user.getShortName(); @@ -2906,6 +2960,49 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } /** + * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool + * as much as possible. + * + * @param pool The ByteBufferPool to use + * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer + * need of size below this, create on heap ByteBuffer. + * @param reqLen Bytes count in request + */ + @VisibleForTesting + static Pair allocateByteBufferArrayToReadInto(ByteBufferPool pool, + int minSizeForPoolUse, int reqLen) { + List bbs = new ArrayList((reqLen / pool.getBufferSize()) + 1); + int remain = reqLen; + ByteBuffer buf = null; + while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) { + bbs.add(buf); + remain -= pool.getBufferSize(); + } + ByteBuffer[] bufsFromPool = null; + if (bbs.size() > 0) { + bufsFromPool = new ByteBuffer[bbs.size()]; + bbs.toArray(bufsFromPool); + } + if (remain > 0) { + bbs.add(ByteBuffer.allocate(remain)); + } + ByteBuffer[] items = new ByteBuffer[bbs.size()]; + // convert to array + bbs.toArray(items); + if (bufsFromPool != null) { + final ByteBuffer[] bufsFromPoolFinal = bufsFromPool; + final ByteBuffer[] itemsFinal = items; + return new Pair(itemsFinal, () -> { + // Return back all the BBs to pool + for (int i = 0; i < bufsFromPoolFinal.length; i++) { + pool.putbackBuffer(bufsFromPoolFinal[i]); + } + }); + } + return new Pair(items, null); + } + + /** * Needed for features such as delayed calls. We need to be able to store the current call * so that we can complete it later or ask questions of what is supported by the current ongoing * call. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index a0ac8a2..767ba6f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -82,6 +82,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter } @Override + public void write(ByteBuffer b) throws IOException { + write(b, b.position(), b.remaining()); + } + + @Override public void writeInt(int i) throws IOException { out.writeInt(i); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java new file mode 100644 index 0000000..2ca0508 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java @@ -0,0 +1,54 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Run tests that use the HBase clients; {@link Table}. + * Sets up the HBase mini cluster once at start and runs through all client tests. + * Each creates a table named for the method and does its stuff against that. + * The test ensures that the client acts as non-java client + */ +@Category({LargeTests.class, ClientTests.class}) +@SuppressWarnings ("deprecation") +public class TestFromClientSideAsNonJavaClient extends TestFromClientSide { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Uncomment the following lines if more verbosity is needed for + // debugging (see HBASE-12285 for details). + // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); + // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); + // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName()); + conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests + // set no codec - so that it imitates a non java client + conf.set("hbase.client.default.rpc.codec", ""); + // We need more than one region server in this test + TEST_UTIL.startMiniCluster(SLAVES); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java index 6fd65f2..19b6aef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java @@ -23,15 +23,22 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ByteBuffOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -125,6 +132,27 @@ public class TestRpcServer { assertNull(pair.getSecond()); } + @Test + public void testByteBuffCOS() throws IOException { + KeyValue kv1 = + new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]); + ByteBuffer dbb1 = ByteBuffer.allocateDirect(3); + ByteBuffer dbb2 = ByteBuffer.allocateDirect(3); + ByteBuffer dbb3 = ByteBuffer.allocateDirect(3); + ByteBuffer dbb4 = ByteBuffer.allocateDirect(3); + ByteBuffer dbb5 = ByteBuffer.allocateDirect(50); + MultiByteBuff mbb = new MultiByteBuff(dbb1, dbb2, dbb3, dbb4, dbb5); + CellProtos.Cell cell = ProtobufUtil.toCell(kv1); + int serializedSize = cell.getSerializedSize(); + int computeRawVarint32Size = CodedOutputStream.computeRawVarint32Size(serializedSize); + int totalSize = serializedSize + computeRawVarint32Size; + mbb.limit(totalSize); + CodedOutputStream cos = CodedOutputStream.newInstance(new ByteBuffOutputStream(mbb), totalSize); + cos.writeMessageNoTag(cell); + cos.flush(); + assertEquals(totalSize, mbb.position()); + } + private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) { ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool]; // Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back