  1. Hadoop Common
  2. HADOOP-15720

rpcTimeout may not have been applied correctly

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: common
    • Labels:
      None

      Description

      org.apache.hadoop.ipc.Client sends multiple RPC calls to the server synchronously over the same connection, as in the following synchronized code block:

            synchronized (sendRpcRequestLock) {
              Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
                @Override
                public void run() {
                  try {
                    synchronized (Connection.this.out) {
                      if (shouldCloseConnection.get()) {
                        return;
                      }
                      
                      if (LOG.isDebugEnabled()) {
                        LOG.debug(getName() + " sending #" + call.id
                            + " " + call.rpcRequest);
                      }
               
                      byte[] data = d.getData();
                      int totalLength = d.getLength();
                      out.writeInt(totalLength); // Total Length
                      out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
                      out.flush();
                    }
                  } catch (IOException e) {
                    // exception at this point would leave the connection in an
                    // unrecoverable state (eg half a call left on the wire).
                    // So, close the connection, killing any outstanding calls
                    markClosed(e);
                  } finally {
                    //the buffer is just an in-memory buffer, but it is still polite to
                    // close early
                    IOUtils.closeStream(d);
                  }
                }
              });
            
              try {
                senderFuture.get();
              } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                
                // cause should only be a RuntimeException as the Runnable above
                // catches IOException
                if (cause instanceof RuntimeException) {
                  throw (RuntimeException) cause;
                } else {
                  throw new RuntimeException("unexpected checked exception", cause);
                }
              }
            }
      

      It then waits for the results asynchronously via

          /* Receive a response.
           * Because only one receiver, so no synchronization on in.
           */
          private void receiveRpcResponse() {
            if (shouldCloseConnection.get()) {
              return;
            }
            touch();
            
            try {
              int totalLen = in.readInt();
              RpcResponseHeaderProto header = 
                  RpcResponseHeaderProto.parseDelimitedFrom(in);
              checkResponse(header);
      
              int headerLen = header.getSerializedSize();
              headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
      
              int callId = header.getCallId();
              if (LOG.isDebugEnabled())
                LOG.debug(getName() + " got value #" + callId);
      
              Call call = calls.get(callId);
              RpcStatusProto status = header.getStatus();
      ......
      

      However, the responses read by receiveRpcResponse() above may arrive in any order.

      The following code

              int totalLen = in.readInt();
      

      eventually calls one of the following two methods, where rpcTimeout is checked:

            /** Read a byte from the stream.
             * Send a ping if timeout on read. Retries if no failure is detected
             * until a byte is read.
             * @throws IOException for any IO problem other than socket timeout
             */
            @Override
            public int read() throws IOException {
              int waiting = 0;
              do {
                try {
                  return super.read();
                } catch (SocketTimeoutException e) {
                  waiting += soTimeout;
                  handleTimeout(e, waiting);
                }
              } while (true);
            }
      
            /** Read bytes into a buffer starting from offset <code>off</code>
             * Send a ping if timeout on read. Retries if no failure is detected
             * until a byte is read.
             * 
             * @return the total number of bytes read; -1 if the connection is closed.
             */
            @Override
            public int read(byte[] buf, int off, int len) throws IOException {
              int waiting = 0;
              do {
                try {
                  return super.read(buf, off, len);
                } catch (SocketTimeoutException e) {
                  waiting += soTimeout;
                  handleTimeout(e, waiting);
                }
              } while (true);
            }
      

      But the waiting time is always initialized to 0 for each of the above read calls, so each read can wait up to rpcTimeout. The real time before a call times out therefore appears to be cumulative.

      For example, suppose the client issues call1 and call2 and then waits for the results. If call1 takes (rpcTimeout - 1), it does not time out; if the response to call2 then takes another (rpcTimeout - 1), it does not time out either. Yet call2 effectively took 2*(rpcTimeout - 1), which could be much larger than rpcTimeout, and should have timed out.
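
      The arithmetic in this example can be checked with a small model. This is only a sketch, not Hadoop code: PerReadTimeoutModel, perReadTimesOut and totalElapsedMs are hypothetical names, and the model assumes a read times out once its own wait reaches rpcTimeout, which simplifies the ping handling in the read() methods quoted above.

```java
import java.util.List;

// Hypothetical model, not Hadoop code: every blocking read starts its own
// wait counter at 0, mirroring the "int waiting = 0" in the quoted read().
final class PerReadTimeoutModel {

    /** Returns true if any single read waited at least rpcTimeoutMs. */
    static boolean perReadTimesOut(List<Long> readWaitsMs, long rpcTimeoutMs) {
        for (long waitMs : readWaitsMs) {
            if (waitMs >= rpcTimeoutMs) {
                return true;   // only an individual read can trip the timeout
            }
        }
        return false;
    }

    /** Total time a caller spends waiting if its response is read last. */
    static long totalElapsedMs(List<Long> readWaitsMs) {
        long totalMs = 0;
        for (long waitMs : readWaitsMs) {
            totalMs += waitMs;
        }
        return totalMs;
    }
}
```

      With two reads of (rpcTimeout - 1) each, perReadTimesOut reports no timeout, while totalElapsedMs is 2*(rpcTimeout - 1) for the caller of the second call.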

      In the worst case, an RPC may take indeterminately long without timing out.

      It seems more accurate to remember the time at which an RPC is sent to the server, and then check for a timeout here:

        public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
            ConnectionId remoteId, int serviceClass,
            AtomicBoolean fallbackToSimpleAuth) throws IOException {
          final Call call = createCall(rpcKind, rpcRequest);
          Connection connection = getConnection(remoteId, call, serviceClass,
            fallbackToSimpleAuth);
          try {
            connection.sendRpcRequest(call);                 // send the rpc request
          } catch (RejectedExecutionException e) {
            throw new IOException("connection has been closed", e);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("interrupted waiting to send rpc request to server", e);
            throw new IOException(e);
          }
      
          synchronized (call) {
            while (!call.done) {
              try {
                call.wait();                           // wait for the result
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("Call interrupted");
              } // <= should check how long it has waited here; time out if rpcTimeout has been reached
            }  
      
            if (call.error != null) {
              if (call.error instanceof RemoteException) {
                call.error.fillInStackTrace();
                throw call.error;
              } else { // local exception
                InetSocketAddress address = connection.getRemoteAddress();
                throw NetUtils.wrapException(address.getHostName(),
                        address.getPort(),
                        NetUtils.getHostname(),
                        0,
                        call.error);
              }
            } else {
              return call.getRpcResponse();
            }
          }
        }
      

      Basically, we should change the call highlighted above from

          public final void wait() throws InterruptedException
      

      to

          public final void wait(long timeout, int nanos) throws InterruptedException
      

      and apply rpcTimeout as the parameter value here. (Note that I'm ignoring the time needed to send the RPC to the server; ideally we should include that too, so that rpcTimeout means what it is intended to mean.)
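
      A minimal sketch of what such a timed wait could look like, under stated assumptions. TimedCallSketch, awaitDone and complete are hypothetical stand-ins, not the actual Client.Call API. The sketch assumes that rpcTimeout is in milliseconds, that a value <= 0 means "wait indefinitely", and that SocketTimeoutException is the exception to raise. Because wait() can return before the call is done (another notification or a spurious wakeup), the loop waits against a fixed deadline rather than restarting the full timeout on every iteration.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Call object; only the state needed to
// illustrate the timed wait is modeled.
final class TimedCallSketch {
    private boolean done;   // guarded by "this"

    /** Marks the call as finished and wakes up any waiter. */
    synchronized void complete() {
        done = true;
        notifyAll();
    }

    /**
     * Waits until complete() is called or rpcTimeoutMs has elapsed.
     * Assumption: rpcTimeoutMs <= 0 disables the timeout.
     */
    synchronized void awaitDone(long rpcTimeoutMs)
            throws InterruptedException, SocketTimeoutException {
        // Fixed deadline, computed once, so early wakeups do not extend it.
        final long deadlineNanos =
            System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(rpcTimeoutMs);
        while (!done) {
            if (rpcTimeoutMs <= 0) {
                wait();     // no timeout configured: original behavior
                continue;
            }
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new SocketTimeoutException(
                    "call timed out after " + rpcTimeoutMs + " ms");
            }
            // remainingNanos > 0 here, so this is never wait(0, 0),
            // which would block indefinitely.
            wait(remainingNanos / 1_000_000L,
                 (int) (remainingNanos % 1_000_000L));
        }
    }
}
```

      In this sketch the deadline starts when awaitDone is entered. To cover the send time as well, the deadline could instead be computed before the request is sent and passed in.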

      Hi Daryn Sharp and Kihwal Lee, would you please take a look at my analysis above and let me know if I have misunderstood anything?

      Thanks a lot.

            People

            • Assignee:
              Unassigned
              Reporter:
              yzhangal Yongjun Zhang