diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 1c12e2d..7b433ba 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -215,6 +215,11 @@ public class RpcServer implements RpcServerInterface { protected final boolean tcpKeepAlive; // if T then use keepalives protected final long purgeTimeout; // in milliseconds + // Limit on consecutive zero-byte reads from the non-blocking socket before the read + // is treated as timed out. For example, readTimeoutCounter = 50 means the timeout is + // triggered once 50 reads in a row have each returned 0 bytes. + protected final int readTimeoutCounter; + /** * This flag is used to indicate to sub threads when they should go down. When we call * {@link #startThreads()}, all threads started will consult this flag on whether they should @@ -1131,6 +1136,7 @@ public class RpcServer implements RpcServerInterface { protected String hostAddress; protected int remotePort; ConnectionHeader connectionHeader; + private long totalZeroReadCount = 0; /** * Codec the client asked use. */ @@ -1405,6 +1411,8 @@ public class RpcServer implements RpcServerInterface { * @throws InterruptedException */ public int readAndProcess() throws IOException, InterruptedException { + int zeroReadCount = 0; + while (true) { // Try and read in an int. If new connection, the int will hold the 'HBas' HEADER. If it // does, read in the rest of the connection preamble, the version and the auth method. 
@@ -1489,6 +1497,8 @@ public class RpcServer implements RpcServerInterface { if (count < 0) { return count; } else if (data.remaining() == 0) { + // Reset the counter: the payload (possibly the connection header) is fully read. + zeroReadCount = 0; dataLengthBuffer.clear(); data.flip(); if (skipInitialSaslHandshake) { @@ -1507,10 +1517,46 @@ public class RpcServer implements RpcServerInterface { continue; } } else if (count > 0) { + // Reset the counter: this read made progress. + zeroReadCount = 0; // We got some data and there is more to read still; go around again. if (LOG.isTraceEnabled()) LOG.trace("Continue to read rest of data " + data.remaining()); continue; + } else if (count == 0) { + // We have seen readers get stuck at the third channelRead call: the payload is + // never read in full (i.e. data.remaining() never reaches 0). + // Because the socket is non-blocking, code analysis suggests the read keeps + // returning 0 bytes, so the while loop goes around again and again. + // (The 'count > 0' case is not a suspect: it eventually reads the full data.) + + // Note that at this point the 'data' buffer has been allocated, which means the + // length of the client data has already been read. Whether it is a connection + // header or an RPC request, we expect to read 'dataLength' bytes of data; if we + // cannot, something must be wrong. Apply a simple timeout here to avoid the dead + // loop: by default, 50 consecutive zero-byte reads are treated as a failure. + // With the 100 ms sleep below that is 5 s (50 * 100 ms), which looks long enough + // for a non-blocking socket. '>=' so that a limit <= 0 cannot disable the timeout. + zeroReadCount++; + this.totalZeroReadCount++; + Thread.sleep(100); + + if (zeroReadCount >= readTimeoutCounter) { + LOG.error("zeroReadTimeout! 
totalZeroReadCount: " + this.totalZeroReadCount + + " connectionPreambleRead: " + connectionPreambleRead + + " connectionHeaderRead: " + connectionHeaderRead + + " data.remaining: " + data.remaining() + + " data.capacity: " + data.capacity() + + " dataLengthBuffer.remaining: " + dataLengthBuffer.remaining() + + " hostAddress: " + this.hostAddress + + " remotePort: " + this.remotePort); + + // The caller will close the connection. + throw new IOException("zeroReadTimeout!"); + } + + continue; } + return count; } } @@ -1855,6 +1901,7 @@ public class RpcServer implements RpcServerInterface { this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.readTimeoutCounter = conf.getInt("ipc.server.read.timeout.counter", 50); this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);