  1. Hadoop Common
  2. HADOOP-15530

RPC could get stuck at senderFuture.get()



      In Client.java, sendRpcRequest does the following:

          /** Initiates a rpc call by sending the rpc request to the remote server.
           * Note: this is not called from the Connection thread, but by other
           * threads.
           * @param call - the rpc request
           */
          public void sendRpcRequest(final Call call)
              throws InterruptedException, IOException {
            if (shouldCloseConnection.get()) {
              return;
            }

            // Serialize the call to be sent. This is done from the actual
            // caller thread, rather than the sendParamsExecutor thread,
            // so that if the serialization throws an error, it is reported
            // properly. This also parallelizes the serialization.
            //
            // Format of a call on the wire:
            // 0) Length of rest below (1 + 2)
            // 1) RpcRequestHeader  - is serialized Delimited hence contains length
            // 2) RpcRequest
            //
            // Items '1' and '2' are prepared here.
            RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
                call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
                clientId);

            final ResponseBuffer buf = new ResponseBuffer();
            header.writeDelimitedTo(buf);
            RpcWritable.wrap(call.rpcRequest).writeTo(buf);

            synchronized (sendRpcRequestLock) {
              Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
                @Override
                public void run() {
                  try {
                    synchronized (ipcStreams.out) {
                      if (shouldCloseConnection.get()) {
                        return;
                      }
                      if (LOG.isDebugEnabled()) {
                        LOG.debug(getName() + " sending #" + call.id
                            + " " + call.rpcRequest);
                      }
                      // RpcRequestHeader + RpcRequest
                      ipcStreams.sendRequest(buf.toByteArray());
                      ipcStreams.flush();
                    }
                  } catch (IOException e) {
                    // exception at this point would leave the connection in an
                    // unrecoverable state (eg half a call left on the wire).
                    // So, close the connection, killing any outstanding calls
                    markClosed(e);
                  } finally {
                    // the buffer is just an in-memory buffer, but it is still
                    // polite to close early
                    IOUtils.closeStream(buf);
                  }
                }
              });

              try {
                // The caller blocks here with no upper bound on the wait.
                senderFuture.get();
              } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                // cause should only be a RuntimeException as the Runnable above
                // catches IOException
                if (cause instanceof RuntimeException) {
                  throw (RuntimeException) cause;
                } else {
                  throw new RuntimeException("unexpected checked exception", cause);
                }
              }
            }
          }

      It has been observed that the call can get stuck at senderFuture.get(), with the following stack:

      "Thread-13" #40 prio=5 os_prio=0 tid=0x000000000fb0d000 nid=0xf189c waiting on condition [0x00007f697c582000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000006187e5ec0> (a java.util.concurrent.FutureTask)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
              at java.util.concurrent.FutureTask.get(FutureTask.java:191)
              at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1088)
              - locked <0x00000006215c1e08> (a java.lang.Object)
              at org.apache.hadoop.ipc.Client.call(Client.java:1483)
              at org.apache.hadoop.ipc.Client.call(Client.java:1441)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
              at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
              at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:266)
              at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
              at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:498)
              at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
              at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
              at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
              at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1323)
              at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1310)
              at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1298)
              at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
              at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:275)
              - locked <0x00000006187e5530> (a java.lang.Object)
              at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:267)
              at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1629)
              at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:338)
              at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:334)
              at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
              at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:334)

      Given that we support rpcTimeOut, we could choose the second of the two Future methods below, the timed get, so that the wait is bounded:

          /**
           * Waits if necessary for the computation to complete, and then
           * retrieves its result.
           *
           * @return the computed result
           * @throws CancellationException if the computation was cancelled
           * @throws ExecutionException if the computation threw an
           * exception
           * @throws InterruptedException if the current thread was interrupted
           * while waiting
           */
          V get() throws InterruptedException, ExecutionException;

          /**
           * Waits if necessary for at most the given time for the computation
           * to complete, and then retrieves its result, if available.
           *
           * @param timeout the maximum time to wait
           * @param unit the time unit of the timeout argument
           * @return the computed result
           * @throws CancellationException if the computation was cancelled
           * @throws ExecutionException if the computation threw an
           * exception
           * @throws InterruptedException if the current thread was interrupted
           * while waiting
           * @throws TimeoutException if the wait timed out
           */
          V get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException;
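
      The bounded wait suggested above could look roughly like the sketch below. This is only an illustration, not the actual Hadoop patch: the class BoundedSendWait, the method awaitSend and the parameter rpcTimeoutMs are hypothetical names, and both the "non-positive timeout means wait forever" rule and the conversion of TimeoutException into SocketTimeoutException are assumptions made for the example.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; neither the class nor the method exists in Hadoop.
class BoundedSendWait {

  /**
   * Waits for the sender task for at most rpcTimeoutMs milliseconds.
   * Assumption: a non-positive timeout keeps the unbounded get().
   */
  static void awaitSend(Future<?> senderFuture, long rpcTimeoutMs)
      throws InterruptedException, IOException {
    try {
      if (rpcTimeoutMs > 0) {
        senderFuture.get(rpcTimeoutMs, TimeUnit.MILLISECONDS);
      } else {
        senderFuture.get();
      }
    } catch (TimeoutException e) {
      // Best effort: cancel the task and interrupt the thread running it.
      senderFuture.cancel(true);
      SocketTimeoutException ste = new SocketTimeoutException(
          "sending RPC request timed out after " + rpcTimeoutMs + " ms");
      ste.initCause(e);
      throw ste;
    } catch (ExecutionException e) {
      // Same handling as the existing catch block in sendRpcRequest.
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new RuntimeException("unexpected checked exception", cause);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    // Simulate a sender that blocks far longer than the timeout.
    Future<?> stuck = pool.submit(() -> {
      try {
        Thread.sleep(60_000);
      } catch (InterruptedException ignored) {
        // interrupted by cancel(true) in awaitSend
      }
    });
    try {
      awaitSend(stuck, 200);
    } catch (SocketTimeoutException e) {
      System.out.println("gave up: " + e.getMessage());
    } finally {
      pool.shutdownNow();
    }
  }
}
```

      With a helper of this shape, the caller thread would return with an IOException once the timeout expires instead of parking forever in FutureTask.awaitDone. Whether the connection should also be marked closed at that point is left open here.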

      In theory, since RPC sends on the client are already serialized, we could simply run the send on the calling thread instead of handing it to a thread pool. This can be discussed in a separate JIRA.
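
      The idea of sending on the calling thread can be sketched as follows. This is a simplified illustration under stated assumptions, not Hadoop code: DirectSender is a hypothetical stand-in for the connection, and a plain OutputStream stands in for ipcStreams.out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for the connection; not Hadoop's Client.Connection.
class DirectSender {
  private final Object sendRpcRequestLock = new Object();
  private final OutputStream out;

  DirectSender(OutputStream out) {
    this.out = out;
  }

  /** Writes the serialized request on the calling thread: no executor, no Future. */
  void send(byte[] serializedRequest) throws IOException {
    synchronized (sendRpcRequestLock) { // senders are serialized by this lock
      synchronized (out) {
        out.write(serializedRequest);
        out.flush();
      }
    }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    new DirectSender(sink).send(new byte[] {1, 2, 3});
    System.out.println("bytes written: " + sink.size());
  }
}
```

      One trade-off to keep in mind: with this shape the caller blocks inside the write itself, so the wait is only as bounded as the underlying stream's own timeouts, and an interrupt of the caller arrives in the middle of the write.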

      Why the RPC is not processed and answered by the NameNode is a separate topic (HADOOP-15538).


