.../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 16 ++ .../org/apache/hadoop/hbase/nio/MultiByteBuff.java | 25 +++ .../apache/hadoop/hbase/nio/SingleByteBuff.java | 12 ++ .../apache/hadoop/hbase/nio/TestMultiByteBuff.java | 34 ++++ .../com/google/protobuf/CodedOutputStream.java | 2 +- .../apache/hadoop/hbase/ipc/RpcCallContext.java | 11 ++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 201 +++++++++++++++++---- .../hadoop/hbase/regionserver/RSRpcServices.java | 20 +- .../client/TestFromClientSideAsNonJavaClient.java | 54 ++++++ .../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 1 - 10 files changed, 339 insertions(+), 37 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index 60202a0..0bbbe93 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -410,6 +410,22 @@ public abstract class ByteBuff { */ public abstract int read(ReadableByteChannel channel) throws IOException; + /** + * Copies from the given bytebuffer 'buff' to this ByteBuff. The position of this ByteBuff will be + * incremented to the limit of the given buffer and the position of the given 'buff' will be now + * set to the limit. + * @param buff the bytebuffer to copy + * @return this ByteBuffer + */ + public abstract ByteBuff put(ByteBuffer buff); + + /** + * Similar to {@link ByteBuffer}.flip(), ensures that this ByteBuff is flipped such that the limit + * is set to current position and the position is reset to 0. + * @return this ByteBuff + */ + public abstract ByteBuff flip(); + // static helper methods public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException { if (buf.remaining() <= NIO_BUFFER_LIMIT) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java index 948321d..ab01695 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java @@ -424,6 +424,18 @@ public class MultiByteBuff extends ByteBuff { return this; } + @Override + public MultiByteBuff flip() { + this.limit = position(); + for (int i = 0; i < this.items.length; i++) { + this.items[i].flip(); + } + this.curItemIndex = 0; + this.curItem = this.items[this.curItemIndex]; + this.markedItemIndex = -1; + return this; + } + /** * Marks the current position of the MBB * @return this object @@ -856,6 +868,19 @@ public class MultiByteBuff extends ByteBuff { return this; } + @Override + public ByteBuff put(ByteBuffer buff) { + if (this.curItem.remaining() >= buff.remaining()) { + ByteBufferUtils.copyFromBufferToBuffer(buff, this.curItem); + } else { + // a slower version + while (buff.hasRemaining()) { + this.put(buff.get()); + } + } + return this; + } + /** * Writes a long to this MBB at its current position. Also advances the position by size of long diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java index 0e45410..fd0699e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -110,6 +110,12 @@ public class SingleByteBuff extends ByteBuff { } @Override + public ByteBuff flip() { + this.buf.flip(); + return this; + } + + @Override public ByteBuffer asSubByteBuffer(int length) { // Just return the single BB that is available return this.buf; @@ -219,6 +225,12 @@ public class SingleByteBuff extends ByteBuff { } @Override + public ByteBuff put(ByteBuffer buff) { + ByteBufferUtils.copyFromBufferToBuffer(buff, this.buf); + return this; + } + + @Override public SingleByteBuff put(byte[] src) { return put(src, 0, src.length); } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java index af4c464..8bc6e93 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java @@ -403,4 +403,38 @@ public class TestMultiByteBuff { mbb1.get(); // Now we have reached the limit assertFalse(mbb1.hasRemaining()); } + + @Test + public void testPutByteBuffer() { + ByteBuffer b1 = ByteBuffer.allocate(8); + ByteBuffer b2 = ByteBuffer.allocate(8); + ByteBuffer b3 = ByteBuffer.allocate(8); + ByteBuffer toPut = ByteBuffer.allocate(8); + toPut.putInt(100); + toPut.putInt(500); + toPut.flip(); + MultiByteBuff mbb1 = new MultiByteBuff(b1, b2, b3); + mbb1.position(8); + mbb1.put(toPut); + assertEquals(16, mbb1.position()); + mbb1.position(8); + assertEquals(100, mbb1.getInt()); + assertEquals(500, mbb1.getInt()); + } + + @Test + public void testFlip() { + ByteBuffer b1 = ByteBuffer.allocate(8); + ByteBuffer b2 = ByteBuffer.allocate(8); + ByteBuffer b3 = ByteBuffer.allocate(8); + MultiByteBuff mbb1 = new MultiByteBuff(b1, b2, b3); + mbb1.putLong(12l); + mbb1.putLong(13l); + mbb1.putInt(15); + assertEquals(20, mbb1.position()); + assertEquals(24, mbb1.limit()); + mbb1.flip(); + assertEquals(0, mbb1.position()); + assertEquals(20, mbb1.limit()); + } } diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java index 03871c9..48c6eb4 100644 --- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java +++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/com/google/protobuf/CodedOutputStream.java @@ -217,7 +217,7 @@ public abstract class CodedOutputStream extends ByteOutput { * Setting this to {@code 0} will disable buffering, requiring an allocation for each encoded * string. */ - static CodedOutputStream newInstance(ByteOutput byteOutput, int bufferSize) { + public static CodedOutputStream newInstance(ByteOutput byteOutput, int bufferSize) { if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize must be positive"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 9bc8ee7..118a861 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -100,4 +100,15 @@ public interface RpcCallContext { * @return The system timestamp of deadline. */ long getDeadline(); + + /** + * Indicates if the current rpc call context is for read. The read API need to set this + * @return true if the current rpc call context is for read + */ + boolean isReadRequest(); + + /** + * Indicates that this current rpc call context is for read + */ + void setIsReadRequest(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index a17310c..a727b56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -150,6 +150,7 @@ import org.codehaus.jackson.map.ObjectMapper; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteOutput; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; @@ -313,7 +314,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // We make this to be 1/6th of the pool buffer size. private final int minSizeForReservoirUse; - private volatile boolean allowFallbackToSimpleAuth; + private volatile boolean allowFallbackToSimpleAuth; /** * Used to get details for scan with a scanner_id
@@ -341,6 +342,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected int timeout; protected long startTime; protected long deadline;// the deadline to handle this call, if exceed we can drop it. + protected boolean readRequest; /** * Chain of buffers to send as response. @@ -353,6 +355,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected TraceInfo tinfo; private ByteBufferListOutputStream cellBlockStream = null; private CallCleanup reqCleanup = null; + private CallCleanup responseCleanup = null; private User user; private InetAddress remoteAddress; @@ -402,6 +405,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // got from pool. this.cellBlockStream = null; } + if (responseCleanup != null) { + responseCleanup.run(); + responseCleanup = null; + } cleanup();// If the call was run successfuly, we might have already returned the // BB back to pool. No worries..Then inputCellBlock will be null this.connection.decRpcCount(); // Say that we're done with this call. @@ -509,20 +516,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } Message header = headerBuilder.build(); - ByteBuffer headerBuf = + ByteBuffer[] hdrMsgBuffers = createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock); ByteBuffer[] responseBufs = null; int cellBlockBufferSize = 0; if (cellBlock != null) { cellBlockBufferSize = cellBlock.size(); - responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; - } else { - responseBufs = new ByteBuffer[1]; } - responseBufs[0] = headerBuf; + responseBufs = new ByteBuffer[hdrMsgBuffers.length + cellBlockBufferSize]; + int i = 0; + // add all the header/result buffers + for (; i < hdrMsgBuffers.length; i++) { + responseBufs[i] = hdrMsgBuffers[i]; + } if (cellBlock != null) { - for (int i = 0; i < cellBlockBufferSize; i++) { - responseBufs[i + 1] = cellBlock.get(i); + for (int j = 0; j < cellBlockBufferSize; j++) { + responseBufs[i++] = cellBlock.get(j); } } bc = new BufferChain(responseBufs); @@ -563,7 +572,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { headerBuilder.setException(exceptionBuilder.build()); } - private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + private ByteBuffer[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize, List cellBlock) throws IOException { // Organize the response as a set of bytebuffers rather than collect it all together inside // one big byte array; save on allocations. @@ -572,8 +581,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // the last buffer in the cellblock. This applies to the cellblock created from the // pool or even the onheap cellblock buffer in case there is no pool enabled. // Possible reuse would avoid creating a temporary array for storing the header every time. - ByteBuffer possiblePBBuf = - (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; + ByteBuffer[] buffs = null; int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, resultVintSize = 0; if (header != null) { @@ -586,31 +594,72 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } // calculate the total size int totalSize = headerSerializedSize + headerVintSize - + (resultSerializedSize + resultVintSize) - + cellBlockSize; + + (resultSerializedSize + resultVintSize) + cellBlockSize; int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + resultVintSize + Bytes.SIZEOF_INT; - // Only if the last buffer has enough space for header use it. Else allocate - // a new buffer. Assume they are all flipped - if (possiblePBBuf != null - && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { - // duplicate the buffer. This is where the header is going to be written - ByteBuffer pbBuf = possiblePBBuf.duplicate(); - // get the current limit - int limit = pbBuf.limit(); - // Position such that we write the header to the end of the buffer - pbBuf.position(limit); - // limit to the header size - pbBuf.limit(totalPBSize + limit); - // mark the current position - pbBuf.mark(); - writeToCOS(result, header, totalSize, pbBuf); - // reset the buffer back to old position - pbBuf.reset(); - return pbBuf; + // TODO : Should we check for totalPBSize >= minSizeForReservoirUse + if (cellBlockStream == null && isReadRequest() && !isClientCellBlockSupported()) { + // we know that the result that we are processing is for scan/get and the client + // is a non java client. + // We get them as ByteBuffer[] because we need ByteBuffer[] to be returned for the + // BufferChain. + Pair buffers = + allocateByteBufferArrayToReadInto(reservoir, minSizeForReservoirUse, totalPBSize); + buffs = buffers.getFirst(); + responseCleanup = buffers.getSecond(); + ByteBuff byteBuffs; + if (buffs.length > 1) { + byteBuffs = new MultiByteBuff(buffs); + } else { + // We are backed by single BB + byteBuffs = new SingleByteBuff(buffs[0]); + } + byteBuffs.limit(totalPBSize); + // are we making use of this 'totalPBSize'?? + CodedOutputStream bcos = + CodedOutputStream.newInstance(new ByteBuffedByteOutput(byteBuffs), totalPBSize); + byteBuffs.putInt(totalSize); + if (header != null) { + bcos.writeMessageNoTag(header); + } + if (result != null) { + bcos.writeMessageNoTag(result); + } + bcos.flush(); + if(byteBuffs.hasRemaining()) { + // TODO : Actually bcos should be doing it?? + // or is it ok to check on ByteBuffs directly? + throw new IllegalArgumentException("Not fully written"); + } + // We will add this or just directly call flip on the buffs[]? + byteBuffs.flip(); } else { - return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); + ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; + // Only if the last buffer has enough space for header use it. Else allocate + // a new buffer. Assume they are all flipped + buffs = new ByteBuffer[1]; + if (possiblePBBuf != null + && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { + // duplicate the buffer. This is where the header is going to be written + ByteBuffer pbBuf = possiblePBBuf.duplicate(); + // get the current limit + int limit = pbBuf.limit(); + // Position such that we write the header to the end of the buffer + pbBuf.position(limit); + // limit to the header size + pbBuf.limit(totalPBSize + limit); + // mark the current position + pbBuf.mark(); + writeToCOS(result, header, totalSize, pbBuf); + // reset the buffer back to old position + pbBuf.reset(); + buffs[0] = pbBuf; + } else { + ByteBuffer res = createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); + buffs[0] = res; + } } + return buffs; } private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) @@ -725,6 +774,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } @Override + public boolean isReadRequest() { + return readRequest; + } + + @Override + public void setIsReadRequest() { + this.readRequest = true; + } + + @Override public String getRequestUserName() { User user = getRequestUser(); return user == null? null: user.getShortName(); @@ -2449,6 +2508,41 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return pool.getBufferSize() / 6; } + private static class ByteBuffedByteOutput extends ByteOutput { + + private ByteBuff buff; + + public ByteBuffedByteOutput(ByteBuff buff) { + this.buff = buff; + } + + @Override + public void write(byte value) throws IOException { + this.buff.put(value); + } + + @Override + public void write(byte[] value, int offset, int length) throws IOException { + this.buff.put(value, offset, length); + } + + @Override + public void writeLazy(byte[] value, int offset, int length) throws IOException { + write(value, offset, length); + } + + @Override + public void write(ByteBuffer value) throws IOException { + this.buff.put(value); + } + + @Override + public void writeLazy(ByteBuffer value) throws IOException { + write(value); + } + + } + @Override public void onConfigurationChange(Configuration newConf) { initReconfigurable(newConf); @@ -2906,6 +3000,49 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } /** + * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool + * as much as possible. + * + * @param pool The ByteBufferPool to use + * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer + * need of size below this, create on heap ByteBuffer. + * @param reqLen Bytes count in request + */ + @VisibleForTesting + static Pair allocateByteBufferArrayToReadInto(ByteBufferPool pool, + int minSizeForPoolUse, int reqLen) { + List bbs = new ArrayList((reqLen / pool.getBufferSize()) + 1); + int remain = reqLen; + ByteBuffer buf = null; + while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) { + bbs.add(buf); + remain -= pool.getBufferSize(); + } + ByteBuffer[] bufsFromPool = null; + if (bbs.size() > 0) { + bufsFromPool = new ByteBuffer[bbs.size()]; + bbs.toArray(bufsFromPool); + } + if (remain > 0) { + bbs.add(ByteBuffer.allocate(remain)); + } + ByteBuffer[] items = new ByteBuffer[bbs.size()]; + // convert to array + bbs.toArray(items); + if (bufsFromPool != null) { + final ByteBuffer[] bufsFromPoolFinal = bufsFromPool; + final ByteBuffer[] itemsFinal = items; + return new Pair(itemsFinal, () -> { + // Return back all the BBs to pool + for (int i = 0; i < bufsFromPoolFinal.length; i++) { + pool.putbackBuffer(bufsFromPoolFinal[i]); + } + }); + } + return new Pair(items, null); + } + + /** * Needed for features such as delayed calls. We need to be able to store the current call * so that we can complete it later or ask questions of what is supported by the current ongoing * call. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a61a9f2..45fd74a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -485,10 +485,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private void addResults(final ScanResponse.Builder builder, final List results, - final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { + final RpcController controller, boolean isDefaultRegion, RpcCallContext context) { builder.setStale(!isDefaultRegion); if (results == null || results.isEmpty()) return; - if (clientCellBlockSupported) { + // set read request - should we only set this if cellblock is not supported? + setReadRequest(context); + if (isClientCellBlockSupport(context)) { for (Result res : results) { builder.addCellsPerResult(res.size()); builder.addPartialFlagPerResult(res.isPartial()); @@ -767,6 +769,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } if (action.hasGet()) { long before = EnvironmentEdgeManager.currentTime(); + // Better to mark this as read request and even if there are + // multiGets and append/increment still we have results being + // sent back to the client. So it is fine to mark them as + // read request?? + setReadRequest(context); try { Get get = ProtobufUtil.toGet(action.getGet()); if (context != null) { @@ -2297,6 +2304,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else if (r != null) { ClientProtos.Result pbr; RpcCallContext call = RpcServer.getCurrentCall(); + setReadRequest(call); if (isClientCellBlockSupport(call) && controller instanceof HBaseRpcController && VersionInfoUtil.hasMinimumVersion(call.getClientVersionInfo(), 1, 3)) { pbr = ProtobufUtil.toResultNoData(r); @@ -2323,6 +2331,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private void setReadRequest(RpcCallContext context) { + if (context != null) { + context.setIsReadRequest(); + } + } + private Result get(Get get, HRegion region, RegionScannersCloseCallBack closeCallBack, RpcCallContext context) throws IOException { region.prepareGet(get); @@ -2989,7 +3003,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else { addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), - isClientCellBlockSupport(context)); + context); } } catch (IOException e) { // The scanner state might be left in a dirty state, so we will tell the Client to diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java new file mode 100644 index 0000000..2ca0508 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java @@ -0,0 +1,54 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Run tests that use the HBase clients; {@link Table}. + * Sets up the HBase mini cluster once at start and runs through all client tests. + * Each creates a table named for the method and does its stuff against that. + * The test ensures that the client acts as non-java client + */ +@Category({LargeTests.class, ClientTests.class}) +@SuppressWarnings ("deprecation") +public class TestFromClientSideAsNonJavaClient extends TestFromClientSide { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Uncomment the following lines if more verbosity is needed for + // debugging (see HBASE-12285 for details). + // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); + // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL); + // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName()); + conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests + // set no codec - so that it imitates a non java client + conf.set("hbase.client.default.rpc.codec", ""); + // We need more than one region server in this test + TEST_UTIL.startMiniCluster(SLAVES); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java index 9f3bd94..6fd65f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java @@ -41,7 +41,6 @@ public class TestRpcServer { @Test public void testAllocateByteBuffToReadInto() throws Exception { - System.out.println(Long.MAX_VALUE); int maxBuffersInPool = 10; ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool); initPoolWithAllBuffers(pool, maxBuffersInPool);