diff --git src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 4621109..1365411 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -33,14 +33,10 @@ import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; -import java.util.Hashtable; import java.util.Iterator; -import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; @@ -280,7 +276,7 @@ public class HBaseClient { * otherwise, throw the timeout exception. */ private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get() || + if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) { throw e; } @@ -552,14 +548,24 @@ public class HBaseClient { touch(); try { - int id = in.readInt(); // try to read an id + // See HBaseServer.Call.setResponse for where we write out the response. + // It writes the call.id (int), a flag byte, then optionally the length + // of the response (int) followed by data. + + // Read the call id. + int id = in.readInt(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); - Call call = calls.remove(id); - boolean isError = in.readBoolean(); // read if error + // Read the flag byte + byte flag = in.readByte(); + boolean isError = ResponseFlag.isError(flag); + if (ResponseFlag.isLength(flag)) { + // Currently length if present is unused. + in.readInt(); + } if (isError) { //noinspection ThrowableInstanceNeverThrown call.setException(new RemoteException( WritableUtils.readString(in), diff --git src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 36b0560..1db05cb 100644 --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -298,7 +298,8 @@ public abstract class HBaseServer implements RpcServer { if (result instanceof WritableWithSize) { // get the size hint. WritableWithSize ohint = (WritableWithSize) result; - long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; + long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + + (2 * Bytes.SIZEOF_INT); if (hint > Integer.MAX_VALUE) { // oops, new problem. IOException ioe = @@ -313,8 +314,15 @@ public abstract class HBaseServer implements RpcServer { ByteBufferOutputStream buf = new ByteBufferOutputStream(size); DataOutputStream out = new DataOutputStream(buf); try { - out.writeInt(this.id); // write call id - out.writeBoolean(error != null); // write error flag + // Call id. + out.writeInt(this.id); + // Write flag. + byte flag = (error != null)? + ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly(); + out.writeByte(flag); + // Place holder for length set later below after we + // fill the buffer with data. + out.writeInt(0xdeadbeef); } catch (IOException e) { errorClass = e.getClass().getName(); error = StringUtils.stringifyException(e); @@ -331,7 +339,16 @@ public abstract class HBaseServer implements RpcServer { LOG.warn("Error sending response to call: ", e); } - this.response = buf.getByteBuffer(); + // Set the length into the ByteBuffer after call id and after + // byte flag. + ByteBuffer bb = buf.getByteBuffer(); + int bufSiz = bb.remaining(); + // Move to the size location in our ByteBuffer past call.id + // and past the byte flag. + bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); + bb.putInt(bufSiz); + bb.position(0); + this.response = bb; } @Override diff --git src/main/java/org/apache/hadoop/hbase/ipc/ResponseFlag.java src/main/java/org/apache/hadoop/hbase/ipc/ResponseFlag.java new file mode 100644 index 0000000..112caef --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/ipc/ResponseFlag.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +/** + * Utility for managing the flag byte passed in response to a + * {@link HBaseServer.Call} + */ +class ResponseFlag { + private static final byte ERROR_BIT = 0x1; + private static final byte LENGTH_BIT = 0x2; + + private ResponseFlag() { + // Make it so this class cannot be constructed. + } + + static boolean isError(final byte flag) { + return (flag & ERROR_BIT) != 0; + } + + static boolean isLength(final byte flag) { + return (flag & LENGTH_BIT) != 0; + } + + static byte getLengthSetOnly() { + return LENGTH_BIT; + } + + static byte getErrorAndLengthSet() { + return LENGTH_BIT & ERROR_BIT; + } +} \ No newline at end of file