Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1502643) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -377,7 +377,7 @@ int bufSiz = bb.remaining(); // Move to the size location in our ByteBuffer past call.id // and past the byte flag. - bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); + bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); bb.putInt(bufSiz); bb.position(0); this.response = bb; @@ -428,7 +428,7 @@ public synchronized boolean isReturnValueDelayed() { return this.delayReturnValue; } - + @Override public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException { if (!connection.channel.isOpen()) { @@ -967,12 +967,12 @@ int numBytes = channelWrite(channel, call.response); if (numBytes < 0) { // Error flag is set, so returning here closes connection and - // clears responseQueue. + // clears responseQueue. return true; } if (!call.response.hasRemaining()) { responseQueue.poll(); - responseQueuesSizeThrottler.decrease(call.response.limit()); + responseQueuesSizeThrottler.decrease(call.response.limit()); call.connection.decRpcCount(); //noinspection RedundantIfStatement if (numElements == 1) { // last call fully processes. @@ -1062,7 +1062,7 @@ // increased responseQueues size already. It shoud be // decreased here. responseQueuesSizeThrottler.decrease(call.response.remaining()); - } + } } private synchronized void incPending() { // call waiting to be enqueued. @@ -1177,8 +1177,8 @@ int count; if (dataLengthBuffer.remaining() > 0) { count = channelRead(channel, dataLengthBuffer); - if (count < 0 || dataLengthBuffer.remaining() > 0) - return count; + if (count < 0) return count; + if (dataLengthBuffer.remaining() > 0) return count; } if (!versionRead) { @@ -1398,7 +1398,7 @@ Call call = myCallQueue.take(); // pop the queue; maybe blocked here updateCallQueueLenMetrics(myCallQueue); status.setStatus("Setting up call"); - status.setConnection(call.connection.getHostAddress(), + status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); if (LOG.isDebugEnabled()) @@ -1423,7 +1423,7 @@ RequestContext.set(call.connection.ticket, getRemoteIp(), call.connection.protocol); // make the call - value = call(call.connection.protocol, call.param, call.timestamp, + value = call(call.connection.protocol, call.param, call.timestamp, status); } catch (Throwable e) { LOG.debug(getName()+", call "+call+": error: " + e, e); @@ -1550,7 +1550,7 @@ this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - this.numOfReplicationHandlers = + this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3); if (numOfReplicationHandlers > 0) { this.replicationQueue = new LinkedBlockingQueue(maxQueueSize); @@ -1632,7 +1632,7 @@ } connection.responseQueue.clear(); } - responseQueuesSizeThrottler.decrease(bytes); + responseQueuesSizeThrottler.decrease(bytes); rpcMetrics.numOpenConnections.set(numConnections); }